Live video on the AT Protocol
at restructure 187 lines 5.3 kB view raw
1package atproto 2 3import ( 4 "context" 5 "fmt" 6 "slices" 7 "strings" 8 "testing" 9 "time" 10 11 comatproto "github.com/bluesky-social/indigo/api/atproto" 12 lexutil "github.com/bluesky-social/indigo/lex/util" 13 "github.com/bluesky-social/indigo/util" 14 "github.com/cenkalti/backoff" 15 "github.com/stretchr/testify/require" 16 "stream.place/streamplace/pkg/bus" 17 "stream.place/streamplace/pkg/config" 18 "stream.place/streamplace/pkg/devenv" 19 "stream.place/streamplace/pkg/model" 20 "stream.place/streamplace/pkg/statedb" 21 "stream.place/streamplace/pkg/streamplace" 22) 23 24func TestChatMessage(t *testing.T) { 25 dev := devenv.WithDevEnv(t) 26 t.Logf("dev: %+v", dev) 27 cli := config.CLI{ 28 BroadcasterHost: "example.com", 29 DBURL: ":memory:", 30 RelayHost: strings.ReplaceAll(dev.PDSURL, "http://", "ws://"), 31 PLCURL: dev.PLCURL, 32 } 33 t.Logf("cli: %+v", cli) 34 b := bus.NewBus() 35 cli.DataDir = t.TempDir() 36 mod, err := model.MakeDB(":memory:") 37 require.NoError(t, err) 38 state, err := statedb.MakeDB(context.Background(), &cli, nil, mod) 39 require.NoError(t, err) 40 atsync := &ATProtoSynchronizer{ 41 CLI: &cli, 42 StatefulDB: state, 43 Model: mod, 44 Bus: b, 45 } 46 47 ctx, cancel := context.WithCancel(context.Background()) 48 49 done := make(chan struct{}) 50 51 go func() { 52 err := atsync.StartFirehose(ctx) 53 require.NoError(t, err) 54 close(done) 55 }() 56 57 user := dev.CreateAccount(t) 58 user2 := dev.CreateAccount(t) 59 60 ch := b.Subscribe(user.DID) 61 defer b.Unsubscribe(user.DID, ch) 62 63 busMessages := []bus.Message{} 64 go func() { 65 for msg := range ch { 66 t.Logf("message: %+v", msg) 67 busMessages = append(busMessages, msg) 68 } 69 }() 70 71 msg := &streamplace.ChatMessage{ 72 LexiconTypeID: "place.stream.chat.message", 73 Text: "Hello, world!", 74 CreatedAt: time.Now().Add(-time.Second).Format(util.ISO8601), 75 Streamer: user.DID, 76 } 77 78 rec1, err := comatproto.RepoCreateRecord(ctx, user.XRPC, &comatproto.RepoCreateRecord_Input{ 79 Collection: "place.stream.chat.message", 80 Repo: user.DID, 81 Record: &lexutil.LexiconTypeDecoder{Val: msg}, 82 }) 83 require.NoError(t, err) 84 85 msg2 := &streamplace.ChatMessage{ 86 LexiconTypeID: "place.stream.chat.message", 87 Text: "Hello, world 2!", 88 CreatedAt: time.Now().Format(util.ISO8601), 89 Streamer: user.DID, 90 } 91 92 _, err = comatproto.RepoCreateRecord(ctx, user2.XRPC, &comatproto.RepoCreateRecord_Input{ 93 Collection: "place.stream.chat.message", 94 Repo: user2.DID, 95 Record: &lexutil.LexiconTypeDecoder{Val: msg2}, 96 }) 97 require.NoError(t, err) 98 99 messages := []*streamplace.ChatDefs_MessageView{} 100 err = untilNoErrors(t, func() error { 101 messages, err = mod.MostRecentChatMessages(user.DID) 102 if err != nil { 103 return err 104 } 105 if len(messages) != 2 { 106 return fmt.Errorf("expected 2 messages, got %d", len(messages)) 107 } 108 if len(busMessages) != 2 { 109 return fmt.Errorf("expected 2 bus messages, got %d", len(busMessages)) 110 } 111 return nil 112 }) 113 // Reverse the messages slice to match expected order (most recent first) 114 slices.SortFunc(messages, func(a, b *streamplace.ChatDefs_MessageView) int { 115 aTime := a.Record.Val.(*streamplace.ChatMessage).CreatedAt 116 bTime := b.Record.Val.(*streamplace.ChatMessage).CreatedAt 117 if aTime < bTime { 118 return -1 119 } else if aTime > bTime { 120 return 1 121 } 122 return 0 123 }) 124 slices.SortFunc(busMessages, func(a, b bus.Message) int { 125 aTime := a.(*streamplace.ChatDefs_MessageView).Record.Val.(*streamplace.ChatMessage).CreatedAt 126 bTime := b.(*streamplace.ChatDefs_MessageView).Record.Val.(*streamplace.ChatMessage).CreatedAt 127 if aTime < bTime { 128 return -1 129 } else if aTime > bTime { 130 return 1 131 } 132 return 0 133 }) 134 require.Equal(t, msg.Text, messages[0].Record.Val.(*streamplace.ChatMessage).Text) 135 require.Equal(t, msg2.Text, messages[1].Record.Val.(*streamplace.ChatMessage).Text) 136 busMessage1 := busMessages[0].(*streamplace.ChatDefs_MessageView) 137 busMessage2 := busMessages[1].(*streamplace.ChatDefs_MessageView) 138 require.Equal(t, msg.Text, busMessage1.Record.Val.(*streamplace.ChatMessage).Text) 139 require.Equal(t, msg2.Text, busMessage2.Record.Val.(*streamplace.ChatMessage).Text) 140 141 rkey := strings.TrimPrefix(rec1.Uri, fmt.Sprintf("at://%s/place.stream.chat.message/", user.DID)) 142 143 _, err = comatproto.RepoDeleteRecord(ctx, user.XRPC, &comatproto.RepoDeleteRecord_Input{ 144 Collection: "place.stream.chat.message", 145 Repo: user.DID, 146 Rkey: rkey, 147 }) 148 149 require.NoError(t, err) 150 151 err = untilNoErrors(t, func() error { 152 messages, err = mod.MostRecentChatMessages(user.DID) 153 if err != nil { 154 return err 155 } 156 if len(messages) != 1 { 157 return fmt.Errorf("expected 1 message, got %d", len(messages)) 158 } 159 if len(busMessages) != 3 { 160 return fmt.Errorf("expected 3 bus messages, got %d", len(busMessages)) 161 } 162 return nil 163 }) 164 require.NoError(t, err) 165 require.Equal(t, msg2.Text, messages[0].Record.Val.(*streamplace.ChatMessage).Text) 166 busMessage3 := busMessages[2].(*streamplace.ChatDefs_MessageView) 167 require.Equal(t, true, *busMessage3.Deleted) 168 169 cancel() 170 <-done 171} 172 173func untilNoErrors(t *testing.T, f func() error) error { 174 ticker := backoff.NewTicker(devenv.NewExponentialBackOff()) 175 defer ticker.Stop() 176 var err error 177 for i := 0; i < 10; i++ { 178 err = f() 179 if err == nil { 180 return err 181 } 182 if i < 9 { 183 <-ticker.C 184 } 185 } 186 return err 187}