Live video on the AT Protocol

iroh replicator: extremely rough PoC

+78 -14
+1 -3
pkg/cmd/streamplace.go
··· 32 32 "stream.place/streamplace/pkg/log" 33 33 "stream.place/streamplace/pkg/media" 34 34 "stream.place/streamplace/pkg/notifications" 35 - "stream.place/streamplace/pkg/replication" 36 - "stream.place/streamplace/pkg/replication/boring" 37 35 "stream.place/streamplace/pkg/replication/iroh_replicator" 38 36 "stream.place/streamplace/pkg/rtmps" 39 37 v0 "stream.place/streamplace/pkg/schema/v0" ··· 307 305 log.Log(ctx, "successfully initialized hardware signer", "address", addr) 308 306 signer = hwsigner 309 307 } 310 - var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers} 308 + // var rep replication.Replicator = &boring.BoringReplicator{Peers: cli.Peers} 311 309 312 310 mod, err := model.MakeDB(cli.DataFilePath([]string{"index"})) 313 311 if err != nil {
+22 -1
pkg/director/director.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "encoding/json" 5 6 "fmt" 6 7 "sync" 7 8 ··· 50 51 } 51 52 52 53 func (d *Director) Start(ctx context.Context) error { 54 + nodeId, err := d.swarm.Node.NodeId() 55 + if err != nil { 56 + return fmt.Errorf("failed to get node id: %w", err) 57 + } 58 + 53 59 newSeg := d.mm.NewSegment() 54 60 ctx, cancel := context.WithCancel(ctx) 55 61 defer cancel() ··· 91 97 } 92 98 d.streamSessionsMu.Unlock() 93 99 go func() { 94 - err := d.swarm.Put(ctx, not.Segment.RepoDID, not.Segment.StartTime.Format(util.ISO8601)) 100 + originInfo := iroh_replicator.OriginInfo{ 101 + NodeID: nodeId.String(), 102 + Time: not.Segment.StartTime.Format(util.ISO8601), 103 + } 104 + bs, err := json.Marshal(originInfo) 105 + if err != nil { 106 + log.Error(ctx, "could not marshal origin info", "error", err) 107 + return 108 + } 109 + err = d.swarm.Put(ctx, not.Segment.RepoDID, bs) 95 110 if err != nil { 96 111 log.Error(ctx, "could not put segment to swarm", "error", err) 112 + return 113 + } 114 + err = d.swarm.Node.SendSegment(not.Segment.RepoDID, not.Data) 115 + if err != nil { 116 + log.Error(ctx, "could not send segment to swarm", "error", err) 117 + return 97 118 } 98 119 }() 99 120 err := ss.NewSegment(ctx, not)
+55 -10
pkg/replication/iroh_replicator/kv.go
··· 2 2 3 3 import ( 4 4 "context" 5 + "encoding/json" 5 6 "fmt" 6 7 "time" 7 8 ··· 10 11 ) 11 12 12 13 type SwarmKV struct { 13 - node *iroh_streamplace.Node 14 - db *iroh_streamplace.Db 14 + Node *iroh_streamplace.Node 15 + DB *iroh_streamplace.Db 15 16 w *iroh_streamplace.WriteScope 17 + } 18 + 19 + // A message saying "hey I ingested node data at this time" 20 + type OriginInfo struct { 21 + NodeID string `json:"node_id"` 22 + Time string `json:"time"` 23 + } 24 + 25 + type DataHandler struct{} 26 + 27 + func (handler *DataHandler) HandleData(topic string, data []byte) { 28 + log.Log(context.Background(), "HandleData", "topic", topic, "data", len(data)) 16 29 } 17 30 18 31 func StartKV(ctx context.Context, tickets []string, secret []byte) (*SwarmKV, error) { 32 + handler := &DataHandler{} 19 33 ctx = log.WithLogValues(ctx, "func", "StartKV") 20 34 21 35 log.Log(ctx, "Starting with tickets", "tickets", tickets) ··· 25 39 MaxSendDuration: 1000_000_000, // 1s 26 40 } 27 41 log.Log(ctx, "Config created", "config", config) 28 - node, err := iroh_streamplace.NodeSender(config) 42 + node, err := iroh_streamplace.NodeReceiver(config, handler) 29 43 if err != nil { 30 44 return nil, fmt.Errorf("failed to create NodeSender: %w", err) 31 45 } ··· 46 60 log.Log(ctx, "Ticket:", "ticket", ticket) 47 61 48 62 swarm := SwarmKV{ 49 - node: node, 50 - db: db, 63 + Node: node, 64 + DB: db, 51 65 w: w, 52 66 } 53 67 return &swarm, nil 54 68 } 55 69 70 + var activeSubs = make(map[string]bool) 71 + 56 72 func (swarm *SwarmKV) Start(ctx context.Context, tickets []string) error { 57 73 if len(tickets) > 0 { 58 - err := swarm.node.JoinPeers(tickets) 74 + err := swarm.Node.JoinPeers(tickets) 59 75 if err != nil { 60 76 return fmt.Errorf("failed to join peers: %w", err) 61 77 } 62 78 } 63 79 64 - sub := swarm.db.Subscribe(iroh_streamplace.NewFilter()) 80 + nodeId, err := swarm.Node.NodeId() 81 + if err != nil { 82 + return fmt.Errorf("failed to get node id: %w", err) 83 + } 84 + nodeIdStr := nodeId.String() 85 + log.Log(ctx, "Node ID:", "node_id", nodeIdStr) 86 + 87 + sub := swarm.DB.Subscribe(iroh_streamplace.NewFilter()) 65 88 for { 66 89 if ctx.Err() != nil { 67 90 return ctx.Err() ··· 80 103 keyStr := string(item.Key) 81 104 valueStr := string(item.Value) 82 105 log.Log(ctx, "SubscribeItemEntry", "key", keyStr, "value", valueStr) 106 + var info OriginInfo 107 + err := json.Unmarshal(item.Value, &info) 108 + if err != nil { 109 + log.Error(ctx, "could not unmarshal origin info", "error", err) 110 + continue 111 + } 112 + if !activeSubs[keyStr] { 113 + if info.NodeID == nodeIdStr { 114 + activeSubs[keyStr] = true 115 + continue 116 + } 117 + pubKey, err := iroh_streamplace.PublicKeyFromString(info.NodeID) 118 + if err != nil { 119 + log.Error(ctx, "could not create public key", "error", err) 120 + continue 121 + } 122 + activeSubs[keyStr] = true 123 + err = swarm.Node.Subscribe(keyStr, pubKey) 124 + if err != nil { 125 + log.Error(ctx, "could not subscribe to key", "error", err) 126 + continue 127 + } 128 + } 83 129 84 130 case iroh_streamplace.SubscribeItemCurrentDone: 85 131 log.Log(ctx, "SubscribeItemCurrentDone", "currentDone", item) ··· 91 137 } 92 138 } 93 139 94 - func (swarm *SwarmKV) Put(ctx context.Context, key, value string) error { 140 + func (swarm *SwarmKV) Put(ctx context.Context, key string, value []byte) error { 95 141 // streamerBs := []byte(streamer) 96 142 keyBs := []byte(key) 97 - valueBs := []byte(value) 98 - return swarm.w.Put(nil, keyBs, valueBs) 143 + return swarm.w.Put(nil, keyBs, value) 99 144 }