Files
server/lib/Client.js
2018-05-16 17:07:58 -04:00

132 lines
4.3 KiB
JavaScript

import http from 'http';
import Debug from 'debug';
import pump from 'pump';
import EventEmitter from 'events';
// A client encapsulates req/res handling using an agent
//
// If an agent is destroyed, the request handling will error
// The caller is responsible for handling a failed request
class Client extends EventEmitter {
constructor(options) {
super();
const agent = this.agent = options.agent;
const id = this.id = options.id;
this.debug = Debug(`lt:Client[${this.id}]`);
// client is given a grace period in which they can connect before they are _removed_
this.graceTimeout = setTimeout(() => {
this.close();
}, 1000).unref();
agent.on('online', () => {
this.debug('client online %s', id);
clearTimeout(this.graceTimeout);
});
agent.on('offline', () => {
this.debug('client offline %s', id);
// if there was a previous timeout set, we don't want to double trigger
clearTimeout(this.graceTimeout);
// client is given a grace period in which they can re-connect before they are _removed_
this.graceTimeout = setTimeout(() => {
this.close();
}, 1000).unref();
});
// TODO(roman): an agent error removes the client, the user needs to re-connect?
// how does a user realize they need to re-connect vs some random client being assigned same port?
agent.once('error', (err) => {
this.close();
});
}
stats() {
return this.agent.stats();
}
close() {
clearTimeout(this.graceTimeout);
this.agent.destroy();
this.emit('close');
}
handleRequest(req, res) {
this.debug('> %s', req.url);
const opt = {
path: req.url,
agent: this.agent,
method: req.method,
headers: req.headers
};
const clientReq = http.request(opt, (clientRes) => {
this.debug('< %s', req.url);
// write response code and headers
res.writeHead(clientRes.statusCode, clientRes.headers);
// using pump is deliberate - see the pump docs for why
pump(clientRes, res);
});
// this can happen when underlying agent produces an error
// in our case we 504 gateway error this?
// if we have already sent headers?
clientReq.once('error', (err) => {
// TODO(roman): if headers not sent - respond with gateway unavailable
});
// using pump is deliberate - see the pump docs for why
pump(req, clientReq);
}
handleUpgrade(req, socket) {
this.debug('> [up] %s', req.url);
socket.once('error', (err) => {
// These client side errors can happen if the client dies while we are reading
// We don't need to surface these in our logs.
if (err.code == 'ECONNRESET' || err.code == 'ETIMEDOUT') {
return;
}
console.error(err);
});
this.agent.createConnection({}, (err, conn) => {
this.debug('< [up] %s', req.url);
// any errors getting a connection mean we cannot service this request
if (err) {
socket.end();
return;
}
// socket met have disconnected while we waiting for a socket
if (!socket.readable || !socket.writable) {
conn.destroy();
socket.end();
return;
}
// websocket requests are special in that we simply re-create the header info
// then directly pipe the socket data
// avoids having to rebuild the request and handle upgrades via the http client
const arr = [`${req.method} ${req.url} HTTP/${req.httpVersion}`];
for (let i=0 ; i < (req.rawHeaders.length-1) ; i+=2) {
arr.push(`${req.rawHeaders[i]}: ${req.rawHeaders[i+1]}`);
}
arr.push('');
arr.push('');
// using pump is deliberate - see the pump docs for why
pump(conn, socket);
pump(socket, conn);
conn.write(arr.join('\r\n'));
});
}
}
export default Client;