mirror of
https://github.com/samsonjs/kwikemon.git
synced 2026-03-25 09:05:51 +00:00
265 lines
6.3 KiB
JavaScript
265 lines
6.3 KiB
JavaScript
// Copyright 2013 Sami Samhuri
|
|
|
|
var async = require('async')
|
|
, redis = require('redis')
|
|
, LineEmitter = require('./line_emitter.js')
|
|
;
|
|
|
|
// Make the callback argument of a function optional.
|
|
// If the callback is passed it will call the function
|
|
// normally. If the callback isn't given a function
|
|
// that accepts the callback is returned, with the
|
|
// rest of the arguments fixed (like bind).
|
|
//
|
|
// function fetch(id, cb) { db.fetch(id, cb); }
|
|
// fetch = callbackOptional(fetch);
|
|
//
|
|
// function print(err, x) { if (err) throw err; console.log(x); }
|
|
//
|
|
// fetch(1, print);
|
|
//
|
|
// var fetch1 = fetch(1);
|
|
// fetch1(print);
|
|
function callbackOptional(fn, ctx) {
|
|
return function() {
|
|
var args = Array.prototype.slice.call(arguments);
|
|
var cb = args[args.length - 1];
|
|
if (typeof cb == 'function') {
|
|
fn.apply(ctx, arguments);
|
|
}
|
|
else {
|
|
return function(cb) {
|
|
args.push(cb);
|
|
fn.apply(ctx, args);
|
|
};
|
|
}
|
|
};
|
|
}
|
|
|
|
var kwikemon = module.exports = {
|
|
Monitor: Monitor
|
|
|
|
, defaultTTL: 86400
|
|
, keyPrefix: 'kwikemon:'
|
|
|
|
// get or set the redis connection
|
|
, redis: function(newRedis) {
|
|
// set
|
|
if (newRedis) {
|
|
if (kwikemon._redis) kwikemon._redis.end();
|
|
kwikemon._redis = newRedis;
|
|
}
|
|
// get, init if necessary
|
|
else {
|
|
if (!kwikemon._redis) {
|
|
kwikemon._redis = redis.createClient();
|
|
}
|
|
return kwikemon._redis;
|
|
}
|
|
}
|
|
|
|
, key: function(name) {
|
|
return kwikemon.keyPrefix + 'monitor:' + name;
|
|
}
|
|
|
|
, indexKey: function() {
|
|
return kwikemon.keyPrefix + 'monitors';
|
|
}
|
|
|
|
, count: function(cb) {
|
|
kwikemon.redis().scard(kwikemon.indexKey(), cb);
|
|
}
|
|
|
|
, sweep: function(cb) {
|
|
var keptNames = [];
|
|
kwikemon.redis().smembers(kwikemon.indexKey(), function(err, names) {
|
|
if (err) return cb(err);
|
|
var sweepers = names.map(function(name) {
|
|
return function(done) {
|
|
kwikemon.exists(name, function(err, exists) {
|
|
if (err) {
|
|
done();
|
|
}
|
|
// remove expired monitors
|
|
else if (!exists) {
|
|
new Monitor(name).remove(done);
|
|
}
|
|
else {
|
|
keptNames.push(name);
|
|
done();
|
|
}
|
|
});
|
|
};
|
|
});
|
|
async.parallel(sweepers, function(err, _) {
|
|
cb(err, keptNames);
|
|
});
|
|
});
|
|
}
|
|
|
|
, list: function(cb) {
|
|
kwikemon.sweep(function(err, names) {
|
|
if (err) return cb(err);
|
|
cb(null, names);
|
|
});
|
|
}
|
|
|
|
, fetchAll: function(cb) {
|
|
var monitors = {};
|
|
kwikemon.list(function(err, names) {
|
|
if (err) return cb(err);
|
|
var fetchers = names.sort().map(function(name) {
|
|
return function(done) {
|
|
kwikemon.fetch(name, function(err, mon) {
|
|
if (err) return done(err);
|
|
monitors[name] = mon;
|
|
done();
|
|
});
|
|
};
|
|
});
|
|
async.parallel(fetchers, function(err, _) {
|
|
if (err) return cb(err);
|
|
cb(null, monitors)
|
|
});
|
|
});
|
|
}
|
|
|
|
, clear: function(cb) {
|
|
kwikemon.list(function(err, names) {
|
|
if (err) return cb(err);
|
|
var removers = names.map(function(name) {
|
|
return function(done) {
|
|
new Monitor(name).remove(done);
|
|
};
|
|
});
|
|
async.parallel(removers, cb);
|
|
});
|
|
}
|
|
|
|
, exists: callbackOptional(function(name, cb) {
|
|
kwikemon.redis().exists(kwikemon.key(name), function(err, exists) {
|
|
if (err) return cb(err);
|
|
cb(null, exists == 1);
|
|
});
|
|
}, kwikemon)
|
|
|
|
, fetch: callbackOptional(function(name, cb) {
|
|
kwikemon.redis().hgetall(kwikemon.key(name), function(err, fields) {
|
|
if (err) return cb(err);
|
|
if (fields) {
|
|
cb(null, new Monitor(name, fields));
|
|
}
|
|
else {
|
|
cb(new Error('not found'));
|
|
}
|
|
});
|
|
}, kwikemon)
|
|
|
|
, ttl: callbackOptional(function(name, cb) {
|
|
new Monitor(name).ttl(cb);
|
|
}, kwikemon)
|
|
|
|
, set: callbackOptional(function(name, text, cb) {
|
|
return kwikemon.setex(name, text, null, cb);
|
|
}, kwikemon)
|
|
|
|
, setex: callbackOptional(function(name, text, ttl, cb) {
|
|
kwikemon.fetch(name, function(err, mon) {
|
|
if (err && err.message != 'not found') return cb(err);
|
|
mon = mon || new Monitor(name);
|
|
mon.text = text;
|
|
if (typeof ttl == 'number') {
|
|
mon.expire = ttl;
|
|
}
|
|
mon.save(cb);
|
|
});
|
|
}, kwikemon)
|
|
|
|
, writer: callbackOptional(function(name, ttl, cb) {
|
|
if (typeof ttl == 'function') {
|
|
cb = ttl;
|
|
ttl = null;
|
|
}
|
|
kwikemon.fetch(name, function(err, mon) {
|
|
if (err && err.message != 'not found') return cb(err);
|
|
mon = mon || new Monitor(name);
|
|
if (typeof ttl == 'number') {
|
|
mon.expire = ttl;
|
|
}
|
|
cb(null, mon.writer());
|
|
});
|
|
}, kwikemon)
|
|
|
|
, remove: callbackOptional(function(name, cb) {
|
|
new Monitor(name).remove(cb);
|
|
}, kwikemon)
|
|
|
|
};
|
|
|
|
function Monitor(name, fields) {
|
|
this.name = name;
|
|
if (fields) {
|
|
this.text = fields.text;
|
|
this.created = fields.created ? new Date(+fields.created) : null;
|
|
this.modified = fields.modified ? new Date(+fields.modified) : null;
|
|
this.updates = fields.updates || 0;
|
|
this.expire = typeof fields.expire == 'number' ? fields.expire : kwikemon.defaultTTL;
|
|
}
|
|
}
|
|
|
|
Monitor.prototype.key = function() {
|
|
return kwikemon.key(this.name);
|
|
};
|
|
|
|
Monitor.prototype.remove = function(cb) {
|
|
kwikemon.redis().multi()
|
|
.del(this.key())
|
|
.srem(kwikemon.indexKey(), this.name)
|
|
.exec(cb);
|
|
};
|
|
|
|
Monitor.prototype.update = function(text, cb) {
|
|
this.text = text;
|
|
this.save(cb);
|
|
};
|
|
|
|
Monitor.prototype.save = function(cb) {
|
|
var self = this
|
|
, key = this.key()
|
|
;
|
|
kwikemon.exists(this.name, function(err, exists) {
|
|
var fields = {
|
|
text: self.text
|
|
, expire: self.expire
|
|
, modified: Date.now()
|
|
}
|
|
, multi = kwikemon.redis().multi()
|
|
;
|
|
if (!exists) {
|
|
fields.created = Date.now();
|
|
}
|
|
multi
|
|
.hmset(key, fields)
|
|
.hincrby(key, 'updates', 1)
|
|
.expire(key, self.expire)
|
|
.sadd(kwikemon.indexKey(), self.name)
|
|
.exec(cb);
|
|
});
|
|
};
|
|
|
|
Monitor.prototype.ttl = function(cb) {
|
|
kwikemon.redis().ttl(this.key(), cb);
|
|
};
|
|
|
|
Monitor.prototype.writer = function() {
|
|
var self = this
|
|
, le = new LineEmitter()
|
|
;
|
|
le.on('line', function(line) {
|
|
self.update(line, function(err) {
|
|
if (err) throw err;
|
|
le.emit('monitor', self.name, line);
|
|
});
|
|
});
|
|
return le;
|
|
};
|