diff --git a/bin/server b/bin/server index b489fd9..34c7b0f 100755 --- a/bin/server +++ b/bin/server @@ -45,5 +45,14 @@ process.on('SIGTERM', () => { process.exit(); }); +process.on('uncaughtException', (err) => { + log.error(err); + process.exit(1); +}); + +process.on('unhandledRejection', (reason, promise) => { + log.error(reason); +}); + // vim: ft=javascript diff --git a/bookrc.js b/bookrc.js deleted file mode 100644 index 9147619..0000000 --- a/bookrc.js +++ /dev/null @@ -1,10 +0,0 @@ -/// bookrc logging setup -const log = require('book').default(); - -process.on('uncaughtException', (err) => { - log.error(err); - process.exit(1); -}); - -module.exports = log; - diff --git a/lib/ClientManager.js b/lib/ClientManager.js new file mode 100644 index 0000000..0aef2f3 --- /dev/null +++ b/lib/ClientManager.js @@ -0,0 +1,204 @@ +import Proxy from './Proxy'; + +// maybe remove? +import on_finished from 'on-finished'; +import http from 'http'; +import pump from 'pump'; + +import rand_id from './rand_id'; +import BindingAgent from './BindingAgent'; + +const NoOp = () => {}; + +// Manage sets of clients +// +// A client is a "user session" established to service a remote localtunnel client +class ClientManager { + constructor(opt) { + this.opt = opt; + + this.reqId = 0; + + // id -> client instance + this.clients = Object.create(null); + + // statistics + this.stats = { + tunnels: 0 + }; + } + + stats() { + return this.stats; + } + + // create a new tunnel with `id` + // if the id is already used, a random id is assigned + async newClient (id) { + const clients = this.clients; + const stats = this.stats; + + // can't ask for id already is use + if (clients[id]) { + id = rand_id(); + } + + const popt = { + id: id, + max_tcp_sockets: this.opt.max_tcp_sockets + }; + + const client = Proxy(popt); + + // add to clients map immediately + // avoiding races with other clients requesting same id + clients[id] = client; + + client.on('end', () => { + --stats.tunnels; + delete clients[id]; + }); + + return new Promise((resolve, reject) => { + // each local client has a tcp server to link with the remove localtunnel client + // this starts the server and waits until it is listening + client.start((err, info) => { + if (err) { + // clear the reserved client id + delete clients[id]; + reject(err); + return; + } + + ++stats.tunnels; + info.id = id; + resolve(info); + }); + }); + } + + hasClient(id) { + return this.clients[id]; + } + + // handle http request + handleRequest(clientId, req, res) { + const client = this.clients[clientId]; + if (!client) { + return; + } + + const reqId = this.reqId; + this.reqId = this.reqId + 1; + + let endRes = () => { + endRes = NoOp; + res.end(); + }; + + on_finished(res, () => { + endRes = NoOp; + }); + + client.nextSocket((clientSocket) => { + // response ended before we even got a socket to respond on + if (endRes === NoOp) { + return; + } + + // happens when client upstream is disconnected (or disconnects) + // and the proxy iterates the waiting list and clears the callbacks + // we gracefully inform the user and kill their conn + // without this, the browser will leave some connections open + // and try to use them again for new requests + // TODO(roman) we could instead have a timeout above + // if no socket becomes available within some time, + // we just tell the user no resource available to service request + if (!clientSocket) { + endRes(); + return; + } + + const agent = new BindingAgent({ + socket: clientSocket, + }); + + const opt = { + path: req.url, + agent: agent, + method: req.method, + headers: req.headers + }; + + return new Promise((resolve) => { + // what if error making this request? + const clientReq = http.request(opt, (clientRes) => { + // write response code and headers + res.writeHead(clientRes.statusCode, clientRes.headers); + + // when this pump is done, we end our response + pump(clientRes, res, (err) => { + endRes(); + resolve(); + }); + }); + + // we don't care about when this ends, only if there is error + pump(req, clientReq, (err) => { + if (err) { + endRes(); + resolve(); + } + }); + }); + }); + } + + // handle http upgrade + handleUpgrade(clientId, req, sock) { + const client = this.clients[clientId]; + if (!client) { + return; + } + + client.nextSocket(async (clientSocket) => { + if (!sock.readable || !sock.writable) { + sock.end(); + return; + } + + // happens when client upstream is disconnected (or disconnects) + // and the proxy iterates the waiting list and clears the callbacks + // we gracefully inform the user and kill their conn + // without this, the browser will leave some connections open + // and try to use them again for new requests + // TODO(roman) we could instead have a timeout above + // if no socket becomes available within some time, + // we just tell the user no resource available to service request + if (!clientSocket) { + sock.end(); + return; + } + + // websocket requests are special in that we simply re-create the header info + // then directly pipe the socket data + // avoids having to rebuild the request and handle upgrades via the http client + const arr = [`${req.method} ${req.url} HTTP/${req.httpVersion}`]; + for (let i=0 ; i < (req.rawHeaders.length-1) ; i+=2) { + arr.push(`${req.rawHeaders[i]}: ${req.rawHeaders[i+1]}`); + } + + arr.push(''); + arr.push(''); + + clientSocket.pipe(sock).pipe(clientSocket); + clientSocket.write(arr.join('\r\n')); + + await new Promise((resolve) => { + sock.once('end', resolve); + }); + }); + } +} + +export default ClientManager; diff --git a/lib/Proxy.js b/lib/Proxy.js index a791a7f..ccd0ebf 100644 --- a/lib/Proxy.js +++ b/lib/Proxy.js @@ -16,6 +16,8 @@ const Proxy = function(opt) { self.waiting = []; self.id = opt.id; + self.activeSockets = 0; + // default max is 10 self.max_tcp_sockets = opt.max_tcp_sockets || 10; @@ -72,6 +74,8 @@ Proxy.prototype._maybe_destroy = function() { const self = this; clearTimeout(self.conn_timeout); + + // After last socket is gone, we give opportunity to connect again quickly self.conn_timeout = setTimeout(function() { // sometimes the server is already closed but the event has not fired? try { @@ -81,7 +85,7 @@ Proxy.prototype._maybe_destroy = function() { catch (err) { self._cleanup(); } - }, 5000); + }, 1000); } // new socket connection from client for tunneling requests to client @@ -89,16 +93,19 @@ Proxy.prototype._handle_socket = function(socket) { const self = this; // no more socket connections allowed - if (self.sockets.length >= self.max_tcp_sockets) { + if (self.activeSockets >= self.max_tcp_sockets) { return socket.end(); } + self.activeSockets = self.activeSockets + 1; + self.debug('new connection from: %s:%s', socket.address().address, socket.address().port); // a single connection is enough to keep client id slot open clearTimeout(self.conn_timeout); socket.once('close', function(had_error) { + self.activeSockets = self.activeSockets - 1; self.debug('closed socket (error: %s)', had_error); // what if socket was servicing a request at this time? @@ -133,10 +140,10 @@ Proxy.prototype._handle_socket = function(socket) { Proxy.prototype._process_waiting = function() { const self = this; - const wait_cb = self.waiting.shift(); - if (wait_cb) { + const fn = self.waiting.shift(); + if (fn) { self.debug('handling queued request'); - self.next_socket(wait_cb); + self.nextSocket(fn); } }; @@ -152,48 +159,31 @@ Proxy.prototype._cleanup = function() { self.emit('end'); }; -Proxy.prototype.next_socket = function(handler) { +Proxy.prototype.nextSocket = async function(fn) { const self = this; // socket is a tcp connection back to the user hosting the site const sock = self.sockets.shift(); - if (!sock) { - self.debug('no more client, queue callback'); - self.waiting.push(handler); + self.debug('no more clients, queue callback'); + self.waiting.push(fn); return; } self.debug('processing request'); - handler(sock) - .then(() => { - if (!sock.destroyed) { - self.debug('retuning socket'); - self.sockets.push(sock); - } + await fn(sock); - // no sockets left to process waiting requests - if (self.sockets.length === 0) { - return; - } + if (!sock.destroyed) { + self.debug('retuning socket'); + self.sockets.push(sock); + } - self._process_waiting(); - }) - .catch((err) => { - log.error(err); + // no sockets left to process waiting requests + if (self.sockets.length === 0) { + return; + } - if (!sock.destroyed) { - self.debug('retuning socket'); - self.sockets.push(sock); - } - - // no sockets left to process waiting requests - if (self.sockets.length === 0) { - return; - } - - self._process_waiting(); - }) + self._process_waiting(); }; Proxy.prototype._done = function() { diff --git a/package.json b/package.json index 3ed63dd..0ba80a0 100644 --- a/package.json +++ b/package.json @@ -18,6 +18,7 @@ "localenv": "0.2.2", "on-finished": "2.3.0", "optimist": "0.6.1", + "pump": "2.0.0", "tldjs": "1.6.2" }, "devDependencies": { diff --git a/server.js b/server.js index 946dc8f..b67d708 100644 --- a/server.js +++ b/server.js @@ -1,233 +1,24 @@ import log from 'book'; import Koa from 'koa'; import tldjs from 'tldjs'; -import on_finished from 'on-finished'; import Debug from 'debug'; import http from 'http'; import Promise from 'bluebird'; -import Proxy from './lib/Proxy'; +import ClientManager from './lib/ClientManager'; import rand_id from './lib/rand_id'; -import BindingAgent from './lib/BindingAgent'; const debug = Debug('localtunnel:server'); -const PRODUCTION = process.env.NODE_ENV === 'production'; - -// id -> client http server -const clients = Object.create(null); - -// proxy statistics -const stats = { - tunnels: 0 -}; - -// handle proxying a request to a client -// will wait for a tunnel socket to become available -function DoBounce(req, res, sock) { - req.on('error', (err) => { - console.error('request', err); - }); - - if (res) { - res.on('error', (err) => { - console.error('response', err); - }); - } - - if (sock) { - sock.on('error', (err) => { - console.error('response', err); - }); - } - - // without a hostname, we won't know who the request is for - const hostname = req.headers.host; - if (!hostname) { - return false; - } - - const subdomain = tldjs.getSubdomain(hostname); - if (!subdomain) { - return false; - } - - const client = clients[subdomain]; - - // no such subdomain - // we use 502 error to the client to signify we can't service the request - if (!client) { - if (res) { - res.statusCode = 502; - res.end(`no active client for '${subdomain}'`); - req.connection.destroy(); - } - else if (sock) { - sock.destroy(); - } - - return true; - } - - let finished = false; - if (sock) { - sock.once('end', function() { - finished = true; - }); - } - else if (res) { - // flag if we already finished before we get a socket - // we can't respond to these requests - on_finished(res, function(err) { - finished = true; - req.connection.destroy(); - }); - } - // not something we are expecting, need a sock or a res - else { - req.connection.destroy(); - return true; - } - - // TODO add a timeout, if we run out of sockets, then just 502 - - // get client port - client.next_socket(async (socket) => { - // the request already finished or client disconnected - if (finished) { - return; - } - - // happens when client upstream is disconnected (or disconnects) - // and the proxy iterates the waiting list and clears the callbacks - // we gracefully inform the user and kill their conn - // without this, the browser will leave some connections open - // and try to use them again for new requests - // we cannot have this as we need bouncy to assign the requests again - // TODO(roman) we could instead have a timeout above - // if no socket becomes available within some time, - // we just tell the user no resource available to service request - else if (!socket) { - if (res) { - res.statusCode = 504; - res.end(); - } - - if (sock) { - sock.destroy(); - } - - req.connection.destroy(); - return; - } - - // websocket requests are special in that we simply re-create the header info - // and directly pipe the socket data - // avoids having to rebuild the request and handle upgrades via the http client - if (res === null) { - const arr = [`${req.method} ${req.url} HTTP/${req.httpVersion}`]; - for (let i=0 ; i < (req.rawHeaders.length-1) ; i+=2) { - arr.push(`${req.rawHeaders[i]}: ${req.rawHeaders[i+1]}`); - } - - arr.push(''); - arr.push(''); - - socket.pipe(sock).pipe(socket); - socket.write(arr.join('\r\n')); - - await new Promise((resolve) => { - socket.once('end', resolve); - }); - - return; - } - - // regular http request - - const agent = new BindingAgent({ - socket: socket - }); - - const opt = { - path: req.url, - agent: agent, - method: req.method, - headers: req.headers - }; - - await new Promise((resolve) => { - // what if error making this request? - const client_req = http.request(opt, function(client_res) { - // write response code and headers - res.writeHead(client_res.statusCode, client_res.headers); - - client_res.pipe(res); - on_finished(client_res, function(err) { - resolve(); - }); - }); - - // happens if the other end dies while we are making the request - // so we just end the req and move on - // we can't really do more with the response here because headers - // may already be sent - client_req.on('error', (err) => { - req.connection.destroy(); - }); - - req.pipe(client_req); - }); - }); - - return true; -} - -// create a new tunnel with `id` -// if the id is already used, a random id is assigned -const NewClient = async (id, opt) => { - // can't ask for id already is use - if (clients[id]) { - id = rand_id(); - } - - const popt = { - id: id, - max_tcp_sockets: opt.max_tcp_sockets - }; - - const client = Proxy(popt); - - // add to clients map immediately - // avoiding races with other clients requesting same id - clients[id] = client; - - client.on('end', function() { - --stats.tunnels; - delete clients[id]; - }); - - return new Promise((resolve, reject) => { - // each local client has a tcp server to link with the remove localtunnel client - // this starts the server and waits until it is listening - client.start((err, info) => { - if (err) { - // clear the reserved client id - delete clients[id]; - reject(err); - return; - } - - ++stats.tunnels; - info.id = id; - resolve(info); - }); - }); +function GetClientIdFromHostname(hostname) { + return tldjs.getSubdomain(hostname); } module.exports = function(opt) { opt = opt || {}; + const manager = new ClientManager(opt); + const schema = opt.secure ? 'https' : 'http'; const app = new Koa(); @@ -240,6 +31,8 @@ module.exports = function(opt) { return; } + const stats = manager.stats(); + ctx.body = { tunnels: stats.tunnels, mem: process.memoryUsage(), @@ -260,7 +53,7 @@ module.exports = function(opt) { if (isNewClientRequest) { const req_id = rand_id(); debug('making new client with id %s', req_id); - const info = await NewClient(req_id, opt); + const info = await manager.newClient(req_id); const url = schema + '://' + info.id + '.' + ctx.request.host; info.url = url; @@ -298,7 +91,7 @@ module.exports = function(opt) { } debug('making new client with id %s', req_id); - const info = await NewClient(req_id, opt); + const info = await manager.newClient(req_id); const url = schema + '://' + info.id + '.' + ctx.request.host; info.url = url; @@ -310,17 +103,46 @@ module.exports = function(opt) { const appCallback = app.callback(); server.on('request', (req, res) => { - if (DoBounce(req, res, null)) { + // without a hostname, we won't know who the request is for + const hostname = req.headers.host; + if (!hostname) { + res.statusCode = 400; + res.end('Host header is required'); return; } - appCallback(req, res); + const clientId = GetClientIdFromHostname(hostname); + if (!clientId) { + appCallback(req, res); + return; + } + + if (manager.hasClient(clientId)) { + manager.handleRequest(clientId, req, res); + return; + } + + res.statusCode = 404; + res.end('404'); }); server.on('upgrade', (req, socket, head) => { - if (DoBounce(req, null, socket)) { + const hostname = req.headers.host; + if (!hostname) { + sock.destroy(); return; - }; + } + + const clientId = GetClientIdFromHostname(hostname); + if (!clientId) { + sock.destroy(); + return; + } + + if (manager.hasClient(clientId)) { + manager.handleUpgrade(clientId, req, socket); + return; + } socket.destroy(); }); diff --git a/test/basic.js b/test/basic.js index ed66574..f1c829d 100644 --- a/test/basic.js +++ b/test/basic.js @@ -5,9 +5,17 @@ var localtunnel = require('localtunnel'); var localtunnel_server = require('../server')(); +process.on('uncaughtException', (err) => { + console.error(err); +}); + +process.on('unhandledRejection', (reason, promise) => { + console.error(reason); +}); + suite('basic'); -var lt_server_port +var lt_server_port; before('set up localtunnel server', function(done) { var server = localtunnel_server.listen(function() { diff --git a/yarn.lock b/yarn.lock index cb78bca..265f847 100644 --- a/yarn.lock +++ b/yarn.lock @@ -436,6 +436,12 @@ ee-first@1.1.1: version "1.1.1" resolved "https://registry.yarnpkg.com/ee-first/-/ee-first-1.1.1.tgz#590c61156b0ae2f4f0255732a158b266bc56b21d" +end-of-stream@^1.1.0: + version "1.4.0" + resolved "https://registry.yarnpkg.com/end-of-stream/-/end-of-stream-1.4.0.tgz#7a90d833efda6cfa6eac0f4949dbb0fad3a63206" + dependencies: + once "^1.4.0" + error-ex@^1.2.0: version "1.3.1" resolved "https://registry.yarnpkg.com/error-ex/-/error-ex-1.3.1.tgz#f855a86ce61adc4e8621c3cda21e7a7612c3a8dc" @@ -1087,6 +1093,12 @@ on-finished@2.3.0, on-finished@^2.1.0: dependencies: ee-first "1.1.1" +once@^1.3.1, once@^1.4.0: + version "1.4.0" + resolved "https://registry.yarnpkg.com/once/-/once-1.4.0.tgz#583b1aa775961d4b113ac17d9c50baef9dd76bd1" + dependencies: + wrappy "1" + only@0.0.2: version "0.0.2" resolved "https://registry.yarnpkg.com/only/-/only-0.0.2.tgz#2afde84d03e50b9a8edc444e30610a70295edfb4" @@ -1174,6 +1186,13 @@ process-nextick-args@~1.0.6: version "1.0.7" resolved "https://registry.yarnpkg.com/process-nextick-args/-/process-nextick-args-1.0.7.tgz#150e20b756590ad3f91093f25a4f2ad8bff30ba3" +pump@2.0.0: + version "2.0.0" + resolved "https://registry.yarnpkg.com/pump/-/pump-2.0.0.tgz#7946da1c8d622b098e2ceb2d3476582470829c9d" + dependencies: + end-of-stream "^1.1.0" + once "^1.3.1" + qs@~5.2.0: version "5.2.1" resolved "https://registry.yarnpkg.com/qs/-/qs-5.2.1.tgz#801fee030e0b9450d6385adc48a4cc55b44aedfc" @@ -1443,6 +1462,10 @@ wrap-ansi@^2.0.0: string-width "^1.0.1" strip-ansi "^3.0.1" +wrappy@1: + version "1.0.2" + resolved "https://registry.yarnpkg.com/wrappy/-/wrappy-1.0.2.tgz#b5243d8f3ec1aa35f1364605bc0d1036e30ab69f" + ws@0.8.0: version "0.8.0" resolved "https://registry.yarnpkg.com/ws/-/ws-0.8.0.tgz#ac60ebad312121d01e16cc3383d7ec67ad0f0f1f"