+36
-8
cmd/mostliked/main.go
+36
-8
cmd/mostliked/main.go
···
1
1
package main
2
2
3
3
import (
4
+
"database/sql"
4
5
"log"
6
+
"os"
7
+
"os/signal"
8
+
"syscall"
5
9
6
10
"github.com/edavis/bsky-feeds/pkg/mostliked"
7
11
"github.com/gorilla/websocket"
12
+
_ "github.com/mattn/go-sqlite3"
8
13
)
9
14
10
15
func main() {
···
13
18
log.Fatal("websocket connection error:", err)
14
19
}
15
20
defer conn.Close()
21
+
22
+
dbCnx, err := sql.Open("sqlite3", "data/mostliked.db?_journal=WAL&_fk=on")
23
+
if err != nil {
24
+
log.Fatal("error opening db")
25
+
}
26
+
defer dbCnx.Close()
16
27
17
28
jetstreamEvents := make(chan []byte)
18
-
go mostliked.Handler(jetstreamEvents)
29
+
go mostliked.Handler(jetstreamEvents, dbCnx)
30
+
31
+
signalChan := make(chan os.Signal, 1)
32
+
cleanupDone := make(chan struct{})
33
+
signal.Notify(signalChan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
34
+
35
+
go func(conn *websocket.Conn, dbCnx *sql.DB, jetstreamEvents chan []byte) {
36
+
<-signalChan
37
+
log.Println("shutting down...")
38
+
close(jetstreamEvents)
39
+
dbCnx.Close()
40
+
conn.Close()
41
+
close(cleanupDone)
42
+
}(conn, dbCnx, jetstreamEvents)
19
43
20
44
log.Println("starting up")
21
-
for {
22
-
_, message, err := conn.ReadMessage()
23
-
if err != nil {
24
-
log.Println("error reading from websocket:", err)
25
-
break
45
+
go func(conn *websocket.Conn) {
46
+
for {
47
+
_, message, err := conn.ReadMessage()
48
+
if err != nil {
49
+
log.Println("error reading from websocket:", err)
50
+
break
51
+
}
52
+
jetstreamEvents <- message
26
53
}
27
-
jetstreamEvents <- message
28
-
}
54
+
}(conn)
55
+
56
+
<-cleanupDone
29
57
}
+1
-7
pkg/mostliked/handler.go
+1
-7
pkg/mostliked/handler.go
···
61
61
return ""
62
62
}
63
63
64
-
func Handler(events <-chan []byte) {
64
+
func Handler(events <-chan []byte, dbCnx *sql.DB) {
65
65
ctx := context.Background()
66
66
67
-
dbCnx, err := sql.Open("sqlite3", "data/mostliked.db?_journal=WAL&_fk=on")
68
-
if err != nil {
69
-
log.Fatal("error opening db")
70
-
}
71
67
if _, err := dbCnx.ExecContext(ctx, ddl); err != nil {
72
68
log.Fatal("couldn't create tables")
73
69
}
74
-
defer dbCnx.Close()
75
-
76
70
queries := db.New(dbCnx)
77
71
78
72
drafts := ccache.New(ccache.Configure[DraftPost]().MaxSize(50_000).GetsPerPromote(1))