// Source: lib/s-bucket.js

'use strict';

const merge = require('merge');
const leveldown = require('leveldown');
const {EventEmitter} = require('events');
const constants = require('./constants');
const utils = require('./utils');
const WritableFileStream = require('./write-stream');
const ReadableFileStream = require('./read-stream');
const BlockStream = require('./block-stream');
const async = require('async');

/**
 * Capped LevelDB database within a {@link Btable}
 *
 * Wraps a leveldown handle, exposes file-like read/write helpers on top of it
 * and counts in-flight operations so that an `idle` event can be emitted once
 * nothing is pending.
 */
class Sbucket extends EventEmitter {

  /** Ready state: the underlying database is closed */
  static get CLOSED() {
    return 4;
  }

  /** Ready state: a close is in progress */
  static get CLOSING() {
    return 3;
  }

  /** Ready state: the underlying database is open */
  static get OPENED() {
    return 2;
  }

  /** Ready state: an open is in progress */
  static get OPENING() {
    return 1;
  }

  /** Lower key bound used for size estimates, listing and compaction */
  static get SIZE_START_KEY() {
    return '0';
  }

  /** Upper key bound used for size estimates, listing and compaction */
  static get SIZE_END_KEY() {
    return 'z';
  }

  /**
   * Default options; a fresh object is returned on every access so callers
   * may safely mutate the result
   */
  static get DEFAULTS() {
    return {
      maxOpenFiles: 1000,
      compression: false,
      cacheSize: 8 * (1024 * 1024),
      createIfMissing: true,
      errorIfExists: false,
      writeBufferSize: 4 * (1024 * 1024),
      blockSize: 4096,
      blockRestartInterval: 16,
      maxSize: constants.S,
      chunkSize: constants.C
    };
  }

  /**
   * @constructor
   * @param {String} dbPath - The path to database on disk
   * @param {Object} [options] - Options to pass through to leveldown#open
   * @param {Number} [options.maxOpenFiles=1000]
   * @param {Boolean} [options.compression=false]
   * @param {Number} [options.cacheSize=8388608]
   * @param {Boolean} [options.createIfMissing=true]
   * @param {Boolean} [options.errorIfExists=false]
   * @param {Number} [options.writeBufferSize=4194304]
   * @param {Number} [options.blockSize=4096]
   * @param {Number} [options.blockRestartInterval=16]
   * @param {Number} [options.maxSize=constants.S] - Capacity used by #stat
   * @param {Number} [options.chunkSize=constants.C] - Size of stored chunks
   */
  constructor(dbPath, options) {
    super();
    this.setMaxListeners(Infinity);
    this._dbPath = dbPath;
    this._options = merge(Sbucket.DEFAULTS, options);
    this._db = leveldown(dbPath);
    this._pendingOperations = 0;
    this._maxSize = this._options.maxSize;
    this.readyState = Sbucket.CLOSED;
  }

  /**
   * Triggered when the underlying database opens
   * @event Sbucket#open
   */

  /**
   * Triggered when the underlying database closes
   * @event Sbucket#close
   */

  /**
   * Triggered when there are no more pending operations
   * @event Sbucket#idle
   */

  /**
   * Triggered when the bucket is locked for flushing
   * @event Sbucket#locked
   */

  /**
   * Triggered when the bucket is unlocked
   * @event Sbucket#unlocked
   */

  /**
   * Opens the underlying database
   *
   * If the bucket is already open the `open` event is emitted again
   * immediately; if an open is in flight the callback waits for it; if a
   * close is in flight the open is queued behind it.
   * @fires Sbucket#open
   * @param {Sbucket~openCallback}
   */
  open(callback=utils.noop) {
    const self = this;

    function _open() {
      self.readyState = Sbucket.OPENING;
      self._db.open(self._options, function(err) {
        if (err) {
          // Roll back so a later #open can retry; otherwise the bucket would
          // stay in OPENING forever and every caller would wait on an attempt
          // that has already failed
          self.readyState = Sbucket.CLOSED;
          return self.emit('error', err);
        }

        self.readyState = Sbucket.OPENED;
        // Arm the idle check before notifying listeners so that a listener
        // that closes the bucket always finds the interval to clear
        clearInterval(self._idleCheckInterval);
        self._idleCheckInterval = setInterval(
          () => self._checkIdleState(),
          constants.SBUCKET_IDLE
        );
        self.emit('open');
      });
    }

    function _onError(err) {
      self.removeListener('open', _onOpen);
      // Drop a queued open (see CLOSING branch below) so it cannot fire on
      // some later, unrelated close
      self.removeListener('close', _open);
      callback(err);
    }

    function _onOpen() {
      self.removeListener('error', _onError);
      callback(null);
    }

    this.once('open', _onOpen).once('error', _onError);

    if (this.readyState === Sbucket.OPENED) {
      return this.emit('open');
    }

    if (this.readyState === Sbucket.OPENING) {
      return;
    }

    if (this.readyState === Sbucket.CLOSING) {
      return this.once('close', _open);
    }

    _open();
  }
  /**
   * @callback Sbucket~openCallback
   * @param {Error} [error]
   */

  /**
   * Closes the underlying database
   *
   * Mirrors #open: an already closed bucket re-emits `close`, a close in
   * flight is awaited and an open in flight is allowed to finish first.
   * @fires Sbucket#close
   * @param {Sbucket~closeCallback}
   */
  close(callback=utils.noop) {
    const self = this;

    function _close() {
      self.readyState = Sbucket.CLOSING;
      self._db.close(function(err) {
        if (err) {
          // The database did not close; report it as still open so that the
          // close can be retried instead of hanging in CLOSING
          self.readyState = Sbucket.OPENED;
          return self.emit('error', err);
        }

        self.readyState = Sbucket.CLOSED;
        // Stop the idle check before notifying listeners so that a listener
        // that reopens the bucket cannot have its new interval cleared
        clearInterval(self._idleCheckInterval);
        self.emit('close');
      });
    }

    function _onError(err) {
      self.removeListener('close', _onClose);
      // Drop a queued close (see OPENING branch below) so it cannot fire on
      // some later, unrelated open
      self.removeListener('open', _close);
      callback(err);
    }

    function _onClose() {
      self.removeListener('error', _onError);
      callback(null);
    }

    this.once('close', _onClose).once('error', _onError);

    if (this.readyState === Sbucket.CLOSED) {
      return this.emit('close');
    }

    if (this.readyState === Sbucket.CLOSING) {
      return;
    }

    if (this.readyState === Sbucket.OPENING) {
      return this.once('open', _close);
    }

    _close();
  }
  /**
   * @callback Sbucket~closeCallback
   * @param {Error} [error]
   */

  /**
   * Determines if the file is already stored in the db
   *
   * Only the first chunk (index 0) is probed. Any error from the lookup,
   * not just a missing key, is reported as "does not exist"; the callback
   * never receives an error.
   * @param {String} key - The key for the file stored
   * @param {Sbucket~existsCallback}
   */
  exists(key, callback) {
    this._incPendingOps();
    this._db.get(utils.createItemKeyFromIndex(key, 0), (err) => {
      this._decPendingOps();
      callback(null, !err);
    });
  }
  /**
   * @callback Sbucket~existsCallback
   * @param {Error} [error]
   * @param {Boolean} fileDoesExist
   */

  /**
   * Deletes the file chunks from the database
   *
   * Chunks are removed one by one starting at index 0 and the walk stops at
   * the first index that is not found.
   * @param {String} key - The key for the file stored
   * @param {Sbucket~unlinkCallback}
   */
  unlink(key, callback) {
    const self = this;

    function _finish(err) {
      self._decPendingOps();
      callback(err);
    }

    function _del(index) {
      const itemKey = utils.createItemKeyFromIndex(key, index);

      self._db.get(itemKey, function(err) {
        if (err) {
          // A missing chunk marks the end of the file, anything else is real
          return _finish(utils.isNotFoundError(err) ? null : err);
        }

        self._db.del(itemKey, function(err) {
          if (err) {
            // Do not report success when a chunk could not be removed
            return _finish(err);
          }

          _del(index + 1);
        });
      });
    }

    this._incPendingOps();
    _del(0);
  }
  /**
   * @callback Sbucket~unlinkCallback
   * @param {Error} [error]
   */

  /**
   * Reads the file at the given key into a buffer
   * @param {String} key - The key for the file to read
   * @param {Sbucket~readFileCallback}
   */
  readFile(key, callback) {
    const chunks = [];
    let settled = false;

    this._incPendingOps();

    const readStream = this.createReadStream(key);

    // Invokes the callback and releases the pending operation exactly once
    const _settle = (err, fileBuffer) => {
      if (settled) {
        return;
      }

      settled = true;
      this._decPendingOps();

      if (err) {
        return callback(err);
      }

      callback(null, fileBuffer);
    };

    const _onData = (data) => chunks.push(data);

    readStream.on('data', _onData);

    // Concatenate once at the end instead of re-copying on every chunk
    readStream.on('end', () => _settle(null, Buffer.concat(chunks)));

    readStream.on('error', (err) => {
      readStream.removeListener('data', _onData);
      _settle(err);
    });
  }
  /**
   * @callback Sbucket~readFileCallback
   * @param {Error} [error]
   * @param {Buffer} fileBuffer
   */

  /**
   * Writes the buffer to the given key
   *
   * Any existing chunks for the key are unlinked first; the buffer is then
   * written in slices of `chunkSize` bytes.
   *
   * NOTE(review): completion is signalled by the `finish` event of the stream
   * returned from #createWriteStream — confirm that this implies the data has
   * reached the database.
   * @param {String} key - The key for the file to write
   * @param {Buffer} buffer - The data to write to the given key
   * @param {Sbucket~writeFileCallback}
   */
  writeFile(key, buffer, callback) {
    const chunkSize = this._options.chunkSize;
    let settled = false;

    // Invokes the callback and releases the pending operation exactly once
    const _settle = (err) => {
      if (settled) {
        return;
      }

      settled = true;
      this._decPendingOps();
      callback(err);
    };

    this._incPendingOps();
    this.unlink(key, (err) => {
      if (err) {
        // Writing over chunks that could not be removed may leave stale data
        return _settle(err);
      }

      const writeStream = this.createWriteStream(key);
      let whichSlice = 0;

      writeStream.on('finish', () => _settle(null));
      writeStream.on('error', (err) => _settle(err));

      // Iterative so that a large buffer cannot exhaust the call stack
      for (;;) {
        const startIndex = whichSlice * chunkSize;
        const bufferSlice = buffer.slice(startIndex, startIndex + chunkSize);

        if (bufferSlice.length === 0) {
          break;
        }

        writeStream.write(bufferSlice);
        whichSlice++;
      }

      writeStream.end();
    });
  }
  /**
   * @callback Sbucket~writeFileCallback
   * @param {Error} [error]
   */

  /**
   * Returns a readable stream of the file at the given key
   *
   * The stream counts as one pending operation until it ends, errors or
   * closes, whichever happens first.
   * @param {String} key - The key for the file to read
   * @returns {ReadableFileStream}
   */
  createReadStream(key) {
    const rs = new ReadableFileStream({
      sBucket: this,
      fileKey: key
    });
    let released = false;

    const _release = () => {
      if (released) {
        return;
      }

      released = true;
      this._decPendingOps();
    };

    const _onError = (err) => {
      _release();
      rs.removeListener('error', _onError);

      // This listener exists for bookkeeping only; if nobody else handles
      // the error, surface it exactly as an unhandled `error` event would be
      if (rs.listenerCount('error') === 0) {
        rs.emit('error', err);
      }
    };

    this._incPendingOps();
    rs.on('end', _release);
    rs.on('close', _release);
    rs.on('error', _onError);

    return rs;
  }

  /**
   * Returns a writable stream for a file at the given key
   *
   * The returned stream is the block stream that feeds the underlying
   * writable; errors raised by the underlying writable are re-emitted on it
   * so callers can observe them. The stream counts as one pending operation
   * until the underlying writable finishes, errors or closes.
   * @param {String} key - The key for the file to write
   * @returns {WritableFileStream}
   */
  createWriteStream(key) {
    const bs = new BlockStream({
      chunkSize: this._options.chunkSize,
      padLastChunk: false
    });
    const ws = new WritableFileStream({
      sBucket: this,
      fileKey: key
    });
    let released = false;

    const _release = () => {
      if (released) {
        return;
      }

      released = true;
      this._decPendingOps();
    };

    // NB: Expose the underlying writable stream's #destroy method
    bs.destroy = (cb) => ws.destroy(cb);

    this._incPendingOps();
    bs.pipe(ws);
    ws.on('finish', _release);
    ws.on('close', _release);
    ws.on('error', (err) => {
      _release();
      // Callers only hold `bs`; without forwarding they could never handle
      // a failed write (an unhandled `error` on `bs` still throws)
      bs.emit('error', err);
    });

    return bs;
  }

  /**
   * Get stats for this bucket
   *
   * `size` is leveldown's approximate size for the bucket's key range and
   * `free` is the configured maximum size minus that estimate.
   * @param {Sbucket~statCallback}
   */
  stat(callback) {
    const [start, end] = [Sbucket.SIZE_START_KEY, Sbucket.SIZE_END_KEY];

    this._incPendingOps();
    this._db.approximateSize(start, end, (err, size) => {
      this._decPendingOps();

      if (err) {
        return callback(err);
      }

      callback(null, {
        size: size,
        free: this._maxSize - size
      });
    });
  }
  /**
   * @callback Sbucket~statCallback
   * @param {Error} [error]
   * @param {Object} bucketStats
   * @param {Number} bucketStats.size - The used space in bytes
   * @param {Number} bucketStats.free - The free space left in bytes
   */

  /**
   * Get a list of file keys in the bucket and their approximate size
   *
   * Every stored item key is split on the first space to obtain its base
   * key, and each item adds one full `chunkSize` to that base key's size.
   * @param {Sbucket~listCallback}
   */
  list(callback) {
    const chunkSize = this._options.chunkSize;
    const iterator = this._db.iterator({
      gte: Sbucket.SIZE_START_KEY,
      lte: Sbucket.SIZE_END_KEY,
      values: false,
      keyAsBuffer: false
    });
    const sizes = {};
    let reachedEnd = false;

    function _accumulateKey(next) {
      iterator.next((err, key) => {
        if (err) {
          return next(err);
        }

        if (!key) {
          reachedEnd = true;
          return next();
        }

        const baseKey = key.split(' ')[0];

        sizes[baseKey] = (sizes[baseKey] || 0) + chunkSize;
        next();
      });
    }

    this._incPendingOps();
    async.doUntil(_accumulateKey, () => reachedEnd, (err) => {
      // Always end the iterator so its underlying resources are freed
      iterator.end((endErr) => {
        const error = err || endErr;

        this._decPendingOps();

        if (error) {
          return callback(error);
        }

        callback(null, Object.keys(sizes).map((baseKey) => ({
          baseKey: baseKey,
          approximateSize: sizes[baseKey]
        })));
      });
    });
  }
  /**
   * @callback Sbucket~listCallback
   * @param {Error} [error]
   * @param {Object[]} results
   * @param {String} results.baseKey
   * @param {Number} results.approximateSize
   */

  /**
   * Trigger a compaction for the S-bucket
   *
   * NOTE(review): unlike #stat and #list this is not counted as a pending
   * operation — confirm whether `idle` may fire while a compaction runs.
   * @param {Sbucket~flushCallback}
   */
  flush(callback) {
    this._db.compactRange(Sbucket.SIZE_START_KEY, Sbucket.SIZE_END_KEY,
                          callback);
  }
  /**
   * @callback Sbucket~flushCallback
   * @param {Error|null} error
   */

  /**
   * Increments the pending operations counter
   * @private
   */
  _incPendingOps() {
    this._pendingOperations++;
  }

  /**
   * Decrements the pending operations counter and schedules an idle check
   * on the next turn of the event loop
   * @private
   * @fires Sbucket#idle
   */
  _decPendingOps() {
    this._pendingOperations--;
    setImmediate(() => this._checkIdleState());
  }

  /**
   * Emits the idle event if state is idle
   * @private
   * @returns {Boolean} didEmitIdle
   */
  _emitIfStateIsIdle() {
    if (this._pendingOperations === 0) {
      this.emit('idle');
      return true;
    }

    return false;
  }

  /**
   * Checks the idle state and triggers a timeout for emitting the idle event
   *
   * The idle event is only emitted if the bucket is still idle once
   * `constants.SBUCKET_IDLE` milliseconds have passed.
   * @private
   * @returns {Boolean} hasNoPendingOperations
   */
  _checkIdleState() {
    if (this._pendingOperations !== 0) {
      return false;
    }

    setTimeout(() => this._emitIfStateIsIdle(), constants.SBUCKET_IDLE);
    return true;
  }

}

module.exports = Sbucket;