task-queue.ts 7.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279
  1. import Piscina, { PiscinaTask, TaskQueue } from '..';
  2. import { test } from 'tap';
  3. import { resolve } from 'path';
  // Submits four tasks to a pool capped at three threads and checks that the
  // fourth one is queued, then released once the running tasks are signalled.
  test('will put items into a task queue until they can run', async ({ equal }) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
      minThreads: 2,
      maxThreads: 3
    });

    // Asserts the pool's current thread count and queued-task count.
    const expectState = (threads: number, queued: number): void => {
      equal(pool.threads.length, threads);
      equal(pool.queueSize, queued);
    };

    expectState(2, 0);

    // One 4-byte shared cell per task.
    const buffers = [
      new Int32Array(new SharedArrayBuffer(4)),
      new Int32Array(new SharedArrayBuffer(4)),
      new Int32Array(new SharedArrayBuffer(4)),
      new Int32Array(new SharedArrayBuffer(4))
    ];

    // Expected [threads, queued] immediately after each submission: the pool
    // grows from 2 to 3 threads on the third task and queues the fourth.
    const expectedAfterSubmit = [[2, 0], [2, 0], [3, 0], [3, 1]];

    const pending = [];
    for (let i = 0; i < buffers.length; i++) {
      pending.push(pool.runTask(buffers[i]));
      expectState(expectedAfterSubmit[i][0], expectedAfterSubmit[i][1]);
    }

    // Write 1 into every cell and wake one waiter on it.
    // NOTE(review): presumably this is what the wait-for-notify fixture blocks
    // on; the fixture is not visible here.
    for (const cell of buffers) {
      Atomics.store(cell, 0, 1);
      Atomics.notify(cell, 0, 1);
    }

    // Once the first task has settled the queue is expected to be empty.
    await pending[0];
    equal(pool.queueSize, 0);

    await Promise.all(pending);
  });
  // With one thread and maxQueue of 2, the first task runs, the next two are
  // queued, and a fourth submission is rejected because the queue is at limit.
  test('will reject items over task queue limit', async ({ equal, rejects }) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/eval.js'),
      minThreads: 0,
      maxThreads: 1,
      maxQueue: 2
    });
    const busyLoop = 'while (true) {}';

    equal(pool.threads.length, 0);
    equal(pool.queueSize, 0);

    // Each accepted task is expected to reject only with the termination
    // error; the queue size after each submission is 0, then 1, then 2.
    // The rejects() calls are intentionally not awaited here.
    for (const queuedAfterSubmit of [0, 1, 2]) {
      rejects(pool.runTask(busyLoop), /Terminating worker thread/);
      equal(pool.threads.length, 1);
      equal(pool.queueSize, queuedAfterSubmit);
    }

    // The queue is now at its limit, so this submission is refused.
    rejects(pool.runTask(busyLoop), /Task queue is at limit/);

    await pool.destroy();
  });
  // With maxQueue of 0 and a single lazily created thread, the first task
  // occupies the worker and the second is rejected for lack of a queue.
  test('will reject items when task queue is unavailable', async ({ equal, rejects }) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/eval.js'),
      minThreads: 0,
      maxThreads: 1,
      maxQueue: 0
    });
    const busyLoop = 'while (true) {}';

    // Asserts the pool's current thread count and queued-task count.
    const expectState = (threads: number, queued: number): void => {
      equal(pool.threads.length, threads);
      equal(pool.queueSize, queued);
    };

    expectState(0, 0);

    // Not awaited: this task is expected to reject with the termination error.
    rejects(pool.runTask(busyLoop), /Terminating worker thread/);
    expectState(1, 0);

    rejects(pool.runTask(busyLoop), /No task queue available and all Workers are busy/);

    await pool.destroy();
  });
  // Same scenario as the variable-size pool, but with minThreads equal to
  // maxThreads so the single worker exists before any task is submitted.
  test('will reject items when task queue is unavailable (fixed thread count)', async ({ equal, rejects }) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/eval.js'),
      minThreads: 1,
      maxThreads: 1,
      maxQueue: 0
    });
    const busyLoop = 'while (true) {}';

    // Asserts that exactly one thread exists and nothing is queued.
    const expectOneThreadEmptyQueue = (): void => {
      equal(pool.threads.length, 1);
      equal(pool.queueSize, 0);
    };

    expectOneThreadEmptyQueue();

    // Not awaited: this task is expected to reject with the termination error.
    rejects(pool.runTask(busyLoop), /Terminating worker thread/);
    expectOneThreadEmptyQueue();

    rejects(pool.runTask(busyLoop), /No task queue available and all Workers are busy/);

    await pool.destroy();
  });
  // With concurrentTasksPerWorker set to 2 and no queue, two tasks are both
  // accepted by the single worker; neither is ever signalled, so both are
  // expected to reject.
  test('tasks can share a Worker if requested (both tests blocking)', async ({ equal, rejects }) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
      minThreads: 0,
      maxThreads: 1,
      maxQueue: 0,
      concurrentTasksPerWorker: 2
    });

    equal(pool.threads.length, 0);
    equal(pool.queueSize, 0);

    // Submit two tasks, each with its own fresh shared cell. After each
    // submission there is one thread and nothing queued.
    for (let submitted = 0; submitted < 2; submitted++) {
      rejects(pool.runTask(new Int32Array(new SharedArrayBuffer(4))));
      equal(pool.threads.length, 1);
      equal(pool.queueSize, 0);
    }

    await pool.destroy();
  });
  // Two tasks share the single worker: the first is released via its shared
  // cell and awaited, the second is expected to reject with the termination
  // error.
  test('tasks can share a Worker if requested (one test finishes)', async ({ equal, rejects }) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
      minThreads: 0,
      maxThreads: 1,
      maxQueue: 0,
      concurrentTasksPerWorker: 2
    });

    const buffers = [
      new Int32Array(new SharedArrayBuffer(4)),
      new Int32Array(new SharedArrayBuffer(4))
    ];

    // Asserts the pool's current thread count, with nothing queued.
    const expectThreadsEmptyQueue = (threads: number): void => {
      equal(pool.threads.length, threads);
      equal(pool.queueSize, 0);
    };

    expectThreadsEmptyQueue(0);

    const firstTask = pool.runTask(buffers[0]);
    expectThreadsEmptyQueue(1);

    // The second task passes a different filename (the eval fixture) as the
    // second runTask argument, with a timer of 1000000 ms as its payload.
    const evalFixture = resolve(__dirname, 'fixtures/eval.js');
    const longTimer = 'new Promise((resolve) => setTimeout(resolve, 1000000))';
    rejects(pool.runTask(longTimer, evalFixture), /Terminating worker thread/);
    expectThreadsEmptyQueue(1);

    // Release the first task only.
    Atomics.store(buffers[0], 0, 1);
    Atomics.notify(buffers[0], 0, 1);

    await firstTask;
    expectThreadsEmptyQueue(1);

    await pool.destroy();
  });
  // Two tasks share the single pre-created worker and both are released; each
  // shared cell is expected to read -1 once its task has completed.
  test('tasks can share a Worker if requested (both tests finish)', async ({ equal }) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/wait-for-notify.ts'),
      minThreads: 1,
      maxThreads: 1,
      maxQueue: 0,
      concurrentTasksPerWorker: 2
    });

    const buffers = [
      new Int32Array(new SharedArrayBuffer(4)),
      new Int32Array(new SharedArrayBuffer(4))
    ];

    // Asserts that exactly one thread exists and nothing is queued.
    const expectOneThreadEmptyQueue = (): void => {
      equal(pool.threads.length, 1);
      equal(pool.queueSize, 0);
    };

    expectOneThreadEmptyQueue();

    const firstTask = pool.runTask(buffers[0]);
    expectOneThreadEmptyQueue();

    const secondTask = pool.runTask(buffers[1]);
    expectOneThreadEmptyQueue();

    // Order matters: store into both cells, then notify both, then wait on both.
    for (const cell of buffers) {
      Atomics.store(cell, 0, 1);
    }
    for (const cell of buffers) {
      Atomics.notify(cell, 0, 1);
    }
    // Block this thread while a cell still holds 1.
    // NOTE(review): presumably the fixture overwrites the cell with -1 and
    // notifies; the fixture is not visible here.
    for (const cell of buffers) {
      Atomics.wait(cell, 0, 1);
    }

    await firstTask;
    equal(buffers[0][0], -1);

    await secondTask;
    equal(buffers[1][0], -1);

    expectOneThreadEmptyQueue();
  });
  // Plugs a user-supplied TaskQueue implementation into the pool and checks
  // that the pool calls its size/shift/push members and that per-task queue
  // options are visible on the tasks handed to push().
  test('custom task queue works', async ({ equal, ok }) => {
    // Flipped by the queue methods so the test can confirm the pool used them.
    let sizeCalled : boolean = false;
    let shiftCalled : boolean = false;
    let pushCalled : boolean = false;

    // Array-backed FIFO queue that records which members were invoked.
    class CustomTaskPool implements TaskQueue {
      tasks: PiscinaTask[] = [];

      // Number of tasks currently queued.
      get size () : number {
        sizeCalled = true;
        return this.tasks.length;
      }

      // Removes and returns the oldest queued task, or null when empty.
      shift () : PiscinaTask | null {
        shiftCalled = true;
        return this.tasks.length > 0 ? this.tasks.shift() as PiscinaTask : null;
      }

      // Appends a task and asserts on the queue options attached to it.
      push (task : PiscinaTask) : void {
        pushCalled = true;
        this.tasks.push(task);

        // Every task must expose the queue-options key, even when the caller
        // supplied none.
        ok(Piscina.queueOptionsSymbol in task);
        if ((task as any).task.a === 3) {
          // The `a: 3` task is submitted without options, so the value is null.
          equal(task[Piscina.queueOptionsSymbol], null);
        } else {
          // Otherwise the option mirrors the task's own `a` field.
          equal(task[Piscina.queueOptionsSymbol].option,
            (task as any).task.a);
        }
      }

      // Removes a specific task if it is queued; does nothing otherwise.
      remove (task : PiscinaTask) : void {
        const index = this.tasks.indexOf(task);
        // indexOf returns -1 for an unknown task, and splice(-1, 1) would
        // remove the LAST queued task instead of nothing.
        if (index !== -1) {
          this.tasks.splice(index, 1);
        }
      }
    }

    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/eval.js'),
      taskQueue: new CustomTaskPool(),
      // Setting maxThreads low enough to ensure we queue
      maxThreads: 1,
      minThreads: 1
    });

    // Returns a copy of `task` with `{ option }` attached under the
    // queue-options symbol.
    function makeTask (task : Record<string, unknown>, option : number) {
      return { ...task, [Piscina.queueOptionsSymbol]: { option } };
    }

    const ret = await Promise.all([
      pool.runTask(makeTask({ a: 1 }, 1)),
      pool.runTask(makeTask({ a: 2 }, 2)),
      pool.runTask({ a: 3 }) // No queueOptionsSymbol attached
    ]);

    equal(ret[0].a, 1);
    equal(ret[1].a, 2);
    equal(ret[2].a, 3);

    ok(sizeCalled);
    ok(pushCalled);
    ok(shiftCalled);
  });