Live video on the AT Protocol

Merge pull request #853 from jazware/jazware/update_loops

Background workers for status and origin updates

Authored by Eli Mallon and committed by GitHub · 87ee60cf · 71351174

+100 -33
pkg/director/director.go  +4
···
     started:    make(chan struct{}),
     statefulDB: d.statefulDB,
     replicator: d.replicator,
+    // Initialize notification channels (buffered size 1 for coalescing)
+    statusUpdateChan: make(chan struct{}, 1),
+    originUpdateChan: make(chan struct{}, 1),
+    shutdown:         make(chan struct{}),
 }
 d.streamSessions[not.Segment.RepoDID] = ss
 g.Go(func() error {
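The director.go change only wires the new fields in when a StreamSession is constructed; the interesting detail is the buffer size of 1, which makes each channel coalesce a burst of signals into a single pending wake-up. Below is a minimal, self-contained sketch of that pattern (not taken from the PR; the names signal and notify are illustrative):

package main

import "fmt"

func main() {
    // A channel buffered to size 1 coalesces repeated signals: the first
    // non-blocking send queues a wake-up, and later sends fall through to
    // the default case because one is already pending.
    signal := make(chan struct{}, 1)

    notify := func() {
        select {
        case signal <- struct{}{}:
            fmt.Println("signal queued")
        default:
            fmt.Println("signal already pending, dropped")
        }
    }

    notify() // queued
    notify() // dropped
    notify() // dropped

    <-signal // a worker would wake up exactly once for this burst
    fmt.Println("worker handled one coalesced wake-up")
}

Because the sender never blocks, this is presumably why the callers in stream_session.go below can invoke UpdateStatus and UpdateBroadcastOrigin directly instead of spawning a goroutine per call.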
pkg/director/stream_session.go  +96 -33
···
     "context"
     "fmt"
     "net/url"
-    "sync"
     "time"
 
     comatproto "github.com/bluesky-social/indigo/api/atproto"
···
     segmentChan    chan struct{}
     lastStatus     time.Time
     lastStatusCID  *string
-    lastStatusLock sync.Mutex
     lastOriginTime time.Time
-    lastOriginLock sync.Mutex
-    g              *errgroup.Group
-    started        chan struct{}
-    ctx            context.Context
-    packets        []bus.PacketizedSegment
-    statefulDB     *statedb.StatefulDB
-    replicator     replication.Replicator
+
+    // Channels for background workers
+    statusUpdateChan chan struct{} // Signal to update status
+    originUpdateChan chan struct{} // Signal to update broadcast origin
+    shutdown         chan struct{}
+
+    g          *errgroup.Group
+    started    chan struct{}
+    ctx        context.Context
+    packets    []bus.PacketizedSegment
+    statefulDB *statedb.StatefulDB
+    replicator replication.Replicator
 }
 
 func (ss *StreamSession) Start(ctx context.Context, notif *media.NewSegmentNotification) error {
···
 
     close(ss.started)
 
+    // Start background workers for status and origin updates
+    ss.g.Go(func() error {
+        return ss.statusUpdateLoop(ctx, spseg.Creator)
+    })
+    ss.g.Go(func() error {
+        return ss.originUpdateLoop(ctx)
+    })
+
     ss.Go(ctx, func() error {
         return ss.HandleMultistreamTargets(ctx)
     })
···
         case <-ss.segmentChan:
             // reset timer
         case <-ctx.Done():
+            // Signal all background workers to stop
+            close(ss.shutdown)
             return ss.g.Wait()
         // case <-time.After(time.Minute * 1):
         case <-time.After(ss.cli.StreamSessionTimeout):
···
             for _, r := range allRenditions {
                 ss.bus.EndSession(ctx, spseg.Creator, r.Name)
             }
+            // Signal background workers to stop
+            close(ss.shutdown)
             if notif.Local {
                 ss.Go(ctx, func() error {
                     return ss.DeleteStatus(spseg.Creator)
···
     }
 
     if notif.Local {
-        ss.Go(ctx, func() error {
-            return ss.UpdateStatus(ctx, spseg.Creator)
-        })
-
-        ss.Go(ctx, func() error {
-            return ss.UpdateBroadcastOrigin(ctx)
-        })
+        ss.UpdateStatus(ctx, spseg.Creator)
+        ss.UpdateBroadcastOrigin(ctx)
     }
 
     if ss.cli.LivepeerGatewayURL != "" {
···
             if err != nil {
                 log.Error(ctx, "failed to enqueue notification task", "err", err)
             }
-            return ss.UpdateStatus(ctx, spseg.Creator)
+            ss.UpdateStatus(ctx, spseg.Creator)
+            return nil
         })
     } else {
         log.Warn(ctx, "no livestream detected in stream, skipping notification blast", "repoDID", spseg.Creator)
···
     return nil
 }
 
-func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) error {
-    ctx = log.WithLogValues(ctx, "func", "UpdateStatus")
-    ss.lastStatusLock.Lock()
-    defer ss.lastStatusLock.Unlock()
-    if time.Since(ss.lastStatus) < time.Minute {
-        log.Debug(ctx, "not updating status, last status was less than 1 minute ago")
-        return nil
+// UpdateStatus signals the background worker to update status (non-blocking)
+func (ss *StreamSession) UpdateStatus(ctx context.Context, repoDID string) {
+    select {
+    case ss.statusUpdateChan <- struct{}{}:
+    default:
+        // Channel full, signal already pending
+    }
+}
+
+// statusUpdateLoop runs as a background goroutine for the session lifetime
+func (ss *StreamSession) statusUpdateLoop(ctx context.Context, repoDID string) error {
+    ctx = log.WithLogValues(ctx, "func", "statusUpdateLoop")
+    for {
+        select {
+        case <-ss.shutdown:
+            log.Debug(ctx, "statusUpdateLoop shutting down")
+            return nil
+        case <-ctx.Done():
+            return nil
+        case <-ss.statusUpdateChan:
+            if time.Since(ss.lastStatus) < time.Minute {
+                log.Debug(ctx, "not updating status, last status was less than 1 minute ago")
+                continue
+            }
+            if err := ss.doUpdateStatus(ctx, repoDID); err != nil {
+                log.Error(ctx, "failed to update status", "error", err)
+            }
+        }
     }
+}
+
+// doUpdateStatus performs the actual status update work
+func (ss *StreamSession) doUpdateStatus(ctx context.Context, repoDID string) error {
+    ctx = log.WithLogValues(ctx, "func", "doUpdateStatus")
 
     client, err := ss.GetClientByDID(repoDID)
     if err != nil {
···
 
 func (ss *StreamSession) DeleteStatus(repoDID string) error {
     // need a special extra context because the stream session context is already cancelled
+    // No lock needed - this runs during teardown after the background worker has exited
     ctx := log.WithLogValues(context.Background(), "func", "DeleteStatus", "repoDID", repoDID)
-    ss.lastStatusLock.Lock()
-    defer ss.lastStatusLock.Unlock()
     if ss.lastStatusCID == nil {
         log.Debug(ctx, "no status cid to delete")
         return nil
···
 
 var originUpdateInterval = time.Second * 30
 
-func (ss *StreamSession) UpdateBroadcastOrigin(ctx context.Context) error {
-    ctx = log.WithLogValues(ctx, "func", "UpdateStatus")
-    ss.lastOriginLock.Lock()
-    defer ss.lastOriginLock.Unlock()
-    if time.Since(ss.lastOriginTime) < originUpdateInterval {
-        log.Debug(ctx, "not updating origin, last origin was less than 30 seconds ago")
-        return nil
+// UpdateBroadcastOrigin signals the background worker to update origin (non-blocking)
+func (ss *StreamSession) UpdateBroadcastOrigin(ctx context.Context) {
+    select {
+    case ss.originUpdateChan <- struct{}{}:
+    default:
+        // Channel full, signal already pending
     }
+}
+
+// originUpdateLoop runs as a background goroutine for the session lifetime
+func (ss *StreamSession) originUpdateLoop(ctx context.Context) error {
+    ctx = log.WithLogValues(ctx, "func", "originUpdateLoop")
+    for {
+        select {
+        case <-ss.shutdown:
+            log.Debug(ctx, "originUpdateLoop shutting down")
+            return nil
+        case <-ctx.Done():
+            return nil
+        case <-ss.originUpdateChan:
+            if time.Since(ss.lastOriginTime) < originUpdateInterval {
+                log.Debug(ctx, "not updating origin, last origin was less than 30 seconds ago")
+                continue
+            }
+            if err := ss.doUpdateBroadcastOrigin(ctx); err != nil {
+                log.Error(ctx, "failed to update broadcast origin", "error", err)
+            }
+        }
+    }
+}
+
+// doUpdateBroadcastOrigin performs the actual broadcast origin update work
+func (ss *StreamSession) doUpdateBroadcastOrigin(ctx context.Context) error {
+    ctx = log.WithLogValues(ctx, "func", "doUpdateBroadcastOrigin")
+
     broadcaster := fmt.Sprintf("did:web:%s", ss.cli.BroadcasterHost)
     origin := streamplace.BroadcastOrigin{
         Streamer: ss.repoDID,
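Both statusUpdateLoop and originUpdateLoop in the diff above follow the same shape: block on the coalesced signal channel, skip the work if the previous update was too recent, and exit when either the shutdown channel is closed or the context is cancelled. The sketch below shows that loop shape in isolation with standard-library logging; workerLoop, wake, minInterval, and doWork are illustrative names, not part of the PR:

package main

import (
    "context"
    "log"
    "time"
)

// workerLoop sketches the loop shape shared by statusUpdateLoop and
// originUpdateLoop: wake on a coalesced signal, skip the work if the last
// run was too recent, and exit when the shutdown channel closes or the
// context is cancelled.
func workerLoop(ctx context.Context, wake, shutdown <-chan struct{}, minInterval time.Duration, doWork func(context.Context) error) error {
    var last time.Time
    for {
        select {
        case <-shutdown:
            return nil // session teardown: close(shutdown) wakes every worker
        case <-ctx.Done():
            return nil
        case <-wake:
            if time.Since(last) < minInterval {
                continue // rate-limited: the previous update was too recent
            }
            if err := doWork(ctx); err != nil {
                log.Printf("update failed: %v", err)
                continue // log and keep the worker alive rather than ending the session
            }
            last = time.Now()
        }
    }
}

func main() {
    wake := make(chan struct{}, 1)
    shutdown := make(chan struct{})
    done := make(chan error, 1)

    go func() {
        done <- workerLoop(context.Background(), wake, shutdown, time.Second,
            func(ctx context.Context) error {
                log.Println("performing the rate-limited update")
                return nil
            })
    }()

    // Mirror the PR's non-blocking signal: only one wake-up can be pending.
    for i := 0; i < 3; i++ {
        select {
        case wake <- struct{}{}:
        default:
        }
    }

    time.Sleep(100 * time.Millisecond)
    close(shutdown) // stop the worker, as the session teardown does
    log.Println("worker exited:", <-done)
}

Serializing the updates in one long-lived goroutine per kind appears to be what lets the PR drop lastStatusLock and lastOriginLock: only the worker touches the rate-limit state while the session is live, and DeleteStatus runs during teardown after the worker has exited, as the added comment notes.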