diff --git a/client.js b/client.js index 765e5f6..b932e35 100644 --- a/client.js +++ b/client.js @@ -10,6 +10,9 @@ var argv = require('optimist') default: 'http://localtunnel.me', describe: 'upstream server providing forwarding' }) + .options('subdomain', { + describe: 'request this subdomain' + }) .describe('port', 'internal http server port') .argv; @@ -29,67 +32,88 @@ var opt = { var base_uri = 'http://' + opt.host + ':' + opt.port + opt.path; -var internal; -var upstream; -var prev_id; +var prev_id = argv.subdomain || ''; (function connect_proxy() { opt.uri = base_uri + ((prev_id) ? prev_id : '?new'); request(opt, function(err, res, body) { if (err) { - console.error('upstream not available: %s', err.message); - return process.exit(-1); + console.error('tunnel server not available: %s, retry 1s', err.message); + + // retry interval for id request + return setTimeout(function() { + connect_proxy(); + }, 1000); } // our assigned hostname and tcp port var port = body.port; var host = opt.host; + var max_conn = body.max_conn_count || 1; // store the id so we can try to get the same one prev_id = body.id; console.log('your url is: %s', body.url); - // connect to remote tcp server - upstream = net.createConnection(port, host); + var count = 0; - // reconnect internal - connect_internal(); - - upstream.on('end', function() { - console.log('> upstream connection terminated'); - - // sever connection to internal server - // on reconnect we will re-establish - internal.end(); - - setTimeout(function() { - connect_proxy(); - }, 1000); - }); + // open 5 connections to the localtunnel server + // allows for resources to be served faster + for (var count = 0 ; count < max_conn ; ++count) { + var upstream = duplex(port, host, local_port, 'localhost'); + upstream.once('end', function() { + // all upstream connections have been closed + if (--count <= 0) { + connect_proxy(); + } + }); + } }); })(); -function connect_internal() { +function duplex(port, host, local_port, local_host) { - internal = net.createConnection(local_port); - internal.on('error', function(err) { - console.log('error connecting to local server. retrying in 1s'); + // connect to remote tcp server + var upstream = net.createConnection(port, host); + var internal = net.createConnection(local_port, local_host); - setTimeout(function() { - connect_internal(); - }, 1000); + // when upstream connection is closed, close other associated connections + upstream.on('end', function() { + console.log('> upstream connection terminated'); + + // sever connection to internal server + // on reconnect we will re-establish + internal.end(); }); - internal.on('end', function() { - console.log('disconnected from local server. retrying in 1s'); - setTimeout(function() { - connect_internal(); - }, 1000); + upstream.on('error', function(err) { + console.error(err); }); - upstream.pipe(internal); - internal.pipe(upstream); + (function connect_internal() { + + //internal = net.createConnection(local_port); + internal.on('error', function(err) { + console.log('error connecting to local server. retrying in 1s'); + setTimeout(function() { + connect_internal(); + }, 1000); + }); + + internal.on('end', function() { + console.log('disconnected from local server. retrying in 1s'); + setTimeout(function() { + connect_internal(); + }, 1000); + }); + + upstream.pipe(internal); + internal.pipe(upstream); + })(); + + return upstream; } + diff --git a/lib/rand_id.js b/lib/rand_id.js new file mode 100644 index 0000000..25f4815 --- /dev/null +++ b/lib/rand_id.js @@ -0,0 +1,12 @@ + +var chars = 'abcdefghiklmnopqrstuvwxyz'; +module.exports = function rand_id() { + var randomstring = ''; + for (var i=0; i<4; ++i) { + var rnum = Math.floor(Math.random() * chars.length); + randomstring += chars[rnum]; + } + + return randomstring; +} + diff --git a/server.js b/server.js index 9446d5e..05eb566 100644 --- a/server.js +++ b/server.js @@ -3,47 +3,24 @@ var http = require('http'); var net = require('net'); var url = require('url'); -var FreeList = require('freelist').FreeList; - -var argv = require('optimist') - .usage('Usage: $0 --port [num]') - .options('port', { - default: '80', - describe: 'listen on this port for outside requests' - }) - .argv; - -if (argv.help) { - require('optimist').showHelp(); - process.exit(); -} // here be dragons var HTTPParser = process.binding('http_parser').HTTPParser; var ServerResponse = http.ServerResponse; var IncomingMessage = http.IncomingMessage; +// vendor var log = require('book'); -var chars = 'abcdefghiklmnopqrstuvwxyz'; -function rand_id() { - var randomstring = ''; - for (var i=0; i<4; ++i) { - var rnum = Math.floor(Math.random() * chars.length); - randomstring += chars[rnum]; - } - - return randomstring; -} +// local +var rand_id = require('./lib/rand_id'); var server = http.createServer(); // id -> client http server var clients = {}; -// id -> list of sockets waiting for a valid response -var wait_list = {}; - +// available parsers var parsers = http.parsers; // data going back to a client (the last client that made a request) @@ -52,23 +29,26 @@ function socketOnData(d, start, end) { var socket = this; var req = this._httpMessage; - var current = clients[socket.subdomain].current; - - if (!current) { - log.error('no current for http response from backend'); + var response_socket = socket.respond_socket; + if (!response_socket) { + log.error('no response socket assigned for http response from backend'); return; } - // send the goodies - current.write(d.slice(start, end)); + // pass the response from our client back to the requesting socket + response_socket.write(d.slice(start, end)); - // invoke parsing so we know when all the goodies have been sent - var parser = current.out_parser; + if (socket.for_websocket) { + return; + } + + // invoke parsing so we know when the response is complete + var parser = response_socket.out_parser; parser.socket = socket; var ret = parser.execute(d, start, end - start); if (ret instanceof Error) { - debug('parse error'); + log.error(ret); freeParser(parser, req); socket.destroy(ret); } @@ -99,83 +79,103 @@ server.on('connection', function(socket) { var self = this; - var for_client = false; - var client_id; - - var request; - + // parser handles incoming requests for the socket + // the request is what lets us know if we proxy or not var parser = parsers.alloc(); parser.socket = socket; parser.reinitialize(HTTPParser.REQUEST); + function our_request(req) { + var res = new ServerResponse(req); + res.assignSocket(socket); + self.emit('request', req, res); + return; + } + // a full request is complete // we wait for the response from the server parser.onIncoming = function(req, shouldKeepAlive) { log.trace('request', req.url); - request = req; - for_client = false; + // default is that the data is not for the client + delete parser.sock; + delete parser.buffer; + delete parser.client; var hostname = req.headers.host; - if (!hostname) { log.trace('no hostname: %j', req.headers); - // normal processing if not proxy - var res = new ServerResponse(req); - - // TODO(shtylman) skip favicon for now, it caused problems - if (req.url === '/favicon.ico') { - return; - } - - res.assignSocket(parser.socket); - self.emit('request', req, res); - return; + return our_request(req); } var match = hostname.match(/^([a-z]{4})[.].*/); - if (!match) { - // normal processing if not proxy - var res = new ServerResponse(req); + return our_request(req); + } - // TODO(shtylman) skip favicon for now, it caused problems - if (req.url === '/favicon.ico') { - return; - } + var client_id = match[1]; + var client = clients[client_id]; - res.assignSocket(parser.socket); - self.emit('request', req, res); + // requesting a subdomain that doesn't exist + if (!client) { + return socket.end(); + } + + parser.client = client; + + // assigned socket for the client + var sock = client.sockets.shift(); + + // no free sockets, queue + if (!sock) { + parser.buffer = true; return; } - client_id = match[1]; - for_client = true; + // for tcp proxying + parser.sock = sock; + + // set who we will respond back to + sock.respond_socket = socket; var out_parser = parsers.alloc(); out_parser.reinitialize(HTTPParser.RESPONSE); socket.out_parser = out_parser; - // we have a response - out_parser.onIncoming = function(res) { + // we have completed a response + // the tcp socket is free again + out_parser.onIncoming = function (res) { res.on('end', function() { log.trace('done with response for: %s', req.url); // done with the parser parsers.free(out_parser); - var next = wait_list[client_id].shift(); - - clients[client_id].current = next; + // unset the response + delete sock.respond_socket; + var next = client.waiting.shift(); if (!next) { + // return socket to available + client.sockets.push(sock); return; } - // write original bytes that we held cause client was busy - clients[client_id].write(next.queue); + // reuse avail socket for next connection + sock.respond_socket = next; + + // needed to know when this response will be done + out_parser.reinitialize(HTTPParser.RESPONSE); + next.out_parser = out_parser; + + // write original bytes we held cause we were busy + sock.write(next.queue); + + // continue with other bytes next.resume(); + + return; }); }; }; @@ -183,68 +183,87 @@ server.on('connection', function(socket) { // process new data on the client socket // we may need to forward this it the backend socket.ondata = function(d, start, end) { + + // run through request parser to determine if we should pass to tcp + // onIncoming will be run before this returns var ret = parser.execute(d, start, end - start); // invalid request from the user if (ret instanceof Error) { - debug('parse error'); + log.error(ret); socket.destroy(ret); return; } - // only write data if previous request to this client is done? - log.trace('%s %s', parser.incoming && parser.incoming.upgrade, for_client); - - // what if the subdomains are treated differently - // as individual channels to the backend if available? - // how can I do that? - + // websocket stuff if (parser.incoming && parser.incoming.upgrade) { - // websocket shit - } + log.trace('upgrade request'); - // wtf do you do with upgraded connections? + parser.finish(); - // forward the data to the backend - if (for_client) { + var hostname = parser.incoming.headers.host; + + var match = hostname.match(/^([a-z]{4})[.].*/); + if (!match) { + return our_request(req); + } + + var client_id = match[1]; var client = clients[client_id]; - // requesting a subdomain that doesn't exist - if (!client) { - return; + var sock = client.sockets.shift(); + sock.respond_socket = socket; + sock.for_websocket = true; + + socket.ondata = function(d, start, end) { + sock.write(d.slice(start, end)); + }; + + socket.end = function() { + log.trace('websocket end'); + + delete sock.respond_socket; + client.sockets.push(sock); } - // if the client is already processing something - // then new connections need to go into pause mode - // and when they are revived, then they can send data along - if (client.current && client.current !== socket) { - log.trace('pausing', request.url); - // prevent new data from gathering for this connection - // we are waiting for a response to a previous request - socket.pause(); + sock.write(d.slice(start, end)); - var copy = Buffer(end - start); - d.copy(copy, 0, start, end); - socket.queue = copy; - - wait_list[client_id].push(socket); - - return; - } - - // this socket needs to receive responses - client.current = socket; - - // send through tcp tunnel - client.write(d.slice(start, end)); + return; } + + // if no available socket, buffer the request for later + if (parser.buffer) { + + // pause any further data on this socket + socket.pause(); + + // copy the current data since we have already received it + var copy = Buffer(end - start); + d.copy(copy, 0, start, end); + socket.queue = copy; + + // add socket to queue + parser.client.waiting.push(socket); + + return; + } + + if (!parser.sock) { + return; + } + + // assert, respond socket should be set + + // send through tcp tunnel + // responses will go back to the respond_socket + parser.sock.write(d.slice(start, end)); }; socket.onend = function() { var ret = parser.finish(); if (ret instanceof Error) { - log.trace('parse error'); + log.error(ret); socket.destroy(ret); return; } @@ -271,8 +290,12 @@ server.on('request', function(req, res) { if (req.url === '/' && !parsed.query.new) { res.writeHead(301, { Location: 'http://shtylman.github.com/localtunnel/' }); res.end(); + return; } + // at this point, the client is requesting a new tunnel setup + // either generate an id or use the one they requested + var match = req.url.match(/\/([a-z]{4})?/); // user can request a particular set of characters @@ -285,17 +308,17 @@ server.on('request', function(req, res) { } var id = requested_id || rand_id(); - if (wait_list[id]) { - // new id - id = rand_id(); - } - // generate new shit for client - if (wait_list[id]) { - wait_list[id].forEach(function(waiting) { - waiting.end(); - }); - } + // maximum number of tcp connections the client can setup + // each tcp channel allows for more parallel requests + var max_tcp_sockets = 4; + + // sockets is a list of available sockets for the connection + // waiting is? + var client = clients[id] = { + sockets: [], + waiting: [] + }; var client_server = net.createServer(); client_server.listen(function() { @@ -305,7 +328,12 @@ server.on('request', function(req, res) { var url = 'http://' + id + '.' + req.headers.host; res.writeHead(200, { 'Content-Type': 'application/json' }); - res.end(JSON.stringify({ url: url, id: id, port: port })); + res.end(JSON.stringify({ + url: url, + id: id, + port: port, + max_conn_count: max_tcp_sockets + })); }); // user has 5 seconds to connect before their slot is given up @@ -313,31 +341,50 @@ server.on('request', function(req, res) { client_server.close(); }, 5000); + // no longer accepting connections for this id + client_server.on('close', function() { + delete clients[id]; + }); + + var count = 0; client_server.on('connection', function(socket) { - // who the info should route back to - socket.subdomain = id; + // no more socket connections allowed + if (count++ >= max_tcp_sockets) { + return socket.end(); + } + + log.trace('new connection for id: %s', id); // multiplexes socket data out to clients socket.ondata = socketOnData; + // no need to close the client server clearTimeout(conn_timeout); - log.trace('new connection for id: %s', id); - clients[id] = socket; - wait_list[id] = []; + // add socket to pool for this id + var idx = client.sockets.push(socket) - 1; - socket.on('end', function() { - delete clients[id]; + socket.on('close', function(had_error) { + count--; + client.sockets.splice(idx, 1); + + // no more sockets for this ident + if (client.sockets.length === 0) { + delete clients[id]; + } + }); + + // close will be emitted after this + socket.on('error', function(err) { + log.error(err); }); }); - client_server.on('err', function(err) { + client_server.on('error', function(err) { log.error(err); }); }); -server.listen(argv.port, function() { - log.info('server listening on port: %d', server.address().port); -}); +module.exports = server;