index.js 6.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190
  1. "use strict";
  2. var __importDefault = (this && this.__importDefault) || function (mod) {
  3. return (mod && mod.__esModule) ? mod : { "default": mod };
  4. };
  5. Object.defineProperty(exports, "__esModule", { value: true });
  6. exports.WorkerInfo = exports.AsynchronouslyCreatedResourcePool = void 0;
  7. const node_worker_threads_1 = require("node:worker_threads");
  8. const node_assert_1 = __importDefault(require("node:assert"));
  9. const errors_1 = require("../errors");
  10. const symbols_1 = require("../symbols");
  11. class AsynchronouslyCreatedResource {
  12. constructor() {
  13. this.onreadyListeners = [];
  14. }
  15. markAsReady() {
  16. const listeners = this.onreadyListeners;
  17. (0, node_assert_1.default)(listeners !== null);
  18. this.onreadyListeners = null;
  19. for (const listener of listeners) {
  20. listener();
  21. }
  22. }
  23. isReady() {
  24. return this.onreadyListeners === null;
  25. }
  26. onReady(fn) {
  27. if (this.onreadyListeners === null) {
  28. fn(); // Zalgo is okay here.
  29. return;
  30. }
  31. this.onreadyListeners.push(fn);
  32. }
  33. }
  34. class AsynchronouslyCreatedResourcePool {
  35. constructor(maximumUsage) {
  36. this.pendingItems = new Set();
  37. this.readyItems = new Set();
  38. this.maximumUsage = maximumUsage;
  39. this.onAvailableListeners = [];
  40. }
  41. add(item) {
  42. this.pendingItems.add(item);
  43. item.onReady(() => {
  44. /* istanbul ignore else */
  45. if (this.pendingItems.has(item)) {
  46. this.pendingItems.delete(item);
  47. this.readyItems.add(item);
  48. this.maybeAvailable(item);
  49. }
  50. });
  51. }
  52. delete(item) {
  53. this.pendingItems.delete(item);
  54. this.readyItems.delete(item);
  55. }
  56. findAvailable() {
  57. let minUsage = this.maximumUsage;
  58. let candidate = null;
  59. for (const item of this.readyItems) {
  60. const usage = item.currentUsage();
  61. if (usage === 0)
  62. return item;
  63. if (usage < minUsage) {
  64. candidate = item;
  65. minUsage = usage;
  66. }
  67. }
  68. return candidate;
  69. }
  70. *[Symbol.iterator]() {
  71. yield* this.pendingItems;
  72. yield* this.readyItems;
  73. }
  74. get size() {
  75. return this.pendingItems.size + this.readyItems.size;
  76. }
  77. maybeAvailable(item) {
  78. /* istanbul ignore else */
  79. if (item.currentUsage() < this.maximumUsage) {
  80. for (const listener of this.onAvailableListeners) {
  81. listener(item);
  82. }
  83. }
  84. }
  85. onAvailable(fn) {
  86. this.onAvailableListeners.push(fn);
  87. }
  88. }
  89. exports.AsynchronouslyCreatedResourcePool = AsynchronouslyCreatedResourcePool;
  90. class WorkerInfo extends AsynchronouslyCreatedResource {
  91. constructor(worker, port, onMessage) {
  92. super();
  93. this.idleTimeout = null; // eslint-disable-line no-undef
  94. this.lastSeenResponseCount = 0;
  95. this.worker = worker;
  96. this.port = port;
  97. this.port.on('message', (message) => this._handleResponse(message));
  98. this.onMessage = onMessage;
  99. this.taskInfos = new Map();
  100. this.sharedBuffer = new Int32Array(new SharedArrayBuffer(symbols_1.kFieldCount * Int32Array.BYTES_PER_ELEMENT));
  101. }
  102. destroy() {
  103. this.worker.terminate();
  104. this.port.close();
  105. this.clearIdleTimeout();
  106. for (const taskInfo of this.taskInfos.values()) {
  107. taskInfo.done(errors_1.Errors.ThreadTermination());
  108. }
  109. this.taskInfos.clear();
  110. }
  111. clearIdleTimeout() {
  112. if (this.idleTimeout !== null) {
  113. clearTimeout(this.idleTimeout);
  114. this.idleTimeout = null;
  115. }
  116. }
  117. ref() {
  118. this.port.ref();
  119. return this;
  120. }
  121. unref() {
  122. // Note: Do not call ref()/unref() on the Worker itself since that may cause
  123. // a hard crash, see https://github.com/nodejs/node/pull/33394.
  124. this.port.unref();
  125. return this;
  126. }
  127. _handleResponse(message) {
  128. this.onMessage(message);
  129. if (this.taskInfos.size === 0) {
  130. // No more tasks running on this Worker means it should not keep the
  131. // process running.
  132. this.unref();
  133. }
  134. }
  135. postTask(taskInfo) {
  136. (0, node_assert_1.default)(!this.taskInfos.has(taskInfo.taskId));
  137. const message = {
  138. task: taskInfo.releaseTask(),
  139. taskId: taskInfo.taskId,
  140. filename: taskInfo.filename,
  141. name: taskInfo.name
  142. };
  143. try {
  144. this.port.postMessage(message, taskInfo.transferList);
  145. }
  146. catch (err) {
  147. // This would mostly happen if e.g. message contains unserializable data
  148. // or transferList is invalid.
  149. taskInfo.done(err);
  150. return;
  151. }
  152. taskInfo.workerInfo = this;
  153. this.taskInfos.set(taskInfo.taskId, taskInfo);
  154. this.ref();
  155. this.clearIdleTimeout();
  156. // Inform the worker that there are new messages posted, and wake it up
  157. // if it is waiting for one.
  158. Atomics.add(this.sharedBuffer, symbols_1.kRequestCountField, 1);
  159. Atomics.notify(this.sharedBuffer, symbols_1.kRequestCountField, 1);
  160. }
  161. processPendingMessages() {
  162. // If we *know* that there are more messages than we have received using
  163. // 'message' events yet, then try to load and handle them synchronously,
  164. // without the need to wait for more expensive events on the event loop.
  165. // This would usually break async tracking, but in our case, we already have
  166. // the extra TaskInfo/AsyncResource layer that rectifies that situation.
  167. const actualResponseCount = Atomics.load(this.sharedBuffer, symbols_1.kResponseCountField);
  168. if (actualResponseCount !== this.lastSeenResponseCount) {
  169. this.lastSeenResponseCount = actualResponseCount;
  170. let entry;
  171. while ((entry = (0, node_worker_threads_1.receiveMessageOnPort)(this.port)) !== undefined) {
  172. this._handleResponse(entry.message);
  173. }
  174. }
  175. }
  176. isRunningAbortableTask() {
  177. // If there are abortable tasks, we are running one at most per Worker.
  178. if (this.taskInfos.size !== 1)
  179. return false;
  180. const [[, task]] = this.taskInfos;
  181. return task.abortSignal !== null;
  182. }
  183. currentUsage() {
  184. if (this.isRunningAbortableTask())
  185. return Infinity;
  186. return this.taskInfos.size;
  187. }
  188. }
  189. exports.WorkerInfo = WorkerInfo;
  190. //# sourceMappingURL=index.js.map