Source: counter.js

'use strict';

const _ = require('lodash'),
  moment = require('moment'),
  timeGranularities = require('./constants').timeGranularities,
  utils = require('./utils'),
  lua = require('./lua');

// The default expiration times aim at having less than 800 counters for a
// given counter key (event) since the counter keys are stored with a specific
// timestamp. For example, second-based counters expire after 10 minutes which
// means that there will be 600 counter keys in the worst-case for a single
// event.
const defaultExpiration = {
  total: -1,
  year: -1,
  month:  10 * 365 * 24 * 60 * 60, // 10 years = 120 counters worst-case
  day:  2 * 365 * 24 * 60 * 60,    // 2 years = 730 counters worst-case
  hour:  31 * 24 * 60 * 60,        // 31 days = 744 counters worst-case
  minute:  12 * 60 * 60,           // 12 hours = 720 counters worst-case
  second:  10 * 60                 // 10 minutes = 600 counters worst-case
};
// BTW, good luck keeping your Redis server around for 10 years :-)

// Translate the "nice looking expiration times above to "real" keys that
// correspond to time granularities.
_.keys(defaultExpiration).forEach(key => {
  const newKey = timeGranularities[key];
  const value = defaultExpiration[key];
  delete defaultExpiration[key];
  defaultExpiration[newKey] = value;
});

const defaults = {
  namespace: 'c', // Short for counter.
  timeGranularity: timeGranularities.none,
  expireKeys: true,
  expiration: defaultExpiration
};

const momentFormat = 'YYYYMMDDHHmmss';

/**
 * Convert the given granularity level into the internal representation (a
 * number).
 * @returns {module:constants~timeGranularities}
 * @private
 */
const parseTimeGranularity = timeGranularity => {
  timeGranularity = timeGranularities[timeGranularity];
  if (timeGranularity) return timeGranularity;
  else return timeGranularities.none;
};

/**
 * Return a list of Redis keys that are associated with this counter at the
 * current point in time and will be written to Redis.
 * @param {String} baseKey - The base key to use.
 * @param {String} formattedTimestamp - A formatted timestamp string.
 * @param {module:constants~timeGranularities} timeGranularity - The
 *   granularity level to return keys for.
 * @returns {Array} Contains the counter keys for the given timestamp.
 * @private
 */
const getKeys = (baseKey, formattedTimestamp, timeGranularity) => {
  // Always add the baseKey itself.
  const keys = [baseKey];

  // If no time granularity is chosen, the timestamped keys will not be used so
  // just return the default key.
  if (timeGranularity === timeGranularities.none) {
    return keys;
  }

  for (let i = 1; i <= timeGranularity; i++) {
    keys.push(`${baseKey}:${formattedTimestamp.slice(0, i * 2 + 2)}`);
  }

  return keys;
};

/**
 * Creates a function that parses a list of Redis results and matches them up
 * with the given keyRange
 * @param {array} keyRange - The list of keys to match with the results.
 * @returns {function}
 * @private
 */
const createRangeParser = keyRange =>
  results => _.zipObject(keyRange, utils.parseIntArray(results));

/**
 * Creates a function that parses a list of Redis results and returns the total.
 * @returns {function}
 * @private
 */
const createRangeTotalParser = () =>
  results => _.sum(utils.parseIntArray(results));

const incrSingle = (client, key, amount, eventObj, ttl, cb) => {
  if (eventObj) {
    key += ':z';

    if (ttl > 0) {
      if (cb) client.eval(lua.zincrbyExpire, 1, key, amount, eventObj, ttl, cb);
      else client.eval(lua.zincrbyExpire, 1, key, amount, eventObj, ttl);
    } else {
      if (cb) client.zincrby(key, amount, eventObj, cb);
      else client.zincrby(key, amount, eventObj);
    }

  } else { // No event object

    if (ttl > 0) {
      if (cb) client.eval(lua.incrbyExpire, 1, key, amount, ttl, cb);
      else client.eval(lua.incrbyExpire, 1, key, amount, ttl);
    } else {
      if (cb) client.incrby(key, amount, cb);
      else client.incrby(key, amount);
    }
  }
};

/**
 * Parse a rank result from redis
 * @param  {array} rank In this format: [ 'foo', '39', 'bar', '13' ]
 * @return {object} In this format: [ { foo: 39 }, { bar: 13 } ]
 * @private
 */
const rankParser = rank => {
  // groups the array for each second row
  rank = _.groupBy(rank, (row, index) => Math.floor(index / 2));

  // transform the object into an array
  // and the values to integer
  return _.toArray(rank)
    .map(row => {
      const _row = {};
      _row[row[0]] = parseInt(row[1], 10);
      return _row;
    });
};

/**
 * A timestamped event counter.
 *
 * The timestamped counter stores one or more Redis keys based on the given
 * event name and time granularity appends a timestamp to the key before
 * storing the key in Redis. The counter can then report an overall aggregated
 * count or a specific count for a time range, depending on the chosen
 * granularity of the timestamp.
 *
 * If no time granularity is chosen at creation time, the counter will work
 * just like a global counter for the given key, i.e. events will not be
 * timestamped.
 *
 * **Notice**: The constructor for this class is usually not called directly
 * but through the {@link RedisMetrics#counter} function.
 *
 * @param {RedisMetrics} metrics - An instance of a RedisMetrics client.
 * @param {string} key - The base key to use for this counter.
 * @param {Object} options - The options to use for this counter. The available
 *   options are specified in {@link RedisMetrics#counter}.
 * @class
 */
class TimestampedCounter {
  constructor(metrics, key, options) {
    this.options = options || {};
    _.defaults(this.options, _.cloneDeep(defaults));

    this.metrics = metrics;
    this.key = this.options.namespace + ':' + key;

    // Translate the expiration keys of the options.
    _.keys(this.options.expiration).forEach(key => {
      const newKey = timeGranularities[key];
      const value = this.options.expiration[key];
      delete this.options.expiration[key];
      this.options.expiration[newKey] = value;
    });

    this.options.timeGranularity = parseTimeGranularity(this.options.timeGranularity);
  }

  /**
   * Return a list of Redis keys that are associated with this counter at the
   * current point in time (default) and will be written to Redis.
   * @param {Moment} [time=now] - A specific time to get keys for.
   * @param {module:constants~timeGranularities} [timeGranularity] - The
   *   granularity level to return keys for. The default is the granularity from
   *   the options.
   * @returns {Array}
   */
  getKeys(time, timeGranularity) {
    return getKeys(
      this.key,
      (time || moment.utc()).format(momentFormat),
      parseTimeGranularity(timeGranularity) || this.options.timeGranularity
    );
  }

  /**
   * Finds the configured time to live for the given key.
   * @param {string} key - The full key (including timestamp) for the key to
   *   determine the ttl for.
   * @returns {number} Number of seconds that the key should live.
   */
  getKeyTTL(key) {
    if (!this.options.expireKeys) return -1;

    const timePart = key.replace(this.key, '').split(':')[1] || '';
    let timeGranularity = timeGranularities.none;
    switch (timePart.length) {
      case 4:
        timeGranularity = timeGranularities.year;
        break;
      case 6:
        timeGranularity = timeGranularities.month;
        break;
      case 8:
        timeGranularity = timeGranularities.day;
        break;
      case 10:
        timeGranularity = timeGranularities.hour;
        break;
      case 12:
        timeGranularity = timeGranularities.minute;
        break;
      case 14:
        timeGranularity = timeGranularities.second;
        break;
    }
    let ttl = this.options.expiration[timeGranularity];
    if (typeof ttl === 'undefined') ttl = defaultExpiration[timeGranularity];
    return ttl;
  }

  /**
   * Increments this counter with 1.
   *
   * For some use cases, it makes sense to pass in an event object to get more
   * precise statistics for a specific event. For example, when counting page
   * views on a page, it makes sense to increment a counter per specific page.
   * For this use case, the eventObj parameter is a good fit.
   *
   * @param {Object|string} [eventObj] - Extra event information used to
   *   determine what counter to increment.
   * @param {function} [callback] - Optional callback.
   * @returns {Promise} A promise that resolves to the results from Redis. Can
   *   be used instead of the callback function.
   * @since 0.1.0
   */
  incr(eventObj, callback) {
    return this.incrby(1, eventObj, callback);
  }

  /**
   * Increments this counter with the given amount.
   *
   * @param {number} amount - The amount to increment with.
   * @param {Object|string} [eventObj] - Extra event information used to
   *   determine what counter to increment. See {@link TimestampedCounter#incr}.
   * @param {function} [callback] - Optional callback.
   * @returns {Promise} A promise that resolves to the results from Redis. Can
   *   be used instead of the callback function.
   * @see {@link TimestampedCounter#incr}
   * @since 0.2.0
   */
  incrby(amount, eventObj, callback) {
    // The event object is optional so it might be a callback.
    if (_.isFunction(eventObj)) {
      callback = eventObj;
      eventObj = null;
    }
    if (eventObj) eventObj = String(eventObj);
    const deferred = utils.defer();
    const cb = utils.createRedisCallback(deferred, callback);
    this._incrby(amount, eventObj, cb);
    return deferred.promise;
  }

  _incrby(amount, eventObj, cb) {
    const keys = this.getKeys();
    // Optimize for the case where there is only a single key to increment.
    if (keys.length === 1) {
      incrSingle(this.metrics.client, keys[0], amount, eventObj, this.getKeyTTL(keys[0]), cb);
    } else {
      const multi = this.metrics.client.multi();
      keys.forEach(key => {
        incrSingle(multi, key, amount, eventObj, this.getKeyTTL(key));
      });
      multi.exec(cb);
    }
  }

  /**
   * Returns the current count for this counter.
   *
   * If a specific time granularity is given, the value returned is the current
   * value at the given granularity level. Effectively, this provides a single
   * answer to questions such as "what is the count for the current day".
   *
   * **Notice**: Counts cannot be returned for a given time granularity if it was
   * not incremented at this granularity level in the first place.
   *
   * @example
   * myCounter.count((err, result) => {
   *   console.log(result); // Outputs the global count
   * });
   * @example
   * myCounter.count('year', (err, result) => {
   *   console.log(result); // Outputs the count for the current year
   * });
   * @example
   * myCounter.count('year', '/foo.html', (err, result) => {
   *   // Outputs the count for the current year for the event object '/foo.html'
   *   console.log(result);
   * });
   *
   * @param {module:constants~timeGranularities} [timeGranularity='total'] - The
   *   granularity level to report the count for.
   * @param {string|object} [eventObj] - The event object. See
   *   {@link TimestampedCounter#incr} for more info on event objects.
   * @param {function} [callback] - Optional callback.
   * @returns {Promise} A promise that resolves to the result from Redis. Can
   *   be used instead of the callback function.
   * @since 0.1.0
   */
  count(timeGranularity, eventObj, callback) {
    const args = Array.prototype.slice.call(arguments);

    // Last argument is callback;
    callback = typeof args[args.length - 1] === 'function' ? args.pop() : null;

    // Event object requires that the time granularity is specified, otherwise we
    // can't reliably distinguish between them because both the eventObj and time
    // granularity can be strings. I miss Python.
    eventObj = args.length > 1 ? args.pop() : null;

    // Still any arguments left? That's a time granularity.
    timeGranularity = args.length > 0 ? args.pop() : 'none';
    timeGranularity = parseTimeGranularity(timeGranularity);

    const deferred = utils.defer();
    const cb = utils.createRedisCallback(deferred, callback, utils.parseInt);
    this._count(timeGranularity, eventObj, cb);
    return deferred.promise;
  }

  _count(timeGranularity, eventObj, cb) {
    const theKey = this.getKeys()[timeGranularity];
    if (eventObj) {
      this.metrics.client.zscore(theKey + ':z', eventObj, cb);
    } else {
      this.metrics.client.get(theKey, cb);
    }
  }

  /**
   * Returns an object mapping timestamps to counts in the given time range at a
   * specific time granularity level.
   *
   * Notice: This function does not make sense for the "none" time granularity.
   *
   * @param {module:constants~timeGranularities} timeGranularity - The
   *   granularity level to report the count for.
   * @param {Date|Object|string|number} startDate - Start date for the range
   *   (inclusive). Accepts the same argument as the constructor of a
   *   {@link http://momentjs.com/|moment} date.
   * @param {Date|Object|string|number} [endDate=new Date()] - End date for the
   *   range (inclusive). Accepts the same arguments as the constructor of a
   *   {@link http://momentjs.com/|moment} date.
   * @param {string|object} [eventObj] - The event object. See
   *   {@link TimestampedCounter#incr} for more info on event objects.
   * @param {function} [callback] - Optional callback.
   * @returns {Promise} A promise that resolves to the result from Redis. Can
   *   be used instead of the callback function.
   * @since 0.1.0
   */
  countRange(timeGranularity, startDate, endDate, eventObj, callback) {
    timeGranularity = parseTimeGranularity(timeGranularity);
    if (_.isFunction(eventObj)) {
      callback = eventObj;
      eventObj = null;
    } else if (_.isFunction(endDate)) {
      callback = endDate;
      endDate = moment.utc();
    } else {
      endDate = endDate || moment.utc();
    }
    if (eventObj) eventObj = String(eventObj);

    // Save the report time granularity because it might change.
    const reportTimeGranularity = timeGranularity;

    // If the range granularity is total, fall back to the granularity specified
    // at the counter level and then add the numbers together when parsing the
    // result.
    if (timeGranularity === timeGranularities.total) {
      timeGranularity = this.options.timeGranularity;

      // If the rangeGranularity is still total, it does not make sense to report
      // a range for the counter and we throw an error.
      if (timeGranularity === timeGranularities.total) {
        throw new Error('total granularity not supported for this counter');
      }
    }

    const momentRange = utils.momentRange(startDate, endDate, timeGranularity);
    const keyRange = [];
    const momentKeyRange = [];

    // Create the range of keys to fetch from Redis as well as the keys to use in
    // the returned data object.
    momentRange.forEach(m => {
      // Redis key range
      const mKeyFormat = m.format(momentFormat).slice(0, timeGranularity * 2 + 2);
      keyRange.push(`${this.key}:${mKeyFormat}`);

      // Timestamp range. Use ISO format for easy parsing back to a timestamp.
      momentKeyRange.push(m.format());
    });

    const deferred = utils.defer();
    const parser = reportTimeGranularity === timeGranularities.total ?
      createRangeTotalParser() : createRangeParser(momentKeyRange);
    const cb = utils.createRedisCallback(deferred, callback, parser);

    this._countRange(keyRange, eventObj, cb);

    return deferred.promise;
  }

  _countRange(keys, eventObj, cb) {
    if (eventObj) {
      const multi = this.metrics.client.multi();
      keys.forEach(key => multi.zscore(key + ':z', eventObj));
      multi.exec(cb);
    } else {
      this.metrics.client.mget(keys, cb);
    }
  }

  /**
   * Returns the current top elements for this counter.
   *
   * If a specific time granularity is given, the value returned is the current
   * value at the given granularity level. Effectively, this provides a single
   * answer to questions such as "what is the rank for the current day".
   *
   * @example
   * myCounter.top((err, result) => {
   *   console.log(result); // Outputs the global rank
   * });
   * @example
   * myCounter.top('year', (err, result) => {
   *   console.log(result); // Outputs the rank for the current year
   * });
   *
   * @param {module:constants~timeGranularities} [timeGranularity='total'] - The
   *   granularity level to report the rank for.
   * @param {string} [direction=desc] - Optional sort direction, can be "asc" or "desc"
   * @param {integer} [startingAt=0] - Optional starting row.
   * @param {integer} [limit=-1] - Optional number of results to return.
   * @param {function} [callback] - Optional callback.
   * @returns {Promise} A promise that resolves to the result from Redis. Can
   *   be used instead of the callback function.
   * @since 0.1.1
   */
  top(timeGranularity, direction, startingAt, limit, callback) {
    const args = Array.prototype.slice.call(arguments);

    // Last argument is callback;
    callback = typeof args[args.length - 1] === 'function' ? args.pop() : null;

    limit = args.length > 3 ? args.pop() : -1;
    startingAt = args.length > 2 ? args.pop() : 0;
    direction = args.length > 1 ? args.pop() : 'desc';

    if (['asc', 'desc'].indexOf(direction) === -1) {
      throw new Error(
        'The direction parameter is expected to be one between ' +
        '"asc" or "desc", got "' + direction + '".'
      );
    }

    timeGranularity = parseTimeGranularity(timeGranularity);

    const deferred = utils.defer();
    const cb = utils.createRedisCallback(deferred, callback, rankParser);
    this._top(timeGranularity, direction, startingAt, limit, cb);
    return deferred.promise;
  }

  _top(timeGranularity, direction, startingAt, limit, cb) {
    const theKey = this.getKeys()[timeGranularity];

    if (direction === 'asc') {
      return this.metrics.client.zrange(
        theKey + ':z',
        startingAt,
        limit,
        'WITHSCORES',
        cb
      );
    }

    this.metrics.client.zrevrange(
      theKey + ':z',
      startingAt,
      limit,
      'WITHSCORES',
      cb
    );
  }

  /**
   * Permanently remove event objects for this counter so only the top-N
   * elements remain in either descending (the default) or ascending order.
   *
   * The event objects are removed with Redis' removal by rank for sorted sets
   * ({@link https://redis.io/commands/zremrangebyrank|ZREMRANGEBYRANK}).
   *
   * The removal happens at each granularity level and only supports daily
   * granularity and above (month, year, total) to optimize for the number of
   * Redis keys to consider. It removes data going 5 years back in time.
   *
   * If the function is used too often, there is a risk that the top-N elements
   * will stay the same forever, because lower-ranking elements get removed
   * before their scores have a change to increase.
   *
   * Therefore, the function should only be used to try and reclaim space for
   * big counters with a lot of event objects, and only rarely used.
   *
   * @param {string} [keepDirection=desc] - Sort direction for the top-N
   * elements to keep, can be "asc" or "desc".
   * @param {integer} [limit=1000] - Number of results to keep.
   * @param {function} [callback] - Callback
   * @returns {Promise} Resolves when the values have been removed.
   * @example
   * // Keeps the top-5 elements with highest score
   * myCounter.trimEvents('desc', 5)
   * @example
   * // Keeps the top-5 elements with lowest score
   * myCounter.trimEvents('asc', 5)
   * @since 1.1.0
   */
  trimEvents(keepDirection, limit, callback) {
    const args = Array.prototype.slice.call(arguments);
    callback = typeof args[args.length - 1] === 'function' ? args.pop() : null;
    limit = args.length > 1 ? args.pop() : 1000;
    keepDirection = args.length > 0 ? args.pop() : 'desc';

    if (['asc', 'desc'].indexOf(keepDirection) === -1) {
      throw new Error(
        'The keepDirection parameter is expected to be one between ' +
        '"asc" or "desc", got "' + keepDirection + '".'
      );
    }

    // If we want to keep top-5 lowest scores, remove rank 5 to -1
    // If we want to keep top-5 highest scores, remove rank 0 to -(5 + 1)
    const startIndex = keepDirection === 'asc' ? limit : 0;
    const endIndex = keepDirection === 'asc' ? -1 : -(limit + 1);

    const deferred = utils.defer();
    const cb = utils.createRedisCallback(deferred, callback);

    // If the counter does not have a time granularity, our job is easy.
    if (this.options.timeGranularity === timeGranularities.none) {
      this.metrics.client.zremrangebyrank(`${this.key}:z`, startIndex, endIndex, cb);
      return deferred.promise;
    }

    // Otherwise trim keys for the last five years.
    const currentTime = moment.utc().subtract(5, 'year')
      .startOf('year')
      .startOf('day');
    const end = moment.utc();
    const keySet = new Set();
    while (currentTime.isBefore(end)) {
      for (const key of this.getKeys(currentTime, timeGranularities.day)) {
        keySet.add(key);
      }

      // Mutates the time.
      currentTime.add(1, 'day');
    }

    // Each key is mapped to a function that returns a Promise.
    const mappedPromiseFunctions = Array.from(keySet).map(key => {
      return () => {
        const subDeferred = utils.defer();
        const subCallback = utils.createRedisCallback(subDeferred);
        this.metrics.client.zremrangebyrank(`${key}:z`, startIndex, endIndex, subCallback);
        return subDeferred.promise;
      };
    });

    // Each function is executed sequentially using reduce.
    mappedPromiseFunctions.reduce((promise, f) => {
      return promise.then(totalRemoved => f().then(removed => totalRemoved + removed));
    }, Promise.resolve(0))
    .then(totalRemoved => cb(null, totalRemoved))
    .catch(err => cb(err));

    return deferred.promise;
  }
}

module.exports = TimestampedCounter;