+7
-7
cmd/feedweb/did.go
+7
-7
cmd/feedweb/did.go
···
9
const NgrokHostname = "routinely-right-barnacle.ngrok-free.app"
10
11
type DidDocument struct {
12
-
Context []string `json:"@context"`
13
-
ID string `json:"id"`
14
Services []DidService `json:"service"`
15
}
16
17
type DidService struct {
18
-
ID string `json:"id"`
19
-
ServiceType string `json:"type"`
20
ServiceEndpoint string `json:"serviceEndpoint"`
21
}
22
23
func didDoc(c echo.Context) error {
24
doc := DidDocument{
25
Context: []string{"https://www.w3.org/ns/did/v1"},
26
-
ID: `did:web:` + NgrokHostname,
27
Services: []DidService{
28
DidService{
29
-
ID: "#bsky_fg",
30
-
ServiceType: "BskyFeedGenerator",
31
ServiceEndpoint: `https://` + NgrokHostname,
32
},
33
},
···
9
const NgrokHostname = "routinely-right-barnacle.ngrok-free.app"
10
11
type DidDocument struct {
12
+
Context []string `json:"@context"`
13
+
ID string `json:"id"`
14
Services []DidService `json:"service"`
15
}
16
17
type DidService struct {
18
+
ID string `json:"id"`
19
+
ServiceType string `json:"type"`
20
ServiceEndpoint string `json:"serviceEndpoint"`
21
}
22
23
func didDoc(c echo.Context) error {
24
doc := DidDocument{
25
Context: []string{"https://www.w3.org/ns/did/v1"},
26
+
ID: `did:web:` + NgrokHostname,
27
Services: []DidService{
28
DidService{
29
+
ID: "#bsky_fg",
30
+
ServiceType: "BskyFeedGenerator",
31
ServiceEndpoint: `https://` + NgrokHostname,
32
},
33
},
+2
cmd/mostliked/main.go
+2
cmd/mostliked/main.go
+7
pkg/mostliked/handler.go
+7
pkg/mostliked/handler.go
+1
-1
service/feedweb.service
+1
-1
service/feedweb.service
+67
cmd/videostream/main.go
+67
cmd/videostream/main.go
···
···
1
+
package main
2
+
3
+
import (
4
+
"context"
5
+
"database/sql"
6
+
"log"
7
+
"os"
8
+
"os/signal"
9
+
"syscall"
10
+
11
+
jetstream "github.com/bluesky-social/jetstream/pkg/models"
12
+
"github.com/edavis/bsky-feeds/pkg/videostream"
13
+
"github.com/gorilla/websocket"
14
+
_ "github.com/mattn/go-sqlite3"
15
+
)
16
+
17
+
const JetstreamUrl = `wss://jetstream2.us-west.bsky.network/subscribe?wantedCollections=app.bsky.feed.post`
18
+
19
+
func main() {
20
+
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
21
+
defer stop()
22
+
23
+
conn, _, err := websocket.DefaultDialer.DialContext(ctx, JetstreamUrl, nil)
24
+
if err != nil {
25
+
log.Fatalf("failed to open websocket: %v\n", err)
26
+
}
27
+
defer func() {
28
+
if err := conn.Close(); err != nil {
29
+
log.Printf("failed to close websocket: %v\n", err)
30
+
}
31
+
log.Printf("websocket closed\n")
32
+
}()
33
+
34
+
dbCnx, err := sql.Open("sqlite3", "data/videostream.db?_journal=WAL&_fk=on&_timeout=5000&_sync=1&_txlock=immediate")
35
+
if err != nil {
36
+
log.Fatalf("failed to open database: %v\n", err)
37
+
}
38
+
defer func() {
39
+
if _, err := dbCnx.Exec("PRAGMA wal_checkpoint(TRUNCATE)"); err != nil {
40
+
log.Printf("error doing final WAL checkpoint: %v\n", err)
41
+
}
42
+
if err := dbCnx.Close(); err != nil {
43
+
log.Printf("failed to close db: %v\n", err)
44
+
}
45
+
log.Printf("db closed\n")
46
+
}()
47
+
48
+
queue := videostream.NewQueue(1000)
49
+
go videostream.Handler(ctx, queue, dbCnx)
50
+
51
+
log.Printf("starting up\n")
52
+
go func() {
53
+
for {
54
+
var event jetstream.Event
55
+
err := conn.ReadJSON(&event)
56
+
if err != nil {
57
+
log.Printf("ReadJSON error: %v\n", err)
58
+
stop()
59
+
break
60
+
}
61
+
queue.Enqueue(event)
62
+
}
63
+
}()
64
+
65
+
<-ctx.Done()
66
+
log.Printf("shutting down\n")
67
+
}
+7
pkg/videostream/checkpoint.go
+7
pkg/videostream/checkpoint.go
+107
pkg/videostream/handler.go
+107
pkg/videostream/handler.go
···
···
1
+
package videostream
2
+
3
+
import (
4
+
"context"
5
+
"database/sql"
6
+
_ "embed"
7
+
"encoding/json"
8
+
"fmt"
9
+
"log"
10
+
"time"
11
+
12
+
appbsky "github.com/bluesky-social/indigo/api/bsky"
13
+
jetstream "github.com/bluesky-social/jetstream/pkg/models"
14
+
"github.com/edavis/bsky-feeds/pkg/feeds"
15
+
_ "github.com/mattn/go-sqlite3"
16
+
)
17
+
18
+
//go:embed schema.sql
19
+
var ddl string
20
+
21
+
func Handler(ctx context.Context, events *Queue, dbCnx *sql.DB) {
22
+
var (
23
+
dbTx *sql.Tx
24
+
err error
25
+
eventCount int
26
+
)
27
+
28
+
if _, err = dbCnx.ExecContext(ctx, ddl); err != nil {
29
+
log.Printf("could not create tables: %v\n", err)
30
+
}
31
+
if _, err = dbCnx.ExecContext(ctx, `PRAGMA wal_autocheckpoint = 0`); err != nil {
32
+
log.Printf("could not set PRAGMA wal_autocheckpoint: %v\n", err)
33
+
}
34
+
35
+
for {
36
+
select {
37
+
case <-ctx.Done():
38
+
return
39
+
default:
40
+
}
41
+
42
+
event, ok := events.Dequeue()
43
+
if !ok {
44
+
time.Sleep(100 * time.Millisecond)
45
+
continue
46
+
}
47
+
48
+
if dbTx == nil {
49
+
dbTx, err = dbCnx.BeginTx(ctx, nil)
50
+
if err != nil {
51
+
log.Printf("failed to begin transaction: %v\n", err)
52
+
}
53
+
}
54
+
55
+
if event.Kind != jetstream.EventKindCommit {
56
+
continue
57
+
}
58
+
59
+
if event.Commit.Operation != jetstream.CommitOperationCreate {
60
+
continue
61
+
}
62
+
63
+
commit := *event.Commit
64
+
var post appbsky.FeedPost
65
+
if err = json.Unmarshal(commit.Record, &post); err != nil {
66
+
log.Printf("error parsing commit.Record: %v\n", err)
67
+
continue
68
+
}
69
+
70
+
if post.Embed != nil && post.Embed.EmbedVideo != nil {
71
+
uri := fmt.Sprintf("at://%s/%s/%s", event.Did, commit.Collection, commit.RKey)
72
+
ts := feeds.SafeTimestamp(post.CreatedAt)
73
+
dbTx.ExecContext(ctx, `insert or ignore into posts (uri, create_ts) values (?, ?)`, uri, ts)
74
+
} else {
75
+
continue
76
+
}
77
+
78
+
eventCount += 1
79
+
if eventCount%25 == 0 {
80
+
// TODO trim
81
+
82
+
if err = dbTx.Commit(); err != nil {
83
+
log.Printf("commit failed: %v\n", err)
84
+
}
85
+
86
+
var results CheckpointResults
87
+
err = dbCnx.QueryRowContext(ctx, `PRAGMA wal_checkpoint(RESTART)`).Scan(&results.Blocked, &results.Pages, &results.Transferred)
88
+
switch {
89
+
case err != nil:
90
+
log.Printf("failed checkpoint: %v\n", err)
91
+
case results.Blocked == 1:
92
+
log.Printf("checkpoint: blocked\n")
93
+
case results.Pages == results.Transferred:
94
+
log.Printf("checkpoint: %d pages transferred\n", results.Transferred)
95
+
case results.Pages != results.Transferred:
96
+
log.Printf("checkpoint: %d pages, %d transferred\n", results.Pages, results.Transferred)
97
+
}
98
+
99
+
dbTx, err = dbCnx.BeginTx(ctx, nil)
100
+
if err != nil {
101
+
log.Printf("failed to begin transaction: %v\n", err)
102
+
}
103
+
104
+
log.Printf("queue size: %d\n", events.Size())
105
+
}
106
+
}
107
+
}
+46
pkg/videostream/queue.go
+46
pkg/videostream/queue.go
···
···
1
+
package videostream
2
+
3
+
import (
4
+
"sync"
5
+
6
+
jetstream "github.com/bluesky-social/jetstream/pkg/models"
7
+
)
8
+
9
+
type Queue struct {
10
+
lk sync.Mutex
11
+
events []jetstream.Event
12
+
}
13
+
14
+
func NewQueue(capacity int) *Queue {
15
+
return &Queue{
16
+
events: make([]jetstream.Event, 0, capacity),
17
+
}
18
+
}
19
+
20
+
func (q *Queue) Enqueue(event jetstream.Event) {
21
+
q.lk.Lock()
22
+
defer q.lk.Unlock()
23
+
24
+
q.events = append(q.events, event)
25
+
}
26
+
27
+
func (q *Queue) Dequeue() (jetstream.Event, bool) {
28
+
q.lk.Lock()
29
+
defer q.lk.Unlock()
30
+
31
+
if len(q.events) == 0 {
32
+
var e jetstream.Event
33
+
return e, false
34
+
}
35
+
36
+
event := q.events[0]
37
+
q.events = q.events[1:]
38
+
return event, true
39
+
}
40
+
41
+
func (q *Queue) Size() int {
42
+
q.lk.Lock()
43
+
defer q.lk.Unlock()
44
+
45
+
return len(q.events)
46
+
}
+6
pkg/videostream/schema.sql
+6
pkg/videostream/schema.sql
+16
service/videostream.service
+16
service/videostream.service
···
···
1
+
[Unit]
2
+
Description=videostream
3
+
After=network.target syslog.target
4
+
5
+
[Service]
6
+
Type=simple
7
+
User=ubuntu
8
+
WorkingDirectory=/home/ubuntu/bsky-feeds
9
+
ExecStart=/home/ubuntu/bsky-feeds/bin/videostream
10
+
TimeoutSec=15
11
+
Restart=always
12
+
RestartSec=5
13
+
StandardOutput=journal
14
+
15
+
[Install]
16
+
WantedBy=multi-user.target
+8
-1
pkg/mostliked/generator.go
+8
-1
pkg/mostliked/generator.go
···
77
}
78
}
79
80
-
rows, err := getPosts(ctx, dbCnx, langs, params.Limit, offset)
81
if err != nil {
82
log.Printf("error fetching rows: %v\n", err)
83
}
···
85
var cursor string
86
posts := make([]*appbsky.FeedDefs_SkeletonFeedPost, 0, params.Limit)
87
88
for _, row := range rows {
89
posts = append(posts, &appbsky.FeedDefs_SkeletonFeedPost{Post: row.Uri})
90
}
···
77
}
78
}
79
80
+
rows, err := getPosts(ctx, dbCnx, langs, params.Limit-1, offset)
81
if err != nil {
82
log.Printf("error fetching rows: %v\n", err)
83
}
···
85
var cursor string
86
posts := make([]*appbsky.FeedDefs_SkeletonFeedPost, 0, params.Limit)
87
88
+
posts = append(posts, &appbsky.FeedDefs_SkeletonFeedPost{
89
+
Post: "at://did:plc:4nsduwlpivpuur4mqkbfvm6a/app.bsky.feed.post/3ltbfk5yfbs2n",
90
+
Reason: &appbsky.FeedDefs_SkeletonFeedPost_Reason{
91
+
FeedDefs_SkeletonReasonPin: &appbsky.FeedDefs_SkeletonReasonPin{},
92
+
},
93
+
})
94
+
95
for _, row := range rows {
96
posts = append(posts, &appbsky.FeedDefs_SkeletonFeedPost{Post: row.Uri})
97
}