replies timeline only, appview-less bluesky client

implement blocks sync

ptr.pet 24c6dc95 bded7c3e

verified
+13
.zed/settings.json
··· 1 + { 2 + "lsp": { 3 + "vtsls": { 4 + "settings": { 5 + "typescript": { 6 + "suggestionActions": { 7 + "enabled": false 8 + } 9 + } 10 + } 11 + } 12 + } 13 + }
+11 -41
src/components/FollowingItem.svelte
··· 1 1 <script lang="ts" module> 2 - // Cache for synchronous access during component recycling 3 2 const profileCache = new SvelteMap<string, { displayName?: string; handle: string }>(); 4 3 </script> 5 4 ··· 8 7 import { getRelativeTime } from '$lib/date'; 9 8 import { generateColorForDid } from '$lib/accounts'; 10 9 import type { Did } from '@atcute/lexicons'; 11 - import type { AtprotoDid } from '@atcute/lexicons/syntax'; 12 10 import type { calculateFollowedUserStats, Sort } from '$lib/following'; 13 - import type { AtpClient } from '$lib/at/client'; 11 + import { resolveDidDoc, type AtpClient } from '$lib/at/client'; 14 12 import { SvelteMap } from 'svelte/reactivity'; 15 - import { clients, getClient, router } from '$lib/state.svelte'; 13 + import { router } from '$lib/state.svelte'; 14 + import { map } from '$lib/result'; 16 15 17 16 interface Props { 18 17 style: string; ··· 35 34 const c = profileCache.get(targetDid)!; 36 35 displayName = c.displayName; 37 36 handle = c.handle; 38 - } else { 39 - const existingClient = clients.get(targetDid as AtprotoDid); 40 - if (existingClient?.user?.handle) { 41 - handle = existingClient.user.handle; 42 - } else { 43 - handle = 'handle.invalid'; 44 - displayName = undefined; 45 - } 46 37 } 47 38 48 39 try { 49 - // Optimization: Check clients map first to avoid async overhead if possible 50 - // but we need to ensure we have the profile data, not just client existence. 51 - const userClient = await getClient(targetDid as AtprotoDid); 52 - 53 - // Check if the component has been recycled for a different user while we were awaiting 54 - if (did !== targetDid) return; 55 - 56 - let newHandle = handle; 57 - let newDisplayName = displayName; 58 - 59 - if (userClient.user?.handle) { 60 - newHandle = userClient.user.handle; 61 - handle = newHandle; 62 - } else { 63 - newHandle = targetDid; 64 - handle = newHandle; 65 - } 66 - 67 - const profileRes = await userClient.getProfile(); 68 - 40 + const [profileRes, handleRes] = await Promise.all([ 41 + client.getProfile(), 42 + resolveDidDoc(targetDid).then((r) => map(r, (doc) => doc.handle)) 43 + ]); 69 44 if (did !== targetDid) return; 70 - 71 - if (profileRes.ok) { 72 - newDisplayName = profileRes.value.displayName; 73 - displayName = newDisplayName; 74 - } 45 + if (profileRes.ok) displayName = profileRes.value.displayName; 46 + if (handleRes.ok) handle = handleRes.value; 75 47 76 - // Update cache 77 48 profileCache.set(targetDid, { 78 - handle: newHandle, 79 - displayName: newDisplayName 49 + handle, 50 + displayName 80 51 }); 81 52 } catch (e) { 82 53 if (did !== targetDid) return; ··· 85 56 } 86 57 }; 87 58 88 - // Re-run whenever `did` changes 89 59 $effect(() => { 90 60 loadProfile(did); 91 61 });
+10 -10
src/components/TimelineView.svelte
··· 11 11 fetchTimeline, 12 12 allPosts, 13 13 timelines, 14 - fetchInteractionsUntil 14 + fetchInteractionsToTimelineEnd 15 15 } from '$lib/state.svelte'; 16 16 import Icon from '@iconify/svelte'; 17 17 import { buildThreads, filterThreads, type ThreadPost } from '$lib/thread'; ··· 53 53 const loaderState = new LoaderState(); 54 54 let scrollContainer = $state<HTMLDivElement>(); 55 55 let loading = $state(false); 56 - let fetchMoreInteractions: boolean | undefined = $state(false); 57 56 let loadError = $state(''); 58 57 59 58 const loadMore = async () => { ··· 63 62 loaderState.status = 'LOADING'; 64 63 65 64 try { 66 - await fetchTimeline(did as AtprotoDid, 7, showReplies); 67 - // interaction fetching is done lazily so we dont block loading posts 68 - fetchMoreInteractions = true; 65 + await fetchTimeline(client, did as AtprotoDid, 7, showReplies); 66 + // only fetch interactions if logged in (because if not who is the interactor) 67 + if (client.user) await fetchInteractionsToTimelineEnd(client, did); 69 68 loaderState.loaded(); 70 69 } catch (error) { 71 70 loadError = `${error}`; ··· 87 86 const cursor = did ? postCursors.get(did as AtprotoDid) : undefined; 88 87 if (!cursor?.end) loadMore(); 89 88 } 90 - if (client && did && fetchMoreInteractions) { 91 - // set to false so it doesnt attempt to fetch again while its already fetching 92 - fetchMoreInteractions = false; 93 - fetchInteractionsUntil(client, did).then(() => (fetchMoreInteractions = undefined)); 94 - } 89 + }); 90 + // we want to load interactions when changing logged in user on timelines 91 + // only on timelines that arent logged in users, because those are already 92 + // loaded by loadMore 93 + $effect(() => { 94 + if (client && did && client.user?.did !== did) fetchInteractionsToTimelineEnd(client, did); 95 95 }); 96 96 </script> 97 97
+52 -80
src/lib/at/client.ts
··· 8 8 import { safeParse, type Blob as AtpBlob, type Handle, type InferOutput } from '@atcute/lexicons'; 9 9 import { 10 10 isDid, 11 - parseCanonicalResourceUri, 12 11 parseResourceUri, 13 12 type ActorIdentifier, 14 13 type AtprotoDid, ··· 36 35 import type { Notification } from './stardust'; 37 36 import type { OAuthUserAgent } from '@atcute/oauth-browser-client'; 38 37 import { timestampFromCursor, toCanonicalUri, toResourceUri } from '$lib'; 39 - import { constellationUrl, slingshotUrl, spacedustUrl } from '.'; 38 + import { constellationUrl, httpToDidWeb, slingshotUrl, spacedustUrl } from '.'; 40 39 41 40 export type RecordOutput<Output> = { uri: ResourceUri; cid: Cid | undefined; record: Output }; 42 41 ··· 90 89 91 90 if (onProgress && xhr.upload) { 92 91 xhr.upload.onprogress = (event: ProgressEvent) => { 93 - if (event.lengthComputable) { 94 - onProgress(event.loaded, event.total); 95 - } 92 + if (event.lengthComputable) onProgress(event.loaded, event.total); 96 93 }; 97 94 } 98 95 99 - Object.keys(headers).forEach((key) => { 100 - xhr.setRequestHeader(key, headers[key]); 101 - }); 96 + Object.keys(headers).forEach((key) => xhr.setRequestHeader(key, headers[key])); 102 97 103 98 xhr.onload = () => { 104 - if (xhr.status >= 200 && xhr.status < 300) { 105 - resolve(ok(JSON.parse(xhr.responseText))); 106 - } else { 107 - resolve(err(JSON.parse(xhr.responseText))); 108 - } 99 + if (xhr.status >= 200 && xhr.status < 300) resolve(ok(JSON.parse(xhr.responseText))); 100 + else resolve(err(JSON.parse(xhr.responseText))); 109 101 }; 110 102 111 - xhr.onerror = () => { 112 - resolve(err({ error: 'xhr_error', message: 'network error' })); 113 - }; 114 - 115 - xhr.onabort = () => { 116 - resolve(err({ error: 'xhr_error', message: 'upload aborted' })); 117 - }; 118 - 103 + xhr.onerror = () => resolve(err({ error: 'xhr_error', message: 'network error' })); 104 + xhr.onabort = () => resolve(err({ error: 'xhr_error', message: 'upload aborted' })); 119 105 xhr.send(body); 120 106 }); 121 107 }; ··· 202 188 } 203 189 204 190 async listRecords<Collection extends keyof Records>( 191 + ident: ActorIdentifier, 205 192 collection: Collection, 206 193 cursor?: string, 207 194 limit: number = 100 208 195 ): Promise< 209 196 Result<InferXRPCBodyOutput<(typeof ComAtprotoRepoListRecords.mainSchema)['output']>, string> 210 197 > { 211 - if (!this.atcute || !this.user) return err('not authenticated'); 212 - const res = await this.atcute.get('com.atproto.repo.listRecords', { 198 + if (!this.atcute) return err('not authenticated'); 199 + const docRes = await resolveDidDoc(ident); 200 + if (!docRes.ok) return docRes; 201 + const atp = 202 + this.user?.did === docRes.value.did 203 + ? this.atcute 204 + : new AtcuteClient({ handler: simpleFetchHandler({ service: docRes.value.pds }) }); 205 + const res = await atp.get('com.atproto.repo.listRecords', { 213 206 params: { 214 - repo: this.user.did, 207 + repo: docRes.value.did, 215 208 collection, 216 209 cursor, 217 210 limit, ··· 227 220 } 228 221 229 222 async listRecordsUntil<Collection extends keyof Records>( 223 + ident: ActorIdentifier, 230 224 collection: Collection, 231 225 cursor?: string, 232 226 timestamp: number = -1 ··· 238 232 239 233 let end = false; 240 234 while (!end) { 241 - const res = await this.listRecords(collection, data.cursor); 235 + const res = await this.listRecords(ident, collection, data.cursor); 242 236 if (!res.ok) return res; 243 237 data.cursor = res.value.cursor; 244 238 data.records.push(...res.value.records); ··· 255 249 end = true; 256 250 } else { 257 251 console.info( 258 - `${this.user?.did}: continuing to fetch ${collection}, on ${cursorTimestamp} until ${timestamp}` 252 + `${ident}: continuing to fetch ${collection}, on ${cursorTimestamp} until ${timestamp}` 259 253 ); 260 254 } 261 255 } ··· 264 258 return ok(data); 265 259 } 266 260 267 - async getBacklinksUri( 268 - uri: ResourceUri, 269 - source: BacklinksSource 270 - ): Promise<Result<Backlinks, string>> { 271 - const parsedResourceUri = expect(parseCanonicalResourceUri(uri)); 272 - return await this.getBacklinks( 273 - parsedResourceUri.repo, 274 - parsedResourceUri.collection, 275 - parsedResourceUri.rkey, 276 - source 277 - ); 278 - } 279 - 280 261 async getBacklinks( 281 - repo: ActorIdentifier, 282 - collection: Nsid, 283 - rkey: RecordKey, 262 + subject: ResourceUri, 284 263 source: BacklinksSource, 264 + filterBy?: Did[], 285 265 limit?: number 286 266 ): Promise<Result<Backlinks, string>> { 267 + const { repo, collection, rkey } = expect(parseResourceUri(subject)); 287 268 const did = await resolveHandle(repo); 288 269 if (!did.ok) return err(`cant resolve handle: ${did.error}`); 289 270 290 271 const timeout = new Promise<null>((resolve) => setTimeout(() => resolve(null), 2000)); 291 272 const query = fetchMicrocosm(constellationUrl, BacklinksQuery, { 292 - subject: toCanonicalUri({ did: did.value, collection, rkey }), 273 + subject: collection ? toCanonicalUri({ did: did.value, collection, rkey: rkey! }) : did.value, 293 274 source, 294 - limit: limit || 100 275 + limit: limit || 100, 276 + did: filterBy 295 277 }); 296 278 297 279 const results = await Promise.race([query, timeout]); ··· 303 285 async getServiceAuth(lxm: keyof XRPCProcedures, exp: number): Promise<Result<string, string>> { 304 286 if (!this.atcute || !this.user) return err('not authenticated'); 305 287 const serviceAuthUrl = new URL(`${this.user.pds}xrpc/com.atproto.server.getServiceAuth`); 306 - serviceAuthUrl.searchParams.append( 307 - 'aud', 308 - this.user.pds.replace('https://', 'did:web:').slice(0, -1) 309 - ); 288 + serviceAuthUrl.searchParams.append('aud', httpToDidWeb(this.user.pds)); 310 289 serviceAuthUrl.searchParams.append('lxm', 'com.atproto.repo.uploadBlob'); 311 290 serviceAuthUrl.searchParams.append('exp', exp.toString()); // 30 minutes 312 291 ··· 412 391 } 413 392 } 414 393 415 - export const newPublicClient = async (ident: ActorIdentifier): Promise<AtpClient> => { 416 - const atp = new AtpClient(); 417 - const didDoc = await resolveDidDoc(ident); 418 - if (!didDoc.ok) { 419 - console.error('failed to resolve did doc', didDoc.error); 420 - return atp; 421 - } 422 - atp.atcute = new AtcuteClient({ handler: simpleFetchHandler({ service: didDoc.value.pds }) }); 423 - atp.user = didDoc.value; 424 - return atp; 425 - }; 394 + // export const newPublicClient = async (ident: ActorIdentifier) => { 395 + // const atp = new AtpClient(); 396 + // const didDoc = await resolveDidDoc(ident); 397 + // if (!didDoc.ok) { 398 + // console.error('failed to resolve did doc', didDoc.error); 399 + // return atp; 400 + // } 401 + // atp.atcute = new AtcuteClient({ handler: simpleFetchHandler({ service: didDoc.value.pds }) }); 402 + // atp.user = didDoc.value; 403 + // return atp; 404 + // }; 426 405 427 - // Wrappers that use the cache 406 + export const resolveHandle = (identifier: ActorIdentifier) => { 407 + if (isDid(identifier)) return Promise.resolve(ok(identifier as AtprotoDid)); 428 408 429 - export const resolveHandle = async ( 430 - identifier: ActorIdentifier 431 - ): Promise<Result<AtprotoDid, string>> => { 432 - if (isDid(identifier)) return ok(identifier as AtprotoDid); 433 - 434 - try { 435 - const did = await cache.resolveHandle(identifier); 436 - return ok(did); 437 - } catch (e) { 438 - return err(String(e)); 439 - } 409 + return cache 410 + .resolveHandle(identifier) 411 + .then((did) => ok(did)) 412 + .catch((e) => err(String(e))); 440 413 }; 441 414 442 - export const resolveDidDoc = async (ident: ActorIdentifier): Promise<Result<MiniDoc, string>> => { 443 - try { 444 - const doc = await cache.resolveDidDoc(ident); 445 - return ok(doc); 446 - } catch (e) { 447 - return err(String(e)); 448 - } 449 - }; 415 + export const resolveDidDoc = (ident: ActorIdentifier) => 416 + cache 417 + .resolveDidDoc(ident) 418 + .then((doc) => ok(doc)) 419 + .catch((e) => err(String(e))); 450 420 451 421 type NotificationsStreamEncoder = WebSocket.Encoder<undefined, Notification>; 452 422 export type NotificationsStream = WebSocket<NotificationsStreamEncoder>; ··· 485 455 ): Promise<Result<Output, string>> => { 486 456 if (!schema.output || schema.output.type === 'blob') return err('schema must be blob'); 487 457 api.pathname = `/xrpc/${schema.nsid}`; 488 - api.search = params ? `?${new URLSearchParams(params)}` : ''; 458 + api.search = params 459 + ? `?${new URLSearchParams(Object.entries(params).flatMap(([k, v]) => (v === undefined ? [] : [[k, String(v)]])))}` 460 + : ''; 489 461 try { 490 462 const body = await fetchJson(api, init); 491 463 if (!body.ok) return err(body.error);
+1 -1
src/lib/at/constellation.ts
··· 9 9 }); 10 10 export const BacklinksQuery = v.query('blue.microcosm.links.getBacklinks', { 11 11 params: v.object({ 12 - subject: v.resourceUriString(), 12 + subject: v.string(), 13 13 source: v.string(), 14 14 did: v.optional(v.array(v.didString())), 15 15 limit: v.optional(v.integer())
+4 -3
src/lib/at/fetch.ts
··· 17 17 }; 18 18 19 19 export const fetchPosts = async ( 20 + subject: Did, 20 21 client: AtpClient, 21 22 cursor?: string, 22 23 limit?: number, 23 24 withBacklinks: boolean = true 24 25 ): Promise<Result<{ posts: PostWithBacklinks[]; cursor?: string }, string>> => { 25 - const recordsList = await client.listRecords('app.bsky.feed.post', cursor, limit); 26 + const recordsList = await client.listRecords(subject, 'app.bsky.feed.post', cursor, limit); 26 27 if (!recordsList.ok) return err(`can't retrieve posts: ${recordsList.error}`); 27 28 cursor = recordsList.value.cursor; 28 29 const records = recordsList.value.records; ··· 41 42 try { 42 43 const allBacklinks = await Promise.all( 43 44 records.map(async (r): Promise<PostWithBacklinks> => { 44 - const result = await client.getBacklinksUri(r.uri, replySource); 45 + const result = await client.getBacklinks(r.uri, replySource); 45 46 if (!result.ok) throw `cant fetch replies: ${result.error}`; 46 47 const replies = result.value; 47 48 return { ··· 120 121 if (repo === postRepo) return; 121 122 122 123 // get chains that are the same author until we exhaust them 123 - const backlinks = await client.getBacklinksUri(post.uri, replySource); 124 + const backlinks = await client.getBacklinks(post.uri, replySource); 124 125 if (!backlinks.ok) return; 125 126 126 127 const promises = [];
+3
src/lib/at/index.ts
··· 1 1 import { settings } from '$lib/settings'; 2 + import type { Did } from '@atcute/lexicons'; 2 3 import { get } from 'svelte/store'; 3 4 4 5 export const slingshotUrl: URL = new URL(get(settings).endpoints.slingshot); 5 6 export const spacedustUrl: URL = new URL(get(settings).endpoints.spacedust); 6 7 export const constellationUrl: URL = new URL(get(settings).endpoints.constellation); 8 + 9 + export const httpToDidWeb = (url: string): Did => `did:web:${new URL(url).hostname}`;
+2
src/lib/index.ts
··· 28 28 export const likeSource: BacklinksSource = 'app.bsky.feed.like:subject.uri'; 29 29 export const repostSource: BacklinksSource = 'app.bsky.feed.repost:subject.uri'; 30 30 export const replySource: BacklinksSource = 'app.bsky.feed.post:reply.parent.uri'; 31 + export const replyRootSource: BacklinksSource = 'app.bsky.feed.post:reply.root.uri'; 32 + export const blockSource: BacklinksSource = 'app.bsky.graph.block:subject'; 31 33 32 34 export const timestampFromCursor = (cursor: string | undefined) => { 33 35 if (!cursor) return undefined;
+128 -68
src/lib/state.svelte.ts
··· 1 1 import { writable } from 'svelte/store'; 2 - import { 3 - AtpClient, 4 - newPublicClient, 5 - type NotificationsStream, 6 - type NotificationsStreamEvent 7 - } from './at/client'; 2 + import { AtpClient, type NotificationsStream, type NotificationsStreamEvent } from './at/client'; 8 3 import { SvelteMap, SvelteDate, SvelteSet } from 'svelte/reactivity'; 9 - import type { Did, Handle, InferOutput, Nsid, RecordKey, ResourceUri } from '@atcute/lexicons'; 4 + import type { Did, Handle, Nsid, RecordKey, ResourceUri } from '@atcute/lexicons'; 10 5 import { fetchPosts, hydratePosts, type PostWithUri } from './at/fetch'; 11 6 import { parseCanonicalResourceUri, type AtprotoDid } from '@atcute/lexicons/syntax'; 12 - import { AppBskyActorProfile, AppBskyFeedPost, type AppBskyGraphFollow } from '@atcute/bluesky'; 13 - import type { ComAtprotoRepoListRecords } from '@atcute/atproto'; 7 + import { 8 + AppBskyActorProfile, 9 + AppBskyFeedPost, 10 + AppBskyGraphBlock, 11 + type AppBskyGraphFollow 12 + } from '@atcute/bluesky'; 14 13 import type { JetstreamSubscription, JetstreamEvent } from '@atcute/jetstream'; 15 14 import { expect, ok } from './result'; 16 15 import type { Backlink, BacklinksSource } from './at/constellation'; 17 16 import { now as tidNow } from '@atcute/tid'; 18 17 import type { Records } from '@atcute/lexicons/ambient'; 19 18 import { 19 + blockSource, 20 20 extractDidFromUri, 21 21 likeSource, 22 + replyRootSource, 22 23 replySource, 23 24 repostSource, 24 25 timestampFromCursor, 25 26 toCanonicalUri 26 27 } from '$lib'; 27 28 import { Router } from './router.svelte'; 29 + import type { Account } from './accounts'; 28 30 29 31 export const notificationStream = writable<NotificationsStream | null>(null); 30 32 export const jetstream = writable<JetstreamSubscription | null>(null); ··· 134 136 >(); 135 137 136 138 export const fetchLinksUntil = async ( 139 + subject: Did, 137 140 client: AtpClient, 138 141 backlinkSource: BacklinksSource, 139 142 timestamp: number = -1 140 143 ) => { 141 - const did = client.user?.did; 142 - if (!did) return; 143 - 144 - let cursorMap = backlinksCursors.get(did); 144 + let cursorMap = backlinksCursors.get(subject); 145 145 if (!cursorMap) { 146 146 cursorMap = new SvelteMap<BacklinksSource, string | undefined>(); 147 - backlinksCursors.set(did, cursorMap); 147 + backlinksCursors.set(subject, cursorMap); 148 148 } 149 149 150 150 const [_collection, source] = backlinkSource.split(':'); ··· 155 155 const cursorTimestamp = timestampFromCursor(cursor); 156 156 if (cursorTimestamp && cursorTimestamp <= timestamp) return; 157 157 158 - console.log(`${did}: fetchLinksUntil`, backlinkSource, cursor, timestamp); 159 - const result = await client.listRecordsUntil(collection, cursor, timestamp); 158 + console.log(`${subject}: fetchLinksUntil`, backlinkSource, cursor, timestamp); 159 + const result = await client.listRecordsUntil(subject, collection, cursor, timestamp); 160 160 161 161 if (!result.ok) { 162 162 console.error('failed to fetch links until', result.error); ··· 237 237 238 238 export const viewClient = new AtpClient(); 239 239 export const clients = new SvelteMap<Did, AtpClient>(); 240 - export const getClient = async (did: Did): Promise<AtpClient> => { 241 - if (!clients.has(did)) clients.set(did, await newPublicClient(did)); 242 - return clients.get(did)!; 243 - }; 244 240 245 241 export const follows = new SvelteMap<Did, SvelteMap<ResourceUri, AppBskyGraphFollow.Main>>(); 246 242 ··· 248 244 did: Did, 249 245 followMap: Iterable<[ResourceUri, AppBskyGraphFollow.Main]> 250 246 ) => { 251 - if (!follows.has(did)) { 252 - follows.set(did, new SvelteMap(followMap)); 247 + let map = follows.get(did)!; 248 + if (!map) { 249 + map = new SvelteMap(followMap); 250 + follows.set(did, map); 253 251 return; 254 252 } 255 - const map = follows.get(did)!; 256 253 for (const [uri, record] of followMap) map.set(uri, record); 257 254 }; 258 255 259 - export const fetchFollows = async (did: AtprotoDid) => { 260 - const client = await getClient(did); 261 - const res = await client.listRecordsUntil('app.bsky.graph.follow'); 262 - if (!res.ok) return; 256 + export const fetchFollows = async ( 257 + account: Account 258 + ): Promise<IteratorObject<AppBskyGraphFollow.Main>> => { 259 + const client = clients.get(account.did)!; 260 + const res = await client.listRecordsUntil(account.did, 'app.bsky.graph.follow'); 261 + if (!res.ok) { 262 + console.error("can't fetch follows:", res.error); 263 + return [].values(); 264 + } 263 265 addFollows( 264 - did, 266 + account.did, 265 267 res.value.records.map((follow) => [follow.uri, follow.value as AppBskyGraphFollow.Main]) 266 268 ); 269 + return res.value.records.values().map((follow) => follow.value as AppBskyGraphFollow.Main); 267 270 }; 268 271 269 272 // this fetches up to three days of posts and interactions for using in following list 270 - export const fetchForInteractions = async (did: AtprotoDid) => { 273 + export const fetchForInteractions = async (client: AtpClient, subject: Did) => { 271 274 const threeDaysAgo = (Date.now() - 3 * 24 * 60 * 60 * 1000) * 1000; 272 275 273 - const client = await getClient(did); 274 - const res = await client.listRecordsUntil('app.bsky.feed.post', undefined, threeDaysAgo); 276 + const res = await client.listRecordsUntil(subject, 'app.bsky.feed.post', undefined, threeDaysAgo); 275 277 if (!res.ok) return; 276 - addPostsRaw(did, res.value); 278 + const postsWithUri = res.value.records.map( 279 + (post) => 280 + ({ cid: post.cid, uri: post.uri, record: post.value as AppBskyFeedPost.Main }) as PostWithUri 281 + ); 282 + addPosts(postsWithUri); 277 283 278 284 const cursorTimestamp = timestampFromCursor(res.value.cursor) ?? -1; 279 285 const timestamp = Math.min(cursorTimestamp, threeDaysAgo); 280 - console.log(`${did}: fetchForInteractions`, res.value.cursor, timestamp); 281 - await Promise.all([repostSource].map((s) => fetchLinksUntil(client, s, timestamp))); 286 + console.log(`${subject}: fetchForInteractions`, res.value.cursor, timestamp); 287 + await Promise.all([repostSource].map((s) => fetchLinksUntil(subject, client, s, timestamp))); 288 + }; 289 + 290 + // if did is in set, we have fetched blocks for them already (against logged in users) 291 + export const blockFlags = new SvelteMap<Did, SvelteSet<Did>>(); 292 + 293 + export const fetchBlocked = async (client: AtpClient, subject: Did, blocker: Did) => { 294 + const subjectUri = `at://${subject}` as ResourceUri; 295 + const res = await client.getBacklinks(subjectUri, blockSource, [blocker], 1); 296 + if (!res.ok) return; 297 + if (res.value.total > 0) addBacklinks(subjectUri, blockSource, res.value.records); 298 + }; 299 + 300 + export const fetchBlocks = async (account: Account) => { 301 + const client = clients.get(account.did)!; 302 + const res = await client.listRecordsUntil(account.did, 'app.bsky.graph.block'); 303 + if (!res.ok) return; 304 + for (const block of res.value.records) { 305 + const record = block.value as AppBskyGraphBlock.Main; 306 + const parsedUri = expect(parseCanonicalResourceUri(block.uri)); 307 + addBacklinks(`at://${record.subject}`, blockSource, [ 308 + { 309 + did: parsedUri.repo, 310 + collection: parsedUri.collection, 311 + rkey: parsedUri.rkey 312 + } 313 + ]); 314 + } 282 315 }; 283 316 284 317 export const allPosts = new SvelteMap<Did, SvelteMap<ResourceUri, PostWithUri>>(); ··· 292 325 return cached ? ok(cached) : undefined; 293 326 }; 294 327 295 - export const addPostsRaw = ( 296 - did: AtprotoDid, 297 - newPosts: InferOutput<ComAtprotoRepoListRecords.mainSchema['output']['schema']> 298 - ) => { 299 - const postsWithUri = newPosts.records.map( 300 - (post) => 301 - ({ cid: post.cid, uri: post.uri, record: post.value as AppBskyFeedPost.Main }) as PostWithUri 302 - ); 303 - addPosts(postsWithUri); 304 - }; 305 - 306 328 export const addPosts = (newPosts: Iterable<PostWithUri>) => { 307 329 for (const post of newPosts) { 308 330 const parsedUri = expect(parseCanonicalResourceUri(post.uri)); ··· 313 335 } 314 336 posts.set(post.uri, post); 315 337 if (post.record.reply) { 316 - addBacklinks(post.record.reply.parent.uri, replySource, [ 317 - { 318 - did: parsedUri.repo, 319 - collection: parsedUri.collection, 320 - rkey: parsedUri.rkey 321 - } 322 - ]); 338 + const link = { 339 + did: parsedUri.repo, 340 + collection: parsedUri.collection, 341 + rkey: parsedUri.rkey 342 + }; 343 + addBacklinks(post.record.reply.parent.uri, replySource, [link]); 344 + addBacklinks(post.record.reply.root.uri, replyRootSource, [link]); 323 345 324 346 // update reply index 325 347 const parentDid = extractDidFromUri(post.record.reply.parent.uri); ··· 363 385 }; 364 386 365 387 export const fetchTimeline = async ( 366 - did: AtprotoDid, 388 + client: AtpClient, 389 + subject: AtprotoDid, 367 390 limit: number = 6, 368 391 withBacklinks: boolean = true 369 392 ) => { 370 - const targetClient = await getClient(did); 371 - 372 - const cursor = postCursors.get(did); 393 + const cursor = postCursors.get(subject); 373 394 if (cursor && cursor.end) return; 374 395 375 - const accPosts = await fetchPosts(targetClient, cursor?.value, limit, withBacklinks); 376 - if (!accPosts.ok) throw `cant fetch posts ${did}: ${accPosts.error}`; 396 + const accPosts = await fetchPosts(subject, client, cursor?.value, limit, withBacklinks); 397 + if (!accPosts.ok) throw `cant fetch posts ${subject}: ${accPosts.error}`; 377 398 378 399 // if the cursor is undefined, we've reached the end of the timeline 379 - postCursors.set(did, { value: accPosts.value.cursor, end: !accPosts.value.cursor }); 380 - const hydrated = await hydratePosts(targetClient, did, accPosts.value.posts, hydrateCacheFn); 381 - if (!hydrated.ok) throw `cant hydrate posts ${did}: ${hydrated.error}`; 400 + const newCursor = { value: accPosts.value.cursor, end: !accPosts.value.cursor }; 401 + postCursors.set(subject, newCursor); 402 + const hydrated = await hydratePosts(client, subject, accPosts.value.posts, hydrateCacheFn); 403 + if (!hydrated.ok) throw `cant hydrate posts ${subject}: ${hydrated.error}`; 382 404 383 405 addPosts(hydrated.value.values()); 384 - addTimeline(did, hydrated.value.keys()); 406 + addTimeline(subject, hydrated.value.keys()); 385 407 386 - console.log(`${did}: fetchTimeline`, accPosts.value.cursor); 408 + // we only need to check blocks if the user is the subject (ie. logged in) 409 + if (client.user?.did === subject) { 410 + // check if any of the post authors block the user 411 + // eslint-disable-next-line svelte/prefer-svelte-reactivity 412 + let distinctDids = new Set(hydrated.value.keys().map((uri) => extractDidFromUri(uri)!)); 413 + distinctDids.delete(subject); // dont need to check if user blocks themselves 414 + const alreadyFetched = blockFlags.get(subject); 415 + if (alreadyFetched) distinctDids = distinctDids.difference(alreadyFetched); 416 + if (distinctDids.size > 0) 417 + await Promise.all(distinctDids.values().map((did) => fetchBlocked(client, subject, did))); 418 + } 419 + 420 + console.log(`${subject}: fetchTimeline`, accPosts.value.cursor); 421 + return newCursor; 387 422 }; 388 423 389 - export const fetchInteractionsUntil = async (client: AtpClient, did: Did) => { 424 + export const fetchInteractionsToTimelineEnd = async (client: AtpClient, did: Did) => { 390 425 const cursor = postCursors.get(did); 391 426 if (!cursor) return; 392 427 const timestamp = timestampFromCursor(cursor.value); 393 - await Promise.all([likeSource, repostSource].map((s) => fetchLinksUntil(client, s, timestamp))); 428 + await Promise.all( 429 + [likeSource, repostSource].map((s) => fetchLinksUntil(did, client, s, timestamp)) 430 + ); 431 + }; 432 + 433 + export const fetchInitial = async (account: Account) => { 434 + const client = clients.get(account.did)!; 435 + await Promise.all([ 436 + fetchBlocks(account), 437 + fetchForInteractions(client, account.did), 438 + fetchFollows(account).then((follows) => 439 + Promise.all(follows.map((follow) => fetchForInteractions(client, follow.subject)) ?? []) 440 + ) 441 + ]); 394 442 }; 395 443 396 444 export const handleJetstreamEvent = async (event: JetstreamEvent) => { ··· 407 455 cid: commit.cid 408 456 } 409 457 ]; 410 - const client = await getClient(did); 458 + const client = clients.get(did) ?? viewClient; 411 459 const hydrated = await hydratePosts(client, did, posts, hydrateCacheFn); 412 460 if (!hydrated.ok) { 413 461 console.error(`cant hydrate posts ${did}: ${hydrated.error}`); ··· 416 464 addPosts(hydrated.value.values()); 417 465 addTimeline(did, hydrated.value.keys()); 418 466 } else if (commit.operation === 'delete') { 419 - allPosts.get(did)?.delete(uri); 467 + const post = allPosts.get(did)?.get(uri); 468 + if (post) { 469 + allPosts.get(did)?.delete(uri); 470 + // remove from timeline 471 + timelines.get(did)?.delete(uri); 472 + // remove reply from index 473 + const subjectDid = extractDidFromUri(post.record.reply?.parent.uri ?? ''); 474 + if (subjectDid) replyIndex.get(subjectDid)?.delete(uri); 475 + } 420 476 } 421 477 } 422 478 }; ··· 424 480 const handlePostNotification = async (event: NotificationsStreamEvent & { type: 'message' }) => { 425 481 const parsedSubjectUri = expect(parseCanonicalResourceUri(event.data.link.subject)); 426 482 const did = parsedSubjectUri.repo as AtprotoDid; 427 - const client = await getClient(did); 483 + const client = clients.get(did); 484 + if (!client) { 485 + console.error(`${did}: cant handle post notification, client not found !?`); 486 + return; 487 + } 428 488 const subjectPost = await client.getRecord( 429 489 AppBskyFeedPost.mainSchema, 430 490 did,
+10 -19
src/routes/[...catchall]/+page.svelte
··· 12 12 import { 13 13 clients, 14 14 postCursors, 15 - fetchForInteractions, 16 - fetchFollows, 17 15 follows, 18 16 notificationStream, 19 17 viewClient, ··· 22 20 handleNotification, 23 21 addPosts, 24 22 addTimeline, 25 - router 23 + router, 24 + fetchInitial 26 25 } from '$lib/state.svelte'; 27 26 import { get } from 'svelte/store'; 28 27 import Icon from '@iconify/svelte'; ··· 113 112 'app.bsky.feed.post:embed.record.uri', 114 113 'app.bsky.feed.repost:subject.uri', 115 114 'app.bsky.feed.like:subject.uri', 116 - 'app.bsky.graph.follow:subject' 115 + 'app.bsky.graph.follow:subject', 116 + 'app.bsky.graph.block:subject' 117 117 ) 118 118 ); 119 119 }); ··· 144 144 } 145 145 if (!$accounts.some((account) => account.did === selectedDid)) selectedDid = $accounts[0].did; 146 146 // console.log('onMount selectedDid', selectedDid); 147 - Promise.all($accounts.map(loginAccount)).then(() => { 148 - $accounts.forEach((account) => { 149 - fetchFollows(account.did).then(() => 150 - follows 151 - .get(account.did) 152 - ?.forEach((follow) => fetchForInteractions(follow.subject as AtprotoDid)) 153 - ); 154 - fetchForInteractions(account.did); 155 - }); 156 - }); 147 + Promise.all($accounts.map(loginAccount)).then(() => $accounts.forEach(fetchInitial)); 157 148 } else { 158 149 selectedDid = null; 159 150 } ··· 163 154 164 155 $effect(() => { 165 156 const wantedDids: Did[] = ['did:web:guestbook.gaze.systems']; 166 - 167 - for (const followMap of follows.values()) 168 - for (const follow of followMap.values()) wantedDids.push(follow.subject); 169 - for (const account of $accounts) wantedDids.push(account.did); 170 - 157 + const followDids = follows 158 + .values() 159 + .flatMap((followMap) => followMap.values().map((follow) => follow.subject)); 160 + const accountDids = $accounts.values().map((account) => account.did); 161 + wantedDids.push(...followDids, ...accountDids); 171 162 // console.log('updating jetstream options:', wantedDids); 172 163 $jetstream?.updateOptions({ wantedDids }); 173 164 });