this repo has no description
at main 6.3 kB view raw
1package main 2 3import ( 4 "bytes" 5 "context" 6 "encoding/json" 7 "fmt" 8 "io" 9 "log" 10 "log/slog" 11 "net/http" 12 "os" 13 "time" 14 15 "github.com/bluesky-social/indigo/api/bsky" 16 "github.com/bluesky-social/indigo/util" 17 "github.com/bluesky-social/indigo/xrpc" 18 "github.com/bluesky-social/jetstream/pkg/client" 19 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 20 "github.com/bluesky-social/jetstream/pkg/models" 21 lru "github.com/hashicorp/golang-lru/v2/expirable" 22 _ "github.com/joho/godotenv/autoload" 23 "github.com/urfave/cli/v2" 24) 25 26const ( 27 LabelBadFaith = "bad-faith" 28 LabelOffTopic = "off-topic" 29 LabelFunny = "funny" 30) 31 32func main() { 33 app := &cli.App{ 34 Name: "dontshowmethis", 35 Action: run, 36 Flags: []cli.Flag{ 37 &cli.StringFlag{ 38 Name: "pds-url", 39 EnvVars: []string{"PDS_URL"}, 40 Required: true, 41 }, 42 &cli.StringFlag{ 43 Name: "account-handle", 44 EnvVars: []string{"ACCOUNT_HANDLE"}, 45 Required: true, 46 }, 47 &cli.StringFlag{ 48 Name: "account-password", 49 EnvVars: []string{"ACCOUNT_PASSWORD"}, 50 Required: true, 51 }, 52 &cli.StringSliceFlag{ 53 Name: "watched-ops", 54 EnvVars: []string{"WATCHED_OPS"}, 55 Required: true, 56 }, 57 &cli.StringFlag{ 58 Name: "jetstream-url", 59 EnvVars: []string{"JETSTREAM_URL"}, 60 Value: "wss://jetstream2.us-west.bsky.network/subscribe", 61 }, 62 &cli.StringFlag{ 63 Name: "labeler-url", 64 Usage: "skyware labeler event emission url", 65 Required: true, 66 EnvVars: []string{"LABELER_URL"}, 67 }, 68 &cli.StringFlag{ 69 Name: "labeler-key", 70 Usage: "skyware labeler event emission key", 71 Required: true, 72 EnvVars: []string{"LABELER_KEY"}, 73 }, 74 &cli.StringFlag{ 75 Name: "lmstudio-host", 76 Usage: "lmstudio host", 77 EnvVars: []string{"LMSTUDIO_HOST"}, 78 Required: true, 79 }, 80 }, 81 } 82 83 app.Run(os.Args) 84} 85 86type DontShowMeThis struct { 87 logger *slog.Logger 88 xrpcc *xrpc.Client 89 httpc *http.Client 90 91 watchedOps map[string]struct{} 92 93 labelerUrl string 94 labelerKey string 95 96 lmstudioc *LMStudioClient 97 98 postCache *lru.LRU[string, *bsky.FeedPost] 99} 100 101var run = func(cmd *cli.Context) error { 102 opt := struct { 103 PdsUrl string 104 JetstreamUrl string 105 AccountHandle string 106 AccountPassword string 107 WatchedOps []string 108 LabelerUrl string 109 LabelerKey string 110 LmstudioHost string 111 }{ 112 PdsUrl: cmd.String("pds-url"), 113 JetstreamUrl: cmd.String("jetstream-url"), 114 AccountHandle: cmd.String("account-handle"), 115 AccountPassword: cmd.String("account-password"), 116 WatchedOps: cmd.StringSlice("watched-ops"), 117 LabelerUrl: cmd.String("labeler-url"), 118 LabelerKey: cmd.String("labeler-key"), 119 LmstudioHost: cmd.String("lmstudio-host"), 120 } 121 122 logger := slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ 123 Level: slog.LevelInfo, 124 })) 125 126 watchedOps := make(map[string]struct{}, len(opt.WatchedOps)) 127 for _, op := range opt.WatchedOps { 128 watchedOps[op] = struct{}{} 129 } 130 131 xrpcc := &xrpc.Client{ 132 Host: opt.PdsUrl, 133 // Headers: make(map[string]string), 134 // Auth: &xrpc.AuthInfo{}, 135 } 136 137 httpc := util.RobustHTTPClient() 138 139 lmstudioc := NewLMStudioClient(opt.LmstudioHost, logger) 140 141 postCache := lru.NewLRU[string, *bsky.FeedPost](100, nil, 1*time.Hour) 142 143 dsmt := &DontShowMeThis{ 144 logger: slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ 145 Level: slog.LevelInfo, 146 AddSource: true, 147 })), 148 labelerUrl: opt.LabelerUrl, 149 labelerKey: opt.LabelerKey, 150 watchedOps: watchedOps, 151 xrpcc: xrpcc, 152 httpc: httpc, 153 lmstudioc: lmstudioc, 154 postCache: postCache, 155 } 156 157 dsmt.startConsumer(cmd.String("jetstream-url")) 158 159 return nil 160} 161 162func (dsmt *DontShowMeThis) startConsumer(jetstreamUrl string) { 163 config := client.DefaultClientConfig() 164 config.WebsocketURL = jetstreamUrl 165 config.Compress = true 166 167 scheduler := sequential.NewScheduler("jetstream_localdev", dsmt.logger, dsmt.handleEvent) 168 169 c, err := client.NewClient(config, dsmt.logger, scheduler) 170 if err != nil { 171 log.Fatalf("failed to create client: %v", err) 172 } 173 174 if err := c.ConnectAndRead(context.TODO(), nil); err != nil { 175 log.Fatalf("failed to connect: %v", err) 176 } 177 178 dsmt.logger.Info("shutdown") 179} 180 181func (dsmt *DontShowMeThis) handleEvent(ctx context.Context, event *models.Event) error { 182 if event.Commit != nil && (event.Commit.Operation == models.CommitOperationCreate || event.Commit.Operation == models.CommitOperationUpdate) { 183 switch event.Commit.Collection { 184 case "app.bsky.feed.post": 185 var post bsky.FeedPost 186 if err := json.Unmarshal(event.Commit.Record, &post); err != nil { 187 return fmt.Errorf("failed to unmarshal post: %w", err) 188 } 189 190 if err := dsmt.handlePost(ctx, event, &post); err != nil { 191 dsmt.logger.Error("error handling post", "error", err) 192 } 193 } 194 } 195 return nil 196} 197 198type EmitLabelRequest struct { 199 Uri string `json:"uri"` 200 Label string `json:"label"` 201} 202 203func (dsmt *DontShowMeThis) emitLabel(ctx context.Context, uri, label string) error { 204 body := &EmitLabelRequest{ 205 Uri: uri, 206 Label: label, 207 } 208 209 b, err := json.Marshal(body) 210 if err != nil { 211 return err 212 } 213 214 req, err := http.NewRequestWithContext(ctx, "POST", dsmt.labelerUrl+"/emit", bytes.NewReader(b)) 215 if err != nil { 216 return err 217 } 218 219 req.Header.Set("authorization", "Bearer "+dsmt.labelerKey) 220 req.Header.Set("content-type", "application/json") 221 222 resp, err := dsmt.httpc.Do(req) 223 if err != nil { 224 return err 225 } 226 defer resp.Body.Close() 227 228 io.Copy(io.Discard, resp.Body) 229 230 if resp.StatusCode != http.StatusOK { 231 return fmt.Errorf("received invalid status code from server: %d", resp.StatusCode) 232 } 233 234 return nil 235} 236 237func (dsmt *DontShowMeThis) getPost(ctx context.Context, uri string) (*bsky.FeedPost, error) { 238 post, ok := dsmt.postCache.Get(uri) 239 if ok { 240 return post, nil 241 } 242 243 resp, err := bsky.FeedGetPosts(ctx, dsmt.xrpcc, []string{uri}) 244 if err != nil { 245 return nil, fmt.Errorf("failed to get post: %w", err) 246 } 247 248 if resp == nil || len(resp.Posts) == 0 { 249 return nil, fmt.Errorf("failed to get posts (empty response)") 250 } 251 252 postView := resp.Posts[0] 253 post, ok = postView.Record.Val.(*bsky.FeedPost) 254 if !ok { 255 return nil, fmt.Errorf("failed to get post (invalid record)") 256 } 257 258 dsmt.postCache.Add(uri, post) 259 260 return post, nil 261}