* Streams

- don't need to fit in memory all at once
- data not available all at once
- instance of =EventEmitter=
- is usually consumed using =pipe=
- using streams, a program can potentially scale infinitely because the amount of memory used by the process stays constant
- most programs written using streams will scale well with any input size

** Practical

Create a huge file

- creates a file of roughly 400MB using a =writable= stream

#+begin_src js :tangle examples/stream/create-big-file.mjs
  import fs from 'node:fs'

  const file = fs.createWriteStream('./big.file')

  for (let i = 0; i <= 1e6; i++) {
    file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n')
  }

  file.end()
#+end_src

Let's serve this file with a running server

- sends the file without blocking the main thread
- reads the whole file into memory and then sends it to the client

#+begin_src js :tangle examples/stream/server-without-stream.mjs
  import fs from 'node:fs'
  import { createServer } from 'node:http'

  const server = createServer()

  server.on('request', (req, res) => {
    fs.readFile('./big.file', (err, data) => {
      if (err) throw err

      res.end(data)
    })
  })

  server.listen(8000)
#+end_src

- HTTP response object is also a writable stream

#+begin_src js :tangle examples/stream/server-with-stream.mjs
  import fs from 'node:fs'
  import { createServer } from 'node:http'

  const server = createServer()

  server.on('request', (req, res) => {
    const src = fs.createReadStream('./big.file')

    src.pipe(res)
  })

  server.listen(8000)
#+end_src

** Types of streams

- Readable stream :: a source from which data can be consumed
- Writable stream :: a destination to which data can be written
- Duplex stream :: both a Readable and a Writable stream
- Transform stream :: a Duplex stream used to transform data as it is written and read

** Pipe

- pipes the output of a stream into the input of the next
- the source has to be a readable stream and the destination a writable one
- lacks error handling
- it's better to use the =pump= library (or the built-in =pipeline=, sketched below)
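
Node core also ships =stream.pipeline= (with a promise-based variant in =node:stream/promises=), which covers the same ground as =pump=: it forwards the first error and destroys every stream in the chain. A minimal sketch, assuming the =big.file= created above (the =big.copy= destination name is made up for illustration):

#+begin_src js
  import fs from 'node:fs'
  import { pipeline } from 'node:stream/promises'

  try {
    // unlike pipe, pipeline rejects on the first error and
    // tears down all streams in the chain, avoiding leaks
    await pipeline(
      fs.createReadStream('./big.file'),
      fs.createWriteStream('./big.copy')
    )
    console.log('copy finished')
  } catch (err) {
    console.error('pipeline failed:', err)
  }
#+end_src
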
** Stream events

- streams can also be consumed with events
- streams inherit from the =EventEmitter= class
  - data :: emitted when new data is read from a readable stream
  - end :: emitted when a readable stream has no more data available and all the available data has been read
  - finish :: emitted when a writable stream has been ended and all pending writes have been completed
  - close :: loosely defined in the stream docs; usually emitted when the stream is fully closed
  - error :: emitted when a stream has experienced an error
  - pause :: emitted when a readable stream has been paused. Pausing happens either when backpressure occurs or when the pause method is explicitly called
  - resume :: emitted when a readable stream goes from being paused to flowing again

** Paused and Flowing Modes of readable streams

- also referred to as =pull= and =push= modes
- all readable streams start =paused= but can be switched to flowing mode in one of the following ways (see the sketch after this list)
  - adding a =data= event handler
  - calling the =stream.resume()= method
  - calling the =stream.pipe()= method to send the data to a Writable
- the =Readable= can switch back to paused mode using one of the following
  - if there are no pipe destinations, by calling the =stream.pause()= method
  - if there are pipe destinations, by removing all of them. Multiple pipe destinations may be removed by calling the =stream.unpipe()= method
- in flowing mode, data can be lost if no consumers are available
- when consuming readable streams with =pipe=, we don't need to worry about this, as it is managed automatically
- *important concept* :: a =Readable= will not generate data until a mechanism for either consuming or ignoring that data is provided. If the consuming mechanism is disabled or taken away, the =Readable= will attempt to stop generating data.
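
A minimal sketch of switching between the two modes (the letter stream and the timing are made up for illustration):

#+begin_src js
  import { Readable } from 'node:stream'

  const letters = Readable.from(['A', 'B', 'C'])

  // attaching a data handler switches the stream into flowing mode
  letters.on('data', chunk => console.log('received:', chunk))

  // pause() switches back to paused mode before any chunk is
  // delivered; no data events fire while the stream is paused
  letters.pause()

  // resume() puts the stream back into flowing mode, and the
  // three letters are logged one second later
  setTimeout(() => letters.resume(), 1000)
#+end_src
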
** Implement Writable Stream

#+begin_src js :tangle examples/stream/implement-writable-stream.mjs
  import { Writable } from 'node:stream'

  const outStream = new Writable({
    write(chunk, encoding, callback) {
      // log each chunk, then signal a successful write
      console.log(chunk.toString())
      callback()
    }
  })

  process.stdin.pipe(outStream)
#+end_src

- chunk :: usually a buffer, unless the stream is configured otherwise
- encoding :: needed in that case, but usually ignored
- callback :: signals whether the write succeeded or failed. To signal a failure, call the callback with an error object

** Implement a Readable Stream

#+begin_src js :tangle examples/stream/implement-readable-stream.mjs
  import { Readable } from 'node:stream'

  const inStream = new Readable({
    read(size) {
      // push the letters A (65) through Z (90), then end the stream
      this.push(String.fromCharCode(this.currentCharCode++))
      if (this.currentCharCode > 90) {
        this.push(null)
      }
    }
  })

  inStream.currentCharCode = 65

  inStream.pipe(process.stdout)
#+end_src

- data is pushed into the stream only when the consumer asks for it

** Implement Duplex/Transform Stream

#+begin_src js :tangle examples/stream/implement-duplex-stream.mjs
  import { Duplex } from 'node:stream'

  const inoutStream = new Duplex({
    // writable side: log whatever comes in
    write(chunk, encoding, callback) {
      console.log(chunk.toString())
      callback()
    },

    // readable side: emit the letters A through Z, then end
    read(size) {
      this.push(String.fromCharCode(this.currentCharCode++))
      if (this.currentCharCode > 90) {
        this.push(null)
      }
    }
  })

  inoutStream.currentCharCode = 65

  process.stdin
    .pipe(inoutStream)
    .pipe(process.stdout)
#+end_src

- the readable and writable sides of a duplex stream operate independently of one another

#+begin_src js :tangle examples/stream/implement-transform-stream.mjs
  import { Transform } from 'node:stream'

  const upperCaseTr = new Transform({
    transform(chunk, encoding, callback) {
      this.push(chunk.toString().toUpperCase())
      callback()
    }
  })

  process.stdin
    .pipe(upperCaseTr)
    .pipe(process.stdout)
#+end_src

** Streams Object Mode

- by default, streams only accept Buffers or strings
- set the =objectMode= flag to make a stream accept any JavaScript object

#+begin_src js :tangle examples/stream/implement-object-mode-stream.mjs
  import { Transform } from 'node:stream'

  const commaSplitter = new Transform({
    // pushes arrays, so the readable side must be in object mode
    readableObjectMode: true,

    transform(chunk, encoding, callback) {
      this.push(chunk.toString().trim().split(','))
      callback()
    }
  })

  const arrayToObject = new Transform({
    // consumes arrays and pushes plain objects
    readableObjectMode: true,
    writableObjectMode: true,

    transform(chunk, encoding, callback) {
      const obj = {}
      for (let i = 0; i < chunk.length; i += 2) {
        obj[chunk[i]] = chunk[i + 1]
      }
      this.push(obj)

      callback()
    }
  })

  const objectToString = new Transform({
    // consumes objects and pushes newline-delimited JSON strings
    readableObjectMode: true,
    writableObjectMode: true,

    transform(chunk, encoding, callback) {
      this.push(JSON.stringify(chunk) + '\n')
      callback()
    }
  })

  process.stdin
    .pipe(commaSplitter)
    .pipe(arrayToObject)
    .pipe(objectToString)
    .pipe(process.stdout)
#+end_src
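
As a hypothetical usage example (the script name comes from the =:tangle= header above), running =echo "a,b,c,d" | node examples/stream/implement-object-mode-stream.mjs= prints ={"a":"b","c":"d"}=.
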
** Async iterators

- it's highly recommended to use async iterators when working with streams
- you can use an async iterator when reading from readable streams
- note that we had to use an async function, because =for await= can only appear inside one
- it's important not to mix async functions with =EventEmitter=, because there is no way to catch a rejection when it is emitted within an event handler

#+begin_src js :tangle examples/stream/implement-async-iterators.mjs
  import { createReadStream } from 'node:fs'

  async function logChunks(readable) {
    for await (const chunk of readable) {
      console.log(chunk)
    }
  }

  const readable = createReadStream('./big.file', { encoding: 'utf8' })

  logChunks(readable)
#+end_src

** Creating readable streams from iterables

- there's a =from= method to create Readable streams out of iterables

#+begin_src js :tangle examples/stream/implement-readable-streams-from-iterables.mjs
  import { Readable } from 'node:stream'

  async function* generate() {
    yield 'hello'
    yield 'streams'
  }

  const readable = Readable.from(generate())

  readable.on('data', chunk => {
    console.log(chunk)
  })
#+end_src

** Backpressure

- using =pipe= is safer than listening for =data= events and writing to another stream directly, because it also handles backpressure for free
- a memory-management capability where the amount of in-process memory is kept constant by holding data back in the pipeline when the destination cannot keep up

* References

- [[https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93/][Node.js Streams: Everything you need to know]]
- [[https://nodesource.com/blog/understanding-streams-in-nodejs/][Understanding Streams in Node.js]]
- [[https://www.packtpub.com/en-us/product/node-cookbook-9781838558758][Node Cookbook, 3rd Edition - Chapter 4 - Using streams]]