Live video on the AT Protocol
at eli/fix-postgres-locking 170 lines 5.3 kB view raw
package atproto

// NOTE(review): the test and helpers below are commented out, so this file
// currently contributes no executable code. The reason is not recorded here;
// confirm before re-enabling. Imports will have to be restored as well.

// // TestChatMessage starts the firehose synchronizer against a dev environment,
// // creates one chat message from each of two accounts (both addressed to the
// // first account's stream), waits until both are visible in the model and on
// // the bus, then deletes the first message and expects one remaining message
// // plus a third bus message flagged as deleted.
// func TestChatMessage(t *testing.T) {
// 	dev := devenv.WithDevEnv(t)
// 	t.Logf("dev: %+v", dev)
// 	cli := config.CLI{
// 		PublicHost: "example.com",
// 		DBURL:      ":memory:",
// 		RelayHost:  strings.ReplaceAll(dev.PDSURL, "http://", "ws://"),
// 		PLCURL:     dev.PLCURL,
// 	}
// 	t.Logf("cli: %+v", cli)
// 	b := bus.NewBus()
// 	cli.DataDir = t.TempDir()
// 	mod, err := model.MakeDB(":memory:")
// 	require.NoError(t, err)
// 	state, err := statedb.MakeDB(&cli, nil, mod)
// 	require.NoError(t, err)
// 	atsync := &ATProtoSynchronizer{
// 		CLI:        &cli,
// 		StatefulDB: state,
// 		Model:      mod,
// 		Bus:        b,
// 	}

// 	ctx, cancel := context.WithCancel(context.Background())

// 	done := make(chan struct{})

// 	go func() {
// 		// t.Errorf instead of require.NoError: FailNow must only be called
// 		// from the test goroutine, and calling it here would exit this
// 		// goroutine before close(done), leaving <-done blocked forever.
// 		if err := atsync.StartFirehose(ctx); err != nil {
// 			t.Errorf("StartFirehose: %v", err)
// 		}
// 		close(done)
// 	}()

// 	user := dev.CreateAccount(t)
// 	user2 := dev.CreateAccount(t)

// 	ch := b.Subscribe(user.DID)
// 	defer b.Unsubscribe(user.DID, ch)

// 	// NOTE(review): busMessages is appended to by the goroutine below and
// 	// read by the test goroutine without synchronization; guard it with a
// 	// mutex (or collect over a channel) before running under -race.
// 	busMessages := []bus.Message{}
// 	go func() {
// 		for msg := range ch {
// 			t.Logf("message: %+v", msg)
// 			busMessages = append(busMessages, msg)
// 		}
// 	}()

// 	msg := &streamplace.ChatMessage{
// 		LexiconTypeID: "place.stream.chat.message",
// 		Text:          "Hello, world!",
// 		CreatedAt:     time.Now().Add(-time.Second).Format(util.ISO8601),
// 		Streamer:      user.DID,
// 	}

// 	rec1, err := comatproto.RepoCreateRecord(ctx, user.XRPC, &comatproto.RepoCreateRecord_Input{
// 		Collection: "place.stream.chat.message",
// 		Repo:       user.DID,
// 		Record:     &lexutil.LexiconTypeDecoder{Val: msg},
// 	})
// 	require.NoError(t, err)

// 	msg2 := &streamplace.ChatMessage{
// 		LexiconTypeID: "place.stream.chat.message",
// 		Text:          "Hello, world 2!",
// 		CreatedAt:     time.Now().Format(util.ISO8601),
// 		Streamer:      user.DID,
// 	}

// 	_, err = comatproto.RepoCreateRecord(ctx, user2.XRPC, &comatproto.RepoCreateRecord_Input{
// 		Collection: "place.stream.chat.message",
// 		Repo:       user2.DID,
// 		Record:     &lexutil.LexiconTypeDecoder{Val: msg2},
// 	})
// 	require.NoError(t, err)

// 	messages := []*streamplace.ChatDefs_MessageView{}
// 	err = untilNoErrors(t, func() error {
// 		messages, err = mod.MostRecentChatMessages(user.DID)
// 		if err != nil {
// 			return err
// 		}
// 		if len(messages) != 2 {
// 			return fmt.Errorf("expected 2 messages, got %d", len(messages))
// 		}
// 		if len(busMessages) != 2 {
// 			return fmt.Errorf("expected 2 bus messages, got %d", len(busMessages))
// 		}
// 		return nil
// 	})
// 	// Fail here rather than panic on the index expressions below if the
// 	// messages never arrived.
// 	require.NoError(t, err)
// 	// Sort oldest-first by CreatedAt so msg (stamped one second in the past)
// 	// precedes msg2 regardless of the order the model returned them in.
// 	slices.SortFunc(messages, func(a, b *streamplace.ChatDefs_MessageView) int {
// 		aTime := a.Record.Val.(*streamplace.ChatMessage).CreatedAt
// 		bTime := b.Record.Val.(*streamplace.ChatMessage).CreatedAt
// 		if aTime < bTime {
// 			return -1
// 		} else if aTime > bTime {
// 			return 1
// 		}
// 		return 0
// 	})
// 	require.Equal(t, msg.Text, messages[0].Record.Val.(*streamplace.ChatMessage).Text)
// 	require.Equal(t, msg2.Text, messages[1].Record.Val.(*streamplace.ChatMessage).Text)
// 	busMessage1 := busMessages[0].(*streamplace.ChatDefs_MessageView)
// 	busMessage2 := busMessages[1].(*streamplace.ChatDefs_MessageView)
// 	require.Equal(t, msg.Text, busMessage1.Record.Val.(*streamplace.ChatMessage).Text)
// 	require.Equal(t, msg2.Text, busMessage2.Record.Val.(*streamplace.ChatMessage).Text)

// 	rkey := strings.TrimPrefix(rec1.Uri, fmt.Sprintf("at://%s/place.stream.chat.message/", user.DID))

// 	_, err = comatproto.RepoDeleteRecord(ctx, user.XRPC, &comatproto.RepoDeleteRecord_Input{
// 		Collection: "place.stream.chat.message",
// 		Repo:       user.DID,
// 		Rkey:       rkey,
// 	})

// 	require.NoError(t, err)

// 	err = untilNoErrors(t, func() error {
// 		messages, err = mod.MostRecentChatMessages(user.DID)
// 		if err != nil {
// 			return err
// 		}
// 		if len(messages) != 1 {
// 			return fmt.Errorf("expected 1 message, got %d", len(messages))
// 		}
// 		if len(busMessages) != 3 {
// 			return fmt.Errorf("expected 3 bus messages, got %d", len(busMessages))
// 		}
// 		return nil
// 	})
// 	require.NoError(t, err)
// 	require.Equal(t, msg2.Text, messages[0].Record.Val.(*streamplace.ChatMessage).Text)
// 	busMessage3 := busMessages[2].(*streamplace.ChatDefs_MessageView)
// 	require.Equal(t, true, *busMessage3.Deleted)

// 	cancel()
// 	<-done
// }

// // untilNoErrors calls f up to 10 times, waiting on a backoff ticker between
// // attempts. It returns nil as soon as f succeeds, otherwise the error from
// // the last attempt.
// func untilNoErrors(t *testing.T, f func() error) error {
// 	ticker := backoff.NewTicker(NewExponentialBackOff())
// 	defer ticker.Stop()
// 	var err error
// 	for i := 0; i < 10; i++ {
// 		err = f()
// 		if err == nil {
// 			return nil
// 		}
// 		// No point waiting after the final attempt.
// 		if i < 9 {
// 			<-ticker.C
// 		}
// 	}
// 	return err
// }

// // More aggressive backoff for tests
// func NewExponentialBackOff() *backoff.ExponentialBackOff {
// 	b := &backoff.ExponentialBackOff{
// 		InitialInterval:     100 * time.Millisecond,
// 		RandomizationFactor: backoff.DefaultRandomizationFactor,
// 		Multiplier:          backoff.DefaultMultiplier,
// 		MaxInterval:         2 * time.Second,
// 		MaxElapsedTime:      10 * time.Second,
// 		Clock:               backoff.SystemClock,
// 	}
// 	b.Reset()
// 	return b
// }