+5
-2
.env.example
+5
-2
.env.example
+7
-3
docker-compose.yml
+7
-3
docker-compose.yml
···
6
6
environment:
7
7
- "AUTHORIZED_USERS=${AUTHORIZED_USERS}"
8
8
- "SERVICE=${SERVICE:?https://bsky.social}"
9
-
- "DB_PATH=sqlite.db"
9
+
- DB_PATH=/data/sqlite.db
10
10
- "GEMINI_MODEL=${GEMINI_MODEL:-gemini-2.5-flash}"
11
11
- "DID=${DID:?}"
12
12
- "HANDLE=${HANDLE:?}"
13
-
- "BSKY_PASSWORD=${BSKY_PASSWORD:?}"
13
+
- "APP_PASSWORD=${APP_PASSWORD:?}"
14
14
- "GEMINI_API_KEY=${GEMINI_API_KEY:?}"
15
+
- "USE_JETSTREAM=${USE_JETSTREAM:-false}"
15
16
volumes:
16
-
- aero_db:/sqlite.db
17
+
- "aero_db:/data"
18
+
19
+
volumes:
20
+
aero_db:
+53
-1
src/core.ts
+53
-1
src/core.ts
···
1
1
import { GoogleGenAI } from "@google/genai";
2
-
import { Bot } from "@skyware/bot";
2
+
import { Bot, EventStrategy } from "@skyware/bot";
3
3
import { env } from "./env";
4
+
import type { BinaryType } from "bun";
5
+
6
+
// Websocket patch was written by Claude, hopefully it doesn't suck
7
+
const OriginalWebSocket = global.WebSocket;
8
+
const binaryTypeDescriptor = Object.getOwnPropertyDescriptor(
9
+
OriginalWebSocket.prototype,
10
+
"binaryType",
11
+
);
12
+
13
+
const originalSetter = binaryTypeDescriptor?.set;
14
+
15
+
if (OriginalWebSocket && originalSetter) {
16
+
global.WebSocket = new Proxy(OriginalWebSocket, {
17
+
construct(target, args) {
18
+
//@ts-ignore
19
+
const ws = new target(...args) as WebSocket & {
20
+
_binaryType?: BinaryType;
21
+
};
22
+
23
+
Object.defineProperty(ws, "binaryType", {
24
+
get(): BinaryType {
25
+
return ws._binaryType ||
26
+
(binaryTypeDescriptor.get
27
+
? binaryTypeDescriptor.get.call(ws)
28
+
: "arraybuffer");
29
+
},
30
+
set(value: BinaryType) {
31
+
//@ts-ignore
32
+
if (value === "blob") {
33
+
originalSetter.call(ws, "arraybuffer");
34
+
//@ts-ignore
35
+
ws._binaryType = "blob";
36
+
} else {
37
+
originalSetter.call(ws, value);
38
+
ws._binaryType = value;
39
+
}
40
+
},
41
+
configurable: true,
42
+
});
43
+
44
+
return ws;
45
+
},
46
+
}) as typeof WebSocket;
47
+
}
4
48
5
49
export const bot = new Bot({
6
50
service: env.SERVICE,
7
51
emitChatEvents: true,
52
+
eventEmitterOptions: {
53
+
strategy: env.USE_JETSTREAM
54
+
? EventStrategy.Jetstream
55
+
: EventStrategy.Polling,
56
+
},
8
57
});
9
58
10
59
export const ai = new GoogleGenAI({
···
13
62
14
63
export const QUOTA_EXCEEDED_MESSAGE =
15
64
"You have exceeded your daily message quota (15). Please wait 24 hours before trying again.";
65
+
66
+
export const ERROR_MESSAGE =
67
+
"Sorry, I ran into an issue analyzing that post. Please try again.";
16
68
17
69
export const UNAUTHORIZED_MESSAGE =
18
70
"I canโt make sense of your noise just yet. Youโll need to be whitelisted before I can help.";
+9
-2
src/env.ts
+9
-2
src/env.ts
···
11
11
DB_PATH: z.string().default("sqlite.db"),
12
12
GEMINI_MODEL: z.string().default("gemini-2.5-flash"),
13
13
14
+
ADMIN_DID: z.string().optional(),
15
+
14
16
DID: z.string(),
15
17
HANDLE: z.string(),
16
-
BSKY_PASSWORD: z.string(),
18
+
APP_PASSWORD: z.string(),
17
19
18
20
GEMINI_API_KEY: z.string(),
19
21
DAILY_QUERY_LIMIT: z.preprocess(
20
-
(val) => (typeof val === "string" && val.trim() !== "") ? Number(val) : undefined,
22
+
(val) =>
23
+
(typeof val === "string" && val.trim() !== "") ? Number(val) : undefined,
21
24
z.number().int().positive().default(15),
25
+
),
26
+
USE_JETSTREAM: z.preprocess(
27
+
(val) => val === "true",
28
+
z.boolean().default(false),
22
29
),
23
30
});
24
31
+114
-51
src/handlers/messages.ts
+114
-51
src/handlers/messages.ts
···
1
1
import modelPrompt from "../model/prompt.txt";
2
-
import { ChatMessage, Conversation } from "@skyware/bot";
2
+
import { ChatMessage, Conversation, RichText } from "@skyware/bot";
3
3
import * as c from "../core";
4
4
import * as tools from "../tools";
5
5
import consola from "consola";
···
18
18
19
19
type SupportedFunctionCall = typeof c.SUPPORTED_FUNCTION_CALLS[number];
20
20
21
-
async function generateAIResponse(parsedConversation: string) {
21
+
async function generateAIResponse(parsedContext: string, messages: {
22
+
role: string;
23
+
parts: {
24
+
text: string;
25
+
}[];
26
+
}[]) {
22
27
const config = {
23
28
model: env.GEMINI_MODEL,
24
29
config: {
···
32
37
parts: [
33
38
{
34
39
text: modelPrompt
35
-
.replace("{{ handle }}", env.HANDLE),
40
+
.replace("$handle", env.HANDLE),
36
41
},
37
42
],
38
43
},
39
44
{
40
-
role: "user" as const,
45
+
role: "model" as const,
41
46
parts: [
42
47
{
43
-
text:
44
-
`Below is the yaml for the current conversation. The last message is the one to respond to. The post is the current one you are meant to be analyzing.
45
-
46
-
${parsedConversation}`,
48
+
text: parsedContext,
47
49
},
48
50
],
49
51
},
52
+
...messages,
50
53
];
51
54
52
55
let inference = await c.ai.models.generateContent({
···
99
102
return inference;
100
103
}
101
104
102
-
async function sendResponse(
103
-
conversation: Conversation,
104
-
text: string,
105
-
): Promise<void> {
106
-
if (exceedsGraphemes(text)) {
107
-
multipartResponse(conversation, text);
108
-
} else {
109
-
conversation.sendMessage({
110
-
text,
111
-
});
105
+
function addCitations(
106
+
inference: Awaited<ReturnType<typeof c.ai.models.generateContent>>,
107
+
) {
108
+
let originalText = inference.text ?? "";
109
+
if (!inference.candidates) {
110
+
return originalText;
111
+
}
112
+
const supports = inference.candidates[0]?.groundingMetadata
113
+
?.groundingSupports;
114
+
const chunks = inference.candidates[0]?.groundingMetadata?.groundingChunks;
115
+
116
+
const richText = new RichText();
117
+
118
+
if (!supports || !chunks || originalText === "") {
119
+
return richText.addText(originalText);
120
+
}
121
+
122
+
const sortedSupports = [...supports].sort(
123
+
(a, b) => (b.segment?.endIndex ?? 0) - (a.segment?.endIndex ?? 0),
124
+
);
125
+
126
+
let currentText = originalText;
127
+
128
+
for (const support of sortedSupports) {
129
+
const endIndex = support.segment?.endIndex;
130
+
if (endIndex === undefined || !support.groundingChunkIndices?.length) {
131
+
continue;
132
+
}
133
+
134
+
const citationLinks = support.groundingChunkIndices
135
+
.map((i) => {
136
+
const uri = chunks[i]?.web?.uri;
137
+
if (uri) {
138
+
return { index: i + 1, uri };
139
+
}
140
+
return null;
141
+
})
142
+
.filter(Boolean);
143
+
144
+
if (citationLinks.length > 0) {
145
+
richText.addText(currentText.slice(endIndex));
146
+
147
+
citationLinks.forEach((citation, idx) => {
148
+
if (citation) {
149
+
richText.addLink(`[${citation.index}]`, citation.uri);
150
+
if (idx < citationLinks.length - 1) {
151
+
richText.addText(", ");
152
+
}
153
+
}
154
+
});
155
+
156
+
currentText = currentText.slice(0, endIndex);
157
+
}
112
158
}
159
+
160
+
richText.addText(currentText);
161
+
162
+
return richText;
113
163
}
114
164
115
165
export async function handler(message: ChatMessage): Promise<void> {
···
125
175
: env.AUTHORIZED_USERS.includes(message.senderDid as any);
126
176
127
177
if (!authorized) {
128
-
conversation.sendMessage({
178
+
await conversation.sendMessage({
129
179
text: c.UNAUTHORIZED_MESSAGE,
130
180
});
131
181
132
182
return;
133
183
}
134
184
135
-
const today = new Date();
136
-
today.setHours(0, 0, 0, 0);
137
-
const tomorrow = new Date(today);
138
-
tomorrow.setDate(tomorrow.getDate() + 1);
185
+
if (message.senderDid != env.ADMIN_DID) {
186
+
const todayStart = new Date();
187
+
todayStart.setHours(0, 0, 0, 0);
139
188
140
-
const dailyCount = await db
141
-
.select({ count: count(messages.id) })
142
-
.from(messages)
143
-
.where(
144
-
and(
145
-
eq(messages.did, message.senderDid),
146
-
gte(messages.created_at, today),
147
-
lt(messages.created_at, tomorrow),
148
-
),
149
-
);
189
+
const dailyCount = await db
190
+
.select({ count: count(messages.id) })
191
+
.from(messages)
192
+
.where(
193
+
and(
194
+
eq(messages.did, message.senderDid),
195
+
gte(messages.created_at, todayStart),
196
+
),
197
+
);
150
198
151
-
if (dailyCount[0]!.count >= env.DAILY_QUERY_LIMIT) {
152
-
conversation.sendMessage({
153
-
text: c.QUOTA_EXCEEDED_MESSAGE,
154
-
});
155
-
return;
199
+
if (dailyCount[0]!.count >= env.DAILY_QUERY_LIMIT) {
200
+
conversation.sendMessage({
201
+
text: c.QUOTA_EXCEEDED_MESSAGE,
202
+
});
203
+
return;
204
+
}
156
205
}
157
206
158
207
logger.success("Found conversation");
···
160
209
text: "...",
161
210
});
162
211
163
-
const parsedConversation = await parseConversation(conversation);
164
-
165
-
logger.info("Parsed conversation: ", parsedConversation);
212
+
const parsedConversation = await parseConversation(conversation, message);
166
213
167
214
try {
168
-
const inference = await generateAIResponse(parsedConversation);
215
+
const inference = await generateAIResponse(
216
+
parsedConversation.context,
217
+
parsedConversation.messages,
218
+
);
169
219
if (!inference) {
170
-
throw new Error("Failed to generate text. Returned undefined.");
220
+
logger.error("Failed to generate text. Returned undefined.");
221
+
return;
171
222
}
172
223
173
-
logger.success("Generated text:", inference.text);
224
+
const responseText = inference.text;
225
+
const responseWithCitations = addCitations(inference);
174
226
175
-
saveMessage(conversation, env.DID, inference.text!);
227
+
if (responseWithCitations) {
228
+
logger.success("Generated text:", responseText);
229
+
saveMessage(conversation, env.DID, responseText!);
176
230
177
-
const responseText = inference.text;
178
-
if (responseText) {
179
-
await sendResponse(conversation, responseText);
231
+
if (exceedsGraphemes(responseWithCitations)) {
232
+
multipartResponse(conversation, responseWithCitations);
233
+
} else {
234
+
conversation.sendMessage({
235
+
text: responseWithCitations,
236
+
});
237
+
}
180
238
}
181
-
} catch (error) {
239
+
} catch (error: any) {
182
240
logger.error("Error in post handler:", error);
241
+
let errorMsg = c.ERROR_MESSAGE;
242
+
243
+
if (error.error.code == 503) {
244
+
errorMsg =
245
+
"Sorry, the AI model is currently overloaded. Please try again later.";
246
+
}
183
247
184
248
await conversation.sendMessage({
185
-
text:
186
-
"Sorry, I ran into an issue analyzing that post. Please try again.",
249
+
text: errorMsg,
187
250
});
188
251
}
189
252
}
+1
-1
src/index.ts
+1
-1
src/index.ts
+1
-1
src/model/prompt.txt
+1
-1
src/model/prompt.txt
+42
src/utils/cache.ts
+42
src/utils/cache.ts
···
1
+
interface CacheEntry<T> {
2
+
value: T;
3
+
expiry: number;
4
+
}
5
+
6
+
class TimedCache<T> {
7
+
private cache = new Map<string, CacheEntry<T>>();
8
+
private ttl: number; // Time to live in milliseconds
9
+
10
+
constructor(ttl: number) {
11
+
this.ttl = ttl;
12
+
}
13
+
14
+
get(key: string): T | undefined {
15
+
const entry = this.cache.get(key);
16
+
if (!entry) {
17
+
return undefined;
18
+
}
19
+
20
+
if (Date.now() > entry.expiry) {
21
+
this.cache.delete(key); // Entry expired
22
+
return undefined;
23
+
}
24
+
25
+
return entry.value;
26
+
}
27
+
28
+
set(key: string, value: T): void {
29
+
const expiry = Date.now() + this.ttl;
30
+
this.cache.set(key, { value, expiry });
31
+
}
32
+
33
+
delete(key: string): void {
34
+
this.cache.delete(key);
35
+
}
36
+
37
+
clear(): void {
38
+
this.cache.clear();
39
+
}
40
+
}
41
+
42
+
export const postCache = new TimedCache<any>(2 * 60 * 1000); // 2 minutes cache
+66
-37
src/utils/conversation.ts
+66
-37
src/utils/conversation.ts
···
2
2
type ChatMessage,
3
3
type Conversation,
4
4
graphemeLength,
5
+
RichText,
5
6
} from "@skyware/bot";
6
7
import * as yaml from "js-yaml";
7
8
import db from "../db";
8
9
import { conversations, messages } from "../db/schema";
9
10
import { and, eq } from "drizzle-orm";
10
11
import { env } from "../env";
11
-
import { bot, MAX_GRAPHEMES } from "../core";
12
+
import { bot, ERROR_MESSAGE, MAX_GRAPHEMES } from "../core";
12
13
import { parsePost, parsePostImages, traverseThread } from "./post";
14
+
import { postCache } from "../utils/cache";
13
15
14
16
/*
15
17
Utilities
16
18
*/
17
-
const resolveDid = (convo: Conversation, did: string) =>
18
-
convo.members.find((actor) => actor.did == did)!;
19
-
20
19
const getUserDid = (convo: Conversation) =>
21
20
convo.members.find((actor) => actor.did != env.DID)!;
22
21
···
29
28
/*
30
29
Conversations
31
30
*/
32
-
async function initConvo(convo: Conversation) {
31
+
async function initConvo(convo: Conversation, initialMessage: ChatMessage) {
33
32
const user = getUserDid(convo);
34
33
35
-
const initialMessage = (await convo.getMessages()).messages[0] as
36
-
| ChatMessage
37
-
| undefined;
38
-
if (!initialMessage) {
39
-
throw new Error("Failed to get initial message of conversation");
40
-
}
41
-
42
34
const postUri = await parseMessagePostUri(initialMessage);
43
35
if (!postUri) {
44
-
convo.sendMessage({
36
+
await convo.sendMessage({
45
37
text:
46
38
"Please send a post for me to make sense of the noise for you.",
47
39
});
40
+
48
41
throw new Error("No post reference in initial message.");
49
42
}
50
43
···
70
63
did: user.did,
71
64
postUri,
72
65
revision: _convo.revision,
73
-
text: initialMessage.text,
66
+
text:
67
+
!initialMessage.text ||
68
+
initialMessage.text.trim().length == 0
69
+
? "Explain this post."
70
+
: initialMessage.text,
74
71
});
75
72
76
73
return _convo!;
···
87
84
return convo;
88
85
}
89
86
90
-
export async function parseConversation(convo: Conversation) {
87
+
export async function parseConversation(
88
+
convo: Conversation,
89
+
latestMessage: ChatMessage,
90
+
) {
91
91
let row = await getConvo(convo.id);
92
92
if (!row) {
93
-
row = await initConvo(convo);
93
+
row = await initConvo(convo, latestMessage);
94
94
} else {
95
-
const latestMessage = (await convo.getMessages())
96
-
.messages[0] as ChatMessage;
97
-
98
95
const postUri = await parseMessagePostUri(latestMessage);
99
96
if (postUri) {
100
97
const [updatedRow] = await db
···
119
116
did: getUserDid(convo).did,
120
117
postUri: row.postUri,
121
118
revision: row.revision,
122
-
text: latestMessage!.text,
119
+
text: postUri &&
120
+
(!latestMessage.text ||
121
+
latestMessage.text.trim().length == 0)
122
+
? "Explain this post."
123
+
: latestMessage.text,
123
124
});
124
125
}
125
126
126
-
const post = await bot.getPost(row.postUri);
127
+
let post = postCache.get(row.postUri);
128
+
if (!post) {
129
+
post = await bot.getPost(row.postUri);
130
+
postCache.set(row.postUri, post);
131
+
}
127
132
const convoMessages = await getRelevantMessages(row!);
128
133
129
134
let parseResult = null;
130
135
try {
131
-
parseResult = yaml.dump({
132
-
post: await parsePost(post, true),
136
+
const parsedPost = await parsePost(post, true, new Set());
137
+
parseResult = {
138
+
context: yaml.dump({
139
+
post: parsedPost || null,
140
+
}),
133
141
messages: convoMessages.map((message) => {
134
-
const profile = resolveDid(convo, message.did);
142
+
const role = message.did == env.DID ? "model" : "user";
135
143
136
144
return {
137
-
user: profile.displayName
138
-
? `${profile.displayName} (${profile.handle})`
139
-
: `Handle: ${profile.handle}`,
140
-
text: message.text,
145
+
role,
146
+
parts: [
147
+
{
148
+
text: message.text,
149
+
},
150
+
],
141
151
};
142
152
}),
143
-
});
153
+
};
144
154
} catch (e) {
145
-
convo.sendMessage({
146
-
text:
147
-
"Sorry, I ran into an issue analyzing that post. Please try again.",
155
+
await convo.sendMessage({
156
+
text: ERROR_MESSAGE,
148
157
});
149
158
150
159
throw new Error("Failed to parse conversation");
···
169
178
.where(
170
179
and(
171
180
eq(messages.conversationId, convo.id),
172
-
eq(messages.postUri, convo!.postUri),
181
+
eq(messages.postUri, convo.postUri),
182
+
eq(messages.revision, convo.revision),
173
183
),
174
184
)
175
185
.limit(15);
···
192
202
.values({
193
203
conversationId: _convo.id,
194
204
postUri: _convo.postUri,
195
-
revision: _convo.postUri,
205
+
revision: _convo.revision,
196
206
did,
197
207
text,
198
208
});
···
201
211
/*
202
212
Reponse Utilities
203
213
*/
204
-
export function exceedsGraphemes(content: string) {
214
+
export function exceedsGraphemes(content: string | RichText) {
215
+
if (content instanceof RichText) {
216
+
return graphemeLength(content.text) > MAX_GRAPHEMES;
217
+
}
205
218
return graphemeLength(content) > MAX_GRAPHEMES;
206
219
}
207
220
···
229
242
return chunks.map((chunk, i) => `(${i + 1}/${total}) ${chunk}`);
230
243
}
231
244
232
-
export async function multipartResponse(convo: Conversation, content: string) {
233
-
const parts = splitResponse(content).filter((p) => p.trim().length > 0);
245
+
export async function multipartResponse(
246
+
convo: Conversation,
247
+
content: string | RichText,
248
+
) {
249
+
let parts: (string | RichText)[];
250
+
251
+
if (content instanceof RichText) {
252
+
if (exceedsGraphemes(content)) {
253
+
// If RichText exceeds grapheme limit, convert to plain text for splitting
254
+
parts = splitResponse(content.text);
255
+
} else {
256
+
// Otherwise, send the RichText directly as a single part
257
+
parts = [content];
258
+
}
259
+
} else {
260
+
// If content is a string, behave as before
261
+
parts = splitResponse(content);
262
+
}
234
263
235
264
for (const segment of parts) {
236
265
await convo.sendMessage({
+26
-11
src/utils/post.ts
+26
-11
src/utils/post.ts
···
8
8
import * as c from "../core";
9
9
import * as yaml from "js-yaml";
10
10
import type { ParsedPost } from "../types";
11
+
import { postCache } from "../utils/cache";
11
12
12
13
export async function parsePost(
13
14
post: Post,
14
15
includeThread: boolean,
15
-
): Promise<ParsedPost> {
16
+
seenUris: Set<string> = new Set(),
17
+
): Promise<ParsedPost | undefined> {
18
+
if (seenUris.has(post.uri)) {
19
+
return undefined;
20
+
}
21
+
seenUris.add(post.uri);
22
+
16
23
const [images, quotePost, ancestorPosts] = await Promise.all([
17
24
parsePostImages(post),
18
-
parseQuote(post),
25
+
parseQuote(post, seenUris),
19
26
includeThread ? traverseThread(post) : Promise.resolve(null),
20
27
]);
21
28
···
28
35
...(quotePost && { quotePost }),
29
36
...(ancestorPosts && {
30
37
thread: {
31
-
ancestors: await Promise.all(
32
-
ancestorPosts.map((ancestor) => parsePost(ancestor, false)),
33
-
),
38
+
ancestors: (await Promise.all(
39
+
ancestorPosts.map((ancestor) => parsePost(ancestor, false, seenUris)),
40
+
)).filter((post): post is ParsedPost => post !== undefined),
34
41
},
35
42
}),
36
43
};
37
44
}
38
45
39
-
async function parseQuote(post: Post) {
46
+
async function parseQuote(post: Post, seenUris: Set<string>) {
40
47
if (
41
48
!post.embed || (!post.embed.isRecord() && !post.embed.isRecordWithMedia())
42
49
) return undefined;
43
50
44
51
const record = (post.embed as RecordEmbed || RecordWithMediaEmbed).record;
45
-
console.log("embed: ", post.embed);
46
-
console.log("record: ", record);
47
-
const embedPost = await c.bot.getPost(record.uri);
52
+
if (seenUris.has(record.uri)) {
53
+
return undefined;
54
+
}
48
55
49
-
return await parsePost(embedPost, false);
56
+
let embedPost = postCache.get(record.uri);
57
+
if (!embedPost) {
58
+
embedPost = await c.bot.getPost(record.uri);
59
+
postCache.set(record.uri, embedPost);
60
+
}
61
+
62
+
return await parsePost(embedPost, false, seenUris);
50
63
}
51
64
52
65
export function parsePostImages(post: Post) {
···
63
76
}
64
77
}
65
78
66
-
return images.map((image, idx) => parseImage(image, idx + 1));
79
+
return images.map((image, idx) => parseImage(image, idx + 1)).filter((img) =>
80
+
img.alt.length > 0
81
+
);
67
82
}
68
83
69
84
function parseImage(image: EmbedImage, index: number) {