A Zulip bot agent to sit in our Black Sun. Ever evolving
1(*---------------------------------------------------------------------------
2 Copyright (c) 2026 Anil Madhavapeddy <anil@recoil.org>. All rights reserved.
3 SPDX-License-Identifier: ISC
4 ---------------------------------------------------------------------------*)
5
(* Dedicated log source for the polling loop; [Log] is the logging module
   bound to that source and is used by every function below. *)
let src = Logs.Src.create ~doc:"Poe polling loop" "poe.loop"

module Log = (val Logs.src_log src : Logs.LOG)
8
(** A verse user whose monorepo checkout is visited on each polling pass. *)
type verse_user = {
  handle : string;
      (** Registry handle; also the name of the user's directory under the
          verse path. *)
  monorepo_path : Eio.Fs.dir_ty Eio.Path.t;
      (** Local checkout of the user's monorepo. *)
  monorepo_url : string;
      (** Monorepo URL as recorded for the member in the registry. *)
  opamrepo_url : string;
      (** Opam repository URL as recorded for the member in the registry. *)
  opamrepo_path : Fpath.t option;
      (** Local path to opam repo for reading dev-repo URLs, when one exists. *)
}
17
(** [load_registry ~fs] loads [opamverse.toml] from the directory reported by
    [Monopam.Verse_config.registry_path] and returns the result of
    [Monopam.Verse_registry.load] unchanged. *)
let load_registry ~fs =
  let dir = Monopam.Verse_config.registry_path () in
  let toml = Fpath.( / ) dir "opamverse.toml" in
  Monopam.Verse_registry.load ~fs toml
23
(** [get_verse_users ~fs ~verse_path] lists the verse users to track.

    The directory [verse_path] is scanned for entries, skipping hidden names
    and names ending in [-opam]. An entry is kept when its name is a handle
    known to the registry and it contains a [.git] directory. The sibling
    directory [<handle>-opam] is recorded as [opamrepo_path] when it exists.

    Returns [[]] (after logging a warning) when the registry cannot be loaded
    or when reading the verse directory fails with an I/O error. *)
let get_verse_users ~fs ~verse_path =
  (* Only I/O errors are treated as "not a directory". Anything else, notably
     Eio cancellation, must propagate instead of being silently swallowed. *)
  let is_dir path =
    match Eio.Path.kind ~follow:true path with
    | `Directory -> true
    | _ -> false
    | exception Eio.Io _ -> false
  in
  match load_registry ~fs with
  | Error msg ->
      Log.warn (fun m -> m "Failed to load registry: %s" msg);
      []
  | Ok registry ->
      let verse_eio = Eio.Path.(fs / verse_path) in
      (* Candidate user directories: no dot-files, no "<handle>-opam" dirs. *)
      let subdirs =
        match Eio.Path.read_dir verse_eio with
        | names ->
            List.filter
              (fun name ->
                (not (String.starts_with ~prefix:"." name))
                && not (String.ends_with ~suffix:"-opam" name))
              names
        | exception Eio.Io _ ->
            Log.warn (fun m ->
                m "Failed to read verse directory: %s" verse_path);
            []
      in
      List.filter_map
        (fun handle ->
          match Monopam.Verse_registry.find_member registry ~handle with
          | None ->
              Log.debug (fun m -> m "Skipping %s: not in registry" handle);
              None
          | Some member ->
              let mono_path = Eio.Path.(verse_eio / handle) in
              if not (is_dir Eio.Path.(mono_path / ".git")) then begin
                Log.debug (fun m -> m "Skipping %s: not a git repo" handle);
                None
              end
              else
                let opam_dir = handle ^ "-opam" in
                let opamrepo_path =
                  if is_dir Eio.Path.(verse_eio / opam_dir) then
                    Some Fpath.(v verse_path / opam_dir)
                  else None
                in
                Some
                  {
                    handle;
                    monorepo_path = mono_path;
                    monorepo_url = member.monorepo;
                    opamrepo_url = member.opamrepo;
                    opamrepo_path;
                  })
        subdirs
80
(** [run_command ~proc ~cwd cmd] spawns [cmd] (an argv list) with working
    directory [cwd], waits for it to finish and captures its output.

    Returns [(code, stdout, stderr)], where [code] is the exit code, or the
    negated signal number when the process was terminated by a signal. *)
let run_command ~proc ~cwd cmd =
  let dir = Eio.Path.native_exn cwd in
  Log.debug (fun m -> m "Running: %s (in %s)" (String.concat " " cmd) dir);
  Eio.Switch.run (fun sw ->
      let out = Buffer.create 256 and err = Buffer.create 256 in
      (* The process is awaited before the buffers are read, so both sinks
         hold the complete output by then. *)
      let status =
        Eio.Process.spawn proc ~sw ~cwd
          ~stdout:(Eio.Flow.buffer_sink out)
          ~stderr:(Eio.Flow.buffer_sink err)
          cmd
        |> Eio.Process.await
      in
      let captured_out = Buffer.contents out in
      let captured_err = Buffer.contents err in
      let code =
        match status with
        | `Exited n -> n
        | `Signaled s -> -s
      in
      (code, captured_out, captured_err))
99
(** [get_git_head ~proc ~cwd] is the trimmed output of
    [git rev-parse --short HEAD] run in [cwd], or [None] when the command
    does not finish with status 0. *)
let get_git_head ~proc ~cwd =
  let code, out, _err =
    run_command ~proc ~cwd [ "git"; "rev-parse"; "--short"; "HEAD" ]
  in
  if code = 0 then Some (String.trim out) else None
104
(** [sync_to_upstream ~proc ~cwd ~handle] syncs a tracked repo by fetching
    [origin] and hard-resetting to the remote's default branch.
    We don't make local commits to verse repos, so reset is safe.

    The default branch is read from [git rev-parse --abbrev-ref origin/HEAD];
    everything after the first ['/'] of its output is used as the branch
    name, so branch names that themselves contain slashes are kept intact.
    When the command fails or yields no branch name, ["main"] is used.

    [handle] is only used to tag log messages. Returns [true] when both the
    fetch and the reset exit with status 0; otherwise logs a warning with the
    exit code and stderr, and returns [false]. *)
let sync_to_upstream ~proc ~cwd ~handle =
  let git args = run_command ~proc ~cwd ("git" :: args) in
  let default_branch () =
    match git [ "rev-parse"; "--abbrev-ref"; "origin/HEAD" ] with
    | 0, stdout, _ -> (
        (* Output looks like "origin/main"; drop the remote name only. *)
        let full = String.trim stdout in
        match String.index_opt full '/' with
        | Some i when i + 1 < String.length full ->
            String.sub full (i + 1) (String.length full - i - 1)
        | Some _ | None -> "main")
    | _ -> "main"
  in
  Log.debug (fun m -> m "[%s] Fetching from origin" handle);
  match git [ "fetch"; "origin" ] with
  | 0, _, _ -> (
      let branch = default_branch () in
      Log.debug (fun m -> m "[%s] Resetting to origin/%s" handle branch);
      match git [ "reset"; "--hard"; "origin/" ^ branch ] with
      | 0, _, _ -> true
      | code, _, stderr ->
          Log.warn (fun m ->
              m "[%s] git reset failed (code %d): %s" handle code
                (String.trim stderr));
          false)
  | code, _, stderr ->
      Log.warn (fun m ->
          m "[%s] git fetch failed (code %d): %s" handle code
            (String.trim stderr));
      false
133
(** [send_message ~client ~stream ~topic ~content] sends [content] as a
    channel message to [stream] under [topic] and logs the ID of the
    response. *)
let send_message ~client ~stream ~topic ~content =
  let response =
    Zulip.Message.create ~type_:`Channel ~to_:[ stream ] ~topic ~content ()
    |> Zulip.Messages.send client
  in
  Log.info (fun m ->
      m "Broadcast sent, message ID: %d" (Zulip.Message_response.id response))
138
(** [detect_fork_context ~fs ~verse_path ~handle ~affected_repos users]
    reports fork relationships between [handle] and the other [users],
    restricted to subtrees listed in [affected_repos].

    Two directions are examined:
    - forward: another user's [sources.toml] has an entry with an [upstream]
      set for an affected subtree. It is reported with [handle] as the source
      and that user as the target.
    - reverse: [handle]'s own [sources.toml] has an affected entry whose
      upstream URL starts with another user's handle or monorepo URL. It is
      reported with that user as the source and [handle] as the target.

    A [sources.toml] that fails to load contributes nothing. Results are
    merged per (source, target) pair, each with a sorted, duplicate-free repo
    list, and returned sorted by (source, target) so the output does not
    depend on hash-table iteration order. *)
let detect_fork_context ~fs ~verse_path ~handle ~affected_repos users =
  let is_affected subtree = List.mem subtree affected_repos in
  (* Entries of [<verse_path>/<owner>/sources.toml]; [] when loading fails. *)
  let load_sources owner =
    let path = Fpath.(v verse_path / owner / "sources.toml") in
    match Monopam.Sources_registry.load ~fs path with
    | Ok registry -> Monopam.Sources_registry.to_list registry
    | Error _ -> []
  in
  (* Forward direction.
     NOTE(review): the upstream URL itself is not inspected here; any entry
     with an upstream on an affected subtree counts. Confirm this is
     intended. *)
  let forward =
    List.filter_map
      (fun (other : verse_user) ->
        if other.handle = handle then None
        else
          let repos =
            List.filter_map
              (fun (subtree, entry) ->
                match entry.Monopam.Sources_registry.upstream with
                | Some _ when is_affected subtree -> Some subtree
                | Some _ | None -> None)
              (load_sources other.handle)
          in
          match repos with
          | [] -> None
          | repos ->
              Some
                {
                  Changelog.source_handle = handle;
                  target_handle = other.handle;
                  repos;
                })
      users
  in
  (* Reverse direction: the cheap membership test runs before the user
     search, which gives the same result with less work. *)
  let reverse =
    List.filter_map
      (fun (subtree, entry) ->
        match entry.Monopam.Sources_registry.upstream with
        | Some upstream_url when is_affected subtree ->
            users
            |> List.find_opt (fun (other : verse_user) ->
                   other.handle <> handle
                   && (String.starts_with ~prefix:other.handle upstream_url
                      || String.starts_with ~prefix:other.monorepo_url
                           upstream_url))
            |> Option.map (fun (other : verse_user) ->
                   {
                     Changelog.source_handle = other.handle;
                     target_handle = handle;
                     repos = [ subtree ];
                   })
        | Some _ | None -> None)
      (load_sources handle)
  in
  (* Merge: group repo lists by (source, target) pair. *)
  let merged = Hashtbl.create 8 in
  List.iter
    (fun (fc : Changelog.fork_context) ->
      let key = (fc.source_handle, fc.target_handle) in
      let existing = Option.value ~default:[] (Hashtbl.find_opt merged key) in
      Hashtbl.replace merged key (fc.repos @ existing))
    (forward @ reverse);
  let by_pair ((s1, t1), _) ((s2, t2), _) =
    match String.compare s1 s2 with
    | 0 -> String.compare t1 t2
    | c -> c
  in
  Hashtbl.fold (fun key repos acc -> (key, repos) :: acc) merged []
  |> List.sort by_pair
  |> List.map (fun ((source_handle, target_handle), repos) ->
         {
           Changelog.source_handle;
           target_handle;
           repos = List.sort_uniq String.compare repos;
         })
203
(** [process_verse_user ~sw ~proc ~clock ~fs ~storage ~client ~config
    ~verse_path ~all_users user] handles one verse user for one polling pass.

    The user's checkout is synced to upstream, then its current HEAD is
    compared with the HEAD stored for that user. Nothing happens when the
    current HEAD is unavailable or unchanged. Otherwise the commits since the
    stored HEAD (or the 25 most recent on the first run) are collected and:
    - with no commits, or when no changelog is generated, only the stored
      HEAD is updated;
    - otherwise the changelog is sent to the configured channel and topic,
      after which the last-broadcast time and the stored HEAD are updated. *)
let process_verse_user ~sw ~proc ~clock ~fs ~storage ~client ~config
    ~verse_path ~all_users user =
  let handle = user.handle in
  let cwd = user.monorepo_path in
  Log.info (fun m -> m "Checking %s for changes..." handle);
  (* Fetch + reset; the outcome is deliberately not acted upon here. *)
  let _sync_ok = sync_to_upstream ~proc ~cwd ~handle in
  let current_head = get_git_head ~proc ~cwd in
  let last_head = Admin.get_user_git_head storage ~handle in
  let show head = Option.value ~default:"(none)" head in
  Log.info (fun m ->
      m "[%s] Current HEAD: %s, Last HEAD: %s" handle (show current_head)
        (show last_head));
  (* Persist the HEAD we just observed, if any, so it is not reprocessed. *)
  let remember_head () =
    Option.iter (Admin.set_user_git_head storage ~handle) current_head
  in
  let head_changed =
    match (current_head, last_head) with
    | None, _ -> false
    | Some _, None -> true (* first run for this user *)
    | Some current, Some last -> current <> last
  in
  if not head_changed then
    Log.debug (fun m -> m "[%s] No HEAD change, skipping" handle)
  else begin
    Log.info (fun m -> m "[%s] Git HEAD changed, generating changes..." handle);
    let commits =
      match last_head with
      | None -> Changelog.get_recent_commits ~proc ~cwd ~count:25
      | Some h -> Changelog.get_git_log ~proc ~cwd ~since_head:h
    in
    match commits with
    | [] ->
        Log.info (fun m -> m "[%s] No commits to broadcast" handle);
        remember_head ()
    | _ :: _ -> (
        (* Channel members are passed to the generator for @mentions. *)
        let members =
          Changelog.get_channel_members ~client ~channel:config.Config.channel
        in
        let affected = Changelog.affected_subprojects commits in
        let fork_context =
          detect_fork_context ~fs ~verse_path ~handle ~affected_repos:affected
            all_users
        in
        match
          Changelog.generate ~sw ~proc ~clock ~fs ~commits ~members
            ~fork_context ?opamrepo_path:user.opamrepo_path ()
        with
        | None ->
            Log.info (fun m -> m "[%s] No changelog generated" handle);
            remember_head ()
        | Some changelog_content ->
            Log.info (fun m ->
                m "[%s] Broadcasting narrative changelog" handle);
            (* Header with repo links, followed by the generated text. *)
            let content =
              Printf.sprintf
                {|**Updates from %s**

Repos: [monorepo](%s) | [opam-repo](%s)

%s|}
                handle user.monorepo_url user.opamrepo_url changelog_content
            in
            send_message ~client ~stream:config.Config.channel
              ~topic:config.Config.topic ~content;
            Admin.set_last_broadcast_time storage (Ptime_clock.now ());
            remember_head ();
            Log.info (fun m ->
                m "[%s] Updated broadcast time and git HEAD" handle))
  end
281
(** [run ~sw ~env ~config ~zulip_config ~handler ~interval] runs the bot.

    Two fibers run concurrently and neither returns normally:
    - the broadcast loop, which every [interval] seconds lists the verse
      users and processes each one; an exception raised while processing one
      user is logged and does not stop the others;
    - the message handler, which registers an event queue for message events
      and passes each received event to [Zulip_bot.Bot.process_event] with
      [handler]. On an [Eio.Exn.Io] error it logs, sleeps for 2 seconds and
      retries from the same last event ID.

    The verse path is [config.verse_path] when set; otherwise it is the
    [verse] directory next to [config.monorepo_path]. *)
let run ~sw ~env ~config ~zulip_config ~handler ~interval =
  let fs = Eio.Stdenv.fs env in
  let proc = Eio.Stdenv.process_mgr env in
  let clock = Eio.Stdenv.clock env in
  let client = Zulip_bot.Bot.create_client ~sw ~env ~config:zulip_config in
  let storage = Zulip_bot.Storage.create client in
  Log.info (fun m -> m "Starting loop with %d second interval" interval);
  let verse_path =
    match config.Config.verse_path with
    | Some vp -> vp
    | None ->
        (* Default: "verse" in the same parent directory as the monorepo. *)
        Filename.concat (Filename.dirname config.Config.monorepo_path) "verse"
  in
  Log.info (fun m -> m "Verse path: %s" verse_path);
  let broadcast_loop () =
    let rec loop () =
      Log.info (fun m -> m "Checking for changes across verse users...");
      let users = get_verse_users ~fs ~verse_path in
      Log.info (fun m -> m "Found %d verse users" (List.length users));
      List.iter
        (fun user ->
          try
            process_verse_user ~sw ~proc ~clock ~fs ~storage ~client ~config
              ~verse_path ~all_users:users user
          with
          (* Cancellation must reach the enclosing fiber; never swallow it. *)
          | Eio.Cancel.Cancelled _ as ex -> raise ex
          | e ->
              Log.warn (fun m ->
                  m "[%s] Error processing user: %s" user.handle
                    (Printexc.to_string e)))
        users;
      Log.info (fun m -> m "Sleeping for %d seconds" interval);
      Eio.Time.sleep clock (float_of_int interval);
      loop ()
    in
    loop ()
  in
  let message_loop () =
    Log.info (fun m -> m "Starting message handler");
    let identity = Zulip_bot.Bot.fetch_identity client in
    Log.info (fun m ->
        m "Bot identity: %s <%s> (id: %d)" identity.full_name identity.email
          identity.user_id);
    let queue =
      Zulip.Event_queue.register client
        ~event_types:[ Zulip.Event_type.Message ]
        ()
    in
    Log.info (fun m ->
        m "Event queue registered: %s" (Zulip.Event_queue.id queue));
    let rec event_loop last_event_id =
      (* The [try] only computes the next event ID. The recursive call sits
         outside it so that it is a real tail call; recursing inside the
         handler scope would grow the stack on every poll of this endless
         loop. *)
      let next_event_id =
        try
          let events =
            Zulip.Event_queue.get_events queue client ~last_event_id ()
          in
          (match events with
          | [] -> ()
          | _ :: _ ->
              Log.info (fun m ->
                  m "Received %d event(s)" (List.length events)));
          List.iter
            (fun event ->
              Log.debug (fun m ->
                  m "Event id=%d, type=%s" (Zulip.Event.id event)
                    (Zulip.Event_type.to_string (Zulip.Event.type_ event)));
              Zulip_bot.Bot.process_event ~client ~storage ~identity ~handler
                event)
            events;
          List.fold_left
            (fun max_id event -> max (Zulip.Event.id event) max_id)
            last_event_id events
        with Eio.Exn.Io (e, _) ->
          Log.warn (fun m ->
              m "Error getting events: %a (retrying in 2s)" Eio.Exn.pp_err e);
          Eio.Time.sleep clock 2.0;
          last_event_id
      in
      event_loop next_event_id
    in
    event_loop (-1)
  in
  (* Run broadcast loop and message handler concurrently. *)
  Eio.Fiber.both broadcast_loop message_loop