a tool for shared writing and social publishing
at main 393 lines 15 kB view raw
1"use server"; 2 3import { supabaseServerClient } from "supabase/serverClient"; 4import { Tables, TablesInsert } from "supabase/database.types"; 5import { AtUri } from "@atproto/syntax"; 6import { idResolver } from "app/(home-pages)/reader/idResolver"; 7 8type NotificationRow = Tables<"notifications">; 9 10export type Notification = Omit<TablesInsert<"notifications">, "data"> & { 11 data: NotificationData; 12}; 13 14export type NotificationData = 15 | { type: "comment"; comment_uri: string; parent_uri?: string } 16 | { type: "subscribe"; subscription_uri: string } 17 | { type: "quote"; bsky_post_uri: string; document_uri: string } 18 | { type: "mention"; document_uri: string; mention_type: "did" } 19 | { type: "mention"; document_uri: string; mention_type: "publication"; mentioned_uri: string } 20 | { type: "mention"; document_uri: string; mention_type: "document"; mentioned_uri: string } 21 | { type: "comment_mention"; comment_uri: string; mention_type: "did" } 22 | { type: "comment_mention"; comment_uri: string; mention_type: "publication"; mentioned_uri: string } 23 | { type: "comment_mention"; comment_uri: string; mention_type: "document"; mentioned_uri: string }; 24 25export type HydratedNotification = 26 | HydratedCommentNotification 27 | HydratedSubscribeNotification 28 | HydratedQuoteNotification 29 | HydratedMentionNotification 30 | HydratedCommentMentionNotification; 31export async function hydrateNotifications( 32 notifications: NotificationRow[], 33): Promise<Array<HydratedNotification>> { 34 // Call all hydrators in parallel 35 const [commentNotifications, subscribeNotifications, quoteNotifications, mentionNotifications, commentMentionNotifications] = await Promise.all([ 36 hydrateCommentNotifications(notifications), 37 hydrateSubscribeNotifications(notifications), 38 hydrateQuoteNotifications(notifications), 39 hydrateMentionNotifications(notifications), 40 hydrateCommentMentionNotifications(notifications), 41 ]); 42 43 // Combine all hydrated notifications 44 const allHydrated = [...commentNotifications, ...subscribeNotifications, ...quoteNotifications, ...mentionNotifications, ...commentMentionNotifications]; 45 46 // Sort by created_at to maintain order 47 allHydrated.sort( 48 (a, b) => 49 new Date(b.created_at).getTime() - new Date(a.created_at).getTime(), 50 ); 51 52 return allHydrated; 53} 54 55// Type guard to extract notification type 56type ExtractNotificationType<T extends NotificationData["type"]> = Extract< 57 NotificationData, 58 { type: T } 59>; 60 61export type HydratedCommentNotification = Awaited< 62 ReturnType<typeof hydrateCommentNotifications> 63>[0]; 64 65async function hydrateCommentNotifications(notifications: NotificationRow[]) { 66 const commentNotifications = notifications.filter( 67 (n): n is NotificationRow & { data: ExtractNotificationType<"comment"> } => 68 (n.data as NotificationData)?.type === "comment", 69 ); 70 71 if (commentNotifications.length === 0) { 72 return []; 73 } 74 75 // Fetch comment data from the database 76 const commentUris = commentNotifications.flatMap((n) => 77 n.data.parent_uri 78 ? [n.data.comment_uri, n.data.parent_uri] 79 : [n.data.comment_uri], 80 ); 81 const { data: comments } = await supabaseServerClient 82 .from("comments_on_documents") 83 .select( 84 "*,bsky_profiles(*), documents(*, documents_in_publications(publications(*)))", 85 ) 86 .in("uri", commentUris); 87 88 return commentNotifications 89 .map((notification) => { 90 const commentData = comments?.find((c) => c.uri === notification.data.comment_uri); 91 if (!commentData) return null; 92 return { 93 id: notification.id, 94 recipient: notification.recipient, 95 created_at: notification.created_at, 96 type: "comment" as const, 97 comment_uri: notification.data.comment_uri, 98 parentData: notification.data.parent_uri 99 ? comments?.find((c) => c.uri === notification.data.parent_uri) 100 : undefined, 101 commentData, 102 }; 103 }) 104 .filter((n) => n !== null); 105} 106 107export type HydratedSubscribeNotification = Awaited< 108 ReturnType<typeof hydrateSubscribeNotifications> 109>[0]; 110 111async function hydrateSubscribeNotifications(notifications: NotificationRow[]) { 112 const subscribeNotifications = notifications.filter( 113 ( 114 n, 115 ): n is NotificationRow & { data: ExtractNotificationType<"subscribe"> } => 116 (n.data as NotificationData)?.type === "subscribe", 117 ); 118 119 if (subscribeNotifications.length === 0) { 120 return []; 121 } 122 123 // Fetch subscription data from the database with related data 124 const subscriptionUris = subscribeNotifications.map( 125 (n) => n.data.subscription_uri, 126 ); 127 const { data: subscriptions } = await supabaseServerClient 128 .from("publication_subscriptions") 129 .select("*, identities(bsky_profiles(*)), publications(*)") 130 .in("uri", subscriptionUris); 131 132 return subscribeNotifications 133 .map((notification) => { 134 const subscriptionData = subscriptions?.find((s) => s.uri === notification.data.subscription_uri); 135 if (!subscriptionData) return null; 136 return { 137 id: notification.id, 138 recipient: notification.recipient, 139 created_at: notification.created_at, 140 type: "subscribe" as const, 141 subscription_uri: notification.data.subscription_uri, 142 subscriptionData, 143 }; 144 }) 145 .filter((n) => n !== null); 146} 147 148export type HydratedQuoteNotification = Awaited< 149 ReturnType<typeof hydrateQuoteNotifications> 150>[0]; 151 152async function hydrateQuoteNotifications(notifications: NotificationRow[]) { 153 const quoteNotifications = notifications.filter( 154 (n): n is NotificationRow & { data: ExtractNotificationType<"quote"> } => 155 (n.data as NotificationData)?.type === "quote", 156 ); 157 158 if (quoteNotifications.length === 0) { 159 return []; 160 } 161 162 // Fetch bsky post data and document data 163 const bskyPostUris = quoteNotifications.map((n) => n.data.bsky_post_uri); 164 const documentUris = quoteNotifications.map((n) => n.data.document_uri); 165 166 const { data: bskyPosts } = await supabaseServerClient 167 .from("bsky_posts") 168 .select("*") 169 .in("uri", bskyPostUris); 170 171 const { data: documents } = await supabaseServerClient 172 .from("documents") 173 .select("*, documents_in_publications(publications(*))") 174 .in("uri", documentUris); 175 176 return quoteNotifications 177 .map((notification) => { 178 const bskyPost = bskyPosts?.find((p) => p.uri === notification.data.bsky_post_uri); 179 const document = documents?.find((d) => d.uri === notification.data.document_uri); 180 if (!bskyPost || !document) return null; 181 return { 182 id: notification.id, 183 recipient: notification.recipient, 184 created_at: notification.created_at, 185 type: "quote" as const, 186 bsky_post_uri: notification.data.bsky_post_uri, 187 document_uri: notification.data.document_uri, 188 bskyPost, 189 document, 190 }; 191 }) 192 .filter((n) => n !== null); 193} 194 195export type HydratedMentionNotification = Awaited< 196 ReturnType<typeof hydrateMentionNotifications> 197>[0]; 198 199async function hydrateMentionNotifications(notifications: NotificationRow[]) { 200 const mentionNotifications = notifications.filter( 201 (n): n is NotificationRow & { data: ExtractNotificationType<"mention"> } => 202 (n.data as NotificationData)?.type === "mention", 203 ); 204 205 if (mentionNotifications.length === 0) { 206 return []; 207 } 208 209 // Fetch document data from the database 210 const documentUris = mentionNotifications.map((n) => n.data.document_uri); 211 const { data: documents } = await supabaseServerClient 212 .from("documents") 213 .select("*, documents_in_publications(publications(*))") 214 .in("uri", documentUris); 215 216 // Extract unique DIDs from document URIs to resolve handles 217 const documentCreatorDids = [...new Set(documentUris.map((uri) => new AtUri(uri).host))]; 218 219 // Resolve DIDs to handles in parallel 220 const didToHandleMap = new Map<string, string | null>(); 221 await Promise.all( 222 documentCreatorDids.map(async (did) => { 223 try { 224 const resolved = await idResolver.did.resolve(did); 225 const handle = resolved?.alsoKnownAs?.[0] 226 ? resolved.alsoKnownAs[0].slice(5) // Remove "at://" prefix 227 : null; 228 didToHandleMap.set(did, handle); 229 } catch (error) { 230 console.error(`Failed to resolve DID ${did}:`, error); 231 didToHandleMap.set(did, null); 232 } 233 }), 234 ); 235 236 // Fetch mentioned publications and documents 237 const mentionedPublicationUris = mentionNotifications 238 .filter((n) => n.data.mention_type === "publication") 239 .map((n) => (n.data as Extract<ExtractNotificationType<"mention">, { mention_type: "publication" }>).mentioned_uri); 240 241 const mentionedDocumentUris = mentionNotifications 242 .filter((n) => n.data.mention_type === "document") 243 .map((n) => (n.data as Extract<ExtractNotificationType<"mention">, { mention_type: "document" }>).mentioned_uri); 244 245 const [{ data: mentionedPublications }, { data: mentionedDocuments }] = await Promise.all([ 246 mentionedPublicationUris.length > 0 247 ? supabaseServerClient 248 .from("publications") 249 .select("*") 250 .in("uri", mentionedPublicationUris) 251 : Promise.resolve({ data: [] }), 252 mentionedDocumentUris.length > 0 253 ? supabaseServerClient 254 .from("documents") 255 .select("*, documents_in_publications(publications(*))") 256 .in("uri", mentionedDocumentUris) 257 : Promise.resolve({ data: [] }), 258 ]); 259 260 return mentionNotifications 261 .map((notification) => { 262 const document = documents?.find((d) => d.uri === notification.data.document_uri); 263 if (!document) return null; 264 265 const mentionedUri = notification.data.mention_type !== "did" 266 ? (notification.data as Extract<ExtractNotificationType<"mention">, { mentioned_uri: string }>).mentioned_uri 267 : undefined; 268 269 const documentCreatorDid = new AtUri(notification.data.document_uri).host; 270 const documentCreatorHandle = didToHandleMap.get(documentCreatorDid) ?? null; 271 272 return { 273 id: notification.id, 274 recipient: notification.recipient, 275 created_at: notification.created_at, 276 type: "mention" as const, 277 document_uri: notification.data.document_uri, 278 mention_type: notification.data.mention_type, 279 mentioned_uri: mentionedUri, 280 document, 281 documentCreatorHandle, 282 mentionedPublication: mentionedUri ? mentionedPublications?.find((p) => p.uri === mentionedUri) : undefined, 283 mentionedDocument: mentionedUri ? mentionedDocuments?.find((d) => d.uri === mentionedUri) : undefined, 284 }; 285 }) 286 .filter((n) => n !== null); 287} 288 289export type HydratedCommentMentionNotification = Awaited< 290 ReturnType<typeof hydrateCommentMentionNotifications> 291>[0]; 292 293async function hydrateCommentMentionNotifications(notifications: NotificationRow[]) { 294 const commentMentionNotifications = notifications.filter( 295 (n): n is NotificationRow & { data: ExtractNotificationType<"comment_mention"> } => 296 (n.data as NotificationData)?.type === "comment_mention", 297 ); 298 299 if (commentMentionNotifications.length === 0) { 300 return []; 301 } 302 303 // Fetch comment data from the database 304 const commentUris = commentMentionNotifications.map((n) => n.data.comment_uri); 305 const { data: comments } = await supabaseServerClient 306 .from("comments_on_documents") 307 .select( 308 "*, bsky_profiles(*), documents(*, documents_in_publications(publications(*)))", 309 ) 310 .in("uri", commentUris); 311 312 // Extract unique DIDs from comment URIs to resolve handles 313 const commenterDids = [...new Set(commentUris.map((uri) => new AtUri(uri).host))]; 314 315 // Resolve DIDs to handles in parallel 316 const didToHandleMap = new Map<string, string | null>(); 317 await Promise.all( 318 commenterDids.map(async (did) => { 319 try { 320 const resolved = await idResolver.did.resolve(did); 321 const handle = resolved?.alsoKnownAs?.[0] 322 ? resolved.alsoKnownAs[0].slice(5) // Remove "at://" prefix 323 : null; 324 didToHandleMap.set(did, handle); 325 } catch (error) { 326 console.error(`Failed to resolve DID ${did}:`, error); 327 didToHandleMap.set(did, null); 328 } 329 }), 330 ); 331 332 // Fetch mentioned publications and documents 333 const mentionedPublicationUris = commentMentionNotifications 334 .filter((n) => n.data.mention_type === "publication") 335 .map((n) => (n.data as Extract<ExtractNotificationType<"comment_mention">, { mention_type: "publication" }>).mentioned_uri); 336 337 const mentionedDocumentUris = commentMentionNotifications 338 .filter((n) => n.data.mention_type === "document") 339 .map((n) => (n.data as Extract<ExtractNotificationType<"comment_mention">, { mention_type: "document" }>).mentioned_uri); 340 341 const [{ data: mentionedPublications }, { data: mentionedDocuments }] = await Promise.all([ 342 mentionedPublicationUris.length > 0 343 ? supabaseServerClient 344 .from("publications") 345 .select("*") 346 .in("uri", mentionedPublicationUris) 347 : Promise.resolve({ data: [] }), 348 mentionedDocumentUris.length > 0 349 ? supabaseServerClient 350 .from("documents") 351 .select("*, documents_in_publications(publications(*))") 352 .in("uri", mentionedDocumentUris) 353 : Promise.resolve({ data: [] }), 354 ]); 355 356 return commentMentionNotifications 357 .map((notification) => { 358 const commentData = comments?.find((c) => c.uri === notification.data.comment_uri); 359 if (!commentData) return null; 360 361 const mentionedUri = notification.data.mention_type !== "did" 362 ? (notification.data as Extract<ExtractNotificationType<"comment_mention">, { mentioned_uri: string }>).mentioned_uri 363 : undefined; 364 365 const commenterDid = new AtUri(notification.data.comment_uri).host; 366 const commenterHandle = didToHandleMap.get(commenterDid) ?? null; 367 368 return { 369 id: notification.id, 370 recipient: notification.recipient, 371 created_at: notification.created_at, 372 type: "comment_mention" as const, 373 comment_uri: notification.data.comment_uri, 374 mention_type: notification.data.mention_type, 375 mentioned_uri: mentionedUri, 376 commentData, 377 commenterHandle, 378 mentionedPublication: mentionedUri ? mentionedPublications?.find((p) => p.uri === mentionedUri) : undefined, 379 mentionedDocument: mentionedUri ? mentionedDocuments?.find((d) => d.uri === mentionedUri) : undefined, 380 }; 381 }) 382 .filter((n) => n !== null); 383} 384 385export async function pingIdentityToUpdateNotification(did: string) { 386 let channel = supabaseServerClient.channel(`identity.atp_did:${did}`); 387 await channel.send({ 388 type: "broadcast", 389 event: "notification", 390 payload: { message: "poke" }, 391 }); 392 await supabaseServerClient.removeChannel(channel); 393}