porting all github actions from bluesky-social/indigo to tangled CI
at main 3.9 kB view raw
1package main 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "log/slog" 8 "net/http" 9 "net/url" 10 11 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 appbsky "github.com/bluesky-social/indigo/api/bsky" 13 "github.com/bluesky-social/indigo/atproto/syntax" 14 "github.com/bluesky-social/indigo/events/schedulers/parallel" 15 lexutil "github.com/bluesky-social/indigo/lex/util" 16 17 "github.com/bluesky-social/indigo/events" 18 "github.com/bluesky-social/indigo/repo" 19 "github.com/bluesky-social/indigo/repomgr" 20 "github.com/carlmjohnson/versioninfo" 21 "github.com/gorilla/websocket" 22) 23 24func RunFirehoseConsumer(ctx context.Context, logger *slog.Logger, relayHost string, postCallback func(context.Context, syntax.DID, syntax.RecordKey, appbsky.FeedPost) error) error { 25 26 dialer := websocket.DefaultDialer 27 u, err := url.Parse(relayHost) 28 if err != nil { 29 return fmt.Errorf("invalid relayHost URI: %w", err) 30 } 31 // always continue at the current cursor offset (don't provide cursor query param) 32 u.Path = "xrpc/com.atproto.sync.subscribeRepos" 33 logger.Info("subscribing to repo event stream", "upstream", relayHost) 34 con, _, err := dialer.Dial(u.String(), http.Header{ 35 "User-Agent": []string{fmt.Sprintf("beemo/%s", versioninfo.Short())}, 36 }) 37 if err != nil { 38 return fmt.Errorf("subscribing to firehose failed (dialing): %w", err) 39 } 40 41 rsc := &events.RepoStreamCallbacks{ 42 RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error { 43 return HandleRepoCommit(ctx, logger, evt, postCallback) 44 }, 45 // NOTE: could add other callbacks as needed 46 } 47 48 var scheduler events.Scheduler 49 // use parallel scheduler 50 parallelism := 4 51 scheduler = parallel.NewScheduler( 52 parallelism, 53 1000, 54 relayHost, 55 rsc.EventHandler, 56 ) 57 logger.Info("beemo firehose scheduler configured", "scheduler", "parallel", "workers", parallelism) 58 59 return events.HandleRepoStream(ctx, con, scheduler, logger) 60} 61 62// NOTE: for now, this function basically never errors, just logs and returns nil. Should think through error processing better. 63func HandleRepoCommit(ctx context.Context, logger *slog.Logger, evt *comatproto.SyncSubscribeRepos_Commit, postCallback func(context.Context, syntax.DID, syntax.RecordKey, appbsky.FeedPost) error) error { 64 65 logger = logger.With("event", "commit", "did", evt.Repo, "rev", evt.Rev, "seq", evt.Seq) 66 logger.Debug("received commit event") 67 68 if evt.TooBig { 69 logger.Warn("skipping tooBig events for now") 70 return nil 71 } 72 73 did, err := syntax.ParseDID(evt.Repo) 74 if err != nil { 75 logger.Error("bad DID syntax in event", "err", err) 76 return nil 77 } 78 79 rr, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks)) 80 if err != nil { 81 logger.Error("failed to read repo from car", "err", err) 82 return nil 83 } 84 85 for _, op := range evt.Ops { 86 logger = logger.With("eventKind", op.Action, "path", op.Path) 87 collection, rkey, err := syntax.ParseRepoPath(op.Path) 88 if err != nil { 89 logger.Error("invalid path in repo op") 90 return nil 91 } 92 93 ek := repomgr.EventKind(op.Action) 94 switch ek { 95 case repomgr.EvtKindCreateRecord, repomgr.EvtKindUpdateRecord: 96 // read the record bytes from blocks, and verify CID 97 rc, recordCBOR, err := rr.GetRecordBytes(ctx, op.Path) 98 if err != nil { 99 logger.Error("reading record from event blocks (CAR)", "err", err) 100 continue 101 } 102 if op.Cid == nil || lexutil.LexLink(rc) != *op.Cid { 103 logger.Error("mismatch between commit op CID and record block", "recordCID", rc, "opCID", op.Cid) 104 continue 105 } 106 107 switch collection { 108 case "app.bsky.feed.post": 109 var post appbsky.FeedPost 110 if err := post.UnmarshalCBOR(bytes.NewReader(*recordCBOR)); err != nil { 111 logger.Error("failed to parse app.bsky.feed.post record", "err", err) 112 continue 113 } 114 if err := postCallback(ctx, did, rkey, post); err != nil { 115 logger.Error("failed to process post record", "err", err) 116 continue 117 } 118 } 119 120 default: 121 // ignore other events 122 } 123 } 124 125 return nil 126}