use pump to pipe sockets

Ensures that destination socket close or destroy also does the same for
the source socket.
This commit is contained in:
Roman Shtylman
2018-05-16 10:21:56 -04:00
parent 743895720c
commit 317db73bdc
7 changed files with 97 additions and 50 deletions

View File

@@ -1,6 +1,6 @@
import http from 'http'; import http from 'http';
import Debug from 'debug';
import TunnelAgent from './TunnelAgent'; import pump from 'pump';
// A client encapsulates req/res handling using an agent // A client encapsulates req/res handling using an agent
// //
@@ -9,9 +9,11 @@ import TunnelAgent from './TunnelAgent';
class Client { class Client {
constructor(options) { constructor(options) {
this.agent = options.agent; this.agent = options.agent;
this.debug = Debug('lt:Client');
} }
handleRequest(req, res) { handleRequest(req, res) {
this.debug('> %s', req.url);
const opt = { const opt = {
path: req.url, path: req.url,
agent: this.agent, agent: this.agent,
@@ -20,23 +22,38 @@ class Client {
}; };
const clientReq = http.request(opt, (clientRes) => { const clientReq = http.request(opt, (clientRes) => {
this.debug('< %s', req.url);
// write response code and headers // write response code and headers
res.writeHead(clientRes.statusCode, clientRes.headers); res.writeHead(clientRes.statusCode, clientRes.headers);
clientRes.pipe(res);
// using pump is deliberate - see the pump docs for why
pump(clientRes, res);
}); });
// this can happen when underlying agent produces an error // this can happen when underlying agent produces an error
// in our case we 504 gateway error this? // in our case we 504 gateway error this?
// if we have already sent headers? // if we have already sent headers?
clientReq.once('error', (err) => { clientReq.once('error', (err) => {
// TODO(roman): if headers not sent - respond with gateway unavailable
}); });
req.pipe(clientReq); // using pump is deliberate - see the pump docs for why
pump(req, clientReq);
} }
handleUpgrade(req, socket) { handleUpgrade(req, socket) {
this.debug('> [up] %s', req.url);
socket.once('error', (err) => {
// These client side errors can happen if the client dies while we are reading
// We don't need to surface these in our logs.
if (err.code == 'ECONNRESET' || err.code == 'ETIMEDOUT') {
return;
}
console.error(err);
});
this.agent.createConnection({}, (err, conn) => { this.agent.createConnection({}, (err, conn) => {
this.debug('< [up] %s', req.url);
// any errors getting a connection mean we cannot service this request // any errors getting a connection mean we cannot service this request
if (err) { if (err) {
socket.end(); socket.end();
@@ -45,6 +62,7 @@ class Client {
// socket met have disconnected while we waiting for a socket // socket met have disconnected while we waiting for a socket
if (!socket.readable || !socket.writable) { if (!socket.readable || !socket.writable) {
conn.destroy();
socket.end(); socket.end();
return; return;
} }
@@ -60,7 +78,9 @@ class Client {
arr.push(''); arr.push('');
arr.push(''); arr.push('');
conn.pipe(socket).pipe(conn); // using pump is deliberate - see the pump docs for why
pump(conn, socket);
pump(socket, conn);
conn.write(arr.join('\r\n')); conn.write(arr.join('\r\n'));
}); });
} }

View File

@@ -20,6 +20,8 @@ class ClientManager {
}; };
this.debug = Debug('lt:ClientManager'); this.debug = Debug('lt:ClientManager');
this.graceTimeout = null;
} }
// create a new tunnel with `id` // create a new tunnel with `id`
@@ -36,11 +38,13 @@ class ClientManager {
const maxSockets = this.opt.max_tcp_sockets; const maxSockets = this.opt.max_tcp_sockets;
const agent = new TunnelAgent({ const agent = new TunnelAgent({
clientId: id,
maxSockets: 10, maxSockets: 10,
}); });
agent.on('online', () => { agent.on('online', () => {
this.debug('client online %s', id); this.debug('client online %s', id);
clearTimeout(this.graceTimeout);
}); });
agent.on('offline', () => { agent.on('offline', () => {
@@ -48,7 +52,11 @@ class ClientManager {
// this period is short as the client is expected to maintain connections actively // this period is short as the client is expected to maintain connections actively
// if they client does not reconnect on a dropped connection they need to re-establish // if they client does not reconnect on a dropped connection they need to re-establish
this.debug('client offline %s', id); this.debug('client offline %s', id);
this.removeClient(id);
// client is given a grace period in which they can re-connect before they are _removed_
this.graceTimeout = setTimeout(() => {
this.removeClient(id);
}, 1000);
}); });
// TODO(roman): an agent error removes the client, the user needs to re-connect? // TODO(roman): an agent error removes the client, the user needs to re-connect?
@@ -81,6 +89,7 @@ class ClientManager {
} }
removeClient(id) { removeClient(id) {
this.debug('removing client: %s', id);
const client = this.clients[id]; const client = this.clients[id];
if (!client) { if (!client) {
return; return;

View File

@@ -46,6 +46,12 @@ describe('ClientManager', () => {
const closePromise = new Promise(resolve => socket.once('close', resolve)); const closePromise = new Promise(resolve => socket.once('close', resolve));
socket.end(); socket.end();
await closePromise; await closePromise;
// should still have client - grace period has not expired
assert(manager.hasClient('foobar'));
// wait past grace period (1s)
await new Promise(resolve => setTimeout(resolve, 1500));
assert(!manager.hasClient('foobar')); assert(!manager.hasClient('foobar'));
}); }).timeout(5000);
}); });

View File

@@ -25,10 +25,10 @@ class TunnelAgent extends Agent {
// once a socket is available it is handed out to the next callback // once a socket is available it is handed out to the next callback
this.waitingCreateConn = []; this.waitingCreateConn = [];
this.debug = Debug('lt:TunnelAgent'); this.debug = Debug(`lt:TunnelAgent[${options.clientId}]`);
// track maximum allowed sockets // track maximum allowed sockets
this.activeSockets = 0; this.connectedSockets = 0;
this.maxTcpSockets = options.maxTcpSockets || DEFAULT_MAX_SOCKETS; this.maxTcpSockets = options.maxTcpSockets || DEFAULT_MAX_SOCKETS;
// new tcp server to service requests for this client // new tcp server to service requests for this client
@@ -36,6 +36,7 @@ class TunnelAgent extends Agent {
// flag to avoid double starts // flag to avoid double starts
this.started = false; this.started = false;
this.closed = false;
} }
listen() { listen() {
@@ -48,8 +49,7 @@ class TunnelAgent extends Agent {
server.on('close', this._onClose.bind(this)); server.on('close', this._onClose.bind(this));
server.on('connection', this._onConnection.bind(this)); server.on('connection', this._onConnection.bind(this));
server.on('error', (err) => { server.on('error', (err) => {
// where do these errors come from? // These errors happen from killed connections, we don't worry about them
// other side creates a connection and then is killed?
if (err.code == 'ECONNRESET' || err.code == 'ETIMEDOUT') { if (err.code == 'ECONNRESET' || err.code == 'ETIMEDOUT') {
return; return;
} }
@@ -70,11 +70,12 @@ class TunnelAgent extends Agent {
} }
_onClose() { _onClose() {
this.closed = true;
this.debug('closed tcp socket'); this.debug('closed tcp socket');
clearTimeout(this.connTimeout); // flush any waiting connections
// we will not invoke these callbacks? for (const conn of this.waitingCreateConn) {
// TODO(roman): we could invoke these with errors...? conn(new Error('closed'), null);
// this makes downstream have to handle this }
this.waitingCreateConn = []; this.waitingCreateConn = [];
this.emit('end'); this.emit('end');
} }
@@ -82,37 +83,23 @@ class TunnelAgent extends Agent {
// new socket connection from client for tunneling requests to client // new socket connection from client for tunneling requests to client
_onConnection(socket) { _onConnection(socket) {
// no more socket connections allowed // no more socket connections allowed
if (this.activeSockets >= this.maxTcpSockets) { if (this.connectedSockets >= this.maxTcpSockets) {
this.debug('no more sockets allowed'); this.debug('no more sockets allowed');
socket.destroy(); socket.destroy();
return false; return false;
} }
// a new socket becomes available socket.once('close', (hadError) => {
if (this.activeSockets == 0) { this.debug('closed socket (error: %s)', hadError);
this.emit('online'); this.connectedSockets -= 1;
}
this.activeSockets += 1;
this.debug('new connection from: %s:%s', socket.address().address, socket.address().port);
// a single connection is enough to keep client id slot open
clearTimeout(this.connTimeout);
socket.once('close', (had_error) => {
this.debug('closed socket (error: %s)', had_error);
this.debug('removing socket');
this.activeSockets -= 1;
// remove the socket from available list // remove the socket from available list
const idx = this.availableSockets.indexOf(socket); const idx = this.availableSockets.indexOf(socket);
if (idx >= 0) { if (idx >= 0) {
this.availableSockets.splice(idx, 1); this.availableSockets.splice(idx, 1);
} }
// need to track total sockets, not just active available
this.debug('remaining client sockets: %s', this.availableSockets.length); this.debug('connected sockets: %s', this.connectedSockets);
// no more sockets for this session if (this.connectedSockets <= 0) {
// the session will become inactive if client does not reconnect
if (this.availableSockets.length <= 0) {
this.debug('all sockets disconnected'); this.debug('all sockets disconnected');
this.emit('offline'); this.emit('offline');
} }
@@ -125,28 +112,38 @@ class TunnelAgent extends Agent {
socket.destroy(); socket.destroy();
}); });
// make socket available for those waiting on sockets if (this.connectedSockets === 0) {
this.availableSockets.push(socket); this.emit('online');
}
// flush anyone waiting on sockets this.connectedSockets += 1;
this._callWaitingCreateConn(); this.debug('new connection from: %s:%s', socket.address().address, socket.address().port);
}
// invoke when a new socket is available and there may be waiting createConnection calls // if there are queued callbacks, give this socket now and don't queue into available
_callWaitingCreateConn() {
const fn = this.waitingCreateConn.shift(); const fn = this.waitingCreateConn.shift();
if (!fn) { if (fn) {
this.debug('giving socket to queued conn request');
setTimeout(() => {
fn(null, socket);
}, 0);
return; return;
} }
this.debug('handling queued request'); // make socket available for those waiting on sockets
this.createConnection({}, fn); this.availableSockets.push(socket);
} }
// fetch a socket from the available socket pool for the agent // fetch a socket from the available socket pool for the agent
// if no socket is available, queue // if no socket is available, queue
// cb(err, socket) // cb(err, socket)
createConnection(options, cb) { createConnection(options, cb) {
if (this.closed) {
cb(new Error('closed'));
return;
}
this.debug('create connection');
// socket is a tcp connection back to the user hosting the site // socket is a tcp connection back to the user hosting the site
const sock = this.availableSockets.shift(); const sock = this.availableSockets.shift();
@@ -154,7 +151,8 @@ class TunnelAgent extends Agent {
// wait until we have one // wait until we have one
if (!sock) { if (!sock) {
this.waitingCreateConn.push(cb); this.waitingCreateConn.push(cb);
this.debug('waiting'); this.debug('waiting connected: %s', this.connectedSockets);
this.debug('waiting available: %s', this.availableSockets.length);
return; return;
} }

View File

@@ -16,6 +16,7 @@
"koa": "2.5.1", "koa": "2.5.1",
"localenv": "0.2.2", "localenv": "0.2.2",
"optimist": "0.6.1", "optimist": "0.6.1",
"pump": "3.0.0",
"tldjs": "2.3.1" "tldjs": "2.3.1"
}, },
"devDependencies": { "devDependencies": {

View File

@@ -146,7 +146,7 @@ export default function(opt) {
const client = manager.getClient(clientId); const client = manager.getClient(clientId);
if (!client) { if (!client) {
sock.destroy(); socket.destroy();
return; return;
} }

View File

@@ -217,6 +217,12 @@ ee-first@1.1.1:
version "1.1.1" version "1.1.1"
resolved "https://registry.yarnpkg.com/ee-first/-/ee-first-1.1.1.tgz#590c61156b0ae2f4f0255732a158b266bc56b21d" resolved "https://registry.yarnpkg.com/ee-first/-/ee-first-1.1.1.tgz#590c61156b0ae2f4f0255732a158b266bc56b21d"
end-of-stream@^1.1.0:
version "1.4.1"
resolved "https://registry.yarnpkg.com/end-of-stream/-/end-of-stream-1.4.1.tgz#ed29634d19baba463b6ce6b80a37213eab71ec43"
dependencies:
once "^1.4.0"
error-ex@^1.2.0: error-ex@^1.2.0:
version "1.3.1" version "1.3.1"
resolved "https://registry.yarnpkg.com/error-ex/-/error-ex-1.3.1.tgz#f855a86ce61adc4e8621c3cda21e7a7612c3a8dc" resolved "https://registry.yarnpkg.com/error-ex/-/error-ex-1.3.1.tgz#f855a86ce61adc4e8621c3cda21e7a7612c3a8dc"
@@ -735,7 +741,7 @@ on-finished@^2.1.0:
dependencies: dependencies:
ee-first "1.1.1" ee-first "1.1.1"
once@^1.3.0: once@^1.3.0, once@^1.3.1, once@^1.4.0:
version "1.4.0" version "1.4.0"
resolved "https://registry.yarnpkg.com/once/-/once-1.4.0.tgz#583b1aa775961d4b113ac17d9c50baef9dd76bd1" resolved "https://registry.yarnpkg.com/once/-/once-1.4.0.tgz#583b1aa775961d4b113ac17d9c50baef9dd76bd1"
dependencies: dependencies:
@@ -802,6 +808,13 @@ process-nextick-args@~2.0.0:
version "2.0.0" version "2.0.0"
resolved "https://registry.yarnpkg.com/process-nextick-args/-/process-nextick-args-2.0.0.tgz#a37d732f4271b4ab1ad070d35508e8290788ffaa" resolved "https://registry.yarnpkg.com/process-nextick-args/-/process-nextick-args-2.0.0.tgz#a37d732f4271b4ab1ad070d35508e8290788ffaa"
pump@3.0.0:
version "3.0.0"
resolved "https://registry.yarnpkg.com/pump/-/pump-3.0.0.tgz#b4a2116815bde2f4e1ea602354e8c75565107a64"
dependencies:
end-of-stream "^1.1.0"
once "^1.3.1"
punycode@^1.4.1: punycode@^1.4.1:
version "1.4.1" version "1.4.1"
resolved "https://registry.yarnpkg.com/punycode/-/punycode-1.4.1.tgz#c0d5a63b2718800ad8e1eb0fa5269c84dd41845e" resolved "https://registry.yarnpkg.com/punycode/-/punycode-1.4.1.tgz#c0d5a63b2718800ad8e1eb0fa5269c84dd41845e"