1package main
2
3import (
4 "context"
5 "fmt"
6 "log/slog"
7 "net/http"
8 "os"
9 "time"
10
11 "github.com/bluesky-social/indigo/atproto/identity"
12 "github.com/bluesky-social/indigo/atproto/syntax"
13 "github.com/bluesky-social/indigo/automod"
14 "github.com/bluesky-social/indigo/automod/cachestore"
15 "github.com/bluesky-social/indigo/automod/countstore"
16 "github.com/bluesky-social/indigo/automod/engine"
17 "github.com/bluesky-social/indigo/automod/flagstore"
18 "github.com/bluesky-social/indigo/automod/rules"
19 "github.com/bluesky-social/indigo/automod/setstore"
20 "github.com/bluesky-social/indigo/automod/visual"
21 "github.com/bluesky-social/indigo/util"
22 "github.com/bluesky-social/indigo/xrpc"
23
24 "github.com/prometheus/client_golang/prometheus/promhttp"
25 "github.com/redis/go-redis/v9"
26)
27
28type Server struct {
29 Engine *automod.Engine
30 RedisClient *redis.Client
31
32 logger *slog.Logger
33}
34
35type Config struct {
36 Logger *slog.Logger
37 BskyHost string
38 OzoneHost string
39 OzoneDID string
40 OzoneAdminToken string
41 PDSHost string
42 PDSAdminToken string
43 SetsFileJSON string
44 RedisURL string
45 SlackWebhookURL string
46 HiveAPIToken string
47 AbyssHost string
48 AbyssPassword string
49 RulesetName string
50 RatelimitBypass string
51 PreScreenHost string
52 PreScreenToken string
53 ReportDupePeriod time.Duration
54 QuotaModReportDay int
55 QuotaModTakedownDay int
56 QuotaModActionDay int
57 RecordEventTimeout time.Duration
58 IdentityEventTimeout time.Duration
59 OzoneEventTimeout time.Duration
60}
61
62func NewServer(dir identity.Directory, config Config) (*Server, error) {
63 logger := config.Logger
64 if logger == nil {
65 logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{
66 Level: slog.LevelInfo,
67 }))
68 }
69
70 var ozoneClient *xrpc.Client
71 if config.OzoneAdminToken != "" && config.OzoneDID != "" {
72 ozoneClient = &xrpc.Client{
73 Client: util.RobustHTTPClient(),
74 Host: config.OzoneHost,
75 AdminToken: &config.OzoneAdminToken,
76 Auth: &xrpc.AuthInfo{},
77 }
78 if config.RatelimitBypass != "" {
79 ozoneClient.Headers = make(map[string]string)
80 ozoneClient.Headers["x-ratelimit-bypass"] = config.RatelimitBypass
81 }
82 od, err := syntax.ParseDID(config.OzoneDID)
83 if err != nil {
84 return nil, fmt.Errorf("ozone account DID supplied was not valid: %v", err)
85 }
86 ozoneClient.Auth.Did = od.String()
87 logger.Info("configured ozone admin client", "did", od.String(), "ozoneHost", config.OzoneHost)
88 } else {
89 logger.Info("did not configure ozone client")
90 }
91
92 var adminClient *xrpc.Client
93 if config.PDSAdminToken != "" {
94 adminClient = &xrpc.Client{
95 Client: util.RobustHTTPClient(),
96 Host: config.PDSHost,
97 AdminToken: &config.PDSAdminToken,
98 Auth: &xrpc.AuthInfo{},
99 }
100 if config.RatelimitBypass != "" {
101 adminClient.Headers = make(map[string]string)
102 adminClient.Headers["x-ratelimit-bypass"] = config.RatelimitBypass
103 }
104 logger.Info("configured PDS admin client", "pdsHost", config.PDSHost)
105 } else {
106 logger.Info("did not configure PDS admin client")
107 }
108
109 sets := setstore.NewMemSetStore()
110 if config.SetsFileJSON != "" {
111 if err := sets.LoadFromFileJSON(config.SetsFileJSON); err != nil {
112 return nil, fmt.Errorf("initializing in-process setstore: %v", err)
113 } else {
114 logger.Info("loaded set config from JSON", "path", config.SetsFileJSON)
115 }
116 }
117
118 var counters countstore.CountStore
119 var cache cachestore.CacheStore
120 var flags flagstore.FlagStore
121 var rdb *redis.Client
122 if config.RedisURL != "" {
123 // generic client, for cursor state
124 opt, err := redis.ParseURL(config.RedisURL)
125 if err != nil {
126 return nil, fmt.Errorf("parsing redis URL: %v", err)
127 }
128 rdb = redis.NewClient(opt)
129 // check redis connection
130 _, err = rdb.Ping(context.TODO()).Result()
131 if err != nil {
132 return nil, fmt.Errorf("redis ping failed: %v", err)
133 }
134
135 cnt, err := countstore.NewRedisCountStore(config.RedisURL)
136 if err != nil {
137 return nil, fmt.Errorf("initializing redis countstore: %v", err)
138 }
139 counters = cnt
140
141 csh, err := cachestore.NewRedisCacheStore(config.RedisURL, 6*time.Hour)
142 if err != nil {
143 return nil, fmt.Errorf("initializing redis cachestore: %v", err)
144 }
145 cache = csh
146
147 flg, err := flagstore.NewRedisFlagStore(config.RedisURL)
148 if err != nil {
149 return nil, fmt.Errorf("initializing redis flagstore: %v", err)
150 }
151 flags = flg
152 } else {
153 counters = countstore.NewMemCountStore()
154 cache = cachestore.NewMemCacheStore(5_000, 1*time.Hour)
155 flags = flagstore.NewMemFlagStore()
156 }
157
158 // IMPORTANT: reminder that these are the indigo-edition rules, not production rules
159 extraBlobRules := []automod.BlobRuleFunc{}
160 if config.HiveAPIToken != "" && config.RulesetName != "no-hive" {
161 logger.Info("configuring Hive AI image labeler")
162 hc := visual.NewHiveAIClient(config.HiveAPIToken)
163 extraBlobRules = append(extraBlobRules, hc.HiveLabelBlobRule)
164
165 if config.PreScreenHost != "" {
166 psc := visual.NewPreScreenClient(config.PreScreenHost, config.PreScreenToken)
167 hc.PreScreenClient = psc
168 }
169 }
170
171 if config.AbyssHost != "" && config.AbyssPassword != "" {
172 logger.Info("configuring abyss abusive image scanning")
173 ac := visual.NewAbyssClient(config.AbyssHost, config.AbyssPassword, config.RatelimitBypass)
174 extraBlobRules = append(extraBlobRules, ac.AbyssScanBlobRule)
175 }
176
177 var ruleset automod.RuleSet
178 switch config.RulesetName {
179 case "", "default", "no-hive":
180 ruleset = rules.DefaultRules()
181 ruleset.BlobRules = append(ruleset.BlobRules, extraBlobRules...)
182 case "no-blobs":
183 ruleset = rules.DefaultRules()
184 ruleset.BlobRules = []automod.BlobRuleFunc{}
185 case "only-blobs":
186 ruleset.BlobRules = extraBlobRules
187 default:
188 return nil, fmt.Errorf("unknown ruleset config: %s", config.RulesetName)
189 }
190
191 var notifier automod.Notifier
192 if config.SlackWebhookURL != "" {
193 notifier = &automod.SlackNotifier{
194 SlackWebhookURL: config.SlackWebhookURL,
195 }
196 }
197
198 bskyClient := xrpc.Client{
199 Client: util.RobustHTTPClient(),
200 Host: config.BskyHost,
201 }
202 if config.RatelimitBypass != "" {
203 bskyClient.Headers = make(map[string]string)
204 bskyClient.Headers["x-ratelimit-bypass"] = config.RatelimitBypass
205 }
206 blobClient := util.RobustHTTPClient()
207 eng := automod.Engine{
208 Logger: logger,
209 Directory: dir,
210 Counters: counters,
211 Sets: sets,
212 Flags: flags,
213 Cache: cache,
214 Rules: ruleset,
215 Notifier: notifier,
216 BskyClient: &bskyClient,
217 OzoneClient: ozoneClient,
218 AdminClient: adminClient,
219 BlobClient: blobClient,
220 Config: engine.EngineConfig{
221 ReportDupePeriod: config.ReportDupePeriod,
222 QuotaModReportDay: config.QuotaModReportDay,
223 QuotaModTakedownDay: config.QuotaModTakedownDay,
224 QuotaModActionDay: config.QuotaModActionDay,
225 RecordEventTimeout: config.RecordEventTimeout,
226 IdentityEventTimeout: config.IdentityEventTimeout,
227 OzoneEventTimeout: config.OzoneEventTimeout,
228 },
229 }
230
231 s := &Server{
232 logger: logger,
233 Engine: &eng,
234 RedisClient: rdb,
235 }
236
237 return s, nil
238}
239
240func (s *Server) RunMetrics(listen string) error {
241 http.Handle("/metrics", promhttp.Handler())
242 return http.ListenAndServe(listen, nil)
243}