(*---------------------------------------------------------------------------
   Copyright (c) 2026 Anil Madhavapeddy . All rights reserved.
   SPDX-License-Identifier: ISC
  ---------------------------------------------------------------------------*)

let src = Logs.Src.create "poe" ~doc:"Poe Zulip bot"

module Log = (val Logs.src_log src : Logs.LOG)

(** Capabilities threaded through the handlers in this file: the switch and
    clock handed to the Claude client, the process manager used to spawn
    [git] and Claude, and the filesystem root used to resolve paths. *)
type 'a env = {
  sw : Eio.Switch.t;
  process_mgr : 'a Eio.Process.mgr;
  clock : float Eio.Time.clock_ty Eio.Resource.t;
  fs : Eio.Fs.dir_ty Eio.Path.t;
}

(** In-memory tracking of active sessions.

    A session becomes active when the bot is first mentioned in a channel/DM.
    Once active, all messages in that scope are accumulated into context.
    Resets on bot restart (intentional - requires a new mention to
    reactivate). *)
module Active_sessions = struct
  (* Keyed by [Session.scope_to_string]; the value keeps the scope itself and
     the wall-clock time at which it was activated. *)
  let sessions : (string, Session.scope * float) Hashtbl.t = Hashtbl.create 16

  (** [activate scope] records [scope] as active with the current time.
      Does nothing if the scope is already present, so the original
      activation time is preserved. *)
  let activate scope =
    let key = Session.scope_to_string scope in
    if not (Hashtbl.mem sessions key) then begin
      Hashtbl.add sessions key (scope, Unix.gettimeofday ());
      Log.info (fun m -> m "Session activated for %s" key)
    end

  (** [is_active scope] is [true] iff [scope] has been activated and not
      deactivated since. *)
  let is_active scope = Hashtbl.mem sessions (Session.scope_to_string scope)

  (** [deactivate scope] removes [scope] from the table and logs the
      removal. *)
  let deactivate scope =
    let key = Session.scope_to_string scope in
    Hashtbl.remove sessions key;
    Log.info (fun m -> m "Session deactivated for %s" key)

  (** [list_all ()] returns every [(scope, activated_at)] pair, in
      unspecified order. *)
  let list_all () =
    Hashtbl.fold (fun _key entry acc -> entry :: acc) sessions []
end

(** Strip a leading [@**name**] mention from [content].

    This handles display names like [@**Poe**] that do not match email
    patterns. The input is trimmed first; if it does not start with a
    well-formed mention the trimmed input is returned unchanged. The closing
    marker is the first pair of adjacent asterisks after the opening one, so a
    lone asterisk inside the display name does not prevent stripping. *)
let strip_leading_mention content =
  let s = String.trim content in
  let len = String.length s in
  (* Index of the first "**" pair starting at or after [from], if any. *)
  let rec closing from =
    match String.index_from_opt s from '*' with
    | Some i when i + 1 < len && s.[i + 1] = '*' -> Some i
    | Some i when i + 1 < len -> closing (i + 1)
    | Some _ | None -> None
  in
  if len >= 5 && String.starts_with ~prefix:"@**" s then
    match closing 3 with
    | Some i -> String.trim (String.sub s (i + 2) (len - i - 2))
    | None -> s
  else s

(** [run_git_pull ~proc ~cwd] runs [git pull --ff-only] in [cwd].

    Returns [Ok `Up_to_date] when git reports nothing to pull,
    [Ok (`Updated output)] with git's trimmed stdout otherwise, and
    [Error msg] when git exits non-zero or is killed by a signal. *)
let run_git_pull ~proc ~cwd =
  Log.info (fun m -> m "Pulling latest changes from remote");
  Eio.Switch.run @@ fun sw ->
  let buf_stdout = Buffer.create 256 in
  let buf_stderr = Buffer.create 256 in
  let child =
    Eio.Process.spawn proc ~sw ~cwd
      ~stdout:(Eio.Flow.buffer_sink buf_stdout)
      ~stderr:(Eio.Flow.buffer_sink buf_stderr)
      [ "git"; "pull"; "--ff-only" ]
  in
  match Eio.Process.await child with
  | `Exited 0 ->
      let output = String.trim (Buffer.contents buf_stdout) in
      (* Older Git releases print the hyphenated "Already up-to-date."; accept
         both spellings so a no-op pull is not reported as an update. *)
      if output = "Already up to date." || output = "Already up-to-date." then begin
        Log.info (fun m -> m "Repository already up to date");
        Ok `Up_to_date
      end
      else begin
        Log.info (fun m -> m "Pulled new changes from remote");
        Ok (`Updated output)
      end
  | `Exited code ->
      let stderr = String.trim (Buffer.contents buf_stderr) in
      Log.warn (fun m -> m "git pull exited with code %d: %s" code stderr);
      Error (Printf.sprintf "git pull failed (code %d): %s" code stderr)
  | `Signaled sig_ ->
      Log.warn (fun m -> m "git pull killed by signal %d" sig_);
      Error (Printf.sprintf "git pull killed by signal %d" sig_)

(** [get_git_head ~proc ~cwd] returns the trimmed output of
    [git rev-parse --short HEAD] run in [cwd], or [None] if git does not exit
    with status 0. *)
let get_git_head ~proc ~cwd =
  Eio.Switch.run @@ fun sw ->
  let buf = Buffer.create 64 in
  let child =
    Eio.Process.spawn proc ~sw ~cwd
      ~stdout:(Eio.Flow.buffer_sink buf)
      [ "git"; "rev-parse"; "--short"; "HEAD" ]
  in
  match Eio.Process.await child with
  | `Exited 0 -> Some (String.trim (Buffer.contents buf))
  | _ -> None

(** Build a Claude client restricted to the [Read], [Glob] and [Grep] tools,
    with the bot's system prompt appended. *)
let create_claude_client env =
  let options =
    Claude.Options.default
    |> Claude.Options.with_model `Opus_4_5
    |> Claude.Options.with_permission_mode
         Claude.Permissions.Mode.Bypass_permissions
    |> Claude.Options.with_allowed_tools [ "Read"; "Glob"; "Grep" ]
    |> Claude.Options.with_append_system_prompt
         {|You are Poe, a 
helpful Zulip bot that manages a monorepo. You have access to your own source code in the poe/ directory. When asked to add features to yourself, you can read and understand your implementation. Be concise in your responses as they will be posted to Zulip. When suggesting code changes, format them clearly with markdown code blocks.|}
  in
  Claude.Client.create ~options ~sw:env.sw ~process_mgr:env.process_mgr
    ~clock:env.clock ()

(* [truncate_with_ellipsis ~max_len s] returns [s] unchanged when it is at
   most [max_len] bytes long; otherwise it keeps at most [max_len - 3] bytes
   and appends "...". The cut is moved back past any UTF-8 continuation bytes
   (bit pattern 10xxxxxx) so a multi-byte character is never split. Callers
   pass [max_len >= 3]. *)
let truncate_with_ellipsis ~max_len s =
  if String.length s <= max_len then s
  else
    let rec boundary i =
      if i > 0 && Char.code s.[i] land 0xC0 = 0x80 then boundary (i - 1) else i
    in
    String.sub s 0 (boundary (max_len - 3)) ^ "..."

(** Format tool use for Zulip richtext display.

    Produces a quoted line with the tool name and, for the tools known here,
    its most relevant parameter. Bash commands longer than 60 bytes are
    shortened with an ellipsis. *)
let format_tool_use (tool : Claude.Response.Tool_use.t) =
  let name = Claude.Response.Tool_use.name tool in
  let input = Claude.Response.Tool_use.input tool in
  (* Render the string parameter [key] with [fmt], or "" when it is absent. *)
  let string_param key fmt =
    Claude.Tool_input.get_string input key
    |> Option.map fmt
    |> Option.value ~default:""
  in
  (* Extract key parameters for common tools *)
  let params =
    match name with
    | "Read" | "Edit" | "Write" ->
        string_param "file_path" (fun p -> Printf.sprintf "`%s`" p)
    | "Glob" ->
        string_param "pattern" (fun p -> Printf.sprintf "pattern: `%s`" p)
    | "Grep" ->
        let pattern =
          Claude.Tool_input.get_string input "pattern"
          |> Option.value ~default:""
        in
        let path =
          Claude.Tool_input.get_string input "path"
          |> Option.value ~default:"."
        in
        Printf.sprintf "`%s` in `%s`" pattern path
    | "Bash" ->
        string_param "command" (fun c ->
            Printf.sprintf "`%s`" (truncate_with_ellipsis ~max_len:60 c))
    | _ -> ""
  in
  if params = "" then Printf.sprintf "> :gear: **%s**" name
  else Printf.sprintf "> :gear: **%s** %s" name params

(** Format thinking block for Zulip richtext display.

    Content longer than 200 bytes is shortened with an ellipsis and the
    result is rendered as an italic quote. *)
let format_thinking (thinking : Claude.Response.Thinking.t) =
  let content = Claude.Response.Thinking.content thinking in
  Printf.sprintf "> :thought_balloon: *%s*"
    (truncate_with_ellipsis ~max_len:200 content)

(** Format error for Zulip richtext display *)
let format_error (err : Claude.Response.Error.t) =
  let msg = Claude.Response.Error.message err in
  Printf.sprintf "> :warning: **Error:** %s" msg

(** Post a message to the appropriate Zulip channel/DM based on scope.
    The response returned by the send call is discarded. *)
let post_to_scope ~client ~(scope : Session.scope) content =
  let message =
    match scope with
    | Session.Channel { stream; topic } ->
        Zulip.Message.create ~type_:`Channel ~to_:[ stream ] ~topic ~content ()
    | Session.Direct { user_email; _ } ->
        Zulip.Message.create ~type_:`Direct ~to_:[ user_email ] ~content ()
  in
  let _resp = Zulip.Messages.send client message in
  ()

(** [ask_claude env prompt] sends [prompt] to a fresh Claude client and
    returns the concatenation of all text responses, ignoring every other
    response kind. *)
let ask_claude env prompt =
  let client = create_claude_client env in
  Claude.Client.query client prompt;
  Claude.Client.receive_all client
  |> List.filter_map (function
       | Claude.Response.Text t -> Some (Claude.Response.Text.content t)
       | _ -> None)
  |> String.concat ""

(** Ask Claude with streaming responses posted to Zulip.

    Loads the session for the message's scope, appends [user_content], sends
    a prompt that includes the session context (if any), posts a summary of
    tool use / thinking / errors to the scope, then appends Claude's text to
    the session and saves it. Returns Claude's accumulated text. *)
let ask_claude_with_session_streaming env ~zulip_client ~storage msg
    user_content =
  let scope = Session.scope_of_message msg in
  let now = Unix.gettimeofday () in
  let session = Session.load storage ~scope ~now in
  let session = Session.add_user_message session ~content:user_content ~now in
  (* Build prompt with session context *)
  let context_section =
    match Session.build_context session with
    | None -> ""
    | Some ctx -> ctx ^ "\n\n---\n\n"
  in
  let prompt =
    Printf.sprintf
      {|%sThe user sent this message to the Poe Zulip bot: %s Please help them. If they're asking about adding features to the bot, read the bot's source code in the poe/ directory first. If they're asking about the monorepo or daily changes, help them understand the content. Keep your response concise and suitable for a Zulip message.|}
      context_section user_content
  in
  (* Create Claude client and start query *)
  let claude_client = create_claude_client env in
  Claude.Client.query claude_client prompt;
  (* Accumulate text and track agent messages *)
  let text_buffer = Buffer.create 1024 in
  (* Newest first; reversed before posting. *)
  let agent_messages = ref [] in
  let record formatted = agent_messages := formatted :: !agent_messages in
  (* Create streaming handler that collects agent messages for Zulip *)
  let handler =
    object
      inherit Claude.Handler.default

      method! on_text t =
        Buffer.add_string text_buffer (Claude.Response.Text.content t)

      method! on_tool_use t =
        record (format_tool_use t);
        Log.debug (fun m ->
            m "Tool use: %s" (Claude.Response.Tool_use.name t))

      method! on_thinking t =
        record (format_thinking t);
        Log.debug (fun m ->
            let content = Claude.Response.Thinking.content t in
            m "Thinking: %s"
              (String.sub content 0 (min 50 (String.length content))))

      method! on_error t =
        record (format_error t);
        Log.warn (fun m ->
            m "Claude error: %s" (Claude.Response.Error.message t))

      method! on_complete c =
        let cost =
          Claude.Response.Complete.total_cost_usd c
          |> Option.value ~default:0.0
        in
        let turns = Claude.Response.Complete.num_turns c in
        Log.info (fun m -> m "Claude complete: %d turns, $%.4f" turns cost)
    end
  in
  (* Run the streaming handler *)
  Claude.Client.run claude_client ~handler;
  (* Post agent messages summary if any occurred *)
  let agent_msgs = List.rev !agent_messages in
  if agent_msgs <> [] then begin
    let agent_summary = String.concat "\n" agent_msgs in
    let header = Printf.sprintf "**Agent activity:**\n%s" agent_summary in
    post_to_scope ~client:zulip_client ~scope header
  end;
  let response = Buffer.contents text_buffer in
  (* Save the updated session with the response *)
  let now = Unix.gettimeofday () in
  let session = Session.add_assistant_message session ~content:response ~now in
  Session.save storage ~scope session;
  Log.info (fun m ->
      m "Session for %s: %s"
        (Session.scope_to_string scope)
        (Session.stats session));
  response

(** Silently accumulate a message into the session without calling Claude.
    Used when the bot is not mentioned but the session is active. The stored
    content is prefixed with the sender's full name. *)
let accumulate_message_silently ~storage msg =
  let scope = Session.scope_of_message msg in
  let now = Unix.gettimeofday () in
  let session = Session.load storage ~scope ~now in
  let content = Zulip_bot.Message.content msg in
  let sender = Zulip_bot.Message.sender_full_name msg in
  (* Include sender name in the accumulated content for context *)
  let annotated_content = Printf.sprintf "[%s]: %s" sender content in
  let session =
    Session.add_user_message session ~content:annotated_content ~now
  in
  Session.save storage ~scope session;
  Log.debug (fun m ->
      m "Accumulated message from %s into session for %s" sender
        (Session.scope_to_string scope))

(** Reply with the static help text describing the bot's commands. *)
let handle_help () =
  Zulip_bot.Response.reply
    {|**Poe Bot Commands:** **Basic Commands:** - `help` or `?` - Show this help message - `status` - Show bot configuration and tracked verse users with repo links - `broadcast` / `post` / `changes` - Generate and broadcast changelog with Claude - `refresh` / `pull` / `sync` / `update` - Pull from remote and broadcast changes - `clear` / `new` / `reset` - Clear conversation session and start fresh **Admin Commands:** (require authorization) - `admin last-broadcast` - Show last broadcast time and git HEAD - `admin reset-broadcast ` - Reset broadcast time - `admin storage keys` - List all storage keys - `admin storage get ` - Get value for a storage key - `admin storage delete ` - Delete a storage key **Conversation Sessions:** Poe maintains separate conversation sessions for each channel topic and DM. Your conversation history is preserved within each context, allowing multi-turn conversations with Claude. Sessions expire after 1 hour of inactivity. Use `clear` to start a fresh conversation in the current context. **Other Messages:** Any other message will be interpreted by Claude to help you understand or modify the bot. 
**Configuration:** The bot reads its configuration from `poe.toml` with the following fields: - `channel` - The Zulip channel to broadcast to - `topic` - The topic for broadcast messages - `verse_path` - Path to verse/ directory containing user monorepos - `admin_emails` - List of emails authorized for admin commands|}

(** Load the verse registry and return [(handle, monorepo, opamrepo)] for
    every entry of [verse_path] that matches a registry member.

    Entries whose name starts with ["."] or ends with ["-opam"] are skipped.
    Returns [[]] if the registry cannot be loaded or the directory cannot be
    read. *)
let get_verse_status ~fs ~verse_path =
  let registry_path = Monopam.Verse_config.registry_path () in
  let registry_toml = Fpath.(registry_path / "opamverse.toml") in
  match Monopam.Verse_registry.load ~fs registry_toml with
  | Error msg ->
      Log.warn (fun m -> m "Failed to load registry: %s" msg);
      []
  | Ok registry ->
      (* Scan verse directory for user subdirectories *)
      let verse_eio = Eio.Path.(fs / verse_path) in
      let subdirs =
        try
          Eio.Path.read_dir verse_eio
          |> List.filter (fun name ->
                 (not (String.starts_with ~prefix:"." name))
                 && not (String.ends_with ~suffix:"-opam" name))
        with Eio.Io _ -> []
      in
      (* Match each subdirectory with registry member *)
      List.filter_map
        (fun handle ->
          match Monopam.Verse_registry.find_member registry ~handle with
          | Some member -> Some (handle, member.monorepo, member.opamrepo)
          | None -> None)
        subdirs

(** Render a duration in seconds as whole seconds (below one minute), whole
    minutes (below one hour) or hours with one decimal. *)
let format_duration seconds =
  if seconds < 60.0 then Printf.sprintf "%.0fs" seconds
  else if seconds < 3600.0 then Printf.sprintf "%.0fm" (seconds /. 60.0)
  else Printf.sprintf "%.1fh" (seconds /. 3600.0)

(** Reply with the bot's configuration, the tracked verse users and the
    currently active in-memory sessions.

    When [verse_path] is not configured it defaults to a [verse] directory
    next to the monorepo. *)
let handle_status env ~storage config =
  let admin_list =
    if config.Config.admin_emails = [] then "none configured"
    else String.concat ", " config.Config.admin_emails
  in
  let verse_path =
    match config.Config.verse_path with
    | Some vp -> vp
    | None ->
        let mono_dir = Filename.dirname config.Config.monorepo_path in
        Filename.concat mono_dir "verse"
  in
  let verse_users = get_verse_status ~fs:env.fs ~verse_path in
  let users_section =
    if verse_users = [] then "- Tracked verse users: none"
    else
      "- Tracked verse users:\n"
      ^ (verse_users
        |> List.map (fun (handle, mono_url, opam_url) ->
               Printf.sprintf " - **%s**: [monorepo](%s) | [opam-repo](%s)"
                 handle mono_url opam_url)
        |> String.concat "\n")
  in
  (* Build active sessions section *)
  let active_sessions = Active_sessions.list_all () in
  let now = Unix.gettimeofday () in
  let sessions_section =
    if active_sessions = [] then "- Active sessions: none"
    else
      let session_lines =
        active_sessions
        |> List.map (fun (scope, activated_at) ->
               let session = Session.load storage ~scope ~now in
               let scope_mention = Session.scope_to_mention scope in
               let active_for = format_duration (now -. activated_at) in
               let stats = Session.stats session in
               Printf.sprintf " - %s: %s (active for %s)" scope_mention stats
                 active_for)
      in
      Printf.sprintf "- Active sessions (%d):\n%s"
        (List.length active_sessions)
        (String.concat "\n" session_lines)
  in
  Zulip_bot.Response.reply
    (Printf.sprintf
       {|**Poe Bot Status:** - Channel: `%s` - Topic: `%s` - Verse path: `%s` - Admin emails: %s %s %s|}
       config.Config.channel config.Config.topic verse_path admin_list
       users_section sessions_section)

(** Pull the monorepo, and if there are commits since the last recorded HEAD
    (or, with no recorded HEAD, among the 10 most recent), generate a
    changelog and send it to the configured channel and topic.

    Replies to the requester instead when the pull fails, when there is
    nothing to broadcast, or when no changelog could be generated. *)
let handle_refresh env ~client ~storage ~config =
  let monorepo_path = Eio.Path.(env.fs / config.Config.monorepo_path) in
  (* Step 1: Git pull *)
  match run_git_pull ~proc:env.process_mgr ~cwd:monorepo_path with
  | Error e ->
      Zulip_bot.Response.reply (Printf.sprintf "**Refresh failed:**\n\n%s" e)
  | Ok pull_status ->
      let pull_msg =
        match pull_status with
        | `Up_to_date -> "Repository already up to date"
        | `Updated _ -> "Pulled new changes from remote"
      in
      (* Step 2: Get commits since last HEAD *)
      let last_head = Admin.get_last_git_head storage in
      let commits =
        match last_head with
        | Some h ->
            Changelog.get_git_log ~proc:env.process_mgr ~cwd:monorepo_path
              ~since_head:h
        | None ->
            Changelog.get_recent_commits ~proc:env.process_mgr
              ~cwd:monorepo_path ~count:10
      in
      if commits = [] then
        Zulip_bot.Response.reply
          (Printf.sprintf
             "**Refresh completed:**\n\n- %s\n- No new commits to broadcast"
             pull_msg)
      else begin
        (* Get channel members for @mentions *)
        let members =
          Changelog.get_channel_members ~client ~channel:config.Config.channel
        in
        (* Generate narrative changelog with Claude *)
        match
          Changelog.generate ~sw:env.sw ~proc:env.process_mgr ~clock:env.clock
            ~fs:env.fs ~commits ~members ()
        with
        | None ->
            Zulip_bot.Response.reply
              (Printf.sprintf
                 "**Refresh completed:**\n\n- %s\n- Could not generate changelog"
                 pull_msg)
        | Some content ->
            (* Update storage *)
            let now = Ptime_clock.now () in
            Admin.set_last_broadcast_time storage now;
            let current_head =
              get_git_head ~proc:env.process_mgr ~cwd:monorepo_path
            in
            Option.iter (Admin.set_last_git_head storage) current_head;
            Log.info (fun m -> m "Refresh broadcasting: %s" content);
            (* Send to channel *)
            Zulip_bot.Response.stream ~stream:config.Config.channel
              ~topic:config.Config.topic
              ~content:
                (Printf.sprintf "**Refresh triggered manually**\n\n%s" content)
      end

(** Forward the message's content to Claude within its session and reply
    with Claude's text. *)
let handle_claude_query env ~zulip_client ~storage msg =
  let content = Zulip_bot.Message.content msg in
  Log.info (fun m -> m "Asking Claude: %s" content);
  let response =
    ask_claude_with_session_streaming env ~zulip_client ~storage msg content
  in
  Log.info (fun m -> m "Claude response: %s" response);
  Zulip_bot.Response.reply response

(** Clear the stored session for the message's scope and confirm to the
    user. *)
let handle_clear_session ~storage msg =
  let scope = Session.scope_of_message msg in
  Session.clear storage ~scope;
  Zulip_bot.Response.reply
    (Printf.sprintf "Session cleared for %s. Starting fresh conversation."
       (Session.scope_to_string scope))

(** [is_admin config ~storage msg] is [true] iff the sender of [msg] is in
    [config.admin_emails].

    The sender is looked up by id and both the delivery email (when present)
    and the Zulip email are checked. If the lookup raises, the failure is
    logged and the sender email carried by the message is checked instead.
    Fiber cancellation is never treated as a lookup failure: it is
    re-raised. *)
let is_admin config ~storage msg =
  let sender_id = Zulip_bot.Message.sender_id msg in
  let client = Zulip_bot.Storage.client storage in
  let is_listed e = List.mem e config.Config.admin_emails in
  try
    let user = Zulip.Users.get_by_id client ~user_id:sender_id () in
    let delivery_email = Zulip.User.delivery_email user in
    let email = Zulip.User.email user in
    (* Check both delivery_email (actual email) and email (Zulip internal) *)
    let emails_to_check =
      match delivery_email with
      | Some de -> [ de; email ]
      | None -> [ email ]
    in
    List.exists is_listed emails_to_check
  with
  | Eio.Cancel.Cancelled _ as ex ->
      (* Swallowing cancellation would keep a cancelled fiber running. *)
      raise ex
  | ex ->
      Log.warn (fun m ->
          m "User lookup failed (%s); falling back to message sender email"
            (Printexc.to_string ex));
      (* Fallback to sender_email from message if API call fails *)
      is_listed (Zulip_bot.Message.sender_email msg)

(** Build the bot's message handler.

    Messages sent by the bot itself are ignored. A message that mentions the
    bot, or is private, activates the scope's session and is dispatched as a
    command (anything unrecognised goes to Claude). Any other message is
    silently accumulated into the session if the scope is active, and ignored
    otherwise. *)
let make_handler env config =
 fun ~storage ~identity ~flags msg ->
  let bot_email = identity.Zulip_bot.Bot.email in
  let sender_email = Zulip_bot.Message.sender_email msg in
  (* Ignore messages from the bot itself *)
  if sender_email = bot_email then Zulip_bot.Response.silent
  else
    let scope = Session.scope_of_message msg in
    let is_mentioned = List.mem "mentioned" flags in
    let is_private = Zulip_bot.Message.is_private msg in
    (* Check if this is a message we should respond to *)
    if is_mentioned || is_private then begin
      (* Activate the session on first @mention or DM *)
      Active_sessions.activate scope;
      let client = Zulip_bot.Storage.client storage in
      let content = Zulip_bot.Message.content msg |> strip_leading_mention in
      Log.info (fun m -> m "Received message (mentioned): %s" content);
      match Commands.parse content with
      | Commands.Help -> handle_help ()
      | Commands.Status -> handle_status env ~storage config
      | Commands.Broadcast ->
          Broadcast.run ~sw:env.sw ~proc:env.process_mgr ~clock:env.clock
            ~fs:env.fs ~client ~storage ~config
      | Commands.Refresh -> handle_refresh env ~client ~storage ~config
      | Commands.Admin cmd ->
          if is_admin config ~storage msg then
            Zulip_bot.Response.reply (Admin.handle ~storage cmd)
          else
            Zulip_bot.Response.reply
              "Admin commands require authorization. Contact an admin to be added to the admin_emails list."
      | Commands.Clear_session ->
          (* Also deactivate the in-memory session *)
          Active_sessions.deactivate scope;
          handle_clear_session ~storage msg
      | Commands.Unknown _ ->
          handle_claude_query env ~zulip_client:client ~storage msg
    end
    else if Active_sessions.is_active scope then begin
      (* Session is active but bot not mentioned - accumulate silently *)
      Log.debug (fun m ->
          m "Accumulating message in active session for %s"
            (Session.scope_to_string scope));
      accumulate_message_silently ~storage msg;
      Zulip_bot.Response.silent
    end
    else begin
      (* Session not active and not mentioned - ignore *)
      Log.debug (fun m ->
          m "Ignoring message (session not active for %s)"
            (Session.scope_to_string scope));
      Zulip_bot.Response.silent
    end