+2
-3
README.md
+2
-3
README.md
···
50
50
51
51
## Todo
52
52
- [ ] Faster Indexing
53
-
- [ ] Proper Up-to-Date Following Indexing
53
+
- [x] Proper Up-to-Date Following Indexing
54
54
- [x] Repost Indicators
55
-
- [ ] Cache Timeouts
55
+
- [x] Cache Timeouts
56
56
- [x] Likes
57
57
- [x] Posts
58
58
- [x] Feed Caches
59
-
- [ ] Followings
60
59
- [ ] More Fresh Feed Variants
61
60
- [ ] unFresh
62
61
- [x] +9 hrs
+166
-1
indexer/indexer.go
+166
-1
indexer/indexer.go
···
3
3
import (
4
4
"context"
5
5
"database/sql"
6
+
"encoding/json"
6
7
"fmt"
7
8
"log"
8
9
"os"
···
13
14
_ "github.com/lib/pq"
14
15
)
15
16
16
-
const wsUrl = "wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.repost&wantedCollections=app.bsky.feed.like"
17
+
const wsUrl = "wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=app.bsky.feed.post&wantedCollections=app.bsky.feed.repost&wantedCollections=app.bsky.feed.like&wantedCollections=app.bsky.graph.follow"
17
18
18
19
type LikeMessage struct {
19
20
Did string `json:"did"`
···
38
39
Reply *Reply `json:"reply,omitempty"`
39
40
}
40
41
42
+
type FollowRecord struct {
43
+
DID string `json:"subject"`
44
+
createdAt string `json:"createdAt"`
45
+
}
46
+
41
47
type Reply struct {
42
48
Parent ReplySubject `json:"parent"`
43
49
Root ReplySubject `json:"root"`
···
46
52
type LikeSubject struct {
47
53
CID string `json:"cid"`
48
54
URI string `json:"uri"`
55
+
Raw string // Stores raw string if not a JSON object
56
+
}
57
+
58
+
// This handles both string and object forms
59
+
func (ls *LikeSubject) UnmarshalJSON(data []byte) error {
60
+
// Try to unmarshal as object first
61
+
var obj struct {
62
+
CID string `json:"cid"`
63
+
URI string `json:"uri"`
64
+
}
65
+
if err := json.Unmarshal(data, &obj); err == nil && obj.URI != "" {
66
+
ls.CID = obj.CID
67
+
ls.URI = obj.URI
68
+
return nil
69
+
}
70
+
71
+
// If it's not a valid object, try to unmarshal as string
72
+
var str string
73
+
if err := json.Unmarshal(data, &str); err == nil {
74
+
ls.Raw = str
75
+
return nil
76
+
}
77
+
78
+
return fmt.Errorf("LikeSubject: invalid JSON: %s", string(data))
49
79
}
50
80
51
81
type ReplySubject struct {
···
58
88
var (
59
89
postBatch []Post
60
90
likeBatch []Like
91
+
followBatch []Follow
61
92
batchInsertSize = 1000 // Adjust the batch size as needed
62
93
batchInterval = 30 * time.Second // Flush every 30 seconds
63
94
)
···
77
108
RelDate int64
78
109
}
79
110
111
+
type Follow struct {
112
+
RelAuthor string
113
+
followSubjectDID string
114
+
}
115
+
116
+
var trackedDIDsMap map[string]struct{}
117
+
var trackedDIDs []string
118
+
119
+
func initTrackedDIDsMap(dids []string) {
120
+
trackedDIDsMap = make(map[string]struct{}, len(dids))
121
+
for _, did := range dids {
122
+
trackedDIDsMap[did] = struct{}{}
123
+
}
124
+
}
125
+
80
126
func getLastCursor(db *sql.DB) int64 {
81
127
var lastCursor int64
82
128
err := db.QueryRow("SELECT lastCursor FROM cursor WHERE id = 1").Scan(&lastCursor)
···
118
164
// Retrieve the last cursor
119
165
lastCursor := getLastCursor(db)
120
166
167
+
// initialize the tracked DIDs
168
+
trackedDIDs, err = getTrackedDIDs(context.Background(), db)
169
+
if err != nil {
170
+
log.Fatalf("Failed to get tracked DIDs: %v", err)
171
+
}
172
+
log.Printf("Tracked DIDs: %v\n", trackedDIDs)
173
+
initTrackedDIDsMap(trackedDIDs)
174
+
175
+
go startBatchInsertFollowJob(db)
121
176
// If the cursor is older than 24 hours, skip it
122
177
if lastCursor > 0 {
123
178
cursorTime := time.UnixMicro(lastCursor)
···
212
267
}
213
268
}
214
269
func processMessage(db *sql.DB, msg LikeMessage) {
270
+
// log.Print("Processing message...")
215
271
// Convert cursor to time
216
272
cursorTime := time.UnixMicro(msg.TimeUs)
217
273
···
245
301
deletePost(db, msg.Did, postUri, msg.TimeUs)
246
302
}
247
303
case "app.bsky.feed.repost":
304
+
248
305
if record.Subject.URI != "" {
249
306
if msg.Commit.Operation == "create" {
250
307
postBatch = append(postBatch, Post{msg.Did, record.Subject.URI, msg.TimeUs, true, repostUri, ""})
···
260
317
deleteLike(db, msg.Did, record.Subject.URI)
261
318
}
262
319
}
320
+
case "app.bsky.graph.follow":
321
+
_, tracked := trackedDIDsMap[msg.Did]
322
+
if tracked {
323
+
// log.Printf("Following found tracked DID: %s\n", msg.Did)
324
+
if msg.Commit.Operation == "create" {
325
+
// log.Printf("Following Create; doer: %s, subject: %s\n", msg.Did, record.Subject.Raw)
326
+
followBatch = append(followBatch, Follow{msg.Did, record.Subject.Raw})
327
+
} else if msg.Commit.Operation == "delete" {
328
+
// log.Printf("Following Delete; doer: %s, subject: %s\n", msg.Did, record.Subject.Raw)
329
+
//log.Printf("Unfollowing: %s", msg.Commit.RKey)
330
+
// Remove the DID from the tracked DIDs map
331
+
deleteFollow(db, msg.Did, msg.Commit.RKey)
332
+
}
333
+
}
263
334
default:
264
335
//log.Printf("Unknown collection: %s", msg.Commit.Collection)
265
336
}
···
288
359
`, relAuthor, postUri)
289
360
if err != nil {
290
361
log.Printf("Error deleting like: %v", err)
362
+
}
363
+
}
364
+
365
+
func deleteFollow(db *sql.DB, relAuthor, did string) {
366
+
unquotedTableName := "follows_" + relAuthor
367
+
tableName := pq.QuoteIdentifier(unquotedTableName)
368
+
_, err := db.Exec(fmt.Sprintf(`
369
+
DELETE FROM %s WHERE follow = $1;
370
+
`, tableName), did)
371
+
if err != nil {
372
+
log.Printf("Error deleting follow: %v", err)
291
373
}
292
374
}
293
375
···
373
455
}
374
456
}
375
457
458
+
func startBatchInsertFollowJob(db *sql.DB) {
459
+
ticker := time.NewTicker(batchInterval)
460
+
defer ticker.Stop()
461
+
462
+
for range ticker.C {
463
+
if len(followBatch) >= 1 {
464
+
batchInsertFollow(db)
465
+
}
466
+
}
467
+
}
468
+
func batchInsertFollow(db *sql.DB) {
469
+
tx, err := db.Begin()
470
+
if err != nil {
471
+
log.Printf("Error starting transaction: %v", err)
472
+
return
473
+
}
474
+
475
+
unquotedTableName := "follows_" + followBatch[0].RelAuthor
476
+
tableName := pq.QuoteIdentifier(unquotedTableName)
477
+
478
+
stmt, err := tx.Prepare(fmt.Sprintf(`
479
+
INSERT INTO %s (follow)
480
+
VALUES ($1)
481
+
ON CONFLICT (follow) DO NOTHING
482
+
`, tableName))
483
+
if err != nil {
484
+
log.Printf("Error preparing statement: %v", err)
485
+
return
486
+
}
487
+
defer stmt.Close()
488
+
489
+
for _, follow := range followBatch {
490
+
_, err := stmt.Exec(follow.followSubjectDID)
491
+
if err != nil {
492
+
log.Printf("Error executing statement: %v", err)
493
+
log.Printf("Failed FOLLOW INSERT: %+v\nError: %v", follow, err)
494
+
os.Exit(1) // Exit on error
495
+
}
496
+
}
497
+
498
+
err = tx.Commit()
499
+
if err != nil {
500
+
log.Printf("Error committing transaction: %v", err)
501
+
}
502
+
503
+
// Clear the batch
504
+
followBatch = followBatch[:0]
505
+
}
506
+
376
507
func batchInsertLikes(db *sql.DB) {
377
508
tx, err := db.Begin()
378
509
if err != nil {
···
477
608
478
609
return nil
479
610
}
611
+
612
+
// ehhhhh why are we doing this
613
+
func getTrackedDIDs(ctx context.Context, db *sql.DB) ([]string, error) {
614
+
const prefix = "follows_"
615
+
query := `
616
+
SELECT table_name
617
+
FROM information_schema.tables
618
+
WHERE table_name LIKE $1
619
+
`
620
+
rows, err := db.QueryContext(ctx, query, prefix+"%")
621
+
if err != nil {
622
+
return nil, fmt.Errorf("error querying tracked follows tables: %w", err)
623
+
}
624
+
defer rows.Close()
625
+
626
+
var trackedDIDs []string
627
+
for rows.Next() {
628
+
var tableName string
629
+
if err := rows.Scan(&tableName); err != nil {
630
+
return nil, fmt.Errorf("error scanning table name: %w", err)
631
+
}
632
+
// Strip prefix to get the DID
633
+
if len(tableName) > len(prefix) {
634
+
did := tableName[len(prefix):]
635
+
trackedDIDs = append(trackedDIDs, did)
636
+
}
637
+
}
638
+
639
+
if err := rows.Err(); err != nil {
640
+
return nil, fmt.Errorf("error iterating tracked tables: %w", err)
641
+
}
642
+
643
+
return trackedDIDs, nil
644
+
}