Encrypted, ephemeral, private memos on atproto

feat(consumer): list items from PDS

graham.systems 46073a57 17100ea8

verified
Changed files
+37 -4
packages
consumer
+37 -4
packages/consumer/mod.ts
··· 1 1 import { produceRequirements } from "@cistern/shared"; 2 2 import { generateKeys } from "@cistern/crypto"; 3 3 import { generateRandomName } from "@puregarlic/randimal"; 4 + import { parse } from "@atcute/lexicons"; 4 5 import type { Did } from "@atcute/lexicons/syntax"; 5 6 import type { Client, CredentialManager } from "@atcute/client"; 6 - import type { AppCisternLexiconPubkey } from "@cistern/lexicon"; 7 + import { 8 + AppCisternLexiconItem, 9 + type AppCisternLexiconPubkey, 10 + } from "@cistern/lexicon"; 7 11 import type { ConsumerOptions, ConsumerParams, LocalKeyPair } from "./types.ts"; 8 12 9 13 import type {} from "@atcute/atproto"; ··· 80 84 } 81 85 82 86 /** 83 - * Returns an async iterator that returns pages of the user's items from their PDS. 84 - * @todo List items from repo 87 + * Asynchronously iterate through items in the user's PDS 85 88 */ 86 - async listItems() {} 89 + async *listItems(): AsyncIterator< 90 + AppCisternLexiconItem.Main, 91 + void, 92 + undefined 93 + > { 94 + let cursor: string | undefined; 95 + 96 + while (true) { 97 + const res = await this.rpc.get("com.atproto.repo.listRecords", { 98 + params: { 99 + collection: "app.cistern.lexicon.item", 100 + repo: this.did, 101 + cursor, 102 + }, 103 + }); 104 + 105 + if (!res.ok) { 106 + throw new Error( 107 + `failed to list items: ${res.status} ${res.data.error}`, 108 + ); 109 + } 110 + 111 + if (res.data.cursor) cursor = res.data.cursor; 112 + 113 + for (const record of res.data.records) { 114 + yield parse(AppCisternLexiconItem.mainSchema, record.value); 115 + } 116 + 117 + if (!cursor) return; 118 + } 119 + } 87 120 88 121 /** 89 122 * Subscribes to the Jetstreams for the user's items.