A locally focused Bluesky AppView
1package main
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "log"
9 "log/slog"
10 "net/http"
11 _ "net/http/pprof"
12 "net/url"
13 "os"
14 "runtime"
15 "strings"
16 "sync"
17 "time"
18
19 "github.com/bluesky-social/indigo/api/atproto"
20 "github.com/bluesky-social/indigo/atproto/identity"
21 "github.com/bluesky-social/indigo/atproto/identity/redisdir"
22 "github.com/bluesky-social/indigo/atproto/syntax"
23 "github.com/bluesky-social/indigo/repo"
24 "github.com/bluesky-social/indigo/util/cliutil"
25 xrpclib "github.com/bluesky-social/indigo/xrpc"
26 "github.com/ipfs/go-cid"
27 "github.com/jackc/pgx/v5/pgxpool"
28 "github.com/prometheus/client_golang/prometheus"
29 "github.com/prometheus/client_golang/prometheus/promauto"
30 "github.com/urfave/cli/v2"
31 "github.com/whyrusleeping/konbini/backend"
32 "github.com/whyrusleeping/konbini/xrpc"
33 "go.opentelemetry.io/otel"
34 "go.opentelemetry.io/otel/attribute"
35 "go.opentelemetry.io/otel/exporters/jaeger"
36 "go.opentelemetry.io/otel/sdk/resource"
37 tracesdk "go.opentelemetry.io/otel/sdk/trace"
38 semconv "go.opentelemetry.io/otel/semconv/v1.20.0"
39 "gorm.io/gorm"
40 "gorm.io/gorm/logger"
41
42 . "github.com/whyrusleeping/konbini/models"
43)
44
45var firehoseCursorGauge = promauto.NewGaugeVec(prometheus.GaugeOpts{
46 Name: "firehose_cursor",
47}, []string{"stage"})
48
49func main() {
50 app := cli.App{
51 Name: "konbini",
52 }
53
54 app.Flags = []cli.Flag{
55 &cli.StringFlag{
56 Name: "db-url",
57 EnvVars: []string{"DATABASE_URL"},
58 },
59 &cli.BoolFlag{
60 Name: "jaeger",
61 },
62 &cli.StringFlag{
63 Name: "handle",
64 },
65 &cli.IntFlag{
66 Name: "max-db-connections",
67 Value: runtime.NumCPU(),
68 },
69 &cli.StringFlag{
70 Name: "redis-url",
71 },
72 &cli.StringFlag{
73 Name: "sync-config",
74 },
75 }
76 app.Action = func(cctx *cli.Context) error {
77 db, err := cliutil.SetupDatabase(cctx.String("db-url"), cctx.Int("max-db-connections"))
78 if err != nil {
79 return err
80 }
81
82 db.Logger = logger.New(log.New(os.Stdout, "\r\n", log.LstdFlags), logger.Config{
83 SlowThreshold: 500 * time.Millisecond,
84 LogLevel: logger.Warn,
85 IgnoreRecordNotFoundError: false,
86 Colorful: true,
87 })
88
89 if cctx.Bool("jaeger") {
90 // Use Jaeger native exporter sending to port 14268
91 jaegerUrl := "http://localhost:14268/api/traces"
92 exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(jaegerUrl)))
93 if err != nil {
94 return err
95 }
96
97 env := os.Getenv("ENV")
98 if env == "" {
99 env = "development"
100 }
101
102 tp := tracesdk.NewTracerProvider(
103 // Always be sure to batch in production.
104 tracesdk.WithBatcher(exp),
105 // Record information about this application in a Resource.
106 tracesdk.WithResource(resource.NewWithAttributes(
107 semconv.SchemaURL,
108 semconv.ServiceNameKey.String("konbini"),
109 attribute.String("env", env), // DataDog
110 attribute.String("environment", env), // Others
111 attribute.Int64("ID", 1),
112 )),
113 )
114
115 otel.SetTracerProvider(tp)
116 }
117
118 db.AutoMigrate(Repo{})
119 db.AutoMigrate(Post{})
120 db.AutoMigrate(Follow{})
121 db.AutoMigrate(Block{})
122 db.AutoMigrate(Like{})
123 db.AutoMigrate(Repost{})
124 db.AutoMigrate(List{})
125 db.AutoMigrate(ListItem{})
126 db.AutoMigrate(ListBlock{})
127 db.AutoMigrate(Profile{})
128 db.AutoMigrate(ThreadGate{})
129 db.AutoMigrate(FeedGenerator{})
130 db.AutoMigrate(Image{})
131 db.AutoMigrate(PostGate{})
132 db.AutoMigrate(StarterPack{})
133 db.AutoMigrate(backend.SyncInfo{})
134 db.AutoMigrate(Notification{})
135 db.AutoMigrate(NotificationSeen{})
136 db.AutoMigrate(SequenceTracker{})
137 db.Exec("CREATE INDEX IF NOT EXISTS reposts_subject_idx ON reposts (subject)")
138 db.Exec("CREATE INDEX IF NOT EXISTS posts_reply_to_idx ON posts (reply_to)")
139 db.Exec("CREATE INDEX IF NOT EXISTS posts_in_thread_idx ON posts (in_thread)")
140
141 ctx := context.TODO()
142
143 cfg, err := pgxpool.ParseConfig(cctx.String("db-url"))
144 if err != nil {
145 return err
146 }
147
148 if cfg.MaxConns < 8 {
149 cfg.MaxConns = 8
150 }
151
152 pool, err := pgxpool.NewWithConfig(context.TODO(), cfg)
153 if err != nil {
154 return err
155 }
156
157 if err := pool.Ping(context.TODO()); err != nil {
158 return err
159 }
160
161 handle := os.Getenv("BSKY_HANDLE")
162 password := os.Getenv("BSKY_PASSWORD")
163
164 dir := identity.DefaultDirectory()
165
166 if redisURL := cctx.String("redis-url"); redisURL != "" {
167 rdir, err := redisdir.NewRedisDirectory(dir, redisURL, time.Minute, time.Second*10, time.Second*10, 100_000)
168 if err != nil {
169 return err
170 }
171 dir = rdir
172 }
173
174 resp, err := dir.LookupHandle(ctx, syntax.Handle(handle))
175 if err != nil {
176 return err
177 }
178 mydid := resp.DID.String()
179
180 cc := &xrpclib.Client{
181 Host: resp.PDSEndpoint(),
182 }
183
184 nsess, err := atproto.ServerCreateSession(ctx, cc, &atproto.ServerCreateSession_Input{
185 Identifier: handle,
186 Password: password,
187 })
188 if err != nil {
189 return err
190 }
191
192 cc.Auth = &xrpclib.AuthInfo{
193 AccessJwt: nsess.AccessJwt,
194 Did: mydid,
195 Handle: nsess.Handle,
196 RefreshJwt: nsess.RefreshJwt,
197 }
198
199 s := &Server{
200 mydid: mydid,
201 client: cc,
202 dir: dir,
203
204 db: db,
205 }
206
207 pgb, err := backend.NewPostgresBackend(mydid, db, pool, cc, dir)
208 if err != nil {
209 return err
210 }
211
212 s.backend = pgb
213
214 myrepo, err := s.backend.GetOrCreateRepo(ctx, mydid)
215 if err != nil {
216 return fmt.Errorf("failed to get repo record for our own did: %w", err)
217 }
218 s.myrepo = myrepo
219
220 if err := s.backend.LoadRelevantDids(); err != nil {
221 return fmt.Errorf("failed to load relevant dids set: %w", err)
222 }
223
224 // Start custom API server (for the custom frontend)
225 go func() {
226 if err := s.runApiServer(); err != nil {
227 fmt.Println("failed to start api server: ", err)
228 }
229 }()
230
231 // Start XRPC server (for official Bluesky app compatibility)
232 go func() {
233 xrpcServer := xrpc.NewServer(db, dir, pgb)
234 if err := xrpcServer.Start(":4446"); err != nil {
235 fmt.Println("failed to start XRPC server: ", err)
236 }
237 }()
238
239 // Start pprof server
240 go func() {
241 http.ListenAndServe(":4445", nil)
242 }()
243
244 sc := SyncConfig{
245 Backends: []SyncBackend{
246 {
247 Type: "firehose",
248 Host: "bsky.network",
249 },
250 },
251 }
252
253 if scfn := cctx.String("sync-config"); scfn != "" {
254 {
255 scfi, err := os.Open(scfn)
256 if err != nil {
257 return err
258 }
259 defer scfi.Close()
260
261 var lsc SyncConfig
262 if err := json.NewDecoder(scfi).Decode(&lsc); err != nil {
263 return err
264 }
265 sc = lsc
266 }
267 }
268
269 /*
270 sc.Backends[0] = SyncBackend{
271 Type: "jetstream",
272 Host: "jetstream1.us-west.bsky.network",
273 }
274 */
275
276 return s.StartSyncEngine(ctx, &sc)
277
278 }
279
280 app.RunAndExitOnError()
281}
282
// Server bundles the state shared by the appview: the storage backend, the
// identity directory, the authenticated XRPC client and the database handle.
type Server struct {
	// backend is the Postgres-backed store; main assigns it after construction.
	backend *backend.PostgresBackend

	// dir resolves handles and DIDs to identities.
	dir identity.Directory

	// client is the XRPC client main creates and authenticates for our account.
	client *xrpclib.Client
	// mydid is the DID resolved from the configured account handle.
	mydid string
	// myrepo is the repo record main obtains for mydid via the backend.
	myrepo *Repo

	// NOTE(review): seqLk presumably guards lastSeq; neither is used in the
	// visible part of the file, so confirm against their callers.
	seqLk   sync.Mutex
	lastSeq int64

	// NOTE(review): what mpLk protects is not visible here; confirm.
	mpLk sync.Mutex

	// db is the gorm database handle.
	db *gorm.DB
}
299
300func (s *Server) getXrpcClient() (*xrpclib.Client, error) {
301 // TODO: handle refreshing the token periodically
302 return s.client, nil
303}
304
305func (s *Server) resolveAccountIdent(ctx context.Context, acc string) (string, error) {
306 unesc, err := url.PathUnescape(acc)
307 if err != nil {
308 return "", err
309 }
310
311 acc = unesc
312 if strings.HasPrefix(acc, "did:") {
313 return acc, nil
314 }
315
316 resp, err := s.dir.LookupHandle(ctx, syntax.Handle(acc))
317 if err != nil {
318 return "", err
319 }
320
321 return resp.DID.String(), nil
322}
323
324func (s *Server) rescanRepo(ctx context.Context, did string) error {
325 resp, err := s.dir.LookupDID(ctx, syntax.DID(did))
326 if err != nil {
327 return err
328 }
329
330 s.backend.AddRelevantDid(did)
331
332 c := &xrpclib.Client{
333 Host: resp.PDSEndpoint(),
334 }
335
336 repob, err := atproto.SyncGetRepo(ctx, c, did, "")
337 if err != nil {
338 return err
339 }
340
341 rep, err := repo.ReadRepoFromCar(ctx, bytes.NewReader(repob))
342 if err != nil {
343 return err
344 }
345
346 return rep.ForEach(ctx, "", func(k string, v cid.Cid) error {
347 blk, err := rep.Blockstore().Get(ctx, v)
348 if err != nil {
349 slog.Error("record missing in repo", "path", k, "cid", v, "error", err)
350 return nil
351 }
352
353 d := blk.RawData()
354 if err := s.backend.HandleCreate(ctx, did, "", k, &d, &v); err != nil {
355 slog.Error("failed to index record", "path", k, "cid", v, "error", err)
356 }
357 return nil
358 })
359
360}