A Zulip bot agent that sits in our Black Sun. Ever-evolving.
at main 371 lines 15 kB view raw
(*---------------------------------------------------------------------------
   Copyright (c) 2026 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
   SPDX-License-Identifier: ISC
  ---------------------------------------------------------------------------*)

let src = Logs.Src.create "poe.loop" ~doc:"Poe polling loop"

module Log = (val Logs.src_log src : Logs.LOG)

(* A verse member with a local monorepo checkout who is listed in the
   opamverse registry. *)
type verse_user = {
  handle : string;  (* Subdirectory name under the verse path; registry handle *)
  monorepo_path : Eio.Fs.dir_ty Eio.Path.t;  (* Local monorepo checkout *)
  monorepo_url : string;  (* Monorepo URL recorded in the registry *)
  opamrepo_url : string;  (* Opam repo URL recorded in the registry *)
  opamrepo_path : Fpath.t option;
      (* Local path to opam repo for reading dev-repo URLs *)
}

(* Load [opamverse.toml] from the directory given by
   [Monopam.Verse_config.registry_path ()]. *)
let load_registry ~fs =
  let registry_path = Monopam.Verse_config.registry_path () in
  let registry_toml = Fpath.(registry_path / "opamverse.toml") in
  Monopam.Verse_registry.load ~fs registry_toml

(* List the verse users that have a usable checkout under [verse_path].

   A subdirectory [<handle>] is kept only when [<handle>] is a registry member
   and [<handle>/.git] is a directory. Hidden entries and [*-opam] directories
   are never candidates themselves; a sibling [<handle>-opam] directory, when
   present, becomes the user's [opamrepo_path].

   Returns [[]] (after a warning) if the registry cannot be loaded or the verse
   directory cannot be read. *)
let get_verse_users ~fs ~verse_path =
  (* [true] iff [path] resolves to a directory. Only I/O errors are mapped to
     [false]; a catch-all would also swallow [Eio.Cancel.Cancelled]. *)
  let is_directory path =
    match Eio.Path.kind ~follow:true path with
    | `Directory -> true
    | _ -> false
    | exception Eio.Io _ -> false
  in
  match load_registry ~fs with
  | Error msg ->
      Log.warn (fun m -> m "Failed to load registry: %s" msg);
      []
  | Ok registry ->
      let verse_eio = Eio.Path.(fs / verse_path) in
      let subdirs =
        match Eio.Path.read_dir verse_eio with
        | names ->
            (* Skip hidden entries and the per-user -opam repositories *)
            List.filter
              (fun name ->
                (not (String.starts_with ~prefix:"." name))
                && not (String.ends_with ~suffix:"-opam" name))
              names
        | exception Eio.Io _ ->
            Log.warn (fun m ->
                m "Failed to read verse directory: %s" verse_path);
            []
      in
      let user_of_handle handle =
        match Monopam.Verse_registry.find_member registry ~handle with
        | None ->
            Log.debug (fun m -> m "Skipping %s: not in registry" handle);
            None
        | Some member ->
            let mono_path = Eio.Path.(verse_eio / handle) in
            if not (is_directory Eio.Path.(mono_path / ".git")) then begin
              Log.debug (fun m -> m "Skipping %s: not a git repo" handle);
              None
            end
            else
              let opam_dir = handle ^ "-opam" in
              let opamrepo_path =
                if is_directory Eio.Path.(verse_eio / opam_dir) then
                  Some (Fpath.v (verse_path ^ "/" ^ opam_dir))
                else None
              in
              Some
                {
                  handle;
                  monorepo_path = mono_path;
                  monorepo_url = member.monorepo;
                  opamrepo_url = member.opamrepo;
                  opamrepo_path;
                }
      in
      List.filter_map user_of_handle subdirs

(* Run [cmd] in [cwd] and return [(code, stdout, stderr)].

   [code] is the exit status, or the negated signal number if the child was
   killed by a signal, so any non-zero value means failure. *)
let run_command ~proc ~cwd cmd =
  let cwd_str = Eio.Path.native_exn cwd in
  Log.debug (fun m -> m "Running: %s (in %s)" (String.concat " " cmd) cwd_str);
  let buf_stdout = Buffer.create 256 in
  let buf_stderr = Buffer.create 256 in
  let status =
    (* Buffer sinks are not fd-backed, so Eio feeds them from helper fibers
       attached to [sw]. Awaiting the child alone does not mean those copies
       are done; [Switch.run] returns only after every fiber on [sw] has
       finished, so the buffers are read below, after it returns. *)
    Eio.Switch.run @@ fun sw ->
    let child =
      Eio.Process.spawn proc ~sw ~cwd
        ~stdout:(Eio.Flow.buffer_sink buf_stdout)
        ~stderr:(Eio.Flow.buffer_sink buf_stderr)
        cmd
    in
    Eio.Process.await child
  in
  let stdout = Buffer.contents buf_stdout in
  let stderr = Buffer.contents buf_stderr in
  match status with
  | `Exited code -> (code, stdout, stderr)
  | `Signaled signum -> (-signum, stdout, stderr)

(* Short hash of HEAD in [cwd], or [None] if [git rev-parse] fails. *)
let get_git_head ~proc ~cwd =
  match run_command ~proc ~cwd [ "git"; "rev-parse"; "--short"; "HEAD" ] with
  | 0, stdout, _ -> Some (String.trim stdout)
  | _ -> None

(* Sync a tracked checkout with its remote: [git fetch origin], then
   [git reset --hard origin/<default branch>]. We don't make local commits to
   verse repos, so discarding local state is safe.

   Returns [true] on success. On failure it logs git's stderr and returns
   [false]. *)
let sync_to_upstream ~proc ~cwd ~handle =
  (* The remote default branch, from [git rev-parse --abbrev-ref origin/HEAD]
     (e.g. "origin/main"). Only the leading "origin/" is stripped, so branch
     names that themselves contain '/' stay whole. Falls back to "main". *)
  let default_branch () =
    let prefix = "origin/" in
    match
      run_command ~proc ~cwd
        [ "git"; "rev-parse"; "--abbrev-ref"; "origin/HEAD" ]
    with
    | 0, stdout, _ ->
        let full = String.trim stdout in
        let plen = String.length prefix in
        let flen = String.length full in
        if flen > plen && String.starts_with ~prefix full then
          String.sub full plen (flen - plen)
        else "main"
    | _ -> "main"
  in
  Log.debug (fun m -> m "[%s] Fetching from origin" handle);
  match run_command ~proc ~cwd [ "git"; "fetch"; "origin" ] with
  | 0, _, _ -> (
      let branch = default_branch () in
      Log.debug (fun m -> m "[%s] Resetting to origin/%s" handle branch);
      match
        run_command ~proc ~cwd [ "git"; "reset"; "--hard"; "origin/" ^ branch ]
      with
      | 0, _, _ -> true
      | code, _, stderr ->
          Log.warn (fun m ->
              m "[%s] git reset failed (code %d): %s" handle code
                (String.trim stderr));
          false)
  | code, _, stderr ->
      Log.warn (fun m ->
          m "[%s] git fetch failed (code %d): %s" handle code
            (String.trim stderr));
      false

(* Post [content] to [stream] under [topic] and log the new message id. *)
let send_message ~client ~stream ~topic ~content =
  let msg =
    Zulip.Message.create ~type_:`Channel ~to_:[ stream ] ~topic ~content ()
  in
  let resp = Zulip.Messages.send client msg in
  Log.info (fun m ->
      m "Broadcast sent, message ID: %d" (Zulip.Message_response.id resp))

(* Detect fork relationships between [handle] and the other [users]. Only the
   subtrees listed in [affected_repos] are considered.

   - Forward: another user's [sources.toml] lists an affected subtree with an
     [upstream] set. Reported as source [handle] -> target [other].
   - Reverse: [handle]'s own [sources.toml] lists an affected subtree whose
     upstream URL starts with another user's handle or non-empty monorepo URL.
     Reported as source [other] -> target [handle].

   Results are merged per (source, target) pair, with deduplicated, sorted repo
   lists. A [sources.toml] that fails to load is ignored. *)
let detect_fork_context ~fs ~verse_path ~handle ~affected_repos users =
  let is_affected subtree = List.mem subtree affected_repos in
  let sources_of h =
    Monopam.Sources_registry.load ~fs Fpath.(v verse_path / h / "sources.toml")
  in
  let forward =
    List.filter_map
      (fun (other : verse_user) ->
        if other.handle = handle then None
        else
          match sources_of other.handle with
          | Error _ -> None
          | Ok registry -> (
              let forked =
                List.filter_map
                  (fun (subtree, entry) ->
                    match entry.Monopam.Sources_registry.upstream with
                    | Some _ -> if is_affected subtree then Some subtree else None
                    | None -> None)
                  (Monopam.Sources_registry.to_list registry)
              in
              match forked with
              | [] -> None
              | repos ->
                  Some
                    {
                      Changelog.source_handle = handle;
                      target_handle = other.handle;
                      repos;
                    }))
      users
  in
  (* The other verse user (if any) that [upstream_url] appears to point at.
     An empty monorepo URL would prefix-match every URL, so it is skipped. *)
  let owner_of_upstream upstream_url =
    List.find_opt
      (fun (other : verse_user) ->
        other.handle <> handle
        && (String.starts_with ~prefix:other.handle upstream_url
           || (other.monorepo_url <> ""
              && String.starts_with ~prefix:other.monorepo_url upstream_url)))
      users
  in
  let reverse =
    match sources_of handle with
    | Error _ -> []
    | Ok registry ->
        List.filter_map
          (fun (subtree, entry) ->
            match entry.Monopam.Sources_registry.upstream with
            | None -> None
            | Some upstream_url ->
                if not (is_affected subtree) then None
                else
                  Option.map
                    (fun (other : verse_user) ->
                      {
                        Changelog.source_handle = other.handle;
                        target_handle = handle;
                        repos = [ subtree ];
                      })
                    (owner_of_upstream upstream_url))
          (Monopam.Sources_registry.to_list registry)
  in
  (* Merge: group repos by (source, target) pair *)
  let merged = Hashtbl.create 8 in
  List.iter
    (fun (fc : Changelog.fork_context) ->
      let key = (fc.source_handle, fc.target_handle) in
      let existing = Option.value ~default:[] (Hashtbl.find_opt merged key) in
      Hashtbl.replace merged key (fc.repos @ existing))
    (forward @ reverse);
  Hashtbl.fold
    (fun (source_handle, target_handle) repos acc ->
      let repos = List.sort_uniq String.compare repos in
      { Changelog.source_handle; target_handle; repos } :: acc)
    merged []

(* Process a single verse user.

   Steps:
   1. Sync the checkout.
   2. Compare its HEAD with the one stored for [handle].
   3. If HEAD moved, collect the new commits (or the last 25 on the first run)
      and ask [Changelog.generate] for a narrative.
   4. Post that narrative to [config]'s channel/topic.

   The stored HEAD is advanced whenever there is nothing to broadcast or the
   broadcast was sent. If sending raises, HEAD is left unchanged, so the same
   commits are retried on the next pass. *)
let process_verse_user ~sw ~proc ~clock ~fs ~storage ~client ~config
    ~verse_path ~all_users user =
  let handle = user.handle in
  let cwd = user.monorepo_path in
  Log.info (fun m -> m "Checking %s for changes..." handle);
  (* Best effort: [sync_to_upstream] logs its own failures, and an unsynced
     checkout simply keeps its previous HEAD. *)
  let (_sync_ok : bool) = sync_to_upstream ~proc ~cwd ~handle in
  let current_head = get_git_head ~proc ~cwd in
  let last_head = Admin.get_user_git_head storage ~handle in
  Log.info (fun m ->
      m "[%s] Current HEAD: %s, Last HEAD: %s" handle
        (Option.value ~default:"(none)" current_head)
        (Option.value ~default:"(none)" last_head));
  (* Persist [current_head] so these commits are not reprocessed. *)
  let record_head () =
    Option.iter (Admin.set_user_git_head storage ~handle) current_head
  in
  let head_changed =
    match (current_head, last_head) with
    | Some c, Some l -> c <> l
    | Some _, None -> true (* First run for this user *)
    | None, _ -> false
  in
  if not head_changed then
    Log.debug (fun m -> m "[%s] No HEAD change, skipping" handle)
  else begin
    Log.info (fun m -> m "[%s] Git HEAD changed, generating changes..." handle);
    let commits =
      match last_head with
      | Some h -> Changelog.get_git_log ~proc ~cwd ~since_head:h
      | None -> Changelog.get_recent_commits ~proc ~cwd ~count:25
    in
    match commits with
    | [] ->
        Log.info (fun m -> m "[%s] No commits to broadcast" handle);
        record_head ()
    | _ -> (
        (* Channel members are passed along for @mentions *)
        let members =
          Changelog.get_channel_members ~client ~channel:config.Config.channel
        in
        let affected = Changelog.affected_subprojects commits in
        let fork_context =
          detect_fork_context ~fs ~verse_path ~handle ~affected_repos:affected
            all_users
        in
        match
          Changelog.generate ~sw ~proc ~clock ~fs ~commits ~members
            ~fork_context ?opamrepo_path:user.opamrepo_path ()
        with
        | None ->
            Log.info (fun m -> m "[%s] No changelog generated" handle);
            record_head ()
        | Some changelog_content ->
            Log.info (fun m -> m "[%s] Broadcasting narrative changelog" handle);
            let content =
              Printf.sprintf
                {|**Updates from %s**

Repos: [monorepo](%s) | [opam-repo](%s)

%s|}
                handle user.monorepo_url user.opamrepo_url changelog_content
            in
            send_message ~client ~stream:config.Config.channel
              ~topic:config.Config.topic ~content;
            Admin.set_last_broadcast_time storage (Ptime_clock.now ());
            record_head ();
            Log.info (fun m ->
                m "[%s] Updated broadcast time and git HEAD" handle))
  end

(* Entry point. Runs two loops concurrently until cancelled:
   - a broadcast loop that sweeps all verse users every [interval] seconds;
   - a Zulip event loop that hands each incoming message event to [handler]. *)
let run ~sw ~env ~config ~zulip_config ~handler ~interval =
  let fs = Eio.Stdenv.fs env in
  let proc = Eio.Stdenv.process_mgr env in
  let clock = Eio.Stdenv.clock env in
  let client = Zulip_bot.Bot.create_client ~sw ~env ~config:zulip_config in
  let storage = Zulip_bot.Storage.create client in
  Log.info (fun m -> m "Starting loop with %d second interval" interval);
  (* Use config.verse_path when set. Otherwise expect a [verse] directory
     alongside monorepo_path, i.e. [Filename.dirname monorepo_path/verse]. *)
  let verse_path =
    match config.Config.verse_path with
    | Some vp -> vp
    | None ->
        Filename.concat (Filename.dirname config.Config.monorepo_path) "verse"
  in
  Log.info (fun m -> m "Verse path: %s" verse_path);
  (* A failure for one user is logged and does not stop the sweep.
     Cancellation is re-raised so the enclosing switch can stop the loop. *)
  let process_user users user =
    try
      process_verse_user ~sw ~proc ~clock ~fs ~storage ~client ~config
        ~verse_path ~all_users:users user
    with
    | Eio.Cancel.Cancelled _ as ex -> raise ex
    | e ->
        Log.warn (fun m ->
            m "[%s] Error processing user: %s" user.handle
              (Printexc.to_string e))
  in
  let rec broadcast_loop () =
    Log.info (fun m -> m "Checking for changes across verse users...");
    let users = get_verse_users ~fs ~verse_path in
    Log.info (fun m -> m "Found %d verse users" (List.length users));
    List.iter (process_user users) users;
    Log.info (fun m -> m "Sleeping for %d seconds" interval);
    Eio.Time.sleep clock (float_of_int interval);
    broadcast_loop ()
  in
  let message_handler () =
    Log.info (fun m -> m "Starting message handler");
    let identity = Zulip_bot.Bot.fetch_identity client in
    Log.info (fun m ->
        m "Bot identity: %s <%s> (id: %d)" identity.full_name identity.email
          identity.user_id);
    let queue =
      Zulip.Event_queue.register client
        ~event_types:[ Zulip.Event_type.Message ]
        ()
    in
    Log.info (fun m ->
        m "Event queue registered: %s" (Zulip.Event_queue.id queue));
    (* An I/O error while handling one event is logged and skipped. The
       batch is not re-fetched, so earlier events in it are not handled
       twice. *)
    let handle_event event =
      Log.debug (fun m ->
          m "Event id=%d, type=%s" (Zulip.Event.id event)
            (Zulip.Event_type.to_string (Zulip.Event.type_ event)));
      match Zulip_bot.Bot.process_event ~client ~storage ~identity ~handler event with
      | () -> ()
      | exception Eio.Exn.Io (e, _) ->
          Log.warn (fun m ->
              m "Error processing event %d: %a" (Zulip.Event.id event)
                Eio.Exn.pp_err e)
    in
    (* [match ... with exception] guards only the fetch. Every recursive call
       below is therefore a genuine tail call; a [try] around the whole body
       would grow the stack on each poll. *)
    let rec event_loop last_event_id =
      match Zulip.Event_queue.get_events queue client ~last_event_id () with
      | exception Eio.Exn.Io (e, _) ->
          Log.warn (fun m ->
              m "Error getting events: %a (retrying in 2s)" Eio.Exn.pp_err e);
          Eio.Time.sleep clock 2.0;
          event_loop last_event_id
      | [] -> event_loop last_event_id
      | events ->
          Log.info (fun m -> m "Received %d event(s)" (List.length events));
          List.iter handle_event events;
          let new_last_id =
            List.fold_left
              (fun max_id event -> max (Zulip.Event.id event) max_id)
              last_event_id events
          in
          event_loop new_last_id
    in
    event_loop (-1)
  in
  Eio.Fiber.both broadcast_loop message_handler