···1616module Retry = Retry
1717module Cache = Cache
18181919+(* Note: RNG initialization should be done by the application using
2020+ Mirage_crypto_rng_unix.initialize before calling Eio_main.run.
2121+ We don't call use_default() here as it spawns background threads
2222+ that are incompatible with Eio's structured concurrency. *)
2323+1924(* Main API - Session functionality with concurrent fiber spawning *)
20252126type ('clock, 'net) t = {
+2-12
stack/requests/lib/response.ml
···1818 Eio.Switch.on_release sw (fun () ->
1919 if not response.closed then begin
2020 Log.debug (fun m -> m "Auto-closing response for %s via switch" url);
2121- try
2222- (* Read and discard remaining data *)
2323- let rec drain () =
2424- let buf = Cstruct.create 8192 in
2525- match Eio.Flow.single_read body buf with
2626- | 0 -> () (* EOF *)
2727- | _ -> drain ()
2828- in
2929- drain ();
3030- response.closed <- true
3131- with _ ->
3232- response.closed <- true
2121+ response.closed <- true;
2222+ (* TODO Body cleanup is handled by the underlying HTTP library but test this *)
3323 end
3424 );
3525
···1212}
13131414type state = {
1515- state_dir : Eio.Fs.dir_ty Eio.Path.t;
1515+ xdg : Xdge.t;
1616}
17171818(* State directory management *)
1919module State = struct
2020- let users_dir state = Eio.Path.(state.state_dir / "users")
2121- let feeds_dir state = Eio.Path.(state.state_dir / "feeds")
2020+ let users_dir state = Eio.Path.(Xdge.state_dir state.xdg / "users")
2121+ let feeds_dir state = Eio.Path.(Xdge.state_dir state.xdg / "feeds")
2222 let user_feeds_dir state = Eio.Path.(feeds_dir state / "user")
23232424 let user_file state username =
···259259 Ptime.compare b.updated a.updated
260260 ) combined
261261262262- let sync_user env state ~username =
262262+ let sync_user ~sw env state ~username =
263263 match State.load_user state username with
264264 | None ->
265265 Log.err (fun m -> m "User %s not found" username);
···270270 | Some user ->
271271 Log.info (fun m -> m "Syncing feeds for user %s..." username);
272272273273- (* Fetch all feeds *)
273273+ (* Create a single Requests session for all feeds *)
274274+ let requests = Requests.create ~sw env
275275+ ~follow_redirects:true
276276+ ~max_redirects:5 in
277277+278278+ (* Fetch all feeds using the shared session and switch *)
274279 let fetched_feeds =
275280 List.filter_map (fun source ->
276281 try
277282 Log.info (fun m -> m " Fetching %s (%s)..." source.River.name source.River.url);
278278- Some (River.fetch env source)
283283+ Some (River.fetch ~sw ~requests env source)
279284 with e ->
280285 Log.err (fun m -> m " Failed to fetch %s: %s"
281286 source.River.name (Printexc.to_string e));
···320325 0
321326 end
322327323323- let sync_all env state =
328328+ let sync_all ~sw env state =
324329 let users = State.list_users state in
325330 if users = [] then begin
326331 Log.info (fun m -> m "No users to sync");
···329334 Log.info (fun m -> m "Syncing %d users..." (List.length users));
330335 let results =
331336 List.map (fun username ->
332332- let result = sync_user env state ~username in
337337+ let result = sync_user ~sw env state ~username in
333338 Log.debug (fun m -> m "Completed sync for user");
334339 result
335340 ) users
···348353(* Cmdliner interface *)
349354open Cmdliner
350355351351-let state_dir =
352352- let doc = "State directory for storing user and feed data" in
353353- Arg.(value & opt string "~/.river" & info ["state-dir"; "d"] ~doc)
354354-355356let username_arg =
356357 let doc = "Username" in
357358 Arg.(required & pos 0 (some string) None & info [] ~docv:"USERNAME" ~doc)
···380381let log_level = Logs_cli.level ()
381382let log_style_renderer = Fmt_cli.style_renderer ()
382383383383-(* Commands *)
384384-let user_add_cmd =
384384+(* Commands - these are created within Eio context *)
385385+let user_add_cmd fs =
385386 let doc = "Add a new user" in
386386- let term = Term.(const (fun state_dir log_level style_renderer username fullname email ->
387387+ let xdg_term = Xdge.Cmd.term "river" fs ~config:false ~data:false ~cache:false ~runtime:false () in
388388+ let run log_level style_renderer (xdg, _cfg) username fullname email =
387389 setup_logs style_renderer log_level;
388388- Eio_main.run @@ fun env ->
389389- let state_dir =
390390- let path = if String.starts_with ~prefix:"~" state_dir then
391391- Filename.concat (Sys.getenv "HOME") (String.sub state_dir 2 (String.length state_dir - 2))
392392- else state_dir in
393393- Eio.Path.(Eio.Stdenv.fs env / path)
394394- in
395395- let state = { state_dir } in
390390+ let state = { xdg } in
396391 State.ensure_directories state;
397392 User.add state ~username ~fullname ~email
398398- ) $ state_dir $ log_level $ log_style_renderer $ username_arg $ fullname_arg $ email_arg) in
393393+ in
394394+ let term = Term.(const run $ log_level $ log_style_renderer $ xdg_term $ username_arg $ fullname_arg $ email_arg) in
399395 Cmd.v (Cmd.info "add" ~doc) term
400396401401-let user_remove_cmd =
397397+let user_remove_cmd fs =
402398 let doc = "Remove a user" in
403403- let term = Term.(const (fun state_dir log_level style_renderer username ->
399399+ let xdg_term = Xdge.Cmd.term "river" fs ~config:false ~data:false ~cache:false ~runtime:false () in
400400+ let run log_level style_renderer (xdg, _cfg) username =
404401 setup_logs style_renderer log_level;
405405- Eio_main.run @@ fun env ->
406406- let state_dir =
407407- let path = if String.starts_with ~prefix:"~" state_dir then
408408- Filename.concat (Sys.getenv "HOME") (String.sub state_dir 2 (String.length state_dir - 2))
409409- else state_dir in
410410- Eio.Path.(Eio.Stdenv.fs env / path)
411411- in
412412- let state = { state_dir } in
402402+ let state = { xdg } in
413403 User.remove state ~username
414414- ) $ state_dir $ log_level $ log_style_renderer $ username_arg) in
404404+ in
405405+ let term = Term.(const run $ log_level $ log_style_renderer $ xdg_term $ username_arg) in
415406 Cmd.v (Cmd.info "remove" ~doc) term
416407417417-let user_list_cmd =
408408+let user_list_cmd fs =
418409 let doc = "List all users" in
419419- let term = Term.(const (fun state_dir log_level style_renderer ->
410410+ let xdg_term = Xdge.Cmd.term "river" fs ~config:false ~data:false ~cache:false ~runtime:false () in
411411+ let run log_level style_renderer (xdg, _cfg) =
420412 setup_logs style_renderer log_level;
421421- Eio_main.run @@ fun env ->
422422- let state_dir =
423423- let path = if String.starts_with ~prefix:"~" state_dir then
424424- Filename.concat (Sys.getenv "HOME") (String.sub state_dir 2 (String.length state_dir - 2))
425425- else state_dir in
426426- Eio.Path.(Eio.Stdenv.fs env / path)
427427- in
428428- let state = { state_dir } in
413413+ let state = { xdg } in
429414 User.list state
430430- ) $ state_dir $ log_level $ log_style_renderer) in
415415+ in
416416+ let term = Term.(const run $ log_level $ log_style_renderer $ xdg_term) in
431417 Cmd.v (Cmd.info "list" ~doc) term
432418433433-let user_show_cmd =
419419+let user_show_cmd fs =
434420 let doc = "Show user details" in
435435- let term = Term.(const (fun state_dir log_level style_renderer username ->
421421+ let xdg_term = Xdge.Cmd.term "river" fs ~config:false ~data:false ~cache:false ~runtime:false () in
422422+ let run log_level style_renderer (xdg, _cfg) username =
436423 setup_logs style_renderer log_level;
437437- Eio_main.run @@ fun env ->
438438- let state_dir =
439439- let path = if String.starts_with ~prefix:"~" state_dir then
440440- Filename.concat (Sys.getenv "HOME") (String.sub state_dir 2 (String.length state_dir - 2))
441441- else state_dir in
442442- Eio.Path.(Eio.Stdenv.fs env / path)
443443- in
444444- let state = { state_dir } in
424424+ let state = { xdg } in
445425 User.show state ~username
446446- ) $ state_dir $ log_level $ log_style_renderer $ username_arg) in
426426+ in
427427+ let term = Term.(const run $ log_level $ log_style_renderer $ xdg_term $ username_arg) in
447428 Cmd.v (Cmd.info "show" ~doc) term
448429449449-let user_add_feed_cmd =
430430+let user_add_feed_cmd fs =
450431 let doc = "Add a feed to a user" in
451451- let term = Term.(const (fun state_dir log_level style_renderer username name url ->
432432+ let xdg_term = Xdge.Cmd.term "river" fs ~config:false ~data:false ~cache:false ~runtime:false () in
433433+ let run log_level style_renderer (xdg, _cfg) username name url =
452434 setup_logs style_renderer log_level;
453453- Eio_main.run @@ fun env ->
454454- let state_dir =
455455- let path = if String.starts_with ~prefix:"~" state_dir then
456456- Filename.concat (Sys.getenv "HOME") (String.sub state_dir 2 (String.length state_dir - 2))
457457- else state_dir in
458458- Eio.Path.(Eio.Stdenv.fs env / path)
459459- in
460460- let state = { state_dir } in
435435+ let state = { xdg } in
461436 User.add_feed state ~username ~name ~url
462462- ) $ state_dir $ log_level $ log_style_renderer $ username_arg $ feed_name_arg $ feed_url_arg) in
437437+ in
438438+ let term = Term.(const run $ log_level $ log_style_renderer $ xdg_term $ username_arg $ feed_name_arg $ feed_url_arg) in
463439 Cmd.v (Cmd.info "add-feed" ~doc) term
464440465465-let user_remove_feed_cmd =
441441+let user_remove_feed_cmd fs =
466442 let doc = "Remove a feed from a user" in
467467- let term = Term.(const (fun state_dir log_level style_renderer username url ->
443443+ let xdg_term = Xdge.Cmd.term "river" fs ~config:false ~data:false ~cache:false ~runtime:false () in
444444+ let run log_level style_renderer (xdg, _cfg) username url =
468445 setup_logs style_renderer log_level;
469469- Eio_main.run @@ fun env ->
470470- let state_dir =
471471- let path = if String.starts_with ~prefix:"~" state_dir then
472472- Filename.concat (Sys.getenv "HOME") (String.sub state_dir 2 (String.length state_dir - 2))
473473- else state_dir in
474474- Eio.Path.(Eio.Stdenv.fs env / path)
475475- in
476476- let state = { state_dir } in
446446+ let state = { xdg } in
477447 User.remove_feed state ~username ~url
478478- ) $ state_dir $ log_level $ log_style_renderer $ username_arg $ feed_url_arg) in
448448+ in
449449+ let term = Term.(const run $ log_level $ log_style_renderer $ xdg_term $ username_arg $ feed_url_arg) in
479450 Cmd.v (Cmd.info "remove-feed" ~doc) term
480451481481-let user_cmd =
452452+let user_cmd fs =
482453 let doc = "Manage users" in
483454 let info = Cmd.info "user" ~doc in
484455 Cmd.group info [
485485- user_add_cmd;
486486- user_remove_cmd;
487487- user_list_cmd;
488488- user_show_cmd;
489489- user_add_feed_cmd;
490490- user_remove_feed_cmd;
456456+ user_add_cmd fs;
457457+ user_remove_cmd fs;
458458+ user_list_cmd fs;
459459+ user_show_cmd fs;
460460+ user_add_feed_cmd fs;
461461+ user_remove_feed_cmd fs;
491462 ]
492463493493-let sync_cmd =
464464+let sync_cmd fs env =
494465 let doc = "Sync feeds for users" in
466466+ let xdg_term = Xdge.Cmd.term "river" fs ~config:false ~data:false ~cache:false ~runtime:false () in
495467 let username_opt =
496468 let doc = "Sync specific user (omit to sync all)" in
497469 Arg.(value & pos 0 (some string) None & info [] ~docv:"USERNAME" ~doc)
498470 in
499499- let term = Term.(const (fun state_dir log_level style_renderer username_opt ->
471471+ let run log_level style_renderer (xdg, _cfg) username_opt =
500472 setup_logs style_renderer log_level;
501501- Eio_main.run @@ fun env ->
502502- let state_dir =
503503- let path = if String.starts_with ~prefix:"~" state_dir then
504504- Filename.concat (Sys.getenv "HOME") (String.sub state_dir 2 (String.length state_dir - 2))
505505- else state_dir in
506506- Eio.Path.(Eio.Stdenv.fs env / path)
473473+ let state = { xdg } in
474474+ State.ensure_directories state;
475475+ (* Create a switch for all sync operations *)
476476+ Logs.info (fun m -> m "Creating switch for sync operations");
477477+ let result = Eio.Switch.run @@ fun sw ->
478478+ Logs.info (fun m -> m "Switch created, running sync");
479479+ let res = match username_opt with
480480+ | Some username -> Sync.sync_user ~sw env state ~username
481481+ | None -> Sync.sync_all ~sw env state
482482+ in
483483+ Logs.info (fun m -> m "Sync completed, about to exit switch");
484484+ res
507485 in
508508- let state = { state_dir } in
509509- State.ensure_directories state;
510510- match username_opt with
511511- | Some username -> Sync.sync_user env state ~username
512512- | None -> Sync.sync_all env state
513513- ) $ state_dir $ log_level $ log_style_renderer $ username_opt) in
486486+ Logs.info (fun m -> m "Switch exited, returning result %d" result);
487487+ result
488488+ in
489489+ let term = Term.(const run $ log_level $ log_style_renderer $ xdg_term $ username_opt) in
514490 Cmd.v (Cmd.info "sync" ~doc) term
515491516516-let main_cmd =
492492+let main_cmd fs env =
517493 let doc = "River feed management CLI" in
518494 let info = Cmd.info "river-cli" ~version:"1.0" ~doc in
519519- Cmd.group info [user_cmd; sync_cmd]
495495+ Cmd.group info [user_cmd fs; sync_cmd fs env]
520496521497let () =
522522- exit (Cmd.eval' main_cmd)498498+ (* Initialize the Mirage_crypto RNG for TLS.
499499+ Note: This spawns a background thread for entropy collection. *)
500500+ Mirage_crypto_rng_unix.use_default ();
501501+502502+ let exit_code = ref 0 in
503503+ Eio_main.run @@ fun env ->
504504+ exit_code := Cmd.eval' (main_cmd env#fs env);
505505+ Logs.info (fun m -> m "About to exit Eio_main.run");
506506+ Logs.info (fun m -> m "Exited Eio_main.run, calling exit %d" !exit_code);
507507+ exit !exit_code
···99 ]
10101111let main env =
1212- let feeds = List.map (River.fetch env) sources in
1212+ Eio.Switch.run @@ fun sw ->
1313+ let feeds = List.map (River.fetch ~sw env) sources in
1314 let posts = River.posts feeds in
1415 let entries = River.create_atom_entries posts in
1516 let feed =
+2-2
stack/river/lib/feed.ml
···4444 msg (fst pos) (snd pos));
4545 failwith "Neither Atom nor RSS2 feed")
46464747-let fetch env (source : source) =
4747+let fetch ~sw ?requests env (source : source) =
4848 Log.info (fun m -> m "Fetching feed '%s' from %s" source.name source.url);
49495050 let xmlbase = Uri.of_string @@ source.url in
5151 let response =
5252- try Http.get env source.url
5252+ try Http.get ~sw ?requests env source.url
5353 with e ->
5454 Log.err (fun m -> m "Failed to fetch feed '%s': %s" source.name (Printexc.to_string e));
5555 raise e
+10-7
stack/river/lib/http.ml
···2323exception Status_unhandled of string
2424exception Timeout
25252626-let get env url =
2626+let get ~sw ?requests env url =
2727 Log.info (fun m -> m "Fetching URL: %s" url);
28282929- Eio.Switch.run @@ fun sw ->
3029 try
3131- (* Create a Requests session with automatic redirect following *)
3232- let req = Requests.create ~sw env
3333- ~follow_redirects:true
3434- ~max_redirects:5 in
3030+ (* Create or use existing Requests session with automatic redirect following *)
3131+ let req = match requests with
3232+ | Some r -> r
3333+ | None ->
3434+ Requests.create ~sw env
3535+ ~follow_redirects:true
3636+ ~max_redirects:5
3737+ in
35383636- Log.debug (fun m -> m "Created Requests session with max_redirects=5");
3939+ Log.debug (fun m -> m "Using Requests session with max_redirects=5");
37403841 (* Make the GET request with timeout *)
3942 let response =
-25
stack/river/lib/http.mli
···11-(*
22- * Copyright (c) 2014, OCaml.org project
33- * Copyright (c) 2015 KC Sivaramakrishnan <sk826@cl.cam.ac.uk>
44- *
55- * Permission to use, copy, modify, and distribute this software for any
66- * purpose with or without fee is hereby granted, provided that the above
77- * copyright notice and this permission notice appear in all copies.
88- *
99- * THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES
1010- * WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
1111- * MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR
1212- * ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
1313- * WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
1414- * ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT OF
1515- * OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
1616- *)
1717-1818-exception Status_unhandled of string
1919-exception Timeout
2020-2121-val get : Eio_unix.Stdenv.base -> string -> string
2222-(** [get env uri] returns the body of the response of the HTTP GET request on [uri].
2323-2424- If the answer is a redirection, it will follow the redirections up to 5
2525- redirects. Uses the provided Eio environment. *)
···2121type feed
2222type post
23232424-val fetch : Eio_unix.Stdenv.base -> source -> feed
2525-(** [fetch env source] returns an Atom or RSS feed from a source
2626- using the provided Eio environment. *)
2424+val fetch :
2525+ sw:Eio.Switch.t ->
2626+ ?requests:(([> float Eio.Time.clock_ty ] as 'a) Eio.Resource.t, ([> [> `Generic ] Eio.Net.ty ] as 'b) Eio.Resource.t) Requests.t ->
2727+ < clock : 'a Eio.Resource.t;
2828+ fs : Eio.Fs.dir_ty Eio.Path.t;
2929+ net : 'b Eio.Resource.t; .. > ->
3030+ source ->
3131+ feed
3232+(** [fetch ~sw ?requests env source] returns an Atom or RSS feed from a source.
3333+3434+ @param sw The switch to use for resource cleanup
3535+ @param requests Optional Requests session to reuse. If not provided, a new session will be created
3636+ @param env The Eio environment
3737+ @param source The feed source to fetch *)
27382839val name : feed -> string
2940(** [name feed] is the name of the feed source passed to [fetch]. *)
···71827283val create_atom_entries : post list -> Syndic.Atom.entry list
7384(** [create_atom_feed posts] creates a list of atom entries, which can then be
7474- used to create an atom feed that is an aggregate of the posts. *)8585+ used to create an atom feed that is an aggregate of the posts. *)