1import { writable } from 'svelte/store';
2import {
3 AtpClient,
4 setRecordCache,
5 type NotificationsStream,
6 type NotificationsStreamEvent
7} from './at/client.svelte';
8import { SvelteMap, SvelteDate, SvelteSet } from 'svelte/reactivity';
9import type { Did, Handle, Nsid, RecordKey, ResourceUri } from '@atcute/lexicons';
10import { fetchPosts, hydratePosts, type HydrateOptions, type PostWithUri } from './at/fetch';
11import { parseCanonicalResourceUri, type AtprotoDid } from '@atcute/lexicons/syntax';
12import {
13 AppBskyActorProfile,
14 AppBskyFeedPost,
15 AppBskyGraphBlock,
16 type AppBskyGraphFollow
17} from '@atcute/bluesky';
18import type { JetstreamSubscription, JetstreamEvent } from '@atcute/jetstream';
19import { expect, ok } from './result';
20import type { Backlink, BacklinksSource } from './at/constellation';
21import { now as tidNow } from '@atcute/tid';
22import type { Records } from '@atcute/lexicons/ambient';
23import {
24 blockSource,
25 extractDidFromUri,
26 likeSource,
27 replyRootSource,
28 replySource,
29 repostSource,
30 timestampFromCursor,
31 toCanonicalUri
32} from '$lib';
33import { Router } from './router.svelte';
34import type { Account } from './accounts';
35
/** Svelte store for the notifications stream handle; initialised to `null`. */
36export const notificationStream = writable<NotificationsStream | null>(null);
/** Svelte store for the Jetstream subscription handle; initialised to `null`. */
37export const jetstream = writable<JetstreamSubscription | null>(null);
38
/** Reactive map of `AppBskyActorProfile.Main` records keyed by DID. */
39export const profiles = new SvelteMap<Did, AppBskyActorProfile.Main>();
/** Reactive map of handles keyed by DID. */
40export const handles = new SvelteMap<Did, Handle>();
41
/**
 * Shape of the backlinks index: four nested reactive levels, from the backlink
 * source, to the subject URI, to the DID that owns the linking record, to the
 * set of record keys of those linking records.
 */
42// source -> subject -> did (who did the interaction) -> rkey
43export type BacklinksMap = SvelteMap<
44 BacklinksSource,
45 SvelteMap<ResourceUri, SvelteMap<Did, SvelteSet<RecordKey>>>
46>;
/** Module-level backlinks index; read and mutated by the backlink helpers in this file. */
47export const allBacklinks: BacklinksMap = new SvelteMap();
48
/**
 * Registers `links` as backlinks pointing at `subject` under `source`.
 *
 * Missing levels of `allBacklinks` (per source, per subject, per DID) are
 * created on demand; each link's rkey is added to the set for its DID.
 *
 * @param subject URI the links point at.
 * @param source Backlink source the links belong to.
 * @param links Links to record; only `did` and `rkey` are stored.
 */
export const addBacklinks = (
	subject: ResourceUri,
	source: BacklinksSource,
	links: Iterable<Backlink>
) => {
	let bySubject = allBacklinks.get(source);
	if (bySubject === undefined) {
		bySubject = new SvelteMap();
		allBacklinks.set(source, bySubject);
	}

	let byDid = bySubject.get(subject);
	if (byDid === undefined) {
		byDid = new SvelteMap();
		bySubject.set(subject, byDid);
	}

	for (const { did, rkey } of links) {
		let recordKeys = byDid.get(did);
		if (recordKeys === undefined) {
			recordKeys = new SvelteSet();
			byDid.set(did, recordKeys);
		}
		recordKeys.add(rkey);
	}
};
75
/**
 * Removes `links` from the backlinks recorded for `subject` under `source`.
 *
 * Does nothing when no backlinks are stored for that source/subject pair.
 * A DID entry is dropped once its last rkey has been removed.
 *
 * @param subject URI the links point at.
 * @param source Backlink source the links belong to.
 * @param links Links to remove; matched by `did` and `rkey`.
 */
export const removeBacklinks = (
	subject: ResourceUri,
	source: BacklinksSource,
	links: Iterable<Backlink>
) => {
	const byDid = allBacklinks.get(source)?.get(subject);
	if (byDid === undefined) return;

	for (const { did, rkey } of links) {
		const recordKeys = byDid.get(did);
		if (recordKeys) {
			recordKeys.delete(rkey);
			// do not keep empty sets around, so `hasBacklink` stays accurate
			if (recordKeys.size === 0) byDid.delete(did);
		}
	}
};
91
/**
 * Looks up the backlinks that `did` has on `subject` for the given `source`.
 *
 * The result is a lazy iterator over the stored rkeys (or over an empty array
 * when nothing is stored). Like any iterator it can be consumed only once;
 * copy it into an array first if it has to be traversed more than once.
 *
 * @returns Iterator of `{ did, collection, rkey }`, where `collection` is the
 * part of `source` before the first `:`.
 */
export const findBacklinksBy = (subject: ResourceUri, source: BacklinksSource, did: Did) => {
	// the collection is rebuilt from the source rather than stored per link
	const collection = source.split(':', 1)[0] as Nsid;
	const recordKeys = allBacklinks.get(source)?.get(subject)?.get(did) ?? [];
	return recordKeys.values().map((rkey) => ({ did, collection, rkey }));
};
98
/**
 * Reports whether `did` has at least one recorded backlink on `subject` for `source`.
 *
 * @returns `false` when nothing is stored for the source/subject pair.
 */
export const hasBacklink = (subject: ResourceUri, source: BacklinksSource, did: Did): boolean => {
	const byDid = allBacklinks.get(source)?.get(subject);
	return byDid !== undefined && byDid.has(did);
};
102
/**
 * Lists every recorded backlink on `subject` for `source`, across all DIDs.
 *
 * @returns A fresh array of `{ did, collection, rkey }`; empty when nothing is
 * stored. `collection` is the part of `source` before the first `:`.
 */
export const getAllBacklinksFor = (subject: ResourceUri, source: BacklinksSource): Backlink[] => {
	const byDid = allBacklinks.get(source)?.get(subject);
	if (!byDid) return [];

	const collection = source.split(':')[0] as Nsid;
	return Array.from(byDid).flatMap(([did, rkeys]) =>
		Array.from(rkeys, (rkey) => ({ did, collection, rkey }))
	);
};
118
/**
 * Reports whether a block record by `blocker` targeting `subject` is present
 * in the local backlinks index (subject key `at://<subject>`).
 */
export const isBlockedBy = (subject: Did, blocker: Did): boolean =>
	hasBacklink(`at://${subject}`, 'app.bsky.graph.block:subject', blocker);
122
/**
 * Reads the value found by following `path` key by key from `obj`.
 *
 * @returns The value at the end of the path, `obj` itself for an empty path,
 * or `undefined` as soon as any step along the way is `null`/`undefined`.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const getNestedValue = (obj: any, path: string[]): any => {
	let current = obj;
	for (const key of path) current = current?.[key];
	return current;
};
127
/**
 * Writes `value` at `path` inside `obj`, mutating `obj` in place.
 *
 * Intermediate objects are created for path segments that are missing
 * (`undefined` or `null`). An empty `path` names no property, so the call is a
 * no-op instead of writing under the literal key `"undefined"`.
 *
 * @param obj Object to mutate.
 * @param path Property names leading to the slot to write.
 * @param value Value to store at the final path segment.
 */
// eslint-disable-next-line @typescript-eslint/no-explicit-any
const setNestedValue = (obj: any, path: string[], value: any): void => {
	const lastKey = path[path.length - 1];
	if (lastKey === undefined) return;

	let parent = obj;
	for (const key of path.slice(0, -1)) {
		// `== null` also replaces an explicit null, which cannot be descended into
		if (parent[key] == null) parent[key] = {};
		parent = parent[key];
	}
	parent[lastKey] = value;
};
137
/**
 * Pagination cursors kept by `fetchLinksUntil`: outer key is the DID whose
 * repo was listed, inner key is the backlink source, value is the cursor
 * returned by the last successful listing (possibly `undefined`).
 */
138export const backlinksCursors = new SvelteMap<
139 Did,
140 SvelteMap<BacklinksSource, string | undefined>
141>();
142
/**
 * Lists records of the collection named by `backlinkSource` from `subject`'s
 * repo and indexes each one as a backlink by `subject` on the URI found at the
 * source's path inside the record.
 *
 * The cursor returned by the listing is remembered in `backlinksCursors`, so a
 * later call continues from there. The call returns early when the remembered
 * cursor's timestamp is already at or before `timestamp`.
 *
 * Records whose source path does not hold a string are skipped, as are records
 * whose own URI fails to parse; previously a missing subject was indexed under
 * an `undefined` key.
 *
 * @param subject DID whose repo is listed.
 * @param client Client used for `listRecordsUntil`.
 * @param backlinkSource `<collection>:<dotted.path>` identifying the records and the subject field.
 * @param timestamp Forwarded to `listRecordsUntil` as the stop point; defaults to `-1`
 * (NOTE(review): presumably "no limit" - confirm against the client).
 */
export const fetchLinksUntil = async (
	subject: Did,
	client: AtpClient,
	backlinkSource: BacklinksSource,
	timestamp: number = -1
) => {
	let cursorMap = backlinksCursors.get(subject);
	if (!cursorMap) {
		cursorMap = new SvelteMap<BacklinksSource, string | undefined>();
		backlinksCursors.set(subject, cursorMap);
	}

	const [_collection, source] = backlinkSource.split(':');
	const collection = _collection as keyof Records;
	const cursor = cursorMap.get(backlinkSource);

	// nothing to do when a previous call already paged back this far
	const cursorTimestamp = timestampFromCursor(cursor);
	if (cursorTimestamp && cursorTimestamp <= timestamp) return;

	console.log(`${subject}: fetchLinksUntil`, backlinkSource, cursor, timestamp);
	const result = await client.listRecordsUntil(subject, collection, cursor, timestamp);

	if (!result.ok) {
		console.error('failed to fetch links until', result.error);
		return;
	}
	cursorMap.set(backlinkSource, result.value.cursor);

	const path = source.split('.');
	for (const record of result.value.records) {
		const uri: unknown = getNestedValue(record.value, path);
		// record contents are not guaranteed to match the expected shape
		if (typeof uri !== 'string') continue;
		const parsedUri = parseCanonicalResourceUri(record.uri);
		if (!parsedUri.ok) continue;
		addBacklinks(uri as ResourceUri, `${collection}:${source}`, [
			{
				did: parsedUri.value.repo,
				collection: parsedUri.value.collection,
				rkey: parsedUri.value.rkey
			}
		]);
	}
};
186
/**
 * Removes the logged-in user's backlinks of kind `source` on `post`: first
 * from the local index, then by issuing one `com.atproto.repo.deleteRecord`
 * request per link.
 *
 * Does nothing when `client` has no logged-in user. Request failures are
 * tolerated (`Promise.allSettled`); the local index is updated regardless.
 *
 * @param client Client whose user owns the records to delete.
 * @param post Post the backlinks point at.
 * @param source Backlink source; the part before `:` is the record collection.
 */
export const deletePostBacklink = async (
	client: AtpClient,
	post: PostWithUri,
	source: BacklinksSource
) => {
	const did = client.user?.did;
	if (!did) return;
	const collection = source.split(':')[0] as Nsid;
	// `findBacklinksBy` hands back a single-use iterator. It must be copied
	// before `removeBacklinks` drains it, otherwise no delete request is ever
	// built from it below.
	const links = Array.from(findBacklinksBy(post.uri, source, did));
	removeBacklinks(post.uri, source, links);
	await Promise.allSettled(
		links.map((link) =>
			client.user?.atcute.post('com.atproto.repo.deleteRecord', {
				input: { repo: did, collection, rkey: link.rkey }
			})
		)
	);
};
205
/**
 * Creates a record of kind `source` by the logged-in user that points at `post`.
 *
 * The backlink is added to the local index before the
 * `com.atproto.repo.createRecord` request is sent. The record carries `$type`,
 * `createdAt`, the post URI at the source's dotted path, and the post CID at
 * the sibling `cid` key of that path.
 *
 * Does nothing when `client` has no logged-in user.
 */
export const createPostBacklink = async (
	client: AtpClient,
	post: PostWithUri,
	source: BacklinksSource
) => {
	const did = client.user?.did;
	if (!did) return;

	const [collectionPart, subjectPart] = source.split(':');
	const collection = collectionPart as Nsid;
	const rkey = tidNow();

	addBacklinks(post.uri, source, [{ did, collection, rkey }]);

	const record = {
		$type: collection,
		// eslint-disable-next-line svelte/prefer-svelte-reactivity
		createdAt: new Date().toISOString()
	};
	const uriPath = subjectPart.split('.');
	const cidPath = [...uriPath.slice(0, -1), 'cid'];
	setNestedValue(record, uriPath, post.uri);
	setNestedValue(record, cidPath, post.cid);

	await client.user?.atcute.post('com.atproto.repo.createRecord', {
		input: { repo: did, collection, rkey, record }
	});
};
240
/** Svelte store holding a string id or `null`; initialised to `null`. NOTE(review): presumably the id of the post to highlight - confirm against the UI. */
241export const pulsingPostId = writable<string | null>(null);
242
/** Client constructed without arguments; `handleJetstreamEvent` falls back to it when `clients` has no entry for a DID. */
243export const viewClient = new AtpClient();
/** Reactive map of clients keyed by DID. */
244export const clients = new SvelteMap<Did, AtpClient>();
245
/** Follow records per DID, keyed by record URI; filled by `addFollows`. */
246export const follows = new SvelteMap<Did, SvelteMap<ResourceUri, AppBskyGraphFollow.Main>>();
247
/**
 * Merges follow records into the `follows` entry for `did`.
 *
 * When `did` has no entry yet, a new map is built from `followMap`; otherwise
 * each pair is written into the existing map, overwriting same-URI entries.
 *
 * @param did Account the follow records belong to.
 * @param followMap `[record URI, follow record]` pairs.
 */
export const addFollows = (
	did: Did,
	followMap: Iterable<[ResourceUri, AppBskyGraphFollow.Main]>
) => {
	const existing = follows.get(did);
	if (existing) {
		for (const [uri, record] of followMap) existing.set(uri, record);
		return;
	}
	follows.set(did, new SvelteMap(followMap));
};
260
/**
 * Lists the `app.bsky.graph.follow` records of `account`, stores them via
 * `addFollows`, and returns them.
 *
 * Uses the client registered for the account in `clients` (asserted to exist).
 *
 * @returns Iterator over the follow records; an empty iterator when the
 * listing fails (the error is logged).
 */
export const fetchFollows = async (
	account: Account
): Promise<IteratorObject<AppBskyGraphFollow.Main>> => {
	const { did } = account;
	const client = clients.get(did)!;
	const res = await client.listRecordsUntil(did, 'app.bsky.graph.follow');
	if (!res.ok) {
		console.error("can't fetch follows:", res.error);
		return [].values();
	}

	const { records } = res.value;
	addFollows(
		did,
		records.map((follow) => [follow.uri, follow.value as AppBskyGraphFollow.Main])
	);
	return records.values().map((follow) => follow.value as AppBskyGraphFollow.Main);
};
276
/**
 * Fetches up to three days of posts and interactions of `subject`, for use in
 * the following list.
 *
 * Posts are listed with a cutoff of three days ago and stored via `addPosts`;
 * afterwards repost records are fetched via `fetchLinksUntil` down to the
 * smaller of the cutoff and the returned cursor's timestamp (`-1` when the
 * cursor yields none). Returns silently when the post listing fails.
 */
export const fetchForInteractions = async (client: AtpClient, subject: Did) => {
	const threeDaysMs = 3 * 24 * 60 * 60 * 1000;
	// Date.now() is in milliseconds; the trailing factor scales the cutoff by 1000
	const threeDaysAgo = (Date.now() - threeDaysMs) * 1000;

	const res = await client.listRecordsUntil(subject, 'app.bsky.feed.post', undefined, threeDaysAgo);
	if (!res.ok) return;
	const { records, cursor } = res.value;

	addPosts(
		records.map(
			(post) =>
				({ cid: post.cid, uri: post.uri, record: post.value as AppBskyFeedPost.Main }) as PostWithUri
		)
	);

	const cursorTimestamp = timestampFromCursor(cursor) ?? -1;
	const timestamp = Math.min(cursorTimestamp, threeDaysAgo);
	console.log(`${subject}: fetchForInteractions`, cursor, timestamp);
	await Promise.all([repostSource].map((s) => fetchLinksUntil(subject, client, s, timestamp)));
};
294
/**
 * Bookkeeping for `fetchBlocked`: the outer key is the subject DID, and the set
 * holds the blocker DIDs for which a block lookup against that subject has
 * already completed successfully.
 */
295// if did is in set, we have fetched blocks for them already (against logged in users)
296export const blockFlags = new SvelteMap<Did, SvelteSet<Did>>();
297
/**
 * Looks up whether `blocker` has a block record targeting `subject` and caches
 * the outcome.
 *
 * Any returned records are added to the backlinks index, and the pair is
 * marked as checked in `blockFlags`.
 *
 * @returns `true` when at least one block record was found; `false` when none
 * was found or the lookup failed (a failed lookup is not marked as checked).
 */
export const fetchBlocked = async (client: AtpClient, subject: Did, blocker: Did) => {
	const subjectUri = `at://${subject}` as ResourceUri;
	const res = await client.getBacklinks(subjectUri, blockSource, [blocker], 1);
	if (!res.ok) return false;

	const blocked = res.value.total > 0;
	if (blocked) addBacklinks(subjectUri, blockSource, res.value.records);

	// remember that this pair has been looked up
	let checked = blockFlags.get(subject);
	if (checked === undefined) {
		checked = new SvelteSet();
		blockFlags.set(subject, checked);
	}
	checked.add(blocker);

	return blocked;
};
314
/**
 * Lists the `app.bsky.graph.block` records of `account` and indexes each one
 * as a block backlink on `at://<blocked DID>`.
 *
 * Uses the client registered for the account in `clients` (asserted to exist).
 * Returns silently when the listing fails; a record URI that does not parse
 * is passed to `expect`.
 */
export const fetchBlocks = async (account: Account) => {
	const client = clients.get(account.did)!;
	const res = await client.listRecordsUntil(account.did, 'app.bsky.graph.block');
	if (!res.ok) return;

	for (const { uri, value } of res.value.records) {
		const { repo, collection, rkey } = expect(parseCanonicalResourceUri(uri));
		const { subject } = value as AppBskyGraphBlock.Main;
		addBacklinks(`at://${subject}`, blockSource, [{ did: repo, collection, rkey }]);
	}
};
331
/**
 * Blocks `targetDid` on behalf of the logged-in user.
 *
 * The block is added to the local backlinks index first, then an
 * `app.bsky.graph.block` record is created under a freshly generated rkey.
 * Does nothing when `client` has no logged-in user.
 */
export const createBlock = async (client: AtpClient, targetDid: Did) => {
	const userDid = client.user?.did;
	if (!userDid) return;

	const collection = 'app.bsky.graph.block';
	const rkey = tidNow();

	addBacklinks(`at://${targetDid}` as ResourceUri, blockSource, [
		{ did: userDid, collection, rkey }
	]);

	const record: AppBskyGraphBlock.Main = {
		$type: collection,
		subject: targetDid,
		// eslint-disable-next-line svelte/prefer-svelte-reactivity
		createdAt: new Date().toISOString()
	};

	await client.user?.atcute.post('com.atproto.repo.createRecord', {
		input: { repo: userDid, collection, rkey, record }
	});
};
363
/**
 * Unblocks `targetDid` on behalf of the logged-in user: removes the user's
 * block backlinks on `at://<targetDid>` from the local index, then issues one
 * `com.atproto.repo.deleteRecord` request per block record.
 *
 * Does nothing when `client` has no logged-in user. Request failures are
 * tolerated (`Promise.allSettled`); the local index is updated regardless.
 */
export const deleteBlock = async (client: AtpClient, targetDid: Did) => {
	const userDid = client.user?.did;
	if (!userDid) return;

	const targetUri = `at://${targetDid}` as ResourceUri;
	// `findBacklinksBy` hands back a single-use iterator. It must be copied
	// before `removeBacklinks` drains it, otherwise no delete request is ever
	// built from it below.
	const links = Array.from(findBacklinksBy(targetUri, blockSource, userDid));

	removeBacklinks(targetUri, blockSource, links);

	await Promise.allSettled(
		links.map((link) =>
			client.user?.atcute.post('com.atproto.repo.deleteRecord', {
				input: {
					repo: userDid,
					collection: 'app.bsky.graph.block',
					rkey: link.rkey
				}
			})
		)
	);
};
385
/** Reports whether `userDid` has a locally known block on `targetDid`. */
export const isBlockedByUser = (targetDid: Did, userDid: Did): boolean =>
	isBlockedBy(targetDid, userDid);
389
/** Reports whether `targetDid` has a locally known block on `userDid`. */
export const isUserBlockedBy = (userDid: Did, targetDid: Did): boolean =>
	isBlockedBy(userDid, targetDid);
393
/** Reports whether a locally known block exists between the two DIDs, in either direction. */
export const hasBlockRelationship = (did1: Did, did2: Did): boolean => {
	if (isBlockedBy(did1, did2)) return true;
	return isBlockedBy(did2, did1);
};
397
/**
 * Describes the locally known block state between a user and a target.
 *
 * @returns `userBlocked` is true when `userDid` blocks `targetDid`;
 * `blockedByTarget` is true when `targetDid` blocks `userDid`.
 */
export const getBlockRelationship = (
	userDid: Did,
	targetDid: Did
): { userBlocked: boolean; blockedByTarget: boolean } => {
	const userBlocked = isBlockedBy(targetDid, userDid);
	const blockedByTarget = isBlockedBy(userDid, targetDid);
	return { userBlocked, blockedByTarget };
};
407
/** Known posts, grouped by the DID parsed from each post URI and keyed by post URI; filled by `addPosts`. */
408export const allPosts = new SvelteMap<Did, SvelteMap<ResourceUri, PostWithUri>>();
/** What is remembered about a deleted post: only its reply reference, if it had one. */
409export type DeletedPostInfo = { reply?: PostWithUri['record']['reply'] };
/** Posts removed via `deletePost`, keyed by post URI. */
410export const deletedPosts = new SvelteMap<ResourceUri, DeletedPostInfo>();
411// did -> post uris that are replies to that did
412export const replyIndex = new SvelteMap<Did, SvelteSet<ResourceUri>>();
413
/**
 * Looks up a stored post by author DID and record key.
 *
 * @returns The post from `allPosts`, or `undefined` when it is not stored.
 */
export const getPost = (did: Did, rkey: RecordKey) => {
	const authored = allPosts.get(did);
	if (authored === undefined) return undefined;
	return authored.get(toCanonicalUri({ did, collection: 'app.bsky.feed.post', rkey }));
};
/**
 * Cache callback passed to `hydratePosts`: returns an already stored post
 * wrapped in `ok(...)`, or `undefined` when the post is not in `allPosts`.
 */
const hydrateCacheFn: Parameters<typeof hydratePosts>[3] = (did, rkey) => {
	const cached = getPost(did, rkey);
	if (!cached) return undefined;
	return ok(cached);
};
420
/**
 * Stores posts in `allPosts` (grouped by the DID parsed from each post URI)
 * and indexes replies.
 *
 * For a reply, backlinks are added on both its parent and its root, and the
 * post URI is added to `replyIndex` under the parent's DID when that DID can
 * be extracted. A post URI that does not parse is passed to `expect`.
 */
export const addPosts = (newPosts: Iterable<PostWithUri>) => {
	for (const post of newPosts) {
		const { repo, collection, rkey } = expect(parseCanonicalResourceUri(post.uri));

		let authored = allPosts.get(repo);
		if (authored === undefined) {
			authored = new SvelteMap();
			allPosts.set(repo, authored);
		}
		authored.set(post.uri, post);

		const reply = post.record.reply;
		if (!reply) continue;

		const link = { did: repo, collection, rkey };
		addBacklinks(reply.parent.uri, replySource, [link]);
		addBacklinks(reply.root.uri, replyRootSource, [link]);

		// keep the per-DID reply index in sync
		const parentDid = extractDidFromUri(reply.parent.uri);
		if (!parentDid) continue;
		let replies = replyIndex.get(parentDid);
		if (replies === undefined) {
			replies = new SvelteSet();
			replyIndex.set(parentDid, replies);
		}
		replies.add(post.uri);
	}
};
452
/**
 * Removes a stored post and records it in `deletedPosts`.
 *
 * Does nothing when the post is not in `allPosts`. Otherwise the post is
 * deleted, its URI is dropped from `replyIndex` under its parent's DID (when
 * it was a reply), and its reply reference is kept in `deletedPosts`.
 */
export const deletePost = (uri: ResourceUri) => {
	const did = extractDidFromUri(uri)!;
	const authored = allPosts.get(did);
	const post = authored?.get(uri);
	if (!post) return;
	authored?.delete(uri);

	const { reply } = post.record;
	// an empty string stands in for "no parent" so the extraction yields nothing
	const parentDid = extractDidFromUri(reply?.parent.uri ?? '');
	if (parentDid) replyIndex.get(parentDid)?.delete(uri);

	deletedPosts.set(uri, { reply });
};
463
/** Post URIs per timeline owner DID; filled by `addTimeline`. */
464export const timelines = new SvelteMap<Did, SvelteSet<ResourceUri>>();
/** Per-DID paging state written by `fetchTimeline`: the last returned cursor, and `end` set once no cursor came back. */
465export const postCursors = new SvelteMap<Did, { value?: string; end: boolean }>();
466
/**
 * Collects the URI of `post` followed by the URIs of its ancestors, walking
 * reply parents for as long as they are present in `allPosts`.
 *
 * The walk is iterative and remembers visited URIs, so a chain of any depth
 * and even a cyclic chain (posts naming each other as parent) terminates
 * instead of overflowing the call stack.
 *
 * @returns URIs ordered from `post` upwards, each at most once.
 */
const traversePostChain = (post: PostWithUri) => {
	const chain = [post.uri];
	// eslint-disable-next-line svelte/prefer-svelte-reactivity
	const seen = new Set(chain);

	let current = post;
	for (;;) {
		const parentUri = current.record.reply?.parent.uri;
		if (!parentUri) break;
		const parent = allPosts.get(extractDidFromUri(parentUri)!)?.get(parentUri);
		// stop at the first unknown parent, or when the chain loops back on itself
		if (!parent || seen.has(parent.uri)) break;
		seen.add(parent.uri);
		chain.push(parent.uri);
		current = parent;
	}
	return chain;
};
/**
 * Adds post URIs to the timeline of `did`, creating the timeline if needed.
 *
 * When a URI resolves to a post stored under `did` in `allPosts`, the whole
 * parent chain of that post is added as well, because parents may not be in
 * the timeline yet. URIs that do not resolve there are added as they are.
 * NOTE(review): the lookup uses the timeline owner's DID, so posts by other
 * authors never have their chain expanded - confirm this is intended.
 */
export const addTimeline = (did: Did, uris: Iterable<ResourceUri>) => {
	let timeline = timelines.get(did);
	if (timeline === undefined) {
		timeline = new SvelteSet();
		timelines.set(did, timeline);
	}

	for (const uri of uris) {
		const post = allPosts.get(did)?.get(uri);
		if (!post) {
			timeline.add(uri);
			continue;
		}
		for (const chainUri of traversePostChain(post)) timeline.add(chainUri);
	}
};
490
/**
 * Fetches the next page of `subject`'s posts, hydrates them, and merges them
 * into `allPosts` and `subject`'s timeline.
 *
 * The paging state in `postCursors` is updated as soon as the page arrives,
 * before hydration. When `client` has a logged-in user, every distinct author
 * in the hydrated page (other than the user, and other than authors already
 * recorded in `blockFlags` for the user) is checked via `fetchBlocked`.
 *
 * @param limit Page size forwarded to `fetchPosts`.
 * @param withBacklinks Forwarded to `fetchPosts`.
 * @param hydrateOptions Forwarded to `hydratePosts`.
 * @returns The new paging state, or `undefined` when the timeline had already ended.
 * @throws A string message when fetching or hydrating fails.
 */
export const fetchTimeline = async (
	client: AtpClient,
	subject: Did,
	limit: number = 6,
	withBacklinks: boolean = true,
	hydrateOptions?: Partial<HydrateOptions>
) => {
	const cursor = postCursors.get(subject);
	if (cursor?.end) return;

	const accPosts = await fetchPosts(subject, client, cursor?.value, limit, withBacklinks);
	if (!accPosts.ok) throw `cant fetch posts ${subject}: ${accPosts.error}`;
	const { cursor: nextCursor, posts } = accPosts.value;

	// no cursor in the response means there is nothing further to page through
	const newCursor = { value: nextCursor, end: !nextCursor };
	postCursors.set(subject, newCursor);

	const hydrated = await hydratePosts(client, subject, posts, hydrateCacheFn, hydrateOptions);
	if (!hydrated.ok) throw `cant hydrate posts ${subject}: ${hydrated.error}`;

	addPosts(hydrated.value.values());
	addTimeline(subject, hydrated.value.keys());

	const userDid = client.user?.did;
	if (userDid) {
		// find out whether any author on this page blocks the user
		// eslint-disable-next-line svelte/prefer-svelte-reactivity
		let authors = new Set(hydrated.value.keys().map((uri) => extractDidFromUri(uri)!));
		authors.delete(userDid); // the user's own posts need no check
		const alreadyFetched = blockFlags.get(userDid);
		if (alreadyFetched) authors = authors.difference(alreadyFetched);
		if (authors.size > 0)
			await Promise.all(authors.values().map((did) => fetchBlocked(client, userDid, did)));
	}

	console.log(`${subject}: fetchTimeline`, nextCursor);
	return newCursor;
};
534
/**
 * Fetches `interactor`'s like and repost records down to the timestamp of
 * `subject`'s current timeline cursor.
 *
 * Does nothing when `subject` has no entry in `postCursors`.
 */
export const fetchInteractionsToTimelineEnd = async (
	client: AtpClient,
	interactor: Did,
	subject: Did
) => {
	const cursor = postCursors.get(subject);
	if (!cursor) return;

	const until = timestampFromCursor(cursor.value);
	const sources = [likeSource, repostSource];
	await Promise.all(sources.map((s) => fetchLinksUntil(interactor, client, s, until)));
};
547
/**
 * Runs the initial fetches for `account` concurrently: its blocks, its own
 * recent posts and interactions, and - once its follows are known - the
 * recent posts and interactions of every followed account.
 *
 * Uses the client registered for the account in `clients` (asserted to exist).
 */
export const fetchInitial = async (account: Account) => {
	const client = clients.get(account.did)!;

	const fetchFollowedInteractions = async () => {
		const followRecords = await fetchFollows(account);
		await Promise.all(
			followRecords.map((follow) => fetchForInteractions(client, follow.subject))
		);
	};

	await Promise.all([
		fetchBlocks(account),
		fetchForInteractions(client, account.did),
		fetchFollowedInteractions()
	]);
};
558
/**
 * Applies a Jetstream event to local state. Only commits in the
 * `app.bsky.feed.post` collection are acted on.
 *
 * - `create`: the record is cached, hydrated (with the author's client, or
 *   `viewClient` when there is none), stored, and added to the author's
 *   timeline; a reply is also added to its parent author's timeline.
 * - `delete`: the post is removed via `deletePost`.
 *
 * Other events and operations are ignored. A hydration failure is logged.
 */
export const handleJetstreamEvent = async (event: JetstreamEvent) => {
	if (event.kind !== 'commit') return;

	const { did, commit } = event;
	const uri: ResourceUri = toCanonicalUri({ did, ...commit });
	if (commit.collection !== 'app.bsky.feed.post') return;

	switch (commit.operation) {
		case 'delete':
			deletePost(uri);
			return;
		case 'create': {
			const record = commit.record as AppBskyFeedPost.Main;
			await setRecordCache(uri, record);

			const client = clients.get(did) ?? viewClient;
			const hydrated = await hydratePosts(
				client,
				did,
				[{ record, uri, cid: commit.cid }],
				hydrateCacheFn
			);
			if (!hydrated.ok) {
				console.error(`cant hydrate posts ${did}: ${hydrated.error}`);
				return;
			}

			addPosts(hydrated.value.values());
			addTimeline(did, hydrated.value.keys());
			if (record.reply) {
				const parentDid = extractDidFromUri(record.reply.parent.uri)!;
				addTimeline(parentDid, [uri]);
			}
			return;
		}
	}
};
594
/**
 * Handles a notification whose source record is a post linking to one of the
 * logged-in accounts' posts.
 *
 * The subject post is fetched with the client of the DID in the subject URI,
 * then hydrated with the source record listed as its single reply, and the
 * result is stored and added to that DID's timeline.
 *
 * Logs and returns when no client exists for the DID or hydration fails;
 * returns silently when the subject post cannot be fetched.
 */
const handlePostNotification = async (event: NotificationsStreamEvent & { type: 'message' }) => {
	const { link } = event.data;

	const subjectRef = expect(parseCanonicalResourceUri(link.subject));
	const did = subjectRef.repo as AtprotoDid;
	const client = clients.get(did);
	if (!client) {
		console.error(`${did}: cant handle post notification, client not found !?`);
		return;
	}

	const subjectPost = await client.getRecord(AppBskyFeedPost.mainSchema, did, subjectRef.rkey);
	if (!subjectPost.ok) return;

	const sourceRef = expect(parseCanonicalResourceUri(link.source_record));
	const replyLink = {
		did: sourceRef.repo,
		collection: sourceRef.collection,
		rkey: sourceRef.rkey
	};
	const posts = [
		{
			record: subjectPost.value.record,
			uri: link.subject,
			cid: subjectPost.value.cid,
			replies: { cursor: null, total: 1, records: [replyLink] }
		}
	];

	const hydrated = await hydratePosts(client, did, posts, hydrateCacheFn);
	if (!hydrated.ok) {
		console.error(`cant hydrate posts ${did}: ${hydrated.error}`);
		return;
	}

	addPosts(hydrated.value.values());
	addTimeline(did, hydrated.value.keys());
};
639
/**
 * Records the link carried by a notification in the backlinks index, keyed by
 * the link's subject and source and identified by its source record.
 */
const handleBacklink = (event: NotificationsStreamEvent & { type: 'message' }) => {
	const { subject, source, source_record: sourceRecord } = event.data.link;
	const { repo, collection, rkey } = expect(parseCanonicalResourceUri(sourceRecord));
	addBacklinks(subject, source, [{ did: repo, collection, rkey }]);
};
650
/**
 * Dispatches a notifications stream event. Only `message` events are handled:
 * links whose source starts with `app.bsky.feed.post` go to the post handler,
 * everything else is recorded as a plain backlink.
 *
 * The post handler runs in the background and is not awaited, so the returned
 * promise does not wait for its network requests. Its failures are caught and
 * logged here rather than surfacing as unhandled promise rejections.
 */
export const handleNotification = async (event: NotificationsStreamEvent) => {
	if (event.type !== 'message') return;

	if (event.data.link.source.startsWith('app.bsky.feed.post')) {
		void handlePostNotification(event).catch((error: unknown) => {
			console.error('failed to handle post notification', error);
		});
	} else {
		handleBacklink(event);
	}
};
657
/** Reactive date; when a `window` global exists it is advanced to `Date.now()` once per second by the interval below. */
658export const currentTime = new SvelteDate();
659
// Only start the ticking interval when a `window` global is defined.
660if (typeof window !== 'undefined')
661 setInterval(() => {
662 currentTime.setTime(Date.now());
663 }, 1000);
664
/** Module-level `Router` instance. */
665export const router = new Router();