pool-close.ts 4.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105
  1. import { test } from 'tap';
  2. import Piscina from '..';
  3. import { resolve } from 'path';
  4. import { once } from 'events';
  5. test('close()', async (t) => {
  6. t.test('no pending tasks', async (t) => {
  7. const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js') });
  8. await pool.close();
  9. t.pass('pool closed successfully');
  10. });
  11. t.test('no pending tasks (with minThreads=0)', async (t) => {
  12. const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), minThreads: 0 });
  13. await pool.close();
  14. t.pass('pool closed successfully');
  15. });
  16. t.test('queued tasks waits for all tasks to complete', async (t) => {
  17. const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1 });
  18. const task1 = pool.run({ time: 100 });
  19. const task2 = pool.run({ time: 100 });
  20. setImmediate(() => t.resolves(pool.close(), 'close is resolved when all running tasks are completed'));
  21. await Promise.all([
  22. t.resolves(once(pool, 'close'), 'handler is called when pool is closed'),
  23. t.resolves(task1, 'complete running task'),
  24. t.resolves(task2, 'complete running task')
  25. ]);
  26. });
  27. t.test('abort any task enqueued during closing up', async (t) => {
  28. const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1 });
  29. setImmediate(() => {
  30. t.resolves(pool.close(), 'close is resolved when running tasks are completed');
  31. t.resolves(pool.run({ time: 1000 }).then(null, err => {
  32. t.equal(err.message, 'The task has been aborted');
  33. t.equal(err.cause, 'queue is closing up');
  34. }));
  35. });
  36. await t.resolves(pool.run({ time: 100 }), 'complete running task');
  37. });
  38. });
  39. test('close({force: true})', async (t) => {
  40. t.test('queued tasks waits for all tasks already running and aborts tasks that are not started yet', async (t) => {
  41. const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1, concurrentTasksPerWorker: 1 });
  42. const task1 = pool.run({ time: 1000 });
  43. const task2 = pool.run({ time: 200 });
  44. setImmediate(() => t.resolves(pool.close({ force: true }), 'close is resolved when all running tasks are completed'));
  45. await Promise.all([
  46. t.resolves(once(pool, 'close'), 'handler is called when pool is closed'),
  47. t.resolves(task1, 'complete running task'),
  48. t.resolves(task2.then(null, err => {
  49. t.equal(err.message, 'The task has been aborted');
  50. t.equal(err.cause, 'pool is closed');
  51. }))
  52. ]);
  53. });
  54. t.test('queued tasks waits for all tasks already running and aborts tasks that are not started yet', async (t) => {
  55. const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/sleep.js'), maxThreads: 1, concurrentTasksPerWorker: 2 });
  56. const task1 = pool.run({ time: 500 });
  57. const task2 = pool.run({ time: 100 });
  58. const task3 = pool.run({ time: 100 });
  59. const task4 = pool.run({ time: 100 });
  60. setImmediate(() => t.resolves(pool.close({ force: true }), 'close is resolved when all running tasks are completed'));
  61. await Promise.all([
  62. t.resolves(once(pool, 'close'), 'handler is called when pool is closed'),
  63. t.resolves(task1, 'complete running task'),
  64. t.resolves(task2, 'complete running task'),
  65. t.rejects(task3, /The task has been aborted/, 'abort task that are not started yet'),
  66. t.rejects(task4, /The task has been aborted/, 'abort task that are not started yet')
  67. ]);
  68. });
  69. });
  70. test('timed out close operation destroys the pool', async (t) => {
  71. const pool = new Piscina({
  72. filename: resolve(__dirname, 'fixtures/sleep.js'),
  73. maxThreads: 1,
  74. closeTimeout: 500
  75. });
  76. const task1 = pool.run({ time: 5000 });
  77. const task2 = pool.run({ time: 5000 });
  78. setImmediate(() => t.resolves(pool.close(), 'close is resolved on timeout'));
  79. await Promise.all([
  80. t.resolves(once(pool, 'error'), 'error handler is called on timeout'),
  81. t.rejects(task1, /Terminating worker thread/, 'task is aborted due to timeout'),
  82. t.rejects(task2, /Terminating worker thread/, 'task is aborted due to timeout')
  83. ]);
  84. });