a tool to help your Letta AI agents navigate bluesky

Refactor notification processing and session management

This commit introduces several improvements to the notification handling system:
- Implements task thread claiming to prevent concurrent processing - Adds
dynamic delay and retry mechanisms - Improves logging and error handling -
Separates concerns between notification checking and reflection prompts

+102 -30
server.ts
··· 1 1 import { bsky } from "./util/bsky.ts"; 2 - import { session } from "./util/sessionCount.ts"; 2 + import { claimTaskThread, releaseTaskThread, session } from "./util/session.ts"; 3 + import { messageAgent } from "./util/messageAgent.ts"; 4 + import { createReflectionPrompt } from "./util/promptsAndLogs.ts"; 3 5 import { processNotification } from "./util/processNotification.ts"; 6 + import { randomDelayMs } from "./util/time.ts"; 4 7 5 - const checkNotifications = async () => { 6 - session.checkCount++; 7 - 8 - const statusMsg = 9 - `(${session.checkCount} notif checks, ${session.mentionCount} mentions, ${session.replyCount} replies, ${session.likeCount} likes, ${session.followCount} follows})`; 10 - 11 - if (session.isProcessing) { 8 + const notificationCheck = async () => { 9 + // check if agent is busy before attempting 10 + // to give it new tasks 11 + if (!claimTaskThread()) { 12 + const newDelay = randomDelayMs(5, 10); 12 13 console.log( 13 - `pausing notification check while processing notifications… [current session: ${session.processingCount}] ${statusMsg}`, 14 + `${session.agentName} is busy, checking for notifications again in ${ 15 + newDelay * 60 * 1000 16 + } minutes…`, 14 17 ); 18 + // session is busy, try to check notifications in 5~10 minutes. 19 + setTimeout(notificationCheck, newDelay); 15 20 return; 16 21 } 17 22 18 - const allNotifications = await bsky.listNotifications({ 19 - reasons: ["like", "repost", "follow", "mention", "reply", "quote"], 20 - limit: 50, 21 - }); 22 - 23 - const unreadNotifications = allNotifications.data.notifications.filter( 24 - (notif) => !notif.isRead, 25 - ); 23 + try { 24 + // gets ALL notifications, limited to the last 50 25 + const allNotifications = await bsky.listNotifications({ 26 + reasons: session.notificationTypes, 27 + limit: 50, 28 + }); 26 29 27 - if (unreadNotifications.length > 0) { 28 - session.isProcessing = true; 30 + // setting time when receiving all notifications 31 + // this is to possibly mark the notifications as read later 29 32 const startedProcessingTime: string = new Date().toISOString(); 30 33 31 - for (const notification of unreadNotifications) { 32 - await processNotification(notification); 34 + const unreadNotifications = allNotifications.data.notifications.filter( 35 + (notif) => !notif.isRead, 36 + ); 37 + 38 + if (unreadNotifications.length > 0) { 39 + console.log( 40 + `found ${unreadNotifications.length} notification(s), processing…`, 41 + ); 42 + 43 + // resets delay for future notification checks since 44 + // it's likely agent actions might incur new ones 45 + session.currentNotifDelaySeconds = session.minNotifDelaySeconds; 46 + 47 + // loop through all notifications until complete 48 + for (const notification of unreadNotifications) { 49 + let notificationCounter = 1; 50 + console.log( 51 + `processing notification #${notificationCounter} [${notification.reason}]`, 52 + ); 53 + await processNotification(notification); 54 + notificationCounter++; 55 + } 56 + 57 + // marks all notifications that were processed as seen 58 + // based on time from when retrieved instead of finished 59 + await bsky.updateSeenNotifications(startedProcessingTime); 60 + 61 + // increases counter for notification processing sessions 62 + session.processingCount++; 63 + } else { 64 + // increases delay to check notifications again later 65 + session.currentNotifDelaySeconds = Math.round(Math.min( 66 + session.currentNotifDelaySeconds * session.notifDelayMultiplier, 67 + session.maxNotifDelaySeconds, 68 + )); 69 + 70 + console.log( 71 + "no notifications…", 72 + `checking again in ${session.currentNotifDelaySeconds / 1000} seconds`, 73 + ); 33 74 } 75 + } catch (error) { 76 + console.error("Error in notificationCheck:", error); 77 + // since something went wrong, lets check for notifications again sooner 78 + session.currentNotifDelaySeconds = session.minNotifDelaySeconds; 79 + } finally { 80 + // actually schedules next time to check for notifications 81 + setTimeout(notificationCheck, session.currentNotifDelaySeconds); 82 + // ends work 83 + session.busy = false; 84 + } 85 + }; 34 86 35 - await bsky.updateSeenNotifications(startedProcessingTime); 87 + const reflectionPrompt = async () => { 88 + if (!claimTaskThread()) { 89 + const newDelay = randomDelayMs(5, 10); 36 90 37 - session.processingCount++; 38 - session.isProcessing = false; 39 - } else { 91 + console.log( 92 + `${session.agentName} is busy, will try reflecting again in ${ 93 + newDelay * 60 * 1000 94 + } minutes…`, 95 + ); 96 + // session is busy, try to check notifications in 5~10 minutes. 97 + setTimeout(notificationCheck, newDelay); 98 + return; 99 + } 100 + 101 + try { 102 + const prompt = createReflectionPrompt(); 103 + console.log("starting reflection prompt…"); 104 + await messageAgent(prompt); 105 + } catch (error) { 106 + console.error("Error in reflectionCheck:", error); 107 + } finally { 40 108 console.log( 41 - `no notifications, checking in ${session.delaySeconds} second${ 42 - session.delaySeconds > 1 ? "s" : "" 43 - } ${statusMsg}…`, 109 + "finished reflection prompt. returning to checking for notifications…", 44 110 ); 111 + setTimeout( 112 + reflectionPrompt, 113 + session.currentReflectDelayMinutes + randomDelayMs(1, 90), 114 + ); 115 + releaseTaskThread(); 116 + session.currentNotifDelaySeconds = session.minNotifDelaySeconds; 45 117 } 46 118 }; 47 119 48 - setInterval(checkNotifications, session.delay); 49 - checkNotifications(); 120 + await notificationCheck(); 121 + setTimeout(reflectionPrompt, session.minReflectDelayMinutes);
+1 -1
util/getCleanThread.ts
··· 1 1 import { bsky } from "./bsky.ts"; 2 - import type { AppBskyFeedDefs } from "npm:@atproto/api"; 3 2 4 3 type threadPost = { 5 4 authorHandle: string; ··· 22 21 23 22 if (thread) { 24 23 postsThread.push({ 24 + // @TODO fix the type errors here 25 25 authorHandle: `@${thread.post.author.handle}`, 26 26 message: thread.post.record.text, 27 27 uri: thread.post.uri,
+1 -1
util/messageAgent.ts
··· 2 2 3 3 export const client = new LettaClient({ 4 4 token: Deno.env.get("LETTA_API_KEY"), 5 - project: Deno.env.get("LETTA_PROJECT_SLUG"), 5 + project: Deno.env.get("LETTA_PROJECT_NAME"), 6 6 }); 7 7 8 8 export const messageAgent = async (prompt: string) => {
+21 -9
util/processNotification.ts
··· 1 1 import type { Notification } from "./types.ts"; 2 - // import { getCleanThread } from "./getCleanThread.ts"; 3 - import { session } from "./sessionCount.ts"; 2 + import { session } from "./session.ts"; 4 3 import { messageAgent } from "./messageAgent.ts"; 5 4 6 5 import { ··· 10 9 createQuotePrompt, 11 10 createReplyPrompt, 12 11 createRepostPrompt, 13 - } from "./prompts.ts"; 12 + } from "./promptsAndLogs.ts"; 14 13 15 14 export const processNotification = async (notification: Notification) => { 16 - const agentProject = Deno.env.get("LETTA_PROJECT_SLUG"); 15 + const agentProject = Deno.env.get("LETTA_PROJECT_NAME"); 17 16 const kind = notification.reason; 18 - 17 + console.log(`pausing notif checks, received ${kind} notification…`); 19 18 // const referencePostThread = await getCleanThread(notification.uri); 20 19 // console.log(referencePostThread[0]); 21 20 ··· 24 23 25 24 const prompt = await createLikePrompt(notification); 26 25 await messageAgent(prompt); 26 + console.log( 27 + `sent ${kind} notification to ${agentProject}, waiting for response…`, 28 + ); 27 29 } else if (kind == "repost") { 28 30 session.repostCount++; 29 31 30 32 const prompt = await createRepostPrompt(notification); 31 33 await messageAgent(prompt); 34 + console.log( 35 + `sent ${kind} notification to ${agentProject}, waiting for response…`, 36 + ); 32 37 } else if (kind == "follow") { 33 38 session.followCount++; 34 39 35 40 const prompt = createNewFollowerPrompt(notification); 36 41 await messageAgent(prompt); 37 - } else if (kind == "mention") { 38 - session.mentionCount++; 39 42 console.log( 40 - `initiating message to ${agentProject} [kind: ${kind}]…`, 43 + `sent ${kind} notification to ${agentProject}, waiting for response…`, 41 44 ); 42 - 45 + } else if (kind == "mention") { 43 46 const prompt = await createMentionPrompt(notification); 44 47 45 48 await messageAgent(prompt); 49 + console.log( 50 + `sent ${kind} notification to ${agentProject}, waiting for response…`, 51 + ); 46 52 } else if (kind == "reply") { 47 53 session.replyCount++; 48 54 49 55 const prompt = await createReplyPrompt(notification); 50 56 await messageAgent(prompt); 57 + console.log( 58 + `sent ${kind} notification to ${agentProject}, waiting for response…`, 59 + ); 51 60 } else if (kind == "quote") { 52 61 session.quoteCount++; 53 62 54 63 const prompt = await createQuotePrompt(notification); 55 64 await messageAgent(prompt); 65 + console.log( 66 + `sent ${kind} notification to ${agentProject}, waiting for response…`, 67 + ); 56 68 } else { 57 69 console.log( 58 70 `kind "${kind} does not have a system prompt associated with it, moving on…`,