worker.js 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186
  1. "use strict";
  // TypeScript-emitted re-export helper. Makes property `k` of module `m`
  // available on `o` under the name `k2` (which defaults to `k`). A helper
  // already present on `this` is reused instead of defining a new one.
  var __createBinding = (this && this.__createBinding) || (function () {
    if (!Object.create) {
      // Fallback when Object.create is unavailable: copy the current value.
      return function (o, m, k, k2) {
        o[k2 === undefined ? k : k2] = m[k];
      };
    }
    return function (o, m, k, k2) {
      var exportedName = k2 === undefined ? k : k2;
      var descriptor = Object.getOwnPropertyDescriptor(m, k);
      // Decide whether the source descriptor can be reused as-is or whether a
      // forwarding getter has to be installed instead.
      var needsForwardingGetter;
      if (!descriptor) {
        needsForwardingGetter = true;
      } else if ("get" in descriptor) {
        // Accessor properties are only reused when `m` is marked __esModule.
        needsForwardingGetter = !m.__esModule;
      } else {
        // Data properties are only reused when they can no longer change.
        needsForwardingGetter = descriptor.writable || descriptor.configurable;
      }
      if (needsForwardingGetter) {
        descriptor = { enumerable: true, get: function () { return m[k]; } };
      }
      Object.defineProperty(o, exportedName, descriptor);
    };
  })();
  // TypeScript-emitted helper. Stores `v` as the "default" member of `o`.
  // A helper already present on `this` is reused instead of defining a new one.
  var __setModuleDefault = (this && this.__setModuleDefault) || (function () {
    if (!Object.create) {
      // Fallback when Object.create is unavailable: plain assignment.
      return function (o, v) {
        o["default"] = v;
      };
    }
    return function (o, v) {
      var defaultDescriptor = { enumerable: true, value: v };
      Object.defineProperty(o, "default", defaultDescriptor);
    };
  })();
  // TypeScript-emitted helper behind `import * as ns`. Modules flagged with
  // __esModule are returned untouched; anything else is wrapped in a fresh
  // object that re-exposes its own properties and carries the module itself as
  // `default`.
  var __importStar = (this && this.__importStar) || function (mod) {
    if (mod && mod.__esModule) {
      return mod;
    }
    var namespace = {};
    if (mod != null) {
      for (var key in mod) {
        // "default" is set separately below; inherited keys are not re-exported.
        if (key === "default") continue;
        if (!Object.prototype.hasOwnProperty.call(mod, key)) continue;
        __createBinding(namespace, mod, key);
      }
    }
    __setModuleDefault(namespace, mod);
    return namespace;
  };
  25. Object.defineProperty(exports, "__esModule", { value: true });
  26. const node_worker_threads_1 = require("node:worker_threads");
  27. const node_url_1 = require("node:url");
  28. const symbols_1 = require("./symbols");
  29. const common_1 = require("./common");
  // Record on the shared state object that this code runs inside a worker
  // thread, and expose the workerData that was handed to this thread.
  Object.assign(common_1.commonState, {
    isWorkerThread: true,
    workerData: node_worker_threads_1.workerData
  });
  // Resolved task handlers, keyed by `${filename}/${name}`.
  const handlerCache = new Map();
  // Atomics-based waiting stays enabled unless PISCINA_DISABLE_ATOMICS is '1'.
  let useAtomics = !(process.env.PISCINA_DISABLE_ATOMICS === '1');
  // A literal `import(x)` would be rewritten to `require(x)` by TypeScript, so
  // the dynamic import is built at runtime instead, which keeps dual ESM/CJS
  // loading working. The function is created on first use only, so that no
  // experimental-ESM-loader warning (Node v12.x) shows up before it is needed.
  let importESMCached;
  function getImportESM() {
    if (importESMCached !== undefined) {
      return importESMCached;
    }
    // eslint-disable-next-line no-new-func
    importESMCached = new Function('specifier', 'return import(specifier)');
    return importESMCached;
  }
  // Resolve the function to invoke for tasks that target `filename`/`name`.
  // The module is first loaded through `require`; if that does not produce a
  // function, it is loaded again through dynamic `import()`. In both cases the
  // loaded value is used directly when it is a function, otherwise its `name`
  // export is taken. Resolves to `null` when no function could be found.
  async function getHandler(filename, name) {
    const cacheKey = `${filename}/${name}`;
    const cached = handlerCache.get(cacheKey);
    if (cached !== undefined) {
      return cached;
    }
    let handler;
    try {
      // With our current set of TypeScript options, the `import()` this was
      // written as is transpiled to `require(filename)`.
      handler = await Promise.resolve(`${filename}`).then((specifier) => __importStar(require(specifier)));
      if (typeof handler !== 'function') {
        handler = await (handler[name]);
      }
    }
    catch { }
    if (typeof handler !== 'function') {
      // The CommonJS attempt failed or did not yield a function: try ESM.
      const importESM = getImportESM();
      const moduleUrl = (0, node_url_1.pathToFileURL)(filename).href;
      handler = await importESM(moduleUrl);
      if (typeof handler !== 'function') {
        handler = await (handler[name]);
      }
    }
    if (typeof handler !== 'function') {
      return null;
    }
    // Keep the cache bounded by dropping its oldest entry. This should not
    // usually matter and only exists for pathological cases.
    if (handlerCache.size > 1000) {
      const oldestKey = handlerCache.keys().next().value;
      handlerCache.delete(oldestKey);
    }
    handlerCache.set(cacheKey, handler);
    return handler;
  }
  // Startup message from the parent; it is only expected once, when the Worker
  // starts. It carries the MessagePort on which tasks arrive, a
  // SharedArrayBuffer for fast signalling through Atomics, and the default
  // task filename (so the handler can be pre-loaded and cached).
  node_worker_threads_1.parentPort.on('message', (message) => {
    const atomicsDisabledByEnv = process.env.PISCINA_DISABLE_ATOMICS === '1';
    useAtomics = atomicsDisabledByEnv ? false : message.useAtomics;
    const { port, sharedBuffer, filename, name, niceIncrement } = message;
    const startUp = async () => {
      if (niceIncrement !== 0) {
        // Best effort: a failure to load or apply '@napi-rs/nice' is ignored.
        try {
          const niceModule = await Promise.resolve().then(() => __importStar(require('@napi-rs/nice')));
          niceModule.nice(niceIncrement);
        }
        catch { }
      }
      if (filename !== null) {
        await getHandler(filename, name);
      }
      // Tell the parent we are ready, then start accepting tasks.
      const readyMessage = { [common_1.READY]: true };
      node_worker_threads_1.parentPort.postMessage(readyMessage);
      port.on('message', onMessage.bind(null, port, sharedBuffer));
      atomicsWaitLoop(port, sharedBuffer);
    };
    startUp().catch(throwInNextTick);
  });
  // Number of tasks currently being processed by onMessage().
  let currentTasks = 0;
  // Last value read from the request counter in the shared buffer.
  let lastSeenRequestCount = 0;
  // Block on the shared request counter while idle and feed incoming messages
  // straight into onMessage(). Does nothing when Atomics usage is disabled.
  function atomicsWaitLoop(port, sharedBuffer) {
    // We get here either right after the startup message or after finishing a
    // task. In both situations the *only* thing expected next is a 'message'
    // on `port`. Delivering it as an event would cost a C++ → JS boundary
    // crossing, including async tracking. So while no task is running we wait
    // for the parent's signal with Atomics.wait() and pull the message off the
    // port ourselves to avoid that overhead.
    // The one catch: asynchronous operations that are still running cannot
    // make progress meanwhile. Generally, tasks should not spawn asynchronous
    // operations without waiting for them to finish, though.
    if (useAtomics) {
      while (currentTasks === 0) {
        // Sleep until the number of requests posted by the parent no longer
        // matches the number we have seen.
        Atomics.wait(sharedBuffer, symbols_1.kRequestCountField, lastSeenRequestCount);
        lastSeenRequestCount = Atomics.load(sharedBuffer, symbols_1.kRequestCountField);
        // Messages have to be read *after* updating lastSeenRequestCount in
        // order to avoid race conditions.
        for (;;) {
          const entry = (0, node_worker_threads_1.receiveMessageOnPort)(port);
          if (entry === undefined) {
            break;
          }
          onMessage(port, sharedBuffer, entry.message);
        }
      }
    }
  }
  // Run a single task received from the parent and post its outcome back on
  // `port`. `message` carries the task id, the task payload and the
  // filename/name pair that identifies the handler to call.
  function onMessage(port, sharedBuffer, message) {
    currentTasks++;
    const { taskId, task, filename, name } = message;
    // Resolves once an empty write to `stream` has been flushed.
    const flush = (stream) => new Promise((resolve) => stream.write('', resolve));
    const runTask = async () => {
      let transferList = [];
      let response;
      try {
        const handler = await getHandler(filename, name);
        if (handler === null) {
          throw new Error(`No handler function exported from ${filename}`);
        }
        const returned = await handler(task);
        // Movable results carry their transfer list and the real value.
        const movable = (0, common_1.isMovable)(returned);
        if (movable) {
          transferList = transferList.concat(returned[symbols_1.kTransferable]);
        }
        response = {
          taskId,
          result: movable ? returned[symbols_1.kValue] : returned,
          error: null
        };
        // If the task used e.g. console.log(), let the streams drain before
        // possibly entering the `Atomics.wait()` loop and before returning
        // the result, so that messages are always printed even if the
        // process would otherwise be ready to exit.
        if (process.stdout.writableLength > 0) {
          await flush(process.stdout);
        }
        if (process.stderr.writableLength > 0) {
          await flush(process.stderr);
        }
      }
      catch (error) {
        // It may be worth taking a look at the error cloning algorithm used
        // in Node.js core here, it's quite a bit more flexible.
        response = { taskId, result: null, error: error };
      }
      currentTasks--;
      // Hand the response to the parent and bump the response counter so it
      // knows another message is available. If possible, go back to waiting
      // for the next message with Atomics.wait().
      port.postMessage(response, transferList);
      Atomics.add(sharedBuffer, symbols_1.kResponseCountField, 1);
      atomicsWaitLoop(port, sharedBuffer);
    };
    runTask().catch(throwInNextTick);
  }
  // Re-throw `error` from a process.nextTick callback, i.e. outside of the
  // promise chain that caught it.
  function throwInNextTick(error) {
    const rethrow = () => {
      throw error;
    };
    process.nextTick(rethrow);
  }
  186. //# sourceMappingURL=worker.js.map