Live video on the AT Protocol
at eli/rtmp-push 454 lines 13 kB view raw
1package iroh_replicator 2 3import ( 4 "bytes" 5 "context" 6 "crypto/rand" 7 "encoding/json" 8 "fmt" 9 "reflect" 10 "strings" 11 "sync" 12 "time" 13 14 "github.com/bluesky-social/indigo/util" 15 "golang.org/x/sync/errgroup" 16 "stream.place/streamplace/pkg/bus" 17 "stream.place/streamplace/pkg/config" 18 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 19 "stream.place/streamplace/pkg/log" 20 "stream.place/streamplace/pkg/media" 21 "stream.place/streamplace/pkg/model" 22 "stream.place/streamplace/pkg/spmetrics" 23 "stream.place/streamplace/pkg/streamplace" 24) 25 26type IrohSwarm struct { 27 Node *iroh_streamplace.Node 28 DB *iroh_streamplace.Db 29 w *iroh_streamplace.WriteScope 30 mm *media.MediaManager 31 segChan chan *media.NewSegmentNotification 32 NodeID string 33 NodeTicket string 34 bus *bus.Bus 35 originMutex sync.Mutex 36 mod model.Model 37 cli *config.CLI 38 activeSubs map[string]*SwarmOriginInfo 39 handleDataScoped func(pubKey *iroh_streamplace.PublicKey, topic string, data []byte) 40} 41 42// A message saying "hey I ingested node data at this time" 43type SwarmOriginInfo struct { 44 Type string `json:"$type"` 45 NodeID string `json:"node_id"` 46 Time string `json:"time"` 47 Streamer string `json:"streamer"` 48} 49 50type SwarmViewerCount struct { 51 Type string `json:"$type"` 52 Server string `json:"server"` 53 Streamer string `json:"streamer"` 54 Viewers int `json:"viewers"` 55} 56 57func NewSwarm(ctx context.Context, cli *config.CLI, secret []byte, topic []byte, mm *media.MediaManager, bus *bus.Bus, mod model.Model) (*IrohSwarm, error) { 58 ctx = log.WithLogValues(ctx, "func", "StartKV") 59 60 if topic == nil { 61 topic = make([]byte, 32) 62 _, err := rand.Read(topic) 63 if err != nil { 64 return nil, fmt.Errorf("failed to generate random topic: %w", err) 65 } 66 } 67 68 log.Log(ctx, "Starting with tickets", "tickets", cli.Tickets) 69 config := iroh_streamplace.Config{ 70 Key: secret, 71 Topic: topic, 72 MaxSendDuration: 1000_000_000, // 1s 73 DisableRelay: cli.DisableIrohRelay, 74 } 75 log.Log(ctx, "Config created", "config", config) 76 77 swarm := IrohSwarm{ 78 mm: mm, 79 activeSubs: make(map[string]*SwarmOriginInfo), 80 bus: bus, 81 mod: mod, 82 cli: cli, 83 } 84 85 // workaround to get context into the HandleData callback 86 swarm.handleDataScoped = func(_ *iroh_streamplace.PublicKey, topic string, data []byte) { 87 if ctx.Err() != nil { 88 return 89 } 90 err := swarm.mm.ValidateMP4(context.Background(), bytes.NewReader(data), false) 91 if err != nil { 92 log.Error(ctx, "could not validate segment", "error", err, "topic", topic, "data", len(data)) 93 } 94 } 95 96 node, err := iroh_streamplace.NodeReceiver(config, &swarm) 97 if err != nil { 98 return nil, fmt.Errorf("failed to create NodeSender: %w", err) 99 } 100 101 db := node.Db() 102 w := node.NodeScope() 103 104 swarm.DB = db 105 swarm.w = w 106 swarm.Node = node 107 108 nodeId, err := node.NodeId() 109 if err != nil { 110 return nil, fmt.Errorf("failed to get NodeId: %w", err) 111 } 112 log.Log(ctx, "Node ID:", "node_id", nodeId) 113 swarm.NodeID = nodeId.String() 114 115 ticket, err := node.Ticket() 116 if err != nil { 117 return nil, fmt.Errorf("failed to get Ticket: %w", err) 118 } 119 swarm.NodeTicket = ticket 120 121 return &swarm, nil 122} 123 124func (swarm *IrohSwarm) BuildOriginRecord(origin *streamplace.BroadcastOrigin) error { 125 origin.IrohTicket = &swarm.NodeTicket 126 return nil 127} 128 129func (swarm *IrohSwarm) Start(ctx context.Context, cli *config.CLI) error { 130 if len(cli.Tickets) > 0 { 131 err := swarm.Node.JoinPeers(cli.Tickets) 132 if err != nil { 133 return fmt.Errorf("failed to join peers: %w", err) 134 } 135 } 136 nodeId, err := swarm.Node.NodeId() 137 if err != nil { 138 return fmt.Errorf("failed to get node id: %w", err) 139 } 140 nodeIdStr := nodeId.String() 141 log.Log(ctx, "Node ID:", "node_id", nodeIdStr) 142 143 g, ctx := errgroup.WithContext(ctx) 144 g.Go(func() error { 145 return swarm.startKV(ctx) 146 }) 147 g.Go(func() error { 148 return swarm.startSegmentSender(ctx) 149 }) 150 g.Go(func() error { 151 <-ctx.Done() 152 return swarm.Node.Shutdown() 153 }) 154 g.Go(func() error { 155 return swarm.startBusSubscribe(ctx) 156 }) 157 g.Go(func() error { 158 return swarm.startViewerCountSubscribe(ctx) 159 }) 160 return g.Wait() 161} 162 163func (swarm *IrohSwarm) startKV(ctx context.Context) error { 164 sub := swarm.DB.Subscribe(iroh_streamplace.NewFilter()) 165 for { 166 if ctx.Err() != nil { 167 return ctx.Err() 168 } 169 ev, err := sub.NextRaw() 170 if err != nil { 171 return fmt.Errorf("failed to get next subscription event: %w", err) 172 } 173 174 if ev == nil { 175 log.Warn(ctx, "Got empty event from sub.NextRaw(), pausing for a second and continuing") 176 time.Sleep(1 * time.Second) 177 continue 178 } 179 180 switch item := (*ev).(type) { 181 case iroh_streamplace.SubscribeItemEntry: 182 err := swarm.handleIrohMessage(ctx, item) 183 if err != nil { 184 log.Error(ctx, "could not handle iroh message", "error", err) 185 continue 186 } 187 188 case iroh_streamplace.SubscribeItemCurrentDone: 189 log.Debug(ctx, "SubscribeItemCurrentDone", "currentDone", item) 190 case iroh_streamplace.SubscribeItemExpired: 191 log.Debug(ctx, "SubscribeItemExpired", "expired", item) 192 case iroh_streamplace.SubscribeItemOther: 193 log.Debug(ctx, "SubscribeItemOther", "other", item) 194 } 195 } 196} 197 198func (swarm *IrohSwarm) handleIrohMessage(ctx context.Context, item iroh_streamplace.SubscribeItemEntry) error { 199 keyStr := string(item.Key) 200 valueStr := string(item.Value) 201 log.Debug(ctx, "SubscribeItemEntry", "key", keyStr, "value", valueStr) 202 if len(valueStr) > 0 && valueStr[0] != '{' { 203 // not JSON, it's one of the rust messages 204 log.Debug(ctx, "not JSON", "key", keyStr, "value", valueStr) 205 return nil 206 } 207 rawMessage, err := decodeIrohMessage(item.Key, item.Value) 208 if err != nil { 209 return fmt.Errorf("could not decode iroh message: %w", err) 210 } 211 switch message := rawMessage.(type) { 212 case SwarmOriginInfo: 213 err = swarm.checkOrigins(ctx, message.Streamer, message.NodeID) 214 if err != nil { 215 return fmt.Errorf("could not check origins: %w", err) 216 } 217 case SwarmViewerCount: 218 log.Log(ctx, "got viewer count", "viewerCount", message) 219 if message.Server == swarm.NodeID { 220 // no infinite loops allowed 221 return nil 222 } 223 swarm.bus.SetViewerCount(message.Streamer, message.Server, message.Viewers) 224 log.Log(ctx, "set viewer count", "viewerCount", message) 225 return nil 226 default: 227 return fmt.Errorf("unknown message type: %s", reflect.TypeOf(rawMessage)) 228 } 229 return nil 230} 231 232func decodeIrohMessage(key, value []byte) (any, error) { 233 keyStr := string(key) 234 if strings.HasPrefix(keyStr, "origin::") { 235 var originInfo SwarmOriginInfo 236 err := json.Unmarshal(value, &originInfo) 237 if err != nil { 238 return nil, fmt.Errorf("could not unmarshal origin info: %w", err) 239 } 240 return originInfo, nil 241 } 242 if strings.HasPrefix(keyStr, "viewers::") { 243 var viewerCount SwarmViewerCount 244 err := json.Unmarshal(value, &viewerCount) 245 if err != nil { 246 return nil, fmt.Errorf("could not unmarshal viewer count: %w", err) 247 } 248 return viewerCount, nil 249 } 250 return nil, fmt.Errorf("unknown key: %s", keyStr) 251} 252 253// subscribe to all streams 254func (swarm *IrohSwarm) startBusSubscribe(ctx context.Context) error { 255 // start subscription first so we're buffering new origins 256 busCh := swarm.bus.Subscribe("") 257 originViews, err := swarm.mod.GetRecentBroadcastOrigins(ctx) 258 if err != nil { 259 return fmt.Errorf("failed to get recent broadcast origins: %w", err) 260 } 261 for _, view := range originViews { 262 err = swarm.handleOriginMessage(ctx, view) 263 if err != nil { 264 log.Error(ctx, "could not check origin", "error", err) 265 } 266 } 267 log.Log(ctx, "Resumed recent broadcast origins", "count", len(originViews)) 268 for { 269 select { 270 case <-ctx.Done(): 271 return ctx.Err() 272 case msg := <-busCh: 273 if view, ok := msg.(*streamplace.BroadcastDefs_BroadcastOriginView); ok { 274 log.Debug(ctx, "got broadcast origin view", "view", view) 275 err = swarm.handleOriginMessage(ctx, view) 276 if err != nil { 277 log.Error(ctx, "could not handle origin message", "error", err) 278 } 279 } 280 } 281 } 282} 283 284func (swarm *IrohSwarm) startViewerCountSubscribe(ctx context.Context) error { 285 ch := swarm.bus.SubscribeToViewerCount() 286 for { 287 select { 288 case <-ctx.Done(): 289 return ctx.Err() 290 case msg := <-ch: 291 log.Log(ctx, "got viewer count update", "viewerCount", msg) 292 if msg.Origin != "local" { 293 continue 294 } 295 swarmMsg := SwarmViewerCount{ 296 Type: "place.stream.swarm.viewerCount", 297 Server: swarm.NodeID, 298 Streamer: msg.Streamer, 299 Viewers: msg.Count, 300 } 301 bs, err := json.Marshal(swarmMsg) 302 if err != nil { 303 log.Error(ctx, "could not marshal viewer count", "error", err) 304 continue 305 } 306 key := fmt.Sprintf("viewers::%s::%s", swarm.NodeID, msg.Streamer) 307 err = swarm.w.Put(nil, []byte(key), bs) 308 if err != nil { 309 log.Error(ctx, "could not put viewer count to swarm", "error", err) 310 continue 311 } 312 log.Log(ctx, "put viewer count to swarm", "viewerCount", msg) 313 314 } 315 } 316} 317 318func (swarm *IrohSwarm) handleOriginMessage(ctx context.Context, view *streamplace.BroadcastDefs_BroadcastOriginView) error { 319 origin, ok := view.Record.Val.(*streamplace.BroadcastOrigin) 320 if !ok { 321 return fmt.Errorf("record is not a BroadcastOrigin") 322 } 323 if view.Author.Did != origin.Streamer { 324 // currently, only streamers are allowed to advertise origins 325 return nil 326 } 327 if origin.IrohTicket == nil { 328 return fmt.Errorf("origin has no iroh ticket") 329 } 330 pubKey, err := iroh_streamplace.NodeIdFromTicket(*origin.IrohTicket) 331 if err != nil { 332 return fmt.Errorf("could not get node id from ticket: %w", err) 333 } 334 err = swarm.Node.AddTickets([]string{*origin.IrohTicket}) 335 if err != nil { 336 return fmt.Errorf("could not add tickets: %w", err) 337 } 338 pubKeyStr := pubKey.String() 339 err = swarm.checkOrigins(ctx, origin.Streamer, pubKeyStr) 340 if err != nil { 341 return fmt.Errorf("could not check origin: %w", err) 342 } 343 return nil 344} 345 346func (swarm *IrohSwarm) checkOrigins(ctx context.Context, streamer string, nodeID string) error { 347 ctx = log.WithLogValues(ctx, "streamer", streamer, "nodeID", nodeID, "func", "checkOrigins") 348 err := swarm.cli.StreamIsAllowed(streamer) 349 if err != nil { 350 return fmt.Errorf("user %s is not allowlisted on this node: %w", streamer, err) 351 } 352 swarm.originMutex.Lock() 353 defer swarm.originMutex.Unlock() 354 oldSub, ok := swarm.activeSubs[streamer] 355 if ok { 356 if oldSub.NodeID == nodeID { 357 log.Debug(ctx, "node hasn't changed", "streamer", streamer) 358 // mmyep. same node still has the stream. great news. 359 return nil 360 } 361 log.Log(ctx, "Stream origin changed, swapping to new node", "old_node", oldSub.NodeID, "new_node", nodeID, "streamer", streamer) 362 pubKey, err := iroh_streamplace.PublicKeyFromString(oldSub.NodeID) 363 if err != nil { 364 log.Error(ctx, "could not create public key", "error", err) 365 return err 366 } 367 // different node has the stream. we need to unsubscribe from the old node. 368 err = swarm.Node.Unsubscribe(streamer, pubKey) 369 if err != nil { 370 log.Error(ctx, "could not unsubscribe from key", "error", err) 371 return err 372 } 373 delete(swarm.activeSubs, streamer) 374 } 375 if nodeID == swarm.NodeID { 376 log.Debug(ctx, "I already have this stream", "streamer", streamer) 377 // oh, i have this stream. cool. do nothing. 378 return nil 379 } 380 log.Log(ctx, "Subscribing to stream start", "new_node", nodeID, "streamer", streamer) 381 pubKey, err := iroh_streamplace.PublicKeyFromString(nodeID) 382 if err != nil { 383 log.Error(ctx, "could not create public key", "error", err) 384 return err 385 } 386 err = swarm.Node.Subscribe(streamer, pubKey) 387 log.Log(ctx, "Subscribing to stream done", "new_node", nodeID, "streamer", streamer, "pubKey", pubKey, "error", err) 388 if err != nil { 389 log.Error(ctx, "could not subscribe to key", "error", err) 390 return err 391 } 392 swarm.activeSubs[streamer] = &SwarmOriginInfo{ 393 Type: "place.stream.swarm.originInfo", 394 NodeID: nodeID, 395 Time: time.Now().Format(util.ISO8601), 396 Streamer: streamer, 397 } 398 return nil 399} 400 401func (swarm *IrohSwarm) startSegmentSender(ctx context.Context) error { 402 ch := swarm.mm.NewSegment() 403 for { 404 select { 405 case <-ctx.Done(): 406 return ctx.Err() 407 case not := <-ch: 408 err := swarm.SendSegment(ctx, not) 409 if err != nil { 410 log.Error(ctx, "could not send segment to swarm", "error", err) 411 } 412 continue 413 } 414 } 415} 416 417func (swarm *IrohSwarm) HandleData(pubKey *iroh_streamplace.PublicKey, topic string, data []byte) { 418 swarm.handleDataScoped(pubKey, topic, data) 419} 420 421func (swarm *IrohSwarm) SendSegment(ctx context.Context, not *media.NewSegmentNotification) error { 422 if !not.Local { 423 return nil 424 } 425 originInfo := SwarmOriginInfo{ 426 Type: "place.stream.swarm.originInfo", 427 NodeID: swarm.NodeID, 428 Time: not.Segment.StartTime.Format(util.ISO8601), 429 Streamer: not.Segment.RepoDID, 430 } 431 bs, err := json.Marshal(originInfo) 432 if err != nil { 433 log.Error(ctx, "could not marshal origin info", "error", err) 434 return err 435 } 436 keyBs := []byte(fmt.Sprintf("origin::%s", not.Segment.RepoDID)) 437 go func() { 438 spmetrics.SwarmPutCalls.WithLabelValues(not.Segment.RepoDID).Inc() 439 defer spmetrics.SwarmPutCalls.WithLabelValues(not.Segment.RepoDID).Dec() 440 err = swarm.w.Put(nil, keyBs, bs) 441 if err != nil { 442 log.Error(ctx, "could not put segment to swarm", "error", err) 443 } 444 }() 445 go func() { 446 spmetrics.SendSegmentCalls.Inc() 447 defer spmetrics.SendSegmentCalls.Dec() 448 err = swarm.Node.SendSegment(not.Segment.RepoDID, not.Data) 449 if err != nil { 450 log.Error(ctx, "could not send segment to swarm", "error", err) 451 } 452 }() 453 return nil 454}