Live video on the AT Protocol
1package iroh_replicator
2
3import (
4 "bytes"
5 "context"
6 "crypto/rand"
7 "encoding/json"
8 "fmt"
9 "reflect"
10 "strings"
11 "sync"
12 "time"
13
14 "github.com/bluesky-social/indigo/util"
15 "golang.org/x/sync/errgroup"
16 "stream.place/streamplace/pkg/bus"
17 "stream.place/streamplace/pkg/config"
18 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace"
19 "stream.place/streamplace/pkg/log"
20 "stream.place/streamplace/pkg/media"
21 "stream.place/streamplace/pkg/model"
22 "stream.place/streamplace/pkg/streamplace"
23)
24
25type IrohSwarm struct {
26 Node *iroh_streamplace.Node
27 DB *iroh_streamplace.Db
28 w *iroh_streamplace.WriteScope
29 mm *media.MediaManager
30 segChan chan *media.NewSegmentNotification
31 NodeID string
32 NodeTicket string
33 bus *bus.Bus
34 originMutex sync.Mutex
35 mod model.Model
36 cli *config.CLI
37 activeSubs map[string]*SwarmOriginInfo
38 handleDataScoped func(pubKey *iroh_streamplace.PublicKey, topic string, data []byte)
39}
40
41// A message saying "hey I ingested node data at this time"
42type SwarmOriginInfo struct {
43 Type string `json:"$type"`
44 NodeID string `json:"node_id"`
45 Time string `json:"time"`
46 Streamer string `json:"streamer"`
47}
48
49type SwarmViewerCount struct {
50 Type string `json:"$type"`
51 Server string `json:"server"`
52 Streamer string `json:"streamer"`
53 Viewers int `json:"viewers"`
54}
55
56func NewSwarm(ctx context.Context, cli *config.CLI, secret []byte, topic []byte, mm *media.MediaManager, bus *bus.Bus, mod model.Model) (*IrohSwarm, error) {
57 ctx = log.WithLogValues(ctx, "func", "StartKV")
58
59 if topic == nil {
60 topic = make([]byte, 32)
61 _, err := rand.Read(topic)
62 if err != nil {
63 return nil, fmt.Errorf("failed to generate random topic: %w", err)
64 }
65 }
66
67 log.Log(ctx, "Starting with tickets", "tickets", cli.Tickets)
68 config := iroh_streamplace.Config{
69 Key: secret,
70 Topic: topic,
71 MaxSendDuration: 1000_000_000, // 1s
72 }
73 log.Log(ctx, "Config created", "config", config)
74
75 swarm := IrohSwarm{
76 mm: mm,
77 activeSubs: make(map[string]*SwarmOriginInfo),
78 bus: bus,
79 mod: mod,
80 cli: cli,
81 }
82
83 // workaround to get context into the HandleData callback
84 swarm.handleDataScoped = func(_ *iroh_streamplace.PublicKey, topic string, data []byte) {
85 if ctx.Err() != nil {
86 return
87 }
88 err := swarm.mm.ValidateMP4(context.Background(), bytes.NewReader(data), false)
89 if err != nil {
90 log.Error(ctx, "could not validate segment", "error", err, "topic", topic, "data", len(data))
91 }
92 }
93
94 node, err := iroh_streamplace.NodeReceiver(config, &swarm)
95 if err != nil {
96 return nil, fmt.Errorf("failed to create NodeSender: %w", err)
97 }
98
99 db := node.Db()
100 w := node.NodeScope()
101
102 swarm.DB = db
103 swarm.w = w
104 swarm.Node = node
105
106 nodeId, err := node.NodeId()
107 if err != nil {
108 return nil, fmt.Errorf("failed to get NodeId: %w", err)
109 }
110 log.Log(ctx, "Node ID:", "node_id", nodeId)
111 swarm.NodeID = nodeId.String()
112
113 ticket, err := node.Ticket()
114 if err != nil {
115 return nil, fmt.Errorf("failed to get Ticket: %w", err)
116 }
117 swarm.NodeTicket = ticket
118
119 return &swarm, nil
120}
121
122func (swarm *IrohSwarm) Start(ctx context.Context, tickets []string) error {
123 if len(tickets) > 0 {
124 err := swarm.Node.JoinPeers(tickets)
125 if err != nil {
126 return fmt.Errorf("failed to join peers: %w", err)
127 }
128 }
129
130 nodeId, err := swarm.Node.NodeId()
131 if err != nil {
132 return fmt.Errorf("failed to get node id: %w", err)
133 }
134 nodeIdStr := nodeId.String()
135 log.Log(ctx, "Node ID:", "node_id", nodeIdStr)
136
137 g, ctx := errgroup.WithContext(ctx)
138 g.Go(func() error {
139 return swarm.startKV(ctx)
140 })
141 g.Go(func() error {
142 return swarm.startSegmentSender(ctx)
143 })
144 g.Go(func() error {
145 <-ctx.Done()
146 return swarm.Node.Shutdown()
147 })
148 g.Go(func() error {
149 return swarm.startBusSubscribe(ctx)
150 })
151 g.Go(func() error {
152 return swarm.startViewerCountSubscribe(ctx)
153 })
154 return g.Wait()
155}
156
157func (swarm *IrohSwarm) startKV(ctx context.Context) error {
158 sub := swarm.DB.Subscribe(iroh_streamplace.NewFilter())
159 for {
160 if ctx.Err() != nil {
161 return ctx.Err()
162 }
163 ev, err := sub.NextRaw()
164 if err != nil {
165 return fmt.Errorf("failed to get next subscription event: %w", err)
166 }
167
168 if ev == nil {
169 log.Warn(ctx, "Got empty event from sub.NextRaw(), pausing for a second and continuing")
170 time.Sleep(1 * time.Second)
171 continue
172 }
173
174 switch item := (*ev).(type) {
175 case iroh_streamplace.SubscribeItemEntry:
176 err := swarm.handleIrohMessage(ctx, item)
177 if err != nil {
178 log.Error(ctx, "could not handle iroh message", "error", err)
179 continue
180 }
181
182 case iroh_streamplace.SubscribeItemCurrentDone:
183 log.Debug(ctx, "SubscribeItemCurrentDone", "currentDone", item)
184 case iroh_streamplace.SubscribeItemExpired:
185 log.Debug(ctx, "SubscribeItemExpired", "expired", item)
186 case iroh_streamplace.SubscribeItemOther:
187 log.Debug(ctx, "SubscribeItemOther", "other", item)
188 }
189 }
190}
191
192func (swarm *IrohSwarm) handleIrohMessage(ctx context.Context, item iroh_streamplace.SubscribeItemEntry) error {
193 keyStr := string(item.Key)
194 valueStr := string(item.Value)
195 log.Warn(ctx, "SubscribeItemEntry", "key", keyStr, "value", valueStr)
196 if len(valueStr) > 0 && valueStr[0] != '{' {
197 // not JSON, it's one of the rust messages
198 log.Debug(ctx, "not JSON", "key", keyStr, "value", valueStr)
199 return nil
200 }
201 rawMessage, err := decodeIrohMessage(item.Key, item.Value)
202 if err != nil {
203 return fmt.Errorf("could not decode iroh message: %w", err)
204 }
205 switch message := rawMessage.(type) {
206 case SwarmOriginInfo:
207 err = swarm.checkOrigins(ctx, message.Streamer, message.NodeID)
208 if err != nil {
209 return fmt.Errorf("could not check origins: %w", err)
210 }
211 case SwarmViewerCount:
212 log.Log(ctx, "got viewer count", "viewerCount", message)
213 if message.Server == swarm.NodeID {
214 // no infinite loops allowed
215 return nil
216 }
217 swarm.bus.SetViewerCount(message.Streamer, message.Server, message.Viewers)
218 log.Log(ctx, "set viewer count", "viewerCount", message)
219 return nil
220 default:
221 return fmt.Errorf("unknown message type: %s", reflect.TypeOf(rawMessage))
222 }
223 return nil
224}
225
226func decodeIrohMessage(key, value []byte) (any, error) {
227 keyStr := string(key)
228 if strings.HasPrefix(keyStr, "origin::") {
229 var originInfo SwarmOriginInfo
230 err := json.Unmarshal(value, &originInfo)
231 if err != nil {
232 return nil, fmt.Errorf("could not unmarshal origin info: %w", err)
233 }
234 return originInfo, nil
235 }
236 if strings.HasPrefix(keyStr, "viewers::") {
237 var viewerCount SwarmViewerCount
238 err := json.Unmarshal(value, &viewerCount)
239 if err != nil {
240 return nil, fmt.Errorf("could not unmarshal viewer count: %w", err)
241 }
242 return viewerCount, nil
243 }
244 return nil, fmt.Errorf("unknown key: %s", keyStr)
245}
246
247// subscribe to all streams
248func (swarm *IrohSwarm) startBusSubscribe(ctx context.Context) error {
249 // start subscription first so we're buffering new origins
250 busCh := swarm.bus.Subscribe("")
251 originViews, err := swarm.mod.GetRecentBroadcastOrigins(ctx)
252 if err != nil {
253 return fmt.Errorf("failed to get recent broadcast origins: %w", err)
254 }
255 for _, view := range originViews {
256 err = swarm.handleOriginMessage(ctx, view)
257 if err != nil {
258 log.Error(ctx, "could not check origin", "error", err)
259 }
260 }
261 log.Log(ctx, "Resumed recent broadcast origins", "count", len(originViews))
262 for {
263 select {
264 case <-ctx.Done():
265 return ctx.Err()
266 case msg := <-busCh:
267 if view, ok := msg.(*streamplace.BroadcastDefs_BroadcastOriginView); ok {
268 log.Debug(ctx, "got broadcast origin view", "view", view)
269 err = swarm.handleOriginMessage(ctx, view)
270 if err != nil {
271 log.Error(ctx, "could not handle origin message", "error", err)
272 }
273 }
274 }
275 }
276}
277
278func (swarm *IrohSwarm) startViewerCountSubscribe(ctx context.Context) error {
279 ch := swarm.bus.SubscribeToViewerCount()
280 for {
281 select {
282 case <-ctx.Done():
283 return ctx.Err()
284 case msg := <-ch:
285 log.Log(ctx, "got viewer count update", "viewerCount", msg)
286 if msg.Origin != "local" {
287 continue
288 }
289 swarmMsg := SwarmViewerCount{
290 Type: "place.stream.swarm.viewerCount",
291 Server: swarm.NodeID,
292 Streamer: msg.Streamer,
293 Viewers: msg.Count,
294 }
295 bs, err := json.Marshal(swarmMsg)
296 if err != nil {
297 log.Error(ctx, "could not marshal viewer count", "error", err)
298 continue
299 }
300 key := fmt.Sprintf("viewers::%s::%s", swarm.NodeID, msg.Streamer)
301 err = swarm.w.Put(nil, []byte(key), bs)
302 if err != nil {
303 log.Error(ctx, "could not put viewer count to swarm", "error", err)
304 continue
305 }
306 log.Log(ctx, "put viewer count to swarm", "viewerCount", msg)
307
308 }
309 }
310}
311
312func (swarm *IrohSwarm) handleOriginMessage(ctx context.Context, view *streamplace.BroadcastDefs_BroadcastOriginView) error {
313 origin, ok := view.Record.Val.(*streamplace.BroadcastOrigin)
314 if !ok {
315 return fmt.Errorf("record is not a BroadcastOrigin")
316 }
317 if view.Author.Did != origin.Streamer {
318 // currently, only streamers are allowed to advertise origins
319 return nil
320 }
321 if origin.IrohTicket == nil {
322 return fmt.Errorf("origin has no iroh ticket")
323 }
324 pubKey, err := iroh_streamplace.NodeIdFromTicket(*origin.IrohTicket)
325 if err != nil {
326 return fmt.Errorf("could not get node id from ticket: %w", err)
327 }
328 err = swarm.Node.AddTickets([]string{*origin.IrohTicket})
329 if err != nil {
330 return fmt.Errorf("could not add tickets: %w", err)
331 }
332 pubKeyStr := pubKey.String()
333 err = swarm.checkOrigins(ctx, origin.Streamer, pubKeyStr)
334 if err != nil {
335 return fmt.Errorf("could not check origin: %w", err)
336 }
337 return nil
338}
339
340func (swarm *IrohSwarm) checkOrigins(ctx context.Context, streamer string, nodeID string) error {
341 ctx = log.WithLogValues(ctx, "streamer", streamer, "nodeID", nodeID, "func", "checkOrigins")
342 err := swarm.cli.StreamIsAllowed(streamer)
343 if err != nil {
344 return fmt.Errorf("user %s is not allowlisted on this node: %w", streamer, err)
345 }
346 swarm.originMutex.Lock()
347 defer swarm.originMutex.Unlock()
348 oldSub, ok := swarm.activeSubs[streamer]
349 if ok {
350 if oldSub.NodeID == nodeID {
351 log.Debug(ctx, "node hasn't changed", "streamer", streamer)
352 // mmyep. same node still has the stream. great news.
353 return nil
354 }
355 log.Log(ctx, "Stream origin changed, swapping to new node", "old_node", oldSub.NodeID, "new_node", nodeID, "streamer", streamer)
356 pubKey, err := iroh_streamplace.PublicKeyFromString(oldSub.NodeID)
357 if err != nil {
358 log.Error(ctx, "could not create public key", "error", err)
359 return err
360 }
361 // different node has the stream. we need to unsubscribe from the old node.
362 err = swarm.Node.Unsubscribe(streamer, pubKey)
363 if err != nil {
364 log.Error(ctx, "could not unsubscribe from key", "error", err)
365 return err
366 }
367 delete(swarm.activeSubs, streamer)
368 }
369 if nodeID == swarm.NodeID {
370 log.Debug(ctx, "I already have this stream", "streamer", streamer)
371 // oh, i have this stream. cool. do nothing.
372 return nil
373 }
374 log.Log(ctx, "Subscribing to stream", "new_node", nodeID, "streamer", streamer)
375 pubKey, err := iroh_streamplace.PublicKeyFromString(nodeID)
376 if err != nil {
377 log.Error(ctx, "could not create public key", "error", err)
378 return err
379 }
380 err = swarm.Node.Subscribe(streamer, pubKey)
381 if err != nil {
382 log.Error(ctx, "could not subscribe to key", "error", err)
383 return err
384 }
385 swarm.activeSubs[streamer] = &SwarmOriginInfo{
386 Type: "place.stream.swarm.originInfo",
387 NodeID: nodeID,
388 Time: time.Now().Format(util.ISO8601),
389 Streamer: streamer,
390 }
391 return nil
392}
393
394func (swarm *IrohSwarm) startSegmentSender(ctx context.Context) error {
395 ch := swarm.mm.NewSegment()
396 for {
397 select {
398 case <-ctx.Done():
399 return ctx.Err()
400 case not := <-ch:
401 err := swarm.SendSegment(ctx, not)
402 if err != nil {
403 log.Error(ctx, "could not send segment to swarm", "error", err)
404 }
405 continue
406 }
407 }
408}
409
410func (swarm *IrohSwarm) HandleData(pubKey *iroh_streamplace.PublicKey, topic string, data []byte) {
411 swarm.handleDataScoped(pubKey, topic, data)
412}
413
414func (swarm *IrohSwarm) SendSegment(ctx context.Context, not *media.NewSegmentNotification) error {
415 if !not.Local {
416 return nil
417 }
418 originInfo := SwarmOriginInfo{
419 Type: "place.stream.swarm.originInfo",
420 NodeID: swarm.NodeID,
421 Time: not.Segment.StartTime.Format(util.ISO8601),
422 Streamer: not.Segment.RepoDID,
423 }
424 bs, err := json.Marshal(originInfo)
425 if err != nil {
426 log.Error(ctx, "could not marshal origin info", "error", err)
427 return err
428 }
429 keyBs := []byte(fmt.Sprintf("origin::%s", not.Segment.RepoDID))
430 go func() {
431 err = swarm.w.Put(nil, keyBs, bs)
432 if err != nil {
433 log.Error(ctx, "could not put segment to swarm", "error", err)
434 }
435 }()
436 go func() {
437 err = swarm.Node.SendSegment(not.Segment.RepoDID, not.Data)
438 if err != nil {
439 log.Error(ctx, "could not send segment to swarm", "error", err)
440 }
441 }()
442 return nil
443}