Live video on the AT Protocol
1package iroh_replicator
2
3import (
4 "bytes"
5 "context"
6 "crypto/rand"
7 "encoding/json"
8 "fmt"
9 "reflect"
10 "strings"
11 "sync"
12 "time"
13
14 "github.com/bluesky-social/indigo/util"
15 "golang.org/x/sync/errgroup"
16 "stream.place/streamplace/pkg/bus"
17 "stream.place/streamplace/pkg/config"
18 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace"
19 "stream.place/streamplace/pkg/log"
20 "stream.place/streamplace/pkg/media"
21 "stream.place/streamplace/pkg/model"
22 "stream.place/streamplace/pkg/spmetrics"
23 "stream.place/streamplace/pkg/streamplace"
24)
25
26type IrohSwarm struct {
27 Node *iroh_streamplace.Node
28 DB *iroh_streamplace.Db
29 w *iroh_streamplace.WriteScope
30 mm *media.MediaManager
31 segChan chan *media.NewSegmentNotification
32 NodeID string
33 NodeTicket string
34 bus *bus.Bus
35 originMutex sync.Mutex
36 mod model.Model
37 cli *config.CLI
38 activeSubs map[string]*SwarmOriginInfo
39 handleDataScoped func(pubKey *iroh_streamplace.PublicKey, topic string, data []byte)
40}
41
// SwarmOriginInfo is the JSON message a node publishes to announce that it
// ingested a streamer's data at a given time.
type SwarmOriginInfo struct {
	// Type is the record type, serialized as "$type".
	Type string `json:"$type"`
	// NodeID identifies the node that holds the stream.
	NodeID string `json:"node_id"`
	// Time is a formatted timestamp string.
	Time string `json:"time"`
	// Streamer identifies whose stream this is.
	Streamer string `json:"streamer"`
}
49
// SwarmViewerCount is the JSON message a node publishes to report how many
// viewers it is serving for a streamer.
type SwarmViewerCount struct {
	// Type is the record type, serialized as "$type".
	Type string `json:"$type"`
	// Server identifies the node reporting the count.
	Server string `json:"server"`
	// Streamer identifies whose stream is being watched.
	Streamer string `json:"streamer"`
	// Viewers is the reported viewer count.
	Viewers int `json:"viewers"`
}
56
57func NewSwarm(ctx context.Context, cli *config.CLI, secret []byte, topic []byte, mm *media.MediaManager, bus *bus.Bus, mod model.Model) (*IrohSwarm, error) {
58 ctx = log.WithLogValues(ctx, "func", "StartKV")
59
60 if topic == nil {
61 topic = make([]byte, 32)
62 _, err := rand.Read(topic)
63 if err != nil {
64 return nil, fmt.Errorf("failed to generate random topic: %w", err)
65 }
66 }
67
68 log.Log(ctx, "Starting with tickets", "tickets", cli.Tickets)
69 config := iroh_streamplace.Config{
70 Key: secret,
71 Topic: topic,
72 MaxSendDuration: 1000_000_000, // 1s
73 DisableRelay: cli.DisableIrohRelay,
74 }
75 log.Log(ctx, "Config created", "config", config)
76
77 swarm := IrohSwarm{
78 mm: mm,
79 activeSubs: make(map[string]*SwarmOriginInfo),
80 bus: bus,
81 mod: mod,
82 cli: cli,
83 }
84
85 // workaround to get context into the HandleData callback
86 swarm.handleDataScoped = func(_ *iroh_streamplace.PublicKey, topic string, data []byte) {
87 if ctx.Err() != nil {
88 return
89 }
90 err := swarm.mm.ValidateMP4(context.Background(), bytes.NewReader(data), false)
91 if err != nil {
92 log.Error(ctx, "could not validate segment", "error", err, "topic", topic, "data", len(data))
93 }
94 }
95
96 node, err := iroh_streamplace.NodeReceiver(config, &swarm)
97 if err != nil {
98 return nil, fmt.Errorf("failed to create NodeSender: %w", err)
99 }
100
101 db := node.Db()
102 w := node.NodeScope()
103
104 swarm.DB = db
105 swarm.w = w
106 swarm.Node = node
107
108 nodeId, err := node.NodeId()
109 if err != nil {
110 return nil, fmt.Errorf("failed to get NodeId: %w", err)
111 }
112 log.Log(ctx, "Node ID:", "node_id", nodeId)
113 swarm.NodeID = nodeId.String()
114
115 ticket, err := node.Ticket()
116 if err != nil {
117 return nil, fmt.Errorf("failed to get Ticket: %w", err)
118 }
119 swarm.NodeTicket = ticket
120
121 return &swarm, nil
122}
123
124func (swarm *IrohSwarm) BuildOriginRecord(origin *streamplace.BroadcastOrigin) error {
125 origin.IrohTicket = &swarm.NodeTicket
126 return nil
127}
128
129func (swarm *IrohSwarm) Start(ctx context.Context, cli *config.CLI) error {
130 if len(cli.Tickets) > 0 {
131 err := swarm.Node.JoinPeers(cli.Tickets)
132 if err != nil {
133 return fmt.Errorf("failed to join peers: %w", err)
134 }
135 }
136 nodeId, err := swarm.Node.NodeId()
137 if err != nil {
138 return fmt.Errorf("failed to get node id: %w", err)
139 }
140 nodeIdStr := nodeId.String()
141 log.Log(ctx, "Node ID:", "node_id", nodeIdStr)
142
143 g, ctx := errgroup.WithContext(ctx)
144 g.Go(func() error {
145 return swarm.startKV(ctx)
146 })
147 g.Go(func() error {
148 return swarm.startSegmentSender(ctx)
149 })
150 g.Go(func() error {
151 <-ctx.Done()
152 return swarm.Node.Shutdown()
153 })
154 g.Go(func() error {
155 return swarm.startBusSubscribe(ctx)
156 })
157 g.Go(func() error {
158 return swarm.startViewerCountSubscribe(ctx)
159 })
160 return g.Wait()
161}
162
163func (swarm *IrohSwarm) startKV(ctx context.Context) error {
164 sub := swarm.DB.Subscribe(iroh_streamplace.NewFilter())
165 for {
166 if ctx.Err() != nil {
167 return ctx.Err()
168 }
169 ev, err := sub.NextRaw()
170 if err != nil {
171 return fmt.Errorf("failed to get next subscription event: %w", err)
172 }
173
174 if ev == nil {
175 log.Warn(ctx, "Got empty event from sub.NextRaw(), pausing for a second and continuing")
176 time.Sleep(1 * time.Second)
177 continue
178 }
179
180 switch item := (*ev).(type) {
181 case iroh_streamplace.SubscribeItemEntry:
182 err := swarm.handleIrohMessage(ctx, item)
183 if err != nil {
184 log.Error(ctx, "could not handle iroh message", "error", err)
185 continue
186 }
187
188 case iroh_streamplace.SubscribeItemCurrentDone:
189 log.Debug(ctx, "SubscribeItemCurrentDone", "currentDone", item)
190 case iroh_streamplace.SubscribeItemExpired:
191 log.Debug(ctx, "SubscribeItemExpired", "expired", item)
192 case iroh_streamplace.SubscribeItemOther:
193 log.Debug(ctx, "SubscribeItemOther", "other", item)
194 }
195 }
196}
197
198func (swarm *IrohSwarm) handleIrohMessage(ctx context.Context, item iroh_streamplace.SubscribeItemEntry) error {
199 keyStr := string(item.Key)
200 valueStr := string(item.Value)
201 log.Debug(ctx, "SubscribeItemEntry", "key", keyStr, "value", valueStr)
202 if len(valueStr) > 0 && valueStr[0] != '{' {
203 // not JSON, it's one of the rust messages
204 log.Debug(ctx, "not JSON", "key", keyStr, "value", valueStr)
205 return nil
206 }
207 rawMessage, err := decodeIrohMessage(item.Key, item.Value)
208 if err != nil {
209 return fmt.Errorf("could not decode iroh message: %w", err)
210 }
211 switch message := rawMessage.(type) {
212 case SwarmOriginInfo:
213 err = swarm.checkOrigins(ctx, message.Streamer, message.NodeID)
214 if err != nil {
215 return fmt.Errorf("could not check origins: %w", err)
216 }
217 case SwarmViewerCount:
218 log.Log(ctx, "got viewer count", "viewerCount", message)
219 if message.Server == swarm.NodeID {
220 // no infinite loops allowed
221 return nil
222 }
223 swarm.bus.SetViewerCount(message.Streamer, message.Server, message.Viewers)
224 log.Log(ctx, "set viewer count", "viewerCount", message)
225 return nil
226 default:
227 return fmt.Errorf("unknown message type: %s", reflect.TypeOf(rawMessage))
228 }
229 return nil
230}
231
232func decodeIrohMessage(key, value []byte) (any, error) {
233 keyStr := string(key)
234 if strings.HasPrefix(keyStr, "origin::") {
235 var originInfo SwarmOriginInfo
236 err := json.Unmarshal(value, &originInfo)
237 if err != nil {
238 return nil, fmt.Errorf("could not unmarshal origin info: %w", err)
239 }
240 return originInfo, nil
241 }
242 if strings.HasPrefix(keyStr, "viewers::") {
243 var viewerCount SwarmViewerCount
244 err := json.Unmarshal(value, &viewerCount)
245 if err != nil {
246 return nil, fmt.Errorf("could not unmarshal viewer count: %w", err)
247 }
248 return viewerCount, nil
249 }
250 return nil, fmt.Errorf("unknown key: %s", keyStr)
251}
252
253// subscribe to all streams
254func (swarm *IrohSwarm) startBusSubscribe(ctx context.Context) error {
255 // start subscription first so we're buffering new origins
256 busCh := swarm.bus.Subscribe("")
257 originViews, err := swarm.mod.GetRecentBroadcastOrigins(ctx)
258 if err != nil {
259 return fmt.Errorf("failed to get recent broadcast origins: %w", err)
260 }
261 for _, view := range originViews {
262 err = swarm.handleOriginMessage(ctx, view)
263 if err != nil {
264 log.Error(ctx, "could not check origin", "error", err)
265 }
266 }
267 log.Log(ctx, "Resumed recent broadcast origins", "count", len(originViews))
268 for {
269 select {
270 case <-ctx.Done():
271 return ctx.Err()
272 case msg := <-busCh:
273 if view, ok := msg.(*streamplace.BroadcastDefs_BroadcastOriginView); ok {
274 log.Debug(ctx, "got broadcast origin view", "view", view)
275 err = swarm.handleOriginMessage(ctx, view)
276 if err != nil {
277 log.Error(ctx, "could not handle origin message", "error", err)
278 }
279 }
280 }
281 }
282}
283
284func (swarm *IrohSwarm) startViewerCountSubscribe(ctx context.Context) error {
285 ch := swarm.bus.SubscribeToViewerCount()
286 for {
287 select {
288 case <-ctx.Done():
289 return ctx.Err()
290 case msg := <-ch:
291 log.Log(ctx, "got viewer count update", "viewerCount", msg)
292 if msg.Origin != "local" {
293 continue
294 }
295 swarmMsg := SwarmViewerCount{
296 Type: "place.stream.swarm.viewerCount",
297 Server: swarm.NodeID,
298 Streamer: msg.Streamer,
299 Viewers: msg.Count,
300 }
301 bs, err := json.Marshal(swarmMsg)
302 if err != nil {
303 log.Error(ctx, "could not marshal viewer count", "error", err)
304 continue
305 }
306 key := fmt.Sprintf("viewers::%s::%s", swarm.NodeID, msg.Streamer)
307 err = swarm.w.Put(nil, []byte(key), bs)
308 if err != nil {
309 log.Error(ctx, "could not put viewer count to swarm", "error", err)
310 continue
311 }
312 log.Log(ctx, "put viewer count to swarm", "viewerCount", msg)
313
314 }
315 }
316}
317
318func (swarm *IrohSwarm) handleOriginMessage(ctx context.Context, view *streamplace.BroadcastDefs_BroadcastOriginView) error {
319 origin, ok := view.Record.Val.(*streamplace.BroadcastOrigin)
320 if !ok {
321 return fmt.Errorf("record is not a BroadcastOrigin")
322 }
323 if view.Author.Did != origin.Streamer {
324 // currently, only streamers are allowed to advertise origins
325 return nil
326 }
327 if origin.IrohTicket == nil {
328 return fmt.Errorf("origin has no iroh ticket")
329 }
330 pubKey, err := iroh_streamplace.NodeIdFromTicket(*origin.IrohTicket)
331 if err != nil {
332 return fmt.Errorf("could not get node id from ticket: %w", err)
333 }
334 err = swarm.Node.AddTickets([]string{*origin.IrohTicket})
335 if err != nil {
336 return fmt.Errorf("could not add tickets: %w", err)
337 }
338 pubKeyStr := pubKey.String()
339 err = swarm.checkOrigins(ctx, origin.Streamer, pubKeyStr)
340 if err != nil {
341 return fmt.Errorf("could not check origin: %w", err)
342 }
343 return nil
344}
345
346func (swarm *IrohSwarm) checkOrigins(ctx context.Context, streamer string, nodeID string) error {
347 ctx = log.WithLogValues(ctx, "streamer", streamer, "nodeID", nodeID, "func", "checkOrigins")
348 err := swarm.cli.StreamIsAllowed(streamer)
349 if err != nil {
350 return fmt.Errorf("user %s is not allowlisted on this node: %w", streamer, err)
351 }
352 swarm.originMutex.Lock()
353 defer swarm.originMutex.Unlock()
354 oldSub, ok := swarm.activeSubs[streamer]
355 if ok {
356 if oldSub.NodeID == nodeID {
357 log.Debug(ctx, "node hasn't changed", "streamer", streamer)
358 // mmyep. same node still has the stream. great news.
359 return nil
360 }
361 log.Log(ctx, "Stream origin changed, swapping to new node", "old_node", oldSub.NodeID, "new_node", nodeID, "streamer", streamer)
362 pubKey, err := iroh_streamplace.PublicKeyFromString(oldSub.NodeID)
363 if err != nil {
364 log.Error(ctx, "could not create public key", "error", err)
365 return err
366 }
367 // different node has the stream. we need to unsubscribe from the old node.
368 err = swarm.Node.Unsubscribe(streamer, pubKey)
369 if err != nil {
370 log.Error(ctx, "could not unsubscribe from key", "error", err)
371 return err
372 }
373 delete(swarm.activeSubs, streamer)
374 }
375 if nodeID == swarm.NodeID {
376 log.Debug(ctx, "I already have this stream", "streamer", streamer)
377 // oh, i have this stream. cool. do nothing.
378 return nil
379 }
380 log.Log(ctx, "Subscribing to stream start", "new_node", nodeID, "streamer", streamer)
381 pubKey, err := iroh_streamplace.PublicKeyFromString(nodeID)
382 if err != nil {
383 log.Error(ctx, "could not create public key", "error", err)
384 return err
385 }
386 err = swarm.Node.Subscribe(streamer, pubKey)
387 log.Log(ctx, "Subscribing to stream done", "new_node", nodeID, "streamer", streamer, "pubKey", pubKey, "error", err)
388 if err != nil {
389 log.Error(ctx, "could not subscribe to key", "error", err)
390 return err
391 }
392 swarm.activeSubs[streamer] = &SwarmOriginInfo{
393 Type: "place.stream.swarm.originInfo",
394 NodeID: nodeID,
395 Time: time.Now().Format(util.ISO8601),
396 Streamer: streamer,
397 }
398 return nil
399}
400
401func (swarm *IrohSwarm) startSegmentSender(ctx context.Context) error {
402 ch := swarm.mm.NewSegment()
403 for {
404 select {
405 case <-ctx.Done():
406 return ctx.Err()
407 case not := <-ch:
408 err := swarm.SendSegment(ctx, not)
409 if err != nil {
410 log.Error(ctx, "could not send segment to swarm", "error", err)
411 }
412 continue
413 }
414 }
415}
416
417func (swarm *IrohSwarm) HandleData(pubKey *iroh_streamplace.PublicKey, topic string, data []byte) {
418 swarm.handleDataScoped(pubKey, topic, data)
419}
420
421func (swarm *IrohSwarm) SendSegment(ctx context.Context, not *media.NewSegmentNotification) error {
422 if !not.Local {
423 return nil
424 }
425 originInfo := SwarmOriginInfo{
426 Type: "place.stream.swarm.originInfo",
427 NodeID: swarm.NodeID,
428 Time: not.Segment.StartTime.Format(util.ISO8601),
429 Streamer: not.Segment.RepoDID,
430 }
431 bs, err := json.Marshal(originInfo)
432 if err != nil {
433 log.Error(ctx, "could not marshal origin info", "error", err)
434 return err
435 }
436 keyBs := []byte(fmt.Sprintf("origin::%s", not.Segment.RepoDID))
437 go func() {
438 spmetrics.SwarmPutCalls.WithLabelValues(not.Segment.RepoDID).Inc()
439 defer spmetrics.SwarmPutCalls.WithLabelValues(not.Segment.RepoDID).Dec()
440 err = swarm.w.Put(nil, keyBs, bs)
441 if err != nil {
442 log.Error(ctx, "could not put segment to swarm", "error", err)
443 }
444 }()
445 go func() {
446 spmetrics.SendSegmentCalls.Inc()
447 defer spmetrics.SendSegmentCalls.Dec()
448 err = swarm.Node.SendSegment(not.Segment.RepoDID, not.Data)
449 if err != nil {
450 log.Error(ctx, "could not send segment to swarm", "error", err)
451 }
452 }()
453 return nil
454}