A collection of Custom Bluesky Feeds, including Fresh Feeds, all under one roof
package main

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"log"
	"os"
	"time"

	"github.com/gorilla/websocket"
	"github.com/lib/pq"
)

const wsUrl = "wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.repost&wantedCollections=app.bsky.feed.like&wantedCollections=app.bsky.graph.follow"

// LikeMessage is the envelope for every Jetstream event this consumer handles
// (posts, reposts, likes, and follows), not just likes.
type LikeMessage struct {
	Did    string `json:"did"`
	TimeUs int64  `json:"time_us"`
	Kind   string `json:"kind"`
	Commit Commit `json:"commit"`
}

type Commit struct {
	Rev        string     `json:"rev"`
	Operation  string     `json:"operation"`
	Collection string     `json:"collection"`
	RKey       string     `json:"rkey"`
	Record     LikeRecord `json:"record"`
	CID        string     `json:"cid"`
}

// LikeRecord is reused for post, repost, like, and follow records; Subject
// accepts both the object form ({cid, uri}) and the bare-string form used by follows.
type LikeRecord struct {
	Type      string      `json:"$type"`
	CreatedAt string      `json:"createdAt"`
	Subject   LikeSubject `json:"subject"`
	Reply     *Reply      `json:"reply,omitempty"`
}

// FollowRecord is currently unused; follow subjects are read via LikeRecord.Subject.Raw.
type FollowRecord struct {
	DID       string `json:"subject"`
	CreatedAt string `json:"createdAt"`
}

type Reply struct {
	Parent ReplySubject `json:"parent"`
	Root   ReplySubject `json:"root"`
}

type LikeSubject struct {
	CID string `json:"cid"`
	URI string `json:"uri"`
	Raw string // Stores the raw string if the subject is not a JSON object
}

// UnmarshalJSON handles both the object and bare-string forms of a subject.
func (ls *LikeSubject) UnmarshalJSON(data []byte) error {
	// Try to unmarshal as an object first
	var obj struct {
		CID string `json:"cid"`
		URI string `json:"uri"`
	}
	if err := json.Unmarshal(data, &obj); err == nil && obj.URI != "" {
		ls.CID = obj.CID
		ls.URI = obj.URI
		return nil
	}

	// If it's not a valid object, try to unmarshal as a string
	var str string
	if err := json.Unmarshal(data, &str); err == nil {
		ls.Raw = str
		return nil
	}

	return fmt.Errorf("LikeSubject: invalid JSON: %s", string(data))
}

type ReplySubject struct {
	CID string `json:"cid"`
	URI string `json:"uri"`
}

var lastLoggedSecond int64 // Keep track of the last logged second

var (
	postBatch       []Post
	likeBatch       []Like
	followBatch     []Follow
	batchInsertSize = 1000             // Adjust the batch size as needed
	batchInterval   = 30 * time.Second // How often the flush jobs check whether a batch is ready
)

type Post struct {
	RelAuthor string
	PostUri   string
	RelDate   int64
	IsRepost  bool
	RepostUri string
	ReplyTo   string
}

type Like struct {
	RelAuthor string
	PostUri   string
	RelDate   int64
}

type Follow struct {
	RelAuthor        string
	followSubjectDID string
}

// DIDs whose follows are mirrored into per-author follows_<did> tables.
var trackedDIDsMap map[string]struct{}
var trackedDIDs []string

func initTrackedDIDsMap(dids []string) {
	trackedDIDsMap = make(map[string]struct{}, len(dids))
	for _, did := range dids {
		trackedDIDsMap[did] = struct{}{}
	}
}

func getLastCursor(db *sql.DB) int64 {
	var lastCursor int64
	err := db.QueryRow("SELECT lastCursor FROM cursor WHERE id = 1").Scan(&lastCursor)
	if err != nil {
		if err == sql.ErrNoRows {
			log.Println("Cursor table is empty; starting fresh.")
			return 0
		}
		log.Fatalf("Error fetching last cursor: %v", err)
	}
	return lastCursor
}
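
// NOTE: trackedDIDs and trackedDIDsMap are replaced by the refresh goroutine
// below while processMessage reads them from the WebSocket loop, with no
// synchronization. If stricter safety is wanted, one option (not wired in
// here) is to guard the lookups with a sync.RWMutex, e.g.:
//
//	var trackedMu sync.RWMutex
//
//	func isTracked(did string) bool {
//		trackedMu.RLock()
//		defer trackedMu.RUnlock()
//		_, ok := trackedDIDsMap[did]
//		return ok
//	}
//
// The refresh job would take the write lock before swapping in the new map.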
// startTrackedDIDRefreshJob periodically refreshes trackedDIDs and
// trackedDIDsMap from the database.
func startTrackedDIDRefreshJob(db *sql.DB) {
	ticker := time.NewTicker(10 * time.Minute)
	defer ticker.Stop()

	for range ticker.C {
		updatedDIDs, err := getTrackedDIDs(context.Background(), db)
		if err != nil {
			log.Printf("Failed to refresh tracked DIDs: %v", err)
			continue
		}
		initTrackedDIDsMap(updatedDIDs)
		trackedDIDs = updatedDIDs
		log.Printf("Refreshed tracked DIDs: %v\n", trackedDIDs)
	}
}

func main() {
	// Open the database connection
	dbHost := os.Getenv("DB_HOST")
	dbUser := os.Getenv("DB_USER")
	dbName := os.Getenv("DB_NAME")
	dbPassword := os.Getenv("DB_PASSWORD")
	db, err := sql.Open("postgres", fmt.Sprintf("user=%s dbname=%s host=%s password=%s sslmode=disable", dbUser, dbName, dbHost, dbPassword))
	if err != nil {
		log.Fatalf("Failed to open Postgres connection: %v", err)
	}
	defer db.Close()

	// sql.Open only validates the DSN; Ping confirms the database is reachable.
	if err := db.Ping(); err != nil {
		log.Fatalf("Failed to connect to Postgres: %v", err)
	}

	// Ensure tables exist
	createTables(db)

	// Start the cleanup job
	go startCleanupJob(db)

	// Start the batch insert job for posts
	go startBatchInsertJob(db)

	// Start the batch insert job for likes
	go startBatchInsertLikesJob(db)

	// Retrieve the last cursor
	lastCursor := getLastCursor(db)

	// Initialize the tracked DIDs
	trackedDIDs, err = getTrackedDIDs(context.Background(), db)
	if err != nil {
		log.Fatalf("Failed to get tracked DIDs: %v", err)
	}
	log.Printf("Tracked DIDs: %v\n", trackedDIDs)
	initTrackedDIDsMap(trackedDIDs)

	go startBatchInsertFollowJob(db)

	go startTrackedDIDRefreshJob(db)

	// If the cursor is older than 24 hours, skip it
	if lastCursor > 0 {
		cursorTime := time.UnixMicro(lastCursor)
		if time.Since(cursorTime) > 24*time.Hour {
			log.Printf("Cursor is older than 24 hours (%s); skipping it.", cursorTime.Format("2006-01-02 15:04:05"))
			lastCursor = 0 // Ignore this cursor
		} else {
			log.Printf("Resuming from cursor: %d (%s)", lastCursor, cursorTime.Format("2006-01-02 15:04:05"))
		}
	}

	// WebSocket URL with cursor if available
	wsFullUrl := wsUrl
	if lastCursor > 0 {
		wsFullUrl += "&cursor=" + fmt.Sprintf("%d", lastCursor)
	}

	// Connect to the WebSocket
	conn, _, err := websocket.DefaultDialer.Dial(wsFullUrl, nil)
	if err != nil {
		log.Fatalf("WebSocket connection error: %v", err)
	}
	defer conn.Close()

	log.Printf("Connected to WebSocket: %s", wsFullUrl)

	log.Println("Listening for WebSocket messages...")

	// Process WebSocket messages. A read error is logged and skipped; if the
	// connection drops, this loop does not reconnect (see the sketch at the
	// end of the file).
	for {
		var msg LikeMessage
		err := conn.ReadJSON(&msg)
		if err != nil {
			log.Printf("Error reading WebSocket message: %v", err)
			continue
		}

		processMessage(db, msg)
	}
}

func createTables(db *sql.DB) {
	_, err := db.Exec(`
		CREATE TABLE IF NOT EXISTS posts (
			id SERIAL PRIMARY KEY,
			rel_author TEXT NOT NULL,
			post_uri TEXT NOT NULL,
			rel_date BIGINT NOT NULL,
			is_repost BOOLEAN NOT NULL DEFAULT FALSE,
			repost_uri TEXT,
			reply_to TEXT,
			UNIQUE(rel_author, post_uri, rel_date)
		);
	`)
	if err != nil {
		log.Fatalf("Error creating 'posts' table: %v", err)
	}

	_, err = db.Exec(`
		CREATE TABLE IF NOT EXISTS likes (
			id SERIAL PRIMARY KEY,
			rel_author TEXT NOT NULL,
			post_uri TEXT NOT NULL,
			rel_date BIGINT NOT NULL,
			UNIQUE(rel_author, post_uri)
		);
	`)
	if err != nil {
		log.Fatalf("Error creating 'likes' table: %v", err)
	}

	// Create a cursor table with a single-row constraint
	_, err = db.Exec(`
		CREATE TABLE IF NOT EXISTS cursor (
			id INT PRIMARY KEY CHECK (id = 1),
			lastCursor BIGINT NOT NULL
		);
	`)
	if err != nil {
		log.Fatalf("Error creating 'cursor' table: %v", err)
	}

	// Ensure the cursor table always has exactly one row
	_, err = db.Exec(`
		INSERT INTO cursor (id, lastCursor)
		VALUES (1, 0)
		ON CONFLICT (id) DO NOTHING;
	`)
	if err != nil {
		log.Fatalf("Error initializing cursor table: %v", err)
	}
}
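
// NOTE: createTables only manages posts, likes, and cursor. The per-author
// follows_<did> tables, the cachetimeout table, and the feedcache_* tables
// referenced below are expected to exist already (presumably created by the
// feed-serving side of the project); this process only reads, writes, and
// drops them.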
func processMessage(db *sql.DB, msg LikeMessage) {
	// log.Print("Processing message...")
	// Convert the cursor to a time
	cursorTime := time.UnixMicro(msg.TimeUs)

	// Get the whole second as a Unix timestamp
	currentSecond := cursorTime.Unix()

	// Log the cursor roughly once per second: only for events that fall in the
	// 100-200ms window of a second that has not been logged yet
	if currentSecond != lastLoggedSecond && cursorTime.Nanosecond() >= 100_000_000 && cursorTime.Nanosecond() < 200_000_000 {
		// Update the last logged second
		lastLoggedSecond = currentSecond

		humanReadableTime := cursorTime.Format("2006-01-02 15:04:05.000")
		log.Printf("Cursor (time_us): %d, Human-readable time: %s", msg.TimeUs, humanReadableTime)
	}

	// Build the AT URIs for this commit
	record := msg.Commit.Record
	postUri := fmt.Sprintf("at://%s/app.bsky.feed.post/%s", msg.Did, msg.Commit.RKey)
	repostUri := fmt.Sprintf("at://%s/app.bsky.feed.repost/%s", msg.Did, msg.Commit.RKey)
	reply := ""
	if msg.Commit.Record.Reply != nil {
		reply = fmt.Sprintf("Parent: %s, Root: %s", msg.Commit.Record.Reply.Parent.URI, msg.Commit.Record.Reply.Root.URI)
	}

	switch msg.Commit.Collection {
	case "app.bsky.feed.post":
		if msg.Commit.Operation == "create" {
			postBatch = append(postBatch, Post{msg.Did, postUri, msg.TimeUs, false, "", reply})
		} else if msg.Commit.Operation == "delete" {
			deletePost(db, msg.Did, postUri, msg.TimeUs)
		}
	case "app.bsky.feed.repost":
		if record.Subject.URI != "" {
			if msg.Commit.Operation == "create" {
				postBatch = append(postBatch, Post{msg.Did, record.Subject.URI, msg.TimeUs, true, repostUri, ""})
			} else if msg.Commit.Operation == "delete" {
				deletePost(db, msg.Did, record.Subject.URI, msg.TimeUs)
			}
		}
	case "app.bsky.feed.like":
		if record.Subject.URI != "" {
			if msg.Commit.Operation == "create" {
				likeBatch = append(likeBatch, Like{msg.Did, record.Subject.URI, msg.TimeUs})
			} else if msg.Commit.Operation == "delete" {
				deleteLike(db, msg.Did, record.Subject.URI)
			}
		}
	case "app.bsky.graph.follow":
		_, tracked := trackedDIDsMap[msg.Did]
		if tracked {
			// log.Printf("Following found tracked DID: %s\n", msg.Did)
			if msg.Commit.Operation == "create" {
				// log.Printf("Following Create; doer: %s, subject: %s\n", msg.Did, record.Subject.Raw)
				followBatch = append(followBatch, Follow{msg.Did, record.Subject.Raw})
			} else if msg.Commit.Operation == "delete" {
				// log.Printf("Following Delete; doer: %s, subject: %s\n", msg.Did, record.Subject.Raw)
				// log.Printf("Unfollowing: %s", msg.Commit.RKey)
				// Delete the corresponding row from this author's follows_ table
				deleteFollow(db, msg.Did, msg.Commit.RKey)
			}
		}
	default:
		// log.Printf("Unknown collection: %s", msg.Commit.Collection)
	}

	// Update the cursor in the single-row table
	_, err := db.Exec(`
		UPDATE cursor SET lastCursor = $1 WHERE id = 1;
	`, msg.TimeUs)
	if err != nil {
		log.Printf("Error updating cursor: %v", err)
	}
}
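
// NOTE: postBatch, likeBatch, and followBatch are appended to by the
// WebSocket loop (via processMessage) and drained by the ticker goroutines
// below without any locking, so concurrent access is possible. If that ever
// becomes a problem, one option (not implemented here) is a shared
// sync.Mutex, e.g.:
//
//	batchMu.Lock()
//	postBatch = append(postBatch, p)
//	batchMu.Unlock()
//
// with the flush jobs taking the same lock while they copy and clear a batch.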
func deletePost(db *sql.DB, relAuthor, postUri string, relDate int64) {
	// NOTE: relDate here is the time_us of the delete event, which will not
	// normally equal the rel_date stored when the post was created, so this
	// condition may prevent the row from matching.
	_, err := db.Exec(`
		DELETE FROM posts WHERE rel_author = $1 AND post_uri = $2 AND rel_date = $3;
	`, relAuthor, postUri, relDate)
	if err != nil {
		log.Printf("Error deleting post: %v", err)
	}
}

func deleteLike(db *sql.DB, relAuthor, postUri string) {
	_, err := db.Exec(`
		DELETE FROM likes WHERE rel_author = $1 AND post_uri = $2;
	`, relAuthor, postUri)
	if err != nil {
		log.Printf("Error deleting like: %v", err)
	}
}

func deleteFollow(db *sql.DB, relAuthor, did string) {
	// NOTE: the caller passes msg.Commit.RKey here, while batchInsertFollow
	// stores the subject DID in the follow column, so this DELETE only matches
	// if rkeys are also written to that column.
	unquotedTableName := "follows_" + relAuthor
	tableName := pq.QuoteIdentifier(unquotedTableName)
	_, err := db.Exec(fmt.Sprintf(`
		DELETE FROM %s WHERE follow = $1;
	`, tableName), did)
	if err != nil {
		log.Printf("Error deleting follow: %v", err)
	}
}

func startCleanupJob(db *sql.DB) {
	ticker := time.NewTicker(1 * time.Hour)
	defer ticker.Stop()

	for range ticker.C {
		cleanupOldPosts(db)
		if err := cleanOldFeedCaches(context.Background(), db); err != nil {
			log.Printf("Error cleaning old feed caches: %v\n", err)
		}
	}
}

func cleanupOldPosts(db *sql.DB) {
	threshold := time.Now().Add(-24 * time.Hour).UnixMicro()
	_, err := db.Exec(`
		DELETE FROM posts WHERE rel_date < $1;
	`, threshold)
	if err != nil {
		log.Printf("Error deleting old posts: %v", err)
	} else {
		log.Printf("Deleted posts older than 24 hours.")
	}
}

// startBatchInsertJob flushes the post batch once it has reached
// batchInsertSize, checking every batchInterval.
func startBatchInsertJob(db *sql.DB) {
	ticker := time.NewTicker(batchInterval)
	defer ticker.Stop()

	for range ticker.C {
		if len(postBatch) >= batchInsertSize {
			batchInsertPosts(db)
		}
	}
}

func batchInsertPosts(db *sql.DB) {
	tx, err := db.Begin()
	if err != nil {
		log.Printf("Error starting transaction: %v", err)
		return
	}
	// Rollback is a no-op once the transaction has been committed.
	defer tx.Rollback()

	stmt, err := tx.Prepare(`
		INSERT INTO posts (rel_author, post_uri, rel_date, is_repost, repost_uri, reply_to)
		VALUES ($1, $2, $3, $4, $5, $6)
		ON CONFLICT (rel_author, post_uri, rel_date) DO NOTHING;
	`)
	if err != nil {
		log.Printf("Error preparing statement: %v", err)
		return
	}
	defer stmt.Close()

	for _, post := range postBatch {
		_, err := stmt.Exec(post.RelAuthor, post.PostUri, post.RelDate, post.IsRepost, post.RepostUri, post.ReplyTo)
		if err != nil {
			log.Printf("Error executing statement: %v", err)
			log.Printf("Failed INSERT: %+v\nError: %v", post, err)
			os.Exit(1) // Exit on error
		}
	}

	err = tx.Commit()
	if err != nil {
		log.Printf("Error committing transaction: %v", err)
	}

	// Clear the batch
	postBatch = postBatch[:0]
}

func startBatchInsertLikesJob(db *sql.DB) {
	ticker := time.NewTicker(batchInterval)
	defer ticker.Stop()

	for range ticker.C {
		if len(likeBatch) >= batchInsertSize {
			batchInsertLikes(db)
		}
	}
}

// startBatchInsertFollowJob flushes queued follows on every tick, as soon as
// at least one is waiting.
func startBatchInsertFollowJob(db *sql.DB) {
	ticker := time.NewTicker(batchInterval)
	defer ticker.Stop()

	for range ticker.C {
		if len(followBatch) >= 1 {
			batchInsertFollow(db)
		}
	}
}
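
// NOTE: batchInsertFollow derives the target table from the first queued
// entry (followBatch[0].RelAuthor). If follows from more than one tracked
// author are queued in the same window, they would all be written to that
// first author's table; grouping the batch by RelAuthor before inserting
// would avoid this.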
func batchInsertFollow(db *sql.DB) {
	tx, err := db.Begin()
	if err != nil {
		log.Printf("Error starting transaction: %v", err)
		return
	}
	// Rollback is a no-op once the transaction has been committed.
	defer tx.Rollback()

	unquotedTableName := "follows_" + followBatch[0].RelAuthor
	tableName := pq.QuoteIdentifier(unquotedTableName)

	stmt, err := tx.Prepare(fmt.Sprintf(`
		INSERT INTO %s (follow)
		VALUES ($1)
		ON CONFLICT (follow) DO NOTHING
	`, tableName))
	if err != nil {
		log.Printf("Error preparing statement: %v", err)
		return
	}
	defer stmt.Close()

	for _, follow := range followBatch {
		_, err := stmt.Exec(follow.followSubjectDID)
		if err != nil {
			log.Printf("Error executing statement: %v", err)
			log.Printf("Failed FOLLOW INSERT: %+v\nError: %v", follow, err)
			os.Exit(1) // Exit on error
		}
	}

	err = tx.Commit()
	if err != nil {
		log.Printf("Error committing transaction: %v", err)
	}

	// Clear the batch
	followBatch = followBatch[:0]
}

func batchInsertLikes(db *sql.DB) {
	tx, err := db.Begin()
	if err != nil {
		log.Printf("Error starting transaction: %v", err)
		return
	}
	// Rollback is a no-op once the transaction has been committed.
	defer tx.Rollback()

	stmt, err := tx.Prepare(`
		INSERT INTO likes (rel_author, post_uri, rel_date)
		VALUES ($1, $2, $3)
		ON CONFLICT (rel_author, post_uri) DO NOTHING;
	`)
	if err != nil {
		log.Printf("Error preparing statement: %v", err)
		return
	}
	defer stmt.Close()

	for _, like := range likeBatch {
		_, err := stmt.Exec(like.RelAuthor, like.PostUri, like.RelDate)
		if err != nil {
			log.Printf("Error executing statement: %v", err)
			log.Printf("Failed LIKE INSERT: %+v\nError: %v", like, err)
			os.Exit(1) // Exit on error
		}
	}

	err = tx.Commit()
	if err != nil {
		log.Printf("Error committing transaction: %v", err)
	}

	// Clear the batch
	likeBatch = likeBatch[:0]
}
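
// NOTE: the information_schema queries below and in getTrackedDIDs filter on
// table_name only, so they match tables in any schema, and the underscore in
// LIKE patterns such as 'feedcache_%' is itself a single-character wildcard.
// Adding a table_schema = 'public' condition (and escaping the underscore)
// would tighten the match if that ever becomes a problem.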
func cleanOldFeedCaches(ctx context.Context, db *sql.DB) error {
	log.Println("Cleaning old feed caches")
	// Get the current time minus 24 hours
	expirationTime := time.Now().Add(-24 * time.Hour)

	// Get all tables from cachetimeout that are older than 24 hours
	rows, err := db.QueryContext(ctx, `
		SELECT table_name
		FROM cachetimeout
		WHERE creation_time < $1
	`, expirationTime)
	if err != nil {
		return fmt.Errorf("error querying cachetimeout table: %w", err)
	}
	defer rows.Close()

	var tablesToDelete []string
	for rows.Next() {
		var tableName string
		if err := rows.Scan(&tableName); err != nil {
			return fmt.Errorf("error scanning table name: %w", err)
		}
		tablesToDelete = append(tablesToDelete, tableName)
	}

	if err := rows.Err(); err != nil {
		return fmt.Errorf("error iterating cachetimeout rows: %w", err)
	}

	// Get all feedcache_* tables that do not have an entry in cachetimeout
	rows, err = db.QueryContext(ctx, `
		SELECT table_name
		FROM information_schema.tables
		WHERE table_name LIKE 'feedcache_%'
		AND table_name NOT IN (SELECT table_name FROM cachetimeout)
	`)
	if err != nil {
		return fmt.Errorf("error querying feedcache tables: %w", err)
	}
	defer rows.Close()

	for rows.Next() {
		var tableName string
		if err := rows.Scan(&tableName); err != nil {
			return fmt.Errorf("error scanning table name: %w", err)
		}
		tablesToDelete = append(tablesToDelete, tableName)
	}

	if err := rows.Err(); err != nil {
		return fmt.Errorf("error iterating feedcache rows: %w", err)
	}

	// Drop the old tables and remove their entries from cachetimeout
	for _, tableName := range tablesToDelete {
		_, err := db.ExecContext(ctx, fmt.Sprintf("DROP TABLE IF EXISTS %s", pq.QuoteIdentifier(tableName)))
		if err != nil {
			return fmt.Errorf("error dropping table %s: %w", tableName, err)
		}
		_, err = db.ExecContext(ctx, "DELETE FROM cachetimeout WHERE table_name = $1", tableName)
		if err != nil {
			return fmt.Errorf("error deleting from cachetimeout table: %w", err)
		}
	}

	return nil
}

// getTrackedDIDs derives the set of tracked DIDs from the names of the
// existing follows_<did> tables.
func getTrackedDIDs(ctx context.Context, db *sql.DB) ([]string, error) {
	const prefix = "follows_"
	query := `
		SELECT table_name
		FROM information_schema.tables
		WHERE table_name LIKE $1
	`
	rows, err := db.QueryContext(ctx, query, prefix+"%")
	if err != nil {
		return nil, fmt.Errorf("error querying tracked follows tables: %w", err)
	}
	defer rows.Close()

	var trackedDIDs []string
	for rows.Next() {
		var tableName string
		if err := rows.Scan(&tableName); err != nil {
			return nil, fmt.Errorf("error scanning table name: %w", err)
		}
		// Strip the prefix to get the DID
		if len(tableName) > len(prefix) {
			did := tableName[len(prefix):]
			trackedDIDs = append(trackedDIDs, did)
		}
	}

	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("error iterating tracked tables: %w", err)
	}

	return trackedDIDs, nil
}
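
// Possible extension (a sketch only, not called from main): wrap the read
// loop so the consumer re-dials Jetstream after a read failure instead of
// spinning on a dead connection. The 5-second backoff and the decision to
// reuse the same URL (and therefore the same cursor) are assumptions, not
// part of the original pipeline.
func runWithReconnect(db *sql.DB, url string) {
	for {
		conn, _, err := websocket.DefaultDialer.Dial(url, nil)
		if err != nil {
			log.Printf("WebSocket connection error: %v; retrying in 5s", err)
			time.Sleep(5 * time.Second)
			continue
		}
		log.Printf("Connected to WebSocket: %s", url)

		for {
			var msg LikeMessage
			if err := conn.ReadJSON(&msg); err != nil {
				log.Printf("WebSocket read error: %v; reconnecting", err)
				break
			}
			processMessage(db, msg)
		}
		conn.Close()
	}
}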