+9
Node/examples/stream/create-big-file.mjs
+9
Node/examples/stream/create-big-file.mjs
···
1
+
import fs from 'node:fs'
2
+
3
+
const file = fs.createWriteStream('./big.file')
4
+
5
+
for (let i = 0; i <= 1e6; i++) {
6
+
file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n')
7
+
}
8
+
9
+
file.end()
+21
Node/examples/stream/implement-duplex-stream.mjs
+21
Node/examples/stream/implement-duplex-stream.mjs
···
1
+
import { Duplex } from 'node:stream'
2
+
3
+
const inoutStream = new Duplex({
4
+
write(chunk, encoding, callback) {
5
+
console.log(chunk.toString())
6
+
callback()
7
+
}
8
+
9
+
read(size) {
10
+
this.push(String.fromCharCode(this.currentCharCode++))
11
+
if (this.currentCharCode > 90 ) {
12
+
this.push(null)
13
+
}
14
+
}
15
+
})
16
+
17
+
inoutStream.currentCharCode = 65
18
+
19
+
process.stding
20
+
.pipe(inountStream)
21
+
.pipe(process.stdout)
+41
Node/examples/stream/implement-object-mode-stream.mjs
+41
Node/examples/stream/implement-object-mode-stream.mjs
···
1
+
import { Transform } from 'node:stream'
2
+
3
+
const commaSplitter = new Transform({
4
+
readableObjectMode: true,
5
+
6
+
transform(chunk, encoding, callback) {
7
+
this.push(chunk.toString().trim().split(','))
8
+
callback()
9
+
}
10
+
})
11
+
12
+
const arrayToObject = new Transform({
13
+
readableObjectMode: true,
14
+
writableObjectMode: true,
15
+
16
+
transform(chunk, encoding, callback) {
17
+
const obj = {}
18
+
for (let i = 0; i < chunk.length; i += 2) {
19
+
obj[chunk[i]] = chunk[i + 1]
20
+
}
21
+
this.push(obj)
22
+
23
+
callback()
24
+
}
25
+
})
26
+
27
+
const objectToString = new Transform({
28
+
readableObjectMode: true,
29
+
writableObjectMode: true,
30
+
31
+
transform(chunk, encoding, callback) {
32
+
this.push(JSON.stringify(chunk) + '\n')
33
+
callback()
34
+
}
35
+
})
36
+
37
+
process.stdin
38
+
.pipe(commaSplitter)
39
+
.pipe(arrayToObject)
40
+
.pipe(objectToString)
41
+
.pipe(process.stdout)
+14
Node/examples/stream/implement-readable-stream.mjs
+14
Node/examples/stream/implement-readable-stream.mjs
···
1
+
import { Readable } from 'node:stream'
2
+
3
+
const inStream = new Readable({
4
+
read(size) {
5
+
this.push(String.fromCharCode(this.currentCharCode++))
6
+
if (this.currentCharCode > 90 ) {
7
+
this.push(null)
8
+
}
9
+
}
10
+
})
11
+
12
+
inStream.currentCharCode = 65
13
+
14
+
inStream.pipe(process.stdout)
+12
Node/examples/stream/implement-transform-stream.mjs
+12
Node/examples/stream/implement-transform-stream.mjs
+10
Node/examples/stream/implement-writable-stream.mjs
+10
Node/examples/stream/implement-writable-stream.mjs
+12
Node/examples/stream/server-with-stream.mjs
+12
Node/examples/stream/server-with-stream.mjs
+14
Node/examples/stream/server-without-stream.mjs
+14
Node/examples/stream/server-without-stream.mjs
···
1
+
import fs from 'node:fs'
2
+
import { createServer } from 'node:http'
3
+
4
+
const server = createServer()
5
+
6
+
server.on('request', (req, res) => {
7
+
fs.readFile('./big.file', (err, data) => {
8
+
if (err) throw err;
9
+
10
+
res.end(data)
11
+
})
12
+
})
13
+
14
+
server.listen(8000)
+217
Node/streams.org
+217
Node/streams.org
···
1
+
* Streams
2
+
3
+
- don't need to fit in memory all at once
4
+
- data not available all at once
5
+
- instance of =EventEmitter=
6
+
- is usually consumed using =pipe=
7
+
8
+
** Pratical
9
+
Create a huge file
10
+
11
+
- creates a file with 400MB using a =writable= stream
12
+
13
+
#+begin_src js :tangle examples/stream/create-big-file.mjs
14
+
import fs from 'node:fs'
15
+
16
+
const file = fs.createWriteStream('./big.file')
17
+
18
+
for (let i = 0; i <= 1e6; i++) {
19
+
file.write('Lorem ipsum dolor sit amet, consectetur adipisicing elit, sed do eiusmod tempor incididunt ut labore et dolore magna aliqua. Ut enim ad minim veniam, quis nostrud exercitation ullamco laboris nisi ut aliquip ex ea commodo consequat. Duis aute irure dolor in reprehenderit in voluptate velit esse cillum dolore eu fugiat nulla pariatur. Excepteur sint occaecat cupidatat non proident, sunt in culpa qui officia deserunt mollit anim id est laborum.\n')
20
+
}
21
+
22
+
file.end()
23
+
#+end_src
24
+
25
+
Let's serve this files with a running server
26
+
27
+
- sends the file without blocking the main thread
28
+
- reads all the file in memory and then send it for the client
29
+
30
+
#+begin_src js :tangle examples/stream/server-without-stream.mjs
31
+
import fs from 'node:fs'
32
+
import { createServer } from 'node:http'
33
+
34
+
const server = createServer()
35
+
36
+
server.on('request', (req, res) => {
37
+
fs.readFile('./big.file', (err, data) => {
38
+
if (err) throw err;
39
+
40
+
res.end(data)
41
+
})
42
+
})
43
+
44
+
server.listen(8000)
45
+
#+end_src
46
+
47
+
- HTTP response object is also a writable stream
48
+
49
+
#+begin_src js :tangle examples/stream/server-with-stream.mjs
50
+
import fs from 'node:fs'
51
+
import { createServer } from 'node:http'
52
+
53
+
const server = createServer()
54
+
55
+
server.on('request', (req, res) => {
56
+
const src = fs.createReadStream('./big.file')
57
+
58
+
src.pipe(res)
59
+
})
60
+
61
+
server.listen(8000)
62
+
#+end_src
63
+
64
+
** Types of streams
65
+
66
+
- Readable stream :: source in which data can be consumed
67
+
- Writable stream :: destination to which data can be written
68
+
- Duplex stream :: is both Readable and Writable stream
69
+
- Transform stream :: is a duplex that is used to transform data when data is read and written
70
+
71
+
** Pipe
72
+
73
+
- pipes the output of a stream into the input of the next
74
+
- source has to be a readable stream and the destination has to be a writable
75
+
76
+
** Stream events
77
+
78
+
- streams can also be consumed with events
79
+
80
+
** Paused and Flowing Modes of readable streams
81
+
82
+
- also referred as =pull= and =push= modes
83
+
- all readable streams starts =paused=, only after the =read()= is that it changes to =flowing=
84
+
- in flowing mode, data can be lost if no consumers are available
85
+
- when consuming readable streams using =pipe=, we don't nedd to worry about this as it is managed automatically
86
+
87
+
** Implement Writable Stream
88
+
89
+
#+begin_src js :tangle examples/stream/implement-writable-stream.mjs
90
+
import { Writable } from 'node:stream'
91
+
92
+
const outStream = new Writable({
93
+
write(chunk, encoding, callback) {
94
+
console.log(chunk.toString())
95
+
callback()
96
+
}
97
+
})
98
+
99
+
process.stdin.pipe(outStream)
100
+
#+end_src
101
+
102
+
- chunk :: usually a buffer, unless specified
103
+
- encoding :: is needed, but usually ignored
104
+
- callback :: it signals whether the write was successful or not. To signal a failure call the callback with an error object
105
+
106
+
** Implement a Readable Stream
107
+
108
+
#+begin_src js :tangle examples/stream/implement-readable-stream.mjs
109
+
import { Readable } from 'node:stream'
110
+
111
+
const inStream = new Readable({
112
+
read(size) {
113
+
this.push(String.fromCharCode(this.currentCharCode++))
114
+
if (this.currentCharCode > 90 ) {
115
+
this.push(null)
116
+
}
117
+
}
118
+
})
119
+
120
+
inStream.currentCharCode = 65
121
+
122
+
inStream.pipe(process.stdout)
123
+
#+end_src
124
+
125
+
- pushing data into the stream when the consumer asks
126
+
127
+
** Implement Duplex/Transform Stream
128
+
129
+
#+begin_src js :tangle examples/stream/implement-duplex-stream.mjs
130
+
import { Duplex } from 'node:stream'
131
+
132
+
const inoutStream = new Duplex({
133
+
write(chunk, encoding, callback) {
134
+
console.log(chunk.toString())
135
+
callback()
136
+
}
137
+
138
+
read(size) {
139
+
this.push(String.fromCharCode(this.currentCharCode++))
140
+
if (this.currentCharCode > 90 ) {
141
+
this.push(null)
142
+
}
143
+
}
144
+
})
145
+
146
+
inoutStream.currentCharCode = 65
147
+
148
+
process.stding
149
+
.pipe(inountStream)
150
+
.pipe(process.stdout)
151
+
#+end_src
152
+
153
+
- readable and writable sides of a duplex operate independently from one another
154
+
155
+
#+begin_src js :tangle examples/stream/implement-transform-stream.mjs
156
+
import { Transform } from 'node:stream'
157
+
158
+
const upperCaseTr = new Transform({
159
+
transform(chunk, encoding, callback) {
160
+
this.push(chunk.toString().toUpperCase())
161
+
callback()
162
+
}
163
+
})
164
+
165
+
process.stding
166
+
.pipe(upperCaseTr)
167
+
.pipe(process.stdout)
168
+
#+end_src
169
+
170
+
** Streams Object Mode
171
+
172
+
- by default, streams only accepts Buffer or String
173
+
- flag =objectMode= to set stream to accept any js object
174
+
175
+
#+begin_src js :tangle examples/stream/implement-object-mode-stream.mjs
176
+
import { Transform } from 'node:stream'
177
+
178
+
const commaSplitter = new Transform({
179
+
readableObjectMode: true,
180
+
181
+
transform(chunk, encoding, callback) {
182
+
this.push(chunk.toString().trim().split(','))
183
+
callback()
184
+
}
185
+
})
186
+
187
+
const arrayToObject = new Transform({
188
+
readableObjectMode: true,
189
+
writableObjectMode: true,
190
+
191
+
transform(chunk, encoding, callback) {
192
+
const obj = {}
193
+
for (let i = 0; i < chunk.length; i += 2) {
194
+
obj[chunk[i]] = chunk[i + 1]
195
+
}
196
+
this.push(obj)
197
+
198
+
callback()
199
+
}
200
+
})
201
+
202
+
const objectToString = new Transform({
203
+
readableObjectMode: true,
204
+
writableObjectMode: true,
205
+
206
+
transform(chunk, encoding, callback) {
207
+
this.push(JSON.stringify(chunk) + '\n')
208
+
callback()
209
+
}
210
+
})
211
+
212
+
process.stdin
213
+
.pipe(commaSplitter)
214
+
.pipe(arrayToObject)
215
+
.pipe(objectToString)
216
+
.pipe(process.stdout)
217
+
#+end_src