A collection of Custom Bluesky Feeds, including Fresh Feeds, all under one roof. The Go ingester below subscribes to the Bluesky Jetstream firehose and records posts, reposts, likes, and follows in Postgres.
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"sync"
	"time"

	"github.com/gorilla/websocket"
	"github.com/lib/pq" // also registers the "postgres" database/sql driver
)

// Jetstream endpoint, subscribed to posts, reposts, likes, and follows.
const wsUrl = "wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.repost&wantedCollections=app.bsky.feed.like&wantedCollections=app.bsky.graph.follow"

// LikeMessage is the decoded Jetstream event envelope. Despite the name it is
// used for every subscribed collection (posts, reposts, likes, follows).
type LikeMessage struct {
	Did    string `json:"did"`
	TimeUs int64  `json:"time_us"`
	Kind   string `json:"kind"`
	Commit Commit `json:"commit"`
}

// Commit describes the repo operation carried by the event.
type Commit struct {
	Rev        string     `json:"rev"`
	Operation  string     `json:"operation"`
	Collection string     `json:"collection"`
	RKey       string     `json:"rkey"`
	Record     LikeRecord `json:"record"`
	CID        string     `json:"cid"`
}
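
// An illustrative Jetstream commit event matching the structs above (field
// names come from the struct tags; the values here are made up):
//
//	{
//	  "did": "did:plc:example",
//	  "time_us": 1700000000000000,
//	  "kind": "commit",
//	  "commit": {
//	    "operation": "create",
//	    "collection": "app.bsky.feed.like",
//	    "rkey": "3kexamplerkey",
//	    "record": {
//	      "$type": "app.bsky.feed.like",
//	      "createdAt": "2024-01-01T00:00:00Z",
//	      "subject": {"cid": "bafy...", "uri": "at://did:plc:author/app.bsky.feed.post/3kpostrkey"}
//	    }
//	  }
//	}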

// LikeRecord is the record payload. It is reused for posts, reposts, likes,
// and follows; only the fields relevant to a given collection are populated.
type LikeRecord struct {
	Type      string      `json:"$type"`
	CreatedAt string      `json:"createdAt"`
	Subject   LikeSubject `json:"subject"`
	Reply     *Reply      `json:"reply,omitempty"`
}

// FollowRecord mirrors an app.bsky.graph.follow record. (Currently unused:
// follow events are decoded via LikeRecord, whose Subject handles the plain
// DID string.)
type FollowRecord struct {
	DID       string `json:"subject"`
	CreatedAt string `json:"createdAt"`
}

// Reply carries the parent/root references of a reply post.
type Reply struct {
	Parent ReplySubject `json:"parent"`
	Root   ReplySubject `json:"root"`
}

// LikeSubject is the subject of a like/repost (a cid+uri object) or of a
// follow (a bare DID string, kept in Raw).
type LikeSubject struct {
	CID string `json:"cid"`
	URI string `json:"uri"`
	Raw string // Stores the raw string when the subject is not a JSON object
}

// UnmarshalJSON accepts both forms of "subject": a {cid, uri} object (likes,
// reposts) and a bare string (follows).
func (ls *LikeSubject) UnmarshalJSON(data []byte) error {
	// Try to unmarshal as an object first
	var obj struct {
		CID string `json:"cid"`
		URI string `json:"uri"`
	}
	if err := json.Unmarshal(data, &obj); err == nil && obj.URI != "" {
		ls.CID = obj.CID
		ls.URI = obj.URI
		return nil
	}

	// Otherwise try to unmarshal as a plain string
	var str string
	if err := json.Unmarshal(data, &str); err == nil {
		ls.Raw = str
		return nil
	}

	return fmt.Errorf("LikeSubject: invalid JSON: %s", string(data))
}
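
// The two encodings this method accepts, for reference (illustrative values):
//
//	"subject": {"cid": "bafy...", "uri": "at://did:plc:author/app.bsky.feed.post/3kpostrkey"}
//	"subject": "did:plc:followedaccount"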

// ReplySubject is a cid+uri reference used inside Reply.
type ReplySubject struct {
	CID string `json:"cid"`
	URI string `json:"uri"`
}

var lastLoggedSecond int64 // Keep track of the last logged second

var (
	// batchMu guards the batches below: they are appended to on the WebSocket
	// reader goroutine and flushed from the ticker goroutines.
	batchMu     sync.Mutex
	postBatch   []Post
	likeBatch   []Like
	followBatch []Follow

	batchInsertSize = 1000             // Adjust the batch size as needed
	batchInterval   = 30 * time.Second // Flush check every 30 seconds
)

// Post is a row queued for the posts table (also used for reposts).
type Post struct {
	RelAuthor string
	PostUri   string
	RelDate   int64
	IsRepost  bool
	RepostUri string
	ReplyTo   string
}

// Like is a row queued for the likes table.
type Like struct {
	RelAuthor string
	PostUri   string
	RelDate   int64
}

// Follow is a row queued for RelAuthor's follows_<did> table.
type Follow struct {
	RelAuthor        string
	followSubjectDID string
}
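
// The per-author follow tables are not created in this file. The statements in
// deleteFollow and batchInsertFollow only assume a shape roughly like this
// (illustrative; the real DDL lives elsewhere in the project):
//
//	CREATE TABLE "follows_<did>" (
//	    follow TEXT PRIMARY KEY  -- DID of the followed account
//	);
//
// ON CONFLICT (follow) in batchInsertFollow requires a unique constraint on "follow".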

// trackedDIDsMap holds the DIDs whose follows are ingested, derived from the
// existing follows_<did> tables. It is rebuilt by the refresh job while the
// WebSocket reader consults it, so access goes through didsMu.
var trackedDIDsMap map[string]struct{}
var trackedDIDs []string
var didsMu sync.RWMutex

func initTrackedDIDsMap(dids []string) {
	m := make(map[string]struct{}, len(dids))
	for _, did := range dids {
		m[did] = struct{}{}
	}
	didsMu.Lock()
	trackedDIDsMap = m
	didsMu.Unlock()
}

// getLastCursor returns the last saved Jetstream cursor (time_us), or 0 if none.
func getLastCursor(db *sql.DB) int64 {
	var lastCursor int64
	err := db.QueryRow("SELECT lastCursor FROM cursor WHERE id = 1").Scan(&lastCursor)
	if err != nil {
		if err == sql.ErrNoRows {
			log.Println("Cursor table is empty; starting fresh.")
			return 0
		}
		log.Fatalf("Error fetching last cursor: %v", err)
	}
	return lastCursor
}

// Periodically refreshes the trackedDIDs and trackedDIDsMap from the database.
func startTrackedDIDRefreshJob(db *sql.DB) {
	ticker := time.NewTicker(10 * time.Minute)
	defer ticker.Stop()

	for range ticker.C {
		updatedDIDs, err := getTrackedDIDs(context.Background(), db)
		if err != nil {
			log.Printf("Failed to refresh tracked DIDs: %v", err)
			continue
		}
		initTrackedDIDsMap(updatedDIDs)
		trackedDIDs = updatedDIDs
		log.Printf("Refreshed tracked DIDs: %v\n", trackedDIDs)
	}
}
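
// main wires everything together: it connects to Postgres using the DB_HOST,
// DB_USER, DB_NAME and DB_PASSWORD environment variables, ensures the tables
// exist, starts the cleanup/flush/refresh goroutines, resumes from the stored
// Jetstream cursor, and then reads events in a loop. A typical invocation
// might look like this (illustrative values, not part of this repo):
//
//	DB_HOST=localhost DB_USER=feeds DB_NAME=feeds DB_PASSWORD=secret go run .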
func main() {
	// Open the Postgres connection
	dbHost := os.Getenv("DB_HOST")
	dbUser := os.Getenv("DB_USER")
	dbName := os.Getenv("DB_NAME")
	dbPassword := os.Getenv("DB_PASSWORD")
	db, err := sql.Open("postgres", fmt.Sprintf("user=%s dbname=%s host=%s password=%s sslmode=disable", dbUser, dbName, dbHost, dbPassword))
	if err != nil {
		log.Fatalf("Failed to connect to Postgres: %v", err)
	}
	defer db.Close()

	// Ensure tables exist
	createTables(db)

	// Start the cleanup job
	go startCleanupJob(db)

	// Start the batch insert job for posts
	go startBatchInsertJob(db)

	// Start the batch insert job for likes
	go startBatchInsertLikesJob(db)

	// Retrieve the last cursor
	lastCursor := getLastCursor(db)

	// Initialize the tracked DIDs
	trackedDIDs, err = getTrackedDIDs(context.Background(), db)
	if err != nil {
		log.Fatalf("Failed to get tracked DIDs: %v", err)
	}
	log.Printf("Tracked DIDs: %v\n", trackedDIDs)
	initTrackedDIDsMap(trackedDIDs)

	// Start the batch insert job for follows
	go startBatchInsertFollowJob(db)

	// Start the periodic tracked-DID refresh
	go startTrackedDIDRefreshJob(db)

	// If the cursor is older than 24 hours, skip it
	if lastCursor > 0 {
		cursorTime := time.UnixMicro(lastCursor)
		if time.Since(cursorTime) > 24*time.Hour {
			log.Printf("Cursor is older than 24 hours (%s); skipping it.", cursorTime.Format("2006-01-02 15:04:05"))
			lastCursor = 0 // Ignore this cursor
		} else {
			log.Printf("Resuming from cursor: %d (%s)", lastCursor, cursorTime.Format("2006-01-02 15:04:05"))
		}
	}

	// WebSocket URL with the cursor appended if available
	wsFullUrl := wsUrl
	if lastCursor > 0 {
		wsFullUrl += fmt.Sprintf("&cursor=%d", lastCursor)
	}
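
	// When resuming, the resulting URL looks roughly like this (cursor value illustrative):
	//   wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=...&cursor=1700000000000000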

	// Connect to the WebSocket
	conn, _, err := websocket.DefaultDialer.Dial(wsFullUrl, nil)
	if err != nil {
		log.Fatalf("WebSocket connection error: %v", err)
	}
	defer conn.Close()

	log.Printf("Connected to WebSocket: %s", wsFullUrl)
	log.Println("Listening for WebSocket messages...")

	// Read and process WebSocket messages; decode errors are logged and skipped
	for {
		var msg LikeMessage
		err := conn.ReadJSON(&msg)
		if err != nil {
			log.Printf("Error reading WebSocket message: %v", err)
			continue
		}

		processMessage(db, msg)
	}
}

// createTables ensures the posts, likes, and cursor tables exist.
func createTables(db *sql.DB) {
	_, err := db.Exec(`
		CREATE TABLE IF NOT EXISTS posts (
			id SERIAL PRIMARY KEY,
			rel_author TEXT NOT NULL,
			post_uri TEXT NOT NULL,
			rel_date BIGINT NOT NULL,
			is_repost BOOLEAN NOT NULL DEFAULT FALSE,
			repost_uri TEXT,
			reply_to TEXT,
			UNIQUE(rel_author, post_uri, rel_date)
		);
	`)
	if err != nil {
		log.Fatalf("Error creating 'posts' table: %v", err)
	}

	_, err = db.Exec(`
		CREATE TABLE IF NOT EXISTS likes (
			id SERIAL PRIMARY KEY,
			rel_author TEXT NOT NULL,
			post_uri TEXT NOT NULL,
			rel_date BIGINT NOT NULL,
			UNIQUE(rel_author, post_uri)
		);
	`)
	if err != nil {
		log.Fatalf("Error creating 'likes' table: %v", err)
	}

	// Create a cursor table with a single-row constraint
	_, err = db.Exec(`
		CREATE TABLE IF NOT EXISTS cursor (
			id INT PRIMARY KEY CHECK (id = 1),
			lastCursor BIGINT NOT NULL
		);
	`)
	if err != nil {
		log.Fatalf("Error creating 'cursor' table: %v", err)
	}

	// Ensure the cursor table always has exactly one row
	_, err = db.Exec(`
		INSERT INTO cursor (id, lastCursor)
		VALUES (1, 0)
		ON CONFLICT (id) DO NOTHING;
	`)
	if err != nil {
		log.Fatalf("Error initializing cursor table: %v", err)
	}
}

// processMessage routes one Jetstream event to the appropriate batch or delete
// handler and advances the stored cursor.
func processMessage(db *sql.DB, msg LikeMessage) {
	// Convert the cursor to a time for logging
	cursorTime := time.UnixMicro(msg.TimeUs)

	// Get the whole second as a Unix timestamp
	currentSecond := cursorTime.Unix()

	// Log the cursor at most once per second (only in the 100-200ms window of that second)
	if currentSecond != lastLoggedSecond && cursorTime.Nanosecond() >= 100_000_000 && cursorTime.Nanosecond() < 200_000_000 {
		lastLoggedSecond = currentSecond
		humanReadableTime := cursorTime.Format("2006-01-02 15:04:05.000")
		log.Printf("Cursor (time_us): %d, Human-readable time: %s", msg.TimeUs, humanReadableTime)
	}

	// Build the at:// URIs for this event
	record := msg.Commit.Record
	postUri := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", msg.Did, msg.Commit.RKey)
	repostUri := fmt.Sprintf("at://%s/app.bsky.feed.repost/%s", msg.Did, msg.Commit.RKey)
	reply := ""
	if msg.Commit.Record.Reply != nil {
		reply = fmt.Sprintf("Parent: %s, Root: %s", msg.Commit.Record.Reply.Parent.URI, msg.Commit.Record.Reply.Root.URI)
	}

	switch msg.Commit.Collection {
	case "app.bsky.feed.post":
		if msg.Commit.Operation == "create" {
			batchMu.Lock()
			postBatch = append(postBatch, Post{msg.Did, postUri, msg.TimeUs, false, "", reply})
			batchMu.Unlock()
		} else if msg.Commit.Operation == "delete" {
			deletePost(db, msg.Did, postUri, msg.TimeUs)
		}
	case "app.bsky.feed.repost":
		if record.Subject.URI != "" {
			if msg.Commit.Operation == "create" {
				batchMu.Lock()
				postBatch = append(postBatch, Post{msg.Did, record.Subject.URI, msg.TimeUs, true, repostUri, ""})
				batchMu.Unlock()
			} else if msg.Commit.Operation == "delete" {
				deletePost(db, msg.Did, record.Subject.URI, msg.TimeUs)
			}
		}
	case "app.bsky.feed.like":
		if record.Subject.URI != "" {
			if msg.Commit.Operation == "create" {
				batchMu.Lock()
				likeBatch = append(likeBatch, Like{msg.Did, record.Subject.URI, msg.TimeUs})
				batchMu.Unlock()
			} else if msg.Commit.Operation == "delete" {
				deleteLike(db, msg.Did, record.Subject.URI)
			}
		}
	case "app.bsky.graph.follow":
		didsMu.RLock()
		_, tracked := trackedDIDsMap[msg.Did]
		didsMu.RUnlock()
		if tracked {
			if msg.Commit.Operation == "create" {
				batchMu.Lock()
				followBatch = append(followBatch, Follow{msg.Did, record.Subject.Raw})
				batchMu.Unlock()
			} else if msg.Commit.Operation == "delete" {
				deleteFollow(db, msg.Did, msg.Commit.RKey)
			}
		}
	default:
		// Other collections are ignored
	}

	// Update the cursor in the single-row table
	_, err := db.Exec(`
		UPDATE cursor SET lastCursor = $1 WHERE id = 1;
	`, msg.TimeUs)
	if err != nil {
		log.Printf("Error updating cursor: %v", err)
	}
}

// deletePost removes a post (or repost) row previously recorded for this author.
func deletePost(db *sql.DB, relAuthor, postUri string, relDate int64) {
	_, err := db.Exec(`
		DELETE FROM posts WHERE rel_author = $1 AND post_uri = $2 AND rel_date = $3;
	`, relAuthor, postUri, relDate)
	if err != nil {
		log.Printf("Error deleting post: %v", err)
	}
}

// deleteLike removes a like row previously recorded for this author.
func deleteLike(db *sql.DB, relAuthor, postUri string) {
	_, err := db.Exec(`
		DELETE FROM likes WHERE rel_author = $1 AND post_uri = $2;
	`, relAuthor, postUri)
	if err != nil {
		log.Printf("Error deleting like: %v", err)
	}
}

// deleteFollow removes a row from the author's follows_<did> table. The table
// name is quoted with pq.QuoteIdentifier because DIDs contain characters that
// are not valid in bare identifiers.
func deleteFollow(db *sql.DB, relAuthor, did string) {
	unquotedTableName := "follows_" + relAuthor
	tableName := pq.QuoteIdentifier(unquotedTableName)
	_, err := db.Exec(fmt.Sprintf(`
		DELETE FROM %s WHERE follow = $1;
	`, tableName), did)
	if err != nil {
		log.Printf("Error deleting follow: %v", err)
	}
}

// startCleanupJob prunes old posts and expired feed caches every hour.
func startCleanupJob(db *sql.DB) {
	ticker := time.NewTicker(1 * time.Hour)
	defer ticker.Stop()

	for range ticker.C {
		cleanupOldPosts(db)
		if err := cleanOldFeedCaches(context.Background(), db); err != nil {
			log.Printf("Error cleaning old feed caches: %v\n", err)
		}
	}
}

// cleanupOldPosts deletes posts older than 24 hours.
func cleanupOldPosts(db *sql.DB) {
	threshold := time.Now().Add(-24 * time.Hour).UnixMicro()
	_, err := db.Exec(`
		DELETE FROM posts WHERE rel_date < $1;
	`, threshold)
	if err != nil {
		log.Printf("Error deleting old posts: %v", err)
	} else {
		log.Printf("Deleted posts older than 24 hours.")
	}
}

// startBatchInsertJob flushes the post batch whenever it has grown past
// batchInsertSize, checking every batchInterval.
func startBatchInsertJob(db *sql.DB) {
	ticker := time.NewTicker(batchInterval)
	defer ticker.Stop()

	for range ticker.C {
		batchMu.Lock()
		ready := len(postBatch) >= batchInsertSize
		batchMu.Unlock()
		if ready {
			batchInsertPosts(db)
		}
	}
}

// batchInsertPosts writes the queued posts to Postgres in one transaction.
func batchInsertPosts(db *sql.DB) {
	tx, err := db.Begin()
	if err != nil {
		log.Printf("Error starting transaction: %v", err)
		return
	}

	stmt, err := tx.Prepare(`
		INSERT INTO posts (rel_author, post_uri, rel_date, is_repost, repost_uri, reply_to)
		VALUES ($1, $2, $3, $4, $5, $6)
		ON CONFLICT (rel_author, post_uri, rel_date) DO NOTHING;
	`)
	if err != nil {
		log.Printf("Error preparing statement: %v", err)
		tx.Rollback()
		return
	}
	defer stmt.Close()

	// Take ownership of the current batch under the lock so the WebSocket
	// reader can keep appending while we write.
	batchMu.Lock()
	batch := postBatch
	postBatch = nil
	batchMu.Unlock()

	for _, post := range batch {
		_, err := stmt.Exec(post.RelAuthor, post.PostUri, post.RelDate, post.IsRepost, post.RepostUri, post.ReplyTo)
		if err != nil {
			log.Printf("Failed INSERT: %+v\nError: %v", post, err)
			os.Exit(1) // Exit on error
		}
	}

	if err := tx.Commit(); err != nil {
		log.Printf("Error committing transaction: %v", err)
	}
}

// startBatchInsertLikesJob flushes the like batch whenever it has grown past
// batchInsertSize, checking every batchInterval.
func startBatchInsertLikesJob(db *sql.DB) {
	ticker := time.NewTicker(batchInterval)
	defer ticker.Stop()

	for range ticker.C {
		batchMu.Lock()
		ready := len(likeBatch) >= batchInsertSize
		batchMu.Unlock()
		if ready {
			batchInsertLikes(db)
		}
	}
}

// startBatchInsertFollowJob flushes the follow batch every batchInterval as
// soon as it contains at least one entry.
func startBatchInsertFollowJob(db *sql.DB) {
	ticker := time.NewTicker(batchInterval)
	defer ticker.Stop()

	for range ticker.C {
		batchMu.Lock()
		ready := len(followBatch) >= 1
		batchMu.Unlock()
		if ready {
			batchInsertFollow(db)
		}
	}
}

// batchInsertFollow writes the queued follows into each author's follows_<did>
// table. Follows are grouped by author so a mixed batch does not all end up in
// the first author's table.
func batchInsertFollow(db *sql.DB) {
	// Take ownership of the current batch under the lock.
	batchMu.Lock()
	batch := followBatch
	followBatch = nil
	batchMu.Unlock()
	if len(batch) == 0 {
		return
	}

	byAuthor := make(map[string][]Follow)
	for _, f := range batch {
		byAuthor[f.RelAuthor] = append(byAuthor[f.RelAuthor], f)
	}

	for author, follows := range byAuthor {
		tableName := pq.QuoteIdentifier("follows_" + author)

		tx, err := db.Begin()
		if err != nil {
			log.Printf("Error starting transaction: %v", err)
			continue
		}

		stmt, err := tx.Prepare(fmt.Sprintf(`
			INSERT INTO %s (follow)
			VALUES ($1)
			ON CONFLICT (follow) DO NOTHING
		`, tableName))
		if err != nil {
			log.Printf("Error preparing statement: %v", err)
			tx.Rollback()
			continue
		}

		for _, follow := range follows {
			if _, err := stmt.Exec(follow.followSubjectDID); err != nil {
				log.Printf("Failed FOLLOW INSERT: %+v\nError: %v", follow, err)
				os.Exit(1) // Exit on error
			}
		}
		stmt.Close()

		if err := tx.Commit(); err != nil {
			log.Printf("Error committing transaction: %v", err)
		}
	}
}

// batchInsertLikes writes the queued likes to Postgres in one transaction.
func batchInsertLikes(db *sql.DB) {
	tx, err := db.Begin()
	if err != nil {
		log.Printf("Error starting transaction: %v", err)
		return
	}

	stmt, err := tx.Prepare(`
		INSERT INTO likes (rel_author, post_uri, rel_date)
		VALUES ($1, $2, $3)
		ON CONFLICT (rel_author, post_uri) DO NOTHING;
	`)
	if err != nil {
		log.Printf("Error preparing statement: %v", err)
		tx.Rollback()
		return
	}
	defer stmt.Close()

	// Take ownership of the current batch under the lock so the WebSocket
	// reader can keep appending while we write.
	batchMu.Lock()
	batch := likeBatch
	likeBatch = nil
	batchMu.Unlock()

	for _, like := range batch {
		if _, err := stmt.Exec(like.RelAuthor, like.PostUri, like.RelDate); err != nil {
			log.Printf("Failed LIKE INSERT: %+v\nError: %v", like, err)
			os.Exit(1) // Exit on error
		}
	}

	if err := tx.Commit(); err != nil {
		log.Printf("Error committing transaction: %v", err)
	}
}
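
// Note: the cachetimeout table read below is not created in this file. It is
// assumed to exist elsewhere with at least a table_name column and a
// creation_time timestamp column (shape inferred from the queries below).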

// cleanOldFeedCaches drops feedcache_* tables that are older than 24 hours
// (per the cachetimeout table) or that have no cachetimeout entry at all, and
// removes their cachetimeout rows.
func cleanOldFeedCaches(ctx context.Context, db *sql.DB) error {
	log.Println("Cleaning old feed caches")

	// Anything created more than 24 hours ago is expired
	expirationTime := time.Now().Add(-24 * time.Hour)

	// Get all tables from cachetimeout that are older than 24 hours
	rows, err := db.QueryContext(ctx, `
		SELECT table_name
		FROM cachetimeout
		WHERE creation_time < $1
	`, expirationTime)
	if err != nil {
		return fmt.Errorf("error querying cachetimeout table: %w", err)
	}
	defer rows.Close()

	var tablesToDelete []string
	for rows.Next() {
		var tableName string
		if err := rows.Scan(&tableName); err != nil {
			return fmt.Errorf("error scanning table name: %w", err)
		}
		tablesToDelete = append(tablesToDelete, tableName)
	}

	if err := rows.Err(); err != nil {
		return fmt.Errorf("error iterating cachetimeout rows: %w", err)
	}

	// Get all feedcache_* tables that do not have an entry in cachetimeout
	rows, err = db.QueryContext(ctx, `
		SELECT table_name
		FROM information_schema.tables
		WHERE table_name LIKE 'feedcache_%'
		AND table_name NOT IN (SELECT table_name FROM cachetimeout)
	`)
	if err != nil {
		return fmt.Errorf("error querying feedcache tables: %w", err)
	}
	defer rows.Close()

	for rows.Next() {
		var tableName string
		if err := rows.Scan(&tableName); err != nil {
			return fmt.Errorf("error scanning table name: %w", err)
		}
		tablesToDelete = append(tablesToDelete, tableName)
	}

	if err := rows.Err(); err != nil {
		return fmt.Errorf("error iterating feedcache rows: %w", err)
	}

	// Drop the old tables and remove their entries from cachetimeout
	for _, tableName := range tablesToDelete {
		_, err := db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", pq.QuoteIdentifier(tableName)))
		if err != nil {
			return fmt.Errorf("error dropping table %s: %w", tableName, err)
		}
		_, err = db.ExecContext(ctx, "DELETE FROM cachetimeout WHERE table_name = $1", tableName)
		if err != nil {
			return fmt.Errorf("error deleting from cachetimeout table: %w", err)
		}
	}

	return nil
}

// getTrackedDIDs derives the set of tracked DIDs from the names of the
// existing follows_<did> tables.
func getTrackedDIDs(ctx context.Context, db *sql.DB) ([]string, error) {
	const prefix = "follows_"
	query := `
		SELECT table_name
		FROM information_schema.tables
		WHERE table_name LIKE $1
	`
	rows, err := db.QueryContext(ctx, query, prefix+"%")
	if err != nil {
		return nil, fmt.Errorf("error querying tracked follows tables: %w", err)
	}
	defer rows.Close()

	var trackedDIDs []string
	for rows.Next() {
		var tableName string
		if err := rows.Scan(&tableName); err != nil {
			return nil, fmt.Errorf("error scanning table name: %w", err)
		}
		// Strip the prefix to get the DID
		if len(tableName) > len(prefix) {
			did := tableName[len(prefix):]
			trackedDIDs = append(trackedDIDs, did)
		}
	}

	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("error iterating tracked tables: %w", err)
	}

	return trackedDIDs, nil
}
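
// For example (illustrative DID), a table named "follows_did:plc:abc123"
// yields the tracked DID "did:plc:abc123".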