a tool for shared writing and social publishing
1"use server";
2
3import { supabaseServerClient } from "supabase/serverClient";
4import { Tables, TablesInsert } from "supabase/database.types";
5import { AtUri } from "@atproto/syntax";
6import { idResolver } from "app/(home-pages)/reader/idResolver";
7import {
8 normalizeDocumentRecord,
9 normalizePublicationRecord,
10 type NormalizedDocument,
11 type NormalizedPublication,
12} from "src/utils/normalizeRecords";
13
/** A row of the `notifications` table, as described by the generated database types. */
type NotificationRow = Tables<"notifications">;

/**
 * Insert-shaped notification whose `data` field is narrowed from the generated
 * type to the {@link NotificationData} union.
 */
export type Notification = Omit<TablesInsert<"notifications">, "data"> & {
  data: NotificationData;
};

/**
 * Payload carried in a notification's `data` field, discriminated by `type`.
 *
 * The `mention` and `comment_mention` variants are further discriminated by
 * `mention_type`; only the `publication` and `document` targets carry a
 * `mentioned_uri`.
 */
export type NotificationData =
  | { type: "comment"; comment_uri: string; parent_uri?: string }
  | { type: "subscribe"; subscription_uri: string }
  | { type: "quote"; bsky_post_uri: string; document_uri: string }
  | { type: "bsky_post_embed"; document_uri: string; bsky_post_uri: string }
  | { type: "mention"; document_uri: string; mention_type: "did" }
  | {
      type: "mention";
      document_uri: string;
      mention_type: "publication";
      mentioned_uri: string;
    }
  | {
      type: "mention";
      document_uri: string;
      mention_type: "document";
      mentioned_uri: string;
    }
  | { type: "comment_mention"; comment_uri: string; mention_type: "did" }
  | {
      type: "comment_mention";
      comment_uri: string;
      mention_type: "publication";
      mentioned_uri: string;
    }
  | {
      type: "comment_mention";
      comment_uri: string;
      mention_type: "document";
      mentioned_uri: string;
    };
31
/**
 * Any notification after hydration. Each member is the element type returned
 * by the hydrator for one notification `type`.
 */
export type HydratedNotification =
  | HydratedCommentNotification
  | HydratedSubscribeNotification
  | HydratedQuoteNotification
  | HydratedBskyPostEmbedNotification
  | HydratedMentionNotification
  | HydratedCommentMentionNotification;

/**
 * Hydrates a mixed batch of notification rows.
 *
 * The whole batch is handed to every per-type hydrator; the hydrators run
 * concurrently and their results are merged into one list ordered by
 * `created_at`, newest first.
 *
 * @param notifications Notification rows of any type.
 * @returns The hydrated notifications, most recent first.
 */
export async function hydrateNotifications(
  notifications: NotificationRow[],
): Promise<Array<HydratedNotification>> {
  // Kick off every hydrator at once and wait for all of them.
  const hydratedByType = await Promise.all([
    hydrateCommentNotifications(notifications),
    hydrateSubscribeNotifications(notifications),
    hydrateQuoteNotifications(notifications),
    hydrateBskyPostEmbedNotifications(notifications),
    hydrateMentionNotifications(notifications),
    hydrateCommentMentionNotifications(notifications),
  ]);

  // Flatten the per-type groups, keeping the hydrator order within ties.
  const merged: HydratedNotification[] = [];
  for (const group of hydratedByType) {
    merged.push(...group);
  }

  // Grouping by type lost the chronological order, so restore it (descending).
  const createdAtMs = (n: HydratedNotification) =>
    new Date(n.created_at).getTime();
  merged.sort((a, b) => createdAtMs(b) - createdAtMs(a));

  return merged;
}
63
/**
 * Compile-time helper (not a runtime guard): selects the member(s) of
 * `NotificationData` whose `type` tag is `T`.
 */
type ExtractNotificationType<
  T extends NotificationData["type"],
> = Extract<NotificationData, { type: T }>;
69
/** A `comment` notification joined with the comment row it refers to. */
export type HydratedCommentNotification = Awaited<
  ReturnType<typeof hydrateCommentNotifications>
>[0];

/**
 * Hydrates the `comment` notifications found in `notifications`.
 *
 * Rows of any other type are ignored. A notification whose comment cannot be
 * found in `comments_on_documents` is dropped from the result. When a query
 * fails the error is logged and the affected notifications are dropped rather
 * than the whole call rejecting.
 *
 * @param notifications Notification rows of any type.
 * @returns One entry per resolvable comment notification, carrying the comment
 *   row, the optional parent comment row, and the normalized document and
 *   publication records of the commented document.
 */
async function hydrateCommentNotifications(notifications: NotificationRow[]) {
  const commentNotifications = notifications.filter(
    (n): n is NotificationRow & { data: ExtractNotificationType<"comment"> } =>
      (n.data as NotificationData)?.type === "comment",
  );

  if (commentNotifications.length === 0) {
    return [];
  }

  // Comments and their parents live in the same table, so a single query
  // fetches both. De-duplicate so a comment referenced by several
  // notifications is only requested once.
  const commentUris = [
    ...new Set(
      commentNotifications.flatMap((n) =>
        n.data.parent_uri
          ? [n.data.comment_uri, n.data.parent_uri]
          : [n.data.comment_uri],
      ),
    ),
  ];
  const { data: comments, error } = await supabaseServerClient
    .from("comments_on_documents")
    .select(
      "*,bsky_profiles(*), documents(*, documents_in_publications(publications(*)))",
    )
    .in("uri", commentUris);
  if (error) {
    // Best effort: surface the failure instead of silently returning nothing.
    console.error("Failed to fetch comments for notifications:", error);
  }

  return commentNotifications
    .map((notification) => {
      const commentData = comments?.find(
        (c) => c.uri === notification.data.comment_uri,
      );
      if (!commentData) return null;
      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "comment" as const,
        comment_uri: notification.data.comment_uri,
        // Only replies carry a parent; it may still be undefined if the
        // parent row was not returned by the query.
        parentData: notification.data.parent_uri
          ? comments?.find((c) => c.uri === notification.data.parent_uri)
          : undefined,
        commentData,
        normalizedDocument: normalizeDocumentRecord(
          commentData.documents?.data,
          commentData.documents?.uri,
        ),
        normalizedPublication: normalizePublicationRecord(
          commentData.documents?.documents_in_publications[0]?.publications
            ?.record,
        ),
      };
    })
    .filter((n) => n !== null);
}
119
/** A `subscribe` notification joined with the subscription row it refers to. */
export type HydratedSubscribeNotification = Awaited<
  ReturnType<typeof hydrateSubscribeNotifications>
>[0];

/**
 * Hydrates the `subscribe` notifications found in `notifications`.
 *
 * Rows of any other type are ignored. A notification whose subscription cannot
 * be found in `publication_subscriptions` is dropped. When the query fails the
 * error is logged and the affected notifications are dropped rather than the
 * whole call rejecting.
 *
 * @param notifications Notification rows of any type.
 * @returns One entry per resolvable subscribe notification, carrying the
 *   subscription row (with its joined identity profile and publication) and
 *   the normalized publication record.
 */
async function hydrateSubscribeNotifications(notifications: NotificationRow[]) {
  const subscribeNotifications = notifications.filter(
    (
      n,
    ): n is NotificationRow & { data: ExtractNotificationType<"subscribe"> } =>
      (n.data as NotificationData)?.type === "subscribe",
  );

  if (subscribeNotifications.length === 0) {
    return [];
  }

  // Fetch the subscriptions together with the subscriber profile and the
  // publication that was subscribed to.
  const subscriptionUris = subscribeNotifications.map(
    (n) => n.data.subscription_uri,
  );
  const { data: subscriptions, error } = await supabaseServerClient
    .from("publication_subscriptions")
    .select("*, identities(bsky_profiles(*)), publications(*)")
    .in("uri", subscriptionUris);
  if (error) {
    // Best effort: surface the failure instead of silently returning nothing.
    console.error("Failed to fetch subscriptions for notifications:", error);
  }

  return subscribeNotifications
    .map((notification) => {
      const subscriptionData = subscriptions?.find(
        (s) => s.uri === notification.data.subscription_uri,
      );
      if (!subscriptionData) return null;
      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "subscribe" as const,
        subscription_uri: notification.data.subscription_uri,
        subscriptionData,
        normalizedPublication: normalizePublicationRecord(
          subscriptionData.publications?.record,
        ),
      };
    })
    .filter((n) => n !== null);
}
161
/** A `quote` notification joined with the quoting post and the quoted document. */
export type HydratedQuoteNotification = Awaited<
  ReturnType<typeof hydrateQuoteNotifications>
>[0];

/**
 * Hydrates the `quote` notifications found in `notifications`.
 *
 * Rows of any other type are ignored. A notification is dropped unless both
 * its `bsky_posts` row and its `documents` row are found. When a query fails
 * the error is logged and the affected notifications are dropped rather than
 * the whole call rejecting.
 *
 * @param notifications Notification rows of any type.
 * @returns One entry per resolvable quote notification, carrying the post row,
 *   the document row and the normalized document and publication records.
 */
async function hydrateQuoteNotifications(notifications: NotificationRow[]) {
  const quoteNotifications = notifications.filter(
    (n): n is NotificationRow & { data: ExtractNotificationType<"quote"> } =>
      (n.data as NotificationData)?.type === "quote",
  );

  if (quoteNotifications.length === 0) {
    return [];
  }

  const bskyPostUris = quoteNotifications.map((n) => n.data.bsky_post_uri);
  const documentUris = quoteNotifications.map((n) => n.data.document_uri);

  // The two lookups are independent, so issue them concurrently instead of
  // paying for two sequential round trips.
  const [
    { data: bskyPosts, error: bskyPostsError },
    { data: documents, error: documentsError },
  ] = await Promise.all([
    supabaseServerClient.from("bsky_posts").select("*").in("uri", bskyPostUris),
    supabaseServerClient
      .from("documents")
      .select("*, documents_in_publications(publications(*))")
      .in("uri", documentUris),
  ]);
  if (bskyPostsError) {
    console.error(
      "Failed to fetch Bluesky posts for quote notifications:",
      bskyPostsError,
    );
  }
  if (documentsError) {
    console.error(
      "Failed to fetch documents for quote notifications:",
      documentsError,
    );
  }

  return quoteNotifications
    .map((notification) => {
      const bskyPost = bskyPosts?.find(
        (p) => p.uri === notification.data.bsky_post_uri,
      );
      const document = documents?.find(
        (d) => d.uri === notification.data.document_uri,
      );
      if (!bskyPost || !document) return null;
      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "quote" as const,
        bsky_post_uri: notification.data.bsky_post_uri,
        document_uri: notification.data.document_uri,
        bskyPost,
        document,
        normalizedDocument: normalizeDocumentRecord(document.data, document.uri),
        normalizedPublication: normalizePublicationRecord(
          document.documents_in_publications[0]?.publications?.record,
        ),
      };
    })
    .filter((n) => n !== null);
}
212
/**
 * A `bsky_post_embed` notification joined with the embedding document, the
 * document creator's handle and the text of the embedded post.
 */
export type HydratedBskyPostEmbedNotification = Awaited<
  ReturnType<typeof hydrateBskyPostEmbedNotifications>
>[0];

/**
 * Hydrates the `bsky_post_embed` notifications found in `notifications`.
 *
 * Rows of any other type are ignored. A notification whose document cannot be
 * found in `documents` is dropped. Post text is read from the `bsky_posts`
 * cache when available and otherwise fetched from the public Bluesky API;
 * `bskyPostText` is `null` when neither source has it. Query, API and DID
 * resolution failures are logged and degrade the result instead of rejecting.
 *
 * @param notifications Notification rows of any type.
 * @returns One entry per resolvable notification, carrying the document row,
 *   `documentCreatorHandle` (or `null`), `bskyPostText` (or `null`) and the
 *   normalized document and publication records.
 */
async function hydrateBskyPostEmbedNotifications(notifications: NotificationRow[]) {
  // `app.bsky.feed.getPosts` rejects requests carrying more URIs than this.
  const GET_POSTS_MAX_URIS = 25;

  const bskyPostEmbedNotifications = notifications.filter(
    (n): n is NotificationRow & { data: ExtractNotificationType<"bsky_post_embed"> } =>
      (n.data as NotificationData)?.type === "bsky_post_embed",
  );

  if (bskyPostEmbedNotifications.length === 0) {
    return [];
  }

  // `document_uri` is the document that embedded the post.
  const documentUris = bskyPostEmbedNotifications.map((n) => n.data.document_uri);
  const bskyPostUris = bskyPostEmbedNotifications.map((n) => n.data.bsky_post_uri);

  const [
    { data: documents, error: documentsError },
    { data: cachedBskyPosts, error: cachedPostsError },
  ] = await Promise.all([
    supabaseServerClient
      .from("documents")
      .select("*, documents_in_publications(publications(*))")
      .in("uri", documentUris),
    supabaseServerClient
      .from("bsky_posts")
      .select("*")
      .in("uri", bskyPostUris),
  ]);
  if (documentsError) {
    console.error(
      "Failed to fetch documents for post embed notifications:",
      documentsError,
    );
  }
  if (cachedPostsError) {
    console.error(
      "Failed to fetch cached Bluesky posts for post embed notifications:",
      cachedPostsError,
    );
  }

  // Work out which posts are not cached; de-duplicate so each is fetched once.
  const cachedPostUris = new Set(cachedBskyPosts?.map((p) => p.uri) ?? []);
  const missingPostUris = [...new Set(bskyPostUris)].filter(
    (uri) => !cachedPostUris.has(uri),
  );

  // Fetch the missing posts from the Bluesky API, in batches the endpoint accepts.
  const fetchedPosts = new Map<string, { text: string } | null>();
  if (missingPostUris.length > 0) {
    try {
      const { AtpAgent } = await import("@atproto/api");
      const agent = new AtpAgent({ service: "https://public.api.bsky.app" });

      const batches: string[][] = [];
      for (let start = 0; start < missingPostUris.length; start += GET_POSTS_MAX_URIS) {
        batches.push(missingPostUris.slice(start, start + GET_POSTS_MAX_URIS));
      }

      await Promise.all(
        batches.map(async (uris) => {
          try {
            const response = await agent.app.bsky.feed.getPosts({ uris });
            for (const post of response.data.posts) {
              const record = post.record as { text?: string };
              fetchedPosts.set(post.uri, { text: record.text ?? "" });
            }
          } catch (error) {
            // One failed batch must not discard the others.
            console.error("Failed to fetch Bluesky posts:", error);
          }
        }),
      );
    } catch (error) {
      console.error("Failed to fetch Bluesky posts:", error);
    }
  }

  // The host of a document URI is the DID of its creator.
  const documentCreatorDids = [...new Set(documentUris.map((uri) => new AtUri(uri).host))];

  // Resolve DIDs to handles in parallel.
  const didToHandleMap = new Map<string, string | null>();
  await Promise.all(
    documentCreatorDids.map(async (did) => {
      try {
        const resolved = await idResolver.did.resolve(did);
        // `alsoKnownAs` may list other URIs too; the handle is the `at://` one.
        const handleUri = resolved?.alsoKnownAs?.find((aka) =>
          aka.startsWith("at://"),
        );
        didToHandleMap.set(
          did,
          handleUri ? handleUri.slice("at://".length) : null,
        );
      } catch (error) {
        console.error(`Failed to resolve DID ${did}:`, error);
        didToHandleMap.set(did, null);
      }
    }),
  );

  return bskyPostEmbedNotifications
    .map((notification) => {
      const document = documents?.find((d) => d.uri === notification.data.document_uri);
      if (!document) return null;

      const documentCreatorDid = new AtUri(notification.data.document_uri).host;
      const documentCreatorHandle = didToHandleMap.get(documentCreatorDid) ?? null;

      // Prefer the cached post view, fall back to the freshly fetched text.
      const cachedPost = cachedBskyPosts?.find((p) => p.uri === notification.data.bsky_post_uri);
      const postView = cachedPost?.post_view as { record?: { text?: string } } | undefined;
      const bskyPostText =
        postView?.record?.text ??
        fetchedPosts.get(notification.data.bsky_post_uri)?.text ??
        null;

      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "bsky_post_embed" as const,
        document_uri: notification.data.document_uri,
        bsky_post_uri: notification.data.bsky_post_uri,
        document,
        documentCreatorHandle,
        bskyPostText,
        normalizedDocument: normalizeDocumentRecord(document.data, document.uri),
        normalizedPublication: normalizePublicationRecord(
          document.documents_in_publications[0]?.publications?.record,
        ),
      };
    })
    .filter((n) => n !== null);
}
313
/**
 * A `mention` notification joined with the mentioning document and, when the
 * mention targets a publication or document, the mentioned record.
 */
export type HydratedMentionNotification = Awaited<
  ReturnType<typeof hydrateMentionNotifications>
>[0];

/**
 * Hydrates the `mention` notifications found in `notifications`.
 *
 * Rows of any other type are ignored. A notification whose mentioning document
 * cannot be found in `documents` is dropped. Mentioned publications and
 * documents are looked up only when at least one notification references
 * them. Query and DID resolution failures are logged and degrade the result
 * instead of rejecting.
 *
 * @param notifications Notification rows of any type.
 * @returns One entry per resolvable mention notification, carrying the
 *   document row, `documentCreatorHandle` (or `null`), the mentioned
 *   publication/document rows when found, and normalized records for each.
 */
async function hydrateMentionNotifications(notifications: NotificationRow[]) {
  const mentionNotifications = notifications.filter(
    (n): n is NotificationRow & { data: ExtractNotificationType<"mention"> } =>
      (n.data as NotificationData)?.type === "mention",
  );

  if (mentionNotifications.length === 0) {
    return [];
  }

  // Fetch the documents that contain the mentions.
  const documentUris = mentionNotifications.map((n) => n.data.document_uri);
  const { data: documents, error: documentsError } = await supabaseServerClient
    .from("documents")
    .select("*, documents_in_publications(publications(*))")
    .in("uri", documentUris);
  if (documentsError) {
    console.error(
      "Failed to fetch documents for mention notifications:",
      documentsError,
    );
  }

  // The host of a document URI is the DID of its creator.
  const documentCreatorDids = [...new Set(documentUris.map((uri) => new AtUri(uri).host))];

  // Resolve DIDs to handles in parallel.
  const didToHandleMap = new Map<string, string | null>();
  await Promise.all(
    documentCreatorDids.map(async (did) => {
      try {
        const resolved = await idResolver.did.resolve(did);
        // `alsoKnownAs` may list other URIs too; the handle is the `at://` one.
        const handleUri = resolved?.alsoKnownAs?.find((aka) =>
          aka.startsWith("at://"),
        );
        didToHandleMap.set(
          did,
          handleUri ? handleUri.slice("at://".length) : null,
        );
      } catch (error) {
        console.error(`Failed to resolve DID ${did}:`, error);
        didToHandleMap.set(did, null);
      }
    }),
  );

  // Collect the URIs of mentioned publications and documents.
  const mentionedPublicationUris = mentionNotifications
    .filter((n) => n.data.mention_type === "publication")
    .map((n) => (n.data as Extract<ExtractNotificationType<"mention">, { mention_type: "publication" }>).mentioned_uri);

  const mentionedDocumentUris = mentionNotifications
    .filter((n) => n.data.mention_type === "document")
    .map((n) => (n.data as Extract<ExtractNotificationType<"mention">, { mention_type: "document" }>).mentioned_uri);

  // Skip a query entirely when nothing of that kind was mentioned.
  const [
    { data: mentionedPublications, error: mentionedPublicationsError },
    { data: mentionedDocuments, error: mentionedDocumentsError },
  ] = await Promise.all([
    mentionedPublicationUris.length > 0
      ? supabaseServerClient
          .from("publications")
          .select("*")
          .in("uri", mentionedPublicationUris)
      : Promise.resolve({ data: [], error: null }),
    mentionedDocumentUris.length > 0
      ? supabaseServerClient
          .from("documents")
          .select("*, documents_in_publications(publications(*))")
          .in("uri", mentionedDocumentUris)
      : Promise.resolve({ data: [], error: null }),
  ]);
  if (mentionedPublicationsError) {
    console.error(
      "Failed to fetch mentioned publications for mention notifications:",
      mentionedPublicationsError,
    );
  }
  if (mentionedDocumentsError) {
    console.error(
      "Failed to fetch mentioned documents for mention notifications:",
      mentionedDocumentsError,
    );
  }

  return mentionNotifications
    .map((notification) => {
      const document = documents?.find((d) => d.uri === notification.data.document_uri);
      if (!document) return null;

      // `did` mentions have no target URI.
      const mentionedUri = notification.data.mention_type !== "did"
        ? (notification.data as Extract<ExtractNotificationType<"mention">, { mentioned_uri: string }>).mentioned_uri
        : undefined;

      const documentCreatorDid = new AtUri(notification.data.document_uri).host;
      const documentCreatorHandle = didToHandleMap.get(documentCreatorDid) ?? null;

      const mentionedPublication = mentionedUri ? mentionedPublications?.find((p) => p.uri === mentionedUri) : undefined;
      const mentionedDoc = mentionedUri ? mentionedDocuments?.find((d) => d.uri === mentionedUri) : undefined;

      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "mention" as const,
        document_uri: notification.data.document_uri,
        mention_type: notification.data.mention_type,
        mentioned_uri: mentionedUri,
        document,
        documentCreatorHandle,
        mentionedPublication,
        mentionedDocument: mentionedDoc,
        normalizedDocument: normalizeDocumentRecord(document.data, document.uri),
        normalizedPublication: normalizePublicationRecord(
          document.documents_in_publications[0]?.publications?.record,
        ),
        normalizedMentionedPublication: normalizePublicationRecord(mentionedPublication?.record),
        normalizedMentionedDocument: normalizeDocumentRecord(mentionedDoc?.data, mentionedDoc?.uri),
      };
    })
    .filter((n) => n !== null);
}
416
/**
 * A `comment_mention` notification joined with the mentioning comment and,
 * when the mention targets a publication or document, the mentioned record.
 */
export type HydratedCommentMentionNotification = Awaited<
  ReturnType<typeof hydrateCommentMentionNotifications>
>[0];

/**
 * Hydrates the `comment_mention` notifications found in `notifications`.
 *
 * Rows of any other type are ignored. A notification whose comment cannot be
 * found in `comments_on_documents` is dropped. Mentioned publications and
 * documents are looked up only when at least one notification references
 * them. Query and DID resolution failures are logged and degrade the result
 * instead of rejecting.
 *
 * @param notifications Notification rows of any type.
 * @returns One entry per resolvable comment-mention notification, carrying the
 *   comment row, `commenterHandle` (or `null`), the mentioned
 *   publication/document rows when found, and normalized records for each.
 */
async function hydrateCommentMentionNotifications(notifications: NotificationRow[]) {
  const commentMentionNotifications = notifications.filter(
    (n): n is NotificationRow & { data: ExtractNotificationType<"comment_mention"> } =>
      (n.data as NotificationData)?.type === "comment_mention",
  );

  if (commentMentionNotifications.length === 0) {
    return [];
  }

  // Fetch the comments that contain the mentions.
  const commentUris = commentMentionNotifications.map((n) => n.data.comment_uri);
  const { data: comments, error: commentsError } = await supabaseServerClient
    .from("comments_on_documents")
    .select(
      "*, bsky_profiles(*), documents(*, documents_in_publications(publications(*)))",
    )
    .in("uri", commentUris);
  if (commentsError) {
    console.error(
      "Failed to fetch comments for comment mention notifications:",
      commentsError,
    );
  }

  // The host of a comment URI is the DID of the commenter.
  const commenterDids = [...new Set(commentUris.map((uri) => new AtUri(uri).host))];

  // Resolve DIDs to handles in parallel.
  const didToHandleMap = new Map<string, string | null>();
  await Promise.all(
    commenterDids.map(async (did) => {
      try {
        const resolved = await idResolver.did.resolve(did);
        // `alsoKnownAs` may list other URIs too; the handle is the `at://` one.
        const handleUri = resolved?.alsoKnownAs?.find((aka) =>
          aka.startsWith("at://"),
        );
        didToHandleMap.set(
          did,
          handleUri ? handleUri.slice("at://".length) : null,
        );
      } catch (error) {
        console.error(`Failed to resolve DID ${did}:`, error);
        didToHandleMap.set(did, null);
      }
    }),
  );

  // Collect the URIs of mentioned publications and documents.
  const mentionedPublicationUris = commentMentionNotifications
    .filter((n) => n.data.mention_type === "publication")
    .map((n) => (n.data as Extract<ExtractNotificationType<"comment_mention">, { mention_type: "publication" }>).mentioned_uri);

  const mentionedDocumentUris = commentMentionNotifications
    .filter((n) => n.data.mention_type === "document")
    .map((n) => (n.data as Extract<ExtractNotificationType<"comment_mention">, { mention_type: "document" }>).mentioned_uri);

  // Skip a query entirely when nothing of that kind was mentioned.
  const [
    { data: mentionedPublications, error: mentionedPublicationsError },
    { data: mentionedDocuments, error: mentionedDocumentsError },
  ] = await Promise.all([
    mentionedPublicationUris.length > 0
      ? supabaseServerClient
          .from("publications")
          .select("*")
          .in("uri", mentionedPublicationUris)
      : Promise.resolve({ data: [], error: null }),
    mentionedDocumentUris.length > 0
      ? supabaseServerClient
          .from("documents")
          .select("*, documents_in_publications(publications(*))")
          .in("uri", mentionedDocumentUris)
      : Promise.resolve({ data: [], error: null }),
  ]);
  if (mentionedPublicationsError) {
    console.error(
      "Failed to fetch mentioned publications for comment mention notifications:",
      mentionedPublicationsError,
    );
  }
  if (mentionedDocumentsError) {
    console.error(
      "Failed to fetch mentioned documents for comment mention notifications:",
      mentionedDocumentsError,
    );
  }

  return commentMentionNotifications
    .map((notification) => {
      const commentData = comments?.find((c) => c.uri === notification.data.comment_uri);
      if (!commentData) return null;

      // `did` mentions have no target URI.
      const mentionedUri = notification.data.mention_type !== "did"
        ? (notification.data as Extract<ExtractNotificationType<"comment_mention">, { mentioned_uri: string }>).mentioned_uri
        : undefined;

      const commenterDid = new AtUri(notification.data.comment_uri).host;
      const commenterHandle = didToHandleMap.get(commenterDid) ?? null;

      const mentionedPublication = mentionedUri ? mentionedPublications?.find((p) => p.uri === mentionedUri) : undefined;
      const mentionedDoc = mentionedUri ? mentionedDocuments?.find((d) => d.uri === mentionedUri) : undefined;

      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "comment_mention" as const,
        comment_uri: notification.data.comment_uri,
        mention_type: notification.data.mention_type,
        mentioned_uri: mentionedUri,
        commentData,
        commenterHandle,
        mentionedPublication,
        mentionedDocument: mentionedDoc,
        normalizedDocument: normalizeDocumentRecord(commentData.documents?.data, commentData.documents?.uri),
        normalizedPublication: normalizePublicationRecord(
          commentData.documents?.documents_in_publications[0]?.publications?.record,
        ),
        normalizedMentionedPublication: normalizePublicationRecord(mentionedPublication?.record),
        normalizedMentionedDocument: normalizeDocumentRecord(mentionedDoc?.data, mentionedDoc?.uri),
      };
    })
    .filter((n) => n !== null);
}
521
/**
 * Broadcasts a `notification` event with payload `{ message: "poke" }` on the
 * channel `identity.atp_did:<did>`, then removes the channel from the client.
 *
 * NOTE(review): presumably a client listening on that channel reacts by
 * refetching its notifications; confirm against the subscriber.
 *
 * @param did DID of the identity to poke.
 */
export async function pingIdentityToUpdateNotification(did: string) {
  const channel = supabaseServerClient.channel(`identity.atp_did:${did}`);
  try {
    await channel.send({
      type: "broadcast",
      event: "notification",
      payload: { message: "poke" },
    });
  } finally {
    // Always release the channel, even if the broadcast throws, so failed
    // pings do not accumulate channels on the shared server client.
    await supabaseServerClient.removeChannel(channel);
  }
}