123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223 |
- import {on, once} from 'node:events';
- import {PassThrough as PassThroughStream} from 'node:stream';
- import {finished} from 'node:stream/promises';
// Merges the readable streams listed in `streams` into one `MergedStream` and returns it.
// Throws a `TypeError` when `streams` is not an array, or (through `validateStream()`) when
// one of its items has no `pipe()` method.
export default function mergeStreams(streams) {
	if (!Array.isArray(streams)) {
		throw new TypeError(`Expected an array, got \`${typeof streams}\`.`);
	}

	// Every input is validated before the merged stream is created.
	for (const inputStream of streams) {
		validateStream(inputStream);
	}

	// The merged stream is in object mode as soon as one input is.
	const objectMode = streams.some(inputStream => inputStream.readableObjectMode);
	const highWaterMark = getHighWaterMark(streams, objectMode);
	const mergedStream = new MergedStream({
		objectMode,
		writableHighWaterMark: highWaterMark,
		readableHighWaterMark: highWaterMark,
	});

	// Without any input there is nothing to wait for: end the merged stream right away.
	if (streams.length === 0) {
		endStream(mergedStream);
		return mergedStream;
	}

	for (const inputStream of streams) {
		mergedStream.add(inputStream);
	}

	return mergedStream;
}
// Picks the `highWaterMark` of the merged stream: the largest `readableHighWaterMark` among
// the input streams that are in the same mode (`objectMode` or not) as the merged stream.
//
// - `streams`: array of input streams
// - `objectMode`: whether the merged stream is in object mode
//
// Returns a finite number, even when no input stream exposes a usable value.
const getHighWaterMark = (streams, objectMode) => {
	if (streams.length === 0) {
		// @todo Use `node:stream` `getDefaultHighWaterMark(objectMode)` in next major release
		return 16_384;
	}

	const highWaterMarks = streams
		// Compare truthiness instead of using `===`, so that stream-like objects where
		// `readableObjectMode` is `undefined` are treated as not being in object mode.
		.filter(({readableObjectMode}) => Boolean(readableObjectMode) === Boolean(objectMode))
		.map(({readableHighWaterMark}) => readableHighWaterMark)
		// Ignore missing or non-numeric values: `Math.max()` would otherwise return `NaN`.
		.filter(highWaterMark => Number.isFinite(highWaterMark));

	// `Math.max()` without arguments returns `-Infinity`, which is not a valid `highWaterMark`.
	// Fall back to a fixed default instead: 16 objects in object mode, 16 KiB otherwise.
	if (highWaterMarks.length === 0) {
		return objectMode ? 16 : 16_384;
	}

	return Math.max(...highWaterMarks);
};
// `PassThrough` stream which input streams can be piped into (`add()`) and unpiped from
// (`remove()`).
class MergedStream extends PassThroughStream {
	// Input streams currently being tracked
	#streams = new Set();
	// Input streams recorded as ended
	#ended = new Set();
	// Input streams recorded as aborted
	#aborted = new Set();
	// Promise returned by `onMergedStreamFinished()`, created on the first `add()`
	#onFinished;

	// Starts tracking `stream` and pipes it into the merged stream.
	// Throws a `TypeError` when `stream` has no `pipe()` method.
	// Adding a stream that is already tracked does nothing.
	add(stream) {
		validateStream(stream);

		const trackedStreams = this.#streams;
		if (trackedStreams.has(stream)) {
			return;
		}

		trackedStreams.add(stream);

		if (this.#onFinished === undefined) {
			this.#onFinished = onMergedStreamFinished(this, trackedStreams);
		}

		endWhenStreamsDone({
			passThroughStream: this,
			stream,
			streams: trackedStreams,
			ended: this.#ended,
			aborted: this.#aborted,
			onFinished: this.#onFinished,
		});

		// `end: false` keeps the merged stream open when this input stream ends.
		stream.pipe(this, {end: false});
	}

	// Unpipes `stream` from the merged stream.
	// Returns `false` when `stream` is not tracked, `true` otherwise.
	// Throws a `TypeError` when `stream` has no `pipe()` method.
	remove(stream) {
		validateStream(stream);

		const isTracked = this.#streams.has(stream);
		if (isTracked) {
			stream.unpipe(this);
		}

		return isTracked;
	}
}
// Settles once the merged stream has finished, successfully or not.
// In the meantime, `unpipe` events of the merged stream are forwarded to the input streams.
// The extra listeners are accounted for in the merged stream's `maxListeners` while waiting.
const onMergedStreamFinished = async (passThroughStream, streams) => {
	updateMaxListeners(passThroughStream, PASSTHROUGH_LISTENERS_COUNT);
	const controller = new AbortController();

	try {
		const mergedStreamEnd = onMergedStreamEnd(passThroughStream, controller);
		const inputStreamsUnpipe = onInputStreamsUnpipe(passThroughStream, streams, controller);
		await Promise.race([mergedStreamEnd, inputStreamsUnpipe]);
	} finally {
		// Stops whichever of the two tasks above is still pending
		controller.abort();
		updateMaxListeners(passThroughStream, -PASSTHROUGH_LISTENERS_COUNT);
	}
};
// Waits for the merged stream to finish.
// Rejects when `finished()` rejects, which includes `controller`'s signal being aborted first.
const onMergedStreamEnd = async (passThroughStream, controller) => {
	const {signal} = controller;
	const finishedOptions = {signal, cleanup: true};
	await finished(passThroughStream, finishedOptions);
};
// Listens to every `unpipe` event of the merged stream until `signal` is aborted.
// When the unpiped stream is a tracked input stream, `unpipeEvent` is emitted on it.
const onInputStreamsUnpipe = async (passThroughStream, streams, controller) => {
	const {signal} = controller;
	const unpipeEvents = on(passThroughStream, 'unpipe', {signal});

	for await (const [unpipedStream] of unpipeEvents) {
		// Streams that are not tracked are ignored
		if (!streams.has(unpipedStream)) {
			continue;
		}

		unpipedStream.emit(unpipeEvent);
	}
};
// Throws a `TypeError` unless `stream` has a `pipe()` method.
const validateStream = stream => {
	const hasPipeMethod = typeof stream?.pipe === 'function';
	if (hasPipeMethod) {
		return;
	}

	throw new TypeError(`Expected a readable stream, got: \`${typeof stream}\`.`);
};
// Tracks a single input `stream` until the first of the following happens:
// - the merged stream finishes
// - the input stream finishes
// - the input stream is unpiped from the merged stream
// Then, if every tracked stream has either ended or been aborted, the merged stream is ended,
// or destroyed when none of them ended and at least one was aborted.
//
// - `passThroughStream`: the merged stream
// - `stream`: the input stream to track
// - `streams`, `ended`, `aborted`: sets of tracked, ended and aborted input streams
// - `onFinished`: promise settled once the merged stream has finished
const endWhenStreamsDone = async ({passThroughStream, stream, streams, ended, aborted, onFinished}) => {
	updateMaxListeners(passThroughStream, PASSTHROUGH_LISTENERS_PER_STREAM);
	const controller = new AbortController();

	try {
		await Promise.race([
			// `controller` lets this task know when this input stream stopped being tracked
			afterMergedStreamFinished(onFinished, stream, controller),
			onInputStreamEnd({passThroughStream, stream, streams, ended, aborted, controller}),
			onInputStreamUnpipe({stream, streams, ended, aborted, controller}),
		]);
	} finally {
		// Cancels the tasks above that are still pending
		controller.abort();
		updateMaxListeners(passThroughStream, -PASSTHROUGH_LISTENERS_PER_STREAM);
	}

	if (streams.size === ended.size + aborted.size) {
		if (ended.size === 0 && aborted.size > 0) {
			abortStream(passThroughStream);
		} else {
			endStream(passThroughStream);
		}
	}
};

// This is the error thrown by `finished()` on `stream.destroy()`
const isAbortError = error => error?.code === 'ERR_STREAM_PREMATURE_CLOSE';

// Once the merged stream has finished, propagates this to the input `stream`:
// - destroys it without an error when the merged stream ended or was destroyed without an error
// - destroys it with the same error otherwise
//
// `onFinished` might settle long after `endWhenStreamsDone()` stopped tracking `stream`, for
// example after it was removed from the merged stream. `endWhenStreamsDone()` aborts `signal`
// at that point, in which case `stream` must be left untouched.
// The third argument is optional: without it, `stream` is always aborted or errored.
const afterMergedStreamFinished = async (onFinished, stream, {signal} = {}) => {
	try {
		await onFinished;

		if (!signal?.aborted) {
			abortStream(stream);
		}
	} catch (error) {
		if (signal?.aborted) {
			return;
		}

		if (isAbortError(error)) {
			abortStream(stream);
		} else {
			errorStream(stream, error);
		}
	}
};
// Waits for the readable side of the input `stream` to finish, then records the outcome:
// - success: `stream` is added to `ended`
// - destroyed without an error: `stream` is added to `aborted`
// - any other error: the merged stream is destroyed with that error
// Nothing is recorded when `stream` is not tracked anymore, nor on failure once `signal` has
// been aborted.
const onInputStreamEnd = async ({passThroughStream, stream, streams, ended, aborted, controller: {signal}}) => {
	const finishedOptions = {
		signal,
		cleanup: true,
		readable: true,
		writable: false,
	};

	try {
		await finished(stream, finishedOptions);
	} catch (error) {
		const isIgnored = signal.aborted || !streams.has(stream);
		if (isIgnored) {
			return;
		}

		if (!isAbortError(error)) {
			errorStream(passThroughStream, error);
			return;
		}

		aborted.add(stream);
		return;
	}

	if (streams.has(stream)) {
		ended.add(stream);
	}
};
// Event emitted on an input stream once it has been unpiped from the merged stream
const unpipeEvent = Symbol('unpipe');

// Waits for `unpipeEvent` to be emitted on the input `stream`, then stops tracking it by
// removing it from the `streams`, `ended` and `aborted` sets.
// Rejects when the `once()` call rejects, e.g. when `controller`'s signal is aborted first.
const onInputStreamUnpipe = async ({stream, streams, ended, aborted, controller}) => {
	await once(stream, unpipeEvent, {signal: controller.signal});

	for (const trackedStreams of [streams, ended, aborted]) {
		trackedStreams.delete(stream);
	}
};
// Calls `stream.end()`, unless `stream.writable` is falsy.
const endStream = stream => {
	if (!stream.writable) {
		return;
	}

	stream.end();
};
// Calls `stream.destroy()` without any error, unless `stream` is neither readable nor writable.
const abortStream = stream => {
	if (!stream.readable && !stream.writable) {
		return;
	}

	stream.destroy();
};
// Destroys `stream` with `error`, unless it is already destroyed.
// When `stream` has no `error` event listener, `stream.destroy(error)` crashes the process
// with `uncaughtException`. Errors are handled on the user's behalf here, so a no-op listener
// is added first to prevent this.
const errorStream = (stream, error) => {
	if (stream.destroyed) {
		return;
	}

	stream.once('error', noop);
	stream.destroy(error);
};

// Listener that intentionally does nothing
const noop = () => {};
// Adds `increment` (which can be negative) to the `maxListeners` of `passThroughStream`.
// A limit of `0` or `Infinity` is left as is.
const updateMaxListeners = (passThroughStream, increment) => {
	const currentLimit = passThroughStream.getMaxListeners();
	const isUnlimited = currentLimit === 0 || currentLimit === Number.POSITIVE_INFINITY;
	if (isUnlimited) {
		return;
	}

	passThroughStream.setMaxListeners(currentLimit + increment);
};
// How many times `passThroughStream.on()` is called, no matter how many input streams there are:
//  - one call comes from `finished(passThroughStream)`
//  - one call comes from `on(passThroughStream)`
const PASSTHROUGH_LISTENERS_COUNT = 2;

// How many times `passThroughStream.on()` is called for each input stream:
//  - one call comes from `stream.pipe(passThroughStream)`
const PASSTHROUGH_LISTENERS_PER_STREAM = 1;
|