kwikemon/kwikemon.js
2013-06-08 16:16:00 -07:00

265 lines
6.3 KiB
JavaScript

// Copyright 2013 Sami Samhuri
var async = require('async')
, redis = require('redis')
, LineEmitter = require('./line_emitter.js')
;
// Make the callback argument of a function optional.
// If the callback is passed it will call the function
// normally. If the callback isn't given a function
// that accepts the callback is returned, with the
// rest of the arguments fixed (like bind).
//
// function fetch(id, cb) { db.fetch(id, cb); }
// fetch = callbackOptional(fetch);
//
// function print(err, x) { if (err) throw err; console.log(x); }
//
// fetch(1, print);
//
// var fetch1 = fetch(1);
// fetch1(print);
function callbackOptional(fn, ctx) {
return function() {
var args = Array.prototype.slice.call(arguments);
var cb = args[args.length - 1];
if (typeof cb == 'function') {
fn.apply(ctx, arguments);
}
else {
return function(cb) {
args.push(cb);
fn.apply(ctx, args);
};
}
};
}
var kwikemon = module.exports = {
Monitor: Monitor
, defaultTTL: 86400
, keyPrefix: 'kwikemon:'
// get or set the redis connection
, redis: function(newRedis) {
// set
if (newRedis) {
if (kwikemon._redis) kwikemon._redis.end();
kwikemon._redis = newRedis;
}
// get, init if necessary
else {
if (!kwikemon._redis) {
kwikemon._redis = redis.createClient();
}
return kwikemon._redis;
}
}
, key: function(name) {
return kwikemon.keyPrefix + 'monitor:' + name;
}
, indexKey: function() {
return kwikemon.keyPrefix + 'monitors';
}
, count: function(cb) {
kwikemon.redis().scard(kwikemon.indexKey(), cb);
}
, sweep: function(cb) {
var keptNames = [];
kwikemon.redis().smembers(kwikemon.indexKey(), function(err, names) {
if (err) return cb(err);
var sweepers = names.map(function(name) {
return function(done) {
kwikemon.exists(name, function(err, exists) {
if (err) {
done();
}
// remove expired monitors
else if (!exists) {
new Monitor(name).remove(done);
}
else {
keptNames.push(name);
done();
}
});
};
});
async.parallel(sweepers, function(err, _) {
cb(err, keptNames);
});
});
}
, list: function(cb) {
kwikemon.sweep(function(err, names) {
if (err) return cb(err);
cb(null, names);
});
}
, fetchAll: function(cb) {
var monitors = {};
kwikemon.list(function(err, names) {
if (err) return cb(err);
var fetchers = names.sort().map(function(name) {
return function(done) {
kwikemon.fetch(name, function(err, mon) {
if (err) return done(err);
monitors[name] = mon;
done();
});
};
});
async.parallel(fetchers, function(err, _) {
if (err) return cb(err);
cb(null, monitors)
});
});
}
, clear: function(cb) {
kwikemon.list(function(err, names) {
if (err) return cb(err);
var removers = names.map(function(name) {
return function(done) {
new Monitor(name).remove(done);
};
});
async.parallel(removers, cb);
});
}
, exists: callbackOptional(function(name, cb) {
kwikemon.redis().exists(kwikemon.key(name), function(err, exists) {
if (err) return cb(err);
cb(null, exists == 1);
});
}, kwikemon)
, fetch: callbackOptional(function(name, cb) {
kwikemon.redis().hgetall(kwikemon.key(name), function(err, fields) {
if (err) return cb(err);
if (fields) {
cb(null, new Monitor(name, fields));
}
else {
cb(new Error('not found'));
}
});
}, kwikemon)
, ttl: callbackOptional(function(name, cb) {
new Monitor(name).ttl(cb);
}, kwikemon)
, set: callbackOptional(function(name, text, cb) {
return kwikemon.setex(name, text, null, cb);
}, kwikemon)
, setex: callbackOptional(function(name, text, ttl, cb) {
kwikemon.fetch(name, function(err, mon) {
if (err && err.message != 'not found') return cb(err);
mon = mon || new Monitor(name);
mon.text = text;
if (typeof ttl == 'number') {
mon.expire = ttl;
}
mon.save(cb);
});
}, kwikemon)
, writer: callbackOptional(function(name, ttl, cb) {
if (typeof ttl == 'function') {
cb = ttl;
ttl = null;
}
kwikemon.fetch(name, function(err, mon) {
if (err && err.message != 'not found') return cb(err);
mon = mon || new Monitor(name);
if (typeof ttl == 'number') {
mon.expire = ttl;
}
cb(null, mon.writer());
});
}, kwikemon)
, remove: callbackOptional(function(name, cb) {
new Monitor(name).remove(cb);
}, kwikemon)
};
function Monitor(name, fields) {
this.name = name;
if (fields) {
this.text = fields.text;
this.created = fields.created ? new Date(+fields.created) : null;
this.modified = fields.modified ? new Date(+fields.modified) : null;
this.updates = fields.updates || 0;
this.expire = typeof fields.expire == 'number' ? fields.expire : kwikemon.defaultTTL;
}
}
Monitor.prototype.key = function() {
return kwikemon.key(this.name);
};
Monitor.prototype.remove = function(cb) {
kwikemon.redis().multi()
.del(this.key())
.srem(kwikemon.indexKey(), this.name)
.exec(cb);
};
Monitor.prototype.update = function(text, cb) {
this.text = text;
this.save(cb);
};
Monitor.prototype.save = function(cb) {
var self = this
, key = this.key()
;
kwikemon.exists(this.name, function(err, exists) {
var fields = {
text: self.text
, expire: self.expire
, modified: Date.now()
}
, multi = kwikemon.redis().multi()
;
if (!exists) {
fields.created = Date.now();
}
multi
.hmset(key, fields)
.hincrby(key, 'updates', 1)
.expire(key, self.expire)
.sadd(kwikemon.indexKey(), self.name)
.exec(cb);
});
};
Monitor.prototype.ttl = function(cb) {
kwikemon.redis().ttl(this.key(), cb);
};
Monitor.prototype.writer = function() {
var self = this
, le = new LineEmitter()
;
le.on('line', function(line) {
self.update(line, function(err) {
if (err) throw err;
le.emit('monitor', self.name, line);
});
});
return le;
};