this repo has no description

Compare changes

Choose any two refs to compare.

Changed files
+274 -9
cmd
feedweb
mostliked
videostream
pkg
service
+7 -7
cmd/feedweb/did.go
··· 9 9 const NgrokHostname = "routinely-right-barnacle.ngrok-free.app" 10 10 11 11 type DidDocument struct { 12 - Context []string `json:"@context"` 13 - ID string `json:"id"` 12 + Context []string `json:"@context"` 13 + ID string `json:"id"` 14 14 Services []DidService `json:"service"` 15 15 } 16 16 17 17 type DidService struct { 18 - ID string `json:"id"` 19 - ServiceType string `json:"type"` 18 + ID string `json:"id"` 19 + ServiceType string `json:"type"` 20 20 ServiceEndpoint string `json:"serviceEndpoint"` 21 21 } 22 22 23 23 func didDoc(c echo.Context) error { 24 24 doc := DidDocument{ 25 25 Context: []string{"https://www.w3.org/ns/did/v1"}, 26 - ID: `did:web:` + NgrokHostname, 26 + ID: `did:web:` + NgrokHostname, 27 27 Services: []DidService{ 28 28 DidService{ 29 - ID: "#bsky_fg", 30 - ServiceType: "BskyFeedGenerator", 29 + ID: "#bsky_fg", 30 + ServiceType: "BskyFeedGenerator", 31 31 ServiceEndpoint: `https://` + NgrokHostname, 32 32 }, 33 33 },
+2
cmd/mostliked/main.go
··· 52 52 for { 53 53 _, message, err := conn.ReadMessage() 54 54 if err != nil { 55 + log.Printf("ReadJSON error: %v\n", err) 55 56 stop() 57 + break 56 58 } 57 59 jetstreamEvents <- message 58 60 }
+7
pkg/mostliked/handler.go
··· 86 86 eventCount int 87 87 ) 88 88 89 + forLoop: 89 90 for evt := range events { 91 + select { 92 + case <-ctx.Done(): 93 + break forLoop 94 + default: 95 + } 96 + 90 97 if !txOpen { 91 98 dbTx, err = dbCnx.BeginTx(ctx, nil) 92 99 if err != nil {
+1 -1
service/feedweb.service
··· 4 4 5 5 [Service] 6 6 Type=simple 7 - User=eric 7 + User=ubuntu 8 8 WorkingDirectory=/home/ubuntu/bsky-feeds 9 9 ExecStart=/home/ubuntu/bsky-feeds/bin/feedweb 10 10 TimeoutSec=15
+67
cmd/videostream/main.go
··· 1 + package main 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + "log" 7 + "os" 8 + "os/signal" 9 + "syscall" 10 + 11 + jetstream "github.com/bluesky-social/jetstream/pkg/models" 12 + "github.com/edavis/bsky-feeds/pkg/videostream" 13 + "github.com/gorilla/websocket" 14 + _ "github.com/mattn/go-sqlite3" 15 + ) 16 + 17 + const JetstreamUrl = `wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=app.bsky.feed.post` 18 + 19 + func main() { 20 + ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM) 21 + defer stop() 22 + 23 + conn, _, err := websocket.DefaultDialer.DialContext(ctx, JetstreamUrl, nil) 24 + if err != nil { 25 + log.Fatalf("failed to open websocket: %v\n", err) 26 + } 27 + defer func() { 28 + if err := conn.Close(); err != nil { 29 + log.Printf("failed to close websocket: %v\n", err) 30 + } 31 + log.Printf("websocket closed\n") 32 + }() 33 + 34 + dbCnx, err := sql.Open("sqlite3", "data/videostream.db?_journal=WAL&_fk=on&_timeout=5000&_sync=1&_txlock=immediate") 35 + if err != nil { 36 + log.Fatalf("failed to open database: %v\n", err) 37 + } 38 + defer func() { 39 + if _, err := dbCnx.Exec("PRAGMA wal_checkpoint(TRUNCATE)"); err != nil { 40 + log.Printf("error doing final WAL checkpoint: %v\n", err) 41 + } 42 + if err := dbCnx.Close(); err != nil { 43 + log.Printf("failed to close db: %v\n", err) 44 + } 45 + log.Printf("db closed\n") 46 + }() 47 + 48 + queue := videostream.NewQueue(1000) 49 + go videostream.Handler(ctx, queue, dbCnx) 50 + 51 + log.Printf("starting up\n") 52 + go func() { 53 + for { 54 + var event jetstream.Event 55 + err := conn.ReadJSON(&event) 56 + if err != nil { 57 + log.Printf("ReadJSON error: %v\n", err) 58 + stop() 59 + break 60 + } 61 + queue.Enqueue(event) 62 + } 63 + }() 64 + 65 + <-ctx.Done() 66 + log.Printf("shutting down\n") 67 + }
+7
pkg/videostream/checkpoint.go
··· 1 + package videostream 2 + 3 + type CheckpointResults struct { 4 + Blocked int 5 + Pages int 6 + Transferred int 7 + }
+107
pkg/videostream/handler.go
··· 1 + package videostream 2 + 3 + import ( 4 + "context" 5 + "database/sql" 6 + _ "embed" 7 + "encoding/json" 8 + "fmt" 9 + "log" 10 + "time" 11 + 12 + appbsky "github.com/bluesky-social/indigo/api/bsky" 13 + jetstream "github.com/bluesky-social/jetstream/pkg/models" 14 + "github.com/edavis/bsky-feeds/pkg/feeds" 15 + _ "github.com/mattn/go-sqlite3" 16 + ) 17 + 18 + //go:embed schema.sql 19 + var ddl string 20 + 21 + func Handler(ctx context.Context, events *Queue, dbCnx *sql.DB) { 22 + var ( 23 + dbTx *sql.Tx 24 + err error 25 + eventCount int 26 + ) 27 + 28 + if _, err = dbCnx.ExecContext(ctx, ddl); err != nil { 29 + log.Printf("could not create tables: %v\n", err) 30 + } 31 + if _, err = dbCnx.ExecContext(ctx, `PRAGMA wal_autocheckpoint = 0`); err != nil { 32 + log.Printf("could not set PRAGMA wal_autocheckpoint: %v\n", err) 33 + } 34 + 35 + for { 36 + select { 37 + case <-ctx.Done(): 38 + return 39 + default: 40 + } 41 + 42 + event, ok := events.Dequeue() 43 + if !ok { 44 + time.Sleep(100 * time.Millisecond) 45 + continue 46 + } 47 + 48 + if dbTx == nil { 49 + dbTx, err = dbCnx.BeginTx(ctx, nil) 50 + if err != nil { 51 + log.Printf("failed to begin transaction: %v\n", err) 52 + } 53 + } 54 + 55 + if event.Kind != jetstream.EventKindCommit { 56 + continue 57 + } 58 + 59 + if event.Commit.Operation != jetstream.CommitOperationCreate { 60 + continue 61 + } 62 + 63 + commit := *event.Commit 64 + var post appbsky.FeedPost 65 + if err = json.Unmarshal(commit.Record, &post); err != nil { 66 + log.Printf("error parsing commit.Record: %v\n", err) 67 + continue 68 + } 69 + 70 + if post.Embed != nil && post.Embed.EmbedVideo != nil { 71 + uri := fmt.Sprintf("at://%s/%s/%s", event.Did, commit.Collection, commit.RKey) 72 + ts := feeds.SafeTimestamp(post.CreatedAt) 73 + dbTx.ExecContext(ctx, `insert or ignore into posts (uri, create_ts) values (?, ?)`, uri, ts) 74 + } else { 75 + continue 76 + } 77 + 78 + eventCount += 1 79 + if eventCount%25 == 0 { 80 + // TODO trim 81 + 82 + if err = dbTx.Commit(); err != nil { 83 + log.Printf("commit failed: %v\n", err) 84 + } 85 + 86 + var results CheckpointResults 87 + err = dbCnx.QueryRowContext(ctx, `PRAGMA wal_checkpoint(RESTART)`).Scan(&results.Blocked, &results.Pages, &results.Transferred) 88 + switch { 89 + case err != nil: 90 + log.Printf("failed checkpoint: %v\n", err) 91 + case results.Blocked == 1: 92 + log.Printf("checkpoint: blocked\n") 93 + case results.Pages == results.Transferred: 94 + log.Printf("checkpoint: %d pages transferred\n", results.Transferred) 95 + case results.Pages != results.Transferred: 96 + log.Printf("checkpoint: %d pages, %d transferred\n", results.Pages, results.Transferred) 97 + } 98 + 99 + dbTx, err = dbCnx.BeginTx(ctx, nil) 100 + if err != nil { 101 + log.Printf("failed to begin transaction: %v\n", err) 102 + } 103 + 104 + log.Printf("queue size: %d\n", events.Size()) 105 + } 106 + } 107 + }
+46
pkg/videostream/queue.go
··· 1 + package videostream 2 + 3 + import ( 4 + "sync" 5 + 6 + jetstream "github.com/bluesky-social/jetstream/pkg/models" 7 + ) 8 + 9 + type Queue struct { 10 + lk sync.Mutex 11 + events []jetstream.Event 12 + } 13 + 14 + func NewQueue(capacity int) *Queue { 15 + return &Queue{ 16 + events: make([]jetstream.Event, 0, capacity), 17 + } 18 + } 19 + 20 + func (q *Queue) Enqueue(event jetstream.Event) { 21 + q.lk.Lock() 22 + defer q.lk.Unlock() 23 + 24 + q.events = append(q.events, event) 25 + } 26 + 27 + func (q *Queue) Dequeue() (jetstream.Event, bool) { 28 + q.lk.Lock() 29 + defer q.lk.Unlock() 30 + 31 + if len(q.events) == 0 { 32 + var e jetstream.Event 33 + return e, false 34 + } 35 + 36 + event := q.events[0] 37 + q.events = q.events[1:] 38 + return event, true 39 + } 40 + 41 + func (q *Queue) Size() int { 42 + q.lk.Lock() 43 + defer q.lk.Unlock() 44 + 45 + return len(q.events) 46 + }
+6
pkg/videostream/schema.sql
··· 1 + create table if not exists posts ( 2 + uri text primary key, 3 + create_ts int not null 4 + ); 5 + 6 + create index if not exists ts_idx on posts(create_ts);
+16
service/videostream.service
··· 1 + [Unit] 2 + Description=videostream 3 + After=network.target syslog.target 4 + 5 + [Service] 6 + Type=simple 7 + User=ubuntu 8 + WorkingDirectory=/home/ubuntu/bsky-feeds 9 + ExecStart=/home/ubuntu/bsky-feeds/bin/videostream 10 + TimeoutSec=15 11 + Restart=always 12 + RestartSec=5 13 + StandardOutput=journal 14 + 15 + [Install] 16 + WantedBy=multi-user.target
+8 -1
pkg/mostliked/generator.go
··· 77 77 } 78 78 } 79 79 80 - rows, err := getPosts(ctx, dbCnx, langs, params.Limit, offset) 80 + rows, err := getPosts(ctx, dbCnx, langs, params.Limit-1, offset) 81 81 if err != nil { 82 82 log.Printf("error fetching rows: %v\n", err) 83 83 } ··· 85 85 var cursor string 86 86 posts := make([]*appbsky.FeedDefs_SkeletonFeedPost, 0, params.Limit) 87 87 88 + posts = append(posts, &appbsky.FeedDefs_SkeletonFeedPost{ 89 + Post: "at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.post/3ltbfk5yfbs2n", 90 + Reason: &appbsky.FeedDefs_SkeletonFeedPost_Reason{ 91 + FeedDefs_SkeletonReasonPin: &appbsky.FeedDefs_SkeletonReasonPin{}, 92 + }, 93 + }) 94 + 88 95 for _, row := range rows { 89 96 posts = append(posts, &appbsky.FeedDefs_SkeletonFeedPost{Post: row.Uri}) 90 97 }