index.js 1.5 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061
  1. 'use strict';
  2. const {constants: BufferConstants} = require('buffer');
  3. const stream = require('stream');
  4. const {promisify} = require('util');
  5. const bufferStream = require('./buffer-stream');
  6. const streamPipelinePromisified = promisify(stream.pipeline);
  7. class MaxBufferError extends Error {
  8. constructor() {
  9. super('maxBuffer exceeded');
  10. this.name = 'MaxBufferError';
  11. }
  12. }
  13. async function getStream(inputStream, options) {
  14. if (!inputStream) {
  15. throw new Error('Expected a stream');
  16. }
  17. options = {
  18. maxBuffer: Infinity,
  19. ...options
  20. };
  21. const {maxBuffer} = options;
  22. const stream = bufferStream(options);
  23. await new Promise((resolve, reject) => {
  24. const rejectPromise = error => {
  25. // Don't retrieve an oversized buffer.
  26. if (error && stream.getBufferedLength() <= BufferConstants.MAX_LENGTH) {
  27. error.bufferedData = stream.getBufferedValue();
  28. }
  29. reject(error);
  30. };
  31. (async () => {
  32. try {
  33. await streamPipelinePromisified(inputStream, stream);
  34. resolve();
  35. } catch (error) {
  36. rejectPromise(error);
  37. }
  38. })();
  39. stream.on('data', () => {
  40. if (stream.getBufferedLength() > maxBuffer) {
  41. rejectPromise(new MaxBufferError());
  42. }
  43. });
  44. });
  45. return stream.getBufferedValue();
  46. }
  47. module.exports = getStream;
  48. module.exports.buffer = (stream, options) => getStream(stream, {...options, encoding: 'buffer'});
  49. module.exports.array = (stream, options) => getStream(stream, {...options, array: true});
  50. module.exports.MaxBufferError = MaxBufferError;