a tool for shared writing and social publishing
"use server";

import { supabaseServerClient } from "supabase/serverClient";
import { Tables, TablesInsert } from "supabase/database.types";
import { AtUri } from "@atproto/syntax";
import { idResolver } from "app/(home-pages)/reader/idResolver";
import {
  normalizeDocumentRecord,
  normalizePublicationRecord,
  type NormalizedDocument,
  type NormalizedPublication,
} from "src/utils/normalizeRecords";

type NotificationRow = Tables<"notifications">;

/** A notification row to insert, with its `data` column typed as {@link NotificationData}. */
export type Notification = Omit<TablesInsert<"notifications">, "data"> & {
  data: NotificationData;
};

/**
 * Payload stored in a notification's `data` column, discriminated by `type`
 * (and, for the two mention kinds, additionally by `mention_type`).
 */
export type NotificationData =
  | { type: "comment"; comment_uri: string; parent_uri?: string }
  | { type: "subscribe"; subscription_uri: string }
  | { type: "quote"; bsky_post_uri: string; document_uri: string }
  | { type: "bsky_post_embed"; document_uri: string; bsky_post_uri: string }
  | { type: "mention"; document_uri: string; mention_type: "did" }
  | { type: "mention"; document_uri: string; mention_type: "publication"; mentioned_uri: string }
  | { type: "mention"; document_uri: string; mention_type: "document"; mentioned_uri: string }
  | { type: "comment_mention"; comment_uri: string; mention_type: "did" }
  | { type: "comment_mention"; comment_uri: string; mention_type: "publication"; mentioned_uri: string }
  | { type: "comment_mention"; comment_uri: string; mention_type: "document"; mentioned_uri: string };

/** Union of everything the per-type hydrators in this module can produce. */
export type HydratedNotification =
  | HydratedCommentNotification
  | HydratedSubscribeNotification
  | HydratedQuoteNotification
  | HydratedBskyPostEmbedNotification
  | HydratedMentionNotification
  | HydratedCommentMentionNotification;

// Type helper: picks the members of NotificationData whose `type` is T.
type ExtractNotificationType<T extends NotificationData["type"]> = Extract<
  NotificationData,
  { type: T }
>;

/** Prefix carried by the `alsoKnownAs` entries that hold a handle. */
const AT_URI_PREFIX = "at://";

/**
 * Maximum number of URIs sent in one `app.bsky.feed.getPosts` request.
 * The lexicon caps the `uris` parameter at 25 entries.
 */
const BSKY_GET_POSTS_MAX_URIS = 25;

/**
 * Builds a lookup from `keyOf(row)` to row. When several rows share a key the
 * first one wins, which matches what `Array.prototype.find` would return.
 */
function indexFirstBy<T, K>(
  rows: readonly T[] | null | undefined,
  keyOf: (row: T) => K,
): Map<K, T> {
  const index = new Map<K, T>();
  for (const row of rows ?? []) {
    const key = keyOf(row);
    if (!index.has(key)) index.set(key, row);
  }
  return index;
}

/**
 * For each AT URI, resolves the handle of the DID in the URI's host position.
 *
 * Never rejects: a URI that cannot be parsed, a DID that cannot be resolved,
 * or a DID document without an `at://` alias all map to `null` (the first two
 * are logged).
 *
 * @returns a map keyed by the URIs exactly as they were passed in.
 */
async function resolveHandlesByAtUri(
  uris: readonly string[],
): Promise<Map<string, string | null>> {
  // Parse every distinct URI once. The AtUri constructor throws on malformed
  // input, and one bad row must not fail hydration of all the others.
  const didByUri = new Map<string, string | null>();
  for (const uri of uris) {
    if (didByUri.has(uri)) continue;
    try {
      didByUri.set(uri, new AtUri(uri).host);
    } catch (error) {
      console.error(`Failed to parse AT URI ${uri}:`, error);
      didByUri.set(uri, null);
    }
  }

  const distinctDids = new Set<string>();
  for (const did of didByUri.values()) {
    if (did !== null) distinctDids.add(did);
  }

  // Resolve DIDs to handles in parallel
  const handleByDid = new Map<string, string | null>();
  await Promise.all(
    [...distinctDids].map(async (did) => {
      try {
        const resolved = await idResolver.did.resolve(did);
        // Use the first alias that really is an at:// URI instead of assuming
        // that entry 0 is one.
        const alias = resolved?.alsoKnownAs?.find((aka) =>
          aka.startsWith(AT_URI_PREFIX),
        );
        handleByDid.set(did, alias ? alias.slice(AT_URI_PREFIX.length) : null);
      } catch (error) {
        console.error(`Failed to resolve DID ${did}:`, error);
        handleByDid.set(did, null);
      }
    }),
  );

  const handleByUri = new Map<string, string | null>();
  for (const [uri, did] of didByUri) {
    handleByUri.set(uri, did === null ? null : (handleByDid.get(did) ?? null));
  }
  return handleByUri;
}

/**
 * Loads documents by URI together with the publications they belong to.
 * Returns an empty array (after logging) when the query fails.
 */
async function fetchDocumentsWithPublications(uris: readonly string[]) {
  const distinctUris = [...new Set(uris)];
  if (distinctUris.length === 0) return [];

  const { data, error } = await supabaseServerClient
    .from("documents")
    .select("*, documents_in_publications(publications(*))")
    .in("uri", distinctUris);
  if (error) console.error("Failed to fetch documents:", error);
  return data ?? [];
}

/**
 * Loads publications by URI.
 * Returns an empty array (after logging) when the query fails.
 */
async function fetchPublications(uris: readonly string[]) {
  const distinctUris = [...new Set(uris)];
  if (distinctUris.length === 0) return [];

  const { data, error } = await supabaseServerClient
    .from("publications")
    .select("*")
    .in("uri", distinctUris);
  if (error) console.error("Failed to fetch publications:", error);
  return data ?? [];
}

/**
 * Loads comments by URI together with the commenter's profile and the
 * commented document (including that document's publications).
 * Returns an empty array (after logging) when the query fails.
 */
async function fetchCommentsWithContext(uris: readonly string[]) {
  const distinctUris = [...new Set(uris)];
  if (distinctUris.length === 0) return [];

  const { data, error } = await supabaseServerClient
    .from("comments_on_documents")
    .select(
      "*, bsky_profiles(*), documents(*, documents_in_publications(publications(*)))",
    )
    .in("uri", distinctUris);
  if (error) console.error("Failed to fetch comments:", error);
  return data ?? [];
}

/**
 * Loads rows of the `bsky_posts` table by URI.
 * Returns an empty array (after logging) when the query fails.
 */
async function fetchCachedBskyPosts(uris: readonly string[]) {
  const distinctUris = [...new Set(uris)];
  if (distinctUris.length === 0) return [];

  const { data, error } = await supabaseServerClient
    .from("bsky_posts")
    .select("*")
    .in("uri", distinctUris);
  if (error) console.error("Failed to fetch cached Bluesky posts:", error);
  return data ?? [];
}

/**
 * Fetches post text from the public Bluesky API for the given post URIs.
 *
 * Best effort: requests are issued in batches of at most
 * {@link BSKY_GET_POSTS_MAX_URIS} URIs, and a failing batch is logged and
 * skipped so that the remaining batches still contribute their posts.
 *
 * @returns a map from post URI to text (`""` when the record has no text);
 *          URIs that could not be fetched are simply absent.
 */
async function fetchBskyPostTexts(
  uris: readonly string[],
): Promise<Map<string, string>> {
  const textByUri = new Map<string, string>();
  const distinctUris = [...new Set(uris)];
  if (distinctUris.length === 0) return textByUri;

  try {
    const { AtpAgent } = await import("@atproto/api");
    const agent = new AtpAgent({ service: "https://public.api.bsky.app" });

    const batches: string[][] = [];
    for (
      let start = 0;
      start < distinctUris.length;
      start += BSKY_GET_POSTS_MAX_URIS
    ) {
      batches.push(distinctUris.slice(start, start + BSKY_GET_POSTS_MAX_URIS));
    }

    await Promise.all(
      batches.map(async (batch) => {
        try {
          const response = await agent.app.bsky.feed.getPosts({ uris: batch });
          for (const post of response.data.posts) {
            const record = post.record as { text?: string };
            textByUri.set(post.uri, record.text ?? "");
          }
        } catch (error) {
          console.error("Failed to fetch Bluesky posts:", error);
        }
      }),
    );
  } catch (error) {
    // Reached when the API client itself cannot be loaded or constructed.
    console.error("Failed to fetch Bluesky posts:", error);
  }

  return textByUri;
}

/**
 * Enriches raw notification rows with the records they refer to.
 *
 * Each row is handled by the hydrator matching its `data.type`; a row whose
 * referenced comment / subscription / post / document cannot be found is
 * dropped from the result.
 *
 * @param notifications rows from the `notifications` table
 * @returns the hydrated notifications, newest `created_at` first
 */
export async function hydrateNotifications(
  notifications: NotificationRow[],
): Promise<Array<HydratedNotification>> {
  // Call all hydrators in parallel
  const [
    commentNotifications,
    subscribeNotifications,
    quoteNotifications,
    bskyPostEmbedNotifications,
    mentionNotifications,
    commentMentionNotifications,
  ] = await Promise.all([
    hydrateCommentNotifications(notifications),
    hydrateSubscribeNotifications(notifications),
    hydrateQuoteNotifications(notifications),
    hydrateBskyPostEmbedNotifications(notifications),
    hydrateMentionNotifications(notifications),
    hydrateCommentMentionNotifications(notifications),
  ]);

  // Combine all hydrated notifications
  const allHydrated = [
    ...commentNotifications,
    ...subscribeNotifications,
    ...quoteNotifications,
    ...bskyPostEmbedNotifications,
    ...mentionNotifications,
    ...commentMentionNotifications,
  ];

  // Grouping by type lost the input order; restore a newest-first ordering.
  allHydrated.sort(
    (a, b) =>
      new Date(b.created_at).getTime() - new Date(a.created_at).getTime(),
  );

  return allHydrated;
}

export type HydratedCommentNotification = Awaited<
  ReturnType<typeof hydrateCommentNotifications>
>[0];

/**
 * Hydrates `comment` notifications with the comment, its parent comment (when
 * `parent_uri` is set and the parent was found) and the commented document.
 * Notifications whose comment no longer exists are dropped.
 */
async function hydrateCommentNotifications(notifications: NotificationRow[]) {
  const commentNotifications = notifications.filter(
    (n): n is NotificationRow & { data: ExtractNotificationType<"comment"> } =>
      (n.data as NotificationData)?.type === "comment",
  );

  if (commentNotifications.length === 0) {
    return [];
  }

  // Fetch comments and their parents in a single query
  const commentUris = commentNotifications.flatMap((n) =>
    n.data.parent_uri
      ? [n.data.comment_uri, n.data.parent_uri]
      : [n.data.comment_uri],
  );
  const commentsByUri = indexFirstBy(
    await fetchCommentsWithContext(commentUris),
    (c) => c.uri,
  );

  return commentNotifications
    .map((notification) => {
      const commentData = commentsByUri.get(notification.data.comment_uri);
      if (!commentData) return null;
      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "comment" as const,
        comment_uri: notification.data.comment_uri,
        parentData: notification.data.parent_uri
          ? commentsByUri.get(notification.data.parent_uri)
          : undefined,
        commentData,
        normalizedDocument: normalizeDocumentRecord(
          commentData.documents?.data,
          commentData.documents?.uri,
        ),
        normalizedPublication: normalizePublicationRecord(
          commentData.documents?.documents_in_publications[0]?.publications
            ?.record,
        ),
      };
    })
    .filter((n) => n !== null);
}

export type HydratedSubscribeNotification = Awaited<
  ReturnType<typeof hydrateSubscribeNotifications>
>[0];

/**
 * Hydrates `subscribe` notifications with the subscription row, the
 * subscriber's profile and the publication subscribed to.
 * Notifications whose subscription no longer exists are dropped.
 */
async function hydrateSubscribeNotifications(notifications: NotificationRow[]) {
  const subscribeNotifications = notifications.filter(
    (
      n,
    ): n is NotificationRow & { data: ExtractNotificationType<"subscribe"> } =>
      (n.data as NotificationData)?.type === "subscribe",
  );

  if (subscribeNotifications.length === 0) {
    return [];
  }

  // Fetch subscription data from the database with related data
  const subscriptionUris = [
    ...new Set(subscribeNotifications.map((n) => n.data.subscription_uri)),
  ];
  const { data: subscriptions, error } = await supabaseServerClient
    .from("publication_subscriptions")
    .select("*, identities(bsky_profiles(*)), publications(*)")
    .in("uri", subscriptionUris);
  if (error) console.error("Failed to fetch subscriptions:", error);

  const subscriptionsByUri = indexFirstBy(subscriptions, (s) => s.uri);

  return subscribeNotifications
    .map((notification) => {
      const subscriptionData = subscriptionsByUri.get(
        notification.data.subscription_uri,
      );
      if (!subscriptionData) return null;
      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "subscribe" as const,
        subscription_uri: notification.data.subscription_uri,
        subscriptionData,
        normalizedPublication: normalizePublicationRecord(
          subscriptionData.publications?.record,
        ),
      };
    })
    .filter((n) => n !== null);
}

export type HydratedQuoteNotification = Awaited<
  ReturnType<typeof hydrateQuoteNotifications>
>[0];

/**
 * Hydrates `quote` notifications with the stored Bluesky post and the
 * referenced document. Notifications missing either one are dropped.
 */
async function hydrateQuoteNotifications(notifications: NotificationRow[]) {
  const quoteNotifications = notifications.filter(
    (n): n is NotificationRow & { data: ExtractNotificationType<"quote"> } =>
      (n.data as NotificationData)?.type === "quote",
  );

  if (quoteNotifications.length === 0) {
    return [];
  }

  // The two lookups are independent, so run them concurrently.
  const [bskyPosts, documents] = await Promise.all([
    fetchCachedBskyPosts(quoteNotifications.map((n) => n.data.bsky_post_uri)),
    fetchDocumentsWithPublications(
      quoteNotifications.map((n) => n.data.document_uri),
    ),
  ]);
  const bskyPostsByUri = indexFirstBy(bskyPosts, (p) => p.uri);
  const documentsByUri = indexFirstBy(documents, (d) => d.uri);

  return quoteNotifications
    .map((notification) => {
      const bskyPost = bskyPostsByUri.get(notification.data.bsky_post_uri);
      const document = documentsByUri.get(notification.data.document_uri);
      if (!bskyPost || !document) return null;
      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "quote" as const,
        bsky_post_uri: notification.data.bsky_post_uri,
        document_uri: notification.data.document_uri,
        bskyPost,
        document,
        normalizedDocument: normalizeDocumentRecord(document.data, document.uri),
        normalizedPublication: normalizePublicationRecord(
          document.documents_in_publications[0]?.publications?.record,
        ),
      };
    })
    .filter((n) => n !== null);
}

export type HydratedBskyPostEmbedNotification = Awaited<
  ReturnType<typeof hydrateBskyPostEmbedNotifications>
>[0];

/**
 * Hydrates `bsky_post_embed` notifications with the embedding document, the
 * handle of that document's author and the text of the embedded post.
 *
 * Post text comes from the `bsky_posts` table when a row exists and is fetched
 * from the public Bluesky API otherwise; `bskyPostText` is `null` when neither
 * source yields it. Notifications whose document no longer exists are dropped.
 */
async function hydrateBskyPostEmbedNotifications(
  notifications: NotificationRow[],
) {
  const bskyPostEmbedNotifications = notifications.filter(
    (
      n,
    ): n is NotificationRow & {
      data: ExtractNotificationType<"bsky_post_embed">;
    } => (n.data as NotificationData)?.type === "bsky_post_embed",
  );

  if (bskyPostEmbedNotifications.length === 0) {
    return [];
  }

  // document_uri is the leaflet that embedded the post
  const documentUris = bskyPostEmbedNotifications.map(
    (n) => n.data.document_uri,
  );
  const bskyPostUris = bskyPostEmbedNotifications.map(
    (n) => n.data.bsky_post_uri,
  );

  const [documents, cachedBskyPosts, handleByDocumentUri] = await Promise.all([
    fetchDocumentsWithPublications(documentUris),
    fetchCachedBskyPosts(bskyPostUris),
    resolveHandlesByAtUri(documentUris),
  ]);
  const documentsByUri = indexFirstBy(documents, (d) => d.uri);
  const cachedPostsByUri = indexFirstBy(cachedBskyPosts, (p) => p.uri);

  // Only posts without a stored row need a round trip to the Bluesky API
  const fetchedPostTexts = await fetchBskyPostTexts(
    bskyPostUris.filter((uri) => !cachedPostsByUri.has(uri)),
  );

  return bskyPostEmbedNotifications
    .map((notification) => {
      const document = documentsByUri.get(notification.data.document_uri);
      if (!document) return null;

      const documentCreatorHandle =
        handleByDocumentUri.get(notification.data.document_uri) ?? null;

      // Get post text from cache or fetched data
      const cachedPost = cachedPostsByUri.get(notification.data.bsky_post_uri);
      const postView = cachedPost?.post_view as
        | { record?: { text?: string } }
        | undefined;
      const bskyPostText =
        postView?.record?.text ??
        fetchedPostTexts.get(notification.data.bsky_post_uri) ??
        null;

      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "bsky_post_embed" as const,
        document_uri: notification.data.document_uri,
        bsky_post_uri: notification.data.bsky_post_uri,
        document,
        documentCreatorHandle,
        bskyPostText,
        normalizedDocument: normalizeDocumentRecord(document.data, document.uri),
        normalizedPublication: normalizePublicationRecord(
          document.documents_in_publications[0]?.publications?.record,
        ),
      };
    })
    .filter((n) => n !== null);
}

export type HydratedMentionNotification = Awaited<
  ReturnType<typeof hydrateMentionNotifications>
>[0];

/**
 * Hydrates `mention` notifications with the mentioning document, the handle
 * of its author and, for `publication` / `document` mentions, the mentioned
 * record when it can be found.
 * Notifications whose mentioning document no longer exists are dropped.
 */
async function hydrateMentionNotifications(notifications: NotificationRow[]) {
  const mentionNotifications = notifications.filter(
    (n): n is NotificationRow & { data: ExtractNotificationType<"mention"> } =>
      (n.data as NotificationData)?.type === "mention",
  );

  if (mentionNotifications.length === 0) {
    return [];
  }

  const documentUris = mentionNotifications.map((n) => n.data.document_uri);

  const mentionedPublicationUris = mentionNotifications
    .filter((n) => n.data.mention_type === "publication")
    .map(
      (n) =>
        (
          n.data as Extract<
            ExtractNotificationType<"mention">,
            { mention_type: "publication" }
          >
        ).mentioned_uri,
    );

  const mentionedDocumentUris = mentionNotifications
    .filter((n) => n.data.mention_type === "document")
    .map(
      (n) =>
        (
          n.data as Extract<
            ExtractNotificationType<"mention">,
            { mention_type: "document" }
          >
        ).mentioned_uri,
    );

  // None of these lookups depends on another, so run them concurrently.
  const [documents, handleByDocumentUri, mentionedPublications, mentionedDocuments] =
    await Promise.all([
      fetchDocumentsWithPublications(documentUris),
      resolveHandlesByAtUri(documentUris),
      fetchPublications(mentionedPublicationUris),
      fetchDocumentsWithPublications(mentionedDocumentUris),
    ]);
  const documentsByUri = indexFirstBy(documents, (d) => d.uri);
  const mentionedPublicationsByUri = indexFirstBy(
    mentionedPublications,
    (p) => p.uri,
  );
  const mentionedDocumentsByUri = indexFirstBy(
    mentionedDocuments,
    (d) => d.uri,
  );

  return mentionNotifications
    .map((notification) => {
      const document = documentsByUri.get(notification.data.document_uri);
      if (!document) return null;

      // "did" mentions carry no mentioned_uri
      const mentionedUri =
        notification.data.mention_type !== "did"
          ? (
              notification.data as Extract<
                ExtractNotificationType<"mention">,
                { mentioned_uri: string }
              >
            ).mentioned_uri
          : undefined;

      const documentCreatorHandle =
        handleByDocumentUri.get(notification.data.document_uri) ?? null;

      const mentionedPublication = mentionedUri
        ? mentionedPublicationsByUri.get(mentionedUri)
        : undefined;
      const mentionedDoc = mentionedUri
        ? mentionedDocumentsByUri.get(mentionedUri)
        : undefined;

      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "mention" as const,
        document_uri: notification.data.document_uri,
        mention_type: notification.data.mention_type,
        mentioned_uri: mentionedUri,
        document,
        documentCreatorHandle,
        mentionedPublication,
        mentionedDocument: mentionedDoc,
        normalizedDocument: normalizeDocumentRecord(document.data, document.uri),
        normalizedPublication: normalizePublicationRecord(
          document.documents_in_publications[0]?.publications?.record,
        ),
        normalizedMentionedPublication: normalizePublicationRecord(
          mentionedPublication?.record,
        ),
        normalizedMentionedDocument: normalizeDocumentRecord(
          mentionedDoc?.data,
          mentionedDoc?.uri,
        ),
      };
    })
    .filter((n) => n !== null);
}

export type HydratedCommentMentionNotification = Awaited<
  ReturnType<typeof hydrateCommentMentionNotifications>
>[0];

/**
 * Hydrates `comment_mention` notifications with the mentioning comment, the
 * handle of the commenter and, for `publication` / `document` mentions, the
 * mentioned record when it can be found.
 * Notifications whose comment no longer exists are dropped.
 */
async function hydrateCommentMentionNotifications(
  notifications: NotificationRow[],
) {
  const commentMentionNotifications = notifications.filter(
    (
      n,
    ): n is NotificationRow & {
      data: ExtractNotificationType<"comment_mention">;
    } => (n.data as NotificationData)?.type === "comment_mention",
  );

  if (commentMentionNotifications.length === 0) {
    return [];
  }

  const commentUris = commentMentionNotifications.map(
    (n) => n.data.comment_uri,
  );

  const mentionedPublicationUris = commentMentionNotifications
    .filter((n) => n.data.mention_type === "publication")
    .map(
      (n) =>
        (
          n.data as Extract<
            ExtractNotificationType<"comment_mention">,
            { mention_type: "publication" }
          >
        ).mentioned_uri,
    );

  const mentionedDocumentUris = commentMentionNotifications
    .filter((n) => n.data.mention_type === "document")
    .map(
      (n) =>
        (
          n.data as Extract<
            ExtractNotificationType<"comment_mention">,
            { mention_type: "document" }
          >
        ).mentioned_uri,
    );

  // None of these lookups depends on another, so run them concurrently.
  // The host of a comment URI is the commenter's DID.
  const [comments, handleByCommentUri, mentionedPublications, mentionedDocuments] =
    await Promise.all([
      fetchCommentsWithContext(commentUris),
      resolveHandlesByAtUri(commentUris),
      fetchPublications(mentionedPublicationUris),
      fetchDocumentsWithPublications(mentionedDocumentUris),
    ]);
  const commentsByUri = indexFirstBy(comments, (c) => c.uri);
  const mentionedPublicationsByUri = indexFirstBy(
    mentionedPublications,
    (p) => p.uri,
  );
  const mentionedDocumentsByUri = indexFirstBy(
    mentionedDocuments,
    (d) => d.uri,
  );

  return commentMentionNotifications
    .map((notification) => {
      const commentData = commentsByUri.get(notification.data.comment_uri);
      if (!commentData) return null;

      // "did" mentions carry no mentioned_uri
      const mentionedUri =
        notification.data.mention_type !== "did"
          ? (
              notification.data as Extract<
                ExtractNotificationType<"comment_mention">,
                { mentioned_uri: string }
              >
            ).mentioned_uri
          : undefined;

      const commenterHandle =
        handleByCommentUri.get(notification.data.comment_uri) ?? null;

      const mentionedPublication = mentionedUri
        ? mentionedPublicationsByUri.get(mentionedUri)
        : undefined;
      const mentionedDoc = mentionedUri
        ? mentionedDocumentsByUri.get(mentionedUri)
        : undefined;

      return {
        id: notification.id,
        recipient: notification.recipient,
        created_at: notification.created_at,
        type: "comment_mention" as const,
        comment_uri: notification.data.comment_uri,
        mention_type: notification.data.mention_type,
        mentioned_uri: mentionedUri,
        commentData,
        commenterHandle,
        mentionedPublication,
        mentionedDocument: mentionedDoc,
        normalizedDocument: normalizeDocumentRecord(
          commentData.documents?.data,
          commentData.documents?.uri,
        ),
        normalizedPublication: normalizePublicationRecord(
          commentData.documents?.documents_in_publications[0]?.publications
            ?.record,
        ),
        normalizedMentionedPublication: normalizePublicationRecord(
          mentionedPublication?.record,
        ),
        normalizedMentionedDocument: normalizeDocumentRecord(
          mentionedDoc?.data,
          mentionedDoc?.uri,
        ),
      };
    })
    .filter((n) => n !== null);
}

/**
 * Broadcasts a `notification` event on the `identity.atp_did:<did>` realtime
 * channel.
 *
 * The channel is removed again even when sending throws, so a failed send
 * does not leave the channel registered on the shared server client.
 *
 * @param did DID identifying the channel to broadcast on
 */
export async function pingIdentityToUpdateNotification(did: string) {
  const channel = supabaseServerClient.channel(`identity.atp_did:${did}`);
  try {
    await channel.send({
      type: "broadcast",
      event: "notification",
      payload: { message: "poke" },
    });
  } finally {
    await supabaseServerClient.removeChannel(channel);
  }
}