This commit is contained in:
Sami Samhuri 2013-06-08 16:16:00 -07:00
parent e6e122cbf4
commit 57f3f5f4bc
4 changed files with 287 additions and 194 deletions

View file

@ -41,26 +41,54 @@ This is very much a work in progress.
You can use kwikemon as a library.
var kwikemon = require('kiwkemon');
var kwikemon = require('kiwkemon')
kwikemon.set('foo', 'bar', function(err) {
kwikemon.fetch('foo', function(err, text) {
console.log('foo = ' + text);
});
});
Change the redis connection:
kwikemon.fetchAll(function(err, monitors) {
Object.keys(monitors).forEach(function (name) {
console.log(name + ' = ' + monitors[name]);
});
});
kwikemon.redis([newRedis])
Configure:
kwikemon.keyPrefix = 'custom:';
kwikemon.defaultTTL = 3600; // one hour
#### Writing
kwikemon.set(name, text, function(err))
Monitors expire 1 day after the last time they were set by default. You can pass in any `ttl` you
want though.
// never expire
kwikemon.set('foo', 'bar', { ttl: 0 });
kwikemon.setex(name, text, 0)
Get a stream for writing to a monitor:
w = kwikemon.writer(name)
w.write('status\n')
With a custom TTL:
w = kwikemon.writer(name, ttl)
There's a 'monitor' event if you care when it sets text.
w.on('monitor', function(name, text));
#### Reading
kwikemon.exists(name, function(err, exists))
kwikemon.fetch(name, function(err, mon))
kwikemon.ttl(name, function(err, ttl))
kwikemon.list(function(err, names))
kwikemon.fetchAll(function(err, monitors))
kwikemon.count(function(err, n))
#### Deleting
kwikemon.remove(function(err))
kwikemon.clear(function(err))
kwikemon.sweep(function(err))
## Protocol

View file

@ -25,7 +25,7 @@ if (opt && opt[0] == '-') {
switch (opt) {
case '-c':
case '--clear':
kwikemon.removeAll(function(err) {
kwikemon.clear(function(err) {
process.exit(0);
});
break;
@ -110,9 +110,11 @@ else if (name && text) {
});
}
else if (name) {
process.stdin.pipe(kwikemon.createWriter(name));
process.stdin.on('end', function() {
process.exit(0);
kwikemon.writer(name, function(err, writer) {
process.stdin.pipe(writer);
process.stdin.on('end', function() {
process.exit(0);
});
});
}
else {

View file

@ -1,38 +1,10 @@
// Copyright 2013 Sami Samhuri
module.exports = {
// write
set: callbackOptional(set)
, writer: writer
// read
, exists: callbackOptional(exists)
, fetch: callbackOptional(fetch)
, fetchTTL: callbackOptional(fetchTTL)
, list: list
, fetchAll: fetchAll
, count: count
// remove
, remove: callbackOptional(remove)
, removeAll: removeAll
, sweep: sweep
// change redis client
, redis: setRedis
};
var async = require('async')
, redis = require('redis').createClient()
, redis = require('redis')
, LineEmitter = require('./line_emitter.js')
;
function setRedis(newRedis) {
if (redis) redis.end();
redis = newRedis;
}
// Make the callback argument of a function optional.
// If the callback is passed it will call the function
// normally. If the callback isn't given a function
@ -64,141 +36,230 @@ function callbackOptional(fn, ctx) {
};
}
function k(name) {
return 'kwikemon:monitor:' + name;
}
var kwikemon = module.exports = {
Monitor: Monitor
function exists(name, cb) {
redis.exists(k(name), function(err, exists) {
if (err) return cb(err);
cb(null, exists == 1);
});
}
, defaultTTL: 86400
, keyPrefix: 'kwikemon:'
// options:
// - ttl: time to live in seconds, <= 0 to never expire
function set(name, text, options, cb) {
if (typeof options == 'function') {
cb = options;
options = null;
// get or set the redis connection
, redis: function(newRedis) {
// set
if (newRedis) {
if (kwikemon._redis) kwikemon._redis.end();
kwikemon._redis = newRedis;
}
// get, init if necessary
else {
if (!kwikemon._redis) {
kwikemon._redis = redis.createClient();
}
return kwikemon._redis;
}
}
options = options || {};
var key = k(name)
, ttl = ('ttl' in options) ? options.ttl : 86400
, key: function(name) {
return kwikemon.keyPrefix + 'monitor:' + name;
}
, indexKey: function() {
return kwikemon.keyPrefix + 'monitors';
}
, count: function(cb) {
kwikemon.redis().scard(kwikemon.indexKey(), cb);
}
, sweep: function(cb) {
var keptNames = [];
kwikemon.redis().smembers(kwikemon.indexKey(), function(err, names) {
if (err) return cb(err);
var sweepers = names.map(function(name) {
return function(done) {
kwikemon.exists(name, function(err, exists) {
if (err) {
done();
}
// remove expired monitors
else if (!exists) {
new Monitor(name).remove(done);
}
else {
keptNames.push(name);
done();
}
});
};
});
async.parallel(sweepers, function(err, _) {
cb(err, keptNames);
});
});
}
, list: function(cb) {
kwikemon.sweep(function(err, names) {
if (err) return cb(err);
cb(null, names);
});
}
, fetchAll: function(cb) {
var monitors = {};
kwikemon.list(function(err, names) {
if (err) return cb(err);
var fetchers = names.sort().map(function(name) {
return function(done) {
kwikemon.fetch(name, function(err, mon) {
if (err) return done(err);
monitors[name] = mon;
done();
});
};
});
async.parallel(fetchers, function(err, _) {
if (err) return cb(err);
cb(null, monitors)
});
});
}
, clear: function(cb) {
kwikemon.list(function(err, names) {
if (err) return cb(err);
var removers = names.map(function(name) {
return function(done) {
new Monitor(name).remove(done);
};
});
async.parallel(removers, cb);
});
}
, exists: callbackOptional(function(name, cb) {
kwikemon.redis().exists(kwikemon.key(name), function(err, exists) {
if (err) return cb(err);
cb(null, exists == 1);
});
}, kwikemon)
, fetch: callbackOptional(function(name, cb) {
kwikemon.redis().hgetall(kwikemon.key(name), function(err, fields) {
if (err) return cb(err);
if (fields) {
cb(null, new Monitor(name, fields));
}
else {
cb(new Error('not found'));
}
});
}, kwikemon)
, ttl: callbackOptional(function(name, cb) {
new Monitor(name).ttl(cb);
}, kwikemon)
, set: callbackOptional(function(name, text, cb) {
return kwikemon.setex(name, text, null, cb);
}, kwikemon)
, setex: callbackOptional(function(name, text, ttl, cb) {
kwikemon.fetch(name, function(err, mon) {
if (err && err.message != 'not found') return cb(err);
mon = mon || new Monitor(name);
mon.text = text;
if (typeof ttl == 'number') {
mon.expire = ttl;
}
mon.save(cb);
});
}, kwikemon)
, writer: callbackOptional(function(name, ttl, cb) {
if (typeof ttl == 'function') {
cb = ttl;
ttl = null;
}
kwikemon.fetch(name, function(err, mon) {
if (err && err.message != 'not found') return cb(err);
mon = mon || new Monitor(name);
if (typeof ttl == 'number') {
mon.expire = ttl;
}
cb(null, mon.writer());
});
}, kwikemon)
, remove: callbackOptional(function(name, cb) {
new Monitor(name).remove(cb);
}, kwikemon)
};
function Monitor(name, fields) {
this.name = name;
if (fields) {
this.text = fields.text;
this.created = fields.created ? new Date(+fields.created) : null;
this.modified = fields.modified ? new Date(+fields.modified) : null;
this.updates = fields.updates || 0;
this.expire = typeof fields.expire == 'number' ? fields.expire : kwikemon.defaultTTL;
}
}
Monitor.prototype.key = function() {
return kwikemon.key(this.name);
};
Monitor.prototype.remove = function(cb) {
kwikemon.redis().multi()
.del(this.key())
.srem(kwikemon.indexKey(), this.name)
.exec(cb);
};
Monitor.prototype.update = function(text, cb) {
this.text = text;
this.save(cb);
};
Monitor.prototype.save = function(cb) {
var self = this
, key = this.key()
;
exists(name, function(err, exists) {
kwikemon.exists(this.name, function(err, exists) {
var fields = {
text: text
text: self.text
, expire: self.expire
, modified: Date.now()
}
, multi = redis.multi()
, multi = kwikemon.redis().multi()
;
if (!exists) {
fields.created = Date.now();
}
multi
.hmset(key, fields)
.hincrby(key, 'updates', 1);
if (ttl != null) {
multi.expire(key, ttl);
}
multi.sadd('kwikemon:monitors', name);
multi.exec(cb);
.hincrby(key, 'updates', 1)
.expire(key, self.expire)
.sadd(kwikemon.indexKey(), self.name)
.exec(cb);
});
}
};
function writer(name) {
var le = new LineEmitter();
Monitor.prototype.ttl = function(cb) {
kwikemon.redis().ttl(this.key(), cb);
};
Monitor.prototype.writer = function() {
var self = this
, le = new LineEmitter()
;
le.on('line', function(line) {
set(name, line, function(err) {
self.update(line, function(err) {
if (err) throw err;
le.emit('monitor', name, line);
le.emit('monitor', self.name, line);
});
});
return le;
}
function fetch(name, cb) {
redis.hgetall(k(name), cb);
}
function fetchTTL(name, cb) {
redis.ttl(k(name), cb);
}
function count(cb) {
redis.scard('kwikemon:monitors', cb);
}
function sweep(cb) {
var i = 0
, n
, checkIfDone = function() {
i += 1;
if (i == n) cb();
}
;
redis.smembers('kwikemon:monitors', function(err, names) {
if (err) return cb(err);
n = names.length;
if (n == 0) return cb();
names.forEach(function(name) {
exists(name, function(err, exists) {
if (err) {
// meh, ignore it
}
// remove expired monitors
else if (!exists) {
remove(name);
}
checkIfDone();
});
});
});
}
function list(cb) {
sweep(function(err) {
if (err) return cb(err);
redis.smembers('kwikemon:monitors', cb);
});
}
function fetchAll(cb) {
var monitors = {};
list(function(err, names) {
if (err) return cb(err);
var fetchers = names.sort().map(function(name) {
return function(done) {
fetch(name, function(err, text) {
if (err) return done(err);
monitors[name] = text;
done();
});
};
});
async.parallel(fetchers, function(err, _) {
if (err) return cb(err);
cb(null, monitors)
});
});
}
function remove(name, cb) {
redis.multi()
.del(k(name))
.srem('kwikemon:monitors', name)
.exec(cb);
}
function removeAll(cb) {
redis.smembers('kwikemon:monitors', function(err, names) {
if (err) return cb(err);
var multi = redis.multi();
names.forEach(function(name) {
multi.del(k(name));
multi.srem('kwikemon:monitors', name);
});
multi.exec(cb);
});
}
};

64
test.js
View file

@ -12,8 +12,8 @@ before(function(done) {
done();
});
});
beforeEach(kwikemon.removeAll);
after(kwikemon.removeAll);
beforeEach(kwikemon.clear);
after(kwikemon.clear);
describe("kwikemon", function() {
describe("#set", function() {
@ -36,17 +36,17 @@ describe("kwikemon", function() {
});
it("should set custom ttls", function(done) {
kwikemon.set('foo', 'bar', { ttl: 1 }, function(err) {
kwikemon.fetchTTL('foo', function(err, ttl) {
assert(ttl <= 1);
kwikemon.setex('foo', 'bar', 5, function(err) {
kwikemon.ttl('foo', function(err, ttl) {
assert(ttl <= 5);
done();
});
})
});
it("should not expire with a ttl of zero", function(done) {
kwikemon.set('foo', 'bar', { ttl: 0 }, function(err) {
kwikemon.fetchTTL('foo', function(err, ttl) {
kwikemon.setex('foo', 'bar', 0, function(err) {
kwikemon.ttl('foo', function(err, ttl) {
assert(ttl == -1);
done();
});
@ -54,8 +54,8 @@ describe("kwikemon", function() {
});
it("should not expire when ttl is < 0", function(done) {
kwikemon.set('foo', 'bar', { ttl: -1 }, function(err) {
kwikemon.fetchTTL('foo', function(err, ttl) {
kwikemon.setex('foo', 'bar', -1, function(err) {
kwikemon.ttl('foo', function(err, ttl) {
assert(ttl == -1);
done();
});
@ -65,30 +65,32 @@ describe("kwikemon", function() {
describe("#writer", function() {
it("should monitor each line of text written", function(done) {
var writer = kwikemon.writer('foo');
writer.once('monitor', function(name, text) {
assert(text == 'a');
kwikemon.writer('foo', function(err, writer) {
writer.once('monitor', function(name, text) {
assert(text == 'b');
done();
assert(text == 'a');
writer.once('monitor', function(name, text) {
assert(text == 'b');
done();
});
writer.write("b\n");
});
writer.write("b\n");
writer.write("a\n");
});
writer.write("a\n");
});
it("should only monitor complete lines of text", function(done) {
var writer = kwikemon.writer('foo');
writer.once('monitor', function(name, text) {
assert(text == 'complete');
kwikemon.writer('foo', function(err, writer) {
writer.once('monitor', function(name, text) {
assert(text == 'incomplete');
done();
assert(text == 'complete');
writer.once('monitor', function(name, text) {
assert(text == 'incomplete');
done();
});
writer.write("plete\n");
});
writer.write("plete\n");
writer.write("complete\n");
writer.write("incom");
});
writer.write("complete\n");
writer.write("incom");
});
});
@ -115,10 +117,10 @@ describe("kwikemon", function() {
});
});
describe("#fetchTTL", function() {
describe("#ttl", function() {
it("should fetch the last TTL set", function(done) {
kwikemon.set('foo', 'bar', { ttl: 300 }, function(err) {
kwikemon.fetchTTL('foo', function(err, ttl) {
kwikemon.setex('foo', 'bar', 300, function(err) {
kwikemon.ttl('foo', function(err, ttl) {
assert(ttl <= 300);
done();
});
@ -126,7 +128,7 @@ describe("kwikemon", function() {
});
it("should return -1 for non-existent monitors", function(done) {
kwikemon.fetchTTL('non-existent', function(err, ttl) {
kwikemon.ttl('non-existent', function(err, ttl) {
assert(ttl == -1);
done();
});
@ -187,12 +189,12 @@ describe("kwikemon", function() {
});
});
describe("#removeAll", function() {
it("should remove the named monitor", function(done) {
describe("#clear", function() {
it("should remove all monitors", function(done) {
async.series([
kwikemon.set('foo', 'bar')
, kwikemon.set('baz', 'quux')
, kwikemon.removeAll
, kwikemon.clear
, kwikemon.exists('foo')
, kwikemon.exists('baz')
, kwikemon.count