From 317db73bdcdcfb23f64056b64da9ba4034374775 Mon Sep 17 00:00:00 2001 From: Roman Shtylman Date: Wed, 16 May 2018 10:21:56 -0400 Subject: [PATCH] use pump to pipe sockets Ensures that destination socket close or destroy also does the same for the source socket. --- lib/Client.js | 32 +++++++++++++--- lib/ClientManager.js | 11 +++++- lib/ClientManager.test.js | 8 +++- lib/TunnelAgent.js | 78 +++++++++++++++++++-------------------- package.json | 1 + server.js | 2 +- yarn.lock | 15 +++++++- 7 files changed, 97 insertions(+), 50 deletions(-) diff --git a/lib/Client.js b/lib/Client.js index aae3df0..ccb7274 100644 --- a/lib/Client.js +++ b/lib/Client.js @@ -1,6 +1,6 @@ import http from 'http'; - -import TunnelAgent from './TunnelAgent'; +import Debug from 'debug'; +import pump from 'pump'; // A client encapsulates req/res handling using an agent // @@ -9,9 +9,11 @@ import TunnelAgent from './TunnelAgent'; class Client { constructor(options) { this.agent = options.agent; + this.debug = Debug('lt:Client'); } handleRequest(req, res) { + this.debug('> %s', req.url); const opt = { path: req.url, agent: this.agent, @@ -20,23 +22,38 @@ class Client { }; const clientReq = http.request(opt, (clientRes) => { + this.debug('< %s', req.url); // write response code and headers res.writeHead(clientRes.statusCode, clientRes.headers); - clientRes.pipe(res); + + // using pump is deliberate - see the pump docs for why + pump(clientRes, res); }); // this can happen when underlying agent produces an error // in our case we 504 gateway error this? // if we have already sent headers? clientReq.once('error', (err) => { - + // TODO(roman): if headers not sent - respond with gateway unavailable }); - req.pipe(clientReq); + // using pump is deliberate - see the pump docs for why + pump(req, clientReq); } handleUpgrade(req, socket) { + this.debug('> [up] %s', req.url); + socket.once('error', (err) => { + // These client side errors can happen if the client dies while we are reading + // We don't need to surface these in our logs. + if (err.code == 'ECONNRESET' || err.code == 'ETIMEDOUT') { + return; + } + console.error(err); + }); + this.agent.createConnection({}, (err, conn) => { + this.debug('< [up] %s', req.url); // any errors getting a connection mean we cannot service this request if (err) { socket.end(); @@ -45,6 +62,7 @@ class Client { // socket met have disconnected while we waiting for a socket if (!socket.readable || !socket.writable) { + conn.destroy(); socket.end(); return; } @@ -60,7 +78,9 @@ class Client { arr.push(''); arr.push(''); - conn.pipe(socket).pipe(conn); + // using pump is deliberate - see the pump docs for why + pump(conn, socket); + pump(socket, conn); conn.write(arr.join('\r\n')); }); } diff --git a/lib/ClientManager.js b/lib/ClientManager.js index b41270f..6279982 100644 --- a/lib/ClientManager.js +++ b/lib/ClientManager.js @@ -20,6 +20,8 @@ class ClientManager { }; this.debug = Debug('lt:ClientManager'); + + this.graceTimeout = null; } // create a new tunnel with `id` @@ -36,11 +38,13 @@ class ClientManager { const maxSockets = this.opt.max_tcp_sockets; const agent = new TunnelAgent({ + clientId: id, maxSockets: 10, }); agent.on('online', () => { this.debug('client online %s', id); + clearTimeout(this.graceTimeout); }); agent.on('offline', () => { @@ -48,7 +52,11 @@ class ClientManager { // this period is short as the client is expected to maintain connections actively // if they client does not reconnect on a dropped connection they need to re-establish this.debug('client offline %s', id); - this.removeClient(id); + + // client is given a grace period in which they can re-connect before they are _removed_ + this.graceTimeout = setTimeout(() => { + this.removeClient(id); + }, 1000); }); // TODO(roman): an agent error removes the client, the user needs to re-connect? @@ -81,6 +89,7 @@ class ClientManager { } removeClient(id) { + this.debug('removing client: %s', id); const client = this.clients[id]; if (!client) { return; diff --git a/lib/ClientManager.test.js b/lib/ClientManager.test.js index 63eb60e..3282553 100644 --- a/lib/ClientManager.test.js +++ b/lib/ClientManager.test.js @@ -46,6 +46,12 @@ describe('ClientManager', () => { const closePromise = new Promise(resolve => socket.once('close', resolve)); socket.end(); await closePromise; + + // should still have client - grace period has not expired + assert(manager.hasClient('foobar')); + + // wait past grace period (1s) + await new Promise(resolve => setTimeout(resolve, 1500)); assert(!manager.hasClient('foobar')); - }); + }).timeout(5000); }); diff --git a/lib/TunnelAgent.js b/lib/TunnelAgent.js index 215b1f8..f940aa5 100644 --- a/lib/TunnelAgent.js +++ b/lib/TunnelAgent.js @@ -25,10 +25,10 @@ class TunnelAgent extends Agent { // once a socket is available it is handed out to the next callback this.waitingCreateConn = []; - this.debug = Debug('lt:TunnelAgent'); + this.debug = Debug(`lt:TunnelAgent[${options.clientId}]`); // track maximum allowed sockets - this.activeSockets = 0; + this.connectedSockets = 0; this.maxTcpSockets = options.maxTcpSockets || DEFAULT_MAX_SOCKETS; // new tcp server to service requests for this client @@ -36,6 +36,7 @@ class TunnelAgent extends Agent { // flag to avoid double starts this.started = false; + this.closed = false; } listen() { @@ -48,8 +49,7 @@ class TunnelAgent extends Agent { server.on('close', this._onClose.bind(this)); server.on('connection', this._onConnection.bind(this)); server.on('error', (err) => { - // where do these errors come from? - // other side creates a connection and then is killed? + // These errors happen from killed connections, we don't worry about them if (err.code == 'ECONNRESET' || err.code == 'ETIMEDOUT') { return; } @@ -70,11 +70,12 @@ class TunnelAgent extends Agent { } _onClose() { + this.closed = true; this.debug('closed tcp socket'); - clearTimeout(this.connTimeout); - // we will not invoke these callbacks? - // TODO(roman): we could invoke these with errors...? - // this makes downstream have to handle this + // flush any waiting connections + for (const conn of this.waitingCreateConn) { + conn(new Error('closed'), null); + } this.waitingCreateConn = []; this.emit('end'); } @@ -82,37 +83,23 @@ class TunnelAgent extends Agent { // new socket connection from client for tunneling requests to client _onConnection(socket) { // no more socket connections allowed - if (this.activeSockets >= this.maxTcpSockets) { + if (this.connectedSockets >= this.maxTcpSockets) { this.debug('no more sockets allowed'); socket.destroy(); return false; } - // a new socket becomes available - if (this.activeSockets == 0) { - this.emit('online'); - } - - this.activeSockets += 1; - this.debug('new connection from: %s:%s', socket.address().address, socket.address().port); - - // a single connection is enough to keep client id slot open - clearTimeout(this.connTimeout); - - socket.once('close', (had_error) => { - this.debug('closed socket (error: %s)', had_error); - this.debug('removing socket'); - this.activeSockets -= 1; + socket.once('close', (hadError) => { + this.debug('closed socket (error: %s)', hadError); + this.connectedSockets -= 1; // remove the socket from available list const idx = this.availableSockets.indexOf(socket); if (idx >= 0) { this.availableSockets.splice(idx, 1); } - // need to track total sockets, not just active available - this.debug('remaining client sockets: %s', this.availableSockets.length); - // no more sockets for this session - // the session will become inactive if client does not reconnect - if (this.availableSockets.length <= 0) { + + this.debug('connected sockets: %s', this.connectedSockets); + if (this.connectedSockets <= 0) { this.debug('all sockets disconnected'); this.emit('offline'); } @@ -125,28 +112,38 @@ class TunnelAgent extends Agent { socket.destroy(); }); - // make socket available for those waiting on sockets - this.availableSockets.push(socket); + if (this.connectedSockets === 0) { + this.emit('online'); + } - // flush anyone waiting on sockets - this._callWaitingCreateConn(); - } + this.connectedSockets += 1; + this.debug('new connection from: %s:%s', socket.address().address, socket.address().port); - // invoke when a new socket is available and there may be waiting createConnection calls - _callWaitingCreateConn() { + // if there are queued callbacks, give this socket now and don't queue into available const fn = this.waitingCreateConn.shift(); - if (!fn) { + if (fn) { + this.debug('giving socket to queued conn request'); + setTimeout(() => { + fn(null, socket); + }, 0); return; } - this.debug('handling queued request'); - this.createConnection({}, fn); + // make socket available for those waiting on sockets + this.availableSockets.push(socket); } // fetch a socket from the available socket pool for the agent // if no socket is available, queue // cb(err, socket) createConnection(options, cb) { + if (this.closed) { + cb(new Error('closed')); + return; + } + + this.debug('create connection'); + // socket is a tcp connection back to the user hosting the site const sock = this.availableSockets.shift(); @@ -154,7 +151,8 @@ class TunnelAgent extends Agent { // wait until we have one if (!sock) { this.waitingCreateConn.push(cb); - this.debug('waiting'); + this.debug('waiting connected: %s', this.connectedSockets); + this.debug('waiting available: %s', this.availableSockets.length); return; } diff --git a/package.json b/package.json index 1624eef..047eb25 100644 --- a/package.json +++ b/package.json @@ -16,6 +16,7 @@ "koa": "2.5.1", "localenv": "0.2.2", "optimist": "0.6.1", + "pump": "3.0.0", "tldjs": "2.3.1" }, "devDependencies": { diff --git a/server.js b/server.js index c5c395a..fb5124b 100644 --- a/server.js +++ b/server.js @@ -146,7 +146,7 @@ export default function(opt) { const client = manager.getClient(clientId); if (!client) { - sock.destroy(); + socket.destroy(); return; } diff --git a/yarn.lock b/yarn.lock index 9c33483..eba9339 100644 --- a/yarn.lock +++ b/yarn.lock @@ -217,6 +217,12 @@ ee-first@1.1.1: version "1.1.1" resolved "https://registry.yarnpkg.com/ee-first/-/ee-first-1.1.1.tgz#590c61156b0ae2f4f0255732a158b266bc56b21d" +end-of-stream@^1.1.0: + version "1.4.1" + resolved "https://registry.yarnpkg.com/end-of-stream/-/end-of-stream-1.4.1.tgz#ed29634d19baba463b6ce6b80a37213eab71ec43" + dependencies: + once "^1.4.0" + error-ex@^1.2.0: version "1.3.1" resolved "https://registry.yarnpkg.com/error-ex/-/error-ex-1.3.1.tgz#f855a86ce61adc4e8621c3cda21e7a7612c3a8dc" @@ -735,7 +741,7 @@ on-finished@^2.1.0: dependencies: ee-first "1.1.1" -once@^1.3.0: +once@^1.3.0, once@^1.3.1, once@^1.4.0: version "1.4.0" resolved "https://registry.yarnpkg.com/once/-/once-1.4.0.tgz#583b1aa775961d4b113ac17d9c50baef9dd76bd1" dependencies: @@ -802,6 +808,13 @@ process-nextick-args@~2.0.0: version "2.0.0" resolved "https://registry.yarnpkg.com/process-nextick-args/-/process-nextick-args-2.0.0.tgz#a37d732f4271b4ab1ad070d35508e8290788ffaa" +pump@3.0.0: + version "3.0.0" + resolved "https://registry.yarnpkg.com/pump/-/pump-3.0.0.tgz#b4a2116815bde2f4e1ea602354e8c75565107a64" + dependencies: + end-of-stream "^1.1.0" + once "^1.3.1" + punycode@^1.4.1: version "1.4.1" resolved "https://registry.yarnpkg.com/punycode/-/punycode-1.4.1.tgz#c0d5a63b2718800ad8e1eb0fa5269c84dd41845e"