mirror of
https://github.com/bitinflow/server.git
synced 2026-03-13 13:35:53 +00:00
refactor to use http-raw for lt server incoming
- http raw exposes a socket to the req/res pair - cleanup client to be more resilient - add test for queued requests
This commit is contained in:
@@ -23,7 +23,9 @@ process.once('uncaughtException', function(err) {
|
|||||||
return;
|
return;
|
||||||
});
|
});
|
||||||
|
|
||||||
var server = require('../server');
|
var server = require('../server')({
|
||||||
|
max_tcp_sockets: 5
|
||||||
|
});
|
||||||
|
|
||||||
server.listen(argv.port, function() {
|
server.listen(argv.port, function() {
|
||||||
log.info('server listening on port: %d', server.address().port);
|
log.info('server listening on port: %d', server.address().port);
|
||||||
|
|||||||
133
client.js
133
client.js
@@ -30,24 +30,74 @@ var connect = function(opt) {
|
|||||||
var assigned_domain = opt.subdomain;
|
var assigned_domain = opt.subdomain;
|
||||||
|
|
||||||
// connect to upstream given connection parameters
|
// connect to upstream given connection parameters
|
||||||
var tunnel = function (remote_host, remote_port, max_conn) {
|
var tunnel = function (remote_host, remote_port) {
|
||||||
var count = 0;
|
|
||||||
|
|
||||||
// open 5 connections to the localtunnel server
|
var remote_opt = {
|
||||||
// allows for resources to be served faster
|
host: remote_host,
|
||||||
for (var count = 0 ; count < max_conn ; ++count) {
|
port: remote_port
|
||||||
var upstream = duplex(remote_host, remote_port, 'localhost', local_port);
|
};
|
||||||
upstream.once('end', function() {
|
|
||||||
// all upstream connections have been closed
|
var local_opt = {
|
||||||
if (--count <= 0) {
|
host: 'localhost',
|
||||||
tunnel(remote_host, remote_port, max_conn);
|
port: local_port
|
||||||
|
};
|
||||||
|
|
||||||
|
var remote_attempts = 0;
|
||||||
|
|
||||||
|
(function conn(conn_had_error) {
|
||||||
|
if (conn_had_error) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (++remote_attempts >= 3) {
|
||||||
|
console.error('localtunnel server offline - try again');
|
||||||
|
process.exit(-1);
|
||||||
|
}
|
||||||
|
|
||||||
|
// connection to localtunnel server
|
||||||
|
var remote = net.connect(remote_opt);
|
||||||
|
|
||||||
|
remote.once('error', function(err) {
|
||||||
|
if (err.code !== 'ECONNREFUSED') {
|
||||||
|
local.emit('error', err);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// retrying connection to local server
|
||||||
|
setTimeout(conn, 1000);
|
||||||
});
|
});
|
||||||
|
|
||||||
upstream.on('error', function(err) {
|
function recon_local() {
|
||||||
console.error(err);
|
remote.pause();
|
||||||
});
|
remote_attempts = 0;
|
||||||
}
|
|
||||||
|
// connection to local http server
|
||||||
|
var local = net.connect(local_opt);
|
||||||
|
|
||||||
|
local.once('error', function(err) {
|
||||||
|
if (err.code !== 'ECONNREFUSED') {
|
||||||
|
local.emit('error', err);
|
||||||
|
}
|
||||||
|
|
||||||
|
// retrying connection to local server
|
||||||
|
setTimeout(recon_local, 1000);
|
||||||
|
});
|
||||||
|
|
||||||
|
local.once('connect', function() {
|
||||||
|
remote.resume();
|
||||||
|
remote.pipe(local).pipe(remote, {end: false});
|
||||||
|
});
|
||||||
|
|
||||||
|
local.once('close', function(had_error) {
|
||||||
|
if (had_error) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
recon_local();
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
remote.once('close', conn);
|
||||||
|
remote.once('connect', recon_local);
|
||||||
|
})();
|
||||||
};
|
};
|
||||||
|
|
||||||
var params = {
|
var params = {
|
||||||
@@ -58,6 +108,7 @@ var connect = function(opt) {
|
|||||||
// where to quest
|
// where to quest
|
||||||
params.uri = base_uri + ((assigned_domain) ? assigned_domain : '?new');
|
params.uri = base_uri + ((assigned_domain) ? assigned_domain : '?new');
|
||||||
|
|
||||||
|
// get an id from lt server and setup forwarding tcp connections
|
||||||
request_url(params, function(err, body) {
|
request_url(params, function(err, body) {
|
||||||
|
|
||||||
if (err) {
|
if (err) {
|
||||||
@@ -76,7 +127,10 @@ var connect = function(opt) {
|
|||||||
// store the id so we can try to get the same one
|
// store the id so we can try to get the same one
|
||||||
assigned_domain = body.id;
|
assigned_domain = body.id;
|
||||||
|
|
||||||
tunnel(host, port, body.max_conn_count || 1);
|
var max_conn = body.max_conn_count || 1;
|
||||||
|
for (var count = 0 ; count < max_conn ; ++count) {
|
||||||
|
tunnel(host, port);
|
||||||
|
}
|
||||||
|
|
||||||
ev.emit('url', body.url);
|
ev.emit('url', body.url);
|
||||||
});
|
});
|
||||||
@@ -84,54 +138,5 @@ var connect = function(opt) {
|
|||||||
return ev;
|
return ev;
|
||||||
};
|
};
|
||||||
|
|
||||||
var duplex = function(remote_host, remote_port, local_host, local_port) {
|
|
||||||
var ev = new EventEmitter();
|
|
||||||
|
|
||||||
// connect to remote tcp server
|
|
||||||
var upstream = net.createConnection(remote_port, remote_host);
|
|
||||||
var internal;
|
|
||||||
|
|
||||||
// when upstream connection is closed, close other associated connections
|
|
||||||
upstream.once('end', function() {
|
|
||||||
ev.emit('error', new Error('upstream connection terminated'));
|
|
||||||
|
|
||||||
// sever connection to internal server
|
|
||||||
// on reconnect we will re-establish
|
|
||||||
internal.end();
|
|
||||||
|
|
||||||
ev.emit('end');
|
|
||||||
});
|
|
||||||
|
|
||||||
upstream.on('error', function(err) {
|
|
||||||
ev.emit('error', err);
|
|
||||||
});
|
|
||||||
|
|
||||||
(function connect_internal() {
|
|
||||||
|
|
||||||
internal = net.createConnection(local_port, local_host);
|
|
||||||
internal.on('error', function() {
|
|
||||||
ev.emit('error', new Error('error connecting to local server. retrying in 1s'));
|
|
||||||
setTimeout(function() {
|
|
||||||
connect_internal();
|
|
||||||
}, 1000);
|
|
||||||
});
|
|
||||||
|
|
||||||
internal.on('end', function() {
|
|
||||||
ev.emit('error', new Error('disconnected from local server. retrying in 1s'));
|
|
||||||
setTimeout(function() {
|
|
||||||
connect_internal();
|
|
||||||
}, 1000);
|
|
||||||
});
|
|
||||||
|
|
||||||
internal.on('connect', function() {
|
|
||||||
console.log('connected to local server');
|
|
||||||
});
|
|
||||||
|
|
||||||
upstream.pipe(internal).pipe(upstream);
|
|
||||||
})();
|
|
||||||
|
|
||||||
return ev;
|
|
||||||
}
|
|
||||||
|
|
||||||
module.exports.connect = connect;
|
module.exports.connect = connect;
|
||||||
|
|
||||||
|
|||||||
@@ -10,7 +10,8 @@
|
|||||||
"dependencies": {
|
"dependencies": {
|
||||||
"request": "2.11.4",
|
"request": "2.11.4",
|
||||||
"book": "1.2.0",
|
"book": "1.2.0",
|
||||||
"optimist": "0.3.4"
|
"optimist": "0.3.4",
|
||||||
|
"http-raw": "1.1.0"
|
||||||
},
|
},
|
||||||
"devDependencies": {
|
"devDependencies": {
|
||||||
"mocha": "1.6.0"
|
"mocha": "1.6.0"
|
||||||
|
|||||||
424
server.js
424
server.js
@@ -6,277 +6,106 @@ var url = require('url');
|
|||||||
|
|
||||||
// here be dragons
|
// here be dragons
|
||||||
var HTTPParser = process.binding('http_parser').HTTPParser;
|
var HTTPParser = process.binding('http_parser').HTTPParser;
|
||||||
var ServerResponse = http.ServerResponse;
|
|
||||||
var IncomingMessage = http.IncomingMessage;
|
|
||||||
|
|
||||||
// vendor
|
// vendor
|
||||||
var log = require('book');
|
var log = require('book');
|
||||||
|
var createRawServer = require('http-raw');
|
||||||
|
|
||||||
// local
|
// local
|
||||||
var rand_id = require('./lib/rand_id');
|
var rand_id = require('./lib/rand_id');
|
||||||
|
|
||||||
var server = http.createServer();
|
|
||||||
|
|
||||||
// id -> client http server
|
// id -> client http server
|
||||||
var clients = {};
|
var clients = {};
|
||||||
|
|
||||||
// available parsers
|
// available parsers
|
||||||
var parsers = http.parsers;
|
var parsers = http.parsers;
|
||||||
|
|
||||||
// data going back to a client (the last client that made a request)
|
// send this request to the appropriate client
|
||||||
function socketOnData(d, start, end) {
|
// in -> incoming request stream
|
||||||
|
function proxy_request(client, req, res, rs, ws) {
|
||||||
|
|
||||||
var socket = this;
|
rs = rs || req.createRawStream();
|
||||||
var req = this._httpMessage;
|
ws = ws || res.createRawStream();
|
||||||
|
|
||||||
var response_socket = socket.respond_socket;
|
// socket is a tcp connection back to the user hosting the site
|
||||||
if (!response_socket) {
|
var sock = client.sockets.shift();
|
||||||
log.error('no response socket assigned for http response from backend');
|
|
||||||
|
// queue request
|
||||||
|
if (!sock) {
|
||||||
|
log.info('no more clients, queued: %s', req.url);
|
||||||
|
rs.pause();
|
||||||
|
client.waiting.push([req, res, rs, ws]);
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
// pass the response from our client back to the requesting socket
|
log.info('handle req: %s', req.url);
|
||||||
response_socket.write(d.slice(start, end));
|
|
||||||
|
|
||||||
if (socket.for_websocket) {
|
// pipe incoming request into tcp socket
|
||||||
return;
|
// incoming request isn't allowed to end the socket back to lt client
|
||||||
}
|
rs.pipe(sock, { end: false });
|
||||||
|
|
||||||
|
sock.ws = ws;
|
||||||
|
sock.req = req;
|
||||||
|
|
||||||
|
// since tcp connection to upstream are kept open
|
||||||
// invoke parsing so we know when the response is complete
|
// invoke parsing so we know when the response is complete
|
||||||
var parser = response_socket.out_parser;
|
var parser = sock.parser;
|
||||||
parser.socket = socket;
|
parser.reinitialize(HTTPParser.RESPONSE);
|
||||||
|
parser.socket = sock;
|
||||||
|
|
||||||
var ret = parser.execute(d, start, end - start);
|
// we have completed a response
|
||||||
|
// the tcp socket is free again
|
||||||
|
parser.onIncoming = function (res) {
|
||||||
|
parser.onMessageComplete = function() {
|
||||||
|
log.info('ended response: %s', req.url);
|
||||||
|
|
||||||
|
// any request we had going on is now done
|
||||||
|
ws.end();
|
||||||
|
|
||||||
|
// no more forwarding
|
||||||
|
delete sock.ws;
|
||||||
|
delete parser.onIncoming;
|
||||||
|
|
||||||
|
// return socket to available pool
|
||||||
|
client.sockets.push(sock);
|
||||||
|
|
||||||
|
var next = client.waiting.shift();
|
||||||
|
if (next) {
|
||||||
|
log.trace('popped');
|
||||||
|
proxy_request(client, next[0], next[1], next[2], next[3]);
|
||||||
|
}
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
rs.resume();
|
||||||
|
}
|
||||||
|
|
||||||
|
function upstream_response(d, start, end) {
|
||||||
|
var socket = this;
|
||||||
|
|
||||||
|
var ws = socket.ws;
|
||||||
|
if (!ws) {
|
||||||
|
log.warn('no stream set for req:', socket.req.url);
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
ws.write(d.slice(start, end));
|
||||||
|
|
||||||
|
if (socket.upgraded) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var ret = socket.parser.execute(d, start, end - start);
|
||||||
if (ret instanceof Error) {
|
if (ret instanceof Error) {
|
||||||
log.error(ret);
|
log.error(ret);
|
||||||
freeParser(parser, req);
|
parsers.free(parser);
|
||||||
socket.destroy(ret);
|
socket.destroy(ret);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
function freeParser(parser, req) {
|
var handle_req = function (req, res) {
|
||||||
if (parser) {
|
|
||||||
parser._headers = [];
|
|
||||||
parser.onIncoming = null;
|
|
||||||
if (parser.socket) {
|
|
||||||
parser.socket.onend = null;
|
|
||||||
parser.socket.ondata = null;
|
|
||||||
parser.socket.parser = null;
|
|
||||||
}
|
|
||||||
parser.socket = null;
|
|
||||||
parser.incoming = null;
|
|
||||||
parsers.free(parser);
|
|
||||||
parser = null;
|
|
||||||
}
|
|
||||||
if (req) {
|
|
||||||
req.parser = null;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// single http connection
|
var max_tcp_sockets = req.socket.server.max_tcp_sockets;
|
||||||
// gets a single http response back
|
|
||||||
server.on('connection', function(socket) {
|
|
||||||
|
|
||||||
var self = this;
|
|
||||||
|
|
||||||
// parser handles incoming requests for the socket
|
|
||||||
// the request is what lets us know if we proxy or not
|
|
||||||
var parser = parsers.alloc();
|
|
||||||
parser.socket = socket;
|
|
||||||
parser.reinitialize(HTTPParser.REQUEST);
|
|
||||||
|
|
||||||
function our_request(req) {
|
|
||||||
var res = new ServerResponse(req);
|
|
||||||
res.assignSocket(socket);
|
|
||||||
self.emit('request', req, res);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// a full request is complete
|
|
||||||
// we wait for the response from the server
|
|
||||||
parser.onIncoming = function(req, shouldKeepAlive) {
|
|
||||||
|
|
||||||
log.trace('request', req.url);
|
|
||||||
|
|
||||||
// default is that the data is not for the client
|
|
||||||
delete parser.sock;
|
|
||||||
delete parser.buffer;
|
|
||||||
delete parser.client;
|
|
||||||
|
|
||||||
var hostname = req.headers.host;
|
|
||||||
if (!hostname) {
|
|
||||||
log.trace('no hostname: %j', req.headers);
|
|
||||||
return our_request(req);
|
|
||||||
}
|
|
||||||
|
|
||||||
var match = hostname.match(/^([a-z]{4})[.].*/);
|
|
||||||
if (!match) {
|
|
||||||
return our_request(req);
|
|
||||||
}
|
|
||||||
|
|
||||||
var client_id = match[1];
|
|
||||||
var client = clients[client_id];
|
|
||||||
|
|
||||||
// requesting a subdomain that doesn't exist
|
|
||||||
if (!client) {
|
|
||||||
return socket.end();
|
|
||||||
}
|
|
||||||
|
|
||||||
parser.client = client;
|
|
||||||
|
|
||||||
// assigned socket for the client
|
|
||||||
var sock = client.sockets.shift();
|
|
||||||
|
|
||||||
// no free sockets, queue
|
|
||||||
if (!sock) {
|
|
||||||
parser.buffer = true;
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// for tcp proxying
|
|
||||||
parser.sock = sock;
|
|
||||||
|
|
||||||
// set who we will respond back to
|
|
||||||
sock.respond_socket = socket;
|
|
||||||
|
|
||||||
var out_parser = parsers.alloc();
|
|
||||||
out_parser.reinitialize(HTTPParser.RESPONSE);
|
|
||||||
socket.out_parser = out_parser;
|
|
||||||
|
|
||||||
// we have completed a response
|
|
||||||
// the tcp socket is free again
|
|
||||||
out_parser.onIncoming = function (res) {
|
|
||||||
res.on('end', function() {
|
|
||||||
log.trace('done with response for: %s', req.url);
|
|
||||||
|
|
||||||
// done with the parser
|
|
||||||
parsers.free(out_parser);
|
|
||||||
|
|
||||||
// unset the response
|
|
||||||
delete sock.respond_socket;
|
|
||||||
|
|
||||||
var next = client.waiting.shift();
|
|
||||||
if (!next) {
|
|
||||||
// return socket to available
|
|
||||||
client.sockets.push(sock);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// reuse avail socket for next connection
|
|
||||||
sock.respond_socket = next;
|
|
||||||
|
|
||||||
// needed to know when this response will be done
|
|
||||||
out_parser.reinitialize(HTTPParser.RESPONSE);
|
|
||||||
next.out_parser = out_parser;
|
|
||||||
|
|
||||||
// write original bytes we held cause we were busy
|
|
||||||
sock.write(next.queue);
|
|
||||||
|
|
||||||
// continue with other bytes
|
|
||||||
next.resume();
|
|
||||||
|
|
||||||
return;
|
|
||||||
});
|
|
||||||
};
|
|
||||||
};
|
|
||||||
|
|
||||||
// process new data on the client socket
|
|
||||||
// we may need to forward this it the backend
|
|
||||||
socket.ondata = function(d, start, end) {
|
|
||||||
|
|
||||||
// run through request parser to determine if we should pass to tcp
|
|
||||||
// onIncoming will be run before this returns
|
|
||||||
var ret = parser.execute(d, start, end - start);
|
|
||||||
|
|
||||||
// invalid request from the user
|
|
||||||
if (ret instanceof Error) {
|
|
||||||
log.error(ret);
|
|
||||||
socket.destroy(ret);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// websocket stuff
|
|
||||||
if (parser.incoming && parser.incoming.upgrade) {
|
|
||||||
log.trace('upgrade request');
|
|
||||||
|
|
||||||
parser.finish();
|
|
||||||
|
|
||||||
var hostname = parser.incoming.headers.host;
|
|
||||||
|
|
||||||
var match = hostname.match(/^([a-z]{4})[.].*/);
|
|
||||||
if (!match) {
|
|
||||||
return our_request(req);
|
|
||||||
}
|
|
||||||
|
|
||||||
var client_id = match[1];
|
|
||||||
var client = clients[client_id];
|
|
||||||
|
|
||||||
var sock = client.sockets.shift();
|
|
||||||
sock.respond_socket = socket;
|
|
||||||
sock.for_websocket = true;
|
|
||||||
|
|
||||||
socket.ondata = function(d, start, end) {
|
|
||||||
sock.write(d.slice(start, end));
|
|
||||||
};
|
|
||||||
|
|
||||||
socket.end = function() {
|
|
||||||
log.trace('websocket end');
|
|
||||||
|
|
||||||
delete sock.respond_socket;
|
|
||||||
client.sockets.push(sock);
|
|
||||||
}
|
|
||||||
|
|
||||||
sock.write(d.slice(start, end));
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// if no available socket, buffer the request for later
|
|
||||||
if (parser.buffer) {
|
|
||||||
|
|
||||||
// pause any further data on this socket
|
|
||||||
socket.pause();
|
|
||||||
|
|
||||||
// copy the current data since we have already received it
|
|
||||||
var copy = Buffer(end - start);
|
|
||||||
d.copy(copy, 0, start, end);
|
|
||||||
socket.queue = copy;
|
|
||||||
|
|
||||||
// add socket to queue
|
|
||||||
parser.client.waiting.push(socket);
|
|
||||||
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
if (!parser.sock) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
// assert, respond socket should be set
|
|
||||||
|
|
||||||
// send through tcp tunnel
|
|
||||||
// responses will go back to the respond_socket
|
|
||||||
parser.sock.write(d.slice(start, end));
|
|
||||||
};
|
|
||||||
|
|
||||||
socket.onend = function() {
|
|
||||||
var ret = parser.finish();
|
|
||||||
|
|
||||||
if (ret instanceof Error) {
|
|
||||||
log.error(ret);
|
|
||||||
socket.destroy(ret);
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
|
|
||||||
socket.end();
|
|
||||||
};
|
|
||||||
|
|
||||||
socket.on('close', function() {
|
|
||||||
parsers.free(parser);
|
|
||||||
});
|
|
||||||
});
|
|
||||||
|
|
||||||
server.on('request', function(req, res) {
|
|
||||||
|
|
||||||
// ignore favicon
|
// ignore favicon
|
||||||
if (req.url === '/favicon.ico') {
|
if (req.url === '/favicon.ico') {
|
||||||
@@ -284,7 +113,26 @@ server.on('request', function(req, res) {
|
|||||||
return res.end();
|
return res.end();
|
||||||
}
|
}
|
||||||
|
|
||||||
var parsed = url.parse(req.url, true);
|
var hostname = req.headers.host;
|
||||||
|
if (!hostname) {
|
||||||
|
log.trace('no hostname: %j', req.headers);
|
||||||
|
return res.end();
|
||||||
|
}
|
||||||
|
|
||||||
|
var match = hostname.match(/^([a-z]{4})[.].*/);
|
||||||
|
if (match) {
|
||||||
|
var client_id = match[1];
|
||||||
|
var client = clients[client_id];
|
||||||
|
|
||||||
|
// no such subdomain
|
||||||
|
if (!client) {
|
||||||
|
log.trace('no client found for id: ' + client_id);
|
||||||
|
res.statusCode = 404;
|
||||||
|
return res.end();
|
||||||
|
}
|
||||||
|
|
||||||
|
return proxy_request(client, req, res);
|
||||||
|
}
|
||||||
|
|
||||||
// redirect main page to github reference
|
// redirect main page to github reference
|
||||||
if (req.url === '/' && !parsed.query.new) {
|
if (req.url === '/' && !parsed.query.new) {
|
||||||
@@ -314,10 +162,6 @@ server.on('request', function(req, res) {
|
|||||||
id = rand_id();
|
id = rand_id();
|
||||||
}
|
}
|
||||||
|
|
||||||
// maximum number of tcp connections the client can setup
|
|
||||||
// each tcp channel allows for more parallel requests
|
|
||||||
var max_tcp_sockets = 4;
|
|
||||||
|
|
||||||
// sockets is a list of available sockets for the connection
|
// sockets is a list of available sockets for the connection
|
||||||
// waiting is?
|
// waiting is?
|
||||||
var client = clients[id] = {
|
var client = clients[id] = {
|
||||||
@@ -345,9 +189,7 @@ server.on('request', function(req, res) {
|
|||||||
|
|
||||||
// user has 5 seconds to connect before their slot is given up
|
// user has 5 seconds to connect before their slot is given up
|
||||||
function maybe_tcp_close() {
|
function maybe_tcp_close() {
|
||||||
conn_timeout = setTimeout(function() {
|
conn_timeout = setTimeout(client_server.close.bind(client_server), 5000);
|
||||||
client_server.close();
|
|
||||||
}, 5000);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
maybe_tcp_close();
|
maybe_tcp_close();
|
||||||
@@ -371,12 +213,16 @@ server.on('request', function(req, res) {
|
|||||||
// no need to close the client server
|
// no need to close the client server
|
||||||
clearTimeout(conn_timeout);
|
clearTimeout(conn_timeout);
|
||||||
|
|
||||||
// multiplexes socket data out to clients
|
// allocate a response parser for the socket
|
||||||
socket.ondata = socketOnData;
|
// it only needs one since it will reuse it
|
||||||
|
socket.parser = parsers.alloc();
|
||||||
|
|
||||||
|
socket._orig_ondata = socket.ondata;
|
||||||
|
socket.ondata = upstream_response;
|
||||||
|
|
||||||
client.sockets.push(socket);
|
client.sockets.push(socket);
|
||||||
|
|
||||||
socket.on('close', function(had_error) {
|
socket.once('close', function(had_error) {
|
||||||
log.trace('client %s closed socket', id);
|
log.trace('client %s closed socket', id);
|
||||||
|
|
||||||
// remove this socket
|
// remove this socket
|
||||||
@@ -402,7 +248,75 @@ server.on('request', function(req, res) {
|
|||||||
client_server.on('error', function(err) {
|
client_server.on('error', function(err) {
|
||||||
log.error(err);
|
log.error(err);
|
||||||
});
|
});
|
||||||
});
|
};
|
||||||
|
|
||||||
module.exports = server;
|
var handle_upgrade = function(req, ws) {
|
||||||
|
|
||||||
|
if (req.headers.connection !== 'Upgrade') {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
|
var hostname = req.headers.host;
|
||||||
|
if (!hostname) {
|
||||||
|
return res.end();
|
||||||
|
}
|
||||||
|
|
||||||
|
var match = hostname.match(/^([a-z]{4})[.].*/);
|
||||||
|
|
||||||
|
// not a valid client
|
||||||
|
if (!match) {
|
||||||
|
return res.end();
|
||||||
|
}
|
||||||
|
|
||||||
|
var client_id = match[1];
|
||||||
|
var client = clients[client_id];
|
||||||
|
|
||||||
|
if (!client) {
|
||||||
|
// no such subdomain
|
||||||
|
return res.end();
|
||||||
|
}
|
||||||
|
|
||||||
|
var socket = client.sockets.shift();
|
||||||
|
if (!socket) {
|
||||||
|
// no available sockets to upgrade to
|
||||||
|
return res.end();
|
||||||
|
}
|
||||||
|
|
||||||
|
var stream = req.createRawStream();
|
||||||
|
|
||||||
|
socket.ws = ws;
|
||||||
|
socket.upgraded = true;
|
||||||
|
|
||||||
|
stream.once('end', function() {
|
||||||
|
delete socket.ws;
|
||||||
|
|
||||||
|
// when this ends, we just reset the socket to the lt client
|
||||||
|
// this is easier than trying to figure anything else out
|
||||||
|
socket.end();
|
||||||
|
|
||||||
|
// put socket back into available pool
|
||||||
|
client.sockets.push(socket);
|
||||||
|
|
||||||
|
var next = client.waiting.shift();
|
||||||
|
if (next) {
|
||||||
|
log.trace('popped');
|
||||||
|
proxy_request(client, next[0], next[1], next[2], next[3]);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
stream.pipe(socket, {end: false});
|
||||||
|
socket.once('end', ws.end.bind(ws));
|
||||||
|
};
|
||||||
|
|
||||||
|
module.exports = function(opt) {
|
||||||
|
opt = opt || {};
|
||||||
|
|
||||||
|
var server = createRawServer();
|
||||||
|
|
||||||
|
server.max_tcp_sockets = opt.max_tcp_sockets || 5;
|
||||||
|
server.on('request', handle_req);
|
||||||
|
server.on('upgrade', handle_upgrade);
|
||||||
|
|
||||||
|
return server;
|
||||||
|
};
|
||||||
|
|
||||||
|
|||||||
@@ -2,8 +2,8 @@ var http = require('http');
|
|||||||
var url = require('url');
|
var url = require('url');
|
||||||
var assert = require('assert');
|
var assert = require('assert');
|
||||||
|
|
||||||
var localtunnel_server = require('./').server;
|
var localtunnel_server = require('../').server();
|
||||||
var localtunnel_client = require('./').client;
|
var localtunnel_client = require('../').client;
|
||||||
|
|
||||||
test('setup localtunnel server', function(done) {
|
test('setup localtunnel server', function(done) {
|
||||||
localtunnel_server.listen(3000, function() {
|
localtunnel_server.listen(3000, function() {
|
||||||
@@ -93,3 +93,7 @@ test('request specific domain', function(done) {
|
|||||||
});
|
});
|
||||||
});
|
});
|
||||||
|
|
||||||
|
test('shutdown', function() {
|
||||||
|
localtunnel_server.close();
|
||||||
|
});
|
||||||
|
|
||||||
105
test/queue.js
Normal file
105
test/queue.js
Normal file
@@ -0,0 +1,105 @@
|
|||||||
|
var http = require('http');
|
||||||
|
var url = require('url');
|
||||||
|
var assert = require('assert');
|
||||||
|
|
||||||
|
var localtunnel_server = require('../').server({
|
||||||
|
max_tcp_sockets: 1
|
||||||
|
});
|
||||||
|
|
||||||
|
var localtunnel_client = require('../').client;
|
||||||
|
|
||||||
|
var server;
|
||||||
|
|
||||||
|
test('setup localtunnel server', function(done) {
|
||||||
|
localtunnel_server.listen(3000, function() {
|
||||||
|
console.log('lt server on:', 3000);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
test('setup local http server', function(done) {
|
||||||
|
server = http.createServer();
|
||||||
|
server.on('request', function(req, res) {
|
||||||
|
// respond sometime later
|
||||||
|
setTimeout(function() {
|
||||||
|
res.setHeader('x-count', req.headers['x-count']);
|
||||||
|
res.end('foo');
|
||||||
|
}, 100);
|
||||||
|
});
|
||||||
|
|
||||||
|
server.listen(function() {
|
||||||
|
var port = server.address().port;
|
||||||
|
|
||||||
|
test._fake_port = port;
|
||||||
|
console.log('local http on:', port);
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
test('setup localtunnel client', function(done) {
|
||||||
|
var client = localtunnel_client.connect({
|
||||||
|
host: 'http://localhost:' + 3000,
|
||||||
|
port: test._fake_port
|
||||||
|
});
|
||||||
|
|
||||||
|
client.on('url', function(url) {
|
||||||
|
assert.ok(/^http:\/\/.*localhost:3000$/.test(url));
|
||||||
|
test._fake_url = url;
|
||||||
|
done();
|
||||||
|
});
|
||||||
|
|
||||||
|
client.on('error', function(err) {
|
||||||
|
console.error(err);
|
||||||
|
});
|
||||||
|
});
|
||||||
|
|
||||||
|
test('query localtunnel server w/ ident', function(done) {
|
||||||
|
var uri = test._fake_url;
|
||||||
|
var hostname = url.parse(uri).hostname;
|
||||||
|
|
||||||
|
var count = 0;
|
||||||
|
var opt = {
|
||||||
|
host: 'localhost',
|
||||||
|
port: 3000,
|
||||||
|
agent: false,
|
||||||
|
headers: {
|
||||||
|
host: hostname
|
||||||
|
},
|
||||||
|
path: '/'
|
||||||
|
}
|
||||||
|
|
||||||
|
var num_requests = 2;
|
||||||
|
var responses = 0;
|
||||||
|
|
||||||
|
function maybe_done() {
|
||||||
|
if (++responses >= num_requests) {
|
||||||
|
done();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
function make_req() {
|
||||||
|
opt.headers['x-count'] = count++;
|
||||||
|
http.get(opt, function(res) {
|
||||||
|
res.setEncoding('utf8');
|
||||||
|
var body = '';
|
||||||
|
|
||||||
|
res.on('data', function(chunk) {
|
||||||
|
body += chunk;
|
||||||
|
});
|
||||||
|
|
||||||
|
res.on('end', function() {
|
||||||
|
assert.equal('foo', body);
|
||||||
|
maybe_done();
|
||||||
|
});
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
for (var i=0 ; i<num_requests ; ++i) {
|
||||||
|
make_req();
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
test('shutdown', function() {
|
||||||
|
localtunnel_server.close();
|
||||||
|
});
|
||||||
|
|
||||||
Reference in New Issue
Block a user