'use strict';
const merge = require('merge');
const assert = require('assert');
const { createLogger } = require('bunyan');
const { EventEmitter } = require('events');
/**
* Sets up a proxy server for use by the remote tunnel server
*/
class Proxy extends EventEmitter {
static get DEFAULTS() {
return {
id: null,
logger: createLogger({ name: 'diglet' }),
};
}
/**
* Manages proxy for tunneling hosts to connect
* @param {object} options
* @param {string} options.id - Unique ID for this proxy
* @param {Object} [options.logger=console] - Logger to use
*/
constructor(options) {
super();
this._opts = this._checkOptions(merge(Proxy.DEFAULTS, options));
this._waitingHandlers = [];
this._connectedSockets = [];
this._logger = this._opts.logger;
this._created = Date.now();
}
/**
* Validates options given to constructor
* @private
*/
_checkOptions(o) {
assert(typeof o.id === 'string', 'Invalid proxyId');
return o;
}
get id() {
return this._opts.id;
}
get info() {
return {
id: this.id,
connectedSockets: this._connectedSockets.length,
waitingHandlers: this._waitingHandlers.length,
createdTimestamp: this._created
};
}
/**
* Returns a connected socket off the list to process a request and places it
* back when the handler is finished
* @param {Proxy~socketHandler} socketHandler
*/
pop(socketHandler) {
const socket = this._connectedSockets.shift();
this._logger.info('getting socket from proxy tunnel');
if (!socket) {
this._logger.warn('no socket available, queuing handler');
return this._waitingHandlers.push(socketHandler);
} else if (socket.destroyed || !socket.writable) {
this._logger.warn('got destroyed socket, getting another...');
this._handleSocketClose(socket);
return this.pop(socketHandler);
}
this._logger.info('got tunnel socket, passing to handler');
socketHandler(socket, () => {
this._logger.info('socket handler finished, adding back to pool');
if (!socket.destroyed) {
this._connectedSockets.push(socket);
}
if (this._connectedSockets.length !== 0) {
this._processNextWaitingHandler();
}
});
}
/**
* @callback Proxy~socketHandler
* @param {net.Socket} socket - The socket back to the client
* @param {Proxy~socketHandlerCallback}
*/
/**
* @callback Proxy~socketHandlerCallback
* @param {Error|null} error - Possible error during handling
*/
/**
* Pulls the next waiting hanlder off the list and processes it
* @private
*/
_processNextWaitingHandler() {
const waitingHandler = this._waitingHandlers.shift();
if (waitingHandler) {
this.pop(waitingHandler);
}
}
/**
* Cleans up waiting and open connections
*/
clean() {
const destroyed = [];
this._logger.info('cleaning connection pool');
this._waitingHandlers.forEach(handler => handler(null));
this._connectedSockets.forEach(socket => {
if (socket.destroyed || !socket.writable) {
destroyed.push(socket);
}
});
destroyed.forEach(socket => {
this._connectedSockets.splice(this._connectedSockets.indexOf(socket), 1);
});
this.emit('end');
}
/**
* Processes incoming connections from tunnel client
* @param {net.Socket} socket - Raw socket from an incoming connection
*/
push(socket) {
this._logger.info('handling incoming tunnel connection');
socket.on('close', () => this._handleSocketClose(socket));
socket.on('error', (err) => this._handleSocketError(socket, err));
this._connectedSockets.push(socket);
this._processNextWaitingHandler();
}
/**
* Handles a socket error
* @private
*/
_handleSocketError(socket, err) {
this._logger.error('socket encountered an error: %s', err.message);
this.clean();
socket.destroy();
}
/**
* Handles a closed tunnel socket
* @private
*/
_handleSocketClose(socket) {
const index = this._connectedSockets.indexOf(socket);
if (index === -1) {
return;
}
this._connectedSockets.splice(index, 1);
}
}
module.exports = Proxy;