A locally focused bluesky appview
at master 8.1 kB view raw
1package main 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "log" 9 "log/slog" 10 "net/http" 11 _ "net/http/pprof" 12 "net/url" 13 "os" 14 "runtime" 15 "strings" 16 "sync" 17 "time" 18 19 "github.com/bluesky-social/indigo/api/atproto" 20 "github.com/bluesky-social/indigo/atproto/identity" 21 "github.com/bluesky-social/indigo/atproto/identity/redisdir" 22 "github.com/bluesky-social/indigo/atproto/syntax" 23 "github.com/bluesky-social/indigo/repo" 24 "github.com/bluesky-social/indigo/util/cliutil" 25 xrpclib "github.com/bluesky-social/indigo/xrpc" 26 "github.com/ipfs/go-cid" 27 "github.com/jackc/pgx/v5/pgxpool" 28 "github.com/prometheus/client_golang/prometheus" 29 "github.com/prometheus/client_golang/prometheus/promauto" 30 "github.com/urfave/cli/v2" 31 "github.com/whyrusleeping/konbini/backend" 32 "github.com/whyrusleeping/konbini/xrpc" 33 "go.opentelemetry.io/otel" 34 "go.opentelemetry.io/otel/attribute" 35 "go.opentelemetry.io/otel/exporters/jaeger" 36 "go.opentelemetry.io/otel/sdk/resource" 37 tracesdk "go.opentelemetry.io/otel/sdk/trace" 38 semconv "go.opentelemetry.io/otel/semconv/v1.20.0" 39 "gorm.io/gorm" 40 "gorm.io/gorm/logger" 41 42 . "github.com/whyrusleeping/konbini/models" 43) 44 45var firehoseCursorGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{ 46 Name: "firehose_cursor", 47}, []string{"stage"}) 48 49func main() { 50 app := cli.App{ 51 Name: "konbini", 52 } 53 54 app.Flags = []cli.Flag{ 55 &cli.StringFlag{ 56 Name: "db-url", 57 EnvVars: []string{"DATABASE_URL"}, 58 }, 59 &cli.BoolFlag{ 60 Name: "jaeger", 61 }, 62 &cli.StringFlag{ 63 Name: "handle", 64 }, 65 &cli.IntFlag{ 66 Name: "max-db-connections", 67 Value: runtime.NumCPU(), 68 }, 69 &cli.StringFlag{ 70 Name: "redis-url", 71 }, 72 &cli.StringFlag{ 73 Name: "sync-config", 74 }, 75 } 76 app.Action = func(cctx *cli.Context) error { 77 db, err := cliutil.SetupDatabase(cctx.String("db-url"), cctx.Int("max-db-connections")) 78 if err != nil { 79 return err 80 } 81 82 db.Logger = logger.New(log.New(os.Stdout, "\r\n", log.LstdFlags), logger.Config{ 83 SlowThreshold: 500 * time.Millisecond, 84 LogLevel: logger.Warn, 85 IgnoreRecordNotFoundError: false, 86 Colorful: true, 87 }) 88 89 if cctx.Bool("jaeger") { 90 // Use Jaeger native exporter sending to port 14268 91 jaegerUrl := "http://localhost:14268/api/traces" 92 exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(jaegerUrl))) 93 if err != nil { 94 return err 95 } 96 97 env := os.Getenv("ENV") 98 if env == "" { 99 env = "development" 100 } 101 102 tp := tracesdk.NewTracerProvider( 103 // Always be sure to batch in production. 104 tracesdk.WithBatcher(exp), 105 // Record information about this application in a Resource. 106 tracesdk.WithResource(resource.NewWithAttributes( 107 semconv.SchemaURL, 108 semconv.ServiceNameKey.String("konbini"), 109 attribute.String("env", env), // DataDog 110 attribute.String("environment", env), // Others 111 attribute.Int64("ID", 1), 112 )), 113 ) 114 115 otel.SetTracerProvider(tp) 116 } 117 118 db.AutoMigrate(Repo{}) 119 db.AutoMigrate(Post{}) 120 db.AutoMigrate(Follow{}) 121 db.AutoMigrate(Block{}) 122 db.AutoMigrate(Like{}) 123 db.AutoMigrate(Repost{}) 124 db.AutoMigrate(List{}) 125 db.AutoMigrate(ListItem{}) 126 db.AutoMigrate(ListBlock{}) 127 db.AutoMigrate(Profile{}) 128 db.AutoMigrate(ThreadGate{}) 129 db.AutoMigrate(FeedGenerator{}) 130 db.AutoMigrate(Image{}) 131 db.AutoMigrate(PostGate{}) 132 db.AutoMigrate(StarterPack{}) 133 db.AutoMigrate(backend.SyncInfo{}) 134 db.AutoMigrate(Notification{}) 135 db.AutoMigrate(NotificationSeen{}) 136 db.AutoMigrate(SequenceTracker{}) 137 db.Exec("CREATE INDEX IF NOT EXISTS reposts_subject_idx ON reposts (subject)") 138 db.Exec("CREATE INDEX IF NOT EXISTS posts_reply_to_idx ON posts (reply_to)") 139 db.Exec("CREATE INDEX IF NOT EXISTS posts_in_thread_idx ON posts (in_thread)") 140 141 ctx := context.TODO() 142 143 cfg, err := pgxpool.ParseConfig(cctx.String("db-url")) 144 if err != nil { 145 return err 146 } 147 148 if cfg.MaxConns < 8 { 149 cfg.MaxConns = 8 150 } 151 152 pool, err := pgxpool.NewWithConfig(context.TODO(), cfg) 153 if err != nil { 154 return err 155 } 156 157 if err := pool.Ping(context.TODO()); err != nil { 158 return err 159 } 160 161 handle := os.Getenv("BSKY_HANDLE") 162 password := os.Getenv("BSKY_PASSWORD") 163 164 dir := identity.DefaultDirectory() 165 166 if redisURL := cctx.String("redis-url"); redisURL != "" { 167 rdir, err := redisdir.NewRedisDirectory(dir, redisURL, time.Minute, time.Second*10, time.Second*10, 100_000) 168 if err != nil { 169 return err 170 } 171 dir = rdir 172 } 173 174 resp, err := dir.LookupHandle(ctx, syntax.Handle(handle)) 175 if err != nil { 176 return err 177 } 178 mydid := resp.DID.String() 179 180 cc := &xrpclib.Client{ 181 Host: resp.PDSEndpoint(), 182 } 183 184 nsess, err := atproto.ServerCreateSession(ctx, cc, &atproto.ServerCreateSession_Input{ 185 Identifier: handle, 186 Password: password, 187 }) 188 if err != nil { 189 return err 190 } 191 192 cc.Auth = &xrpclib.AuthInfo{ 193 AccessJwt: nsess.AccessJwt, 194 Did: mydid, 195 Handle: nsess.Handle, 196 RefreshJwt: nsess.RefreshJwt, 197 } 198 199 s := &Server{ 200 mydid: mydid, 201 client: cc, 202 dir: dir, 203 204 db: db, 205 } 206 207 pgb, err := backend.NewPostgresBackend(mydid, db, pool, cc, dir) 208 if err != nil { 209 return err 210 } 211 212 s.backend = pgb 213 214 myrepo, err := s.backend.GetOrCreateRepo(ctx, mydid) 215 if err != nil { 216 return fmt.Errorf("failed to get repo record for our own did: %w", err) 217 } 218 s.myrepo = myrepo 219 220 if err := s.backend.LoadRelevantDids(); err != nil { 221 return fmt.Errorf("failed to load relevant dids set: %w", err) 222 } 223 224 // Start custom API server (for the custom frontend) 225 go func() { 226 if err := s.runApiServer(); err != nil { 227 fmt.Println("failed to start api server: ", err) 228 } 229 }() 230 231 // Start XRPC server (for official Bluesky app compatibility) 232 go func() { 233 xrpcServer := xrpc.NewServer(db, dir, pgb) 234 if err := xrpcServer.Start(":4446"); err != nil { 235 fmt.Println("failed to start XRPC server: ", err) 236 } 237 }() 238 239 // Start pprof server 240 go func() { 241 http.ListenAndServe(":4445", nil) 242 }() 243 244 sc := SyncConfig{ 245 Backends: []SyncBackend{ 246 { 247 Type: "firehose", 248 Host: "bsky.network", 249 }, 250 }, 251 } 252 253 if scfn := cctx.String("sync-config"); scfn != "" { 254 { 255 scfi, err := os.Open(scfn) 256 if err != nil { 257 return err 258 } 259 defer scfi.Close() 260 261 var lsc SyncConfig 262 if err := json.NewDecoder(scfi).Decode(&lsc); err != nil { 263 return err 264 } 265 sc = lsc 266 } 267 } 268 269 /* 270 sc.Backends[0] = SyncBackend{ 271 Type: "jetstream", 272 Host: "jetstream1.us-west.bsky.network", 273 } 274 */ 275 276 return s.StartSyncEngine(ctx, &sc) 277 278 } 279 280 app.RunAndExitOnError() 281} 282 283type Server struct { 284 backend *backend.PostgresBackend 285 286 dir identity.Directory 287 288 client *xrpclib.Client 289 mydid string 290 myrepo *Repo 291 292 seqLk sync.Mutex 293 lastSeq int64 294 295 mpLk sync.Mutex 296 297 db *gorm.DB 298} 299 300func (s *Server) getXrpcClient() (*xrpclib.Client, error) { 301 // TODO: handle refreshing the token periodically 302 return s.client, nil 303} 304 305func (s *Server) resolveAccountIdent(ctx context.Context, acc string) (string, error) { 306 unesc, err := url.PathUnescape(acc) 307 if err != nil { 308 return "", err 309 } 310 311 acc = unesc 312 if strings.HasPrefix(acc, "did:") { 313 return acc, nil 314 } 315 316 resp, err := s.dir.LookupHandle(ctx, syntax.Handle(acc)) 317 if err != nil { 318 return "", err 319 } 320 321 return resp.DID.String(), nil 322} 323 324func (s *Server) rescanRepo(ctx context.Context, did string) error { 325 resp, err := s.dir.LookupDID(ctx, syntax.DID(did)) 326 if err != nil { 327 return err 328 } 329 330 s.backend.AddRelevantDid(did) 331 332 c := &xrpclib.Client{ 333 Host: resp.PDSEndpoint(), 334 } 335 336 repob, err := atproto.SyncGetRepo(ctx, c, did, "") 337 if err != nil { 338 return err 339 } 340 341 rep, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repob)) 342 if err != nil { 343 return err 344 } 345 346 return rep.ForEach(ctx, "", func(k string, v cid.Cid) error { 347 blk, err := rep.Blockstore().Get(ctx, v) 348 if err != nil { 349 slog.Error("record missing in repo", "path", k, "cid", v, "error", err) 350 return nil 351 } 352 353 d := blk.RawData() 354 if err := s.backend.HandleCreate(ctx, did, "", k, &d, &v); err != nil { 355 slog.Error("failed to index record", "path", k, "cid", v, "error", err) 356 } 357 return nil 358 }) 359 360}