uws.js 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136
  1. "use strict";
  2. var __importDefault = (this && this.__importDefault) || function (mod) {
  3. return (mod && mod.__esModule) ? mod : { "default": mod };
  4. };
  5. Object.defineProperty(exports, "__esModule", { value: true });
  6. exports.patchAdapter = patchAdapter;
  7. exports.restoreAdapter = restoreAdapter;
  8. exports.serveFile = serveFile;
  9. const socket_io_adapter_1 = require("socket.io-adapter");
  10. const fs_1 = require("fs");
  11. const debug_1 = __importDefault(require("debug"));
  12. const debug = (0, debug_1.default)("socket.io:adapter-uws");
  13. const SEPARATOR = "\x1f"; // see https://en.wikipedia.org/wiki/Delimiter#ASCII_delimited_text
  14. const { addAll, del, broadcast } = socket_io_adapter_1.Adapter.prototype;
  15. function patchAdapter(app /* : TemplatedApp */) {
  16. socket_io_adapter_1.Adapter.prototype.addAll = function (id, rooms) {
  17. const isNew = !this.sids.has(id);
  18. addAll.call(this, id, rooms);
  19. const socket = this.nsp.sockets.get(id) || this.nsp._preConnectSockets.get(id);
  20. if (!socket) {
  21. return;
  22. }
  23. if (socket.conn.transport.name === "websocket") {
  24. subscribe(this.nsp.name, socket, isNew, rooms);
  25. return;
  26. }
  27. if (isNew) {
  28. socket.conn.on("upgrade", () => {
  29. const rooms = this.sids.get(id);
  30. if (rooms) {
  31. subscribe(this.nsp.name, socket, isNew, rooms);
  32. }
  33. });
  34. }
  35. };
  36. socket_io_adapter_1.Adapter.prototype.del = function (id, room) {
  37. del.call(this, id, room);
  38. const socket = this.nsp.sockets.get(id) || this.nsp._preConnectSockets.get(id);
  39. if (socket && socket.conn.transport.name === "websocket") {
  40. // @ts-ignore
  41. const sessionId = socket.conn.id;
  42. // @ts-ignore
  43. const websocket = socket.conn.transport.socket;
  44. const topic = `${this.nsp.name}${SEPARATOR}${room}`;
  45. debug("unsubscribe connection %s from topic %s", sessionId, topic);
  46. websocket.unsubscribe(topic);
  47. }
  48. };
  49. socket_io_adapter_1.Adapter.prototype.broadcast = function (packet, opts) {
  50. const useFastPublish = opts.rooms.size <= 1 && opts.except.size === 0;
  51. if (!useFastPublish) {
  52. broadcast.call(this, packet, opts);
  53. return;
  54. }
  55. const flags = opts.flags || {};
  56. const basePacketOpts = {
  57. preEncoded: true,
  58. volatile: flags.volatile,
  59. compress: flags.compress,
  60. };
  61. packet.nsp = this.nsp.name;
  62. const encodedPackets = this.encoder.encode(packet);
  63. const topic = opts.rooms.size === 0
  64. ? this.nsp.name
  65. : `${this.nsp.name}${SEPARATOR}${opts.rooms.keys().next().value}`;
  66. debug("fast publish to %s", topic);
  67. // fast publish for clients connected with WebSocket
  68. encodedPackets.forEach((encodedPacket) => {
  69. const isBinary = typeof encodedPacket !== "string";
  70. // "4" being the message type in the Engine.IO protocol, see https://github.com/socketio/engine.io-protocol
  71. app.publish(topic, isBinary ? encodedPacket : "4" + encodedPacket, isBinary);
  72. });
  73. this.apply(opts, (socket) => {
  74. if (socket.conn.transport.name !== "websocket") {
  75. // classic publish for clients connected with HTTP long-polling
  76. socket.client.writeToEngine(encodedPackets, basePacketOpts);
  77. }
  78. });
  79. };
  80. }
  81. function subscribe(namespaceName, socket, isNew, rooms) {
  82. // @ts-ignore
  83. const sessionId = socket.conn.id;
  84. // @ts-ignore
  85. const websocket = socket.conn.transport.socket;
  86. if (isNew) {
  87. debug("subscribe connection %s to topic %s", sessionId, namespaceName);
  88. websocket.subscribe(namespaceName);
  89. }
  90. rooms.forEach((room) => {
  91. const topic = `${namespaceName}${SEPARATOR}${room}`; // '#' can be used as wildcard
  92. debug("subscribe connection %s to topic %s", sessionId, topic);
  93. websocket.subscribe(topic);
  94. });
  95. }
  96. function restoreAdapter() {
  97. socket_io_adapter_1.Adapter.prototype.addAll = addAll;
  98. socket_io_adapter_1.Adapter.prototype.del = del;
  99. socket_io_adapter_1.Adapter.prototype.broadcast = broadcast;
  100. }
  101. const toArrayBuffer = (buffer) => {
  102. const { buffer: arrayBuffer, byteOffset, byteLength } = buffer;
  103. return arrayBuffer.slice(byteOffset, byteOffset + byteLength);
  104. };
  105. // imported from https://github.com/kolodziejczak-sz/uwebsocket-serve
  106. function serveFile(res /* : HttpResponse */, filepath) {
  107. const { size } = (0, fs_1.statSync)(filepath);
  108. const readStream = (0, fs_1.createReadStream)(filepath);
  109. const destroyReadStream = () => !readStream.destroyed && readStream.destroy();
  110. const onError = (error) => {
  111. destroyReadStream();
  112. throw error;
  113. };
  114. const onDataChunk = (chunk) => {
  115. const arrayBufferChunk = toArrayBuffer(chunk);
  116. res.cork(() => {
  117. const lastOffset = res.getWriteOffset();
  118. const [ok, done] = res.tryEnd(arrayBufferChunk, size);
  119. if (!done && !ok) {
  120. readStream.pause();
  121. res.onWritable((offset) => {
  122. const [ok, done] = res.tryEnd(arrayBufferChunk.slice(offset - lastOffset), size);
  123. if (!done && ok) {
  124. readStream.resume();
  125. }
  126. return ok;
  127. });
  128. }
  129. });
  130. };
  131. res.onAborted(destroyReadStream);
  132. readStream
  133. .on("data", onDataChunk)
  134. .on("error", onError)
  135. .on("end", destroyReadStream);
  136. }