node-kademlia.js

'use strict';

const async = require('async');
const { Writable: WritableStream } = require('node:stream');
const constants = require('./constants');
const { knuthShuffle: shuffle } = require('knuth-shuffle');
const utils = require('./utils');
const AbstractNode = require('./node-abstract');
const KademliaRules = require('./rules-kademlia');
const ContactList = require('./contact-list');
const MetaPipe = require('metapipe');


/**
 * Extends {@link AbstractNode} with Kademlia-specific rules
 * @class
 * @extends {AbstractNode}
 */
class KademliaNode extends AbstractNode {

  /**
   * @typedef {object} KademliaNode~entry
   * @property {string|object|array} value - The primary entry value
   * @property {string} publisher - Node identity of the original publisher
   * @property {number} timestamp - Last update/replicate time
   */

  /**
   * @constructor
   */
  constructor(options) {
    super(options);

    this._lookups = new Map(); // NB: Track the last lookup time for buckets
    this._pings = new Map();
    this._updateContactQueue = async.queue(
      (task, cb) => this._updateContactWorker(task, cb),
      1
    );

    this.replicatePipeline = new MetaPipe({ objectMode: true });
    this.expirePipeline = new MetaPipe({ objectMode: true });
  }

  /**
   * Adds the kademlia rule handlers before calling super#listen()
   */
  listen() {
    let handlers = new KademliaRules(this);

    this.use('PING', handlers.ping.bind(handlers));
    this.use('STORE', handlers.store.bind(handlers));
    this.use('FIND_NODE', handlers.findNode.bind(handlers));
    this.use('FIND_VALUE', handlers.findValue.bind(handlers));

    setInterval(
      utils.preventConvoy(() => this.refresh(0)),
      constants.T_REFRESH
    );
    setInterval(
      utils.preventConvoy(() => this.replicate(() => this.expire())),
      constants.T_REPLICATE
    );

    super.listen(...arguments);
  }

  /**
   * Inserts the given contact into the routing table and uses it to perform
   * a {@link KademliaNode#iterativeFindNode} for this node's identity,
   * then refreshes all buckets further than it's closest neighbor, which will
   * be in the occupied bucket with the lowest index
   * @param {Bucket~contact} peer - Peer to bootstrap from
   * @param {function} [joinListener] - Function to set as join listener
   * @returns {Promise}
   */
  join(peer, callback) {
    if (typeof callback === 'function') {
      return this._join(peer).then(function() {
        callback(null, ...arguments);
      }, callback);
    } else {
      return this._join(peer);
    }
  }

  /**
   * @private
   */
  _join([identity, contact]) {
    return new Promise((resolve, reject) => {
      this.router.addContactByNodeId(identity, contact);
      async.series([
        (next) => this.iterativeFindNode(this.identity.toString('hex'), next),
        (next) => this.refresh(this.router.getClosestBucket() + 1, next)
      ], (err) => {
        if (err) {
          reject(err);
        } else {
          resolve();
        }
      });
    });
  }

  /**
   * Sends a PING message to the supplied contact, resolves with latency
   * @param {Bucket~contact} peer
   * @param {KademliaNode~pingCallback} [callback]
   * @returns {Promise<number>}
   */
  ping(contact, callback) {
    if (typeof callback ==='function') {
      return this._ping(contact).then(function() {
        callback(null, ...arguments);
      }, callback);
    } else {
      return this._ping(contact);
    }
  }
  /**
   * @callback KademliaNode~pingCallback
   * @param {error|null} error
   * @param {number} latency - Milliseconds before response received
   */

  /**
   * @private
   */
  _ping(contact) {
    return new Promise((resolve, reject) => {
      const start = Date.now();

      this.send('PING', [], contact, (err) => {
        if (err) {
          return reject(err);
        }

        resolve(Date.now() - start);
      });
    });
  }

  /**
   * @private
   */
  _createStorageItem(value) {
    const keys = Object.keys(value);
    const alreadyHasMetadata = keys.includes('value') &&
                               keys.includes('publisher') &&
                               keys.includes('timestamp');

    if (alreadyHasMetadata) {
      value.timestamp = Date.now();
      value.publisher = value.publisher.toString('hex');
      return value;
    }

    return {
      value: value,
      timestamp: Date.now(),
      publisher: this.identity.toString('hex')
    };
  }

  /**
   * Performs a {@link KademliaNode#iterativeFindNode} to collect K contacts
   * nearest to the given key, sending a STORE message to each of them.
   * @param {buffer|string} key - Key to store data under
   * @param {buffer|string|object} value - Value to store by key
   * @param {KademliaNode~iterativeStoreCallback} callback
   * @returns {Promise<number>}
   */
  iterativeStore(key, value, callback) {
    if (typeof callback === 'function') {
      return this._iterativeStore(key, value).then(function() {
        callback(null, ...arguments);
      }, callback);
    } else {
      return this._iterativeStore(key, value);
    }
  }
  /**
   * Note that if there is a protocol/validation error, you will not receive
   * it as an error in the callback. Be sure to also check that stored > 0 as
   * part of error handling here.
   * @callback KademliaNode~iterativeStoreCallback
   * @param {error|null} error
   * @param {number} stored - Total nodes who stored the pair
   */

  /**
   * @private
   */
  _iterativeStore(key, value) {
    return new Promise((resolve, reject) => {
      key = key.toString('hex');
      let stored = 0;

      const createStoreRpc = (target) => {
        return ['STORE', [key, this._createStorageItem(value)], target];
      };

      const dispatchStoreRpcs = (contacts, callback) => {
        async.eachLimit(contacts, constants.ALPHA, (target, done) => {
          this.send(...createStoreRpc(target), (err) => {
            stored = err ? stored : stored + 1;
            done();
          });
        }, callback);
      };

      async.waterfall([
        (next) => this.iterativeFindNode(key, next),
        (contacts, next) => dispatchStoreRpcs(contacts, next),
        (next) => {
          this.storage.put(key, this._createStorageItem(value), {
            valueEncoding: 'json'
          }, next);
        }
      ], () => {
        if (stored === 0) {
          return reject(new Error('Failed to stored entry with peers'));
        }
        resolve(stored);
      });
    });
  }

  /**
   * Basic kademlia lookup operation that builds a set of K contacts closest
   * to the given key
   * @param {buffer|string} key - Reference key for node lookup
   * @param {KademliaNode~iterativeFindNodeCallback} [callback]
   * @returns {Promise<Bucket~contact[]>}
   */
  iterativeFindNode(key, callback) {
    key = key.toString('hex');

    if (typeof callback === 'function') {
      return this._iterativeFind('FIND_NODE', key).then(function() {
        callback(null, ...arguments);
      }, callback);
    } else {
      return this._iterativeFind('FIND_NODE', key);
    }
  }
  /**
   * @callback KademliaNode~iterativeFindNodeCallback
   * @param {error|null} error
   * @param {Bucket~contact[]} contacts - Result of the lookup operation
   */

  /**
   * Kademlia search operation that is conducted as a node lookup and builds
   * a list of K closest contacts. If at any time during the lookup the value
   * is returned, the search is abandoned. If no value is found, the K closest
   * contacts are returned. Upon success, we must store the value at the
   * nearest node seen during the search that did not return the value.
   * @param {buffer|string} key - Key for value lookup
   * @param {KademliaNode~iterativeFindValueCallback} [callback]
   * @returns {Promise<object>}
   */
  iterativeFindValue(key, callback) {
    key = key.toString('hex');

    if (typeof callback === 'function') {
      return this._iterativeFind('FIND_VALUE', key).then(function() {
        callback(null, ...arguments);
      }, callback);
    } else {
      return this._iterativeFind('FIND_VALUE', key);
    }
  }
  /**
   * @callback KademliaNode~iterativeFindValueCallback
   * @param {error|null} error
   * @param {KademliaNode~entry} value
   * @param {null|Bucket~contact} contact - Contact responded with entry
   */

  /**
   * Performs a scan of the storage adapter and performs
   * republishing/replication of items stored. Items that we did not publish
   * ourselves get republished every T_REPLICATE. Items we did publish get
   * republished every T_REPUBLISH.
   * @param {KademliaNode~replicateCallback} [callback]
   * @returns {Promise}
   */
  replicate(callback) {
    if (typeof callback === 'function') {
      return this._replicate().then(callback, callback);
    } else {
      return this._replicate();
    }
  }
  /**
   * @callback KademliaNode~replicateCallback
   * @param {error|null} error
   */

  /**
   * @private
   */
  _replicate() {
    const self = this;
    const now = Date.now();

    return new Promise((resolve, reject) => {
      const itemStream = this.storage.createReadStream({
        valueEncoding: 'json'
      });
      const replicateStream = new WritableStream({
        objectMode: true,
        write: maybeReplicate
      });

      function maybeReplicate({ key, value }, enc, next) {
        const isPublisher = value.publisher === self.identity.toString('hex');
        const republishDue = (value.timestamp + constants.T_REPUBLISH) <= now;
        const replicateDue = (value.timestamp + constants.T_REPLICATE) <= now;
        const shouldRepublish = isPublisher && republishDue;
        const shouldReplicate = !isPublisher && replicateDue;

        if (shouldReplicate || shouldRepublish) {
          return self.iterativeStore(key, value, next);
        }

        next();
      }

      function triggerCallback(err) {
        itemStream.removeAllListeners();
        replicateStream.removeAllListeners();

        if (err) {
          return reject(err);
        }

        resolve();
      }

      itemStream.on('error', triggerCallback);
      replicateStream.on('error', triggerCallback);
      replicateStream.on('finish', triggerCallback);
      itemStream.pipe(this.replicatePipeline).pipe(replicateStream);
    });
  }

  /**
   * Items expire T_EXPIRE seconds after the original publication. All items
   * are assigned an expiration time which is "exponentially inversely
   * proportional to the number of nodes between the current node and the node
   * whose ID is closest to the key", where this number is "inferred from the
   * bucket structure of the current node".
   * @param {KademliaNode~expireCallback} [callback]
   * @returns {Promise}
   */
  expire(callback) {
    if (typeof callback === 'function') {
      return this._expire().then(callback, callback);
    } else {
      return this._expire();
    }
  }
  /**
   * @callback KademliaNode~expireCallback
   * @param {error|null} error
   */

  /**
   * @private
   */
  _expire() {
    const self = this;
    const now = Date.now();

    return new Promise((resolve, reject) => {
      const itemStream = this.storage.createReadStream({
        valueEncoding: 'json'
      });
      const expireStream = new WritableStream({
        objectMode: true,
        write: maybeExpire
      });

      function maybeExpire({ key, value }, enc, next) {
        if ((value.timestamp + constants.T_EXPIRE) <= now) {
          return self.storage.del(key, next);
        }

        next();
      }

      function triggerCallback(err) {
        itemStream.removeAllListeners();
        expireStream.removeAllListeners();

        if (err) {
          return reject(err);
        }

        resolve();
      }

      itemStream.on('error', triggerCallback);
      expireStream.on('error', triggerCallback);
      expireStream.on('finish', triggerCallback);
      itemStream.pipe(this.expirePipeline).pipe(expireStream);
    });
  }

  /**
   * If no node lookups have been performed in any given bucket's range for
   * T_REFRESH, the node selects a random number in that range and does a
   * refresh, an iterativeFindNode using that number as key.
   * @param {number} startIndex - bucket index to start refresh from
   * @param {KademliaNode~refreshCallback} [callback]
   * @returns {Promise}
   */
  refresh(startIndex = 0, callback) {
    if (typeof callback === 'function') {
      return this._refresh(startIndex).then(callback, callback);
    } else {
      return this._refresh(startIndex);
    }
  }
  /**
   * @callback KademliaNode~refreshCallback
   * @param {error|null} error
   * @param {array} bucketsRefreshed
   */

  /**
   * @private
   */
  _refresh(startIndex) {
    const now = Date.now();
    const indices = [
      ...this.router.entries()
    ].slice(startIndex).map((entry) => entry[0]);

    // NB: We want to avoid high churn during refresh and prevent further
    // NB: refreshes if lookups in the next bucket do not return any new
    // NB: contacts. To do this we will shuffle the bucket indexes we are
    // NB: going to check and only continue to refresh if new contacts were
    // NB: discovered in the last MAX_UNIMPROVED_REFRESHES consecutive lookups.
    let results = new Set(), consecutiveUnimprovedLookups = 0;

    function isDiscoveringNewContacts() {
      return consecutiveUnimprovedLookups < constants.MAX_UNIMPROVED_REFRESHES;
    }

    return new Promise((resolve, reject) => {
      async.eachSeries(shuffle(indices), (index, next) => {
        if (!isDiscoveringNewContacts()) {
          return resolve();
        }

        const lastBucketLookup = this._lookups.get(index) || 0;
        const needsRefresh = lastBucketLookup + constants.T_REFRESH <= now;

        if (needsRefresh) {
          return this.iterativeFindNode(
            utils.getRandomBufferInBucketRange(this.identity, index)
              .toString('hex'),
            (err, contacts) => {
              if (err) {
                return next(err);
              }

              let discoveredNewContacts = false;

              for (let [identity] of contacts) {
                if (!results.has(identity)) {
                  discoveredNewContacts = true;
                  consecutiveUnimprovedLookups = 0;
                  results.add(identity);
                }
              }

              if (!discoveredNewContacts) {
                consecutiveUnimprovedLookups++;
              }

              next();
            }
          );
        }

        next();
      }, (err) => {
        if (err) {
          return reject(err);
        }

        resolve();
      });
    });
  }

  /**
   * Builds an list of closest contacts for a particular RPC
   * @private
   */
  _iterativeFind(method, key) {
    return new Promise((resolve) => {
      function createRpc(target) {
        return [method, [key], target];
      }

      let shortlist = new ContactList(key, [
        ...this.router.getClosestContactsToKey(key, constants.ALPHA)
      ]);
      let closest = shortlist.closest;

      this._lookups.set(utils.getBucketIndex(this.identity, key), Date.now());

      function iterativeLookup(selection, continueLookup = true) {
        if (!selection.length) {
          return resolve(shortlist.active.slice(0, constants.K));
        }

        async.each(selection, (contact, next) => {
          // NB: mark this node as contacted so as to avoid repeats
          shortlist.contacted(contact);

          this.send(...createRpc(contact), (err, result) => {
            if (err) {
              return next();
            }

            // NB: mark this node as active to include it in any return values
            shortlist.responded(contact);

            // NB: If the result is a contact/node list, just keep track of it
            // NB: Otherwise, do not proceed with iteration, just callback
            if (Array.isArray(result) || method !== 'FIND_VALUE') {
              shortlist
                .add(Array.isArray(result) ? result : [])
                .forEach(contact => {
                  // NB: If it wasn't in the shortlist, we haven't added to the
                  // NB: routing table, so do that now.
                  this._updateContact(...contact);
                });

              return next();
            }

            // NB: If we did get an item back, get the closest node we contacted
            // NB: who is missing the value and store a copy with them
            const closestMissingValue = shortlist.active[0]

            if (closestMissingValue) {
              this.send('STORE', [
                key,
                this._createStorageItem(result)
              ], closestMissingValue, () => null);
            }

            // NB: we found a value, so stop searching
            resolve(result, contact);
          });
        }, () => {

          // NB: If we have reached at least K active nodes, or haven't found a
          // NB: closer node, even on our finishing trip, return to the caller
          // NB: the K closest active nodes.
          if (shortlist.active.length >= constants.K ||
            (closest[0] === shortlist.closest[0] && !continueLookup)
          ) {
            return resolve(shortlist.active.slice(0, constants.K));
          }

          // NB: we haven't discovered a closer node, call k uncalled nodes and
          // NB: finish up
          if (closest[0] === shortlist.closest[0]) {
            return iterativeLookup.call(
              this,
              shortlist.uncontacted.slice(0, constants.K),
              false
            );
          }

          closest = shortlist.closest;

          // NB: continue the lookup with ALPHA close, uncontacted nodes
          iterativeLookup.call(
            this,
            shortlist.uncontacted.slice(0, constants.ALPHA),
            true
          );
        });
      }

      iterativeLookup.call(
        this,
        shortlist.uncontacted.slice(0, constants.ALPHA),
        true
      );
    });
  }
  /**
   * Adds the given contact to the routing table
   * @private
   */
  _updateContact(identity, contact) {
    this._updateContactQueue.push({ identity, contact }, (err, headId) => {
      if (err) {
        this.router.removeContactByNodeId(headId);
        this.router.addContactByNodeId(identity, contact);
      }
    });
  }

  /**
   * Worker for updating contact in a routing table bucket
   * @private
   */
  _updateContactWorker(task, callback) {
    const { identity, contact } = task;

    if (identity === this.identity.toString('hex')) {
      return callback();
    }

    const now = Date.now();
    const reset = 600000;
    const [, bucket, contactIndex] = this.router.addContactByNodeId(
      identity,
      contact
    );

    const [headId, headContact] = bucket.head;
    const lastPing = this._pings.get(headId);

    if (contactIndex !== -1) {
      return callback();
    }

    if (lastPing && lastPing.responded && lastPing.timestamp > (now - reset)) {
      return callback();
    }

    this.ping([headId, headContact], (err) => {
      this._pings.set(headId, { timestamp: Date.now(), responded: !err });
      callback(err, headId);
    });
  }

}

module.exports = KademliaNode;