// Forked from leaflet.pub/leaflet — a tool for shared writing and social publishing.
1"use server";
2
3import { supabaseServerClient } from "supabase/serverClient";
4import { Tables, TablesInsert } from "supabase/database.types";
5import { AtUri } from "@atproto/syntax";
6import { idResolver } from "app/(home-pages)/reader/idResolver";
7
/** Row type of the `notifications` table, via the `Tables` helper. */
type NotificationRow = Tables<"notifications">;

/**
 * Insert shape of the `notifications` table with its `data` field replaced by
 * the typed {@link NotificationData} union.
 *
 * Note: inside this module the name shadows the global DOM `Notification` type.
 */
export type Notification =
  & Omit<TablesInsert<"notifications">, "data">
  & { data: NotificationData };
13
/**
 * Typed payload of a notification's `data` field, discriminated by `type`.
 *
 * The `mention` and `comment_mention` variants are additionally discriminated
 * by `mention_type`; only the "publication" and "document" targets carry a
 * `mentioned_uri`.
 */
export type NotificationData =
  | {
      type: "comment";
      comment_uri: string;
      parent_uri?: string;
    }
  | {
      type: "subscribe";
      subscription_uri: string;
    }
  | {
      type: "quote";
      bsky_post_uri: string;
      document_uri: string;
    }
  | {
      type: "mention";
      document_uri: string;
      mention_type: "did";
    }
  | {
      type: "mention";
      document_uri: string;
      mention_type: "publication";
      mentioned_uri: string;
    }
  | {
      type: "mention";
      document_uri: string;
      mention_type: "document";
      mentioned_uri: string;
    }
  | {
      type: "comment_mention";
      comment_uri: string;
      mention_type: "did";
    }
  | {
      type: "comment_mention";
      comment_uri: string;
      mention_type: "publication";
      mentioned_uri: string;
    }
  | {
      type: "comment_mention";
      comment_uri: string;
      mention_type: "document";
      mentioned_uri: string;
    };
24
/** Union of the shapes returned by the per-type hydrators in this module. */
export type HydratedNotification =
  | HydratedCommentNotification
  | HydratedSubscribeNotification
  | HydratedQuoteNotification
  | HydratedMentionNotification
  | HydratedCommentMentionNotification;

/**
 * Hydrates notification rows by running every per-type hydrator and merging
 * their results.
 *
 * @param notifications - Notification rows of any type; each hydrator selects
 *   the rows whose `data.type` it handles.
 * @returns All hydrated notifications, ordered by `created_at` descending
 *   (newest first).
 */
export async function hydrateNotifications(
  notifications: NotificationRow[],
): Promise<Array<HydratedNotification>> {
  // The hydrators are independent of one another, so run them concurrently.
  const hydratedGroups = await Promise.all([
    hydrateCommentNotifications(notifications),
    hydrateSubscribeNotifications(notifications),
    hydrateQuoteNotifications(notifications),
    hydrateMentionNotifications(notifications),
    hydrateCommentMentionNotifications(notifications),
  ]);

  // Merge the per-type groups into a single list.
  const merged: HydratedNotification[] = hydratedGroups.flat();

  // Grouping by type lost the chronological order; restore it, newest first.
  const timestampOf = (entry: HydratedNotification) =>
    new Date(entry.created_at).getTime();
  merged.sort((left, right) => timestampOf(right) - timestampOf(left));

  return merged;
}
54
/**
 * Selects the member(s) of {@link NotificationData} whose `type` tag is `T`.
 * This is a type-level helper only; it performs no runtime check.
 */
type ExtractNotificationType<T extends NotificationData["type"]> =
  Extract<NotificationData, { type: T }>;
60
/** Element type of the array resolved by `hydrateCommentNotifications`. */
export type HydratedCommentNotification = Awaited<
  ReturnType<typeof hydrateCommentNotifications>
>[0];

/**
 * Hydrates "comment" notifications with the comment row they reference and,
 * when `parent_uri` is set, the parent comment row.
 *
 * A notification whose comment is not returned by the query is dropped. A
 * missing parent does not drop the notification; `parentData` is then
 * `undefined`.
 *
 * @param notifications - Notification rows of any type; rows whose
 *   `data.type` is not "comment" are ignored.
 * @returns One hydrated entry per "comment" notification whose comment row
 *   was found, in input order.
 */
async function hydrateCommentNotifications(notifications: NotificationRow[]) {
  const commentNotifications = notifications.filter(
    (n): n is NotificationRow & { data: ExtractNotificationType<"comment"> } =>
      (n.data as NotificationData)?.type === "comment",
  );

  if (commentNotifications.length === 0) {
    return [];
  }

  // Collect every comment URI (and parent URI) once; several notifications
  // may reference the same comment.
  const commentUris = [
    ...new Set(
      commentNotifications.flatMap((n) =>
        n.data.parent_uri
          ? [n.data.comment_uri, n.data.parent_uri]
          : [n.data.comment_uri],
      ),
    ),
  ];

  const { data: comments, error: commentsError } = await supabaseServerClient
    .from("comments_on_documents")
    .select(
      "*,bsky_profiles(*), documents(*, documents_in_publications(publications(*)))",
    )
    .in("uri", commentUris);

  // Best effort: a failed query leaves `comments` null, which drops these
  // notifications below. Log it so the failure is not silent.
  if (commentsError) {
    console.error(
      "Failed to fetch comments for comment notifications:",
      commentsError,
    );
  }

  const findComment = (uri: string) => comments?.find((c) => c.uri === uri);

  return commentNotifications
    .map((notification) => {
      const commentData = findComment(notification.data.comment_uri);
      if (!commentData) return null;
      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "comment" as const,
        comment_uri: notification.data.comment_uri,
        parentData: notification.data.parent_uri
          ? findComment(notification.data.parent_uri)
          : undefined,
        commentData,
      };
    })
    .filter((n) => n !== null);
}
106
/** Element type of the array resolved by `hydrateSubscribeNotifications`. */
export type HydratedSubscribeNotification = Awaited<
  ReturnType<typeof hydrateSubscribeNotifications>
>[0];

/**
 * Hydrates "subscribe" notifications with the matching
 * `publication_subscriptions` row, including the related identity profile and
 * publication selected by the query.
 *
 * A notification whose subscription is not returned by the query is dropped.
 *
 * @param notifications - Notification rows of any type; rows whose
 *   `data.type` is not "subscribe" are ignored.
 * @returns One hydrated entry per "subscribe" notification whose subscription
 *   row was found, in input order.
 */
async function hydrateSubscribeNotifications(notifications: NotificationRow[]) {
  const subscribeNotifications = notifications.filter(
    (
      n,
    ): n is NotificationRow & { data: ExtractNotificationType<"subscribe"> } =>
      (n.data as NotificationData)?.type === "subscribe",
  );

  if (subscribeNotifications.length === 0) {
    return [];
  }

  // Query each subscription URI once even if several notifications share it.
  const subscriptionUris = [
    ...new Set(subscribeNotifications.map((n) => n.data.subscription_uri)),
  ];

  const { data: subscriptions, error: subscriptionsError } =
    await supabaseServerClient
      .from("publication_subscriptions")
      .select("*, identities(bsky_profiles(*)), publications(*)")
      .in("uri", subscriptionUris);

  // Best effort: a failed query leaves `subscriptions` null, which drops
  // these notifications below. Log it so the failure is not silent.
  if (subscriptionsError) {
    console.error(
      "Failed to fetch subscriptions for subscribe notifications:",
      subscriptionsError,
    );
  }

  return subscribeNotifications
    .map((notification) => {
      const subscriptionData = subscriptions?.find(
        (s) => s.uri === notification.data.subscription_uri,
      );
      if (!subscriptionData) return null;
      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "subscribe" as const,
        subscription_uri: notification.data.subscription_uri,
        subscriptionData,
      };
    })
    .filter((n) => n !== null);
}
147
/** Element type of the array resolved by `hydrateQuoteNotifications`. */
export type HydratedQuoteNotification = Awaited<
  ReturnType<typeof hydrateQuoteNotifications>
>[0];

/**
 * Hydrates "quote" notifications with the quoting `bsky_posts` row and the
 * quoted `documents` row (including its publications).
 *
 * A notification is dropped unless BOTH the post and the document are
 * returned by their queries.
 *
 * @param notifications - Notification rows of any type; rows whose
 *   `data.type` is not "quote" are ignored.
 * @returns One hydrated entry per "quote" notification whose post and
 *   document rows were found, in input order.
 */
async function hydrateQuoteNotifications(notifications: NotificationRow[]) {
  const quoteNotifications = notifications.filter(
    (n): n is NotificationRow & { data: ExtractNotificationType<"quote"> } =>
      (n.data as NotificationData)?.type === "quote",
  );

  if (quoteNotifications.length === 0) {
    return [];
  }

  // Query each URI once even if several notifications share it.
  const bskyPostUris = [
    ...new Set(quoteNotifications.map((n) => n.data.bsky_post_uri)),
  ];
  const documentUris = [
    ...new Set(quoteNotifications.map((n) => n.data.document_uri)),
  ];

  // The two lookups are independent, so issue them concurrently.
  const [
    { data: bskyPosts, error: bskyPostsError },
    { data: documents, error: documentsError },
  ] = await Promise.all([
    supabaseServerClient.from("bsky_posts").select("*").in("uri", bskyPostUris),
    supabaseServerClient
      .from("documents")
      .select("*, documents_in_publications(publications(*))")
      .in("uri", documentUris),
  ]);

  // Best effort: a failed query leaves its data null, which drops the
  // affected notifications below. Log it so the failure is not silent.
  if (bskyPostsError) {
    console.error(
      "Failed to fetch bsky posts for quote notifications:",
      bskyPostsError,
    );
  }
  if (documentsError) {
    console.error(
      "Failed to fetch documents for quote notifications:",
      documentsError,
    );
  }

  return quoteNotifications
    .map((notification) => {
      const bskyPost = bskyPosts?.find(
        (p) => p.uri === notification.data.bsky_post_uri,
      );
      const document = documents?.find(
        (d) => d.uri === notification.data.document_uri,
      );
      if (!bskyPost || !document) return null;
      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "quote" as const,
        bsky_post_uri: notification.data.bsky_post_uri,
        document_uri: notification.data.document_uri,
        bskyPost,
        document,
      };
    })
    .filter((n) => n !== null);
}
194
/** Element type of the array resolved by `hydrateMentionNotifications`. */
export type HydratedMentionNotification = Awaited<
  ReturnType<typeof hydrateMentionNotifications>
>[0];

/**
 * Hydrates "mention" notifications with:
 * - the document containing the mention (`document`),
 * - the handle resolved from the DID in the document URI's host
 *   (`documentCreatorHandle`, `null` when it cannot be resolved),
 * - the mentioned publication or document row, when `mention_type` is
 *   "publication" or "document".
 *
 * A notification whose containing document is not returned by the query is
 * dropped.
 *
 * @param notifications - Notification rows of any type; rows whose
 *   `data.type` is not "mention" are ignored.
 * @returns One hydrated entry per "mention" notification whose document row
 *   was found, in input order.
 */
async function hydrateMentionNotifications(notifications: NotificationRow[]) {
  const AT_URI_PREFIX = "at://";

  // Extracts the authority (DID) of an AT URI. `new AtUri` throws on a
  // malformed URI; one bad row must not reject the whole hydration.
  const didOfUri = (uri: string): string | null => {
    try {
      return new AtUri(uri).host;
    } catch (error) {
      console.error(`Invalid AT URI ${uri}:`, error);
      return null;
    }
  };

  const mentionNotifications = notifications.filter(
    (n): n is NotificationRow & { data: ExtractNotificationType<"mention"> } =>
      (n.data as NotificationData)?.type === "mention",
  );

  if (mentionNotifications.length === 0) {
    return [];
  }

  // Fetch the documents that contain the mentions (each URI queried once).
  const documentUris = [
    ...new Set(mentionNotifications.map((n) => n.data.document_uri)),
  ];
  const { data: documents, error: documentsError } = await supabaseServerClient
    .from("documents")
    .select("*, documents_in_publications(publications(*))")
    .in("uri", documentUris);

  // Best effort: a failed query leaves `documents` null, which drops these
  // notifications below. Log it so the failure is not silent.
  if (documentsError) {
    console.error(
      "Failed to fetch documents for mention notifications:",
      documentsError,
    );
  }

  // Unique DIDs taken from the document URIs, used to resolve handles.
  const documentCreatorDids = [
    ...new Set(
      documentUris
        .map(didOfUri)
        .filter((did): did is string => did !== null),
    ),
  ];

  // Resolve DIDs to handles in parallel; a failed resolution maps to null.
  const didToHandleMap = new Map<string, string | null>();
  await Promise.all(
    documentCreatorDids.map(async (did) => {
      try {
        const resolved = await idResolver.did.resolve(did);
        // The handle is the first at:// entry of alsoKnownAs. Entries with
        // other URI schemes may precede it, so search instead of taking [0].
        const handleUri = resolved?.alsoKnownAs?.find((aka) =>
          aka.startsWith(AT_URI_PREFIX),
        );
        didToHandleMap.set(
          did,
          handleUri ? handleUri.slice(AT_URI_PREFIX.length) : null,
        );
      } catch (error) {
        console.error(`Failed to resolve DID ${did}:`, error);
        didToHandleMap.set(did, null);
      }
    }),
  );

  // Fetch mentioned publications and documents
  const mentionedPublicationUris = mentionNotifications
    .filter((n) => n.data.mention_type === "publication")
    .map((n) => (n.data as Extract<ExtractNotificationType<"mention">, { mention_type: "publication" }>).mentioned_uri);

  const mentionedDocumentUris = mentionNotifications
    .filter((n) => n.data.mention_type === "document")
    .map((n) => (n.data as Extract<ExtractNotificationType<"mention">, { mention_type: "document" }>).mentioned_uri);

  // Skip a query entirely when there is nothing to look up.
  const [{ data: mentionedPublications }, { data: mentionedDocuments }] = await Promise.all([
    mentionedPublicationUris.length > 0
      ? supabaseServerClient
          .from("publications")
          .select("*")
          .in("uri", mentionedPublicationUris)
      : Promise.resolve({ data: [] }),
    mentionedDocumentUris.length > 0
      ? supabaseServerClient
          .from("documents")
          .select("*, documents_in_publications(publications(*))")
          .in("uri", mentionedDocumentUris)
      : Promise.resolve({ data: [] }),
  ]);

  return mentionNotifications
    .map((notification) => {
      const document = documents?.find((d) => d.uri === notification.data.document_uri);
      if (!document) return null;

      // Only "publication" and "document" mentions carry a mentioned_uri.
      const mentionedUri = notification.data.mention_type !== "did"
        ? (notification.data as Extract<ExtractNotificationType<"mention">, { mentioned_uri: string }>).mentioned_uri
        : undefined;

      const documentCreatorDid = didOfUri(notification.data.document_uri);
      const documentCreatorHandle =
        documentCreatorDid !== null
          ? (didToHandleMap.get(documentCreatorDid) ?? null)
          : null;

      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "mention" as const,
        document_uri: notification.data.document_uri,
        mention_type: notification.data.mention_type,
        mentioned_uri: mentionedUri,
        document,
        documentCreatorHandle,
        mentionedPublication: mentionedUri ? mentionedPublications?.find((p) => p.uri === mentionedUri) : undefined,
        mentionedDocument: mentionedUri ? mentionedDocuments?.find((d) => d.uri === mentionedUri) : undefined,
      };
    })
    .filter((n) => n !== null);
}
288
/** Element type of the array resolved by `hydrateCommentMentionNotifications`. */
export type HydratedCommentMentionNotification = Awaited<
  ReturnType<typeof hydrateCommentMentionNotifications>
>[0];

/**
 * Hydrates "comment_mention" notifications with:
 * - the comment containing the mention (`commentData`),
 * - the handle resolved from the DID in the comment URI's host
 *   (`commenterHandle`, `null` when it cannot be resolved),
 * - the mentioned publication or document row, when `mention_type` is
 *   "publication" or "document".
 *
 * A notification whose comment is not returned by the query is dropped.
 *
 * @param notifications - Notification rows of any type; rows whose
 *   `data.type` is not "comment_mention" are ignored.
 * @returns One hydrated entry per "comment_mention" notification whose
 *   comment row was found, in input order.
 */
async function hydrateCommentMentionNotifications(notifications: NotificationRow[]) {
  const AT_URI_PREFIX = "at://";

  // Extracts the authority (DID) of an AT URI. `new AtUri` throws on a
  // malformed URI; one bad row must not reject the whole hydration.
  const didOfUri = (uri: string): string | null => {
    try {
      return new AtUri(uri).host;
    } catch (error) {
      console.error(`Invalid AT URI ${uri}:`, error);
      return null;
    }
  };

  const commentMentionNotifications = notifications.filter(
    (n): n is NotificationRow & { data: ExtractNotificationType<"comment_mention"> } =>
      (n.data as NotificationData)?.type === "comment_mention",
  );

  if (commentMentionNotifications.length === 0) {
    return [];
  }

  // Fetch the comments that contain the mentions (each URI queried once).
  const commentUris = [
    ...new Set(commentMentionNotifications.map((n) => n.data.comment_uri)),
  ];
  const { data: comments, error: commentsError } = await supabaseServerClient
    .from("comments_on_documents")
    .select(
      "*, bsky_profiles(*), documents(*, documents_in_publications(publications(*)))",
    )
    .in("uri", commentUris);

  // Best effort: a failed query leaves `comments` null, which drops these
  // notifications below. Log it so the failure is not silent.
  if (commentsError) {
    console.error(
      "Failed to fetch comments for comment mention notifications:",
      commentsError,
    );
  }

  // Unique DIDs taken from the comment URIs, used to resolve handles.
  const commenterDids = [
    ...new Set(
      commentUris
        .map(didOfUri)
        .filter((did): did is string => did !== null),
    ),
  ];

  // Resolve DIDs to handles in parallel; a failed resolution maps to null.
  const didToHandleMap = new Map<string, string | null>();
  await Promise.all(
    commenterDids.map(async (did) => {
      try {
        const resolved = await idResolver.did.resolve(did);
        // The handle is the first at:// entry of alsoKnownAs. Entries with
        // other URI schemes may precede it, so search instead of taking [0].
        const handleUri = resolved?.alsoKnownAs?.find((aka) =>
          aka.startsWith(AT_URI_PREFIX),
        );
        didToHandleMap.set(
          did,
          handleUri ? handleUri.slice(AT_URI_PREFIX.length) : null,
        );
      } catch (error) {
        console.error(`Failed to resolve DID ${did}:`, error);
        didToHandleMap.set(did, null);
      }
    }),
  );

  // Fetch mentioned publications and documents
  const mentionedPublicationUris = commentMentionNotifications
    .filter((n) => n.data.mention_type === "publication")
    .map((n) => (n.data as Extract<ExtractNotificationType<"comment_mention">, { mention_type: "publication" }>).mentioned_uri);

  const mentionedDocumentUris = commentMentionNotifications
    .filter((n) => n.data.mention_type === "document")
    .map((n) => (n.data as Extract<ExtractNotificationType<"comment_mention">, { mention_type: "document" }>).mentioned_uri);

  // Skip a query entirely when there is nothing to look up.
  const [{ data: mentionedPublications }, { data: mentionedDocuments }] = await Promise.all([
    mentionedPublicationUris.length > 0
      ? supabaseServerClient
          .from("publications")
          .select("*")
          .in("uri", mentionedPublicationUris)
      : Promise.resolve({ data: [] }),
    mentionedDocumentUris.length > 0
      ? supabaseServerClient
          .from("documents")
          .select("*, documents_in_publications(publications(*))")
          .in("uri", mentionedDocumentUris)
      : Promise.resolve({ data: [] }),
  ]);

  return commentMentionNotifications
    .map((notification) => {
      const commentData = comments?.find((c) => c.uri === notification.data.comment_uri);
      if (!commentData) return null;

      // Only "publication" and "document" mentions carry a mentioned_uri.
      const mentionedUri = notification.data.mention_type !== "did"
        ? (notification.data as Extract<ExtractNotificationType<"comment_mention">, { mentioned_uri: string }>).mentioned_uri
        : undefined;

      const commenterDid = didOfUri(notification.data.comment_uri);
      const commenterHandle =
        commenterDid !== null
          ? (didToHandleMap.get(commenterDid) ?? null)
          : null;

      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "comment_mention" as const,
        comment_uri: notification.data.comment_uri,
        mention_type: notification.data.mention_type,
        mentioned_uri: mentionedUri,
        commentData,
        commenterHandle,
        mentionedPublication: mentionedUri ? mentionedPublications?.find((p) => p.uri === mentionedUri) : undefined,
        mentionedDocument: mentionedUri ? mentionedDocuments?.find((d) => d.uri === mentionedUri) : undefined,
      };
    })
    .filter((n) => n !== null);
}
384
/**
 * Broadcasts a "notification" event on the channel `identity.atp_did:<did>`
 * and then removes the channel from the client.
 *
 * @param did - DID used to build the channel name.
 */
export async function pingIdentityToUpdateNotification(did: string) {
  const channel = supabaseServerClient.channel(`identity.atp_did:${did}`);
  try {
    await channel.send({
      type: "broadcast",
      event: "notification",
      payload: { message: "poke" },
    });
  } finally {
    // Always release the channel, even if the broadcast rejects, so channels
    // do not accumulate on the shared server client.
    await supabaseServerClient.removeChannel(channel);
  }
}