From 57f3f5f4bc79f3b74d6ec172120ceecd2a1bb653 Mon Sep 17 00:00:00 2001 From: Sami Samhuri Date: Sat, 8 Jun 2013 16:16:00 -0700 Subject: [PATCH] wip --- Readme.md | 52 ++++++-- bin/kwikemon | 10 +- kwikemon.js | 355 ++++++++++++++++++++++++++++++--------------------- test.js | 64 +++++----- 4 files changed, 287 insertions(+), 194 deletions(-) diff --git a/Readme.md b/Readme.md index 849d626..17a785a 100644 --- a/Readme.md +++ b/Readme.md @@ -41,26 +41,54 @@ This is very much a work in progress. You can use kwikemon as a library. - var kwikemon = require('kiwkemon'); + var kwikemon = require('kiwkemon') - kwikemon.set('foo', 'bar', function(err) { - kwikemon.fetch('foo', function(err, text) { - console.log('foo = ' + text); - }); - }); +Change the redis connection: - kwikemon.fetchAll(function(err, monitors) { - Object.keys(monitors).forEach(function (name) { - console.log(name + ' = ' + monitors[name]); - }); - }); + kwikemon.redis([newRedis]) + +Configure: + + kwikemon.keyPrefix = 'custom:'; + kwikemon.defaultTTL = 3600; // one hour + +#### Writing + + kwikemon.set(name, text, function(err)) Monitors expire 1 day after the last time they were set by default. You can pass in any `ttl` you want though. // never expire - kwikemon.set('foo', 'bar', { ttl: 0 }); + kwikemon.setex(name, text, 0) +Get a stream for writing to a monitor: + + w = kwikemon.writer(name) + w.write('status\n') + +With a custom TTL: + + w = kwikemon.writer(name, ttl) + +There's a 'monitor' event if you care when it sets text. + + w.on('monitor', function(name, text)); + +#### Reading + + kwikemon.exists(name, function(err, exists)) + kwikemon.fetch(name, function(err, mon)) + kwikemon.ttl(name, function(err, ttl)) + kwikemon.list(function(err, names)) + kwikemon.fetchAll(function(err, monitors)) + kwikemon.count(function(err, n)) + +#### Deleting + + kwikemon.remove(function(err)) + kwikemon.clear(function(err)) + kwikemon.sweep(function(err)) ## Protocol diff --git a/bin/kwikemon b/bin/kwikemon index 162c0a3..9a30965 100755 --- a/bin/kwikemon +++ b/bin/kwikemon @@ -25,7 +25,7 @@ if (opt && opt[0] == '-') { switch (opt) { case '-c': case '--clear': - kwikemon.removeAll(function(err) { + kwikemon.clear(function(err) { process.exit(0); }); break; @@ -110,9 +110,11 @@ else if (name && text) { }); } else if (name) { - process.stdin.pipe(kwikemon.createWriter(name)); - process.stdin.on('end', function() { - process.exit(0); + kwikemon.writer(name, function(err, writer) { + process.stdin.pipe(writer); + process.stdin.on('end', function() { + process.exit(0); + }); }); } else { diff --git a/kwikemon.js b/kwikemon.js index 8488fc5..f355184 100644 --- a/kwikemon.js +++ b/kwikemon.js @@ -1,38 +1,10 @@ // Copyright 2013 Sami Samhuri -module.exports = { - - // write - set: callbackOptional(set) -, writer: writer - - // read -, exists: callbackOptional(exists) -, fetch: callbackOptional(fetch) -, fetchTTL: callbackOptional(fetchTTL) -, list: list -, fetchAll: fetchAll -, count: count - - // remove -, remove: callbackOptional(remove) -, removeAll: removeAll -, sweep: sweep - - // change redis client -, redis: setRedis -}; - var async = require('async') - , redis = require('redis').createClient() + , redis = require('redis') , LineEmitter = require('./line_emitter.js') ; -function setRedis(newRedis) { - if (redis) redis.end(); - redis = newRedis; -} - // Make the callback argument of a function optional. // If the callback is passed it will call the function // normally. If the callback isn't given a function @@ -64,141 +36,230 @@ function callbackOptional(fn, ctx) { }; } -function k(name) { - return 'kwikemon:monitor:' + name; -} +var kwikemon = module.exports = { + Monitor: Monitor -function exists(name, cb) { - redis.exists(k(name), function(err, exists) { - if (err) return cb(err); - cb(null, exists == 1); - }); -} +, defaultTTL: 86400 +, keyPrefix: 'kwikemon:' -// options: -// - ttl: time to live in seconds, <= 0 to never expire -function set(name, text, options, cb) { - if (typeof options == 'function') { - cb = options; - options = null; + // get or set the redis connection +, redis: function(newRedis) { + // set + if (newRedis) { + if (kwikemon._redis) kwikemon._redis.end(); + kwikemon._redis = newRedis; + } + // get, init if necessary + else { + if (!kwikemon._redis) { + kwikemon._redis = redis.createClient(); + } + return kwikemon._redis; + } } - options = options || {}; - var key = k(name) - , ttl = ('ttl' in options) ? options.ttl : 86400 + +, key: function(name) { + return kwikemon.keyPrefix + 'monitor:' + name; + } + +, indexKey: function() { + return kwikemon.keyPrefix + 'monitors'; + } + +, count: function(cb) { + kwikemon.redis().scard(kwikemon.indexKey(), cb); + } + +, sweep: function(cb) { + var keptNames = []; + kwikemon.redis().smembers(kwikemon.indexKey(), function(err, names) { + if (err) return cb(err); + var sweepers = names.map(function(name) { + return function(done) { + kwikemon.exists(name, function(err, exists) { + if (err) { + done(); + } + // remove expired monitors + else if (!exists) { + new Monitor(name).remove(done); + } + else { + keptNames.push(name); + done(); + } + }); + }; + }); + async.parallel(sweepers, function(err, _) { + cb(err, keptNames); + }); + }); + } + +, list: function(cb) { + kwikemon.sweep(function(err, names) { + if (err) return cb(err); + cb(null, names); + }); + } + +, fetchAll: function(cb) { + var monitors = {}; + kwikemon.list(function(err, names) { + if (err) return cb(err); + var fetchers = names.sort().map(function(name) { + return function(done) { + kwikemon.fetch(name, function(err, mon) { + if (err) return done(err); + monitors[name] = mon; + done(); + }); + }; + }); + async.parallel(fetchers, function(err, _) { + if (err) return cb(err); + cb(null, monitors) + }); + }); + } + +, clear: function(cb) { + kwikemon.list(function(err, names) { + if (err) return cb(err); + var removers = names.map(function(name) { + return function(done) { + new Monitor(name).remove(done); + }; + }); + async.parallel(removers, cb); + }); + } + +, exists: callbackOptional(function(name, cb) { + kwikemon.redis().exists(kwikemon.key(name), function(err, exists) { + if (err) return cb(err); + cb(null, exists == 1); + }); + }, kwikemon) + +, fetch: callbackOptional(function(name, cb) { + kwikemon.redis().hgetall(kwikemon.key(name), function(err, fields) { + if (err) return cb(err); + if (fields) { + cb(null, new Monitor(name, fields)); + } + else { + cb(new Error('not found')); + } + }); + }, kwikemon) + +, ttl: callbackOptional(function(name, cb) { + new Monitor(name).ttl(cb); + }, kwikemon) + +, set: callbackOptional(function(name, text, cb) { + return kwikemon.setex(name, text, null, cb); + }, kwikemon) + +, setex: callbackOptional(function(name, text, ttl, cb) { + kwikemon.fetch(name, function(err, mon) { + if (err && err.message != 'not found') return cb(err); + mon = mon || new Monitor(name); + mon.text = text; + if (typeof ttl == 'number') { + mon.expire = ttl; + } + mon.save(cb); + }); + }, kwikemon) + +, writer: callbackOptional(function(name, ttl, cb) { + if (typeof ttl == 'function') { + cb = ttl; + ttl = null; + } + kwikemon.fetch(name, function(err, mon) { + if (err && err.message != 'not found') return cb(err); + mon = mon || new Monitor(name); + if (typeof ttl == 'number') { + mon.expire = ttl; + } + cb(null, mon.writer()); + }); + }, kwikemon) + +, remove: callbackOptional(function(name, cb) { + new Monitor(name).remove(cb); + }, kwikemon) + +}; + +function Monitor(name, fields) { + this.name = name; + if (fields) { + this.text = fields.text; + this.created = fields.created ? new Date(+fields.created) : null; + this.modified = fields.modified ? new Date(+fields.modified) : null; + this.updates = fields.updates || 0; + this.expire = typeof fields.expire == 'number' ? fields.expire : kwikemon.defaultTTL; + } +} + +Monitor.prototype.key = function() { + return kwikemon.key(this.name); +}; + +Monitor.prototype.remove = function(cb) { + kwikemon.redis().multi() + .del(this.key()) + .srem(kwikemon.indexKey(), this.name) + .exec(cb); +}; + +Monitor.prototype.update = function(text, cb) { + this.text = text; + this.save(cb); +}; + +Monitor.prototype.save = function(cb) { + var self = this + , key = this.key() ; - exists(name, function(err, exists) { + kwikemon.exists(this.name, function(err, exists) { var fields = { - text: text + text: self.text + , expire: self.expire , modified: Date.now() } - , multi = redis.multi() + , multi = kwikemon.redis().multi() ; if (!exists) { fields.created = Date.now(); } multi .hmset(key, fields) - .hincrby(key, 'updates', 1); - if (ttl != null) { - multi.expire(key, ttl); - } - multi.sadd('kwikemon:monitors', name); - multi.exec(cb); + .hincrby(key, 'updates', 1) + .expire(key, self.expire) + .sadd(kwikemon.indexKey(), self.name) + .exec(cb); }); -} +}; -function writer(name) { - var le = new LineEmitter(); +Monitor.prototype.ttl = function(cb) { + kwikemon.redis().ttl(this.key(), cb); +}; + +Monitor.prototype.writer = function() { + var self = this + , le = new LineEmitter() + ; le.on('line', function(line) { - set(name, line, function(err) { + self.update(line, function(err) { if (err) throw err; - le.emit('monitor', name, line); + le.emit('monitor', self.name, line); }); }); return le; -} - -function fetch(name, cb) { - redis.hgetall(k(name), cb); -} - -function fetchTTL(name, cb) { - redis.ttl(k(name), cb); -} - -function count(cb) { - redis.scard('kwikemon:monitors', cb); -} - -function sweep(cb) { - var i = 0 - , n - , checkIfDone = function() { - i += 1; - if (i == n) cb(); - } - ; - redis.smembers('kwikemon:monitors', function(err, names) { - if (err) return cb(err); - n = names.length; - if (n == 0) return cb(); - names.forEach(function(name) { - exists(name, function(err, exists) { - if (err) { - // meh, ignore it - } - // remove expired monitors - else if (!exists) { - remove(name); - } - checkIfDone(); - }); - }); - }); -} - -function list(cb) { - sweep(function(err) { - if (err) return cb(err); - redis.smembers('kwikemon:monitors', cb); - }); -} - -function fetchAll(cb) { - var monitors = {}; - list(function(err, names) { - if (err) return cb(err); - var fetchers = names.sort().map(function(name) { - return function(done) { - fetch(name, function(err, text) { - if (err) return done(err); - monitors[name] = text; - done(); - }); - }; - }); - async.parallel(fetchers, function(err, _) { - if (err) return cb(err); - cb(null, monitors) - }); - }); -} - -function remove(name, cb) { - redis.multi() - .del(k(name)) - .srem('kwikemon:monitors', name) - .exec(cb); -} - -function removeAll(cb) { - redis.smembers('kwikemon:monitors', function(err, names) { - if (err) return cb(err); - var multi = redis.multi(); - names.forEach(function(name) { - multi.del(k(name)); - multi.srem('kwikemon:monitors', name); - }); - multi.exec(cb); - }); -} +}; diff --git a/test.js b/test.js index fd502af..93bb662 100755 --- a/test.js +++ b/test.js @@ -12,8 +12,8 @@ before(function(done) { done(); }); }); -beforeEach(kwikemon.removeAll); -after(kwikemon.removeAll); +beforeEach(kwikemon.clear); +after(kwikemon.clear); describe("kwikemon", function() { describe("#set", function() { @@ -36,17 +36,17 @@ describe("kwikemon", function() { }); it("should set custom ttls", function(done) { - kwikemon.set('foo', 'bar', { ttl: 1 }, function(err) { - kwikemon.fetchTTL('foo', function(err, ttl) { - assert(ttl <= 1); + kwikemon.setex('foo', 'bar', 5, function(err) { + kwikemon.ttl('foo', function(err, ttl) { + assert(ttl <= 5); done(); }); }) }); it("should not expire with a ttl of zero", function(done) { - kwikemon.set('foo', 'bar', { ttl: 0 }, function(err) { - kwikemon.fetchTTL('foo', function(err, ttl) { + kwikemon.setex('foo', 'bar', 0, function(err) { + kwikemon.ttl('foo', function(err, ttl) { assert(ttl == -1); done(); }); @@ -54,8 +54,8 @@ describe("kwikemon", function() { }); it("should not expire when ttl is < 0", function(done) { - kwikemon.set('foo', 'bar', { ttl: -1 }, function(err) { - kwikemon.fetchTTL('foo', function(err, ttl) { + kwikemon.setex('foo', 'bar', -1, function(err) { + kwikemon.ttl('foo', function(err, ttl) { assert(ttl == -1); done(); }); @@ -65,30 +65,32 @@ describe("kwikemon", function() { describe("#writer", function() { it("should monitor each line of text written", function(done) { - var writer = kwikemon.writer('foo'); - writer.once('monitor', function(name, text) { - assert(text == 'a'); + kwikemon.writer('foo', function(err, writer) { writer.once('monitor', function(name, text) { - assert(text == 'b'); - done(); + assert(text == 'a'); + writer.once('monitor', function(name, text) { + assert(text == 'b'); + done(); + }); + writer.write("b\n"); }); - writer.write("b\n"); + writer.write("a\n"); }); - writer.write("a\n"); }); it("should only monitor complete lines of text", function(done) { - var writer = kwikemon.writer('foo'); - writer.once('monitor', function(name, text) { - assert(text == 'complete'); + kwikemon.writer('foo', function(err, writer) { writer.once('monitor', function(name, text) { - assert(text == 'incomplete'); - done(); + assert(text == 'complete'); + writer.once('monitor', function(name, text) { + assert(text == 'incomplete'); + done(); + }); + writer.write("plete\n"); }); - writer.write("plete\n"); + writer.write("complete\n"); + writer.write("incom"); }); - writer.write("complete\n"); - writer.write("incom"); }); }); @@ -115,10 +117,10 @@ describe("kwikemon", function() { }); }); - describe("#fetchTTL", function() { + describe("#ttl", function() { it("should fetch the last TTL set", function(done) { - kwikemon.set('foo', 'bar', { ttl: 300 }, function(err) { - kwikemon.fetchTTL('foo', function(err, ttl) { + kwikemon.setex('foo', 'bar', 300, function(err) { + kwikemon.ttl('foo', function(err, ttl) { assert(ttl <= 300); done(); }); @@ -126,7 +128,7 @@ describe("kwikemon", function() { }); it("should return -1 for non-existent monitors", function(done) { - kwikemon.fetchTTL('non-existent', function(err, ttl) { + kwikemon.ttl('non-existent', function(err, ttl) { assert(ttl == -1); done(); }); @@ -187,12 +189,12 @@ describe("kwikemon", function() { }); }); - describe("#removeAll", function() { - it("should remove the named monitor", function(done) { + describe("#clear", function() { + it("should remove all monitors", function(done) { async.series([ kwikemon.set('foo', 'bar') , kwikemon.set('baz', 'quux') - , kwikemon.removeAll + , kwikemon.clear , kwikemon.exists('foo') , kwikemon.exists('baz') , kwikemon.count