diff --git a/bin/server b/bin/server index dfd5d5a..90c72b1 100755 --- a/bin/server +++ b/bin/server @@ -23,7 +23,9 @@ process.once('uncaughtException', function(err) { return; }); -var server = require('../server'); +var server = require('../server')({ + max_tcp_sockets: 5 +}); server.listen(argv.port, function() { log.info('server listening on port: %d', server.address().port); diff --git a/client.js b/client.js index e71195c..8f8a5bc 100644 --- a/client.js +++ b/client.js @@ -30,24 +30,74 @@ var connect = function(opt) { var assigned_domain = opt.subdomain; // connect to upstream given connection parameters - var tunnel = function (remote_host, remote_port, max_conn) { - var count = 0; + var tunnel = function (remote_host, remote_port) { - // open 5 connections to the localtunnel server - // allows for resources to be served faster - for (var count = 0 ; count < max_conn ; ++count) { - var upstream = duplex(remote_host, remote_port, 'localhost', local_port); - upstream.once('end', function() { - // all upstream connections have been closed - if (--count <= 0) { - tunnel(remote_host, remote_port, max_conn); + var remote_opt = { + host: remote_host, + port: remote_port + }; + + var local_opt = { + host: 'localhost', + port: local_port + }; + + var remote_attempts = 0; + + (function conn(conn_had_error) { + if (conn_had_error) { + return; + } + + if (++remote_attempts >= 3) { + console.error('localtunnel server offline - try again'); + process.exit(-1); + } + + // connection to localtunnel server + var remote = net.connect(remote_opt); + + remote.once('error', function(err) { + if (err.code !== 'ECONNREFUSED') { + local.emit('error', err); } + + // retrying connection to local server + setTimeout(conn, 1000); }); - upstream.on('error', function(err) { - console.error(err); - }); - } + function recon_local() { + remote.pause(); + remote_attempts = 0; + + // connection to local http server + var local = net.connect(local_opt); + + local.once('error', function(err) { + if (err.code !== 'ECONNREFUSED') { + local.emit('error', err); + } + + // retrying connection to local server + setTimeout(recon_local, 1000); + }); + + local.once('connect', function() { + remote.resume(); + remote.pipe(local).pipe(remote, {end: false}); + }); + + local.once('close', function(had_error) { + if (had_error) { + return; + } + recon_local(); + }); + } + + remote.once('close', conn); + remote.once('connect', recon_local); + })(); }; var params = { @@ -58,6 +108,7 @@ var connect = function(opt) { // where to quest params.uri = base_uri + ((assigned_domain) ? assigned_domain : '?new'); + // get an id from lt server and setup forwarding tcp connections request_url(params, function(err, body) { if (err) { @@ -76,7 +127,10 @@ var connect = function(opt) { // store the id so we can try to get the same one assigned_domain = body.id; - tunnel(host, port, body.max_conn_count || 1); + var max_conn = body.max_conn_count || 1; + for (var count = 0 ; count < max_conn ; ++count) { + tunnel(host, port); + } ev.emit('url', body.url); }); @@ -84,54 +138,5 @@ var connect = function(opt) { return ev; }; -var duplex = function(remote_host, remote_port, local_host, local_port) { - var ev = new EventEmitter(); - - // connect to remote tcp server - var upstream = net.createConnection(remote_port, remote_host); - var internal; - - // when upstream connection is closed, close other associated connections - upstream.once('end', function() { - ev.emit('error', new Error('upstream connection terminated')); - - // sever connection to internal server - // on reconnect we will re-establish - internal.end(); - - ev.emit('end'); - }); - - upstream.on('error', function(err) { - ev.emit('error', err); - }); - - (function connect_internal() { - - internal = net.createConnection(local_port, local_host); - internal.on('error', function() { - ev.emit('error', new Error('error connecting to local server. retrying in 1s')); - setTimeout(function() { - connect_internal(); - }, 1000); - }); - - internal.on('end', function() { - ev.emit('error', new Error('disconnected from local server. retrying in 1s')); - setTimeout(function() { - connect_internal(); - }, 1000); - }); - - internal.on('connect', function() { - console.log('connected to local server'); - }); - - upstream.pipe(internal).pipe(upstream); - })(); - - return ev; -} - module.exports.connect = connect; diff --git a/package.json b/package.json index 0a16717..9509443 100644 --- a/package.json +++ b/package.json @@ -10,7 +10,8 @@ "dependencies": { "request": "2.11.4", "book": "1.2.0", - "optimist": "0.3.4" + "optimist": "0.3.4", + "http-raw": "1.1.0" }, "devDependencies": { "mocha": "1.6.0" diff --git a/server.js b/server.js index e294b59..3ed5e36 100644 --- a/server.js +++ b/server.js @@ -6,277 +6,106 @@ var url = require('url'); // here be dragons var HTTPParser = process.binding('http_parser').HTTPParser; -var ServerResponse = http.ServerResponse; -var IncomingMessage = http.IncomingMessage; // vendor var log = require('book'); +var createRawServer = require('http-raw'); // local var rand_id = require('./lib/rand_id'); -var server = http.createServer(); - // id -> client http server var clients = {}; // available parsers var parsers = http.parsers; -// data going back to a client (the last client that made a request) -function socketOnData(d, start, end) { +// send this request to the appropriate client +// in -> incoming request stream +function proxy_request(client, req, res, rs, ws) { - var socket = this; - var req = this._httpMessage; + rs = rs || req.createRawStream(); + ws = ws || res.createRawStream(); - var response_socket = socket.respond_socket; - if (!response_socket) { - log.error('no response socket assigned for http response from backend'); + // socket is a tcp connection back to the user hosting the site + var sock = client.sockets.shift(); + + // queue request + if (!sock) { + log.info('no more clients, queued: %s', req.url); + rs.pause(); + client.waiting.push([req, res, rs, ws]); return; } - // pass the response from our client back to the requesting socket - response_socket.write(d.slice(start, end)); + log.info('handle req: %s', req.url); - if (socket.for_websocket) { - return; - } + // pipe incoming request into tcp socket + // incoming request isn't allowed to end the socket back to lt client + rs.pipe(sock, { end: false }); + sock.ws = ws; + sock.req = req; + + // since tcp connection to upstream are kept open // invoke parsing so we know when the response is complete - var parser = response_socket.out_parser; - parser.socket = socket; + var parser = sock.parser; + parser.reinitialize(HTTPParser.RESPONSE); + parser.socket = sock; - var ret = parser.execute(d, start, end - start); + // we have completed a response + // the tcp socket is free again + parser.onIncoming = function (res) { + parser.onMessageComplete = function() { + log.info('ended response: %s', req.url); + + // any request we had going on is now done + ws.end(); + + // no more forwarding + delete sock.ws; + delete parser.onIncoming; + + // return socket to available pool + client.sockets.push(sock); + + var next = client.waiting.shift(); + if (next) { + log.trace('popped'); + proxy_request(client, next[0], next[1], next[2], next[3]); + } + }; + }; + + rs.resume(); +} + +function upstream_response(d, start, end) { + var socket = this; + + var ws = socket.ws; + if (!ws) { + log.warn('no stream set for req:', socket.req.url); + return; + } + + ws.write(d.slice(start, end)); + + if (socket.upgraded) { + return; + } + + var ret = socket.parser.execute(d, start, end - start); if (ret instanceof Error) { log.error(ret); - freeParser(parser, req); + parsers.free(parser); socket.destroy(ret); } } -function freeParser(parser, req) { - if (parser) { - parser._headers = []; - parser.onIncoming = null; - if (parser.socket) { - parser.socket.onend = null; - parser.socket.ondata = null; - parser.socket.parser = null; - } - parser.socket = null; - parser.incoming = null; - parsers.free(parser); - parser = null; - } - if (req) { - req.parser = null; - } -} +var handle_req = function (req, res) { -// single http connection -// gets a single http response back -server.on('connection', function(socket) { - - var self = this; - - // parser handles incoming requests for the socket - // the request is what lets us know if we proxy or not - var parser = parsers.alloc(); - parser.socket = socket; - parser.reinitialize(HTTPParser.REQUEST); - - function our_request(req) { - var res = new ServerResponse(req); - res.assignSocket(socket); - self.emit('request', req, res); - return; - } - - // a full request is complete - // we wait for the response from the server - parser.onIncoming = function(req, shouldKeepAlive) { - - log.trace('request', req.url); - - // default is that the data is not for the client - delete parser.sock; - delete parser.buffer; - delete parser.client; - - var hostname = req.headers.host; - if (!hostname) { - log.trace('no hostname: %j', req.headers); - return our_request(req); - } - - var match = hostname.match(/^([a-z]{4})[.].*/); - if (!match) { - return our_request(req); - } - - var client_id = match[1]; - var client = clients[client_id]; - - // requesting a subdomain that doesn't exist - if (!client) { - return socket.end(); - } - - parser.client = client; - - // assigned socket for the client - var sock = client.sockets.shift(); - - // no free sockets, queue - if (!sock) { - parser.buffer = true; - return; - } - - // for tcp proxying - parser.sock = sock; - - // set who we will respond back to - sock.respond_socket = socket; - - var out_parser = parsers.alloc(); - out_parser.reinitialize(HTTPParser.RESPONSE); - socket.out_parser = out_parser; - - // we have completed a response - // the tcp socket is free again - out_parser.onIncoming = function (res) { - res.on('end', function() { - log.trace('done with response for: %s', req.url); - - // done with the parser - parsers.free(out_parser); - - // unset the response - delete sock.respond_socket; - - var next = client.waiting.shift(); - if (!next) { - // return socket to available - client.sockets.push(sock); - return; - } - - // reuse avail socket for next connection - sock.respond_socket = next; - - // needed to know when this response will be done - out_parser.reinitialize(HTTPParser.RESPONSE); - next.out_parser = out_parser; - - // write original bytes we held cause we were busy - sock.write(next.queue); - - // continue with other bytes - next.resume(); - - return; - }); - }; - }; - - // process new data on the client socket - // we may need to forward this it the backend - socket.ondata = function(d, start, end) { - - // run through request parser to determine if we should pass to tcp - // onIncoming will be run before this returns - var ret = parser.execute(d, start, end - start); - - // invalid request from the user - if (ret instanceof Error) { - log.error(ret); - socket.destroy(ret); - return; - } - - // websocket stuff - if (parser.incoming && parser.incoming.upgrade) { - log.trace('upgrade request'); - - parser.finish(); - - var hostname = parser.incoming.headers.host; - - var match = hostname.match(/^([a-z]{4})[.].*/); - if (!match) { - return our_request(req); - } - - var client_id = match[1]; - var client = clients[client_id]; - - var sock = client.sockets.shift(); - sock.respond_socket = socket; - sock.for_websocket = true; - - socket.ondata = function(d, start, end) { - sock.write(d.slice(start, end)); - }; - - socket.end = function() { - log.trace('websocket end'); - - delete sock.respond_socket; - client.sockets.push(sock); - } - - sock.write(d.slice(start, end)); - - return; - } - - // if no available socket, buffer the request for later - if (parser.buffer) { - - // pause any further data on this socket - socket.pause(); - - // copy the current data since we have already received it - var copy = Buffer(end - start); - d.copy(copy, 0, start, end); - socket.queue = copy; - - // add socket to queue - parser.client.waiting.push(socket); - - return; - } - - if (!parser.sock) { - return; - } - - // assert, respond socket should be set - - // send through tcp tunnel - // responses will go back to the respond_socket - parser.sock.write(d.slice(start, end)); - }; - - socket.onend = function() { - var ret = parser.finish(); - - if (ret instanceof Error) { - log.error(ret); - socket.destroy(ret); - return; - } - - socket.end(); - }; - - socket.on('close', function() { - parsers.free(parser); - }); -}); - -server.on('request', function(req, res) { + var max_tcp_sockets = req.socket.server.max_tcp_sockets; // ignore favicon if (req.url === '/favicon.ico') { @@ -284,7 +113,26 @@ server.on('request', function(req, res) { return res.end(); } - var parsed = url.parse(req.url, true); + var hostname = req.headers.host; + if (!hostname) { + log.trace('no hostname: %j', req.headers); + return res.end(); + } + + var match = hostname.match(/^([a-z]{4})[.].*/); + if (match) { + var client_id = match[1]; + var client = clients[client_id]; + + // no such subdomain + if (!client) { + log.trace('no client found for id: ' + client_id); + res.statusCode = 404; + return res.end(); + } + + return proxy_request(client, req, res); + } // redirect main page to github reference if (req.url === '/' && !parsed.query.new) { @@ -314,10 +162,6 @@ server.on('request', function(req, res) { id = rand_id(); } - // maximum number of tcp connections the client can setup - // each tcp channel allows for more parallel requests - var max_tcp_sockets = 4; - // sockets is a list of available sockets for the connection // waiting is? var client = clients[id] = { @@ -345,9 +189,7 @@ server.on('request', function(req, res) { // user has 5 seconds to connect before their slot is given up function maybe_tcp_close() { - conn_timeout = setTimeout(function() { - client_server.close(); - }, 5000); + conn_timeout = setTimeout(client_server.close.bind(client_server), 5000); } maybe_tcp_close(); @@ -371,12 +213,16 @@ server.on('request', function(req, res) { // no need to close the client server clearTimeout(conn_timeout); - // multiplexes socket data out to clients - socket.ondata = socketOnData; + // allocate a response parser for the socket + // it only needs one since it will reuse it + socket.parser = parsers.alloc(); + + socket._orig_ondata = socket.ondata; + socket.ondata = upstream_response; client.sockets.push(socket); - socket.on('close', function(had_error) { + socket.once('close', function(had_error) { log.trace('client %s closed socket', id); // remove this socket @@ -402,7 +248,75 @@ server.on('request', function(req, res) { client_server.on('error', function(err) { log.error(err); }); -}); +}; -module.exports = server; +var handle_upgrade = function(req, ws) { + + if (req.headers.connection !== 'Upgrade') { + return; + } + + var hostname = req.headers.host; + if (!hostname) { + return res.end(); + } + + var match = hostname.match(/^([a-z]{4})[.].*/); + + // not a valid client + if (!match) { + return res.end(); + } + + var client_id = match[1]; + var client = clients[client_id]; + + if (!client) { + // no such subdomain + return res.end(); + } + + var socket = client.sockets.shift(); + if (!socket) { + // no available sockets to upgrade to + return res.end(); + } + + var stream = req.createRawStream(); + + socket.ws = ws; + socket.upgraded = true; + + stream.once('end', function() { + delete socket.ws; + + // when this ends, we just reset the socket to the lt client + // this is easier than trying to figure anything else out + socket.end(); + + // put socket back into available pool + client.sockets.push(socket); + + var next = client.waiting.shift(); + if (next) { + log.trace('popped'); + proxy_request(client, next[0], next[1], next[2], next[3]); + } + }); + + stream.pipe(socket, {end: false}); + socket.once('end', ws.end.bind(ws)); +}; + +module.exports = function(opt) { + opt = opt || {}; + + var server = createRawServer(); + + server.max_tcp_sockets = opt.max_tcp_sockets || 5; + server.on('request', handle_req); + server.on('upgrade', handle_upgrade); + + return server; +}; diff --git a/test.js b/test/basic.js similarity index 92% rename from test.js rename to test/basic.js index 1d3f771..34434e3 100644 --- a/test.js +++ b/test/basic.js @@ -2,8 +2,8 @@ var http = require('http'); var url = require('url'); var assert = require('assert'); -var localtunnel_server = require('./').server; -var localtunnel_client = require('./').client; +var localtunnel_server = require('../').server(); +var localtunnel_client = require('../').client; test('setup localtunnel server', function(done) { localtunnel_server.listen(3000, function() { @@ -93,3 +93,7 @@ test('request specific domain', function(done) { }); }); +test('shutdown', function() { + localtunnel_server.close(); +}); + diff --git a/test/queue.js b/test/queue.js new file mode 100644 index 0000000..c6268b1 --- /dev/null +++ b/test/queue.js @@ -0,0 +1,105 @@ +var http = require('http'); +var url = require('url'); +var assert = require('assert'); + +var localtunnel_server = require('../').server({ + max_tcp_sockets: 1 +}); + +var localtunnel_client = require('../').client; + +var server; + +test('setup localtunnel server', function(done) { + localtunnel_server.listen(3000, function() { + console.log('lt server on:', 3000); + done(); + }); +}); + +test('setup local http server', function(done) { + server = http.createServer(); + server.on('request', function(req, res) { + // respond sometime later + setTimeout(function() { + res.setHeader('x-count', req.headers['x-count']); + res.end('foo'); + }, 100); + }); + + server.listen(function() { + var port = server.address().port; + + test._fake_port = port; + console.log('local http on:', port); + done(); + }); +}); + +test('setup localtunnel client', function(done) { + var client = localtunnel_client.connect({ + host: 'http://localhost:' + 3000, + port: test._fake_port + }); + + client.on('url', function(url) { + assert.ok(/^http:\/\/.*localhost:3000$/.test(url)); + test._fake_url = url; + done(); + }); + + client.on('error', function(err) { + console.error(err); + }); +}); + +test('query localtunnel server w/ ident', function(done) { + var uri = test._fake_url; + var hostname = url.parse(uri).hostname; + + var count = 0; + var opt = { + host: 'localhost', + port: 3000, + agent: false, + headers: { + host: hostname + }, + path: '/' + } + + var num_requests = 2; + var responses = 0; + + function maybe_done() { + if (++responses >= num_requests) { + done(); + } + } + + function make_req() { + opt.headers['x-count'] = count++; + http.get(opt, function(res) { + res.setEncoding('utf8'); + var body = ''; + + res.on('data', function(chunk) { + body += chunk; + }); + + res.on('end', function() { + assert.equal('foo', body); + maybe_done(); + }); + }); + } + + for (var i=0 ; i