index.js 38 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
3704705706707708709710711712713714715716717718719720721722723724725726727728729730731732733734735736737738739740741742743744745746747748749750751752753754755756757758759760761762763764765766767768769770771772773774775776777778779780781782783784785786787788789790791792793794795796797798799800801802803804805806807808809810811812813814815816817818819820821822823824825826827828829830831832833834835836837838839840841842843844845846847848849850851852853854855856857858859860861862863864865866867868869870871872873874875876877878879880881882883884885886887888889890891892893894895896897898899900901902903904905906907908909910911912913914915916917918919920921922923924925926927928929930931932933934935936937938939940941942943944945946947948949950951952953954955956957958959960961962963964965966967968969970971972973974975976977978979980981982983984985986987988989990991992993994995996997998999100010011002100310041005100610071008100910101011101210131014101510161017101810191020102110221023102410251026102710281029103010311032103310341035103610371038103910401041104210431044104510461047104810491050105110521053105410551056105710581059106010611062106310641065106610671068106910701071107210731074107510761077107810791080108110821083108410851086108710881089109010911092109310941095109610971098109911001101110211031104110511061107110811091110111111121113
  1. /*global Buffer require exports console setTimeout */
  2. var net = require("net"),
  3. util = require("./lib/util"),
  4. Queue = require("./lib/queue"),
  5. to_array = require("./lib/to_array"),
  6. events = require("events"),
  7. crypto = require("crypto"),
  8. parsers = [], commands,
  9. connection_id = 0,
  10. default_port = 6379,
  11. default_host = "127.0.0.1";
  12. // can set this to true to enable for all connections
  13. exports.debug_mode = false;
  // hiredis is optional. When its parser module cannot be loaded the failure is
  // ignored (a warning is printed only in debug mode) and only the JavaScript
  // parser below is registered.
  try {
      parsers.push(require("./lib/parser/hiredis"));
  } catch (err) {
      if (exports.debug_mode) {
          console.warn("hiredis parser not installed.");
      }
  }
  // Registered last, so a successfully loaded hiredis parser stays at parsers[0].
  parsers.push(require("./lib/parser/javascript"));
  /**
   * Client state for one Redis connection carried by `stream`.
   *
   * Options read here: socket_nodelay (defaults to true and is written back
   * onto the options object), command_queue_high_water (default 1000),
   * command_queue_low_water (default 0), max_attempts and connect_timeout
   * (honoured only when they are positive numbers) and enable_offline_queue
   * (honoured only when it is a boolean; defaults to true).
   *
   * The constructor also subscribes to the stream's connect / data / error /
   * close / end / drain events and forwards them to the client's handlers.
   *
   * @param {Object} stream   event-emitting stream used to talk to the server
   * @param {Object} [options] configuration, see above
   */
  function RedisClient(stream, options) {
      var self = this;

      // Returns the numeric form of `value` when it is a positive number, else `fallback`.
      function positive_number(value, fallback) {
          if (value && !isNaN(value) && value > 0) {
              return +value;
          }
          return fallback;
      }

      this.stream = stream;
      this.options = options = options || {};

      this.connection_id = ++connection_id;
      this.connected = false;
      this.ready = false;
      this.connections = 0;

      if (this.options.socket_nodelay === undefined) {
          this.options.socket_nodelay = true;
      }

      this.should_buffer = false;
      this.command_queue_high_water = this.options.command_queue_high_water || 1000;
      this.command_queue_low_water = this.options.command_queue_low_water || 0;
      this.max_attempts = positive_number(options.max_attempts, null);
      this.command_queue = new Queue(); // commands already written, waiting for their replies
      this.offline_queue = new Queue(); // commands issued while they could not be written
      this.commands_sent = 0;
      this.connect_timeout = positive_number(options.connect_timeout, false);
      this.enable_offline_queue = (typeof this.options.enable_offline_queue === "boolean") ?
              this.options.enable_offline_queue : true;

      this.initialize_retry_vars();

      this.pub_sub_mode = false;
      this.subscription_set = {};
      this.monitoring = false;
      this.closing = false;
      this.server_info = {};
      this.auth_pass = null;
      this.parser_module = null;
      this.selected_db = null; // remembered so the db can be re-selected after a reconnect
      this.old_state = null;

      stream.on("connect", function () {
          self.on_connect();
      });
      stream.on("data", function (chunk) {
          self.on_data(chunk);
      });
      stream.on("error", function (stream_err) {
          self.on_error(stream_err.message);
      });
      // "close" and "end" are both treated as the connection going away
      stream.on("close", function () {
          self.connection_gone("close");
      });
      stream.on("end", function () {
          self.connection_gone("end");
      });
      stream.on("drain", function () {
          self.should_buffer = false;
          self.emit("drain");
      });

      events.EventEmitter.call(this);
  }
  84. util.inherits(RedisClient, events.EventEmitter);
  85. exports.RedisClient = RedisClient;
  /**
   * Reset the reconnect bookkeeping to its starting values: no pending retry
   * timer, no accumulated retry time, a base delay of 150 (used as the
   * setTimeout delay by connection_gone) with a backoff multiplier of 1.7,
   * and the attempt counter at 1.
   */
  RedisClient.prototype.initialize_retry_vars = function () {
      var client = this;

      client.retry_timer = null;
      client.retry_totaltime = 0;
      client.retry_delay = 150;
      client.retry_backoff = 1.7;
      client.attempts = 1;
  };
  /**
   * Empty the offline queue and then the command queue. Every removed entry
   * that has a callback gets that callback invoked with `message` as its only
   * argument; afterwards each queue is replaced by a fresh Queue.
   *
   * @param {*} message value handed to each pending callback as the error
   */
  RedisClient.prototype.flush_and_error = function (message) {
      var client = this;

      // Drains the queue stored under `queue_name`, failing each entry, then swaps in a new Queue.
      function fail_pending(queue_name) {
          var entry;
          while (client[queue_name].length > 0) {
              entry = client[queue_name].shift();
              if (typeof entry.callback === "function") {
                  entry.callback(message);
              }
          }
          client[queue_name] = new Queue();
      }

      fail_pending("offline_queue");
      fail_pending("command_queue");
  };
  /**
   * Handle an error reported by the stream. Does nothing while the client is
   * closing. Otherwise: fails every queued command with a descriptive message,
   * marks the client disconnected, emits "error" with an Error carrying that
   * message, and finally hands over to connection_gone("error").
   *
   * @param {string} msg error text (the constructor passes the stream error's `message`)
   */
  RedisClient.prototype.on_error = function (msg) {
      var description;

      if (this.closing) {
          return;
      }

      description = "Redis connection to " + this.host + ":" + this.port + " failed - " + msg;
      if (exports.debug_mode) {
          console.warn(description);
      }

      this.flush_and_error(description);
      this.connected = false;
      this.ready = false;

      // An "error" event without a listener is thrown by the emitter; if the user
      // did handle it, the reconnect logic below still gets a chance to run.
      this.emit("error", new Error(description));
      this.connection_gone("error");
  };
  /**
   * Send AUTH with the stored password, bypassing the "ready" gate through
   * the send_anyway flag.
   *
   * Reply handling:
   *  - an error whose text contains "LOADING" schedules another do_auth() in 2 seconds;
   *  - any other error, or a reply other than "OK", is emitted as an "error" event;
   *  - on success the stored auth_callback (if any) is invoked once and cleared,
   *    "connect" is emitted, and the client proceeds to on_ready() or
   *    ready_check() depending on options.no_ready_check.
   */
  RedisClient.prototype.do_auth = function () {
      var client = this;

      function on_auth_reply(err, res) {
          if (err) {
              if (!err.toString().match("LOADING")) {
                  return client.emit("error", new Error("Auth error: " + err.message));
              }
              // the server refuses to authenticate while it is still loading its dataset
              console.log("Redis still loading, trying to authenticate later");
              setTimeout(function () {
                  client.do_auth();
              }, 2000); // TODO - magic number alert
              return;
          }

          if (res.toString() !== "OK") {
              return client.emit("error", new Error("Auth failed: " + res.toString()));
          }

          if (exports.debug_mode) {
              console.log("Auth succeeded " + client.host + ":" + client.port + " id " + client.connection_id);
          }

          if (client.auth_callback) {
              client.auth_callback(err, res);
              client.auth_callback = null;
          }

          // only now is the connection considered established
          client.emit("connect");
          if (client.options.no_ready_check) {
              client.on_ready();
          } else {
              client.ready_check();
          }
      }

      if (exports.debug_mode) {
          console.log("Sending auth to " + client.host + ":" + client.port + " id " + client.connection_id);
      }

      client.send_anyway = true;
      client.send_command("auth", [client.auth_pass], on_auth_reply);
      client.send_anyway = false;
  };
  /**
   * Runs when the stream reports "connect". Resets the per-connection state
   * (flags, command queue, retry variables), applies the socket settings,
   * creates a fresh reply parser and then either authenticates (when a
   * password is stored) or announces the connection and starts the readiness
   * sequence.
   */
  RedisClient.prototype.on_connect = function () {
      var socket = this.stream;

      if (exports.debug_mode) {
          console.log("Stream connected " + this.host + ":" + this.port + " id " + this.connection_id);
      }

      this.connected = true;
      this.ready = false;
      this.attempts = 0;
      this.connections += 1;
      this.command_queue = new Queue();
      this.emitted_end = false;
      this.initialize_retry_vars();

      if (this.options.socket_nodelay) {
          socket.setNoDelay();
      }
      socket.setTimeout(0);

      this.init_parser();

      if (this.auth_pass) {
          // do_auth emits "connect" itself once the server accepted the password
          this.do_auth();
          return;
      }

      this.emit("connect");
      if (this.options.no_ready_check) {
          this.on_ready();
      } else {
          this.ready_check();
      }
  };
  /**
   * Choose a parser module and create this.reply_parser from it.
   *
   * When options.parser is set, the first registered parser whose `name`
   * equals it is used; if none matches an Error is thrown. Otherwise the first
   * registered parser is used. The parser is created with `return_buffers`
   * enabled when either options.return_buffers or options.detect_buffers is
   * set, and its events are wired to return_reply / return_error / "error".
   *
   * @throws {Error} when a parser was requested by name but is not registered
   */
  RedisClient.prototype.init_parser = function () {
      var self = this, chosen = null, idx;

      if (this.options.parser) {
          for (idx = 0; idx < parsers.length && chosen === null; idx += 1) {
              if (parsers[idx].name === self.options.parser) {
                  chosen = parsers[idx];
              }
          }
          if (chosen === null) {
              throw new Error("Couldn't find named parser " + self.options.parser + " on this system");
          }
          this.parser_module = chosen;
          if (exports.debug_mode) {
              console.log("Using parser module: " + self.parser_module.name);
          }
      } else {
          if (exports.debug_mode) {
              console.log("Using default parser module: " + parsers[0].name);
          }
          this.parser_module = parsers[0];
      }

      this.parser_module.debug_mode = exports.debug_mode;

      // With return_buffers the parser hands Buffers to the callbacks. detect_buffers
      // also asks the parser for Buffers; return_reply converts them back to Strings
      // when the command itself was sent without Buffer arguments.
      this.reply_parser = new this.parser_module.Parser({
          return_buffers: self.options.return_buffers || self.options.detect_buffers || false
      });

      // "reply error": an error reply sent by the Redis server
      this.reply_parser.on("reply error", function (reply) {
          self.return_error(new Error(reply));
      });
      this.reply_parser.on("reply", function (reply) {
          self.return_reply(reply);
      });
      // "error": the parser itself ran into trouble
      this.reply_parser.on("error", function (err) {
          self.emit("error", new Error("Redis reply parser error: " + err.stack));
      });
  };
  /**
   * Mark the connection ready and restore the modal state (selected db,
   * pub/sub subscriptions, monitor mode) saved by connection_gone() from a
   * previous connection.
   *
   * "ready" is emitted immediately, except when subscriptions have to be
   * restored: then it is emitted once every re-subscribe command has been
   * answered. When nothing modal has to be restored the offline queue is sent.
   */
  RedisClient.prototype.on_ready = function () {
      var self = this, subscriptions, pending_replies, saved_pub_sub_mode;

      this.ready = true;

      if (this.old_state !== null) {
          this.monitoring = this.old_state.monitoring;
          this.pub_sub_mode = this.old_state.pub_sub_mode;
          this.selected_db = this.old_state.selected_db;
          this.old_state = null;
      }

      // magically restore any modal commands from a previous connection
      if (this.selected_db !== null) {
          // send_command() rejects everything but pub/sub commands while pub_sub_mode
          // is set, so the flag is lifted just long enough to get SELECT out.
          saved_pub_sub_mode = this.pub_sub_mode;
          this.pub_sub_mode = false;
          try {
              this.send_command('select', [this.selected_db]);
          } finally {
              this.pub_sub_mode = saved_pub_sub_mode;
          }
      }

      subscriptions = Object.keys(this.subscription_set);
      if (this.pub_sub_mode === true && subscriptions.length === 0) {
          // Nothing is left to subscribe to on the new connection, so it is not in
          // pub/sub mode; continue on the normal path so "ready" is still emitted.
          this.pub_sub_mode = false;
      }

      if (this.pub_sub_mode === true) {
          // only emit "ready" when all subscriptions were made again
          pending_replies = subscriptions.length;
          subscriptions.forEach(function (key) {
              // keys look like "sub <channel>" / "psub <pattern>"; split on the first
              // space only so channel names containing spaces survive
              var separator = key.indexOf(" "),
                  kind = key.slice(0, separator),
                  channel = key.slice(separator + 1);
              if (exports.debug_mode) {
                  console.warn("sending pub/sub on_ready " + kind + ", " + channel);
              }
              self.send_command(kind + "scribe", [channel], function () {
                  pending_replies -= 1;
                  if (pending_replies === 0) {
                      self.emit("ready");
                  }
              });
          });
          return;
      }

      if (this.monitoring) {
          this.send_command("monitor");
      } else {
          this.send_offline_queue();
      }
      this.emit("ready");
  };
  /**
   * Callback for the INFO command issued by ready_check().
   *
   * Parses the "key:value" lines of the reply into an object (stored on
   * this.server_info, with an extra `versions` array holding the numeric parts
   * of `redis_version`). If the server does not report that it is loading,
   * on_ready() is called; otherwise ready_check() is retried after
   * loading_eta_seconds, capped at one second.
   *
   * @param {Error|null} err error from the INFO command, emitted as "error"
   * @param {*} res reply whose string form is the INFO text
   */
  RedisClient.prototype.on_info_cmd = function (err, res) {
      var self = this, obj = {}, retry_time;

      if (err) {
          return self.emit("error", new Error("Ready check failed: " + err.message));
      }

      res.toString().split("\r\n").forEach(function (line) {
          // split on the first ':' only, so values that contain colons stay intact
          var separator = line.indexOf(':'), value;
          if (separator === -1) {
              return;
          }
          value = line.slice(separator + 1);
          if (value) {
              obj[line.slice(0, separator)] = value;
          }
      });

      obj.versions = [];
      // guard against a reply without a redis_version line
      if (obj.redis_version) {
          obj.redis_version.split('.').forEach(function (num) {
              obj.versions.push(+num);
          });
      }

      // expose info key/vals to users
      this.server_info = obj;

      if (!obj.loading || obj.loading === "0") {
          if (exports.debug_mode) {
              console.log("Redis server ready.");
          }
          this.on_ready();
      } else {
          retry_time = obj.loading_eta_seconds * 1000;
          // also covers a missing/non-numeric eta, which would otherwise give NaN
          if (isNaN(retry_time) || retry_time > 1000) {
              retry_time = 1000;
          }
          if (exports.debug_mode) {
              console.log("Redis server still loading, trying again in " + retry_time);
          }
          setTimeout(function () {
              self.ready_check();
          }, retry_time);
      }
  };
  /**
   * Ask the server for INFO and pass the outcome to on_info_cmd().
   * send_anyway is raised only for the duration of the call so the command is
   * sent even though the client is not flagged "ready" yet.
   */
  RedisClient.prototype.ready_check = function () {
      var client = this;

      function on_info(err, res) {
          client.on_info_cmd(err, res);
      }

      if (exports.debug_mode) {
          console.log("checking server ready state...");
      }

      client.send_anyway = true; // lets send_command write even when not "ready"
      client.info(on_info);
      client.send_anyway = false;
  };
  /**
   * Replay every command held in the offline queue through send_command(),
   * in order. If none of those calls returned a falsy value (i.e. nothing had
   * to be buffered), should_buffer is cleared and "drain" is emitted.
   */
  RedisClient.prototype.send_offline_queue = function () {
      var queued, all_written = true;

      while (this.offline_queue.length > 0) {
          queued = this.offline_queue.shift();
          if (exports.debug_mode) {
              console.log("Sending offline command: " + queued.command);
          }
          if (!this.send_command(queued.command, queued.args, queued.callback)) {
              all_written = false;
          }
      }

      // Shifted items keep occupying the Queue's backing store until the next add,
      // so simply start over with a new Queue.
      this.offline_queue = new Queue();

      if (all_written) {
          this.should_buffer = false;
          this.emit("drain");
      }
  };
  /**
   * Handle the loss of the connection (stream "close" / "end", or after an
   * error) and schedule a reconnect.
   *
   * Steps: ignore the call if a retry is already scheduled; mark the client
   * disconnected; save the modal state (monitoring, pub/sub mode, selected db)
   * into old_state so on_ready() can restore it; emit "end" once; fail all
   * queued commands. No retry is scheduled when the client is closing or
   * max_attempts has been reached. Otherwise the retry delay is multiplied by
   * retry_backoff, "reconnecting" is emitted and a timer is armed that
   * reconnects the stream - unless the accumulated retry time has reached
   * connect_timeout.
   *
   * @param {string} why name of the event that triggered the call (used in messages)
   */
  RedisClient.prototype.connection_gone = function (why) {
      var self = this, delay;

      // If a retry is already in progress, just let that happen
      if (this.retry_timer) {
          return;
      }

      if (exports.debug_mode) {
          console.warn("Redis connection is gone from " + why + " event.");
      }
      this.connected = false;
      this.ready = false;

      if (this.old_state === null) {
          this.old_state = {
              monitoring: this.monitoring,
              pub_sub_mode: this.pub_sub_mode,
              selected_db: this.selected_db
          };
          this.monitoring = false;
          this.pub_sub_mode = false;
          this.selected_db = null;
      }

      // since we are collapsing end and close, users don't expect to be called twice
      if (!this.emitted_end) {
          this.emit("end");
          this.emitted_end = true;
      }

      this.flush_and_error("Redis connection gone from " + why + " event.");

      // If this is a requested shutdown, then don't retry
      if (this.closing) {
          this.retry_timer = null;
          if (exports.debug_mode) {
              console.warn("connection ended from quit command, not retrying.");
          }
          return;
      }

      this.retry_delay = Math.floor(this.retry_delay * this.retry_backoff);
      // Remember the delay this retry was scheduled with; the timer below adds
      // exactly this amount to retry_totaltime.
      delay = this.retry_delay;

      if (exports.debug_mode) {
          console.log("Retry connection in " + delay + " ms");
      }

      if (this.max_attempts && this.attempts >= this.max_attempts) {
          this.retry_timer = null;
          // TODO - some people need a "Redis is Broken mode" for future commands that errors immediately, and others
          // want the program to exit. Right now, we just log, which doesn't really help in either case.
          console.error("node_redis: Couldn't get Redis connection after " + this.max_attempts + " attempts.");
          return;
      }

      this.attempts += 1;
      this.emit("reconnecting", {
          delay: delay,
          attempt: self.attempts
      });

      this.retry_timer = setTimeout(function () {
          if (exports.debug_mode) {
              console.log("Retrying connection...");
          }

          self.retry_totaltime += delay;

          if (self.connect_timeout && self.retry_totaltime >= self.connect_timeout) {
              self.retry_timer = null;
              // TODO - engage Redis is Broken mode for future commands, or whatever
              console.error("node_redis: Couldn't get Redis connection after " + self.retry_totaltime + "ms.");
              return;
          }

          self.stream.connect(self.port, self.host);
          self.retry_timer = null;
      }, delay);
  };
  /**
   * Feed a chunk received from the stream into the reply parser. An exception
   * escaping from the parser is reported as an "error" event on the client.
   *
   * @param {*} data chunk delivered by the stream's "data" event
   */
  RedisClient.prototype.on_data = function (data) {
      var client = this;

      if (exports.debug_mode) {
          console.log("net read " + client.host + ":" + client.port + " id " + client.connection_id + ": " + data.toString());
      }

      try {
          client.reply_parser.execute(data);
      } catch (parser_failure) {
          // Only unexpected failures inside the parser code land here: the parser is
          // expected to emit "error" events for protocol trouble it detects itself,
          // and exceptions thrown by user callbacks are dealt with in return_reply().
          // TODO - it might be nice to have a different "error" event for different types of errors
          client.emit("error", parser_failure);
      }
  };
  /**
   * Deliver an error reply to the oldest command waiting in the command queue.
   *
   * Before the callback runs, "idle" is emitted (and the queue replaced) when
   * the queue became empty outside pub/sub mode, and "drain" is emitted when
   * the client was buffering and the queue length fell to the low-water mark.
   * If the command has no callback, the error is logged and thrown on the next
   * tick.
   *
   * @param {Error} err error to hand to the command's callback
   */
  RedisClient.prototype.return_error = function (err) {
      var failed_command = this.command_queue.shift(),
          remaining = this.command_queue.getLength();

      if (this.pub_sub_mode === false && remaining === 0) {
          this.emit("idle");
          this.command_queue = new Queue();
      }
      if (this.should_buffer && remaining <= this.command_queue_low_water) {
          this.emit("drain");
          this.should_buffer = false;
      }

      if (!failed_command || typeof failed_command.callback !== "function") {
          console.log("node_redis: no callback to send error: " + err.message);
          // this will probably not make it anywhere useful, but we might as well throw
          process.nextTick(function () {
              throw err;
          });
          return;
      }

      try {
          failed_command.callback(err);
      } catch (callback_err) {
          // an exception from the callback is re-thrown on a new stack so the parser can keep going
          process.nextTick(function () {
              throw callback_err;
          });
      }
  };
  // Invoke `callback(null, reply)`. Should the callback throw, the exception is
  // re-thrown on the next tick (a new stack) so that the caller - the reply
  // parser loop - can keep going.
  // The try/catch lives in its own function because V8 doesn't optimize it well yet.
  function try_callback(callback, reply) {
      var failure = null;

      try {
          callback(null, reply);
      } catch (err) {
          failure = { thrown: err };
      }

      if (failure !== null) {
          process.nextTick(function () {
              throw failure.thrown;
          });
      }
  }
  // Turn a flat [field, value, field, value, ...] reply (as produced for hgetall)
  // into an Object. Field names are converted with toString(); values are stored
  // untouched. An empty reply yields null.
  function reply_to_object(reply) {
      var result = {}, idx = 0, count = reply.length;

      if (count === 0) {
          return null;
      }

      while (idx < count) {
          result[reply[idx].toString()] = reply[idx + 1];
          idx += 2;
      }

      return result;
  }
  // Convert a reply made of Buffers into Strings (used for detect_buffers).
  // A single Buffer becomes a String. An Array is converted element by element,
  // in place, and the same Array is returned; null/undefined elements are left
  // as they are instead of being dereferenced. Anything else is returned unchanged.
  function reply_to_strings(reply) {
      var i;

      if (Buffer.isBuffer(reply)) {
          return reply.toString();
      }

      if (Array.isArray(reply)) {
          for (i = 0; i < reply.length; i++) {
              if (reply[i] !== null && reply[i] !== undefined) {
                  reply[i] = reply[i].toString();
              }
          }
          return reply;
      }

      return reply;
  }
  /**
   * Dispatch a successful reply from the parser.
   *
   * The oldest entry of the command queue is taken as the command this reply
   * belongs to. "idle" / "drain" bookkeeping happens first, then one of:
   *  - regular command: the reply is converted (Buffers to Strings for
   *    detect_buffers when the command had no Buffer arguments; hgetall replies
   *    to an Object) and passed to the command's callback;
   *  - pub/sub: "message" / "pmessage" / subscription events are emitted, and
   *    pub_sub_mode is updated from the subscription count in the reply;
   *  - monitor mode: the reply line is split into a timestamp and arguments and
   *    emitted as "monitor".
   *
   * @param {*} reply parsed reply
   * @throws {Error} on a reply that fits none of the client's current modes
   */
  RedisClient.prototype.return_reply = function (reply) {
      var command_obj, len, type, timestamp, argindex, args, queue_len;

      command_obj = this.command_queue.shift();
      queue_len = this.command_queue.getLength();

      if (this.pub_sub_mode === false && queue_len === 0) {
          this.emit("idle");
          this.command_queue = new Queue(); // explicitly reclaim storage from old Queue
      }
      if (this.should_buffer && queue_len <= this.command_queue_low_water) {
          this.emit("drain");
          this.should_buffer = false;
      }

      if (command_obj && !command_obj.sub_command) {
          if (typeof command_obj.callback === "function") {
              if (this.options.detect_buffers && command_obj.buffer_args === false) {
                  // If detect_buffers option was specified, then the reply from the parser will be Buffers.
                  // If this command did not use Buffer arguments, then convert the reply to Strings here.
                  reply = reply_to_strings(reply);
              }

              // TODO - confusing and error-prone that hgetall is special cased in two places
              if (reply && 'hgetall' === command_obj.command.toLowerCase()) {
                  reply = reply_to_object(reply);
              }

              try_callback(command_obj.callback, reply);
          } else if (exports.debug_mode) {
              console.log("no callback for reply: " + (reply && reply.toString && reply.toString()));
          }
      } else if (this.pub_sub_mode || (command_obj && command_obj.sub_command)) {
          if (Array.isArray(reply)) {
              type = reply[0].toString();

              if (type === "message") {
                  this.emit("message", reply[1].toString(), reply[2]); // channel, message
              } else if (type === "pmessage") {
                  this.emit("pmessage", reply[1].toString(), reply[2].toString(), reply[3]); // pattern, channel, message
              } else if (type === "subscribe" || type === "unsubscribe" || type === "psubscribe" || type === "punsubscribe") {
                  if (reply[2] === 0) {
                      this.pub_sub_mode = false;
                      // debug_mode is a module-level switch (exports.debug_mode), not a client property
                      if (exports.debug_mode) {
                          console.log("All subscriptions removed, exiting pub/sub mode");
                      }
                  } else {
                      this.pub_sub_mode = true;
                  }
                  // subscribe commands take an optional callback and also emit an event, but only the first response is included in the callback
                  // TODO - document this or fix it so it works in a more obvious way
                  if (command_obj && typeof command_obj.callback === "function") {
                      try_callback(command_obj.callback, reply[1].toString());
                  }
                  this.emit(type, reply[1].toString(), reply[2]); // channel, count
              } else {
                  throw new Error("subscriptions are active but got unknown reply type " + type);
              }
          } else if (!this.closing) {
              throw new Error("subscriptions are active but got an invalid reply: " + reply);
          }
      } else if (this.monitoring) {
          len = reply.indexOf(" ");
          timestamp = reply.slice(0, len);
          argindex = reply.indexOf('"');
          args = reply.slice(argindex + 1, -1).split('" "').map(function (elem) {
              return elem.replace(/\\"/g, '"');
          });
          this.emit("monitor", timestamp, args);
      } else {
          throw new Error("node_redis command queue state error. If you can reproduce this, please report it.");
      }
  };
  // Record describing one command handed to send_command(): its name, its
  // arguments, whether it is a pub/sub command, whether any argument is a Buffer,
  // and the optional callback.
  // A named constructor is used rather than an object literal because it is
  // marginally faster and, more importantly, shows up meaningfully in the V8 CPU
  // profiler and in heap snapshots.
  function Command(command, args, sub_command, buffer_args, callback) {
      var entry = this;

      entry.command = command;
      entry.args = args;
      entry.sub_command = sub_command;
      entry.buffer_args = buffer_args;
      entry.callback = callback;
  }
  /**
   * Encode a command as a multi-bulk request and write it to the stream, or
   * park it in the offline queue when the client cannot send right now.
   *
   * Accepted call shapes:
   *   send_command(command, [arg1, arg2], cb)   - callback passed separately
   *   send_command(command, [arg1, arg2, cb])   - trailing function (or undefined) in
   *                                               `args` is popped and used as callback
   *   send_command(command, [arg1, arg2])       - no callback
   * Note that popping the callback modifies the caller's `args` array.
   *
   * @param {string} command command name
   * @param {Array} args command arguments; Buffers are written as-is, everything
   *        else is converted with String()
   * @param {Function} [callback] invoked with the reply (or error)
   * @returns {boolean} false when the command was queued offline (or rejected
   *          through its callback); otherwise the negation of should_buffer,
   *          i.e. false once writes are being buffered or the command queue
   *          reached its high-water mark
   * @throws {Error} on invalid arguments, when a non-pub/sub command is issued in
   *         pub/sub mode, or when the stream is unusable, the offline queue is
   *         disabled and no callback was given
   */
  RedisClient.prototype.send_command = function (command, args, callback) {
      var arg, command_obj, i, il, elem_count, buffer_args, stream = this.stream, command_str = "", buffered_writes = 0,
          last_arg_type, not_writeable_error;

      if (typeof command !== "string") {
          throw new Error("First argument to send_command must be the command name string, not " + typeof command);
      }
      if (!Array.isArray(args)) {
          throw new Error("send_command: second argument must be an array");
      }

      if (typeof callback !== "function") {
          if (callback) {
              throw new Error("send_command: last argument must be a callback or undefined");
          }
          // Variable-argument form: the wrappers pass everything in `args`, so a
          // trailing function (or explicit undefined) is the callback.
          last_arg_type = typeof args[args.length - 1];
          if (last_arg_type === "function" || last_arg_type === "undefined") {
              callback = args.pop();
          }
      }

      // if the last argument is an array and command is sadd, expand it out:
      //     client.sadd(arg1, [arg2, arg3, arg4], cb);
      //  converts to:
      //     client.sadd(arg1, arg2, arg3, arg4, cb);
      if ((command === 'sadd' || command === 'SADD') && args.length > 0 && Array.isArray(args[args.length - 1])) {
          args = args.slice(0, -1).concat(args[args.length - 1]);
      }

      buffer_args = false;
      for (i = 0, il = args.length; i < il; i += 1) {
          if (Buffer.isBuffer(args[i])) {
              buffer_args = true;
          }
      }

      command_obj = new Command(command, args, false, buffer_args, callback);

      if ((!this.ready && !this.send_anyway) || !stream.writable) {
          if (exports.debug_mode) {
              if (!stream.writable) {
                  console.log("send command: stream is not writeable.");
              }
          }

          if (this.enable_offline_queue) {
              if (exports.debug_mode) {
                  console.log("Queueing " + command + " for next server connection.");
              }
              this.offline_queue.push(command_obj);
              this.should_buffer = true;
          } else {
              not_writeable_error = new Error('send_command: stream not writeable. enable_offline_queue is false');
              if (command_obj.callback) {
                  command_obj.callback(not_writeable_error);
              } else {
                  throw not_writeable_error;
              }
          }

          return false;
      }

      if (command === "subscribe" || command === "psubscribe" || command === "unsubscribe" || command === "punsubscribe") {
          this.pub_sub_command(command_obj);
      } else if (command === "monitor") {
          this.monitoring = true;
      } else if (command === "quit") {
          this.closing = true;
      } else if (this.pub_sub_mode === true) {
          throw new Error("Connection in pub/sub mode, only pub/sub commands may be used");
      }
      this.command_queue.push(command_obj);
      this.commands_sent += 1;

      elem_count = args.length + 1;

      // Always use "Multi bulk commands", but if passed any Buffer args, then do multiple writes, one for each arg.
      // This means that using Buffers in commands is going to be slower, so use Strings if you don't already have a Buffer.
      command_str = "*" + elem_count + "\r\n$" + command.length + "\r\n" + command + "\r\n";

      if (!buffer_args) { // Build up a string and send entire command in one write
          for (i = 0, il = args.length; i < il; i += 1) {
              arg = args[i];
              if (typeof arg !== "string") {
                  arg = String(arg);
              }
              command_str += "$" + Buffer.byteLength(arg) + "\r\n" + arg + "\r\n";
          }
          if (exports.debug_mode) {
              console.log("send " + this.host + ":" + this.port + " id " + this.connection_id + ": " + command_str);
          }
          buffered_writes += !stream.write(command_str);
      } else {
          if (exports.debug_mode) {
              console.log("send command (" + command_str + ") has Buffer arguments");
          }
          buffered_writes += !stream.write(command_str);

          for (i = 0, il = args.length; i < il; i += 1) {
              arg = args[i];
              // Everything that is neither a Buffer nor a primitive string - including
              // String objects - is normalised to a primitive string, because
              // Buffer.byteLength() below only accepts primitives and Buffers.
              if (!Buffer.isBuffer(arg) && typeof arg !== "string") {
                  arg = String(arg);
              }

              if (Buffer.isBuffer(arg)) {
                  if (arg.length === 0) {
                      if (exports.debug_mode) {
                          console.log("send_command: using empty string for 0 length buffer");
                      }
                      buffered_writes += !stream.write("$0\r\n\r\n");
                  } else {
                      buffered_writes += !stream.write("$" + arg.length + "\r\n");
                      buffered_writes += !stream.write(arg);
                      buffered_writes += !stream.write("\r\n");
                      if (exports.debug_mode) {
                          console.log("send_command: buffer send " + arg.length + " bytes");
                      }
                  }
              } else {
                  if (exports.debug_mode) {
                      console.log("send_command: string send " + Buffer.byteLength(arg) + " bytes: " + arg);
                  }
                  buffered_writes += !stream.write("$" + Buffer.byteLength(arg) + "\r\n" + arg + "\r\n");
              }
          }
      }
      if (exports.debug_mode) {
          console.log("send_command buffered_writes: " + buffered_writes, " should_buffer: " + this.should_buffer);
      }
      if (buffered_writes || this.command_queue.getLength() >= this.command_queue_high_water) {
          this.should_buffer = true;
      }
      return !this.should_buffer;
  };
  /**
   * Bookkeeping for a (p)subscribe / (p)unsubscribe command that is about to
   * be sent: switches the client into pub/sub mode, flags the command as a
   * sub_command and updates subscription_set. Entries are keyed
   * "sub <channel>" for subscribe/unsubscribe and "psub <pattern>" otherwise;
   * subscribe and psubscribe add entries, any other command removes them.
   *
   * @param {Object} command_obj command record with `command` and `args`
   */
  RedisClient.prototype.pub_sub_command = function (command_obj) {
      var name = command_obj.command,
          channels = command_obj.args,
          adding = (name === "subscribe" || name === "psubscribe"),
          prefix = (name === "subscribe" || name === "unsubscribe") ? "sub" : "psub",
          subscriptions = this.subscription_set,
          idx;

      if (this.pub_sub_mode === false && exports.debug_mode) {
          console.log("Entering pub/sub mode from " + name);
      }
      this.pub_sub_mode = true;
      command_obj.sub_command = true;

      for (idx = 0; idx < channels.length; idx++) {
          if (adding) {
              subscriptions[prefix + " " + channels[idx]] = true;
          } else {
              delete subscriptions[prefix + " " + channels[idx]];
          }
      }
  };
  /**
   * Shut the connection down without any reconnect: the stream's listener
   * table (`_events`) is replaced with an empty object so this client no
   * longer receives any of its events, the client is flagged disconnected,
   * and the stream is ended.
   *
   * @returns {*} whatever the stream's end() returns
   */
  RedisClient.prototype.end = function () {
      var socket = this.stream;

      socket._events = {};
      this.connected = false;
      this.ready = false;

      return socket.end();
  };
  /**
   * Collects the commands of a MULTI block for `client`. The queue always
   * starts with ["MULTI"]; when `args` is an array its entries are appended
   * after it.
   *
   * @param {Object} client client the queued commands belong to
   * @param {Array} [args] initial commands to queue
   */
  function Multi(client, args) {
      var opening = [["MULTI"]];

      this.client = client;
      this.queue = Array.isArray(args) ? opening.concat(args) : opening;
  }
  728. exports.Multi = Multi;
  729. // take 2 arrays and return the union of their elements
  730. function set_union(seta, setb) {
  731. var obj = {};
  732. seta.forEach(function (val) {
  733. obj[val] = true;
  734. });
  735. setb.forEach(function (val) {
  736. obj[val] = true;
  737. });
  738. return Object.keys(obj);
  739. }
  740. // This static list of commands is updated from time to time. ./lib/commands.js can be updated with generate_commands.js
  741. commands = set_union(["get", "set", "setnx", "setex", "append", "strlen", "del", "exists", "setbit", "getbit", "setrange", "getrange", "substr",
  742. "incr", "decr", "mget", "rpush", "lpush", "rpushx", "lpushx", "linsert", "rpop", "lpop", "brpop", "brpoplpush", "blpop", "llen", "lindex",
  743. "lset", "lrange", "ltrim", "lrem", "rpoplpush", "sadd", "srem", "smove", "sismember", "scard", "spop", "srandmember", "sinter", "sinterstore",
  744. "sunion", "sunionstore", "sdiff", "sdiffstore", "smembers", "zadd", "zincrby", "zrem", "zremrangebyscore", "zremrangebyrank", "zunionstore",
  745. "zinterstore", "zrange", "zrangebyscore", "zrevrangebyscore", "zcount", "zrevrange", "zcard", "zscore", "zrank", "zrevrank", "hset", "hsetnx",
  746. "hget", "hmset", "hmget", "hincrby", "hdel", "hlen", "hkeys", "hvals", "hgetall", "hexists", "incrby", "decrby", "getset", "mset", "msetnx",
  747. "randomkey", "select", "move", "rename", "renamenx", "expire", "expireat", "keys", "dbsize", "auth", "ping", "echo", "save", "bgsave",
  748. "bgrewriteaof", "shutdown", "lastsave", "type", "multi", "exec", "discard", "sync", "flushdb", "flushall", "sort", "info", "monitor", "ttl",
  749. "persist", "slaveof", "debug", "config", "subscribe", "unsubscribe", "psubscribe", "punsubscribe", "publish", "watch", "unwatch", "cluster",
  750. "restore", "migrate", "dump", "object", "client", "eval", "evalsha"], require("./lib/commands"));
  751. commands.forEach(function (command) {
  752. RedisClient.prototype[command] = function (args, callback) {
  753. if (Array.isArray(args) && typeof callback === "function") {
  754. return this.send_command(command, args, callback);
  755. } else {
  756. return this.send_command(command, to_array(arguments));
  757. }
  758. };
  759. RedisClient.prototype[command.toUpperCase()] = RedisClient.prototype[command];
  760. Multi.prototype[command] = function () {
  761. this.queue.push([command].concat(to_array(arguments)));
  762. return this;
  763. };
  764. Multi.prototype[command.toUpperCase()] = Multi.prototype[command];
  765. });
  766. // store db in this.select_db to restore it on reconnect
  767. RedisClient.prototype.select = function (db, callback) {
  768. var self = this;
  769. this.send_command('select', [db], function (err, res) {
  770. if (err === null) {
  771. self.selected_db = db;
  772. }
  773. if (typeof(callback) === 'function') {
  774. callback(err, res);
  775. }
  776. });
  777. };
  778. RedisClient.prototype.SELECT = RedisClient.prototype.select;
  779. // Stash auth for connect and reconnect. Send immediately if already connected.
  780. RedisClient.prototype.auth = function () {
  781. var args = to_array(arguments);
  782. this.auth_pass = args[0];
  783. this.auth_callback = args[1];
  784. if (exports.debug_mode) {
  785. console.log("Saving auth as " + this.auth_pass);
  786. }
  787. if (this.connected) {
  788. this.send_command("auth", args);
  789. }
  790. };
  791. RedisClient.prototype.AUTH = RedisClient.prototype.auth;
  792. RedisClient.prototype.hmget = function (arg1, arg2, arg3) {
  793. if (Array.isArray(arg2) && typeof arg3 === "function") {
  794. return this.send_command("hmget", [arg1].concat(arg2), arg3);
  795. } else if (Array.isArray(arg1) && typeof arg2 === "function") {
  796. return this.send_command("hmget", arg1, arg2);
  797. } else {
  798. return this.send_command("hmget", to_array(arguments));
  799. }
  800. };
  801. RedisClient.prototype.HMGET = RedisClient.prototype.hmget;
  802. RedisClient.prototype.hmset = function (args, callback) {
  803. var tmp_args, tmp_keys, i, il, key;
  804. if (Array.isArray(args) && typeof callback === "function") {
  805. return this.send_command("hmset", args, callback);
  806. }
  807. args = to_array(arguments);
  808. if (typeof args[args.length - 1] === "function") {
  809. callback = args[args.length - 1];
  810. args.length -= 1;
  811. } else {
  812. callback = null;
  813. }
  814. if (args.length === 2 && typeof args[0] === "string" && typeof args[1] === "object") {
  815. // User does: client.hmset(key, {key1: val1, key2: val2})
  816. tmp_args = [ args[0] ];
  817. tmp_keys = Object.keys(args[1]);
  818. for (i = 0, il = tmp_keys.length; i < il ; i++) {
  819. key = tmp_keys[i];
  820. tmp_args.push(key);
  821. if (typeof args[1][key] !== "string") {
  822. var err = new Error("hmset expected value to be a string", key, ":", args[1][key]);
  823. if (callback) return callback(err);
  824. else throw err;
  825. }
  826. tmp_args.push(args[1][key]);
  827. }
  828. args = tmp_args;
  829. }
  830. return this.send_command("hmset", args, callback);
  831. };
  832. RedisClient.prototype.HMSET = RedisClient.prototype.hmset;
  833. Multi.prototype.hmset = function () {
  834. var args = to_array(arguments), tmp_args;
  835. if (args.length >= 2 && typeof args[0] === "string" && typeof args[1] === "object") {
  836. tmp_args = [ "hmset", args[0] ];
  837. Object.keys(args[1]).map(function (key) {
  838. tmp_args.push(key);
  839. tmp_args.push(args[1][key]);
  840. });
  841. if (args[2]) {
  842. tmp_args.push(args[2]);
  843. }
  844. args = tmp_args;
  845. } else {
  846. args.unshift("hmset");
  847. }
  848. this.queue.push(args);
  849. return this;
  850. };
  851. Multi.prototype.HMSET = Multi.prototype.hmset;
  852. Multi.prototype.exec = function (callback) {
  853. var self = this;
  854. // drain queue, callback will catch "QUEUED" or error
  855. // TODO - get rid of all of these anonymous functions which are elegant but slow
  856. this.queue.forEach(function (args, index) {
  857. var command = args[0], obj;
  858. if (typeof args[args.length - 1] === "function") {
  859. args = args.slice(1, -1);
  860. } else {
  861. args = args.slice(1);
  862. }
  863. if (args.length === 1 && Array.isArray(args[0])) {
  864. args = args[0];
  865. }
  866. if (command.toLowerCase() === 'hmset' && typeof args[1] === 'object') {
  867. obj = args.pop();
  868. Object.keys(obj).forEach(function (key) {
  869. args.push(key);
  870. args.push(obj[key]);
  871. });
  872. }
  873. this.client.send_command(command, args, function (err, reply) {
  874. if (err) {
  875. var cur = self.queue[index];
  876. if (typeof cur[cur.length - 1] === "function") {
  877. cur[cur.length - 1](err);
  878. } else {
  879. throw new Error(err);
  880. }
  881. self.queue.splice(index, 1);
  882. }
  883. });
  884. }, this);
  885. // TODO - make this callback part of Multi.prototype instead of creating it each time
  886. return this.client.send_command("EXEC", [], function (err, replies) {
  887. if (err) {
  888. if (callback) {
  889. callback(new Error(err));
  890. return;
  891. } else {
  892. throw new Error(err);
  893. }
  894. }
  895. var i, il, j, jl, reply, args;
  896. if (replies) {
  897. for (i = 1, il = self.queue.length; i < il; i += 1) {
  898. reply = replies[i - 1];
  899. args = self.queue[i];
  900. // TODO - confusing and error-prone that hgetall is special cased in two places
  901. if (reply && args[0].toLowerCase() === "hgetall") {
  902. replies[i - 1] = reply = reply_to_object(reply);
  903. }
  904. if (typeof args[args.length - 1] === "function") {
  905. args[args.length - 1](null, reply);
  906. }
  907. }
  908. }
  909. if (callback) {
  910. callback(null, replies);
  911. }
  912. });
  913. };
  914. Multi.prototype.EXEC = Multi.prototype.exec;
  915. RedisClient.prototype.multi = function (args) {
  916. return new Multi(this, args);
  917. };
  918. RedisClient.prototype.MULTI = function (args) {
  919. return new Multi(this, args);
  920. };
  921. // stash original eval method
  922. var eval = RedisClient.prototype.eval;
  923. // hook eval with an attempt to evalsha for cached scripts
  924. RedisClient.prototype.eval =
  925. RedisClient.prototype.EVAL = function () {
  926. var self = this,
  927. args = to_array(arguments),
  928. callback;
  929. if (typeof args[args.length - 1] === "function") {
  930. callback = args.pop();
  931. }
  932. // replace script source with sha value
  933. var source = args[0];
  934. args[0] = crypto.createHash("sha1").update(source).digest("hex");
  935. self.evalsha(args, function (err, reply) {
  936. if (err && /NOSCRIPT/.test(err.message)) {
  937. args[0] = source;
  938. eval.call(self, args, callback);
  939. } else if (callback) {
  940. callback(err, reply);
  941. }
  942. });
  943. };
  944. exports.createClient = function (port_arg, host_arg, options) {
  945. var port = port_arg || default_port,
  946. host = host_arg || default_host,
  947. redis_client, net_client;
  948. net_client = net.createConnection(port, host);
  949. redis_client = new RedisClient(net_client, options);
  950. redis_client.port = port;
  951. redis_client.host = host;
  952. return redis_client;
  953. };
  954. exports.print = function (err, reply) {
  955. if (err) {
  956. console.log("Error: " + err);
  957. } else {
  958. console.log("Reply: " + reply);
  959. }
  960. };