package main

import (
	"context"
	"encoding/json"
	"fmt"
	"os"
	"sync"

	"github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/atproto/identity"
	"github.com/bluesky-social/indigo/atproto/syntax"
	"github.com/bluesky-social/indigo/xrpc"
)

// Record is one entry streamed by fetchRecords: the uri and cid reported by
// the server plus the record value left as undecoded JSON.
type Record struct {
	URI   string          `json:"uri"`
	CID   string          `json:"cid"`
	Value json.RawMessage `json:"value"`
}

// ListRecordsOutput is the decoding target for one page of a
// com.atproto.repo.listRecords response. An empty Cursor means there are no
// further pages.
type ListRecordsOutput struct {
	Records []struct {
		URI   string          `json:"uri"`
		CID   string          `json:"cid"`
		Value json.RawMessage `json:"value"`
	} `json:"records"`
	Cursor string `json:"cursor"`
}

// ListClient pairs an XRPC client that has no Auth set with the DID of the
// repository it lists.
type ListClient struct {
	client *xrpc.Client
	did    string
}

// DeleteClient pairs an XRPC client carrying session credentials with the DID
// of the logged-in account.
type DeleteClient struct {
	client *xrpc.Client
	did    string
}

// fetchRecords streams every record of collection in the repository
// identified by handle, starting at cursor (empty means from the beginning)
// and requesting limit records per page.
//
// Records arrive on the first returned channel. The second channel carries at
// most one error. Both channels are closed when the producer goroutine exits,
// so a caller can range over the record channel and then receive once from
// the error channel.
//
// The goroutine stops as soon as ctx is done, even if the caller has stopped
// draining the record channel; in that case ctx.Err() is reported on the
// error channel.
func fetchRecords(ctx context.Context, handle, collection string, limit int, cursor string) (<-chan Record, <-chan error) {
	recordCh := make(chan Record, 100)
	// Buffer of one: every path writes at most one error and then returns,
	// so the send never blocks even when nobody is receiving.
	errCh := make(chan error, 1)

	go func() {
		defer close(recordCh)
		defer close(errCh)

		listClient, err := createListClient(ctx, handle)
		if err != nil {
			errCh <- err
			return
		}

		currentCursor := cursor
		for {
			params := map[string]any{
				"repo":       listClient.did,
				"collection": collection,
				"limit":      limit,
			}
			if currentCursor != "" {
				params["cursor"] = currentCursor
			}

			var output ListRecordsOutput
			if err := listClient.client.Do(ctx, xrpc.Query, "", "com.atproto.repo.listRecords", params, nil, &output); err != nil {
				errCh <- fmt.Errorf("failed to list records: %w", err)
				return
			}

			for _, r := range output.Records {
				// A bare send would block forever once the buffer is full
				// and the consumer has gone away, leaking this goroutine.
				select {
				case recordCh <- Record{URI: r.URI, CID: r.CID, Value: r.Value}:
				case <-ctx.Done():
					errCh <- ctx.Err()
					return
				}
			}

			if output.Cursor == "" {
				return
			}
			currentCursor = output.Cursor
		}
	}()

	return recordCh, errCh
}

// deleteRecords deletes every URI in uris concurrently, one goroutine per URI,
// and waits for all of them to finish.
//
// Deletion is best-effort: each failure is printed to stderr and the function
// always returns nil.
func deleteRecords(ctx context.Context, client *xrpc.Client, uris []string) error {
	// Sized to len(uris) so no worker ever blocks on its single send.
	errCh := make(chan error, len(uris))

	var wg sync.WaitGroup
	for _, uri := range uris {
		wg.Go(func() {
			errCh <- deleteSingleRecord(ctx, client, uri)
		})
	}
	wg.Wait()
	close(errCh)

	for err := range errCh {
		if err != nil {
			fmt.Fprintf(os.Stderr, "Error: %v\n", err)
		}
	}
	return nil
}

// deleteSingleRecord parses uriStr as an AT-URI and deletes the record it
// names, using the URI's authority, collection and record key as the repo,
// collection and rkey of the request. On success it prints a "Deleted:" line
// to stdout.
func deleteSingleRecord(ctx context.Context, client *xrpc.Client, uriStr string) error {
	atURI, err := syntax.ParseATURI(uriStr)
	if err != nil {
		return fmt.Errorf("invalid URI: %w", err)
	}

	input := &atproto.RepoDeleteRecord_Input{
		Repo:       atURI.Authority().String(),
		Collection: atURI.Collection().String(),
		Rkey:       atURI.RecordKey().String(),
	}
	if _, err := atproto.RepoDeleteRecord(ctx, client, input); err != nil {
		return fmt.Errorf("failed to delete record: %w", err)
	}

	fmt.Printf("Deleted: %s\n", uriStr)
	return nil
}

// lookupPDSIdentity parses handle as an AT identifier, looks it up in the
// default identity directory, and returns the identity's DID string and PDS
// endpoint. It fails when the identifier is invalid, the lookup fails, or the
// identity has no PDS endpoint.
func lookupPDSIdentity(ctx context.Context, handle string) (did, pdsEndpoint string, err error) {
	atid, err := syntax.ParseAtIdentifier(handle)
	if err != nil {
		return "", "", fmt.Errorf("invalid identifier: %w", err)
	}

	ident, err := identity.DefaultDirectory().Lookup(ctx, *atid)
	if err != nil {
		return "", "", fmt.Errorf("failed to lookup identity: %w", err)
	}

	endpoint := ident.PDSEndpoint()
	if endpoint == "" {
		return "", "", fmt.Errorf("no PDS endpoint found for %s", handle)
	}
	return ident.DID.String(), endpoint, nil
}

// createAuthenticatedClient resolves the PDS for handle, creates a session
// there with handle and password, and returns a DeleteClient whose XRPC
// client carries the session's tokens. The returned did is the one reported
// by the session.
func createAuthenticatedClient(ctx context.Context, handle, password string) (*DeleteClient, error) {
	_, pdsEndpoint, err := lookupPDSIdentity(ctx, handle)
	if err != nil {
		return nil, err
	}

	client := &xrpc.Client{Host: pdsEndpoint}

	session, err := atproto.ServerCreateSession(ctx, client, &atproto.ServerCreateSession_Input{
		Identifier: handle,
		Password:   password,
	})
	if err != nil {
		return nil, fmt.Errorf("failed to create session: %w", err)
	}

	client.Auth = &xrpc.AuthInfo{
		AccessJwt:  session.AccessJwt,
		RefreshJwt: session.RefreshJwt,
		Handle:     session.Handle,
		Did:        session.Did,
	}

	return &DeleteClient{client: client, did: session.Did}, nil
}

// createListClient resolves the PDS for handle and returns a ListClient that
// targets it without any credentials. The returned did comes from the
// resolved identity.
func createListClient(ctx context.Context, handle string) (*ListClient, error) {
	did, pdsEndpoint, err := lookupPDSIdentity(ctx, handle)
	if err != nil {
		return nil, err
	}

	return &ListClient{
		client: &xrpc.Client{Host: pdsEndpoint},
		did:    did,
	}, nil
}