+188
cmd/bsky-activity/main.go
+188
cmd/bsky-activity/main.go
···
1
+
package main
2
+
3
+
import (
4
+
"context"
5
+
"encoding/json"
6
+
"log"
7
+
"os"
8
+
"os/signal"
9
+
"strings"
10
+
"syscall"
11
+
12
+
appbsky "github.com/bluesky-social/indigo/api/bsky"
13
+
jetstream "github.com/bluesky-social/jetstream/pkg/models"
14
+
"github.com/gorilla/websocket"
15
+
"github.com/redis/go-redis/v9"
16
+
)
17
+
18
+
const JetstreamUrl = `wss://jetstream1.us-west.bsky.network/subscribe`
19
+
20
+
var AppBskyAllowlist = map[string]bool{
21
+
"app.bsky.actor.profile": true,
22
+
"app.bsky.feed.generator": true,
23
+
"app.bsky.feed.like": true,
24
+
"app.bsky.feed.post": true,
25
+
"app.bsky.feed.postgate": true,
26
+
"app.bsky.feed.repost": true,
27
+
"app.bsky.feed.threadgate": true,
28
+
"app.bsky.graph.block": true,
29
+
"app.bsky.graph.follow": true,
30
+
"app.bsky.graph.list": true,
31
+
"app.bsky.graph.listblock": true,
32
+
"app.bsky.graph.listitem": true,
33
+
"app.bsky.graph.starterpack": true,
34
+
"app.bsky.labeler.service": true,
35
+
"chat.bsky.actor.declaration": true,
36
+
}
37
+
38
+
var AtprotoAllowlist = map[string]bool{
39
+
"social.psky": true,
40
+
"blue.zio.atfile": true,
41
+
"com.shinolabs.pinksea": true,
42
+
"com.whtwnd": true,
43
+
"events.smokesignal": true,
44
+
"fyi.unravel": true,
45
+
"xyz.statusphere": true,
46
+
}
47
+
48
+
func trackedRecordType(collection string) bool {
49
+
for k, _ := range AppBskyAllowlist {
50
+
if collection == k {
51
+
return true
52
+
}
53
+
}
54
+
for k, _ := range AtprotoAllowlist {
55
+
if strings.HasPrefix(collection, k) {
56
+
return true
57
+
}
58
+
}
59
+
return false
60
+
}
61
+
62
+
func handler(ctx context.Context, events <-chan jetstream.Event) {
63
+
rdb := redis.NewClient(&redis.Options{
64
+
Addr: "localhost:6379",
65
+
Password: "",
66
+
DB: 0,
67
+
})
68
+
pipe := rdb.Pipeline()
69
+
var eventCount int
70
+
71
+
eventLoop:
72
+
for event := range events {
73
+
select {
74
+
case <-ctx.Done():
75
+
break eventLoop
76
+
default:
77
+
}
78
+
79
+
if event.Kind != jetstream.EventKindCommit {
80
+
continue
81
+
}
82
+
if event.Commit.Operation != jetstream.CommitOperationCreate {
83
+
continue
84
+
}
85
+
86
+
commit := *event.Commit
87
+
collection := commit.Collection
88
+
89
+
// if collection doesn't start with either allowlist, continue
90
+
if !trackedRecordType(collection) {
91
+
continue
92
+
}
93
+
94
+
// if collection starts with one of the Atproto allowlist keys, incr
95
+
for k, _ := range AtprotoAllowlist {
96
+
if strings.HasPrefix(collection, k) {
97
+
ckey := strings.ReplaceAll(collection, ".", "_")
98
+
if err := pipe.Incr(ctx, "dev.edavis.atproto.collection."+ckey); err != nil {
99
+
log.Printf("failed incrementing an atproto collection: %v\n", err)
100
+
}
101
+
}
102
+
}
103
+
104
+
// if a post with an embed, incr that $embed type
105
+
if collection == "app.bsky.feed.post" {
106
+
var post appbsky.FeedPost
107
+
if err := json.Unmarshal(commit.Record, &post); err != nil {
108
+
log.Printf("error parsing appbsky.FeedPost: %v\n", err)
109
+
}
110
+
if post.Embed != nil {
111
+
var ekey string
112
+
switch {
113
+
case post.Embed.EmbedImages != nil:
114
+
ekey = post.Embed.EmbedImages.LexiconTypeID
115
+
case post.Embed.EmbedVideo != nil:
116
+
ekey = post.Embed.EmbedVideo.LexiconTypeID
117
+
case post.Embed.EmbedExternal != nil:
118
+
ekey = post.Embed.EmbedExternal.LexiconTypeID
119
+
case post.Embed.EmbedRecord != nil:
120
+
ekey = post.Embed.EmbedRecord.LexiconTypeID
121
+
case post.Embed.EmbedRecordWithMedia != nil:
122
+
ekey = post.Embed.EmbedRecordWithMedia.LexiconTypeID
123
+
}
124
+
if ekey == "" {
125
+
continue
126
+
}
127
+
if err := pipe.Incr(ctx, "app.bsky.feed.post:embed:"+ekey).Err(); err != nil {
128
+
log.Printf("failed incrementing embed key: %v\n", err)
129
+
}
130
+
}
131
+
}
132
+
133
+
// incr the collection and ops
134
+
if err := pipe.Incr(ctx, collection).Err(); err != nil {
135
+
log.Printf("failed incrementing collection: %v\n", err)
136
+
}
137
+
138
+
if err := pipe.Incr(ctx, `dev.edavis.muninsky.ops`).Err(); err != nil {
139
+
log.Printf("failed incrementing ops: %v\n", err)
140
+
}
141
+
142
+
// add one to the count, every 500 ops execute the piepline
143
+
eventCount += 1
144
+
if eventCount%500 == 0 {
145
+
if _, err := pipe.Exec(ctx); err != nil {
146
+
log.Printf("failed to exec pipe\n")
147
+
}
148
+
}
149
+
}
150
+
}
151
+
152
+
func main() {
153
+
ctx, stop := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
154
+
defer stop()
155
+
156
+
conn, _, err := websocket.DefaultDialer.DialContext(ctx, JetstreamUrl, nil)
157
+
if err != nil {
158
+
log.Fatalf("failed to open websocket: %v\n", err)
159
+
}
160
+
defer func() {
161
+
if err := conn.Close(); err != nil {
162
+
log.Printf("failed to close websocket: %v\n", err)
163
+
}
164
+
log.Printf("websocket closed\n")
165
+
}()
166
+
167
+
jetstreamEvents := make(chan jetstream.Event)
168
+
go handler(ctx, jetstreamEvents)
169
+
170
+
log.Printf("starting up\n")
171
+
var event jetstream.Event
172
+
go func() {
173
+
for {
174
+
event = jetstream.Event{}
175
+
err := conn.ReadJSON(&event)
176
+
if err != nil {
177
+
log.Printf("ReadJSON error: %v\n", err)
178
+
stop()
179
+
break
180
+
} else {
181
+
jetstreamEvents <- event
182
+
}
183
+
}
184
+
}()
185
+
186
+
<-ctx.Done()
187
+
log.Printf("shutting down\n")
188
+
}