automod/countstore.go  +3 -3
···
 
 type CountStore interface {
 	GetCount(ctx context.Context, key, period string) (int, error)
-	Increment(ctx context.Context, key string) (int, error)
+	Increment(ctx context.Context, key string) error
 }
 
 // TODO: this implementation isn't race-safe (yet)!
···
 	}
 }
 
-func (s *MemCountStore) GetCount(ctx context.Context, key, period string) (int, error) {
+func (s MemCountStore) GetCount(ctx context.Context, key, period string) (int, error) {
 	v, ok := s.Counts[PeriodKey(key, period)]
 	if !ok {
 		return 0, nil
···
 	return v, nil
 }
 
-func (s *MemCountStore) Increment(ctx context.Context, key string) error {
+func (s MemCountStore) Increment(ctx context.Context, key string) error {
 	for _, p := range []string{PeriodTotal, PeriodDay, PeriodHour} {
 		k := PeriodKey(key, p)
 		v, ok := s.Counts[k]
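
The interface change above narrows Increment to return only an error, and the retained TODO notes that MemCountStore is not yet race-safe. Purely as an illustrative sketch (not code from this diff): a mutex-guarded variant could look like the following, where the type name, the inlined period strings, and the key joining are assumptions standing in for the package's actual PeriodKey helper and constants.

package countsketch

import (
	"context"
	"sync"
)

// SafeMemCountStore is a hypothetical, mutex-guarded stand-in for MemCountStore.
type SafeMemCountStore struct {
	mu     sync.Mutex
	counts map[string]int
}

func NewSafeMemCountStore() *SafeMemCountStore {
	return &SafeMemCountStore{counts: make(map[string]int)}
}

// GetCount returns the current count for a key within a period (zero if unseen).
func (s *SafeMemCountStore) GetCount(ctx context.Context, key, period string) (int, error) {
	s.mu.Lock()
	defer s.mu.Unlock()
	// simple key joining; the diff uses a PeriodKey helper for this
	return s.counts[period+"/"+key], nil
}

// Increment bumps the counter for every tracked period, matching the new
// error-only return signature of the CountStore interface.
func (s *SafeMemCountStore) Increment(ctx context.Context, key string) error {
	s.mu.Lock()
	defer s.mu.Unlock()
	for _, period := range []string{"total", "day", "hour"} {
		s.counts[period+"/"+key]++
	}
	return nil
}
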
automod/engine.go  +52 -90
···
 package automod
 
 import (
-	"bytes"
 	"context"
 	"fmt"
 	"log/slog"
 	"strings"
 	"sync"
 
-	comatproto "github.com/bluesky-social/indigo/api/atproto"
+	appbsky "github.com/bluesky-social/indigo/api/bsky"
 	"github.com/bluesky-social/indigo/atproto/identity"
 	"github.com/bluesky-social/indigo/atproto/syntax"
-	lexutil "github.com/bluesky-social/indigo/lex/util"
-	"github.com/bluesky-social/indigo/repo"
-	"github.com/bluesky-social/indigo/repomgr"
 	"github.com/bluesky-social/indigo/xrpc"
 )
 
-// runtime for executing rules, managing state, and recording moderation actions
+// runtime for executing rules, managing state, and recording moderation actions.
+//
+// TODO: careful when initializing: several fields should not be null or zero, even though they are pointer type.
 type Engine struct {
+	Logger    *slog.Logger
+	Directory identity.Directory
 	// current rule sets. will eventually be possible to swap these out at runtime
 	RulesMap sync.Map
-	Directory identity.Directory
 	// used to persist moderation actions in mod service (optional)
 	AdminClient *xrpc.Client
 	CountStore  CountStore
 }
 
-func (e *Engine) ProcessIdentityEvent(t string, did syntax.DID) error {
-	ctx := context.Background()
-
+func (e *Engine) ProcessIdentityEvent(ctx context.Context, t string, did syntax.DID) error {
 	// similar to an HTTP server, we want to recover any panics from rule execution
 	defer func() {
 		if r := recover(); r != nil {
-			slog.Error("automod event execution exception", "err", r)
-			// TODO: mark repo as dirty?
-			// TODO: circuit-break on repeated panics?
+			e.Logger.Error("automod event execution exception", "err", r)
 		}
 	}()
 
···
 			Account: AccountMeta{Identity: ident},
 		},
 	}
-	e.CallIdentityRules(&evt)
-
-	_ = ctx
+	// TODO: call rules
+	_ = evt
 	return nil
 }
 
-// this method takes a full firehose commit event. it must not be a tooBig
-func (e *Engine) ProcessCommit(ctx context.Context, commit *comatproto.SyncSubscribeRepos_Commit) error {
-
+func (e *Engine) ProcessRecord(ctx context.Context, did syntax.DID, path string, rec any) error {
 	// similar to an HTTP server, we want to recover any panics from rule execution
-	/*
 	defer func() {
 		if r := recover(); r != nil {
-			slog.Error("automod event execution exception", "err", r)
-			// TODO: mark repo as dirty?
-			// TODO: circuit-break on repeated panics?
+			e.Logger.Error("automod event execution exception", "err", r)
 		}
 	}()
-	*/
-
-	r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(commit.Blocks))
-	if err != nil {
-		// TODO: handle this case (instead of return nil)
-		slog.Error("reading repo from car", "size_bytes", len(commit.Blocks), "err", err)
-		return nil
-	}
-
-	did, err := syntax.ParseDID(commit.Repo)
-	if err != nil {
-		return fmt.Errorf("bad DID syntax in event: %w", err)
-	}
 
 	ident, err := e.Directory.LookupDID(ctx, did)
 	if err != nil {
···
 	if ident == nil {
 		return fmt.Errorf("identity not found for did: %s", did.String())
 	}
+	collection := strings.SplitN(path, "/", 2)[0]
 
-	for _, op := range commit.Ops {
-		ek := repomgr.EventKind(op.Action)
-		logOp := slog.With("op_path", op.Path, "op_cid", op.Cid)
-		switch ek {
-		case repomgr.EvtKindCreateRecord:
-			rc, rec, err := r.GetRecord(ctx, op.Path)
-			if err != nil {
-				// TODO: handle this case (instead of return nil)
-				logOp.Error("fetching record from event CAR slice", "err", err)
-				return nil
-			}
-			if lexutil.LexLink(rc) != *op.Cid {
-				// TODO: handle this case (instead of return nil)
-				logOp.Error("mismatch in record and op cid", "record_cid", rc)
-				return nil
-			}
-
-			if strings.HasPrefix(op.Path, "app.bsky.feed.post/") {
-				// TODO: handle as a PostEvent specially
-			} else {
-				// XXX: pass record in to event
-				_ = rec
-				evt := RecordEvent{
-					Event{
-						Engine: e,
-						Account: AccountMeta{Identity: ident},
-					},
-					[]string{},
-					false,
-					[]ModReport{},
-					[]string{},
-				}
-				e.CallRecordRules(&evt)
-				// TODO persist
-			}
-		case repomgr.EvtKindUpdateRecord:
-			slog.Info("ignoring record update", "did", commit.Repo, "seq", commit.Seq, "path", op.Path)
-			return nil
-		case repomgr.EvtKindDeleteRecord:
-			slog.Info("ignoring record deletion", "did", commit.Repo, "seq", commit.Seq, "path", op.Path)
-			return nil
+	switch collection {
+	case "app.bsky.feed.post":
+		post, ok := rec.(*appbsky.FeedPost)
+		if !ok {
+			return fmt.Errorf("mismatch between collection (%s) and type", collection)
 		}
+		evt := e.NewPostEvent(ident, path, post)
+		e.Logger.Info("processing post", "did", ident.DID, "path", path)
+		_ = evt
+		// TODO: call rules
+	default:
+		evt := e.NewRecordEvent(ident, path, rec)
+		e.Logger.Info("processing record", "did", ident.DID, "path", path)
+		_ = evt
+		// TODO: call rules
 	}
 
-	_ = ctx
 	return nil
 }
 
-func (e *Engine) CallIdentityRules(evt *IdentityEvent) error {
-	slog.Info("calling rules on identity event")
-	return nil
+func (e *Engine) NewPostEvent(ident *identity.Identity, path string, post *appbsky.FeedPost) PostEvent {
+	return PostEvent{
+		RecordEvent{
+			Event{
+				Engine: e,
+				Account: AccountMeta{Identity: ident},
+			},
+			[]string{},
+			false,
+			[]ModReport{},
+			[]string{},
+		},
+	}
 }
 
-func (e *Engine) CallRecordRules(evt *RecordEvent) error {
-	slog.Info("calling rules on record event")
-	return nil
-}
-
-func (e *Engine) PersistModActions() error {
-	// XXX
-	return nil
+func (e *Engine) NewRecordEvent(ident *identity.Identity, path string, rec any) RecordEvent {
+	return RecordEvent{
+		Event{
+			Engine: e,
+			Account: AccountMeta{Identity: ident},
+		},
+		[]string{},
+		false,
+		[]ModReport{},
+		[]string{},
+	}
 }
 
 func (e *Engine) GetCount(key, period string) (int, error) {
 	return e.CountStore.GetCount(context.TODO(), key, period)
 }
 
-func (e *Engine) InSet(name, val string) (bool, error) {
+func (e *Engine) InSet(setName, val string) (bool, error) {
 	// XXX: implement
 	return false, nil
 }
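
For readers skimming the new ProcessRecord flow: the collection NSID is simply the first segment of the record path ("collection/rkey"), which is what the strings.SplitN call computes before switching on it. A tiny standalone illustration (the record key below is made up):

package main

import (
	"fmt"
	"strings"
)

func main() {
	// hypothetical record path: "<collection NSID>/<record key>"
	path := "app.bsky.feed.post/3k2yihcrp6f2c"
	collection := strings.SplitN(path, "/", 2)[0]
	fmt.Println(collection) // prints: app.bsky.feed.post
}
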
cmd/hepa/consumer.go  +123
···
+package main
+
+import (
+	"bytes"
+	"context"
+	"fmt"
+	"net/http"
+	"net/url"
+
+	comatproto "github.com/bluesky-social/indigo/api/atproto"
+	"github.com/bluesky-social/indigo/atproto/syntax"
+	"github.com/bluesky-social/indigo/events/schedulers/autoscaling"
+	lexutil "github.com/bluesky-social/indigo/lex/util"
+
+	"github.com/bluesky-social/indigo/events"
+	"github.com/bluesky-social/indigo/repo"
+	"github.com/bluesky-social/indigo/repomgr"
+	"github.com/carlmjohnson/versioninfo"
+	"github.com/gorilla/websocket"
+)
+
+func (s *Server) RunConsumer(ctx context.Context) error {
+
+	// TODO: persist cursor in a database or local disk
+	cur := 0
+
+	dialer := websocket.DefaultDialer
+	u, err := url.Parse(s.bgshost)
+	if err != nil {
+		return fmt.Errorf("invalid bgshost URI: %w", err)
+	}
+	u.Path = "xrpc/com.atproto.sync.subscribeRepos"
+	if cur != 0 {
+		u.RawQuery = fmt.Sprintf("cursor=%d", cur)
+	}
+	s.logger.Info("subscribing to repo event stream", "upstream", s.bgshost, "cursor", cur)
+	con, _, err := dialer.Dial(u.String(), http.Header{
+		"User-Agent": []string{fmt.Sprintf("hepa/%s", versioninfo.Short())},
+	})
+	if err != nil {
+		return fmt.Errorf("subscribing to firehose failed (dialing): %w", err)
+	}
+
+	rsc := &events.RepoStreamCallbacks{
+		RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
+			return s.HandleRepoCommit(ctx, evt)
+		},
+		RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
+			did, err := syntax.ParseDID(evt.Did)
+			if err != nil {
+				s.logger.Error("bad DID in RepoHandle event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err)
+				return nil
+			}
+			if err := s.engine.ProcessIdentityEvent(ctx, "handle", did); err != nil {
+				s.logger.Error("processing handle update failed", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err)
+			}
+			return nil
+		},
+		// TODO: other event callbacks as needed
+	}
+
+	return events.HandleRepoStream(
+		ctx, con, autoscaling.NewScheduler(
+			autoscaling.DefaultAutoscaleSettings(),
+			s.bgshost,
+			rsc.EventHandler,
+		),
+	)
+}
+
+// NOTE: for now, this function basically never errors, just logs and returns nil. Should think through error processing better.
+func (s *Server) HandleRepoCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error {
+
+	logger := s.logger.With("event", "commit", "did", evt.Repo, "rev", evt.Rev, "seq", evt.Seq)
+	// XXX: debug, not info
+	logger.Info("received commit event")
+
+	if evt.TooBig {
+		logger.Warn("skipping tooBig events for now")
+		return nil
+	}
+
+	did, err := syntax.ParseDID(evt.Repo)
+	if err != nil {
+		logger.Error("bad DID syntax in event", "err", err)
+		return nil
+	}
+
+	rr, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(evt.Blocks))
+	if err != nil {
+		logger.Error("failed to read repo from car", "err", err)
+		return nil
+	}
+
+	for _, op := range evt.Ops {
+		logger = logger.With("eventKind", op.Action, "path", op.Path)
+
+		ek := repomgr.EventKind(op.Action)
+		switch ek {
+		case repomgr.EvtKindCreateRecord:
+			// read the record from blocks, and verify CID
+			rc, rec, err := rr.GetRecord(ctx, op.Path)
+			if err != nil {
+				logger.Error("reading record from event blocks (CAR)", "err", err)
+				break
+			}
+			if op.Cid == nil || lexutil.LexLink(rc) != *op.Cid {
+				logger.Error("mismatch between commit op CID and record block", "recordCID", rc, "opCID", op.Cid)
+				break
+			}
+
+			err = s.engine.ProcessRecord(ctx, did, op.Path, rec)
+			if err != nil {
+				logger.Error("engine failed to process record", "err", err)
+				continue
+			}
+		default:
+			// TODO: other event types: update, delete
+		}
+	}
+
+	return nil
+}
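
RunConsumer above leaves cursor persistence as a TODO and always starts with cur = 0. One possible shape for that persistence, sketched against an assumed local file (the package name, file format, and helper names here are illustrative, not part of this diff):

package cursorsketch

import (
	"fmt"
	"os"
	"strconv"
	"strings"
)

// readCursor loads the last persisted firehose sequence number, or 0 if none exists yet.
func readCursor(path string) (int64, error) {
	b, err := os.ReadFile(path)
	if os.IsNotExist(err) {
		return 0, nil
	}
	if err != nil {
		return 0, err
	}
	return strconv.ParseInt(strings.TrimSpace(string(b)), 10, 64)
}

// writeCursor persists the most recently processed sequence number.
func writeCursor(path string, seq int64) error {
	return os.WriteFile(path, []byte(fmt.Sprintf("%d\n", seq)), 0o644)
}
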
cmd/hepa/firehose.go  -292
···
-package main
-
-import (
-	"bytes"
-	"context"
-	"fmt"
-	"net/http"
-	"net/url"
-	"strings"
-	"time"
-
-	comatproto "github.com/bluesky-social/indigo/api/atproto"
-	//bsky "github.com/bluesky-social/indigo/api/bsky"
-	"github.com/bluesky-social/indigo/atproto/syntax"
-	"github.com/bluesky-social/indigo/events"
-	"github.com/bluesky-social/indigo/events/schedulers/autoscaling"
-	"github.com/bluesky-social/indigo/repo"
-
-	"github.com/carlmjohnson/versioninfo"
-	"github.com/gorilla/websocket"
-	"github.com/ipfs/go-cid"
-	typegen "github.com/whyrusleeping/cbor-gen"
-)
-
-func (s *Server) getLastCursor() (int64, error) {
-	var lastSeq LastSeq
-	if err := s.db.Find(&lastSeq).Error; err != nil {
-		return 0, err
-	}
-
-	if lastSeq.ID == 0 {
-		return 0, s.db.Create(&lastSeq).Error
-	}
-
-	return lastSeq.Seq, nil
-}
-
-func (s *Server) updateLastCursor(curs int64) error {
-	return s.db.Model(LastSeq{}).Where("id = 1").Update("seq", curs).Error
-}
-
-func (s *Server) Run(ctx context.Context) error {
-	cur, err := s.getLastCursor()
-	if err != nil {
-		return fmt.Errorf("get last cursor: %w", err)
-	}
-
-	err = s.bfs.LoadJobs(ctx)
-	if err != nil {
-		return fmt.Errorf("loading backfill jobs: %w", err)
-	}
-	go s.bf.Start()
-	go s.discoverRepos()
-
-	d := websocket.DefaultDialer
-	u, err := url.Parse(s.bgshost)
-	if err != nil {
-		return fmt.Errorf("invalid bgshost URI: %w", err)
-	}
-	u.Path = "xrpc/com.atproto.sync.subscribeRepos"
-	if cur != 0 {
-		u.RawQuery = fmt.Sprintf("cursor=%d", cur)
-	}
-	con, _, err := d.Dial(u.String(), http.Header{
-		"User-Agent": []string{fmt.Sprintf("palomar/%s", versioninfo.Short())},
-	})
-	if err != nil {
-		return fmt.Errorf("events dial failed: %w", err)
-	}
-
-	rsc := &events.RepoStreamCallbacks{
-		RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
-			ctx := context.Background()
-			ctx, span := tracer.Start(ctx, "RepoCommit")
-			defer span.End()
-
-			defer func() {
-				if evt.Seq%50 == 0 {
-					if err := s.updateLastCursor(evt.Seq); err != nil {
-						s.logger.Error("failed to persist cursor", "err", err)
-					}
-				}
-			}()
-			logEvt := s.logger.With("repo", evt.Repo, "rev", evt.Rev, "seq", evt.Seq)
-			if evt.TooBig && evt.Prev != nil {
-				// TODO: handle this case (instead of return nil)
-				logEvt.Error("skipping non-genesis tooBig events for now")
-				return nil
-			}
-
-			if evt.TooBig {
-				if err := s.processTooBigCommit(ctx, evt); err != nil {
-					// TODO: handle this case (instead of return nil)
-					logEvt.Error("failed to process tooBig event", "err", err)
-					return nil
-				}
-
-				return nil
-			}
-
-			if !s.skipBackfill {
-				// Check if we've backfilled this repo, if not, we should enqueue it
-				job, err := s.bfs.GetJob(ctx, evt.Repo)
-				if job == nil && err == nil {
-					logEvt.Info("enqueueing backfill job for new repo")
-					if err := s.bfs.EnqueueJob(evt.Repo); err != nil {
-						logEvt.Warn("failed to enqueue backfill job", "err", err)
-					}
-				}
-			}
-
-			if err = s.engine.ProcessCommit(ctx, evt); err != nil {
-				// TODO: handle this, instead of return nul
-				logEvt.Error("failed to process commit", "err", err)
-				return nil
-			}
-
-			return nil
-
-		},
-		RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
-			ctx := context.Background()
-			ctx, span := tracer.Start(ctx, "RepoHandle")
-			defer span.End()
-
-			did, err := syntax.ParseDID(evt.Did)
-			if err != nil {
-				s.logger.Error("bad DID in RepoHandle event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err)
-				return nil
-			}
-			if err := s.engine.ProcessIdentityEvent("handle", did); err != nil {
-				s.logger.Error("processing handle update failed", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err)
-			}
-			return nil
-		},
-	}
-
-	return events.HandleRepoStream(
-		ctx, con, autoscaling.NewScheduler(
-			autoscaling.DefaultAutoscaleSettings(),
-			s.bgshost,
-			rsc.EventHandler,
-		),
-	)
-}
-
-func (s *Server) discoverRepos() {
-	if s.skipBackfill {
-		s.logger.Info("skipping repo discovery")
-		return
-	}
-
-	ctx := context.Background()
-	log := s.logger.With("func", "discoverRepos")
-	log.Info("starting repo discovery")
-
-	cursor := ""
-	limit := int64(500)
-
-	totalEnqueued := 0
-	totalSkipped := 0
-	totalErrored := 0
-
-	for {
-		resp, err := comatproto.SyncListRepos(ctx, s.bgsxrpc, cursor, limit)
-		if err != nil {
-			log.Error("failed to list repos", "err", err)
-			time.Sleep(5 * time.Second)
-			continue
-		}
-		log.Info("got repo page", "count", len(resp.Repos), "cursor", resp.Cursor)
-		enqueued := 0
-		skipped := 0
-		errored := 0
-		for _, repo := range resp.Repos {
-			job, err := s.bfs.GetJob(ctx, repo.Did)
-			if job == nil && err == nil {
-				log.Info("enqueuing backfill job for new repo", "did", repo.Did)
-				if err := s.bfs.EnqueueJob(repo.Did); err != nil {
-					log.Warn("failed to enqueue backfill job", "err", err)
-					errored++
-					continue
-				}
-				enqueued++
-			} else if err != nil {
-				log.Warn("failed to get backfill job", "did", repo.Did, "err", err)
-				errored++
-			} else {
-				skipped++
-			}
-		}
-		log.Info("enqueued repos", "enqueued", enqueued, "skipped", skipped, "errored", errored)
-		totalEnqueued += enqueued
-		totalSkipped += skipped
-		totalErrored += errored
-		if resp.Cursor != nil && *resp.Cursor != "" {
-			cursor = *resp.Cursor
-		} else {
-			break
-		}
-	}
-
-	log.Info("finished repo discovery", "totalEnqueued", totalEnqueued, "totalSkipped", totalSkipped, "totalErrored", totalErrored)
-}
-
-func (s *Server) handleCreateOrUpdate(ctx context.Context, rawDID string, path string, recP *typegen.CBORMarshaler, rcid *cid.Cid) error {
-	// Since this gets called in a backfill job, we need to check if the path is a post or profile
-	if !strings.Contains(path, "app.bsky.feed.post") && !strings.Contains(path, "app.bsky.actor.profile") {
-		return nil
-	}
-
-	did, err := syntax.ParseDID(rawDID)
-	if err != nil {
-		return fmt.Errorf("bad DID syntax in event: %w", err)
-	}
-
-	ident, err := s.dir.LookupDID(ctx, did)
-	if err != nil {
-		return fmt.Errorf("resolving identity: %w", err)
-	}
-	if ident == nil {
-		return fmt.Errorf("identity not found for did: %s", did.String())
-	}
-	rec := *recP
-
-	_ = rec
-	/* XXX:
-	switch rec := rec.(type) {
-	case *bsky.FeedPost:
-		// XXX: if err := s.indexPost(ctx, ident, rec, path, *rcid); err != nil {
-		_ = rec
-		if err := s.engine.ProcessCommit(ctx, evt); err != nil {
-			postsFailed.Inc()
-			return fmt.Errorf("processing post for %s: %w", did.String(), err)
-		}
-		postsIndexed.Inc()
-	case *bsky.ActorProfile:
-		// XXX: if err := s.indexProfile(ctx, ident, rec, path, *rcid); err != nil {
-		if err := s.engine.ProcessCommit(ctx, evt); err != nil {
-			profilesFailed.Inc()
-			return fmt.Errorf("processing profile for %s: %w", did.String(), err)
-		}
-		profilesIndexed.Inc()
-	default:
-	}
-	*/
-	return nil
-}
-
-func (s *Server) handleDelete(ctx context.Context, rawDID, path string) error {
-	// TODO: just ignoring for now
-	return nil
-}
-
-func (s *Server) processTooBigCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) error {
-	repodata, err := comatproto.SyncGetRepo(ctx, s.bgsxrpc, evt.Repo, "")
-	if err != nil {
-		return err
-	}
-
-	r, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repodata))
-	if err != nil {
-		return err
-	}
-
-	did, err := syntax.ParseDID(evt.Repo)
-	if err != nil {
-		return fmt.Errorf("bad DID in repo event: %w", err)
-	}
-
-	return r.ForEach(ctx, "", func(k string, v cid.Cid) error {
-		if strings.HasPrefix(k, "app.bsky.feed.post") || strings.HasPrefix(k, "app.bsky.actor.profile") {
-			rcid, rec, err := r.GetRecord(ctx, k)
-			if err != nil {
-				// TODO: handle this case (instead of return nil)
-				s.logger.Error("failed to get record from repo checkout", "path", k, "err", err)
-				return nil
-			}
-
-			// TODO: may want to treat this as a regular event?
-			_ = rcid
-			_ = did
-			_ = rec
-			/* XXX:
-			if err := s.engine.ProcessRecord(ctx, did, m, rec); err != nil {
-				return fmt.Errorf("processing record from tooBig commit: %w", err)
-			}
-			*/
-		}
-		return nil
-	})
-}
cmd/hepa/main.go  +11 -86
···
 import (
 	"context"
 	"fmt"
-	"log"
 	"log/slog"
 	"net/http"
 	"os"
 	"time"
 
 	"github.com/bluesky-social/indigo/atproto/identity"
-	"github.com/bluesky-social/indigo/util/cliutil"
 
 	"github.com/carlmjohnson/versioninfo"
 	_ "github.com/joho/godotenv/autoload"
 	cli "github.com/urfave/cli/v2"
-	"go.opentelemetry.io/otel"
-	"go.opentelemetry.io/otel/attribute"
-	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
-	"go.opentelemetry.io/otel/sdk/resource"
-	tracesdk "go.opentelemetry.io/otel/sdk/trace"
-	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
 	"golang.org/x/time/rate"
 )
 
···
 			Usage: "method, hostname, and port of PLC registry",
 			Value: "https://plc.directory",
 			EnvVars: []string{"ATP_PLC_HOST"},
-		},
-		&cli.IntFlag{
-			Name: "max-metadb-connections",
-			EnvVars: []string{"MAX_METADB_CONNECTIONS"},
-			Value: 40,
 		},
 	}
 
···
 
 var runCmd = &cli.Command{
 	Name: "run",
-	Usage: "run the service",
+	Usage: "run the hepa daemon",
 	Flags: []cli.Flag{
 		&cli.StringFlag{
-			Name: "database-url",
-			Value: "sqlite://data/hepa/automod.db",
-			EnvVars: []string{"DATABASE_URL"},
-		},
-		&cli.BoolFlag{
-			Name: "readonly",
-			EnvVars: []string{"HEPA_READONLY", "READONLY"},
-		},
-		&cli.StringFlag{
-			Name: "bind",
-			Usage: "IP or address, and port, to listen on for HTTP APIs",
-			Value: ":3999",
-			EnvVars: []string{"HEPA_BIND"},
-		},
-		&cli.StringFlag{
 			Name: "metrics-listen",
 			Usage: "IP or address, and port, to listen on for metrics APIs",
-			Value: ":3998",
+			Value: ":3989",
 			EnvVars: []string{"HEPA_METRICS_LISTEN"},
 		},
 		&cli.IntFlag{
-			Name: "bgs-sync-rate-limit",
-			Usage: "max repo sync (checkout) requests per second to upstream (BGS)",
-			Value: 8,
-			EnvVars: []string{"HEPA_BGS_SYNC_RATE_LIMIT"},
-		},
-		&cli.IntFlag{
 			Name: "plc-rate-limit",
 			Usage: "max number of requests per second to PLC registry",
 			Value: 100,
···
 		}))
 		slog.SetDefault(logger)
 
-		// Enable OTLP HTTP exporter
-		// For relevant environment variables:
-		// https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables
-		// At a minimum, you need to set
-		// OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318
-		if ep := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); ep != "" {
-			slog.Info("setting up trace exporter", "endpoint", ep)
-			ctx, cancel := context.WithCancel(context.Background())
-			defer cancel()
+		configOTEL("hepa")
 
-			exp, err := otlptracehttp.New(ctx)
-			if err != nil {
-				log.Fatal("failed to create trace exporter", "error", err)
-			}
-			defer func() {
-				ctx, cancel := context.WithTimeout(context.Background(), time.Second)
-				defer cancel()
-				if err := exp.Shutdown(ctx); err != nil {
-					slog.Error("failed to shutdown trace exporter", "error", err)
-				}
-			}()
-
-			tp := tracesdk.NewTracerProvider(
-				tracesdk.WithBatcher(exp),
-				tracesdk.WithResource(resource.NewWithAttributes(
-					semconv.SchemaURL,
-					semconv.ServiceNameKey.String("hepa"),
-					attribute.String("env", os.Getenv("ENVIRONMENT")), // DataDog
-					attribute.String("environment", os.Getenv("ENVIRONMENT")), // Others
-					attribute.Int64("ID", 1),
-				)),
-			)
-			otel.SetTracerProvider(tp)
-		}
-
-		db, err := cliutil.SetupDatabase(cctx.String("database-url"), cctx.Int("max-metadb-connections"))
-		if err != nil {
-			return err
-		}
-
-		// TODO: replace this with "bingo" resolver?
-		base := identity.BaseDirectory{
+		baseDir := identity.BaseDirectory{
 			PLCURL: cctx.String("atp-plc-host"),
 			HTTPClient: http.Client{
 				Timeout: time.Second * 15,
···
 			TryAuthoritativeDNS: true,
 			SkipDNSDomainSuffixes: []string{".bsky.social"},
 		}
-		dir := identity.NewCacheDirectory(&base, 1_500_000, time.Hour*24, time.Minute*2)
+		dir := identity.NewCacheDirectory(&baseDir, 1_500_000, time.Hour*24, time.Minute*2)
 
 		srv, err := NewServer(
-			db,
 			&dir,
 			Config{
-				BGSHost: cctx.String("atp-bgs-host"),
-				Logger: logger,
-				BGSSyncRateLimit: cctx.Int("bgs-sync-rate-limit"),
+				BGSHost: cctx.String("atp-bgs-host"),
+				Logger: logger,
 			},
 		)
 		if err != nil {
 			return err
 		}
 
+		// prometheus HTTP endpoint: /metrics
 		go func() {
 			if err := srv.RunMetrics(cctx.String("metrics-listen")); err != nil {
 				slog.Error("failed to start metrics endpoint", "error", err)
···
 			}
 		}()
 
-		// TODO: if cctx.Bool("readonly") ...
-
-		if err := srv.Run(ctx); err != nil {
-			return fmt.Errorf("failed to run automod service: %w", err)
+		// the main service loop
+		if err := srv.RunConsumer(ctx); err != nil {
+			return fmt.Errorf("failure consuming and processing firehose: %w", err)
 		}
 		return nil
 	},
cmd/hepa/metrics.go  -54
···
-package main
-
-import (
-	"github.com/prometheus/client_golang/prometheus"
-	"github.com/prometheus/client_golang/prometheus/promauto"
-	"go.opentelemetry.io/otel"
-)
-
-var tracer = otel.Tracer("hepa")
-
-var postsReceived = promauto.NewCounter(prometheus.CounterOpts{
-	Name: "hepa_posts_received",
-	Help: "Number of posts received",
-})
-
-var postsIndexed = promauto.NewCounter(prometheus.CounterOpts{
-	Name: "hepa_posts_indexed",
-	Help: "Number of posts indexed",
-})
-
-var postsFailed = promauto.NewCounter(prometheus.CounterOpts{
-	Name: "hepa_posts_failed",
-	Help: "Number of posts that failed indexing",
-})
-
-var postsDeleted = promauto.NewCounter(prometheus.CounterOpts{
-	Name: "hepa_posts_deleted",
-	Help: "Number of posts deleted",
-})
-
-var profilesReceived = promauto.NewCounter(prometheus.CounterOpts{
-	Name: "hepa_profiles_received",
-	Help: "Number of profiles received",
-})
-
-var profilesIndexed = promauto.NewCounter(prometheus.CounterOpts{
-	Name: "hepa_profiles_indexed",
-	Help: "Number of profiles indexed",
-})
-
-var profilesFailed = promauto.NewCounter(prometheus.CounterOpts{
-	Name: "hepa_profiles_failed",
-	Help: "Number of profiles that failed indexing",
-})
-
-var profilesDeleted = promauto.NewCounter(prometheus.CounterOpts{
-	Name: "hepa_profiles_deleted",
-	Help: "Number of profiles deleted",
-})
-
-var currentSeq = promauto.NewGauge(prometheus.GaugeOpts{
-	Name: "hepa_current_seq",
-	Help: "Current sequence number",
-})
cmd/hepa/otel.go  +56
···
+package main
+
+import (
+	"os"
+	"log/slog"
+	"log"
+	"context"
+	"time"
+
+	"go.opentelemetry.io/otel"
+	"go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracehttp"
+	"go.opentelemetry.io/otel/sdk/resource"
+	tracesdk "go.opentelemetry.io/otel/sdk/trace"
+	semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
+)
+
+var tracer = otel.Tracer("hepa")
+
+// Enable OTLP HTTP exporter
+// For relevant environment variables:
+// https://pkg.go.dev/go.opentelemetry.io/otel/exporters/otlp/otlptrace#readme-environment-variables
+// At a minimum, you need to set
+// OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318
+// TODO: this should be in cliutil or something
+func configOTEL(serviceName string) {
+	if ep := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); ep != "" {
+		slog.Info("setting up trace exporter", "endpoint", ep)
+		ctx, cancel := context.WithCancel(context.Background())
+		defer cancel()
+
+		exp, err := otlptracehttp.New(ctx)
+		if err != nil {
+			log.Fatal("failed to create trace exporter", "error", err)
+		}
+		defer func() {
+			ctx, cancel := context.WithTimeout(context.Background(), time.Second)
+			defer cancel()
+			if err := exp.Shutdown(ctx); err != nil {
+				slog.Error("failed to shutdown trace exporter", "error", err)
+			}
+		}()
+
+		tp := tracesdk.NewTracerProvider(
+			tracesdk.WithBatcher(exp),
+			tracesdk.WithResource(resource.NewWithAttributes(
+				semconv.SchemaURL,
+				semconv.ServiceNameKey.String(serviceName),
+				attribute.String("env", os.Getenv("ENVIRONMENT")), // DataDog
+				attribute.String("environment", os.Getenv("ENVIRONMENT")), // Others
+				attribute.Int64("ID", 1),
+			)),
+		)
+		otel.SetTracerProvider(tp)
+	}
+}
cmd/hepa/server.go  +13 -66
···
 
 	"github.com/bluesky-social/indigo/atproto/identity"
 	"github.com/bluesky-social/indigo/automod"
-	"github.com/bluesky-social/indigo/backfill"
-	"github.com/bluesky-social/indigo/xrpc"
 
 	"github.com/prometheus/client_golang/prometheus/promhttp"
-	gorm "gorm.io/gorm"
 )
 
 type Server struct {
-	db *gorm.DB
-	bgshost string
-	bgsxrpc *xrpc.Client
-	dir identity.Directory
-	logger *slog.Logger
-	engine *automod.Engine
-	skipBackfill bool
-
-	bfs *backfill.Gormstore
-	bf *backfill.Backfiller
-}
-
-type LastSeq struct {
-	ID uint `gorm:"primarykey"`
-	Seq int64
+	bgshost string
+	logger *slog.Logger
+	engine *automod.Engine
 }
 
 type Config struct {
-	BGSHost string
-	Logger *slog.Logger
-	BGSSyncRateLimit int
-	MaxEventConcurrency int
+	BGSHost string
+	Logger *slog.Logger
 }
 
-func NewServer(db *gorm.DB, dir identity.Directory, config Config) (*Server, error) {
+func NewServer(dir identity.Directory, config Config) (*Server, error) {
 	logger := config.Logger
 	if logger == nil {
 		logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
···
 		}))
 	}
 
-	logger.Info("running database migrations")
-	db.AutoMigrate(&LastSeq{})
-	db.AutoMigrate(&backfill.GormDBJob{})
-
 	bgsws := config.BGSHost
 	if !strings.HasPrefix(bgsws, "ws") {
 		return nil, fmt.Errorf("specified bgs host must include 'ws://' or 'wss://'")
 	}
 
-	bgshttp := strings.Replace(bgsws, "ws", "http", 1)
-	bgsxrpc := &xrpc.Client{
-		Host: bgshttp,
-	}
-
 	engine := automod.Engine{
+		Logger: logger,
 		Directory: dir,
+		CountStore: automod.NewMemCountStore(),
+		// TODO: RulesMap (loaded/config from somewhere)
+		// TODO: AdminClient (XRPC with mod access)
 	}
 
 	s := &Server{
-		db: db,
-		bgshost: config.BGSHost, // NOTE: the original URL, not 'bgshttp'
-		bgsxrpc: bgsxrpc,
-		dir: dir,
-		logger: logger,
-		engine: &engine,
-		skipBackfill: true,
+		bgshost: config.BGSHost,
+		logger: logger,
+		engine: &engine,
 	}
-
-	bfstore := backfill.NewGormstore(db)
-	opts := backfill.DefaultBackfillOptions()
-	if config.BGSSyncRateLimit > 0 {
-		opts.SyncRequestsPerSecond = config.BGSSyncRateLimit
-		opts.ParallelBackfills = 2 * config.BGSSyncRateLimit
-	} else {
-		opts.SyncRequestsPerSecond = 8
-	}
-	opts.CheckoutPath = fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo", bgshttp)
-	if config.MaxEventConcurrency > 0 {
-		opts.ParallelRecordCreates = config.MaxEventConcurrency
-	} else {
-		opts.ParallelRecordCreates = 20
-	}
-	opts.NSIDFilter = "app.bsky."
-	bf := backfill.NewBackfiller(
-		"hepa",
-		bfstore,
-		s.handleCreateOrUpdate,
-		s.handleCreateOrUpdate,
-		s.handleDelete,
-		opts,
-	)
-
-	s.bfs = bfstore
-	s.bf = bf
 
 	return s, nil
 }
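
RunMetrics itself is not part of this diff, but the retained promhttp import here and the metrics-listen flag in main.go imply a small HTTP listener. A hypothetical sketch of what such a method might look like (not the repo's actual implementation; the package and receiver names below are placeholders):

package metricsketch

import (
	"net/http"

	"github.com/prometheus/client_golang/prometheus/promhttp"
)

type serverSketch struct{}

// RunMetrics serves the Prometheus /metrics endpoint on the given listen address.
func (s *serverSketch) RunMetrics(listen string) error {
	mux := http.NewServeMux()
	mux.Handle("/metrics", promhttp.Handler())
	return http.ListenAndServe(listen, mux)
}
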