this repo has no description

feat: backfilling

Changed files
+852 -41
clickhouse_inserter
cmd
bodega
photocopy
models
+1
Makefile
··· 13 13 .PHONY: build 14 14 build: ## Build all executables 15 15 go build -ldflags "-X main.Version=$(VERSION)" -o photocopy ./cmd/photocopy 16 + go build -o bodega ./cmd/bodega 16 17 17 18 .PHONY: run 18 19 run:
+408
backfiller.go
··· 1 + package photocopy 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + "io" 9 + "net/http" 10 + "strings" 11 + "sync" 12 + "time" 13 + 14 + atproto_repo "github.com/bluesky-social/indigo/atproto/repo" 15 + "github.com/bluesky-social/indigo/repo" 16 + "github.com/bluesky-social/indigo/util" 17 + "github.com/ipfs/go-cid" 18 + "github.com/ipld/go-car" 19 + _ "github.com/joho/godotenv/autoload" 20 + "go.uber.org/ratelimit" 21 + ) 22 + 23 + type RepoDownloader struct { 24 + clients map[string]*http.Client 25 + rateLimits map[string]ratelimit.Limiter 26 + mu sync.RWMutex 27 + p *Photocopy 28 + } 29 + 30 + func NewRepoDownloader(p *Photocopy) *RepoDownloader { 31 + return &RepoDownloader{ 32 + clients: make(map[string]*http.Client), 33 + rateLimits: make(map[string]ratelimit.Limiter), 34 + p: p, 35 + } 36 + } 37 + 38 + func (rd *RepoDownloader) getClient(service string) *http.Client { 39 + rd.mu.RLock() 40 + client, exists := rd.clients[service] 41 + rd.mu.RUnlock() 42 + 43 + if exists { 44 + return client 45 + } 46 + 47 + rd.mu.Lock() 48 + defer rd.mu.Unlock() 49 + 50 + if client, exists := rd.clients[service]; exists { 51 + return client 52 + } 53 + 54 + client = util.RobustHTTPClient() 55 + client.Timeout = 45 * time.Second 56 + rd.clients[service] = client 57 + return client 58 + } 59 + 60 + func (rd *RepoDownloader) getRateLimiter(service string) ratelimit.Limiter { 61 + if !strings.HasSuffix(service, ".bsky.network") { 62 + service = "third-party" 63 + } 64 + 65 + rd.mu.RLock() 66 + limiter, exists := rd.rateLimits[service] 67 + rd.mu.RUnlock() 68 + 69 + if exists { 70 + return limiter 71 + } 72 + 73 + rd.mu.Lock() 74 + defer rd.mu.Unlock() 75 + 76 + if limiter, exists := rd.rateLimits[service]; exists { 77 + return limiter 78 + } 79 + 80 + // 3000 per five minutes 81 + limiter = ratelimit.New(10) 82 + rd.rateLimits[service] = limiter 83 + return limiter 84 + } 85 + 86 + func (rd *RepoDownloader) downloadRepo(service, did string) ([]byte, error) { 87 + dlurl := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", service, did) 88 + 89 + req, err := http.NewRequestWithContext(context.TODO(), "GET", dlurl, nil) 90 + if err != nil { 91 + return nil, fmt.Errorf("failed to create request: %w", err) 92 + } 93 + 94 + if rd.p.ratelimitBypassKey != "" && strings.HasSuffix(service, ".bsky.network") { 95 + req.Header.Set("x-ratelimit-bypass", rd.p.ratelimitBypassKey) 96 + } 97 + 98 + client := rd.getClient(service) 99 + 100 + resp, err := client.Do(req) 101 + if err != nil { 102 + return nil, fmt.Errorf("failed to download repo: %w", err) 103 + } 104 + defer resp.Body.Close() 105 + 106 + if resp.StatusCode != http.StatusOK { 107 + if resp.StatusCode == 400 { 108 + return nil, nil 109 + } 110 + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 111 + } 112 + 113 + b, err := io.ReadAll(resp.Body) 114 + if err != nil { 115 + return nil, fmt.Errorf("could not read bytes from response: %w", err) 116 + } 117 + 118 + return b, nil 119 + } 120 + 121 + func (p *Photocopy) processRepo(ctx context.Context, b []byte, did string) error { 122 + bs := atproto_repo.NewTinyBlockstore() 123 + cs, err := car.NewCarReader(bytes.NewReader(b)) 124 + if err != nil { 125 + return fmt.Errorf("error opening car: %v\n", err) 126 + } 127 + 128 + currBlock, _ := cs.Next() 129 + for currBlock != nil { 130 + bs.Put(context.TODO(), currBlock) 131 + next, _ := cs.Next() 132 + currBlock = next 133 + } 134 + 135 + r, err := repo.OpenRepo(context.TODO(), bs, cs.Header.Roots[0]) 136 + if err != nil || r == nil { 137 + fmt.Printf("could not open repo: %v", err) 138 + return nil 139 + } 140 + 141 + if err := r.ForEach(context.TODO(), "", func(key string, cid cid.Cid) error { 142 + pts := strings.Split(key, "/") 143 + nsid := pts[0] 144 + rkey := pts[1] 145 + cidStr := cid.String() 146 + b, err := bs.Get(context.TODO(), cid) 147 + if err != nil { 148 + return nil 149 + } 150 + if err := p.handleCreate(ctx, b.RawData(), time.Now().Format(time.RFC3339Nano), "unk", did, nsid, rkey, cidStr, "unk"); err != nil { 151 + return err 152 + } 153 + return nil 154 + }); err != nil { 155 + return fmt.Errorf("erorr traversing records: %v", err) 156 + } 157 + 158 + return nil 159 + } 160 + 161 + type ListReposResponse struct { 162 + Cursor string `json:"cursor"` 163 + Repos []ListReposRepo `json:"repos"` 164 + } 165 + 166 + type ListReposRepo struct { 167 + Did string `json:"did"` 168 + Head string `json:"head"` 169 + Rev string `json:"rev"` 170 + Active bool `json:"active"` 171 + Status *string `json:"status,omitempty"` 172 + } 173 + 174 + func (rd *RepoDownloader) getDidsFromService(ctx context.Context, service string) ([]ListReposRepo, error) { 175 + var cursor string 176 + var repos []ListReposRepo 177 + if service == "https://atproto.brid.gy" { 178 + return nil, nil 179 + } 180 + for { 181 + req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s/xrpc/com.atproto.sync.listRepos?limit=1000&cursor=%s", service, cursor), nil) 182 + if err != nil { 183 + return nil, err 184 + } 185 + 186 + if rd.p.ratelimitBypassKey != "" && strings.HasSuffix(service, ".bsky.network") { 187 + req.Header.Set("x-ratelimit-bypass", rd.p.ratelimitBypassKey) 188 + } 189 + 190 + rl := rd.getRateLimiter(service) 191 + rl.Take() 192 + 193 + cli := rd.getClient(service) 194 + resp, err := cli.Do(req) 195 + if err != nil { 196 + return nil, err 197 + } 198 + defer resp.Body.Close() 199 + 200 + if resp.StatusCode != http.StatusOK { 201 + return nil, fmt.Errorf("received non-200 response code: %d", resp.StatusCode) 202 + } 203 + 204 + var reposResp ListReposResponse 205 + if err := json.NewDecoder(resp.Body).Decode(&reposResp); err != nil { 206 + return nil, fmt.Errorf("error decoding repos response: %w", err) 207 + } 208 + 209 + for _, repo := range reposResp.Repos { 210 + if repo.Status != nil { 211 + if *repo.Status == "deleted" || *repo.Status == "takendown" || *repo.Status == "deactivated" { 212 + continue 213 + } 214 + } 215 + 216 + repos = append(repos, repo) 217 + } 218 + 219 + if len(reposResp.Repos) != 1000 || reposResp.Cursor == "" { 220 + break 221 + } 222 + 223 + fmt.Printf("cursor %s service %s\n", reposResp.Cursor, service) 224 + 225 + cursor = reposResp.Cursor 226 + } 227 + 228 + return repos, nil 229 + } 230 + 231 + type ListServicesResponse struct { 232 + Cursor string `json:"cursor"` 233 + Hosts []ListServicesResponseItem `json:"hosts"` 234 + } 235 + 236 + type ListServicesResponseItem struct { 237 + Hostname string `json:"hostname"` 238 + Status string `json:"status"` 239 + } 240 + 241 + func (p *Photocopy) runBackfiller(ctx context.Context) error { 242 + startTime := time.Now() 243 + 244 + fmt.Println("querying clickhouse for dids and services...") 245 + 246 + var hostsCursor string 247 + var sevs []ListServicesResponseItem 248 + for { 249 + if hostsCursor != "" { 250 + hostsCursor = "&cursor=" + hostsCursor 251 + } 252 + req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("https://relay1.us-east.bsky.network/xrpc/com.atproto.sync.listHosts?limit=1000%s", hostsCursor), nil) 253 + if err != nil { 254 + return err 255 + } 256 + 257 + resp, err := http.DefaultClient.Do(req) 258 + if err != nil { 259 + return err 260 + } 261 + defer resp.Body.Close() 262 + 263 + if resp.StatusCode != http.StatusOK { 264 + return fmt.Errorf("received non-200 response code: %d", resp.StatusCode) 265 + } 266 + 267 + var sevsResp ListServicesResponse 268 + if err := json.NewDecoder(resp.Body).Decode(&sevsResp); err != nil { 269 + return fmt.Errorf("error decoding sevs response: %w", err) 270 + } 271 + 272 + for _, sev := range sevsResp.Hosts { 273 + if sev.Status != "active" { 274 + continue 275 + } 276 + 277 + sevs = append(sevs, sev) 278 + } 279 + 280 + if len(sevsResp.Hosts) != 1000 || sevsResp.Cursor == "" { 281 + break 282 + } 283 + 284 + hostsCursor = sevsResp.Cursor 285 + } 286 + 287 + servicesDids := map[string][]string{} 288 + for _, sev := range sevs { 289 + servicesDids["https://"+sev.Hostname] = []string{} 290 + } 291 + 292 + fmt.Printf("found %d services\n", len(servicesDids)) 293 + 294 + fmt.Printf("collecting dids...\n") 295 + 296 + fmt.Printf("building download buckets...") 297 + 298 + skipped := 0 299 + downloader := NewRepoDownloader(p) 300 + serviceDids := map[string][]string{} 301 + 302 + wg := sync.WaitGroup{} 303 + mplk := sync.Mutex{} 304 + for s := range servicesDids { 305 + wg.Add(1) 306 + go func() { 307 + defer wg.Done() 308 + repos, err := downloader.getDidsFromService(context.TODO(), s) 309 + if err != nil { 310 + fmt.Printf("error getting dids for services %s: %v", s, err) 311 + return 312 + } 313 + dids := []string{} 314 + for _, r := range repos { 315 + dids = append(dids, r.Did) 316 + } 317 + mplk.Lock() 318 + defer mplk.Unlock() 319 + serviceDids[s] = dids 320 + }() 321 + } 322 + 323 + fmt.Println("getting all the repos...") 324 + wg.Wait() 325 + 326 + fmt.Printf("was able to skip %d repos\n", skipped) 327 + 328 + total := 0 329 + 330 + for service, dids := range serviceDids { 331 + if len(dids) < 100 { 332 + continue 333 + } 334 + fmt.Printf("%s: %d jobs\n", service, len(dids)) 335 + total += len(dids) 336 + } 337 + 338 + fmt.Printf("Total jobs: %d across %d services \n", total, len(serviceDids)) 339 + 340 + for _, c := range downloader.clients { 341 + c.Timeout = 10 * time.Minute 342 + } 343 + 344 + for s := range downloader.rateLimits { 345 + if p.ratelimitBypassKey != "" && strings.HasSuffix(s, ".bsky.network") { 346 + downloader.rateLimits[s] = ratelimit.New(25) 347 + } 348 + } 349 + 350 + processed := 0 351 + errored := 0 352 + var errors []error 353 + for service, dids := range serviceDids { 354 + go func() { 355 + for _, did := range dids { 356 + ratelimiter := downloader.getRateLimiter(service) 357 + ratelimiter.Take() 358 + 359 + b, err := downloader.downloadRepo(service, did) 360 + if err != nil { 361 + errored++ 362 + processed++ 363 + errors = append(errors, err) 364 + continue 365 + } 366 + 367 + go func(b []byte, did string) { 368 + if err := p.processRepo(ctx, b, did); err != nil { 369 + fmt.Printf("error processing backfill record: %v\n", err) 370 + } 371 + }(b, did) 372 + 373 + processed++ 374 + } 375 + }() 376 + } 377 + 378 + ticker := time.NewTicker(1 * time.Second) 379 + defer ticker.Stop() 380 + 381 + for range ticker.C { 382 + elapsed := time.Since(startTime) 383 + rate := float64(processed) / elapsed.Seconds() 384 + remaining := total - processed 385 + 386 + var eta string 387 + if rate > 0 { 388 + etaSeconds := float64(remaining) / rate 389 + etaDuration := time.Duration(etaSeconds * float64(time.Second)) 390 + eta = fmt.Sprintf(", ETA: %v", etaDuration.Round(time.Second)) 391 + } else { 392 + eta = ", ETA: calculating..." 393 + } 394 + 395 + for _, err := range errors { 396 + fmt.Printf("%v\n", err) 397 + } 398 + 399 + errors = nil 400 + 401 + fmt.Printf("\rProgress: %d/%d processed (%.1f%%), %d skipped, %d errors, %.1f jobs/sec%s", 402 + processed, total, float64(processed)/float64(total)*100, skipped, errored, rate, eta) 403 + } 404 + 405 + fmt.Printf("\nCompleted: %d processed, %d errors\n", processed, errored) 406 + 407 + return nil 408 + }
+21 -5
clickhouse_inserter/inserter.go
··· 11 11 "github.com/ClickHouse/clickhouse-go/v2/lib/driver" 12 12 "github.com/prometheus/client_golang/prometheus" 13 13 "github.com/prometheus/client_golang/prometheus/promauto" 14 + "go.uber.org/ratelimit" 14 15 ) 15 16 16 17 type Inserter struct { ··· 24 25 histogram *prometheus.HistogramVec 25 26 logger *slog.Logger 26 27 prefix string 28 + rateLimit ratelimit.Limiter 27 29 } 28 30 29 31 type Args struct { ··· 33 35 PrometheusCounterPrefix string 34 36 Logger *slog.Logger 35 37 Histogram *prometheus.HistogramVec 38 + RateLimit int 36 39 } 37 40 38 41 func New(ctx context.Context, args *Args) (*Inserter, error) { ··· 48 51 histogram: args.Histogram, 49 52 logger: args.Logger, 50 53 prefix: args.PrometheusCounterPrefix, 54 + } 55 + 56 + if args.RateLimit != 0 { 57 + rateLimit := ratelimit.New(args.RateLimit) 58 + inserter.rateLimit = rateLimit 51 59 } 52 60 53 61 if args.PrometheusCounterPrefix != "" { ··· 110 118 } 111 119 112 120 func (i *Inserter) sendStream(ctx context.Context, toInsert []any) { 113 - i.pendingSends.Inc() 114 - defer i.pendingSends.Dec() 121 + if i.pendingSends != nil { 122 + i.pendingSends.Inc() 123 + defer i.pendingSends.Dec() 124 + } 115 125 116 126 if i.histogram != nil { 117 127 start := time.Now() ··· 125 135 } 126 136 127 137 status := "ok" 128 - defer func() { 129 - i.insertsCounter.WithLabelValues(status).Add(float64(len(toInsert))) 130 - }() 138 + if i.insertsCounter != nil { 139 + defer func() { 140 + i.insertsCounter.WithLabelValues(status).Add(float64(len(toInsert))) 141 + }() 142 + } 131 143 132 144 batch, err := i.conn.PrepareBatch(ctx, i.query) 133 145 if err != nil { ··· 154 166 if err := batch.AppendStruct(structPtr); err != nil { 155 167 i.logger.Error("error appending to batch", "prefix", i.prefix, "error", err) 156 168 } 169 + } 170 + 171 + if i.rateLimit != nil { 172 + i.rateLimit.Take() 157 173 } 158 174 159 175 if err := batch.Send(); err != nil {
+361 -2
cmd/bodega/main.go
··· 1 1 package main 2 2 3 3 import ( 4 + "bytes" 5 + "context" 6 + "encoding/json" 7 + "fmt" 8 + "io" 9 + "log/slog" 10 + "net/http" 4 11 "os" 12 + "strings" 13 + "sync" 14 + "time" 5 15 6 16 "github.com/ClickHouse/clickhouse-go/v2" 17 + atproto_repo "github.com/bluesky-social/indigo/atproto/repo" 18 + "github.com/bluesky-social/indigo/atproto/syntax" 19 + "github.com/bluesky-social/indigo/repo" 20 + "github.com/bluesky-social/indigo/util" 21 + "github.com/haileyok/photocopy/clickhouse_inserter" 22 + "github.com/haileyok/photocopy/models" 23 + "github.com/ipfs/go-cid" 24 + "github.com/ipld/go-car" 25 + _ "github.com/joho/godotenv/autoload" 7 26 "github.com/urfave/cli/v2" 27 + "go.uber.org/ratelimit" 8 28 ) 9 29 10 30 func main() { ··· 32 52 EnvVars: []string{"PHOTOCOPY_CLICKHOUSE_PASS"}, 33 53 Required: true, 34 54 }, 55 + &cli.BoolFlag{ 56 + Name: "debug", 57 + Value: false, 58 + }, 35 59 }, 36 60 } 37 61 38 62 app.Run(os.Args) 39 63 } 40 64 65 + type RepoDownloader struct { 66 + clients map[string]*http.Client 67 + rateLimits map[string]ratelimit.Limiter 68 + mu sync.RWMutex 69 + } 70 + 71 + func NewRepoDownloader() *RepoDownloader { 72 + return &RepoDownloader{ 73 + clients: make(map[string]*http.Client), 74 + rateLimits: make(map[string]ratelimit.Limiter), 75 + } 76 + } 77 + 78 + func (rd *RepoDownloader) getClient(service string) *http.Client { 79 + rd.mu.RLock() 80 + client, exists := rd.clients[service] 81 + rd.mu.RUnlock() 82 + 83 + if exists { 84 + return client 85 + } 86 + 87 + rd.mu.Lock() 88 + defer rd.mu.Unlock() 89 + 90 + if client, exists := rd.clients[service]; exists { 91 + return client 92 + } 93 + 94 + client = util.RobustHTTPClient() 95 + client.Timeout = 30 * time.Minute 96 + rd.clients[service] = client 97 + return client 98 + } 99 + 100 + func (rd *RepoDownloader) getRateLimiter(service string) ratelimit.Limiter { 101 + rd.mu.RLock() 102 + limiter, exists := rd.rateLimits[service] 103 + rd.mu.RUnlock() 104 + 105 + if exists { 106 + return limiter 107 + } 108 + 109 + rd.mu.Lock() 110 + defer rd.mu.Unlock() 111 + 112 + if limiter, exists := rd.rateLimits[service]; exists { 113 + return limiter 114 + } 115 + 116 + // 3000 per five minutes 117 + limiter = ratelimit.New(10) 118 + rd.rateLimits[service] = limiter 119 + return limiter 120 + } 121 + 122 + func (rd *RepoDownloader) downloadRepo(service, did string) ([]byte, error) { 123 + dlurl := fmt.Sprintf("%s/xrpc/com.atproto.sync.getRepo?did=%s", service, did) 124 + 125 + req, err := http.NewRequestWithContext(context.TODO(), "GET", dlurl, nil) 126 + if err != nil { 127 + return nil, fmt.Errorf("failed to create request: %w", err) 128 + } 129 + 130 + client := rd.getClient(service) 131 + 132 + resp, err := client.Do(req) 133 + if err != nil { 134 + return nil, fmt.Errorf("failed to download repo: %w", err) 135 + } 136 + defer resp.Body.Close() 137 + 138 + if resp.StatusCode != http.StatusOK { 139 + return nil, fmt.Errorf("unexpected status code: %d", resp.StatusCode) 140 + } 141 + 142 + b, err := io.ReadAll(resp.Body) 143 + if err != nil { 144 + return nil, fmt.Errorf("could not read bytes from response: %w", err) 145 + } 146 + 147 + return b, nil 148 + } 149 + 150 + func processRepo(b []byte, did string, inserter *clickhouse_inserter.Inserter) error { 151 + bs := atproto_repo.NewTinyBlockstore() 152 + cs, err := car.NewCarReader(bytes.NewReader(b)) 153 + if err != nil { 154 + return fmt.Errorf("error opening car: %v\n", err) 155 + } 156 + 157 + currBlock, _ := cs.Next() 158 + for currBlock != nil { 159 + bs.Put(context.TODO(), currBlock) 160 + next, _ := cs.Next() 161 + currBlock = next 162 + } 163 + 164 + r, err := repo.OpenRepo(context.TODO(), bs, cs.Header.Roots[0]) 165 + if err != nil || r == nil { 166 + fmt.Printf("could not open repo: %v", err) 167 + return nil 168 + } 169 + 170 + if err := r.ForEach(context.TODO(), "", func(key string, cid cid.Cid) error { 171 + pts := strings.Split(key, "/") 172 + nsid := pts[0] 173 + rkey := pts[1] 174 + cidStr := cid.String() 175 + b, err := bs.Get(context.TODO(), cid) 176 + if err != nil { 177 + return nil 178 + } 179 + 180 + var cat time.Time 181 + tid, err := syntax.ParseTID(rkey) 182 + if err != nil { 183 + cat = time.Now() 184 + } else { 185 + cat = tid.Time() 186 + } 187 + 188 + rec := models.Record{ 189 + Did: did, 190 + Rkey: rkey, 191 + Collection: nsid, 192 + Cid: cidStr, 193 + Seq: "", 194 + Raw: string(b.RawData()), 195 + CreatedAt: cat, 196 + } 197 + 198 + inserter.Insert(context.TODO(), rec) 199 + 200 + return nil 201 + }); err != nil { 202 + return fmt.Errorf("erorr traversing records: %v", err) 203 + } 204 + 205 + return nil 206 + } 207 + 208 + type ListReposResponse struct { 209 + Cursor string `json:"cursor"` 210 + Repos []ListReposRepo `json:"repos"` 211 + } 212 + 213 + type ListReposRepo struct { 214 + Did string `json:"did"` 215 + Head string `json:"head"` 216 + Rev string `json:"rev"` 217 + Active bool `json:"active"` 218 + Status *string `json:"status,omitempty"` 219 + } 220 + 221 + func (rd *RepoDownloader) getDidsFromService(ctx context.Context, service string) ([]ListReposRepo, error) { 222 + var cursor string 223 + var repos []ListReposRepo 224 + for { 225 + req, err := http.NewRequestWithContext(ctx, "GET", fmt.Sprintf("%s/xrpc/com.atproto.sync.listRepos?limit=1000&cursor=%s", service, cursor), nil) 226 + if err != nil { 227 + return nil, err 228 + } 229 + 230 + rl := rd.getRateLimiter(service) 231 + rl.Take() 232 + 233 + cli := rd.getClient(service) 234 + resp, err := cli.Do(req) 235 + if err != nil { 236 + return nil, err 237 + } 238 + defer resp.Body.Close() 239 + 240 + if resp.StatusCode != http.StatusOK { 241 + return nil, fmt.Errorf("received non-200 response code: %d", resp.StatusCode) 242 + } 243 + 244 + var reposResp ListReposResponse 245 + if err := json.NewDecoder(resp.Body).Decode(&reposResp); err != nil { 246 + return nil, fmt.Errorf("error decoding repos response: %w", err) 247 + } 248 + 249 + repos = append(repos, reposResp.Repos...) 250 + 251 + if len(reposResp.Repos) != 1000 { 252 + break 253 + } 254 + } 255 + 256 + return repos, nil 257 + } 258 + 41 259 var run = func(cmd *cli.Context) error { 260 + startTime := time.Now() 261 + 42 262 conn, err := clickhouse.Open(&clickhouse.Options{ 43 263 Addr: []string{cmd.String("clickhouse-addr")}, 44 264 Auth: clickhouse.Auth{ ··· 52 272 } 53 273 defer conn.Close() 54 274 55 - var entries []ClickhousePLCEntry 56 - if err := conn.Select(cmd.Context, &entries, "SELECT..."); err != nil { 275 + fmt.Println("querying clickhouse for dids and services...") 276 + 277 + type servicesQueryRow struct { 278 + PlcOpServices []string `ch:"plc_op_services"` 279 + } 280 + var servicesQueryRows []servicesQueryRow 281 + if err := conn.Select(cmd.Context, &servicesQueryRows, ` 282 + SELECT DISTINCT(plc_op_services) FROM default.plc WHERE arrayExists(x -> x LIKE '%.bsky.network', plc_op_services) 283 + `); err != nil { 284 + return err 285 + } 286 + 287 + servicesDids := map[string][]string{} 288 + for _, svcs := range servicesQueryRows { 289 + for _, s := range svcs.PlcOpServices { 290 + servicesDids[s] = []string{} 291 + } 292 + } 293 + 294 + fmt.Printf("found %d services\n", len(servicesDids)) 295 + 296 + fmt.Printf("getting most recent record for each did...") 297 + var records []models.Record 298 + if err := conn.Select(cmd.Context, &records, ` 299 + SELECT did, created_at 300 + FROM default.record 301 + QUALIFY row_number() OVER (PARTITION BY did ORDER BY created_at ASC) = 1 302 + `); err != nil { 303 + return err 304 + } 305 + 306 + fmt.Printf("collecting dids...\n") 307 + 308 + didCreatedAt := map[string]time.Time{} 309 + for _, r := range records { 310 + didCreatedAt[r.Did] = r.CreatedAt 311 + } 312 + 313 + inserter, err := clickhouse_inserter.New(context.TODO(), &clickhouse_inserter.Args{ 314 + BatchSize: 100000, 315 + Logger: slog.Default(), 316 + Conn: conn, 317 + Query: "INSERT INTO record (did, rkey, collection, cid, seq, raw, created_at)", 318 + RateLimit: 2, // two inserts per second in the event of massive repos 319 + }) 320 + if err != nil { 57 321 return err 58 322 } 323 + 324 + fmt.Printf("building download buckets...") 325 + 326 + skipped := 0 327 + total := 0 328 + needOlderThan, _ := time.Parse(time.DateTime, "2025-06-28 04:18:22") 329 + downloader := NewRepoDownloader() 330 + serviceDids := map[string][]string{} 331 + 332 + wg := sync.WaitGroup{} 333 + for s := range servicesDids { 334 + wg.Add(1) 335 + go func() { 336 + defer wg.Done() 337 + repos, err := downloader.getDidsFromService(context.TODO(), s) 338 + if err != nil { 339 + fmt.Printf("error getting dids for services %s: %v", s, err) 340 + return 341 + } 342 + dids := []string{} 343 + for _, r := range repos { 344 + lastRecord, exists := didCreatedAt[r.Did] 345 + if exists && lastRecord.Before(needOlderThan) { 346 + skipped++ 347 + continue 348 + } 349 + 350 + dids = append(dids, r.Did) 351 + } 352 + serviceDids[s] = dids 353 + }() 354 + } 355 + 356 + fmt.Println("getting all the repos...") 357 + wg.Wait() 358 + 359 + fmt.Printf("Total jobs: %d across %d services \n", total, len(serviceDids)) 360 + fmt.Printf("was able to skip %d repos\n", skipped) 361 + 362 + for service, dids := range serviceDids { 363 + if len(dids) < 100 { 364 + continue 365 + } 366 + fmt.Printf("%s: %d jobs\n", service, len(dids)) 367 + } 368 + 369 + processed := 0 370 + errored := 0 371 + 372 + for service, dids := range serviceDids { 373 + go func() { 374 + for _, did := range dids { 375 + ratelimiter := downloader.getRateLimiter(service) 376 + ratelimiter.Take() 377 + 378 + b, err := downloader.downloadRepo(service, did) 379 + if err != nil { 380 + errored++ 381 + processed++ 382 + continue 383 + } 384 + 385 + go func(b []byte, did string, inserter *clickhouse_inserter.Inserter) { 386 + processRepo(b, did, inserter) 387 + }(b, did, inserter) 388 + 389 + processed++ 390 + } 391 + }() 392 + } 393 + 394 + ticker := time.NewTicker(1 * time.Second) 395 + defer ticker.Stop() 396 + 397 + for range ticker.C { 398 + elapsed := time.Since(startTime) 399 + rate := float64(processed) / elapsed.Seconds() 400 + remaining := total - processed 401 + 402 + var eta string 403 + if rate > 0 { 404 + etaSeconds := float64(remaining) / rate 405 + etaDuration := time.Duration(etaSeconds * float64(time.Second)) 406 + eta = fmt.Sprintf(", ETA: %v", etaDuration.Round(time.Second)) 407 + } else { 408 + eta = ", ETA: calculating..." 409 + } 410 + 411 + fmt.Printf("\rProgress: %d/%d processed (%.1f%%), %d skipped, %d errors, %.1f jobs/sec%s", 412 + processed, total, float64(processed)/float64(total)*100, skipped, errored, rate, eta) 413 + } 414 + 415 + fmt.Printf("\nCompleted: %d processed, %d errors\n", processed, errored) 416 + 417 + inserter.Close(context.TODO()) 59 418 60 419 return nil 61 420 }
+10 -1
cmd/photocopy/main.go
··· 62 62 EnvVars: []string{"PHOTOCOPY_CLICKHOUSE_PASS"}, 63 63 Required: true, 64 64 }, 65 + &cli.StringFlag{ 66 + Name: "ratelimit-bypass-key", 67 + EnvVars: []string{"PHOTOCOPY_RATELIMIT_BYPASS_KEY"}, 68 + Required: false, 69 + }, 70 + &cli.BoolFlag{ 71 + Name: "with-backfill", 72 + }, 65 73 }, 66 74 Commands: cli.Commands{ 67 75 &cli.Command{ ··· 112 120 ClickhouseDatabase: cmd.String("clickhouse-database"), 113 121 ClickhouseUser: cmd.String("clickhouse-user"), 114 122 ClickhousePass: cmd.String("clickhouse-pass"), 123 + RatelimitBypassKey: cmd.String("ratelimit-bypass-key"), 115 124 }) 116 125 if err != nil { 117 126 panic(err) ··· 127 136 cancel() 128 137 }() 129 138 130 - if err := p.Run(ctx); err != nil { 139 + if err := p.Run(ctx, cmd.Bool("with-backfill")); err != nil { 131 140 panic(err) 132 141 } 133 142
+4 -4
go.mod
··· 3 3 go 1.24.4 4 4 5 5 require ( 6 - github.com/ClickHouse/clickhouse-go v1.5.4 7 6 github.com/ClickHouse/clickhouse-go/v2 v2.37.2 8 7 github.com/araddon/dateparse v0.0.0-20210429162001-6b43995a97de 9 8 github.com/bluesky-social/indigo v0.0.0-20250626183556-5641d3c27325 10 9 github.com/gorilla/websocket v1.5.1 11 10 github.com/ipfs/go-cid v0.5.0 11 + github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4 12 12 github.com/joho/godotenv v1.5.1 13 13 github.com/prometheus/client_golang v1.22.0 14 14 github.com/urfave/cli/v2 v2.25.7 15 + go.uber.org/ratelimit v0.3.1 16 + golang.org/x/sync v0.15.0 15 17 ) 16 18 17 19 require ( 18 20 github.com/ClickHouse/ch-go v0.66.1 // indirect 19 21 github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b // indirect 20 22 github.com/andybalholm/brotli v1.1.1 // indirect 23 + github.com/benbjohnson/clock v1.3.0 // indirect 21 24 github.com/beorn7/perks v1.0.1 // indirect 22 25 github.com/carlmjohnson/versioninfo v0.22.5 // indirect 23 26 github.com/cespare/xxhash/v2 v2.3.0 // indirect 24 - github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 // indirect 25 27 github.com/cpuguy83/go-md2man/v2 v2.0.3 // indirect 26 28 github.com/felixge/httpsnoop v1.0.4 // indirect 27 29 github.com/go-faster/city v1.0.1 // indirect ··· 54 56 github.com/ipfs/go-merkledag v0.11.0 // indirect 55 57 github.com/ipfs/go-metrics-interface v0.0.1 // indirect 56 58 github.com/ipfs/go-verifcid v0.0.3 // indirect 57 - github.com/ipld/go-car v0.6.1-0.20230509095817-92d28eb23ba4 // indirect 58 59 github.com/ipld/go-codec-dagpb v1.6.0 // indirect 59 60 github.com/ipld/go-ipld-prime v0.21.0 // indirect 60 61 github.com/jackc/pgpassfile v1.0.0 // indirect ··· 102 103 golang.org/x/crypto v0.39.0 // indirect 103 104 golang.org/x/exp v0.0.0-20240719175910-8a7402abbf56 // indirect 104 105 golang.org/x/net v0.41.0 // indirect 105 - golang.org/x/sync v0.15.0 // indirect 106 106 golang.org/x/sys v0.33.0 // indirect 107 107 golang.org/x/text v0.26.0 // indirect 108 108 golang.org/x/time v0.11.0 // indirect
+2 -12
go.sum
··· 1 1 github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= 2 2 github.com/ClickHouse/ch-go v0.66.1 h1:LQHFslfVYZsISOY0dnOYOXGkOUvpv376CCm8g7W74A4= 3 3 github.com/ClickHouse/ch-go v0.66.1/go.mod h1:NEYcg3aOFv2EmTJfo4m2WF7sHB/YFbLUuIWv9iq76xY= 4 - github.com/ClickHouse/clickhouse-go v1.5.4 h1:cKjXeYLNWVJIx2J1K6H2CqyRmfwVJVY1OV1coaaFcI0= 5 - github.com/ClickHouse/clickhouse-go v1.5.4/go.mod h1:EaI/sW7Azgz9UATzd5ZdZHRUhHgv5+JMS9NSr2smCJI= 6 4 github.com/ClickHouse/clickhouse-go/v2 v2.37.2 h1:wRLNKoynvHQEN4znnVHNLaYnrqVc9sGJmGYg+GGCfto= 7 5 github.com/ClickHouse/clickhouse-go/v2 v2.37.2/go.mod h1:pH2zrBGp5Y438DMwAxXMm1neSXPPjSI7tD4MURVULw8= 8 6 github.com/RussellLuo/slidingwindow v0.0.0-20200528002341-535bb99d338b h1:5/++qT1/z812ZqBvqQt6ToRswSuPZ/B33m6xVHRzADU= ··· 20 18 github.com/beorn7/perks v1.0.1/go.mod h1:G2ZrVWU2WbWT9wwq4/hrbKbnv/1ERSJQ0ibhJ6rlkpw= 21 19 github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932 h1:mXoPYz/Ul5HYEDvkta6I8/rnYM5gSdSV2tJ6XbZuEtY= 22 20 github.com/bitly/go-hostpool v0.0.0-20171023180738-a3a6125de932/go.mod h1:NOuUCSz6Q9T7+igc/hlvDOUdtWKryOrtFyIVABv/p7k= 23 - github.com/bkaradzic/go-lz4 v1.0.0 h1:RXc4wYsyz985CkXXeX04y4VnZFGG8Rd43pRaHsOXAKk= 24 - github.com/bkaradzic/go-lz4 v1.0.0/go.mod h1:0YdlkowM3VswSROI7qDxhRvJ3sLhlFrRRwjwegp5jy4= 25 21 github.com/bluesky-social/indigo v0.0.0-20250626183556-5641d3c27325 h1:Bftt2EcoLZK2Z2m12Ih5QqbReX8j29hbf4zJU/FKzaY= 26 22 github.com/bluesky-social/indigo v0.0.0-20250626183556-5641d3c27325/go.mod h1:8FlFpF5cIq3DQG0kEHqyTkPV/5MDQoaWLcVwza5ZPJU= 27 23 github.com/bmizerany/assert v0.0.0-20160611221934-b7ed37b82869 h1:DDGfHa7BWjL4YnC6+E63dPcxHo2sUxDIu8g3QgEJdRY= ··· 30 26 github.com/carlmjohnson/versioninfo v0.22.5/go.mod h1:QT9mph3wcVfISUKd0i9sZfVrPviHuSF+cUtLjm2WSf8= 31 27 github.com/cespare/xxhash/v2 v2.3.0 h1:UL815xU9SqsFlibzuggzjXhog7bL6oX9BbNZnL2UFvs= 32 28 github.com/cespare/xxhash/v2 v2.3.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= 33 - github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58 h1:F1EaeKL/ta07PY/k9Os/UFtwERei2/XzGemhpGnBKNg= 34 - github.com/cloudflare/golz4 v0.0.0-20150217214814-ef862a3cdc58/go.mod h1:EOBUe0h4xcZ5GoxqC5SDxFQ8gwyZPKQoEzownBlhI80= 35 29 github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU= 36 30 github.com/cpuguy83/go-md2man/v2 v2.0.3 h1:qMCsGGgs+MAzDFyp9LpAe1Lqy/fY/qCovCm0qnXZOBM= 37 31 github.com/cpuguy83/go-md2man/v2 v2.0.3/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= ··· 57 51 github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= 58 52 github.com/go-redis/redis v6.15.9+incompatible h1:K0pv1D7EQUjfyoMql+r/jZqCLizCGKFlFgcHWWmHQjg= 59 53 github.com/go-redis/redis v6.15.9+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA= 60 - github.com/go-sql-driver/mysql v1.4.0/go.mod h1:zAC/RDZ24gD3HViQzih4MyKcchzm+sOG5ZlKdlhCg5w= 61 54 github.com/go-yaml/yaml v2.1.0+incompatible/go.mod h1:w2MrLa16VYP0jy6N7M5kHaCkaLENm+P+Tv+MfurjSw0= 62 55 github.com/gocql/gocql v1.7.0 h1:O+7U7/1gSN7QTEAaMEsJc1Oq2QHXvCWoF3DFK9HDHus= 63 56 github.com/gocql/gocql v1.7.0/go.mod h1:vnlvXyFZeLBF0Wy+RS8hrOdbn0UWsWtdg07XJnFxZ+4= ··· 177 170 github.com/jinzhu/inflection v1.0.0/go.mod h1:h+uFLlag+Qp1Va5pdKtLDYj+kHp5pxUVkryuEj+Srlc= 178 171 github.com/jinzhu/now v1.1.5 h1:/o9tlHleP7gOFmsnYNz3RGnqzefHA47wQpKrrdTIwXQ= 179 172 github.com/jinzhu/now v1.1.5/go.mod h1:d3SSVoowX0Lcu0IBviAWJpolVfI5UJVZZ7cO71lE/z8= 180 - github.com/jmoiron/sqlx v1.2.0/go.mod h1:1FEQNm3xlJgrMD+FBdI9+xvCksHtbpVBBw5dYhBSsks= 181 173 github.com/joho/godotenv v1.5.1 h1:7eLL/+HRGLY0ldzfGMeQkb7vMd0as4CfYvUVzLqw0N0= 182 174 github.com/joho/godotenv v1.5.1/go.mod h1:f4LDr5Voq0i2e/R5DDNOoa2zzDfwtkZa6DnEwAbqwq4= 183 175 github.com/jtolds/gls v4.20.0+incompatible h1:xdiiI2gbIgH/gLH7ADydsJ1uDOEzR8yvV7C0MuV77Wo= ··· 200 192 github.com/kr/text v0.2.0/go.mod h1:eLer722TekiGuMkidMxC/pM04lWEeraHUUmBw8l2grE= 201 193 github.com/kylelemons/godebug v1.1.0 h1:RPNrshWIDI6G2gRW9EHilWtl7Z6Sb1BR0xunSBf0SNc= 202 194 github.com/kylelemons/godebug v1.1.0/go.mod h1:9/0rRGxNHcop5bhtWyNeEfOS8JIWk580+fNqagV/RAw= 203 - github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo= 204 195 github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8= 205 196 github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= 206 197 github.com/libp2p/go-cidranger v1.1.0 h1:ewPN8EZ0dd1LSnrtuwd4709PXVcITVeuwbag38yPW7c= ··· 223 214 github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY= 224 215 github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y= 225 216 github.com/mattn/go-runewidth v0.0.10/go.mod h1:RAqKPSqVFrSLVXbA8x7dzmKdmGzieGRCM46jaSJTDAk= 226 - github.com/mattn/go-sqlite3 v1.9.0/go.mod h1:FPy6KqzDD04eiIsT53CuJW3U88zkxoIYsOqkbpncsNc= 227 217 github.com/mattn/go-sqlite3 v1.14.22 h1:2gZY6PC6kBnID23Tichd1K+Z0oS6nE/XwU+Vz/5o4kU= 228 218 github.com/mattn/go-sqlite3 v1.14.22/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= 229 219 github.com/miekg/dns v1.1.50 h1:DQUfb9uc6smULcREF09Uc+/Gd46YWqJd5DbpPE9xkcA= ··· 262 252 github.com/paulmach/protoscan v0.2.1/go.mod h1:SpcSwydNLrxUGSDvXvO0P7g7AuhJ7lcKfDlhJCDw2gY= 263 253 github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9 h1:1/WtZae0yGtPq+TI6+Tv1WTxkukpXeMlviSxvL7SRgk= 264 254 github.com/petar/GoLLRB v0.0.0-20210522233825-ae3b015fd3e9/go.mod h1:x3N5drFsm2uilKKuuYo6LdyD8vZAW55sH/9w+pbo1sw= 265 - github.com/pierrec/lz4 v2.0.5+incompatible h1:2xWsjqPFWcplujydGg4WmhC/6fZqK42wMM8aXeqhl0I= 266 - github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= 267 255 github.com/pierrec/lz4/v4 v4.1.22 h1:cKFw6uJDK+/gfw5BcDL0JL5aBsAFdsIT18eRtLj7VIU= 268 256 github.com/pierrec/lz4/v4 v4.1.22/go.mod h1:gZWDp/Ze/IJXGXf23ltt2EXimqmTUXEy0GFuRQyBid4= 269 257 github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= ··· 360 348 go.uber.org/multierr v1.6.0/go.mod h1:cdWPpRnG4AhwMwsgIHip0KRBQjJy5kYEpYjJxpXp9iU= 361 349 go.uber.org/multierr v1.11.0 h1:blXXJkSxSSfBVBlC76pxqeO+LN3aDfLQo+309xJstO0= 362 350 go.uber.org/multierr v1.11.0/go.mod h1:20+QtiLqy0Nd6FdQB9TLXag12DsQkrbs3htMFfDN80Y= 351 + go.uber.org/ratelimit v0.3.1 h1:K4qVE+byfv/B3tC+4nYWP7v/6SimcO7HzHekoMNBma0= 352 + go.uber.org/ratelimit v0.3.1/go.mod h1:6euWsTB6U/Nb3X++xEUXA8ciPJvr19Q/0h1+oDcJhRk= 363 353 go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee/go.mod h1:vJERXedbb3MVM5f9Ejo0C68/HhF8uaILCdgjnY+goOA= 364 354 go.uber.org/zap v1.16.0/go.mod h1:MA8QOfq0BHJwdXa996Y4dYkAqRKB8/1K1QMMZVaNZjQ= 365 355 go.uber.org/zap v1.19.1/go.mod h1:j3DNczoxDZroyBnOT1L/Q79cfUMGZxlv/9dzN7SM1rI=
+7 -1
handle_create.go
··· 50 50 Collection: collection, 51 51 Cid: cid, 52 52 Seq: seq, 53 - Raw: raw, 53 + Raw: string(raw), 54 54 CreatedAt: cat, 55 55 } 56 56 ··· 72 72 return err 73 73 } 74 74 75 + lang := "" 76 + if len(rec.Langs) != 0 { 77 + lang = rec.Langs[0] 78 + } 79 + 75 80 post := models.Post{ 76 81 Uri: uri, 77 82 Rkey: rkey, 78 83 CreatedAt: *cat, 79 84 IndexedAt: indexedAt, 80 85 Did: did, 86 + Lang: lang, 81 87 } 82 88 83 89 if rec.Reply != nil {
+1
models/post.go
··· 14 14 ParentDid string `ch:"parent_did"` 15 15 QuoteUri string `ch:"quote_uri"` 16 16 QuoteDid string `ch:"quote_did"` 17 + Lang string `ch:"lang"` 17 18 }
+1 -1
models/record.go
··· 8 8 Collection string `ch:"collection"` 9 9 Cid string `ch:"cid"` 10 10 Seq string `ch:"seq"` 11 - Raw []byte `ch:"raw"` 11 + Raw string `ch:"raw"` 12 12 CreatedAt time.Time `ch:"created_at"` 13 13 }
+36 -15
photocopy.go
··· 9 9 "time" 10 10 11 11 "github.com/ClickHouse/clickhouse-go/v2" 12 + "github.com/ClickHouse/clickhouse-go/v2/lib/driver" 12 13 "github.com/haileyok/photocopy/clickhouse_inserter" 13 14 "github.com/prometheus/client_golang/prometheus" 14 15 "github.com/prometheus/client_golang/prometheus/promauto" ··· 27 28 inserters *Inserters 28 29 29 30 plcScraper *PLCScraper 31 + 32 + ratelimitBypassKey string 33 + 34 + conn driver.Conn 30 35 } 31 36 32 37 type Inserters struct { ··· 48 53 ClickhouseDatabase string 49 54 ClickhouseUser string 50 55 ClickhousePass string 56 + RatelimitBypassKey string 51 57 } 52 58 53 59 func New(ctx context.Context, args *Args) (*Photocopy, error) { 54 - p := &Photocopy{ 55 - logger: args.Logger, 56 - metricsAddr: args.MetricsAddr, 57 - relayHost: args.RelayHost, 58 - wg: sync.WaitGroup{}, 59 - cursorFile: args.CursorFile, 60 - } 61 - 62 60 conn, err := clickhouse.Open(&clickhouse.Options{ 63 61 Addr: []string{args.ClickhouseAddr}, 64 62 Auth: clickhouse.Auth{ ··· 71 69 return nil, err 72 70 } 73 71 72 + p := &Photocopy{ 73 + logger: args.Logger, 74 + metricsAddr: args.MetricsAddr, 75 + relayHost: args.RelayHost, 76 + wg: sync.WaitGroup{}, 77 + cursorFile: args.CursorFile, 78 + ratelimitBypassKey: args.RatelimitBypassKey, 79 + conn: conn, 80 + } 81 + 74 82 insertionsHist := promauto.NewHistogramVec(prometheus.HistogramOpts{ 75 83 Name: "photocopy_inserts_time", 76 84 Help: "histogram of photocopy inserts", ··· 80 88 fi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 81 89 PrometheusCounterPrefix: "photocopy_follows", 82 90 Histogram: insertionsHist, 83 - BatchSize: 1000, 91 + BatchSize: 250_000, 84 92 Logger: p.logger, 85 93 Conn: conn, 86 94 Query: "INSERT INTO follow (uri, did, rkey, created_at, indexed_at, subject)", 95 + RateLimit: 3, 87 96 }) 88 97 if err != nil { 89 98 return nil, err ··· 92 101 pi, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 93 102 PrometheusCounterPrefix: "photocopy_posts", 94 103 Histogram: insertionsHist, 95 - BatchSize: 100, 104 + BatchSize: 250_000, 96 105 Logger: p.logger, 97 106 Conn: conn, 98 - Query: "INSERT INTO post (uri, did, rkey, created_at, indexed_at, root_uri, root_did, parent_uri, parent_did, quote_uri, quote_did)", 107 + Query: "INSERT INTO post (uri, did, rkey, created_at, indexed_at, root_uri, root_did, parent_uri, parent_did, quote_uri, quote_did, lang)", 108 + RateLimit: 3, 99 109 }) 100 110 if err != nil { 101 111 return nil, err ··· 104 114 ii, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 105 115 PrometheusCounterPrefix: "photocopy_interactions", 106 116 Histogram: insertionsHist, 107 - BatchSize: 1000, 117 + BatchSize: 250_000, 108 118 Logger: p.logger, 109 119 Conn: conn, 110 120 Query: "INSERT INTO interaction (uri, did, rkey, kind, created_at, indexed_at, subject_uri, subject_did)", 121 + RateLimit: 3, 111 122 }) 112 123 if err != nil { 113 124 return nil, err ··· 116 127 ri, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 117 128 PrometheusCounterPrefix: "photocopy_records", 118 129 Histogram: insertionsHist, 119 - BatchSize: 1000, 130 + BatchSize: 250_000, 120 131 Logger: p.logger, 121 132 Conn: conn, 122 133 Query: "INSERT INTO record (did, rkey, collection, cid, seq, raw, created_at)", 134 + RateLimit: 3, 123 135 }) 124 136 if err != nil { 125 137 return nil, err ··· 128 140 di, err := clickhouse_inserter.New(ctx, &clickhouse_inserter.Args{ 129 141 PrometheusCounterPrefix: "photocopy_deletes", 130 142 Histogram: insertionsHist, 131 - BatchSize: 100, 143 + BatchSize: 250_000, 132 144 Logger: p.logger, 133 145 Conn: conn, 134 146 Query: "INSERT INTO delete (did, rkey, created_at)", 147 + RateLimit: 3, 135 148 }) 136 149 if err != nil { 137 150 return nil, err ··· 180 193 return p, nil 181 194 } 182 195 183 - func (p *Photocopy) Run(baseCtx context.Context) error { 196 + func (p *Photocopy) Run(baseCtx context.Context, withBackfill bool) error { 184 197 ctx, cancel := context.WithCancel(baseCtx) 185 198 186 199 metricsServer := http.NewServeMux() ··· 205 218 panic(fmt.Errorf("failed to start plc scraper: %w", err)) 206 219 } 207 220 }(ctx) 221 + 222 + if withBackfill { 223 + go func(ctx context.Context) { 224 + if err := p.runBackfiller(ctx); err != nil { 225 + panic(fmt.Errorf("error starting backfiller: %w", err)) 226 + } 227 + }(ctx) 228 + } 208 229 209 230 <-ctx.Done() 210 231