tangled
alpha
login
or
join now
stream.place
/
streamplace
Live video on the AT Protocol
74
fork
atom
overview
issues
1
pulls
pipelines
wip: generate list of cbor troublemakers
Eli Mallon
4 months ago
bbc01d2e
d23faa9a
+39
-2
1 changed file
expand all
collapse all
unified
split
pkg
atproto
sync.go
+39
-2
pkg/atproto/sync.go
···
4
"context"
5
"errors"
6
"fmt"
0
7
"reflect"
0
8
"time"
9
10
"github.com/bluesky-social/indigo/api/bsky"
···
19
lexutil "github.com/bluesky-social/indigo/lex/util"
20
)
21
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
22
func (atsync *ATProtoSynchronizer) handleCreateUpdate(ctx context.Context, userDID string, rkey syntax.RecordKey, recCBOR *[]byte, cid string, collection syntax.NSID, isUpdate bool, isFirstSync bool) error {
23
ctx = log.WithLogValues(ctx, "func", "handleCreateUpdate", "userDID", userDID, "rkey", rkey.String(), "cid", cid, "collection", collection.String())
24
now := time.Now()
···
33
}
34
d, err := data.UnmarshalCBOR(*recCBOR)
35
if err != nil {
36
-
return fmt.Errorf("failed to unmarhsal record CBOR: %w", err)
0
0
37
}
38
cb, err := lexutil.CborDecodeValue(*recCBOR)
39
if errors.Is(err, lexutil.ErrUnrecognizedType) {
40
log.Debug(ctx, "unrecognized record type", "key", rkey.String(), "type", err)
41
return nil
42
} else if err != nil {
43
-
return fmt.Errorf("failed to decode record CBOR: %w", err)
0
0
44
}
45
switch rec := cb.(type) {
46
case *bsky.GraphFollow:
···
4
"context"
5
"errors"
6
"fmt"
7
+
"os"
8
"reflect"
9
+
"sync"
10
"time"
11
12
"github.com/bluesky-social/indigo/api/bsky"
···
21
lexutil "github.com/bluesky-social/indigo/lex/util"
22
)
23
24
+
var writeMutex = sync.Mutex{}
25
+
var writeFile *os.File
26
+
var writeErrs *os.File
27
+
28
+
func writeProblematic(recCBOR *[]byte, errMsg error) {
29
+
writeMutex.Lock()
30
+
defer writeMutex.Unlock()
31
+
if writeFile == nil {
32
+
var err error
33
+
writeFile, err = os.Create("problematic.cbor")
34
+
if err != nil {
35
+
panic(err)
36
+
}
37
+
}
38
+
if writeErrs == nil {
39
+
var err error
40
+
writeErrs, err = os.Create("problematic.txt")
41
+
if err != nil {
42
+
panic(err)
43
+
}
44
+
}
45
+
_, err := writeFile.Write(*recCBOR)
46
+
if err != nil {
47
+
panic(fmt.Sprintf("failed to write to problematic.cbor: %s", err))
48
+
}
49
+
_, err = writeErrs.WriteString(errMsg.Error() + "\n")
50
+
if err != nil {
51
+
panic(fmt.Sprintf("failed to write to problematic.txt: %s", err))
52
+
}
53
+
}
54
+
55
func (atsync *ATProtoSynchronizer) handleCreateUpdate(ctx context.Context, userDID string, rkey syntax.RecordKey, recCBOR *[]byte, cid string, collection syntax.NSID, isUpdate bool, isFirstSync bool) error {
56
ctx = log.WithLogValues(ctx, "func", "handleCreateUpdate", "userDID", userDID, "rkey", rkey.String(), "cid", cid, "collection", collection.String())
57
now := time.Now()
···
66
}
67
d, err := data.UnmarshalCBOR(*recCBOR)
68
if err != nil {
69
+
ret := fmt.Errorf("failed to unmarshal record CBOR: %w aturi=%s cid=%s handle=%s", err, aturi.String(), cid, r.Handle)
70
+
go writeProblematic(recCBOR, ret)
71
+
return ret
72
}
73
cb, err := lexutil.CborDecodeValue(*recCBOR)
74
if errors.Is(err, lexutil.ErrUnrecognizedType) {
75
log.Debug(ctx, "unrecognized record type", "key", rkey.String(), "type", err)
76
return nil
77
} else if err != nil {
78
+
ret := fmt.Errorf("failed to decode record CBOR: %w aturi=%s cid=%s handle=%s", err, aturi.String(), cid, r.Handle)
79
+
go writeProblematic(recCBOR, ret)
80
+
return ret
81
}
82
switch rec := cb.(type) {
83
case *bsky.GraphFollow: