(*--------------------------------------------------------------------------- Copyright (c) 2026 Anil Madhavapeddy . All rights reserved. SPDX-License-Identifier: ISC ---------------------------------------------------------------------------*) let src = Logs.Src.create "poe.loop" ~doc:"Poe polling loop" module Log = (val Logs.src_log src : Logs.LOG) (* Verse user info for iteration *) type verse_user = { handle : string; monorepo_path : Eio.Fs.dir_ty Eio.Path.t; monorepo_url : string; opamrepo_url : string; opamrepo_path : Fpath.t option; (* Local path to opam repo for reading dev-repo URLs *) } (* Load the opamverse registry from XDG data path *) let load_registry ~fs = let registry_path = Monopam.Verse_config.registry_path () in let registry_toml = Fpath.(registry_path / "opamverse.toml") in Monopam.Verse_registry.load ~fs registry_toml (* Get list of verse users by scanning verse directory and matching with registry *) let get_verse_users ~fs ~verse_path = match load_registry ~fs with | Error msg -> Log.warn (fun m -> m "Failed to load registry: %s" msg); [] | Ok registry -> (* Scan verse directory for user subdirectories *) let verse_eio = Eio.Path.(fs / verse_path) in let subdirs = try Eio.Path.read_dir verse_eio |> List.filter (fun name -> (* Filter out -opam directories and hidden files *) not (String.starts_with ~prefix:"." name) && not (String.ends_with ~suffix:"-opam" name)) with Eio.Io _ -> Log.warn (fun m -> m "Failed to read verse directory: %s" verse_path); [] in (* Match each subdirectory with registry member *) List.filter_map (fun handle -> match Monopam.Verse_registry.find_member registry ~handle with | Some member -> let mono_path = Eio.Path.(verse_eio / handle) in (* Check if it's actually a git repo *) let is_repo = try match Eio.Path.kind ~follow:true Eio.Path.(mono_path / ".git") with | `Directory -> true | _ -> false with _ -> false in if is_repo then (* Check if local opam repo exists *) let opam_dir = handle ^ "-opam" in let opam_path = Eio.Path.(verse_eio / opam_dir) in let opamrepo_path = try match Eio.Path.kind ~follow:true opam_path with | `Directory -> Some (Fpath.v (verse_path ^ "/" ^ opam_dir)) | _ -> None with _ -> None in Some { handle; monorepo_path = mono_path; monorepo_url = member.monorepo; opamrepo_url = member.opamrepo; opamrepo_path; } else begin Log.debug (fun m -> m "Skipping %s: not a git repo" handle); None end | None -> Log.debug (fun m -> m "Skipping %s: not in registry" handle); None ) subdirs let run_command ~proc ~cwd cmd = let cwd_str = Eio.Path.native_exn cwd in Log.debug (fun m -> m "Running: %s (in %s)" (String.concat " " cmd) cwd_str); Eio.Switch.run @@ fun sw -> let buf_stdout = Buffer.create 256 in let buf_stderr = Buffer.create 256 in let child = Eio.Process.spawn proc ~sw ~cwd ~stdout:(Eio.Flow.buffer_sink buf_stdout) ~stderr:(Eio.Flow.buffer_sink buf_stderr) cmd in (* Must await process before reading buffers *) let status = Eio.Process.await child in let stdout = Buffer.contents buf_stdout in let stderr = Buffer.contents buf_stderr in match status with | `Exited code -> (code, stdout, stderr) | `Signaled sig_ -> (-sig_, stdout, stderr) let get_git_head ~proc ~cwd = match run_command ~proc ~cwd ["git"; "rev-parse"; "--short"; "HEAD"] with | (0, stdout, _) -> Some (String.trim stdout) | _ -> None (* Sync a tracked repo by fetching and resetting to upstream. We don't make local commits to verse repos, so reset is safe. *) let sync_to_upstream ~proc ~cwd ~handle = Log.debug (fun m -> m "[%s] Fetching from origin" handle); match run_command ~proc ~cwd ["git"; "fetch"; "origin"] with | (0, _, _) -> (* Get the default branch name *) let branch = match run_command ~proc ~cwd ["git"; "rev-parse"; "--abbrev-ref"; "origin/HEAD"] with | (0, stdout, _) -> (* Returns "origin/main" or "origin/master" - extract branch name *) let full = String.trim stdout in (match String.split_on_char '/' full with | _ :: branch :: _ -> branch | _ -> "main") | _ -> "main" in Log.debug (fun m -> m "[%s] Resetting to origin/%s" handle branch); (match run_command ~proc ~cwd ["git"; "reset"; "--hard"; "origin/" ^ branch] with | (0, _, _) -> true | (code, _, stderr) -> Log.warn (fun m -> m "[%s] git reset failed (code %d): %s" handle code (String.trim stderr)); false) | (code, _, stderr) -> Log.warn (fun m -> m "[%s] git fetch failed (code %d): %s" handle code (String.trim stderr)); false let send_message ~client ~stream ~topic ~content = let msg = Zulip.Message.create ~type_:`Channel ~to_:[stream] ~topic ~content () in let resp = Zulip.Messages.send client msg in Log.info (fun m -> m "Broadcast sent, message ID: %d" (Zulip.Message_response.id resp)) (* Detect fork relationships: check if this user's affected repos appear as forks in other users' sources.toml, or vice versa *) let detect_fork_context ~fs ~verse_path ~handle ~affected_repos users = (* For each other user, load their sources.toml and check for upstream refs *) let forks = List.filter_map (fun (other : verse_user) -> if other.handle = handle then None else let sources_path = Fpath.(v verse_path / other.handle / "sources.toml") in match Monopam.Sources_registry.load ~fs sources_path with | Error _ -> None | Ok registry -> (* Check if any of the other user's entries have upstream pointing to repos that this user also has *) let forked_repos = List.filter_map (fun (subtree, entry) -> match entry.Monopam.Sources_registry.upstream with | Some _upstream when List.mem subtree affected_repos -> Some subtree | _ -> (* Also check if this user's sources.toml has entries with upstream pointing to the other user's repos *) None ) (Monopam.Sources_registry.to_list registry) in match forked_repos with | [] -> None | repos -> Some { Changelog.source_handle = handle; target_handle = other.handle; repos } ) users in (* Also check the reverse: if this user has sources with upstream pointing to other users *) let own_sources_path = Fpath.(v verse_path / handle / "sources.toml") in let reverse_forks = match Monopam.Sources_registry.load ~fs own_sources_path with | Error _ -> [] | Ok registry -> List.filter_map (fun (subtree, entry) -> match entry.Monopam.Sources_registry.upstream with | Some upstream_url -> (* Try to match upstream URL to a verse user *) let matching_user = List.find_opt (fun (other : verse_user) -> other.handle <> handle && (String.starts_with ~prefix:other.handle upstream_url || String.starts_with ~prefix:other.monorepo_url upstream_url) ) users in (match matching_user with | Some other when List.mem subtree affected_repos -> Some { Changelog.source_handle = other.handle; target_handle = handle; repos = [subtree] } | _ -> None) | None -> None ) (Monopam.Sources_registry.to_list registry) in (* Merge: group by (source, target) pair *) let all = forks @ reverse_forks in let merged = Hashtbl.create 8 in List.iter (fun (fc : Changelog.fork_context) -> let key = (fc.source_handle, fc.target_handle) in let existing = try Hashtbl.find merged key with Not_found -> [] in Hashtbl.replace merged key (fc.repos @ existing) ) all; Hashtbl.fold (fun (source_handle, target_handle) repos acc -> let repos = List.sort_uniq String.compare repos in { Changelog.source_handle; target_handle; repos } :: acc ) merged [] (* Process a single verse user: pull, check HEAD, generate changelog if needed *) let process_verse_user ~sw ~proc ~clock ~fs ~storage ~client ~config ~verse_path ~all_users user = let handle = user.handle in Log.info (fun m -> m "Checking %s for changes..." handle); (* Sync to upstream (fetch + reset, since we don't make local commits here) *) let _sync_ok = sync_to_upstream ~proc ~cwd:user.monorepo_path ~handle in (* Get current git HEAD *) let current_head = get_git_head ~proc ~cwd:user.monorepo_path in let last_head = Admin.get_user_git_head storage ~handle in Log.info (fun m -> m "[%s] Current HEAD: %s, Last HEAD: %s" handle (Option.value ~default:"(none)" current_head) (Option.value ~default:"(none)" last_head)); (* Check if HEAD has changed *) let head_changed = match (current_head, last_head) with | (Some c, Some l) -> c <> l | (Some _, None) -> true (* First run for this user *) | _ -> false in if head_changed then begin Log.info (fun m -> m "[%s] Git HEAD changed, generating changes..." handle); (* Get commits since last HEAD, or last 25 on first run *) let commits = match last_head with | Some h -> Changelog.get_git_log ~proc ~cwd:user.monorepo_path ~since_head:h | None -> Changelog.get_recent_commits ~proc ~cwd:user.monorepo_path ~count:25 in if commits = [] then begin Log.info (fun m -> m "[%s] No commits to broadcast" handle); (* Still update HEAD so we don't reprocess *) Option.iter (Admin.set_user_git_head storage ~handle) current_head end else begin (* Get channel members for @mentions *) let members = Changelog.get_channel_members ~client ~channel:config.Config.channel in (* Detect fork relationships *) let affected = Changelog.affected_subprojects commits in let fork_context = detect_fork_context ~fs ~verse_path ~handle:user.handle ~affected_repos:affected all_users in (* Generate narrative changelog with Claude *) match Changelog.generate ~sw ~proc ~clock ~fs ~commits ~members ~fork_context ?opamrepo_path:user.opamrepo_path () with | None -> Log.info (fun m -> m "[%s] No changelog generated" handle); Option.iter (Admin.set_user_git_head storage ~handle) current_head | Some changelog_content -> Log.info (fun m -> m "[%s] Broadcasting narrative changelog" handle); (* Format the broadcast with repo hrefs *) let content = Printf.sprintf {|**Updates from %s** Repos: [monorepo](%s) | [opam-repo](%s) %s|} handle user.monorepo_url user.opamrepo_url changelog_content in send_message ~client ~stream:config.Config.channel ~topic:config.Config.topic ~content; (* Update storage with per-user HEAD *) let now = Ptime_clock.now () in Admin.set_last_broadcast_time storage now; Option.iter (Admin.set_user_git_head storage ~handle) current_head; Log.info (fun m -> m "[%s] Updated broadcast time and git HEAD" handle) end end else Log.debug (fun m -> m "[%s] No HEAD change, skipping" handle) let run ~sw ~env ~config ~zulip_config ~handler ~interval = let fs = Eio.Stdenv.fs env in let proc = Eio.Stdenv.process_mgr env in let clock = Eio.Stdenv.clock env in (* Create Zulip client *) let client = Zulip_bot.Bot.create_client ~sw ~env ~config:zulip_config in let storage = Zulip_bot.Storage.create client in Log.info (fun m -> m "Starting loop with %d second interval" interval); (* Determine verse path - use config.verse_path if set, otherwise derive from monorepo_path *) let verse_path = match config.Config.verse_path with | Some vp -> vp | None -> (* Assume verse/ is a sibling of monorepo_path's parent *) let mono_dir = Filename.dirname config.Config.monorepo_path in Filename.concat mono_dir "verse" in Log.info (fun m -> m "Verse path: %s" verse_path); let broadcast_loop () = let rec loop () = Log.info (fun m -> m "Checking for changes across verse users..."); (* Get all verse users *) let users = get_verse_users ~fs ~verse_path in Log.info (fun m -> m "Found %d verse users" (List.length users)); (* Process each user *) List.iter (fun user -> try process_verse_user ~sw ~proc ~clock ~fs ~storage ~client ~config ~verse_path ~all_users:users user with e -> Log.warn (fun m -> m "[%s] Error processing user: %s" user.handle (Printexc.to_string e)) ) users; (* Sleep until next check *) Log.info (fun m -> m "Sleeping for %d seconds" interval); Eio.Time.sleep clock (float_of_int interval); loop () in loop () in (* Run broadcast loop and message handler concurrently *) Eio.Fiber.both broadcast_loop (fun () -> Log.info (fun m -> m "Starting message handler"); let identity = Zulip_bot.Bot.fetch_identity client in Log.info (fun m -> m "Bot identity: %s <%s> (id: %d)" identity.full_name identity.email identity.user_id); let queue = Zulip.Event_queue.register client ~event_types:[ Zulip.Event_type.Message ] () in Log.info (fun m -> m "Event queue registered: %s" (Zulip.Event_queue.id queue)); let rec event_loop last_event_id = try let events = Zulip.Event_queue.get_events queue client ~last_event_id () in if List.length events > 0 then Log.info (fun m -> m "Received %d event(s)" (List.length events)); List.iter (fun event -> Log.debug (fun m -> m "Event id=%d, type=%s" (Zulip.Event.id event) (Zulip.Event_type.to_string (Zulip.Event.type_ event))); Zulip_bot.Bot.process_event ~client ~storage ~identity ~handler event) events; let new_last_id = List.fold_left (fun max_id event -> max (Zulip.Event.id event) max_id) last_event_id events in event_loop new_last_id with Eio.Exn.Io (e, _) -> Log.warn (fun m -> m "Error getting events: %a (retrying in 2s)" Eio.Exn.pp_err e); Eio.Time.sleep clock 2.0; event_loop last_event_id in event_loop (-1))