post-task.ts 5.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207
  1. import { MessageChannel } from 'worker_threads';
  2. import { getAvailableParallelism } from '../dist/common';
  3. import Piscina from '..';
  4. import { test } from 'tap';
  5. import { resolve } from 'path';
  // Sends an ArrayBuffer through runTask()'s positional transfer list, then
  // expects one completed task and a local buffer whose byteLength reads 0.
  test('postTask() can transfer ArrayBuffer instances', async (t) => {
    const workerFile = resolve(__dirname, 'fixtures/simple-isworkerthread.ts');
    const pool = new Piscina({ filename: workerFile });
    const buffer = new ArrayBuffer(40);

    await pool.runTask({ ab: buffer }, [buffer]);

    t.equal(pool.completed, 1);
    t.equal(buffer.byteLength, 0);
  });
  // Same scenario as the runTask() transfer test, but the buffer is listed in
  // the `transferList` option of run(). The title is distinct from the
  // runTask() variant so the two cases can be told apart in test reports.
  test('postTask() can transfer ArrayBuffer instances (run() with transferList option)', async ({ equal }) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/simple-isworkerthread.ts')
    });
    const ab = new ArrayBuffer(40);
    await pool.run({ ab }, { transferList: [ab] });
    // One task finished, and the local buffer now reads as zero-length.
    equal(pool.completed, 1);
    equal(ab.byteLength, 0);
  });
  // A MessagePort placed in the task payload, with no transfer list supplied,
  // must make runTask() reject.
  test('postTask() cannot clone build-in objects', async ({ rejects }) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/simple-isworkerthread.ts')
    });
    const obj = new MessageChannel().port1;
    // rejects() is an asynchronous assertion: await it so the check has
    // completed before this async test function returns.
    await rejects(pool.runTask({ obj }));
  });
  // Submits a task string producing a rejected promise to the eval.js fixture
  // (presumably evaluated inside the worker) and expects the returned promise
  // to reject with an error matching /foo/.
  test('postTask() resolves with a rejection when the handler rejects', async ({ rejects }) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/eval.js')
    });
    // Await the asynchronous assertion so it completes inside the test.
    await rejects(pool.runTask('Promise.reject(new Error("foo"))'), /foo/);
  });
  // Submits a task string that throws to the eval.js fixture (presumably
  // evaluated inside the worker) and expects the returned promise to reject
  // with an error matching /foo/.
  test('postTask() resolves with a rejection when the handler throws', async ({ rejects }) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/eval.js')
    });
    // Await the asynchronous assertion so it completes inside the test.
    await rejects(pool.runTask('throw new Error("foo")'), /foo/);
  });
  // A non-array transferList must be rejected by both entry points: the
  // positional argument of runTask() and the `transferList` option of run().
  test('postTask() validates transferList', async ({ rejects }) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/eval.js')
    });
    // Each rejects() is awaited so both assertions have completed before the
    // async test function returns.
    await rejects(pool.runTask('0', 42 as any),
      /transferList argument must be an Array/);
    await rejects(pool.run('0', { transferList: 42 as any }),
      /transferList argument must be an Array/);
  });
  // A non-string filename must be rejected by both entry points: the third
  // positional argument of runTask() and the `filename` option of run().
  test('postTask() validates filename', async ({ rejects }) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/eval.js')
    });
    // Each rejects() is awaited so both assertions have completed before the
    // async test function returns.
    await rejects(pool.runTask('0', [], 42 as any),
      /filename argument must be a string/);
    await rejects(pool.run('0', { filename: 42 as any }),
      /filename argument must be a string/);
  });
  // A non-string `name` option passed to run() must produce a rejection.
  test('postTask() validates name', async ({ rejects }) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/eval.js')
    });
    // Await the asynchronous assertion so it completes inside the test.
    await rejects(pool.run('0', { name: 42 as any }),
      /name argument must be a string/);
  });
  // A non-object abort signal must be rejected by both entry points: the
  // fourth positional argument of runTask() and the `signal` option of run().
  test('postTask() validates abortSignal', async ({ rejects }) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/eval.js')
    });
    // Each rejects() is awaited so both assertions have completed before the
    // async test function returns.
    await rejects(pool.runTask('0', [], undefined, 42 as any),
      /signal argument must be an object/);
    await rejects(pool.run('0', { signal: 42 as any }),
      /signal argument must be an object/);
  });
  // Runs two tasks to completion and checks that a 'drain' event fired and
  // that pool.needsDrain was falsy when the most recent one was handled.
  test('Piscina emits drain', async (t) => {
    const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/eval.js') });

    let sawDrain = false;
    // Starts truthy so the final notOk() only passes if the listener ran and
    // observed a falsy pool.needsDrain.
    let needsDrainAtEvent = true;

    pool.on('drain', () => {
      sawDrain = true;
      needsDrainAtEvent = pool.needsDrain;
    });

    const pending = [pool.run('123'), pool.run('123')];
    await Promise.all(pending);

    t.ok(sawDrain);
    t.notOk(needsDrainAtEvent);
  });
  // Submits four tasks to a pool limited to one thread and a queue of three,
  // then checks pool.needsDrain and that both 'needsDrain' and 'drain' fired.
  test('Piscina exposes/emits needsDrain to true when capacity is exceeded', async (t) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/eval.js'),
      maxQueue: 3,
      maxThreads: 1
    });

    let sawNeedsDrain = false;
    let sawDrain = false;

    pool.once('drain', () => {
      sawDrain = true;
    });
    pool.once('needsDrain', () => {
      sawNeedsDrain = true;
    });

    // The returned promises are not awaited; the assertions below execute
    // synchronously right after the four submissions.
    for (let submitted = 0; submitted < 4; submitted++) {
      pool.run('123');
    }

    t.ok(pool.needsDrain);
    t.ok(sawNeedsDrain);
    t.ok(sawDrain);
  });
  // Runs the task '1' against the eval-async.js fixture and expects the
  // resolved value to equal the number 1.
  test('Piscina can use async loaded workers', async (t) => {
    const workerFile = resolve(__dirname, 'fixtures/eval-async.js');
    const pool = new Piscina({ filename: workerFile });

    const result = await pool.runTask('1');
    t.equal(result, 1);
  });
  // Runs the task '1' against the esm-async.mjs fixture and expects the
  // resolved value to equal the number 1.
  test('Piscina can use async loaded esm workers', {}, async (t) => {
    const workerFile = resolve(__dirname, 'fixtures/esm-async.mjs');
    const pool = new Piscina({ filename: workerFile });

    const result = await pool.runTask('1');
    t.equal(result, 1);
  });
  // Passing a non-object as run()'s options argument must produce a rejection
  // whose message matches /options must be an object/.
  test('Piscina.run options is correct type', async ({ rejects }) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/eval.js')
    });
    // Await the asynchronous assertion so it completes inside the test.
    await rejects(pool.run(42, 1 as any), /options must be an object/);
  });
  // With no maxThreads option, pool.maxThreads is expected to equal
  // getAvailableParallelism() * 1.5.
  test('Piscina.maxThreads should return the max number of threads to be used (default)', (t) => {
    t.plan(1);

    const pool = new Piscina({ filename: resolve(__dirname, 'fixtures/eval.js') });
    const expected = getAvailableParallelism() * 1.5;

    t.equal(pool.maxThreads, expected);
  });
  // A custom maxThreads option must be reported back by pool.maxThreads.
  // The title names maxThreads because that is the property set and asserted.
  test('Piscina.maxThreads should return the max number of threads to be used (custom)', ({ equal, plan }) => {
    const maxThreads = 3;
    const pool = new Piscina({
      maxThreads,
      filename: resolve(__dirname, 'fixtures/eval.js')
    });
    plan(1);
    equal(pool.maxThreads, maxThreads);
  });
  // With no minThreads option, pool.minThreads is expected to equal half the
  // available parallelism, rounded down, with a floor of 1.
  test('Piscina.minThreads should return the min number of threads to be used (default)', ({ equal, plan }) => {
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/eval.js')
    });
    const minThreads = Math.max(Math.floor(getAvailableParallelism() / 2), 1);
    plan(1);
    equal(pool.minThreads, minThreads);
  });
  // A custom minThreads option must be reported back by pool.minThreads.
  test('Piscina.minThreads should return the min number of threads to be used (custom)', ({ equal, plan }) => {
    const minThreads = 2;
    const pool = new Piscina({
      filename: resolve(__dirname, 'fixtures/eval.js'),
      minThreads
    });
    plan(1);
    equal(pool.minThreads, minThreads);
  });