+1
-1
automod/consumer/firehose.go
···
116
116
fc.Logger.Info("hepa scheduler configured", "scheduler", "autoscaling", "initial", scaleSettings.Concurrency, "max", scaleSettings.MaxConcurrency)
117
117
}
118
118
119
-
return events.HandleRepoStream(ctx, con, scheduler)
119
+
return events.HandleRepoStream(ctx, con, scheduler, fc.Logger)
120
120
}
121
121
122
122
// NOTE: for now, this function basically never errors, just logs and returns nil. Should think through error processing better.
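HandleRepoStream now takes a *slog.Logger as a fourth argument (see events/consumer.go below): callers either pass a component-scoped logger, as hepa does here with fc.Logger, or nil to fall back to the package default. A minimal caller sketch under that assumption; everything except the indigo packages and HandleRepoStream itself is illustrative:

```go
package example

import (
	"context"
	"log/slog"
	"net/http"

	comatproto "github.com/bluesky-social/indigo/api/atproto"
	"github.com/bluesky-social/indigo/events"
	"github.com/bluesky-social/indigo/events/schedulers/sequential"
	"github.com/gorilla/websocket"
)

// consumeFirehose is a hypothetical caller, not part of this diff.
// relayURL should be a full subscribeRepos websocket URL,
// e.g. wss://host/xrpc/com.atproto.sync.subscribeRepos
func consumeFirehose(ctx context.Context, relayURL string) error {
	con, _, err := websocket.DefaultDialer.Dial(relayURL, http.Header{})
	if err != nil {
		return err
	}

	rsc := &events.RepoStreamCallbacks{
		RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
			slog.Info("commit", "repo", evt.Repo, "seq", evt.Seq)
			return nil
		},
	}
	scheduler := sequential.NewScheduler("example-consumer", rsc.EventHandler)

	// Pass a component-scoped logger; nil would make HandleRepoStream fall
	// back to slog.Default().With("system", "events").
	logger := slog.Default().With("system", "example-consumer")
	return events.HandleRepoStream(ctx, con, scheduler, logger)
}
```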
+1
-1
bgs/admin.go
+60
-56
bgs/bgs.go
···
6
6
"encoding/json"
7
7
"errors"
8
8
"fmt"
9
+
"log/slog"
9
10
"net"
10
11
"net/http"
11
12
_ "net/http/pprof"
12
13
"net/url"
14
+
"reflect"
13
15
"strconv"
14
16
"strings"
15
17
"sync"
···
34
36
"github.com/gorilla/websocket"
35
37
"github.com/ipfs/go-cid"
36
38
ipld "github.com/ipfs/go-ipld-format"
37
-
logging "github.com/ipfs/go-log"
38
39
"github.com/labstack/echo/v4"
39
40
"github.com/labstack/echo/v4/middleware"
40
41
promclient "github.com/prometheus/client_golang/prometheus"
···
45
46
"gorm.io/gorm"
46
47
)
47
48
48
-
var log = logging.Logger("bgs")
49
49
var tracer = otel.Tracer("bgs")
50
50
51
51
// serverListenerBootTimeout is how long to wait for the requested server socket
···
95
95
// nextCrawlers gets forwarded POST /xrpc/com.atproto.sync.requestCrawl
96
96
nextCrawlers []*url.URL
97
97
httpClient http.Client
98
+
99
+
log *slog.Logger
98
100
}
99
101
100
102
type PDSResync struct {
···
166
168
pdsResyncs: make(map[uint]*PDSResync),
167
169
168
170
userCache: uc,
171
+
172
+
log: slog.Default().With("system", "bgs"),
169
173
}
170
174
171
175
ix.CreateExternalUser = bgs.createExternalUser
···
244
248
act, err := bgs.Index.GetUserOrMissing(ctx, did)
245
249
if err != nil {
246
250
w.WriteHeader(500)
247
-
log.Errorf("failed to get user: %s", err)
251
+
bgs.log.Error("failed to get user", "err", err)
248
252
return
249
253
}
250
254
251
255
if err := bgs.Index.Crawler.Crawl(ctx, act); err != nil {
252
256
w.WriteHeader(500)
253
-
log.Errorf("failed to add user to crawler: %s", err)
257
+
bgs.log.Error("failed to add user to crawler", "err", err)
254
258
return
255
259
}
256
260
})
···
335
339
if err2 := ctx.JSON(err.Code, map[string]any{
336
340
"error": err.Message,
337
341
}); err2 != nil {
338
-
log.Errorf("Failed to write http error: %s", err2)
342
+
bgs.log.Error("Failed to write http error", "err", err2)
339
343
}
340
344
default:
341
345
sendHeader := true
···
343
347
sendHeader = false
344
348
}
345
349
346
-
log.Warnf("HANDLER ERROR: (%s) %s", ctx.Path(), err)
350
+
bgs.log.Warn("HANDLER ERROR: (%s) %s", ctx.Path(), err)
347
351
348
352
if strings.HasPrefix(ctx.Path(), "/admin/") {
349
353
ctx.JSON(500, map[string]any{
···
436
440
437
441
func (bgs *BGS) HandleHealthCheck(c echo.Context) error {
438
442
if err := bgs.db.Exec("SELECT 1").Error; err != nil {
439
-
log.Errorf("healthcheck can't connect to database: %v", err)
443
+
bgs.log.Error("healthcheck can't connect to database", "err", err)
440
444
return c.JSON(500, HealthStatus{Status: "error", Message: "can't connect to database"})
441
445
} else {
442
446
return c.JSON(200, HealthStatus{Status: "ok"})
···
603
607
604
608
var m = &dto.Metric{}
605
609
if err := c.EventsSent.Write(m); err != nil {
606
-
log.Errorf("failed to get sent counter: %s", err)
610
+
bgs.log.Error("failed to get sent counter", "err", err)
607
611
}
608
612
609
-
log.Infow("consumer disconnected",
613
+
bgs.log.Info("consumer disconnected",
610
614
"consumer_id", id,
611
615
"remote_addr", c.RemoteAddr,
612
616
"user_agent", c.UserAgent,
···
658
662
}
659
663
660
664
if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(5*time.Second)); err != nil {
661
-
log.Warnf("failed to ping client: %s", err)
665
+
bgs.log.Warn("failed to ping client: %s", err)
662
666
cancel()
663
667
return
664
668
}
···
683
687
for {
684
688
_, _, err := conn.ReadMessage()
685
689
if err != nil {
686
-
log.Warnf("failed to read message from client: %s", err)
690
+
bgs.log.Warn("failed to read message from client: %s", err)
687
691
cancel()
688
692
return
689
693
}
···
710
714
consumerID := bgs.registerConsumer(&consumer)
711
715
defer bgs.cleanupConsumer(consumerID)
712
716
713
-
logger := log.With(
717
+
logger := bgs.log.With(
714
718
"consumer_id", consumerID,
715
719
"remote_addr", consumer.RemoteAddr,
716
720
"user_agent", consumer.UserAgent,
717
721
)
718
722
719
-
logger.Infow("new consumer", "cursor", since)
723
+
logger.Info("new consumer", "cursor", since)
720
724
721
725
for {
722
726
select {
···
728
732
729
733
wc, err := conn.NextWriter(websocket.BinaryMessage)
730
734
if err != nil {
731
-
logger.Errorf("failed to get next writer: %s", err)
735
+
logger.Error("failed to get next writer", "err", err)
732
736
return err
733
737
}
734
738
···
742
746
}
743
747
744
748
if err := wc.Close(); err != nil {
745
-
logger.Warnf("failed to flush-close our event write: %s", err)
749
+
logger.Warn("failed to flush-close our event write", "err", err)
746
750
return nil
747
751
}
748
752
···
763
767
// defensive in case things change under the hood.
764
768
registry, ok := promclient.DefaultRegisterer.(*promclient.Registry)
765
769
if !ok {
766
-
log.Warnf("failed to export default prometheus registry; some metrics will be unavailable; unexpected type: %T", promclient.DefaultRegisterer)
770
+
slog.Warn("failed to export default prometheus registry; some metrics will be unavailable; unexpected type", "type", reflect.TypeOf(promclient.DefaultRegisterer))
767
771
}
768
772
exporter, err := prometheus.NewExporter(prometheus.Options{
769
773
Registry: registry,
770
774
Namespace: "bigsky",
771
775
})
772
776
if err != nil {
773
-
log.Errorf("could not create the prometheus stats exporter: %v", err)
777
+
slog.Error("could not create the prometheus stats exporter", "err", err, "system", "bgs")
774
778
}
775
779
776
780
return exporter
···
885
889
case env.RepoCommit != nil:
886
890
repoCommitsReceivedCounter.WithLabelValues(host.Host).Add(1)
887
891
evt := env.RepoCommit
888
-
log.Debugw("bgs got repo append event", "seq", evt.Seq, "pdsHost", host.Host, "repo", evt.Repo)
892
+
bgs.log.Debug("bgs got repo append event", "seq", evt.Seq, "pdsHost", host.Host, "repo", evt.Repo)
889
893
890
894
s := time.Now()
891
895
u, err := bgs.lookupUserByDid(ctx, evt.Repo)
···
915
919
916
920
if u.GetTakenDown() || ustatus == events.AccountStatusTakendown {
917
921
span.SetAttributes(attribute.Bool("taken_down_by_relay_admin", u.GetTakenDown()))
918
-
log.Debugw("dropping commit event from taken down user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
922
+
bgs.log.Debug("dropping commit event from taken down user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
919
923
repoCommitsResultCounter.WithLabelValues(host.Host, "tdu").Inc()
920
924
return nil
921
925
}
922
926
923
927
if ustatus == events.AccountStatusSuspended {
924
-
log.Debugw("dropping commit event from suspended user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
928
+
bgs.log.Debug("dropping commit event from suspended user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
925
929
repoCommitsResultCounter.WithLabelValues(host.Host, "susu").Inc()
926
930
return nil
927
931
}
928
932
929
933
if ustatus == events.AccountStatusDeactivated {
930
-
log.Debugw("dropping commit event from deactivated user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
934
+
bgs.log.Debug("dropping commit event from deactivated user", "did", evt.Repo, "seq", evt.Seq, "pdsHost", host.Host)
931
935
repoCommitsResultCounter.WithLabelValues(host.Host, "du").Inc()
932
936
return nil
933
937
}
···
938
942
}
939
943
940
944
if host.ID != u.PDS && u.PDS != 0 {
941
-
log.Warnw("received event for repo from different pds than expected", "repo", evt.Repo, "expPds", u.PDS, "gotPds", host.Host)
945
+
bgs.log.Warn("received event for repo from different pds than expected", "repo", evt.Repo, "expPds", u.PDS, "gotPds", host.Host)
942
946
// Flush any cached DID documents for this user
943
947
bgs.didr.FlushCacheFor(env.RepoCommit.Repo)
944
948
···
1000
1004
if errors.Is(err, carstore.ErrRepoBaseMismatch) || ipld.IsNotFound(err) {
1001
1005
ai, lerr := bgs.Index.LookupUser(ctx, u.ID)
1002
1006
if lerr != nil {
1003
-
log.Warnw("failed handling event, no user", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String())
1007
+
log.Warn("failed handling event, no user", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String())
1004
1008
repoCommitsResultCounter.WithLabelValues(host.Host, "nou4").Inc()
1005
1009
return fmt.Errorf("failed to look up user %s (%d) (err case: %s): %w", u.Did, u.ID, err, lerr)
1006
1010
}
1007
1011
1008
1012
span.SetAttributes(attribute.Bool("catchup_queue", true))
1009
1013
1010
-
log.Infow("failed handling event, catchup", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String())
1014
+
log.Info("failed handling event, catchup", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String())
1011
1015
repoCommitsResultCounter.WithLabelValues(host.Host, "catchup2").Inc()
1012
1016
return bgs.Index.Crawler.AddToCatchupQueue(ctx, host, ai, evt)
1013
1017
}
1014
1018
1015
-
log.Warnw("failed handling event", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String())
1019
+
log.Warn("failed handling event", "err", err, "pdsHost", host.Host, "seq", evt.Seq, "repo", u.Did, "prev", stringLink(evt.Prev), "commit", evt.Commit.String())
1016
1020
repoCommitsResultCounter.WithLabelValues(host.Host, "err").Inc()
1017
1021
return fmt.Errorf("handle user event failed: %w", err)
1018
1022
}
···
1020
1024
repoCommitsResultCounter.WithLabelValues(host.Host, "ok").Inc()
1021
1025
return nil
1022
1026
case env.RepoHandle != nil:
1023
-
log.Infow("bgs got repo handle event", "did", env.RepoHandle.Did, "handle", env.RepoHandle.Handle)
1027
+
bgs.log.Info("bgs got repo handle event", "did", env.RepoHandle.Did, "handle", env.RepoHandle.Handle)
1024
1028
// Flush any cached DID documents for this user
1025
1029
bgs.didr.FlushCacheFor(env.RepoHandle.Did)
1026
1030
···
1031
1035
}
1032
1036
1033
1037
if act.Handle.String != env.RepoHandle.Handle {
1034
-
log.Warnw("handle update did not update handle to asserted value", "did", env.RepoHandle.Did, "expected", env.RepoHandle.Handle, "actual", act.Handle)
1038
+
bgs.log.Warn("handle update did not update handle to asserted value", "did", env.RepoHandle.Did, "expected", env.RepoHandle.Handle, "actual", act.Handle)
1035
1039
}
1036
1040
1037
1041
// TODO: Update the ReposHandle event type to include "verified" or something
···
1045
1049
},
1046
1050
})
1047
1051
if err != nil {
1048
-
log.Errorw("failed to broadcast RepoHandle event", "error", err, "did", env.RepoHandle.Did, "handle", env.RepoHandle.Handle)
1052
+
bgs.log.Error("failed to broadcast RepoHandle event", "error", err, "did", env.RepoHandle.Did, "handle", env.RepoHandle.Handle)
1049
1053
return fmt.Errorf("failed to broadcast RepoHandle event: %w", err)
1050
1054
}
1051
1055
1052
1056
return nil
1053
1057
case env.RepoIdentity != nil:
1054
-
log.Infow("bgs got identity event", "did", env.RepoIdentity.Did)
1058
+
bgs.log.Info("bgs got identity event", "did", env.RepoIdentity.Did)
1055
1059
// Flush any cached DID documents for this user
1056
1060
bgs.didr.FlushCacheFor(env.RepoIdentity.Did)
1057
1061
···
1071
1075
},
1072
1076
})
1073
1077
if err != nil {
1074
-
log.Errorw("failed to broadcast Identity event", "error", err, "did", env.RepoIdentity.Did)
1078
+
bgs.log.Error("failed to broadcast Identity event", "error", err, "did", env.RepoIdentity.Did)
1075
1079
return fmt.Errorf("failed to broadcast Identity event: %w", err)
1076
1080
}
1077
1081
···
1087
1091
span.SetAttributes(attribute.String("repo_status", *env.RepoAccount.Status))
1088
1092
}
1089
1093
1090
-
log.Infow("bgs got account event", "did", env.RepoAccount.Did)
1094
+
bgs.log.Info("bgs got account event", "did", env.RepoAccount.Did)
1091
1095
// Flush any cached DID documents for this user
1092
1096
bgs.didr.FlushCacheFor(env.RepoAccount.Did)
1093
1097
···
1101
1105
// Check if the PDS is still authoritative
1102
1106
// if not we don't want to be propagating this account event
1103
1107
if ai.PDS != host.ID {
1104
-
log.Errorw("account event from non-authoritative pds",
1108
+
bgs.log.Error("account event from non-authoritative pds",
1105
1109
"seq", env.RepoAccount.Seq,
1106
1110
"did", env.RepoAccount.Did,
1107
1111
"event_from", host.Host,
···
1146
1150
},
1147
1151
})
1148
1152
if err != nil {
1149
-
log.Errorw("failed to broadcast Account event", "error", err, "did", env.RepoAccount.Did)
1153
+
bgs.log.Error("failed to broadcast Account event", "error", err, "did", env.RepoAccount.Did)
1150
1154
return fmt.Errorf("failed to broadcast Account event: %w", err)
1151
1155
}
1152
1156
···
1194
1198
// delete data from carstore
1195
1199
if err := bgs.repoman.TakeDownRepo(ctx, u.ID); err != nil {
1196
1200
// don't let a failure here prevent us from propagating this event
1197
-
log.Errorf("failed to delete user data from carstore: %s", err)
1201
+
bgs.log.Error("failed to delete user data from carstore", "err", err)
1198
1202
}
1199
1203
1200
1204
return bgs.events.AddEvent(ctx, &events.XRPCStreamEvent{
···
1209
1213
1210
1214
externalUserCreationAttempts.Inc()
1211
1215
1212
-
log.Debugf("create external user: %s", did)
1216
+
s.log.Debug("create external user", "did", did)
1213
1217
doc, err := s.didr.GetDocument(ctx, did)
1214
1218
if err != nil {
1215
1219
return nil, fmt.Errorf("could not locate DID document for followed user (%s): %w", did, err)
···
1232
1236
// TODO: the PDS's DID should also be in the service, we could use that to look up?
1233
1237
var peering models.PDS
1234
1238
if err := s.db.Find(&peering, "host = ?", durl.Host).Error; err != nil {
1235
-
log.Error("failed to find pds", durl.Host)
1239
+
s.log.Error("failed to find pds", "host", durl.Host)
1236
1240
return nil, err
1237
1241
}
1238
1242
···
1305
1309
defer func() {
1306
1310
if !successfullyCreated {
1307
1311
if err := s.db.Model(&models.PDS{}).Where("id = ?", peering.ID).Update("repo_count", gorm.Expr("repo_count - 1")).Error; err != nil {
1308
-
log.Errorf("failed to decrement repo count for pds: %s", err)
1312
+
s.log.Error("failed to decrement repo count for pds", "err", err)
1309
1313
}
1310
1314
}
1311
1315
}()
···
1319
1323
return nil, err
1320
1324
}
1321
1325
1322
-
log.Debugw("creating external user", "did", did, "handle", hurl.Host, "pds", peering.ID)
1326
+
s.log.Debug("creating external user", "did", did, "handle", hurl.Host, "pds", peering.ID)
1323
1327
1324
1328
handle := hurl.Host
1325
1329
···
1327
1331
1328
1332
resdid, err := s.hr.ResolveHandleToDid(ctx, handle)
1329
1333
if err != nil {
1330
-
log.Errorf("failed to resolve users claimed handle (%q) on pds: %s", handle, err)
1334
+
s.log.Error("failed to resolve users claimed handle on pds", "handle", handle, "err", err)
1331
1335
validHandle = false
1332
1336
}
1333
1337
1334
1338
if resdid != did {
1335
-
log.Errorf("claimed handle did not match servers response (%s != %s)", resdid, did)
1339
+
s.log.Error("claimed handle did not match servers response", "resdid", resdid, "did", did)
1336
1340
validHandle = false
1337
1341
}
1338
1342
···
1341
1345
1342
1346
exu, err := s.Index.LookupUserByDid(ctx, did)
1343
1347
if err == nil {
1344
-
log.Debugw("lost the race to create a new user", "did", did, "handle", handle, "existing_hand", exu.Handle)
1348
+
s.log.Debug("lost the race to create a new user", "did", did, "handle", handle, "existing_hand", exu.Handle)
1345
1349
if exu.PDS != peering.ID {
1346
1350
// User is now on a different PDS, update
1347
1351
if err := s.db.Model(User{}).Where("id = ?", exu.Uid).Update("pds", peering.ID).Error; err != nil {
···
1500
1504
// delete data from carstore
1501
1505
if err := bgs.repoman.TakeDownRepo(ctx, u.ID); err != nil {
1502
1506
// don't let a failure here prevent us from propagating this event
1503
-
log.Errorf("failed to delete user data from carstore: %s", err)
1507
+
bgs.log.Error("failed to delete user data from carstore", "err", err)
1504
1508
}
1505
1509
}
1506
1510
···
1607
1611
func (bgs *BGS) ResyncPDS(ctx context.Context, pds models.PDS) error {
1608
1612
ctx, span := tracer.Start(ctx, "ResyncPDS")
1609
1613
defer span.End()
1610
-
log := log.With("pds", pds.Host, "source", "resync_pds")
1614
+
log := bgs.log.With("pds", pds.Host, "source", "resync_pds")
1611
1615
resync, found := bgs.LoadOrStoreResync(pds)
1612
1616
if found {
1613
1617
return fmt.Errorf("resync already in progress")
···
1639
1643
for {
1640
1644
pages++
1641
1645
if pages%10 == 0 {
1642
-
log.Warnw("fetching PDS page during resync", "pages", pages, "total_repos", len(repos))
1646
+
log.Warn("fetching PDS page during resync", "pages", pages, "total_repos", len(repos))
1643
1647
resync.NumRepoPages = pages
1644
1648
resync.NumRepos = len(repos)
1645
1649
bgs.UpdateResync(resync)
1646
1650
}
1647
1651
if err := limiter.Wait(ctx); err != nil {
1648
-
log.Errorw("failed to wait for rate limiter", "error", err)
1652
+
log.Error("failed to wait for rate limiter", "error", err)
1649
1653
return fmt.Errorf("failed to wait for rate limiter: %w", err)
1650
1654
}
1651
1655
repoList, err := comatproto.SyncListRepos(ctx, &xrpcc, cursor, limit)
1652
1656
if err != nil {
1653
-
log.Errorw("failed to list repos", "error", err)
1657
+
log.Error("failed to list repos", "error", err)
1654
1658
return fmt.Errorf("failed to list repos: %w", err)
1655
1659
}
1656
1660
···
1672
1676
1673
1677
repolistDone := time.Now()
1674
1678
1675
-
log.Warnw("listed all repos, checking roots", "num_repos", len(repos), "took", repolistDone.Sub(start))
1679
+
log.Warn("listed all repos, checking roots", "num_repos", len(repos), "took", repolistDone.Sub(start))
1676
1680
resync = bgs.SetResyncStatus(pds.ID, "checking revs")
1677
1681
1678
1682
// run loop over repos with some concurrency
···
1681
1685
// Check repo revs against our local copy and enqueue crawls for any that are out of date
1682
1686
for i, r := range repos {
1683
1687
if err := sem.Acquire(ctx, 1); err != nil {
1684
-
log.Errorw("failed to acquire semaphore", "error", err)
1688
+
log.Error("failed to acquire semaphore", "error", err)
1685
1689
continue
1686
1690
}
1687
1691
go func(r comatproto.SyncListRepos_Repo) {
1688
1692
defer sem.Release(1)
1689
-
log := log.With("did", r.Did, "remote_rev", r.Rev)
1693
+
log := bgs.log.With("did", r.Did, "remote_rev", r.Rev)
1690
1694
// Fetches the user if we have it, otherwise automatically enqueues it for crawling
1691
1695
ai, err := bgs.Index.GetUserOrMissing(ctx, r.Did)
1692
1696
if err != nil {
1693
-
log.Errorw("failed to get user while resyncing PDS, we can't recrawl it", "error", err)
1697
+
log.Error("failed to get user while resyncing PDS, we can't recrawl it", "error", err)
1694
1698
return
1695
1699
}
1696
1700
1697
1701
rev, err := bgs.repoman.GetRepoRev(ctx, ai.Uid)
1698
1702
if err != nil {
1699
-
log.Warnw("recrawling because we failed to get the local repo root", "err", err, "uid", ai.Uid)
1703
+
log.Warn("recrawling because we failed to get the local repo root", "err", err, "uid", ai.Uid)
1700
1704
err := bgs.Index.Crawler.Crawl(ctx, ai)
1701
1705
if err != nil {
1702
-
log.Errorw("failed to enqueue crawl for repo during resync", "error", err, "uid", ai.Uid, "did", ai.Did)
1706
+
log.Error("failed to enqueue crawl for repo during resync", "error", err, "uid", ai.Uid, "did", ai.Did)
1703
1707
}
1704
1708
return
1705
1709
}
1706
1710
1707
1711
if rev == "" || rev < r.Rev {
1708
-
log.Warnw("recrawling because the repo rev from the PDS is newer than our local repo rev", "local_rev", rev)
1712
+
log.Warn("recrawling because the repo rev from the PDS is newer than our local repo rev", "local_rev", rev)
1709
1713
err := bgs.Index.Crawler.Crawl(ctx, ai)
1710
1714
if err != nil {
1711
-
log.Errorw("failed to enqueue crawl for repo during resync", "error", err, "uid", ai.Uid, "did", ai.Did)
1715
+
log.Error("failed to enqueue crawl for repo during resync", "error", err, "uid", ai.Uid, "did", ai.Did)
1712
1716
}
1713
1717
return
1714
1718
}
1715
1719
}(r)
1716
1720
if i%100 == 0 {
1717
1721
if i%10_000 == 0 {
1718
-
log.Warnw("checked revs during resync", "num_repos_checked", i, "num_repos_to_crawl", -1, "took", time.Now().Sub(resync.StatusChangedAt))
1722
+
log.Warn("checked revs during resync", "num_repos_checked", i, "num_repos_to_crawl", -1, "took", time.Now().Sub(resync.StatusChangedAt))
1719
1723
}
1720
1724
resync.NumReposChecked = i
1721
1725
bgs.UpdateResync(resync)
···
1725
1729
resync.NumReposChecked = len(repos)
1726
1730
bgs.UpdateResync(resync)
1727
1731
1728
-
log.Warnw("enqueued all crawls, exiting resync", "took", time.Now().Sub(start), "num_repos_to_crawl", -1)
1732
+
bgs.log.Warn("enqueued all crawls, exiting resync", "took", time.Now().Sub(start), "num_repos_to_crawl", -1)
1729
1733
1730
1734
return nil
1731
1735
}
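The conversion pattern applied throughout bgs.go: drop the package-level go-log logger, hold a *slog.Logger on the component scoped with a "system" attribute, and replace printf-style calls with structured key/value pairs. A compressed before/after sketch; the type and function names here are illustrative, not from this change:

```go
package example

import (
	"errors"
	"log/slog"
)

// Service stands in for BGS: it carries its own scoped logger instead of
// relying on a shared go-log package logger.
type Service struct {
	log *slog.Logger
}

func NewService() *Service {
	return &Service{
		// Mirrors bgs.go: log: slog.Default().With("system", "bgs")
		log: slog.Default().With("system", "bgs"),
	}
}

func (s *Service) crawl() {
	err := errors.New("example failure")

	// Before (go-log):  log.Errorf("failed to get user: %s", err)
	// After (slog):     message plus structured attributes.
	s.log.Error("failed to get user", "err", err)
}
```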
+6
-6
bgs/compactor.go
···
210
210
}
211
211
if c.requeueInterval > 0 {
212
212
go func() {
213
-
log.Infow("starting compactor requeue routine",
213
+
log.Info("starting compactor requeue routine",
214
214
"interval", c.requeueInterval,
215
215
"limit", c.requeueLimit,
216
216
"shardCount", c.requeueShardCount,
···
226
226
ctx := context.Background()
227
227
ctx, span := otel.Tracer("compactor").Start(ctx, "RequeueRoutine")
228
228
if err := c.EnqueueAllRepos(ctx, bgs, c.requeueLimit, c.requeueShardCount, c.requeueFast); err != nil {
229
-
log.Errorw("failed to enqueue all repos", "err", err)
229
+
log.Error("failed to enqueue all repos", "err", err)
230
230
}
231
231
span.End()
232
232
}
···
262
262
time.Sleep(time.Second * 5)
263
263
continue
264
264
}
265
-
log.Errorw("failed to compact repo",
265
+
log.Error("failed to compact repo",
266
266
"err", err,
267
267
"uid", state.latestUID,
268
268
"repo", state.latestDID,
···
273
273
// Pause for a bit to avoid spamming failed compactions
274
274
time.Sleep(time.Millisecond * 100)
275
275
} else {
276
-
log.Infow("compacted repo",
276
+
log.Info("compacted repo",
277
277
"uid", state.latestUID,
278
278
"repo", state.latestDID,
279
279
"status", state.status,
···
352
352
func (c *Compactor) EnqueueRepo(ctx context.Context, user *User, fast bool) {
353
353
ctx, span := otel.Tracer("compactor").Start(ctx, "EnqueueRepo")
354
354
defer span.End()
355
-
log.Infow("enqueueing compaction for repo", "repo", user.Did, "uid", user.ID, "fast", fast)
355
+
log.Info("enqueueing compaction for repo", "repo", user.Did, "uid", user.ID, "fast", fast)
356
356
c.q.Append(user.ID, fast)
357
357
}
358
358
···
396
396
c.q.Append(r.Usr, fast)
397
397
}
398
398
399
-
log.Infow("done enqueueing all repos", "repos_enqueued", len(repos))
399
+
log.Info("done enqueueing all repos", "repos_enqueued", len(repos))
400
400
401
401
return nil
402
402
}
+26
-23
bgs/fedmgr.go
···
4
4
"context"
5
5
"errors"
6
6
"fmt"
7
+
"log/slog"
7
8
"math/rand"
8
9
"strings"
9
10
"sync"
···
21
22
pq "github.com/lib/pq"
22
23
"gorm.io/gorm"
23
24
)
25
+
26
+
var log = slog.Default().With("system", "bgs")
24
27
25
28
type IndexCallback func(context.Context, *models.PDS, *events.XRPCStreamEvent) error
26
29
···
129
132
var errs []error
130
133
if errs = s.flushCursors(ctx); len(errs) > 0 {
131
134
for _, err := range errs {
132
-
log.Errorf("failed to flush cursors on shutdown: %s", err)
135
+
log.Error("failed to flush cursors on shutdown", "err", err)
133
136
}
134
137
}
135
138
log.Info("done flushing PDS cursors on shutdown")
···
142
145
defer span.End()
143
146
if errs := s.flushCursors(ctx); len(errs) > 0 {
144
147
for _, err := range errs {
145
-
log.Errorf("failed to flush cursors: %s", err)
148
+
log.Error("failed to flush cursors", "err", err)
146
149
}
147
150
}
148
151
log.Debug("done flushing PDS cursors")
···
210
213
errs := <-s.shutdownResult
211
214
if len(errs) > 0 {
212
215
for _, err := range errs {
213
-
log.Errorf("shutdown error: %s", err)
216
+
log.Error("shutdown error", "err", err)
214
217
}
215
218
}
216
219
log.Info("slurper shutdown complete")
···
490
493
url := fmt.Sprintf("%s://%s/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", protocol, host.Host, cursor)
491
494
con, res, err := d.DialContext(ctx, url, nil)
492
495
if err != nil {
493
-
log.Warnw("dialing failed", "pdsHost", host.Host, "err", err, "backoff", backoff)
496
+
log.Warn("dialing failed", "pdsHost", host.Host, "err", err, "backoff", backoff)
494
497
time.Sleep(sleepForBackoff(backoff))
495
498
backoff++
496
499
497
500
if backoff > 15 {
498
-
log.Warnw("pds does not appear to be online, disabling for now", "pdsHost", host.Host)
501
+
log.Warn("pds does not appear to be online, disabling for now", "pdsHost", host.Host)
499
502
if err := s.db.Model(&models.PDS{}).Where("id = ?", host.ID).Update("registered", false).Error; err != nil {
500
-
log.Errorf("failed to unregister failing pds: %w", err)
503
+
log.Error("failed to unregister failing pds", "err", err)
501
504
}
502
505
503
506
return
···
506
509
continue
507
510
}
508
511
509
-
log.Info("event subscription response code: ", res.StatusCode)
512
+
log.Info("event subscription response", "code", res.StatusCode)
510
513
511
514
curCursor := cursor
512
515
if err := s.handleConnection(ctx, host, con, &cursor, sub); err != nil {
513
516
if errors.Is(err, ErrTimeoutShutdown) {
514
-
log.Infof("shutting down pds subscription to %s, no activity after %s", host.Host, EventsTimeout)
517
+
log.Info("shutting down pds subscription after timeout", "host", host.Host, "time", EventsTimeout)
515
518
return
516
519
}
517
-
log.Warnf("connection to %q failed: %s", host.Host, err)
520
+
log.Warn("connection to failed", "host", host.Host, "err", err)
518
521
}
519
522
520
523
if cursor > curCursor {
···
545
548
546
549
rsc := &events.RepoStreamCallbacks{
547
550
RepoCommit: func(evt *comatproto.SyncSubscribeRepos_Commit) error {
548
-
log.Debugw("got remote repo event", "pdsHost", host.Host, "repo", evt.Repo, "seq", evt.Seq)
551
+
log.Debug("got remote repo event", "pdsHost", host.Host, "repo", evt.Repo, "seq", evt.Seq)
549
552
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
550
553
RepoCommit: evt,
551
554
}); err != nil {
552
-
log.Errorf("failed handling event from %q (%d): %s", host.Host, evt.Seq, err)
555
+
log.Error("failed handling event", "host", host.Host, "seq", evt.Seq, "err", err)
553
556
}
554
557
*lastCursor = evt.Seq
555
558
···
560
563
return nil
561
564
},
562
565
RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
563
-
log.Infow("got remote handle update event", "pdsHost", host.Host, "did", evt.Did, "handle", evt.Handle)
566
+
log.Info("got remote handle update event", "pdsHost", host.Host, "did", evt.Did, "handle", evt.Handle)
564
567
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
565
568
RepoHandle: evt,
566
569
}); err != nil {
567
-
log.Errorf("failed handling event from %q (%d): %s", host.Host, evt.Seq, err)
570
+
log.Error("failed handling event", "host", host.Host, "seq", evt.Seq, "err", err)
568
571
}
569
572
*lastCursor = evt.Seq
570
573
···
575
578
return nil
576
579
},
577
580
RepoMigrate: func(evt *comatproto.SyncSubscribeRepos_Migrate) error {
578
-
log.Infow("got remote repo migrate event", "pdsHost", host.Host, "did", evt.Did, "migrateTo", evt.MigrateTo)
581
+
log.Info("got remote repo migrate event", "pdsHost", host.Host, "did", evt.Did, "migrateTo", evt.MigrateTo)
579
582
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
580
583
RepoMigrate: evt,
581
584
}); err != nil {
582
-
log.Errorf("failed handling event from %q (%d): %s", host.Host, evt.Seq, err)
585
+
log.Error("failed handling event", "host", host.Host, "seq", evt.Seq, "err", err)
583
586
}
584
587
*lastCursor = evt.Seq
585
588
···
590
593
return nil
591
594
},
592
595
RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error {
593
-
log.Infow("got remote repo tombstone event", "pdsHost", host.Host, "did", evt.Did)
596
+
log.Info("got remote repo tombstone event", "pdsHost", host.Host, "did", evt.Did)
594
597
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
595
598
RepoTombstone: evt,
596
599
}); err != nil {
597
-
log.Errorf("failed handling event from %q (%d): %s", host.Host, evt.Seq, err)
600
+
log.Error("failed handling event", "host", host.Host, "seq", evt.Seq, "err", err)
598
601
}
599
602
*lastCursor = evt.Seq
600
603
···
605
608
return nil
606
609
},
607
610
RepoInfo: func(info *comatproto.SyncSubscribeRepos_Info) error {
608
-
log.Infow("info event", "name", info.Name, "message", info.Message, "pdsHost", host.Host)
611
+
log.Info("info event", "name", info.Name, "message", info.Message, "pdsHost", host.Host)
609
612
return nil
610
613
},
611
614
RepoIdentity: func(ident *comatproto.SyncSubscribeRepos_Identity) error {
612
-
log.Infow("identity event", "did", ident.Did)
615
+
log.Info("identity event", "did", ident.Did)
613
616
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
614
617
RepoIdentity: ident,
615
618
}); err != nil {
616
-
log.Errorf("failed handling event from %q (%d): %s", host.Host, ident.Seq, err)
619
+
log.Error("failed handling event", "host", host.Host, "seq", ident.Seq, "err", err)
617
620
}
618
621
*lastCursor = ident.Seq
619
622
···
624
627
return nil
625
628
},
626
629
RepoAccount: func(acct *comatproto.SyncSubscribeRepos_Account) error {
627
-
log.Infow("account event", "did", acct.Did, "status", acct.Status)
630
+
log.Info("account event", "did", acct.Did, "status", acct.Status)
628
631
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
629
632
RepoAccount: acct,
630
633
}); err != nil {
631
-
log.Errorf("failed handling event from %q (%d): %s", host.Host, acct.Seq, err)
634
+
log.Error("failed handling event", "host", host.Host, "seq", acct.Seq, "err", err)
632
635
}
633
636
*lastCursor = acct.Seq
634
637
···
671
674
con.RemoteAddr().String(),
672
675
instrumentedRSC.EventHandler,
673
676
)
674
-
return events.HandleRepoStream(ctx, con, pool)
677
+
return events.HandleRepoStream(ctx, con, pool, nil)
675
678
}
676
679
677
680
func (s *Slurper) updateCursor(sub *activeSub, curs int64) error {
+13
-13
bgs/handlers.go
···
31
31
if errors.Is(err, gorm.ErrRecordNotFound) {
32
32
return nil, echo.NewHTTPError(http.StatusNotFound, "user not found")
33
33
}
34
-
log.Errorw("failed to lookup user", "err", err, "did", did)
34
+
log.Error("failed to lookup user", "err", err, "did", did)
35
35
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
36
36
}
37
37
···
61
61
if errors.Is(err, mst.ErrNotFound) {
62
62
return nil, echo.NewHTTPError(http.StatusNotFound, "record not found in repo")
63
63
}
64
-
log.Errorw("failed to get record from repo", "err", err, "did", did, "collection", collection, "rkey", rkey)
64
+
log.Error("failed to get record from repo", "err", err, "did", did, "collection", collection, "rkey", rkey)
65
65
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to get record from repo")
66
66
}
67
67
···
89
89
if errors.Is(err, gorm.ErrRecordNotFound) {
90
90
return nil, echo.NewHTTPError(http.StatusNotFound, "user not found")
91
91
}
92
-
log.Errorw("failed to lookup user", "err", err, "did", did)
92
+
log.Error("failed to lookup user", "err", err, "did", did)
93
93
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to lookup user")
94
94
}
95
95
···
117
117
// TODO: stream the response
118
118
buf := new(bytes.Buffer)
119
119
if err := s.repoman.ReadRepo(ctx, u.ID, since, buf); err != nil {
120
-
log.Errorw("failed to read repo into buffer", "err", err, "did", did)
120
+
log.Error("failed to read repo into buffer", "err", err, "did", did)
121
121
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to read repo into buffer")
122
122
}
123
123
···
170
170
return echo.NewHTTPError(http.StatusUnauthorized, "domain is banned")
171
171
}
172
172
173
-
log.Warnf("TODO: better host validation for crawl requests")
173
+
log.Warn("TODO: better host validation for crawl requests")
174
174
175
175
clientHost := fmt.Sprintf("%s://%s", u.Scheme, host)
176
176
···
191
191
if len(s.nextCrawlers) != 0 {
192
192
blob, err := json.Marshal(body)
193
193
if err != nil {
194
-
log.Warnw("could not forward requestCrawl, json err", "err", err)
194
+
log.Warn("could not forward requestCrawl, json err", "err", err)
195
195
} else {
196
196
go func(bodyBlob []byte) {
197
197
for _, rpu := range s.nextCrawlers {
···
201
201
response.Body.Close()
202
202
}
203
203
if err != nil || response == nil {
204
-
log.Warnw("requestCrawl forward failed", "host", rpu, "err", err)
204
+
log.Warn("requestCrawl forward failed", "host", rpu, "err", err)
205
205
} else if response.StatusCode != http.StatusOK {
206
-
log.Warnw("requestCrawl forward failed", "host", rpu, "status", response.Status)
206
+
log.Warn("requestCrawl forward failed", "host", rpu, "status", response.Status)
207
207
} else {
208
-
log.Infow("requestCrawl forward successful", "host", rpu)
208
+
log.Info("requestCrawl forward successful", "host", rpu)
209
209
}
210
210
}
211
211
}(blob)
···
231
231
if err == gorm.ErrRecordNotFound {
232
232
return &comatprototypes.SyncListRepos_Output{}, nil
233
233
}
234
-
log.Errorw("failed to query users", "err", err)
234
+
log.Error("failed to query users", "err", err)
235
235
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to query users")
236
236
}
237
237
···
252
252
253
253
root, err := s.repoman.GetRepoRoot(ctx, user.ID)
254
254
if err != nil {
255
-
log.Errorw("failed to get repo root", "err", err, "did", user.Did)
255
+
log.Error("failed to get repo root", "err", err, "did", user.Did)
256
256
return nil, echo.NewHTTPError(http.StatusInternalServerError, fmt.Sprintf("failed to get repo root for (%s): %v", user.Did, err.Error()))
257
257
}
258
258
···
303
303
304
304
root, err := s.repoman.GetRepoRoot(ctx, u.ID)
305
305
if err != nil {
306
-
log.Errorw("failed to get repo root", "err", err, "did", u.Did)
306
+
log.Error("failed to get repo root", "err", err, "did", u.Did)
307
307
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to get repo root")
308
308
}
309
309
310
310
rev, err := s.repoman.GetRepoRev(ctx, u.ID)
311
311
if err != nil {
312
-
log.Errorw("failed to get repo rev", "err", err, "did", u.Did)
312
+
log.Error("failed to get repo rev", "err", err, "did", u.Did)
313
313
return nil, echo.NewHTTPError(http.StatusInternalServerError, "failed to get repo rev")
314
314
}
315
315
+9
-8
carstore/bs.go
···
6
6
"context"
7
7
"fmt"
8
8
"io"
9
+
"log/slog"
9
10
"os"
10
11
"path/filepath"
11
12
"sort"
···
24
25
cbor "github.com/ipfs/go-ipld-cbor"
25
26
ipld "github.com/ipfs/go-ipld-format"
26
27
"github.com/ipfs/go-libipfs/blocks"
27
-
logging "github.com/ipfs/go-log"
28
28
car "github.com/ipld/go-car"
29
29
carutil "github.com/ipld/go-car/util"
30
30
cbg "github.com/whyrusleeping/cbor-gen"
···
41
41
var blockGetTotalCounterUsrskip = blockGetTotalCounter.WithLabelValues("true", "miss")
42
42
var blockGetTotalCounterCached = blockGetTotalCounter.WithLabelValues("false", "hit")
43
43
var blockGetTotalCounterNormal = blockGetTotalCounter.WithLabelValues("false", "miss")
44
-
45
-
var log = logging.Logger("carstore")
46
44
47
45
const MaxSliceLength = 2 << 20
48
46
···
67
65
68
66
lscLk sync.Mutex
69
67
lastShardCache map[models.Uid]*CarShard
68
+
69
+
log *slog.Logger
70
70
}
71
71
72
72
func NewCarStore(meta *gorm.DB, roots []string) (CarStore, error) {
···
92
92
meta: &CarStoreGormMeta{meta: meta},
93
93
rootDirs: roots,
94
94
lastShardCache: make(map[models.Uid]*CarShard),
95
+
log: slog.Default().With("system", "carstore"),
95
96
}, nil
96
97
}
97
98
···
883
884
if !os.IsNotExist(err) {
884
885
return err
885
886
}
886
-
log.Warnw("shard file we tried to delete did not exist", "shard", sh.ID, "path", sh.Path)
887
+
cs.log.Warn("shard file we tried to delete did not exist", "shard", sh.ID, "path", sh.Path)
887
888
}
888
889
}
889
890
···
1034
1035
st, err := os.Stat(sh.Path)
1035
1036
if err != nil {
1036
1037
if os.IsNotExist(err) {
1037
-
log.Warnw("missing shard, return size of zero", "path", sh.Path, "shard", sh.ID)
1038
+
slog.Warn("missing shard, return size of zero", "path", sh.Path, "shard", sh.ID, "system", "carstore")
1038
1039
return 0, nil
1039
1040
}
1040
1041
return 0, fmt.Errorf("stat %q: %w", sh.Path, err)
···
1155
1156
// still around but we're doing that anyways since compaction isn't a
1156
1157
// perfect process
1157
1158
1158
-
log.Debugw("repo has dirty dupes", "count", len(dupes), "uid", user, "staleRefs", len(staleRefs), "blockRefs", len(brefs))
1159
+
cs.log.Debug("repo has dirty dupes", "count", len(dupes), "uid", user, "staleRefs", len(staleRefs), "blockRefs", len(brefs))
1159
1160
1160
1161
//return nil, fmt.Errorf("WIP: not currently handling this case")
1161
1162
}
···
1350
1351
}); err != nil {
1351
1352
// If we ever fail to iterate a shard file because its
1352
1353
// corrupted, just log an error and skip the shard
1353
-
log.Errorw("iterating blocks in shard", "shard", s.ID, "err", err, "uid", user)
1354
+
cs.log.Error("iterating blocks in shard", "shard", s.ID, "err", err, "uid", user)
1354
1355
}
1355
1356
}
1356
1357
···
1368
1369
_ = fi.Close()
1369
1370
1370
1371
if err2 := os.Remove(fi.Name()); err2 != nil {
1371
-
log.Errorf("failed to remove shard file (%s) after failed db transaction: %w", fi.Name(), err2)
1372
+
cs.log.Error("failed to remove shard file after failed db transaction", "path", fi.Name(), "err", err2)
1372
1373
}
1373
1374
1374
1375
return err
+1
-1
cmd/beemo/firehose_consumer.go
···
57
57
)
58
58
logger.Info("beemo firehose scheduler configured", "scheduler", "parallel", "workers", parallelism)
59
59
60
-
return events.HandleRepoStream(ctx, con, scheduler)
60
+
return events.HandleRepoStream(ctx, con, scheduler, logger)
61
61
}
62
62
63
63
// TODO: move this to a "ParsePath" helper in syntax package?
+22
-17
cmd/bigsky/main.go
···
3
3
import (
4
4
"context"
5
5
"fmt"
6
+
"log/slog"
6
7
"net/http"
7
8
_ "net/http/pprof"
8
9
"net/url"
···
30
31
_ "go.uber.org/automaxprocs"
31
32
32
33
"github.com/carlmjohnson/versioninfo"
33
-
logging "github.com/ipfs/go-log"
34
34
"github.com/urfave/cli/v2"
35
35
"go.opentelemetry.io/otel"
36
36
"go.opentelemetry.io/otel/attribute"
···
42
42
"gorm.io/plugin/opentelemetry/tracing"
43
43
)
44
44
45
-
var log = logging.Logger("bigsky")
45
+
var log = slog.Default().With("system", "bigsky")
46
46
47
47
func init() {
48
48
// control log level using, eg, GOLOG_LOG_LEVEL=debug
···
51
51
52
52
func main() {
53
53
if err := run(os.Args); err != nil {
54
-
log.Fatal(err)
54
+
slog.Error(err.Error())
55
+
os.Exit(1)
55
56
}
56
57
}
57
58
···
255
256
// At a minimum, you need to set
256
257
// OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318
257
258
if ep := cctx.String("otel-exporter-otlp-endpoint"); ep != "" {
258
-
log.Infow("setting up trace exporter", "endpoint", ep)
259
+
slog.Info("setting up trace exporter", "endpoint", ep)
259
260
ctx, cancel := context.WithCancel(context.Background())
260
261
defer cancel()
261
262
262
263
exp, err := otlptracehttp.New(ctx)
263
264
if err != nil {
264
-
log.Fatalw("failed to create trace exporter", "error", err)
265
+
slog.Error("failed to create trace exporter", "error", err)
266
+
os.Exit(1)
265
267
}
266
268
defer func() {
267
269
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
268
270
defer cancel()
269
271
if err := exp.Shutdown(ctx); err != nil {
270
-
log.Errorw("failed to shutdown trace exporter", "error", err)
272
+
slog.Error("failed to shutdown trace exporter", "error", err)
271
273
}
272
274
}()
273
275
···
292
294
signals := make(chan os.Signal, 1)
293
295
signal.Notify(signals, syscall.SIGINT, syscall.SIGTERM)
294
296
297
+
// TODO: set slog default from param/env
298
+
295
299
// start observability/tracing (OTEL and jaeger)
296
300
if err := setupOTEL(cctx); err != nil {
297
301
return err
···
304
308
return err
305
309
}
306
310
307
-
log.Infow("setting up main database")
311
+
slog.Info("setting up main database")
308
312
dburl := cctx.String("db-url")
309
313
db, err := cliutil.SetupDatabase(dburl, cctx.Int("max-metadb-connections"))
310
314
if err != nil {
311
315
return err
312
316
}
313
317
314
-
log.Infow("setting up carstore database")
318
+
slog.Info("setting up carstore database")
315
319
csdburl := cctx.String("carstore-db-url")
316
320
csdb, err := cliutil.SetupDatabase(csdburl, cctx.Int("max-carstore-connections"))
317
321
if err != nil {
···
378
382
var persister events.EventPersistence
379
383
380
384
if dpd := cctx.String("disk-persister-dir"); dpd != "" {
381
-
log.Infow("setting up disk persister")
385
+
slog.Info("setting up disk persister")
382
386
383
387
pOpts := events.DefaultDiskPersistOptions()
384
388
pOpts.Retention = cctx.Duration("event-playback-ttl")
···
428
432
429
433
repoman.SetEventHandler(func(ctx context.Context, evt *repomgr.RepoEvent) {
430
434
if err := ix.HandleRepoEvent(ctx, evt); err != nil {
431
-
log.Errorw("failed to handle repo event", "err", err)
435
+
slog.Error("failed to handle repo event", "err", err)
432
436
}
433
437
}, false)
434
438
···
452
456
}
453
457
}
454
458
455
-
log.Infow("constructing bgs")
459
+
slog.Info("constructing bgs")
456
460
bgsConfig := libbgs.DefaultBGSConfig()
457
461
bgsConfig.SSL = !cctx.Bool("crawl-insecure-ws")
458
462
bgsConfig.CompactInterval = cctx.Duration("compact-interval")
···
469
473
if err != nil {
470
474
return fmt.Errorf("failed to parse next-crawler url: %w", err)
471
475
}
472
-
log.Infow("configuring relay for requestCrawl", "host", nextCrawlerUrls[i])
476
+
slog.Info("configuring relay for requestCrawl", "host", nextCrawlerUrls[i])
473
477
}
474
478
bgsConfig.NextCrawlers = nextCrawlerUrls
475
479
}
···
487
491
// set up metrics endpoint
488
492
go func() {
489
493
if err := bgs.StartMetrics(cctx.String("metrics-listen")); err != nil {
490
-
log.Fatalf("failed to start metrics endpoint: %s", err)
494
+
log.Error("failed to start metrics endpoint", "err", err)
495
+
os.Exit(1)
491
496
}
492
497
}()
493
498
···
498
503
bgsErr <- err
499
504
}()
500
505
501
-
log.Infow("startup complete")
506
+
slog.Info("startup complete")
502
507
select {
503
508
case <-signals:
504
509
log.Info("received shutdown signal")
505
510
errs := bgs.Shutdown()
506
511
for err := range errs {
507
-
log.Errorw("error during BGS shutdown", "err", err)
512
+
slog.Error("error during BGS shutdown", "err", err)
508
513
}
509
514
case err := <-bgsErr:
510
515
if err != nil {
511
-
log.Errorw("error during BGS startup", "err", err)
516
+
slog.Error("error during BGS startup", "err", err)
512
517
}
513
518
log.Info("shutting down")
514
519
errs := bgs.Shutdown()
515
520
for err := range errs {
516
-
log.Errorw("error during BGS shutdown", "err", err)
521
+
slog.Error("error during BGS shutdown", "err", err)
517
522
}
518
523
}
519
524
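go-log's Fatal helpers have no slog equivalent, so the Fatalf/Fatalw call sites above become an explicit error log followed by os.Exit(1). A sketch of that pattern; the fatal helper is hypothetical and not part of this change:

```go
package main

import (
	"errors"
	"log/slog"
	"os"
)

// fatal logs at error level and exits, approximating the old log.Fatalw
// behavior. Same caveat as Fatal: deferred functions do not run after os.Exit.
func fatal(msg string, args ...any) {
	slog.Error(msg, args...)
	os.Exit(1)
}

func main() {
	if err := run(); err != nil {
		// Equivalent of the old `log.Fatal(err)` at the top of cmd/bigsky/main.go.
		fatal("bigsky startup failed", "err", err)
	}
}

func run() error { return errors.New("example") }
```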
+1
-1
cmd/goat/firehose.go
···
130
130
rsc.EventHandler,
131
131
)
132
132
slog.Info("starting firehose consumer", "relayHost", relayHost)
133
-
return events.HandleRepoStream(ctx, con, scheduler)
133
+
return events.HandleRepoStream(ctx, con, scheduler, nil)
134
134
}
135
135
136
136
// TODO: move this to a "ParsePath" helper in syntax package?
+2
-2
cmd/gosky/car.go
···
64
64
if topDir == "" {
65
65
topDir = did.String()
66
66
}
67
-
log.Infof("writing output to: %s", topDir)
67
+
log.Info("writing output", "topDir", topDir)
68
68
69
69
commitPath := topDir + "/_commit"
70
70
os.MkdirAll(filepath.Dir(commitPath), os.ModePerm)
···
90
90
if err != nil {
91
91
return err
92
92
}
93
-
log.Debugf("processing record: %s", k)
93
+
log.Debug("processing record", "rec", k)
94
94
95
95
// TODO: check if path is safe more carefully
96
96
recPath := topDir + "/" + k
+15
-9
cmd/gosky/debug.go
···
106
106
}
107
107
108
108
seqScheduler := sequential.NewScheduler("debug-inspect-event", rsc.EventHandler)
109
-
err = events.HandleRepoStream(ctx, con, seqScheduler)
109
+
err = events.HandleRepoStream(ctx, con, seqScheduler, nil)
110
110
if err != errFoundIt {
111
111
return err
112
112
}
···
284
284
},
285
285
}
286
286
seqScheduler := sequential.NewScheduler("debug-stream", rsc.EventHandler)
287
-
err = events.HandleRepoStream(ctx, con, seqScheduler)
287
+
err = events.HandleRepoStream(ctx, con, seqScheduler, nil)
288
288
if err != nil {
289
289
return err
290
290
}
···
390
390
go func(i int, url string) {
391
391
con, _, err := d.Dial(url, http.Header{})
392
392
if err != nil {
393
-
log.Fatalf("Dial failure on url%d: %s", i+1, err)
393
+
log.Error("Dial failure", "i", i, "url", url, "err", err)
394
+
os.Exit(1)
394
395
}
395
396
396
397
ctx := context.TODO()
···
405
406
},
406
407
}
407
408
seqScheduler := sequential.NewScheduler(fmt.Sprintf("debug-stream-%d", i+1), rsc.EventHandler)
408
-
if err := events.HandleRepoStream(ctx, con, seqScheduler); err != nil {
409
-
log.Fatalf("HandleRepoStream failure on url%d: %s", i+1, err)
409
+
if err := events.HandleRepoStream(ctx, con, seqScheduler, nil); err != nil {
410
+
log.Error("HandleRepoStream failure", "i", i, "url", url, "err", err)
411
+
os.Exit(1)
410
412
}
411
413
}(i, url)
412
414
}
···
876
878
logger := log.With("host", cctx.String("host-1"))
877
879
repo1bytes, err := comatproto.SyncGetRepo(ctx, &xrpc1, did.String(), "")
878
880
if err != nil {
879
-
logger.Fatalf("getting repo: %s", err)
881
+
logger.Error("getting repo", "err", err)
882
+
os.Exit(1)
880
883
return
881
884
}
882
885
883
886
rep1, err = repo.ReadRepoFromCar(ctx, bytes.NewReader(repo1bytes))
884
887
if err != nil {
885
-
logger.Fatalf("reading repo: %s", err)
888
+
logger.Error("reading repo", "err", err)
889
+
os.Exit(1)
886
890
return
887
891
}
888
892
}()
···
893
897
logger := log.With("host", cctx.String("host-2"))
894
898
repo2bytes, err := comatproto.SyncGetRepo(ctx, &xrpc2, did.String(), "")
895
899
if err != nil {
896
-
logger.Fatalf("getting repo: %s", err)
900
+
logger.Error("getting repo", "err", err)
901
+
os.Exit(1)
897
902
return
898
903
}
899
904
900
905
rep2, err = repo.ReadRepoFromCar(ctx, bytes.NewReader(repo2bytes))
901
906
if err != nil {
902
-
logger.Fatalf("reading repo: %s", err)
907
+
logger.Error("reading repo", "err", err)
908
+
os.Exit(1)
903
909
return
904
910
}
905
911
}()
+6
-3
cmd/gosky/main.go
···
7
7
"encoding/json"
8
8
"fmt"
9
9
"io"
10
+
"log/slog"
10
11
"net/http"
11
12
"os"
12
13
"os/signal"
···
39
40
_ "github.com/joho/godotenv/autoload"
40
41
41
42
"github.com/carlmjohnson/versioninfo"
42
-
logging "github.com/ipfs/go-log"
43
43
"github.com/polydawn/refmt/cbor"
44
44
rejson "github.com/polydawn/refmt/json"
45
45
"github.com/polydawn/refmt/shared"
46
46
cli "github.com/urfave/cli/v2"
47
47
)
48
48
49
-
var log = logging.Logger("gosky")
49
+
var log = slog.Default().With("system", "gosky")
50
50
51
51
func main() {
52
52
run(os.Args)
···
80
80
EnvVars: []string{"ATP_PLC_HOST"},
81
81
},
82
82
}
83
+
84
+
// TODO: slog.SetDefault from param/env
85
+
83
86
app.Commands = []*cli.Command{
84
87
accountCmd,
85
88
adminCmd,
···
339
342
},
340
343
}
341
344
seqScheduler := sequential.NewScheduler(con.RemoteAddr().String(), rsc.EventHandler)
342
-
return events.HandleRepoStream(ctx, con, seqScheduler)
345
+
return events.HandleRepoStream(ctx, con, seqScheduler, log)
343
346
},
344
347
}
345
348
+4
-4
cmd/gosky/streamdiff.go
···
58
58
},
59
59
}
60
60
seqScheduler := sequential.NewScheduler("streamA", rsc.EventHandler)
61
-
err = events.HandleRepoStream(ctx, cona, seqScheduler)
61
+
err = events.HandleRepoStream(ctx, cona, seqScheduler, log)
62
62
if err != nil {
63
-
log.Errorf("stream A failed: %s", err)
63
+
log.Error("stream A failed", "err", err)
64
64
}
65
65
}()
66
66
···
82
82
}
83
83
84
84
seqScheduler := sequential.NewScheduler("streamB", rsc.EventHandler)
85
-
err = events.HandleRepoStream(ctx, conb, seqScheduler)
85
+
err = events.HandleRepoStream(ctx, conb, seqScheduler, log)
86
86
if err != nil {
87
-
log.Errorf("stream B failed: %s", err)
87
+
log.Error("stream B failed", "err", err)
88
88
}
89
89
}()
90
90
+1
-1
cmd/gosky/sync.go
-3
cmd/laputa/main.go
···
14
14
_ "go.uber.org/automaxprocs"
15
15
16
16
"github.com/carlmjohnson/versioninfo"
17
-
logging "github.com/ipfs/go-log"
18
17
"github.com/urfave/cli/v2"
19
18
"go.opentelemetry.io/otel"
20
19
"go.opentelemetry.io/otel/attribute"
···
24
23
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
25
24
"gorm.io/plugin/opentelemetry/tracing"
26
25
)
27
-
28
-
var log = logging.Logger("laputa")
29
26
30
27
func main() {
31
28
run(os.Args)
+20
-14
cmd/rainbow/main.go
···
3
3
import (
4
4
"context"
5
5
"github.com/bluesky-social/indigo/events"
6
+
"log/slog"
6
7
_ "net/http/pprof"
7
8
"os"
8
9
"os/signal"
···
15
16
_ "go.uber.org/automaxprocs"
16
17
17
18
"github.com/carlmjohnson/versioninfo"
18
-
logging "github.com/ipfs/go-log"
19
19
"github.com/urfave/cli/v2"
20
20
"go.opentelemetry.io/otel"
21
21
"go.opentelemetry.io/otel/attribute"
···
25
25
semconv "go.opentelemetry.io/otel/semconv/v1.4.0"
26
26
)
27
27
28
-
var log = logging.Logger("rainbow")
28
+
var log = slog.Default().With("system", "rainbow")
29
29
30
30
func init() {
31
31
// control log level using, eg, GOLOG_LOG_LEVEL=debug
32
-
logging.SetAllLoggers(logging.LevelDebug)
32
+
//logging.SetAllLoggers(logging.LevelDebug)
33
33
}
34
34
35
35
func main() {
···
90
90
},
91
91
}
92
92
93
+
// TODO: slog.SetDefault and set module `var log *slog.Logger` based on flags and env
94
+
93
95
app.Action = Splitter
94
96
err := app.Run(os.Args)
95
97
if err != nil {
96
-
log.Fatal(err)
98
+
log.Error(err.Error())
99
+
os.Exit(1)
97
100
}
98
101
}
99
102
···
108
111
// At a minimum, you need to set
109
112
// OTEL_EXPORTER_OTLP_ENDPOINT=http://localhost:4318
110
113
if ep := os.Getenv("OTEL_EXPORTER_OTLP_ENDPOINT"); ep != "" {
111
-
log.Infow("setting up trace exporter", "endpoint", ep)
114
+
log.Info("setting up trace exporter", "endpoint", ep)
112
115
ctx, cancel := context.WithCancel(context.Background())
113
116
defer cancel()
114
117
115
118
exp, err := otlptracehttp.New(ctx)
116
119
if err != nil {
117
-
log.Fatalw("failed to create trace exporter", "error", err)
120
+
log.Error("failed to create trace exporter", "error", err)
121
+
os.Exit(1)
118
122
}
119
123
defer func() {
120
124
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
121
125
defer cancel()
122
126
if err := exp.Shutdown(ctx); err != nil {
123
-
log.Errorw("failed to shutdown trace exporter", "error", err)
127
+
log.Error("failed to shutdown trace exporter", "error", err)
124
128
}
125
129
}()
126
130
···
142
146
var spl *splitter.Splitter
143
147
var err error
144
148
if persistPath != "" {
145
-
log.Infof("building splitter with storage at: %s", persistPath)
149
+
log.Info("building splitter with storage at", "path", persistPath)
146
150
ppopts := events.PebblePersistOptions{
147
151
DbPath: persistPath,
148
152
PersistDuration: time.Duration(float64(time.Hour) * cctx.Float64("persist-hours")),
···
164
168
spl, err = splitter.NewSplitter(conf)
165
169
}
166
170
if err != nil {
167
-
log.Fatalw("failed to create splitter", "path", persistPath, "error", err)
171
+
log.Error("failed to create splitter", "path", persistPath, "error", err)
172
+
os.Exit(1)
168
173
return err
169
174
}
170
175
171
176
// set up metrics endpoint
172
177
go func() {
173
178
if err := spl.StartMetrics(cctx.String("metrics-listen")); err != nil {
174
-
log.Fatalf("failed to start metrics endpoint: %s", err)
179
+
log.Error("failed to start metrics endpoint", "err", err)
180
+
os.Exit(1)
175
181
}
176
182
}()
177
183
···
182
188
runErr <- err
183
189
}()
184
190
185
-
log.Infow("startup complete")
191
+
log.Info("startup complete")
186
192
select {
187
193
case <-signals:
188
194
log.Info("received shutdown signal")
189
195
if err := spl.Shutdown(); err != nil {
190
-
log.Errorw("error during Splitter shutdown", "err", err)
196
+
log.Error("error during Splitter shutdown", "err", err)
191
197
}
192
198
case err := <-runErr:
193
199
if err != nil {
194
-
log.Errorw("error during Splitter startup", "err", err)
200
+
log.Error("error during Splitter startup", "err", err)
195
201
}
196
202
log.Info("shutting down")
197
203
if err := spl.Shutdown(); err != nil {
198
-
log.Errorw("error during Splitter shutdown", "err", err)
204
+
log.Error("error during Splitter shutdown", "err", err)
199
205
}
200
206
}
201
207
+1
-1
cmd/sonar/main.go
-3
cmd/stress/main.go
···
26
26
_ "github.com/joho/godotenv/autoload"
27
27
28
28
"github.com/carlmjohnson/versioninfo"
29
-
logging "github.com/ipfs/go-log"
30
29
"github.com/ipld/go-car"
31
30
cli "github.com/urfave/cli/v2"
32
31
)
33
-
34
-
var log = logging.Logger("stress")
35
32
36
33
func main() {
37
34
run(os.Args)
+18
-10
events/consumer.go
···
4
4
"context"
5
5
"fmt"
6
6
"io"
7
+
"log/slog"
7
8
"net"
8
9
"time"
9
10
···
108
109
return n, err
109
110
}
110
111
111
-
func HandleRepoStream(ctx context.Context, con *websocket.Conn, sched Scheduler) error {
112
+
// HandleRepoStream
113
+
// con is source of events
114
+
// sched gets AddWork for each event
115
+
// log may be nil for default logger
116
+
func HandleRepoStream(ctx context.Context, con *websocket.Conn, sched Scheduler, log *slog.Logger) error {
117
+
if log == nil {
118
+
log = slog.Default().With("system", "events")
119
+
}
112
120
ctx, cancel := context.WithCancel(ctx)
113
121
defer cancel()
114
122
defer sched.Shutdown()
···
124
132
select {
125
133
case <-t.C:
126
134
if err := con.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(time.Second*10)); err != nil {
127
-
log.Warnf("failed to ping: %s", err)
135
+
log.Warn("failed to ping", "err", err)
128
136
}
129
137
case <-ctx.Done():
130
138
con.Close()
···
145
153
146
154
con.SetPongHandler(func(_ string) error {
147
155
if err := con.SetReadDeadline(time.Now().Add(time.Minute)); err != nil {
148
-
log.Errorf("failed to set read deadline: %s", err)
156
+
log.Error("failed to set read deadline", "err", err)
149
157
}
150
158
151
159
return nil
···
194
202
}
195
203
196
204
if evt.Seq < lastSeq {
197
-
log.Errorf("Got events out of order from stream (seq = %d, prev = %d)", evt.Seq, lastSeq)
205
+
log.Error("Got events out of order from stream", "seq", evt.Seq, "prev", lastSeq)
198
206
}
199
207
200
208
lastSeq = evt.Seq
···
211
219
}
212
220
213
221
if evt.Seq < lastSeq {
214
-
log.Errorf("Got events out of order from stream (seq = %d, prev = %d)", evt.Seq, lastSeq)
222
+
log.Error("Got events out of order from stream", "seq", evt.Seq, "prev", lastSeq)
215
223
}
216
224
lastSeq = evt.Seq
217
225
···
227
235
}
228
236
229
237
if evt.Seq < lastSeq {
230
-
log.Errorf("Got events out of order from stream (seq = %d, prev = %d)", evt.Seq, lastSeq)
238
+
log.Error("Got events out of order from stream", "seq", evt.Seq, "prev", lastSeq)
231
239
}
232
240
lastSeq = evt.Seq
233
241
···
243
251
}
244
252
245
253
if evt.Seq < lastSeq {
246
-
log.Errorf("Got events out of order from stream (seq = %d, prev = %d)", evt.Seq, lastSeq)
254
+
log.Error("Got events out of order from stream", "seq", evt.Seq, "prev", lastSeq)
247
255
}
248
256
lastSeq = evt.Seq
249
257
···
271
279
}
272
280
273
281
if evt.Seq < lastSeq {
274
-
log.Errorf("Got events out of order from stream (seq = %d, prev = %d)", evt.Seq, lastSeq)
282
+
log.Error("Got events out of order from stream", "seq", evt.Seq, "prev", lastSeq)
275
283
}
276
284
lastSeq = evt.Seq
277
285
···
287
295
}
288
296
289
297
if evt.Seq < lastSeq {
290
-
log.Errorf("Got events out of order from stream (seq = %d, prev = %d)", evt.Seq, lastSeq)
298
+
log.Error("Got events out of order from stream", "seq", evt.Seq, "prev", lastSeq)
291
299
}
292
300
lastSeq = evt.Seq
293
301
···
303
311
}
304
312
305
313
if evt.Seq < lastSeq {
306
-
log.Errorf("Got events out of order from stream (seq = %d, prev = %d)", evt.Seq, lastSeq)
314
+
log.Error("Got events out of order from stream", "seq", evt.Seq, "prev", lastSeq)
307
315
}
308
316
309
317
lastSeq = evt.Seq
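Note on the HandleRepoStream change above: the new fourth argument is a *slog.Logger, and passing nil falls back to a default logger tagged with system=events. A minimal caller sketch, assuming an existing websocket connection con, a context ctx, and a hypothetical handler handleEvent with the func(context.Context, *events.XRPCStreamEvent) error shape the schedulers accept (none of these names are defined in this diff):

logger := slog.Default().With("system", "my-consumer")
sched := sequential.NewScheduler("my-consumer", handleEvent) // same construction as in testing/utils.go below
if err := events.HandleRepoStream(ctx, con, sched, logger); err != nil {
	logger.Error("repo stream ended", "err", err)
}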
+2
-2
events/dbpersist.go
···
131
131
132
132
if needsFlush {
133
133
if err := p.Flush(context.Background()); err != nil {
134
-
log.Errorf("failed to flush batch: %s", err)
134
+
log.Error("failed to flush batch", "err", err)
135
135
}
136
136
}
137
137
}
···
323
323
func (p *DbPersistence) RecordFromRepoCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) (*RepoEventRecord, error) {
324
324
// TODO: hack hack hack
325
325
if len(evt.Ops) > 8192 {
326
-
log.Errorf("(VERY BAD) truncating ops field in outgoing event (len = %d)", len(evt.Ops))
326
+
log.Error("(VERY BAD) truncating ops field in outgoing event", "len", len(evt.Ops))
327
327
evt.Ops = evt.Ops[:8192]
328
328
}
329
329
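The conversion in this file is the mechanical pattern repeated throughout the PR: a printf-style go-log call becomes a slog call with a constant message plus key/value attributes. Taken from the hunk above:

// before (go-log)
log.Errorf("failed to flush batch: %s", err)
// after (log/slog)
log.Error("failed to flush batch", "err", err)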
-5
events/dbpersist_test.go
···
16
16
pds "github.com/bluesky-social/indigo/pds/data"
17
17
"github.com/bluesky-social/indigo/repomgr"
18
18
"github.com/bluesky-social/indigo/util"
19
-
logging "github.com/ipfs/go-log/v2"
20
19
"gorm.io/driver/sqlite"
21
20
"gorm.io/gorm"
22
21
)
23
-
24
-
func init() {
25
-
logging.SetAllLoggers(logging.LevelDebug)
26
-
}
27
22
28
23
func BenchmarkDBPersist(b *testing.B) {
29
24
ctx := context.Background()
+5
-5
events/diskpersist.go
···
312
312
dp.lk.Lock()
313
313
if err := dp.flushLog(ctx); err != nil {
314
314
// TODO: this happening is quite bad. Need a recovery strategy
315
-
log.Errorf("failed to flush disk log: %s", err)
315
+
log.Error("failed to flush disk log", "err", err)
316
316
}
317
317
dp.lk.Unlock()
318
318
}
···
354
354
case <-t.C:
355
355
if errs := dp.garbageCollect(ctx); len(errs) > 0 {
356
356
for _, err := range errs {
357
-
log.Errorf("garbage collection error: %s", err)
357
+
log.Error("garbage collection error", "err", err)
358
358
}
359
359
}
360
360
}
···
430
430
refsGarbageCollected.WithLabelValues().Add(float64(refsDeleted))
431
431
filesGarbageCollected.WithLabelValues().Add(float64(filesDeleted))
432
432
433
-
log.Infow("garbage collection complete",
433
+
log.Info("garbage collection complete",
434
434
"filesDeleted", filesDeleted,
435
435
"refsDeleted", refsDeleted,
436
436
"oldRefsFound", oldRefsFound,
···
696
696
return nil, err
697
697
}
698
698
if since > lastSeq {
699
-
log.Errorw("playback cursor is greater than last seq of file checked",
699
+
log.Error("playback cursor is greater than last seq of file checked",
700
700
"since", since,
701
701
"lastSeq", lastSeq,
702
702
"filename", fn,
···
778
778
return nil, err
779
779
}
780
780
default:
781
-
log.Warnw("unrecognized event kind coming from log file", "seq", h.Seq, "kind", h.Kind)
781
+
log.Warn("unrecognized event kind coming from log file", "seq", h.Seq, "kind", h.Kind)
782
782
return nil, fmt.Errorf("halting on unrecognized event kind")
783
783
}
784
784
}
+12
-9
events/events.go
···
6
6
"errors"
7
7
"fmt"
8
8
"io"
9
+
"log/slog"
9
10
"sync"
10
11
"time"
11
12
···
14
15
"github.com/bluesky-social/indigo/models"
15
16
"github.com/prometheus/client_golang/prometheus"
16
17
17
-
logging "github.com/ipfs/go-log"
18
18
cbg "github.com/whyrusleeping/cbor-gen"
19
19
"go.opentelemetry.io/otel"
20
20
)
21
21
22
-
var log = logging.Logger("events")
22
+
var log = slog.Default().With("system", "events")
23
23
24
24
type Scheduler interface {
25
25
AddWork(ctx context.Context, repo string, val *XRPCStreamEvent) error
···
34
34
crossoverBufferSize int
35
35
36
36
persister EventPersistence
37
+
38
+
log *slog.Logger
37
39
}
38
40
39
41
func NewEventManager(persister EventPersistence) *EventManager {
···
41
43
bufferSize: 16 << 10,
42
44
crossoverBufferSize: 512,
43
45
persister: persister,
46
+
log: slog.Default().With("system", "events"),
44
47
}
45
48
46
49
persister.SetEventBroadcaster(em.broadcastEvent)
···
67
70
func (em *EventManager) broadcastEvent(evt *XRPCStreamEvent) {
68
71
// the main thing we do is send it out, so MarshalCBOR once
69
72
if err := evt.Preserialize(); err != nil {
70
-
log.Errorf("broadcast serialize failed, %s", err)
73
+
em.log.Error("broadcast serialize failed", "err", err)
71
74
// serialize isn't going to go better later, this event is cursed
72
75
return
73
76
}
···
93
96
// code
94
97
s.filter = func(*XRPCStreamEvent) bool { return false }
95
98
96
-
log.Warnw("dropping slow consumer due to event overflow", "bufferSize", len(s.outgoing), "ident", s.ident)
99
+
em.log.Warn("dropping slow consumer due to event overflow", "bufferSize", len(s.outgoing), "ident", s.ident)
97
100
go func(torem *Subscriber) {
98
101
torem.lk.Lock()
99
102
if !torem.cleanedUp {
···
104
107
},
105
108
}:
106
109
case <-time.After(time.Second * 5):
107
-
log.Warnw("failed to send error frame to backed up consumer", "ident", torem.ident)
110
+
em.log.Warn("failed to send error frame to backed up consumer", "ident", torem.ident)
108
111
}
109
112
}
110
113
torem.lk.Unlock()
···
121
124
// accept a uid. The lookup inside the persister is notably expensive (despite
122
125
// being an lru cache?)
123
126
if err := em.persister.Persist(ctx, evt); err != nil {
124
-
log.Errorf("failed to persist outbound event: %s", err)
127
+
em.log.Error("failed to persist outbound event", "err", err)
125
128
}
126
129
}
127
130
···
370
373
}
371
374
}); err != nil {
372
375
if errors.Is(err, ErrPlaybackShutdown) {
373
-
log.Warnf("events playback: %s", err)
376
+
em.log.Warn("events playback", "err", err)
374
377
} else {
375
-
log.Errorf("events playback: %s", err)
378
+
em.log.Error("events playback", "err", err)
376
379
}
377
380
378
381
// TODO: send an error frame or something?
···
400
403
}
401
404
}); err != nil {
402
405
if !errors.Is(err, ErrCaughtUp) {
403
-
log.Errorf("events playback: %s", err)
406
+
em.log.Error("events playback", "err", err)
404
407
405
408
// TODO: send an error frame or something?
406
409
close(out)
+6
-6
events/pebblepersist.go
···
193
193
case <-ticker.C:
194
194
err := pp.GarbageCollect(ctx)
195
195
if err != nil {
196
-
log.Errorw("GC err", "err", err)
196
+
log.Error("GC err", "err", err)
197
197
}
198
198
case <-ctx.Done():
199
199
return
···
239
239
sizeBefore, _ := pp.db.EstimateDiskUsage(zeroKey[:], ffffKey[:])
240
240
if seq == -1 {
241
241
// nothing to delete
242
-
log.Infow("pebble gc nop", "size", sizeBefore)
242
+
log.Info("pebble gc nop", "size", sizeBefore)
243
243
return nil
244
244
}
245
245
var key [16]byte
246
246
setKeySeqMillis(key[:], seq, lastKeyTime)
247
-
log.Infow("pebble gc start", "to", hex.EncodeToString(key[:]))
247
+
log.Info("pebble gc start", "to", hex.EncodeToString(key[:]))
248
248
err = pp.db.DeleteRange(zeroKey[:], key[:], pebble.Sync)
249
249
if err != nil {
250
250
return err
251
251
}
252
252
sizeAfter, _ := pp.db.EstimateDiskUsage(zeroKey[:], ffffKey[:])
253
-
log.Infow("pebble gc", "before", sizeBefore, "after", sizeAfter)
253
+
log.Info("pebble gc", "before", sizeBefore, "after", sizeAfter)
254
254
start := time.Now()
255
255
err = pp.db.Compact(zeroKey[:], key[:], true)
256
256
if err != nil {
257
-
log.Warnw("pebble gc compact", "err", err)
257
+
log.Warn("pebble gc compact", "err", err)
258
258
}
259
259
dt := time.Since(start)
260
-
log.Infow("pebble gc compact ok", "dt", dt)
260
+
log.Info("pebble gc compact ok", "dt", dt)
261
261
return nil
262
262
}
+14
-12
events/schedulers/autoscaling/autoscaling.go
···
2
2
3
3
import (
4
4
"context"
5
+
"log/slog"
5
6
"sync"
6
7
"time"
7
8
8
9
"github.com/bluesky-social/indigo/events"
9
10
"github.com/bluesky-social/indigo/events/schedulers"
10
-
logging "github.com/ipfs/go-log"
11
11
"github.com/prometheus/client_golang/prometheus"
12
12
)
13
-
14
-
var log = logging.Logger("autoscaling-scheduler")
15
13
16
14
// Scheduler is a scheduler that will scale up and down the number of workers based on the throughput of the workers.
17
15
type Scheduler struct {
···
40
38
autoscaleFrequency time.Duration
41
39
autoscalerIn chan struct{}
42
40
autoscalerOut chan struct{}
41
+
42
+
log *slog.Logger
43
43
}
44
44
45
45
type AutoscaleSettings struct {
···
99
99
autoscaleFrequency: autoscaleSettings.AutoscaleFrequency,
100
100
autoscalerIn: make(chan struct{}),
101
101
autoscalerOut: make(chan struct{}),
102
+
103
+
log: slog.Default().With("system", "autoscaling-scheduler"),
102
104
}
103
105
104
106
for i := 0; i < p.concurrency; i++ {
···
111
113
}
112
114
113
115
func (p *Scheduler) Shutdown() {
114
-
log.Debugf("shutting down autoscaling scheduler for %s", p.ident)
116
+
p.log.Debug("shutting down autoscaling scheduler", "ident", p.ident)
115
117
116
118
// stop autoscaling
117
119
p.autoscalerIn <- struct{}{}
118
120
close(p.autoscalerIn)
119
121
<-p.autoscalerOut
120
122
121
-
log.Debug("stopping autoscaling scheduler workers")
123
+
p.log.Debug("stopping autoscaling scheduler workers")
122
124
// stop workers
123
125
for i := 0; i < p.concurrency; i++ {
124
126
p.feeder <- &consumerTask{signal: "stop"}
125
127
}
126
128
close(p.feeder)
127
129
128
-
log.Debug("waiting for autoscaling scheduler workers to stop")
130
+
p.log.Debug("waiting for autoscaling scheduler workers to stop")
129
131
130
132
p.workerGroup.Wait()
131
133
132
-
log.Debug("stopping autoscaling scheduler throughput manager")
134
+
p.log.Debug("stopping autoscaling scheduler throughput manager")
133
135
p.throughputManager.Stop()
134
136
135
-
log.Debug("autoscaling scheduler shutdown complete")
137
+
p.log.Debug("autoscaling scheduler shutdown complete")
136
138
}
137
139
138
140
// Add autoscaling function
···
197
199
}
198
200
199
201
func (p *Scheduler) worker() {
200
-
log.Debugf("starting autoscaling worker for %s", p.ident)
202
+
p.log.Debug("starting autoscaling worker", "ident", p.ident)
201
203
p.workersActive.Inc()
202
204
p.workerGroup.Add(1)
203
205
defer p.workerGroup.Done()
···
205
207
for work != nil {
206
208
// Check if the work item contains a signal to stop the worker.
207
209
if work.signal == "stop" {
208
-
log.Debugf("stopping autoscaling worker for %s", p.ident)
210
+
p.log.Debug("stopping autoscaling worker", "ident", p.ident)
209
211
p.workersActive.Dec()
210
212
return
211
213
}
212
214
213
215
p.itemsActive.Inc()
214
216
if err := p.do(context.TODO(), work.val); err != nil {
215
-
log.Errorf("event handler failed: %s", err)
217
+
p.log.Error("event handler failed", "err", err)
216
218
}
217
219
p.itemsProcessed.Inc()
218
220
219
221
p.lk.Lock()
220
222
rem, ok := p.active[work.repo]
221
223
if !ok {
222
-
log.Errorf("should always have an 'active' entry if a worker is processing a job")
224
+
p.log.Error("should always have an 'active' entry if a worker is processing a job")
223
225
}
224
226
225
227
if len(rem) == 0 {
+9
-7
events/schedulers/parallel/parallel.go
···
2
2
3
3
import (
4
4
"context"
5
+
"log/slog"
5
6
"sync"
6
7
7
8
"github.com/bluesky-social/indigo/events"
8
9
"github.com/bluesky-social/indigo/events/schedulers"
9
-
logging "github.com/ipfs/go-log"
10
10
11
11
"github.com/prometheus/client_golang/prometheus"
12
12
)
13
-
14
-
var log = logging.Logger("parallel-scheduler")
15
13
16
14
// Scheduler is a parallel scheduler that will run work on a fixed number of workers
17
15
type Scheduler struct {
···
33
31
itemsProcessed prometheus.Counter
34
32
itemsActive prometheus.Counter
35
33
workesActive prometheus.Gauge
34
+
35
+
log *slog.Logger
36
36
}
37
37
38
38
func NewScheduler(maxC, maxQ int, ident string, do func(context.Context, *events.XRPCStreamEvent) error) *Scheduler {
···
52
52
itemsProcessed: schedulers.WorkItemsProcessed.WithLabelValues(ident, "parallel"),
53
53
itemsActive: schedulers.WorkItemsActive.WithLabelValues(ident, "parallel"),
54
54
workesActive: schedulers.WorkersActive.WithLabelValues(ident, "parallel"),
55
+
56
+
log: slog.Default().With("system", "parallel-scheduler"),
55
57
}
56
58
57
59
for i := 0; i < maxC; i++ {
···
64
66
}
65
67
66
68
func (p *Scheduler) Shutdown() {
67
-
log.Infof("shutting down parallel scheduler for %s", p.ident)
69
+
p.log.Info("shutting down parallel scheduler", "ident", p.ident)
68
70
69
71
for i := 0; i < p.maxConcurrency; i++ {
70
72
p.feeder <- &consumerTask{
···
78
80
<-p.out
79
81
}
80
82
81
-
log.Info("parallel scheduler shutdown complete")
83
+
p.log.Info("parallel scheduler shutdown complete")
82
84
}
83
85
84
86
type consumerTask struct {
···
123
125
124
126
p.itemsActive.Inc()
125
127
if err := p.do(context.TODO(), work.val); err != nil {
126
-
log.Errorf("event handler failed: %s", err)
128
+
p.log.Error("event handler failed", "err", err)
127
129
}
128
130
p.itemsProcessed.Inc()
129
131
130
132
p.lk.Lock()
131
133
rem, ok := p.active[work.repo]
132
134
if !ok {
133
-
log.Errorf("should always have an 'active' entry if a worker is processing a job")
135
+
p.log.Error("should always have an 'active' entry if a worker is processing a job")
134
136
}
135
137
136
138
if len(rem) == 0 {
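With the package-level logger removed, each parallel scheduler instance now carries its own *slog.Logger, initialized in NewScheduler to slog.Default() tagged with system=parallel-scheduler. A construction sketch; the concurrency, queue size, and ident values are illustrative and handleEvent is a placeholder handler, not something defined in this diff:

sched := parallel.NewScheduler(8, 1024, "my-ident", handleEvent)
defer sched.Shutdown() // shutdown progress is logged through the per-instance logger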
+1
-3
events/schedulers/sequential/sequential.go
···
2
2
3
3
import (
4
4
"context"
5
-
6
5
"github.com/bluesky-social/indigo/events"
7
6
"github.com/bluesky-social/indigo/events/schedulers"
8
-
logging "github.com/ipfs/go-log"
9
7
"github.com/prometheus/client_golang/prometheus"
10
8
)
11
9
12
-
var log = logging.Logger("sequential-scheduler")
10
+
// var log = slog.Default().With("system", "sequential-scheduler")
13
11
14
12
// Scheduler is a sequential scheduler that will run work on a single worker
15
13
type Scheduler struct {
+1
-1
fakedata/accounts.go
···
68
68
return nil, fmt.Errorf("account index didn't match: %d != %d (%s)", i, u.Index, u.AccountType)
69
69
}
70
70
}
71
-
log.Infof("loaded account catalog: regular=%d celebrity=%d", len(catalog.Regulars), len(catalog.Celebs))
71
+
log.Info("loaded account catalog", "regular", len(catalog.Regulars), "celebrity", len(catalog.Celebs))
72
72
return catalog, nil
73
73
}
74
74
+7
-3
fakedata/generators.go
···
7
7
"bytes"
8
8
"context"
9
9
"fmt"
10
+
"log/slog"
10
11
"math/rand"
11
12
"time"
12
13
···
16
17
"github.com/bluesky-social/indigo/xrpc"
17
18
18
19
"github.com/brianvoe/gofakeit/v6"
19
-
logging "github.com/ipfs/go-log"
20
20
)
21
21
22
-
var log = logging.Logger("fakedata")
22
+
var log = slog.Default().With("system", "fakedata")
23
+
24
+
func SetLogger(logger *slog.Logger) {
25
+
log = logger
26
+
}
23
27
24
28
func MeasureIterations(name string) func(int) {
25
29
start := time.Now()
···
28
32
return
29
33
}
30
34
total := time.Since(start)
31
-
log.Infof("%s wall runtime: count=%d total=%s mean=%s", name, count, total, total/time.Duration(count))
35
+
log.Info("wall runtime", "name", name, "count", count, "total", total, "rate", total/time.Duration(count))
32
36
}
33
37
}
34
38
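fakedata now keeps a package-level *slog.Logger and exposes SetLogger so callers can inject their own logger instead of relying on slog.Default(). A caller sketch; the JSON handler choice is purely illustrative:

logger := slog.New(slog.NewJSONHandler(os.Stderr, nil)).With("system", "fakedata")
fakedata.SetLogger(logger)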
+2
-2
go.mod
···
31
31
github.com/ipfs/go-ipld-cbor v0.1.0
32
32
github.com/ipfs/go-ipld-format v0.6.0
33
33
github.com/ipfs/go-libipfs v0.7.0
34
-
github.com/ipfs/go-log v1.0.5
35
-
github.com/ipfs/go-log/v2 v2.5.1
36
34
github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4
37
35
github.com/ipld/go-car/v2 v2.13.1
38
36
github.com/jackc/pgx/v5 v5.5.0
···
90
88
github.com/go-redis/redis v6.15.9+incompatible // indirect
91
89
github.com/golang/snappy v0.0.4 // indirect
92
90
github.com/hashicorp/golang-lru v1.0.2 // indirect
91
+
github.com/ipfs/go-log v1.0.5 // indirect
92
+
github.com/ipfs/go-log/v2 v2.5.1 // indirect
93
93
github.com/jackc/puddle/v2 v2.2.1 // indirect
94
94
github.com/klauspost/compress v1.17.3 // indirect
95
95
github.com/kr/pretty v0.3.1 // indirect
+6
-2
indexer/crawler.go
···
3
3
import (
4
4
"context"
5
5
"fmt"
6
+
"log/slog"
6
7
"sync"
7
8
"time"
8
9
···
29
30
30
31
concurrency int
31
32
33
+
log *slog.Logger
34
+
32
35
done chan struct{}
33
36
}
34
37
35
-
func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error, concurrency int) (*CrawlDispatcher, error) {
38
+
func NewCrawlDispatcher(repoFn func(context.Context, *crawlWork) error, concurrency int, log *slog.Logger) (*CrawlDispatcher, error) {
36
39
if concurrency < 1 {
37
40
return nil, fmt.Errorf("must specify a non-zero positive integer for crawl dispatcher concurrency")
38
41
}
···
46
49
concurrency: concurrency,
47
50
todo: make(map[models.Uid]*crawlWork),
48
51
inProgress: make(map[models.Uid]*crawlWork),
52
+
log: log,
49
53
done: make(chan struct{}),
50
54
}
51
55
go out.CatchupRepoGaugePoller()
···
218
222
select {
219
223
case job := <-c.repoSync:
220
224
if err := c.doRepoCrawl(context.TODO(), job); err != nil {
221
-
log.Errorf("failed to perform repo crawl of %q: %s", job.act.Did, err)
225
+
c.log.Error("failed to perform repo crawl", "did", job.act.Did, "err", err)
222
226
}
223
227
224
228
// TODO: do we still just do this if it errors?
+31
-30
indexer/indexer.go
···
5
5
"database/sql"
6
6
"errors"
7
7
"fmt"
8
+
"log/slog"
8
9
"time"
9
10
10
11
comatproto "github.com/bluesky-social/indigo/api/atproto"
···
19
20
"github.com/bluesky-social/indigo/xrpc"
20
21
21
22
"github.com/ipfs/go-cid"
22
-
logging "github.com/ipfs/go-log"
23
23
"go.opentelemetry.io/otel"
24
24
"gorm.io/gorm"
25
25
"gorm.io/gorm/clause"
26
26
)
27
-
28
-
var log = logging.Logger("indexer")
29
27
30
28
const MaxEventSliceLength = 1000000
31
29
const MaxOpsSliceLength = 200
···
45
43
SendRemoteFollow func(context.Context, string, uint) error
46
44
CreateExternalUser func(context.Context, string) (*models.ActorInfo, error)
47
45
ApplyPDSClientSettings func(*xrpc.Client)
46
+
47
+
log *slog.Logger
48
48
}
49
49
50
50
func NewIndexer(db *gorm.DB, notifman notifs.NotificationManager, evtman *events.EventManager, didr did.Resolver, fetcher *RepoFetcher, crawl, aggregate, spider bool) (*Indexer, error) {
···
65
65
return nil
66
66
},
67
67
ApplyPDSClientSettings: func(*xrpc.Client) {},
68
+
log: slog.Default().With("system", "indexer"),
68
69
}
69
70
70
71
if crawl {
71
-
c, err := NewCrawlDispatcher(fetcher.FetchAndIndexRepo, fetcher.MaxConcurrency)
72
+
c, err := NewCrawlDispatcher(fetcher.FetchAndIndexRepo, fetcher.MaxConcurrency, ix.log)
72
73
if err != nil {
73
74
return nil, err
74
75
}
···
90
91
ctx, span := otel.Tracer("indexer").Start(ctx, "HandleRepoEvent")
91
92
defer span.End()
92
93
93
-
log.Debugw("Handling Repo Event!", "uid", evt.User)
94
+
ix.log.Debug("Handling Repo Event!", "uid", evt.User)
94
95
95
96
outops := make([]*comatproto.SyncSubscribeRepos_RepoOp, 0, len(evt.Ops))
96
97
for _, op := range evt.Ops {
···
102
103
})
103
104
104
105
if err := ix.handleRepoOp(ctx, evt, &op); err != nil {
105
-
log.Errorw("failed to handle repo op", "err", err)
106
+
ix.log.Error("failed to handle repo op", "err", err)
106
107
}
107
108
}
108
109
···
119
120
toobig = true
120
121
}
121
122
122
-
log.Debugw("Sending event", "did", did)
123
+
ix.log.Debug("Sending event", "did", did)
123
124
if err := ix.events.AddEvent(ctx, &events.XRPCStreamEvent{
124
125
RepoCommit: &comatproto.SyncSubscribeRepos_Commit{
125
126
Repo: did,
···
197
198
if e.Type == "mention" {
198
199
_, err := ix.GetUserOrMissing(ctx, e.Value)
199
200
if err != nil {
200
-
log.Infow("failed to parse user mention", "ref", e.Value, "err", err)
201
+
ix.log.Info("failed to parse user mention", "ref", e.Value, "err", err)
201
202
}
202
203
}
203
204
}
···
205
206
if rec.Reply != nil {
206
207
if rec.Reply.Parent != nil {
207
208
if err := ix.crawlAtUriRef(ctx, rec.Reply.Parent.Uri); err != nil {
208
-
log.Infow("failed to crawl reply parent", "cid", op.RecCid, "replyuri", rec.Reply.Parent.Uri, "err", err)
209
+
ix.log.Info("failed to crawl reply parent", "cid", op.RecCid, "replyuri", rec.Reply.Parent.Uri, "err", err)
209
210
}
210
211
}
211
212
212
213
if rec.Reply.Root != nil {
213
214
if err := ix.crawlAtUriRef(ctx, rec.Reply.Root.Uri); err != nil {
214
-
log.Infow("failed to crawl reply root", "cid", op.RecCid, "rooturi", rec.Reply.Root.Uri, "err", err)
215
+
ix.log.Info("failed to crawl reply root", "cid", op.RecCid, "rooturi", rec.Reply.Root.Uri, "err", err)
215
216
}
216
217
}
217
218
}
···
220
221
case *bsky.FeedRepost:
221
222
if rec.Subject != nil {
222
223
if err := ix.crawlAtUriRef(ctx, rec.Subject.Uri); err != nil {
223
-
log.Infow("failed to crawl repost subject", "cid", op.RecCid, "subjecturi", rec.Subject.Uri, "err", err)
224
+
ix.log.Info("failed to crawl repost subject", "cid", op.RecCid, "subjecturi", rec.Subject.Uri, "err", err)
224
225
}
225
226
}
226
227
return nil
227
228
case *bsky.FeedLike:
228
229
if rec.Subject != nil {
229
230
if err := ix.crawlAtUriRef(ctx, rec.Subject.Uri); err != nil {
230
-
log.Infow("failed to crawl like subject", "cid", op.RecCid, "subjecturi", rec.Subject.Uri, "err", err)
231
+
ix.log.Info("failed to crawl like subject", "cid", op.RecCid, "subjecturi", rec.Subject.Uri, "err", err)
231
232
}
232
233
}
233
234
return nil
234
235
case *bsky.GraphFollow:
235
236
_, err := ix.GetUserOrMissing(ctx, rec.Subject)
236
237
if err != nil {
237
-
log.Infow("failed to crawl follow subject", "cid", op.RecCid, "subjectdid", rec.Subject, "err", err)
238
+
ix.log.Info("failed to crawl follow subject", "cid", op.RecCid, "subjectdid", rec.Subject, "err", err)
238
239
}
239
240
return nil
240
241
case *bsky.GraphBlock:
241
242
_, err := ix.GetUserOrMissing(ctx, rec.Subject)
242
243
if err != nil {
243
-
log.Infow("failed to crawl follow subject", "cid", op.RecCid, "subjectdid", rec.Subject, "err", err)
244
+
ix.log.Info("failed to crawl follow subject", "cid", op.RecCid, "subjectdid", rec.Subject, "err", err)
244
245
}
245
246
return nil
246
247
case *bsky.ActorProfile:
···
252
253
case *bsky.FeedGenerator:
253
254
return nil
254
255
default:
255
-
log.Warnw("unrecognized record type (crawling references)", "record", op.Record, "collection", op.Collection)
256
+
ix.log.Warn("unrecognized record type (crawling references)", "record", op.Record, "collection", op.Collection)
256
257
return nil
257
258
}
258
259
}
···
293
294
}
294
295
295
296
func (ix *Indexer) addUserToCrawler(ctx context.Context, ai *models.ActorInfo) error {
296
-
log.Debugw("Sending user to crawler: ", "did", ai.Did)
297
+
ix.log.Debug("Sending user to crawler: ", "did", ai.Did)
297
298
if ix.Crawler == nil {
298
299
return nil
299
300
}
···
395
396
}
396
397
397
398
func (ix *Indexer) handleRecordDelete(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp, local bool) error {
398
-
log.Debugw("record delete event", "collection", op.Collection)
399
+
ix.log.Debug("record delete event", "collection", op.Collection)
399
400
400
401
switch op.Collection {
401
402
case "app.bsky.feed.post":
···
411
412
fp, err := ix.GetPost(ctx, uri)
412
413
if err != nil {
413
414
if errors.Is(err, gorm.ErrRecordNotFound) {
414
-
log.Warnw("deleting post weve never seen before. Weird.", "user", evt.User, "rkey", op.Rkey)
415
+
ix.log.Warn("deleting post weve never seen before. Weird.", "user", evt.User, "rkey", op.Rkey)
415
416
return nil
416
417
}
417
418
return err
···
425
426
return err
426
427
}
427
428
428
-
log.Warn("TODO: remove notifications on delete")
429
+
ix.log.Warn("TODO: remove notifications on delete")
429
430
/*
430
431
if err := ix.notifman.RemoveRepost(ctx, fp.Author, rr.ID, evt.User); err != nil {
431
432
return nil, err
···
466
467
return err
467
468
}
468
469
469
-
log.Warnf("need to delete vote notification")
470
+
ix.log.Warn("need to delete vote notification")
470
471
return nil
471
472
}
472
473
···
477
478
}
478
479
479
480
if q.RowsAffected == 0 {
480
-
log.Warnw("attempted to delete follow we did not have a record for", "user", evt.User, "rkey", op.Rkey)
481
+
ix.log.Warn("attempted to delete follow we did not have a record for", "user", evt.User, "rkey", op.Rkey)
481
482
return nil
482
483
}
483
484
···
485
486
}
486
487
487
488
func (ix *Indexer) handleRecordCreate(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp, local bool) ([]uint, error) {
488
-
log.Debugw("record create event", "collection", op.Collection)
489
+
ix.log.Debug("record create event", "collection", op.Collection)
489
490
490
491
var out []uint
491
492
switch rec := op.Record.(type) {
···
535
536
case *bsky.FeedGenerator:
536
537
return out, nil
537
538
case *bsky.ActorProfile:
538
-
log.Debugf("TODO: got actor profile record creation, need to do something with this")
539
+
ix.log.Debug("TODO: got actor profile record creation, need to do something with this")
539
540
default:
540
541
return nil, fmt.Errorf("unrecognized record type (creation): %s", op.Collection)
541
542
}
···
609
610
}
610
611
611
612
func (ix *Indexer) handleRecordUpdate(ctx context.Context, evt *repomgr.RepoEvent, op *repomgr.RepoOp, local bool) error {
612
-
log.Debugw("record update event", "collection", op.Collection)
613
+
ix.log.Debug("record update event", "collection", op.Collection)
613
614
614
615
switch rec := op.Record.(type) {
615
616
case *bsky.FeedPost:
···
629
630
630
631
if oldReply != newReply {
631
632
// the 'replyness' of the post was changed... that's weird
632
-
log.Errorf("need to properly handle case where reply-ness of posts is changed")
633
+
ix.log.Error("need to properly handle case where reply-ness of posts is changed")
633
634
return nil
634
635
}
635
636
···
640
641
}
641
642
642
643
if replyto.ID != fp.ReplyTo {
643
-
log.Errorf("post was changed to be a reply to a different post")
644
+
ix.log.Error("post was changed to be a reply to a different post")
644
645
return nil
645
646
}
646
647
}
···
693
694
694
695
return ix.handleRecordCreateGraphFollow(ctx, rec, evt, op)
695
696
case *bsky.ActorProfile:
696
-
log.Debugf("TODO: got actor profile record update, need to do something with this")
697
+
ix.log.Debug("TODO: got actor profile record update, need to do something with this")
697
698
default:
698
699
return fmt.Errorf("unrecognized record type (update): %s", op.Collection)
699
700
}
···
767
768
// we're likely filling in a missing reference
768
769
if !maybe.Missing {
769
770
// TODO: we've already processed this record creation
770
-
log.Warnw("potentially erroneous event, duplicate create", "rkey", rkey, "user", user)
771
+
ix.log.Warn("potentially erroneous event, duplicate create", "rkey", rkey, "user", user)
771
772
}
772
773
773
774
if err := ix.db.Clauses(clause.OnConflict{
···
791
792
}
792
793
793
794
func (ix *Indexer) createMissingPostRecord(ctx context.Context, puri *util.ParsedUri) (*models.FeedPost, error) {
794
-
log.Warn("creating missing post record")
795
+
ix.log.Warn("creating missing post record")
795
796
ai, err := ix.GetUserOrMissing(ctx, puri.Did)
796
797
if err != nil {
797
798
return nil, err
···
813
814
if post.Reply != nil {
814
815
replyto, err := ix.GetPost(ctx, post.Reply.Parent.Uri)
815
816
if err != nil {
816
-
log.Error("probably shouldn't error when processing a reply to a not-found post")
817
+
ix.log.Error("probably shouldn't error when processing a reply to a not-found post")
817
818
return err
818
819
}
819
820
+5
-1
indexer/keymgr.go
···
3
3
import (
4
4
"context"
5
5
"fmt"
6
+
"log/slog"
6
7
7
8
did "github.com/whyrusleeping/go-did"
8
9
"go.opentelemetry.io/otel"
···
12
13
didr DidResolver
13
14
14
15
signingKey *did.PrivKey
16
+
17
+
log *slog.Logger
15
18
}
16
19
17
20
type DidResolver interface {
···
22
25
return &KeyManager{
23
26
didr: didr,
24
27
signingKey: k,
28
+
log: slog.Default().With("system", "indexer"),
25
29
}
26
30
}
27
31
···
36
40
37
41
err = k.Verify(msg, sig)
38
42
if err != nil {
39
-
log.Warnw("signature failed to verify", "err", err, "did", did, "pubKey", k, "sigBytes", sig, "msgBytes", msg)
43
+
km.log.Warn("signature failed to verify", "err", err, "did", did, "pubKey", k, "sigBytes", sig, "msgBytes", msg)
40
44
}
41
45
return err
42
46
}
+7
-3
indexer/repofetch.go
···
7
7
"fmt"
8
8
"io"
9
9
"io/fs"
10
+
"log/slog"
10
11
"sync"
11
12
12
13
"github.com/bluesky-social/indigo/api/atproto"
···
27
28
Limiters: make(map[uint]*rate.Limiter),
28
29
ApplyPDSClientSettings: func(*xrpc.Client) {},
29
30
MaxConcurrency: maxConcurrency,
31
+
log: slog.Default().With("system", "indexer"),
30
32
}
31
33
}
32
34
···
40
42
MaxConcurrency int
41
43
42
44
ApplyPDSClientSettings func(*xrpc.Client)
45
+
46
+
log *slog.Logger
43
47
}
44
48
45
49
func (rf *RepoFetcher) GetLimiter(pdsID uint) *rate.Limiter {
···
84
88
// Wait to prevent DOSing the PDS when connecting to a new stream with lots of active repos
85
89
limiter.Wait(ctx)
86
90
87
-
log.Debugw("SyncGetRepo", "did", did, "since", rev)
91
+
rf.log.Debug("SyncGetRepo", "did", did, "since", rev)
88
92
// TODO: max size on these? A malicious PDS could just send us a petabyte sized repo here and kill us
89
93
repo, err := atproto.SyncGetRepo(ctx, c, did, rev)
90
94
if err != nil {
···
125
129
for i, j := range job.catchup {
126
130
catchupEventsProcessed.Inc()
127
131
if err := rf.repoman.HandleExternalUserEvent(ctx, pds.ID, ai.Uid, ai.Did, j.evt.Since, j.evt.Rev, j.evt.Blocks, j.evt.Ops); err != nil {
128
-
log.Errorw("buffered event catchup failed", "error", err, "did", ai.Did, "i", i, "jobCount", len(job.catchup), "seq", j.evt.Seq)
132
+
rf.log.Error("buffered event catchup failed", "error", err, "did", ai.Did, "i", i, "jobCount", len(job.catchup), "seq", j.evt.Seq)
129
133
resync = true // fall back to a repo sync
130
134
break
131
135
}
···
153
157
span.RecordError(err)
154
158
155
159
if ipld.IsNotFound(err) || errors.Is(err, io.EOF) || errors.Is(err, fs.ErrNotExist) {
156
-
log.Errorw("partial repo fetch was missing data", "did", ai.Did, "pds", pds.Host, "rev", rev)
160
+
rf.log.Error("partial repo fetch was missing data", "did", ai.Did, "pds", pds.Host, "rev", rev)
157
161
repo, err := rf.fetchRepo(ctx, c, &pds, ai.Did, "")
158
162
if err != nil {
159
163
return err
+6
-2
pds/feedgen.go
···
3
3
import (
4
4
"context"
5
5
"fmt"
6
+
"log/slog"
6
7
"sort"
7
8
"strings"
8
9
"time"
···
22
23
ix *indexer.Indexer
23
24
24
25
readRecord ReadRecordFunc
26
+
27
+
log *slog.Logger
25
28
}
26
29
27
-
func NewFeedGenerator(db *gorm.DB, ix *indexer.Indexer, readRecord ReadRecordFunc) (*FeedGenerator, error) {
30
+
func NewFeedGenerator(db *gorm.DB, ix *indexer.Indexer, readRecord ReadRecordFunc, log *slog.Logger) (*FeedGenerator, error) {
28
31
return &FeedGenerator{
29
32
db: db,
30
33
ix: ix,
31
34
readRecord: readRecord,
35
+
log: log,
32
36
}, nil
33
37
}
34
38
···
355
359
356
360
func (fg *FeedGenerator) GetVotes(ctx context.Context, uri string, pcid cid.Cid, limit int, before string) ([]*HydratedVote, error) {
357
361
if before != "" {
358
-
log.Warn("not respecting 'before' yet")
362
+
fg.log.Warn("not respecting 'before' yet")
359
363
}
360
364
361
365
p, err := fg.ix.GetPost(ctx, uri)
+9
-7
pds/server.go
···
5
5
"database/sql"
6
6
"errors"
7
7
"fmt"
8
+
"log/slog"
8
9
"net"
9
10
"net/http"
10
11
"net/mail"
···
30
31
gojwt "github.com/golang-jwt/jwt"
31
32
"github.com/gorilla/websocket"
32
33
"github.com/ipfs/go-cid"
33
-
logging "github.com/ipfs/go-log"
34
34
"github.com/labstack/echo/v4"
35
35
"github.com/labstack/echo/v4/middleware"
36
36
"github.com/lestrrat-go/jwx/v2/jwt"
37
37
"github.com/whyrusleeping/go-did"
38
38
"gorm.io/gorm"
39
39
)
40
-
41
-
var log = logging.Logger("pds")
42
40
43
41
type Server struct {
44
42
db *gorm.DB
···
57
55
serviceUrl string
58
56
59
57
plc plc.PLCClient
58
+
59
+
log *slog.Logger
60
60
}
61
61
62
62
// serverListenerBootTimeout is how long to wait for the requested server socket
···
97
97
serviceUrl: serviceUrl,
98
98
jwtSigningKey: jwtkey,
99
99
enforcePeering: false,
100
+
101
+
log: slog.Default().With("system", "pds"),
100
102
}
101
103
102
104
repoman.SetEventHandler(func(ctx context.Context, evt *repomgr.RepoEvent) {
103
105
if err := ix.HandleRepoEvent(ctx, evt); err != nil {
104
-
log.Errorw("handle repo event failed", "user", evt.User, "err", err)
106
+
s.log.Error("handle repo event failed", "user", evt.User, "err", err)
105
107
}
106
108
}, true)
107
109
108
110
//ix.SendRemoteFollow = s.sendRemoteFollow
109
111
ix.CreateExternalUser = s.createExternalUser
110
112
111
-
feedgen, err := NewFeedGenerator(db, ix, s.readRecordFunc)
113
+
feedgen, err := NewFeedGenerator(db, ix, s.readRecordFunc, s.log)
112
114
if err != nil {
113
115
return nil, err
114
116
}
···
434
436
435
437
func (s *Server) HandleHealthCheck(c echo.Context) error {
436
438
if err := s.db.Exec("SELECT 1").Error; err != nil {
437
-
log.Errorf("healthcheck can't connect to database: %v", err)
439
+
s.log.Error("healthcheck can't connect to database", "err", err)
438
440
return c.JSON(500, HealthStatus{Status: "error", Message: "can't connect to database"})
439
441
} else {
440
442
return c.JSON(200, HealthStatus{Status: "ok"})
···
726
728
func (s *Server) UpdateUserHandle(ctx context.Context, u *User, handle string) error {
727
729
if u.Handle == handle {
728
730
// no change? move on
729
-
log.Warnw("attempted to change handle to current handle", "did", u.Did, "handle", handle)
731
+
s.log.Warn("attempted to change handle to current handle", "did", u.Did, "handle", handle)
730
732
return nil
731
733
}
732
734
+13
-12
repomgr/repomgr.go
···
6
6
"errors"
7
7
"fmt"
8
8
"io"
9
+
"log/slog"
9
10
"strings"
10
11
"sync"
11
12
"time"
···
24
25
"github.com/ipfs/go-datastore"
25
26
blockstore "github.com/ipfs/go-ipfs-blockstore"
26
27
ipld "github.com/ipfs/go-ipld-format"
27
-
logging "github.com/ipfs/go-log/v2"
28
28
"github.com/ipld/go-car"
29
29
cbg "github.com/whyrusleeping/cbor-gen"
30
30
"go.opentelemetry.io/otel"
31
31
"go.opentelemetry.io/otel/attribute"
32
32
"gorm.io/gorm"
33
33
)
34
-
35
-
var log = logging.Logger("repomgr")
36
34
37
35
func NewRepoManager(cs carstore.CarStore, kmgr KeyManager) *RepoManager {
38
36
···
40
38
cs: cs,
41
39
userLocks: make(map[models.Uid]*userLock),
42
40
kmgr: kmgr,
41
+
log: slog.Default().With("system", "repomgr"),
43
42
}
44
43
}
45
44
···
62
61
63
62
events func(context.Context, *RepoEvent)
64
63
hydrateRecords bool
64
+
65
+
log *slog.Logger
65
66
}
66
67
67
68
type ActorInfo struct {
···
534
535
535
536
span.SetAttributes(attribute.Int64("uid", int64(uid)))
536
537
537
-
log.Debugw("HandleExternalUserEvent", "pds", pdsid, "uid", uid, "since", since, "nrev", nrev)
538
+
rm.log.Debug("HandleExternalUserEvent", "pds", pdsid, "uid", uid, "since", since, "nrev", nrev)
538
539
539
540
unlock := rm.lockUser(ctx, uid)
540
541
defer unlock()
···
835
836
ops := make([]RepoOp, 0, len(diffops))
836
837
for _, op := range diffops {
837
838
repoOpsImported.Inc()
838
-
out, err := processOp(ctx, bs, op, rm.hydrateRecords)
839
+
out, err := rm.processOp(ctx, bs, op, rm.hydrateRecords)
839
840
if err != nil {
840
-
log.Errorw("failed to process repo op", "err", err, "path", op.Rpath, "repo", repoDid)
841
+
rm.log.Error("failed to process repo op", "err", err, "path", op.Rpath, "repo", repoDid)
841
842
}
842
843
843
844
if out != nil {
···
871
872
return nil
872
873
}
873
874
874
-
func processOp(ctx context.Context, bs blockstore.Blockstore, op *mst.DiffOp, hydrateRecords bool) (*RepoOp, error) {
875
+
func (rm *RepoManager) processOp(ctx context.Context, bs blockstore.Blockstore, op *mst.DiffOp, hydrateRecords bool) (*RepoOp, error) {
875
876
parts := strings.SplitN(op.Rpath, "/", 2)
876
877
if len(parts) != 2 {
877
878
return nil, fmt.Errorf("repo mst had invalid rpath: %q", op.Rpath)
···
904
905
return nil, err
905
906
}
906
907
907
-
log.Warnf("failed processing repo diff: %s", err)
908
+
rm.log.Warn("failed processing repo diff", "err", err)
908
909
} else {
909
910
outop.Record = rec
910
911
}
···
960
961
// the repos lifecycle, this will end up erroneously not including
961
962
// them. We should compute the set of blocks needed to read any repo
962
963
// ops that happened in the commit and use that for our 'output' blocks
963
-
cids, err := walkTree(ctx, seen, root, membs, true)
964
+
cids, err := rm.walkTree(ctx, seen, root, membs, true)
964
965
if err != nil {
965
966
return fmt.Errorf("walkTree: %w", err)
966
967
}
···
1001
1002
1002
1003
// walkTree returns all cids linked recursively by the root, skipping any cids
1003
1004
// in the 'skip' map, and not erroring on 'not found' if prevMissing is set
1004
-
func walkTree(ctx context.Context, skip map[cid.Cid]bool, root cid.Cid, bs blockstore.Blockstore, prevMissing bool) ([]cid.Cid, error) {
1005
+
func (rm *RepoManager) walkTree(ctx context.Context, skip map[cid.Cid]bool, root cid.Cid, bs blockstore.Blockstore, prevMissing bool) ([]cid.Cid, error) {
1005
1006
// TODO: what if someone puts non-cbor links in their repo?
1006
1007
if root.Prefix().Codec != cid.DagCBOR {
1007
1008
return nil, fmt.Errorf("can only handle dag-cbor objects in repos (%s is %d)", root, root.Prefix().Codec)
···
1015
1016
var links []cid.Cid
1016
1017
if err := cbg.ScanForLinks(bytes.NewReader(blk.RawData()), func(c cid.Cid) {
1017
1018
if c.Prefix().Codec == cid.Raw {
1018
-
log.Debugw("skipping 'raw' CID in record", "recordCid", root, "rawCid", c)
1019
+
rm.log.Debug("skipping 'raw' CID in record", "recordCid", root, "rawCid", c)
1019
1020
return
1020
1021
}
1021
1022
if skip[c] {
···
1035
1036
1036
1037
// TODO: should do this non-recursive since i expect these may get deep
1037
1038
for _, c := range links {
1038
-
sub, err := walkTree(ctx, skip, c, bs, prevMissing)
1039
+
sub, err := rm.walkTree(ctx, skip, c, bs, prevMissing)
1039
1040
if err != nil {
1040
1041
if prevMissing && !ipld.IsNotFound(err) {
1041
1042
return nil, err
+1
search/firehose.go
+25
-21
splitter/splitter.go
···
5
5
"errors"
6
6
"fmt"
7
7
"io"
8
+
"log/slog"
8
9
"math/rand"
9
10
"net"
10
11
"net/http"
···
18
19
events "github.com/bluesky-social/indigo/events"
19
20
"github.com/bluesky-social/indigo/events/schedulers/sequential"
20
21
"github.com/gorilla/websocket"
21
-
logging "github.com/ipfs/go-log"
22
22
"github.com/labstack/echo/v4"
23
23
"github.com/labstack/echo/v4/middleware"
24
24
promclient "github.com/prometheus/client_golang/prometheus"
···
26
26
dto "github.com/prometheus/client_model/go"
27
27
)
28
28
29
-
var log = logging.Logger("splitter")
30
-
31
29
type Splitter struct {
32
30
erb *EventRingBuffer
33
31
pp *events.PebblePersist
···
39
37
consumers map[uint64]*SocketConsumer
40
38
41
39
conf SplitterConfig
40
+
41
+
log *slog.Logger
42
42
}
43
43
44
44
type SplitterConfig struct {
···
61
61
erb: erb,
62
62
events: em,
63
63
consumers: make(map[uint64]*SocketConsumer),
64
+
log: slog.Default().With("system", "splitter"),
64
65
}
65
66
}
66
67
func NewSplitter(conf SplitterConfig) (*Splitter, error) {
···
74
75
erb: erb,
75
76
events: em,
76
77
consumers: make(map[uint64]*SocketConsumer),
78
+
log: slog.Default().With("system", "splitter"),
77
79
}, nil
78
80
} else {
79
81
pp, err := events.NewPebblePersistance(conf.PebbleOptions)
···
88
90
pp: pp,
89
91
events: em,
90
92
consumers: make(map[uint64]*SocketConsumer),
93
+
log: slog.Default().With("system", "splitter"),
91
94
}, nil
92
95
}
93
96
}
···
115
118
pp: pp,
116
119
events: em,
117
120
consumers: make(map[uint64]*SocketConsumer),
121
+
log: slog.Default().With("system", "splitter"),
118
122
}, nil
119
123
}
120
124
···
173
177
if err2 := ctx.JSON(err.Code, map[string]any{
174
178
"error": err.Message,
175
179
}); err2 != nil {
176
-
log.Errorf("Failed to write http error: %s", err2)
180
+
s.log.Error("Failed to write http error", "err", err2)
177
181
}
178
182
default:
179
183
sendHeader := true
···
181
185
sendHeader = false
182
186
}
183
187
184
-
log.Warnf("HANDLER ERROR: (%s) %s", ctx.Path(), err)
188
+
s.log.Warn("HANDLER ERROR", "path", ctx.Path(), "err", err)
185
189
186
190
if strings.HasPrefix(ctx.Path(), "/admin/") {
187
191
ctx.JSON(500, map[string]any{
···
275
279
}
276
280
277
281
if err := conn.WriteControl(websocket.PingMessage, []byte{}, time.Now().Add(5*time.Second)); err != nil {
278
-
log.Errorf("failed to ping client: %s", err)
282
+
s.log.Error("failed to ping client", "err", err)
279
283
cancel()
280
284
return
281
285
}
···
300
304
for {
301
305
_, _, err := conn.ReadMessage()
302
306
if err != nil {
303
-
log.Errorf("failed to read message from client: %s", err)
307
+
s.log.Error("failed to read message from client", "err", err)
304
308
cancel()
305
309
return
306
310
}
···
327
331
consumerID := s.registerConsumer(&consumer)
328
332
defer s.cleanupConsumer(consumerID)
329
333
330
-
log.Infow("new consumer",
334
+
s.log.Info("new consumer",
331
335
"remote_addr", consumer.RemoteAddr,
332
336
"user_agent", consumer.UserAgent,
333
337
"cursor", since,
···
340
344
select {
341
345
case evt, ok := <-evts:
342
346
if !ok {
343
-
log.Error("event stream closed unexpectedly")
347
+
s.log.Error("event stream closed unexpectedly")
344
348
return nil
345
349
}
346
350
347
351
wc, err := conn.NextWriter(websocket.BinaryMessage)
348
352
if err != nil {
349
-
log.Errorf("failed to get next writer: %s", err)
353
+
s.log.Error("failed to get next writer", "err", err)
350
354
return err
351
355
}
352
356
···
360
364
}
361
365
362
366
if err := wc.Close(); err != nil {
363
-
log.Warnf("failed to flush-close our event write: %s", err)
367
+
s.log.Warn("failed to flush-close our event write", "err", err)
364
368
return nil
365
369
}
366
370
···
401
405
402
406
var m = &dto.Metric{}
403
407
if err := c.EventsSent.Write(m); err != nil {
404
-
log.Errorf("failed to get sent counter: %s", err)
408
+
s.log.Error("failed to get sent counter", "err", err)
405
409
}
406
410
407
-
log.Infow("consumer disconnected",
411
+
s.log.Info("consumer disconnected",
408
412
"consumer_id", id,
409
413
"remote_addr", c.RemoteAddr,
410
414
"user_agent", c.UserAgent,
···
450
454
}
451
455
con, res, err := d.DialContext(ctx, url, header)
452
456
if err != nil {
453
-
log.Warnw("dialing failed", "host", host, "err", err, "backoff", backoff)
457
+
s.log.Warn("dialing failed", "host", host, "err", err, "backoff", backoff)
454
458
time.Sleep(sleepForBackoff(backoff))
455
459
backoff++
456
460
457
461
continue
458
462
}
459
463
460
-
log.Info("event subscription response code: ", res.StatusCode)
464
+
s.log.Info("event subscription response", "code", res.StatusCode)
461
465
462
466
if err := s.handleConnection(ctx, host, con, &cursor); err != nil {
463
-
log.Warnf("connection to %q failed: %s", host, err)
467
+
s.log.Warn("connection failed", "host", host, "err", err)
464
468
}
465
469
}
466
470
}
···
483
487
if seq%5000 == 0 {
484
488
// TODO: don't need this after we move to getting seq from pebble
485
489
if err := s.writeCursor(seq); err != nil {
486
-
log.Errorf("write cursor failed: %s", err)
490
+
s.log.Error("write cursor failed", "err", err)
487
491
}
488
492
}
489
493
···
491
495
return nil
492
496
})
493
497
494
-
return events.HandleRepoStream(ctx, con, sched)
498
+
return events.HandleRepoStream(ctx, con, sched, nil)
495
499
}
496
500
497
501
func (s *Splitter) getLastCursor() (int64, error) {
498
502
if s.pp != nil {
499
503
seq, millis, _, err := s.pp.GetLast(context.Background())
500
504
if err == nil {
501
-
log.Debugw("got last cursor from pebble", "seq", seq, "millis", millis)
505
+
s.log.Debug("got last cursor from pebble", "seq", seq, "millis", millis)
502
506
return seq, nil
503
507
} else if errors.Is(err, events.ErrNoLast) {
504
-
log.Info("pebble no last")
508
+
s.log.Info("pebble no last")
505
509
} else {
506
-
log.Errorw("pebble seq fail", "err", err)
510
+
s.log.Error("pebble seq fail", "err", err)
507
511
}
508
512
}
509
513
-5
testing/integ_test.go
···
15
15
"github.com/bluesky-social/indigo/repo"
16
16
"github.com/bluesky-social/indigo/xrpc"
17
17
"github.com/ipfs/go-cid"
18
-
"github.com/ipfs/go-log/v2"
19
18
car "github.com/ipld/go-car"
20
19
"github.com/stretchr/testify/assert"
21
20
)
22
-
23
-
func init() {
24
-
log.SetAllLoggers(log.LevelInfo)
25
-
}
26
21
27
22
func TestRelayBasic(t *testing.T) {
28
23
if testing.Short() {
+1
-1
testing/utils.go
···
691
691
},
692
692
}
693
693
seqScheduler := sequential.NewScheduler("test", rsc.EventHandler)
694
-
if err := events.HandleRepoStream(ctx, con, seqScheduler); err != nil {
694
+
if err := events.HandleRepoStream(ctx, con, seqScheduler, nil); err != nil {
695
695
fmt.Println(err)
696
696
}
697
697
}()