Live video on the AT Protocol
1package atproto
2
3import (
4 "context"
5 "fmt"
6 "slices"
7 "strings"
8 "testing"
9 "time"
10
11 comatproto "github.com/bluesky-social/indigo/api/atproto"
12 lexutil "github.com/bluesky-social/indigo/lex/util"
13 "github.com/bluesky-social/indigo/util"
14 "github.com/cenkalti/backoff"
15 "github.com/stretchr/testify/require"
16 "stream.place/streamplace/pkg/bus"
17 "stream.place/streamplace/pkg/config"
18 "stream.place/streamplace/pkg/devenv"
19 "stream.place/streamplace/pkg/model"
20 "stream.place/streamplace/pkg/statedb"
21 "stream.place/streamplace/pkg/streamplace"
22)
23
24func TestChatMessage(t *testing.T) {
25 dev := devenv.WithDevEnv(t)
26 t.Logf("dev: %+v", dev)
27 cli := config.CLI{
28 BroadcasterHost: "example.com",
29 DBURL: ":memory:",
30 RelayHost: strings.ReplaceAll(dev.PDSURL, "http://", "ws://"),
31 PLCURL: dev.PLCURL,
32 }
33 t.Logf("cli: %+v", cli)
34 b := bus.NewBus()
35 cli.DataDir = t.TempDir()
36 mod, err := model.MakeDB(":memory:")
37 require.NoError(t, err)
38 state, err := statedb.MakeDB(context.Background(), &cli, nil, mod)
39 require.NoError(t, err)
40 atsync := &ATProtoSynchronizer{
41 CLI: &cli,
42 StatefulDB: state,
43 Model: mod,
44 Bus: b,
45 }
46
47 ctx, cancel := context.WithCancel(context.Background())
48
49 done := make(chan struct{})
50
51 go func() {
52 err := atsync.StartFirehose(ctx)
53 require.NoError(t, err)
54 close(done)
55 }()
56
57 user := dev.CreateAccount(t)
58 user2 := dev.CreateAccount(t)
59
60 ch := b.Subscribe(user.DID)
61 defer b.Unsubscribe(user.DID, ch)
62
63 busMessages := []bus.Message{}
64 go func() {
65 for msg := range ch {
66 t.Logf("message: %+v", msg)
67 busMessages = append(busMessages, msg)
68 }
69 }()
70
71 msg := &streamplace.ChatMessage{
72 LexiconTypeID: "place.stream.chat.message",
73 Text: "Hello, world!",
74 CreatedAt: time.Now().Add(-time.Second).Format(util.ISO8601),
75 Streamer: user.DID,
76 }
77
78 rec1, err := comatproto.RepoCreateRecord(ctx, user.XRPC, &comatproto.RepoCreateRecord_Input{
79 Collection: "place.stream.chat.message",
80 Repo: user.DID,
81 Record: &lexutil.LexiconTypeDecoder{Val: msg},
82 })
83 require.NoError(t, err)
84
85 msg2 := &streamplace.ChatMessage{
86 LexiconTypeID: "place.stream.chat.message",
87 Text: "Hello, world 2!",
88 CreatedAt: time.Now().Format(util.ISO8601),
89 Streamer: user.DID,
90 }
91
92 _, err = comatproto.RepoCreateRecord(ctx, user2.XRPC, &comatproto.RepoCreateRecord_Input{
93 Collection: "place.stream.chat.message",
94 Repo: user2.DID,
95 Record: &lexutil.LexiconTypeDecoder{Val: msg2},
96 })
97 require.NoError(t, err)
98
99 messages := []*streamplace.ChatDefs_MessageView{}
100 err = untilNoErrors(t, func() error {
101 messages, err = mod.MostRecentChatMessages(user.DID)
102 if err != nil {
103 return err
104 }
105 if len(messages) != 2 {
106 return fmt.Errorf("expected 2 messages, got %d", len(messages))
107 }
108 if len(busMessages) != 2 {
109 return fmt.Errorf("expected 2 bus messages, got %d", len(busMessages))
110 }
111 return nil
112 })
113 // Reverse the messages slice to match expected order (most recent first)
114 slices.SortFunc(messages, func(a, b *streamplace.ChatDefs_MessageView) int {
115 aTime := a.Record.Val.(*streamplace.ChatMessage).CreatedAt
116 bTime := b.Record.Val.(*streamplace.ChatMessage).CreatedAt
117 if aTime < bTime {
118 return -1
119 } else if aTime > bTime {
120 return 1
121 }
122 return 0
123 })
124 slices.SortFunc(busMessages, func(a, b bus.Message) int {
125 aTime := a.(*streamplace.ChatDefs_MessageView).Record.Val.(*streamplace.ChatMessage).CreatedAt
126 bTime := b.(*streamplace.ChatDefs_MessageView).Record.Val.(*streamplace.ChatMessage).CreatedAt
127 if aTime < bTime {
128 return -1
129 } else if aTime > bTime {
130 return 1
131 }
132 return 0
133 })
134 require.Equal(t, msg.Text, messages[0].Record.Val.(*streamplace.ChatMessage).Text)
135 require.Equal(t, msg2.Text, messages[1].Record.Val.(*streamplace.ChatMessage).Text)
136 busMessage1 := busMessages[0].(*streamplace.ChatDefs_MessageView)
137 busMessage2 := busMessages[1].(*streamplace.ChatDefs_MessageView)
138 require.Equal(t, msg.Text, busMessage1.Record.Val.(*streamplace.ChatMessage).Text)
139 require.Equal(t, msg2.Text, busMessage2.Record.Val.(*streamplace.ChatMessage).Text)
140
141 rkey := strings.TrimPrefix(rec1.Uri, fmt.Sprintf("at://%s/place.stream.chat.message/", user.DID))
142
143 _, err = comatproto.RepoDeleteRecord(ctx, user.XRPC, &comatproto.RepoDeleteRecord_Input{
144 Collection: "place.stream.chat.message",
145 Repo: user.DID,
146 Rkey: rkey,
147 })
148
149 require.NoError(t, err)
150
151 err = untilNoErrors(t, func() error {
152 messages, err = mod.MostRecentChatMessages(user.DID)
153 if err != nil {
154 return err
155 }
156 if len(messages) != 1 {
157 return fmt.Errorf("expected 1 message, got %d", len(messages))
158 }
159 if len(busMessages) != 3 {
160 return fmt.Errorf("expected 3 bus messages, got %d", len(busMessages))
161 }
162 return nil
163 })
164 require.NoError(t, err)
165 require.Equal(t, msg2.Text, messages[0].Record.Val.(*streamplace.ChatMessage).Text)
166 busMessage3 := busMessages[2].(*streamplace.ChatDefs_MessageView)
167 require.Equal(t, true, *busMessage3.Deleted)
168
169 cancel()
170 <-done
171}
172
173func untilNoErrors(t *testing.T, f func() error) error {
174 ticker := backoff.NewTicker(devenv.NewExponentialBackOff())
175 defer ticker.Stop()
176 var err error
177 for i := 0; i < 10; i++ {
178 err = f()
179 if err == nil {
180 return err
181 }
182 if i < 9 {
183 <-ticker.C
184 }
185 }
186 return err
187}