1package engine
2
3import (
4 "fmt"
5 "io"
6 "net/http"
7 "strings"
8 "time"
9
10 "github.com/bluesky-social/indigo/atproto/data"
11 lexutil "github.com/bluesky-social/indigo/lex/util"
12
13 "github.com/carlmjohnson/versioninfo"
14)
15
16// Parses out any blobs from the enclosed record.
17//
18// NOTE: for consistency with other RecordContext methods, which don't usually return errors, maybe the error-returning version of this function should be a helper function, or defined on RecordOp, and the RecordContext version should return an empty array on error?
19func (c *RecordContext) Blobs() ([]lexutil.LexBlob, error) {
20
21 if c.RecordOp.Action == DeleteOp {
22 return []lexutil.LexBlob{}, nil
23 }
24
25 rec, err := data.UnmarshalCBOR(c.RecordOp.RecordCBOR)
26 if err != nil {
27 return nil, fmt.Errorf("parsing generic record CBOR: %v", err)
28 }
29 blobs := data.ExtractBlobs(rec)
30
31 // convert from data.Blob to lexutil.LexBlob; plan is to merge these types eventually
32 var out []lexutil.LexBlob
33 for _, b := range blobs {
34 lb := lexutil.LexBlob{
35 Ref: lexutil.LexLink(b.Ref),
36 MimeType: b.MimeType,
37 Size: b.Size,
38 }
39 out = append(out, lb)
40 }
41 return out, nil
42}
43
44func (c *RecordContext) fetchBlob(blob lexutil.LexBlob) ([]byte, error) {
45
46 start := time.Now()
47 defer func() {
48 duration := time.Since(start)
49 blobDownloadDuration.Observe(duration.Seconds())
50 }()
51
52 var blobBytes []byte
53
54 // TODO: potential security issue here with malformed or "localhost" PDS endpoint
55 pdsEndpoint := c.Account.Identity.PDSEndpoint()
56 xrpcURL := fmt.Sprintf("%s/xrpc/com.atproto.sync.getBlob?did=%s&cid=%s", pdsEndpoint, c.Account.Identity.DID, blob.Ref)
57
58 req, err := http.NewRequest("GET", xrpcURL, nil)
59 if err != nil {
60 return nil, err
61 }
62
63 req.Header.Set("User-Agent", "indigo-automod/"+versioninfo.Short())
64 // TODO: more robust PDS hostname check (eg, future trailing slash or partial path)
65 if c.engine.BskyClient.Headers != nil && strings.HasSuffix(pdsEndpoint, ".bsky.network") {
66 val, ok := c.engine.BskyClient.Headers["x-ratelimit-bypass"]
67 if ok {
68 req.Header.Set("x-ratelimit-bypass", val)
69 }
70 }
71
72 client := c.engine.BlobClient
73 if client == nil {
74 client = http.DefaultClient
75 }
76
77 resp, err := client.Do(req)
78 if err != nil {
79 return nil, err
80 }
81 defer resp.Body.Close()
82
83 blobDownloadCount.WithLabelValues(fmt.Sprint(resp.StatusCode)).Inc()
84 if resp.StatusCode != 200 {
85 io.Copy(io.Discard, resp.Body)
86 return nil, fmt.Errorf("failed to fetch blob from PDS. did=%s cid=%s statusCode=%d", c.Account.Identity.DID, blob.Ref, resp.StatusCode)
87 }
88
89 blobBytes, err = io.ReadAll(resp.Body)
90 if err != nil {
91 return nil, err
92 }
93
94 return blobBytes, nil
95}