broadcast-operator.js 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436
  1. "use strict";
  2. Object.defineProperty(exports, "__esModule", { value: true });
  3. exports.RemoteSocket = exports.BroadcastOperator = void 0;
  4. const socket_types_1 = require("./socket-types");
  5. const socket_io_parser_1 = require("socket.io-parser");
  /**
   * Immutable description of a broadcast target: the rooms to include, the rooms to exclude and the
   * flags to apply. Every chaining method returns a NEW operator and leaves the current one untouched,
   * so a partially configured operator can safely be reused.
   *
   * All the actual work is delegated to the adapter, which must provide `broadcast`, `broadcastWithAck`,
   * `serverCount`, `sockets`, `fetchSockets`, `addSockets`, `delSockets` and `disconnectSockets`.
   */
  class BroadcastOperator {
      /**
       * @param adapter - the adapter every operation is delegated to
       * @param rooms - the rooms to target (an empty Set is passed to the adapter as-is)
       * @param exceptRooms - the rooms to exclude
       * @param flags - the flags forwarded to the adapter (`compress`, `volatile`, `local`, `timeout`,
       * `expectSingleResponse`)
       */
      constructor(adapter, rooms = new Set(), exceptRooms = new Set(), flags = {}) {
          this.adapter = adapter;
          this.rooms = rooms;
          this.exceptRooms = exceptRooms;
          this.flags = flags;
      }
      /**
       * Targets a room when emitting.
       *
       * @example
       * // the "foo" event will be broadcast to all connected clients in the "room-101" room
       * io.to("room-101").emit("foo", "bar");
       *
       * // with an array of rooms (a client will be notified at most once)
       * io.to(["room-101", "room-102"]).emit("foo", "bar");
       *
       * // with multiple chained calls
       * io.to("room-101").to("room-102").emit("foo", "bar");
       *
       * @param room - a room, or an array of rooms
       * @return a new {@link BroadcastOperator} instance for chaining
       */
      to(room) {
          // copy first: the current operator must not be modified
          const rooms = new Set(this.rooms);
          if (Array.isArray(room)) {
              room.forEach((r) => rooms.add(r));
          }
          else {
              rooms.add(room);
          }
          return new BroadcastOperator(this.adapter, rooms, this.exceptRooms, this.flags);
      }
      /**
       * Targets a room when emitting. Alias of `to()`, which might read better in some cases:
       *
       * @example
       * // disconnect all clients in the "room-101" room
       * io.in("room-101").disconnectSockets();
       *
       * @param room - a room, or an array of rooms
       * @return a new {@link BroadcastOperator} instance for chaining
       */
      in(room) {
          return this.to(room);
      }
      /**
       * Excludes a room when emitting.
       *
       * @example
       * // the "foo" event will be broadcast to all connected clients, except the ones that are in the "room-101" room
       * io.except("room-101").emit("foo", "bar");
       *
       * // with an array of rooms
       * io.except(["room-101", "room-102"]).emit("foo", "bar");
       *
       * // with multiple chained calls
       * io.except("room-101").except("room-102").emit("foo", "bar");
       *
       * @param room - a room, or an array of rooms
       * @return a new {@link BroadcastOperator} instance for chaining
       */
      except(room) {
          // copy first: the current operator must not be modified
          const exceptRooms = new Set(this.exceptRooms);
          if (Array.isArray(room)) {
              room.forEach((r) => exceptRooms.add(r));
          }
          else {
              exceptRooms.add(room);
          }
          return new BroadcastOperator(this.adapter, this.rooms, exceptRooms, this.flags);
      }
      /**
       * Sets the compress flag.
       *
       * @example
       * io.compress(false).emit("hello");
       *
       * @param compress - if `true`, compresses the sending data
       * @return a new BroadcastOperator instance
       */
      compress(compress) {
          const flags = Object.assign({}, this.flags, { compress });
          return new BroadcastOperator(this.adapter, this.rooms, this.exceptRooms, flags);
      }
      /**
       * Sets a modifier for a subsequent event emission: the event data may be lost if the client is not
       * ready to receive messages (because of network slowness or other issues, or because it is connected
       * through long polling and is in the middle of a request-response cycle).
       *
       * @example
       * io.volatile.emit("hello"); // the clients may or may not receive it
       *
       * @return a new BroadcastOperator instance
       */
      get volatile() {
          const flags = Object.assign({}, this.flags, { volatile: true });
          return new BroadcastOperator(this.adapter, this.rooms, this.exceptRooms, flags);
      }
      /**
       * Sets a modifier for a subsequent event emission: the event data will only be broadcast to the
       * current node.
       *
       * @example
       * // the "foo" event will be broadcast to all connected clients on this node
       * io.local.emit("foo", "bar");
       *
       * @return a new {@link BroadcastOperator} instance for chaining
       */
      get local() {
          const flags = Object.assign({}, this.flags, { local: true });
          return new BroadcastOperator(this.adapter, this.rooms, this.exceptRooms, flags);
      }
      /**
       * Adds a timeout in milliseconds for the next operation.
       *
       * @example
       * io.timeout(1000).emit("some-event", (err, responses) => {
       *   if (err) {
       *     // some clients did not acknowledge the event in the given delay
       *   } else {
       *     console.log(responses); // one response per client
       *   }
       * });
       *
       * @param timeout - the delay, in milliseconds, stored in the `timeout` flag
       * @return a new BroadcastOperator instance
       */
      timeout(timeout) {
          const flags = Object.assign({}, this.flags, { timeout });
          return new BroadcastOperator(this.adapter, this.rooms, this.exceptRooms, flags);
      }
      /**
       * Emits to all clients.
       *
       * When the last argument is a function, it is treated as an acknowledgement callback, removed from
       * the payload and invoked exactly once with `(err, responses)`:
       * - `(null, responses)` once every server has reported its client count and every notified client
       *   has answered,
       * - `(Error("operation has timed out"), responses)` when the `timeout` flag elapses first,
       * - `(err, responses)` when the adapter fails to report the number of servers.
       * With the `expectSingleResponse` flag, the second argument is the first response on success and
       * `null` on failure.
       *
       * Note: the `timeout` flag is handed to `setTimeout()` as-is, so without a prior `timeout()` call
       * the delay is `undefined` and the timer fires almost immediately.
       *
       * @example
       * // the "foo" event will be broadcast to all connected clients
       * io.emit("foo", "bar");
       *
       * // the "foo" event will be broadcast to all connected clients in the "room-101" room
       * io.to("room-101").emit("foo", "bar");
       *
       * // with an acknowledgement expected from all connected clients
       * io.timeout(1000).emit("some-event", (err, responses) => {
       *   if (err) {
       *     // some clients did not acknowledge the event in the given delay
       *   } else {
       *     console.log(responses); // one response per client
       *   }
       * });
       *
       * @param ev - the event name
       * @param args - the event arguments, optionally followed by an acknowledgement callback
       * @return Always true
       * @throws Error if `ev` is a reserved event name
       */
      emit(ev, ...args) {
          if (socket_types_1.RESERVED_EVENTS.has(ev)) {
              throw new Error(`"${String(ev)}" is a reserved event name`);
          }
          // set up packet object
          const data = [ev, ...args];
          const packet = {
              type: socket_io_parser_1.PacketType.EVENT,
              data: data,
          };
          const opts = {
              rooms: this.rooms,
              except: this.exceptRooms,
              flags: this.flags,
          };
          const withAck = typeof data[data.length - 1] === "function";
          if (!withAck) {
              this.adapter.broadcast(packet, opts);
              return true;
          }
          // the callback must not be sent to the clients, so it is removed from the packet data
          const ack = data.pop();
          const responses = [];
          // becomes true as soon as the outcome has been reported, so that `ack` is called only once
          let finished = false;
          const fail = (err) => {
              if (finished) {
                  return; // the outcome was already reported to the caller
              }
              finished = true;
              clearTimeout(timer);
              ack.apply(this, [
                  err,
                  this.flags.expectSingleResponse ? null : responses,
              ]);
          };
          const timer = setTimeout(() => {
              fail(new Error("operation has timed out"));
          }, this.flags.timeout);
          let expectedServerCount = -1; // unknown until the adapter answers
          let actualServerCount = 0;
          let expectedClientCount = 0;
          const checkCompleteness = () => {
              if (!finished &&
                  expectedServerCount === actualServerCount &&
                  responses.length === expectedClientCount) {
                  finished = true;
                  clearTimeout(timer);
                  ack.apply(this, [
                      null,
                      this.flags.expectSingleResponse ? responses[0] : responses,
                  ]);
              }
          };
          this.adapter.broadcastWithAck(packet, opts, (clientCount) => {
              // each Socket.IO server in the cluster sends the number of clients that were notified
              expectedClientCount += clientCount;
              actualServerCount++;
              checkCompleteness();
          }, (clientResponse) => {
              // each client sends an acknowledgement
              responses.push(clientResponse);
              checkCompleteness();
          });
          // The rejection handler is passed as the second argument of then() on purpose: it only covers a
          // failure of serverCount() itself, an exception thrown by the caller's `ack` is not re-routed.
          // Without it, a failing adapter would produce an unhandled promise rejection.
          this.adapter.serverCount().then((serverCount) => {
              expectedServerCount = serverCount;
              checkCompleteness();
          }, (reason) => {
              // callers (e.g. emitWithAck) attach properties to the error, so it must be an object
              fail(reason instanceof Error ? reason : new Error(String(reason)));
          });
          return true;
      }
      /**
       * Emits an event and waits for an acknowledgement from all clients.
       *
       * @example
       * try {
       *   const responses = await io.timeout(1000).emitWithAck("some-event");
       *   console.log(responses); // one response per client
       * } catch (e) {
       *   // some clients did not acknowledge the event in the given delay
       * }
       *
       * @param ev - the event name
       * @param args - the event arguments
       * @return a Promise that will be fulfilled when all clients have acknowledged the event; on failure
       * it is rejected with the error, whose `responses` property holds what `emit()` passed as responses
       */
      emitWithAck(ev, ...args) {
          return new Promise((resolve, reject) => {
              // appended last so that emit() picks it up as the acknowledgement callback
              args.push((err, responses) => {
                  if (err) {
                      err.responses = responses;
                      return reject(err);
                  }
                  else {
                      return resolve(responses);
                  }
              });
              this.emit(ev, ...args);
          });
      }
      /**
       * Gets a list of clients.
       *
       * Only the targeted rooms are passed to the adapter: the excluded rooms and the flags are not taken
       * into account by this method.
       *
       * @deprecated this method will be removed in the next major release, please use {@link Server#serverSideEmit} or
       * {@link fetchSockets} instead.
       *
       * @return whatever `adapter.sockets()` returns for the targeted rooms
       * @throws Error if the operator was created without an adapter
       */
      allSockets() {
          if (!this.adapter) {
              throw new Error("No adapter for this namespace, are you trying to get the list of clients of a dynamic namespace?");
          }
          return this.adapter.sockets(this.rooms);
      }
      /**
       * Returns the matching socket instances.
       *
       * Note: this method also works within a cluster of multiple Socket.IO servers, with a compatible {@link Adapter}.
       *
       * @example
       * // return all Socket instances
       * const sockets = await io.fetchSockets();
       *
       * // return all Socket instances in the "room1" room
       * const sockets = await io.in("room1").fetchSockets();
       *
       * for (const socket of sockets) {
       *   console.log(socket.id);
       *   console.log(socket.handshake);
       *   console.log(socket.rooms);
       *   console.log(socket.data);
       *
       *   socket.emit("hello");
       *   socket.join("room1");
       *   socket.leave("room2");
       *   socket.disconnect();
       * }
       *
       * @return a Promise of an array in which entries having a `server` property are returned untouched
       * and any other entry is wrapped in a {@link RemoteSocket}
       */
      fetchSockets() {
          return this.adapter
              .fetchSockets({
              rooms: this.rooms,
              except: this.exceptRooms,
              flags: this.flags,
          })
              .then((sockets) => {
              return sockets.map((socket) => {
                  if (socket.server) {
                      return socket; // local instance
                  }
                  else {
                      return new RemoteSocket(this.adapter, socket);
                  }
              });
          });
      }
      /**
       * Makes the matching socket instances join the specified rooms.
       *
       * Note: this method also works within a cluster of multiple Socket.IO servers, with a compatible {@link Adapter}.
       *
       * @example
       *
       * // make all socket instances join the "room1" room
       * io.socketsJoin("room1");
       *
       * // make all socket instances in the "room1" room join the "room2" and "room3" rooms
       * io.in("room1").socketsJoin(["room2", "room3"]);
       *
       * @param room - a room, or an array of rooms
       */
      socketsJoin(room) {
          // the adapter always receives an array of rooms
          this.adapter.addSockets({
              rooms: this.rooms,
              except: this.exceptRooms,
              flags: this.flags,
          }, Array.isArray(room) ? room : [room]);
      }
      /**
       * Makes the matching socket instances leave the specified rooms.
       *
       * Note: this method also works within a cluster of multiple Socket.IO servers, with a compatible {@link Adapter}.
       *
       * @example
       * // make all socket instances leave the "room1" room
       * io.socketsLeave("room1");
       *
       * // make all socket instances in the "room1" room leave the "room2" and "room3" rooms
       * io.in("room1").socketsLeave(["room2", "room3"]);
       *
       * @param room - a room, or an array of rooms
       */
      socketsLeave(room) {
          // the adapter always receives an array of rooms
          this.adapter.delSockets({
              rooms: this.rooms,
              except: this.exceptRooms,
              flags: this.flags,
          }, Array.isArray(room) ? room : [room]);
      }
      /**
       * Makes the matching socket instances disconnect.
       *
       * Note: this method also works within a cluster of multiple Socket.IO servers, with a compatible {@link Adapter}.
       *
       * @example
       * // make all socket instances disconnect (the connections might be kept alive for other namespaces)
       * io.disconnectSockets();
       *
       * // make all socket instances in the "room1" room disconnect and close the underlying connections
       * io.in("room1").disconnectSockets(true);
       *
       * @param close - whether to close the underlying connection
       */
      disconnectSockets(close = false) {
          this.adapter.disconnectSockets({
              rooms: this.rooms,
              except: this.exceptRooms,
              flags: this.flags,
          }, close);
      }
  }
  366. exports.BroadcastOperator = BroadcastOperator;
  /**
   * Exposes a subset of the attributes and methods of the Socket class.
   *
   * Every action is forwarded to a {@link BroadcastOperator} whose only targeted room is the id of the
   * socket.
   */
  class RemoteSocket {
      /**
       * @param adapter - the adapter handed over to the underlying operator
       * @param details - the description of the socket (`id`, `handshake`, `rooms`, `data`)
       */
      constructor(adapter, details) {
          const { id, handshake, rooms, data } = details;
          this.id = id;
          this.handshake = handshake;
          this.rooms = new Set(rooms);
          this.data = data;
          const ownRoom = new Set([id]);
          const flags = {
              // so that remoteSocket.emit() with acknowledgement behaves like socket.emit()
              expectSingleResponse: true,
          };
          this.operator = new BroadcastOperator(adapter, ownRoom, new Set(), flags);
      }
      /**
       * Adds a timeout in milliseconds for the next operation.
       *
       * @example
       * const sockets = await io.fetchSockets();
       *
       * for (const socket of sockets) {
       *   if (someCondition) {
       *     socket.timeout(1000).emit("some-event", (err) => {
       *       if (err) {
       *         // the client did not acknowledge the event in the given delay
       *       }
       *     });
       *   }
       * }
       *
       * // note: if possible, using a room instead of looping over all sockets is preferable
       * io.timeout(1000).to(someConditionRoom).emit("some-event", (err, responses) => {
       *   // ...
       * });
       *
       * @param timeout - the delay in milliseconds
       * @return the operator returned by the underlying operator's `timeout()`
       */
      timeout(timeout) {
          const { operator } = this;
          return operator.timeout(timeout);
      }
      /**
       * Emits an event through the underlying operator.
       *
       * @param ev - the event name
       * @param args - the event arguments
       * @return the value returned by the underlying operator's `emit()`
       */
      emit(ev, ...args) {
          const { operator } = this;
          return operator.emit(ev, ...args);
      }
      /**
       * Joins a room.
       *
       * @param {String|Array} room - room or array of rooms
       */
      join(room) {
          const { operator } = this;
          return operator.socketsJoin(room);
      }
      /**
       * Leaves a room.
       *
       * @param {String|Array} room - room or array of rooms
       */
      leave(room) {
          const { operator } = this;
          return operator.socketsLeave(room);
      }
      /**
       * Disconnects this client.
       *
       * @param {Boolean} close - if `true`, closes the underlying connection
       * @return {Socket} self
       */
      disconnect(close = false) {
          const { operator } = this;
          operator.disconnectSockets(close);
          return this;
      }
  }
  436. exports.RemoteSocket = RemoteSocket;