A locally focused Bluesky AppView
26
fork

Configure Feed

Select the types of activity you want to include in your feed.

at 1e3d98090561ca9688fe368c390c10ea8d3410a3 398 lines 9.3 kB view raw
1package main 2 3import ( 4 "bytes" 5 "context" 6 "fmt" 7 "log" 8 "log/slog" 9 "net/http" 10 _ "net/http/pprof" 11 "net/url" 12 "os" 13 "runtime" 14 "strings" 15 "sync" 16 "time" 17 18 "github.com/bluesky-social/indigo/api/atproto" 19 "github.com/bluesky-social/indigo/atproto/identity" 20 "github.com/bluesky-social/indigo/atproto/syntax" 21 "github.com/bluesky-social/indigo/cmd/relay/stream" 22 "github.com/bluesky-social/indigo/cmd/relay/stream/schedulers/parallel" 23 "github.com/bluesky-social/indigo/repo" 24 "github.com/bluesky-social/indigo/util/cliutil" 25 xrpclib "github.com/bluesky-social/indigo/xrpc" 26 "github.com/gorilla/websocket" 27 lru "github.com/hashicorp/golang-lru/v2" 28 "github.com/ipfs/go-cid" 29 "github.com/jackc/pgx/v5/pgxpool" 30 "github.com/prometheus/client_golang/prometheus" 31 "github.com/prometheus/client_golang/prometheus/promauto" 32 "github.com/urfave/cli/v2" 33 "github.com/whyrusleeping/konbini/xrpc" 34 "gorm.io/gorm/logger" 35 36 . "github.com/whyrusleeping/konbini/models" 37) 38 39var handleOpHist = promauto.NewHistogramVec(prometheus.HistogramOpts{ 40 Name: "handle_op_duration", 41 Help: "A histogram of op handling durations", 42 Buckets: prometheus.ExponentialBuckets(1, 2, 15), 43}, []string{"op", "collection"}) 44 45var firehoseCursorGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ 46 Name: "firehose_cursor", 47}, []string{"stage"}) 48 49func main() { 50 app := cli.App{ 51 Name: "konbini", 52 } 53 54 app.Flags = []cli.Flag{ 55 &cli.StringFlag{ 56 Name: "db-url", 57 EnvVars: []string{"DATABASE_URL"}, 58 }, 59 &cli.StringFlag{ 60 Name: "handle", 61 }, 62 &cli.IntFlag{ 63 Name: "max-db-connections", 64 Value: runtime.NumCPU(), 65 }, 66 } 67 app.Action = func(cctx *cli.Context) error { 68 db, err := cliutil.SetupDatabase(cctx.String("db-url"), cctx.Int("max-db-connections")) 69 if err != nil { 70 return err 71 } 72 73 db.Logger = logger.New(log.New(os.Stdout, "\r\n", log.LstdFlags), logger.Config{ 74 SlowThreshold: 500 * 
time.Millisecond, 75 LogLevel: logger.Warn, 76 IgnoreRecordNotFoundError: false, 77 Colorful: true, 78 }) 79 80 db.AutoMigrate(Repo{}) 81 db.AutoMigrate(Post{}) 82 db.AutoMigrate(Follow{}) 83 db.AutoMigrate(Block{}) 84 db.AutoMigrate(Like{}) 85 db.AutoMigrate(Repost{}) 86 db.AutoMigrate(List{}) 87 db.AutoMigrate(ListItem{}) 88 db.AutoMigrate(ListBlock{}) 89 db.AutoMigrate(Profile{}) 90 db.AutoMigrate(ThreadGate{}) 91 db.AutoMigrate(FeedGenerator{}) 92 db.AutoMigrate(Image{}) 93 db.AutoMigrate(PostGate{}) 94 db.AutoMigrate(StarterPack{}) 95 db.AutoMigrate(SyncInfo{}) 96 db.AutoMigrate(Notification{}) 97 db.AutoMigrate(SequenceTracker{}) 98 99 ctx := context.TODO() 100 101 rc, _ := lru.New2Q[string, *Repo](1_000_000) 102 pc, _ := lru.New2Q[string, cachedPostInfo](1_000_000) 103 revc, _ := lru.New2Q[uint, string](1_000_000) 104 105 cfg, err := pgxpool.ParseConfig(cctx.String("db-url")) 106 if err != nil { 107 return err 108 } 109 110 if cfg.MaxConns < 8 { 111 cfg.MaxConns = 8 112 } 113 114 pool, err := pgxpool.NewWithConfig(context.TODO(), cfg) 115 if err != nil { 116 return err 117 } 118 119 if err := pool.Ping(context.TODO()); err != nil { 120 return err 121 } 122 123 handle := os.Getenv("BSKY_HANDLE") 124 password := os.Getenv("BSKY_PASSWORD") 125 126 dir := identity.DefaultDirectory() 127 128 resp, err := dir.LookupHandle(ctx, syntax.Handle(handle)) 129 if err != nil { 130 return err 131 } 132 mydid := resp.DID.String() 133 134 cc := &xrpclib.Client{ 135 Host: resp.PDSEndpoint(), 136 } 137 138 nsess, err := atproto.ServerCreateSession(ctx, cc, &atproto.ServerCreateSession_Input{ 139 Identifier: handle, 140 Password: password, 141 }) 142 if err != nil { 143 return err 144 } 145 146 cc.Auth = &xrpclib.AuthInfo{ 147 AccessJwt: nsess.AccessJwt, 148 Did: mydid, 149 Handle: nsess.Handle, 150 RefreshJwt: nsess.RefreshJwt, 151 } 152 153 s := &Server{ 154 mydid: mydid, 155 client: cc, 156 dir: dir, 157 158 missingRecords: make(chan MissingRecord, 1024), 159 } 160 
fmt.Println("MY DID: ", s.mydid) 161 162 pgb := &PostgresBackend{ 163 relevantDids: make(map[string]bool), 164 s: s, 165 db: db, 166 postInfoCache: pc, 167 repoCache: rc, 168 revCache: revc, 169 pgx: pool, 170 } 171 s.backend = pgb 172 173 myrepo, err := s.backend.getOrCreateRepo(ctx, mydid) 174 if err != nil { 175 return fmt.Errorf("failed to get repo record for our own did: %w", err) 176 } 177 s.myrepo = myrepo 178 179 if err := s.backend.loadRelevantDids(); err != nil { 180 return fmt.Errorf("failed to load relevant dids set: %w", err) 181 } 182 183 // Start custom API server (for the custom frontend) 184 go func() { 185 if err := s.runApiServer(); err != nil { 186 fmt.Println("failed to start api server: ", err) 187 } 188 }() 189 190 // Start XRPC server (for official Bluesky app compatibility) 191 go func() { 192 xrpcServer := xrpc.NewServer(db, dir, pgb) 193 if err := xrpcServer.Start(":4446"); err != nil { 194 fmt.Println("failed to start XRPC server: ", err) 195 } 196 }() 197 198 // Start pprof server 199 go func() { 200 http.ListenAndServe(":4445", nil) 201 }() 202 203 go s.missingRecordFetcher() 204 205 seqno, err := loadLastSeq(db, "firehose_seq") 206 if err != nil { 207 fmt.Println("failed to load sequence number, starting over", err) 208 } 209 210 return s.startLiveTail(ctx, int(seqno), 10, 20) 211 } 212 213 app.RunAndExitOnError() 214} 215 216type Server struct { 217 backend *PostgresBackend 218 219 dir identity.Directory 220 221 client *xrpclib.Client 222 mydid string 223 myrepo *Repo 224 225 seqLk sync.Mutex 226 lastSeq int64 227 228 mpLk sync.Mutex 229 missingRecords chan MissingRecord 230} 231 232func (s *Server) getXrpcClient() (*xrpclib.Client, error) { 233 // TODO: handle refreshing the token periodically 234 return s.client, nil 235} 236 237func (s *Server) startLiveTail(ctx context.Context, curs int, parWorkers, maxQ int) error { 238 slog.Info("starting live tail") 239 240 // Connect to the Relay websocket 241 urlStr := 
fmt.Sprintf("wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos?cursor=%d", curs) 242 243 d := websocket.DefaultDialer 244 con, _, err := d.Dial(urlStr, http.Header{ 245 "User-Agent": []string{"market/0.0.1"}, 246 }) 247 if err != nil { 248 return fmt.Errorf("failed to connect to relay: %w", err) 249 } 250 251 var lelk sync.Mutex 252 lastEvent := time.Now() 253 254 go func() { 255 for range time.Tick(time.Second) { 256 lelk.Lock() 257 let := lastEvent 258 lelk.Unlock() 259 260 if time.Since(let) > time.Second*30 { 261 slog.Error("firehose connection timed out") 262 con.Close() 263 return 264 } 265 266 } 267 268 }() 269 270 var cclk sync.Mutex 271 var completeCursor int64 272 273 rsc := &stream.RepoStreamCallbacks{ 274 RepoCommit: func(evt *atproto.SyncSubscribeRepos_Commit) error { 275 ctx := context.Background() 276 277 firehoseCursorGauge.WithLabelValues("ingest").Set(float64(evt.Seq)) 278 279 s.seqLk.Lock() 280 if evt.Seq > s.lastSeq { 281 curs = int(evt.Seq) 282 s.lastSeq = evt.Seq 283 284 if evt.Seq%1000 == 0 { 285 if err := storeLastSeq(s.backend.db, "firehose_seq", evt.Seq); err != nil { 286 fmt.Println("failed to store seqno: ", err) 287 } 288 } 289 } 290 s.seqLk.Unlock() 291 292 lelk.Lock() 293 lastEvent = time.Now() 294 lelk.Unlock() 295 296 if err := s.backend.HandleEvent(ctx, evt); err != nil { 297 return fmt.Errorf("handle event (%s,%d): %w", evt.Repo, evt.Seq, err) 298 } 299 300 cclk.Lock() 301 if evt.Seq > completeCursor { 302 completeCursor = evt.Seq 303 firehoseCursorGauge.WithLabelValues("complete").Set(float64(evt.Seq)) 304 } 305 cclk.Unlock() 306 307 return nil 308 }, 309 RepoInfo: func(info *atproto.SyncSubscribeRepos_Info) error { 310 return nil 311 }, 312 // TODO: all the other event types 313 Error: func(errf *stream.ErrorFrame) error { 314 return fmt.Errorf("error frame: %s: %s", errf.Error, errf.Message) 315 }, 316 } 317 318 sched := parallel.NewScheduler(parWorkers, maxQ, con.RemoteAddr().String(), rsc.EventHandler) 319 320 
//s.eventScheduler = sched 321 //s.streamFinished = make(chan struct{}) 322 323 return stream.HandleRepoStream(ctx, con, sched, slog.Default()) 324} 325 326func (s *Server) resolveAccountIdent(ctx context.Context, acc string) (string, error) { 327 unesc, err := url.PathUnescape(acc) 328 if err != nil { 329 return "", err 330 } 331 332 acc = unesc 333 if strings.HasPrefix(acc, "did:") { 334 return acc, nil 335 } 336 337 resp, err := s.dir.LookupHandle(ctx, syntax.Handle(acc)) 338 if err != nil { 339 return "", err 340 } 341 342 return resp.DID.String(), nil 343} 344 345const ( 346 NotifKindReply = "reply" 347 NotifKindLike = "like" 348 NotifKindMention = "mention" 349 NotifKindRepost = "repost" 350) 351 352func (s *Server) AddNotification(ctx context.Context, forUser, author uint, recordUri string, recordCid cid.Cid, kind string) error { 353 return s.backend.db.Create(&Notification{ 354 For: forUser, 355 Author: author, 356 Source: recordUri, 357 SourceCid: recordCid.String(), 358 Kind: kind, 359 }).Error 360} 361 362func (s *Server) rescanRepo(ctx context.Context, did string) error { 363 resp, err := s.dir.LookupDID(ctx, syntax.DID(did)) 364 if err != nil { 365 return err 366 } 367 368 s.backend.addRelevantDid(did) 369 370 c := &xrpclib.Client{ 371 Host: resp.PDSEndpoint(), 372 } 373 374 repob, err := atproto.SyncGetRepo(ctx, c, did, "") 375 if err != nil { 376 return err 377 } 378 379 rep, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repob)) 380 if err != nil { 381 return err 382 } 383 384 return rep.ForEach(ctx, "", func(k string, v cid.Cid) error { 385 blk, err := rep.Blockstore().Get(ctx, v) 386 if err != nil { 387 slog.Error("record missing in repo", "path", k, "cid", v, "error", err) 388 return nil 389 } 390 391 d := blk.RawData() 392 if err := s.backend.HandleCreate(ctx, did, "", k, &d, &v); err != nil { 393 slog.Error("failed to index record", "path", k, "cid", v, "error", err) 394 } 395 return nil 396 }) 397 398}