A locally focused Bluesky AppView
1package main
2
3import (
4 "bytes"
5 "context"
6 "fmt"
7 "log"
8 "log/slog"
9 "net/http"
10 _ "net/http/pprof"
11 "net/url"
12 "os"
13 "runtime"
14 "strings"
15 "sync"
16 "time"
17
18 "github.com/bluesky-social/indigo/api/atproto"
19 "github.com/bluesky-social/indigo/atproto/identity"
20 "github.com/bluesky-social/indigo/atproto/syntax"
21 "github.com/bluesky-social/indigo/cmd/relay/stream"
22 "github.com/bluesky-social/indigo/cmd/relay/stream/schedulers/parallel"
23 "github.com/bluesky-social/indigo/repo"
24 "github.com/bluesky-social/indigo/util/cliutil"
25 xrpclib "github.com/bluesky-social/indigo/xrpc"
26 "github.com/gorilla/websocket"
27 lru "github.com/hashicorp/golang-lru/v2"
28 "github.com/ipfs/go-cid"
29 "github.com/jackc/pgx/v5/pgxpool"
30 "github.com/prometheus/client_golang/prometheus"
31 "github.com/prometheus/client_golang/prometheus/promauto"
32 "github.com/urfave/cli/v2"
33 "github.com/whyrusleeping/konbini/xrpc"
34 "gorm.io/gorm/logger"
35
36 . "github.com/whyrusleeping/konbini/models"
37)
38
39var handleOpHist = promauto.NewHistogramVec(prometheus.HistogramOpts{
40 Name: "handle_op_duration",
41 Help: "A histogram of op handling durations",
42 Buckets: prometheus.ExponentialBuckets(1, 2, 15),
43}, []string{"op", "collection"})
44
45var firehoseCursorGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
46 Name: "firehose_cursor",
47}, []string{"stage"})
48
49func main() {
50 app := cli.App{
51 Name: "konbini",
52 }
53
54 app.Flags = []cli.Flag{
55 &cli.StringFlag{
56 Name: "db-url",
57 EnvVars: []string{"DATABASE_URL"},
58 },
59 &cli.StringFlag{
60 Name: "handle",
61 },
62 &cli.IntFlag{
63 Name: "max-db-connections",
64 Value: runtime.NumCPU(),
65 },
66 }
67 app.Action = func(cctx *cli.Context) error {
68 db, err := cliutil.SetupDatabase(cctx.String("db-url"), cctx.Int("max-db-connections"))
69 if err != nil {
70 return err
71 }
72
73 db.Logger = logger.New(log.New(os.Stdout, "\r\n", log.LstdFlags), logger.Config{
74 SlowThreshold: 500 * time.Millisecond,
75 LogLevel: logger.Warn,
76 IgnoreRecordNotFoundError: false,
77 Colorful: true,
78 })
79
80 db.AutoMigrate(Repo{})
81 db.AutoMigrate(Post{})
82 db.AutoMigrate(Follow{})
83 db.AutoMigrate(Block{})
84 db.AutoMigrate(Like{})
85 db.AutoMigrate(Repost{})
86 db.AutoMigrate(List{})
87 db.AutoMigrate(ListItem{})
88 db.AutoMigrate(ListBlock{})
89 db.AutoMigrate(Profile{})
90 db.AutoMigrate(ThreadGate{})
91 db.AutoMigrate(FeedGenerator{})
92 db.AutoMigrate(Image{})
93 db.AutoMigrate(PostGate{})
94 db.AutoMigrate(StarterPack{})
95 db.AutoMigrate(SyncInfo{})
96 db.AutoMigrate(Notification{})
97 db.AutoMigrate(SequenceTracker{})
98
99 ctx := context.TODO()
100
101 rc, _ := lru.New2Q[string, *Repo](1_000_000)
102 pc, _ := lru.New2Q[string, cachedPostInfo](1_000_000)
103 revc, _ := lru.New2Q[uint, string](1_000_000)
104
105 cfg, err := pgxpool.ParseConfig(cctx.String("db-url"))
106 if err != nil {
107 return err
108 }
109
110 if cfg.MaxConns < 8 {
111 cfg.MaxConns = 8
112 }
113
114 pool, err := pgxpool.NewWithConfig(context.TODO(), cfg)
115 if err != nil {
116 return err
117 }
118
119 if err := pool.Ping(context.TODO()); err != nil {
120 return err
121 }
122
123 handle := os.Getenv("BSKY_HANDLE")
124 password := os.Getenv("BSKY_PASSWORD")
125
126 dir := identity.DefaultDirectory()
127
128 resp, err := dir.LookupHandle(ctx, syntax.Handle(handle))
129 if err != nil {
130 return err
131 }
132 mydid := resp.DID.String()
133
134 cc := &xrpclib.Client{
135 Host: resp.PDSEndpoint(),
136 }
137
138 nsess, err := atproto.ServerCreateSession(ctx, cc, &atproto.ServerCreateSession_Input{
139 Identifier: handle,
140 Password: password,
141 })
142 if err != nil {
143 return err
144 }
145
146 cc.Auth = &xrpclib.AuthInfo{
147 AccessJwt: nsess.AccessJwt,
148 Did: mydid,
149 Handle: nsess.Handle,
150 RefreshJwt: nsess.RefreshJwt,
151 }
152
153 s := &Server{
154 mydid: mydid,
155 client: cc,
156 dir: dir,
157
158 missingRecords: make(chan MissingRecord, 1024),
159 }
160 fmt.Println("MY DID: ", s.mydid)
161
162 pgb := &PostgresBackend{
163 relevantDids: make(map[string]bool),
164 s: s,
165 db: db,
166 postInfoCache: pc,
167 repoCache: rc,
168 revCache: revc,
169 pgx: pool,
170 }
171 s.backend = pgb
172
173 myrepo, err := s.backend.getOrCreateRepo(ctx, mydid)
174 if err != nil {
175 return fmt.Errorf("failed to get repo record for our own did: %w", err)
176 }
177 s.myrepo = myrepo
178
179 if err := s.backend.loadRelevantDids(); err != nil {
180 return fmt.Errorf("failed to load relevant dids set: %w", err)
181 }
182
183 // Start custom API server (for the custom frontend)
184 go func() {
185 if err := s.runApiServer(); err != nil {
186 fmt.Println("failed to start api server: ", err)
187 }
188 }()
189
190 // Start XRPC server (for official Bluesky app compatibility)
191 go func() {
192 xrpcServer := xrpc.NewServer(db, dir, pgb)
193 if err := xrpcServer.Start(":4446"); err != nil {
194 fmt.Println("failed to start XRPC server: ", err)
195 }
196 }()
197
198 // Start pprof server
199 go func() {
200 http.ListenAndServe(":4445", nil)
201 }()
202
203 go s.missingRecordFetcher()
204
205 seqno, err := loadLastSeq(db, "firehose_seq")
206 if err != nil {
207 fmt.Println("failed to load sequence number, starting over", err)
208 }
209
210 return s.startLiveTail(ctx, int(seqno), 10, 20)
211 }
212
213 app.RunAndExitOnError()
214}
215
216type Server struct {
217 backend *PostgresBackend
218
219 dir identity.Directory
220
221 client *xrpclib.Client
222 mydid string
223 myrepo *Repo
224
225 seqLk sync.Mutex
226 lastSeq int64
227
228 mpLk sync.Mutex
229 missingRecords chan MissingRecord
230}
231
232func (s *Server) getXrpcClient() (*xrpclib.Client, error) {
233 // TODO: handle refreshing the token periodically
234 return s.client, nil
235}
236
237func (s *Server) startLiveTail(ctx context.Context, curs int, parWorkers, maxQ int) error {
238 slog.Info("starting live tail")
239
240 // Connect to the Relay websocket
241 urlStr := fmt.Sprintf("wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", curs)
242
243 d := websocket.DefaultDialer
244 con, _, err := d.Dial(urlStr, http.Header{
245 "User-Agent": []string{"market/0.0.1"},
246 })
247 if err != nil {
248 return fmt.Errorf("failed to connect to relay: %w", err)
249 }
250
251 var lelk sync.Mutex
252 lastEvent := time.Now()
253
254 go func() {
255 for range time.Tick(time.Second) {
256 lelk.Lock()
257 let := lastEvent
258 lelk.Unlock()
259
260 if time.Since(let) > time.Second*30 {
261 slog.Error("firehose connection timed out")
262 con.Close()
263 return
264 }
265
266 }
267
268 }()
269
270 var cclk sync.Mutex
271 var completeCursor int64
272
273 rsc := &stream.RepoStreamCallbacks{
274 RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error {
275 ctx := context.Background()
276
277 firehoseCursorGauge.WithLabelValues("ingest").Set(float64(evt.Seq))
278
279 s.seqLk.Lock()
280 if evt.Seq > s.lastSeq {
281 curs = int(evt.Seq)
282 s.lastSeq = evt.Seq
283
284 if evt.Seq%1000 == 0 {
285 if err := storeLastSeq(s.backend.db, "firehose_seq", evt.Seq); err != nil {
286 fmt.Println("failed to store seqno: ", err)
287 }
288 }
289 }
290 s.seqLk.Unlock()
291
292 lelk.Lock()
293 lastEvent = time.Now()
294 lelk.Unlock()
295
296 if err := s.backend.HandleEvent(ctx, evt); err != nil {
297 return fmt.Errorf("handle event (%s,%d): %w", evt.Repo, evt.Seq, err)
298 }
299
300 cclk.Lock()
301 if evt.Seq > completeCursor {
302 completeCursor = evt.Seq
303 firehoseCursorGauge.WithLabelValues("complete").Set(float64(evt.Seq))
304 }
305 cclk.Unlock()
306
307 return nil
308 },
309 RepoInfo: func(info *atproto.SyncSubscribeRepos_Info) error {
310 return nil
311 },
312 // TODO: all the other event types
313 Error: func(errf *stream.ErrorFrame) error {
314 return fmt.Errorf("error frame: %s: %s", errf.Error, errf.Message)
315 },
316 }
317
318 sched := parallel.NewScheduler(parWorkers, maxQ, con.RemoteAddr().String(), rsc.EventHandler)
319
320 //s.eventScheduler = sched
321 //s.streamFinished = make(chan struct{})
322
323 return stream.HandleRepoStream(ctx, con, sched, slog.Default())
324}
325
326func (s *Server) resolveAccountIdent(ctx context.Context, acc string) (string, error) {
327 unesc, err := url.PathUnescape(acc)
328 if err != nil {
329 return "", err
330 }
331
332 acc = unesc
333 if strings.HasPrefix(acc, "did:") {
334 return acc, nil
335 }
336
337 resp, err := s.dir.LookupHandle(ctx, syntax.Handle(acc))
338 if err != nil {
339 return "", err
340 }
341
342 return resp.DID.String(), nil
343}
344
// Notification kind strings, passed as the kind argument of AddNotification
// and stored in Notification.Kind.
const (
	// NotifKindReply is the kind string for reply notifications.
	NotifKindReply = "reply"
	// NotifKindLike is the kind string for like notifications.
	NotifKindLike = "like"
	// NotifKindMention is the kind string for mention notifications.
	NotifKindMention = "mention"
	// NotifKindRepost is the kind string for repost notifications.
	NotifKindRepost = "repost"
)
351
352func (s *Server) AddNotification(ctx context.Context, forUser, author uint, recordUri string, recordCid cid.Cid, kind string) error {
353 return s.backend.db.Create(&Notification{
354 For: forUser,
355 Author: author,
356 Source: recordUri,
357 SourceCid: recordCid.String(),
358 Kind: kind,
359 }).Error
360}
361
362func (s *Server) rescanRepo(ctx context.Context, did string) error {
363 resp, err := s.dir.LookupDID(ctx, syntax.DID(did))
364 if err != nil {
365 return err
366 }
367
368 s.backend.addRelevantDid(did)
369
370 c := &xrpclib.Client{
371 Host: resp.PDSEndpoint(),
372 }
373
374 repob, err := atproto.SyncGetRepo(ctx, c, did, "")
375 if err != nil {
376 return err
377 }
378
379 rep, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repob))
380 if err != nil {
381 return err
382 }
383
384 return rep.ForEach(ctx, "", func(k string, v cid.Cid) error {
385 blk, err := rep.Blockstore().Get(ctx, v)
386 if err != nil {
387 slog.Error("record missing in repo", "path", k, "cid", v, "error", err)
388 return nil
389 }
390
391 d := blk.RawData()
392 if err := s.backend.HandleCreate(ctx, did, "", k, &d, &v); err != nil {
393 slog.Error("failed to index record", "path", k, "cid", v, "error", err)
394 }
395 return nil
396 })
397
398}