Small tool to make it convenient to deduplicate records in collections
atproto
go
cli
1package main
2
3import (
4 "context"
5 "encoding/json"
6 "fmt"
7 "os"
8 "sync"
9
10 "github.com/bluesky-social/indigo/api/atproto"
11 "github.com/bluesky-social/indigo/atproto/identity"
12 "github.com/bluesky-social/indigo/atproto/syntax"
13 "github.com/bluesky-social/indigo/xrpc"
14)
15
// Record is a single repository record as handed to consumers of
// fetchRecords.
type Record struct {
	// URI is the record's "uri" field from the listing response.
	URI string `json:"uri"`
	// CID is the record's "cid" field from the listing response.
	CID string `json:"cid"`
	// Value holds the record body as raw, undecoded JSON so that callers
	// can decode it into whatever shape their collection uses.
	Value json.RawMessage `json:"value"`
}
21
// ListRecordsOutput is the decoded response body of a
// com.atproto.repo.listRecords query as issued by fetchRecords.
type ListRecordsOutput struct {
	// Records is the page of records returned by this request. Each element
	// has the same fields and JSON tags as Record.
	Records []struct {
		URI   string          `json:"uri"`
		CID   string          `json:"cid"`
		Value json.RawMessage `json:"value"`
	} `json:"records"`

	// Cursor is passed back as the "cursor" parameter to request the next
	// page. fetchRecords stops paging when it is empty.
	Cursor string `json:"cursor"`
}
30
31type ListClient struct {
32 client *xrpc.Client
33 did string
34}
35
36type DeleteClient struct {
37 client *xrpc.Client
38 did string
39}
40
41func fetchRecords(ctx context.Context, handle, collection string, limit int, cursor string) (<-chan Record, <-chan error) {
42 recordCh := make(chan Record, 100)
43 errCh := make(chan error, 1)
44
45 go func() {
46 defer close(recordCh)
47 defer close(errCh)
48
49 currentCursor := cursor
50 listClient, err := createListClient(ctx, handle)
51 if err != nil {
52 errCh <- err
53 return
54 }
55
56 for {
57 params := map[string]any{
58 "repo": listClient.did,
59 "collection": collection,
60 "limit": limit,
61 }
62 if currentCursor != "" {
63 params["cursor"] = currentCursor
64 }
65
66 var output ListRecordsOutput
67 err = listClient.client.Do(ctx, xrpc.Query, "", "com.atproto.repo.listRecords", params, nil, &output)
68 if err != nil {
69 errCh <- fmt.Errorf("failed to list records: %w", err)
70 return
71 }
72
73 for _, r := range output.Records {
74 recordCh <- Record{
75 URI: r.URI,
76 CID: r.CID,
77 Value: r.Value,
78 }
79 }
80
81 if output.Cursor == "" {
82 break
83 }
84
85 currentCursor = output.Cursor
86 }
87 }()
88
89 return recordCh, errCh
90}
91
92func deleteRecords(ctx context.Context, client *xrpc.Client, uris []string) error {
93 errCh := make(chan error, len(uris))
94 wg := sync.WaitGroup{}
95
96 for _, uri := range uris {
97 wg.Go(func() {
98 errCh <- deleteSingleRecord(ctx, client, uri)
99 })
100 }
101
102 wg.Wait()
103 close(errCh)
104
105 for err := range errCh {
106 if err != nil {
107 fmt.Fprintf(os.Stderr, "Error: %v\n", err)
108 }
109 }
110
111 return nil
112}
113
114func deleteSingleRecord(ctx context.Context, client *xrpc.Client, uriStr string) error {
115 atUri, err := syntax.ParseATURI(uriStr)
116 if err != nil {
117 return fmt.Errorf("invalid URI: %w", err)
118 }
119
120 input := &atproto.RepoDeleteRecord_Input{
121 Repo: atUri.Authority().String(),
122 Collection: atUri.Collection().String(),
123 Rkey: atUri.RecordKey().String(),
124 }
125
126 _, err = atproto.RepoDeleteRecord(ctx, client, input)
127 if err != nil {
128 return fmt.Errorf("failed to delete record: %w", err)
129 }
130
131 fmt.Printf("Deleted: %s\n", uriStr)
132 return nil
133}
134
135func createAuthenticatedClient(ctx context.Context, handle, password string) (*DeleteClient, error) {
136 atid, err := syntax.ParseAtIdentifier(handle)
137 if err != nil {
138 return nil, fmt.Errorf("invalid identifier: %w", err)
139 }
140
141 dir := identity.DefaultDirectory()
142 ident, err := dir.Lookup(ctx, *atid)
143 if err != nil {
144 return nil, fmt.Errorf("failed to lookup identity: %w", err)
145 }
146
147 pdsEndpoint := ident.PDSEndpoint()
148 if pdsEndpoint == "" {
149 return nil, fmt.Errorf("no PDS endpoint found for %s", handle)
150 }
151
152 client := &xrpc.Client{
153 Host: pdsEndpoint,
154 }
155
156 sessionInput := &atproto.ServerCreateSession_Input{
157 Identifier: handle,
158 Password: password,
159 }
160
161 session, err := atproto.ServerCreateSession(ctx, client, sessionInput)
162 if err != nil {
163 return nil, fmt.Errorf("failed to create session: %w", err)
164 }
165
166 client.Auth = &xrpc.AuthInfo{
167 AccessJwt: session.AccessJwt,
168 RefreshJwt: session.RefreshJwt,
169 Handle: session.Handle,
170 Did: session.Did,
171 }
172
173 return &DeleteClient{
174 client: client,
175 did: session.Did,
176 }, nil
177}
178
179func createListClient(ctx context.Context, handle string) (*ListClient, error) {
180 atid, err := syntax.ParseAtIdentifier(handle)
181 if err != nil {
182 return nil, fmt.Errorf("invalid identifier: %w", err)
183 }
184
185 dir := identity.DefaultDirectory()
186 ident, err := dir.Lookup(ctx, *atid)
187 if err != nil {
188 return nil, fmt.Errorf("failed to lookup identity: %w", err)
189 }
190
191 pdsEndpoint := ident.PDSEndpoint()
192 if pdsEndpoint == "" {
193 return nil, fmt.Errorf("no PDS endpoint found for %s", handle)
194 }
195
196 client := &xrpc.Client{
197 Host: pdsEndpoint,
198 }
199
200 return &ListClient{
201 client: client,
202 did: ident.DID.String(),
203 }, nil
204}