ocaml http/1, http/2 and websocket client and server library
1(** Streaming abstractions for HCS HTTP library.
2
3 This module provides both synchronous and asynchronous stream types for
4 handling large payloads efficiently with backpressure support.
5
6 {1 Synchronous Streams}
7
8 For simple, pull-based iteration without runtime dependencies.
9
10 {1 Asynchronous Streams (Eio)}
11
12 For streaming with Eio's structured concurrency, supporting flows, files,
13 and chunked transfer encoding. *)
14
15(** {1 Synchronous Stream}
16
17 A simple pull-based stream using OCaml's Seq. *)
18module Sync = struct
19 type 'a t = 'a Seq.t
20 (** A synchronous stream of values *)
21
22 (** {2 Producers} *)
23
24 let empty : 'a t = Seq.empty
25 let singleton x : 'a t = Seq.return x
26 let of_list l : 'a t = List.to_seq l
27 let of_array a : 'a t = Array.to_seq a
28
29 (** Create a stream from an unfolding function *)
30 let unfold (f : 's -> ('a * 's) option) (init : 's) : 'a t = Seq.unfold f init
31
32 (** Create a stream that repeats a value n times *)
33 let repeat n x : 'a t =
34 unfold (fun i -> if i > 0 then Some (x, i - 1) else None) n
35
36 (** Create a stream from a generator function *)
37 let generate (f : unit -> 'a option) : 'a t =
38 let rec next () =
39 match f () with Some x -> Seq.Cons (x, next) | None -> Seq.Nil
40 in
41 next
42
43 (** {2 Transformers} *)
44
45 let map f s : 'a t = Seq.map f s
46 let filter p s : 'a t = Seq.filter p s
47 let filter_map f s : 'a t = Seq.filter_map f s
48
49 (** Take the first n elements *)
50 let take n s : 'a t = Seq.take n s
51
52 (** Drop the first n elements *)
53 let drop n s : 'a t = Seq.drop n s
54
55 (** Split stream into chunks of size n *)
56 let chunks n s : 'a list t =
57 let rec next acc count seq () =
58 if count >= n then Seq.Cons (List.rev acc, next [] 0 seq)
59 else
60 match seq () with
61 | Seq.Nil ->
62 if acc = [] then Seq.Nil
63 else Seq.Cons (List.rev acc, fun () -> Seq.Nil)
64 | Seq.Cons (x, rest) -> next (x :: acc) (count + 1) rest ()
65 in
66 next [] 0 s
67
68 (** Flatten a stream of streams *)
69 let flatten s : 'a t = Seq.flat_map Fun.id s
70
71 (** Append two streams *)
72 let append s1 s2 : 'a t = Seq.append s1 s2
73
74 (** {2 Consumers} *)
75
76 (** Fold over the stream *)
77 let fold f init s = Seq.fold_left f init s
78
79 (** Iterate over the stream for side effects *)
80 let iter f s = Seq.iter f s
81
82 (** Drain the stream, discarding all values *)
83 let drain s = iter (fun _ -> ()) s
84
85 (** Collect stream into a list *)
86 let to_list s = List.of_seq s
87
88 (** Collect stream into an array *)
89 let to_array s = Array.of_seq s
90
91 (** {2 Cstruct-specific operations} *)
92
93 (** Concatenate a stream of Cstructs into a single string *)
94 let cstructs_to_string (s : Cstruct.t t) : string =
95 let bufs = to_list s in
96 let total = List.fold_left (fun acc cs -> acc + Cstruct.length cs) 0 bufs in
97 let result = Bytes.create total in
98 let _ =
99 List.fold_left
100 (fun off cs ->
101 let len = Cstruct.length cs in
102 Cstruct.blit_to_bytes cs 0 result off len;
103 off + len)
104 0 bufs
105 in
106 Bytes.to_string result
107
108 (** Split a string into chunks of Cstructs *)
109 let string_to_cstructs ?(chunk_size = 4096) (s : string) : Cstruct.t t =
110 let len = String.length s in
111 unfold
112 (fun off ->
113 if off >= len then None
114 else
115 let chunk_len = min chunk_size (len - off) in
116 Some (Cstruct.of_string ~off ~len:chunk_len s, off + chunk_len))
117 0
118end
119
120(** {1 Asynchronous Stream (Eio)}
121
122 Streams that integrate with Eio's structured concurrency. *)
123module Async = struct
124 type 'a t = unit -> 'a option
125 (** An asynchronous stream that can be pulled from *)
126
127 (** {2 Producers} *)
128
129 let empty : 'a t = fun () -> None
130
131 let singleton x : 'a t =
132 let taken = ref false in
133 fun () ->
134 if !taken then None
135 else begin
136 taken := true;
137 Some x
138 end
139
140 let of_list l : 'a t =
141 let r = ref l in
142 fun () ->
143 match !r with
144 | [] -> None
145 | x :: rest ->
146 r := rest;
147 Some x
148
149 let of_seq s : 'a t =
150 let r = ref s in
151 fun () ->
152 match !r () with
153 | Seq.Nil -> None
154 | Seq.Cons (x, rest) ->
155 r := rest;
156 Some x
157
158 (** Create a stream from an Eio flow (reads until EOF) *)
159 let of_flow ?(buf_size = 4096) (flow : _ Eio.Flow.source) : Cstruct.t t =
160 let buf = Cstruct.create buf_size in
161 let finished = ref false in
162 fun () ->
163 if !finished then None
164 else
165 try
166 let n = Eio.Flow.single_read flow buf in
167 Some (Cstruct.sub buf 0 n)
168 with End_of_file ->
169 finished := true;
170 None
171
172 (** Create a stream that reads a file in chunks *)
173 let of_file ?(buf_size = 4096) ~fs path : Cstruct.t t =
174 let file = Eio.Path.open_in ~sw:(Eio.Switch.run Fun.id) (fs, path) in
175 of_flow ~buf_size file
176
177 (** {2 Transformers} *)
178
179 let map f (s : 'a t) : 'b t = fun () -> Option.map f (s ())
180
181 let filter p (s : 'a t) : 'a t =
182 let rec next () =
183 match s () with None -> None | Some x -> if p x then Some x else next ()
184 in
185 next
186
187 let filter_map f (s : 'a t) : 'b t =
188 let rec next () =
189 match s () with
190 | None -> None
191 | Some x -> ( match f x with Some y -> Some y | None -> next ())
192 in
193 next
194
195 let take n (s : 'a t) : 'a t =
196 let count = ref 0 in
197 fun () ->
198 if !count >= n then None
199 else begin
200 incr count;
201 s ()
202 end
203
204 (** {2 Consumers} *)
205
206 (** Fold over the stream *)
207 let fold f init (s : 'a t) =
208 let rec loop acc =
209 match s () with None -> acc | Some x -> loop (f acc x)
210 in
211 loop init
212
213 (** Iterate over the stream *)
214 let iter f (s : 'a t) =
215 let rec loop () =
216 match s () with
217 | None -> ()
218 | Some x ->
219 f x;
220 loop ()
221 in
222 loop ()
223
224 (** Drain the stream *)
225 let drain s = iter (fun _ -> ()) s
226
227 (** Collect into a list *)
228 let to_list (s : 'a t) = List.rev (fold (fun acc x -> x :: acc) [] s)
229
230 (** Write stream to an Eio flow *)
231 let to_flow (flow : _ Eio.Flow.sink) (s : Cstruct.t t) =
232 iter (fun cs -> Eio.Flow.write flow [ cs ]) s
233
234 (** {2 Cstruct-specific operations} *)
235
236 (** Concatenate a stream of Cstructs into a single string *)
237 let cstructs_to_string (s : Cstruct.t t) : string =
238 let bufs = to_list s in
239 let total = List.fold_left (fun acc cs -> acc + Cstruct.length cs) 0 bufs in
240 let result = Bytes.create total in
241 let _ =
242 List.fold_left
243 (fun off cs ->
244 let len = Cstruct.length cs in
245 Cstruct.blit_to_bytes cs 0 result off len;
246 off + len)
247 0 bufs
248 in
249 Bytes.to_string result
250
251 (** Read entire stream into a Cstruct *)
252 let cstructs_to_cstruct (s : Cstruct.t t) : Cstruct.t =
253 let bufs = to_list s in
254 let total = List.fold_left (fun acc cs -> acc + Cstruct.length cs) 0 bufs in
255 let result = Cstruct.create total in
256 let _ =
257 List.fold_left
258 (fun off cs ->
259 let len = Cstruct.length cs in
260 Cstruct.blit cs 0 result off len;
261 off + len)
262 0 bufs
263 in
264 result
265end
266
267(** {1 Chunked Transfer Encoding} *)
268
269(** Helpers for HTTP chunked transfer encoding *)
270module Chunked = struct
271 (** Encode chunks for chunked transfer encoding *)
272 let encode (chunks : Cstruct.t Sync.t) : Cstruct.t Sync.t =
273 let encode_chunk cs =
274 let len = Cstruct.length cs in
275 if len = 0 then Cstruct.of_string "0\r\n\r\n"
276 else
277 let header =
278 Buf.build64 (fun b ->
279 Buf.hex b len;
280 Buf.string b "\r\n")
281 in
282 let trailer = "\r\n" in
283 let total = String.length header + len + String.length trailer in
284 let buf = Cstruct.create total in
285 Cstruct.blit_from_string header 0 buf 0 (String.length header);
286 Cstruct.blit cs 0 buf (String.length header) len;
287 Cstruct.blit_from_string trailer 0 buf
288 (String.length header + len)
289 (String.length trailer);
290 buf
291 in
292 Seq.append
293 (Seq.map encode_chunk chunks)
294 (Seq.return (Cstruct.of_string "0\r\n\r\n"))
295
296 (** Calculate content length from chunks (consumes the stream) *)
297 let content_length (chunks : Cstruct.t Sync.t) : int =
298 Sync.fold (fun acc cs -> acc + Cstruct.length cs) 0 chunks
299end