+134
-11
cmd/monarch/census.go
+134
-11
cmd/monarch/census.go
···
3
3
import (
4
4
"context"
5
5
"log/slog"
6
+
"sync"
7
+
"time"
6
8
7
9
"github.com/bluesky-social/indigo/api/atproto"
8
10
"github.com/bluesky-social/indigo/backfill"
9
11
"github.com/bluesky-social/indigo/xrpc"
10
12
"github.com/urfave/cli/v2"
13
+
"golang.org/x/sync/semaphore"
14
+
"gorm.io/gorm"
11
15
)
12
16
13
17
// CensusService walks the hosts known to a relay and the repos known to each
// host, registering every repo it finds as a backfill job.
type CensusService struct {
14
-
cursor *CursorService
18
+
// store holds the relayCursor and hostCursor rows used to resume listings.
store *gorm.DB
15
19
// backfill supplies the job store that listed repos are added to.
backfill *backfill.Backfiller
20
+
21
+
// storeLk is held by listRepos around its cursor reads/writes and job creation.
storeLk sync.Mutex
16
22
}
17
23
18
24
type jobMaker interface {
19
25
GetOrCreateJob(context.Context, string, string) (backfill.Job, error)
20
26
}
21
27
22
-
func NewCensusService(cursorSvc *CursorService, backfillSvc *backfill.Backfiller) *CensusService {
28
+
// NewCensusService returns a CensusService that keeps its listing cursors in
// store and adds discovered repos through backfillSvc.
func NewCensusService(store *gorm.DB, backfillSvc *backfill.Backfiller) *CensusService {
23
29
return &CensusService{
24
-
cursor: cursorSvc,
30
+
store: store,
25
31
backfill: backfillSvc,
26
32
}
27
33
}
28
34
29
-
func (cs *CensusService) Start(ctx context.Context, cctx *cli.Context) {
35
+
// relayCursor is the stored record of how far listHosts has paged through a
// relay's host list; rows are looked up by Host.
type relayCursor struct {
36
+
ID int `gorm:"primaryKey"`
37
+
// Host is the relay hostname the cursor belongs to (the "relay-host" flag value).
Host string
38
+
// Cursor is the most recent non-empty cursor returned by the relay's host
// listing; it is "" until one has been saved.
Cursor string
39
+
}
40
+
41
+
// hostCursor is the stored record of how far listRepos has paged through a
// single host's repo list; rows are looked up by Host.
type hostCursor struct {
42
+
ID int `gorm:"primaryKey"`
43
+
// Host is the hostname of the PDS whose repos are being listed.
Host string
44
+
// Cursor is the most recent non-empty cursor returned by the host's repo
// listing; it is "" until one has been saved.
Cursor string
45
+
}
46
+
47
+
// fetch the PDSes known to the relay
48
+
func (cs *CensusService) listHosts(ctx context.Context, cctx *cli.Context) {
49
+
relay := cctx.String("relay-host")
50
+
maxCrawlers := cctx.Int64("max-repo-crawlers")
51
+
52
+
xrpcc := &xrpc.Client{
53
+
Host: "https://" + relay,
54
+
}
55
+
56
+
var rcur relayCursor
57
+
if err := cs.store.Where("host = ?", relay).Attrs(relayCursor{
58
+
Host: relay,
59
+
Cursor: "",
60
+
}).FirstOrCreate(&rcur).Error; err != nil {
61
+
slog.Error("error fetching relay cursor", "err", err)
62
+
}
63
+
64
+
var wg sync.WaitGroup
65
+
sem := semaphore.NewWeighted(maxCrawlers)
66
+
67
+
curs := rcur.Cursor
68
+
for {
69
+
select {
70
+
case <-ctx.Done():
71
+
slog.Info("stopping listHosts", "err", ctx.Err())
72
+
return
73
+
default:
74
+
}
75
+
76
+
slog.Info("listing hosts", "relay", relay, "curs", curs)
77
+
res, err := atproto.SyncListHosts(ctx, xrpcc, curs, 1000)
78
+
if err != nil {
79
+
slog.Error("error obtaining hosts from relay", "err", err)
80
+
continue
81
+
}
82
+
83
+
for _, host := range res.Hosts {
84
+
sem.Acquire(ctx, 1)
85
+
wg.Add(1) // TODO wg.Go
86
+
go func() {
87
+
defer sem.Release(1)
88
+
defer wg.Done()
89
+
slog.Info("adding host", "host", host.Hostname)
90
+
cs.listRepos(ctx, host.Hostname)
91
+
}()
92
+
}
93
+
94
+
if res.Cursor != nil && *res.Cursor != "" {
95
+
curs = *res.Cursor
96
+
if err := cs.store.Model(&rcur).Update("cursor", curs).Error; err != nil {
97
+
slog.Error("error updating cursor for relay", "err", err)
98
+
}
99
+
} else {
100
+
break
101
+
}
102
+
}
103
+
104
+
wg.Wait()
105
+
slog.Info("finished listing hosts", "relay", relay)
106
+
}
107
+
108
+
// fetch the repos known to the PDS
109
+
func (cs *CensusService) listRepos(ctx context.Context, host string) {
30
110
xrpcc := &xrpc.Client{
31
-
Host: "https://" + cctx.String("relay-host"),
111
+
Host: "https://" + host,
32
112
}
33
113
34
114
jmstore, ok := cs.backfill.Store.(jobMaker)
···
37
117
return
38
118
}
39
119
40
-
curs, _ := cs.cursor.Get("repos")
120
+
cs.storeLk.Lock()
121
+
var hcur hostCursor
122
+
if err := cs.store.Where("host = ?", host).Attrs(hostCursor{
123
+
Host: host,
124
+
Cursor: "",
125
+
}).FirstOrCreate(&hcur).Error; err != nil {
126
+
slog.Error("error fetching host cursor", "err", err)
127
+
}
128
+
cs.storeLk.Unlock()
129
+
130
+
var added int
131
+
curs := hcur.Cursor
41
132
for {
42
133
select {
43
134
case <-ctx.Done():
44
-
slog.Info("stopping repo census")
135
+
slog.Info("stopping listRepos", "err", ctx.Err())
45
136
return
46
137
default:
47
138
}
48
139
140
+
slog.Info("listing repos", "host", host, "curs", curs, "added", added)
49
141
res, err := atproto.SyncListRepos(ctx, xrpcc, curs, 1000)
50
142
if err != nil {
51
-
slog.Error("error listing repos", "err", err)
52
-
return
143
+
slog.Error("error obtaining repos from host", "err", err)
144
+
continue
53
145
}
54
146
147
+
cs.storeLk.Lock()
55
148
for _, repo := range res.Repos {
56
149
_, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued)
57
150
if err != nil {
58
-
slog.Error("error adding listed repo to backfiller", "err", err)
151
+
slog.Error("error adding repo to backfiller", "err", err)
152
+
} else {
153
+
added += 1
59
154
}
60
155
}
156
+
cs.storeLk.Unlock()
61
157
62
158
if res.Cursor != nil && *res.Cursor != "" {
63
159
curs = *res.Cursor
64
-
cs.cursor.SetReposCursor(curs)
160
+
cs.storeLk.Lock()
161
+
if err := cs.store.Model(&hcur).Update("cursor", curs).Error; err != nil {
162
+
slog.Error("error updating cursor for host", "err", err)
163
+
}
164
+
cs.storeLk.Unlock()
65
165
} else {
66
166
break
67
167
}
168
+
}
169
+
170
+
slog.Info("finished listing repos", "host", host)
171
+
}
172
+
173
+
func (cs *CensusService) Start(ctx context.Context, cctx *cli.Context) {
174
+
slog.Info("starting initial hosts and repos crawl")
175
+
cs.listHosts(ctx, cctx)
176
+
177
+
slog.Info("finished with initial refresh, starting ticker")
178
+
t := time.NewTicker(time.Hour)
179
+
defer t.Stop()
180
+
181
+
for {
182
+
select {
183
+
case <-ctx.Done():
184
+
slog.Info("stopping census service", "err", ctx.Err())
185
+
return
186
+
case <-t.C:
187
+
}
188
+
189
+
slog.Info("refreshing hosts and repos")
190
+
cs.listHosts(ctx, cctx)
68
191
}
69
192
}
+2
cmd/monarch/cursors.go
+2
cmd/monarch/cursors.go
+4
-2
cmd/monarch/handlers.go
+4
-2
cmd/monarch/handlers.go
···
163
163
}
164
164
165
165
// HandleCreate is the handler for record creation. With the HandleUpsert call
// commented out it ignores its arguments and always returns nil.
// NOTE(review): confirm that disabling the upsert is intentional and not
// leftover debugging.
func (hs *HandlerService) HandleCreate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
166
-
return hs.HandleUpsert(ctx, repo, rev, path, rec, cid, ActionCreate)
166
+
// return hs.HandleUpsert(ctx, repo, rev, path, rec, cid, ActionCreate)
167
+
return nil
167
168
}
168
169
169
170
// HandleUpdate is the handler for record updates. With the HandleUpsert call
// commented out it ignores its arguments and always returns nil.
// NOTE(review): confirm that disabling the upsert is intentional and not
// leftover debugging.
func (hs *HandlerService) HandleUpdate(ctx context.Context, repo string, rev string, path string, rec *[]byte, cid *cid.Cid) error {
170
-
return hs.HandleUpsert(ctx, repo, rev, path, rec, cid, ActionUpdate)
171
+
// return hs.HandleUpsert(ctx, repo, rev, path, rec, cid, ActionUpdate)
172
+
return nil
171
173
}
172
174
173
175
func (hs *HandlerService) HandleDelete(ctx context.Context, repo string, rev string, path string) error {
+5
-1
cmd/monarch/main.go
+5
-1
cmd/monarch/main.go
···
49
49
app.backfill = NewBackfillService(backfill.NewGormstore(app.state), app.handler, cctx)
50
50
go app.backfill.Start()
51
51
52
-
app.census = NewCensusService(app.cursor, app.backfill)
52
+
app.census = NewCensusService(app.state, app.backfill)
53
53
go app.census.Start(ctx, cctx)
54
54
55
55
wsconn, err := NewFirehoseConnection(ctx, cctx, app.cursor)
···
132
132
&cli.IntFlag{
133
133
Name: "backfill-consumers",
134
134
Value: 100,
135
+
},
136
+
&cli.IntFlag{
137
+
Name: "max-repo-crawlers",
138
+
Value: 4,
135
139
},
136
140
}
137
141
+1
-1
go.mod
+1
-1
go.mod
···
9
9
github.com/gorilla/websocket v1.5.1
10
10
github.com/ipfs/go-cid v0.4.1
11
11
github.com/urfave/cli/v2 v2.25.7
12
+
golang.org/x/sync v0.7.0
12
13
gorm.io/gorm v1.25.9
13
14
)
14
15
···
102
103
go.uber.org/zap v1.26.0 // indirect
103
104
golang.org/x/crypto v0.21.0 // indirect
104
105
golang.org/x/net v0.23.0 // indirect
105
-
golang.org/x/sync v0.7.0 // indirect
106
106
golang.org/x/sys v0.22.0 // indirect
107
107
golang.org/x/text v0.14.0 // indirect
108
108
golang.org/x/time v0.3.0 // indirect