transport.js 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534
  1. /*!
  2. * socket.io-node
  3. * Copyright(c) 2011 LearnBoost <[email protected]>
  4. * MIT Licensed
  5. */
  6. /**
  7. * Module dependencies.
  8. */
  9. var parser = require('./parser');
  10. /**
  11. * Expose the constructor.
  12. */
  13. exports = module.exports = Transport;
  /**
   * Base transport constructor.
   *
   * Remembers the owning manager and the client id taken from `data.id`,
   * starts out connected (`disconnected = false`) and drained, then hands the
   * initial request to `handleRequest`.
   *
   * @param {Object} mng manager that owns this transport
   * @param {Object} data connection data; only `data.id` is read here
   * @param {Object} req initial request, forwarded to `handleRequest`
   * @api public
   */
  function Transport (mng, data, req) {
    this.manager = mng;
    this.id = data.id;
    this.drained = true;
    this.disconnected = false;

    // runs last so that the request handling sees a fully initialised instance
    this.handleRequest(req);
  }
  /**
   * Access the logger.
   *
   * Read-only accessor returning the logger of the owning manager
   * (`this.manager.log`). Declared with `Object.defineProperty` rather than
   * the deprecated `__defineGetter__`; the property is kept enumerable and
   * configurable so it looks exactly as it did before.
   *
   * @return {Object} the manager's logger
   * @api public
   */
  Object.defineProperty(Transport.prototype, 'log', {
      get: function () {
        return this.manager.log;
      }
    , enumerable: true
    , configurable: true
  });
  /**
   * Access the store.
   *
   * Read-only accessor returning the store of the owning manager
   * (`this.manager.store`). Declared with `Object.defineProperty` rather than
   * the deprecated `__defineGetter__`; the property is kept enumerable and
   * configurable so it looks exactly as it did before.
   *
   * @return {Object} the manager's store
   * @api public
   */
  Object.defineProperty(Transport.prototype, 'store', {
      get: function () {
        return this.manager.store;
      }
    , enumerable: true
    , configurable: true
  });
  /**
   * Attaches a request to the transport.
   *
   * The request is always stored as `this.req`. Only a GET request opens the
   * transport: its socket is adopted, the transport is flagged open and
   * drained, the heartbeat interval and the handlers are set up and
   * `onSocketConnect` is invoked.
   *
   * @param {Object} req incoming request (`method`, `url`, `socket` are read)
   * @api private
   */
  Transport.prototype.handleRequest = function (req) {
    this.log.debug('setting request', req.method, req.url);
    this.req = req;

    // every other method only records the request
    if (req.method != 'GET') {
      return;
    }

    this.socket = req.socket;
    this.open = true;
    this.drained = true;

    this.setHeartbeatInterval();
    this.setHandlers();
    this.onSocketConnect();
  };
  /**
   * Hook invoked by `handleRequest` once a GET request has been attached.
   * The base implementation deliberately does nothing.
   *
   * @api private
   */
  Transport.prototype.onSocketConnect = function () {
    // intentionally empty
  };
  /**
   * Wires the transport to its store channels and to its socket.
   *
   * Subscribes to the per-client `heartbeat-clear`, `disconnect-force` and
   * `dispatch` channels, keeps bound socket listeners in `this.bound` (so
   * they can be removed later), attaches them to `end`, `close`, `error` and
   * `drain`, and finally sets `this.handlersSet`.
   *
   * @api private
   */
  Transport.prototype.setHandlers = function () {
    var transport = this;

    // store channels are used because the client may POST a message over a
    // different socket, i.e. through a different Transport instance
    this.store.subscribe('heartbeat-clear:' + this.id, function () {
      transport.onHeartbeatClear();
    });

    this.store.subscribe('disconnect-force:' + this.id, function () {
      transport.onForcedDisconnect();
    });

    this.store.subscribe('dispatch:' + this.id, function (packet, volatile) {
      transport.onDispatch(packet, volatile);
    });

    var listeners = {
        end: this.onSocketEnd.bind(this)
      , close: this.onSocketClose.bind(this)
      , error: this.onSocketError.bind(this)
      , drain: this.onSocketDrain.bind(this)
    };
    this.bound = listeners;

    var socket = this.socket;
    ['end', 'close', 'error', 'drain'].forEach(function (event) {
      socket.on(event, listeners[event]);
    });

    this.handlersSet = true;
  };
  /**
   * Removes transport handlers.
   *
   * Undoes `setHandlers`: unsubscribes from the per-client store channels and
   * detaches the bound socket listeners kept in `this.bound`. Does nothing
   * when no handlers are currently set.
   *
   * `handlersSet` is reset afterwards so the call is idempotent; without the
   * reset a second call (for instance `onClose` followed by `discard`) would
   * unsubscribe the same channels again.
   *
   * @api private
   */
  Transport.prototype.clearHandlers = function () {
    if (!this.handlersSet) {
      return;
    }

    this.store.unsubscribe('disconnect-force:' + this.id);
    this.store.unsubscribe('heartbeat-clear:' + this.id);
    this.store.unsubscribe('dispatch:' + this.id);

    this.socket.removeListener('end', this.bound.end);
    this.socket.removeListener('close', this.bound.close);
    this.socket.removeListener('error', this.bound.error);
    this.socket.removeListener('drain', this.bound.drain);

    // handlers are gone: make any further call a no-op
    this.handlersSet = false;
  };
  /**
   * Socket `end` listener: ends the transport with the reason `socket end`.
   *
   * @api private
   */
  Transport.prototype.onSocketEnd = function () {
    var reason = 'socket end';
    this.end(reason);
  };
  /**
   * Socket `close` listener: ends the transport, reporting `socket error`
   * when the close was flagged as caused by an error and `socket close`
   * otherwise.
   *
   * @param {Boolean} error truthy when the socket closed because of an error
   * @api private
   */
  Transport.prototype.onSocketClose = function (error) {
    var reason = 'socket close';

    if (error) {
      reason = 'socket error';
    }

    this.end(reason);
  };
  /**
   * Called when the connection has an error.
   *
   * An open transport gets its socket destroyed and goes through `onClose`;
   * the error is then logged at info level.
   *
   * The log line uses `err.stack` when available and falls back to the value
   * itself, so a missing or non-Error argument no longer throws (or logs
   * `undefined`) while logging.
   *
   * @param {Error} err error reported by the socket
   * @api private
   */
  Transport.prototype.onSocketError = function (err) {
    if (this.open) {
      this.socket.destroy();
      this.onClose();
    }

    // `err` is not guaranteed to be an Error instance
    var detail = err && err.stack ? err.stack : err;
    this.log.info('socket error ' + detail);
  };
  /**
   * Socket `drain` listener: flags the transport as drained again, which is
   * the condition `writeVolatile` checks before writing.
   *
   * @api private
   */
  Transport.prototype.onSocketDrain = function () {
    this.drained = true;
  };
  /**
   * Called when the client answered a heartbeat: the pending heartbeat
   * timeout is cleared and the next heartbeat is scheduled.
   *
   * @api private
   */
  Transport.prototype.onHeartbeatClear = function () {
    // order matters: drop the timeout first, then schedule the next beat
    this.clearHeartbeatTimeout();
    this.setHeartbeatInterval();
  };
  /**
   * Called upon a forced disconnection.
   *
   * Ignored when the transport is already disconnected. Otherwise a
   * `disconnect` packet is written if the transport is still open, and the
   * transport is ended with the reason `booted`.
   *
   * @api private
   */
  Transport.prototype.onForcedDisconnect = function () {
    if (this.disconnected) {
      return;
    }

    this.log.info('transport end by forced client disconnection');

    if (this.open) {
      this.packet({ type: 'disconnect' });
    }

    this.end('booted');
  };
  /**
   * Dispatches an encoded packet to the client, through `writeVolatile`
   * when it is flagged volatile and through `write` otherwise.
   *
   * @param {String} packet encoded packet
   * @param {Boolean} volatile truthy when the packet may be dropped
   * @api private
   */
  Transport.prototype.onDispatch = function (packet, volatile) {
    if (!volatile) {
      this.write(packet);
      return;
    }

    this.writeVolatile(packet);
  };
  /**
   * Arms the close timeout unless one is already pending.
   *
   * The delay is the manager's `close timeout` setting multiplied by 1000.
   * When the timer fires the transport is ended with the reason
   * `close timeout`.
   *
   * @api private
   */
  Transport.prototype.setCloseTimeout = function () {
    if (this.closeTimeout) {
      return;
    }

    var transport = this;

    function onCloseTimeout () {
      transport.log.debug('fired close timeout for client', transport.id);
      transport.closeTimeout = null;
      transport.end('close timeout');
    }

    this.closeTimeout = setTimeout(
        onCloseTimeout
      , this.manager.get('close timeout') * 1000
    );

    this.log.debug('set close timeout for client', this.id);
  };
  /**
   * Cancels a pending close timeout; a no-op when none is armed.
   *
   * @api private
   */
  Transport.prototype.clearCloseTimeout = function () {
    var pending = this.closeTimeout;

    if (!pending) {
      return;
    }

    clearTimeout(pending);
    this.closeTimeout = null;
    this.log.debug('cleared close timeout for client', this.id);
  };
  /**
   * Arms the heartbeat timeout.
   *
   * Nothing happens when a timeout is already pending or when the manager
   * has `heartbeats` disabled. The delay is the manager's
   * `heartbeat timeout` setting multiplied by 1000; when the timer fires the
   * transport is ended with the reason `heartbeat timeout`.
   *
   * @api private
   */
  Transport.prototype.setHeartbeatTimeout = function () {
    if (this.heartbeatTimeout || !this.manager.enabled('heartbeats')) {
      return;
    }

    var transport = this;

    function onHeartbeatTimeout () {
      transport.log.debug('fired heartbeat timeout for client', transport.id);
      transport.heartbeatTimeout = null;
      transport.end('heartbeat timeout');
    }

    this.heartbeatTimeout = setTimeout(
        onHeartbeatTimeout
      , this.manager.get('heartbeat timeout') * 1000
    );

    this.log.debug('set heartbeat timeout for client', this.id);
  };
  /**
   * Cancels a pending heartbeat timeout.
   *
   * Only acts when a timeout is armed and the manager has `heartbeats`
   * enabled; otherwise it is a no-op.
   *
   * @api private
   */
  Transport.prototype.clearHeartbeatTimeout = function () {
    if (!this.heartbeatTimeout || !this.manager.enabled('heartbeats')) {
      return;
    }

    clearTimeout(this.heartbeatTimeout);
    this.heartbeatTimeout = null;
    this.log.debug('cleared heartbeat timeout for client', this.id);
  };
  /**
   * Schedules the next heartbeat. To be called when a connection opens and
   * when a heartbeat is received.
   *
   * Nothing happens when a heartbeat is already scheduled or when the manager
   * has `heartbeats` disabled. The delay is the manager's
   * `heartbeat interval` setting multiplied by 1000. The timer is one-shot:
   * it sends the heartbeat and then forgets its own handle.
   *
   * @api private
   */
  Transport.prototype.setHeartbeatInterval = function () {
    if (this.heartbeatInterval || !this.manager.enabled('heartbeats')) {
      return;
    }

    var transport = this;

    function onHeartbeatDue () {
      transport.heartbeat();
      transport.heartbeatInterval = null;
    }

    this.heartbeatInterval = setTimeout(
        onHeartbeatDue
      , this.manager.get('heartbeat interval') * 1000
    );

    this.log.debug('set heartbeat interval for client', this.id);
  };
  /**
   * Cancels every timer owned by the transport: the close timeout, the
   * heartbeat timeout and the heartbeat interval, in that order.
   *
   * @api private
   */
  Transport.prototype.clearTimeouts = function () {
    this.clearCloseTimeout();
    this.clearHeartbeatTimeout();
    this.clearHeartbeatInterval();
  };
  /**
   * Sends a heartbeat.
   *
   * When the transport is open a `heartbeat` packet is written and the
   * heartbeat timeout is armed; a closed transport is left untouched.
   *
   * @return {Transport} for chaining
   * @api private
   */
  Transport.prototype.heartbeat = function () {
    if (!this.open) {
      return this;
    }

    this.log.debug('emitting heartbeat for client', this.id);
    this.packet({ type: 'heartbeat' });
    this.setHeartbeatTimeout();

    return this;
  };
  /**
   * Handles a decoded packet received from the client.
   *
   * `local` is the transport the manager currently holds for this client id
   * (it may be this instance, another one, or missing). Whenever it can be
   * used the work is done on it directly, otherwise the event is published
   * through the store:
   *
   *  - `heartbeat` packets clear the heartbeat timeout;
   *  - `disconnect` packets for the default (empty) endpoint force a
   *    disconnection and stop further processing;
   *  - packets carrying an `id` whose `ack` is not `data` are acknowledged
   *    automatically;
   *  - every remaining packet is delivered as a client message.
   *
   * Comparisons are intentionally loose (`==` / `!=`), as in the original
   * implementation.
   *
   * @param {Object} packet object
   * @api private
   */
  Transport.prototype.onMessage = function (packet) {
    var local = this.manager.transports[this.id];

    if ('heartbeat' == packet.type) {
      this.log.debug('got heartbeat packet');

      if (local && local.open) {
        local.onHeartbeatClear();
      } else {
        this.store.publish('heartbeat-clear:' + this.id);
      }

      return;
    }

    if ('disconnect' == packet.type && packet.endpoint == '') {
      this.log.debug('got disconnection packet');

      if (local) {
        local.onForcedDisconnect();
      } else {
        this.store.publish('disconnect-force:' + this.id);
      }

      return;
    }

    if (packet.id && packet.ack != 'data') {
      this.log.debug('acknowledging packet automatically');

      var ackPacket = parser.encodePacket({
          type: 'ack'
        , ackId: packet.id
        , endpoint: packet.endpoint || ''
      });

      if (local && local.open) {
        local.onDispatch(ackPacket);
      } else {
        this.manager.onClientDispatch(this.id, ackPacket);
        this.store.publish('dispatch:' + this.id, ackPacket);
      }
    }

    // handle packet locally or publish it
    if (local) {
      this.manager.onClientMessage(this.id, packet);
    } else {
      this.store.publish('message:' + this.id, packet);
    }
  };
  /**
   * Cancels the scheduled heartbeat.
   *
   * Only acts when a heartbeat is scheduled and the manager has `heartbeats`
   * enabled; otherwise it is a no-op.
   *
   * @api private
   */
  Transport.prototype.clearHeartbeatInterval = function () {
    if (!this.heartbeatInterval || !this.manager.enabled('heartbeats')) {
      return;
    }

    clearTimeout(this.heartbeatInterval);
    this.heartbeatInterval = null;
    this.log.debug('cleared heartbeat interval for client', this.id);
  };
  /**
   * Finishes the connection: writes a `disconnect` packet to the client and
   * then ends the transport with the given reason.
   *
   * @param {String} reason reason forwarded to `end`
   * @return {Transport} for chaining
   * @api private
   */
  Transport.prototype.disconnect = function (reason) {
    var farewell = { type: 'disconnect' };

    this.packet(farewell);
    this.end(reason);

    return this;
  };
  /**
   * Closes the connection if it is open: `doClose` performs the actual
   * close, then `onClose` runs the bookkeeping. A closed transport is left
   * alone.
   *
   * @api private
   */
  Transport.prototype.close = function () {
    if (!this.open) {
      return;
    }

    this.doClose();
    this.onClose();
  };
  /**
   * Bookkeeping performed once an open connection has closed.
   *
   * Arms the close timeout, removes the handlers, flags the transport as no
   * longer open, notifies the manager and publishes `close` with the client
   * id on the store. Does nothing when the transport is not open.
   *
   * @api private
   */
  Transport.prototype.onClose = function () {
    if (!this.open) {
      return;
    }

    this.setCloseTimeout();
    this.clearHandlers();
    this.open = false;

    this.manager.onClose(this.id);
    this.store.publish('close', this.id);
  };
  /**
   * Cleans up the connection and considers the client disconnected.
   *
   * Runs once: further calls are ignored after `disconnected` is set. The
   * connection is closed, all timers are cleared, and the disconnection is
   * either reported straight to the manager (when it held a transport for
   * this id before the close) or published on the store.
   *
   * @param {String} reason why the transport ended
   * @api private
   */
  Transport.prototype.end = function (reason) {
    if (this.disconnected) {
      return;
    }

    this.log.info('transport end (' + reason + ')');

    // looked up before closing, while the manager's entry is still untouched
    var wasLocal = this.manager.transports[this.id];

    this.close();
    this.clearTimeouts();
    this.disconnected = true;

    if (wasLocal) {
      this.manager.onClientDisconnect(this.id, reason, true);
    } else {
      this.store.publish('disconnect:' + this.id, reason);
    }
  };
  /**
   * Discards the transport: flags it as `discarded`, clears all of its
   * timers and removes its handlers.
   *
   * @return {Transport} for chaining
   * @api public
   */
  Transport.prototype.discard = function () {
    this.log.debug('discarding transport');
    this.discarded = true;

    this.clearTimeouts();
    this.clearHandlers();

    return this;
  };
  /**
   * Writes an error packet with the specified reason and advice, logs a
   * warning and ends the transport with the reason `error`.
   *
   * @param {String} reason why the error is raised (sent to the client)
   * @param {String} advice optional hint for the client; when present the
   *   warning reads `client should <advice>`
   * @api public
   */
  Transport.prototype.error = function (reason, advice) {
    this.packet({
        type: 'error'
      , reason: reason
      , advice: advice
    });

    var hint = '';
    if (advice) {
      hint = 'client should ' + advice;
    }

    this.log.warn(reason, hint);
    this.end('error');
  };
  /**
   * Encodes a packet object with the parser and writes it.
   *
   * @param {Object} obj packet description handed to `parser.encodePacket`
   * @return whatever `write` returns
   * @api public
   */
  Transport.prototype.packet = function (obj) {
    var encoded = parser.encodePacket(obj);
    return this.write(encoded);
  };
  /**
   * Writes a volatile message: it is only written when the transport is
   * open and its buffer is drained; otherwise it is dropped with a debug
   * log entry.
   *
   * @param {String} msg encoded message
   * @api private
   */
  Transport.prototype.writeVolatile = function (msg) {
    if (!this.open) {
      this.log.debug('ignoring volatile packet, transport not open');
      return;
    }

    if (!this.drained) {
      this.log.debug('ignoring volatile packet, buffer not drained');
      return;
    }

    this.write(msg);
  };
  435. };