a tool for shared writing and social publishing
298
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 8f68be5863d8e7c1944cc5ce848a9a9fc6a032aa 530 lines 21 kB view raw
1"use server"; 2 3import { supabaseServerClient } from "supabase/serverClient"; 4import { Tables, TablesInsert } from "supabase/database.types"; 5import { AtUri } from "@atproto/syntax"; 6import { idResolver } from "app/(home-pages)/reader/idResolver"; 7import { 8 normalizeDocumentRecord, 9 normalizePublicationRecord, 10 type NormalizedDocument, 11 type NormalizedPublication, 12} from "src/utils/normalizeRecords"; 13 14type NotificationRow = Tables<"notifications">; 15 16export type Notification = Omit<TablesInsert<"notifications">, "data"> & { 17 data: NotificationData; 18}; 19 20export type NotificationData = 21 | { type: "comment"; comment_uri: string; parent_uri?: string } 22 | { type: "subscribe"; subscription_uri: string } 23 | { type: "quote"; bsky_post_uri: string; document_uri: string } 24 | { type: "bsky_post_embed"; document_uri: string; bsky_post_uri: string } 25 | { type: "mention"; document_uri: string; mention_type: "did" } 26 | { type: "mention"; document_uri: string; mention_type: "publication"; mentioned_uri: string } 27 | { type: "mention"; document_uri: string; mention_type: "document"; mentioned_uri: string } 28 | { type: "comment_mention"; comment_uri: string; mention_type: "did" } 29 | { type: "comment_mention"; comment_uri: string; mention_type: "publication"; mentioned_uri: string } 30 | { type: "comment_mention"; comment_uri: string; mention_type: "document"; mentioned_uri: string }; 31 32export type HydratedNotification = 33 | HydratedCommentNotification 34 | HydratedSubscribeNotification 35 | HydratedQuoteNotification 36 | HydratedBskyPostEmbedNotification 37 | HydratedMentionNotification 38 | HydratedCommentMentionNotification; 39export async function hydrateNotifications( 40 notifications: NotificationRow[], 41): Promise<Array<HydratedNotification>> { 42 // Call all hydrators in parallel 43 const [commentNotifications, subscribeNotifications, quoteNotifications, bskyPostEmbedNotifications, mentionNotifications, commentMentionNotifications] = await Promise.all([ 44 hydrateCommentNotifications(notifications), 45 hydrateSubscribeNotifications(notifications), 46 hydrateQuoteNotifications(notifications), 47 hydrateBskyPostEmbedNotifications(notifications), 48 hydrateMentionNotifications(notifications), 49 hydrateCommentMentionNotifications(notifications), 50 ]); 51 52 // Combine all hydrated notifications 53 const allHydrated = [...commentNotifications, ...subscribeNotifications, ...quoteNotifications, ...bskyPostEmbedNotifications, ...mentionNotifications, ...commentMentionNotifications]; 54 55 // Sort by created_at to maintain order 56 allHydrated.sort( 57 (a, b) => 58 new Date(b.created_at).getTime() - new Date(a.created_at).getTime(), 59 ); 60 61 return allHydrated; 62} 63 64// Type guard to extract notification type 65type ExtractNotificationType<T extends NotificationData["type"]> = Extract< 66 NotificationData, 67 { type: T } 68>; 69 70export type HydratedCommentNotification = Awaited< 71 ReturnType<typeof hydrateCommentNotifications> 72>[0]; 73 74async function hydrateCommentNotifications(notifications: NotificationRow[]) { 75 const commentNotifications = notifications.filter( 76 (n): n is NotificationRow & { data: ExtractNotificationType<"comment"> } => 77 (n.data as NotificationData)?.type === "comment", 78 ); 79 80 if (commentNotifications.length === 0) { 81 return []; 82 } 83 84 // Fetch comment data from the database 85 const commentUris = commentNotifications.flatMap((n) => 86 n.data.parent_uri 87 ? [n.data.comment_uri, n.data.parent_uri] 88 : [n.data.comment_uri], 89 ); 90 const { data: comments } = await supabaseServerClient 91 .from("comments_on_documents") 92 .select( 93 "*,bsky_profiles(*), documents(*, documents_in_publications(publications(*)))", 94 ) 95 .in("uri", commentUris); 96 97 return commentNotifications 98 .map((notification) => { 99 const commentData = comments?.find((c) => c.uri === notification.data.comment_uri); 100 if (!commentData) return null; 101 return { 102 id: notification.id, 103 recipient: notification.recipient, 104 created_at: notification.created_at, 105 type: "comment" as const, 106 comment_uri: notification.data.comment_uri, 107 parentData: notification.data.parent_uri 108 ? comments?.find((c) => c.uri === notification.data.parent_uri) 109 : undefined, 110 commentData, 111 normalizedDocument: normalizeDocumentRecord(commentData.documents?.data, commentData.documents?.uri), 112 normalizedPublication: normalizePublicationRecord( 113 commentData.documents?.documents_in_publications[0]?.publications?.record, 114 ), 115 }; 116 }) 117 .filter((n) => n !== null); 118} 119 120export type HydratedSubscribeNotification = Awaited< 121 ReturnType<typeof hydrateSubscribeNotifications> 122>[0]; 123 124async function hydrateSubscribeNotifications(notifications: NotificationRow[]) { 125 const subscribeNotifications = notifications.filter( 126 ( 127 n, 128 ): n is NotificationRow & { data: ExtractNotificationType<"subscribe"> } => 129 (n.data as NotificationData)?.type === "subscribe", 130 ); 131 132 if (subscribeNotifications.length === 0) { 133 return []; 134 } 135 136 // Fetch subscription data from the database with related data 137 const subscriptionUris = subscribeNotifications.map( 138 (n) => n.data.subscription_uri, 139 ); 140 const { data: subscriptions } = await supabaseServerClient 141 .from("publication_subscriptions") 142 .select("*, identities(bsky_profiles(*)), publications(*)") 143 .in("uri", subscriptionUris); 144 145 return subscribeNotifications 146 .map((notification) => { 147 const subscriptionData = subscriptions?.find((s) => s.uri === notification.data.subscription_uri); 148 if (!subscriptionData) return null; 149 return { 150 id: notification.id, 151 recipient: notification.recipient, 152 created_at: notification.created_at, 153 type: "subscribe" as const, 154 subscription_uri: notification.data.subscription_uri, 155 subscriptionData, 156 normalizedPublication: normalizePublicationRecord(subscriptionData.publications?.record), 157 }; 158 }) 159 .filter((n) => n !== null); 160} 161 162export type HydratedQuoteNotification = Awaited< 163 ReturnType<typeof hydrateQuoteNotifications> 164>[0]; 165 166async function hydrateQuoteNotifications(notifications: NotificationRow[]) { 167 const quoteNotifications = notifications.filter( 168 (n): n is NotificationRow & { data: ExtractNotificationType<"quote"> } => 169 (n.data as NotificationData)?.type === "quote", 170 ); 171 172 if (quoteNotifications.length === 0) { 173 return []; 174 } 175 176 // Fetch bsky post data and document data 177 const bskyPostUris = quoteNotifications.map((n) => n.data.bsky_post_uri); 178 const documentUris = quoteNotifications.map((n) => n.data.document_uri); 179 180 const { data: bskyPosts } = await supabaseServerClient 181 .from("bsky_posts") 182 .select("*") 183 .in("uri", bskyPostUris); 184 185 const { data: documents } = await supabaseServerClient 186 .from("documents") 187 .select("*, documents_in_publications(publications(*))") 188 .in("uri", documentUris); 189 190 return quoteNotifications 191 .map((notification) => { 192 const bskyPost = bskyPosts?.find((p) => p.uri === notification.data.bsky_post_uri); 193 const document = documents?.find((d) => d.uri === notification.data.document_uri); 194 if (!bskyPost || !document) return null; 195 return { 196 id: notification.id, 197 recipient: notification.recipient, 198 created_at: notification.created_at, 199 type: "quote" as const, 200 bsky_post_uri: notification.data.bsky_post_uri, 201 document_uri: notification.data.document_uri, 202 bskyPost, 203 document, 204 normalizedDocument: normalizeDocumentRecord(document.data, document.uri), 205 normalizedPublication: normalizePublicationRecord( 206 document.documents_in_publications[0]?.publications?.record, 207 ), 208 }; 209 }) 210 .filter((n) => n !== null); 211} 212 213export type HydratedBskyPostEmbedNotification = Awaited< 214 ReturnType<typeof hydrateBskyPostEmbedNotifications> 215>[0]; 216 217async function hydrateBskyPostEmbedNotifications(notifications: NotificationRow[]) { 218 const bskyPostEmbedNotifications = notifications.filter( 219 (n): n is NotificationRow & { data: ExtractNotificationType<"bsky_post_embed"> } => 220 (n.data as NotificationData)?.type === "bsky_post_embed", 221 ); 222 223 if (bskyPostEmbedNotifications.length === 0) { 224 return []; 225 } 226 227 // Fetch document data (the leaflet that embedded the post) 228 const documentUris = bskyPostEmbedNotifications.map((n) => n.data.document_uri); 229 const bskyPostUris = bskyPostEmbedNotifications.map((n) => n.data.bsky_post_uri); 230 231 const [{ data: documents }, { data: cachedBskyPosts }] = await Promise.all([ 232 supabaseServerClient 233 .from("documents") 234 .select("*, documents_in_publications(publications(*))") 235 .in("uri", documentUris), 236 supabaseServerClient 237 .from("bsky_posts") 238 .select("*") 239 .in("uri", bskyPostUris), 240 ]); 241 242 // Find which posts we need to fetch from the API 243 const cachedPostUris = new Set(cachedBskyPosts?.map((p) => p.uri) ?? []); 244 const missingPostUris = bskyPostUris.filter((uri) => !cachedPostUris.has(uri)); 245 246 // Fetch missing posts from Bluesky API 247 const fetchedPosts = new Map<string, { text: string } | null>(); 248 if (missingPostUris.length > 0) { 249 try { 250 const { AtpAgent } = await import("@atproto/api"); 251 const agent = new AtpAgent({ service: "https://public.api.bsky.app" }); 252 const response = await agent.app.bsky.feed.getPosts({ uris: missingPostUris }); 253 for (const post of response.data.posts) { 254 const record = post.record as { text?: string }; 255 fetchedPosts.set(post.uri, { text: record.text ?? "" }); 256 } 257 } catch (error) { 258 console.error("Failed to fetch Bluesky posts:", error); 259 } 260 } 261 262 // Extract unique DIDs from document URIs to resolve handles 263 const documentCreatorDids = [...new Set(documentUris.map((uri) => new AtUri(uri).host))]; 264 265 // Resolve DIDs to handles in parallel 266 const didToHandleMap = new Map<string, string | null>(); 267 await Promise.all( 268 documentCreatorDids.map(async (did) => { 269 try { 270 const resolved = await idResolver.did.resolve(did); 271 const handle = resolved?.alsoKnownAs?.[0] 272 ? resolved.alsoKnownAs[0].slice(5) // Remove "at://" prefix 273 : null; 274 didToHandleMap.set(did, handle); 275 } catch (error) { 276 console.error(`Failed to resolve DID ${did}:`, error); 277 didToHandleMap.set(did, null); 278 } 279 }), 280 ); 281 282 return bskyPostEmbedNotifications 283 .map((notification) => { 284 const document = documents?.find((d) => d.uri === notification.data.document_uri); 285 if (!document) return null; 286 287 const documentCreatorDid = new AtUri(notification.data.document_uri).host; 288 const documentCreatorHandle = didToHandleMap.get(documentCreatorDid) ?? null; 289 290 // Get post text from cache or fetched data 291 const cachedPost = cachedBskyPosts?.find((p) => p.uri === notification.data.bsky_post_uri); 292 const postView = cachedPost?.post_view as { record?: { text?: string } } | undefined; 293 const bskyPostText = postView?.record?.text ?? fetchedPosts.get(notification.data.bsky_post_uri)?.text ?? null; 294 295 return { 296 id: notification.id, 297 recipient: notification.recipient, 298 created_at: notification.created_at, 299 type: "bsky_post_embed" as const, 300 document_uri: notification.data.document_uri, 301 bsky_post_uri: notification.data.bsky_post_uri, 302 document, 303 documentCreatorHandle, 304 bskyPostText, 305 normalizedDocument: normalizeDocumentRecord(document.data, document.uri), 306 normalizedPublication: normalizePublicationRecord( 307 document.documents_in_publications[0]?.publications?.record, 308 ), 309 }; 310 }) 311 .filter((n) => n !== null); 312} 313 314export type HydratedMentionNotification = Awaited< 315 ReturnType<typeof hydrateMentionNotifications> 316>[0]; 317 318async function hydrateMentionNotifications(notifications: NotificationRow[]) { 319 const mentionNotifications = notifications.filter( 320 (n): n is NotificationRow & { data: ExtractNotificationType<"mention"> } => 321 (n.data as NotificationData)?.type === "mention", 322 ); 323 324 if (mentionNotifications.length === 0) { 325 return []; 326 } 327 328 // Fetch document data from the database 329 const documentUris = mentionNotifications.map((n) => n.data.document_uri); 330 const { data: documents } = await supabaseServerClient 331 .from("documents") 332 .select("*, documents_in_publications(publications(*))") 333 .in("uri", documentUris); 334 335 // Extract unique DIDs from document URIs to resolve handles 336 const documentCreatorDids = [...new Set(documentUris.map((uri) => new AtUri(uri).host))]; 337 338 // Resolve DIDs to handles in parallel 339 const didToHandleMap = new Map<string, string | null>(); 340 await Promise.all( 341 documentCreatorDids.map(async (did) => { 342 try { 343 const resolved = await idResolver.did.resolve(did); 344 const handle = resolved?.alsoKnownAs?.[0] 345 ? resolved.alsoKnownAs[0].slice(5) // Remove "at://" prefix 346 : null; 347 didToHandleMap.set(did, handle); 348 } catch (error) { 349 console.error(`Failed to resolve DID ${did}:`, error); 350 didToHandleMap.set(did, null); 351 } 352 }), 353 ); 354 355 // Fetch mentioned publications and documents 356 const mentionedPublicationUris = mentionNotifications 357 .filter((n) => n.data.mention_type === "publication") 358 .map((n) => (n.data as Extract<ExtractNotificationType<"mention">, { mention_type: "publication" }>).mentioned_uri); 359 360 const mentionedDocumentUris = mentionNotifications 361 .filter((n) => n.data.mention_type === "document") 362 .map((n) => (n.data as Extract<ExtractNotificationType<"mention">, { mention_type: "document" }>).mentioned_uri); 363 364 const [{ data: mentionedPublications }, { data: mentionedDocuments }] = await Promise.all([ 365 mentionedPublicationUris.length > 0 366 ? supabaseServerClient 367 .from("publications") 368 .select("*") 369 .in("uri", mentionedPublicationUris) 370 : Promise.resolve({ data: [] }), 371 mentionedDocumentUris.length > 0 372 ? supabaseServerClient 373 .from("documents") 374 .select("*, documents_in_publications(publications(*))") 375 .in("uri", mentionedDocumentUris) 376 : Promise.resolve({ data: [] }), 377 ]); 378 379 return mentionNotifications 380 .map((notification) => { 381 const document = documents?.find((d) => d.uri === notification.data.document_uri); 382 if (!document) return null; 383 384 const mentionedUri = notification.data.mention_type !== "did" 385 ? (notification.data as Extract<ExtractNotificationType<"mention">, { mentioned_uri: string }>).mentioned_uri 386 : undefined; 387 388 const documentCreatorDid = new AtUri(notification.data.document_uri).host; 389 const documentCreatorHandle = didToHandleMap.get(documentCreatorDid) ?? null; 390 391 const mentionedPublication = mentionedUri ? mentionedPublications?.find((p) => p.uri === mentionedUri) : undefined; 392 const mentionedDoc = mentionedUri ? mentionedDocuments?.find((d) => d.uri === mentionedUri) : undefined; 393 394 return { 395 id: notification.id, 396 recipient: notification.recipient, 397 created_at: notification.created_at, 398 type: "mention" as const, 399 document_uri: notification.data.document_uri, 400 mention_type: notification.data.mention_type, 401 mentioned_uri: mentionedUri, 402 document, 403 documentCreatorHandle, 404 mentionedPublication, 405 mentionedDocument: mentionedDoc, 406 normalizedDocument: normalizeDocumentRecord(document.data, document.uri), 407 normalizedPublication: normalizePublicationRecord( 408 document.documents_in_publications[0]?.publications?.record, 409 ), 410 normalizedMentionedPublication: normalizePublicationRecord(mentionedPublication?.record), 411 normalizedMentionedDocument: normalizeDocumentRecord(mentionedDoc?.data, mentionedDoc?.uri), 412 }; 413 }) 414 .filter((n) => n !== null); 415} 416 417export type HydratedCommentMentionNotification = Awaited< 418 ReturnType<typeof hydrateCommentMentionNotifications> 419>[0]; 420 421async function hydrateCommentMentionNotifications(notifications: NotificationRow[]) { 422 const commentMentionNotifications = notifications.filter( 423 (n): n is NotificationRow & { data: ExtractNotificationType<"comment_mention"> } => 424 (n.data as NotificationData)?.type === "comment_mention", 425 ); 426 427 if (commentMentionNotifications.length === 0) { 428 return []; 429 } 430 431 // Fetch comment data from the database 432 const commentUris = commentMentionNotifications.map((n) => n.data.comment_uri); 433 const { data: comments } = await supabaseServerClient 434 .from("comments_on_documents") 435 .select( 436 "*, bsky_profiles(*), documents(*, documents_in_publications(publications(*)))", 437 ) 438 .in("uri", commentUris); 439 440 // Extract unique DIDs from comment URIs to resolve handles 441 const commenterDids = [...new Set(commentUris.map((uri) => new AtUri(uri).host))]; 442 443 // Resolve DIDs to handles in parallel 444 const didToHandleMap = new Map<string, string | null>(); 445 await Promise.all( 446 commenterDids.map(async (did) => { 447 try { 448 const resolved = await idResolver.did.resolve(did); 449 const handle = resolved?.alsoKnownAs?.[0] 450 ? resolved.alsoKnownAs[0].slice(5) // Remove "at://" prefix 451 : null; 452 didToHandleMap.set(did, handle); 453 } catch (error) { 454 console.error(`Failed to resolve DID ${did}:`, error); 455 didToHandleMap.set(did, null); 456 } 457 }), 458 ); 459 460 // Fetch mentioned publications and documents 461 const mentionedPublicationUris = commentMentionNotifications 462 .filter((n) => n.data.mention_type === "publication") 463 .map((n) => (n.data as Extract<ExtractNotificationType<"comment_mention">, { mention_type: "publication" }>).mentioned_uri); 464 465 const mentionedDocumentUris = commentMentionNotifications 466 .filter((n) => n.data.mention_type === "document") 467 .map((n) => (n.data as Extract<ExtractNotificationType<"comment_mention">, { mention_type: "document" }>).mentioned_uri); 468 469 const [{ data: mentionedPublications }, { data: mentionedDocuments }] = await Promise.all([ 470 mentionedPublicationUris.length > 0 471 ? supabaseServerClient 472 .from("publications") 473 .select("*") 474 .in("uri", mentionedPublicationUris) 475 : Promise.resolve({ data: [] }), 476 mentionedDocumentUris.length > 0 477 ? supabaseServerClient 478 .from("documents") 479 .select("*, documents_in_publications(publications(*))") 480 .in("uri", mentionedDocumentUris) 481 : Promise.resolve({ data: [] }), 482 ]); 483 484 return commentMentionNotifications 485 .map((notification) => { 486 const commentData = comments?.find((c) => c.uri === notification.data.comment_uri); 487 if (!commentData) return null; 488 489 const mentionedUri = notification.data.mention_type !== "did" 490 ? (notification.data as Extract<ExtractNotificationType<"comment_mention">, { mentioned_uri: string }>).mentioned_uri 491 : undefined; 492 493 const commenterDid = new AtUri(notification.data.comment_uri).host; 494 const commenterHandle = didToHandleMap.get(commenterDid) ?? null; 495 496 const mentionedPublication = mentionedUri ? mentionedPublications?.find((p) => p.uri === mentionedUri) : undefined; 497 const mentionedDoc = mentionedUri ? mentionedDocuments?.find((d) => d.uri === mentionedUri) : undefined; 498 499 return { 500 id: notification.id, 501 recipient: notification.recipient, 502 created_at: notification.created_at, 503 type: "comment_mention" as const, 504 comment_uri: notification.data.comment_uri, 505 mention_type: notification.data.mention_type, 506 mentioned_uri: mentionedUri, 507 commentData, 508 commenterHandle, 509 mentionedPublication, 510 mentionedDocument: mentionedDoc, 511 normalizedDocument: normalizeDocumentRecord(commentData.documents?.data, commentData.documents?.uri), 512 normalizedPublication: normalizePublicationRecord( 513 commentData.documents?.documents_in_publications[0]?.publications?.record, 514 ), 515 normalizedMentionedPublication: normalizePublicationRecord(mentionedPublication?.record), 516 normalizedMentionedDocument: normalizeDocumentRecord(mentionedDoc?.data, mentionedDoc?.uri), 517 }; 518 }) 519 .filter((n) => n !== null); 520} 521 522export async function pingIdentityToUpdateNotification(did: string) { 523 let channel = supabaseServerClient.channel(`identity.atp_did:${did}`); 524 await channel.send({ 525 type: "broadcast", 526 event: "notification", 527 payload: { message: "poke" }, 528 }); 529 await supabaseServerClient.removeChannel(channel); 530}