redis.js 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269
  1. /*!
  2. * socket.io-node
  3. * Copyright(c) 2011 LearnBoost <dev@learnboost.com>
  4. * MIT Licensed
  5. */
  6. /**
  7. * Module dependencies.
  8. */
  9. var crypto = require('crypto')
  10. , Store = require('../store')
  11. , assert = require('assert');
  /**
   * Module exports.
   *
   * The Redis store constructor is the module's export; the per-client
   * data class is reachable from it as `Redis.Client`.
   */

  module.exports = Redis;
  exports = module.exports;
  Redis.Client = Client;
  /**
   * Redis store.
   *
   * Creates three redis connections: `pub` (publishing), `sub`
   * (subscriptions) and `cmd` (general commands), then runs the base
   * `Store` constructor with the same options object.
   *
   * Options:
   *     - nodeId (fn) gets an id that uniquely identifies this node;
   *       by default a random non-negative 32-bit integer is generated
   *     - redis (object) redis module to use, defaults to `require('redis')`;
   *       must provide `createClient` and may provide `RedisClient`
   *     - redisPub (object) options to pass to the pub redis client,
   *       or an already constructed `RedisClient` to reuse
   *     - redisSub (object) options to pass to the sub redis client,
   *       or an already constructed `RedisClient` to reuse
   *     - redisClient (object) options to pass to the general redis client,
   *       or an already constructed `RedisClient` to reuse
   *     - pack (fn) custom packing, defaults to JSON or msgpack if installed
   *     - unpack (fn) custom unpacking, defaults to JSON or msgpack if
   *       installed; only read when `pack` is supplied as well
   *
   * Note: missing `redisPub` / `redisSub` / `redisClient` entries are filled
   * in on the passed `opts` object with `{}` before it is handed to `Store`.
   *
   * @param {Object} opts
   * @api public
   */

  function Redis (opts) {
    opts = opts || {};

    // node id to uniquely identify this node
    var nodeId = opts.nodeId || function () {
      // by default, we generate a random id
      return Math.abs(Math.random() * Math.random() * Date.now() | 0);
    };

    this.nodeId = nodeId();

    // packing / unpacking mechanism
    if (opts.pack) {
      this.pack = opts.pack;
      this.unpack = opts.unpack;
    } else {
      try {
        var msgpack = require('msgpack');
        this.pack = msgpack.pack;
        this.unpack = msgpack.unpack;
      } catch (e) {
        // msgpack is optional; fall back to JSON when it cannot be loaded
        this.pack = JSON.stringify;
        this.unpack = JSON.parse;
      }
    }

    var redis = opts.redis || require('redis')
      , RedisClient = redis.RedisClient;

    /**
     * Resolves the connection for one option key: reuses a ready-made
     * client instance, otherwise creates one from the option object.
     */

    function connect (key) {
      var given = opts[key];

      // a custom redis module may not export `RedisClient`; `instanceof`
      // against a non-function would throw, so only test when it is usable
      if ('function' == typeof RedisClient && given instanceof RedisClient) {
        return given;
      }

      given || (given = opts[key] = {});
      return redis.createClient(given.port, given.host, given);
    }

    // initialize a pubsub client and a regular client
    this.pub = connect('redisPub');
    this.sub = connect('redisSub');
    this.cmd = connect('redisClient');

    Store.call(this, opts);

    // 0 lifts the listener limit on both the sub connection and the store
    this.sub.setMaxListeners(0);
    this.setMaxListeners(0);
  };

  /**
   * Inherits from Store.
   */

  Redis.prototype.__proto__ = Store.prototype;
  81. /**
  82. * Publishes a message.
  83. *
  84. * @api private
  85. */
  86. Redis.prototype.publish = function (name) {
  87. var args = Array.prototype.slice.call(arguments, 1);
  88. this.pub.publish(name, this.pack({ nodeId: this.nodeId, args: args }));
  89. this.emit.apply(this, ['publish', name].concat(args));
  90. };
  /**
   * Subscribes to a channel
   *
   * Asks the sub connection to subscribe to `name`. Once redis confirms the
   * subscription for that channel, messages published on it by other nodes
   * are unpacked and their `args` applied to `consumer`, and `fn` is invoked.
   * The message listener is detached again when this store emits
   * `unsubscribe` for the same channel. A local `subscribe` event is always
   * emitted.
   *
   * @param {String} name channel name
   * @param {Function} consumer optional, receives the published arguments
   * @param {Function} fn optional, called once the subscription is confirmed
   * @api private
   */

  Redis.prototype.subscribe = function (name, consumer, fn) {
    this.sub.subscribe(name);

    if (consumer || fn) {
      var self = this;

      self.sub.on('subscribe', function subscribe (ch) {
        // loose comparison kept on purpose: same matching rule as before
        if (name != ch) return;

        // only listen for messages when there is somebody to hand them to;
        // with just `fn` given there is no consumer to apply them on
        if (consumer) {
          var message = function (ch, msg) {
            if (name != ch) return;

            msg = self.unpack(msg);

            // we check that the message consumed wasnt emitted by this node
            if (self.nodeId != msg.nodeId) {
              consumer.apply(null, msg.args);
            }
          };

          self.sub.on('message', message);

          self.on('unsubscribe', function unsubscribe (ch) {
            if (name == ch) {
              self.sub.removeListener('message', message);
              self.removeListener('unsubscribe', unsubscribe);
            }
          });
        }

        // the confirmation is handled once per subscribe() call
        self.sub.removeListener('subscribe', subscribe);

        fn && fn();
      });
    }

    this.emit('subscribe', name, consumer, fn);
  };
  /**
   * Unsubscribes
   *
   * Asks the sub connection to unsubscribe from `name`. When `fn` is given
   * it is invoked once, as soon as the sub connection reports the
   * unsubscription of that channel. A local `unsubscribe` event is always
   * emitted.
   *
   * @param {String} name channel name
   * @param {Function} fn optional, called once redis confirms
   * @api private
   */

  Redis.prototype.unsubscribe = function (name, fn) {
    this.sub.unsubscribe(name);

    if (fn) {
      var client = this.sub;

      client.on('unsubscribe', function unsubscribe (ch) {
        if (name == ch) {
          // detach before calling back: if `fn` throws, the listener must
          // not stay behind and fire again on a later unsubscription
          client.removeListener('unsubscribe', unsubscribe);
          fn();
        }
      });
    }

    this.emit('unsubscribe', name, fn);
  };
  /**
   * Destroys the store
   *
   * Runs the base `Store` destroy logic first, then ends the pub, sub and
   * cmd redis connections, in that order.
   *
   * @api public
   */

  Redis.prototype.destroy = function () {
    Store.prototype.destroy.call(this);

    var connections = [this.pub, this.sub, this.cmd];

    for (var i = 0; i < connections.length; i++) {
      connections[i].end();
    }
  };
  /**
   * Client constructor
   *
   * Delegates initialisation to the base `Store.Client` constructor.
   *
   * @param {Object} store the store this client belongs to
   * @param {String} id client id
   * @api private
   */

  function Client (store, id) {
    Store.Client.call(this, store, id);
  };

  /**
   * Inherits from Store.Client
   *
   * The prototype chain has to point at `Store.Client.prototype`;
   * `Store.Client` itself is the constructor function, and chaining to it
   * would expose function members instead of the base client's methods.
   */

  Client.prototype.__proto__ = Store.Client.prototype;
  /**
   * Redis hash get
   *
   * Issues `hget` on the store's command connection for field `key` of the
   * hash named after this client's id; `fn` is passed through as callback.
   *
   * @param {String} key
   * @param {Function} fn
   * @return {Client} this, for chaining
   * @api private
   */

  Client.prototype.get = function (key, fn) {
    var redis = this.store.cmd;

    redis.hget(this.id, key, fn);

    return this;
  };
  /**
   * Redis hash set
   *
   * Issues `hset` on the store's command connection, storing `value` under
   * field `key` of the hash named after this client's id; `fn` is passed
   * through as callback.
   *
   * @param {String} key
   * @param {Mixed} value
   * @param {Function} fn
   * @return {Client} this, for chaining
   * @api private
   */

  Client.prototype.set = function (key, value, fn) {
    var redis = this.store.cmd;

    redis.hset(this.id, key, value, fn);

    return this;
  };
  /**
   * Redis hash del
   *
   * Issues `hdel` on the store's command connection for field `key` of the
   * hash named after this client's id; `fn` is passed through as callback.
   *
   * @param {String} key
   * @param {Function} fn
   * @return {Client} this, for chaining
   * @api private
   */

  Client.prototype.del = function (key, fn) {
    var redis = this.store.cmd;

    redis.hdel(this.id, key, fn);

    return this;
  };
  /**
   * Redis hash has
   *
   * Issues `hexists` on the store's command connection for field `key` of
   * the hash named after this client's id. `fn` receives the error alone
   * when one is reported, otherwise `null` and the reply coerced to a
   * boolean.
   *
   * @param {String} key
   * @param {Function} fn
   * @return {Client} this, for chaining
   * @api private
   */

  Client.prototype.has = function (key, fn) {
    var onReply = function (err, exists) {
      if (!err) {
        fn(null, Boolean(exists));
        return;
      }

      return fn(err);
    };

    this.store.cmd.hexists(this.id, key, onReply);

    return this;
  };
  /**
   * Destroys client
   *
   * With a numeric `expiration` the key named after this client's id is
   * set to expire via `expire`; with anything else it is removed right away
   * via `del`.
   *
   * @param {Number} expiration number of seconds to expire data
   * @return {Client} this, for chaining
   * @api private
   */

  Client.prototype.destroy = function (expiration) {
    var redis = this.store.cmd;

    if (typeof expiration == 'number') {
      redis.expire(this.id, expiration);
    } else {
      redis.del(this.id);
    }

    return this;
  };
  218. };