/**
* @module kadence/hibernate
*/
'use strict';
const { EventEmitter } = require('node:events');
const { Transform } = require('node:stream');
const merge = require('merge');
const bytes = require('bytes');
const ms = require('ms');
/**
* Represents a bandwidth meter which will trigger hibernation
*/
class HibernatePlugin extends EventEmitter {
static get DEFAULTS() {
return {
limit: '5gb',
interval: '1d',
reject: ['STORE', 'FIND_VALUE']
};
}
/**
* @constructor
* @param {KademliaNode} node
* @param {object} [options]
* @param {string} [options.limit=5gb] - The accounting max bandwidth
* @param {string} [options.interval=1d] - The accounting reset interval
* @param {string[]} [options.reject] - List of methods to reject during
* hibernation
*/
constructor(node, options) {
super();
this.node = node;
this.opts = merge(HibernatePlugin.DEFAULTS, options);
this.limit = bytes(this.opts.limit);
this.interval = ms(this.opts.interval);
this.reject = this.opts.reject;
// This plugin could potentially be used for denial of service attacks
// so let's warn users that it could be problematic
this.node.logger.warn(
'the hibernation plugin may not be suitable for production networks'
);
this.node.rpc.deserializer.prepend(() => this.meter('inbound'));
this.node.rpc.serializer.append(() => this.meter('outbound'));
this.node.use((req, res, next) => this.detect(req, res, next));
this.start();
}
/**
* @property {boolean} hibernating - Indicates if our limits are reached
*/
get hibernating() {
return this.accounting.total >= this.limit;
}
/**
* Starts the accounting reset timeout
*/
start() {
const now = Date.now();
if (this.accounting) {
this.emit('reset', merge({}, this.accounting, {
hibernating: this.hibernating
}));
} else {
this.emit('start');
}
this.accounting = {
start: now,
end: now + this.interval,
inbound: 0,
outbound: 0,
unknown: 0,
get total() {
return this.inbound + this.outbound + this.unknown;
},
get reset() {
return this.end - Date.now();
}
};
setTimeout(() => this.start(), this.interval);
}
/**
* Return a meter stream that increments the given accounting property
* @param {string} type - ['inbound', 'outbound', 'unknown']
* @returns {stream.Transform}
*/
meter(type) {
if (!['inbound', 'outbound'].includes(type)) {
type = 'unknown';
}
const inc = (data) => {
if (Buffer.isBuffer(data)) {
this.accounting[type] += data.length;
} else if (Array.isArray(data)) {
this.accounting[type] += data[1].length;
} else {
this.accounting[type] = Buffer.from(data).length;
}
}
return new Transform({
transform: (data, enc, callback) => {
inc(data);
callback(null, data);
},
objectMode: true
});
}
/**
* Check if hibernating when messages received
* @param {AbstractNode~request} request
* @param {AbstractNode~response} response
* @param {AbstractNode~next} next
*/
detect(request, response, next) {
if (this.hibernating && this.reject.includes(request.method)) {
next(new Error(`Hibernating, try ${request.method} again later`));
} else {
next();
}
}
}
/**
* Regsiters a {@link HibernatePlugin} with an {@link AbstractNode}
* @param {object} [options]
* @param {string} [options.limit=5gb] - The accounting max bandwidth
* @param {string} [options.interval=1d] - The accounting reset interval
* @param {string[]} [options.reject] - List of methods to reject during
* hibernation
*/
module.exports = function(options) {
return function(node) {
return new module.exports.HibernatePlugin(node, options);
};
};
module.exports.HibernatePlugin = HibernatePlugin;