abort-task.ts 6.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233
  1. import { EventEmitter } from 'events';
  2. import Piscina from '..';
  3. import { test } from 'tap';
  4. import { resolve } from 'path';
  // A task that has already started on a worker must reject with the abort
  // message once the AbortController attached to it fires.
  test('tasks can be aborted through AbortController while running', async (t) => {
    const fixture = resolve(__dirname, 'fixtures/notify-then-sleep.ts');
    const pool = new Piscina({ filename: fixture });

    const flag = new Int32Array(new SharedArrayBuffer(4));
    const controller = new AbortController();

    // Register the expectation up front; it is intentionally not awaited here
    // because the rejection can only arrive after abort() is called below.
    const pending = pool.runTask(flag, controller.signal);
    t.rejects(pending, /The task has been aborted/);

    // Block this thread while the shared slot still holds 0. Afterwards it is
    // asserted to hold 1 (presumably written by the fixture once the task is
    // running — confirm against the fixture).
    Atomics.wait(flag, 0, 0);
    t.equal(Atomics.load(flag, 0), 1);

    controller.abort();
  });
  // Same scenario as the AbortController variant, but the abort signal is a
  // plain EventEmitter and both submission APIs (runTask and run) are covered.
  test('tasks can be aborted through EventEmitter while running', async (t) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/notify-then-sleep.ts')
    });

    const flag = new Int32Array(new SharedArrayBuffer(4));
    const signal = new EventEmitter();
    const abortedMessage = /The task has been aborted/;

    // Positional-signal form.
    const viaRunTask = pool.runTask(flag, signal);
    t.rejects(viaRunTask, abortedMessage);

    // Options-object form, sharing the same emitter and buffer.
    const viaRun = pool.run(flag, { signal });
    t.rejects(viaRun, abortedMessage);

    // Block while the shared slot still holds 0, then check it now reads 1.
    Atomics.wait(flag, 0, 0);
    t.equal(Atomics.load(flag, 0), 1);

    // A single 'abort' event is expected to reject both tasks.
    signal.emit('abort');
  });
  // With a single thread occupied by a first task, two abortable tasks are
  // submitted behind it and aborted; the queue size is asserted to be 2 before
  // the abort is emitted.
  test('tasks can be aborted through EventEmitter before running', async (t) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
      maxThreads: 1
    });

    const blockerBuf = new Int32Array(new SharedArrayBuffer(4));
    const queuedBuf = new Int32Array(new SharedArrayBuffer(4));

    // Submitted first, so it takes the only thread.
    const blocker = pool.runTask(blockerBuf);

    const signal = new EventEmitter();
    const abortedMessage = /The task has been aborted/;
    t.rejects(pool.runTask(queuedBuf, signal), abortedMessage);
    t.rejects(pool.run(queuedBuf, { signal }), abortedMessage);

    // Both abortable submissions must be waiting in the queue.
    t.equal(pool.queueSize, 2);

    signal.emit('abort');

    // Release the first task by flipping its slot and notifying one waiter.
    Atomics.store(blockerBuf, 0, 1);
    Atomics.notify(blockerBuf, 0, 1);
    await blocker;
  });
  // Even though the worker may take two concurrent tasks, an abortable task
  // submitted after a regular one is expected to stay in the queue.
  test('abortable tasks will not share workers (abortable posted second)', async (t) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
      maxThreads: 1,
      concurrentTasksPerWorker: 2
    });

    const regularBuf = new Int32Array(new SharedArrayBuffer(4));
    const abortableBuf = new Int32Array(new SharedArrayBuffer(4));

    const regular = pool.runTask(regularBuf);

    const signal = new EventEmitter();
    const abortable = pool.runTask(abortableBuf, signal);
    t.rejects(abortable, /The task has been aborted/);

    // The abortable task must be queued rather than share the busy worker.
    t.equal(pool.queueSize, 1);

    signal.emit('abort');

    // Release the regular task so the test can finish.
    Atomics.store(regularBuf, 0, 1);
    Atomics.notify(regularBuf, 0, 1);
    await regular;
  });
  // When the abortable task is submitted first, a regular task submitted after
  // it is expected to wait in the queue instead of joining the same worker.
  test('abortable tasks will not share workers (abortable posted first)', async (t) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/eval.js'),
      maxThreads: 1,
      concurrentTasksPerWorker: 2
    });

    const signal = new EventEmitter();

    // The submitted source never terminates on its own; only the abort ends it.
    const spinning = pool.runTask('while(true);', signal);
    t.rejects(spinning, /The task has been aborted/);

    const followUp = pool.runTask('42');
    t.equal(pool.queueSize, 1);

    signal.emit('abort');

    // After the abort, the queued task must still complete with its value.
    t.equal(await followUp, 42);
  });
  // Timeline this test relies on:
  //   - task 1 sleeps 100 ms, task 2 sleeps 300 ms;
  //   - abortable task 3 is still queued when task 1 finishes, but because it
  //     is abortable it should not be picked up until task 2 has finished too.
  // The resolved values are asserted to be 0, 1 and 2 for tasks 1, 2 and 3
  // (presumably a completion counter kept by the fixture — confirm).
  test('abortable tasks will not share workers (on worker available)', async (t) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/sleep.js'),
      maxThreads: 1,
      concurrentTasksPerWorker: 2
    });

    const [first, second, third] = await Promise.all([
      pool.runTask({ time: 100, a: 1 }),
      pool.runTask({ time: 300, a: 2 }),
      pool.runTask({ time: 100, a: 3 }, new EventEmitter())
    ]);

    t.equal(first, 0);
    t.equal(second, 1);
    t.equal(third, 2);
  });
  // Task 1 sleeps 100 ms, task 2 sleeps 300 ms, and abortable task 3 is
  // submitted last. As soon as task 1 resolves the pool is destroyed; both the
  // second task and the abortable third task must then reject with the
  // worker-termination error.
  test('abortable tasks will not share workers (destroy workers)', async (t) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/sleep.js'),
      maxThreads: 1,
      concurrentTasksPerWorker: 2
    });
    const terminated = /Terminating worker thread/;

    // Tear the pool down once the shortest task has completed.
    const shortest = pool.runTask({ time: 100, a: 1 });
    shortest.then(() => {
      pool.destroy();
    });

    const longer = pool.runTask({ time: 300, a: 2 });
    t.rejects(longer, terminated);

    const abortable = pool.runTask({ time: 100, a: 3 }, new EventEmitter());
    t.rejects(abortable, terminated);
  });
  // Submitting a task with a signal that is already aborted must reject right
  // away, leaving the transfer-listed data untouched.
  test('aborted AbortSignal rejects task immediately', async (t) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/move.ts')
    });

    // Trip the signal before the task is ever submitted.
    const preAborted = new AbortController();
    preAborted.abort();
    t.equal(preAborted.signal.aborted, true);

    // The payload is listed for transfer, but the task is expected to abort
    // before anything is moved, so its length must be unchanged afterwards.
    const payload = new Uint8Array(new SharedArrayBuffer(4));
    const submission = pool.runTask(payload, [payload.buffer], preAborted.signal);
    t.rejects(submission, /The task has been aborted/);

    t.equal(payload.length, 4);
  });
  // After a task with a signal completes normally, no 'abort' listener may be
  // left behind on the signal. Also smoke-tests the AbortSignal path.
  test('task with AbortSignal cleans up properly', async (t) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/eval.js')
    });

    const emitter = new EventEmitter();
    await pool.runTask('1+1', emitter);

    // getEventListeners is looked up dynamically and the check is skipped when
    // the running Node.js version does not provide it.
    const getEventListeners = (EventEmitter as any).getEventListeners;
    if (typeof getEventListeners === 'function') {
      const leftover = getEventListeners(emitter, 'abort');
      t.equal(leftover.length, 0);
    }

    // A real AbortSignal must complete without error as well.
    const controller = new AbortController();
    await pool.runTask('1+1', controller.signal);
  });
  // A signal that was aborted with a custom reason before submission must make
  // pool.run() reject with the standard abort message and carry the reason as
  // the error's `cause`. The transfer-listed data must remain untouched.
  test('aborted AbortSignal rejects task immediately (with reason)', async ({ match, equal, fail }) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/move.ts')
    });
    const customReason = new Error('custom reason');

    // Abort before the task is submitted and sanity-check the signal state.
    const controller = new AbortController();
    controller.abort(customReason);
    equal(controller.signal.aborted, true);
    equal(controller.signal.reason, customReason);

    // The data won't be moved because the task will abort immediately.
    const data = new Uint8Array(new SharedArrayBuffer(4));

    try {
      await pool.run(data, { transferList: [data.buffer], signal: controller.signal });
      // Reaching this line means run() resolved. Without an explicit failure
      // the assertions in the catch block would be skipped and the test would
      // pass without having verified anything about the rejection.
      fail('pool.run() should reject when the signal is already aborted');
    } catch (error) {
      equal(error.message, 'The task has been aborted');
      match(error.cause, customReason);
    }

    equal(data.length, 4);
  });
  // A running task aborted through an AbortController with a custom reason
  // must reject with the standard abort message and expose the reason as the
  // error's `cause`.
  //
  // The title carries a "(with reason)" suffix so it does not collide with the
  // reason-less test of the same scenario elsewhere in this file.
  test('tasks can be aborted through AbortController while running (with reason)', async ({ equal, match, fail }) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/notify-then-sleep.ts')
    });
    const reason = new Error('custom reason');

    const buf = new Int32Array(new SharedArrayBuffer(4));
    const abortController = new AbortController();

    try {
      const promise = pool.run(buf, { signal: abortController.signal });

      // Block while the shared slot still holds 0, then check that it reads 1
      // before aborting (presumably set by the fixture once the task is
      // running — confirm against the fixture).
      Atomics.wait(buf, 0, 0);
      equal(Atomics.load(buf, 0), 1);

      abortController.abort(reason);
      await promise;
      // Reaching this line means the task resolved despite the abort. Fail
      // explicitly; otherwise the catch assertions are skipped and the test
      // passes without verifying the rejection.
      fail('pool.run() should reject after the controller is aborted');
    } catch (error) {
      equal(error.message, 'The task has been aborted');
      match(error.cause, reason);
    }
  });