/**
* @module kadence/quasar
*/
'use strict';
const assert = require('node:assert');
const merge = require('merge');
const async = require('async');
const { knuthShuffle } = require('knuth-shuffle');
const uuid = require('uuid');
const constants = require('./constants');
const utils = require('./utils');
const BloomFilter = require('atbf');
const LruCache = require('lru-cache');
/**
* Implements the handlers for Quasar message types
*/
class QuasarRules {
/**
* @constructor
* @param {module:kadence/quasar~QuasarPlugin} quasar
*/
constructor(quasar) {
this.quasar = quasar;
}
/**
* Upon receipt of a PUBLISH message, we validate it, then check if we or
* our neighbors are subscribed. If we are subscribed, we execute our
* handler. If our neighbors are subscribed, we relay the publication to
* ALPHA random of the closest K. If our neighbors are not subscribed, we
* relay the publication to a random contact
* @param {AbstractNode~request} request
* @param {AbstractNode~response} response
* @param {AbstractNode~next} next
*/
publish(request, response, next) {
/* eslint max-statements: [2, 18] */
let { ttl, topic, uuid, contents } = request.params;
let neighbors = [...this.quasar.node.router.getClosestContactsToKey(
this.quasar.node.identity,
constants.K
).entries()];
if (this.quasar.cached.get(uuid)) {
return next(new Error('Message previously routed'));
}
if (ttl > constants.MAX_RELAY_HOPS || ttl < 0) {
return next(new Error('Message includes invalid TTL'));
}
neighbors = knuthShuffle(neighbors.filter(([nodeId]) => {
return request.params.publishers.indexOf(nodeId) === -1;
})).splice(0, constants.ALPHA);
request.params.publishers.push(this.quasar.node.identity.toString('hex'));
this.quasar.cached.set(uuid, Date.now());
if (this.quasar.isSubscribedTo(topic)) {
this.quasar.groups.get(topic)(contents, topic);
async.each(neighbors, (contact, done) => {
this._relayPublication(request, contact, done);
});
return response.send([]);
}
if (ttl - 1 === 0) {
return response.send([]);
}
async.each(neighbors, (contact, done) => {
this.quasar.pullFilterFrom(contact, (err, filter) => {
if (err) {
return done();
}
if (!QuasarRules.shouldRelayPublication(request, filter)) {
contact = this.quasar._getRandomContact();
}
this._relayPublication(request, contact, done);
});
});
response.send([]);
}
/**
* Upon receipt of a SUBSCRIBE message, we simply respond with a serialized
* version of our attenuated bloom filter
* @param {AbstractNode~request} request
* @param {AbstractNode~response} response
*/
subscribe(request, response) {
response.send(this.quasar.filter.toHexArray());
}
/**
* Upon receipt of an UPDATE message we merge the delivered attenuated bloom
* filter with our own
* @param {AbstractNode~request} request
* @param {AbstractNode~response} response
* @param {AbstractNode~next} next
*/
update(request, response, next) {
if (!Array.isArray(request.params)) {
return next(new Error('Invalid bloom filters supplied'));
}
try {
request.params.forEach(str => assert(utils.isHexaString(str),
'Invalid hex string'));
this.quasar.filter.merge(BloomFilter.from(request.params));
} catch (err) {
return next(err);
}
response.send([]);
}
/**
* Returns a boolean indicating if we should relay the message to the contact
* @param {AbstractNode~request} request
* @param {array} attenuatedBloomFilter - List of topic bloom filters
*/
static shouldRelayPublication(request, filter) {
let negated = true;
filter.forEach((level) => {
if (level.has(request.params.topic)) {
negated = false;
}
});
request.params.publishers.forEach((pub) => {
filter.forEach((level) => {
if (level.has(pub)) {
negated = true;
}
});
});
return !negated;
}
/**
* Takes a request object for a publication and relays it to the supplied
* contact
* @private
*/
_relayPublication(request, contact, callback) {
this.quasar.node.send(
request.method,
merge({}, request.params, { ttl: request.params.ttl - 1 }),
contact,
callback
);
}
}
/**
* Implements the primary interface for the publish-subscribe system
* and decorates the given node object with it's public methods
*/
class QuasarPlugin {
static get PUBLISH_METHOD() {
return 'PUBLISH';
}
static get SUBSCRIBE_METHOD() {
return 'SUBSCRIBE';
}
static get UPDATE_METHOD() {
return 'UPDATE';
}
/**
* @constructor
* @param {KademliaNode} node
*/
constructor(node) {
const handlers = new QuasarRules(this);
this.cached = new LruCache(constants.LRU_CACHE_SIZE)
this.groups = new Map();
this.filter = new BloomFilter({
filterDepth: constants.FILTER_DEPTH,
bitfieldSize: constants.B
});
this._lastUpdate = 0;
this.node = node;
this.node.quasarSubscribe = this.quasarSubscribe.bind(this);
this.node.quasarPublish = this.quasarPublish.bind(this);
this.node.use(QuasarPlugin.UPDATE_METHOD, handlers.update.bind(handlers));
this.node.use(QuasarPlugin.PUBLISH_METHOD,
handlers.publish.bind(handlers));
this.node.use(QuasarPlugin.SUBSCRIBE_METHOD,
handlers.subscribe.bind(handlers));
this.filter[0].add(this.node.identity.toString('hex'));
}
/**
* Returns our ALPHA closest neighbors
* @property {Bucket~contact[]} neighbors
*/
get neighbors() {
return [...this.node.router.getClosestContactsToKey(
this.node.identity.toString('hex'),
constants.ALPHA
).entries()];
}
/**
* Publishes the content to the network by selecting ALPHA contacts closest
* to the node identity (or the supplied routing key). Errors if message is
* unable to be delivered to any contacts. Tries to deliver to ALPHA contacts
* until exhausted.
* @param {string} topic - Identifier for subscribers
* @param {object} contents - Arbitrary publication payload
* @param {object} [options]
* @param {string} [options.routingKey] - Publish to neighbors close to this
* key instead of our own identity
* @param {QuasarPlugin~quasarPublishCallback} [callback]
*/
quasarPublish(topic, contents, options = {}, callback = () => null) {
if (typeof options === 'function') {
callback = options;
options = {};
}
const publicationId = uuid.v4();
const neighbors = [...this.node.router.getClosestContactsToKey(
options.routingKey || this.node.identity.toString('hex'),
this.node.router.size
).entries()];
let deliveries = [];
async.until(() => {
return deliveries.length === constants.ALPHA || !neighbors.length;
}, done => {
const candidates = [];
for (let i = 0; i < constants.ALPHA - deliveries.length; i++) {
candidates.push(neighbors.shift());
}
async.each(candidates, (contact, next) => {
this.node.send(QuasarPlugin.PUBLISH_METHOD, {
uuid: publicationId,
topic,
contents,
publishers: [this.node.identity.toString('hex')],
ttl: constants.MAX_RELAY_HOPS
}, contact, err => {
if (err) {
this.node.logger.warn(err.message);
} else {
deliveries.push(contact);
}
next();
});
}, done);
}, err => {
if (!err && deliveries.length === 0) {
err = new Error('Failed to deliver any publication messages');
}
callback(err, deliveries);
});
}
/**
* @callback QuasarPlugin~quasarPublishCallback
* @param {error|null} err
* @param {Bucket~contact[]} deliveries
*/
/**
* Publishes the content to the network
* @param {string|string[]} topics - Identifier for subscribers
* @param {QuasarPlugin~quasarSubscribeHandler} handler
*/
quasarSubscribe(topics, handler) {
const self = this;
if (Array.isArray(topics)) {
topics.forEach((topic) => addTopicToFilter(topic));
} else {
addTopicToFilter(topics);
}
function addTopicToFilter(topic) {
self.filter[0].add(topic);
self.groups.set(topic, handler);
}
this.pullFilters(() => this.pushFilters());
}
/**
* @callback QuasarPlugin~quasarSubscribeHandler
* @param {object} publicationContent
*/
/**
* Requests neighbor bloom filters and merges with our records
* @param {function} [callback]
*/
pullFilters(callback = () => null) {
const now = Date.now();
if (this._lastUpdate > now - constants.SOFT_STATE_TIMEOUT) {
return callback();
} else {
this._lastUpdate = now;
}
async.each(this.neighbors, (contact, done) => {
this.pullFilterFrom(contact, (err, filter) => {
if (err) {
this.node.logger.warn('failed to pull filter from %s, reason: %s',
contact[0], err.message);
} else {
this.filter.merge(filter);
}
done(err);
});
}, callback);
}
/**
* Requests the attenuated bloom filter from the supplied contact
* @param {Bucket~contact} contact
* @param {function} callback
*/
pullFilterFrom(contact, callback) {
const method = QuasarPlugin.SUBSCRIBE_METHOD;
this.node.send(method, [], contact, (err, result) => {
if (err) {
return callback(err);
}
try {
result.forEach(str => assert(utils.isHexaString(str),
'Invalid hex string'));
return callback(null, BloomFilter.from(result));
} catch (err) {
return callback(err);
}
});
}
/**
* Notifies neighbors that our subscriptions have changed
* @param {function} [callback]
*/
pushFilters(callback = () => null) {
const now = Date.now();
if (this._lastUpdate > now - constants.SOFT_STATE_TIMEOUT) {
return callback();
} else {
this._lastUpdate = now;
}
async.each(this.neighbors, (contact, done) => {
this.pushFilterTo(contact, done);
}, callback);
}
/**
* Sends our attenuated bloom filter to the supplied contact
* @param {Bucket~contact} contact
* @param {function} callback
*/
pushFilterTo(contact, callback) {
this.node.send(QuasarPlugin.UPDATE_METHOD, this.filter.toHexArray(),
contact, callback);
}
/**
* Check if we are subscribed to the topic
* @param {string} topic - Topic to check subscription
* @returns {boolean}
*/
isSubscribedTo(topic) {
return this.filter[0].has(topic) && this.groups.has(topic);
}
/**
* Check if our neighbors are subscribed to the topic
* @param {string} topic - Topic to check subscription
* @returns {boolean}
*/
hasNeighborSubscribedTo(topic) {
let index = 1;
while (this.filter[index]) {
if (this.filter[index].has(topic)) {
return true;
} else {
index++;
}
}
return false;
}
/**
* Returns a random contact from the routing table
* @private
*/
_getRandomContact() {
return knuthShuffle([...this.node.router.getClosestContactsToKey(
this.node.identity.toString('hex'),
this.node.router.size,
true
).entries()]).shift();
}
}
/**
* Registers a {@link module:kadence/quasar~QuasarPlugin} with a {@link KademliaNode}
*/
module.exports = function() {
return function(node) {
return new QuasarPlugin(node);
};
};
module.exports.QuasarPlugin = QuasarPlugin;
module.exports.QuasarRules = QuasarRules;