this repo has no description
1package main
2
3import (
4 "bytes"
5 "context"
6 "encoding/json"
7 "fmt"
8 "io"
9 "log"
10 "log/slog"
11 "net/http"
12 "os"
13 "time"
14
15 "github.com/bluesky-social/indigo/api/bsky"
16 "github.com/bluesky-social/indigo/util"
17 "github.com/bluesky-social/indigo/xrpc"
18 "github.com/bluesky-social/jetstream/pkg/client"
19 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
20 "github.com/bluesky-social/jetstream/pkg/models"
21 lru "github.com/hashicorp/golang-lru/v2/expirable"
22 _ "github.com/joho/godotenv/autoload"
23 "github.com/urfave/cli/v2"
24)
25
// Label identifiers defined by this program.
//
// NOTE(review): presumably these are the values passed to emitLabel; the call
// sites are not visible in this part of the file — confirm.
const (
	// LabelBadFaith is the "bad-faith" label identifier.
	LabelBadFaith = "bad-faith"
	// LabelOffTopic is the "off-topic" label identifier.
	LabelOffTopic = "off-topic"
	// LabelFunny is the "funny" label identifier.
	LabelFunny = "funny"
)
31
32func main() {
33 app := &cli.App{
34 Name: "dontshowmethis",
35 Action: run,
36 Flags: []cli.Flag{
37 &cli.StringFlag{
38 Name: "pds-url",
39 EnvVars: []string{"PDS_URL"},
40 Required: true,
41 },
42 &cli.StringFlag{
43 Name: "account-handle",
44 EnvVars: []string{"ACCOUNT_HANDLE"},
45 Required: true,
46 },
47 &cli.StringFlag{
48 Name: "account-password",
49 EnvVars: []string{"ACCOUNT_PASSWORD"},
50 Required: true,
51 },
52 &cli.StringSliceFlag{
53 Name: "watched-ops",
54 EnvVars: []string{"WATCHED_OPS"},
55 Required: true,
56 },
57 &cli.StringFlag{
58 Name: "jetstream-url",
59 EnvVars: []string{"JETSTREAM_URL"},
60 Value: "wss://jetstream2.us-west.bsky.network/subscribe",
61 },
62 &cli.StringFlag{
63 Name: "labeler-url",
64 Usage: "skyware labeler event emission url",
65 Required: true,
66 EnvVars: []string{"LABELER_URL"},
67 },
68 &cli.StringFlag{
69 Name: "labeler-key",
70 Usage: "skyware labeler event emission key",
71 Required: true,
72 EnvVars: []string{"LABELER_KEY"},
73 },
74 &cli.StringFlag{
75 Name: "lmstudio-host",
76 Usage: "lmstudio host",
77 EnvVars: []string{"LMSTUDIO_HOST"},
78 Required: true,
79 },
80 },
81 }
82
83 app.Run(os.Args)
84}
85
86type DontShowMeThis struct {
87 logger *slog.Logger
88 xrpcc *xrpc.Client
89 httpc *http.Client
90
91 watchedOps map[string]struct{}
92
93 labelerUrl string
94 labelerKey string
95
96 lmstudioc *LMStudioClient
97
98 postCache *lru.LRU[string, *bsky.FeedPost]
99}
100
101var run = func(cmd *cli.Context) error {
102 opt := struct {
103 PdsUrl string
104 JetstreamUrl string
105 AccountHandle string
106 AccountPassword string
107 WatchedOps []string
108 LabelerUrl string
109 LabelerKey string
110 LmstudioHost string
111 }{
112 PdsUrl: cmd.String("pds-url"),
113 JetstreamUrl: cmd.String("jetstream-url"),
114 AccountHandle: cmd.String("account-handle"),
115 AccountPassword: cmd.String("account-password"),
116 WatchedOps: cmd.StringSlice("watched-ops"),
117 LabelerUrl: cmd.String("labeler-url"),
118 LabelerKey: cmd.String("labeler-key"),
119 LmstudioHost: cmd.String("lmstudio-host"),
120 }
121
122 logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
123 Level: slog.LevelInfo,
124 }))
125
126 watchedOps := make(map[string]struct{}, len(opt.WatchedOps))
127 for _, op := range opt.WatchedOps {
128 watchedOps[op] = struct{}{}
129 }
130
131 xrpcc := &xrpc.Client{
132 Host: opt.PdsUrl,
133 // Headers: make(map[string]string),
134 // Auth: &xrpc.AuthInfo{},
135 }
136
137 httpc := util.RobustHTTPClient()
138
139 lmstudioc := NewLMStudioClient(opt.LmstudioHost, logger)
140
141 postCache := lru.NewLRU[string, *bsky.FeedPost](100, nil, 1*time.Hour)
142
143 dsmt := &DontShowMeThis{
144 logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
145 Level: slog.LevelInfo,
146 AddSource: true,
147 })),
148 labelerUrl: opt.LabelerUrl,
149 labelerKey: opt.LabelerKey,
150 watchedOps: watchedOps,
151 xrpcc: xrpcc,
152 httpc: httpc,
153 lmstudioc: lmstudioc,
154 postCache: postCache,
155 }
156
157 dsmt.startConsumer(cmd.String("jetstream-url"))
158
159 return nil
160}
161
162func (dsmt *DontShowMeThis) startConsumer(jetstreamUrl string) {
163 config := client.DefaultClientConfig()
164 config.WebsocketURL = jetstreamUrl
165 config.Compress = true
166
167 scheduler := sequential.NewScheduler("jetstream_localdev", dsmt.logger, dsmt.handleEvent)
168
169 c, err := client.NewClient(config, dsmt.logger, scheduler)
170 if err != nil {
171 log.Fatalf("failed to create client: %v", err)
172 }
173
174 if err := c.ConnectAndRead(context.TODO(), nil); err != nil {
175 log.Fatalf("failed to connect: %v", err)
176 }
177
178 dsmt.logger.Info("shutdown")
179}
180
181func (dsmt *DontShowMeThis) handleEvent(ctx context.Context, event *models.Event) error {
182 if event.Commit != nil && (event.Commit.Operation == models.CommitOperationCreate || event.Commit.Operation == models.CommitOperationUpdate) {
183 switch event.Commit.Collection {
184 case "app.bsky.feed.post":
185 var post bsky.FeedPost
186 if err := json.Unmarshal(event.Commit.Record, &post); err != nil {
187 return fmt.Errorf("failed to unmarshal post: %w", err)
188 }
189
190 if err := dsmt.handlePost(ctx, event, &post); err != nil {
191 dsmt.logger.Error("error handling post", "error", err)
192 }
193 }
194 }
195 return nil
196}
197
// EmitLabelRequest is the JSON body that emitLabel posts to the labeler's
// "/emit" endpoint.
type EmitLabelRequest struct {
	// Uri is the URI the label applies to; encoded as "uri".
	Uri string `json:"uri"`
	// Label is the label value; encoded as "label".
	Label string `json:"label"`
}
202
203func (dsmt *DontShowMeThis) emitLabel(ctx context.Context, uri, label string) error {
204 body := &EmitLabelRequest{
205 Uri: uri,
206 Label: label,
207 }
208
209 b, err := json.Marshal(body)
210 if err != nil {
211 return err
212 }
213
214 req, err := http.NewRequestWithContext(ctx, "POST", dsmt.labelerUrl+"/emit", bytes.NewReader(b))
215 if err != nil {
216 return err
217 }
218
219 req.Header.Set("authorization", "Bearer "+dsmt.labelerKey)
220 req.Header.Set("content-type", "application/json")
221
222 resp, err := dsmt.httpc.Do(req)
223 if err != nil {
224 return err
225 }
226 defer resp.Body.Close()
227
228 io.Copy(io.Discard, resp.Body)
229
230 if resp.StatusCode != http.StatusOK {
231 return fmt.Errorf("received invalid status code from server: %d", resp.StatusCode)
232 }
233
234 return nil
235}
236
237func (dsmt *DontShowMeThis) getPost(ctx context.Context, uri string) (*bsky.FeedPost, error) {
238 post, ok := dsmt.postCache.Get(uri)
239 if ok {
240 return post, nil
241 }
242
243 resp, err := bsky.FeedGetPosts(ctx, dsmt.xrpcc, []string{uri})
244 if err != nil {
245 return nil, fmt.Errorf("failed to get post: %w", err)
246 }
247
248 if resp == nil || len(resp.Posts) == 0 {
249 return nil, fmt.Errorf("failed to get posts (empty response)")
250 }
251
252 postView := resp.Posts[0]
253 post, ok = postView.Record.Val.(*bsky.FeedPost)
254 if !ok {
255 return nil, fmt.Errorf("failed to get post (invalid record)")
256 }
257
258 dsmt.postCache.Add(uri, post)
259
260 return post, nil
261}