'use strict';
const uuid = require('uuid');
const async = require('async');
const assert = require('assert');
const bunyan = require('bunyan');
const merge = require('merge');
const constants = require('./constants');
const utils = require('./utils');
const { EventEmitter } = require('events');
const RoutingTable = require('./routing-table');
const Messenger = require('./messenger');
const ErrorRules = require('./rules-errors');
/**
* @typedef {object} AbstractNode~logger
* @property {function} debug - Passed string of debug information
* @property {function} info - Passed string of general information
* @property {function} warn - Passed string of warnings
* @property {function} error - Passed string of error message
*/
/**
* @typedef {object} AbstractNode~transport
* @property {function} read - Returns raw message buffer if available
* @property {function} write - Passed raw message buffer
*/
/**
* @typedef {object} AbstractNode~storage
* @description Implements a subset of the LevelUP interface
* @property {function} get
* @property {function} put
* @property {function} del
* @property {function} createReadStream
*/
/**
* @typedef AbstractNode~request
* @property {array} contact - Peer who sent this request
* @property {string} contact.0 - Peer's node identity
* @property {object} contact.1 - Peer's contact information (varies by plugin)
* @property {array|object} params - Method parameters (varies by method)
* @property {string} method - Method name being called
*/
/**
* @typedef AbstractNode~response
* @property {AbstractNode~responseSend} send
* @property {AbstractNode~responseError} error
*/
/**
* @typedef {function} AbstractNode~next
* @param {error|null} error - Indicates to exit the middleware stack
*/
/**
* @typedef AbstractNode~sendError
* @property {string} message - Error description
* @property {string} type - Error type
* @property {object} request - Request the error is from
* @property {string} request.id - Message id
* @property {array} request.params - Parameters sent
* @property {Bucket~contact} request.target - Contact message was for
* @property {string} request.method - RPC method in message
*/
/**
* @method AbstractNode~responseSend
* @param {array|object} results - Result parameters to respond with
*/
/**
* @method AbstractNode~responseError
* @param {string} errorMessage - Text describing the error encountered
* @param {number} [errorCode] - Error code
*/
/**
* Represents a network node
*/
class AbstractNode extends EventEmitter {
/**
* Join event is triggered when the routing table is no longer empty
* @event AbstractNode#join
*/
/**
* Error event fires when a critical failure has occurred; if no handler is
* specified, then it will throw
* @event AbstractNode#error
* @type {Error}
*/
static get DEFAULTS() {
return {
logger: bunyan.createLogger({ name: 'kadence' }),
identity: utils.getRandomKeyBuffer(),
transport: null,
storage: null,
messenger: new Messenger(),
contact: {}
};
}
static validate(options) {
if (typeof options.identity === 'string') {
options.identity = Buffer.from(options.identity, 'hex');
}
utils.validateStorageAdapter(options.storage);
utils.validateLogger(options.logger);
utils.validateTransport(options.transport);
assert.ok(utils.keyBufferIsValid(options.identity), 'Invalid identity');
}
/**
* Contructs the primary interface for a kad node
* @constructor
* @param {object} options
* @param {AbstractNode~transport} options.transport - See {@tutorial transport-adapters}
* @param {buffer} options.identity - See {@tutorial identities}
* @param {Bucket~contact} options.contact - See {@tutorial identities}
* @param {AbstractNode~storage} options.storage - See {@tutorial storage-adapters}
* @param {AbstractNode~logger} [options.logger]
* @param {Messenger} [options.messenger] - See {@tutorial messengers}
*/
constructor(options) {
AbstractNode.validate(options = merge(AbstractNode.DEFAULTS, options));
super();
this._middlewares = { '*': [] };
this._errors = { '*': [] };
this._pending = new Map();
this.rpc = options.messenger;
this.transport = options.transport;
this.storage = options.storage;
this.identity = options.identity;
this.contact = options.contact;
this.logger = options.logger;
this.router = new RoutingTable(this.identity);
this._init();
}
/**
* Establishes listeners and creates the message pipeline
* @private
*/
_init() {
this.transport.on('error', (err) => {
this.logger.warn(err.message.toLowerCase());
if (err.dispose && this._pending.get(err.dispose)) {
const pending = this._pending.get(err.dispose);
err.type = 'TIMEOUT';
pending.handler(err);
this._pending.delete(err.dispose);
}
});
this.transport.on('data', data => {
this.rpc.deserializer.create()
.once('error', err => this.logger.warn(err.message.toLowerCase()))
.once('data', data => this._process(data))
.write(data);
});
setInterval(() => this._timeout(), constants.T_RESPONSETIMEOUT);
}
/**
* Processes deserialized messages
* @private
*/
_process([message, contact]) {
/* eslint complexity: [2, 8] */
this._updateContact(...contact.payload.params);
// NB: If we are receiving a request, then pass it through the middleware
// NB: stacks to process it
if (message.type === 'request') {
return this.receive(
merge({}, message.payload, { contact: contact.payload.params }),
{
send: (data) => {
this.rpc.serializer.create()
.once('data', data => this.transport.write(data))
.once('error', err => this.logger.warn(err.message.toLowerCase()))
.write([
merge({ id: message.payload.id }, { result: data }),
[this.identity.toString('hex'), this.contact],
contact.payload.params
]);
},
error: (msg, code = -32000) => {
this.rpc.serializer.create()
.once('data', data => this.transport.write(data))
.once('error', err => this.logger.warn(err.message.toLowerCase()))
.write([
merge({ id: message.payload.id }, {
error: { message: msg, code }
}),
[this.identity.toString('hex'), this.contact],
contact.payload.params
]);
}
}
);
}
// NB: If we aren't expecting this message, just throw it away
if (!this._pending.has(message.payload.id)) {
return this.logger.warn(
`received late or invalid response from ${contact.payload.params[0]}`
);
}
// NB: Check to make sure that the response comes from the identity
// NB: that the request was origninally intended for, unless the message
// NB: was sent to a null identity (such as during bootstrapping)
const { handler, fingerprint } = this._pending.get(message.payload.id);
const nullFingerprint = Buffer.alloc(constants.B / 8, 0).toString('hex');
const msgSentToNullFingerprint = fingerprint === nullFingerprint;
const fingerprintsMatch = fingerprint === contact.payload.params[0];
if (!msgSentToNullFingerprint && !fingerprintsMatch) {
handler(new Error(
'Response fingerprint differs from request destination'
), null);
this._pending.delete(message.payload.id);
return;
}
// NB: Otherwise, check if we are waiting on a response to a pending
// NB: message and fire the result handler
const handlerArgs = [
(message.type === 'error'
? new Error(message.payload.error.message)
: null),
(message.type === 'success'
? message.payload.result
: null)
];
handler(...handlerArgs);
this._pending.delete(message.payload.id);
}
/**
* Enumerates all pending handlers and fires them with a timeout error if
* they have been pending too long
* @private
*/
_timeout() {
let now = Date.now();
let err = new Error('Timed out waiting for response');
err.type = 'TIMEOUT';
for (let [id, entry] of this._pending.entries()) {
if (entry.timestamp + constants.T_RESPONSETIMEOUT >= now) {
continue;
}
entry.handler(err);
this._pending.delete(id);
}
}
/**
* Adds the given contact to the routing table
* @private
*/
_updateContact(identity, contact) {
if (identity === this.identity.toString('hex')) {
return null;
} else {
return this.router.addContactByNodeId(identity, contact);
}
}
/**
* Validates the contact tuple
* @private
*/
_validateContact(target) {
return (Array.isArray(target) && target[0] && target[1])
&& (this.transport._validate ? this.transport._validate(target) : true);
}
/**
* Sends the [method, params] to the contact and executes the handler on
* response or timeout
* @param {string} method - RPC method name
* @param {object|array} params - RPC parameters
* @param {Bucket~contact} contact - Destination address information
* @param {AbstractNode~sendCallback} [callback]
* @returns {Promise<object|array,Error>}
*/
send(method, params, target, handler) {
if (typeof handler === 'function') {
return this._send(method, params, target).then(function() {
handler(null, ...arguments);
}, handler);
} else {
return this._send(method, params, target);
}
}
/**
* @callback AbstractNode~sendCallback
* @param {null|AbstractNode~sendError} error
* @param {object|array|string|number} result
*/
/**
* @private
*/
_send(method, params, target) {
return new Promise((resolve, reject) => {
const id = uuid();
const timestamp = Date.now();
if (!this._validateContact(target)) {
return reject(new Error('Refusing to send message to invalid contact'));
}
target[0] = target[0].toString('hex'); // NB: Allow identity to be a buffer
function wrapped(err, ...params) {
if (err) {
err.request = { id, method, params, target };
return reject(err);
}
resolve(...params);
}
this._pending.set(id, {
handler: wrapped,
timestamp,
fingerprint: target[0]
});
this.rpc.serializer.create()
.once('error', err => reject(err))
.once('data', data => this.transport.write(data))
.write([
{ id, method, params },
[this.identity.toString('hex'), this.contact],
target
]);
});
}
/**
* Accepts an arbitrary function that receives this node as context
* for mounting protocol handlers and extending the node with other
* methods
* @param {function} plugin - {@tutorial plugins}
*/
plugin(func) {
assert(typeof func === 'function', 'Invalid plugin supplied');
return func(this);
}
/**
* Mounts a message handler route for processing incoming RPC messages
* @param {string} [method] - RPC method name to route through
* @param {AbstractNode~middleware} middleware
*/
use(method, middleware) {
if (typeof method === 'function') {
middleware = method;
method = '*';
}
// NB: If middleware function takes 4 arguments, it is an error handler
const type = middleware.length === 4 ? '_errors' : '_middlewares';
const stack = this[type][method] = this[type][method] || [];
stack.push(middleware);
}
/**
* @callback AbstractNode~middleware
* @param {error} [error] - Error object resulting from a middleware
* @param {AbstractNode~request} request - The incoming message object
* @param {AbstractNode~response} response - The outgoing response object
* @param {AbstractNode~next} next - Call to proceed to next middleware
*/
/**
* Passes through to the {@link AbstractNode~transport}
*/
listen() {
let handlers = new ErrorRules(this);
this.use(handlers.methodNotFound.bind(handlers));
this.use(handlers.internalError.bind(handlers));
this.transport.listen(...arguments);
}
/**
* Processes a the given arguments by sending them through the appropriate
* middleware stack
* @param {AbstractNode~request} request
* @param {AbstractNode~response} response
*/
receive(request, response) {
const self = this;
const { method } = request;
// NB: First pass the the arguments through the * middleware stack
// NB: Then pass the arguments through the METHOD middleware stack
function processRequest(callback) {
async.series([
(next) => self._middleware('*', [request, response], next),
(next) => self._middleware(method, [request, response], next)
], callback)
}
// NB: Repeat the same steps for the error stack
function handleErrors(err) {
async.series([
(next) => self._error('*', [err, request, response], next),
(next) => self._error(method, [err, request, response], next)
]);
}
processRequest(handleErrors);
}
/**
* Send the arguments through the stack type
* @private
*/
_stack(type, method, args, callback) {
async.eachSeries(this[type][method] || [], (middleware, done) => {
try {
middleware(...args, done);
} catch (err) {
done(err);
}
}, callback);
}
/**
* Send the arguments through the middleware
* @private
*/
_middleware() {
this._stack('_middlewares', ...arguments);
}
/**
* Send the arguments through the error handlers
* @private
*/
_error() {
this._stack('_errors', ...arguments);
}
}
module.exports = AbstractNode;