+3
-3
api/agnostic/actorgetPreferences.go
+3
-3
api/agnostic/actorgetPreferences.go
···
7
7
import (
8
8
"context"
9
9
10
-
"github.com/bluesky-social/indigo/xrpc"
10
+
"github.com/bluesky-social/indigo/lex/util"
11
11
)
12
12
13
13
// ActorGetPreferences_Output is the output of a app.bsky.actor.getPreferences call.
···
16
16
}
17
17
18
18
// ActorGetPreferences calls the XRPC method "app.bsky.actor.getPreferences".
19
-
func ActorGetPreferences(ctx context.Context, c *xrpc.Client) (*ActorGetPreferences_Output, error) {
19
+
func ActorGetPreferences(ctx context.Context, c util.LexClient) (*ActorGetPreferences_Output, error) {
20
20
var out ActorGetPreferences_Output
21
21
22
22
params := map[string]interface{}{}
23
-
if err := c.Do(ctx, xrpc.Query, "", "app.bsky.actor.getPreferences", params, nil, &out); err != nil {
23
+
if err := c.LexDo(ctx, util.Query, "", "app.bsky.actor.getPreferences", params, nil, &out); err != nil {
24
24
return nil, err
25
25
}
26
26
+3
-3
api/agnostic/actorputPreferences.go
+3
-3
api/agnostic/actorputPreferences.go
···
7
7
import (
8
8
"context"
9
9
10
-
"github.com/bluesky-social/indigo/xrpc"
10
+
"github.com/bluesky-social/indigo/lex/util"
11
11
)
12
12
13
13
// ActorPutPreferences_Input is the input argument to a app.bsky.actor.putPreferences call.
···
16
16
}
17
17
18
18
// ActorPutPreferences calls the XRPC method "app.bsky.actor.putPreferences".
19
-
func ActorPutPreferences(ctx context.Context, c *xrpc.Client, input *ActorPutPreferences_Input) error {
20
-
if err := c.Do(ctx, xrpc.Procedure, "application/json", "app.bsky.actor.putPreferences", nil, input, nil); err != nil {
19
+
func ActorPutPreferences(ctx context.Context, c util.LexClient, input *ActorPutPreferences_Input) error {
20
+
if err := c.LexDo(ctx, util.Procedure, "application/json", "app.bsky.actor.putPreferences", nil, input, nil); err != nil {
21
21
return err
22
22
}
23
23
+3
-3
api/agnostic/identitygetRecommendedDidCredentials.go
+3
-3
api/agnostic/identitygetRecommendedDidCredentials.go
···
8
8
"context"
9
9
"encoding/json"
10
10
11
-
"github.com/bluesky-social/indigo/xrpc"
11
+
"github.com/bluesky-social/indigo/lex/util"
12
12
)
13
13
14
14
// IdentityGetRecommendedDidCredentials calls the XRPC method "com.atproto.identity.getRecommendedDidCredentials".
15
-
func IdentityGetRecommendedDidCredentials(ctx context.Context, c *xrpc.Client) (*json.RawMessage, error) {
15
+
func IdentityGetRecommendedDidCredentials(ctx context.Context, c util.LexClient) (*json.RawMessage, error) {
16
16
var out json.RawMessage
17
17
18
-
if err := c.Do(ctx, xrpc.Query, "", "com.atproto.identity.getRecommendedDidCredentials", nil, nil, &out); err != nil {
18
+
if err := c.LexDo(ctx, util.Query, "", "com.atproto.identity.getRecommendedDidCredentials", nil, nil, &out); err != nil {
19
19
return nil, err
20
20
}
21
21
+3
-3
api/agnostic/identitysignPlcOperation.go
+3
-3
api/agnostic/identitysignPlcOperation.go
···
8
8
"context"
9
9
"encoding/json"
10
10
11
-
"github.com/bluesky-social/indigo/xrpc"
11
+
"github.com/bluesky-social/indigo/lex/util"
12
12
)
13
13
14
14
// IdentitySignPlcOperation_Input is the input argument to a com.atproto.identity.signPlcOperation call.
···
28
28
}
29
29
30
30
// IdentitySignPlcOperation calls the XRPC method "com.atproto.identity.signPlcOperation".
31
-
func IdentitySignPlcOperation(ctx context.Context, c *xrpc.Client, input *IdentitySignPlcOperation_Input) (*IdentitySignPlcOperation_Output, error) {
31
+
func IdentitySignPlcOperation(ctx context.Context, c util.LexClient, input *IdentitySignPlcOperation_Input) (*IdentitySignPlcOperation_Output, error) {
32
32
var out IdentitySignPlcOperation_Output
33
-
if err := c.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.identity.signPlcOperation", nil, input, &out); err != nil {
33
+
if err := c.LexDo(ctx, util.Procedure, "application/json", "com.atproto.identity.signPlcOperation", nil, input, &out); err != nil {
34
34
return nil, err
35
35
}
36
36
+3
-3
api/agnostic/identitysubmitPlcOperation.go
+3
-3
api/agnostic/identitysubmitPlcOperation.go
···
8
8
"context"
9
9
"encoding/json"
10
10
11
-
"github.com/bluesky-social/indigo/xrpc"
11
+
"github.com/bluesky-social/indigo/lex/util"
12
12
)
13
13
14
14
// IdentitySubmitPlcOperation_Input is the input argument to a com.atproto.identity.submitPlcOperation call.
···
17
17
}
18
18
19
19
// IdentitySubmitPlcOperation calls the XRPC method "com.atproto.identity.submitPlcOperation".
20
-
func IdentitySubmitPlcOperation(ctx context.Context, c *xrpc.Client, input *IdentitySubmitPlcOperation_Input) error {
21
-
if err := c.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.identity.submitPlcOperation", nil, input, nil); err != nil {
20
+
func IdentitySubmitPlcOperation(ctx context.Context, c util.LexClient, input *IdentitySubmitPlcOperation_Input) error {
21
+
if err := c.LexDo(ctx, util.Procedure, "application/json", "com.atproto.identity.submitPlcOperation", nil, input, nil); err != nil {
22
22
return err
23
23
}
24
24
+2
-3
api/agnostic/repoapplyWrites.go
+2
-3
api/agnostic/repoapplyWrites.go
···
10
10
"fmt"
11
11
12
12
"github.com/bluesky-social/indigo/lex/util"
13
-
"github.com/bluesky-social/indigo/xrpc"
14
13
)
15
14
16
15
// RepoApplyWrites_Create is a "create" in the com.atproto.repo.applyWrites schema.
···
179
178
}
180
179
181
180
// RepoApplyWrites calls the XRPC method "com.atproto.repo.applyWrites".
182
-
func RepoApplyWrites(ctx context.Context, c *xrpc.Client, input *RepoApplyWrites_Input) (*RepoApplyWrites_Output, error) {
181
+
func RepoApplyWrites(ctx context.Context, c util.LexClient, input *RepoApplyWrites_Input) (*RepoApplyWrites_Output, error) {
183
182
var out RepoApplyWrites_Output
184
-
if err := c.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.applyWrites", nil, input, &out); err != nil {
183
+
if err := c.LexDo(ctx, util.Procedure, "application/json", "com.atproto.repo.applyWrites", nil, input, &out); err != nil {
185
184
return nil, err
186
185
}
187
186
+3
-3
api/agnostic/repocreateRecord.go
+3
-3
api/agnostic/repocreateRecord.go
···
7
7
import (
8
8
"context"
9
9
10
-
"github.com/bluesky-social/indigo/xrpc"
10
+
"github.com/bluesky-social/indigo/lex/util"
11
11
)
12
12
13
13
// RepoDefs_CommitMeta is a "commitMeta" in the com.atproto.repo.defs schema.
···
41
41
}
42
42
43
43
// RepoCreateRecord calls the XRPC method "com.atproto.repo.createRecord".
44
-
func RepoCreateRecord(ctx context.Context, c *xrpc.Client, input *RepoCreateRecord_Input) (*RepoCreateRecord_Output, error) {
44
+
func RepoCreateRecord(ctx context.Context, c util.LexClient, input *RepoCreateRecord_Input) (*RepoCreateRecord_Output, error) {
45
45
var out RepoCreateRecord_Output
46
-
if err := c.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.createRecord", nil, input, &out); err != nil {
46
+
if err := c.LexDo(ctx, util.Procedure, "application/json", "com.atproto.repo.createRecord", nil, input, &out); err != nil {
47
47
return nil, err
48
48
}
49
49
+3
-3
api/agnostic/repogetRecord.go
+3
-3
api/agnostic/repogetRecord.go
···
8
8
"context"
9
9
"encoding/json"
10
10
11
-
"github.com/bluesky-social/indigo/xrpc"
11
+
"github.com/bluesky-social/indigo/lex/util"
12
12
)
13
13
14
14
// RepoGetRecord_Output is the output of a com.atproto.repo.getRecord call.
···
25
25
// collection: The NSID of the record collection.
26
26
// repo: The handle or DID of the repo.
27
27
// rkey: The Record Key.
28
-
func RepoGetRecord(ctx context.Context, c *xrpc.Client, cid string, collection string, repo string, rkey string) (*RepoGetRecord_Output, error) {
28
+
func RepoGetRecord(ctx context.Context, c util.LexClient, cid string, collection string, repo string, rkey string) (*RepoGetRecord_Output, error) {
29
29
var out RepoGetRecord_Output
30
30
31
31
params := map[string]interface{}{
···
36
36
if cid != "" {
37
37
params["cid"] = cid
38
38
}
39
-
if err := c.Do(ctx, xrpc.Query, "", "com.atproto.repo.getRecord", params, nil, &out); err != nil {
39
+
if err := c.LexDo(ctx, util.Query, "", "com.atproto.repo.getRecord", params, nil, &out); err != nil {
40
40
return nil, err
41
41
}
42
42
+3
-3
api/agnostic/repolistRecords.go
+3
-3
api/agnostic/repolistRecords.go
···
8
8
"context"
9
9
"encoding/json"
10
10
11
-
"github.com/bluesky-social/indigo/xrpc"
11
+
"github.com/bluesky-social/indigo/lex/util"
12
12
)
13
13
14
14
// RepoListRecords_Output is the output of a com.atproto.repo.listRecords call.
···
31
31
// limit: The number of records to return.
32
32
// repo: The handle or DID of the repo.
33
33
// reverse: Flag to reverse the order of the returned records.
34
-
func RepoListRecords(ctx context.Context, c *xrpc.Client, collection string, cursor string, limit int64, repo string, reverse bool) (*RepoListRecords_Output, error) {
34
+
func RepoListRecords(ctx context.Context, c util.LexClient, collection string, cursor string, limit int64, repo string, reverse bool) (*RepoListRecords_Output, error) {
35
35
var out RepoListRecords_Output
36
36
37
37
params := map[string]interface{}{
···
48
48
params["reverse"] = reverse
49
49
}
50
50
51
-
if err := c.Do(ctx, xrpc.Query, "", "com.atproto.repo.listRecords", params, nil, &out); err != nil {
51
+
if err := c.LexDo(ctx, util.Query, "", "com.atproto.repo.listRecords", params, nil, &out); err != nil {
52
52
return nil, err
53
53
}
54
54
+3
-3
api/agnostic/repoputRecord.go
+3
-3
api/agnostic/repoputRecord.go
···
7
7
import (
8
8
"context"
9
9
10
-
"github.com/bluesky-social/indigo/xrpc"
10
+
"github.com/bluesky-social/indigo/lex/util"
11
11
)
12
12
13
13
// RepoPutRecord_Input is the input argument to a com.atproto.repo.putRecord call.
···
37
37
}
38
38
39
39
// RepoPutRecord calls the XRPC method "com.atproto.repo.putRecord".
40
-
func RepoPutRecord(ctx context.Context, c *xrpc.Client, input *RepoPutRecord_Input) (*RepoPutRecord_Output, error) {
40
+
func RepoPutRecord(ctx context.Context, c util.LexClient, input *RepoPutRecord_Input) (*RepoPutRecord_Output, error) {
41
41
var out RepoPutRecord_Output
42
-
if err := c.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", nil, input, &out); err != nil {
42
+
if err := c.LexDo(ctx, util.Procedure, "application/json", "com.atproto.repo.putRecord", nil, input, &out); err != nil {
43
43
return nil, err
44
44
}
45
45
-627
api/atproto/cbor_gen.go
-627
api/atproto/cbor_gen.go
···
1206
1206
1207
1207
return nil
1208
1208
}
1209
-
func (t *SyncSubscribeRepos_Handle) MarshalCBOR(w io.Writer) error {
1210
-
if t == nil {
1211
-
_, err := w.Write(cbg.CborNull)
1212
-
return err
1213
-
}
1214
1209
1215
-
cw := cbg.NewCborWriter(w)
1216
-
1217
-
if _, err := cw.Write([]byte{164}); err != nil {
1218
-
return err
1219
-
}
1220
-
1221
-
// t.Did (string) (string)
1222
-
if len("did") > 1000000 {
1223
-
return xerrors.Errorf("Value in field \"did\" was too long")
1224
-
}
1225
-
1226
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("did"))); err != nil {
1227
-
return err
1228
-
}
1229
-
if _, err := cw.WriteString(string("did")); err != nil {
1230
-
return err
1231
-
}
1232
-
1233
-
if len(t.Did) > 1000000 {
1234
-
return xerrors.Errorf("Value in field t.Did was too long")
1235
-
}
1236
-
1237
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Did))); err != nil {
1238
-
return err
1239
-
}
1240
-
if _, err := cw.WriteString(string(t.Did)); err != nil {
1241
-
return err
1242
-
}
1243
-
1244
-
// t.Seq (int64) (int64)
1245
-
if len("seq") > 1000000 {
1246
-
return xerrors.Errorf("Value in field \"seq\" was too long")
1247
-
}
1248
-
1249
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("seq"))); err != nil {
1250
-
return err
1251
-
}
1252
-
if _, err := cw.WriteString(string("seq")); err != nil {
1253
-
return err
1254
-
}
1255
-
1256
-
if t.Seq >= 0 {
1257
-
if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Seq)); err != nil {
1258
-
return err
1259
-
}
1260
-
} else {
1261
-
if err := cw.WriteMajorTypeHeader(cbg.MajNegativeInt, uint64(-t.Seq-1)); err != nil {
1262
-
return err
1263
-
}
1264
-
}
1265
-
1266
-
// t.Time (string) (string)
1267
-
if len("time") > 1000000 {
1268
-
return xerrors.Errorf("Value in field \"time\" was too long")
1269
-
}
1270
-
1271
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("time"))); err != nil {
1272
-
return err
1273
-
}
1274
-
if _, err := cw.WriteString(string("time")); err != nil {
1275
-
return err
1276
-
}
1277
-
1278
-
if len(t.Time) > 1000000 {
1279
-
return xerrors.Errorf("Value in field t.Time was too long")
1280
-
}
1281
-
1282
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Time))); err != nil {
1283
-
return err
1284
-
}
1285
-
if _, err := cw.WriteString(string(t.Time)); err != nil {
1286
-
return err
1287
-
}
1288
-
1289
-
// t.Handle (string) (string)
1290
-
if len("handle") > 1000000 {
1291
-
return xerrors.Errorf("Value in field \"handle\" was too long")
1292
-
}
1293
-
1294
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("handle"))); err != nil {
1295
-
return err
1296
-
}
1297
-
if _, err := cw.WriteString(string("handle")); err != nil {
1298
-
return err
1299
-
}
1300
-
1301
-
if len(t.Handle) > 1000000 {
1302
-
return xerrors.Errorf("Value in field t.Handle was too long")
1303
-
}
1304
-
1305
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Handle))); err != nil {
1306
-
return err
1307
-
}
1308
-
if _, err := cw.WriteString(string(t.Handle)); err != nil {
1309
-
return err
1310
-
}
1311
-
return nil
1312
-
}
1313
-
1314
-
func (t *SyncSubscribeRepos_Handle) UnmarshalCBOR(r io.Reader) (err error) {
1315
-
*t = SyncSubscribeRepos_Handle{}
1316
-
1317
-
cr := cbg.NewCborReader(r)
1318
-
1319
-
maj, extra, err := cr.ReadHeader()
1320
-
if err != nil {
1321
-
return err
1322
-
}
1323
-
defer func() {
1324
-
if err == io.EOF {
1325
-
err = io.ErrUnexpectedEOF
1326
-
}
1327
-
}()
1328
-
1329
-
if maj != cbg.MajMap {
1330
-
return fmt.Errorf("cbor input should be of type map")
1331
-
}
1332
-
1333
-
if extra > cbg.MaxLength {
1334
-
return fmt.Errorf("SyncSubscribeRepos_Handle: map struct too large (%d)", extra)
1335
-
}
1336
-
1337
-
n := extra
1338
-
1339
-
nameBuf := make([]byte, 6)
1340
-
for i := uint64(0); i < n; i++ {
1341
-
nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000)
1342
-
if err != nil {
1343
-
return err
1344
-
}
1345
-
1346
-
if !ok {
1347
-
// Field doesn't exist on this type, so ignore it
1348
-
if err := cbg.ScanForLinks(cr, func(cid.Cid) {}); err != nil {
1349
-
return err
1350
-
}
1351
-
continue
1352
-
}
1353
-
1354
-
switch string(nameBuf[:nameLen]) {
1355
-
// t.Did (string) (string)
1356
-
case "did":
1357
-
1358
-
{
1359
-
sval, err := cbg.ReadStringWithMax(cr, 1000000)
1360
-
if err != nil {
1361
-
return err
1362
-
}
1363
-
1364
-
t.Did = string(sval)
1365
-
}
1366
-
// t.Seq (int64) (int64)
1367
-
case "seq":
1368
-
{
1369
-
maj, extra, err := cr.ReadHeader()
1370
-
if err != nil {
1371
-
return err
1372
-
}
1373
-
var extraI int64
1374
-
switch maj {
1375
-
case cbg.MajUnsignedInt:
1376
-
extraI = int64(extra)
1377
-
if extraI < 0 {
1378
-
return fmt.Errorf("int64 positive overflow")
1379
-
}
1380
-
case cbg.MajNegativeInt:
1381
-
extraI = int64(extra)
1382
-
if extraI < 0 {
1383
-
return fmt.Errorf("int64 negative overflow")
1384
-
}
1385
-
extraI = -1 - extraI
1386
-
default:
1387
-
return fmt.Errorf("wrong type for int64 field: %d", maj)
1388
-
}
1389
-
1390
-
t.Seq = int64(extraI)
1391
-
}
1392
-
// t.Time (string) (string)
1393
-
case "time":
1394
-
1395
-
{
1396
-
sval, err := cbg.ReadStringWithMax(cr, 1000000)
1397
-
if err != nil {
1398
-
return err
1399
-
}
1400
-
1401
-
t.Time = string(sval)
1402
-
}
1403
-
// t.Handle (string) (string)
1404
-
case "handle":
1405
-
1406
-
{
1407
-
sval, err := cbg.ReadStringWithMax(cr, 1000000)
1408
-
if err != nil {
1409
-
return err
1410
-
}
1411
-
1412
-
t.Handle = string(sval)
1413
-
}
1414
-
1415
-
default:
1416
-
// Field doesn't exist on this type, so ignore it
1417
-
if err := cbg.ScanForLinks(r, func(cid.Cid) {}); err != nil {
1418
-
return err
1419
-
}
1420
-
}
1421
-
}
1422
-
1423
-
return nil
1424
-
}
1425
1210
func (t *SyncSubscribeRepos_Identity) MarshalCBOR(w io.Writer) error {
1426
1211
if t == nil {
1427
1212
_, err := w.Write(cbg.CborNull)
···
2094
1879
2095
1880
return nil
2096
1881
}
2097
-
func (t *SyncSubscribeRepos_Migrate) MarshalCBOR(w io.Writer) error {
2098
-
if t == nil {
2099
-
_, err := w.Write(cbg.CborNull)
2100
-
return err
2101
-
}
2102
1882
2103
-
cw := cbg.NewCborWriter(w)
2104
-
2105
-
if _, err := cw.Write([]byte{164}); err != nil {
2106
-
return err
2107
-
}
2108
-
2109
-
// t.Did (string) (string)
2110
-
if len("did") > 1000000 {
2111
-
return xerrors.Errorf("Value in field \"did\" was too long")
2112
-
}
2113
-
2114
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("did"))); err != nil {
2115
-
return err
2116
-
}
2117
-
if _, err := cw.WriteString(string("did")); err != nil {
2118
-
return err
2119
-
}
2120
-
2121
-
if len(t.Did) > 1000000 {
2122
-
return xerrors.Errorf("Value in field t.Did was too long")
2123
-
}
2124
-
2125
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Did))); err != nil {
2126
-
return err
2127
-
}
2128
-
if _, err := cw.WriteString(string(t.Did)); err != nil {
2129
-
return err
2130
-
}
2131
-
2132
-
// t.Seq (int64) (int64)
2133
-
if len("seq") > 1000000 {
2134
-
return xerrors.Errorf("Value in field \"seq\" was too long")
2135
-
}
2136
-
2137
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("seq"))); err != nil {
2138
-
return err
2139
-
}
2140
-
if _, err := cw.WriteString(string("seq")); err != nil {
2141
-
return err
2142
-
}
2143
-
2144
-
if t.Seq >= 0 {
2145
-
if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Seq)); err != nil {
2146
-
return err
2147
-
}
2148
-
} else {
2149
-
if err := cw.WriteMajorTypeHeader(cbg.MajNegativeInt, uint64(-t.Seq-1)); err != nil {
2150
-
return err
2151
-
}
2152
-
}
2153
-
2154
-
// t.Time (string) (string)
2155
-
if len("time") > 1000000 {
2156
-
return xerrors.Errorf("Value in field \"time\" was too long")
2157
-
}
2158
-
2159
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("time"))); err != nil {
2160
-
return err
2161
-
}
2162
-
if _, err := cw.WriteString(string("time")); err != nil {
2163
-
return err
2164
-
}
2165
-
2166
-
if len(t.Time) > 1000000 {
2167
-
return xerrors.Errorf("Value in field t.Time was too long")
2168
-
}
2169
-
2170
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Time))); err != nil {
2171
-
return err
2172
-
}
2173
-
if _, err := cw.WriteString(string(t.Time)); err != nil {
2174
-
return err
2175
-
}
2176
-
2177
-
// t.MigrateTo (string) (string)
2178
-
if len("migrateTo") > 1000000 {
2179
-
return xerrors.Errorf("Value in field \"migrateTo\" was too long")
2180
-
}
2181
-
2182
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("migrateTo"))); err != nil {
2183
-
return err
2184
-
}
2185
-
if _, err := cw.WriteString(string("migrateTo")); err != nil {
2186
-
return err
2187
-
}
2188
-
2189
-
if t.MigrateTo == nil {
2190
-
if _, err := cw.Write(cbg.CborNull); err != nil {
2191
-
return err
2192
-
}
2193
-
} else {
2194
-
if len(*t.MigrateTo) > 1000000 {
2195
-
return xerrors.Errorf("Value in field t.MigrateTo was too long")
2196
-
}
2197
-
2198
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(*t.MigrateTo))); err != nil {
2199
-
return err
2200
-
}
2201
-
if _, err := cw.WriteString(string(*t.MigrateTo)); err != nil {
2202
-
return err
2203
-
}
2204
-
}
2205
-
return nil
2206
-
}
2207
-
2208
-
func (t *SyncSubscribeRepos_Migrate) UnmarshalCBOR(r io.Reader) (err error) {
2209
-
*t = SyncSubscribeRepos_Migrate{}
2210
-
2211
-
cr := cbg.NewCborReader(r)
2212
-
2213
-
maj, extra, err := cr.ReadHeader()
2214
-
if err != nil {
2215
-
return err
2216
-
}
2217
-
defer func() {
2218
-
if err == io.EOF {
2219
-
err = io.ErrUnexpectedEOF
2220
-
}
2221
-
}()
2222
-
2223
-
if maj != cbg.MajMap {
2224
-
return fmt.Errorf("cbor input should be of type map")
2225
-
}
2226
-
2227
-
if extra > cbg.MaxLength {
2228
-
return fmt.Errorf("SyncSubscribeRepos_Migrate: map struct too large (%d)", extra)
2229
-
}
2230
-
2231
-
n := extra
2232
-
2233
-
nameBuf := make([]byte, 9)
2234
-
for i := uint64(0); i < n; i++ {
2235
-
nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000)
2236
-
if err != nil {
2237
-
return err
2238
-
}
2239
-
2240
-
if !ok {
2241
-
// Field doesn't exist on this type, so ignore it
2242
-
if err := cbg.ScanForLinks(cr, func(cid.Cid) {}); err != nil {
2243
-
return err
2244
-
}
2245
-
continue
2246
-
}
2247
-
2248
-
switch string(nameBuf[:nameLen]) {
2249
-
// t.Did (string) (string)
2250
-
case "did":
2251
-
2252
-
{
2253
-
sval, err := cbg.ReadStringWithMax(cr, 1000000)
2254
-
if err != nil {
2255
-
return err
2256
-
}
2257
-
2258
-
t.Did = string(sval)
2259
-
}
2260
-
// t.Seq (int64) (int64)
2261
-
case "seq":
2262
-
{
2263
-
maj, extra, err := cr.ReadHeader()
2264
-
if err != nil {
2265
-
return err
2266
-
}
2267
-
var extraI int64
2268
-
switch maj {
2269
-
case cbg.MajUnsignedInt:
2270
-
extraI = int64(extra)
2271
-
if extraI < 0 {
2272
-
return fmt.Errorf("int64 positive overflow")
2273
-
}
2274
-
case cbg.MajNegativeInt:
2275
-
extraI = int64(extra)
2276
-
if extraI < 0 {
2277
-
return fmt.Errorf("int64 negative overflow")
2278
-
}
2279
-
extraI = -1 - extraI
2280
-
default:
2281
-
return fmt.Errorf("wrong type for int64 field: %d", maj)
2282
-
}
2283
-
2284
-
t.Seq = int64(extraI)
2285
-
}
2286
-
// t.Time (string) (string)
2287
-
case "time":
2288
-
2289
-
{
2290
-
sval, err := cbg.ReadStringWithMax(cr, 1000000)
2291
-
if err != nil {
2292
-
return err
2293
-
}
2294
-
2295
-
t.Time = string(sval)
2296
-
}
2297
-
// t.MigrateTo (string) (string)
2298
-
case "migrateTo":
2299
-
2300
-
{
2301
-
b, err := cr.ReadByte()
2302
-
if err != nil {
2303
-
return err
2304
-
}
2305
-
if b != cbg.CborNull[0] {
2306
-
if err := cr.UnreadByte(); err != nil {
2307
-
return err
2308
-
}
2309
-
2310
-
sval, err := cbg.ReadStringWithMax(cr, 1000000)
2311
-
if err != nil {
2312
-
return err
2313
-
}
2314
-
2315
-
t.MigrateTo = (*string)(&sval)
2316
-
}
2317
-
}
2318
-
2319
-
default:
2320
-
// Field doesn't exist on this type, so ignore it
2321
-
if err := cbg.ScanForLinks(r, func(cid.Cid) {}); err != nil {
2322
-
return err
2323
-
}
2324
-
}
2325
-
}
2326
-
2327
-
return nil
2328
-
}
2329
1883
func (t *SyncSubscribeRepos_RepoOp) MarshalCBOR(w io.Writer) error {
2330
1884
if t == nil {
2331
1885
_, err := w.Write(cbg.CborNull)
···
2540
2094
2541
2095
return nil
2542
2096
}
2543
-
func (t *SyncSubscribeRepos_Tombstone) MarshalCBOR(w io.Writer) error {
2544
-
if t == nil {
2545
-
_, err := w.Write(cbg.CborNull)
2546
-
return err
2547
-
}
2548
2097
2549
-
cw := cbg.NewCborWriter(w)
2550
-
2551
-
if _, err := cw.Write([]byte{163}); err != nil {
2552
-
return err
2553
-
}
2554
-
2555
-
// t.Did (string) (string)
2556
-
if len("did") > 1000000 {
2557
-
return xerrors.Errorf("Value in field \"did\" was too long")
2558
-
}
2559
-
2560
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("did"))); err != nil {
2561
-
return err
2562
-
}
2563
-
if _, err := cw.WriteString(string("did")); err != nil {
2564
-
return err
2565
-
}
2566
-
2567
-
if len(t.Did) > 1000000 {
2568
-
return xerrors.Errorf("Value in field t.Did was too long")
2569
-
}
2570
-
2571
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Did))); err != nil {
2572
-
return err
2573
-
}
2574
-
if _, err := cw.WriteString(string(t.Did)); err != nil {
2575
-
return err
2576
-
}
2577
-
2578
-
// t.Seq (int64) (int64)
2579
-
if len("seq") > 1000000 {
2580
-
return xerrors.Errorf("Value in field \"seq\" was too long")
2581
-
}
2582
-
2583
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("seq"))); err != nil {
2584
-
return err
2585
-
}
2586
-
if _, err := cw.WriteString(string("seq")); err != nil {
2587
-
return err
2588
-
}
2589
-
2590
-
if t.Seq >= 0 {
2591
-
if err := cw.WriteMajorTypeHeader(cbg.MajUnsignedInt, uint64(t.Seq)); err != nil {
2592
-
return err
2593
-
}
2594
-
} else {
2595
-
if err := cw.WriteMajorTypeHeader(cbg.MajNegativeInt, uint64(-t.Seq-1)); err != nil {
2596
-
return err
2597
-
}
2598
-
}
2599
-
2600
-
// t.Time (string) (string)
2601
-
if len("time") > 1000000 {
2602
-
return xerrors.Errorf("Value in field \"time\" was too long")
2603
-
}
2604
-
2605
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len("time"))); err != nil {
2606
-
return err
2607
-
}
2608
-
if _, err := cw.WriteString(string("time")); err != nil {
2609
-
return err
2610
-
}
2611
-
2612
-
if len(t.Time) > 1000000 {
2613
-
return xerrors.Errorf("Value in field t.Time was too long")
2614
-
}
2615
-
2616
-
if err := cw.WriteMajorTypeHeader(cbg.MajTextString, uint64(len(t.Time))); err != nil {
2617
-
return err
2618
-
}
2619
-
if _, err := cw.WriteString(string(t.Time)); err != nil {
2620
-
return err
2621
-
}
2622
-
return nil
2623
-
}
2624
-
2625
-
func (t *SyncSubscribeRepos_Tombstone) UnmarshalCBOR(r io.Reader) (err error) {
2626
-
*t = SyncSubscribeRepos_Tombstone{}
2627
-
2628
-
cr := cbg.NewCborReader(r)
2629
-
2630
-
maj, extra, err := cr.ReadHeader()
2631
-
if err != nil {
2632
-
return err
2633
-
}
2634
-
defer func() {
2635
-
if err == io.EOF {
2636
-
err = io.ErrUnexpectedEOF
2637
-
}
2638
-
}()
2639
-
2640
-
if maj != cbg.MajMap {
2641
-
return fmt.Errorf("cbor input should be of type map")
2642
-
}
2643
-
2644
-
if extra > cbg.MaxLength {
2645
-
return fmt.Errorf("SyncSubscribeRepos_Tombstone: map struct too large (%d)", extra)
2646
-
}
2647
-
2648
-
n := extra
2649
-
2650
-
nameBuf := make([]byte, 4)
2651
-
for i := uint64(0); i < n; i++ {
2652
-
nameLen, ok, err := cbg.ReadFullStringIntoBuf(cr, nameBuf, 1000000)
2653
-
if err != nil {
2654
-
return err
2655
-
}
2656
-
2657
-
if !ok {
2658
-
// Field doesn't exist on this type, so ignore it
2659
-
if err := cbg.ScanForLinks(cr, func(cid.Cid) {}); err != nil {
2660
-
return err
2661
-
}
2662
-
continue
2663
-
}
2664
-
2665
-
switch string(nameBuf[:nameLen]) {
2666
-
// t.Did (string) (string)
2667
-
case "did":
2668
-
2669
-
{
2670
-
sval, err := cbg.ReadStringWithMax(cr, 1000000)
2671
-
if err != nil {
2672
-
return err
2673
-
}
2674
-
2675
-
t.Did = string(sval)
2676
-
}
2677
-
// t.Seq (int64) (int64)
2678
-
case "seq":
2679
-
{
2680
-
maj, extra, err := cr.ReadHeader()
2681
-
if err != nil {
2682
-
return err
2683
-
}
2684
-
var extraI int64
2685
-
switch maj {
2686
-
case cbg.MajUnsignedInt:
2687
-
extraI = int64(extra)
2688
-
if extraI < 0 {
2689
-
return fmt.Errorf("int64 positive overflow")
2690
-
}
2691
-
case cbg.MajNegativeInt:
2692
-
extraI = int64(extra)
2693
-
if extraI < 0 {
2694
-
return fmt.Errorf("int64 negative overflow")
2695
-
}
2696
-
extraI = -1 - extraI
2697
-
default:
2698
-
return fmt.Errorf("wrong type for int64 field: %d", maj)
2699
-
}
2700
-
2701
-
t.Seq = int64(extraI)
2702
-
}
2703
-
// t.Time (string) (string)
2704
-
case "time":
2705
-
2706
-
{
2707
-
sval, err := cbg.ReadStringWithMax(cr, 1000000)
2708
-
if err != nil {
2709
-
return err
2710
-
}
2711
-
2712
-
t.Time = string(sval)
2713
-
}
2714
-
2715
-
default:
2716
-
// Field doesn't exist on this type, so ignore it
2717
-
if err := cbg.ScanForLinks(r, func(cid.Cid) {}); err != nil {
2718
-
return err
2719
-
}
2720
-
}
2721
-
}
2722
-
2723
-
return nil
2724
-
}
2725
2098
func (t *LabelDefs_SelfLabels) MarshalCBOR(w io.Writer) error {
2726
2099
if t == nil {
2727
2100
_, err := w.Write(cbg.CborNull)
-29
api/atproto/syncsubscribeRepos.go
-29
api/atproto/syncsubscribeRepos.go
···
49
49
TooBig bool `json:"tooBig" cborgen:"tooBig"`
50
50
}
51
51
52
-
// SyncSubscribeRepos_Handle is a "handle" in the com.atproto.sync.subscribeRepos schema.
53
-
//
54
-
// DEPRECATED -- Use #identity event instead
55
-
type SyncSubscribeRepos_Handle struct {
56
-
Did string `json:"did" cborgen:"did"`
57
-
Handle string `json:"handle" cborgen:"handle"`
58
-
Seq int64 `json:"seq" cborgen:"seq"`
59
-
Time string `json:"time" cborgen:"time"`
60
-
}
61
-
62
52
// SyncSubscribeRepos_Identity is a "identity" in the com.atproto.sync.subscribeRepos schema.
63
53
//
64
54
// Represents a change to an account's identity. Could be an updated handle, signing key, or pds hosting endpoint. Serves as a prod to all downstream services to refresh their identity cache.
···
76
66
Name string `json:"name" cborgen:"name"`
77
67
}
78
68
79
-
// SyncSubscribeRepos_Migrate is a "migrate" in the com.atproto.sync.subscribeRepos schema.
80
-
//
81
-
// DEPRECATED -- Use #account event instead
82
-
type SyncSubscribeRepos_Migrate struct {
83
-
Did string `json:"did" cborgen:"did"`
84
-
MigrateTo *string `json:"migrateTo" cborgen:"migrateTo"`
85
-
Seq int64 `json:"seq" cborgen:"seq"`
86
-
Time string `json:"time" cborgen:"time"`
87
-
}
88
-
89
69
// SyncSubscribeRepos_RepoOp is a "repoOp" in the com.atproto.sync.subscribeRepos schema.
90
70
//
91
71
// A repo operation, ie a mutation of a single record.
···
113
93
// time: Timestamp of when this message was originally broadcast.
114
94
Time string `json:"time" cborgen:"time"`
115
95
}
116
-
117
-
// SyncSubscribeRepos_Tombstone is a "tombstone" in the com.atproto.sync.subscribeRepos schema.
118
-
//
119
-
// DEPRECATED -- Use #account event instead
120
-
type SyncSubscribeRepos_Tombstone struct {
121
-
Did string `json:"did" cborgen:"did"`
122
-
Seq int64 `json:"seq" cborgen:"seq"`
123
-
Time string `json:"time" cborgen:"time"`
124
-
}
+6
api/bsky/feeddefs.go
+6
api/bsky/feeddefs.go
···
35
35
Post *FeedDefs_PostView `json:"post" cborgen:"post"`
36
36
Reason *FeedDefs_FeedViewPost_Reason `json:"reason,omitempty" cborgen:"reason,omitempty"`
37
37
Reply *FeedDefs_ReplyRef `json:"reply,omitempty" cborgen:"reply,omitempty"`
38
+
// reqId: Unique identifier per request that may be passed back alongside interactions.
39
+
ReqId *string `json:"reqId,omitempty" cborgen:"reqId,omitempty"`
38
40
}
39
41
40
42
type FeedDefs_FeedViewPost_Reason struct {
···
104
106
// feedContext: Context on a feed item that was originally supplied by the feed generator on getFeedSkeleton.
105
107
FeedContext *string `json:"feedContext,omitempty" cborgen:"feedContext,omitempty"`
106
108
Item *string `json:"item,omitempty" cborgen:"item,omitempty"`
109
+
// reqId: Unique identifier per request that may be passed back alongside interactions.
110
+
ReqId *string `json:"reqId,omitempty" cborgen:"reqId,omitempty"`
107
111
}
108
112
109
113
// FeedDefs_NotFoundPost is a "notFoundPost" in the app.bsky.feed.defs schema.
···
207
211
type FeedDefs_ReasonRepost struct {
208
212
LexiconTypeID string `json:"$type,const=app.bsky.feed.defs#reasonRepost" cborgen:"$type,const=app.bsky.feed.defs#reasonRepost"`
209
213
By *ActorDefs_ProfileViewBasic `json:"by" cborgen:"by"`
214
+
Cid *string `json:"cid,omitempty" cborgen:"cid,omitempty"`
210
215
IndexedAt string `json:"indexedAt" cborgen:"indexedAt"`
216
+
Uri *string `json:"uri,omitempty" cborgen:"uri,omitempty"`
211
217
}
212
218
213
219
// FeedDefs_ReplyRef is a "replyRef" in the app.bsky.feed.defs schema.
+2
api/bsky/feedgetFeedSkeleton.go
+2
api/bsky/feedgetFeedSkeleton.go
···
14
14
type FeedGetFeedSkeleton_Output struct {
15
15
Cursor *string `json:"cursor,omitempty" cborgen:"cursor,omitempty"`
16
16
Feed []*FeedDefs_SkeletonFeedPost `json:"feed" cborgen:"feed"`
17
+
// reqId: Unique identifier per request that may be passed back alongside interactions.
18
+
ReqId *string `json:"reqId,omitempty" cborgen:"reqId,omitempty"`
17
19
}
18
20
19
21
// FeedGetFeedSkeleton calls the XRPC method "app.bsky.feed.getFeedSkeleton".
+1
api/bsky/feedlike.go
+1
api/bsky/feedlike.go
···
17
17
LexiconTypeID string `json:"$type,const=app.bsky.feed.like" cborgen:"$type,const=app.bsky.feed.like"`
18
18
CreatedAt string `json:"createdAt" cborgen:"createdAt"`
19
19
Subject *comatprototypes.RepoStrongRef `json:"subject" cborgen:"subject"`
20
+
Via *comatprototypes.RepoStrongRef `json:"via,omitempty" cborgen:"via,omitempty"`
20
21
}
+1
api/bsky/feedrepost.go
+1
api/bsky/feedrepost.go
···
17
17
LexiconTypeID string `json:"$type,const=app.bsky.feed.repost" cborgen:"$type,const=app.bsky.feed.repost"`
18
18
CreatedAt string `json:"createdAt" cborgen:"createdAt"`
19
19
Subject *comatprototypes.RepoStrongRef `json:"subject" cborgen:"subject"`
20
+
Via *comatprototypes.RepoStrongRef `json:"via,omitempty" cborgen:"via,omitempty"`
20
21
}
+1
-1
api/bsky/notificationlistNotifications.go
+1
-1
api/bsky/notificationlistNotifications.go
···
19
19
IndexedAt string `json:"indexedAt" cborgen:"indexedAt"`
20
20
IsRead bool `json:"isRead" cborgen:"isRead"`
21
21
Labels []*comatprototypes.LabelDefs_Label `json:"labels,omitempty" cborgen:"labels,omitempty"`
22
-
// reason: Expected values are 'like', 'repost', 'follow', 'mention', 'reply', 'quote', 'starterpack-joined', 'verified', and 'unverified'.
22
+
// reason: The reason why this notification was delivered - e.g. your post was liked, or you received a new follower.
23
23
Reason string `json:"reason" cborgen:"reason"`
24
24
ReasonSubject *string `json:"reasonSubject,omitempty" cborgen:"reasonSubject,omitempty"`
25
25
Record *util.LexiconTypeDecoder `json:"record" cborgen:"record"`
+8
-1
api/bsky/unspeccedgetConfig.go
+8
-1
api/bsky/unspeccedgetConfig.go
···
10
10
"github.com/bluesky-social/indigo/xrpc"
11
11
)
12
12
13
+
// UnspeccedGetConfig_LiveNowConfig is a "liveNowConfig" in the app.bsky.unspecced.getConfig schema.
14
+
type UnspeccedGetConfig_LiveNowConfig struct {
15
+
Did string `json:"did" cborgen:"did"`
16
+
Domains []string `json:"domains" cborgen:"domains"`
17
+
}
18
+
13
19
// UnspeccedGetConfig_Output is the output of a app.bsky.unspecced.getConfig call.
14
20
type UnspeccedGetConfig_Output struct {
15
-
CheckEmailConfirmed *bool `json:"checkEmailConfirmed,omitempty" cborgen:"checkEmailConfirmed,omitempty"`
21
+
CheckEmailConfirmed *bool `json:"checkEmailConfirmed,omitempty" cborgen:"checkEmailConfirmed,omitempty"`
22
+
LiveNow []*UnspeccedGetConfig_LiveNowConfig `json:"liveNow,omitempty" cborgen:"liveNow,omitempty"`
16
23
}
17
24
18
25
// UnspeccedGetConfig calls the XRPC method "app.bsky.unspecced.getConfig".
-1
api/ozone/moderationdefs.go
-1
api/ozone/moderationdefs.go
···
1043
1043
//
1044
1044
// Detailed view of a subject. For record subjects, the author's repo and profile will be returned.
1045
1045
type ModerationDefs_SubjectView struct {
1046
-
//Profile *ModerationDefs_SubjectView_Profile `json:"profile,omitempty" cborgen:"profile,omitempty"`
1047
1046
Record *ModerationDefs_RecordViewDetail `json:"record,omitempty" cborgen:"record,omitempty"`
1048
1047
Repo *ModerationDefs_RepoViewDetail `json:"repo,omitempty" cborgen:"repo,omitempty"`
1049
1048
Status *ModerationDefs_SubjectStatusView `json:"status,omitempty" cborgen:"status,omitempty"`
+4
-6
api/ozone/verificationdefs.go
+4
-6
api/ozone/verificationdefs.go
···
22
22
// handle: Handle of the subject the verification applies to at the moment of verifying, which might not be the same at the time of viewing. The verification is only valid if the current handle matches the one at the time of verifying.
23
23
Handle string `json:"handle" cborgen:"handle"`
24
24
// issuer: The user who issued this verification.
25
-
Issuer string `json:"issuer" cborgen:"issuer"`
26
-
//IssuerProfile *VerificationDefs_VerificationView_IssuerProfile `json:"issuerProfile,omitempty" cborgen:"issuerProfile,omitempty"`
27
-
//IssuerRepo *VerificationDefs_VerificationView_IssuerRepo `json:"issuerRepo,omitempty" cborgen:"issuerRepo,omitempty"`
25
+
Issuer string `json:"issuer" cborgen:"issuer"`
26
+
IssuerRepo *VerificationDefs_VerificationView_IssuerRepo `json:"issuerRepo,omitempty" cborgen:"issuerRepo,omitempty"`
28
27
// revokeReason: Describes the reason for revocation, also indicating that the verification is no longer valid.
29
28
RevokeReason *string `json:"revokeReason,omitempty" cborgen:"revokeReason,omitempty"`
30
29
// revokedAt: Timestamp when the verification was revoked.
···
32
31
// revokedBy: The user who revoked this verification.
33
32
RevokedBy *string `json:"revokedBy,omitempty" cborgen:"revokedBy,omitempty"`
34
33
// subject: The subject of the verification.
35
-
Subject string `json:"subject" cborgen:"subject"`
36
-
//SubjectProfile *VerificationDefs_VerificationView_SubjectProfile `json:"subjectProfile,omitempty" cborgen:"subjectProfile,omitempty"`
37
-
//SubjectRepo *VerificationDefs_VerificationView_SubjectRepo `json:"subjectRepo,omitempty" cborgen:"subjectRepo,omitempty"`
34
+
Subject string `json:"subject" cborgen:"subject"`
35
+
SubjectRepo *VerificationDefs_VerificationView_SubjectRepo `json:"subjectRepo,omitempty" cborgen:"subjectRepo,omitempty"`
38
36
// uri: The AT-URI of the verification record.
39
37
Uri string `json:"uri" cborgen:"uri"`
40
38
}
+124
atproto/crypto/jwk.go
+124
atproto/crypto/jwk.go
···
1
+
package crypto
2
+
3
+
import (
4
+
"crypto/ecdsa"
5
+
"crypto/elliptic"
6
+
"encoding/base64"
7
+
"encoding/json"
8
+
"fmt"
9
+
"math/big"
10
+
11
+
secp256k1 "gitlab.com/yawning/secp256k1-voi"
12
+
secp256k1secec "gitlab.com/yawning/secp256k1-voi/secec"
13
+
)
14
+
15
+
// Representation of a JSON Web Key (JWK), as relevant to the keys supported by this package.
16
+
//
17
+
// Expected to be marshalled/unmarshalled as JSON.
18
+
type JWK struct {
19
+
KeyType string `json:"kty"`
20
+
Curve string `json:"crv"`
21
+
X string `json:"x"` // base64url, no padding
22
+
Y string `json:"y"` // base64url, no padding
23
+
Use string `json:"use,omitempty"`
24
+
KeyID *string `json:"kid,omitempty"`
25
+
}
26
+
27
+
// Loads a [PublicKey] from JWK (serialized as JSON bytes)
28
+
func ParsePublicJWKBytes(jwkBytes []byte) (PublicKey, error) {
29
+
var jwk JWK
30
+
if err := json.Unmarshal(jwkBytes, &jwk); err != nil {
31
+
return nil, fmt.Errorf("parsing JWK JSON: %w", err)
32
+
}
33
+
return ParsePublicJWK(jwk)
34
+
}
35
+
36
+
// Loads a [PublicKey] from JWK struct.
37
+
func ParsePublicJWK(jwk JWK) (PublicKey, error) {
38
+
39
+
if jwk.KeyType != "EC" {
40
+
return nil, fmt.Errorf("unsupported JWK key type: %s", jwk.KeyType)
41
+
}
42
+
43
+
// base64url with no encoding
44
+
xbuf, err := base64.RawURLEncoding.DecodeString(jwk.X)
45
+
if err != nil {
46
+
return nil, fmt.Errorf("invalid JWK base64 encoding: %w", err)
47
+
}
48
+
ybuf, err := base64.RawURLEncoding.DecodeString(jwk.Y)
49
+
if err != nil {
50
+
return nil, fmt.Errorf("invalid JWK base64 encoding: %w", err)
51
+
}
52
+
53
+
switch jwk.Curve {
54
+
case "P-256":
55
+
curve := elliptic.P256()
56
+
57
+
var x, y big.Int
58
+
x.SetBytes(xbuf)
59
+
y.SetBytes(ybuf)
60
+
61
+
if !curve.Params().IsOnCurve(&x, &y) {
62
+
return nil, fmt.Errorf("invalid P-256 public key (not on curve)")
63
+
}
64
+
pubECDSA := &ecdsa.PublicKey{
65
+
Curve: curve,
66
+
X: &x,
67
+
Y: &y,
68
+
}
69
+
pub := PublicKeyP256{pubP256: *pubECDSA}
70
+
err := pub.checkCurve()
71
+
if err != nil {
72
+
return nil, err
73
+
}
74
+
return &pub, nil
75
+
case "secp256k1": // K-256
76
+
if len(xbuf) != 32 || len(ybuf) != 32 {
77
+
return nil, fmt.Errorf("invalid K-256 coordinates")
78
+
}
79
+
xarr := ([32]byte)(xbuf[:32])
80
+
yarr := ([32]byte)(ybuf[:32])
81
+
p, err := secp256k1.NewPointFromCoords(&xarr, &yarr)
82
+
if err != nil {
83
+
return nil, fmt.Errorf("invalid K-256 coordinates: %w", err)
84
+
}
85
+
pubK, err := secp256k1secec.NewPublicKeyFromPoint(p)
86
+
if err != nil {
87
+
return nil, fmt.Errorf("invalid K-256/secp256k1 public key: %w", err)
88
+
}
89
+
pub := PublicKeyK256{pubK256: pubK}
90
+
err = pub.ensureBytes()
91
+
if err != nil {
92
+
return nil, err
93
+
}
94
+
return &pub, nil
95
+
default:
96
+
return nil, fmt.Errorf("unsupported JWK cryptography: %s", jwk.Curve)
97
+
}
98
+
}
99
+
100
+
func (k *PublicKeyP256) JWK() (*JWK, error) {
101
+
jwk := JWK{
102
+
KeyType: "EC",
103
+
Curve: "P-256",
104
+
X: base64.RawURLEncoding.EncodeToString(k.pubP256.X.Bytes()),
105
+
Y: base64.RawURLEncoding.EncodeToString(k.pubP256.Y.Bytes()),
106
+
}
107
+
return &jwk, nil
108
+
}
109
+
110
+
func (k *PublicKeyK256) JWK() (*JWK, error) {
111
+
raw := k.UncompressedBytes()
112
+
if len(raw) != 65 {
113
+
return nil, fmt.Errorf("unexpected K-256 bytes size")
114
+
}
115
+
xbytes := raw[1:33]
116
+
ybytes := raw[33:65]
117
+
jwk := JWK{
118
+
KeyType: "EC",
119
+
Curve: "secp256k1",
120
+
X: base64.RawURLEncoding.EncodeToString(xbytes),
121
+
Y: base64.RawURLEncoding.EncodeToString(ybytes),
122
+
}
123
+
return &jwk, nil
124
+
}
+98
atproto/crypto/jwk_test.go
+98
atproto/crypto/jwk_test.go
···
1
+
package crypto
2
+
3
+
import (
4
+
"testing"
5
+
6
+
"github.com/stretchr/testify/assert"
7
+
)
8
+
9
+
func TestParseJWK(t *testing.T) {
10
+
assert := assert.New(t)
11
+
12
+
jwkTestFixtures := []string{
13
+
// https://datatracker.ietf.org/doc/html/rfc7517#appendix-A.1
14
+
`{
15
+
"kty":"EC",
16
+
"crv":"P-256",
17
+
"x":"MKBCTNIcKUSDii11ySs3526iDZ8AiTo7Tu6KPAqv7D4",
18
+
"y":"4Etl6SRW2YiLUrN5vfvVHuhp7x8PxltmWWlbbM4IFyM",
19
+
"d":"870MB6gfuTJ4HtUnUvYMyJpr5eUZNP4Bk43bVdj3eAE",
20
+
"use":"enc",
21
+
"kid":"1"
22
+
}`,
23
+
// https://openid.net/specs/draft-jones-json-web-key-03.html; with kty in addition to alg
24
+
`{
25
+
"alg":"EC",
26
+
"kty":"EC",
27
+
"crv":"P-256",
28
+
"x":"MKBCTNIcKUSDii11ySs3526iDZ8AiTo7Tu6KPAqv7D4",
29
+
"y":"4Etl6SRW2YiLUrN5vfvVHuhp7x8PxltmWWlbbM4IFyM",
30
+
"use":"enc",
31
+
"kid":"1"
32
+
}`,
33
+
// https://w3c-ccg.github.io/lds-ecdsa-secp256k1-2019/; with kty in addition to alg
34
+
`{
35
+
"alg": "EC",
36
+
"kty": "EC",
37
+
"crv": "secp256k1",
38
+
"kid": "JUvpllMEYUZ2joO59UNui_XYDqxVqiFLLAJ8klWuPBw",
39
+
"x": "dWCvM4fTdeM0KmloF57zxtBPXTOythHPMm1HCLrdd3A",
40
+
"y": "36uMVGM7hnw-N6GnjFcihWE3SkrhMLzzLCdPMXPEXlA"
41
+
}`,
42
+
}
43
+
44
+
for _, jwkBytes := range jwkTestFixtures {
45
+
_, err := ParsePublicJWKBytes([]byte(jwkBytes))
46
+
assert.NoError(err)
47
+
}
48
+
}
49
+
50
+
func TestP256GenJWK(t *testing.T) {
51
+
assert := assert.New(t)
52
+
53
+
priv, err := GeneratePrivateKeyP256()
54
+
if err != nil {
55
+
t.Fatal(err)
56
+
}
57
+
pub, err := priv.PublicKey()
58
+
if err != nil {
59
+
t.Fatal(err)
60
+
}
61
+
62
+
pk, ok := pub.(*PublicKeyP256)
63
+
if !ok {
64
+
t.Fatal()
65
+
}
66
+
jwk, err := pk.JWK()
67
+
if err != nil {
68
+
t.Fatal(err)
69
+
}
70
+
71
+
_, err = ParsePublicJWK(*jwk)
72
+
assert.NoError(err)
73
+
}
74
+
75
+
func TestK256GenJWK(t *testing.T) {
76
+
assert := assert.New(t)
77
+
78
+
priv, err := GeneratePrivateKeyK256()
79
+
if err != nil {
80
+
t.Fatal(err)
81
+
}
82
+
pub, err := priv.PublicKey()
83
+
if err != nil {
84
+
t.Fatal(err)
85
+
}
86
+
87
+
pk, ok := pub.(*PublicKeyK256)
88
+
if !ok {
89
+
t.Fatal()
90
+
}
91
+
jwk, err := pk.JWK()
92
+
if err != nil {
93
+
t.Fatal(err)
94
+
}
95
+
96
+
_, err = ParsePublicJWK(*jwk)
97
+
assert.NoError(err)
98
+
}
+5
-1
atproto/crypto/keys.go
+5
-1
atproto/crypto/keys.go
···
30
30
// No ASN.1 or other enclosing structure is applied to the bytes.
31
31
Bytes() []byte
32
32
33
-
// NOTE: should Multibase() (string, error) be part of this interface? Probably.
33
+
// String serialization of the key bytes in "Multibase" format.
34
+
Multibase() string
34
35
}
35
36
36
37
// Common interface for all the supported atproto cryptographic systems.
···
63
64
// For systems with no compressed/uncompressed distinction, returns the same
64
65
// value as Bytes().
65
66
UncompressedBytes() []byte
67
+
68
+
// Serialization as JWK struct (which can be marshalled to JSON)
69
+
JWK() (*JWK, error)
66
70
}
67
71
68
72
var ErrInvalidSignature = errors.New("crytographic signature invalid")
+2
atproto/crypto/keys_test.go
+2
atproto/crypto/keys_test.go
···
166
166
assert.NoError(err)
167
167
_, ok := privP256FromMB.(*PrivateKeyP256)
168
168
assert.True(ok)
169
+
assert.Equal(privP256MB, privP256FromMB.Multibase())
169
170
170
171
privK256FromMB, err := ParsePrivateMultibase(privK256MB)
171
172
assert.NoError(err)
172
173
_, ok = privK256FromMB.(*PrivateKeyK256)
173
174
assert.True(ok)
175
+
assert.Equal(privK256MB, privK256FromMB.Multibase())
174
176
}
+1
-1
atproto/data/parse.go
+1
-1
atproto/data/parse.go
-78
bgs/bgs.go
-78
bgs/bgs.go
···
903
903
904
904
repoCommitsResultCounter.WithLabelValues(host.Host, "ok").Inc()
905
905
return nil
906
-
case env.RepoHandle != nil:
907
-
bgs.log.Info("bgs got repo handle event", "did", env.RepoHandle.Did, "handle", env.RepoHandle.Handle)
908
-
// Flush any cached DID documents for this user
909
-
bgs.didr.FlushCacheFor(env.RepoHandle.Did)
910
-
911
-
// TODO: ignoring the data in the message and just going out to the DID doc
912
-
act, err := bgs.createExternalUser(ctx, env.RepoHandle.Did)
913
-
if err != nil {
914
-
return err
915
-
}
916
-
917
-
if act.Handle.String != env.RepoHandle.Handle {
918
-
bgs.log.Warn("handle update did not update handle to asserted value", "did", env.RepoHandle.Did, "expected", env.RepoHandle.Handle, "actual", act.Handle)
919
-
}
920
-
921
-
// TODO: Update the ReposHandle event type to include "verified" or something
922
-
923
-
// Broadcast the handle update to all consumers
924
-
err = bgs.events.AddEvent(ctx, &events.XRPCStreamEvent{
925
-
RepoHandle: &comatproto.SyncSubscribeRepos_Handle{
926
-
Did: env.RepoHandle.Did,
927
-
Handle: env.RepoHandle.Handle,
928
-
Time: env.RepoHandle.Time,
929
-
},
930
-
})
931
-
if err != nil {
932
-
bgs.log.Error("failed to broadcast RepoHandle event", "error", err, "did", env.RepoHandle.Did, "handle", env.RepoHandle.Handle)
933
-
return fmt.Errorf("failed to broadcast RepoHandle event: %w", err)
934
-
}
935
-
936
-
return nil
937
906
case env.RepoIdentity != nil:
938
907
bgs.log.Info("bgs got identity event", "did", env.RepoIdentity.Did)
939
908
// Flush any cached DID documents for this user
···
1035
1004
}
1036
1005
1037
1006
return nil
1038
-
case env.RepoMigrate != nil:
1039
-
if _, err := bgs.createExternalUser(ctx, env.RepoMigrate.Did); err != nil {
1040
-
return err
1041
-
}
1042
-
1043
-
return nil
1044
-
case env.RepoTombstone != nil:
1045
-
if err := bgs.handleRepoTombstone(ctx, host, env.RepoTombstone); err != nil {
1046
-
return err
1047
-
}
1048
-
1049
-
return nil
1050
1007
default:
1051
1008
return fmt.Errorf("invalid fed event")
1052
1009
}
1053
-
}
1054
-
1055
-
func (bgs *BGS) handleRepoTombstone(ctx context.Context, pds *models.PDS, evt *atproto.SyncSubscribeRepos_Tombstone) error {
1056
-
u, err := bgs.lookupUserByDid(ctx, evt.Did)
1057
-
if err != nil {
1058
-
return err
1059
-
}
1060
-
1061
-
if u.PDS != pds.ID {
1062
-
return fmt.Errorf("unauthoritative tombstone event from %s for %s", pds.Host, evt.Did)
1063
-
}
1064
-
1065
-
if err := bgs.db.Model(&User{}).Where("id = ?", u.ID).UpdateColumns(map[string]any{
1066
-
"tombstoned": true,
1067
-
"handle": nil,
1068
-
}).Error; err != nil {
1069
-
return err
1070
-
}
1071
-
u.SetTombstoned(true)
1072
-
1073
-
if err := bgs.db.Model(&models.ActorInfo{}).Where("uid = ?", u.ID).UpdateColumns(map[string]any{
1074
-
"handle": nil,
1075
-
}).Error; err != nil {
1076
-
return err
1077
-
}
1078
-
1079
-
// delete data from carstore
1080
-
if err := bgs.repoman.TakeDownRepo(ctx, u.ID); err != nil {
1081
-
// don't let a failure here prevent us from propagating this event
1082
-
bgs.log.Error("failed to delete user data from carstore", "err", err)
1083
-
}
1084
-
1085
-
return bgs.events.AddEvent(ctx, &events.XRPCStreamEvent{
1086
-
RepoTombstone: evt,
1087
-
})
1088
1010
}
1089
1011
1090
1012
// TODO: rename? This also updates users, and 'external' is an old phrasing
-45
bgs/fedmgr.go
-45
bgs/fedmgr.go
···
569
569
570
570
return nil
571
571
},
572
-
RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
573
-
log.Info("got remote handle update event", "pdsHost", host.Host, "did", evt.Did, "handle", evt.Handle)
574
-
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
575
-
RepoHandle: evt,
576
-
}); err != nil {
577
-
log.Error("failed handling event", "host", host.Host, "seq", evt.Seq, "err", err)
578
-
}
579
-
*lastCursor = evt.Seq
580
-
581
-
if err := s.updateCursor(sub, *lastCursor); err != nil {
582
-
return fmt.Errorf("updating cursor: %w", err)
583
-
}
584
-
585
-
return nil
586
-
},
587
-
RepoMigrate: func(evt *comatproto.SyncSubscribeRepos_Migrate) error {
588
-
log.Info("got remote repo migrate event", "pdsHost", host.Host, "did", evt.Did, "migrateTo", evt.MigrateTo)
589
-
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
590
-
RepoMigrate: evt,
591
-
}); err != nil {
592
-
log.Error("failed handling event", "host", host.Host, "seq", evt.Seq, "err", err)
593
-
}
594
-
*lastCursor = evt.Seq
595
-
596
-
if err := s.updateCursor(sub, *lastCursor); err != nil {
597
-
return fmt.Errorf("updating cursor: %w", err)
598
-
}
599
-
600
-
return nil
601
-
},
602
-
RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error {
603
-
log.Info("got remote repo tombstone event", "pdsHost", host.Host, "did", evt.Did)
604
-
if err := s.cb(context.TODO(), host, &events.XRPCStreamEvent{
605
-
RepoTombstone: evt,
606
-
}); err != nil {
607
-
log.Error("failed handling event", "host", host.Host, "seq", evt.Seq, "err", err)
608
-
}
609
-
*lastCursor = evt.Seq
610
-
611
-
if err := s.updateCursor(sub, *lastCursor); err != nil {
612
-
return fmt.Errorf("updating cursor: %w", err)
613
-
}
614
-
615
-
return nil
616
-
},
617
572
RepoInfo: func(info *comatproto.SyncSubscribeRepos_Info) error {
618
573
log.Info("info event", "name", info.Name, "message", info.Message, "pdsHost", host.Host)
619
574
return nil
+65
-48
cmd/collectiondir/serve.go
+65
-48
cmd/collectiondir/serve.go
···
5
5
"context"
6
6
"encoding/csv"
7
7
"encoding/json"
8
+
"errors"
8
9
"fmt"
9
10
"log/slog"
10
11
"net"
11
12
"net/http"
13
+
_ "net/http/pprof"
12
14
"net/url"
13
15
"os"
14
16
"os/signal"
···
193
195
if err != nil {
194
196
return fmt.Errorf("lru init, %w", err)
195
197
}
196
-
cs.wg.Add(1)
197
-
go cs.ingestReceiver()
198
198
cs.log = log
199
199
cs.ctx = cctx.Context
200
200
cs.AdminToken = cctx.String("admin-token")
201
201
cs.ExepctedAuthHeader = "Bearer " + cs.AdminToken
202
+
cs.wg.Add(1)
203
+
go cs.ingestReceiver()
202
204
pebblePath := cctx.String("pebble")
203
205
cs.pcd = &PebbleCollectionDirectory{
204
206
log: cs.log,
···
215
217
}
216
218
}
217
219
cs.statsCacheFresh.L = &cs.statsCacheLock
218
-
errchan := make(chan error, 3)
219
-
apiAddr := cctx.String("api-listen")
220
+
221
+
apiServerEcho, err := cs.createApiServer(cctx.Context, cctx.String("api-listen"))
222
+
if err != nil {
223
+
return err
224
+
}
220
225
cs.wg.Add(1)
221
-
go func() {
222
-
errchan <- cs.StartApiServer(cctx.Context, apiAddr)
223
-
}()
224
-
metricsAddr := cctx.String("metrics-listen")
226
+
go func() { cs.StartApiServer(cctx.Context, apiServerEcho) }()
227
+
228
+
cs.createMetricsServer(cctx.String("metrics-listen"))
225
229
cs.wg.Add(1)
226
-
go func() {
227
-
errchan <- cs.StartMetricsServer(cctx.Context, metricsAddr)
228
-
}()
230
+
go func() { cs.StartMetricsServer(cctx.Context) }()
229
231
230
232
upstream := cctx.String("upstream")
231
233
if upstream != "" {
···
247
249
go cs.handleFirehose(fhevents)
248
250
}
249
251
250
-
select {
251
-
case <-signals:
252
-
log.Info("received shutdown signal")
253
-
go errchanlog(cs.log, "server error", errchan)
254
-
return cs.Shutdown()
255
-
case err := <-errchan:
256
-
if err != nil {
257
-
log.Error("server error", "err", err)
258
-
go errchanlog(cs.log, "server error", errchan)
259
-
return cs.Shutdown()
260
-
}
261
-
}
262
-
return nil
252
+
<-signals
253
+
log.Info("received shutdown signal")
254
+
return cs.Shutdown()
263
255
}
264
256
265
257
func (cs *collectionServer) openDau() error {
···
282
274
return nil
283
275
}
284
276
285
-
func errchanlog(log *slog.Logger, msg string, errchan <-chan error) {
286
-
for err := range errchan {
287
-
log.Error(msg, "err", err)
288
-
}
289
-
}
290
-
291
277
func (cs *collectionServer) Shutdown() error {
292
278
close(cs.shutdown)
293
-
go func() {
279
+
280
+
func() {
281
+
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
282
+
defer cancel()
283
+
294
284
cs.log.Info("metrics shutdown start")
295
-
sherr := cs.metricsServer.Shutdown(context.Background())
285
+
sherr := cs.metricsServer.Shutdown(ctx)
296
286
cs.log.Info("metrics shutdown", "err", sherr)
297
287
}()
298
-
cs.log.Info("api shutdown start...")
299
-
err := cs.apiServer.Shutdown(context.Background())
300
-
cs.log.Info("api shutdown, thread wait...", "err", err)
301
-
cs.wg.Wait()
288
+
289
+
func() {
290
+
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
291
+
defer cancel()
292
+
293
+
cs.log.Info("api shutdown start...")
294
+
err := cs.apiServer.Shutdown(ctx)
295
+
cs.log.Info("api shutdown, thread wait...", "err", err)
296
+
}()
297
+
302
298
cs.log.Info("threads done, db close...")
303
-
ee := cs.pcd.Close()
304
-
if ee != nil {
305
-
cs.log.Error("failed to shutdown pebble", "err", ee)
299
+
err := cs.pcd.Close()
300
+
if err != nil {
301
+
cs.log.Error("failed to shutdown pebble", "err", err)
306
302
}
307
303
cs.log.Info("db done. done.")
304
+
cs.wg.Wait()
308
305
return err
309
306
}
310
307
···
384
381
}
385
382
}
386
383
387
-
func (cs *collectionServer) StartMetricsServer(ctx context.Context, addr string) error {
388
-
defer cs.wg.Done()
389
-
defer cs.log.Info("metrics server exit")
384
+
func (cs *collectionServer) createMetricsServer(addr string) {
385
+
e := echo.New()
386
+
e.GET("/metrics", echo.WrapHandler(promhttp.Handler()))
387
+
e.Any("/debug/pprof/*", echo.WrapHandler(http.DefaultServeMux))
388
+
390
389
cs.metricsServer = &http.Server{
391
390
Addr: addr,
392
-
Handler: promhttp.Handler(),
391
+
Handler: e,
393
392
}
394
-
return cs.metricsServer.ListenAndServe()
395
393
}
396
394
397
-
func (cs *collectionServer) StartApiServer(ctx context.Context, addr string) error {
395
+
func (cs *collectionServer) StartMetricsServer(ctx context.Context) {
398
396
defer cs.wg.Done()
399
-
defer cs.log.Info("api server exit")
397
+
defer cs.log.Info("metrics server exit")
398
+
399
+
err := cs.metricsServer.ListenAndServe()
400
+
if err != nil && !errors.Is(err, http.ErrServerClosed) {
401
+
slog.Error("error in metrics server", "err", err)
402
+
os.Exit(1)
403
+
}
404
+
}
405
+
406
+
func (cs *collectionServer) createApiServer(ctx context.Context, addr string) (*echo.Echo, error) {
400
407
var lc net.ListenConfig
401
408
li, err := lc.Listen(ctx, "tcp", addr)
402
409
if err != nil {
403
-
return err
410
+
return nil, err
404
411
}
405
412
e := echo.New()
406
413
e.HideBanner = true
···
430
437
Handler: e,
431
438
}
432
439
cs.apiServer = srv
433
-
return srv.Serve(li)
440
+
return e, nil
441
+
}
442
+
443
+
func (cs *collectionServer) StartApiServer(ctx context.Context, e *echo.Echo) {
444
+
defer cs.wg.Done()
445
+
defer cs.log.Info("api server exit")
446
+
err := cs.apiServer.Serve(e.Listener)
447
+
if err != nil && !errors.Is(err, http.ErrServerClosed) {
448
+
slog.Error("error in api server", "err", err)
449
+
os.Exit(1)
450
+
}
434
451
}
435
452
436
453
const statsCacheDuration = time.Second * 300
-18
cmd/goat/firehose.go
-18
cmd/goat/firehose.go
···
189
189
}
190
190
return nil
191
191
},
192
-
RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
193
-
if gfc.VerifyBasic {
194
-
slog.Info("deprecated event type", "eventType", "handle", "did", evt.Did, "seq", evt.Seq)
195
-
}
196
-
return nil
197
-
},
198
-
RepoMigrate: func(evt *comatproto.SyncSubscribeRepos_Migrate) error {
199
-
if gfc.VerifyBasic {
200
-
slog.Info("deprecated event type", "eventType", "migrate", "did", evt.Did, "seq", evt.Seq)
201
-
}
202
-
return nil
203
-
},
204
-
RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error {
205
-
if gfc.VerifyBasic {
206
-
slog.Info("deprecated event type", "eventType", "handle", "did", evt.Did, "seq", evt.Seq)
207
-
}
208
-
return nil
209
-
},
210
192
}
211
193
212
194
scheduler := parallel.NewScheduler(
-16
cmd/gosky/debug.go
-16
cmd/gosky/debug.go
···
262
262
263
263
return nil
264
264
},
265
-
RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
266
-
fmt.Printf("\rChecking seq: %d ", evt.Seq)
267
-
if lastSeq > 0 && evt.Seq != lastSeq+1 {
268
-
fmt.Println("Gap in sequence numbers: ", lastSeq, evt.Seq)
269
-
}
270
-
lastSeq = evt.Seq
271
-
return nil
272
-
},
273
-
RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error {
274
-
fmt.Printf("\rChecking seq: %d ", evt.Seq)
275
-
if lastSeq > 0 && evt.Seq != lastSeq+1 {
276
-
fmt.Println("Gap in sequence numbers: ", lastSeq, evt.Seq)
277
-
}
278
-
lastSeq = evt.Seq
279
-
return nil
280
-
},
281
265
RepoInfo: func(evt *comatproto.SyncSubscribeRepos_Info) error {
282
266
return nil
283
267
},
-41
cmd/gosky/main.go
-41
cmd/gosky/main.go
···
284
284
285
285
return nil
286
286
},
287
-
RepoMigrate: func(migrate *comatproto.SyncSubscribeRepos_Migrate) error {
288
-
if jsonfmt {
289
-
b, err := json.Marshal(migrate)
290
-
if err != nil {
291
-
return err
292
-
}
293
-
fmt.Println(string(b))
294
-
} else {
295
-
fmt.Printf("(%d) RepoMigrate: %s moving to: %s\n", migrate.Seq, migrate.Did, *migrate.MigrateTo)
296
-
}
297
-
298
-
return nil
299
-
},
300
-
RepoHandle: func(handle *comatproto.SyncSubscribeRepos_Handle) error {
301
-
if jsonfmt {
302
-
b, err := json.Marshal(handle)
303
-
if err != nil {
304
-
return err
305
-
}
306
-
fmt.Println(string(b))
307
-
} else {
308
-
fmt.Printf("(%d) RepoHandle: %s (changed to: %s)\n", handle.Seq, handle.Did, handle.Handle)
309
-
}
310
-
311
-
return nil
312
-
313
-
},
314
287
RepoInfo: func(info *comatproto.SyncSubscribeRepos_Info) error {
315
288
if jsonfmt {
316
289
b, err := json.Marshal(info)
···
323
296
}
324
297
325
298
return nil
326
-
},
327
-
RepoTombstone: func(tomb *comatproto.SyncSubscribeRepos_Tombstone) error {
328
-
if jsonfmt {
329
-
b, err := json.Marshal(tomb)
330
-
if err != nil {
331
-
return err
332
-
}
333
-
fmt.Println(string(b))
334
-
} else {
335
-
fmt.Printf("(%d) Tombstone: %s\n", tomb.Seq, tomb.Did)
336
-
}
337
-
338
-
return nil
339
-
340
299
},
341
300
// TODO: all the other event types
342
301
Error: func(errf *events.ErrorFrame) error {
-10
cmd/gosky/streamdiff.go
-10
cmd/gosky/streamdiff.go
···
127
127
return "ERROR"
128
128
case evt.RepoCommit != nil:
129
129
return "#commit"
130
-
case evt.RepoHandle != nil:
131
-
return "#handle"
132
130
case evt.RepoInfo != nil:
133
131
return "#info"
134
-
case evt.RepoMigrate != nil:
135
-
return "#migrate"
136
-
case evt.RepoTombstone != nil:
137
-
return "#tombstone"
138
132
default:
139
133
return "unknown"
140
134
}
···
157
151
if sameCommit(evt.RepoCommit, oe.RepoCommit) {
158
152
return i
159
153
}
160
-
case evt.RepoHandle != nil:
161
-
panic("not handling handle updates yet")
162
-
case evt.RepoMigrate != nil:
163
-
panic("not handling repo migrates yet")
164
154
default:
165
155
panic("unhandled event type: " + evtop)
166
156
}
+19
-5
cmd/lexgen/main.go
+19
-5
cmd/lexgen/main.go
···
64
64
&cli.StringFlag{
65
65
Name: "outdir",
66
66
},
67
-
&cli.StringFlag{
68
-
Name: "prefix",
69
-
},
70
67
&cli.BoolFlag{
71
68
Name: "gen-server",
72
69
},
···
75
72
},
76
73
&cli.StringSliceFlag{
77
74
Name: "types-import",
75
+
},
76
+
&cli.StringSliceFlag{
77
+
Name: "external-lexicons",
78
78
},
79
79
&cli.StringFlag{
80
80
Name: "package",
···
109
109
schemas = append(schemas, s)
110
110
}
111
111
112
+
externalPaths, err := expandArgs(cctx.StringSlice("external-lexicons"))
113
+
if err != nil {
114
+
return err
115
+
}
116
+
var externalSchemas []*lex.Schema
117
+
for _, arg := range externalPaths {
118
+
s, err := lex.ReadSchema(arg)
119
+
if err != nil {
120
+
return fmt.Errorf("failed to read file %q: %w", arg, err)
121
+
}
122
+
123
+
externalSchemas = append(externalSchemas, s)
124
+
}
125
+
112
126
buildLiteral := cctx.String("build")
113
127
buildPath := cctx.String("build-file")
114
128
var packages []lex.Package
···
145
159
if outdir == "" {
146
160
return fmt.Errorf("must specify output directory (--outdir)")
147
161
}
148
-
defmap := lex.BuildExtDefMap(schemas, packages)
162
+
defmap := lex.BuildExtDefMap(append(schemas, externalSchemas...), packages)
149
163
_ = defmap
150
164
151
165
paths := cctx.StringSlice("types-import")
···
162
176
}
163
177
164
178
} else {
165
-
return lex.Run(schemas, packages)
179
+
return lex.Run(schemas, externalSchemas, packages)
166
180
}
167
181
168
182
return nil
+1
-1
cmd/relay/handlers.go
+1
-1
cmd/relay/handlers.go
···
65
65
}
66
66
go s.ForwardSiblingRequest(c, b)
67
67
68
-
return s.relay.SubscribeToHost(ctx, hostname, noSSL, false)
68
+
return s.relay.SubscribeToHost(ctx, hostname, noSSL, admin)
69
69
}
70
70
71
71
func (s *Service) handleComAtprotoSyncListHosts(c echo.Context, cursor int64, limit int) (*comatproto.SyncListHosts_Output, error) {
-9
cmd/relay/relay/ingest.go
-9
cmd/relay/relay/ingest.go
···
43
43
case evt.RepoAccount != nil:
44
44
//repoAccountReceivedCounter.WithLabelValues(hostname).Add(1)
45
45
return r.processAccountEvent(ctx, evt.RepoAccount, hostname, hostID)
46
-
case evt.RepoHandle != nil: // DEPRECATED
47
-
eventsWarningsCounter.WithLabelValues(hostname, "handle").Add(1)
48
-
return nil
49
-
case evt.RepoMigrate != nil: // DEPRECATED
50
-
eventsWarningsCounter.WithLabelValues(hostname, "migrate").Add(1)
51
-
return nil
52
-
case evt.RepoTombstone != nil: // DEPRECATED
53
-
eventsWarningsCounter.WithLabelValues(hostname, "tombstone").Add(1)
54
-
return nil
55
46
default:
56
47
return fmt.Errorf("unhandled repo stream event type")
57
48
}
-27
cmd/relay/relay/slurper.go
-27
cmd/relay/relay/slurper.go
···
454
454
s.logger.Debug("info event", "name", info.Name, "message", info.Message, "host", sub.Hostname)
455
455
return nil
456
456
},
457
-
RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error { // DEPRECATED
458
-
logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "handle")
459
-
logger.Debug("got remote handle update event", "handle", evt.Handle)
460
-
if err := s.processCallback(context.Background(), &stream.XRPCStreamEvent{RepoHandle: evt}, sub.Hostname, sub.HostID); err != nil {
461
-
logger.Error("failed handling event", "err", err)
462
-
}
463
-
sub.UpdateSeq()
464
-
return nil
465
-
},
466
-
RepoMigrate: func(evt *comatproto.SyncSubscribeRepos_Migrate) error { // DEPRECATED
467
-
logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "migrate")
468
-
logger.Debug("got remote repo migrate event", "migrateTo", evt.MigrateTo)
469
-
if err := s.processCallback(context.Background(), &stream.XRPCStreamEvent{RepoMigrate: evt}, sub.Hostname, sub.HostID); err != nil {
470
-
logger.Error("failed handling event", "err", err)
471
-
}
472
-
sub.UpdateSeq()
473
-
return nil
474
-
},
475
-
RepoTombstone: func(evt *comatproto.SyncSubscribeRepos_Tombstone) error { // DEPRECATED
476
-
logger := s.logger.With("host", sub.Hostname, "did", evt.Did, "seq", evt.Seq, "eventType", "tombstone")
477
-
logger.Debug("got remote repo tombstone event")
478
-
if err := s.processCallback(context.Background(), &stream.XRPCStreamEvent{RepoTombstone: evt}, sub.Hostname, sub.HostID); err != nil {
479
-
logger.Error("failed handling event", "err", err)
480
-
}
481
-
sub.UpdateSeq()
482
-
return nil
483
-
},
484
457
}
485
458
486
459
limiters := []*slidingwindow.Limiter{
+8
-71
cmd/relay/stream/consumer.go
+8
-71
cmd/relay/stream/consumer.go
···
18
18
const MaxMessageBytes = 5_000_000
19
19
20
20
type RepoStreamCallbacks struct {
21
-
RepoCommit func(evt *comatproto.SyncSubscribeRepos_Commit) error
22
-
RepoSync func(evt *comatproto.SyncSubscribeRepos_Sync) error
23
-
RepoHandle func(evt *comatproto.SyncSubscribeRepos_Handle) error
24
-
RepoIdentity func(evt *comatproto.SyncSubscribeRepos_Identity) error
25
-
RepoAccount func(evt *comatproto.SyncSubscribeRepos_Account) error
26
-
RepoInfo func(evt *comatproto.SyncSubscribeRepos_Info) error
27
-
RepoMigrate func(evt *comatproto.SyncSubscribeRepos_Migrate) error
28
-
RepoTombstone func(evt *comatproto.SyncSubscribeRepos_Tombstone) error
29
-
LabelLabels func(evt *comatproto.LabelSubscribeLabels_Labels) error
30
-
LabelInfo func(evt *comatproto.LabelSubscribeLabels_Info) error
31
-
Error func(evt *ErrorFrame) error
21
+
RepoCommit func(evt *comatproto.SyncSubscribeRepos_Commit) error
22
+
RepoSync func(evt *comatproto.SyncSubscribeRepos_Sync) error
23
+
RepoIdentity func(evt *comatproto.SyncSubscribeRepos_Identity) error
24
+
RepoAccount func(evt *comatproto.SyncSubscribeRepos_Account) error
25
+
RepoInfo func(evt *comatproto.SyncSubscribeRepos_Info) error
26
+
LabelLabels func(evt *comatproto.LabelSubscribeLabels_Labels) error
27
+
LabelInfo func(evt *comatproto.LabelSubscribeLabels_Info) error
28
+
Error func(evt *ErrorFrame) error
32
29
}
33
30
34
31
func (rsc *RepoStreamCallbacks) EventHandler(ctx context.Context, xev *XRPCStreamEvent) error {
···
37
34
return rsc.RepoCommit(xev.RepoCommit)
38
35
case xev.RepoSync != nil && rsc.RepoSync != nil:
39
36
return rsc.RepoSync(xev.RepoSync)
40
-
case xev.RepoHandle != nil && rsc.RepoHandle != nil:
41
-
return rsc.RepoHandle(xev.RepoHandle)
42
37
case xev.RepoInfo != nil && rsc.RepoInfo != nil:
43
38
return rsc.RepoInfo(xev.RepoInfo)
44
-
case xev.RepoMigrate != nil && rsc.RepoMigrate != nil:
45
-
return rsc.RepoMigrate(xev.RepoMigrate)
46
39
case xev.RepoIdentity != nil && rsc.RepoIdentity != nil:
47
40
return rsc.RepoIdentity(xev.RepoIdentity)
48
41
case xev.RepoAccount != nil && rsc.RepoAccount != nil:
49
42
return rsc.RepoAccount(xev.RepoAccount)
50
-
case xev.RepoTombstone != nil && rsc.RepoTombstone != nil:
51
-
return rsc.RepoTombstone(xev.RepoTombstone)
52
43
case xev.LabelLabels != nil && rsc.LabelLabels != nil:
53
44
return rsc.LabelLabels(xev.LabelLabels)
54
45
case xev.LabelInfo != nil && rsc.LabelInfo != nil:
···
248
239
}); err != nil {
249
240
return err
250
241
}
251
-
case "#handle":
252
-
// TODO: DEPRECATED message; warning/counter; drop message
253
-
var evt comatproto.SyncSubscribeRepos_Handle
254
-
if err := evt.UnmarshalCBOR(r); err != nil {
255
-
return err
256
-
}
257
-
258
-
if evt.Seq <= lastSeq {
259
-
logger.Error("got events out of order from stream", "seq", evt.Seq, "prev", lastSeq)
260
-
continue
261
-
}
262
-
lastSeq = evt.Seq
263
-
264
-
if err := sched.AddWork(ctx, evt.Did, &XRPCStreamEvent{
265
-
RepoHandle: &evt,
266
-
}); err != nil {
267
-
return err
268
-
}
269
242
case "#identity":
270
243
var evt comatproto.SyncSubscribeRepos_Identity
271
244
if err := evt.UnmarshalCBOR(r); err != nil {
···
309
282
310
283
if err := sched.AddWork(ctx, "", &XRPCStreamEvent{
311
284
RepoInfo: &evt,
312
-
}); err != nil {
313
-
return err
314
-
}
315
-
case "#migrate":
316
-
// TODO: DEPRECATED message; warning/counter; drop message
317
-
var evt comatproto.SyncSubscribeRepos_Migrate
318
-
if err := evt.UnmarshalCBOR(r); err != nil {
319
-
return err
320
-
}
321
-
322
-
if evt.Seq <= lastSeq {
323
-
logger.Error("got events out of order from stream", "seq", evt.Seq, "prev", lastSeq)
324
-
continue
325
-
}
326
-
lastSeq = evt.Seq
327
-
328
-
if err := sched.AddWork(ctx, evt.Did, &XRPCStreamEvent{
329
-
RepoMigrate: &evt,
330
-
}); err != nil {
331
-
return err
332
-
}
333
-
case "#tombstone":
334
-
// TODO: DEPRECATED message; warning/counter; drop message
335
-
var evt comatproto.SyncSubscribeRepos_Tombstone
336
-
if err := evt.UnmarshalCBOR(r); err != nil {
337
-
return err
338
-
}
339
-
340
-
if evt.Seq <= lastSeq {
341
-
logger.Error("got events out of order from stream", "seq", evt.Seq, "prev", lastSeq)
342
-
continue
343
-
}
344
-
lastSeq = evt.Seq
345
-
346
-
if err := sched.AddWork(ctx, evt.Did, &XRPCStreamEvent{
347
-
RepoTombstone: &evt,
348
285
}); err != nil {
349
286
return err
350
287
}
+8
-53
cmd/relay/stream/events.go
+8
-53
cmd/relay/stream/events.go
···
23
23
}
24
24
25
25
type XRPCStreamEvent struct {
26
-
Error *ErrorFrame
27
-
RepoCommit *comatproto.SyncSubscribeRepos_Commit
28
-
RepoSync *comatproto.SyncSubscribeRepos_Sync
29
-
RepoHandle *comatproto.SyncSubscribeRepos_Handle // DEPRECATED
30
-
RepoIdentity *comatproto.SyncSubscribeRepos_Identity
31
-
RepoInfo *comatproto.SyncSubscribeRepos_Info
32
-
RepoMigrate *comatproto.SyncSubscribeRepos_Migrate // DEPRECATED
33
-
RepoTombstone *comatproto.SyncSubscribeRepos_Tombstone // DEPRECATED
34
-
RepoAccount *comatproto.SyncSubscribeRepos_Account
35
-
LabelLabels *comatproto.LabelSubscribeLabels_Labels
36
-
LabelInfo *comatproto.LabelSubscribeLabels_Info
26
+
Error *ErrorFrame
27
+
RepoCommit *comatproto.SyncSubscribeRepos_Commit
28
+
RepoSync *comatproto.SyncSubscribeRepos_Sync
29
+
RepoIdentity *comatproto.SyncSubscribeRepos_Identity
30
+
RepoInfo *comatproto.SyncSubscribeRepos_Info
31
+
RepoAccount *comatproto.SyncSubscribeRepos_Account
32
+
LabelLabels *comatproto.LabelSubscribeLabels_Labels
33
+
LabelInfo *comatproto.LabelSubscribeLabels_Info
37
34
38
35
// some private fields for internal routing perf
39
36
PrivUid uint64 `json:"-" cborgen:"-"`
···
56
53
case evt.RepoSync != nil:
57
54
header.MsgType = "#sync"
58
55
obj = evt.RepoSync
59
-
case evt.RepoHandle != nil:
60
-
header.MsgType = "#handle"
61
-
obj = evt.RepoHandle
62
56
case evt.RepoIdentity != nil:
63
57
header.MsgType = "#identity"
64
58
obj = evt.RepoIdentity
···
68
62
case evt.RepoInfo != nil:
69
63
header.MsgType = "#info"
70
64
obj = evt.RepoInfo
71
-
case evt.RepoMigrate != nil:
72
-
header.MsgType = "#migrate"
73
-
obj = evt.RepoMigrate
74
-
case evt.RepoTombstone != nil:
75
-
header.MsgType = "#tombstone"
76
-
obj = evt.RepoTombstone
77
65
default:
78
66
return fmt.Errorf("unrecognized event kind")
79
67
}
···
105
93
return fmt.Errorf("reading repoSync event: %w", err)
106
94
}
107
95
xevt.RepoSync = &evt
108
-
case "#handle":
109
-
// TODO: DEPRECATED message; warning/counter; drop message
110
-
var evt comatproto.SyncSubscribeRepos_Handle
111
-
if err := evt.UnmarshalCBOR(r); err != nil {
112
-
return err
113
-
}
114
-
xevt.RepoHandle = &evt
115
96
case "#identity":
116
97
var evt comatproto.SyncSubscribeRepos_Identity
117
98
if err := evt.UnmarshalCBOR(r); err != nil {
···
131
112
return err
132
113
}
133
114
xevt.RepoInfo = &evt
134
-
case "#migrate":
135
-
// TODO: DEPRECATED message; warning/counter; drop message
136
-
var evt comatproto.SyncSubscribeRepos_Migrate
137
-
if err := evt.UnmarshalCBOR(r); err != nil {
138
-
return err
139
-
}
140
-
xevt.RepoMigrate = &evt
141
-
case "#tombstone":
142
-
// TODO: DEPRECATED message; warning/counter; drop message
143
-
var evt comatproto.SyncSubscribeRepos_Tombstone
144
-
if err := evt.UnmarshalCBOR(r); err != nil {
145
-
return err
146
-
}
147
-
xevt.RepoTombstone = &evt
148
115
case "#labels":
149
116
var evt comatproto.LabelSubscribeLabels_Labels
150
117
if err := evt.UnmarshalCBOR(r); err != nil {
···
193
160
return evt.RepoCommit.Seq
194
161
case evt.RepoSync != nil:
195
162
return evt.RepoSync.Seq
196
-
case evt.RepoHandle != nil:
197
-
return evt.RepoHandle.Seq
198
-
case evt.RepoMigrate != nil:
199
-
return evt.RepoMigrate.Seq
200
-
case evt.RepoTombstone != nil:
201
-
return evt.RepoTombstone.Seq
202
163
case evt.RepoIdentity != nil:
203
164
return evt.RepoIdentity.Seq
204
165
case evt.RepoAccount != nil:
···
220
181
return evt.RepoCommit.Seq, true
221
182
case evt.RepoSync != nil:
222
183
return evt.RepoSync.Seq, true
223
-
case evt.RepoHandle != nil:
224
-
return evt.RepoHandle.Seq, true
225
-
case evt.RepoMigrate != nil:
226
-
return evt.RepoMigrate.Seq, true
227
-
case evt.RepoTombstone != nil:
228
-
return evt.RepoTombstone.Seq, true
229
184
case evt.RepoIdentity != nil:
230
185
return evt.RepoIdentity.Seq, true
231
186
case evt.RepoAccount != nil:
-34
cmd/relay/stream/persist/diskpersist/diskpersist.go
-34
cmd/relay/stream/persist/diskpersist/diskpersist.go
···
484
484
pjob.Evt.RepoCommit.Seq = seq
485
485
case pjob.Evt.RepoSync != nil:
486
486
pjob.Evt.RepoSync.Seq = seq
487
-
case pjob.Evt.RepoHandle != nil:
488
-
pjob.Evt.RepoHandle.Seq = seq
489
487
case pjob.Evt.RepoIdentity != nil:
490
488
pjob.Evt.RepoIdentity.Seq = seq
491
489
case pjob.Evt.RepoAccount != nil:
492
490
pjob.Evt.RepoAccount.Seq = seq
493
-
case pjob.Evt.RepoTombstone != nil:
494
-
pjob.Evt.RepoTombstone.Seq = seq
495
491
default:
496
492
// only those three get peristed right now
497
493
// we should not actually ever get here...
···
547
543
if err := xevt.RepoSync.MarshalCBOR(cw); err != nil {
548
544
return fmt.Errorf("failed to marshal: %w", err)
549
545
}
550
-
case xevt.RepoHandle != nil:
551
-
evtKind = evtKindHandle
552
-
did = xevt.RepoHandle.Did
553
-
if err := xevt.RepoHandle.MarshalCBOR(cw); err != nil {
554
-
return fmt.Errorf("failed to marshal: %w", err)
555
-
}
556
546
case xevt.RepoIdentity != nil:
557
547
evtKind = evtKindIdentity
558
548
did = xevt.RepoIdentity.Did
···
563
553
evtKind = evtKindAccount
564
554
did = xevt.RepoAccount.Did
565
555
if err := xevt.RepoAccount.MarshalCBOR(cw); err != nil {
566
-
return fmt.Errorf("failed to marshal: %w", err)
567
-
}
568
-
case xevt.RepoTombstone != nil:
569
-
evtKind = evtKindTombstone
570
-
did = xevt.RepoTombstone.Did
571
-
if err := xevt.RepoTombstone.MarshalCBOR(cw); err != nil {
572
556
return fmt.Errorf("failed to marshal: %w", err)
573
557
}
574
558
default:
···
810
794
if err := cb(&stream.XRPCStreamEvent{RepoSync: &evt}); err != nil {
811
795
return nil, err
812
796
}
813
-
case evtKindHandle:
814
-
var evt atproto.SyncSubscribeRepos_Handle
815
-
if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
816
-
return nil, err
817
-
}
818
-
evt.Seq = h.Seq
819
-
if err := cb(&stream.XRPCStreamEvent{RepoHandle: &evt}); err != nil {
820
-
return nil, err
821
-
}
822
797
case evtKindIdentity:
823
798
var evt atproto.SyncSubscribeRepos_Identity
824
799
if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
···
835
810
}
836
811
evt.Seq = h.Seq
837
812
if err := cb(&stream.XRPCStreamEvent{RepoAccount: &evt}); err != nil {
838
-
return nil, err
839
-
}
840
-
case evtKindTombstone:
841
-
var evt atproto.SyncSubscribeRepos_Tombstone
842
-
if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
843
-
return nil, err
844
-
}
845
-
evt.Seq = h.Seq
846
-
if err := cb(&stream.XRPCStreamEvent{RepoTombstone: &evt}); err != nil {
847
813
return nil, err
848
814
}
849
815
default:
-8
cmd/relay/testing/consumer.go
-8
cmd/relay/testing/consumer.go
···
62
62
c.LastSeq = evt.Seq
63
63
return nil
64
64
},
65
-
// NOTE: this is included to test that the events are *not* passed through; can be removed in the near future
66
-
RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
67
-
c.eventsLk.Lock()
68
-
defer c.eventsLk.Unlock()
69
-
c.Events = append(c.Events, &stream.XRPCStreamEvent{RepoHandle: evt})
70
-
c.LastSeq = evt.Seq
71
-
return nil
72
-
},
73
65
}
74
66
return rsc
75
67
}
-36
cmd/sonar/sonar.go
-36
cmd/sonar/sonar.go
···
109
109
case xe.RepoCommit != nil:
110
110
eventsProcessedCounter.WithLabelValues("repo_commit", s.SocketURL).Inc()
111
111
return s.HandleRepoCommit(ctx, xe.RepoCommit)
112
-
case xe.RepoHandle != nil:
113
-
eventsProcessedCounter.WithLabelValues("repo_handle", s.SocketURL).Inc()
114
-
now := time.Now()
115
-
s.ProgMux.Lock()
116
-
s.Progress.LastSeq = xe.RepoHandle.Seq
117
-
s.Progress.LastSeqProcessedAt = now
118
-
s.ProgMux.Unlock()
119
-
// Parse time from the event time string
120
-
t, err := time.Parse(time.RFC3339, xe.RepoHandle.Time)
121
-
if err != nil {
122
-
s.Logger.Error("error parsing time", "err", err)
123
-
return nil
124
-
}
125
-
lastEvtCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(t.UnixNano()))
126
-
lastEvtProcessedAtGauge.WithLabelValues(s.SocketURL).Set(float64(now.UnixNano()))
127
-
lastEvtCreatedEvtProcessedGapGauge.WithLabelValues(s.SocketURL).Set(float64(now.Sub(t).Seconds()))
128
-
lastSeqGauge.WithLabelValues(s.SocketURL).Set(float64(xe.RepoHandle.Seq))
129
112
case xe.RepoIdentity != nil:
130
113
eventsProcessedCounter.WithLabelValues("identity", s.SocketURL).Inc()
131
114
now := time.Now()
···
142
125
s.ProgMux.Unlock()
143
126
case xe.RepoInfo != nil:
144
127
eventsProcessedCounter.WithLabelValues("repo_info", s.SocketURL).Inc()
145
-
case xe.RepoMigrate != nil:
146
-
eventsProcessedCounter.WithLabelValues("repo_migrate", s.SocketURL).Inc()
147
-
now := time.Now()
148
-
s.ProgMux.Lock()
149
-
s.Progress.LastSeq = xe.RepoMigrate.Seq
150
-
s.Progress.LastSeqProcessedAt = time.Now()
151
-
s.ProgMux.Unlock()
152
-
// Parse time from the event time string
153
-
t, err := time.Parse(time.RFC3339, xe.RepoMigrate.Time)
154
-
if err != nil {
155
-
s.Logger.Error("error parsing time", "err", err)
156
-
return nil
157
-
}
158
-
lastEvtCreatedAtGauge.WithLabelValues(s.SocketURL).Set(float64(t.UnixNano()))
159
-
lastEvtProcessedAtGauge.WithLabelValues(s.SocketURL).Set(float64(now.UnixNano()))
160
-
lastEvtCreatedEvtProcessedGapGauge.WithLabelValues(s.SocketURL).Set(float64(now.Sub(t).Seconds()))
161
-
lastSeqGauge.WithLabelValues(s.SocketURL).Set(float64(xe.RepoHandle.Seq))
162
-
case xe.RepoTombstone != nil:
163
-
eventsProcessedCounter.WithLabelValues("repo_tombstone", s.SocketURL).Inc()
164
128
case xe.LabelInfo != nil:
165
129
eventsProcessedCounter.WithLabelValues("label_info", s.SocketURL).Inc()
166
130
case xe.LabelLabels != nil:
-9
cmd/supercollider/main.go
-9
cmd/supercollider/main.go
···
338
338
case evt.RepoCommit != nil:
339
339
header.MsgType = "#commit"
340
340
obj = evt.RepoCommit
341
-
case evt.RepoHandle != nil:
342
-
header.MsgType = "#handle"
343
-
obj = evt.RepoHandle
344
341
case evt.RepoInfo != nil:
345
342
header.MsgType = "#info"
346
343
obj = evt.RepoInfo
347
-
case evt.RepoMigrate != nil:
348
-
header.MsgType = "#migrate"
349
-
obj = evt.RepoMigrate
350
-
case evt.RepoTombstone != nil:
351
-
header.MsgType = "#tombstone"
352
-
obj = evt.RepoTombstone
353
344
default:
354
345
logger.Error("unrecognized event kind")
355
346
continue
+8
-68
events/consumer.go
+8
-68
events/consumer.go
···
16
16
)
17
17
18
18
type RepoStreamCallbacks struct {
19
-
RepoCommit func(evt *comatproto.SyncSubscribeRepos_Commit) error
20
-
RepoSync func(evt *comatproto.SyncSubscribeRepos_Sync) error
21
-
RepoHandle func(evt *comatproto.SyncSubscribeRepos_Handle) error
22
-
RepoIdentity func(evt *comatproto.SyncSubscribeRepos_Identity) error
23
-
RepoAccount func(evt *comatproto.SyncSubscribeRepos_Account) error
24
-
RepoInfo func(evt *comatproto.SyncSubscribeRepos_Info) error
25
-
RepoMigrate func(evt *comatproto.SyncSubscribeRepos_Migrate) error
26
-
RepoTombstone func(evt *comatproto.SyncSubscribeRepos_Tombstone) error
27
-
LabelLabels func(evt *comatproto.LabelSubscribeLabels_Labels) error
28
-
LabelInfo func(evt *comatproto.LabelSubscribeLabels_Info) error
29
-
Error func(evt *ErrorFrame) error
19
+
RepoCommit func(evt *comatproto.SyncSubscribeRepos_Commit) error
20
+
RepoSync func(evt *comatproto.SyncSubscribeRepos_Sync) error
21
+
RepoIdentity func(evt *comatproto.SyncSubscribeRepos_Identity) error
22
+
RepoAccount func(evt *comatproto.SyncSubscribeRepos_Account) error
23
+
RepoInfo func(evt *comatproto.SyncSubscribeRepos_Info) error
24
+
LabelLabels func(evt *comatproto.LabelSubscribeLabels_Labels) error
25
+
LabelInfo func(evt *comatproto.LabelSubscribeLabels_Info) error
26
+
Error func(evt *ErrorFrame) error
30
27
}
31
28
32
29
func (rsc *RepoStreamCallbacks) EventHandler(ctx context.Context, xev *XRPCStreamEvent) error {
···
35
32
return rsc.RepoCommit(xev.RepoCommit)
36
33
case xev.RepoSync != nil && rsc.RepoSync != nil:
37
34
return rsc.RepoSync(xev.RepoSync)
38
-
case xev.RepoHandle != nil && rsc.RepoHandle != nil:
39
-
return rsc.RepoHandle(xev.RepoHandle)
40
35
case xev.RepoInfo != nil && rsc.RepoInfo != nil:
41
36
return rsc.RepoInfo(xev.RepoInfo)
42
-
case xev.RepoMigrate != nil && rsc.RepoMigrate != nil:
43
-
return rsc.RepoMigrate(xev.RepoMigrate)
44
37
case xev.RepoIdentity != nil && rsc.RepoIdentity != nil:
45
38
return rsc.RepoIdentity(xev.RepoIdentity)
46
39
case xev.RepoAccount != nil && rsc.RepoAccount != nil:
47
40
return rsc.RepoAccount(xev.RepoAccount)
48
-
case xev.RepoTombstone != nil && rsc.RepoTombstone != nil:
49
-
return rsc.RepoTombstone(xev.RepoTombstone)
50
41
case xev.LabelLabels != nil && rsc.LabelLabels != nil:
51
42
return rsc.LabelLabels(xev.LabelLabels)
52
43
case xev.LabelInfo != nil && rsc.LabelInfo != nil:
···
241
232
}); err != nil {
242
233
return err
243
234
}
244
-
case "#handle":
245
-
// TODO: DEPRECATED message; warning/counter; drop message
246
-
var evt comatproto.SyncSubscribeRepos_Handle
247
-
if err := evt.UnmarshalCBOR(r); err != nil {
248
-
return err
249
-
}
250
-
251
-
if evt.Seq < lastSeq {
252
-
log.Error("Got events out of order from stream", "seq", evt.Seq, "prev", lastSeq)
253
-
}
254
-
lastSeq = evt.Seq
255
-
256
-
if err := sched.AddWork(ctx, evt.Did, &XRPCStreamEvent{
257
-
RepoHandle: &evt,
258
-
}); err != nil {
259
-
return err
260
-
}
261
235
case "#identity":
262
236
var evt comatproto.SyncSubscribeRepos_Identity
263
237
if err := evt.UnmarshalCBOR(r); err != nil {
···
299
273
300
274
if err := sched.AddWork(ctx, "", &XRPCStreamEvent{
301
275
RepoInfo: &evt,
302
-
}); err != nil {
303
-
return err
304
-
}
305
-
case "#migrate":
306
-
// TODO: DEPRECATED message; warning/counter; drop message
307
-
var evt comatproto.SyncSubscribeRepos_Migrate
308
-
if err := evt.UnmarshalCBOR(r); err != nil {
309
-
return err
310
-
}
311
-
312
-
if evt.Seq < lastSeq {
313
-
log.Error("Got events out of order from stream", "seq", evt.Seq, "prev", lastSeq)
314
-
}
315
-
lastSeq = evt.Seq
316
-
317
-
if err := sched.AddWork(ctx, evt.Did, &XRPCStreamEvent{
318
-
RepoMigrate: &evt,
319
-
}); err != nil {
320
-
return err
321
-
}
322
-
case "#tombstone":
323
-
// TODO: DEPRECATED message; warning/counter; drop message
324
-
var evt comatproto.SyncSubscribeRepos_Tombstone
325
-
if err := evt.UnmarshalCBOR(r); err != nil {
326
-
return err
327
-
}
328
-
329
-
if evt.Seq < lastSeq {
330
-
log.Error("Got events out of order from stream", "seq", evt.Seq, "prev", lastSeq)
331
-
}
332
-
lastSeq = evt.Seq
333
-
334
-
if err := sched.AddWork(ctx, evt.Did, &XRPCStreamEvent{
335
-
RepoTombstone: &evt,
336
276
}); err != nil {
337
277
return err
338
278
}
+51
-85
events/dbpersist/dbpersist.go
+51
-85
events/dbpersist/dbpersist.go
···
171
171
switch {
172
172
case e.RepoCommit != nil:
173
173
e.RepoCommit.Seq = int64(item.Seq)
174
-
case e.RepoHandle != nil:
175
-
e.RepoHandle.Seq = int64(item.Seq)
174
+
case e.RepoSync != nil:
175
+
e.RepoSync.Seq = int64(item.Seq)
176
176
case e.RepoIdentity != nil:
177
177
e.RepoIdentity.Seq = int64(item.Seq)
178
178
case e.RepoAccount != nil:
179
179
e.RepoAccount.Seq = int64(item.Seq)
180
-
case e.RepoTombstone != nil:
181
-
e.RepoTombstone.Seq = int64(item.Seq)
182
180
default:
183
181
return fmt.Errorf("unknown event type")
184
182
}
···
218
216
if err != nil {
219
217
return err
220
218
}
221
-
case e.RepoHandle != nil:
222
-
rer, err = p.RecordFromHandleChange(ctx, e.RepoHandle)
219
+
case e.RepoSync != nil:
220
+
rer, err = p.RecordFromRepoSync(ctx, e.RepoSync)
223
221
if err != nil {
224
222
return err
225
223
}
···
233
231
if err != nil {
234
232
return err
235
233
}
236
-
case e.RepoTombstone != nil:
237
-
rer, err = p.RecordFromTombstone(ctx, e.RepoTombstone)
238
-
if err != nil {
239
-
return err
240
-
}
241
234
default:
242
235
return nil
243
236
}
···
247
240
}
248
241
249
242
return nil
250
-
}
251
-
252
-
func (p *DbPersistence) RecordFromHandleChange(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Handle) (*RepoEventRecord, error) {
253
-
t, err := time.Parse(util.ISO8601, evt.Time)
254
-
if err != nil {
255
-
return nil, err
256
-
}
257
-
258
-
uid, err := p.uidForDid(ctx, evt.Did)
259
-
if err != nil {
260
-
return nil, err
261
-
}
262
-
263
-
return &RepoEventRecord{
264
-
Repo: uid,
265
-
Type: "repo_handle",
266
-
Time: t,
267
-
NewHandle: &evt.Handle,
268
-
}, nil
269
243
}
270
244
271
245
func (p *DbPersistence) RecordFromRepoIdentity(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Identity) (*RepoEventRecord, error) {
···
306
280
}, nil
307
281
}
308
282
309
-
func (p *DbPersistence) RecordFromTombstone(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Tombstone) (*RepoEventRecord, error) {
310
-
t, err := time.Parse(util.ISO8601, evt.Time)
311
-
if err != nil {
312
-
return nil, err
313
-
}
314
-
315
-
uid, err := p.uidForDid(ctx, evt.Did)
316
-
if err != nil {
317
-
return nil, err
318
-
}
319
-
320
-
return &RepoEventRecord{
321
-
Repo: uid,
322
-
Type: "repo_tombstone",
323
-
Time: t,
324
-
}, nil
325
-
}
326
-
327
283
func (p *DbPersistence) RecordFromRepoCommit(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Commit) (*RepoEventRecord, error) {
328
284
// TODO: hack hack hack
329
285
if len(evt.Ops) > 8192 {
···
371
327
return &rer, nil
372
328
}
373
329
330
+
func (p *DbPersistence) RecordFromRepoSync(ctx context.Context, evt *comatproto.SyncSubscribeRepos_Sync) (*RepoEventRecord, error) {
331
+
332
+
uid, err := p.uidForDid(ctx, evt.Did)
333
+
if err != nil {
334
+
return nil, err
335
+
}
336
+
337
+
t, err := time.Parse(util.ISO8601, evt.Time)
338
+
if err != nil {
339
+
return nil, err
340
+
}
341
+
342
+
rer := RepoEventRecord{
343
+
Repo: uid,
344
+
Type: "repo_sync",
345
+
Time: t,
346
+
Rev: evt.Rev,
347
+
}
348
+
349
+
return &rer, nil
350
+
}
351
+
374
352
func (p *DbPersistence) Playback(ctx context.Context, since int64, cb func(*events.XRPCStreamEvent) error) error {
375
353
pageSize := 1000
376
354
···
449
427
switch {
450
428
case record.Commit != nil:
451
429
streamEvent, err = p.hydrateCommit(ctx, record)
452
-
case record.NewHandle != nil:
453
-
streamEvent, err = p.hydrateHandleChange(ctx, record)
430
+
case record.Type == "repo_sync":
431
+
streamEvent, err = p.hydrateSyncEvent(ctx, record)
454
432
case record.Type == "repo_identity":
455
433
streamEvent, err = p.hydrateIdentityEvent(ctx, record)
456
434
case record.Type == "repo_account":
457
435
streamEvent, err = p.hydrateAccountEvent(ctx, record)
458
-
case record.Type == "repo_tombstone":
459
-
streamEvent, err = p.hydrateTombstone(ctx, record)
460
436
default:
461
437
err = fmt.Errorf("unknown event type: %s", record.Type)
462
438
}
···
519
495
return u.Did, nil
520
496
}
521
497
522
-
func (p *DbPersistence) hydrateHandleChange(ctx context.Context, rer *RepoEventRecord) (*events.XRPCStreamEvent, error) {
523
-
if rer.NewHandle == nil {
524
-
return nil, fmt.Errorf("NewHandle is nil")
525
-
}
526
-
527
-
did, err := p.didForUid(ctx, rer.Repo)
528
-
if err != nil {
529
-
return nil, err
530
-
}
531
-
532
-
return &events.XRPCStreamEvent{
533
-
RepoHandle: &comatproto.SyncSubscribeRepos_Handle{
534
-
Did: did,
535
-
Handle: *rer.NewHandle,
536
-
Time: rer.Time.Format(util.ISO8601),
537
-
},
538
-
}, nil
539
-
}
540
-
541
498
func (p *DbPersistence) hydrateIdentityEvent(ctx context.Context, rer *RepoEventRecord) (*events.XRPCStreamEvent, error) {
542
499
did, err := p.didForUid(ctx, rer.Repo)
543
500
if err != nil {
···
568
525
}, nil
569
526
}
570
527
571
-
func (p *DbPersistence) hydrateTombstone(ctx context.Context, rer *RepoEventRecord) (*events.XRPCStreamEvent, error) {
572
-
did, err := p.didForUid(ctx, rer.Repo)
573
-
if err != nil {
574
-
return nil, err
575
-
}
576
-
577
-
return &events.XRPCStreamEvent{
578
-
RepoTombstone: &comatproto.SyncSubscribeRepos_Tombstone{
579
-
Did: did,
580
-
Time: rer.Time.Format(util.ISO8601),
581
-
},
582
-
}, nil
583
-
}
584
-
585
528
func (p *DbPersistence) hydrateCommit(ctx context.Context, rer *RepoEventRecord) (*events.XRPCStreamEvent, error) {
586
529
if rer.Commit == nil {
587
530
return nil, fmt.Errorf("commit is nil")
···
637
580
}
638
581
639
582
return &events.XRPCStreamEvent{RepoCommit: out}, nil
583
+
}
584
+
585
+
func (p *DbPersistence) hydrateSyncEvent(ctx context.Context, rer *RepoEventRecord) (*events.XRPCStreamEvent, error) {
586
+
587
+
did, err := p.didForUid(ctx, rer.Repo)
588
+
if err != nil {
589
+
return nil, err
590
+
}
591
+
592
+
evt := &comatproto.SyncSubscribeRepos_Sync{
593
+
Seq: int64(rer.Seq),
594
+
Did: did,
595
+
Time: rer.Time.Format(util.ISO8601),
596
+
Rev: rer.Rev,
597
+
}
598
+
599
+
cs, err := p.readCarSlice(ctx, rer)
600
+
if err != nil {
601
+
return nil, fmt.Errorf("read car slice: %w", err)
602
+
}
603
+
evt.Blocks = cs
604
+
605
+
return &events.XRPCStreamEvent{RepoSync: evt}, nil
640
606
}
641
607
642
608
func (p *DbPersistence) readCarSlice(ctx context.Context, rer *RepoEventRecord) ([]byte, error) {
+10
-26
events/diskpersist/diskpersist.go
+10
-26
events/diskpersist/diskpersist.go
···
282
282
evtKindTombstone = 3
283
283
evtKindIdentity = 4
284
284
evtKindAccount = 5
285
+
evtKindSync = 6
285
286
)
286
287
287
288
var emptyHeader = make([]byte, headerSize)
···
455
456
switch {
456
457
case e.RepoCommit != nil:
457
458
e.RepoCommit.Seq = seq
458
-
case e.RepoHandle != nil:
459
-
e.RepoHandle.Seq = seq
459
+
case e.RepoSync != nil:
460
+
e.RepoSync.Seq = seq
460
461
case e.RepoIdentity != nil:
461
462
e.RepoIdentity.Seq = seq
462
463
case e.RepoAccount != nil:
463
464
e.RepoAccount.Seq = seq
464
-
case e.RepoTombstone != nil:
465
-
e.RepoTombstone.Seq = seq
466
465
default:
467
466
// only those three get peristed right now
468
467
// we should not actually ever get here...
···
509
508
if err := e.RepoCommit.MarshalCBOR(cw); err != nil {
510
509
return fmt.Errorf("failed to marshal: %w", err)
511
510
}
512
-
case e.RepoHandle != nil:
513
-
evtKind = evtKindHandle
514
-
did = e.RepoHandle.Did
515
-
if err := e.RepoHandle.MarshalCBOR(cw); err != nil {
511
+
case e.RepoSync != nil:
512
+
evtKind = evtKindSync
513
+
did = e.RepoSync.Did
514
+
if err := e.RepoSync.MarshalCBOR(cw); err != nil {
516
515
return fmt.Errorf("failed to marshal: %w", err)
517
516
}
518
517
case e.RepoIdentity != nil:
···
527
526
if err := e.RepoAccount.MarshalCBOR(cw); err != nil {
528
527
return fmt.Errorf("failed to marshal: %w", err)
529
528
}
530
-
case e.RepoTombstone != nil:
531
-
evtKind = evtKindTombstone
532
-
did = e.RepoTombstone.Did
533
-
if err := e.RepoTombstone.MarshalCBOR(cw); err != nil {
534
-
return fmt.Errorf("failed to marshal: %w", err)
535
-
}
536
529
default:
537
530
return nil
538
531
// only those two get peristed right now
···
745
738
if err := cb(&events.XRPCStreamEvent{RepoCommit: &evt}); err != nil {
746
739
return nil, err
747
740
}
748
-
case evtKindHandle:
749
-
var evt atproto.SyncSubscribeRepos_Handle
741
+
case evtKindSync:
742
+
var evt atproto.SyncSubscribeRepos_Sync
750
743
if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
751
744
return nil, err
752
745
}
753
746
evt.Seq = h.Seq
754
-
if err := cb(&events.XRPCStreamEvent{RepoHandle: &evt}); err != nil {
747
+
if err := cb(&events.XRPCStreamEvent{RepoSync: &evt}); err != nil {
755
748
return nil, err
756
749
}
757
750
case evtKindIdentity:
···
770
763
}
771
764
evt.Seq = h.Seq
772
765
if err := cb(&events.XRPCStreamEvent{RepoAccount: &evt}); err != nil {
773
-
return nil, err
774
-
}
775
-
case evtKindTombstone:
776
-
var evt atproto.SyncSubscribeRepos_Tombstone
777
-
if err := evt.UnmarshalCBOR(io.LimitReader(bufr, h.Len64())); err != nil {
778
-
return nil, err
779
-
}
780
-
evt.Seq = h.Seq
781
-
if err := cb(&events.XRPCStreamEvent{RepoTombstone: &evt}); err != nil {
782
766
return nil, err
783
767
}
784
768
default:
+8
-53
events/events.go
+8
-53
events/events.go
···
187
187
}
188
188
189
189
type XRPCStreamEvent struct {
190
-
Error *ErrorFrame
191
-
RepoCommit *comatproto.SyncSubscribeRepos_Commit
192
-
RepoSync *comatproto.SyncSubscribeRepos_Sync
193
-
RepoHandle *comatproto.SyncSubscribeRepos_Handle // DEPRECATED
194
-
RepoIdentity *comatproto.SyncSubscribeRepos_Identity
195
-
RepoInfo *comatproto.SyncSubscribeRepos_Info
196
-
RepoMigrate *comatproto.SyncSubscribeRepos_Migrate // DEPRECATED
197
-
RepoTombstone *comatproto.SyncSubscribeRepos_Tombstone // DEPRECATED
198
-
RepoAccount *comatproto.SyncSubscribeRepos_Account
199
-
LabelLabels *comatproto.LabelSubscribeLabels_Labels
200
-
LabelInfo *comatproto.LabelSubscribeLabels_Info
190
+
Error *ErrorFrame
191
+
RepoCommit *comatproto.SyncSubscribeRepos_Commit
192
+
RepoSync *comatproto.SyncSubscribeRepos_Sync
193
+
RepoIdentity *comatproto.SyncSubscribeRepos_Identity
194
+
RepoInfo *comatproto.SyncSubscribeRepos_Info
195
+
RepoAccount *comatproto.SyncSubscribeRepos_Account
196
+
LabelLabels *comatproto.LabelSubscribeLabels_Labels
197
+
LabelInfo *comatproto.LabelSubscribeLabels_Info
201
198
202
199
// some private fields for internal routing perf
203
200
PrivUid models.Uid `json:"-" cborgen:"-"`
···
220
217
case evt.RepoSync != nil:
221
218
header.MsgType = "#sync"
222
219
obj = evt.RepoSync
223
-
case evt.RepoHandle != nil:
224
-
header.MsgType = "#handle"
225
-
obj = evt.RepoHandle
226
220
case evt.RepoIdentity != nil:
227
221
header.MsgType = "#identity"
228
222
obj = evt.RepoIdentity
···
232
226
case evt.RepoInfo != nil:
233
227
header.MsgType = "#info"
234
228
obj = evt.RepoInfo
235
-
case evt.RepoMigrate != nil:
236
-
header.MsgType = "#migrate"
237
-
obj = evt.RepoMigrate
238
-
case evt.RepoTombstone != nil:
239
-
header.MsgType = "#tombstone"
240
-
obj = evt.RepoTombstone
241
229
default:
242
230
return fmt.Errorf("unrecognized event kind")
243
231
}
···
269
257
return fmt.Errorf("reading repoSync event: %w", err)
270
258
}
271
259
xevt.RepoSync = &evt
272
-
case "#handle":
273
-
// TODO: DEPRECATED message; warning/counter; drop message
274
-
var evt comatproto.SyncSubscribeRepos_Handle
275
-
if err := evt.UnmarshalCBOR(r); err != nil {
276
-
return err
277
-
}
278
-
xevt.RepoHandle = &evt
279
260
case "#identity":
280
261
var evt comatproto.SyncSubscribeRepos_Identity
281
262
if err := evt.UnmarshalCBOR(r); err != nil {
···
295
276
return err
296
277
}
297
278
xevt.RepoInfo = &evt
298
-
case "#migrate":
299
-
// TODO: DEPRECATED message; warning/counter; drop message
300
-
var evt comatproto.SyncSubscribeRepos_Migrate
301
-
if err := evt.UnmarshalCBOR(r); err != nil {
302
-
return err
303
-
}
304
-
xevt.RepoMigrate = &evt
305
-
case "#tombstone":
306
-
// TODO: DEPRECATED message; warning/counter; drop message
307
-
var evt comatproto.SyncSubscribeRepos_Tombstone
308
-
if err := evt.UnmarshalCBOR(r); err != nil {
309
-
return err
310
-
}
311
-
xevt.RepoTombstone = &evt
312
279
case "#labels":
313
280
var evt comatproto.LabelSubscribeLabels_Labels
314
281
if err := evt.UnmarshalCBOR(r); err != nil {
···
476
443
return evt.RepoCommit.Seq
477
444
case evt.RepoSync != nil:
478
445
return evt.RepoSync.Seq
479
-
case evt.RepoHandle != nil:
480
-
return evt.RepoHandle.Seq
481
-
case evt.RepoMigrate != nil:
482
-
return evt.RepoMigrate.Seq
483
-
case evt.RepoTombstone != nil:
484
-
return evt.RepoTombstone.Seq
485
446
case evt.RepoIdentity != nil:
486
447
return evt.RepoIdentity.Seq
487
448
case evt.RepoAccount != nil:
···
503
464
return evt.RepoCommit.Seq, true
504
465
case evt.RepoSync != nil:
505
466
return evt.RepoSync.Seq, true
506
-
case evt.RepoHandle != nil:
507
-
return evt.RepoHandle.Seq, true
508
-
case evt.RepoMigrate != nil:
509
-
return evt.RepoMigrate.Seq, true
510
-
case evt.RepoTombstone != nil:
511
-
return evt.RepoTombstone.Seq, true
512
467
case evt.RepoIdentity != nil:
513
468
return evt.RepoIdentity.Seq, true
514
469
case evt.RepoAccount != nil:
-6
events/persist.go
-6
events/persist.go
···
42
42
switch {
43
43
case e.RepoCommit != nil:
44
44
e.RepoCommit.Seq = mp.seq
45
-
case e.RepoHandle != nil:
46
-
e.RepoHandle.Seq = mp.seq
47
45
case e.RepoIdentity != nil:
48
46
e.RepoIdentity.Seq = mp.seq
49
47
case e.RepoAccount != nil:
50
48
e.RepoAccount.Seq = mp.seq
51
-
case e.RepoMigrate != nil:
52
-
e.RepoMigrate.Seq = mp.seq
53
-
case e.RepoTombstone != nil:
54
-
e.RepoTombstone.Seq = mp.seq
55
49
case e.LabelLabels != nil:
56
50
e.LabelLabels.Seq = mp.seq
57
51
default:
+2
-6
events/yolopersist/yolopersist.go
+2
-6
events/yolopersist/yolopersist.go
···
28
28
switch {
29
29
case e.RepoCommit != nil:
30
30
e.RepoCommit.Seq = yp.seq
31
-
case e.RepoHandle != nil:
32
-
e.RepoHandle.Seq = yp.seq
31
+
case e.RepoSync != nil:
32
+
e.RepoSync.Seq = yp.seq
33
33
case e.RepoIdentity != nil:
34
34
e.RepoIdentity.Seq = yp.seq
35
35
case e.RepoAccount != nil:
36
36
e.RepoAccount.Seq = yp.seq
37
-
case e.RepoMigrate != nil:
38
-
e.RepoMigrate.Seq = yp.seq
39
-
case e.RepoTombstone != nil:
40
-
e.RepoTombstone.Seq = yp.seq
41
37
case e.LabelLabels != nil:
42
38
e.LabelLabels.Seq = yp.seq
43
39
default:
-3
gen/main.go
-3
gen/main.go
···
98
98
atproto.RepoStrongRef{},
99
99
atproto.SyncSubscribeRepos_Commit{},
100
100
atproto.SyncSubscribeRepos_Sync{},
101
-
atproto.SyncSubscribeRepos_Handle{},
102
101
atproto.SyncSubscribeRepos_Identity{},
103
102
atproto.SyncSubscribeRepos_Account{},
104
103
atproto.SyncSubscribeRepos_Info{},
105
-
atproto.SyncSubscribeRepos_Migrate{},
106
104
atproto.SyncSubscribeRepos_RepoOp{},
107
-
atproto.SyncSubscribeRepos_Tombstone{},
108
105
atproto.LabelDefs_SelfLabels{},
109
106
atproto.LabelDefs_SelfLabel{},
110
107
atproto.LabelDefs_Label{},
+2
-3
lex/gen.go
+2
-3
lex/gen.go
···
135
135
pf("\t\"fmt\"\n")
136
136
pf("\t\"encoding/json\"\n")
137
137
pf("\tcbg \"github.com/whyrusleeping/cbor-gen\"\n")
138
-
pf("\t\"github.com/bluesky-social/indigo/xrpc\"\n")
139
138
pf("\t\"github.com/bluesky-social/indigo/lex/util\"\n")
140
139
for _, xpkg := range packages {
141
140
if xpkg.Prefix != pkg.Prefix {
···
468
467
return packages, nil
469
468
}
470
469
471
-
func Run(schemas []*Schema, packages []Package) error {
472
-
defmap := BuildExtDefMap(schemas, packages)
470
+
func Run(schemas []*Schema, externalSchemas []*Schema, packages []Package) error {
471
+
defmap := BuildExtDefMap(append(schemas, externalSchemas...), packages)
473
472
474
473
for _, pkg := range packages {
475
474
prefix := pkg.Prefix
+4
-4
lex/type_schema.go
+4
-4
lex/type_schema.go
···
55
55
pf := printerf(w)
56
56
fname := typename
57
57
58
-
params := "ctx context.Context, c *xrpc.Client"
58
+
params := "ctx context.Context, c util.LexClient"
59
59
inpvar := "nil"
60
60
inpenc := ""
61
61
···
174
174
var reqtype string
175
175
switch s.Type {
176
176
case "procedure":
177
-
reqtype = "xrpc.Procedure"
177
+
reqtype = "util.Procedure"
178
178
case "query":
179
-
reqtype = "xrpc.Query"
179
+
reqtype = "util.Query"
180
180
default:
181
181
return fmt.Errorf("can only generate RPC for Query or Procedure (got %s)", s.Type)
182
182
}
183
183
184
-
pf("\tif err := c.Do(ctx, %s, %q, \"%s\", %s, %s, %s); err != nil {\n", reqtype, inpenc, s.id, queryparams, inpvar, outvar)
184
+
pf("\tif err := c.LexDo(ctx, %s, %q, \"%s\", %s, %s, %s); err != nil {\n", reqtype, inpenc, s.id, queryparams, inpvar, outvar)
185
185
pf("\t\treturn %s\n", errRet)
186
186
pf("\t}\n\n")
187
187
pf("\treturn %s\n", outRet)
+18
lex/util/client.go
+18
lex/util/client.go
···
1
+
package util
2
+
3
+
import (
4
+
"context"
5
+
"net/http"
6
+
)
7
+
8
+
const (
9
+
Query = http.MethodGet
10
+
Procedure = http.MethodPost
11
+
)
12
+
13
+
// API client interface used in lexgen.
14
+
//
15
+
// 'method' is the HTTP method type. 'inputEncoding' is the Content-Type for bodyData in Procedure calls. 'params' are query parameters. 'bodyData' should be either 'nil', an [io.Reader], or a type which can be marshalled to JSON. 'out' is optional; if not nil it should be a pointer to a type which can be un-Marshaled as JSON, for the response body.
16
+
type LexClient interface {
17
+
LexDo(ctx context.Context, method string, inputEncoding string, endpoint string, params map[string]any, bodyData any, out any) error
18
+
}
-20
pds/server.go
-20
pds/server.go
···
598
598
case evt.RepoCommit != nil:
599
599
header.MsgType = "#commit"
600
600
obj = evt.RepoCommit
601
-
case evt.RepoHandle != nil:
602
-
header.MsgType = "#handle"
603
-
obj = evt.RepoHandle
604
601
case evt.RepoIdentity != nil:
605
602
header.MsgType = "#identity"
606
603
obj = evt.RepoIdentity
···
610
607
case evt.RepoInfo != nil:
611
608
header.MsgType = "#info"
612
609
obj = evt.RepoInfo
613
-
case evt.RepoMigrate != nil:
614
-
header.MsgType = "#migrate"
615
-
obj = evt.RepoMigrate
616
-
case evt.RepoTombstone != nil:
617
-
header.MsgType = "#tombstone"
618
-
obj = evt.RepoTombstone
619
610
default:
620
611
return fmt.Errorf("unrecognized event kind")
621
612
}
···
660
651
return fmt.Errorf("failed to update handle: %w", err)
661
652
}
662
653
663
-
if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{
664
-
RepoHandle: &comatproto.SyncSubscribeRepos_Handle{
665
-
Did: u.Did,
666
-
Handle: handle,
667
-
Time: time.Now().Format(util.ISO8601),
668
-
},
669
-
}); err != nil {
670
-
return fmt.Errorf("failed to push event: %s", err)
671
-
}
672
-
673
-
// Also push an Identity event
674
654
if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{
675
655
RepoIdentity: &comatproto.SyncSubscribeRepos_Identity{
676
656
Did: u.Did,
-16
search/firehose.go
-16
search/firehose.go
···
115
115
return nil
116
116
117
117
},
118
-
RepoHandle: func(evt *comatproto.SyncSubscribeRepos_Handle) error {
119
-
ctx := context.Background()
120
-
ctx, span := tracer.Start(ctx, "RepoHandle")
121
-
defer span.End()
122
-
123
-
did, err := syntax.ParseDID(evt.Did)
124
-
if err != nil {
125
-
idx.logger.Error("bad DID in RepoHandle event", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err)
126
-
return nil
127
-
}
128
-
if err := idx.updateUserHandle(ctx, did, evt.Handle); err != nil {
129
-
// TODO: handle this case (instead of return nil)
130
-
idx.logger.Error("failed to update user handle", "did", evt.Did, "handle", evt.Handle, "seq", evt.Seq, "err", err)
131
-
}
132
-
return nil
133
-
},
134
118
}
135
119
136
120
return events.HandleRepoStream(
-2
testing/integ_test.go
-2
testing/integ_test.go
-7
testing/utils.go
-7
testing/utils.go
···
666
666
es.Lk.Unlock()
667
667
return nil
668
668
},
669
-
RepoHandle: func(evt *atproto.SyncSubscribeRepos_Handle) error {
670
-
fmt.Println("received handle event: ", evt.Seq, evt.Did)
671
-
es.Lk.Lock()
672
-
es.Events = append(es.Events, &events.XRPCStreamEvent{RepoHandle: evt})
673
-
es.Lk.Unlock()
674
-
return nil
675
-
},
676
669
RepoIdentity: func(evt *atproto.SyncSubscribeRepos_Identity) error {
677
670
fmt.Println("received identity event: ", evt.Seq, evt.Did)
678
671
es.Lk.Lock()
+10
-8
xrpc/xrpc.go
+10
-8
xrpc/xrpc.go
···
34
34
return c.Client
35
35
}
36
36
37
-
type XRPCRequestType int
37
+
var (
38
+
Query = http.MethodGet
39
+
Procedure = http.MethodPost
40
+
)
38
41
39
42
type AuthInfo struct {
40
43
AccessJwt string `json:"accessJwt"`
···
110
113
Reset time.Time
111
114
}
112
115
113
-
const (
114
-
Query = XRPCRequestType(iota)
115
-
Procedure
116
-
)
117
-
118
116
// makeParams converts a map of string keys and any values into a URL-encoded string.
119
117
// If a value is a slice of strings, it will be joined with commas.
120
118
// Generally the values will be strings, numbers, booleans, or slices of strings
···
133
131
return params.Encode()
134
132
}
135
133
136
-
func (c *Client) Do(ctx context.Context, kind XRPCRequestType, inpenc string, method string, params map[string]interface{}, bodyobj interface{}, out interface{}) error {
134
+
func (c *Client) Do(ctx context.Context, kind string, inpenc string, method string, params map[string]interface{}, bodyobj interface{}, out interface{}) error {
137
135
var body io.Reader
138
136
if bodyobj != nil {
139
137
if rr, ok := bodyobj.(io.Reader); ok {
···
155
153
case Procedure:
156
154
m = "POST"
157
155
default:
158
-
return fmt.Errorf("unsupported request kind: %d", kind)
156
+
return fmt.Errorf("unsupported request kind: %s", kind)
159
157
}
160
158
161
159
var paramStr string
···
227
225
228
226
return nil
229
227
}
228
+
229
+
func (c *Client) LexDo(ctx context.Context, method string, inputEncoding string, endpoint string, params map[string]any, bodyData any, out any) error {
230
+
return c.Do(ctx, method, inputEncoding, endpoint, params, bodyData, out)
231
+
}