+4
pds/server.go
+4
pds/server.go
···
598
case evt.RepoCommit != nil:
599
header.MsgType = "#commit"
600
obj = evt.RepoCommit
601
case evt.RepoIdentity != nil:
602
header.MsgType = "#identity"
603
obj = evt.RepoIdentity
···
651
return fmt.Errorf("failed to update handle: %w", err)
652
}
653
654
if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{
655
RepoIdentity: &comatproto.SyncSubscribeRepos_Identity{
656
Did: u.Did,
···
598
case evt.RepoCommit != nil:
599
header.MsgType = "#commit"
600
obj = evt.RepoCommit
601
+
case evt.RepoSync != nil:
602
+
header.MsgType = "#sync"
603
+
obj = evt.RepoSync
604
case evt.RepoIdentity != nil:
605
header.MsgType = "#identity"
606
obj = evt.RepoIdentity
···
654
return fmt.Errorf("failed to update handle: %w", err)
655
}
656
657
+
// Push an Identity event
658
if err := s.events.AddEvent(ctx, &events.XRPCStreamEvent{
659
RepoIdentity: &comatproto.SyncSubscribeRepos_Identity{
660
Did: u.Did,
+7
testing/utils.go
+7
testing/utils.go
···
666
es.Lk.Unlock()
667
return nil
668
},
669
+
RepoSync: func(evt *atproto.SyncSubscribeRepos_Sync) error {
670
+
fmt.Println("received sync event: ", evt.Seq, evt.Did)
671
+
es.Lk.Lock()
672
+
es.Events = append(es.Events, &events.XRPCStreamEvent{RepoSync: evt})
673
+
es.Lk.Unlock()
674
+
return nil
675
+
},
676
RepoIdentity: func(evt *atproto.SyncSubscribeRepos_Identity) error {
677
fmt.Println("received identity event: ", evt.Seq, evt.Did)
678
es.Lk.Lock()