-627
api/atproto/cbor_gen.go
-627
api/atproto/cbor_gen.go
···
1206
1206
1207
1207
return nil
1208
1208
}
1209
-
func (t *SyncSubscribeRepos_Handle) MarshalCBOR(w io.Writer) error {
1210
-
if t == nil {
1211
-
_, err := w.Write(cbg.CborNull)
1212
-
return err
1213
-
}
1214
1209
1215
-
cw := cbg.NewCborWriter(w)
1216
-
1217
-
if _, err := cw.Write([]byte{164}); err != nil {
1218
-
return err
1219
-
}
1220
-
1221
-
// t.Did (string) (string)
1222
-
if len("did") > 1000000 {
1223
-
return xerrors.Errorf("Value in field \"did\" was too long")
1224
-
}
1225
-
1226
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("did"))); err != nil {
1227
-
return err
1228
-
}
1229
-
if _, err := cw.WriteString(string("did")); err != nil {
1230
-
return err
1231
-
}
1232
-
1233
-
if len(t.Did) > 1000000 {
1234
-
return xerrors.Errorf("Value in field t.Did was too long")
1235
-
}
1236
-
1237
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Did))); err != nil {
1238
-
return err
1239
-
}
1240
-
if _, err := cw.WriteString(string(t.Did)); err != nil {
1241
-
return err
1242
-
}
1243
-
1244
-
// t.Seq (int64) (int64)
1245
-
if len("seq") > 1000000 {
1246
-
return xerrors.Errorf("Value in field \"seq\" was too long")
1247
-
}
1248
-
1249
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("seq"))); err != nil {
1250
-
return err
1251
-
}
1252
-
if _, err := cw.WriteString(string("seq")); err != nil {
1253
-
return err
1254
-
}
1255
-
1256
-
if t.Seq >= 0 {
1257
-
if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Seq)); err != nil {
1258
-
return err
1259
-
}
1260
-
} else {
1261
-
if err := cw.WriteMajorTypeHeader(cbg.MajNegativeInt, uint64(-t.Seq-1)); err != nil {
1262
-
return err
1263
-
}
1264
-
}
1265
-
1266
-
// t.Time (string) (string)
1267
-
if len("time") > 1000000 {
1268
-
return xerrors.Errorf("Value in field \"time\" was too long")
1269
-
}
1270
-
1271
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("time"))); err != nil {
1272
-
return err
1273
-
}
1274
-
if _, err := cw.WriteString(string("time")); err != nil {
1275
-
return err
1276
-
}
1277
-
1278
-
if len(t.Time) > 1000000 {
1279
-
return xerrors.Errorf("Value in field t.Time was too long")
1280
-
}
1281
-
1282
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Time))); err != nil {
1283
-
return err
1284
-
}
1285
-
if _, err := cw.WriteString(string(t.Time)); err != nil {
1286
-
return err
1287
-
}
1288
-
1289
-
// t.Handle (string) (string)
1290
-
if len("handle") > 1000000 {
1291
-
return xerrors.Errorf("Value in field \"handle\" was too long")
1292
-
}
1293
-
1294
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("handle"))); err != nil {
1295
-
return err
1296
-
}
1297
-
if _, err := cw.WriteString(string("handle")); err != nil {
1298
-
return err
1299
-
}
1300
-
1301
-
if len(t.Handle) > 1000000 {
1302
-
return xerrors.Errorf("Value in field t.Handle was too long")
1303
-
}
1304
-
1305
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Handle))); err != nil {
1306
-
return err
1307
-
}
1308
-
if _, err := cw.WriteString(string(t.Handle)); err != nil {
1309
-
return err
1310
-
}
1311
-
return nil
1312
-
}
1313
-
1314
-
func (t *SyncSubscribeRepos_Handle) UnmarshalCBOR(r io.Reader) (err error) {
1315
-
*t = SyncSubscribeRepos_Handle{}
1316
-
1317
-
cr := cbg.NewCborReader(r)
1318
-
1319
-
maj, extra, err := cr.ReadHeader()
1320
-
if err != nil {
1321
-
return err
1322
-
}
1323
-
defer func() {
1324
-
if err == io.EOF {
1325
-
err = io.ErrUnexpectedEOF
1326
-
}
1327
-
}()
1328
-
1329
-
if maj != cbg.MajMap {
1330
-
return fmt.Errorf("cbor input should be of type map")
1331
-
}
1332
-
1333
-
if extra > cbg.MaxLength {
1334
-
return fmt.Errorf("SyncSubscribeRepos_Handle: map struct too large (%d)", extra)
1335
-
}
1336
-
1337
-
n := extra
1338
-
1339
-
nameBuf := make([]byte, 6)
1340
-
for i := uint64(0); i < n; i++ {
1341
-
nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000)
1342
-
if err != nil {
1343
-
return err
1344
-
}
1345
-
1346
-
if !ok {
1347
-
// Field doesn't exist on this type, so ignore it
1348
-
if err := cbg.ScanForLinks(cr, func(cid.Cid) {}); err != nil {
1349
-
return err
1350
-
}
1351
-
continue
1352
-
}
1353
-
1354
-
switch string(nameBuf[:nameLen]) {
1355
-
// t.Did (string) (string)
1356
-
case "did":
1357
-
1358
-
{
1359
-
sval, err := cbg.ReadStringWithMax(cr, 1000000)
1360
-
if err != nil {
1361
-
return err
1362
-
}
1363
-
1364
-
t.Did = string(sval)
1365
-
}
1366
-
// t.Seq (int64) (int64)
1367
-
case "seq":
1368
-
{
1369
-
maj, extra, err := cr.ReadHeader()
1370
-
if err != nil {
1371
-
return err
1372
-
}
1373
-
var extraI int64
1374
-
switch maj {
1375
-
case cbg.MajUnsignedInt:
1376
-
extraI = int64(extra)
1377
-
if extraI < 0 {
1378
-
return fmt.Errorf("int64 positive overflow")
1379
-
}
1380
-
case cbg.MajNegativeInt:
1381
-
extraI = int64(extra)
1382
-
if extraI < 0 {
1383
-
return fmt.Errorf("int64 negative overflow")
1384
-
}
1385
-
extraI = -1 - extraI
1386
-
default:
1387
-
return fmt.Errorf("wrong type for int64 field: %d", maj)
1388
-
}
1389
-
1390
-
t.Seq = int64(extraI)
1391
-
}
1392
-
// t.Time (string) (string)
1393
-
case "time":
1394
-
1395
-
{
1396
-
sval, err := cbg.ReadStringWithMax(cr, 1000000)
1397
-
if err != nil {
1398
-
return err
1399
-
}
1400
-
1401
-
t.Time = string(sval)
1402
-
}
1403
-
// t.Handle (string) (string)
1404
-
case "handle":
1405
-
1406
-
{
1407
-
sval, err := cbg.ReadStringWithMax(cr, 1000000)
1408
-
if err != nil {
1409
-
return err
1410
-
}
1411
-
1412
-
t.Handle = string(sval)
1413
-
}
1414
-
1415
-
default:
1416
-
// Field doesn't exist on this type, so ignore it
1417
-
if err := cbg.ScanForLinks(r, func(cid.Cid) {}); err != nil {
1418
-
return err
1419
-
}
1420
-
}
1421
-
}
1422
-
1423
-
return nil
1424
-
}
1425
1210
func (t *SyncSubscribeRepos_Identity) MarshalCBOR(w io.Writer) error {
1426
1211
if t == nil {
1427
1212
_, err := w.Write(cbg.CborNull)
···
2094
1879
2095
1880
return nil
2096
1881
}
2097
-
func (t *SyncSubscribeRepos_Migrate) MarshalCBOR(w io.Writer) error {
2098
-
if t == nil {
2099
-
_, err := w.Write(cbg.CborNull)
2100
-
return err
2101
-
}
2102
1882
2103
-
cw := cbg.NewCborWriter(w)
2104
-
2105
-
if _, err := cw.Write([]byte{164}); err != nil {
2106
-
return err
2107
-
}
2108
-
2109
-
// t.Did (string) (string)
2110
-
if len("did") > 1000000 {
2111
-
return xerrors.Errorf("Value in field \"did\" was too long")
2112
-
}
2113
-
2114
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("did"))); err != nil {
2115
-
return err
2116
-
}
2117
-
if _, err := cw.WriteString(string("did")); err != nil {
2118
-
return err
2119
-
}
2120
-
2121
-
if len(t.Did) > 1000000 {
2122
-
return xerrors.Errorf("Value in field t.Did was too long")
2123
-
}
2124
-
2125
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Did))); err != nil {
2126
-
return err
2127
-
}
2128
-
if _, err := cw.WriteString(string(t.Did)); err != nil {
2129
-
return err
2130
-
}
2131
-
2132
-
// t.Seq (int64) (int64)
2133
-
if len("seq") > 1000000 {
2134
-
return xerrors.Errorf("Value in field \"seq\" was too long")
2135
-
}
2136
-
2137
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("seq"))); err != nil {
2138
-
return err
2139
-
}
2140
-
if _, err := cw.WriteString(string("seq")); err != nil {
2141
-
return err
2142
-
}
2143
-
2144
-
if t.Seq >= 0 {
2145
-
if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Seq)); err != nil {
2146
-
return err
2147
-
}
2148
-
} else {
2149
-
if err := cw.WriteMajorTypeHeader(cbg.MajNegativeInt, uint64(-t.Seq-1)); err != nil {
2150
-
return err
2151
-
}
2152
-
}
2153
-
2154
-
// t.Time (string) (string)
2155
-
if len("time") > 1000000 {
2156
-
return xerrors.Errorf("Value in field \"time\" was too long")
2157
-
}
2158
-
2159
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("time"))); err != nil {
2160
-
return err
2161
-
}
2162
-
if _, err := cw.WriteString(string("time")); err != nil {
2163
-
return err
2164
-
}
2165
-
2166
-
if len(t.Time) > 1000000 {
2167
-
return xerrors.Errorf("Value in field t.Time was too long")
2168
-
}
2169
-
2170
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Time))); err != nil {
2171
-
return err
2172
-
}
2173
-
if _, err := cw.WriteString(string(t.Time)); err != nil {
2174
-
return err
2175
-
}
2176
-
2177
-
// t.MigrateTo (string) (string)
2178
-
if len("migrateTo") > 1000000 {
2179
-
return xerrors.Errorf("Value in field \"migrateTo\" was too long")
2180
-
}
2181
-
2182
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("migrateTo"))); err != nil {
2183
-
return err
2184
-
}
2185
-
if _, err := cw.WriteString(string("migrateTo")); err != nil {
2186
-
return err
2187
-
}
2188
-
2189
-
if t.MigrateTo == nil {
2190
-
if _, err := cw.Write(cbg.CborNull); err != nil {
2191
-
return err
2192
-
}
2193
-
} else {
2194
-
if len(*t.MigrateTo) > 1000000 {
2195
-
return xerrors.Errorf("Value in field t.MigrateTo was too long")
2196
-
}
2197
-
2198
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(*t.MigrateTo))); err != nil {
2199
-
return err
2200
-
}
2201
-
if _, err := cw.WriteString(string(*t.MigrateTo)); err != nil {
2202
-
return err
2203
-
}
2204
-
}
2205
-
return nil
2206
-
}
2207
-
2208
-
func (t *SyncSubscribeRepos_Migrate) UnmarshalCBOR(r io.Reader) (err error) {
2209
-
*t = SyncSubscribeRepos_Migrate{}
2210
-
2211
-
cr := cbg.NewCborReader(r)
2212
-
2213
-
maj, extra, err := cr.ReadHeader()
2214
-
if err != nil {
2215
-
return err
2216
-
}
2217
-
defer func() {
2218
-
if err == io.EOF {
2219
-
err = io.ErrUnexpectedEOF
2220
-
}
2221
-
}()
2222
-
2223
-
if maj != cbg.MajMap {
2224
-
return fmt.Errorf("cbor input should be of type map")
2225
-
}
2226
-
2227
-
if extra > cbg.MaxLength {
2228
-
return fmt.Errorf("SyncSubscribeRepos_Migrate: map struct too large (%d)", extra)
2229
-
}
2230
-
2231
-
n := extra
2232
-
2233
-
nameBuf := make([]byte, 9)
2234
-
for i := uint64(0); i < n; i++ {
2235
-
nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000)
2236
-
if err != nil {
2237
-
return err
2238
-
}
2239
-
2240
-
if !ok {
2241
-
// Field doesn't exist on this type, so ignore it
2242
-
if err := cbg.ScanForLinks(cr, func(cid.Cid) {}); err != nil {
2243
-
return err
2244
-
}
2245
-
continue
2246
-
}
2247
-
2248
-
switch string(nameBuf[:nameLen]) {
2249
-
// t.Did (string) (string)
2250
-
case "did":
2251
-
2252
-
{
2253
-
sval, err := cbg.ReadStringWithMax(cr, 1000000)
2254
-
if err != nil {
2255
-
return err
2256
-
}
2257
-
2258
-
t.Did = string(sval)
2259
-
}
2260
-
// t.Seq (int64) (int64)
2261
-
case "seq":
2262
-
{
2263
-
maj, extra, err := cr.ReadHeader()
2264
-
if err != nil {
2265
-
return err
2266
-
}
2267
-
var extraI int64
2268
-
switch maj {
2269
-
case cbg.MajUnsignedInt:
2270
-
extraI = int64(extra)
2271
-
if extraI < 0 {
2272
-
return fmt.Errorf("int64 positive overflow")
2273
-
}
2274
-
case cbg.MajNegativeInt:
2275
-
extraI = int64(extra)
2276
-
if extraI < 0 {
2277
-
return fmt.Errorf("int64 negative overflow")
2278
-
}
2279
-
extraI = -1 - extraI
2280
-
default:
2281
-
return fmt.Errorf("wrong type for int64 field: %d", maj)
2282
-
}
2283
-
2284
-
t.Seq = int64(extraI)
2285
-
}
2286
-
// t.Time (string) (string)
2287
-
case "time":
2288
-
2289
-
{
2290
-
sval, err := cbg.ReadStringWithMax(cr, 1000000)
2291
-
if err != nil {
2292
-
return err
2293
-
}
2294
-
2295
-
t.Time = string(sval)
2296
-
}
2297
-
// t.MigrateTo (string) (string)
2298
-
case "migrateTo":
2299
-
2300
-
{
2301
-
b, err := cr.ReadByte()
2302
-
if err != nil {
2303
-
return err
2304
-
}
2305
-
if b != cbg.CborNull[0] {
2306
-
if err := cr.UnreadByte(); err != nil {
2307
-
return err
2308
-
}
2309
-
2310
-
sval, err := cbg.ReadStringWithMax(cr, 1000000)
2311
-
if err != nil {
2312
-
return err
2313
-
}
2314
-
2315
-
t.MigrateTo = (*string)(&sval)
2316
-
}
2317
-
}
2318
-
2319
-
default:
2320
-
// Field doesn't exist on this type, so ignore it
2321
-
if err := cbg.ScanForLinks(r, func(cid.Cid) {}); err != nil {
2322
-
return err
2323
-
}
2324
-
}
2325
-
}
2326
-
2327
-
return nil
2328
-
}
2329
1883
func (t *SyncSubscribeRepos_RepoOp) MarshalCBOR(w io.Writer) error {
2330
1884
if t == nil {
2331
1885
_, err := w.Write(cbg.CborNull)
···
2540
2094
2541
2095
return nil
2542
2096
}
2543
-
func (t *SyncSubscribeRepos_Tombstone) MarshalCBOR(w io.Writer) error {
2544
-
if t == nil {
2545
-
_, err := w.Write(cbg.CborNull)
2546
-
return err
2547
-
}
2548
2097
2549
-
cw := cbg.NewCborWriter(w)
2550
-
2551
-
if _, err := cw.Write([]byte{163}); err != nil {
2552
-
return err
2553
-
}
2554
-
2555
-
// t.Did (string) (string)
2556
-
if len("did") > 1000000 {
2557
-
return xerrors.Errorf("Value in field \"did\" was too long")
2558
-
}
2559
-
2560
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("did"))); err != nil {
2561
-
return err
2562
-
}
2563
-
if _, err := cw.WriteString(string("did")); err != nil {
2564
-
return err
2565
-
}
2566
-
2567
-
if len(t.Did) > 1000000 {
2568
-
return xerrors.Errorf("Value in field t.Did was too long")
2569
-
}
2570
-
2571
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Did))); err != nil {
2572
-
return err
2573
-
}
2574
-
if _, err := cw.WriteString(string(t.Did)); err != nil {
2575
-
return err
2576
-
}
2577
-
2578
-
// t.Seq (int64) (int64)
2579
-
if len("seq") > 1000000 {
2580
-
return xerrors.Errorf("Value in field \"seq\" was too long")
2581
-
}
2582
-
2583
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("seq"))); err != nil {
2584
-
return err
2585
-
}
2586
-
if _, err := cw.WriteString(string("seq")); err != nil {
2587
-
return err
2588
-
}
2589
-
2590
-
if t.Seq >= 0 {
2591
-
if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Seq)); err != nil {
2592
-
return err
2593
-
}
2594
-
} else {
2595
-
if err := cw.WriteMajorTypeHeader(cbg.MajNegativeInt, uint64(-t.Seq-1)); err != nil {
2596
-
return err
2597
-
}
2598
-
}
2599
-
2600
-
// t.Time (string) (string)
2601
-
if len("time") > 1000000 {
2602
-
return xerrors.Errorf("Value in field \"time\" was too long")
2603
-
}
2604
-
2605
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("time"))); err != nil {
2606
-
return err
2607
-
}
2608
-
if _, err := cw.WriteString(string("time")); err != nil {
2609
-
return err
2610
-
}
2611
-
2612
-
if len(t.Time) > 1000000 {
2613
-
return xerrors.Errorf("Value in field t.Time was too long")
2614
-
}
2615
-
2616
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Time))); err != nil {
2617
-
return err
2618
-
}
2619
-
if _, err := cw.WriteString(string(t.Time)); err != nil {
2620
-
return err
2621
-
}
2622
-
return nil
2623
-
}
2624
-
2625
-
func (t *SyncSubscribeRepos_Tombstone) UnmarshalCBOR(r io.Reader) (err error) {
2626
-
*t = SyncSubscribeRepos_Tombstone{}
2627
-
2628
-
cr := cbg.NewCborReader(r)
2629
-
2630
-
maj, extra, err := cr.ReadHeader()
2631
-
if err != nil {
2632
-
return err
2633
-
}
2634
-
defer func() {
2635
-
if err == io.EOF {
2636
-
err = io.ErrUnexpectedEOF
2637
-
}
2638
-
}()
2639
-
2640
-
if maj != cbg.MajMap {
2641
-
return fmt.Errorf("cbor input should be of type map")
2642
-
}
2643
-
2644
-
if extra > cbg.MaxLength {
2645
-
return fmt.Errorf("SyncSubscribeRepos_Tombstone: map struct too large (%d)", extra)
2646
-
}
2647
-
2648
-
n := extra
2649
-
2650
-
nameBuf := make([]byte, 4)
2651
-
for i := uint64(0); i < n; i++ {
2652
-
nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000)
2653
-
if err != nil {
2654
-
return err
2655
-
}
2656
-
2657
-
if !ok {
2658
-
// Field doesn't exist on this type, so ignore it
2659
-
if err := cbg.ScanForLinks(cr, func(cid.Cid) {}); err != nil {
2660
-
return err
2661
-
}
2662
-
continue
2663
-
}
2664
-
2665
-
switch string(nameBuf[:nameLen]) {
2666
-
// t.Did (string) (string)
2667
-
case "did":
2668
-
2669
-
{
2670
-
sval, err := cbg.ReadStringWithMax(cr, 1000000)
2671
-
if err != nil {
2672
-
return err
2673
-
}
2674
-
2675
-
t.Did = string(sval)
2676
-
}
2677
-
// t.Seq (int64) (int64)
2678
-
case "seq":
2679
-
{
2680
-
maj, extra, err := cr.ReadHeader()
2681
-
if err != nil {
2682
-
return err
2683
-
}
2684
-
var extraI int64
2685
-
switch maj {
2686
-
case cbg.MajUnsignedInt:
2687
-
extraI = int64(extra)
2688
-
if extraI < 0 {
2689
-
return fmt.Errorf("int64 positive overflow")
2690
-
}
2691
-
case cbg.MajNegativeInt:
2692
-
extraI = int64(extra)
2693
-
if extraI < 0 {
2694
-
return fmt.Errorf("int64 negative overflow")
2695
-
}
2696
-
extraI = -1 - extraI
2697
-
default:
2698
-
return fmt.Errorf("wrong type for int64 field: %d", maj)
2699
-
}
2700
-
2701
-
t.Seq = int64(extraI)
2702
-
}
2703
-
// t.Time (string) (string)
2704
-
case "time":
2705
-
2706
-
{
2707
-
sval, err := cbg.ReadStringWithMax(cr, 1000000)
2708
-
if err != nil {
2709
-
return err
2710
-
}
2711
-
2712
-
t.Time = string(sval)
2713
-
}
2714
-
2715
-
default:
2716
-
// Field doesn't exist on this type, so ignore it
2717
-
if err := cbg.ScanForLinks(r, func(cid.Cid) {}); err != nil {
2718
-
return err
2719
-
}
2720
-
}
2721
-
}
2722
-
2723
-
return nil
2724
-
}
2725
2098
func (t *LabelDefs_SelfLabels) MarshalCBOR(w io.Writer) error {
2726
2099
if t == nil {
2727
2100
_, err := w.Write(cbg.CborNull)
-29
api/atproto/syncsubscribeRepos.go
-29
api/atproto/syncsubscribeRepos.go
···
49
49
TooBig bool `json:"tooBig" cborgen:"tooBig"`
50
50
}
51
51
52
-
// SyncSubscribeRepos_Handle is a "handle" in the com.atproto.sync.subscribeRepos schema.
53
-
//
54
-
// DEPRECATED -- Use #identity event instead
55
-
type SyncSubscribeRepos_Handle struct {
56
-
Did string `json:"did" cborgen:"did"`
57
-
Handle string `json:"handle" cborgen:"handle"`
58
-
Seq int64 `json:"seq" cborgen:"seq"`
59
-
Time string `json:"time" cborgen:"time"`
60
-
}
61
-
62
52
// SyncSubscribeRepos_Identity is a "identity" in the com.atproto.sync.subscribeRepos schema.
63
53
//
64
54
// Represents a change to an account's identity. Could be an updated handle, signing key, or pds hosting endpoint. Serves as a prod to all downstream services to refresh their identity cache.
···
76
66
Name string `json:"name" cborgen:"name"`
77
67
}
78
68
79
-
// SyncSubscribeRepos_Migrate is a "migrate" in the com.atproto.sync.subscribeRepos schema.
80
-
//
81
-
// DEPRECATED -- Use #account event instead
82
-
type SyncSubscribeRepos_Migrate struct {
83
-
Did string `json:"did" cborgen:"did"`
84
-
MigrateTo *string `json:"migrateTo" cborgen:"migrateTo"`
85
-
Seq int64 `json:"seq" cborgen:"seq"`
86
-
Time string `json:"time" cborgen:"time"`
87
-
}
88
-
89
69
// SyncSubscribeRepos_RepoOp is a "repoOp" in the com.atproto.sync.subscribeRepos schema.
90
70
//
91
71
// A repo operation, ie a mutation of a single record.
···
113
93
// time: Timestamp of when this message was originally broadcast.
114
94
Time string `json:"time" cborgen:"time"`
115
95
}
116
-
117
-
// SyncSubscribeRepos_Tombstone is a "tombstone" in the com.atproto.sync.subscribeRepos schema.
118
-
//
119
-
// DEPRECATED -- Use #account event instead
120
-
type SyncSubscribeRepos_Tombstone struct {
121
-
Did string `json:"did" cborgen:"did"`
122
-
Seq int64 `json:"seq" cborgen:"seq"`
123
-
Time string `json:"time" cborgen:"time"`
124
-
}
-1
api/ozone/moderationdefs.go
-1
api/ozone/moderationdefs.go
···
1043
1043
//
1044
1044
// Detailed view of a subject. For record subjects, the author's repo and profile will be returned.
1045
1045
type ModerationDefs_SubjectView struct {
1046
-
Profile *ModerationDefs_SubjectView_Profile `json:"profile,omitempty" cborgen:"profile,omitempty"`
1047
1046
Record *ModerationDefs_RecordViewDetail `json:"record,omitempty" cborgen:"record,omitempty"`
1048
1047
Repo *ModerationDefs_RepoViewDetail `json:"repo,omitempty" cborgen:"repo,omitempty"`
1049
1048
Status *ModerationDefs_SubjectStatusView `json:"status,omitempty" cborgen:"status,omitempty"`
-2
api/ozone/verificationdefs.go
-2
api/ozone/verificationdefs.go
···
23
23
Handle string `json:"handle" cborgen:"handle"`
24
24
// issuer: The user who issued this verification.
25
25
Issuer string `json:"issuer" cborgen:"issuer"`
26
-
IssuerProfile *VerificationDefs_VerificationView_IssuerProfile `json:"issuerProfile,omitempty" cborgen:"issuerProfile,omitempty"`
27
26
IssuerRepo *VerificationDefs_VerificationView_IssuerRepo `json:"issuerRepo,omitempty" cborgen:"issuerRepo,omitempty"`
28
27
// revokeReason: Describes the reason for revocation, also indicating that the verification is no longer valid.
29
28
RevokeReason *string `json:"revokeReason,omitempty" cborgen:"revokeReason,omitempty"`
···
33
32
RevokedBy *string `json:"revokedBy,omitempty" cborgen:"revokedBy,omitempty"`
34
33
// subject: The subject of the verification.
35
34
Subject string `json:"subject" cborgen:"subject"`
36
-
SubjectProfile *VerificationDefs_VerificationView_SubjectProfile `json:"subjectProfile,omitempty" cborgen:"subjectProfile,omitempty"`
37
35
SubjectRepo *VerificationDefs_VerificationView_SubjectRepo `json:"subjectRepo,omitempty" cborgen:"subjectRepo,omitempty"`
38
36
// uri: The AT-URI of the verification record.
39
37
Uri string `json:"uri" cborgen:"uri"`
-78
bgs/bgs.go
-78
bgs/bgs.go
···
903
903
904
904
repoCommitsResultCounter.WithLabelValues(host.Host, "ok").Inc()
905
905
return nil
906
-
case env.RepoHandle != nil:
907
-
bgs.log.Info("bgs got repo handle event", "did", env.RepoHandle.Did, "handle", env.RepoHandle.Handle)
908
-
// Flush any cached DID documents for this user
909
-
bgs.didr.FlushCacheFor(env.RepoHandle.Did)
910
-
911
-
// TODO: ignoring the data in the message and just going out to the DID doc
912
-
act, err := bgs.createExternalUser(ctx, env.RepoHandle.Did)
913
-
if err != nil {
914
-
return err
915
-
}
916
-
917
-
if act.Handle.String != env.RepoHandle.Handle {
918
-
bgs.log.Warn("handle update did not update handle to asserted value", "did", env.RepoHandle.Did, "expected", env.RepoHandle.Handle, "actual", act.Handle)
919
-
}
920
-
921
-
// TODO: Update the ReposHandle event type to include "verified" or something
922
-
923
-
// Broadcast the handle update to all consumers
924
-
err = bgs.events.AddEvent(ctx, &events.XRPCStreamEvent{
925
-
RepoHandle: &comatproto.SyncSubscribeRepos_Handle{
926
-
Did: env.RepoHandle.Did,
927
-
Handle: env.RepoHandle.Handle,
928
-
Time: env.RepoHandle.Time,
929
-
},
930
-
})
931
-
if err != nil {
932
-
bgs.log.Error("failed to broadcast RepoHandle event", "error", err, "did", env.RepoHandle.Did, "handle", env.RepoHandle.Handle)
933
-
return fmt.Errorf("failed to broadcast RepoHandle event: %w", err)
934
-
}
935
-
936
-
return nil
937
906
case env.RepoIdentity != nil:
938
907
bgs.log.Info("bgs got identity event", "did", env.RepoIdentity.Did)
939
908
// Flush any cached DID documents for this user
···
1035
1004
}
1036
1005
1037
1006
return nil
1038
-
case env.RepoMigrate != nil:
1039
-
if _, err := bgs.createExternalUser(ctx, env.RepoMigrate.Did); err != nil {
1040
-
return err
1041
-
}
1042
-
1043
-
return nil
1044
-
case env.RepoTombstone != nil:
1045
-
if err := bgs.handleRepoTombstone(ctx, host, env.RepoTombstone); err != nil {
1046
-
return err
1047
-
}
1048
-
1049
-
return nil
1050
1007
default:
1051
1008
return fmt.Errorf("invalid fed event")
1052
1009
}
1053
-
}
1054
-
1055
-
func (bgs *BGS) handleRepoTombstone(ctx context.Context, pds *models.PDS, evt *atproto.SyncSubscribeRepos_Tombstone) error {
1056
-
u, err := bgs.lookupUserByDid(ctx, evt.Did)
1057
-
if err != nil {
1058
-
return err
1059
-
}
1060
-
1061
-
if u.PDS != pds.ID {
1062
-
return fmt.Errorf("unauthoritative tombstone event from %s for %s", pds.Host, evt.Did)
1063
-
}
1064
-
1065
-
if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumns(map[string]any{
1066
-
"tombstoned": true,
1067
-
"handle": nil,
1068
-
}).Error; err != nil {
1069
-
return err
1070
-
}
1071
-
u.SetTombstoned(true)
1072
-
1073
-
if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{
1074
-
"handle": nil,
1075
-
}).Error; err != nil {
1076
-
return err
1077
-
}
1078
-
1079
-
// delete data from carstore
1080
-
if err := bgs.repoman.TakeDownRepo(ctx, u.ID); err != nil {
1081
-
// don't let a failure here prevent us from propagating this event
1082
-
bgs.log.Error("failed to delete user data from carstore", "err", err)
1083
-
}
1084
-
1085
-
return bgs.events.AddEvent(ctx, &events.XRPCStreamEvent{
1086
-
RepoTombstone: evt,
1087
-
})
1088
1010
}
1089
1011
1090
1012
// TODO: rename? This also updates users, and 'external' is an old phrasing
-45
bgs/fedmgr.go
-45
bgs/fedmgr.go
···
569
569
570
570
return nil
571
571
},
572
-
RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
573
-
log.Info("got remote handle update event", "pdsHost", host.Host, "did", evt.Did, "handle", evt.Handle)
574
-
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
575
-
RepoHandle: evt,
576
-
}); err != nil {
577
-
log.Error("failed handling event", "host", host.Host, "seq", evt.Seq, "err", err)
578
-
}
579
-
*lastCursor = evt.Seq
580
-
581
-
if err := s.updateCursor(sub, *lastCursor); err != nil {
582
-
return fmt.Errorf("updating cursor: %w", err)
583
-
}
584
-
585
-
return nil
586
-
},
587
-
RepoMigrate: func(evt *comatproto.SyncSubscribeRepos_Migrate) error {
588
-
log.Info("got remote repo migrate event", "pdsHost", host.Host, "did", evt.Did, "migrateTo", evt.MigrateTo)
589
-
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
590
-
RepoMigrate: evt,
591
-
}); err != nil {
592
-
log.Error("failed handling event", "host", host.Host, "seq", evt.Seq, "err", err)
593
-
}
594
-
*lastCursor = evt.Seq
595
-
596
-
if err := s.updateCursor(sub, *lastCursor); err != nil {
597
-
return fmt.Errorf("updating cursor: %w", err)
598
-
}
599
-
600
-
return nil
601
-
},
602
-
RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error {
603
-
log.Info("got remote repo tombstone event", "pdsHost", host.Host, "did", evt.Did)
604
-
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
605
-
RepoTombstone: evt,
606
-
}); err != nil {
607
-
log.Error("failed handling event", "host", host.Host, "seq", evt.Seq, "err", err)
608
-
}
609
-
*lastCursor = evt.Seq
610
-
611
-
if err := s.updateCursor(sub, *lastCursor); err != nil {
612
-
return fmt.Errorf("updating cursor: %w", err)
613
-
}
614
-
615
-
return nil
616
-
},
617
572
RepoInfo: func(info *comatproto.SyncSubscribeRepos_Info) error {
618
573
log.Info("info event", "name", info.Name, "message", info.Message, "pdsHost", host.Host)
619
574
return nil
-18
cmd/goat/firehose.go
-18
cmd/goat/firehose.go
···
189
189
}
190
190
return nil
191
191
},
192
-
RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
193
-
if gfc.VerifyBasic {
194
-
slog.Info("deprecated event type", "eventType", "handle", "did", evt.Did, "seq", evt.Seq)
195
-
}
196
-
return nil
197
-
},
198
-
RepoMigrate: func(evt *comatproto.SyncSubscribeRepos_Migrate) error {
199
-
if gfc.VerifyBasic {
200
-
slog.Info("deprecated event type", "eventType", "migrate", "did", evt.Did, "seq", evt.Seq)
201
-
}
202
-
return nil
203
-
},
204
-
RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error {
205
-
if gfc.VerifyBasic {
206
-
slog.Info("deprecated event type", "eventType", "handle", "did", evt.Did, "seq", evt.Seq)
207
-
}
208
-
return nil
209
-
},
210
192
}
211
193
212
194
scheduler := parallel.NewScheduler(
-16
cmd/gosky/debug.go
-16
cmd/gosky/debug.go
···
262
262
263
263
return nil
264
264
},
265
-
RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
266
-
fmt.Printf("\rChecking seq: %d ", evt.Seq)
267
-
if lastSeq > 0 && evt.Seq != lastSeq+1 {
268
-
fmt.Println("Gap in sequence numbers: ", lastSeq, evt.Seq)
269
-
}
270
-
lastSeq = evt.Seq
271
-
return nil
272
-
},
273
-
RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error {
274
-
fmt.Printf("\rChecking seq: %d ", evt.Seq)
275
-
if lastSeq > 0 && evt.Seq != lastSeq+1 {
276
-
fmt.Println("Gap in sequence numbers: ", lastSeq, evt.Seq)
277
-
}
278
-
lastSeq = evt.Seq
279
-
return nil
280
-
},
281
265
RepoInfo: func(evt *comatproto.SyncSubscribeRepos_Info) error {
282
266
return nil
283
267
},
-41
cmd/gosky/main.go
-41
cmd/gosky/main.go
···
284
284
285
285
return nil
286
286
},
287
-
RepoMigrate: func(migrate *comatproto.SyncSubscribeRepos_Migrate) error {
288
-
if jsonfmt {
289
-
b, err := json.Marshal(migrate)
290
-
if err != nil {
291
-
return err
292
-
}
293
-
fmt.Println(string(b))
294
-
} else {
295
-
fmt.Printf("(%d) RepoMigrate: %s moving to: %s\n", migrate.Seq, migrate.Did, *migrate.MigrateTo)
296
-
}
297
-
298
-
return nil
299
-
},
300
-
RepoHandle: func(handle *comatproto.SyncSubscribeRepos_Handle) error {
301
-
if jsonfmt {
302
-
b, err := json.Marshal(handle)
303
-
if err != nil {
304
-
return err
305
-
}
306
-
fmt.Println(string(b))
307
-
} else {
308
-
fmt.Printf("(%d) RepoHandle: %s (changed to: %s)\n", handle.Seq, handle.Did, handle.Handle)
309
-
}
310
-
311
-
return nil
312
-
313
-
},
314
287
RepoInfo: func(info *comatproto.SyncSubscribeRepos_Info) error {
315
288
if jsonfmt {
316
289
b, err := json.Marshal(info)
···
323
296
}
324
297
325
298
return nil
326
-
},
327
-
RepoTombstone: func(tomb *comatproto.SyncSubscribeRepos_Tombstone) error {
328
-
if jsonfmt {
329
-
b, err := json.Marshal(tomb)
330
-
if err != nil {
331
-
return err
332
-
}
333
-
fmt.Println(string(b))
334
-
} else {
335
-
fmt.Printf("(%d) Tombstone: %s\n", tomb.Seq, tomb.Did)
336
-
}
337
-
338
-
return nil
339
-
340
299
},
341
300
// TODO: all the other event types
342
301
Error: func(errf *events.ErrorFrame) error {
-10
cmd/gosky/streamdiff.go
-10
cmd/gosky/streamdiff.go
···
127
127
return "ERROR"
128
128
case evt.RepoCommit != nil:
129
129
return "#commit"
130
-
case evt.RepoHandle != nil:
131
-
return "#handle"
132
130
case evt.RepoInfo != nil:
133
131
return "#info"
134
-
case evt.RepoMigrate != nil:
135
-
return "#migrate"
136
-
case evt.RepoTombstone != nil:
137
-
return "#tombstone"
138
132
default:
139
133
return "unknown"
140
134
}
···
157
151
if sameCommit(evt.RepoCommit, oe.RepoCommit) {
158
152
return i
159
153
}
160
-
case evt.RepoHandle != nil:
161
-
panic("not handling handle updates yet")
162
-
case evt.RepoMigrate != nil:
163
-
panic("not handling repo migrates yet")
164
154
default:
165
155
panic("unhandled event type: " + evtop)
166
156
}
-9
cmd/relay/relay/ingest.go
-9
cmd/relay/relay/ingest.go
···
43
43
case evt.RepoAccount != nil:
44
44
//repoAccountReceivedCounter.WithLabelValues(hostname).Add(1)
45
45
return r.processAccountEvent(ctx, evt.RepoAccount, hostname, hostID)
46
-
case evt.RepoHandle != nil: // DEPRECATED
47
-
eventsWarningsCounter.WithLabelValues(hostname, "handle").Add(1)
48
-
return nil
49
-
case evt.RepoMigrate != nil: // DEPRECATED
50
-
eventsWarningsCounter.WithLabelValues(hostname, "migrate").Add(1)
51
-
return nil
52
-
case evt.RepoTombstone != nil: // DEPRECATED
53
-
eventsWarningsCounter.WithLabelValues(hostname, "tombstone").Add(1)
54
-
return nil
55
46
default:
56
47
return fmt.Errorf("unhandled repo stream event type")
57
48
}
-27
cmd/relay/relay/slurper.go
-27
cmd/relay/relay/slurper.go
···
454
454
s.logger.Debug("info event", "name", info.Name, "message", info.Message, "host", sub.Hostname)
455
455
return nil
456
456
},
457
-
RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { // DEPRECATED
458
-
logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "handle")
459
-
logger.Debug("got remote handle update event", "handle", evt.Handle)
460
-
if err := s.processCallback(context.Background(), &stream.XRPCStreamEvent{RepoHandle: evt}, sub.Hostname, sub.HostID); err != nil {
461
-
logger.Error("failed handling event", "err", err)
462
-
}
463
-
sub.UpdateSeq()
464
-
return nil
465
-
},
466
-
RepoMigrate: func(evt *comatproto.SyncSubscribeRepos_Migrate) error { // DEPRECATED
467
-
logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "migrate")
468
-
logger.Debug("got remote repo migrate event", "migrateTo", evt.MigrateTo)
469
-
if err := s.processCallback(context.Background(), &stream.XRPCStreamEvent{RepoMigrate: evt}, sub.Hostname, sub.HostID); err != nil {
470
-
logger.Error("failed handling event", "err", err)
471
-
}
472
-
sub.UpdateSeq()
473
-
return nil
474
-
},
475
-
RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error { // DEPRECATED
476
-
logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "tombstone")
477
-
logger.Debug("got remote repo tombstone event")
478
-
if err := s.processCallback(context.Background(), &stream.XRPCStreamEvent{RepoTombstone: evt}, sub.Hostname, sub.HostID); err != nil {
479
-
logger.Error("failed handling event", "err", err)
480
-
}
481
-
sub.UpdateSeq()
482
-
return nil
483
-
},
484
457
}
485
458
486
459
limiters := []*slidingwindow.Limiter{
+8
-71
cmd/relay/stream/consumer.go
+8
-71
cmd/relay/stream/consumer.go
···
18
18
const MaxMessageBytes = 5_000_000
19
19
20
20
type RepoStreamCallbacks struct {
21
-
RepoCommit func(evt *comatproto.SyncSubscribeRepos_Commit) error
22
-
RepoSync func(evt *comatproto.SyncSubscribeRepos_Sync) error
23
-
RepoHandle func(evt *comatproto.SyncSubscribeRepos_Handle) error
24
-
RepoIdentity func(evt *comatproto.SyncSubscribeRepos_Identity) error
25
-
RepoAccount func(evt *comatproto.SyncSubscribeRepos_Account) error
26
-
RepoInfo func(evt *comatproto.SyncSubscribeRepos_Info) error
27
-
RepoMigrate func(evt *comatproto.SyncSubscribeRepos_Migrate) error
28
-
RepoTombstone func(evt *comatproto.SyncSubscribeRepos_Tombstone) error
29
-
LabelLabels func(evt *comatproto.LabelSubscribeLabels_Labels) error
30
-
LabelInfo func(evt *comatproto.LabelSubscribeLabels_Info) error
31
-
Error func(evt *ErrorFrame) error
21
+
RepoCommit func(evt *comatproto.SyncSubscribeRepos_Commit) error
22
+
RepoSync func(evt *comatproto.SyncSubscribeRepos_Sync) error
23
+
RepoIdentity func(evt *comatproto.SyncSubscribeRepos_Identity) error
24
+
RepoAccount func(evt *comatproto.SyncSubscribeRepos_Account) error
25
+
RepoInfo func(evt *comatproto.SyncSubscribeRepos_Info) error
26
+
LabelLabels func(evt *comatproto.LabelSubscribeLabels_Labels) error
27
+
LabelInfo func(evt *comatproto.LabelSubscribeLabels_Info) error
28
+
Error func(evt *ErrorFrame) error
32
29
}
33
30
34
31
func (rsc *RepoStreamCallbacks) EventHandler(ctx context.Context, xev *XRPCStreamEvent) error {
···
37
34
return rsc.RepoCommit(xev.RepoCommit)
38
35
case xev.RepoSync != nil && rsc.RepoSync != nil:
39
36
return rsc.RepoSync(xev.RepoSync)
40
-
case xev.RepoHandle != nil && rsc.RepoHandle != nil:
41
-
return rsc.RepoHandle(xev.RepoHandle)
42
37
case xev.RepoInfo != nil && rsc.RepoInfo != nil:
43
38
return rsc.RepoInfo(xev.RepoInfo)
44
-
case xev.RepoMigrate != nil && rsc.RepoMigrate != nil:
45
-
return rsc.RepoMigrate(xev.RepoMigrate)
46
39
case xev.RepoIdentity != nil && rsc.RepoIdentity != nil:
47
40
return rsc.RepoIdentity(xev.RepoIdentity)
48
41
case xev.RepoAccount != nil && rsc.RepoAccount != nil:
49
42
return rsc.RepoAccount(xev.RepoAccount)
50
-
case xev.RepoTombstone != nil && rsc.RepoTombstone != nil:
51
-
return rsc.RepoTombstone(xev.RepoTombstone)
52
43
case xev.LabelLabels != nil && rsc.LabelLabels != nil:
53
44
return rsc.LabelLabels(xev.LabelLabels)
54
45
case xev.LabelInfo != nil && rsc.LabelInfo != nil:
···
248
239
}); err != nil {
249
240
return err
250
241
}
251
-
case "#handle":
252
-
// TODO: DEPRECATED message; warning/counter; drop message
253
-
var evt comatproto.SyncSubscribeRepos_Handle
254
-
if err := evt.UnmarshalCBOR(r); err != nil {
255
-
return err
256
-
}
257
-
258
-
if evt.Seq <= lastSeq {
259
-
logger.Error("got events out of order from stream", "seq", evt.Seq, "prev", lastSeq)
260
-
continue
261
-
}
262
-
lastSeq = evt.Seq
263
-
264
-
if err := sched.AddWork(ctx, evt.Did, &XRPCStreamEvent{
265
-
RepoHandle: &evt,
266
-
}); err != nil {
267
-
return err
268
-
}
269
242
case "#identity":
270
243
var evt comatproto.SyncSubscribeRepos_Identity
271
244
if err := evt.UnmarshalCBOR(r); err != nil {
···
309
282
310
283
if err := sched.AddWork(ctx, "", &XRPCStreamEvent{
311
284
RepoInfo: &evt,
312
-
}); err != nil {
313
-
return err
314
-
}
315
-
case "#migrate":
316
-
// TODO: DEPRECATED message; warning/counter; drop message
317
-
var evt comatproto.SyncSubscribeRepos_Migrate
318
-
if err := evt.UnmarshalCBOR(r); err != nil {
319
-
return err
320
-
}
321
-
322
-
if evt.Seq <= lastSeq {
323
-
logger.Error("got events out of order from stream", "seq", evt.Seq, "prev", lastSeq)
324
-
continue
325
-
}
326
-
lastSeq = evt.Seq
327
-
328
-
if err := sched.AddWork(ctx, evt.Did, &XRPCStreamEvent{
329
-
RepoMigrate: &evt,
330
-
}); err != nil {
331
-
return err
332
-
}
333
-
case "#tombstone":
334
-
// TODO: DEPRECATED message; warning/counter; drop message
335
-
var evt comatproto.SyncSubscribeRepos_Tombstone
336
-
if err := evt.UnmarshalCBOR(r); err != nil {
337
-
return err
338
-
}
339
-
340
-
if evt.Seq <= lastSeq {
341
-
logger.Error("got events out of order from stream", "seq", evt.Seq, "prev", lastSeq)
342
-
continue
343
-
}
344
-
lastSeq = evt.Seq
345
-
346
-
if err := sched.AddWork(ctx, evt.Did, &XRPCStreamEvent{
347
-
RepoTombstone: &evt,
348
285
}); err != nil {
349
286
return err
350
287
}
+8
-53
cmd/relay/stream/events.go
+8
-53
cmd/relay/stream/events.go
···
23
23
}
24
24
25
25
type XRPCStreamEvent struct {
26
-
Error *ErrorFrame
27
-
RepoCommit *comatproto.SyncSubscribeRepos_Commit
28
-
RepoSync *comatproto.SyncSubscribeRepos_Sync
29
-
RepoHandle *comatproto.SyncSubscribeRepos_Handle // DEPRECATED
30
-
RepoIdentity *comatproto.SyncSubscribeRepos_Identity
31
-
RepoInfo *comatproto.SyncSubscribeRepos_Info
32
-
RepoMigrate *comatproto.SyncSubscribeRepos_Migrate // DEPRECATED
33
-
RepoTombstone *comatproto.SyncSubscribeRepos_Tombstone // DEPRECATED
34
-
RepoAccount *comatproto.SyncSubscribeRepos_Account
35
-
LabelLabels *comatproto.LabelSubscribeLabels_Labels
36
-
LabelInfo *comatproto.LabelSubscribeLabels_Info
26
+
Error *ErrorFrame
27
+
RepoCommit *comatproto.SyncSubscribeRepos_Commit
28
+
RepoSync *comatproto.SyncSubscribeRepos_Sync
29
+
RepoIdentity *comatproto.SyncSubscribeRepos_Identity
30
+
RepoInfo *comatproto.SyncSubscribeRepos_Info
31
+
RepoAccount *comatproto.SyncSubscribeRepos_Account
32
+
LabelLabels *comatproto.LabelSubscribeLabels_Labels
33
+
LabelInfo *comatproto.LabelSubscribeLabels_Info
37
34
38
35
// some private fields for internal routing perf
39
36
PrivUid uint64 `json:"-" cborgen:"-"`
···
56
53
case evt.RepoSync != nil:
57
54
header.MsgType = "#sync"
58
55
obj = evt.RepoSync
59
-
case evt.RepoHandle != nil:
60
-
header.MsgType = "#handle"
61
-
obj = evt.RepoHandle
62
56
case evt.RepoIdentity != nil:
63
57
header.MsgType = "#identity"
64
58
obj = evt.RepoIdentity
···
68
62
case evt.RepoInfo != nil:
69
63
header.MsgType = "#info"
70
64
obj = evt.RepoInfo
71
-
case evt.RepoMigrate != nil:
72
-
header.MsgType = "#migrate"
73
-
obj = evt.RepoMigrate
74
-
case evt.RepoTombstone != nil:
75
-
header.MsgType = "#tombstone"
76
-
obj = evt.RepoTombstone
77
65
default:
78
66
return fmt.Errorf("unrecognized event kind")
79
67
}
···
105
93
return fmt.Errorf("reading repoSync event: %w", err)
106
94
}
107
95
xevt.RepoSync = &evt
108
-
case "#handle":
109
-
// TODO: DEPRECATED message; warning/counter; drop message
110
-
var evt comatproto.SyncSubscribeRepos_Handle
111
-
if err := evt.UnmarshalCBOR(r); err != nil {
112
-
return err
113
-
}
114
-
xevt.RepoHandle = &evt
115
96
case "#identity":
116
97
var evt comatproto.SyncSubscribeRepos_Identity
117
98
if err := evt.UnmarshalCBOR(r); err != nil {
···
131
112
return err
132
113
}
133
114
xevt.RepoInfo = &evt
134
-
case "#migrate":
135
-
// TODO: DEPRECATED message; warning/counter; drop message
136
-
var evt comatproto.SyncSubscribeRepos_Migrate
137
-
if err := evt.UnmarshalCBOR(r); err != nil {
138
-
return err
139
-
}
140
-
xevt.RepoMigrate = &evt
141
-
case "#tombstone":
142
-
// TODO: DEPRECATED message; warning/counter; drop message
143
-
var evt comatproto.SyncSubscribeRepos_Tombstone
144
-
if err := evt.UnmarshalCBOR(r); err != nil {
145
-
return err
146
-
}
147
-
xevt.RepoTombstone = &evt
148
115
case "#labels":
149
116
var evt comatproto.LabelSubscribeLabels_Labels
150
117
if err := evt.UnmarshalCBOR(r); err != nil {
···
193
160
return evt.RepoCommit.Seq
194
161
case evt.RepoSync != nil:
195
162
return evt.RepoSync.Seq
196
-
case evt.RepoHandle != nil:
197
-
return evt.RepoHandle.Seq
198
-
case evt.RepoMigrate != nil:
199
-
return evt.RepoMigrate.Seq
200
-
case evt.RepoTombstone != nil:
201
-
return evt.RepoTombstone.Seq
202
163
case evt.RepoIdentity != nil:
203
164
return evt.RepoIdentity.Seq
204
165
case evt.RepoAccount != nil:
···
220
181
return evt.RepoCommit.Seq, true
221
182
case evt.RepoSync != nil:
222
183
return evt.RepoSync.Seq, true
223
-
case evt.RepoHandle != nil:
224
-
return evt.RepoHandle.Seq, true
225
-
case evt.RepoMigrate != nil:
226
-
return evt.RepoMigrate.Seq, true
227
-
case evt.RepoTombstone != nil:
228
-
return evt.RepoTombstone.Seq, true
229
184
case evt.RepoIdentity != nil:
230
185
return evt.RepoIdentity.Seq, true
231
186
case evt.RepoAccount != nil:
-34
cmd/relay/stream/persist/diskpersist/diskpersist.go
-34
cmd/relay/stream/persist/diskpersist/diskpersist.go
···
484
484
pjob.Evt.RepoCommit.Seq = seq
485
485
case pjob.Evt.RepoSync != nil:
486
486
pjob.Evt.RepoSync.Seq = seq
487
-
case pjob.Evt.RepoHandle != nil:
488
-
pjob.Evt.RepoHandle.Seq = seq
489
487
case pjob.Evt.RepoIdentity != nil:
490
488
pjob.Evt.RepoIdentity.Seq = seq
491
489
case pjob.Evt.RepoAccount != nil:
492
490
pjob.Evt.RepoAccount.Seq = seq
493
-
case pjob.Evt.RepoTombstone != nil:
494
-
pjob.Evt.RepoTombstone.Seq = seq
495
491
default:
496
492
// only those three get peristed right now
497
493
// we should not actually ever get here...
···
547
543
if err := xevt.RepoSync.MarshalCBOR(cw); err != nil {
548
544
return fmt.Errorf("failed to marshal: %w", err)
549
545
}
550
-
case xevt.RepoHandle != nil:
551
-
evtKind = evtKindHandle
552
-
did = xevt.RepoHandle.Did
553
-
if err := xevt.RepoHandle.MarshalCBOR(cw); err != nil {
554
-
return fmt.Errorf("failed to marshal: %w", err)
555
-
}
556
546
case xevt.RepoIdentity != nil:
557
547
evtKind = evtKindIdentity
558
548
did = xevt.RepoIdentity.Did
···
563
553
evtKind = evtKindAccount
564
554
did = xevt.RepoAccount.Did
565
555
if err := xevt.RepoAccount.MarshalCBOR(cw); err != nil {
566
-
return fmt.Errorf("failed to marshal: %w", err)
567
-
}
568
-
case xevt.RepoTombstone != nil:
569
-
evtKind = evtKindTombstone
570
-
did = xevt.RepoTombstone.Did
571
-
if err := xevt.RepoTombstone.MarshalCBOR(cw); err != nil {
572
556
return fmt.Errorf("failed to marshal: %w", err)
573
557
}
574
558
default:
···
810
794
if err := cb(&stream.XRPCStreamEvent{RepoSync: &evt}); err != nil {
811
795
return nil, err
812
796
}
813
-
case evtKindHandle:
814
-
var evt atproto.SyncSubscribeRepos_Handle
815
-
if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
816
-
return nil, err
817
-
}
818
-
evt.Seq = h.Seq
819
-
if err := cb(&stream.XRPCStreamEvent{RepoHandle: &evt}); err != nil {
820
-
return nil, err
821
-
}
822
797
case evtKindIdentity:
823
798
var evt atproto.SyncSubscribeRepos_Identity
824
799
if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
···
835
810
}
836
811
evt.Seq = h.Seq
837
812
if err := cb(&stream.XRPCStreamEvent{RepoAccount: &evt}); err != nil {
838
-
return nil, err
839
-
}
840
-
case evtKindTombstone:
841
-
var evt atproto.SyncSubscribeRepos_Tombstone
842
-
if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
843
-
return nil, err
844
-
}
845
-
evt.Seq = h.Seq
846
-
if err := cb(&stream.XRPCStreamEvent{RepoTombstone: &evt}); err != nil {
847
813
return nil, err
848
814
}
849
815
default:
-8
cmd/relay/testing/consumer.go
-8
cmd/relay/testing/consumer.go
···
62
62
c.LastSeq = evt.Seq
63
63
return nil
64
64
},
65
-
// NOTE: this is included to test that the events are *not* passed through; can be removed in the near future
66
-
RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
67
-
c.eventsLk.Lock()
68
-
defer c.eventsLk.Unlock()
69
-
c.Events = append(c.Events, &stream.XRPCStreamEvent{RepoHandle: evt})
70
-
c.LastSeq = evt.Seq
71
-
return nil
72
-
},
73
65
}
74
66
return rsc
75
67
}
-36
cmd/sonar/sonar.go
-36
cmd/sonar/sonar.go
···
109
109
case xe.RepoCommit != nil:
110
110
eventsProcessedCounter.WithLabelValues("repo_commit", s.SocketURL).Inc()
111
111
return s.HandleRepoCommit(ctx, xe.RepoCommit)
112
-
case xe.RepoHandle != nil:
113
-
eventsProcessedCounter.WithLabelValues("repo_handle", s.SocketURL).Inc()
114
-
now := time.Now()
115
-
s.ProgMux.Lock()
116
-
s.Progress.LastSeq = xe.RepoHandle.Seq
117
-
s.Progress.LastSeqProcessedAt = now
118
-
s.ProgMux.Unlock()
119
-
// Parse time from the event time string
120
-
t, err := time.Parse(time.RFC3339, xe.RepoHandle.Time)
121
-
if err != nil {
122
-
s.Logger.Error("error parsing time", "err", err)
123
-
return nil
124
-
}
125
-
lastEvtCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(t.UnixNano()))
126
-
lastEvtProcessedAtGauge.WithLabelValues(s.SocketURL).Set(float64(now.UnixNano()))
127
-
lastEvtCreatedEvtProcessedGapGauge.WithLabelValues(s.SocketURL).Set(float64(now.Sub(t).Seconds()))
128
-
lastSeqGauge.WithLabelValues(s.SocketURL).Set(float64(xe.RepoHandle.Seq))
129
112
case xe.RepoIdentity != nil:
130
113
eventsProcessedCounter.WithLabelValues("identity", s.SocketURL).Inc()
131
114
now := time.Now()
···
142
125
s.ProgMux.Unlock()
143
126
case xe.RepoInfo != nil:
144
127
eventsProcessedCounter.WithLabelValues("repo_info", s.SocketURL).Inc()
145
-
case xe.RepoMigrate != nil:
146
-
eventsProcessedCounter.WithLabelValues("repo_migrate", s.SocketURL).Inc()
147
-
now := time.Now()
148
-
s.ProgMux.Lock()
149
-
s.Progress.LastSeq = xe.RepoMigrate.Seq
150
-
s.Progress.LastSeqProcessedAt = time.Now()
151
-
s.ProgMux.Unlock()
152
-
// Parse time from the event time string
153
-
t, err := time.Parse(time.RFC3339, xe.RepoMigrate.Time)
154
-
if err != nil {
155
-
s.Logger.Error("error parsing time", "err", err)
156
-
return nil
157
-
}
158
-
lastEvtCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(t.UnixNano()))
159
-
lastEvtProcessedAtGauge.WithLabelValues(s.SocketURL).Set(float64(now.UnixNano()))
160
-
lastEvtCreatedEvtProcessedGapGauge.WithLabelValues(s.SocketURL).Set(float64(now.Sub(t).Seconds()))
161
-
lastSeqGauge.WithLabelValues(s.SocketURL).Set(float64(xe.RepoHandle.Seq))
162
-
case xe.RepoTombstone != nil:
163
-
eventsProcessedCounter.WithLabelValues("repo_tombstone", s.SocketURL).Inc()
164
128
case xe.LabelInfo != nil:
165
129
eventsProcessedCounter.WithLabelValues("label_info", s.SocketURL).Inc()
166
130
case xe.LabelLabels != nil:
-9
cmd/supercollider/main.go
-9
cmd/supercollider/main.go
···
338
338
case evt.RepoCommit != nil:
339
339
header.MsgType = "#commit"
340
340
obj = evt.RepoCommit
341
-
case evt.RepoHandle != nil:
342
-
header.MsgType = "#handle"
343
-
obj = evt.RepoHandle
344
341
case evt.RepoInfo != nil:
345
342
header.MsgType = "#info"
346
343
obj = evt.RepoInfo
347
-
case evt.RepoMigrate != nil:
348
-
header.MsgType = "#migrate"
349
-
obj = evt.RepoMigrate
350
-
case evt.RepoTombstone != nil:
351
-
header.MsgType = "#tombstone"
352
-
obj = evt.RepoTombstone
353
344
default:
354
345
logger.Error("unrecognized event kind")
355
346
continue
+8
-68
events/consumer.go
+8
-68
events/consumer.go
···
16
16
)
17
17
18
18
type RepoStreamCallbacks struct {
19
-
RepoCommit func(evt *comatproto.SyncSubscribeRepos_Commit) error
20
-
RepoSync func(evt *comatproto.SyncSubscribeRepos_Sync) error
21
-
RepoHandle func(evt *comatproto.SyncSubscribeRepos_Handle) error
22
-
RepoIdentity func(evt *comatproto.SyncSubscribeRepos_Identity) error
23
-
RepoAccount func(evt *comatproto.SyncSubscribeRepos_Account) error
24
-
RepoInfo func(evt *comatproto.SyncSubscribeRepos_Info) error
25
-
RepoMigrate func(evt *comatproto.SyncSubscribeRepos_Migrate) error
26
-
RepoTombstone func(evt *comatproto.SyncSubscribeRepos_Tombstone) error
27
-
LabelLabels func(evt *comatproto.LabelSubscribeLabels_Labels) error
28
-
LabelInfo func(evt *comatproto.LabelSubscribeLabels_Info) error
29
-
Error func(evt *ErrorFrame) error
19
+
RepoCommit func(evt *comatproto.SyncSubscribeRepos_Commit) error
20
+
RepoSync func(evt *comatproto.SyncSubscribeRepos_Sync) error
21
+
RepoIdentity func(evt *comatproto.SyncSubscribeRepos_Identity) error
22
+
RepoAccount func(evt *comatproto.SyncSubscribeRepos_Account) error
23
+
RepoInfo func(evt *comatproto.SyncSubscribeRepos_Info) error
24
+
LabelLabels func(evt *comatproto.LabelSubscribeLabels_Labels) error
25
+
LabelInfo func(evt *comatproto.LabelSubscribeLabels_Info) error
26
+
Error func(evt *ErrorFrame) error
30
27
}
31
28
32
29
func (rsc *RepoStreamCallbacks) EventHandler(ctx context.Context, xev *XRPCStreamEvent) error {
···
35
32
return rsc.RepoCommit(xev.RepoCommit)
36
33
case xev.RepoSync != nil && rsc.RepoSync != nil:
37
34
return rsc.RepoSync(xev.RepoSync)
38
-
case xev.RepoHandle != nil && rsc.RepoHandle != nil:
39
-
return rsc.RepoHandle(xev.RepoHandle)
40
35
case xev.RepoInfo != nil && rsc.RepoInfo != nil:
41
36
return rsc.RepoInfo(xev.RepoInfo)
42
-
case xev.RepoMigrate != nil && rsc.RepoMigrate != nil:
43
-
return rsc.RepoMigrate(xev.RepoMigrate)
44
37
case xev.RepoIdentity != nil && rsc.RepoIdentity != nil:
45
38
return rsc.RepoIdentity(xev.RepoIdentity)
46
39
case xev.RepoAccount != nil && rsc.RepoAccount != nil:
47
40
return rsc.RepoAccount(xev.RepoAccount)
48
-
case xev.RepoTombstone != nil && rsc.RepoTombstone != nil:
49
-
return rsc.RepoTombstone(xev.RepoTombstone)
50
41
case xev.LabelLabels != nil && rsc.LabelLabels != nil:
51
42
return rsc.LabelLabels(xev.LabelLabels)
52
43
case xev.LabelInfo != nil && rsc.LabelInfo != nil:
···
241
232
}); err != nil {
242
233
return err
243
234
}
244
-
case "#handle":
245
-
// TODO: DEPRECATED message; warning/counter; drop message
246
-
var evt comatproto.SyncSubscribeRepos_Handle
247
-
if err := evt.UnmarshalCBOR(r); err != nil {
248
-
return err
249
-
}
250
-
251
-
if evt.Seq < lastSeq {
252
-
log.Error("Got events out of order from stream", "seq", evt.Seq, "prev", lastSeq)
253
-
}
254
-
lastSeq = evt.Seq
255
-
256
-
if err := sched.AddWork(ctx, evt.Did, &XRPCStreamEvent{
257
-
RepoHandle: &evt,
258
-
}); err != nil {
259
-
return err
260
-
}
261
235
case "#identity":
262
236
var evt comatproto.SyncSubscribeRepos_Identity
263
237
if err := evt.UnmarshalCBOR(r); err != nil {
···
299
273
300
274
if err := sched.AddWork(ctx, "", &XRPCStreamEvent{
301
275
RepoInfo: &evt,
302
-
}); err != nil {
303
-
return err
304
-
}
305
-
case "#migrate":
306
-
// TODO: DEPRECATED message; warning/counter; drop message
307
-
var evt comatproto.SyncSubscribeRepos_Migrate
308
-
if err := evt.UnmarshalCBOR(r); err != nil {
309
-
return err
310
-
}
311
-
312
-
if evt.Seq < lastSeq {
313
-
log.Error("Got events out of order from stream", "seq", evt.Seq, "prev", lastSeq)
314
-
}
315
-
lastSeq = evt.Seq
316
-
317
-
if err := sched.AddWork(ctx, evt.Did, &XRPCStreamEvent{
318
-
RepoMigrate: &evt,
319
-
}); err != nil {
320
-
return err
321
-
}
322
-
case "#tombstone":
323
-
// TODO: DEPRECATED message; warning/counter; drop message
324
-
var evt comatproto.SyncSubscribeRepos_Tombstone
325
-
if err := evt.UnmarshalCBOR(r); err != nil {
326
-
return err
327
-
}
328
-
329
-
if evt.Seq < lastSeq {
330
-
log.Error("Got events out of order from stream", "seq", evt.Seq, "prev", lastSeq)
331
-
}
332
-
lastSeq = evt.Seq
333
-
334
-
if err := sched.AddWork(ctx, evt.Did, &XRPCStreamEvent{
335
-
RepoTombstone: &evt,
336
276
}); err != nil {
337
277
return err
338
278
}
-88
events/dbpersist/dbpersist.go
-88
events/dbpersist/dbpersist.go
···
171
171
switch {
172
172
case e.RepoCommit != nil:
173
173
e.RepoCommit.Seq = int64(item.Seq)
174
-
case e.RepoHandle != nil:
175
-
e.RepoHandle.Seq = int64(item.Seq)
176
174
case e.RepoIdentity != nil:
177
175
e.RepoIdentity.Seq = int64(item.Seq)
178
176
case e.RepoAccount != nil:
179
177
e.RepoAccount.Seq = int64(item.Seq)
180
-
case e.RepoTombstone != nil:
181
-
e.RepoTombstone.Seq = int64(item.Seq)
182
178
default:
183
179
return fmt.Errorf("unknown event type")
184
180
}
···
218
214
if err != nil {
219
215
return err
220
216
}
221
-
case e.RepoHandle != nil:
222
-
rer, err = p.RecordFromHandleChange(ctx, e.RepoHandle)
223
-
if err != nil {
224
-
return err
225
-
}
226
217
case e.RepoIdentity != nil:
227
218
rer, err = p.RecordFromRepoIdentity(ctx, e.RepoIdentity)
228
219
if err != nil {
···
233
224
if err != nil {
234
225
return err
235
226
}
236
-
case e.RepoTombstone != nil:
237
-
rer, err = p.RecordFromTombstone(ctx, e.RepoTombstone)
238
-
if err != nil {
239
-
return err
240
-
}
241
227
default:
242
228
return nil
243
229
}
···
249
235
return nil
250
236
}
251
237
252
-
func (p *DbPersistence) RecordFromHandleChange(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Handle) (*RepoEventRecord, error) {
253
-
t, err := time.Parse(util.ISO8601, evt.Time)
254
-
if err != nil {
255
-
return nil, err
256
-
}
257
-
258
-
uid, err := p.uidForDid(ctx, evt.Did)
259
-
if err != nil {
260
-
return nil, err
261
-
}
262
-
263
-
return &RepoEventRecord{
264
-
Repo: uid,
265
-
Type: "repo_handle",
266
-
Time: t,
267
-
NewHandle: &evt.Handle,
268
-
}, nil
269
-
}
270
-
271
238
func (p *DbPersistence) RecordFromRepoIdentity(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Identity) (*RepoEventRecord, error) {
272
239
t, err := time.Parse(util.ISO8601, evt.Time)
273
240
if err != nil {
···
306
273
}, nil
307
274
}
308
275
309
-
func (p *DbPersistence) RecordFromTombstone(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Tombstone) (*RepoEventRecord, error) {
310
-
t, err := time.Parse(util.ISO8601, evt.Time)
311
-
if err != nil {
312
-
return nil, err
313
-
}
314
-
315
-
uid, err := p.uidForDid(ctx, evt.Did)
316
-
if err != nil {
317
-
return nil, err
318
-
}
319
-
320
-
return &RepoEventRecord{
321
-
Repo: uid,
322
-
Type: "repo_tombstone",
323
-
Time: t,
324
-
}, nil
325
-
}
326
-
327
276
func (p *DbPersistence) RecordFromRepoCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) (*RepoEventRecord, error) {
328
277
// TODO: hack hack hack
329
278
if len(evt.Ops) > 8192 {
···
449
398
switch {
450
399
case record.Commit != nil:
451
400
streamEvent, err = p.hydrateCommit(ctx, record)
452
-
case record.NewHandle != nil:
453
-
streamEvent, err = p.hydrateHandleChange(ctx, record)
454
401
case record.Type == "repo_identity":
455
402
streamEvent, err = p.hydrateIdentityEvent(ctx, record)
456
403
case record.Type == "repo_account":
457
404
streamEvent, err = p.hydrateAccountEvent(ctx, record)
458
-
case record.Type == "repo_tombstone":
459
-
streamEvent, err = p.hydrateTombstone(ctx, record)
460
405
default:
461
406
err = fmt.Errorf("unknown event type: %s", record.Type)
462
407
}
···
519
464
return u.Did, nil
520
465
}
521
466
522
-
func (p *DbPersistence) hydrateHandleChange(ctx context.Context, rer *RepoEventRecord) (*events.XRPCStreamEvent, error) {
523
-
if rer.NewHandle == nil {
524
-
return nil, fmt.Errorf("NewHandle is nil")
525
-
}
526
-
527
-
did, err := p.didForUid(ctx, rer.Repo)
528
-
if err != nil {
529
-
return nil, err
530
-
}
531
-
532
-
return &events.XRPCStreamEvent{
533
-
RepoHandle: &comatproto.SyncSubscribeRepos_Handle{
534
-
Did: did,
535
-
Handle: *rer.NewHandle,
536
-
Time: rer.Time.Format(util.ISO8601),
537
-
},
538
-
}, nil
539
-
}
540
-
541
467
func (p *DbPersistence) hydrateIdentityEvent(ctx context.Context, rer *RepoEventRecord) (*events.XRPCStreamEvent, error) {
542
468
did, err := p.didForUid(ctx, rer.Repo)
543
469
if err != nil {
···
564
490
Time: rer.Time.Format(util.ISO8601),
565
491
Active: rer.Active,
566
492
Status: rer.Status,
567
-
},
568
-
}, nil
569
-
}
570
-
571
-
func (p *DbPersistence) hydrateTombstone(ctx context.Context, rer *RepoEventRecord) (*events.XRPCStreamEvent, error) {
572
-
did, err := p.didForUid(ctx, rer.Repo)
573
-
if err != nil {
574
-
return nil, err
575
-
}
576
-
577
-
return &events.XRPCStreamEvent{
578
-
RepoTombstone: &comatproto.SyncSubscribeRepos_Tombstone{
579
-
Did: did,
580
-
Time: rer.Time.Format(util.ISO8601),
581
493
},
582
494
}, nil
583
495
}
-34
events/diskpersist/diskpersist.go
-34
events/diskpersist/diskpersist.go
···
455
455
switch {
456
456
case e.RepoCommit != nil:
457
457
e.RepoCommit.Seq = seq
458
-
case e.RepoHandle != nil:
459
-
e.RepoHandle.Seq = seq
460
458
case e.RepoIdentity != nil:
461
459
e.RepoIdentity.Seq = seq
462
460
case e.RepoAccount != nil:
463
461
e.RepoAccount.Seq = seq
464
-
case e.RepoTombstone != nil:
465
-
e.RepoTombstone.Seq = seq
466
462
default:
467
463
// only those three get peristed right now
468
464
// we should not actually ever get here...
···
509
505
if err := e.RepoCommit.MarshalCBOR(cw); err != nil {
510
506
return fmt.Errorf("failed to marshal: %w", err)
511
507
}
512
-
case e.RepoHandle != nil:
513
-
evtKind = evtKindHandle
514
-
did = e.RepoHandle.Did
515
-
if err := e.RepoHandle.MarshalCBOR(cw); err != nil {
516
-
return fmt.Errorf("failed to marshal: %w", err)
517
-
}
518
508
case e.RepoIdentity != nil:
519
509
evtKind = evtKindIdentity
520
510
did = e.RepoIdentity.Did
···
525
515
evtKind = evtKindAccount
526
516
did = e.RepoAccount.Did
527
517
if err := e.RepoAccount.MarshalCBOR(cw); err != nil {
528
-
return fmt.Errorf("failed to marshal: %w", err)
529
-
}
530
-
case e.RepoTombstone != nil:
531
-
evtKind = evtKindTombstone
532
-
did = e.RepoTombstone.Did
533
-
if err := e.RepoTombstone.MarshalCBOR(cw); err != nil {
534
518
return fmt.Errorf("failed to marshal: %w", err)
535
519
}
536
520
default:
···
745
729
if err := cb(&events.XRPCStreamEvent{RepoCommit: &evt}); err != nil {
746
730
return nil, err
747
731
}
748
-
case evtKindHandle:
749
-
var evt atproto.SyncSubscribeRepos_Handle
750
-
if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
751
-
return nil, err
752
-
}
753
-
evt.Seq = h.Seq
754
-
if err := cb(&events.XRPCStreamEvent{RepoHandle: &evt}); err != nil {
755
-
return nil, err
756
-
}
757
732
case evtKindIdentity:
758
733
var evt atproto.SyncSubscribeRepos_Identity
759
734
if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
···
770
745
}
771
746
evt.Seq = h.Seq
772
747
if err := cb(&events.XRPCStreamEvent{RepoAccount: &evt}); err != nil {
773
-
return nil, err
774
-
}
775
-
case evtKindTombstone:
776
-
var evt atproto.SyncSubscribeRepos_Tombstone
777
-
if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
778
-
return nil, err
779
-
}
780
-
evt.Seq = h.Seq
781
-
if err := cb(&events.XRPCStreamEvent{RepoTombstone: &evt}); err != nil {
782
748
return nil, err
783
749
}
784
750
default:
+8
-53
events/events.go
+8
-53
events/events.go
···
187
187
}
188
188
189
189
type XRPCStreamEvent struct {
190
-
Error *ErrorFrame
191
-
RepoCommit *comatproto.SyncSubscribeRepos_Commit
192
-
RepoSync *comatproto.SyncSubscribeRepos_Sync
193
-
RepoHandle *comatproto.SyncSubscribeRepos_Handle // DEPRECATED
194
-
RepoIdentity *comatproto.SyncSubscribeRepos_Identity
195
-
RepoInfo *comatproto.SyncSubscribeRepos_Info
196
-
RepoMigrate *comatproto.SyncSubscribeRepos_Migrate // DEPRECATED
197
-
RepoTombstone *comatproto.SyncSubscribeRepos_Tombstone // DEPRECATED
198
-
RepoAccount *comatproto.SyncSubscribeRepos_Account
199
-
LabelLabels *comatproto.LabelSubscribeLabels_Labels
200
-
LabelInfo *comatproto.LabelSubscribeLabels_Info
190
+
Error *ErrorFrame
191
+
RepoCommit *comatproto.SyncSubscribeRepos_Commit
192
+
RepoSync *comatproto.SyncSubscribeRepos_Sync
193
+
RepoIdentity *comatproto.SyncSubscribeRepos_Identity
194
+
RepoInfo *comatproto.SyncSubscribeRepos_Info
195
+
RepoAccount *comatproto.SyncSubscribeRepos_Account
196
+
LabelLabels *comatproto.LabelSubscribeLabels_Labels
197
+
LabelInfo *comatproto.LabelSubscribeLabels_Info
201
198
202
199
// some private fields for internal routing perf
203
200
PrivUid models.Uid `json:"-" cborgen:"-"`
···
220
217
case evt.RepoSync != nil:
221
218
header.MsgType = "#sync"
222
219
obj = evt.RepoSync
223
-
case evt.RepoHandle != nil:
224
-
header.MsgType = "#handle"
225
-
obj = evt.RepoHandle
226
220
case evt.RepoIdentity != nil:
227
221
header.MsgType = "#identity"
228
222
obj = evt.RepoIdentity
···
232
226
case evt.RepoInfo != nil:
233
227
header.MsgType = "#info"
234
228
obj = evt.RepoInfo
235
-
case evt.RepoMigrate != nil:
236
-
header.MsgType = "#migrate"
237
-
obj = evt.RepoMigrate
238
-
case evt.RepoTombstone != nil:
239
-
header.MsgType = "#tombstone"
240
-
obj = evt.RepoTombstone
241
229
default:
242
230
return fmt.Errorf("unrecognized event kind")
243
231
}
···
269
257
return fmt.Errorf("reading repoSync event: %w", err)
270
258
}
271
259
xevt.RepoSync = &evt
272
-
case "#handle":
273
-
// TODO: DEPRECATED message; warning/counter; drop message
274
-
var evt comatproto.SyncSubscribeRepos_Handle
275
-
if err := evt.UnmarshalCBOR(r); err != nil {
276
-
return err
277
-
}
278
-
xevt.RepoHandle = &evt
279
260
case "#identity":
280
261
var evt comatproto.SyncSubscribeRepos_Identity
281
262
if err := evt.UnmarshalCBOR(r); err != nil {
···
295
276
return err
296
277
}
297
278
xevt.RepoInfo = &evt
298
-
case "#migrate":
299
-
// TODO: DEPRECATED message; warning/counter; drop message
300
-
var evt comatproto.SyncSubscribeRepos_Migrate
301
-
if err := evt.UnmarshalCBOR(r); err != nil {
302
-
return err
303
-
}
304
-
xevt.RepoMigrate = &evt
305
-
case "#tombstone":
306
-
// TODO: DEPRECATED message; warning/counter; drop message
307
-
var evt comatproto.SyncSubscribeRepos_Tombstone
308
-
if err := evt.UnmarshalCBOR(r); err != nil {
309
-
return err
310
-
}
311
-
xevt.RepoTombstone = &evt
312
279
case "#labels":
313
280
var evt comatproto.LabelSubscribeLabels_Labels
314
281
if err := evt.UnmarshalCBOR(r); err != nil {
···
476
443
return evt.RepoCommit.Seq
477
444
case evt.RepoSync != nil:
478
445
return evt.RepoSync.Seq
479
-
case evt.RepoHandle != nil:
480
-
return evt.RepoHandle.Seq
481
-
case evt.RepoMigrate != nil:
482
-
return evt.RepoMigrate.Seq
483
-
case evt.RepoTombstone != nil:
484
-
return evt.RepoTombstone.Seq
485
446
case evt.RepoIdentity != nil:
486
447
return evt.RepoIdentity.Seq
487
448
case evt.RepoAccount != nil:
···
503
464
return evt.RepoCommit.Seq, true
504
465
case evt.RepoSync != nil:
505
466
return evt.RepoSync.Seq, true
506
-
case evt.RepoHandle != nil:
507
-
return evt.RepoHandle.Seq, true
508
-
case evt.RepoMigrate != nil:
509
-
return evt.RepoMigrate.Seq, true
510
-
case evt.RepoTombstone != nil:
511
-
return evt.RepoTombstone.Seq, true
512
467
case evt.RepoIdentity != nil:
513
468
return evt.RepoIdentity.Seq, true
514
469
case evt.RepoAccount != nil:
-6
events/persist.go
-6
events/persist.go
···
42
42
switch {
43
43
case e.RepoCommit != nil:
44
44
e.RepoCommit.Seq = mp.seq
45
-
case e.RepoHandle != nil:
46
-
e.RepoHandle.Seq = mp.seq
47
45
case e.RepoIdentity != nil:
48
46
e.RepoIdentity.Seq = mp.seq
49
47
case e.RepoAccount != nil:
50
48
e.RepoAccount.Seq = mp.seq
51
-
case e.RepoMigrate != nil:
52
-
e.RepoMigrate.Seq = mp.seq
53
-
case e.RepoTombstone != nil:
54
-
e.RepoTombstone.Seq = mp.seq
55
49
case e.LabelLabels != nil:
56
50
e.LabelLabels.Seq = mp.seq
57
51
default:
-6
events/yolopersist/yolopersist.go
-6
events/yolopersist/yolopersist.go
···
28
28
switch {
29
29
case e.RepoCommit != nil:
30
30
e.RepoCommit.Seq = yp.seq
31
-
case e.RepoHandle != nil:
32
-
e.RepoHandle.Seq = yp.seq
33
31
case e.RepoIdentity != nil:
34
32
e.RepoIdentity.Seq = yp.seq
35
33
case e.RepoAccount != nil:
36
34
e.RepoAccount.Seq = yp.seq
37
-
case e.RepoMigrate != nil:
38
-
e.RepoMigrate.Seq = yp.seq
39
-
case e.RepoTombstone != nil:
40
-
e.RepoTombstone.Seq = yp.seq
41
35
case e.LabelLabels != nil:
42
36
e.LabelLabels.Seq = yp.seq
43
37
default:
-3
gen/main.go
-3
gen/main.go
···
98
98
atproto.RepoStrongRef{},
99
99
atproto.SyncSubscribeRepos_Commit{},
100
100
atproto.SyncSubscribeRepos_Sync{},
101
-
atproto.SyncSubscribeRepos_Handle{},
102
101
atproto.SyncSubscribeRepos_Identity{},
103
102
atproto.SyncSubscribeRepos_Account{},
104
103
atproto.SyncSubscribeRepos_Info{},
105
-
atproto.SyncSubscribeRepos_Migrate{},
106
104
atproto.SyncSubscribeRepos_RepoOp{},
107
-
atproto.SyncSubscribeRepos_Tombstone{},
108
105
atproto.LabelDefs_SelfLabels{},
109
106
atproto.LabelDefs_SelfLabel{},
110
107
atproto.LabelDefs_Label{},
-20
pds/server.go
-20
pds/server.go
···
598
598
case evt.RepoCommit != nil:
599
599
header.MsgType = "#commit"
600
600
obj = evt.RepoCommit
601
-
case evt.RepoHandle != nil:
602
-
header.MsgType = "#handle"
603
-
obj = evt.RepoHandle
604
601
case evt.RepoIdentity != nil:
605
602
header.MsgType = "#identity"
606
603
obj = evt.RepoIdentity
···
610
607
case evt.RepoInfo != nil:
611
608
header.MsgType = "#info"
612
609
obj = evt.RepoInfo
613
-
case evt.RepoMigrate != nil:
614
-
header.MsgType = "#migrate"
615
-
obj = evt.RepoMigrate
616
-
case evt.RepoTombstone != nil:
617
-
header.MsgType = "#tombstone"
618
-
obj = evt.RepoTombstone
619
610
default:
620
611
return fmt.Errorf("unrecognized event kind")
621
612
}
···
660
651
return fmt.Errorf("failed to update handle: %w", err)
661
652
}
662
653
663
-
if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{
664
-
RepoHandle: &comatproto.SyncSubscribeRepos_Handle{
665
-
Did: u.Did,
666
-
Handle: handle,
667
-
Time: time.Now().Format(util.ISO8601),
668
-
},
669
-
}); err != nil {
670
-
return fmt.Errorf("failed to push event: %s", err)
671
-
}
672
-
673
-
// Also push an Identity event
674
654
if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{
675
655
RepoIdentity: &comatproto.SyncSubscribeRepos_Identity{
676
656
Did: u.Did,
-16
search/firehose.go
-16
search/firehose.go
···
115
115
return nil
116
116
117
117
},
118
-
RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
119
-
ctx := context.Background()
120
-
ctx, span := tracer.Start(ctx, "RepoHandle")
121
-
defer span.End()
122
-
123
-
did, err := syntax.ParseDID(evt.Did)
124
-
if err != nil {
125
-
idx.logger.Error("bad DID in RepoHandle event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err)
126
-
return nil
127
-
}
128
-
if err := idx.updateUserHandle(ctx, did, evt.Handle); err != nil {
129
-
// TODO: handle this case (instead of return nil)
130
-
idx.logger.Error("failed to update user handle", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err)
131
-
}
132
-
return nil
133
-
},
134
118
}
135
119
136
120
return events.HandleRepoStream(
-2
testing/integ_test.go
-2
testing/integ_test.go
-7
testing/utils.go
-7
testing/utils.go
···
666
666
es.Lk.Unlock()
667
667
return nil
668
668
},
669
-
RepoHandle: func(evt *atproto.SyncSubscribeRepos_Handle) error {
670
-
fmt.Println("received handle event: ", evt.Seq, evt.Did)
671
-
es.Lk.Lock()
672
-
es.Events = append(es.Events, &events.XRPCStreamEvent{RepoHandle: evt})
673
-
es.Lk.Unlock()
674
-
return nil
675
-
},
676
669
RepoIdentity: func(evt *atproto.SyncSubscribeRepos_Identity) error {
677
670
fmt.Println("received identity event: ", evt.Seq, evt.Did)
678
671
es.Lk.Lock()