fork of indigo with slightly nicer lexgen
at main 7.2 kB view raw
1package main 2 3import ( 4 "context" 5 "fmt" 6 "log/slog" 7 "net/http" 8 "os" 9 "time" 10 11 "github.com/bluesky-social/indigo/atproto/identity" 12 "github.com/bluesky-social/indigo/atproto/syntax" 13 "github.com/bluesky-social/indigo/automod" 14 "github.com/bluesky-social/indigo/automod/cachestore" 15 "github.com/bluesky-social/indigo/automod/countstore" 16 "github.com/bluesky-social/indigo/automod/engine" 17 "github.com/bluesky-social/indigo/automod/flagstore" 18 "github.com/bluesky-social/indigo/automod/rules" 19 "github.com/bluesky-social/indigo/automod/setstore" 20 "github.com/bluesky-social/indigo/automod/visual" 21 "github.com/bluesky-social/indigo/util" 22 "github.com/bluesky-social/indigo/xrpc" 23 24 "github.com/prometheus/client_golang/prometheus/promhttp" 25 "github.com/redis/go-redis/v9" 26) 27 28type Server struct { 29 Engine *automod.Engine 30 RedisClient *redis.Client 31 32 logger *slog.Logger 33} 34 35type Config struct { 36 Logger *slog.Logger 37 BskyHost string 38 OzoneHost string 39 OzoneDID string 40 OzoneAdminToken string 41 PDSHost string 42 PDSAdminToken string 43 SetsFileJSON string 44 RedisURL string 45 SlackWebhookURL string 46 HiveAPIToken string 47 AbyssHost string 48 AbyssPassword string 49 RulesetName string 50 RatelimitBypass string 51 PreScreenHost string 52 PreScreenToken string 53 ReportDupePeriod time.Duration 54 QuotaModReportDay int 55 QuotaModTakedownDay int 56 QuotaModActionDay int 57 RecordEventTimeout time.Duration 58 IdentityEventTimeout time.Duration 59 OzoneEventTimeout time.Duration 60} 61 62func NewServer(dir identity.Directory, config Config) (*Server, error) { 63 logger := config.Logger 64 if logger == nil { 65 logger = slog.New(slog.NewJSONHandler(os.Stdout, &slog.HandlerOptions{ 66 Level: slog.LevelInfo, 67 })) 68 } 69 70 var ozoneClient *xrpc.Client 71 if config.OzoneAdminToken != "" && config.OzoneDID != "" { 72 ozoneClient = &xrpc.Client{ 73 Client: util.RobustHTTPClient(), 74 Host: config.OzoneHost, 75 AdminToken: &config.OzoneAdminToken, 76 Auth: &xrpc.AuthInfo{}, 77 } 78 if config.RatelimitBypass != "" { 79 ozoneClient.Headers = make(map[string]string) 80 ozoneClient.Headers["x-ratelimit-bypass"] = config.RatelimitBypass 81 } 82 od, err := syntax.ParseDID(config.OzoneDID) 83 if err != nil { 84 return nil, fmt.Errorf("ozone account DID supplied was not valid: %v", err) 85 } 86 ozoneClient.Auth.Did = od.String() 87 logger.Info("configured ozone admin client", "did", od.String(), "ozoneHost", config.OzoneHost) 88 } else { 89 logger.Info("did not configure ozone client") 90 } 91 92 var adminClient *xrpc.Client 93 if config.PDSAdminToken != "" { 94 adminClient = &xrpc.Client{ 95 Client: util.RobustHTTPClient(), 96 Host: config.PDSHost, 97 AdminToken: &config.PDSAdminToken, 98 Auth: &xrpc.AuthInfo{}, 99 } 100 if config.RatelimitBypass != "" { 101 adminClient.Headers = make(map[string]string) 102 adminClient.Headers["x-ratelimit-bypass"] = config.RatelimitBypass 103 } 104 logger.Info("configured PDS admin client", "pdsHost", config.PDSHost) 105 } else { 106 logger.Info("did not configure PDS admin client") 107 } 108 109 sets := setstore.NewMemSetStore() 110 if config.SetsFileJSON != "" { 111 if err := sets.LoadFromFileJSON(config.SetsFileJSON); err != nil { 112 return nil, fmt.Errorf("initializing in-process setstore: %v", err) 113 } else { 114 logger.Info("loaded set config from JSON", "path", config.SetsFileJSON) 115 } 116 } 117 118 var counters countstore.CountStore 119 var cache cachestore.CacheStore 120 var flags flagstore.FlagStore 121 var rdb *redis.Client 122 if config.RedisURL != "" { 123 // generic client, for cursor state 124 opt, err := redis.ParseURL(config.RedisURL) 125 if err != nil { 126 return nil, fmt.Errorf("parsing redis URL: %v", err) 127 } 128 rdb = redis.NewClient(opt) 129 // check redis connection 130 _, err = rdb.Ping(context.TODO()).Result() 131 if err != nil { 132 return nil, fmt.Errorf("redis ping failed: %v", err) 133 } 134 135 cnt, err := countstore.NewRedisCountStore(config.RedisURL) 136 if err != nil { 137 return nil, fmt.Errorf("initializing redis countstore: %v", err) 138 } 139 counters = cnt 140 141 csh, err := cachestore.NewRedisCacheStore(config.RedisURL, 6*time.Hour) 142 if err != nil { 143 return nil, fmt.Errorf("initializing redis cachestore: %v", err) 144 } 145 cache = csh 146 147 flg, err := flagstore.NewRedisFlagStore(config.RedisURL) 148 if err != nil { 149 return nil, fmt.Errorf("initializing redis flagstore: %v", err) 150 } 151 flags = flg 152 } else { 153 counters = countstore.NewMemCountStore() 154 cache = cachestore.NewMemCacheStore(5_000, 1*time.Hour) 155 flags = flagstore.NewMemFlagStore() 156 } 157 158 // IMPORTANT: reminder that these are the indigo-edition rules, not production rules 159 extraBlobRules := []automod.BlobRuleFunc{} 160 if config.HiveAPIToken != "" && config.RulesetName != "no-hive" { 161 logger.Info("configuring Hive AI image labeler") 162 hc := visual.NewHiveAIClient(config.HiveAPIToken) 163 extraBlobRules = append(extraBlobRules, hc.HiveLabelBlobRule) 164 165 if config.PreScreenHost != "" { 166 psc := visual.NewPreScreenClient(config.PreScreenHost, config.PreScreenToken) 167 hc.PreScreenClient = psc 168 } 169 } 170 171 if config.AbyssHost != "" && config.AbyssPassword != "" { 172 logger.Info("configuring abyss abusive image scanning") 173 ac := visual.NewAbyssClient(config.AbyssHost, config.AbyssPassword, config.RatelimitBypass) 174 extraBlobRules = append(extraBlobRules, ac.AbyssScanBlobRule) 175 } 176 177 var ruleset automod.RuleSet 178 switch config.RulesetName { 179 case "", "default", "no-hive": 180 ruleset = rules.DefaultRules() 181 ruleset.BlobRules = append(ruleset.BlobRules, extraBlobRules...) 182 case "no-blobs": 183 ruleset = rules.DefaultRules() 184 ruleset.BlobRules = []automod.BlobRuleFunc{} 185 case "only-blobs": 186 ruleset.BlobRules = extraBlobRules 187 default: 188 return nil, fmt.Errorf("unknown ruleset config: %s", config.RulesetName) 189 } 190 191 var notifier automod.Notifier 192 if config.SlackWebhookURL != "" { 193 notifier = &automod.SlackNotifier{ 194 SlackWebhookURL: config.SlackWebhookURL, 195 } 196 } 197 198 bskyClient := xrpc.Client{ 199 Client: util.RobustHTTPClient(), 200 Host: config.BskyHost, 201 } 202 if config.RatelimitBypass != "" { 203 bskyClient.Headers = make(map[string]string) 204 bskyClient.Headers["x-ratelimit-bypass"] = config.RatelimitBypass 205 } 206 blobClient := util.RobustHTTPClient() 207 eng := automod.Engine{ 208 Logger: logger, 209 Directory: dir, 210 Counters: counters, 211 Sets: sets, 212 Flags: flags, 213 Cache: cache, 214 Rules: ruleset, 215 Notifier: notifier, 216 BskyClient: &bskyClient, 217 OzoneClient: ozoneClient, 218 AdminClient: adminClient, 219 BlobClient: blobClient, 220 Config: engine.EngineConfig{ 221 ReportDupePeriod: config.ReportDupePeriod, 222 QuotaModReportDay: config.QuotaModReportDay, 223 QuotaModTakedownDay: config.QuotaModTakedownDay, 224 QuotaModActionDay: config.QuotaModActionDay, 225 RecordEventTimeout: config.RecordEventTimeout, 226 IdentityEventTimeout: config.IdentityEventTimeout, 227 OzoneEventTimeout: config.OzoneEventTimeout, 228 }, 229 } 230 231 s := &Server{ 232 logger: logger, 233 Engine: &eng, 234 RedisClient: rdb, 235 } 236 237 return s, nil 238} 239 240func (s *Server) RunMetrics(listen string) error { 241 http.Handle("/metrics", promhttp.Handler()) 242 return http.ListenAndServe(listen, nil) 243}