+4
-20
cmd/monarch/census.go
+4
-20
cmd/monarch/census.go
···
14
)
15
16
type CensusService struct {
17
-
cursor *CursorService
18
-
backfill *backfill.Backfiller
19
-
20
seenHosts map[string]bool
21
-
seenLk sync.Mutex
22
-
23
-
storeLk sync.Mutex
24
}
25
26
type jobMaker interface {
···
70
71
for _, host := range res.Hosts {
72
// don't reprocess hosts already handled
73
-
cs.seenLk.Lock()
74
-
_, ok := cs.seenHosts[host.Hostname]
75
-
cs.seenLk.Unlock()
76
-
if ok {
77
slog.Info("already processed host, skipping", "host", host)
78
continue
79
}
···
114
return
115
}
116
117
-
cs.storeLk.Lock()
118
hcur, err := cs.cursor.GetHostCursor(host)
119
if err != nil {
120
slog.Error("error fetching host cursor", "err", err)
121
}
122
-
cs.storeLk.Unlock()
123
124
var added int
125
curs := hcur.Cursor
···
138
continue
139
}
140
141
-
cs.storeLk.Lock()
142
for _, repo := range res.Repos {
143
_, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued)
144
if err != nil {
···
147
added += 1
148
}
149
}
150
-
cs.storeLk.Unlock()
151
152
if res.Cursor != nil && *res.Cursor != "" {
153
curs = *res.Cursor
154
-
cs.storeLk.Lock()
155
if err := cs.cursor.SetHostCursor(host, curs); err != nil {
156
slog.Error("error updating cursor for host", "err", err)
157
}
158
-
cs.storeLk.Unlock()
159
} else {
160
break
161
}
162
}
163
164
slog.Info("finished listing repos", "host", host)
165
-
166
-
cs.seenLk.Lock()
167
-
defer cs.seenLk.Unlock()
168
-
169
cs.seenHosts[host] = true
170
}
171
···
14
)
15
16
type CensusService struct {
17
+
cursor *CursorService
18
+
backfill *backfill.Backfiller
19
seenHosts map[string]bool
20
}
21
22
type jobMaker interface {
···
66
67
for _, host := range res.Hosts {
68
// don't reprocess hosts already handled
69
+
seen := cs.seenHosts[host.Hostname]
70
+
if seen {
71
slog.Info("already processed host, skipping", "host", host)
72
continue
73
}
···
108
return
109
}
110
111
hcur, err := cs.cursor.GetHostCursor(host)
112
if err != nil {
113
slog.Error("error fetching host cursor", "err", err)
114
}
115
116
var added int
117
curs := hcur.Cursor
···
130
continue
131
}
132
133
for _, repo := range res.Repos {
134
_, err := jmstore.GetOrCreateJob(ctx, repo.Did, backfill.StateEnqueued)
135
if err != nil {
···
138
added += 1
139
}
140
}
141
142
if res.Cursor != nil && *res.Cursor != "" {
143
curs = *res.Cursor
144
if err := cs.cursor.SetHostCursor(host, curs); err != nil {
145
slog.Error("error updating cursor for host", "err", err)
146
}
147
} else {
148
break
149
}
150
}
151
152
slog.Info("finished listing repos", "host", host)
153
cs.seenHosts[host] = true
154
}
155
+1
-4
cmd/monarch/cursors.go
+1
-4
cmd/monarch/cursors.go
-9
cmd/monarch/firehose.go
-9
cmd/monarch/firehose.go
···
35
}
36
37
func (cs *CursorService) GetFirehoseCursor() (int64, error) {
38
-
cs.firehoseLk.Lock()
39
-
defer cs.firehoseLk.Unlock()
40
-
41
var fcur firehoseCursor
42
if err := cs.store.Where(firehoseCursor{
43
Key: "firehose",
···
51
}
52
53
func (cs *CursorService) SetFirehoseCursor(seq int64) {
54
-
cs.firehoseLk.Lock()
55
-
defer cs.firehoseLk.Unlock()
56
-
57
cs.firehoseSeq = seq
58
}
59
60
func (cs *CursorService) PersistFirehoseCursor() error {
61
-
cs.firehoseLk.Lock()
62
-
defer cs.firehoseLk.Unlock()
63
-
64
if err := cs.store.Model(&firehoseCursor{}).Where(firehoseCursor{Key: "firehose"}).Update("val", cs.firehoseSeq).Error; err != nil {
65
return fmt.Errorf("error persisting firehose seq: %w", err)
66
}
···
35
}
36
37
func (cs *CursorService) GetFirehoseCursor() (int64, error) {
38
var fcur firehoseCursor
39
if err := cs.store.Where(firehoseCursor{
40
Key: "firehose",
···
48
}
49
50
func (cs *CursorService) SetFirehoseCursor(seq int64) {
51
cs.firehoseSeq = seq
52
}
53
54
func (cs *CursorService) PersistFirehoseCursor() error {
55
if err := cs.store.Model(&firehoseCursor{}).Where(firehoseCursor{Key: "firehose"}).Update("val", cs.firehoseSeq).Error; err != nil {
56
return fmt.Errorf("error persisting firehose seq: %w", err)
57
}