* Streams
- don't need to fit in memory all at once
- data is not available all at once
- an instance of =EventEmitter=
- usually consumed using =pipe=
- using streams, a program can potentially scale to any input size because the amount of memory used by the process stays constant
- most programs written using streams will scale well with any input size

** Practical
Create a huge file
- creates a file of roughly 400MB using a =Writable= stream

#+begin_src js :tangle examples/stream/create-big-file.mjs
import fs from 'node:fs'

const file = fs.createWriteStream('./big.file')

for (let i = 0; i <= 1e6; i++) {
  file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n')
}

file.end()
#+end_src

Let's serve this file with a running server
- sends the file without blocking the main thread
- but it reads the whole file into memory before sending it to the client

#+begin_src js :tangle examples/stream/server-without-stream.mjs
import fs from 'node:fs'
import { createServer } from 'node:http'

const server = createServer()

server.on('request', (req, res) => {
  fs.readFile('./big.file', (err, data) => {
    if (err) throw err
    res.end(data)
  })
})

server.listen(8000)
#+end_src

- the HTTP response object is also a writable stream

#+begin_src js :tangle examples/stream/server-with-stream.mjs
import fs from 'node:fs'
import { createServer } from 'node:http'

const server = createServer()

server.on('request', (req, res) => {
  const src = fs.createReadStream('./big.file')
  src.pipe(res)
})

server.listen(8000)
#+end_src

** Types of streams
- Readable stream :: a source from which data can be consumed
- Writable stream :: a destination to which data can be written
- Duplex stream :: both a Readable and a Writable stream
- Transform stream :: a Duplex stream used to transform data as it is written and read

** Pipe
- pipes the output of one stream into the input of the next
- the source has to be a Readable stream and the destination has to be a Writable stream
- =pipe= lacks proper error handling
- it's better to use the =pump= library (see the sketch below)
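A minimal sketch of how =pump= can replace a bare =pipe= call, assuming the =pump= package has been installed from npm (the file paths and tangle target here are only illustrative):

#+begin_src js :tangle examples/stream/pipe-with-pump.mjs
import fs from 'node:fs'
import pump from 'pump'

// pump wires the streams together, forwards an error from any stream in the
// chain to a single callback, and destroys all streams on failure
pump(
  fs.createReadStream('./big.file'),
  fs.createWriteStream('./big-copy.file'),
  (err) => {
    if (err) {
      console.error('pipeline failed', err)
    } else {
      console.log('pipeline succeeded')
    }
  }
)
#+end_src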
** Stream events
- streams can also be consumed with events
- streams inherit from the =EventEmitter= class
- data :: emitted when new data is read from a readable stream
- end :: emitted when a readable stream has no more data available and all the available data has been read
- finish :: emitted when a writable stream has been ended and all pending writes have been completed
- close :: loosely defined in the stream docs, close is usually emitted when the stream is fully closed
- error :: emitted when a stream has experienced an error
- pause :: emitted when a readable stream has been paused. Pausing happens either when backpressure occurs or when the =pause()= method is explicitly called
- resume :: emitted when a readable stream goes from being paused to flowing again

** Paused and Flowing Modes of readable streams
- also referred to as =pull= and =push= modes
- all readable streams start in =paused= mode but can be switched to flowing mode in one of the following ways
  - adding a =data= event handler
  - calling the =stream.resume()= method
  - calling the =stream.pipe()= method to send the data to a Writable
- the =Readable= can switch back to paused mode in one of the following ways
  - if there are no pipe destinations, by calling the =stream.pause()= method
  - if there are pipe destinations, by removing all pipe destinations. Multiple pipe destinations may be removed by calling the =stream.unpipe()= method
- in flowing mode, data can be lost if no consumers are available
- when consuming readable streams with =pipe=, we don't need to worry about this because it is managed automatically
- *important concept* :: a =Readable= will not generate data until a mechanism for either consuming or ignoring that data is provided. If the consuming mechanism is disabled or taken away, the =Readable= will attempt to stop generating data.

** Implement Writable Stream

#+begin_src js :tangle examples/stream/implement-writable-stream.mjs
import { Writable } from 'node:stream'

const outStream = new Writable({
  write(chunk, encoding, callback) {
    console.log(chunk.toString())
    callback()
  }
})

process.stdin.pipe(outStream)
#+end_src

- chunk :: usually a buffer, unless the stream is configured otherwise
- encoding :: required by the signature, but usually ignored
- callback :: signals whether the write was successful or not. To signal a failure, call the callback with an error object

** Implement a Readable Stream

#+begin_src js :tangle examples/stream/implement-readable-stream.mjs
import { Readable } from 'node:stream'

const inStream = new Readable({
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++))
    if (this.currentCharCode > 90) {
      this.push(null)
    }
  }
})

inStream.currentCharCode = 65

inStream.pipe(process.stdout)
#+end_src

- pushes data into the stream only when the consumer asks for it

** Implement Duplex/Transform Stream

#+begin_src js :tangle examples/stream/implement-duplex-stream.mjs
import { Duplex } from 'node:stream'

const inoutStream = new Duplex({
  write(chunk, encoding, callback) {
    console.log(chunk.toString())
    callback()
  },
  read(size) {
    this.push(String.fromCharCode(this.currentCharCode++))
    if (this.currentCharCode > 90) {
      this.push(null)
    }
  }
})

inoutStream.currentCharCode = 65

process.stdin
  .pipe(inoutStream)
  .pipe(process.stdout)
#+end_src

- the readable and writable sides of a duplex stream operate independently of one another

#+begin_src js :tangle examples/stream/implement-transform-stream.mjs
import { Transform } from 'node:stream'

const upperCaseTr = new Transform({
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().toUpperCase())
    callback()
  }
})

process.stdin
  .pipe(upperCaseTr)
  .pipe(process.stdout)
#+end_src

** Streams Object Mode
- by default, streams only accept Buffer or String values
- the =objectMode= flag makes a stream accept any JavaScript object

#+begin_src js :tangle examples/stream/implement-object-mode-stream.mjs
import { Transform } from 'node:stream'

const commaSplitter = new Transform({
  readableObjectMode: true,
  transform(chunk, encoding, callback) {
    this.push(chunk.toString().trim().split(','))
    callback()
  }
})

const arrayToObject = new Transform({
  readableObjectMode: true,
  writableObjectMode: true,
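  // writableObjectMode lets this stream accept the arrays pushed by commaSplitter,
  // readableObjectMode lets it push plain JavaScript objects downstream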
  transform(chunk, encoding, callback) {
    const obj = {}
    for (let i = 0; i < chunk.length; i += 2) {
      obj[chunk[i]] = chunk[i + 1]
    }
    this.push(obj)
    callback()
  }
})

const objectToString = new Transform({
  readableObjectMode: true,
  writableObjectMode: true,
  transform(chunk, encoding, callback) {
    this.push(JSON.stringify(chunk) + '\n')
    callback()
  }
})

process.stdin
  .pipe(commaSplitter)
  .pipe(arrayToObject)
  .pipe(objectToString)
  .pipe(process.stdout)
#+end_src

** Async iterators
- it's highly recommended to use async iterators when working with streams
- you can use an async iterator to read from readable streams
- note that we have to use an async function because we want it to return a Promise
- it's important not to mix async functions with =EventEmitter=, because there is no way to catch a rejection when it is emitted within an event handler

#+begin_src js :tangle examples/stream/implement-async-iterators.mjs
import { createReadStream } from 'node:fs'

async function logChunks(readable) {
  for await (const chunk of readable) {
    console.log(chunk)
  }
}

const readable = createReadStream('./big.file', { encoding: 'utf8' })

logChunks(readable)
#+end_src

** Creating readable streams from iterables
- there's a =Readable.from= method to create Readable streams out of iterables

#+begin_src js :tangle examples/stream/implement-readable-streams-from-iterables.mjs
import { Readable } from 'node:stream'

async function* generate() {
  yield 'hello'
  yield 'streams'
}

const readable = Readable.from(generate())

readable.on('data', chunk => {
  console.log(chunk)
})
#+end_src

** Backpressure
- using =pipe= is safer than listening for =data= events and writing to another stream directly, because =pipe= also handles backpressure for free
- backpressure is a memory-management mechanism: the amount of in-process memory is kept roughly constant by holding data back in the pipeline until the destination is ready for more

* References
- [[https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93/][Node.js Streams: Everything you need to know]]
- [[https://nodesource.com/blog/understanding-streams-in-nodejs/][Understanding Streams in Node.js]]
- [[https://www.packtpub.com/en-us/product/node-cookbook-9781838558758][Node Cookbook, 3rd Edition - Chapter 4 - Using streams]]