refactor server tcp handling

- limit on number of tcp connections
- preliminary support for websockets
This commit is contained in:
Roman Shtylman
2012-10-17 22:50:59 -04:00
parent ab28444802
commit 51d91ce0e8
3 changed files with 252 additions and 169 deletions

View File

@@ -10,6 +10,9 @@ var argv = require('optimist')
default: 'http://localtunnel.me', default: 'http://localtunnel.me',
describe: 'upstream server providing forwarding' describe: 'upstream server providing forwarding'
}) })
.options('subdomain', {
describe: 'request this subdomain'
})
.describe('port', 'internal http server port') .describe('port', 'internal http server port')
.argv; .argv;
@@ -29,54 +32,71 @@ var opt = {
var base_uri = 'http://' + opt.host + ':' + opt.port + opt.path; var base_uri = 'http://' + opt.host + ':' + opt.port + opt.path;
var internal; var prev_id = argv.subdomain || '';
var upstream;
var prev_id;
(function connect_proxy() { (function connect_proxy() {
opt.uri = base_uri + ((prev_id) ? prev_id : '?new'); opt.uri = base_uri + ((prev_id) ? prev_id : '?new');
request(opt, function(err, res, body) { request(opt, function(err, res, body) {
if (err) { if (err) {
console.error('upstream not available: %s', err.message); console.error('tunnel server not available: %s, retry 1s', err.message);
return process.exit(-1);
// retry interval for id request
return setTimeout(function() {
connect_proxy();
}, 1000);
} }
// our assigned hostname and tcp port // our assigned hostname and tcp port
var port = body.port; var port = body.port;
var host = opt.host; var host = opt.host;
var max_conn = body.max_conn_count || 1;
// store the id so we can try to get the same one // store the id so we can try to get the same one
prev_id = body.id; prev_id = body.id;
console.log('your url is: %s', body.url); console.log('your url is: %s', body.url);
var count = 0;
// open 5 connections to the localtunnel server
// allows for resources to be served faster
for (var count = 0 ; count < max_conn ; ++count) {
var upstream = duplex(port, host, local_port, 'localhost');
upstream.once('end', function() {
// all upstream connections have been closed
if (--count <= 0) {
connect_proxy();
}
});
}
});
})();
function duplex(port, host, local_port, local_host) {
// connect to remote tcp server // connect to remote tcp server
upstream = net.createConnection(port, host); var upstream = net.createConnection(port, host);
var internal = net.createConnection(local_port, local_host);
// reconnect internal
connect_internal();
// when upstream connection is closed, close other associated connections
upstream.on('end', function() { upstream.on('end', function() {
console.log('> upstream connection terminated'); console.log('> upstream connection terminated');
// sever connection to internal server // sever connection to internal server
// on reconnect we will re-establish // on reconnect we will re-establish
internal.end(); internal.end();
setTimeout(function() {
connect_proxy();
}, 1000);
}); });
upstream.on('error', function(err) {
console.error(err);
}); });
})();
function connect_internal() { (function connect_internal() {
internal = net.createConnection(local_port); //internal = net.createConnection(local_port);
internal.on('error', function(err) { internal.on('error', function(err) {
console.log('error connecting to local server. retrying in 1s'); console.log('error connecting to local server. retrying in 1s');
setTimeout(function() { setTimeout(function() {
connect_internal(); connect_internal();
}, 1000); }, 1000);
@@ -91,5 +111,9 @@ function connect_internal() {
upstream.pipe(internal); upstream.pipe(internal);
internal.pipe(upstream); internal.pipe(upstream);
})();
return upstream;
} }

12
lib/rand_id.js Normal file
View File

@@ -0,0 +1,12 @@
var chars = 'abcdefghiklmnopqrstuvwxyz';
module.exports = function rand_id() {
var randomstring = '';
for (var i=0; i<4; ++i) {
var rnum = Math.floor(Math.random() * chars.length);
randomstring += chars[rnum];
}
return randomstring;
}

291
server.js
View File

@@ -3,47 +3,24 @@
var http = require('http'); var http = require('http');
var net = require('net'); var net = require('net');
var url = require('url'); var url = require('url');
var FreeList = require('freelist').FreeList;
var argv = require('optimist')
.usage('Usage: $0 --port [num]')
.options('port', {
default: '80',
describe: 'listen on this port for outside requests'
})
.argv;
if (argv.help) {
require('optimist').showHelp();
process.exit();
}
// here be dragons // here be dragons
var HTTPParser = process.binding('http_parser').HTTPParser; var HTTPParser = process.binding('http_parser').HTTPParser;
var ServerResponse = http.ServerResponse; var ServerResponse = http.ServerResponse;
var IncomingMessage = http.IncomingMessage; var IncomingMessage = http.IncomingMessage;
// vendor
var log = require('book'); var log = require('book');
var chars = 'abcdefghiklmnopqrstuvwxyz'; // local
function rand_id() { var rand_id = require('./lib/rand_id');
var randomstring = '';
for (var i=0; i<4; ++i) {
var rnum = Math.floor(Math.random() * chars.length);
randomstring += chars[rnum];
}
return randomstring;
}
var server = http.createServer(); var server = http.createServer();
// id -> client http server // id -> client http server
var clients = {}; var clients = {};
// id -> list of sockets waiting for a valid response // available parsers
var wait_list = {};
var parsers = http.parsers; var parsers = http.parsers;
// data going back to a client (the last client that made a request) // data going back to a client (the last client that made a request)
@@ -52,23 +29,26 @@ function socketOnData(d, start, end) {
var socket = this; var socket = this;
var req = this._httpMessage; var req = this._httpMessage;
var current = clients[socket.subdomain].current; var response_socket = socket.respond_socket;
if (!response_socket) {
if (!current) { log.error('no response socket assigned for http response from backend');
log.error('no current for http response from backend');
return; return;
} }
// send the goodies // pass the response from our client back to the requesting socket
current.write(d.slice(start, end)); response_socket.write(d.slice(start, end));
// invoke parsing so we know when all the goodies have been sent if (socket.for_websocket) {
var parser = current.out_parser; return;
}
// invoke parsing so we know when the response is complete
var parser = response_socket.out_parser;
parser.socket = socket; parser.socket = socket;
var ret = parser.execute(d, start, end - start); var ret = parser.execute(d, start, end - start);
if (ret instanceof Error) { if (ret instanceof Error) {
debug('parse error'); log.error(ret);
freeParser(parser, req); freeParser(parser, req);
socket.destroy(ret); socket.destroy(ret);
} }
@@ -99,83 +79,103 @@ server.on('connection', function(socket) {
var self = this; var self = this;
var for_client = false; // parser handles incoming requests for the socket
var client_id; // the request is what lets us know if we proxy or not
var request;
var parser = parsers.alloc(); var parser = parsers.alloc();
parser.socket = socket; parser.socket = socket;
parser.reinitialize(HTTPParser.REQUEST); parser.reinitialize(HTTPParser.REQUEST);
function our_request(req) {
var res = new ServerResponse(req);
res.assignSocket(socket);
self.emit('request', req, res);
return;
}
// a full request is complete // a full request is complete
// we wait for the response from the server // we wait for the response from the server
parser.onIncoming = function(req, shouldKeepAlive) { parser.onIncoming = function(req, shouldKeepAlive) {
log.trace('request', req.url); log.trace('request', req.url);
request = req;
for_client = false; // default is that the data is not for the client
delete parser.sock;
delete parser.buffer;
delete parser.client;
var hostname = req.headers.host; var hostname = req.headers.host;
if (!hostname) { if (!hostname) {
log.trace('no hostname: %j', req.headers); log.trace('no hostname: %j', req.headers);
// normal processing if not proxy return our_request(req);
var res = new ServerResponse(req);
// TODO(shtylman) skip favicon for now, it caused problems
if (req.url === '/favicon.ico') {
return;
}
res.assignSocket(parser.socket);
self.emit('request', req, res);
return;
} }
var match = hostname.match(/^([a-z]{4})[.].*/); var match = hostname.match(/^([a-z]{4})[.].*/);
if (!match) { if (!match) {
// normal processing if not proxy return our_request(req);
var res = new ServerResponse(req); }
// TODO(shtylman) skip favicon for now, it caused problems var client_id = match[1];
if (req.url === '/favicon.ico') { var client = clients[client_id];
// requesting a subdomain that doesn't exist
if (!client) {
return socket.end();
}
parser.client = client;
// assigned socket for the client
var sock = client.sockets.shift();
// no free sockets, queue
if (!sock) {
parser.buffer = true;
return; return;
} }
res.assignSocket(parser.socket); // for tcp proxying
self.emit('request', req, res); parser.sock = sock;
return;
}
client_id = match[1]; // set who we will respond back to
for_client = true; sock.respond_socket = socket;
var out_parser = parsers.alloc(); var out_parser = parsers.alloc();
out_parser.reinitialize(HTTPParser.RESPONSE); out_parser.reinitialize(HTTPParser.RESPONSE);
socket.out_parser = out_parser; socket.out_parser = out_parser;
// we have a response // we have completed a response
out_parser.onIncoming = function(res) { // the tcp socket is free again
out_parser.onIncoming = function (res) {
res.on('end', function() { res.on('end', function() {
log.trace('done with response for: %s', req.url); log.trace('done with response for: %s', req.url);
// done with the parser // done with the parser
parsers.free(out_parser); parsers.free(out_parser);
var next = wait_list[client_id].shift(); // unset the response
delete sock.respond_socket;
clients[client_id].current = next;
var next = client.waiting.shift();
if (!next) { if (!next) {
// return socket to available
client.sockets.push(sock);
return; return;
} }
// write original bytes that we held cause client was busy // reuse avail socket for next connection
clients[client_id].write(next.queue); sock.respond_socket = next;
// needed to know when this response will be done
out_parser.reinitialize(HTTPParser.RESPONSE);
next.out_parser = out_parser;
// write original bytes we held cause we were busy
sock.write(next.queue);
// continue with other bytes
next.resume(); next.resume();
return;
}); });
}; };
}; };
@@ -183,68 +183,87 @@ server.on('connection', function(socket) {
// process new data on the client socket // process new data on the client socket
// we may need to forward this it the backend // we may need to forward this it the backend
socket.ondata = function(d, start, end) { socket.ondata = function(d, start, end) {
// run through request parser to determine if we should pass to tcp
// onIncoming will be run before this returns
var ret = parser.execute(d, start, end - start); var ret = parser.execute(d, start, end - start);
// invalid request from the user // invalid request from the user
if (ret instanceof Error) { if (ret instanceof Error) {
debug('parse error'); log.error(ret);
socket.destroy(ret); socket.destroy(ret);
return; return;
} }
// only write data if previous request to this client is done? // websocket stuff
log.trace('%s %s', parser.incoming && parser.incoming.upgrade, for_client);
// what if the subdomains are treated differently
// as individual channels to the backend if available?
// how can I do that?
if (parser.incoming && parser.incoming.upgrade) { if (parser.incoming && parser.incoming.upgrade) {
// websocket shit log.trace('upgrade request');
parser.finish();
var hostname = parser.incoming.headers.host;
var match = hostname.match(/^([a-z]{4})[.].*/);
if (!match) {
return our_request(req);
} }
// wtf do you do with upgraded connections? var client_id = match[1];
// forward the data to the backend
if (for_client) {
var client = clients[client_id]; var client = clients[client_id];
// requesting a subdomain that doesn't exist var sock = client.sockets.shift();
if (!client) { sock.respond_socket = socket;
sock.for_websocket = true;
socket.ondata = function(d, start, end) {
sock.write(d.slice(start, end));
};
socket.end = function() {
log.trace('websocket end');
delete sock.respond_socket;
client.sockets.push(sock);
}
sock.write(d.slice(start, end));
return; return;
} }
// if the client is already processing something // if no available socket, buffer the request for later
// then new connections need to go into pause mode if (parser.buffer) {
// and when they are revived, then they can send data along
if (client.current && client.current !== socket) { // pause any further data on this socket
log.trace('pausing', request.url);
// prevent new data from gathering for this connection
// we are waiting for a response to a previous request
socket.pause(); socket.pause();
// copy the current data since we have already received it
var copy = Buffer(end - start); var copy = Buffer(end - start);
d.copy(copy, 0, start, end); d.copy(copy, 0, start, end);
socket.queue = copy; socket.queue = copy;
wait_list[client_id].push(socket); // add socket to queue
parser.client.waiting.push(socket);
return; return;
} }
// this socket needs to receive responses if (!parser.sock) {
client.current = socket; return;
}
// assert, respond socket should be set
// send through tcp tunnel // send through tcp tunnel
client.write(d.slice(start, end)); // responses will go back to the respond_socket
} parser.sock.write(d.slice(start, end));
}; };
socket.onend = function() { socket.onend = function() {
var ret = parser.finish(); var ret = parser.finish();
if (ret instanceof Error) { if (ret instanceof Error) {
log.trace('parse error'); log.error(ret);
socket.destroy(ret); socket.destroy(ret);
return; return;
} }
@@ -271,8 +290,12 @@ server.on('request', function(req, res) {
if (req.url === '/' && !parsed.query.new) { if (req.url === '/' && !parsed.query.new) {
res.writeHead(301, { Location: 'http://shtylman.github.com/localtunnel/' }); res.writeHead(301, { Location: 'http://shtylman.github.com/localtunnel/' });
res.end(); res.end();
return;
} }
// at this point, the client is requesting a new tunnel setup
// either generate an id or use the one they requested
var match = req.url.match(/\/([a-z]{4})?/); var match = req.url.match(/\/([a-z]{4})?/);
// user can request a particular set of characters // user can request a particular set of characters
@@ -285,17 +308,17 @@ server.on('request', function(req, res) {
} }
var id = requested_id || rand_id(); var id = requested_id || rand_id();
if (wait_list[id]) {
// new id
id = rand_id();
}
// generate new shit for client // maximum number of tcp connections the client can setup
if (wait_list[id]) { // each tcp channel allows for more parallel requests
wait_list[id].forEach(function(waiting) { var max_tcp_sockets = 4;
waiting.end();
}); // sockets is a list of available sockets for the connection
} // waiting is?
var client = clients[id] = {
sockets: [],
waiting: []
};
var client_server = net.createServer(); var client_server = net.createServer();
client_server.listen(function() { client_server.listen(function() {
@@ -305,7 +328,12 @@ server.on('request', function(req, res) {
var url = 'http://' + id + '.' + req.headers.host; var url = 'http://' + id + '.' + req.headers.host;
res.writeHead(200, { 'Content-Type': 'application/json' }); res.writeHead(200, { 'Content-Type': 'application/json' });
res.end(JSON.stringify({ url: url, id: id, port: port })); res.end(JSON.stringify({
url: url,
id: id,
port: port,
max_conn_count: max_tcp_sockets
}));
}); });
// user has 5 seconds to connect before their slot is given up // user has 5 seconds to connect before their slot is given up
@@ -313,31 +341,50 @@ server.on('request', function(req, res) {
client_server.close(); client_server.close();
}, 5000); }, 5000);
// no longer accepting connections for this id
client_server.on('close', function() {
delete clients[id];
});
var count = 0;
client_server.on('connection', function(socket) { client_server.on('connection', function(socket) {
// who the info should route back to // no more socket connections allowed
socket.subdomain = id; if (count++ >= max_tcp_sockets) {
return socket.end();
}
log.trace('new connection for id: %s', id);
// multiplexes socket data out to clients // multiplexes socket data out to clients
socket.ondata = socketOnData; socket.ondata = socketOnData;
// no need to close the client server
clearTimeout(conn_timeout); clearTimeout(conn_timeout);
log.trace('new connection for id: %s', id); // add socket to pool for this id
clients[id] = socket; var idx = client.sockets.push(socket) - 1;
wait_list[id] = [];
socket.on('end', function() { socket.on('close', function(had_error) {
count--;
client.sockets.splice(idx, 1);
// no more sockets for this ident
if (client.sockets.length === 0) {
delete clients[id]; delete clients[id];
}
});
// close will be emitted after this
socket.on('error', function(err) {
log.error(err);
}); });
}); });
client_server.on('err', function(err) { client_server.on('error', function(err) {
log.error(err); log.error(err);
}); });
}); });
server.listen(argv.port, function() { module.exports = server;
log.info('server listening on port: %d', server.address().port);
});