Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

multitest: first replication test works!

+161 -40
+21 -20
pkg/cmd/whep.go
··· 47 47 Endpoint string 48 48 Count int 49 49 FreezeAfter time.Duration 50 + Stats []map[string]*TrackStats 50 51 } 51 52 52 53 type WHEPConnection struct { ··· 57 58 Done func() <-chan struct{} 58 59 } 59 60 60 - func (w *WHEPClient) StartWHEPConnection(ctx context.Context) (*WHEPConnection, error) { 61 + type TrackStats struct { 62 + Total int 63 + lastTotal int 64 + lastUpdate time.Time 65 + mu sync.Mutex 66 + } 67 + 68 + func (w *WHEPClient) StartWHEPConnection(ctx context.Context, stats map[string]*TrackStats) (*WHEPConnection, error) { 61 69 62 70 // Prepare the configuration 63 71 config := webrtc.Configuration{} ··· 68 76 return nil, err 69 77 } 70 78 71 - // Track statistics 72 - type trackStats struct { 73 - total int 74 - lastTotal int 75 - lastUpdate time.Time 76 - mu sync.Mutex 77 - } 78 - 79 - stats := map[string]*trackStats{ 80 - "video": {lastUpdate: time.Now()}, 81 - "audio": {lastUpdate: time.Now()}, 82 - } 83 - 84 79 // Create a ticker to print combined bitrate every 5 seconds 85 80 ticker := time.NewTicker(5 * time.Second) 86 81 ··· 102 97 videoElapsed := currentTime.Sub(videoStats.lastUpdate).Seconds() 103 98 audioElapsed := currentTime.Sub(audioStats.lastUpdate).Seconds() 104 99 105 - videoBytes := videoStats.total - videoStats.lastTotal 106 - audioBytes := audioStats.total - audioStats.lastTotal 100 + videoBytes := videoStats.Total - videoStats.lastTotal 101 + audioBytes := audioStats.Total - audioStats.lastTotal 107 102 108 103 videoBitrate := float64(videoBytes) * 8 / videoElapsed / 1000 // kbps 109 104 audioBitrate := float64(audioBytes) * 8 / audioElapsed / 1000 // kbps ··· 114 109 "total", fmt.Sprintf("%.2f kbps", videoBitrate+audioBitrate)) 115 110 116 111 // Update last values 117 - videoStats.lastTotal = videoStats.total 112 + videoStats.lastTotal = videoStats.Total 118 113 videoStats.lastUpdate = currentTime 119 - audioStats.lastTotal = audioStats.total 114 + audioStats.lastTotal = audioStats.Total 120 115 audioStats.lastUpdate = currentTime 121 116 122 117 // Unlock stats ··· 156 151 } 157 152 158 153 trackStat.mu.Lock() 159 - trackStat.total += len(rtp.Payload) 154 + trackStat.Total += len(rtp.Payload) 160 155 trackStat.mu.Unlock() 161 156 } 162 157 }) ··· 252 247 } 253 248 254 249 func (w *WHEPClient) WHEP(ctx context.Context) error { 250 + w.Stats = []map[string]*TrackStats{} 255 251 ctx, cancel := context.WithCancel(ctx) 256 252 defer cancel() 257 253 258 254 conns := make([]*WHEPConnection, w.Count) 259 255 g := &errgroup.Group{} 260 256 for i := 0; i < w.Count; i++ { 257 + stats := map[string]*TrackStats{ 258 + "video": {lastUpdate: time.Now()}, 259 + "audio": {lastUpdate: time.Now()}, 260 + } 261 + w.Stats = append(w.Stats, stats) 261 262 g.Go(func() error { 262 - conn, err := w.StartWHEPConnection(ctx) 263 + conn, err := w.StartWHEPConnection(ctx, stats) 263 264 if err != nil { 264 265 return err 265 266 }
+21
pkg/config/config.go
··· 129 129 IrohTopic string 130 130 DID string 131 131 DisableIrohRelay bool 132 + DevAccountCreds map[string]string 132 133 } 133 134 134 135 // ContentFilters represents the content filtering configuration ··· 214 215 cli.StringSliceFlag(fs, &cli.Tickets, "tickets", "[]", "tickets to join the swarm with") 215 216 fs.StringVar(&cli.IrohTopic, "iroh-topic", "", "topic to use for the iroh swarm (must be 32 bytes in hex)") 216 217 fs.BoolVar(&cli.DisableIrohRelay, "disable-iroh-relay", false, "disable the iroh relay") 218 + cli.KVSliceFlag(fs, &cli.DevAccountCreds, "dev-account-creds", "", "(FOR DEVELOPMENT ONLY) did=password pairs for logging into test accounts without oauth") 217 219 218 220 lpFlags := flag.NewFlagSet("livepeer", flag.ContinueOnError) 219 221 _ = starter.NewLivepeerConfig(lpFlags) ··· 538 540 } 539 541 strs := strings.Split(s, ",") 540 542 *dest = append(*dest, strs...) 543 + return nil 544 + }) 545 + } 546 + 547 + func (cli *CLI) KVSliceFlag(fs *flag.FlagSet, dest *map[string]string, name, defaultValue, usage string) { 548 + *dest = map[string]string{} 549 + usage = fmt.Sprintf(`%s (default: "%s")`, usage, *dest) 550 + fs.Func(name, usage, func(s string) error { 551 + if s == "" { 552 + return nil 553 + } 554 + pairs := strings.Split(s, ",") 555 + for _, pair := range pairs { 556 + parts := strings.Split(pair, "=") 557 + if len(parts) != 2 { 558 + return fmt.Errorf("invalid kv flag: %s", pair) 559 + } 560 + (*dest)[parts[0]] = parts[1] 561 + } 541 562 return nil 542 563 }) 543 564 }
+1 -1
pkg/crypto/spkey/spkey.go
··· 30 30 } 31 31 didBytes := []byte(did) 32 32 combinedBytes := append(priv.Bytes(), didBytes...) 33 - multibaseKey := base58.Encode(combinedBytes) 33 + multibaseKey := "z" + base58.Encode(combinedBytes) 34 34 return multibaseKey, pub, nil 35 35 } 36 36
+7 -4
pkg/devenv/devenv.go
··· 25 25 ) 26 26 27 27 type DevEnv struct { 28 - PDSURL string `json:"pds-url"` 29 - PLCURL string `json:"plc-url"` 28 + PDSURL string `json:"pds-url"` 29 + PLCURL string `json:"plc-url"` 30 + Accounts []*DevEnvAccount `json:"accounts"` 30 31 } 31 32 32 33 func WithDevEnv(t *testing.T) *DevEnv { ··· 55 56 t.Logf("Error unmarshalling dev-env stdout: %v", err) 56 57 t.FailNow() 57 58 } 59 + env.Accounts = []*DevEnvAccount{} 58 60 59 61 go func() { 60 62 scanner := bufio.NewScanner(stdout) ··· 124 126 Handle: out.Handle, 125 127 }, 126 128 } 127 - 128 - return &DevEnvAccount{ 129 + acct := &DevEnvAccount{ 129 130 Handle: out.Handle, 130 131 Email: email, 131 132 Password: password, 132 133 DID: out.Did, 133 134 XRPC: xrpcc, 134 135 } 136 + d.Accounts = append(d.Accounts, acct) 137 + return acct 135 138 } 136 139 137 140 // Custom RoundTripper for intercepting .test domain requests
+44 -9
pkg/director/stream_session.go
··· 7 7 "sync" 8 8 "time" 9 9 10 - "github.com/bluesky-social/indigo/api/atproto" 10 + comatproto "github.com/bluesky-social/indigo/api/atproto" 11 11 "github.com/bluesky-social/indigo/api/bsky" 12 12 lexutil "github.com/bluesky-social/indigo/lex/util" 13 13 "github.com/bluesky-social/indigo/util" 14 14 "github.com/bluesky-social/indigo/xrpc" 15 15 "github.com/streamplace/oatproxy/pkg/oatproxy" 16 16 "golang.org/x/sync/errgroup" 17 + "stream.place/streamplace/pkg/aqhttp" 17 18 "stream.place/streamplace/pkg/aqtime" 18 19 "stream.place/streamplace/pkg/bus" 19 20 "stream.place/streamplace/pkg/config" ··· 370 371 } 371 372 372 373 var swapRecord *string 373 - getOutput := atproto.RepoGetRecord_Output{} 374 + getOutput := comatproto.RepoGetRecord_Output{} 374 375 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{ 375 376 "repo": repoDID, 376 377 "collection": "app.bsky.actor.status", ··· 390 391 swapRecord = getOutput.Cid 391 392 } 392 393 393 - inp := atproto.RepoPutRecord_Input{ 394 + inp := comatproto.RepoPutRecord_Input{ 394 395 Collection: "app.bsky.actor.status", 395 396 Record: &lexutil.LexiconTypeDecoder{Val: &status}, 396 397 Rkey: "self", 397 398 Repo: repoDID, 398 399 SwapRecord: swapRecord, 399 400 } 400 - out := atproto.RepoPutRecord_Output{} 401 + out := comatproto.RepoPutRecord_Output{} 401 402 402 403 ss.lastStatusCID = &out.Cid 403 404 ··· 421 422 log.Debug(ctx, "no status cid to delete") 422 423 return nil 423 424 } 424 - inp := atproto.RepoDeleteRecord_Input{ 425 + inp := comatproto.RepoDeleteRecord_Input{ 425 426 Collection: "app.bsky.actor.status", 426 427 Rkey: "self", 427 428 Repo: repoDID, 428 429 } 429 430 inp.SwapRecord = ss.lastStatusCID 430 - out := atproto.RepoDeleteRecord_Output{} 431 + out := comatproto.RepoDeleteRecord_Output{} 431 432 432 433 client, err := ss.GetClientByDID(repoDID) 433 434 if err != nil { ··· 470 471 rkey := fmt.Sprintf("%s::did:web:%s", ss.repoDID, ss.cli.ServerHost) 471 472 472 473 var swapRecord *string 473 - getOutput := atproto.RepoGetRecord_Output{} 474 + getOutput := comatproto.RepoGetRecord_Output{} 474 475 err = client.Do(ctx, xrpc.Query, "application/json", "com.atproto.repo.getRecord", map[string]any{ 475 476 "repo": ss.repoDID, 476 477 "collection": "place.stream.broadcast.origin", ··· 490 491 swapRecord = getOutput.Cid 491 492 } 492 493 493 - inp := atproto.RepoPutRecord_Input{ 494 + inp := comatproto.RepoPutRecord_Input{ 494 495 Collection: "place.stream.broadcast.origin", 495 496 Record: &lexutil.LexiconTypeDecoder{Val: &origin}, 496 497 Rkey: rkey, 497 498 Repo: ss.repoDID, 498 499 SwapRecord: swapRecord, 499 500 } 500 - out := atproto.RepoPutRecord_Output{} 501 + out := comatproto.RepoPutRecord_Output{} 501 502 502 503 err = client.Do(ctx, xrpc.Procedure, "application/json", "com.atproto.repo.putRecord", map[string]any{}, inp, &out) 503 504 if err != nil { ··· 621 622 } 622 623 623 624 func (ss *StreamSession) GetClientByDID(did string) (XRPCClient, error) { 625 + password, ok := ss.cli.DevAccountCreds[did] 626 + if ok { 627 + repo, err := ss.mod.GetRepoByHandleOrDID(did) 628 + if err != nil { 629 + return nil, fmt.Errorf("could not get repo by did: %w", err) 630 + } 631 + if repo == nil { 632 + return nil, fmt.Errorf("repo not found for did: %s", did) 633 + } 634 + anonXRPCC := &xrpc.Client{ 635 + Host: repo.PDS, 636 + Client: &aqhttp.Client, 637 + } 638 + session, err := comatproto.ServerCreateSession(context.Background(), anonXRPCC, &comatproto.ServerCreateSession_Input{ 639 + Identifier: repo.DID, 640 + Password: password, 641 + }) 642 + if err != nil { 643 + return nil, fmt.Errorf("could not create session: %w", err) 644 + } 645 + 646 + log.Warn(context.Background(), "created session for dev account", "did", repo.DID, "handle", repo.Handle, "pds", repo.PDS) 647 + 648 + return &xrpc.Client{ 649 + Host: repo.PDS, 650 + Client: &aqhttp.Client, 651 + Auth: &xrpc.AuthInfo{ 652 + Did: repo.DID, 653 + AccessJwt: session.AccessJwt, 654 + RefreshJwt: session.RefreshJwt, 655 + Handle: repo.Handle, 656 + }, 657 + }, nil 658 + } 624 659 session, err := ss.statefulDB.GetSessionByDID(ss.repoDID) 625 660 if err != nil { 626 661 return nil, fmt.Errorf("could not get OAuth session for repoDID: %w", err)
+67 -6
pkg/multitest/multitest_test.go
··· 3 3 import ( 4 4 "context" 5 5 "fmt" 6 + "net/http" 6 7 "os" 7 8 "os/exec" 8 9 "path/filepath" ··· 15 16 lexutil "github.com/bluesky-social/indigo/lex/util" 16 17 "github.com/bluesky-social/indigo/util" 17 18 "github.com/stretchr/testify/require" 19 + "golang.org/x/sync/errgroup" 20 + "stream.place/streamplace/pkg/cmd" 18 21 "stream.place/streamplace/pkg/crypto/spkey" 19 22 "stream.place/streamplace/pkg/devenv" 23 + "stream.place/streamplace/pkg/gstinit" 20 24 "stream.place/streamplace/pkg/log" 21 25 "stream.place/streamplace/pkg/streamplace" 22 26 ) 23 27 24 28 func TestMultinodeSyndication(t *testing.T) { 29 + gstinit.InitGST() 25 30 dev := devenv.WithDevEnv(t) 26 - startStreamplaceNode(t, dev) 27 - // startStreamplaceNode(t, dev) 28 31 acct := dev.CreateAccount(t) 29 - _, pub, err := spkey.GenerateStreamKeyForDID(acct.DID) 32 + node1 := startStreamplaceNode(t, dev) 33 + node2 := startStreamplaceNode(t, dev) 34 + priv, pub, err := spkey.GenerateStreamKeyForDID(acct.DID) 30 35 require.NoError(t, err) 31 36 createdBy := "multitest" 32 37 streamKey := streamplace.Key{ ··· 41 46 }) 42 47 require.NoError(t, err) 43 48 log.Log(context.Background(), "created stream key", "did", acct.DID, "pub", pub.DIDKey()) 49 + whip := &cmd.WHIPClient{ 50 + StreamKey: priv, 51 + File: "/home/iameli/testvids/RocketLeague_1h55m_1sGOP_1080p60_NoBframes.mp4", 52 + Endpoint: fmt.Sprintf("http://%s", node1.Env["SP_HTTP_ADDR"]), 53 + Count: 1, 54 + } 55 + 56 + whep := &cmd.WHEPClient{ 57 + Endpoint: fmt.Sprintf("http://%s/api/playback/%s/webrtc", node2.Env["SP_HTTP_ADDR"], acct.DID), 58 + Count: 1, 59 + } 60 + 61 + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) 62 + defer cancel() 63 + g, ctx := errgroup.WithContext(ctx) 64 + g.Go(func() error { 65 + return whip.WHIP(ctx) 66 + }) 67 + g.Go(func() error { 68 + return whep.WHEP(ctx) 69 + }) 70 + 71 + <-ctx.Done() 72 + 73 + err = g.Wait() 74 + require.ErrorIs(t, err, context.DeadlineExceeded) 75 + stats := whep.Stats[0] 76 + videoStats := stats["video"] 77 + audioStats := stats["audio"] 78 + require.Greater(t, videoStats.Total, 0) 79 + require.Greater(t, audioStats.Total, 0) 44 80 } 45 81 46 82 var currentPort = 10000 ··· 50 86 return currentPort 51 87 } 52 88 53 - func startStreamplaceNode(t *testing.T, dev *devenv.DevEnv) { 89 + type TestNode struct { 90 + Env map[string]string 91 + } 92 + 93 + func startStreamplaceNode(t *testing.T, dev *devenv.DevEnv) *TestNode { 54 94 dataDir := t.TempDir() 95 + devAccountCreds := []string{} 96 + for _, acct := range dev.Accounts { 97 + devAccountCreds = append(devAccountCreds, fmt.Sprintf("%s=%s", acct.DID, acct.Password)) 98 + } 55 99 env := map[string]string{ 56 - "SP_HTTP_ADDR": fmt.Sprintf(":%d", nextPort()), 57 - "SP_HTTP_INTERNAL_ADDR": fmt.Sprintf(":%d", nextPort()), 100 + "SP_HTTP_ADDR": fmt.Sprintf("127.0.0.1:%d", nextPort()), 101 + "SP_HTTP_INTERNAL_ADDR": fmt.Sprintf("127.0.0.1:%d", nextPort()), 58 102 "SP_RELAY_HOST": strings.ReplaceAll(dev.PDSURL, "http://", "ws://"), 59 103 "SP_PLC_URL": dev.PLCURL, 60 104 "SP_DATA_DIR": dataDir, 105 + "SP_DEV_ACCOUNT_CREDS": strings.Join(devAccountCreds, ","), 61 106 } 62 107 _, file, _, _ := runtime.Caller(0) 63 108 abs, err := filepath.Abs(filepath.Join(filepath.Dir(file), "..", "..", "build-linux-amd64", "streamplace")) ··· 78 123 _, err = cmd.Process.Wait() 79 124 require.NoError(t, err) 80 125 }) 126 + // Wait for the streamplace node to be ready by polling the health endpoint 127 + healthz := fmt.Sprintf("http://%s/api/healthz", env["SP_HTTP_ADDR"]) 128 + client := &http.Client{Timeout: 2 * time.Second} 129 + for { 130 + resp, err := client.Get(healthz) 131 + if err == nil { 132 + defer resp.Body.Close() 133 + if resp.StatusCode == 200 { 134 + break 135 + } 136 + } 137 + time.Sleep(200 * time.Millisecond) 138 + } 139 + return &TestNode{ 140 + Env: env, 141 + } 81 142 }