An atproto PDS written in Go
103
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.8.0 120 lines 2.7 kB view raw
1package main 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "log/slog" 8 "net/http" 9 "net/url" 10 "strings" 11 12 "github.com/bluesky-social/indigo/api/atproto" 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/bluesky-social/indigo/events" 15 "github.com/bluesky-social/indigo/events/schedulers/parallel" 16 lexutil "github.com/bluesky-social/indigo/lex/util" 17 "github.com/bluesky-social/indigo/repo" 18 "github.com/bluesky-social/indigo/repomgr" 19 "github.com/gorilla/websocket" 20) 21 22func main() { 23 runFirehoseConsumer("ws://localhost:8080") 24} 25 26func runFirehoseConsumer(relayHost string) error { 27 dialer := websocket.DefaultDialer 28 u, err := url.Parse("wss://cocoon.hailey.at") 29 if err != nil { 30 return fmt.Errorf("invalid relayHost: %w", err) 31 } 32 33 u.Path = "xrpc/com.atproto.sync.subscribeRepos" 34 conn, _, err := dialer.Dial(u.String(), http.Header{ 35 "User-Agent": []string{"cocoon-test/0.0.0"}, 36 }) 37 if err != nil { 38 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err) 39 } 40 41 rsc := &events.RepoStreamCallbacks{ 42 RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error { 43 fmt.Println(evt.Repo) 44 return handleRepoCommit(evt) 45 }, 46 RepoIdentity: func(evt *atproto.SyncSubscribeRepos_Identity) error { 47 fmt.Println(evt.Did, evt.Handle) 48 return nil 49 }, 50 } 51 52 var scheduler events.Scheduler 53 parallelism := 700 54 scheduler = parallel.NewScheduler(parallelism, 1000, relayHost, rsc.EventHandler) 55 56 return events.HandleRepoStream(context.TODO(), conn, scheduler, slog.Default()) 57} 58 59func splitRepoPath(path string) (syntax.NSID, syntax.RecordKey, error) { 60 parts := strings.SplitN(path, "/", 3) 61 if len(parts) != 2 { 62 return "", "", fmt.Errorf("invalid record path: %s", path) 63 } 64 collection, err := syntax.ParseNSID(parts[0]) 65 if err != nil { 66 return "", "", err 67 } 68 rkey, err := syntax.ParseRecordKey(parts[1]) 69 if err != nil { 70 return "", "", err 71 } 72 return collection, rkey, nil 73} 74 75func handleRepoCommit(evt *atproto.SyncSubscribeRepos_Commit) error { 76 if evt.TooBig { 77 return nil 78 } 79 80 did, err := syntax.ParseDID(evt.Repo) 81 if err != nil { 82 panic(err) 83 } 84 85 rr, err := repo.ReadRepoFromCar(context.TODO(), bytes.NewReader(evt.Blocks)) 86 if err != nil { 87 panic(err) 88 } 89 90 for _, op := range evt.Ops { 91 collection, rkey, err := splitRepoPath(op.Path) 92 if err != nil { 93 panic(err) 94 } 95 96 ek := repomgr.EventKind(op.Action) 97 98 go func() { 99 switch ek { 100 case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 101 rc, recordCBOR, err := rr.GetRecordBytes(context.TODO(), op.Path) 102 if err != nil { 103 panic(err) 104 } 105 106 if op.Cid == nil || lexutil.LexLink(rc) != *op.Cid { 107 panic("nocid") 108 } 109 110 _ = collection 111 _ = rkey 112 _ = recordCBOR 113 _ = did 114 115 } 116 }() 117 } 118 119 return nil 120}