* Streams

- data doesn't need to fit in memory all at once
- data may not be available all at once
- streams are instances of =EventEmitter=
- a stream is usually consumed using =pipe=
- using streams, a program can potentially scale to any input size because the amount of memory used by the process stays constant
- most programs written using streams will scale well with any input size

** Practical

Create a huge file

- creates a file of roughly 400MB using a =Writable= stream

#+begin_src js :tangle examples/stream/create-big-file.mjs
  import fs from 'node:fs'

  const file = fs.createWriteStream('./big.file')

  // each write appends one paragraph of lorem ipsum,
  // so a million writes produce a file of roughly 400MB
  for (let i = 0; i <= 1e6; i++) {
    file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n')
  }

  file.end()
#+end_src

Let's serve this file with a running server

- sends the file without blocking the main thread
- but reads the whole file into memory and only then sends it to the client

#+begin_src js :tangle examples/stream/server-without-stream.mjs
  import fs from 'node:fs'
  import { createServer } from 'node:http'

  const server = createServer()

  server.on('request', (req, res) => {
    // buffers the entire file in memory before responding
    fs.readFile('./big.file', (err, data) => {
      if (err) throw err

      res.end(data)
    })
  })

  server.listen(8000)
#+end_src

- the HTTP response object is also a =Writable= stream, so we can pipe the file straight into it

#+begin_src js :tangle examples/stream/server-with-stream.mjs
  import fs from 'node:fs'
  import { createServer } from 'node:http'

  const server = createServer()

  server.on('request', (req, res) => {
    const src = fs.createReadStream('./big.file')

    // streams the file chunk by chunk; memory usage stays constant
    src.pipe(res)
  })

  server.listen(8000)
#+end_src

** Types of streams

- Readable stream :: a source from which data can be consumed (e.g. =fs.createReadStream=)
- Writable stream :: a destination to which data can be written (e.g. =fs.createWriteStream=)
- Duplex stream :: both a Readable and a Writable stream (e.g. a TCP socket)
- Transform stream :: a Duplex stream that transforms the data as it is written and read (e.g. =zlib.createGzip=)

** Pipe

- pipes the output of one stream into the input of the next
- the source has to be a Readable stream and the destination has to be a Writable
- lacks error handling
- it's better to use the =pump= library or the built-in =stream.pipeline=, as in the sketch below
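
Roughly how that looks with the built-in =pipeline= from =node:stream=, which provides the error propagation and cleanup that =pump= was written for (the destination file name here is hypothetical):

#+begin_src js
  import fs from 'node:fs'
  import { pipeline } from 'node:stream'

  pipeline(
    fs.createReadStream('./big.file'),
    fs.createWriteStream('./big.file.copy'),
    // called once, with an error if any stream in the chain failed
    (err) => {
      if (err) {
        console.error('pipeline failed', err)
      } else {
        console.log('pipeline succeeded')
      }
    }
  )
#+end_src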

** Stream events

- streams can also be consumed with events, as shown below
- streams inherit from the =EventEmitter= class
  - data :: emitted when new data is read from a readable stream
  - end :: emitted when a readable stream has no more data available and all the available data has been read
  - finish :: emitted when a writable stream has been ended and all pending writes have been completed
  - close :: loosely defined in the stream docs; usually emitted when the stream is fully closed
  - error :: emitted when a stream has experienced an error
  - pause :: emitted when a readable stream has been paused, either because backpressure occurred or because the =pause= method was explicitly called
  - resume :: emitted when a readable stream goes from being paused to flowing again
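
For example, the big file from earlier can be consumed with events instead of =pipe= (a minimal sketch):

#+begin_src js
  import fs from 'node:fs'

  const readable = fs.createReadStream('./big.file')

  readable.on('data', (chunk) => {
    console.log(`received ${chunk.length} bytes`)
  })

  readable.on('end', () => {
    console.log('all data consumed')
  })

  readable.on('error', (err) => {
    console.error('stream error', err)
  })
#+end_src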

** Paused and Flowing Modes of readable streams

- also referred to as =pull= and =push= modes
- all readable streams start =paused= but can be switched to flowing mode in one of the following ways (see the sketch after this list)
  - adding a =data= event handler
  - calling the =stream.resume()= method
  - calling the =stream.pipe()= method to send the data to a Writable
- the =Readable= can switch back to paused mode using one of the following
  - if there are no pipe destinations, by calling the =stream.pause()= method
  - if there are pipe destinations, by removing all of them. Multiple pipe destinations may be removed by calling the =stream.unpipe()= method
- in flowing mode, data can be lost if no consumers are available
- when consuming readable streams using =pipe=, we don't need to worry about this as it is managed automatically
- *important concept* :: a =Readable= will not generate data until a mechanism for either consuming or ignoring that data is provided. If the consuming mechanism is disabled or taken away, the =Readable= will attempt to stop generating data.
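
A sketch of switching between the two modes explicitly:

#+begin_src js
  import fs from 'node:fs'

  const readable = fs.createReadStream('./big.file')

  // attaching a 'data' handler switches the stream into flowing mode
  readable.on('data', (chunk) => {
    console.log(`received ${chunk.length} bytes`)
    // switch back to paused mode for one second, then resume flowing
    readable.pause()
    setTimeout(() => readable.resume(), 1000)
  })
#+end_src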

** Implement Writable Stream

#+begin_src js :tangle examples/stream/implement-writable-stream.mjs
  import { Writable } from 'node:stream'

  const outStream = new Writable({
    // called for every chunk written to the stream
    write(chunk, encoding, callback) {
      console.log(chunk.toString())
      callback()
    }
  })

  process.stdin.pipe(outStream)
#+end_src

- chunk :: usually a buffer, unless the stream is configured otherwise
- encoding :: needed in some cases, but usually ignored
- callback :: signals whether the write was successful or not. To signal a failure, call the callback with an error object, as in the sketch below
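
A sketch of signaling a failed write (the 1KB limit is arbitrary, for illustration):

#+begin_src js
  import { Writable } from 'node:stream'

  const limitedStream = new Writable({
    write(chunk, encoding, callback) {
      if (chunk.length > 1024) {
        // signal a failed write by passing an error to the callback
        callback(new Error('chunk too large'))
        return
      }
      process.stdout.write(chunk, callback)
    }
  })

  limitedStream.on('error', (err) => {
    console.error('write failed:', err.message)
  })
#+end_src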

** Implement a Readable Stream

#+begin_src js :tangle examples/stream/implement-readable-stream.mjs
  import { Readable } from 'node:stream'

  const inStream = new Readable({
    // called whenever the consumer asks for more data
    read(size) {
      this.push(String.fromCharCode(this.currentCharCode++))
      // stop after 'Z' (char code 90); pushing null signals the end
      if (this.currentCharCode > 90) {
        this.push(null)
      }
    }
  })

  inStream.currentCharCode = 65 // start at 'A'

  inStream.pipe(process.stdout)
#+end_src

- pushes data into the stream on demand, when the consumer asks for it (a simpler push-up-front variant is sketched below)
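
A simpler (but less memory-friendly) variant pushes all the data up front, before the consumer asks:

#+begin_src js
  import { Readable } from 'node:stream'

  // a no-op read(): we push everything ourselves instead of on demand
  const inStream = new Readable({ read() {} })

  inStream.push('ABCDEFGHIJKLMNOPQRSTUVWXYZ')
  inStream.push(null) // no more data

  inStream.pipe(process.stdout)
#+end_src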

** Implement Duplex/Transform Stream

#+begin_src js :tangle examples/stream/implement-duplex-stream.mjs
  import { Duplex } from 'node:stream'

  const inoutStream = new Duplex({
    write(chunk, encoding, callback) {
      console.log(chunk.toString())
      callback()
    },

    read(size) {
      this.push(String.fromCharCode(this.currentCharCode++))
      if (this.currentCharCode > 90) {
        this.push(null)
      }
    }
  })

  inoutStream.currentCharCode = 65

  process.stdin
    .pipe(inoutStream)
    .pipe(process.stdout)
#+end_src

- the readable and writable sides of a duplex stream operate independently from one another

#+begin_src js :tangle examples/stream/implement-transform-stream.mjs
  import { Transform } from 'node:stream'

  const upperCaseTr = new Transform({
    // transform() combines write and read: consume the chunk,
    // then push the transformed result
    transform(chunk, encoding, callback) {
      this.push(chunk.toString().toUpperCase())
      callback()
    }
  })

  process.stdin
    .pipe(upperCaseTr)
    .pipe(process.stdout)
#+end_src

** Streams Object Mode

- by default, streams only accept Buffers or strings
- the =objectMode= flag makes a stream accept any JavaScript object

#+begin_src js :tangle examples/stream/implement-object-mode-stream.mjs
  import { Transform } from 'node:stream'

  // reads strings, pushes arrays: only the readable side is in object mode
  const commaSplitter = new Transform({
    readableObjectMode: true,

    transform(chunk, encoding, callback) {
      this.push(chunk.toString().trim().split(','))
      callback()
    }
  })

  // reads arrays, pushes objects: both sides are in object mode
  const arrayToObject = new Transform({
    readableObjectMode: true,
    writableObjectMode: true,

    transform(chunk, encoding, callback) {
      const obj = {}
      // treat the array as alternating key/value pairs
      for (let i = 0; i < chunk.length; i += 2) {
        obj[chunk[i]] = chunk[i + 1]
      }
      this.push(obj)

      callback()
    }
  })

  // reads objects, pushes JSON strings
  const objectToString = new Transform({
    readableObjectMode: true,
    writableObjectMode: true,

    transform(chunk, encoding, callback) {
      this.push(JSON.stringify(chunk) + '\n')
      callback()
    }
  })

  process.stdin
    .pipe(commaSplitter)
    .pipe(arrayToObject)
    .pipe(objectToString)
    .pipe(process.stdout)
#+end_src
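
For example (assuming the tangled path above), running =echo "name,alice,age,30" | node examples/stream/implement-object-mode-stream.mjs= should print ={"name":"alice","age":"30"}=.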

** Async iterators

- it's highly recommended to use async iterators when working with streams
- you can use an async iterator to read from readable streams
- note that we have to wrap the loop in an async function, since =for await= is only allowed in an async context
- it's important not to mix async functions with =EventEmitter=, because there is no way to catch a rejection when it is emitted within an event handler (see the =try/catch= sketch below)

#+begin_src js :tangle examples/stream/implement-async-iterators.mjs
  import { createReadStream } from 'node:fs'

  async function logChunks(readable) {
    for await (const chunk of readable) {
      console.log(chunk)
    }
  }

  const readable = createReadStream('./big.file', { encoding: 'utf8' })

  logChunks(readable)
#+end_src
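
One advantage: stream errors surface as promise rejections inside the loop, so they can be handled with =try/catch= (a sketch, with a deliberately missing file):

#+begin_src js
  import { createReadStream } from 'node:fs'

  async function logChunks(readable) {
    try {
      for await (const chunk of readable) {
        console.log(chunk)
      }
    } catch (err) {
      // an 'error' event on the stream rejects the async iteration
      console.error('stream failed', err.message)
    }
  }

  logChunks(createReadStream('./no-such-file', { encoding: 'utf8' }))
#+end_src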

** Creating readable streams from iterables

- there's a =Readable.from= method to create Readable streams out of iterables, including async generators

#+begin_src js :tangle examples/stream/implement-readable-streams-from-iterables.mjs
  import { Readable } from 'node:stream'

  async function* generate() {
    yield 'hello'
    yield 'streams'
  }

  const readable = Readable.from(generate())

  readable.on('data', chunk => {
    console.log(chunk)
  })
#+end_src

** Backpressure

- using =pipe= is safer than listening for =data= events and writing to another stream directly, because it also handles backpressure for free
- backpressure keeps the amount of in-process memory constant by holding data back in the pipeline when the destination can't keep up; the sketch below shows the manual version
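
Roughly what =pipe= automates: pause the source when =write()= returns =false= and resume on ='drain'= (the destination file name is hypothetical):

#+begin_src js
  import fs from 'node:fs'

  const src = fs.createReadStream('./big.file')
  const dest = fs.createWriteStream('./big.file.copy')

  src.on('data', (chunk) => {
    // write() returns false when the destination's buffer is full
    if (!dest.write(chunk)) {
      src.pause() // stop reading until the destination drains
    }
  })

  dest.on('drain', () => {
    src.resume() // safe to write again
  })

  src.on('end', () => {
    dest.end()
  })
#+end_src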

* References

- [[https://www.freecodecamp.org/news/node-js-streams-everything-you-need-to-know-c9141306be93/][Node.js Streams: Everything you need to know]]
- [[https://nodesource.com/blog/understanding-streams-in-nodejs/][Understanding Streams in Node.js]]
- [[https://www.packtpub.com/en-us/product/node-cookbook-9781838558758][Node Cookbook, 3rd Edition - Chapter 4 - Using streams]]