porting all github actions from bluesky-social/indigo to tangled CI
at main 2.6 kB view raw
1package engine 2 3import ( 4 "fmt" 5 "io" 6 "net/http" 7 "strings" 8 "time" 9 10 "github.com/bluesky-social/indigo/atproto/data" 11 lexutil "github.com/bluesky-social/indigo/lex/util" 12 13 "github.com/carlmjohnson/versioninfo" 14) 15 16// Parses out any blobs from the enclosed record. 17// 18// NOTE: for consistency with other RecordContext methods, which don't usually return errors, maybe the error-returning version of this function should be a helper function, or defined on RecordOp, and the RecordContext version should return an empty array on error? 19func (c *RecordContext) Blobs() ([]lexutil.LexBlob, error) { 20 21 if c.RecordOp.Action == DeleteOp { 22 return []lexutil.LexBlob{}, nil 23 } 24 25 rec, err := data.UnmarshalCBOR(c.RecordOp.RecordCBOR) 26 if err != nil { 27 return nil, fmt.Errorf("parsing generic record CBOR: %v", err) 28 } 29 blobs := data.ExtractBlobs(rec) 30 31 // convert from data.Blob to lexutil.LexBlob; plan is to merge these types eventually 32 var out []lexutil.LexBlob 33 for _, b := range blobs { 34 lb := lexutil.LexBlob{ 35 Ref: lexutil.LexLink(b.Ref), 36 MimeType: b.MimeType, 37 Size: b.Size, 38 } 39 out = append(out, lb) 40 } 41 return out, nil 42} 43 44func (c *RecordContext) fetchBlob(blob lexutil.LexBlob) ([]byte, error) { 45 46 start := time.Now() 47 defer func() { 48 duration := time.Since(start) 49 blobDownloadDuration.Observe(duration.Seconds()) 50 }() 51 52 var blobBytes []byte 53 54 // TODO: potential security issue here with malformed or "localhost" PDS endpoint 55 pdsEndpoint := c.Account.Identity.PDSEndpoint() 56 xrpcURL := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", pdsEndpoint, c.Account.Identity.DID, blob.Ref) 57 58 req, err := http.NewRequest("GET", xrpcURL, nil) 59 if err != nil { 60 return nil, err 61 } 62 63 req.Header.Set("User-Agent", "indigo-automod/"+versioninfo.Short()) 64 // TODO: more robust PDS hostname check (eg, future trailing slash or partial path) 65 if c.engine.BskyClient.Headers != nil && strings.HasSuffix(pdsEndpoint, ".bsky.network") { 66 val, ok := c.engine.BskyClient.Headers["x-ratelimit-bypass"] 67 if ok { 68 req.Header.Set("x-ratelimit-bypass", val) 69 } 70 } 71 72 client := c.engine.BlobClient 73 if client == nil { 74 client = http.DefaultClient 75 } 76 77 resp, err := client.Do(req) 78 if err != nil { 79 return nil, err 80 } 81 defer resp.Body.Close() 82 83 blobDownloadCount.WithLabelValues(fmt.Sprint(resp.StatusCode)).Inc() 84 if resp.StatusCode != 200 { 85 io.Copy(io.Discard, resp.Body) 86 return nil, fmt.Errorf("failed to fetch blob from PDS. did=%s cid=%s statusCode=%d", c.Account.Identity.DID, blob.Ref, resp.StatusCode) 87 } 88 89 blobBytes, err = io.ReadAll(resp.Body) 90 if err != nil { 91 return nil, err 92 } 93 94 return blobBytes, nil 95}