tangled
alpha
login
or
join now
stream.place
/
streamplace
Live video on the AT Protocol
74
fork
atom
overview
issues
1
pulls
pipelines
iroh: working KV replication!
Eli Mallon
4 months ago
03d34c60
504fd255
+217
-122
10 changed files
expand all
collapse all
unified
split
.vscode
settings.json
pkg
cmd
streamplace.go
config
config.go
director
director.go
iroh
generated
iroh_streamplace
iroh_streamplace.go
replication
iroh_replicator
iroh.go
kv.go
rust
iroh-streamplace
Makefile
go
go.mod
main.go
+2
-2
.vscode/settings.json
···
10
10
},
11
11
"mesonbuild.configureOnOpen": false,
12
12
"cSpell.words": ["Devplace", "streamplace", "webrtc"],
13
13
-
"go.lintTool": "golangci-lint",
13
13
+
"go.lintTool": "golangci-lint-v2",
14
14
"go.lintFlags": ["--path-mode=abs"],
15
15
"go.formatTool": "custom",
16
16
"go.alternateTools": {
17
17
-
"customFormatter": "golangci-lint"
17
17
+
"customFormatter": "golangci-lint-v2"
18
18
},
19
19
"go.formatFlags": ["fmt", "--stdin"],
20
20
"editor.codeActionsOnSave": {
+34
-1
pkg/cmd/streamplace.go
···
1
1
package cmd
2
2
3
3
import (
4
4
+
"bytes"
4
5
"context"
5
6
"crypto"
7
7
+
"crypto/rand"
6
8
"errors"
7
9
"flag"
8
10
"fmt"
···
32
34
"stream.place/streamplace/pkg/notifications"
33
35
"stream.place/streamplace/pkg/replication"
34
36
"stream.place/streamplace/pkg/replication/boring"
37
37
+
"stream.place/streamplace/pkg/replication/iroh_replicator"
35
38
"stream.place/streamplace/pkg/rtmps"
36
39
v0 "stream.place/streamplace/pkg/schema/v0"
37
40
"stream.place/streamplace/pkg/spmetrics"
···
379
382
},
380
383
}
381
384
385
385
+
exists, err := cli.DataFileExists([]string{"iroh-kv-secret"})
386
386
+
if err != nil {
387
387
+
return err
388
388
+
}
389
389
+
if !exists {
390
390
+
secret := make([]byte, 32)
391
391
+
_, err := rand.Read(secret)
392
392
+
if err != nil {
393
393
+
return fmt.Errorf("failed to generate random secret: %w", err)
394
394
+
}
395
395
+
err = cli.DataFileWrite([]string{"iroh-kv-secret"}, bytes.NewReader(secret), true)
396
396
+
if err != nil {
397
397
+
return err
398
398
+
}
399
399
+
}
400
400
+
buf := bytes.Buffer{}
401
401
+
err = cli.DataFileRead([]string{"iroh-kv-secret"}, &buf)
402
402
+
if err != nil {
403
403
+
return err
404
404
+
}
405
405
+
secret := buf.Bytes()
406
406
+
swarm, err := iroh_replicator.StartKV(ctx, cli.Tickets, secret)
407
407
+
if err != nil {
408
408
+
return err
409
409
+
}
410
410
+
382
411
op := oatproxy.New(&oatproxy.Config{
383
412
Host: cli.PublicHost,
384
413
CreateOAuthSession: state.CreateOAuthSession,
···
390
419
DownstreamJWK: cli.AccessJWK,
391
420
ClientMetadata: clientMetadata,
392
421
})
393
393
-
d := director.NewDirector(mm, mod, &cli, b, op, state)
422
422
+
d := director.NewDirector(mm, mod, &cli, b, op, state, swarm)
394
423
a, err := api.MakeStreamplaceAPI(&cli, mod, state, eip712signer, noter, mm, ms, b, atsync, d, op)
395
424
if err != nil {
396
425
return err
···
451
480
452
481
group.Go(func() error {
453
482
return mod.StartSegmentCleaner(ctx)
483
483
+
})
484
484
+
485
485
+
group.Go(func() error {
486
486
+
return swarm.Start(ctx, cli.Tickets)
454
487
})
455
488
456
489
if cli.LivepeerGateway {
+2
pkg/config/config.go
···
118
118
SQLLogging bool
119
119
SentryDSN string
120
120
LivepeerDebug bool
121
121
+
Tickets []string
121
122
}
122
123
123
124
func (cli *CLI) NewFlagSet(name string) *flag.FlagSet {
···
184
185
fs.BoolVar(&cli.SQLLogging, "sql-logging", false, "enable sql logging")
185
186
fs.StringVar(&cli.SentryDSN, "sentry-dsn", "", "sentry dsn for error reporting")
186
187
fs.BoolVar(&cli.LivepeerDebug, "livepeer-debug", false, "log livepeer segments to $SP_DATA_DIR/livepeer-debug")
188
188
+
cli.StringSliceFlag(fs, &cli.Tickets, "tickets", "[]", "tickets to join the swarm with")
187
189
188
190
lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError)
189
191
_ = starter.NewLivepeerConfig(lpFlags)
+11
-1
pkg/director/director.go
···
5
5
"fmt"
6
6
"sync"
7
7
8
8
+
"github.com/bluesky-social/indigo/util"
8
9
"github.com/streamplace/oatproxy/pkg/oatproxy"
9
10
"golang.org/x/sync/errgroup"
10
11
"stream.place/streamplace/pkg/bus"
···
12
13
"stream.place/streamplace/pkg/log"
13
14
"stream.place/streamplace/pkg/media"
14
15
"stream.place/streamplace/pkg/model"
16
16
+
"stream.place/streamplace/pkg/replication/iroh_replicator"
15
17
"stream.place/streamplace/pkg/statedb"
16
18
)
17
19
···
30
32
streamSessionsMu sync.Mutex
31
33
op *oatproxy.OATProxy
32
34
statefulDB *statedb.StatefulDB
35
35
+
swarm *iroh_replicator.SwarmKV
33
36
}
34
37
35
35
-
func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus, op *oatproxy.OATProxy, statefulDB *statedb.StatefulDB) *Director {
38
38
+
func NewDirector(mm *media.MediaManager, mod model.Model, cli *config.CLI, bus *bus.Bus, op *oatproxy.OATProxy, statefulDB *statedb.StatefulDB, swarm *iroh_replicator.SwarmKV) *Director {
36
39
return &Director{
37
40
mm: mm,
38
41
mod: mod,
···
42
45
streamSessionsMu: sync.Mutex{},
43
46
op: op,
44
47
statefulDB: statefulDB,
48
48
+
swarm: swarm,
45
49
}
46
50
}
47
51
···
86
90
})
87
91
}
88
92
d.streamSessionsMu.Unlock()
93
93
+
go func() {
94
94
+
err := d.swarm.Put(ctx, not.Segment.RepoDID, not.Segment.StartTime.Format(util.ISO8601))
95
95
+
if err != nil {
96
96
+
log.Error(ctx, "could not put segment to swarm", "error", err)
97
97
+
}
98
98
+
}()
89
99
err := ss.NewSegment(ctx, not)
90
100
if err != nil {
91
101
log.Error(ctx, "could not add segment to stream session", "error", err)
+68
pkg/iroh/generated/iroh_streamplace/iroh_streamplace.go
···
1440
1440
},
1441
1441
)
1442
1442
1443
1443
+
if err == nil {
1444
1444
+
return res, nil
1445
1445
+
}
1446
1446
+
1443
1447
return res, err
1444
1448
}
1445
1449
···
1555
1559
C.ffi_iroh_streamplace_rust_future_free_pointer(handle)
1556
1560
},
1557
1561
)
1562
1562
+
1563
1563
+
if err == nil {
1564
1564
+
return res, nil
1565
1565
+
}
1558
1566
1559
1567
return res, err
1560
1568
}
···
1961
1969
},
1962
1970
)
1963
1971
1972
1972
+
if err == nil {
1973
1973
+
return res, nil
1974
1974
+
}
1975
1975
+
1964
1976
return res, err
1965
1977
}
1966
1978
···
1987
1999
},
1988
2000
)
1989
2001
2002
2002
+
if err == nil {
2003
2003
+
return res, nil
2004
2004
+
}
2005
2005
+
1990
2006
return res, err
1991
2007
}
1992
2008
···
2013
2029
C.ffi_iroh_streamplace_rust_future_free_pointer(handle)
2014
2030
},
2015
2031
)
2032
2032
+
2033
2033
+
if err == nil {
2034
2034
+
return res, nil
2035
2035
+
}
2016
2036
2017
2037
return res, err
2018
2038
}
···
2052
2072
},
2053
2073
)
2054
2074
2075
2075
+
if err == nil {
2076
2076
+
return nil
2077
2077
+
}
2078
2078
+
2055
2079
return err
2056
2080
}
2057
2081
···
2081
2105
C.ffi_iroh_streamplace_rust_future_free_pointer(handle)
2082
2106
},
2083
2107
)
2108
2108
+
2109
2109
+
if err == nil {
2110
2110
+
return res, nil
2111
2111
+
}
2084
2112
2085
2113
return res, err
2086
2114
}
···
2122
2150
},
2123
2151
)
2124
2152
2153
2153
+
if err == nil {
2154
2154
+
return nil
2155
2155
+
}
2156
2156
+
2125
2157
return err
2126
2158
}
2127
2159
···
2150
2182
},
2151
2183
)
2152
2184
2185
2185
+
if err == nil {
2186
2186
+
return nil
2187
2187
+
}
2188
2188
+
2153
2189
return err
2154
2190
}
2155
2191
···
2182
2218
},
2183
2219
)
2184
2220
2221
2221
+
if err == nil {
2222
2222
+
return res, nil
2223
2223
+
}
2224
2224
+
2185
2225
return res, err
2186
2226
}
2187
2227
···
2209
2249
C.ffi_iroh_streamplace_rust_future_free_void(handle)
2210
2250
},
2211
2251
)
2252
2252
+
2253
2253
+
if err == nil {
2254
2254
+
return nil
2255
2255
+
}
2212
2256
2213
2257
return err
2214
2258
}
···
2553
2597
},
2554
2598
)
2555
2599
2600
2600
+
if err == nil {
2601
2601
+
return res, nil
2602
2602
+
}
2603
2603
+
2556
2604
return res, err
2557
2605
}
2558
2606
···
2610
2658
},
2611
2659
)
2612
2660
2661
2661
+
if err == nil {
2662
2662
+
return nil
2663
2663
+
}
2664
2664
+
2613
2665
return err
2614
2666
}
2615
2667
···
2637
2689
C.ffi_iroh_streamplace_rust_future_free_void(handle)
2638
2690
},
2639
2691
)
2692
2692
+
2693
2693
+
if err == nil {
2694
2694
+
return nil
2695
2695
+
}
2640
2696
2641
2697
return err
2642
2698
}
···
2778
2834
C.ffi_iroh_streamplace_rust_future_free_void(handle)
2779
2835
},
2780
2836
)
2837
2837
+
2838
2838
+
if err == nil {
2839
2839
+
return nil
2840
2840
+
}
2781
2841
2782
2842
return err
2783
2843
}
···
2872
2932
},
2873
2933
)
2874
2934
2935
2935
+
if err == nil {
2936
2936
+
return res, nil
2937
2937
+
}
2938
2938
+
2875
2939
return res, err
2876
2940
}
2877
2941
···
2972
3036
C.ffi_iroh_streamplace_rust_future_free_void(handle)
2973
3037
},
2974
3038
)
3039
3039
+
3040
3040
+
if err == nil {
3041
3041
+
return nil
3042
3042
+
}
2975
3043
2976
3044
return err
2977
3045
}
+1
-1
pkg/replication/iroh/iroh.go
pkg/replication/iroh_replicator/iroh.go
···
1
1
-
package iroh
1
1
+
package iroh_replicator
2
2
3
3
import (
4
4
"context"
+99
pkg/replication/iroh_replicator/kv.go
···
1
1
+
package iroh_replicator
2
2
+
3
3
+
import (
4
4
+
"context"
5
5
+
"fmt"
6
6
+
"time"
7
7
+
8
8
+
"stream.place/streamplace/pkg/iroh/generated/iroh_streamplace"
9
9
+
"stream.place/streamplace/pkg/log"
10
10
+
)
11
11
+
12
12
+
type SwarmKV struct {
13
13
+
node *iroh_streamplace.Node
14
14
+
db *iroh_streamplace.Db
15
15
+
w *iroh_streamplace.WriteScope
16
16
+
}
17
17
+
18
18
+
func StartKV(ctx context.Context, tickets []string, secret []byte) (*SwarmKV, error) {
19
19
+
ctx = log.WithLogValues(ctx, "func", "StartKV")
20
20
+
21
21
+
log.Log(ctx, "Starting with tickets", "tickets", tickets)
22
22
+
config := iroh_streamplace.Config{
23
23
+
Key: secret,
24
24
+
Topic: make([]byte, 32), // all zero topic for testing
25
25
+
MaxSendDuration: 1000_000_000, // 1s
26
26
+
}
27
27
+
log.Log(ctx, "Config created", "config", config)
28
28
+
node, err := iroh_streamplace.NodeSender(config)
29
29
+
if err != nil {
30
30
+
return nil, fmt.Errorf("failed to create NodeSender: %w", err)
31
31
+
}
32
32
+
33
33
+
db := node.Db()
34
34
+
w := node.NodeScope()
35
35
+
36
36
+
node_id, err := node.NodeId()
37
37
+
if err != nil {
38
38
+
return nil, fmt.Errorf("failed to get NodeId: %w", err)
39
39
+
}
40
40
+
log.Log(ctx, "Node ID:", "node_id", node_id)
41
41
+
42
42
+
ticket, err := node.Ticket()
43
43
+
if err != nil {
44
44
+
return nil, fmt.Errorf("failed to get Ticket: %w", err)
45
45
+
}
46
46
+
log.Log(ctx, "Ticket:", "ticket", ticket)
47
47
+
48
48
+
swarm := SwarmKV{
49
49
+
node: node,
50
50
+
db: db,
51
51
+
w: w,
52
52
+
}
53
53
+
return &swarm, nil
54
54
+
}
55
55
+
56
56
+
func (swarm *SwarmKV) Start(ctx context.Context, tickets []string) error {
57
57
+
if len(tickets) > 0 {
58
58
+
err := swarm.node.JoinPeers(tickets)
59
59
+
if err != nil {
60
60
+
return fmt.Errorf("failed to join peers: %w", err)
61
61
+
}
62
62
+
}
63
63
+
64
64
+
sub := swarm.db.Subscribe(iroh_streamplace.NewFilter())
65
65
+
for {
66
66
+
if ctx.Err() != nil {
67
67
+
return ctx.Err()
68
68
+
}
69
69
+
ev, err := sub.NextRaw()
70
70
+
if err != nil {
71
71
+
return fmt.Errorf("failed to get next subscription event: %w", err)
72
72
+
}
73
73
+
if ev == nil {
74
74
+
log.Log(ctx, "Got empty event from sub.NextRaw(), pausing for a second")
75
75
+
time.Sleep(1 * time.Second)
76
76
+
continue
77
77
+
}
78
78
+
switch item := (*ev).(type) {
79
79
+
case iroh_streamplace.SubscribeItemEntry:
80
80
+
keyStr := string(item.Key)
81
81
+
valueStr := string(item.Value)
82
82
+
log.Log(ctx, "SubscribeItemEntry", "key", keyStr, "value", valueStr)
83
83
+
84
84
+
case iroh_streamplace.SubscribeItemCurrentDone:
85
85
+
log.Log(ctx, "SubscribeItemCurrentDone", "currentDone", item)
86
86
+
case iroh_streamplace.SubscribeItemExpired:
87
87
+
log.Log(ctx, "SubscribeItemExpired", "expired", item)
88
88
+
case iroh_streamplace.SubscribeItemOther:
89
89
+
log.Log(ctx, "SubscribeItemOther", "other", item)
90
90
+
}
91
91
+
}
92
92
+
}
93
93
+
94
94
+
func (swarm *SwarmKV) Put(ctx context.Context, key, value string) error {
95
95
+
// streamerBs := []byte(streamer)
96
96
+
keyBs := []byte(key)
97
97
+
valueBs := []byte(value)
98
98
+
return swarm.w.Put(nil, keyBs, valueBs)
99
99
+
}
-19
rust/iroh-streamplace/Makefile
···
1
1
-
.PHONY: clean build-rust generate run
2
2
-
3
3
-
install:
4
4
-
cargo install uniffi-bindgen-go --git https://github.com/NordSecurity/uniffi-bindgen-go --tag v0.4.0+v0.28.3
5
5
-
6
6
-
clean:
7
7
-
cargo clean
8
8
-
rm -rf go/uniffi_example/
9
9
-
10
10
-
build-rust:
11
11
-
cargo build
12
12
-
13
13
-
generate: build-rust
14
14
-
uniffi-bindgen-go --library ../../target/debug/libiroh_streamplace.dylib --out-dir go/
15
15
-
cp ../../target/debug/libiroh_streamplace.dylib go/
16
16
-
17
17
-
run: generate
18
18
-
cd go && CGO_LDFLAGS="-L. -liroh_streamplace" go run main.go $(TICKET)
19
19
-
-3
rust/iroh-streamplace/go/go.mod
···
1
1
-
module hello-app
2
2
-
3
3
-
go 1.25.1
-95
rust/iroh-streamplace/go/main.go
···
1
1
-
2
2
-
package main
3
3
-
4
4
-
import (
5
5
-
"fmt"
6
6
-
"os"
7
7
-
sp "hello-app/iroh_streamplace"
8
8
-
"reflect"
9
9
-
"crypto/rand"
10
10
-
)
11
11
-
12
12
-
func isNilError(err error) bool {
13
13
-
if err == nil {
14
14
-
return true
15
15
-
}
16
16
-
17
17
-
v := reflect.ValueOf(err)
18
18
-
return v.Kind() == reflect.Ptr && v.IsNil()
19
19
-
}
20
20
-
21
21
-
22
22
-
func panicIfErr(err error) {
23
23
-
if !isNilError(err) {
24
24
-
panic(err)
25
25
-
}
26
26
-
}
27
27
-
28
28
-
func main() {
29
29
-
tickets := os.Args[1:];
30
30
-
31
31
-
secret := make([]byte, 32)
32
32
-
_, err := rand.Read(secret)
33
33
-
panicIfErr(err)
34
34
-
35
35
-
fmt.Println("Starting with tickets", tickets)
36
36
-
config := sp.Config {
37
37
-
Key : secret,
38
38
-
Topic: make([]byte, 32), // all zero topic for testing
39
39
-
MaxSendDuration: 1000_000_000, // 1s
40
40
-
}
41
41
-
fmt.Printf("Config created %+v\n", config)
42
42
-
node, err := sp.NodeSender(config)
43
43
-
panicIfErr(err)
44
44
-
45
45
-
db := node.Db()
46
46
-
w := node.NodeScope()
47
47
-
48
48
-
node_id, err := node.NodeId()
49
49
-
panicIfErr(err)
50
50
-
fmt.Println("Node ID:", node_id)
51
51
-
52
52
-
ticket, err := node.Ticket()
53
53
-
panicIfErr(err)
54
54
-
fmt.Println("Ticket:", ticket)
55
55
-
56
56
-
if len(tickets) > 0 {
57
57
-
err = node.JoinPeers(tickets)
58
58
-
panicIfErr(err)
59
59
-
}
60
60
-
61
61
-
w.Put(nil, []byte("hello"), []byte("world"))
62
62
-
stream := []byte("stream1")
63
63
-
w.Put(&stream, []byte("subscribed"), []byte("true"))
64
64
-
65
65
-
filter := sp.NewFilter()
66
66
-
items, err := db.IterWithOpts(filter)
67
67
-
panicIfErr(err)
68
68
-
fmt.Printf("Iter items: %+v\n", items)
69
69
-
70
70
-
filter2 := sp.NewFilter().Global()
71
71
-
items2, err := db.IterWithOpts(filter2)
72
72
-
panicIfErr(err)
73
73
-
fmt.Printf("Iter items: %+v\n", items2)
74
74
-
75
75
-
filter3 := sp.NewFilter().Stream(stream)
76
76
-
items3, err := db.IterWithOpts(filter3)
77
77
-
panicIfErr(err)
78
78
-
fmt.Printf("Iter items: %+v\n", items3)
79
79
-
80
80
-
sub := db.Subscribe(sp.NewFilter())
81
81
-
for {
82
82
-
ev, err := sub.NextRaw()
83
83
-
panicIfErr(err)
84
84
-
switch (*ev).(type) {
85
85
-
case sp.SubscribeItemEntry:
86
86
-
fmt.Printf("%+v\n", (*ev).(sp.SubscribeItemEntry))
87
87
-
case sp.SubscribeItemCurrentDone:
88
88
-
fmt.Printf("Got current done event: %+v\n", (*ev).(sp.SubscribeItemCurrentDone))
89
89
-
case sp.SubscribeItemExpired:
90
90
-
fmt.Printf("Got expired event: %+v\n", (*ev).(sp.SubscribeItemExpired))
91
91
-
case sp.SubscribeItemOther:
92
92
-
fmt.Printf("Got other event: %+v\n", (*ev).(sp.SubscribeItemOther))
93
93
-
}
94
94
-
}
95
95
-
}