Live video on the AT Protocol
1package atproto
2
3// func TestChatMessage(t *testing.T) {
4// dev := devenv.WithDevEnv(t)
5// t.Logf("dev: %+v", dev)
6// cli := config.CLI{
7// PublicHost: "example.com",
8// DBURL: ":memory:",
9// RelayHost: strings.ReplaceAll(dev.PDSURL, "http://", "ws://"),
10// PLCURL: dev.PLCURL,
11// }
12// t.Logf("cli: %+v", cli)
13// b := bus.NewBus()
14// cli.DataDir = t.TempDir()
15// mod, err := model.MakeDB(":memory:")
16// require.NoError(t, err)
17// state, err := statedb.MakeDB(&cli, nil, mod)
18// require.NoError(t, err)
19// atsync := &ATProtoSynchronizer{
20// CLI: &cli,
21// StatefulDB: state,
22// Model: mod,
23// Bus: b,
24// }
25
26// ctx, cancel := context.WithCancel(context.Background())
27
28// done := make(chan struct{})
29
30// go func() {
31// err := atsync.StartFirehose(ctx)
32// require.NoError(t, err)
33// close(done)
34// }()
35
36// user := dev.CreateAccount(t)
37// user2 := dev.CreateAccount(t)
38
39// ch := b.Subscribe(user.DID)
40// defer b.Unsubscribe(user.DID, ch)
41
42// busMessages := []bus.Message{}
43// go func() {
44// for msg := range ch {
45// t.Logf("message: %+v", msg)
46// busMessages = append(busMessages, msg)
47// }
48// }()
49
50// msg := &streamplace.ChatMessage{
51// LexiconTypeID: "place.stream.chat.message",
52// Text: "Hello, world!",
53// CreatedAt: time.Now().Add(-time.Second).Format(util.ISO8601),
54// Streamer: user.DID,
55// }
56
57// rec1, err := comatproto.RepoCreateRecord(ctx, user.XRPC, &comatproto.RepoCreateRecord_Input{
58// Collection: "place.stream.chat.message",
59// Repo: user.DID,
60// Record: &lexutil.LexiconTypeDecoder{Val: msg},
61// })
62// require.NoError(t, err)
63
64// msg2 := &streamplace.ChatMessage{
65// LexiconTypeID: "place.stream.chat.message",
66// Text: "Hello, world 2!",
67// CreatedAt: time.Now().Format(util.ISO8601),
68// Streamer: user.DID,
69// }
70
71// _, err = comatproto.RepoCreateRecord(ctx, user2.XRPC, &comatproto.RepoCreateRecord_Input{
72// Collection: "place.stream.chat.message",
73// Repo: user2.DID,
74// Record: &lexutil.LexiconTypeDecoder{Val: msg2},
75// })
76// require.NoError(t, err)
77
78// messages := []*streamplace.ChatDefs_MessageView{}
79// err = untilNoErrors(t, func() error {
80// messages, err = mod.MostRecentChatMessages(user.DID)
81// if err != nil {
82// return err
83// }
84// if len(messages) != 2 {
85// return fmt.Errorf("expected 2 messages, got %d", len(messages))
86// }
87// if len(busMessages) != 2 {
88// return fmt.Errorf("expected 2 bus messages, got %d", len(busMessages))
89// }
90// return nil
91// })
92// // Sort the messages by CreatedAt ascending (oldest first), so index 0 is msg and index 1 is msg2
93// slices.SortFunc(messages, func(a, b *streamplace.ChatDefs_MessageView) int {
94// aTime := a.Record.Val.(*streamplace.ChatMessage).CreatedAt
95// bTime := b.Record.Val.(*streamplace.ChatMessage).CreatedAt
96// if aTime < bTime {
97// return -1
98// } else if aTime > bTime {
99// return 1
100// }
101// return 0
102// })
103// require.Equal(t, msg.Text, messages[0].Record.Val.(*streamplace.ChatMessage).Text)
104// require.Equal(t, msg2.Text, messages[1].Record.Val.(*streamplace.ChatMessage).Text)
105// busMessage1 := busMessages[0].(*streamplace.ChatDefs_MessageView)
106// busMessage2 := busMessages[1].(*streamplace.ChatDefs_MessageView)
107// require.Equal(t, msg.Text, busMessage1.Record.Val.(*streamplace.ChatMessage).Text)
108// require.Equal(t, msg2.Text, busMessage2.Record.Val.(*streamplace.ChatMessage).Text)
109
110// rkey := strings.TrimPrefix(rec1.Uri, fmt.Sprintf("at://%s/place.stream.chat.message/", user.DID))
111
112// _, err = comatproto.RepoDeleteRecord(ctx, user.XRPC, &comatproto.RepoDeleteRecord_Input{
113// Collection: "place.stream.chat.message",
114// Repo: user.DID,
115// Rkey: rkey,
116// })
117
118// require.NoError(t, err)
119
120// err = untilNoErrors(t, func() error {
121// messages, err = mod.MostRecentChatMessages(user.DID)
122// if err != nil {
123// return err
124// }
125// if len(messages) != 1 {
126// return fmt.Errorf("expected 1 message, got %d", len(messages))
127// }
128// if len(busMessages) != 3 {
129// return fmt.Errorf("expected 3 bus messages, got %d", len(busMessages))
130// }
131// return nil
132// })
133// require.NoError(t, err)
134// require.Equal(t, msg2.Text, messages[0].Record.Val.(*streamplace.ChatMessage).Text)
135// busMessage3 := busMessages[2].(*streamplace.ChatDefs_MessageView)
136// require.Equal(t, true, *busMessage3.Deleted)
137
138// cancel()
139// <-done
140// }
141
142// func untilNoErrors(t *testing.T, f func() error) error {
143// ticker := backoff.NewTicker(NewExponentialBackOff())
144// defer ticker.Stop()
145// var err error
146// for i := 0; i < 10; i++ {
147// err = f()
148// if err == nil {
149// return err
150// }
151// if i < 9 {
152// <-ticker.C
153// }
154// }
155// return err
156// }
157
158// // Backoff tuned for tests: 100ms initial interval, 2s max interval, 10s max elapsed time
159// func NewExponentialBackOff() *backoff.ExponentialBackOff {
160// b := &backoff.ExponentialBackOff{
161// InitialInterval: 100 * time.Millisecond,
162// RandomizationFactor: backoff.DefaultRandomizationFactor,
163// Multiplier: backoff.DefaultMultiplier,
164// MaxInterval: 2 * time.Second,
165// MaxElapsedTime: 10 * time.Second,
166// Clock: backoff.SystemClock,
167// }
168// b.Reset()
169// return b
170// }