ocaml http/1, http/2 and websocket client and server library
at main 8.3 kB view raw
1(** Streaming abstractions for HCS HTTP library. 2 3 This module provides both synchronous and asynchronous stream types for 4 handling large payloads efficiently with backpressure support. 5 6 {1 Synchronous Streams} 7 8 For simple, pull-based iteration without runtime dependencies. 9 10 {1 Asynchronous Streams (Eio)} 11 12 For streaming with Eio's structured concurrency, supporting flows, files, 13 and chunked transfer encoding. *) 14 15(** {1 Synchronous Stream} 16 17 A simple pull-based stream using OCaml's Seq. *) 18module Sync = struct 19 type 'a t = 'a Seq.t 20 (** A synchronous stream of values *) 21 22 (** {2 Producers} *) 23 24 let empty : 'a t = Seq.empty 25 let singleton x : 'a t = Seq.return x 26 let of_list l : 'a t = List.to_seq l 27 let of_array a : 'a t = Array.to_seq a 28 29 (** Create a stream from an unfolding function *) 30 let unfold (f : 's -> ('a * 's) option) (init : 's) : 'a t = Seq.unfold f init 31 32 (** Create a stream that repeats a value n times *) 33 let repeat n x : 'a t = 34 unfold (fun i -> if i > 0 then Some (x, i - 1) else None) n 35 36 (** Create a stream from a generator function *) 37 let generate (f : unit -> 'a option) : 'a t = 38 let rec next () = 39 match f () with Some x -> Seq.Cons (x, next) | None -> Seq.Nil 40 in 41 next 42 43 (** {2 Transformers} *) 44 45 let map f s : 'a t = Seq.map f s 46 let filter p s : 'a t = Seq.filter p s 47 let filter_map f s : 'a t = Seq.filter_map f s 48 49 (** Take the first n elements *) 50 let take n s : 'a t = Seq.take n s 51 52 (** Drop the first n elements *) 53 let drop n s : 'a t = Seq.drop n s 54 55 (** Split stream into chunks of size n *) 56 let chunks n s : 'a list t = 57 let rec next acc count seq () = 58 if count >= n then Seq.Cons (List.rev acc, next [] 0 seq) 59 else 60 match seq () with 61 | Seq.Nil -> 62 if acc = [] then Seq.Nil 63 else Seq.Cons (List.rev acc, fun () -> Seq.Nil) 64 | Seq.Cons (x, rest) -> next (x :: acc) (count + 1) rest () 65 in 66 next [] 0 s 67 68 (** Flatten a stream of streams *) 69 let flatten s : 'a t = Seq.flat_map Fun.id s 70 71 (** Append two streams *) 72 let append s1 s2 : 'a t = Seq.append s1 s2 73 74 (** {2 Consumers} *) 75 76 (** Fold over the stream *) 77 let fold f init s = Seq.fold_left f init s 78 79 (** Iterate over the stream for side effects *) 80 let iter f s = Seq.iter f s 81 82 (** Drain the stream, discarding all values *) 83 let drain s = iter (fun _ -> ()) s 84 85 (** Collect stream into a list *) 86 let to_list s = List.of_seq s 87 88 (** Collect stream into an array *) 89 let to_array s = Array.of_seq s 90 91 (** {2 Cstruct-specific operations} *) 92 93 (** Concatenate a stream of Cstructs into a single string *) 94 let cstructs_to_string (s : Cstruct.t t) : string = 95 let bufs = to_list s in 96 let total = List.fold_left (fun acc cs -> acc + Cstruct.length cs) 0 bufs in 97 let result = Bytes.create total in 98 let _ = 99 List.fold_left 100 (fun off cs -> 101 let len = Cstruct.length cs in 102 Cstruct.blit_to_bytes cs 0 result off len; 103 off + len) 104 0 bufs 105 in 106 Bytes.to_string result 107 108 (** Split a string into chunks of Cstructs *) 109 let string_to_cstructs ?(chunk_size = 4096) (s : string) : Cstruct.t t = 110 let len = String.length s in 111 unfold 112 (fun off -> 113 if off >= len then None 114 else 115 let chunk_len = min chunk_size (len - off) in 116 Some (Cstruct.of_string ~off ~len:chunk_len s, off + chunk_len)) 117 0 118end 119 120(** {1 Asynchronous Stream (Eio)} 121 122 Streams that integrate with Eio's structured concurrency. *) 123module Async = struct 124 type 'a t = unit -> 'a option 125 (** An asynchronous stream that can be pulled from *) 126 127 (** {2 Producers} *) 128 129 let empty : 'a t = fun () -> None 130 131 let singleton x : 'a t = 132 let taken = ref false in 133 fun () -> 134 if !taken then None 135 else begin 136 taken := true; 137 Some x 138 end 139 140 let of_list l : 'a t = 141 let r = ref l in 142 fun () -> 143 match !r with 144 | [] -> None 145 | x :: rest -> 146 r := rest; 147 Some x 148 149 let of_seq s : 'a t = 150 let r = ref s in 151 fun () -> 152 match !r () with 153 | Seq.Nil -> None 154 | Seq.Cons (x, rest) -> 155 r := rest; 156 Some x 157 158 (** Create a stream from an Eio flow (reads until EOF) *) 159 let of_flow ?(buf_size = 4096) (flow : _ Eio.Flow.source) : Cstruct.t t = 160 let buf = Cstruct.create buf_size in 161 let finished = ref false in 162 fun () -> 163 if !finished then None 164 else 165 try 166 let n = Eio.Flow.single_read flow buf in 167 Some (Cstruct.sub buf 0 n) 168 with End_of_file -> 169 finished := true; 170 None 171 172 (** Create a stream that reads a file in chunks *) 173 let of_file ?(buf_size = 4096) ~fs path : Cstruct.t t = 174 let file = Eio.Path.open_in ~sw:(Eio.Switch.run Fun.id) (fs, path) in 175 of_flow ~buf_size file 176 177 (** {2 Transformers} *) 178 179 let map f (s : 'a t) : 'b t = fun () -> Option.map f (s ()) 180 181 let filter p (s : 'a t) : 'a t = 182 let rec next () = 183 match s () with None -> None | Some x -> if p x then Some x else next () 184 in 185 next 186 187 let filter_map f (s : 'a t) : 'b t = 188 let rec next () = 189 match s () with 190 | None -> None 191 | Some x -> ( match f x with Some y -> Some y | None -> next ()) 192 in 193 next 194 195 let take n (s : 'a t) : 'a t = 196 let count = ref 0 in 197 fun () -> 198 if !count >= n then None 199 else begin 200 incr count; 201 s () 202 end 203 204 (** {2 Consumers} *) 205 206 (** Fold over the stream *) 207 let fold f init (s : 'a t) = 208 let rec loop acc = 209 match s () with None -> acc | Some x -> loop (f acc x) 210 in 211 loop init 212 213 (** Iterate over the stream *) 214 let iter f (s : 'a t) = 215 let rec loop () = 216 match s () with 217 | None -> () 218 | Some x -> 219 f x; 220 loop () 221 in 222 loop () 223 224 (** Drain the stream *) 225 let drain s = iter (fun _ -> ()) s 226 227 (** Collect into a list *) 228 let to_list (s : 'a t) = List.rev (fold (fun acc x -> x :: acc) [] s) 229 230 (** Write stream to an Eio flow *) 231 let to_flow (flow : _ Eio.Flow.sink) (s : Cstruct.t t) = 232 iter (fun cs -> Eio.Flow.write flow [ cs ]) s 233 234 (** {2 Cstruct-specific operations} *) 235 236 (** Concatenate a stream of Cstructs into a single string *) 237 let cstructs_to_string (s : Cstruct.t t) : string = 238 let bufs = to_list s in 239 let total = List.fold_left (fun acc cs -> acc + Cstruct.length cs) 0 bufs in 240 let result = Bytes.create total in 241 let _ = 242 List.fold_left 243 (fun off cs -> 244 let len = Cstruct.length cs in 245 Cstruct.blit_to_bytes cs 0 result off len; 246 off + len) 247 0 bufs 248 in 249 Bytes.to_string result 250 251 (** Read entire stream into a Cstruct *) 252 let cstructs_to_cstruct (s : Cstruct.t t) : Cstruct.t = 253 let bufs = to_list s in 254 let total = List.fold_left (fun acc cs -> acc + Cstruct.length cs) 0 bufs in 255 let result = Cstruct.create total in 256 let _ = 257 List.fold_left 258 (fun off cs -> 259 let len = Cstruct.length cs in 260 Cstruct.blit cs 0 result off len; 261 off + len) 262 0 bufs 263 in 264 result 265end 266 267(** {1 Chunked Transfer Encoding} *) 268 269(** Helpers for HTTP chunked transfer encoding *) 270module Chunked = struct 271 (** Encode chunks for chunked transfer encoding *) 272 let encode (chunks : Cstruct.t Sync.t) : Cstruct.t Sync.t = 273 let encode_chunk cs = 274 let len = Cstruct.length cs in 275 if len = 0 then Cstruct.of_string "0\r\n\r\n" 276 else 277 let header = 278 Buf.build64 (fun b -> 279 Buf.hex b len; 280 Buf.string b "\r\n") 281 in 282 let trailer = "\r\n" in 283 let total = String.length header + len + String.length trailer in 284 let buf = Cstruct.create total in 285 Cstruct.blit_from_string header 0 buf 0 (String.length header); 286 Cstruct.blit cs 0 buf (String.length header) len; 287 Cstruct.blit_from_string trailer 0 buf 288 (String.length header + len) 289 (String.length trailer); 290 buf 291 in 292 Seq.append 293 (Seq.map encode_chunk chunks) 294 (Seq.return (Cstruct.of_string "0\r\n\r\n")) 295 296 (** Calculate content length from chunks (consumes the stream) *) 297 let content_length (chunks : Cstruct.t Sync.t) : int = 298 Sync.fold (fun acc cs -> acc + Cstruct.length cs) 0 chunks 299end