replies timeline only, appview-less bluesky client
at main 20 kB view raw
1import { writable } from 'svelte/store'; 2import { 3 AtpClient, 4 setRecordCache, 5 type NotificationsStream, 6 type NotificationsStreamEvent 7} from './at/client.svelte'; 8import { SvelteMap, SvelteDate, SvelteSet } from 'svelte/reactivity'; 9import type { Did, Handle, Nsid, RecordKey, ResourceUri } from '@atcute/lexicons'; 10import { fetchPosts, hydratePosts, type HydrateOptions, type PostWithUri } from './at/fetch'; 11import { parseCanonicalResourceUri, type AtprotoDid } from '@atcute/lexicons/syntax'; 12import { 13 AppBskyActorProfile, 14 AppBskyFeedPost, 15 AppBskyGraphBlock, 16 type AppBskyGraphFollow 17} from '@atcute/bluesky'; 18import type { JetstreamSubscription, JetstreamEvent } from '@atcute/jetstream'; 19import { expect, ok } from './result'; 20import type { Backlink, BacklinksSource } from './at/constellation'; 21import { now as tidNow } from '@atcute/tid'; 22import type { Records } from '@atcute/lexicons/ambient'; 23import { 24 blockSource, 25 extractDidFromUri, 26 likeSource, 27 replyRootSource, 28 replySource, 29 repostSource, 30 timestampFromCursor, 31 toCanonicalUri 32} from '$lib'; 33import { Router } from './router.svelte'; 34import type { Account } from './accounts'; 35 36export const notificationStream = writable<NotificationsStream | null>(null); 37export const jetstream = writable<JetstreamSubscription | null>(null); 38 39export const profiles = new SvelteMap<Did, AppBskyActorProfile.Main>(); 40export const handles = new SvelteMap<Did, Handle>(); 41 42// source -> subject -> did (who did the interaction) -> rkey 43export type BacklinksMap = SvelteMap< 44 BacklinksSource, 45 SvelteMap<ResourceUri, SvelteMap<Did, SvelteSet<RecordKey>>> 46>; 47export const allBacklinks: BacklinksMap = new SvelteMap(); 48 49export const addBacklinks = ( 50 subject: ResourceUri, 51 source: BacklinksSource, 52 links: Iterable<Backlink> 53) => { 54 let subjectMap = allBacklinks.get(source); 55 if (!subjectMap) { 56 subjectMap = new SvelteMap(); 57 allBacklinks.set(source, subjectMap); 58 } 59 60 let didMap = subjectMap.get(subject); 61 if (!didMap) { 62 didMap = new SvelteMap(); 63 subjectMap.set(subject, didMap); 64 } 65 66 for (const link of links) { 67 let rkeys = didMap.get(link.did); 68 if (!rkeys) { 69 rkeys = new SvelteSet(); 70 didMap.set(link.did, rkeys); 71 } 72 rkeys.add(link.rkey); 73 } 74}; 75 76export const removeBacklinks = ( 77 subject: ResourceUri, 78 source: BacklinksSource, 79 links: Iterable<Backlink> 80) => { 81 const didMap = allBacklinks.get(source)?.get(subject); 82 if (!didMap) return; 83 84 for (const link of links) { 85 const rkeys = didMap.get(link.did); 86 if (!rkeys) continue; 87 rkeys.delete(link.rkey); 88 if (rkeys.size === 0) didMap.delete(link.did); 89 } 90}; 91 92export const findBacklinksBy = (subject: ResourceUri, source: BacklinksSource, did: Did) => { 93 const rkeys = allBacklinks.get(source)?.get(subject)?.get(did) ?? []; 94 // reconstruct the collection from the source 95 const collection = source.split(':')[0] as Nsid; 96 return rkeys.values().map((rkey) => ({ did, collection, rkey })); 97}; 98 99export const hasBacklink = (subject: ResourceUri, source: BacklinksSource, did: Did): boolean => { 100 return allBacklinks.get(source)?.get(subject)?.has(did) ?? false; 101}; 102 103export const getAllBacklinksFor = (subject: ResourceUri, source: BacklinksSource): Backlink[] => { 104 const subjectMap = allBacklinks.get(source); 105 if (!subjectMap) return []; 106 107 const didMap = subjectMap.get(subject); 108 if (!didMap) return []; 109 110 const collection = source.split(':')[0] as Nsid; 111 const result: Backlink[] = []; 112 113 for (const [did, rkeys] of didMap) 114 for (const rkey of rkeys) result.push({ did, collection, rkey }); 115 116 return result; 117}; 118 119export const isBlockedBy = (subject: Did, blocker: Did): boolean => { 120 return hasBacklink(`at://${subject}`, 'app.bsky.graph.block:subject', blocker); 121}; 122 123// eslint-disable-next-line @typescript-eslint/no-explicit-any 124const getNestedValue = (obj: any, path: string[]): any => { 125 return path.reduce((current, key) => current?.[key], obj); 126}; 127 128// eslint-disable-next-line @typescript-eslint/no-explicit-any 129const setNestedValue = (obj: any, path: string[], value: any): void => { 130 const lastKey = path[path.length - 1]; 131 const parent = path.slice(0, -1).reduce((current, key) => { 132 if (current[key] === undefined) current[key] = {}; 133 return current[key]; 134 }, obj); 135 parent[lastKey] = value; 136}; 137 138export const backlinksCursors = new SvelteMap< 139 Did, 140 SvelteMap<BacklinksSource, string | undefined> 141>(); 142 143export const fetchLinksUntil = async ( 144 subject: Did, 145 client: AtpClient, 146 backlinkSource: BacklinksSource, 147 timestamp: number = -1 148) => { 149 let cursorMap = backlinksCursors.get(subject); 150 if (!cursorMap) { 151 cursorMap = new SvelteMap<BacklinksSource, string | undefined>(); 152 backlinksCursors.set(subject, cursorMap); 153 } 154 155 const [_collection, source] = backlinkSource.split(':'); 156 const collection = _collection as keyof Records; 157 const cursor = cursorMap.get(backlinkSource); 158 159 // if already fetched we dont need to fetch again 160 const cursorTimestamp = timestampFromCursor(cursor); 161 if (cursorTimestamp && cursorTimestamp <= timestamp) return; 162 163 console.log(`${subject}: fetchLinksUntil`, backlinkSource, cursor, timestamp); 164 const result = await client.listRecordsUntil(subject, collection, cursor, timestamp); 165 166 if (!result.ok) { 167 console.error('failed to fetch links until', result.error); 168 return; 169 } 170 cursorMap.set(backlinkSource, result.value.cursor); 171 172 const path = source.split('.'); 173 for (const record of result.value.records) { 174 const uri = getNestedValue(record.value, path); 175 const parsedUri = parseCanonicalResourceUri(record.uri); 176 if (!parsedUri.ok) continue; 177 addBacklinks(uri, `${collection}:${source}`, [ 178 { 179 did: parsedUri.value.repo, 180 collection: parsedUri.value.collection, 181 rkey: parsedUri.value.rkey 182 } 183 ]); 184 } 185}; 186 187export const deletePostBacklink = async ( 188 client: AtpClient, 189 post: PostWithUri, 190 source: BacklinksSource 191) => { 192 const did = client.user?.did; 193 if (!did) return; 194 const collection = source.split(':')[0] as Nsid; 195 const links = findBacklinksBy(post.uri, source, did); 196 removeBacklinks(post.uri, source, links); 197 await Promise.allSettled( 198 links.map((link) => 199 client.user?.atcute.post('com.atproto.repo.deleteRecord', { 200 input: { repo: did, collection, rkey: link.rkey! } 201 }) 202 ) 203 ); 204}; 205 206export const createPostBacklink = async ( 207 client: AtpClient, 208 post: PostWithUri, 209 source: BacklinksSource 210) => { 211 const did = client.user?.did; 212 if (!did) return; 213 const [_collection, subject] = source.split(':'); 214 const collection = _collection as Nsid; 215 const rkey = tidNow(); 216 addBacklinks(post.uri, source, [ 217 { 218 did, 219 collection, 220 rkey 221 } 222 ]); 223 const record = { 224 $type: collection, 225 // eslint-disable-next-line svelte/prefer-svelte-reactivity 226 createdAt: new Date().toISOString() 227 }; 228 const subjectPath = subject.split('.'); 229 setNestedValue(record, subjectPath, post.uri); 230 setNestedValue(record, [...subjectPath.slice(0, -1), 'cid'], post.cid); 231 await client.user?.atcute.post('com.atproto.repo.createRecord', { 232 input: { 233 repo: did, 234 collection, 235 rkey, 236 record 237 } 238 }); 239}; 240 241export const pulsingPostId = writable<string | null>(null); 242 243export const viewClient = new AtpClient(); 244export const clients = new SvelteMap<Did, AtpClient>(); 245 246export const follows = new SvelteMap<Did, SvelteMap<ResourceUri, AppBskyGraphFollow.Main>>(); 247 248export const addFollows = ( 249 did: Did, 250 followMap: Iterable<[ResourceUri, AppBskyGraphFollow.Main]> 251) => { 252 let map = follows.get(did)!; 253 if (!map) { 254 map = new SvelteMap(followMap); 255 follows.set(did, map); 256 return; 257 } 258 for (const [uri, record] of followMap) map.set(uri, record); 259}; 260 261export const fetchFollows = async ( 262 account: Account 263): Promise<IteratorObject<AppBskyGraphFollow.Main>> => { 264 const client = clients.get(account.did)!; 265 const res = await client.listRecordsUntil(account.did, 'app.bsky.graph.follow'); 266 if (!res.ok) { 267 console.error("can't fetch follows:", res.error); 268 return [].values(); 269 } 270 addFollows( 271 account.did, 272 res.value.records.map((follow) => [follow.uri, follow.value as AppBskyGraphFollow.Main]) 273 ); 274 return res.value.records.values().map((follow) => follow.value as AppBskyGraphFollow.Main); 275}; 276 277// this fetches up to three days of posts and interactions for using in following list 278export const fetchForInteractions = async (client: AtpClient, subject: Did) => { 279 const threeDaysAgo = (Date.now() - 3 * 24 * 60 * 60 * 1000) * 1000; 280 281 const res = await client.listRecordsUntil(subject, 'app.bsky.feed.post', undefined, threeDaysAgo); 282 if (!res.ok) return; 283 const postsWithUri = res.value.records.map( 284 (post) => 285 ({ cid: post.cid, uri: post.uri, record: post.value as AppBskyFeedPost.Main }) as PostWithUri 286 ); 287 addPosts(postsWithUri); 288 289 const cursorTimestamp = timestampFromCursor(res.value.cursor) ?? -1; 290 const timestamp = Math.min(cursorTimestamp, threeDaysAgo); 291 console.log(`${subject}: fetchForInteractions`, res.value.cursor, timestamp); 292 await Promise.all([repostSource].map((s) => fetchLinksUntil(subject, client, s, timestamp))); 293}; 294 295// if did is in set, we have fetched blocks for them already (against logged in users) 296export const blockFlags = new SvelteMap<Did, SvelteSet<Did>>(); 297 298export const fetchBlocked = async (client: AtpClient, subject: Did, blocker: Did) => { 299 const subjectUri = `at://${subject}` as ResourceUri; 300 const res = await client.getBacklinks(subjectUri, blockSource, [blocker], 1); 301 if (!res.ok) return false; 302 if (res.value.total > 0) addBacklinks(subjectUri, blockSource, res.value.records); 303 304 // mark as fetched 305 let flags = blockFlags.get(subject); 306 if (!flags) { 307 flags = new SvelteSet(); 308 blockFlags.set(subject, flags); 309 } 310 flags.add(blocker); 311 312 return res.value.total > 0; 313}; 314 315export const fetchBlocks = async (account: Account) => { 316 const client = clients.get(account.did)!; 317 const res = await client.listRecordsUntil(account.did, 'app.bsky.graph.block'); 318 if (!res.ok) return; 319 for (const block of res.value.records) { 320 const record = block.value as AppBskyGraphBlock.Main; 321 const parsedUri = expect(parseCanonicalResourceUri(block.uri)); 322 addBacklinks(`at://${record.subject}`, blockSource, [ 323 { 324 did: parsedUri.repo, 325 collection: parsedUri.collection, 326 rkey: parsedUri.rkey 327 } 328 ]); 329 } 330}; 331 332export const createBlock = async (client: AtpClient, targetDid: Did) => { 333 const userDid = client.user?.did; 334 if (!userDid) return; 335 336 const rkey = tidNow(); 337 const targetUri = `at://${targetDid}` as ResourceUri; 338 339 addBacklinks(targetUri, blockSource, [ 340 { 341 did: userDid, 342 collection: 'app.bsky.graph.block', 343 rkey 344 } 345 ]); 346 347 const record: AppBskyGraphBlock.Main = { 348 $type: 'app.bsky.graph.block', 349 subject: targetDid, 350 // eslint-disable-next-line svelte/prefer-svelte-reactivity 351 createdAt: new Date().toISOString() 352 }; 353 354 await client.user?.atcute.post('com.atproto.repo.createRecord', { 355 input: { 356 repo: userDid, 357 collection: 'app.bsky.graph.block', 358 rkey, 359 record 360 } 361 }); 362}; 363 364export const deleteBlock = async (client: AtpClient, targetDid: Did) => { 365 const userDid = client.user?.did; 366 if (!userDid) return; 367 368 const targetUri = `at://${targetDid}` as ResourceUri; 369 const links = findBacklinksBy(targetUri, blockSource, userDid); 370 371 removeBacklinks(targetUri, blockSource, links); 372 373 await Promise.allSettled( 374 links.map((link) => 375 client.user?.atcute.post('com.atproto.repo.deleteRecord', { 376 input: { 377 repo: userDid, 378 collection: 'app.bsky.graph.block', 379 rkey: link.rkey 380 } 381 }) 382 ) 383 ); 384}; 385 386export const isBlockedByUser = (targetDid: Did, userDid: Did): boolean => { 387 return isBlockedBy(targetDid, userDid); 388}; 389 390export const isUserBlockedBy = (userDid: Did, targetDid: Did): boolean => { 391 return isBlockedBy(userDid, targetDid); 392}; 393 394export const hasBlockRelationship = (did1: Did, did2: Did): boolean => { 395 return isBlockedBy(did1, did2) || isBlockedBy(did2, did1); 396}; 397 398export const getBlockRelationship = ( 399 userDid: Did, 400 targetDid: Did 401): { userBlocked: boolean; blockedByTarget: boolean } => { 402 return { 403 userBlocked: isBlockedBy(targetDid, userDid), 404 blockedByTarget: isBlockedBy(userDid, targetDid) 405 }; 406}; 407 408export const allPosts = new SvelteMap<Did, SvelteMap<ResourceUri, PostWithUri>>(); 409export type DeletedPostInfo = { reply?: PostWithUri['record']['reply'] }; 410export const deletedPosts = new SvelteMap<ResourceUri, DeletedPostInfo>(); 411// did -> post uris that are replies to that did 412export const replyIndex = new SvelteMap<Did, SvelteSet<ResourceUri>>(); 413 414export const getPost = (did: Did, rkey: RecordKey) => 415 allPosts.get(did)?.get(toCanonicalUri({ did, collection: 'app.bsky.feed.post', rkey })); 416const hydrateCacheFn: Parameters<typeof hydratePosts>[3] = (did, rkey) => { 417 const cached = getPost(did, rkey); 418 return cached ? ok(cached) : undefined; 419}; 420 421export const addPosts = (newPosts: Iterable<PostWithUri>) => { 422 for (const post of newPosts) { 423 const parsedUri = expect(parseCanonicalResourceUri(post.uri)); 424 let posts = allPosts.get(parsedUri.repo); 425 if (!posts) { 426 posts = new SvelteMap(); 427 allPosts.set(parsedUri.repo, posts); 428 } 429 posts.set(post.uri, post); 430 if (post.record.reply) { 431 const link = { 432 did: parsedUri.repo, 433 collection: parsedUri.collection, 434 rkey: parsedUri.rkey 435 }; 436 addBacklinks(post.record.reply.parent.uri, replySource, [link]); 437 addBacklinks(post.record.reply.root.uri, replyRootSource, [link]); 438 439 // update reply index 440 const parentDid = extractDidFromUri(post.record.reply.parent.uri); 441 if (parentDid) { 442 let set = replyIndex.get(parentDid); 443 if (!set) { 444 set = new SvelteSet(); 445 replyIndex.set(parentDid, set); 446 } 447 set.add(post.uri); 448 } 449 } 450 } 451}; 452 453export const deletePost = (uri: ResourceUri) => { 454 const did = extractDidFromUri(uri)!; 455 const post = allPosts.get(did)?.get(uri); 456 if (!post) return; 457 allPosts.get(did)?.delete(uri); 458 // remove reply from index 459 const subjectDid = extractDidFromUri(post.record.reply?.parent.uri ?? ''); 460 if (subjectDid) replyIndex.get(subjectDid)?.delete(uri); 461 deletedPosts.set(uri, { reply: post.record.reply }); 462}; 463 464export const timelines = new SvelteMap<Did, SvelteSet<ResourceUri>>(); 465export const postCursors = new SvelteMap<Did, { value?: string; end: boolean }>(); 466 467const traversePostChain = (post: PostWithUri) => { 468 const result = [post.uri]; 469 const parentUri = post.record.reply?.parent.uri; 470 if (parentUri) { 471 const parentPost = allPosts.get(extractDidFromUri(parentUri)!)?.get(parentUri); 472 if (parentPost) result.push(...traversePostChain(parentPost)); 473 } 474 return result; 475}; 476export const addTimeline = (did: Did, uris: Iterable<ResourceUri>) => { 477 let timeline = timelines.get(did); 478 if (!timeline) { 479 timeline = new SvelteSet(); 480 timelines.set(did, timeline); 481 } 482 for (const uri of uris) { 483 const post = allPosts.get(did)?.get(uri); 484 // we need to traverse the post chain to add all posts in the chain to the timeline 485 // because the parent posts might not be in the timeline yet 486 const chain = post ? traversePostChain(post) : [uri]; 487 for (const uri of chain) timeline.add(uri); 488 } 489}; 490 491export const fetchTimeline = async ( 492 client: AtpClient, 493 subject: Did, 494 limit: number = 6, 495 withBacklinks: boolean = true, 496 hydrateOptions?: Partial<HydrateOptions> 497) => { 498 const cursor = postCursors.get(subject); 499 if (cursor && cursor.end) return; 500 501 const accPosts = await fetchPosts(subject, client, cursor?.value, limit, withBacklinks); 502 if (!accPosts.ok) throw `cant fetch posts ${subject}: ${accPosts.error}`; 503 504 // if the cursor is undefined, we've reached the end of the timeline 505 const newCursor = { value: accPosts.value.cursor, end: !accPosts.value.cursor }; 506 postCursors.set(subject, newCursor); 507 const hydrated = await hydratePosts( 508 client, 509 subject, 510 accPosts.value.posts, 511 hydrateCacheFn, 512 hydrateOptions 513 ); 514 if (!hydrated.ok) throw `cant hydrate posts ${subject}: ${hydrated.error}`; 515 516 addPosts(hydrated.value.values()); 517 addTimeline(subject, hydrated.value.keys()); 518 519 if (client.user?.did) { 520 const userDid = client.user.did; 521 // check if any of the post authors block the user 522 // eslint-disable-next-line svelte/prefer-svelte-reactivity 523 let distinctDids = new Set(hydrated.value.keys().map((uri) => extractDidFromUri(uri)!)); 524 distinctDids.delete(userDid); // dont need to check if user blocks themselves 525 const alreadyFetched = blockFlags.get(userDid); 526 if (alreadyFetched) distinctDids = distinctDids.difference(alreadyFetched); 527 if (distinctDids.size > 0) 528 await Promise.all(distinctDids.values().map((did) => fetchBlocked(client, userDid, did))); 529 } 530 531 console.log(`${subject}: fetchTimeline`, accPosts.value.cursor); 532 return newCursor; 533}; 534 535export const fetchInteractionsToTimelineEnd = async ( 536 client: AtpClient, 537 interactor: Did, 538 subject: Did 539) => { 540 const cursor = postCursors.get(subject); 541 if (!cursor) return; 542 const timestamp = timestampFromCursor(cursor.value); 543 await Promise.all( 544 [likeSource, repostSource].map((s) => fetchLinksUntil(interactor, client, s, timestamp)) 545 ); 546}; 547 548export const fetchInitial = async (account: Account) => { 549 const client = clients.get(account.did)!; 550 await Promise.all([ 551 fetchBlocks(account), 552 fetchForInteractions(client, account.did), 553 fetchFollows(account).then((follows) => 554 Promise.all(follows.map((follow) => fetchForInteractions(client, follow.subject)) ?? []) 555 ) 556 ]); 557}; 558 559export const handleJetstreamEvent = async (event: JetstreamEvent) => { 560 if (event.kind !== 'commit') return; 561 562 const { did, commit } = event; 563 const uri: ResourceUri = toCanonicalUri({ did, ...commit }); 564 if (commit.collection === 'app.bsky.feed.post') { 565 if (commit.operation === 'create') { 566 const record = commit.record as AppBskyFeedPost.Main; 567 const posts = [ 568 { 569 record, 570 uri, 571 cid: commit.cid 572 } 573 ]; 574 await setRecordCache(uri, record); 575 const client = clients.get(did) ?? viewClient; 576 const hydrated = await hydratePosts(client, did, posts, hydrateCacheFn); 577 if (!hydrated.ok) { 578 console.error(`cant hydrate posts ${did}: ${hydrated.error}`); 579 return; 580 } 581 addPosts(hydrated.value.values()); 582 addTimeline(did, hydrated.value.keys()); 583 if (record.reply) { 584 const parentDid = extractDidFromUri(record.reply.parent.uri)!; 585 addTimeline(parentDid, [uri]); 586 // const rootDid = extractDidFromUri(record.reply.root.uri)!; 587 // addTimeline(rootDid, [uri]); 588 } 589 } else if (commit.operation === 'delete') { 590 deletePost(uri); 591 } 592 } 593}; 594 595const handlePostNotification = async (event: NotificationsStreamEvent & { type: 'message' }) => { 596 const parsedSubjectUri = expect(parseCanonicalResourceUri(event.data.link.subject)); 597 const did = parsedSubjectUri.repo as AtprotoDid; 598 const client = clients.get(did); 599 if (!client) { 600 console.error(`${did}: cant handle post notification, client not found !?`); 601 return; 602 } 603 const subjectPost = await client.getRecord( 604 AppBskyFeedPost.mainSchema, 605 did, 606 parsedSubjectUri.rkey 607 ); 608 if (!subjectPost.ok) return; 609 610 const parsedSourceUri = expect(parseCanonicalResourceUri(event.data.link.source_record)); 611 const posts = [ 612 { 613 record: subjectPost.value.record, 614 uri: event.data.link.subject, 615 cid: subjectPost.value.cid, 616 replies: { 617 cursor: null, 618 total: 1, 619 records: [ 620 { 621 did: parsedSourceUri.repo, 622 collection: parsedSourceUri.collection, 623 rkey: parsedSourceUri.rkey 624 } 625 ] 626 } 627 } 628 ]; 629 const hydrated = await hydratePosts(client, did, posts, hydrateCacheFn); 630 if (!hydrated.ok) { 631 console.error(`cant hydrate posts ${did}: ${hydrated.error}`); 632 return; 633 } 634 635 // console.log(hydrated); 636 addPosts(hydrated.value.values()); 637 addTimeline(did, hydrated.value.keys()); 638}; 639 640const handleBacklink = (event: NotificationsStreamEvent & { type: 'message' }) => { 641 const parsedSource = expect(parseCanonicalResourceUri(event.data.link.source_record)); 642 addBacklinks(event.data.link.subject, event.data.link.source, [ 643 { 644 did: parsedSource.repo, 645 collection: parsedSource.collection, 646 rkey: parsedSource.rkey 647 } 648 ]); 649}; 650 651export const handleNotification = async (event: NotificationsStreamEvent) => { 652 if (event.type === 'message') { 653 if (event.data.link.source.startsWith('app.bsky.feed.post')) handlePostNotification(event); 654 else handleBacklink(event); 655 } 656}; 657 658export const currentTime = new SvelteDate(); 659 660if (typeof window !== 'undefined') 661 setInterval(() => { 662 currentTime.setTime(Date.now()); 663 }, 1000); 664 665export const router = new Router();