···1919 - name: Set up Go tooling
2020 uses: actions/setup-go@v4
2121 with:
2222- go-version: "1.22"
2222+ go-version: "1.23"
2323 - name: Build
2424 run: make build
2525 - name: Test
···3232 - name: Set up Go tooling
3333 uses: actions/setup-go@v4
3434 with:
3535- go-version: "1.22"
3535+ go-version: "1.23"
3636 - name: Lint
3737 run: make lint
+1
HACKING.md
···1414- `cmd/supercollider`: event stream load generation tool
1515- `cmd/sonar`: event stream monitoring tool
1616- `cmd/hepa`: auto-moderation rule engine service
1717+- `cmd/rainbow`: firehose fanout service
1718- `gen`: dev tool to run CBOR type codegen
18191920Packages:
+4
api/agnostic/doc.go
···11+// Package indigo/api/agnositc provides schema-agnostic helpers for fetching records from the network.
22+//
33+// These are variants of endpoints in indigo/api/atproto.
44+package agnostic
+189
api/agnostic/repoapplyWrites.go
···11+// Copied from indigo:api/atproto/repoapplyWrites.go
22+33+package agnostic
44+55+// schema: com.atproto.repo.applyWrites
66+77+import (
88+ "context"
99+ "encoding/json"
1010+ "fmt"
1111+1212+ "github.com/bluesky-social/indigo/lex/util"
1313+ "github.com/bluesky-social/indigo/xrpc"
1414+)
1515+1616+// RepoApplyWrites_Create is a "create" in the com.atproto.repo.applyWrites schema.
1717+//
1818+// Operation which creates a new record.
1919+//
2020+// RECORDTYPE: RepoApplyWrites_Create
2121+type RepoApplyWrites_Create struct {
2222+ LexiconTypeID string `json:"$type,const=com.atproto.repo.applyWrites#create" cborgen:"$type,const=com.atproto.repo.applyWrites#create"`
2323+ Collection string `json:"collection" cborgen:"collection"`
2424+ Rkey *string `json:"rkey,omitempty" cborgen:"rkey,omitempty"`
2525+ Value *json.RawMessage `json:"value" cborgen:"value"`
2626+}
2727+2828+// RepoApplyWrites_CreateResult is a "createResult" in the com.atproto.repo.applyWrites schema.
2929+//
3030+// RECORDTYPE: RepoApplyWrites_CreateResult
3131+type RepoApplyWrites_CreateResult struct {
3232+ LexiconTypeID string `json:"$type,const=com.atproto.repo.applyWrites#createResult" cborgen:"$type,const=com.atproto.repo.applyWrites#createResult"`
3333+ Cid string `json:"cid" cborgen:"cid"`
3434+ Uri string `json:"uri" cborgen:"uri"`
3535+ ValidationStatus *string `json:"validationStatus,omitempty" cborgen:"validationStatus,omitempty"`
3636+}
3737+3838+// RepoApplyWrites_Delete is a "delete" in the com.atproto.repo.applyWrites schema.
3939+//
4040+// Operation which deletes an existing record.
4141+//
4242+// RECORDTYPE: RepoApplyWrites_Delete
4343+type RepoApplyWrites_Delete struct {
4444+ LexiconTypeID string `json:"$type,const=com.atproto.repo.applyWrites#delete" cborgen:"$type,const=com.atproto.repo.applyWrites#delete"`
4545+ Collection string `json:"collection" cborgen:"collection"`
4646+ Rkey string `json:"rkey" cborgen:"rkey"`
4747+}
4848+4949+// RepoApplyWrites_DeleteResult is a "deleteResult" in the com.atproto.repo.applyWrites schema.
5050+//
5151+// RECORDTYPE: RepoApplyWrites_DeleteResult
5252+type RepoApplyWrites_DeleteResult struct {
5353+ LexiconTypeID string `json:"$type,const=com.atproto.repo.applyWrites#deleteResult" cborgen:"$type,const=com.atproto.repo.applyWrites#deleteResult"`
5454+}
5555+5656+// RepoApplyWrites_Input is the input argument to a com.atproto.repo.applyWrites call.
5757+type RepoApplyWrites_Input struct {
5858+ // repo: The handle or DID of the repo (aka, current account).
5959+ Repo string `json:"repo" cborgen:"repo"`
6060+ // swapCommit: If provided, the entire operation will fail if the current repo commit CID does not match this value. Used to prevent conflicting repo mutations.
6161+ SwapCommit *string `json:"swapCommit,omitempty" cborgen:"swapCommit,omitempty"`
6262+ // validate: Can be set to 'false' to skip Lexicon schema validation of record data across all operations, 'true' to require it, or leave unset to validate only for known Lexicons.
6363+ Validate *bool `json:"validate,omitempty" cborgen:"validate,omitempty"`
6464+ Writes []*RepoApplyWrites_Input_Writes_Elem `json:"writes" cborgen:"writes"`
6565+}
6666+6767+type RepoApplyWrites_Input_Writes_Elem struct {
6868+ RepoApplyWrites_Create *RepoApplyWrites_Create
6969+ RepoApplyWrites_Update *RepoApplyWrites_Update
7070+ RepoApplyWrites_Delete *RepoApplyWrites_Delete
7171+}
7272+7373+func (t *RepoApplyWrites_Input_Writes_Elem) MarshalJSON() ([]byte, error) {
7474+ if t.RepoApplyWrites_Create != nil {
7575+ t.RepoApplyWrites_Create.LexiconTypeID = "com.atproto.repo.applyWrites#create"
7676+ return json.Marshal(t.RepoApplyWrites_Create)
7777+ }
7878+ if t.RepoApplyWrites_Update != nil {
7979+ t.RepoApplyWrites_Update.LexiconTypeID = "com.atproto.repo.applyWrites#update"
8080+ return json.Marshal(t.RepoApplyWrites_Update)
8181+ }
8282+ if t.RepoApplyWrites_Delete != nil {
8383+ t.RepoApplyWrites_Delete.LexiconTypeID = "com.atproto.repo.applyWrites#delete"
8484+ return json.Marshal(t.RepoApplyWrites_Delete)
8585+ }
8686+ return nil, fmt.Errorf("cannot marshal empty enum")
8787+}
8888+func (t *RepoApplyWrites_Input_Writes_Elem) UnmarshalJSON(b []byte) error {
8989+ typ, err := util.TypeExtract(b)
9090+ if err != nil {
9191+ return err
9292+ }
9393+9494+ switch typ {
9595+ case "com.atproto.repo.applyWrites#create":
9696+ t.RepoApplyWrites_Create = new(RepoApplyWrites_Create)
9797+ return json.Unmarshal(b, t.RepoApplyWrites_Create)
9898+ case "com.atproto.repo.applyWrites#update":
9999+ t.RepoApplyWrites_Update = new(RepoApplyWrites_Update)
100100+ return json.Unmarshal(b, t.RepoApplyWrites_Update)
101101+ case "com.atproto.repo.applyWrites#delete":
102102+ t.RepoApplyWrites_Delete = new(RepoApplyWrites_Delete)
103103+ return json.Unmarshal(b, t.RepoApplyWrites_Delete)
104104+105105+ default:
106106+ return fmt.Errorf("closed enums must have a matching value")
107107+ }
108108+}
109109+110110+// RepoApplyWrites_Output is the output of a com.atproto.repo.applyWrites call.
111111+type RepoApplyWrites_Output struct {
112112+ Commit *RepoDefs_CommitMeta `json:"commit,omitempty" cborgen:"commit,omitempty"`
113113+ Results []*RepoApplyWrites_Output_Results_Elem `json:"results,omitempty" cborgen:"results,omitempty"`
114114+}
115115+116116+type RepoApplyWrites_Output_Results_Elem struct {
117117+ RepoApplyWrites_CreateResult *RepoApplyWrites_CreateResult
118118+ RepoApplyWrites_UpdateResult *RepoApplyWrites_UpdateResult
119119+ RepoApplyWrites_DeleteResult *RepoApplyWrites_DeleteResult
120120+}
121121+122122+func (t *RepoApplyWrites_Output_Results_Elem) MarshalJSON() ([]byte, error) {
123123+ if t.RepoApplyWrites_CreateResult != nil {
124124+ t.RepoApplyWrites_CreateResult.LexiconTypeID = "com.atproto.repo.applyWrites#createResult"
125125+ return json.Marshal(t.RepoApplyWrites_CreateResult)
126126+ }
127127+ if t.RepoApplyWrites_UpdateResult != nil {
128128+ t.RepoApplyWrites_UpdateResult.LexiconTypeID = "com.atproto.repo.applyWrites#updateResult"
129129+ return json.Marshal(t.RepoApplyWrites_UpdateResult)
130130+ }
131131+ if t.RepoApplyWrites_DeleteResult != nil {
132132+ t.RepoApplyWrites_DeleteResult.LexiconTypeID = "com.atproto.repo.applyWrites#deleteResult"
133133+ return json.Marshal(t.RepoApplyWrites_DeleteResult)
134134+ }
135135+ return nil, fmt.Errorf("cannot marshal empty enum")
136136+}
137137+func (t *RepoApplyWrites_Output_Results_Elem) UnmarshalJSON(b []byte) error {
138138+ typ, err := util.TypeExtract(b)
139139+ if err != nil {
140140+ return err
141141+ }
142142+143143+ switch typ {
144144+ case "com.atproto.repo.applyWrites#createResult":
145145+ t.RepoApplyWrites_CreateResult = new(RepoApplyWrites_CreateResult)
146146+ return json.Unmarshal(b, t.RepoApplyWrites_CreateResult)
147147+ case "com.atproto.repo.applyWrites#updateResult":
148148+ t.RepoApplyWrites_UpdateResult = new(RepoApplyWrites_UpdateResult)
149149+ return json.Unmarshal(b, t.RepoApplyWrites_UpdateResult)
150150+ case "com.atproto.repo.applyWrites#deleteResult":
151151+ t.RepoApplyWrites_DeleteResult = new(RepoApplyWrites_DeleteResult)
152152+ return json.Unmarshal(b, t.RepoApplyWrites_DeleteResult)
153153+154154+ default:
155155+ return fmt.Errorf("closed enums must have a matching value")
156156+ }
157157+}
158158+159159+// RepoApplyWrites_Update is a "update" in the com.atproto.repo.applyWrites schema.
160160+//
161161+// Operation which updates an existing record.
162162+//
163163+// RECORDTYPE: RepoApplyWrites_Update
164164+type RepoApplyWrites_Update struct {
165165+ LexiconTypeID string `json:"$type,const=com.atproto.repo.applyWrites#update" cborgen:"$type,const=com.atproto.repo.applyWrites#update"`
166166+ Collection string `json:"collection" cborgen:"collection"`
167167+ Rkey string `json:"rkey" cborgen:"rkey"`
168168+ Value *json.RawMessage `json:"value" cborgen:"value"`
169169+}
170170+171171+// RepoApplyWrites_UpdateResult is a "updateResult" in the com.atproto.repo.applyWrites schema.
172172+//
173173+// RECORDTYPE: RepoApplyWrites_UpdateResult
174174+type RepoApplyWrites_UpdateResult struct {
175175+ LexiconTypeID string `json:"$type,const=com.atproto.repo.applyWrites#updateResult" cborgen:"$type,const=com.atproto.repo.applyWrites#updateResult"`
176176+ Cid string `json:"cid" cborgen:"cid"`
177177+ Uri string `json:"uri" cborgen:"uri"`
178178+ ValidationStatus *string `json:"validationStatus,omitempty" cborgen:"validationStatus,omitempty"`
179179+}
180180+181181+// RepoApplyWrites calls the XRPC method "com.atproto.repo.applyWrites".
182182+func RepoApplyWrites(ctx context.Context, c *xrpc.Client, input *RepoApplyWrites_Input) (*RepoApplyWrites_Output, error) {
183183+ var out RepoApplyWrites_Output
184184+ if err := c.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.applyWrites", nil, input, &out); err != nil {
185185+ return nil, err
186186+ }
187187+188188+ return &out, nil
189189+}
+2
api/atproto/servercreateSession.go
···12121313// ServerCreateSession_Input is the input argument to a com.atproto.server.createSession call.
1414type ServerCreateSession_Input struct {
1515+ // allowTakendown: When true, instead of throwing error for takendown accounts, a valid response with a narrow scoped token will be returned
1616+ AllowTakendown *bool `json:"allowTakendown,omitempty" cborgen:"allowTakendown,omitempty"`
1517 AuthFactorToken *string `json:"authFactorToken,omitempty" cborgen:"authFactorToken,omitempty"`
1618 // identifier: Handle or other identifier supported by the server for the authenticating user.
1719 Identifier string `json:"identifier" cborgen:"identifier"`
+30
api/atproto/tempaddReservedHandle.go
···11+// Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT.
22+33+package atproto
44+55+// schema: com.atproto.temp.addReservedHandle
66+77+import (
88+ "context"
99+1010+ "github.com/bluesky-social/indigo/xrpc"
1111+)
1212+1313+// TempAddReservedHandle_Input is the input argument to a com.atproto.temp.addReservedHandle call.
1414+type TempAddReservedHandle_Input struct {
1515+ Handle string `json:"handle" cborgen:"handle"`
1616+}
1717+1818+// TempAddReservedHandle_Output is the output of a com.atproto.temp.addReservedHandle call.
1919+type TempAddReservedHandle_Output struct {
2020+}
2121+2222+// TempAddReservedHandle calls the XRPC method "com.atproto.temp.addReservedHandle".
2323+func TempAddReservedHandle(ctx context.Context, c *xrpc.Client, input *TempAddReservedHandle_Input) (*TempAddReservedHandle_Output, error) {
2424+ var out TempAddReservedHandle_Output
2525+ if err := c.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.temp.addReservedHandle", nil, input, &out); err != nil {
2626+ return nil, err
2727+ }
2828+2929+ return &out, nil
3030+}
+2
api/bsky/actorgetSuggestions.go
···1414type ActorGetSuggestions_Output struct {
1515 Actors []*ActorDefs_ProfileView `json:"actors" cborgen:"actors"`
1616 Cursor *string `json:"cursor,omitempty" cborgen:"cursor,omitempty"`
1717+ // recId: Snowflake for this recommendation, use when submitting recommendation events.
1818+ RecId *int64 `json:"recId,omitempty" cborgen:"recId,omitempty"`
1719}
18201921// ActorGetSuggestions calls the XRPC method "app.bsky.actor.getSuggestions".
+3-1
api/bsky/graphgetSuggestedFollowsByActor.go
···1313// GraphGetSuggestedFollowsByActor_Output is the output of a app.bsky.graph.getSuggestedFollowsByActor call.
1414type GraphGetSuggestedFollowsByActor_Output struct {
1515 // isFallback: If true, response has fallen-back to generic results, and is not scoped using relativeToDid
1616- IsFallback *bool `json:"isFallback,omitempty" cborgen:"isFallback,omitempty"`
1616+ IsFallback *bool `json:"isFallback,omitempty" cborgen:"isFallback,omitempty"`
1717+ // recId: Snowflake for this recommendation, use when submitting recommendation events.
1818+ RecId *int64 `json:"recId,omitempty" cborgen:"recId,omitempty"`
1719 Suggestions []*ActorDefs_ProfileView `json:"suggestions" cborgen:"suggestions"`
1820}
1921
+35
api/bsky/graphsearchStarterPacks.go
···11+// Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT.
22+33+package bsky
44+55+// schema: app.bsky.graph.searchStarterPacks
66+77+import (
88+ "context"
99+1010+ "github.com/bluesky-social/indigo/xrpc"
1111+)
1212+1313+// GraphSearchStarterPacks_Output is the output of a app.bsky.graph.searchStarterPacks call.
1414+type GraphSearchStarterPacks_Output struct {
1515+ Cursor *string `json:"cursor,omitempty" cborgen:"cursor,omitempty"`
1616+ StarterPacks []*GraphDefs_StarterPackViewBasic `json:"starterPacks" cborgen:"starterPacks"`
1717+}
1818+1919+// GraphSearchStarterPacks calls the XRPC method "app.bsky.graph.searchStarterPacks".
2020+//
2121+// q: Search query string. Syntax, phrase, boolean, and faceting is unspecified, but Lucene query syntax is recommended.
2222+func GraphSearchStarterPacks(ctx context.Context, c *xrpc.Client, cursor string, limit int64, q string) (*GraphSearchStarterPacks_Output, error) {
2323+ var out GraphSearchStarterPacks_Output
2424+2525+ params := map[string]interface{}{
2626+ "cursor": cursor,
2727+ "limit": limit,
2828+ "q": q,
2929+ }
3030+ if err := c.Do(ctx, xrpc.Query, "", "app.bsky.graph.searchStarterPacks", params, nil, &out); err != nil {
3131+ return nil, err
3232+ }
3333+3434+ return &out, nil
3535+}
···1313type UnspeccedDefs_SkeletonSearchPost struct {
1414 Uri string `json:"uri" cborgen:"uri"`
1515}
1616+1717+// UnspeccedDefs_SkeletonSearchStarterPack is a "skeletonSearchStarterPack" in the app.bsky.unspecced.defs schema.
1818+type UnspeccedDefs_SkeletonSearchStarterPack struct {
1919+ Uri string `json:"uri" cborgen:"uri"`
2020+}
2121+2222+// UnspeccedDefs_TrendingTopic is a "trendingTopic" in the app.bsky.unspecced.defs schema.
2323+type UnspeccedDefs_TrendingTopic struct {
2424+ Description *string `json:"description,omitempty" cborgen:"description,omitempty"`
2525+ DisplayName *string `json:"displayName,omitempty" cborgen:"displayName,omitempty"`
2626+ Link string `json:"link" cborgen:"link"`
2727+ Topic string `json:"topic" cborgen:"topic"`
2828+}
+2
api/bsky/unspeccedgetSuggestionsSkeleton.go
···1414type UnspeccedGetSuggestionsSkeleton_Output struct {
1515 Actors []*UnspeccedDefs_SkeletonSearchActor `json:"actors" cborgen:"actors"`
1616 Cursor *string `json:"cursor,omitempty" cborgen:"cursor,omitempty"`
1717+ // recId: Snowflake for this recommendation, use when submitting recommendation events.
1818+ RecId *int64 `json:"recId,omitempty" cborgen:"recId,omitempty"`
1719 // relativeToDid: DID of the account these suggestions are relative to. If this is returned undefined, suggestions are based on the viewer.
1820 RelativeToDid *string `json:"relativeToDid,omitempty" cborgen:"relativeToDid,omitempty"`
1921}
+34
api/bsky/unspeccedgetTrendingTopics.go
···11+// Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT.
22+33+package bsky
44+55+// schema: app.bsky.unspecced.getTrendingTopics
66+77+import (
88+ "context"
99+1010+ "github.com/bluesky-social/indigo/xrpc"
1111+)
1212+1313+// UnspeccedGetTrendingTopics_Output is the output of a app.bsky.unspecced.getTrendingTopics call.
1414+type UnspeccedGetTrendingTopics_Output struct {
1515+ Suggested []*UnspeccedDefs_TrendingTopic `json:"suggested" cborgen:"suggested"`
1616+ Topics []*UnspeccedDefs_TrendingTopic `json:"topics" cborgen:"topics"`
1717+}
1818+1919+// UnspeccedGetTrendingTopics calls the XRPC method "app.bsky.unspecced.getTrendingTopics".
2020+//
2121+// viewer: DID of the account making the request (not included for public/unauthenticated queries). Used to boost followed accounts in ranking.
2222+func UnspeccedGetTrendingTopics(ctx context.Context, c *xrpc.Client, limit int64, viewer string) (*UnspeccedGetTrendingTopics_Output, error) {
2323+ var out UnspeccedGetTrendingTopics_Output
2424+2525+ params := map[string]interface{}{
2626+ "limit": limit,
2727+ "viewer": viewer,
2828+ }
2929+ if err := c.Do(ctx, xrpc.Query, "", "app.bsky.unspecced.getTrendingTopics", params, nil, &out); err != nil {
3030+ return nil, err
3131+ }
3232+3333+ return &out, nil
3434+}
+40
api/bsky/unspeccedsearchStarterPacksSkeleton.go
···11+// Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT.
22+33+package bsky
44+55+// schema: app.bsky.unspecced.searchStarterPacksSkeleton
66+77+import (
88+ "context"
99+1010+ "github.com/bluesky-social/indigo/xrpc"
1111+)
1212+1313+// UnspeccedSearchStarterPacksSkeleton_Output is the output of a app.bsky.unspecced.searchStarterPacksSkeleton call.
1414+type UnspeccedSearchStarterPacksSkeleton_Output struct {
1515+ Cursor *string `json:"cursor,omitempty" cborgen:"cursor,omitempty"`
1616+ // hitsTotal: Count of search hits. Optional, may be rounded/truncated, and may not be possible to paginate through all hits.
1717+ HitsTotal *int64 `json:"hitsTotal,omitempty" cborgen:"hitsTotal,omitempty"`
1818+ StarterPacks []*UnspeccedDefs_SkeletonSearchStarterPack `json:"starterPacks" cborgen:"starterPacks"`
1919+}
2020+2121+// UnspeccedSearchStarterPacksSkeleton calls the XRPC method "app.bsky.unspecced.searchStarterPacksSkeleton".
2222+//
2323+// cursor: Optional pagination mechanism; may not necessarily allow scrolling through entire result set.
2424+// q: Search query string; syntax, phrase, boolean, and faceting is unspecified, but Lucene query syntax is recommended.
2525+// viewer: DID of the account making the request (not included for public/unauthenticated queries).
2626+func UnspeccedSearchStarterPacksSkeleton(ctx context.Context, c *xrpc.Client, cursor string, limit int64, q string, viewer string) (*UnspeccedSearchStarterPacksSkeleton_Output, error) {
2727+ var out UnspeccedSearchStarterPacksSkeleton_Output
2828+2929+ params := map[string]interface{}{
3030+ "cursor": cursor,
3131+ "limit": limit,
3232+ "q": q,
3333+ "viewer": viewer,
3434+ }
3535+ if err := c.Do(ctx, xrpc.Query, "", "app.bsky.unspecced.searchStarterPacksSkeleton", params, nil, &out); err != nil {
3636+ return nil, err
3737+ }
3838+3939+ return &out, nil
4040+}
···2121// addedLabels: If specified, only events where all of these labels were added are returned
2222// addedTags: If specified, only events where all of these tags were added are returned
2323// collections: If specified, only events where the subject belongs to the given collections will be returned. When subjectType is set to 'account', this will be ignored.
2424-// comment: If specified, only events with comments containing the keyword are returned
2424+// comment: If specified, only events with comments containing the keyword are returned. Apply || separator to use multiple keywords and match using OR condition.
2525// createdAfter: Retrieve events created after a given timestamp
2626// createdBefore: Retrieve events created before a given timestamp
2727// hasComment: If true, only events with comments are returned
···3131// sortDirection: Sort direction for the events. Defaults to descending order of created at timestamp.
3232// subjectType: If specified, only events where the subject is of the given type (account or record) will be returned. When this is set to 'account' the 'collections' parameter will be ignored. When includeAllUserRecords or subject is set, this will be ignored.
3333// types: The types of events (fully qualified string in the format of tools.ozone.moderation.defs#modEvent<name>) to filter by. If not specified, all events are returned.
3434-func ModerationQueryEvents(ctx context.Context, c *xrpc.Client, addedLabels []string, addedTags []string, collections []string, comment string, createdAfter string, createdBefore string, createdBy string, cursor string, hasComment bool, includeAllUserRecords bool, limit int64, removedLabels []string, removedTags []string, reportTypes []string, sortDirection string, subject string, subjectType string, types []string) (*ModerationQueryEvents_Output, error) {
3434+func ModerationQueryEvents(ctx context.Context, c *xrpc.Client, addedLabels []string, addedTags []string, collections []string, comment string, createdAfter string, createdBefore string, createdBy string, cursor string, hasComment bool, includeAllUserRecords bool, limit int64, policies []string, removedLabels []string, removedTags []string, reportTypes []string, sortDirection string, subject string, subjectType string, types []string) (*ModerationQueryEvents_Output, error) {
3535 var out ModerationQueryEvents_Output
36363737 params := map[string]interface{}{
···4646 "hasComment": hasComment,
4747 "includeAllUserRecords": includeAllUserRecords,
4848 "limit": limit,
4949+ "policies": policies,
4950 "removedLabels": removedLabels,
5051 "removedTags": removedTags,
5152 "reportTypes": reportTypes,
+17-1
api/ozone/moderationqueryStatuses.go
···2121// appealed: Get subjects in unresolved appealed status
2222// collections: If specified, subjects belonging to the given collections will be returned. When subjectType is set to 'account', this will be ignored.
2323// comment: Search subjects by keyword from comments
2424+// hostingDeletedAfter: Search subjects where the associated record/account was deleted after a given timestamp
2525+// hostingDeletedBefore: Search subjects where the associated record/account was deleted before a given timestamp
2626+// hostingStatuses: Search subjects by the status of the associated record/account
2727+// hostingUpdatedAfter: Search subjects where the associated record/account was updated after a given timestamp
2828+// hostingUpdatedBefore: Search subjects where the associated record/account was updated before a given timestamp
2429// includeAllUserRecords: All subjects, or subjects from given 'collections' param, belonging to the account specified in the 'subject' param will be returned.
2530// includeMuted: By default, we don't include muted subjects in the results. Set this to true to include them.
2631// lastReviewedBy: Get all subject statuses that were reviewed by a specific moderator
2732// onlyMuted: When set to true, only muted subjects and reporters will be returned.
3333+// queueCount: Number of queues being used by moderators. Subjects will be split among all queues.
3434+// queueIndex: Index of the queue to fetch subjects from. Works only when queueCount value is specified.
3535+// queueSeed: A seeder to shuffle/balance the queue items.
2836// reportedAfter: Search subjects reported after a given timestamp
2937// reportedBefore: Search subjects reported before a given timestamp
3038// reviewState: Specify when fetching subjects in a certain state
···3341// subject: The subject to get the status for.
3442// subjectType: If specified, subjects of the given type (account or record) will be returned. When this is set to 'account' the 'collections' parameter will be ignored. When includeAllUserRecords or subject is set, this will be ignored.
3543// takendown: Get subjects that were taken down
3636-func ModerationQueryStatuses(ctx context.Context, c *xrpc.Client, appealed bool, collections []string, comment string, cursor string, excludeTags []string, ignoreSubjects []string, includeAllUserRecords bool, includeMuted bool, lastReviewedBy string, limit int64, onlyMuted bool, reportedAfter string, reportedBefore string, reviewState string, reviewedAfter string, reviewedBefore string, sortDirection string, sortField string, subject string, subjectType string, tags []string, takendown bool) (*ModerationQueryStatuses_Output, error) {
4444+func ModerationQueryStatuses(ctx context.Context, c *xrpc.Client, appealed bool, collections []string, comment string, cursor string, excludeTags []string, hostingDeletedAfter string, hostingDeletedBefore string, hostingStatuses []string, hostingUpdatedAfter string, hostingUpdatedBefore string, ignoreSubjects []string, includeAllUserRecords bool, includeMuted bool, lastReviewedBy string, limit int64, onlyMuted bool, queueCount int64, queueIndex int64, queueSeed string, reportedAfter string, reportedBefore string, reviewState string, reviewedAfter string, reviewedBefore string, sortDirection string, sortField string, subject string, subjectType string, tags []string, takendown bool) (*ModerationQueryStatuses_Output, error) {
3745 var out ModerationQueryStatuses_Output
38463947 params := map[string]interface{}{
···4250 "comment": comment,
4351 "cursor": cursor,
4452 "excludeTags": excludeTags,
5353+ "hostingDeletedAfter": hostingDeletedAfter,
5454+ "hostingDeletedBefore": hostingDeletedBefore,
5555+ "hostingStatuses": hostingStatuses,
5656+ "hostingUpdatedAfter": hostingUpdatedAfter,
5757+ "hostingUpdatedBefore": hostingUpdatedBefore,
4558 "ignoreSubjects": ignoreSubjects,
4659 "includeAllUserRecords": includeAllUserRecords,
4760 "includeMuted": includeMuted,
4861 "lastReviewedBy": lastReviewedBy,
4962 "limit": limit,
5063 "onlyMuted": onlyMuted,
6464+ "queueCount": queueCount,
6565+ "queueIndex": queueIndex,
6666+ "queueSeed": queueSeed,
5167 "reportedAfter": reportedAfter,
5268 "reportedBefore": reportedBefore,
5369 "reviewState": reviewState,
+23
api/ozone/settingdefs.go
···11+// Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT.
22+33+package ozone
44+55+// schema: tools.ozone.setting.defs
66+77+import (
88+ "github.com/bluesky-social/indigo/lex/util"
99+)
1010+1111+// SettingDefs_Option is a "option" in the tools.ozone.setting.defs schema.
1212+type SettingDefs_Option struct {
1313+ CreatedAt *string `json:"createdAt,omitempty" cborgen:"createdAt,omitempty"`
1414+ CreatedBy string `json:"createdBy" cborgen:"createdBy"`
1515+ Description *string `json:"description,omitempty" cborgen:"description,omitempty"`
1616+ Did string `json:"did" cborgen:"did"`
1717+ Key string `json:"key" cborgen:"key"`
1818+ LastUpdatedBy string `json:"lastUpdatedBy" cborgen:"lastUpdatedBy"`
1919+ ManagerRole *string `json:"managerRole,omitempty" cborgen:"managerRole,omitempty"`
2020+ Scope string `json:"scope" cborgen:"scope"`
2121+ UpdatedAt *string `json:"updatedAt,omitempty" cborgen:"updatedAt,omitempty"`
2222+ Value *util.LexiconTypeDecoder `json:"value" cborgen:"value"`
2323+}
+38
api/ozone/settinglistOptions.go
···11+// Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT.
22+33+package ozone
44+55+// schema: tools.ozone.setting.listOptions
66+77+import (
88+ "context"
99+1010+ "github.com/bluesky-social/indigo/xrpc"
1111+)
1212+1313+// SettingListOptions_Output is the output of a tools.ozone.setting.listOptions call.
1414+type SettingListOptions_Output struct {
1515+ Cursor *string `json:"cursor,omitempty" cborgen:"cursor,omitempty"`
1616+ Options []*SettingDefs_Option `json:"options" cborgen:"options"`
1717+}
1818+1919+// SettingListOptions calls the XRPC method "tools.ozone.setting.listOptions".
2020+//
2121+// keys: Filter for only the specified keys. Ignored if prefix is provided
2222+// prefix: Filter keys by prefix
2323+func SettingListOptions(ctx context.Context, c *xrpc.Client, cursor string, keys []string, limit int64, prefix string, scope string) (*SettingListOptions_Output, error) {
2424+ var out SettingListOptions_Output
2525+2626+ params := map[string]interface{}{
2727+ "cursor": cursor,
2828+ "keys": keys,
2929+ "limit": limit,
3030+ "prefix": prefix,
3131+ "scope": scope,
3232+ }
3333+ if err := c.Do(ctx, xrpc.Query, "", "tools.ozone.setting.listOptions", params, nil, &out); err != nil {
3434+ return nil, err
3535+ }
3636+3737+ return &out, nil
3838+}
+31
api/ozone/settingremoveOptions.go
···11+// Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT.
22+33+package ozone
44+55+// schema: tools.ozone.setting.removeOptions
66+77+import (
88+ "context"
99+1010+ "github.com/bluesky-social/indigo/xrpc"
1111+)
1212+1313+// SettingRemoveOptions_Input is the input argument to a tools.ozone.setting.removeOptions call.
1414+type SettingRemoveOptions_Input struct {
1515+ Keys []string `json:"keys" cborgen:"keys"`
1616+ Scope string `json:"scope" cborgen:"scope"`
1717+}
1818+1919+// SettingRemoveOptions_Output is the output of a tools.ozone.setting.removeOptions call.
2020+type SettingRemoveOptions_Output struct {
2121+}
2222+2323+// SettingRemoveOptions calls the XRPC method "tools.ozone.setting.removeOptions".
2424+func SettingRemoveOptions(ctx context.Context, c *xrpc.Client, input *SettingRemoveOptions_Input) (*SettingRemoveOptions_Output, error) {
2525+ var out SettingRemoveOptions_Output
2626+ if err := c.Do(ctx, xrpc.Procedure, "application/json", "tools.ozone.setting.removeOptions", nil, input, &out); err != nil {
2727+ return nil, err
2828+ }
2929+3030+ return &out, nil
3131+}
+36
api/ozone/settingupsertOption.go
···11+// Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT.
22+33+package ozone
44+55+// schema: tools.ozone.setting.upsertOption
66+77+import (
88+ "context"
99+1010+ "github.com/bluesky-social/indigo/lex/util"
1111+ "github.com/bluesky-social/indigo/xrpc"
1212+)
1313+1414+// SettingUpsertOption_Input is the input argument to a tools.ozone.setting.upsertOption call.
1515+type SettingUpsertOption_Input struct {
1616+ Description *string `json:"description,omitempty" cborgen:"description,omitempty"`
1717+ Key string `json:"key" cborgen:"key"`
1818+ ManagerRole *string `json:"managerRole,omitempty" cborgen:"managerRole,omitempty"`
1919+ Scope string `json:"scope" cborgen:"scope"`
2020+ Value *util.LexiconTypeDecoder `json:"value" cborgen:"value"`
2121+}
2222+2323+// SettingUpsertOption_Output is the output of a tools.ozone.setting.upsertOption call.
2424+type SettingUpsertOption_Output struct {
2525+ Option *SettingDefs_Option `json:"option" cborgen:"option"`
2626+}
2727+2828+// SettingUpsertOption calls the XRPC method "tools.ozone.setting.upsertOption".
2929+func SettingUpsertOption(ctx context.Context, c *xrpc.Client, input *SettingUpsertOption_Input) (*SettingUpsertOption_Output, error) {
3030+ var out SettingUpsertOption_Output
3131+ if err := c.Do(ctx, xrpc.Procedure, "application/json", "tools.ozone.setting.upsertOption", nil, input, &out); err != nil {
3232+ return nil, err
3333+ }
3434+3535+ return &out, nil
3636+}
+2-8
atproto/data/data.go
···6666 out = append(out, v)
6767 case []any:
6868 for _, el := range v {
6969- down := extractBlobsAtom(el)
7070- for _, d := range down {
7171- out = append(out, d)
7272- }
6969+ out = append(out, extractBlobsAtom(el)...)
7370 }
7471 case map[string]any:
7572 for _, val := range v {
7676- down := extractBlobsAtom(val)
7777- for _, d := range down {
7878- out = append(out, d)
7979- }
7373+ out = append(out, extractBlobsAtom(val)...)
8074 }
8175 default:
8276 }
+4-3
atproto/identity/base_directory.go
···4646 ident := ParseIdentity(doc)
4747 declared, err := ident.DeclaredHandle()
4848 if err != nil {
4949- return nil, err
4949+ return nil, fmt.Errorf("could not verify handle/DID match: %w", err)
5050 }
5151 if declared != h {
5252- return nil, ErrHandleMismatch
5252+ return nil, fmt.Errorf("%w: %s != %s", ErrHandleMismatch, declared, h)
5353 }
5454 ident.Handle = declared
5555···6666 if errors.Is(err, ErrHandleNotDeclared) {
6767 ident.Handle = syntax.HandleInvalid
6868 } else if err != nil {
6969- return nil, err
6969+ return nil, fmt.Errorf("could not parse handle from DID document: %w", err)
7070 } else {
7171 // if a handle was declared, resolve it
7272 resolvedDID, err := d.ResolveHandle(ctx, declared)
···9999}
100100101101func (d *BaseDirectory) Purge(ctx context.Context, a syntax.AtIdentifier) error {
102102+ // BaseDirectory itself does not implement caching
102103 return nil
103104}
+3-3
atproto/identity/cache_directory.go
···93939494func (d *CacheDirectory) ResolveHandle(ctx context.Context, h syntax.Handle) (syntax.DID, error) {
9595 if h.IsInvalidHandle() {
9696- return "", fmt.Errorf("invalid handle")
9696+ return "", fmt.Errorf("can not resolve handle: %w", ErrInvalidHandle)
9797 }
9898 entry, ok := d.handleCache.Get(h)
9999 if ok && !d.IsHandleStale(&entry) {
···230230231231 declared, err := ident.DeclaredHandle()
232232 if err != nil {
233233- return nil, hit, err
233233+ return nil, hit, fmt.Errorf("could not verify handle/DID mapping: %w", err)
234234 }
235235 if declared != h {
236236- return nil, hit, ErrHandleMismatch
236236+ return nil, hit, fmt.Errorf("%w: %s != %s", ErrHandleMismatch, declared, h)
237237 }
238238 return ident, hit, nil
239239}
-20
atproto/identity/did.go
···1313 "github.com/bluesky-social/indigo/atproto/syntax"
1414)
15151616-type DIDDocument struct {
1717- DID syntax.DID `json:"id"`
1818- AlsoKnownAs []string `json:"alsoKnownAs,omitempty"`
1919- VerificationMethod []DocVerificationMethod `json:"verificationMethod,omitempty"`
2020- Service []DocService `json:"service,omitempty"`
2121-}
2222-2323-type DocVerificationMethod struct {
2424- ID string `json:"id"`
2525- Type string `json:"type"`
2626- Controller string `json:"controller"`
2727- PublicKeyMultibase string `json:"publicKeyMultibase"`
2828-}
2929-3030-type DocService struct {
3131- ID string `json:"id"`
3232- Type string `json:"type"`
3333- ServiceEndpoint string `json:"serviceEndpoint"`
3434-}
3535-3616// WARNING: this does *not* bi-directionally verify account metadata; it only implements direct DID-to-DID-document lookup for the supported DID methods, and parses the resulting DID Doc into an Identity struct
3717func (d *BaseDirectory) ResolveDID(ctx context.Context, did syntax.DID) (*DIDDocument, error) {
3818 start := time.Now()
···11+package identity
22+33+import (
44+ "context"
55+ "errors"
66+ "net"
77+ "net/http"
88+ "time"
99+1010+ "github.com/bluesky-social/indigo/atproto/syntax"
1111+)
1212+1313+// API for doing account lookups by DID or handle, with bi-directional verification handled automatically. Almost all atproto services and clients should use an implementation of this interface instead of resolving handles or DIDs separately
1414+//
1515+// Handles which fail to resolve, or don't match DID alsoKnownAs, are an error. DIDs which resolve but the handle does not resolve back to the DID return an Identity where the Handle is the special `handle.invalid` value.
1616+//
1717+// Some example implementations of this interface could be:
1818+// - basic direct resolution on every call
1919+// - local in-memory caching layer to reduce network hits
2020+// - API client, which just makes requests to PDS (or other remote service)
2121+// - client for shared network cache (eg, Redis)
2222+type Directory interface {
2323+ LookupHandle(ctx context.Context, h syntax.Handle) (*Identity, error)
2424+ LookupDID(ctx context.Context, d syntax.DID) (*Identity, error)
2525+ Lookup(ctx context.Context, i syntax.AtIdentifier) (*Identity, error)
2626+2727+ // Flushes any cache of the indicated identifier. If directory is not using caching, can ignore this.
2828+ Purge(ctx context.Context, i syntax.AtIdentifier) error
2929+}
3030+3131+// Indicates that handle resolution failed. A wrapped error may provide more context. This is only returned when looking up a handle, not when looking up a DID.
3232+var ErrHandleResolutionFailed = errors.New("handle resolution failed")
3333+3434+// Indicates that resolution process completed successfully, but handle does not exist. This is only returned when looking up a handle, not when looking up a DID.
3535+var ErrHandleNotFound = errors.New("handle not found")
3636+3737+// Indicates that resolution process completed successfully, handle mapped to a different DID. This is only returned when looking up a handle, not when looking up a DID.
3838+var ErrHandleMismatch = errors.New("handle/DID mismatch")
3939+4040+// Indicates that DID document did not include any handle ("alsoKnownAs"). This is only returned when looking up a handle, not when looking up a DID.
4141+var ErrHandleNotDeclared = errors.New("DID document did not declare a handle")
4242+4343+// Handle top-level domain (TLD) is one of the special "Reserved" suffixes, and not allowed for atproto use
4444+var ErrHandleReservedTLD = errors.New("handle top-level domain is disallowed")
4545+4646+// Indicates that resolution process completed successfully, but the DID does not exist.
4747+var ErrDIDNotFound = errors.New("DID not found")
4848+4949+// Indicates that DID resolution process failed. A wrapped error may provide more context.
5050+var ErrDIDResolutionFailed = errors.New("DID resolution failed")
5151+5252+// Indicates that DID document did not include a public key with the specified ID
5353+var ErrKeyNotDeclared = errors.New("DID document did not declare a relevant public key")
5454+5555+// Handle was invalid, in a situation where a valid handle is required.
5656+var ErrInvalidHandle = errors.New("Invalid Handle")
5757+5858+var DefaultPLCURL = "https://plc.directory"
5959+6060+// Returns a reasonable Directory implementation for applications
6161+func DefaultDirectory() Directory {
6262+ base := BaseDirectory{
6363+ PLCURL: DefaultPLCURL,
6464+ HTTPClient: http.Client{
6565+ Timeout: time.Second * 10,
6666+ Transport: &http.Transport{
6767+ // would want this around 100ms for services doing lots of handle resolution. Impacts PLC connections as well, but not too bad.
6868+ IdleConnTimeout: time.Millisecond * 1000,
6969+ MaxIdleConns: 100,
7070+ },
7171+ },
7272+ Resolver: net.Resolver{
7373+ Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
7474+ d := net.Dialer{Timeout: time.Second * 3}
7575+ return d.DialContext(ctx, network, address)
7676+ },
7777+ },
7878+ TryAuthoritativeDNS: true,
7979+ // primary Bluesky PDS instance only supports HTTP resolution method
8080+ SkipDNSDomainSuffixes: []string{".bsky.social"},
8181+ }
8282+ cached := NewCacheDirectory(&base, 250_000, time.Hour*24, time.Minute*2, time.Minute*5)
8383+ return &cached
8484+}
+8-4
atproto/identity/handle.go
···3535 var dnsErr *net.DNSError
3636 if errors.As(err, &dnsErr) {
3737 if dnsErr.IsNotFound {
3838- return "", ErrHandleNotFound
3838+ return "", fmt.Errorf("%w: %s", ErrHandleNotFound, handle)
3939 }
4040 }
4141 if err != nil {
···138138 var dnsErr *net.DNSError
139139 if errors.As(err, &dnsErr) {
140140 if dnsErr.IsNotFound {
141141- return "", fmt.Errorf("%w: DNS NXDOMAIN for %s", ErrHandleNotFound, handle)
141141+ return "", fmt.Errorf("%w: DNS NXDOMAIN for HTTP well-known resolution of %s", ErrHandleNotFound, handle)
142142 }
143143 }
144144 return "", fmt.Errorf("%w: HTTP well-known request error: %w", ErrHandleResolutionFailed, err)
···160160 return "", fmt.Errorf("%w: HTTP well-known body read for %s: %w", ErrHandleResolutionFailed, handle, err)
161161 }
162162 line := strings.TrimSpace(string(b))
163163- return syntax.ParseDID(line)
163163+ outDid, err := syntax.ParseDID(line)
164164+ if err != nil {
165165+ return outDid, fmt.Errorf("%w: invalid DID in HTTP well-known for %s", ErrHandleResolutionFailed, handle)
166166+ }
167167+ return outDid, err
164168}
165169166170func (d *BaseDirectory) ResolveHandle(ctx context.Context, handle syntax.Handle) (syntax.DID, error) {
···169173 var did syntax.DID
170174171175 if handle.IsInvalidHandle() {
172172- return "", fmt.Errorf("invalid handle")
176176+ return "", fmt.Errorf("can not resolve handle: %w", ErrInvalidHandle)
173177 }
174178175179 if !handle.AllowedTLD() {
-75
atproto/identity/identity.go
···11package identity
2233import (
44- "context"
55- "errors"
64 "fmt"
77- "net"
88- "net/http"
95 "net/url"
106 "strings"
1111- "time"
127138 "github.com/bluesky-social/indigo/atproto/crypto"
149 "github.com/bluesky-social/indigo/atproto/syntax"
15101611 "github.com/mr-tron/base58"
1712)
1818-1919-// API for doing account lookups by DID or handle, with bi-directional verification handled automatically. Almost all atproto services and clients should use an implementation of this interface instead of resolving handles or DIDs separately
2020-//
2121-// Handles which fail to resolve, or don't match DID alsoKnownAs, are an error. DIDs which resolve but the handle does not resolve back to the DID return an Identity where the Handle is the special `handle.invalid` value.
2222-//
2323-// Some example implementations of this interface could be:
2424-// - basic direct resolution on every call
2525-// - local in-memory caching layer to reduce network hits
2626-// - API client, which just makes requests to PDS (or other remote service)
2727-// - client for shared network cache (eg, Redis)
2828-type Directory interface {
2929- LookupHandle(ctx context.Context, h syntax.Handle) (*Identity, error)
3030- LookupDID(ctx context.Context, d syntax.DID) (*Identity, error)
3131- Lookup(ctx context.Context, i syntax.AtIdentifier) (*Identity, error)
3232-3333- // Flushes any cache of the indicated identifier. If directory is not using caching, can ignore this.
3434- Purge(ctx context.Context, i syntax.AtIdentifier) error
3535-}
3636-3737-// Indicates that handle resolution failed. A wrapped error may provide more context. This is only returned when looking up a handle, not when looking up a DID.
3838-var ErrHandleResolutionFailed = errors.New("handle resolution failed")
3939-4040-// Indicates that resolution process completed successfully, but handle does not exist. This is only returned when looking up a handle, not when looking up a DID.
4141-var ErrHandleNotFound = errors.New("handle not found")
4242-4343-// Indicates that resolution process completed successfully, handle mapped to a different DID. This is only returned when looking up a handle, not when looking up a DID.
4444-var ErrHandleMismatch = errors.New("handle/DID mismatch")
4545-4646-// Indicates that DID document did not include any handle ("alsoKnownAs"). This is only returned when looking up a handle, not when looking up a DID.
4747-var ErrHandleNotDeclared = errors.New("DID document did not declare a handle")
4848-4949-// Handle top-level domain (TLD) is one of the special "Reserved" suffixes, and not allowed for atproto use
5050-var ErrHandleReservedTLD = errors.New("handle top-level domain is disallowed")
5151-5252-// Indicates that resolution process completed successfully, but the DID does not exist.
5353-var ErrDIDNotFound = errors.New("DID not found")
5454-5555-// Indicates that DID resolution process failed. A wrapped error may provide more context.
5656-var ErrDIDResolutionFailed = errors.New("DID resolution failed")
5757-5858-// Indicates that DID document did not include a public key with the specified ID
5959-var ErrKeyNotDeclared = errors.New("DID document did not declare a relevant public key")
6060-6161-var DefaultPLCURL = "https://plc.directory"
6262-6363-// Returns a reasonable Directory implementation for applications
6464-func DefaultDirectory() Directory {
6565- base := BaseDirectory{
6666- PLCURL: DefaultPLCURL,
6767- HTTPClient: http.Client{
6868- Timeout: time.Second * 10,
6969- Transport: &http.Transport{
7070- // would want this around 100ms for services doing lots of handle resolution. Impacts PLC connections as well, but not too bad.
7171- IdleConnTimeout: time.Millisecond * 1000,
7272- MaxIdleConns: 100,
7373- },
7474- },
7575- Resolver: net.Resolver{
7676- Dial: func(ctx context.Context, network, address string) (net.Conn, error) {
7777- d := net.Dialer{Timeout: time.Second * 3}
7878- return d.DialContext(ctx, network, address)
7979- },
8080- },
8181- TryAuthoritativeDNS: true,
8282- // primary Bluesky PDS instance only supports HTTP resolution method
8383- SkipDNSDomainSuffixes: []string{".bsky.social"},
8484- }
8585- cached := NewCacheDirectory(&base, 250_000, time.Hour*24, time.Minute*2, time.Minute*5)
8686- return &cached
8787-}
88138914// Represents an atproto identity. Could be a regular user account, or a service account (eg, feed generator)
9015type Identity struct {
···296296297297func (s *SchemaRecord) CheckSchema() error {
298298 switch s.Key {
299299- case "tid", "any":
299299+ case "tid", "nsid", "any":
300300 // pass
301301 default:
302302 if !strings.HasPrefix(s.Key, "literal:") {
+21
atproto/syntax/cmd/atp-syntax/main.go
···2222 ArgsUsage: "<tid>",
2323 Action: runParseTID,
2424 },
2525+ &cli.Command{
2626+ Name: "parse-did",
2727+ Usage: "parse a DID",
2828+ ArgsUsage: "<did>",
2929+ Action: runParseDID,
3030+ },
2531 }
2632 h := slog.NewTextHandler(os.Stderr, &slog.HandlerOptions{Level: slog.LevelDebug})
2733 slog.SetDefault(slog.New(h))
···43494450 return nil
4551}
5252+5353+func runParseDID(cctx *cli.Context) error {
5454+ s := cctx.Args().First()
5555+ if s == "" {
5656+ return fmt.Errorf("need to provide identifier as an argument")
5757+ }
5858+5959+ did, err := syntax.ParseDID(s)
6060+ if err != nil {
6161+ return err
6262+ }
6363+ fmt.Printf("%s\n", did)
6464+6565+ return nil
6666+}
+22
atproto/syntax/did.go
···1414type DID string
15151616var didRegex = regexp.MustCompile(`^did:[a-z]+:[a-zA-Z0-9._:%-]*[a-zA-Z0-9._-]$`)
1717+var plcChars = ""
1818+1919+func isASCIIAlphaNum(c rune) bool {
2020+ if (c >= 'a' && c <= 'z') || (c >= 'A' && c <= 'Z') || (c >= '0' && c <= '9') {
2121+ return true
2222+ }
2323+ return false
2424+}
17251826func ParseDID(raw string) (DID, error) {
2727+ // fast-path for did:plc, avoiding regex
2828+ if len(raw) == 32 && strings.HasPrefix(raw, "did:plc:") {
2929+	// NOTE: this doesn't really check base32, just broader alphanumeric. might pass invalid PLC DIDs, but they still have overall valid DID syntax
3030+ isPlc := true
3131+ for _, c := range raw[8:32] {
3232+ if !isASCIIAlphaNum(c) {
3333+ isPlc = false
3434+ break
3535+ }
3636+ }
3737+ if isPlc {
3838+ return DID(raw), nil
3939+ }
4040+ }
1941 if raw == "" {
2042 return "", errors.New("expected DID, got empty string")
2143 }
+26
atproto/syntax/path.go
···11+package syntax
22+33+import (
44+ "errors"
55+ "fmt"
66+ "strings"
77+)
88+99+// Parses an atproto repo path string into "collection" (NSID) and record key parts.
1010+//
1111+// Does not return partial success: either both collection and record key are complete (and error is nil), or both are empty string (and error is not nil)
1212+func ParseRepoPath(raw string) (NSID, RecordKey, error) {
1313+ parts := strings.SplitN(raw, "/", 3)
1414+ if len(parts) != 2 {
1515+ return "", "", errors.New("expected path to have two parts, separated by single slash")
1616+ }
1717+ nsid, err := ParseNSID(parts[0])
1818+ if err != nil {
1919+ return "", "", fmt.Errorf("collection part of path not a valid NSID: %w", err)
2020+ }
2121+ rkey, err := ParseRecordKey(parts[1])
2222+ if err != nil {
2323+ return "", "", fmt.Errorf("record key part of path not valid: %w", err)
2424+ }
2525+ return nsid, rkey, nil
2626+}
···4444 p1cbor := p1buf.Bytes()
45454646	// generate double the quota of events; expect to only count the quota worth of actions
4747- for i := 0; i < 2*QuotaModTakedownDay; i++ {
4747+ for i := 0; i < 2*eng.Config.QuotaModTakedownDay; i++ {
4848 ident := identity.Identity{
4949 DID: syntax.DID(fmt.Sprintf("did:plc:abc%d", i)),
5050 Handle: syntax.Handle("handle.example.com"),
···63636464 takedowns, err := eng.Counters.GetCount(ctx, "automod-quota", "takedown", countstore.PeriodDay)
6565 assert.NoError(err)
6666- assert.Equal(QuotaModTakedownDay, takedowns)
6666+ assert.Equal(eng.Config.QuotaModTakedownDay, takedowns)
67676868 reports, err := eng.Counters.GetCount(ctx, "automod-quota", "report", countstore.PeriodDay)
6969 assert.NoError(err)
···8989 p1cbor := p1buf.Bytes()
90909191 // generate double the quota of events; expect to only count the quota worth of actions
9292- for i := 0; i < 2*QuotaModReportDay; i++ {
9292+ for i := 0; i < 2*eng.Config.QuotaModReportDay; i++ {
9393 ident := identity.Identity{
9494 DID: syntax.DID(fmt.Sprintf("did:plc:abc%d", i)),
9595 Handle: syntax.Handle("handle.example.com"),
···112112113113 reports, err := eng.Counters.GetCount(ctx, "automod-quota", "report", countstore.PeriodDay)
114114 assert.NoError(err)
115115- assert.Equal(QuotaModReportDay, reports)
115115+ assert.Equal(eng.Config.QuotaModReportDay, reports)
116116}
-12
automod/engine/effects.go
···2233import (
44 "sync"
55- "time"
66-)
77-88-var (
99- // time period within which automod will not re-report an account for the same reasonType
1010- ReportDupePeriod = 1 * 24 * time.Hour
1111- // number of reports automod can file per day, for all subjects and types combined (circuit breaker)
1212- QuotaModReportDay = 2000
1313- // number of takedowns automod can action per day, for all subjects combined (circuit breaker)
1414- QuotaModTakedownDay = 200
1515- // number of misc actions automod can do per day, for all subjects combined (circuit breaker)
1616- QuotaModActionDay = 1000
175)
186197type CounterRef struct {
+8
automod/engine/engine.go
···5252type EngineConfig struct {
5353 // if enabled, account metadata is not hydrated for every event by default
5454 SkipAccountMeta bool
5555+ // time period within which automod will not re-report an account for the same reasonType
5656+ ReportDupePeriod time.Duration
5757+ // number of reports automod can file per day, for all subjects and types combined (circuit breaker)
5858+ QuotaModReportDay int
5959+ // number of takedowns automod can action per day, for all subjects combined (circuit breaker)
6060+ QuotaModTakedownDay int
6161+ // number of misc actions automod can do per day, for all subjects combined (circuit breaker)
6262+ QuotaModActionDay int
5563}
56645765// Entrypoint for external code pushing #identity events in to the engine.
+6-4
automod/engine/fetch_account_meta.go
···139139 ap.AccountTags = dedupeStrings(rd.Moderation.SubjectStatus.Tags)
140140 if rd.Moderation.SubjectStatus.ReviewState != nil {
141141 switch *rd.Moderation.SubjectStatus.ReviewState {
142142- case "#reviewOpen":
142142+ case "tools.ozone.moderation.defs#reviewOpen":
143143 ap.ReviewState = ReviewStateOpen
144144- case "#reviewEscalated":
144144+ case "tools.ozone.moderation.defs#reviewEscalated":
145145 ap.ReviewState = ReviewStateEscalated
146146- case "#reviewClosed":
146146+ case "tools.ozone.moderation.defs#reviewClosed":
147147 ap.ReviewState = ReviewStateClosed
148148- case "#reviewNonde":
148148+ case "tools.ozone.moderation.defs#reviewNone":
149149 ap.ReviewState = ReviewStateNone
150150+ default:
151151+ logger.Warn("unexpected ozone moderation review state", "state", rd.Moderation.SubjectStatus.ReviewState, "did", ident.DID)
150152 }
151153 }
152154 }
···2727 if !ok {
2828 v = []string{}
2929 }
3030- for _, f := range flags {
3131- v = append(v, f)
3232- }
3030+ v = append(v, flags...)
3331 v = dedupeStrings(v)
3432 s.Data[key] = v
3533 return nil
+1-3
automod/helpers/bsky.go
···11111212func ExtractHashtagsPost(post *appbsky.FeedPost) []string {
1313 var tags []string
1414- for _, tag := range post.Tags {
1515- tags = append(tags, tag)
1616- }
1414+ tags = append(tags, post.Tags...)
1715 for _, facet := range post.Facets {
1816 for _, feat := range facet.Features {
1917 if feat.RichtextFacet_Tag != nil {
+1-1
automod/rules/harassment.go
···130130131131 if count > 5 {
132132 //c.AddRecordFlag("trivial-harassing-post")
133133- c.ReportAccount(automod.ReportReasonOther, fmt.Sprintf("possible targetted harassment (also labeled; remove label if this isn't harassment!)"))
133133+ c.ReportAccount(automod.ReportReasonOther, "possible targetted harassment (also labeled; remove label if this isn't harassment!)")
134134 c.AddAccountLabel("!hide")
135135 c.Notify("slack")
136136 }
···11-// Copied from indigo:api/atproto/repolistRecords.go
22-33-package main
44-55-// schema: com.atproto.repo.getRecord
66-77-import (
88- "context"
99- "encoding/json"
1010-1111- "github.com/bluesky-social/indigo/xrpc"
1212-)
1313-1414-// RepoGetRecord_Output is the output of a com.atproto.repo.getRecord call.
1515-type RepoGetRecord_Output struct {
1616- Cid *string `json:"cid,omitempty" cborgen:"cid,omitempty"`
1717- Uri string `json:"uri" cborgen:"uri"`
1818- // NOTE: changed from lex decoder to json.RawMessage
1919- Value *json.RawMessage `json:"value" cborgen:"value"`
2020-}
2121-2222-// RepoGetRecord calls the XRPC method "com.atproto.repo.getRecord".
2323-//
2424-// cid: The CID of the version of the record. If not specified, then return the most recent version.
2525-// collection: The NSID of the record collection.
2626-// repo: The handle or DID of the repo.
2727-// rkey: The Record Key.
2828-func RepoGetRecord(ctx context.Context, c *xrpc.Client, cid string, collection string, repo string, rkey string) (*RepoGetRecord_Output, error) {
2929- var out RepoGetRecord_Output
3030-3131- params := map[string]interface{}{
3232- "cid": cid,
3333- "collection": collection,
3434- "repo": repo,
3535- "rkey": rkey,
3636- }
3737- if err := c.Do(ctx, xrpc.Query, "", "com.atproto.repo.getRecord", params, nil, &out); err != nil {
3838- return nil, err
3939- }
4040-4141- return &out, nil
4242-}
-53
cmd/astrolabe/repolistRecords.go
···11-// Copied from indigo:api/atproto/repolistRecords.go
22-33-package main
44-55-// schema: com.atproto.repo.listRecords
66-77-import (
88- "context"
99- "encoding/json"
1010-1111- "github.com/bluesky-social/indigo/xrpc"
1212-)
1313-1414-// RepoListRecords_Output is the output of a com.atproto.repo.listRecords call.
1515-type RepoListRecords_Output struct {
1616- Cursor *string `json:"cursor,omitempty" cborgen:"cursor,omitempty"`
1717- Records []*RepoListRecords_Record `json:"records" cborgen:"records"`
1818-}
1919-2020-// RepoListRecords_Record is a "record" in the com.atproto.repo.listRecords schema.
2121-type RepoListRecords_Record struct {
2222- Cid string `json:"cid" cborgen:"cid"`
2323- Uri string `json:"uri" cborgen:"uri"`
2424- // NOTE: changed from lex decoder to json.RawMessage
2525- Value *json.RawMessage `json:"value" cborgen:"value"`
2626-}
2727-2828-// RepoListRecords calls the XRPC method "com.atproto.repo.listRecords".
2929-//
3030-// collection: The NSID of the record type.
3131-// limit: The number of records to return.
3232-// repo: The handle or DID of the repo.
3333-// reverse: Flag to reverse the order of the returned records.
3434-// rkeyEnd: DEPRECATED: The highest sort-ordered rkey to stop at (exclusive)
3535-// rkeyStart: DEPRECATED: The lowest sort-ordered rkey to start from (exclusive)
3636-func RepoListRecords(ctx context.Context, c *xrpc.Client, collection string, cursor string, limit int64, repo string, reverse bool, rkeyEnd string, rkeyStart string) (*RepoListRecords_Output, error) {
3737- var out RepoListRecords_Output
3838-3939- params := map[string]interface{}{
4040- "collection": collection,
4141- "cursor": cursor,
4242- "limit": limit,
4343- "repo": repo,
4444- "reverse": reverse,
4545- "rkeyEnd": rkeyEnd,
4646- "rkeyStart": rkeyStart,
4747- }
4848- if err := c.Do(ctx, xrpc.Query, "", "com.atproto.repo.listRecords", params, nil, &out); err != nil {
4949- return nil, err
5050- }
5151-5252- return &out, nil
5353-}
···11+22+`rainbow`: atproto Firehose Fanout Service
33+==========================================
44+55+This is an atproto service which consumes from a firehose (eg, from a relay or PDS) and fans out events to many subscribers.
66+77+Features and design points:
88+99+- retains "backfill window" on local disk (using [pebble](https://github.com/cockroachdb/pebble))
1010+- serves the `com.atproto.sync.subscribeRepos` endpoint (WebSocket)
1111+- retains upstream firehose "sequence numbers"
1212+- does not validate events (signatures, repo tree, hashes, etc), just passes through
1313+- does not archive or mirror individual records or entire repositories (or implement related API endpoints)
1414+- disk I/O intensive: fast NVMe disks are recommended, and RAM is helpful for caching
1515+- single golang binary for easy deployment
1616+- observability: logging, prometheus metrics, OTEL traces
1717+1818+## Running
1919+2020+This is a simple, single-binary Go program. You can also build and run it as a docker container (see `./Dockerfile`).
2121+2222+From the top level of this repo, you can build:
2323+2424+```shell
2525+go build -o rainbow-bin ./cmd/rainbow
2626+```
2727+2828+or just run it, and see configuration options:
2929+3030+```shell
3131+go run ./cmd/rainbow --help
3232+```
···11# Stage 1: Build the Go binary
22-FROM golang:1.22-alpine3.18 AS builder
22+FROM golang:1.23-alpine3.20 AS builder
3344# Create a directory for the application
55WORKDIR /app
···2233import (
44 "context"
55+ "log/slog"
56 "sync"
67 "time"
7889 "github.com/bluesky-social/indigo/events"
910 "github.com/bluesky-social/indigo/events/schedulers"
1010- logging "github.com/ipfs/go-log"
1111 "github.com/prometheus/client_golang/prometheus"
1212)
1313-1414-var log = logging.Logger("autoscaling-scheduler")
15131614// Scheduler is a scheduler that will scale up and down the number of workers based on the throughput of the workers.
1715type Scheduler struct {
···4038 autoscaleFrequency time.Duration
4139 autoscalerIn chan struct{}
4240 autoscalerOut chan struct{}
4141+4242+ log *slog.Logger
4343}
44444545type AutoscaleSettings struct {
···9999 autoscaleFrequency: autoscaleSettings.AutoscaleFrequency,
100100 autoscalerIn: make(chan struct{}),
101101 autoscalerOut: make(chan struct{}),
102102+103103+ log: slog.Default().With("system", "autoscaling-scheduler"),
102104 }
103105104106 for i := 0; i < p.concurrency; i++ {
···111113}
112114113115func (p *Scheduler) Shutdown() {
114114- log.Debugf("shutting down autoscaling scheduler for %s", p.ident)
116116+ p.log.Debug("shutting down autoscaling scheduler", "ident", p.ident)
115117116118 // stop autoscaling
117119 p.autoscalerIn <- struct{}{}
118120 close(p.autoscalerIn)
119121 <-p.autoscalerOut
120122121121- log.Debug("stopping autoscaling scheduler workers")
123123+ p.log.Debug("stopping autoscaling scheduler workers")
122124 // stop workers
123125 for i := 0; i < p.concurrency; i++ {
124126 p.feeder <- &consumerTask{signal: "stop"}
125127 }
126128 close(p.feeder)
127129128128- log.Debug("waiting for autoscaling scheduler workers to stop")
130130+ p.log.Debug("waiting for autoscaling scheduler workers to stop")
129131130132 p.workerGroup.Wait()
131133132132- log.Debug("stopping autoscaling scheduler throughput manager")
134134+ p.log.Debug("stopping autoscaling scheduler throughput manager")
133135 p.throughputManager.Stop()
134136135135- log.Debug("autoscaling scheduler shutdown complete")
137137+ p.log.Debug("autoscaling scheduler shutdown complete")
136138}
137139138140// Add autoscaling function
···197199}
198200199201func (p *Scheduler) worker() {
200200- log.Debugf("starting autoscaling worker for %s", p.ident)
202202+ p.log.Debug("starting autoscaling worker", "ident", p.ident)
201203 p.workersActive.Inc()
202204 p.workerGroup.Add(1)
203205 defer p.workerGroup.Done()
···205207 for work != nil {
206208 // Check if the work item contains a signal to stop the worker.
207209 if work.signal == "stop" {
208208- log.Debugf("stopping autoscaling worker for %s", p.ident)
210210+ p.log.Debug("stopping autoscaling worker", "ident", p.ident)
209211 p.workersActive.Dec()
210212 return
211213 }
212214213215 p.itemsActive.Inc()
214216 if err := p.do(context.TODO(), work.val); err != nil {
215215- log.Errorf("event handler failed: %s", err)
217217+ p.log.Error("event handler failed", "err", err)
216218 }
217219 p.itemsProcessed.Inc()
218220219221 p.lk.Lock()
220222 rem, ok := p.active[work.repo]
221223 if !ok {
222222- log.Errorf("should always have an 'active' entry if a worker is processing a job")
224224+ p.log.Error("should always have an 'active' entry if a worker is processing a job")
223225 }
224226225227 if len(rem) == 0 {
+9-7
events/schedulers/parallel/parallel.go
···2233import (
44 "context"
55+ "log/slog"
56 "sync"
6778 "github.com/bluesky-social/indigo/events"
89 "github.com/bluesky-social/indigo/events/schedulers"
99- logging "github.com/ipfs/go-log"
10101111 "github.com/prometheus/client_golang/prometheus"
1212)
1313-1414-var log = logging.Logger("parallel-scheduler")
15131614// Scheduler is a parallel scheduler that will run work on a fixed number of workers
1715type Scheduler struct {
···3331 itemsProcessed prometheus.Counter
3432 itemsActive prometheus.Counter
3533 workesActive prometheus.Gauge
3434+3535+ log *slog.Logger
3636}
37373838func NewScheduler(maxC, maxQ int, ident string, do func(context.Context, *events.XRPCStreamEvent) error) *Scheduler {
···5252 itemsProcessed: schedulers.WorkItemsProcessed.WithLabelValues(ident, "parallel"),
5353 itemsActive: schedulers.WorkItemsActive.WithLabelValues(ident, "parallel"),
5454 workesActive: schedulers.WorkersActive.WithLabelValues(ident, "parallel"),
5555+5656+ log: slog.Default().With("system", "parallel-scheduler"),
5557 }
56585759 for i := 0; i < maxC; i++ {
···6466}
65676668func (p *Scheduler) Shutdown() {
6767- log.Infof("shutting down parallel scheduler for %s", p.ident)
6969+ p.log.Info("shutting down parallel scheduler", "ident", p.ident)
68706971 for i := 0; i < p.maxConcurrency; i++ {
7072 p.feeder <- &consumerTask{
···7880 <-p.out
7981 }
80828181- log.Info("parallel scheduler shutdown complete")
8383+ p.log.Info("parallel scheduler shutdown complete")
8284}
83858486type consumerTask struct {
···123125124126 p.itemsActive.Inc()
125127 if err := p.do(context.TODO(), work.val); err != nil {
126126- log.Errorf("event handler failed: %s", err)
128128+ p.log.Error("event handler failed", "err", err)
127129 }
128130 p.itemsProcessed.Inc()
129131130132 p.lk.Lock()
131133 rem, ok := p.active[work.repo]
132134 if !ok {
133133- log.Errorf("should always have an 'active' entry if a worker is processing a job")
135135+ p.log.Error("should always have an 'active' entry if a worker is processing a job")
134136 }
135137136138 if len(rem) == 0 {
+1-3
events/schedulers/sequential/sequential.go
···2233import (
44 "context"
55-65 "github.com/bluesky-social/indigo/events"
76 "github.com/bluesky-social/indigo/events/schedulers"
88- logging "github.com/ipfs/go-log"
97 "github.com/prometheus/client_golang/prometheus"
108)
1191212-var log = logging.Logger("sequential-scheduler")
1010+// var log = slog.Default().With("system", "sequential-scheduler")
13111412// Scheduler is a sequential scheduler that will run work on a single worker
1513type Scheduler struct {
+3-7
fakedata/accounts.go
···20202121func (ac *AccountCatalog) Combined() []AccountContext {
2222 var combined []AccountContext
2323- for _, c := range ac.Celebs {
2424- combined = append(combined, c)
2525- }
2626- for _, r := range ac.Regulars {
2727- combined = append(combined, r)
2828- }
2323+ combined = append(combined, ac.Celebs...)
2424+ combined = append(combined, ac.Regulars...)
2925 return combined
3026}
3127···7268 return nil, fmt.Errorf("account index didn't match: %d != %d (%s)", i, u.Index, u.AccountType)
7369 }
7470 }
7575- log.Infof("loaded account catalog: regular=%d celebrity=%d", len(catalog.Regulars), len(catalog.Celebs))
7171+ log.Info("loaded account catalog", "regular", len(catalog.Regulars), "celebrity", len(catalog.Celebs))
7672 return catalog, nil
7773}
7874
···1818var _ = math.E
1919var _ = sort.Sort
20202121-func (t *nodeData) MarshalCBOR(w io.Writer) error {
2121+func (t *NodeData) MarshalCBOR(w io.Writer) error {
2222 if t == nil {
2323 _, err := w.Write(cbg.CborNull)
2424 return err
···3030 return err
3131 }
32323333- // t.Entries ([]mst.treeEntry) (slice)
3333+ // t.Entries ([]mst.TreeEntry) (slice)
3434 if len("e") > 1000000 {
3535 return xerrors.Errorf("Value in field \"e\" was too long")
3636 }
···8181 return nil
8282}
83838484-func (t *nodeData) UnmarshalCBOR(r io.Reader) (err error) {
8585- *t = nodeData{}
8484+func (t *NodeData) UnmarshalCBOR(r io.Reader) (err error) {
8585+ *t = NodeData{}
86868787 cr := cbg.NewCborReader(r)
8888···101101 }
102102103103 if extra > cbg.MaxLength {
104104- return fmt.Errorf("nodeData: map struct too large (%d)", extra)
104104+ return fmt.Errorf("NodeData: map struct too large (%d)", extra)
105105 }
106106107107 n := extra
···122122 }
123123124124 switch string(nameBuf[:nameLen]) {
125125- // t.Entries ([]mst.treeEntry) (slice)
125125+ // t.Entries ([]mst.TreeEntry) (slice)
126126 case "e":
127127128128 maj, extra, err = cr.ReadHeader()
···139139 }
140140141141 if extra > 0 {
142142- t.Entries = make([]treeEntry, extra)
142142+ t.Entries = make([]TreeEntry, extra)
143143 }
144144145145 for i := 0; i < int(extra); i++ {
···195195196196 return nil
197197}
198198-func (t *treeEntry) MarshalCBOR(w io.Writer) error {
198198+func (t *TreeEntry) MarshalCBOR(w io.Writer) error {
199199 if t == nil {
200200 _, err := w.Write(cbg.CborNull)
201201 return err
···294294 return nil
295295}
296296297297-func (t *treeEntry) UnmarshalCBOR(r io.Reader) (err error) {
298298- *t = treeEntry{}
297297+func (t *TreeEntry) UnmarshalCBOR(r io.Reader) (err error) {
298298+ *t = TreeEntry{}
299299300300 cr := cbg.NewCborReader(r)
301301···314314 }
315315316316 if extra > cbg.MaxLength {
317317- return fmt.Errorf("treeEntry: map struct too large (%d)", extra)
317317+ return fmt.Errorf("TreeEntry: map struct too large (%d)", extra)
318318 }
319319320320 n := extra
+8-8
mst/mst.go
···105105// the CBOR codec.
106106func CBORTypes() []reflect.Type {
107107 return []reflect.Type{
108108- reflect.TypeOf(nodeData{}),
109109- reflect.TypeOf(treeEntry{}),
108108+ reflect.TypeOf(NodeData{}),
109109+ reflect.TypeOf(TreeEntry{}),
110110 }
111111}
// NodeData is an MST tree node as it gets serialized to CBOR. Note that the
// CBOR field names are all single-character to keep the wire encoding small.
type NodeData struct {
	Left    *cid.Cid    `cborgen:"l"` // [nullable] pointer to lower-level subtree to the "left" of this path/key
	Entries []TreeEntry `cborgen:"e"` // ordered list of entries at this node
}
119119120120-// treeEntry are elements of nodeData's Entries.
121121-type treeEntry struct {
120120+// TreeEntry are elements of NodeData's Entries.
121121+type TreeEntry struct {
122122 PrefixLen int64 `cborgen:"p"` // count of characters shared with previous path/key in tree
123123 KeySuffix []byte `cborgen:"k"` // remaining part of path/key (appended to "previous key")
124124 Val cid.Cid `cborgen:"v"` // CID pointer at this path/key
···189189 // otherwise this is a virtual/pointer struct and we need to hydrate from
190190 // blockstore before returning entries
191191 if mst.pointer != cid.Undef {
192192- var nd nodeData
192192+ var nd NodeData
193193 if err := mst.cst.Get(ctx, mst.pointer, &nd); err != nil {
194194 return nil, err
195195 }
···210210}
211211212212// golang-specific helper that calls in to deserializeNodeData
213213-func entriesFromNodeData(ctx context.Context, nd *nodeData, cst cbor.IpldStore) ([]nodeEntry, error) {
213213+func entriesFromNodeData(ctx context.Context, nd *NodeData, cst cbor.IpldStore) ([]nodeEntry, error) {
214214 layer := -1
215215 if len(nd.Entries) > 0 {
216216 // NOTE(bnewbold): can compute the layer on the first KeySuffix, because for the first entry that field is a complete key
···267267 }
268268 }
269269 }
270270- for _, t := range p.Tags {
271271- ret = append(ret, t)
272272- }
270270+ ret = append(ret, p.Tags...)
273271 if len(ret) == 0 {
274272 return nil
275273 }
+16
splitter/metrics.go
···11+package splitter
22+33+import (
44+ "github.com/prometheus/client_golang/prometheus"
55+ "github.com/prometheus/client_golang/prometheus/promauto"
66+)
// eventsSentCounter counts every event delivered to a downstream consumer,
// labeled by the consumer's remote address and user agent.
var eventsSentCounter = promauto.NewCounterVec(prometheus.CounterOpts{
	Name: "spl_events_sent_counter",
	Help: "The total number of events sent to consumers",
}, []string{"remote_addr", "user_agent"})

// activeClientGauge tracks the number of currently-connected consumers.
var activeClientGauge = promauto.NewGauge(prometheus.GaugeOpts{
	Name: "spl_active_clients",
	Help: "Current number of active clients",
})
+144
splitter/ringbuf.go
···11+package splitter
22+33+import (
44+ "context"
55+ "sync"
66+77+ events "github.com/bluesky-social/indigo/events"
88+ "github.com/bluesky-social/indigo/models"
99+)
1010+1111+func NewEventRingBuffer(chunkSize, nchunks int) *EventRingBuffer {
1212+ return &EventRingBuffer{
1313+ chunkSize: chunkSize,
1414+ maxChunkCount: nchunks,
1515+ }
1616+}
// EventRingBuffer is a fixed-capacity, in-memory event persistence layer.
// Events are appended to the newest chunk; once maxChunkCount chunks exist,
// the oldest chunk is discarded, so old events age out rather than blocking
// new writes.
type EventRingBuffer struct {
	lk            sync.Mutex   // guards chunks (the slice header; each chunk has its own lock)
	chunks        []*ringChunk // ordered oldest-to-newest
	chunkSize     int          // max events per chunk before rotation
	maxChunkCount int          // max chunks retained before eviction

	// broadcast, registered via SetEventBroadcaster, is invoked for each
	// persisted event to fan it out to live subscribers.
	broadcast func(*events.XRPCStreamEvent)
}
// ringChunk is one segment of the ring buffer: an append-only list of
// events guarded by its own lock.
type ringChunk struct {
	lk  sync.Mutex // guards buf
	buf []*events.XRPCStreamEvent
}
3131+3232+func (rc *ringChunk) append(evt *events.XRPCStreamEvent) {
3333+ rc.lk.Lock()
3434+ defer rc.lk.Unlock()
3535+ rc.buf = append(rc.buf, evt)
3636+}
3737+3838+func (rc *ringChunk) events() []*events.XRPCStreamEvent {
3939+ rc.lk.Lock()
4040+ defer rc.lk.Unlock()
4141+ return rc.buf
4242+}
// Persist appends evt to the newest chunk, rotating in a fresh chunk (and
// evicting the oldest when over capacity) if the current one is full, then
// broadcasts the event to live subscribers. Always returns nil.
//
// NOTE(review): broadcast is invoked while er.lk is held — presumably the
// callback must not re-enter this buffer or it would deadlock; confirm with
// the events package. Also assumes SetEventBroadcaster was called before the
// first Persist, otherwise this panics on a nil func.
func (er *EventRingBuffer) Persist(ctx context.Context, evt *events.XRPCStreamEvent) error {
	er.lk.Lock()
	defer er.lk.Unlock()

	// lazily create the first chunk on first use
	if len(er.chunks) == 0 {
		er.chunks = []*ringChunk{new(ringChunk)}
	}

	// reading last.buf without its chunk lock is safe here: all appends go
	// through Persist, which is serialized by er.lk
	last := er.chunks[len(er.chunks)-1]
	if len(last.buf) >= er.chunkSize {
		// current chunk is full: start a new one, dropping the oldest
		// chunk if that pushes us over the retention limit
		last = new(ringChunk)
		er.chunks = append(er.chunks, last)
		if len(er.chunks) > er.maxChunkCount {
			er.chunks = er.chunks[1:]
		}
	}

	last.append(evt)

	er.broadcast(evt)
	return nil
}
// Flush is a no-op: the ring buffer holds events only in memory, so there is
// nothing to flush to durable storage.
func (er *EventRingBuffer) Flush(context.Context) error {
	return nil
}
7070+7171+func (er *EventRingBuffer) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error {
7272+ // run playback a few times to get as close to 'live' as possible before returning
7373+ for i := 0; i < 10; i++ {
7474+ n, err := er.playbackRound(ctx, since, cb)
7575+ if err != nil {
7676+ return err
7777+ }
7878+7979+ // playback had no new events
8080+ if n-since == 0 {
8181+ return nil
8282+ }
8383+ since = n
8484+ }
8585+8686+ return nil
8787+}
8888+8989+func (er *EventRingBuffer) playbackRound(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) (int64, error) {
9090+ // grab a snapshot of the current chunks
9191+ er.lk.Lock()
9292+ chunks := er.chunks
9393+ er.lk.Unlock()
9494+9595+ i := len(chunks) - 1
9696+ for ; i >= 0; i-- {
9797+ c := chunks[i]
9898+ evts := c.events()
9999+ if since > events.SequenceForEvent(evts[len(evts)-1]) {
100100+ i++
101101+ break
102102+ }
103103+ }
104104+ if i < 0 {
105105+ i = 0
106106+ }
107107+108108+ var lastSeq int64 = since
109109+ for _, c := range chunks[i:] {
110110+ var nread int
111111+ evts := c.events()
112112+ for nread < len(evts) {
113113+ for _, e := range evts[nread:] {
114114+ nread++
115115+ seq := events.SequenceForEvent(e)
116116+ if seq <= since {
117117+ continue
118118+ }
119119+120120+ if err := cb(e); err != nil {
121121+ return 0, err
122122+ }
123123+ lastSeq = seq
124124+ }
125125+126126+ // recheck evts buffer to see if more were added while we were here
127127+ evts = c.events()
128128+ }
129129+ }
130130+131131+ return lastSeq, nil
132132+}
// SetEventBroadcaster registers the callback invoked for each persisted
// event. NOTE(review): the write is unsynchronized — presumably this is
// called once during setup, before Persist can run; confirm with callers.
func (er *EventRingBuffer) SetEventBroadcaster(brc func(*events.XRPCStreamEvent)) {
	er.broadcast = brc
}
// Shutdown is a no-op: the ring buffer owns no goroutines or external
// resources that require teardown.
func (er *EventRingBuffer) Shutdown(context.Context) error {
	return nil
}
// TakeDownRepo is a no-op: events already buffered for a taken-down repo are
// left in place and simply age out of the ring.
func (er *EventRingBuffer) TakeDownRepo(context.Context, models.Uid) error {
	return nil
}