stream.js 1.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657
  1. import { Transform } from 'stream'
  2. import { Packr } from './pack.js'
  3. import { Unpackr } from './unpack.js'
  4. var DEFAULT_OPTIONS = {objectMode: true}
  5. export class PackrStream extends Transform {
  6. constructor(options) {
  7. if (!options)
  8. options = {}
  9. options.writableObjectMode = true
  10. super(options)
  11. options.sequential = true
  12. this.packr = options.packr || new Packr(options)
  13. }
  14. _transform(value, encoding, callback) {
  15. this.push(this.packr.pack(value))
  16. callback()
  17. }
  18. }
  19. export class UnpackrStream extends Transform {
  20. constructor(options) {
  21. if (!options)
  22. options = {}
  23. options.objectMode = true
  24. super(options)
  25. options.structures = []
  26. this.unpackr = options.unpackr || new Unpackr(options)
  27. }
  28. _transform(chunk, encoding, callback) {
  29. if (this.incompleteBuffer) {
  30. chunk = Buffer.concat([this.incompleteBuffer, chunk])
  31. this.incompleteBuffer = null
  32. }
  33. let values
  34. try {
  35. values = this.unpackr.unpackMultiple(chunk)
  36. } catch(error) {
  37. if (error.incomplete) {
  38. this.incompleteBuffer = chunk.slice(error.lastPosition)
  39. values = error.values
  40. }
  41. else
  42. throw error
  43. } finally {
  44. for (let value of values || []) {
  45. if (value === null)
  46. value = this.getNullValue()
  47. this.push(value)
  48. }
  49. }
  50. if (callback) callback()
  51. }
  52. getNullValue() {
  53. return Symbol.for(null)
  54. }
  55. }