StreamHelper.js 6.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. "use strict";
  2. var utils = require("../utils");
  3. var ConvertWorker = require("./ConvertWorker");
  4. var GenericWorker = require("./GenericWorker");
  5. var base64 = require("../base64");
  6. var support = require("../support");
  7. var external = require("../external");
  8. var NodejsStreamOutputAdapter = null;
  9. if (support.nodestream) {
  10. try {
  11. NodejsStreamOutputAdapter = require("../nodejs/NodejsStreamOutputAdapter");
  12. } catch(e) {
  13. // ignore
  14. }
  15. }
  16. /**
  17. * Apply the final transformation of the data. If the user wants a Blob for
  18. * example, it's easier to work with an U8intArray and finally do the
  19. * ArrayBuffer/Blob conversion.
  20. * @param {String} type the name of the final type
  21. * @param {String|Uint8Array|Buffer} content the content to transform
  22. * @param {String} mimeType the mime type of the content, if applicable.
  23. * @return {String|Uint8Array|ArrayBuffer|Buffer|Blob} the content in the right format.
  24. */
  25. function transformZipOutput(type, content, mimeType) {
  26. switch(type) {
  27. case "blob" :
  28. return utils.newBlob(utils.transformTo("arraybuffer", content), mimeType);
  29. case "base64" :
  30. return base64.encode(content);
  31. default :
  32. return utils.transformTo(type, content);
  33. }
  34. }
  35. /**
  36. * Concatenate an array of data of the given type.
  37. * @param {String} type the type of the data in the given array.
  38. * @param {Array} dataArray the array containing the data chunks to concatenate
  39. * @return {String|Uint8Array|Buffer} the concatenated data
  40. * @throws Error if the asked type is unsupported
  41. */
  42. function concat (type, dataArray) {
  43. var i, index = 0, res = null, totalLength = 0;
  44. for(i = 0; i < dataArray.length; i++) {
  45. totalLength += dataArray[i].length;
  46. }
  47. switch(type) {
  48. case "string":
  49. return dataArray.join("");
  50. case "array":
  51. return Array.prototype.concat.apply([], dataArray);
  52. case "uint8array":
  53. res = new Uint8Array(totalLength);
  54. for(i = 0; i < dataArray.length; i++) {
  55. res.set(dataArray[i], index);
  56. index += dataArray[i].length;
  57. }
  58. return res;
  59. case "nodebuffer":
  60. return Buffer.concat(dataArray);
  61. default:
  62. throw new Error("concat : unsupported type '" + type + "'");
  63. }
  64. }
  65. /**
  66. * Listen a StreamHelper, accumulate its content and concatenate it into a
  67. * complete block.
  68. * @param {StreamHelper} helper the helper to use.
  69. * @param {Function} updateCallback a callback called on each update. Called
  70. * with one arg :
  71. * - the metadata linked to the update received.
  72. * @return Promise the promise for the accumulation.
  73. */
  74. function accumulate(helper, updateCallback) {
  75. return new external.Promise(function (resolve, reject){
  76. var dataArray = [];
  77. var chunkType = helper._internalType,
  78. resultType = helper._outputType,
  79. mimeType = helper._mimeType;
  80. helper
  81. .on("data", function (data, meta) {
  82. dataArray.push(data);
  83. if(updateCallback) {
  84. updateCallback(meta);
  85. }
  86. })
  87. .on("error", function(err) {
  88. dataArray = [];
  89. reject(err);
  90. })
  91. .on("end", function (){
  92. try {
  93. var result = transformZipOutput(resultType, concat(chunkType, dataArray), mimeType);
  94. resolve(result);
  95. } catch (e) {
  96. reject(e);
  97. }
  98. dataArray = [];
  99. })
  100. .resume();
  101. });
  102. }
  103. /**
  104. * An helper to easily use workers outside of JSZip.
  105. * @constructor
  106. * @param {Worker} worker the worker to wrap
  107. * @param {String} outputType the type of data expected by the use
  108. * @param {String} mimeType the mime type of the content, if applicable.
  109. */
  110. function StreamHelper(worker, outputType, mimeType) {
  111. var internalType = outputType;
  112. switch(outputType) {
  113. case "blob":
  114. case "arraybuffer":
  115. internalType = "uint8array";
  116. break;
  117. case "base64":
  118. internalType = "string";
  119. break;
  120. }
  121. try {
  122. // the type used internally
  123. this._internalType = internalType;
  124. // the type used to output results
  125. this._outputType = outputType;
  126. // the mime type
  127. this._mimeType = mimeType;
  128. utils.checkSupport(internalType);
  129. this._worker = worker.pipe(new ConvertWorker(internalType));
  130. // the last workers can be rewired without issues but we need to
  131. // prevent any updates on previous workers.
  132. worker.lock();
  133. } catch(e) {
  134. this._worker = new GenericWorker("error");
  135. this._worker.error(e);
  136. }
  137. }
  138. StreamHelper.prototype = {
  139. /**
  140. * Listen a StreamHelper, accumulate its content and concatenate it into a
  141. * complete block.
  142. * @param {Function} updateCb the update callback.
  143. * @return Promise the promise for the accumulation.
  144. */
  145. accumulate : function (updateCb) {
  146. return accumulate(this, updateCb);
  147. },
  148. /**
  149. * Add a listener on an event triggered on a stream.
  150. * @param {String} evt the name of the event
  151. * @param {Function} fn the listener
  152. * @return {StreamHelper} the current helper.
  153. */
  154. on : function (evt, fn) {
  155. var self = this;
  156. if(evt === "data") {
  157. this._worker.on(evt, function (chunk) {
  158. fn.call(self, chunk.data, chunk.meta);
  159. });
  160. } else {
  161. this._worker.on(evt, function () {
  162. utils.delay(fn, arguments, self);
  163. });
  164. }
  165. return this;
  166. },
  167. /**
  168. * Resume the flow of chunks.
  169. * @return {StreamHelper} the current helper.
  170. */
  171. resume : function () {
  172. utils.delay(this._worker.resume, [], this._worker);
  173. return this;
  174. },
  175. /**
  176. * Pause the flow of chunks.
  177. * @return {StreamHelper} the current helper.
  178. */
  179. pause : function () {
  180. this._worker.pause();
  181. return this;
  182. },
  183. /**
  184. * Return a nodejs stream for this helper.
  185. * @param {Function} updateCb the update callback.
  186. * @return {NodejsStreamOutputAdapter} the nodejs stream.
  187. */
  188. toNodejsStream : function (updateCb) {
  189. utils.checkSupport("nodestream");
  190. if (this._outputType !== "nodebuffer") {
  191. // an object stream containing blob/arraybuffer/uint8array/string
  192. // is strange and I don't know if it would be useful.
  193. // I you find this comment and have a good usecase, please open a
  194. // bug report !
  195. throw new Error(this._outputType + " is not supported by this method");
  196. }
  197. return new NodejsStreamOutputAdapter(this, {
  198. objectMode : this._outputType !== "nodebuffer"
  199. }, updateCb);
  200. }
  201. };
  202. module.exports = StreamHelper;