Small tool to make it convenient to deduplicate records in collections
atproto go cli
at main 4.4 kB view raw
1package main 2 3import ( 4 "context" 5 "encoding/json" 6 "fmt" 7 "os" 8 "sync" 9 10 "github.com/bluesky-social/indigo/api/atproto" 11 "github.com/bluesky-social/indigo/atproto/identity" 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 "github.com/bluesky-social/indigo/xrpc" 14) 15 16type Record struct { 17 URI string `json:"uri"` 18 CID string `json:"cid"` 19 Value json.RawMessage `json:"value"` 20} 21 22type ListRecordsOutput struct { 23 Records []struct { 24 URI string `json:"uri"` 25 CID string `json:"cid"` 26 Value json.RawMessage `json:"value"` 27 } `json:"records"` 28 Cursor string `json:"cursor"` 29} 30 31type ListClient struct { 32 client *xrpc.Client 33 did string 34} 35 36type DeleteClient struct { 37 client *xrpc.Client 38 did string 39} 40 41func fetchRecords(ctx context.Context, handle, collection string, limit int, cursor string) (<-chan Record, <-chan error) { 42 recordCh := make(chan Record, 100) 43 errCh := make(chan error, 1) 44 45 go func() { 46 defer close(recordCh) 47 defer close(errCh) 48 49 currentCursor := cursor 50 listClient, err := createListClient(ctx, handle) 51 if err != nil { 52 errCh <- err 53 return 54 } 55 56 for { 57 params := map[string]any{ 58 "repo": listClient.did, 59 "collection": collection, 60 "limit": limit, 61 } 62 if currentCursor != "" { 63 params["cursor"] = currentCursor 64 } 65 66 var output ListRecordsOutput 67 err = listClient.client.Do(ctx, xrpc.Query, "", "com.atproto.repo.listRecords", params, nil, &output) 68 if err != nil { 69 errCh <- fmt.Errorf("failed to list records: %w", err) 70 return 71 } 72 73 for _, r := range output.Records { 74 recordCh <- Record{ 75 URI: r.URI, 76 CID: r.CID, 77 Value: r.Value, 78 } 79 } 80 81 if output.Cursor == "" { 82 break 83 } 84 85 currentCursor = output.Cursor 86 } 87 }() 88 89 return recordCh, errCh 90} 91 92func deleteRecords(ctx context.Context, client *xrpc.Client, uris []string) error { 93 errCh := make(chan error, len(uris)) 94 wg := sync.WaitGroup{} 95 96 for _, uri := range uris { 97 wg.Go(func() { 98 errCh <- deleteSingleRecord(ctx, client, uri) 99 }) 100 } 101 102 wg.Wait() 103 close(errCh) 104 105 for err := range errCh { 106 if err != nil { 107 fmt.Fprintf(os.Stderr, "Error: %v\n", err) 108 } 109 } 110 111 return nil 112} 113 114func deleteSingleRecord(ctx context.Context, client *xrpc.Client, uriStr string) error { 115 atUri, err := syntax.ParseATURI(uriStr) 116 if err != nil { 117 return fmt.Errorf("invalid URI: %w", err) 118 } 119 120 input := &atproto.RepoDeleteRecord_Input{ 121 Repo: atUri.Authority().String(), 122 Collection: atUri.Collection().String(), 123 Rkey: atUri.RecordKey().String(), 124 } 125 126 _, err = atproto.RepoDeleteRecord(ctx, client, input) 127 if err != nil { 128 return fmt.Errorf("failed to delete record: %w", err) 129 } 130 131 fmt.Printf("Deleted: %s\n", uriStr) 132 return nil 133} 134 135func createAuthenticatedClient(ctx context.Context, handle, password string) (*DeleteClient, error) { 136 atid, err := syntax.ParseAtIdentifier(handle) 137 if err != nil { 138 return nil, fmt.Errorf("invalid identifier: %w", err) 139 } 140 141 dir := identity.DefaultDirectory() 142 ident, err := dir.Lookup(ctx, *atid) 143 if err != nil { 144 return nil, fmt.Errorf("failed to lookup identity: %w", err) 145 } 146 147 pdsEndpoint := ident.PDSEndpoint() 148 if pdsEndpoint == "" { 149 return nil, fmt.Errorf("no PDS endpoint found for %s", handle) 150 } 151 152 client := &xrpc.Client{ 153 Host: pdsEndpoint, 154 } 155 156 sessionInput := &atproto.ServerCreateSession_Input{ 157 Identifier: handle, 158 Password: password, 159 } 160 161 session, err := atproto.ServerCreateSession(ctx, client, sessionInput) 162 if err != nil { 163 return nil, fmt.Errorf("failed to create session: %w", err) 164 } 165 166 client.Auth = &xrpc.AuthInfo{ 167 AccessJwt: session.AccessJwt, 168 RefreshJwt: session.RefreshJwt, 169 Handle: session.Handle, 170 Did: session.Did, 171 } 172 173 return &DeleteClient{ 174 client: client, 175 did: session.Did, 176 }, nil 177} 178 179func createListClient(ctx context.Context, handle string) (*ListClient, error) { 180 atid, err := syntax.ParseAtIdentifier(handle) 181 if err != nil { 182 return nil, fmt.Errorf("invalid identifier: %w", err) 183 } 184 185 dir := identity.DefaultDirectory() 186 ident, err := dir.Lookup(ctx, *atid) 187 if err != nil { 188 return nil, fmt.Errorf("failed to lookup identity: %w", err) 189 } 190 191 pdsEndpoint := ident.PDSEndpoint() 192 if pdsEndpoint == "" { 193 return nil, fmt.Errorf("no PDS endpoint found for %s", handle) 194 } 195 196 client := &xrpc.Client{ 197 Host: pdsEndpoint, 198 } 199 200 return &ListClient{ 201 client: client, 202 did: ident.DID.String(), 203 }, nil 204}