Live video on the AT Protocol
at eli/docker-deployment-docs 443 lines 13 kB view raw
1package iroh_replicator 2 3import ( 4 "bytes" 5 "context" 6 "crypto/rand" 7 "encoding/json" 8 "fmt" 9 "reflect" 10 "strings" 11 "sync" 12 "time" 13 14 "github.com/bluesky-social/indigo/util" 15 "golang.org/x/sync/errgroup" 16 "stream.place/streamplace/pkg/bus" 17 "stream.place/streamplace/pkg/config" 18 "stream.place/streamplace/pkg/iroh/generated/iroh_streamplace" 19 "stream.place/streamplace/pkg/log" 20 "stream.place/streamplace/pkg/media" 21 "stream.place/streamplace/pkg/model" 22 "stream.place/streamplace/pkg/streamplace" 23) 24 25type IrohSwarm struct { 26 Node *iroh_streamplace.Node 27 DB *iroh_streamplace.Db 28 w *iroh_streamplace.WriteScope 29 mm *media.MediaManager 30 segChan chan *media.NewSegmentNotification 31 NodeID string 32 NodeTicket string 33 bus *bus.Bus 34 originMutex sync.Mutex 35 mod model.Model 36 cli *config.CLI 37 activeSubs map[string]*SwarmOriginInfo 38 handleDataScoped func(pubKey *iroh_streamplace.PublicKey, topic string, data []byte) 39} 40 41// A message saying "hey I ingested node data at this time" 42type SwarmOriginInfo struct { 43 Type string `json:"$type"` 44 NodeID string `json:"node_id"` 45 Time string `json:"time"` 46 Streamer string `json:"streamer"` 47} 48 49type SwarmViewerCount struct { 50 Type string `json:"$type"` 51 Server string `json:"server"` 52 Streamer string `json:"streamer"` 53 Viewers int `json:"viewers"` 54} 55 56func NewSwarm(ctx context.Context, cli *config.CLI, secret []byte, topic []byte, mm *media.MediaManager, bus *bus.Bus, mod model.Model) (*IrohSwarm, error) { 57 ctx = log.WithLogValues(ctx, "func", "StartKV") 58 59 if topic == nil { 60 topic = make([]byte, 32) 61 _, err := rand.Read(topic) 62 if err != nil { 63 return nil, fmt.Errorf("failed to generate random topic: %w", err) 64 } 65 } 66 67 log.Log(ctx, "Starting with tickets", "tickets", cli.Tickets) 68 config := iroh_streamplace.Config{ 69 Key: secret, 70 Topic: topic, 71 MaxSendDuration: 1000_000_000, // 1s 72 } 73 log.Log(ctx, "Config created", "config", config) 74 75 swarm := IrohSwarm{ 76 mm: mm, 77 activeSubs: make(map[string]*SwarmOriginInfo), 78 bus: bus, 79 mod: mod, 80 cli: cli, 81 } 82 83 // workaround to get context into the HandleData callback 84 swarm.handleDataScoped = func(_ *iroh_streamplace.PublicKey, topic string, data []byte) { 85 if ctx.Err() != nil { 86 return 87 } 88 err := swarm.mm.ValidateMP4(context.Background(), bytes.NewReader(data), false) 89 if err != nil { 90 log.Error(ctx, "could not validate segment", "error", err, "topic", topic, "data", len(data)) 91 } 92 } 93 94 node, err := iroh_streamplace.NodeReceiver(config, &swarm) 95 if err != nil { 96 return nil, fmt.Errorf("failed to create NodeSender: %w", err) 97 } 98 99 db := node.Db() 100 w := node.NodeScope() 101 102 swarm.DB = db 103 swarm.w = w 104 swarm.Node = node 105 106 nodeId, err := node.NodeId() 107 if err != nil { 108 return nil, fmt.Errorf("failed to get NodeId: %w", err) 109 } 110 log.Log(ctx, "Node ID:", "node_id", nodeId) 111 swarm.NodeID = nodeId.String() 112 113 ticket, err := node.Ticket() 114 if err != nil { 115 return nil, fmt.Errorf("failed to get Ticket: %w", err) 116 } 117 swarm.NodeTicket = ticket 118 119 return &swarm, nil 120} 121 122func (swarm *IrohSwarm) Start(ctx context.Context, tickets []string) error { 123 if len(tickets) > 0 { 124 err := swarm.Node.JoinPeers(tickets) 125 if err != nil { 126 return fmt.Errorf("failed to join peers: %w", err) 127 } 128 } 129 130 nodeId, err := swarm.Node.NodeId() 131 if err != nil { 132 return fmt.Errorf("failed to get node id: %w", err) 133 } 134 nodeIdStr := nodeId.String() 135 log.Log(ctx, "Node ID:", "node_id", nodeIdStr) 136 137 g, ctx := errgroup.WithContext(ctx) 138 g.Go(func() error { 139 return swarm.startKV(ctx) 140 }) 141 g.Go(func() error { 142 return swarm.startSegmentSender(ctx) 143 }) 144 g.Go(func() error { 145 <-ctx.Done() 146 return swarm.Node.Shutdown() 147 }) 148 g.Go(func() error { 149 return swarm.startBusSubscribe(ctx) 150 }) 151 g.Go(func() error { 152 return swarm.startViewerCountSubscribe(ctx) 153 }) 154 return g.Wait() 155} 156 157func (swarm *IrohSwarm) startKV(ctx context.Context) error { 158 sub := swarm.DB.Subscribe(iroh_streamplace.NewFilter()) 159 for { 160 if ctx.Err() != nil { 161 return ctx.Err() 162 } 163 ev, err := sub.NextRaw() 164 if err != nil { 165 return fmt.Errorf("failed to get next subscription event: %w", err) 166 } 167 168 if ev == nil { 169 log.Warn(ctx, "Got empty event from sub.NextRaw(), pausing for a second and continuing") 170 time.Sleep(1 * time.Second) 171 continue 172 } 173 174 switch item := (*ev).(type) { 175 case iroh_streamplace.SubscribeItemEntry: 176 err := swarm.handleIrohMessage(ctx, item) 177 if err != nil { 178 log.Error(ctx, "could not handle iroh message", "error", err) 179 continue 180 } 181 182 case iroh_streamplace.SubscribeItemCurrentDone: 183 log.Debug(ctx, "SubscribeItemCurrentDone", "currentDone", item) 184 case iroh_streamplace.SubscribeItemExpired: 185 log.Debug(ctx, "SubscribeItemExpired", "expired", item) 186 case iroh_streamplace.SubscribeItemOther: 187 log.Debug(ctx, "SubscribeItemOther", "other", item) 188 } 189 } 190} 191 192func (swarm *IrohSwarm) handleIrohMessage(ctx context.Context, item iroh_streamplace.SubscribeItemEntry) error { 193 keyStr := string(item.Key) 194 valueStr := string(item.Value) 195 log.Warn(ctx, "SubscribeItemEntry", "key", keyStr, "value", valueStr) 196 if len(valueStr) > 0 && valueStr[0] != '{' { 197 // not JSON, it's one of the rust messages 198 log.Debug(ctx, "not JSON", "key", keyStr, "value", valueStr) 199 return nil 200 } 201 rawMessage, err := decodeIrohMessage(item.Key, item.Value) 202 if err != nil { 203 return fmt.Errorf("could not decode iroh message: %w", err) 204 } 205 switch message := rawMessage.(type) { 206 case SwarmOriginInfo: 207 err = swarm.checkOrigins(ctx, message.Streamer, message.NodeID) 208 if err != nil { 209 return fmt.Errorf("could not check origins: %w", err) 210 } 211 case SwarmViewerCount: 212 log.Log(ctx, "got viewer count", "viewerCount", message) 213 if message.Server == swarm.NodeID { 214 // no infinite loops allowed 215 return nil 216 } 217 swarm.bus.SetViewerCount(message.Streamer, message.Server, message.Viewers) 218 log.Log(ctx, "set viewer count", "viewerCount", message) 219 return nil 220 default: 221 return fmt.Errorf("unknown message type: %s", reflect.TypeOf(rawMessage)) 222 } 223 return nil 224} 225 226func decodeIrohMessage(key, value []byte) (any, error) { 227 keyStr := string(key) 228 if strings.HasPrefix(keyStr, "origin::") { 229 var originInfo SwarmOriginInfo 230 err := json.Unmarshal(value, &originInfo) 231 if err != nil { 232 return nil, fmt.Errorf("could not unmarshal origin info: %w", err) 233 } 234 return originInfo, nil 235 } 236 if strings.HasPrefix(keyStr, "viewers::") { 237 var viewerCount SwarmViewerCount 238 err := json.Unmarshal(value, &viewerCount) 239 if err != nil { 240 return nil, fmt.Errorf("could not unmarshal viewer count: %w", err) 241 } 242 return viewerCount, nil 243 } 244 return nil, fmt.Errorf("unknown key: %s", keyStr) 245} 246 247// subscribe to all streams 248func (swarm *IrohSwarm) startBusSubscribe(ctx context.Context) error { 249 // start subscription first so we're buffering new origins 250 busCh := swarm.bus.Subscribe("") 251 originViews, err := swarm.mod.GetRecentBroadcastOrigins(ctx) 252 if err != nil { 253 return fmt.Errorf("failed to get recent broadcast origins: %w", err) 254 } 255 for _, view := range originViews { 256 err = swarm.handleOriginMessage(ctx, view) 257 if err != nil { 258 log.Error(ctx, "could not check origin", "error", err) 259 } 260 } 261 log.Log(ctx, "Resumed recent broadcast origins", "count", len(originViews)) 262 for { 263 select { 264 case <-ctx.Done(): 265 return ctx.Err() 266 case msg := <-busCh: 267 if view, ok := msg.(*streamplace.BroadcastDefs_BroadcastOriginView); ok { 268 log.Debug(ctx, "got broadcast origin view", "view", view) 269 err = swarm.handleOriginMessage(ctx, view) 270 if err != nil { 271 log.Error(ctx, "could not handle origin message", "error", err) 272 } 273 } 274 } 275 } 276} 277 278func (swarm *IrohSwarm) startViewerCountSubscribe(ctx context.Context) error { 279 ch := swarm.bus.SubscribeToViewerCount() 280 for { 281 select { 282 case <-ctx.Done(): 283 return ctx.Err() 284 case msg := <-ch: 285 log.Log(ctx, "got viewer count update", "viewerCount", msg) 286 if msg.Origin != "local" { 287 continue 288 } 289 swarmMsg := SwarmViewerCount{ 290 Type: "place.stream.swarm.viewerCount", 291 Server: swarm.NodeID, 292 Streamer: msg.Streamer, 293 Viewers: msg.Count, 294 } 295 bs, err := json.Marshal(swarmMsg) 296 if err != nil { 297 log.Error(ctx, "could not marshal viewer count", "error", err) 298 continue 299 } 300 key := fmt.Sprintf("viewers::%s::%s", swarm.NodeID, msg.Streamer) 301 err = swarm.w.Put(nil, []byte(key), bs) 302 if err != nil { 303 log.Error(ctx, "could not put viewer count to swarm", "error", err) 304 continue 305 } 306 log.Log(ctx, "put viewer count to swarm", "viewerCount", msg) 307 308 } 309 } 310} 311 312func (swarm *IrohSwarm) handleOriginMessage(ctx context.Context, view *streamplace.BroadcastDefs_BroadcastOriginView) error { 313 origin, ok := view.Record.Val.(*streamplace.BroadcastOrigin) 314 if !ok { 315 return fmt.Errorf("record is not a BroadcastOrigin") 316 } 317 if view.Author.Did != origin.Streamer { 318 // currently, only streamers are allowed to advertise origins 319 return nil 320 } 321 if origin.IrohTicket == nil { 322 return fmt.Errorf("origin has no iroh ticket") 323 } 324 pubKey, err := iroh_streamplace.NodeIdFromTicket(*origin.IrohTicket) 325 if err != nil { 326 return fmt.Errorf("could not get node id from ticket: %w", err) 327 } 328 err = swarm.Node.AddTickets([]string{*origin.IrohTicket}) 329 if err != nil { 330 return fmt.Errorf("could not add tickets: %w", err) 331 } 332 pubKeyStr := pubKey.String() 333 err = swarm.checkOrigins(ctx, origin.Streamer, pubKeyStr) 334 if err != nil { 335 return fmt.Errorf("could not check origin: %w", err) 336 } 337 return nil 338} 339 340func (swarm *IrohSwarm) checkOrigins(ctx context.Context, streamer string, nodeID string) error { 341 ctx = log.WithLogValues(ctx, "streamer", streamer, "nodeID", nodeID, "func", "checkOrigins") 342 err := swarm.cli.StreamIsAllowed(streamer) 343 if err != nil { 344 return fmt.Errorf("user %s is not allowlisted on this node: %w", streamer, err) 345 } 346 swarm.originMutex.Lock() 347 defer swarm.originMutex.Unlock() 348 oldSub, ok := swarm.activeSubs[streamer] 349 if ok { 350 if oldSub.NodeID == nodeID { 351 log.Debug(ctx, "node hasn't changed", "streamer", streamer) 352 // mmyep. same node still has the stream. great news. 353 return nil 354 } 355 log.Log(ctx, "Stream origin changed, swapping to new node", "old_node", oldSub.NodeID, "new_node", nodeID, "streamer", streamer) 356 pubKey, err := iroh_streamplace.PublicKeyFromString(oldSub.NodeID) 357 if err != nil { 358 log.Error(ctx, "could not create public key", "error", err) 359 return err 360 } 361 // different node has the stream. we need to unsubscribe from the old node. 362 err = swarm.Node.Unsubscribe(streamer, pubKey) 363 if err != nil { 364 log.Error(ctx, "could not unsubscribe from key", "error", err) 365 return err 366 } 367 delete(swarm.activeSubs, streamer) 368 } 369 if nodeID == swarm.NodeID { 370 log.Debug(ctx, "I already have this stream", "streamer", streamer) 371 // oh, i have this stream. cool. do nothing. 372 return nil 373 } 374 log.Log(ctx, "Subscribing to stream", "new_node", nodeID, "streamer", streamer) 375 pubKey, err := iroh_streamplace.PublicKeyFromString(nodeID) 376 if err != nil { 377 log.Error(ctx, "could not create public key", "error", err) 378 return err 379 } 380 err = swarm.Node.Subscribe(streamer, pubKey) 381 if err != nil { 382 log.Error(ctx, "could not subscribe to key", "error", err) 383 return err 384 } 385 swarm.activeSubs[streamer] = &SwarmOriginInfo{ 386 Type: "place.stream.swarm.originInfo", 387 NodeID: nodeID, 388 Time: time.Now().Format(util.ISO8601), 389 Streamer: streamer, 390 } 391 return nil 392} 393 394func (swarm *IrohSwarm) startSegmentSender(ctx context.Context) error { 395 ch := swarm.mm.NewSegment() 396 for { 397 select { 398 case <-ctx.Done(): 399 return ctx.Err() 400 case not := <-ch: 401 err := swarm.SendSegment(ctx, not) 402 if err != nil { 403 log.Error(ctx, "could not send segment to swarm", "error", err) 404 } 405 continue 406 } 407 } 408} 409 410func (swarm *IrohSwarm) HandleData(pubKey *iroh_streamplace.PublicKey, topic string, data []byte) { 411 swarm.handleDataScoped(pubKey, topic, data) 412} 413 414func (swarm *IrohSwarm) SendSegment(ctx context.Context, not *media.NewSegmentNotification) error { 415 if !not.Local { 416 return nil 417 } 418 originInfo := SwarmOriginInfo{ 419 Type: "place.stream.swarm.originInfo", 420 NodeID: swarm.NodeID, 421 Time: not.Segment.StartTime.Format(util.ISO8601), 422 Streamer: not.Segment.RepoDID, 423 } 424 bs, err := json.Marshal(originInfo) 425 if err != nil { 426 log.Error(ctx, "could not marshal origin info", "error", err) 427 return err 428 } 429 keyBs := []byte(fmt.Sprintf("origin::%s", not.Segment.RepoDID)) 430 go func() { 431 err = swarm.w.Put(nil, keyBs, bs) 432 if err != nil { 433 log.Error(ctx, "could not put segment to swarm", "error", err) 434 } 435 }() 436 go func() { 437 err = swarm.Node.SendSegment(not.Segment.RepoDID, not.Data) 438 if err != nil { 439 log.Error(ctx, "could not send segment to swarm", "error", err) 440 } 441 }() 442 return nil 443}