stream.place chat terminal ui
1package main
2
3import (
4 "context"
5 "encoding/json"
6 "errors"
7 "fmt"
8 "github.com/bluesky-social/indigo/atproto/identity"
9 "github.com/bluesky-social/indigo/atproto/syntax"
10 "github.com/bluesky-social/jetstream/pkg/client"
11 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential"
12 "github.com/bluesky-social/jetstream/pkg/models"
13 tea "github.com/charmbracelet/bubbletea"
14 "github.com/charmbracelet/lipgloss"
15 "github.com/gopxl/beep"
16 "github.com/gopxl/beep/speaker"
17 "github.com/gopxl/beep/wav"
18 "hash/fnv"
19 "log/slog"
20 "os"
21 "time"
22)
23
24type NotificationSystem struct {
25 np *NotificationPlayer
26 tp *tea.Program
27}
28
29type NotificationPlayer struct {
30 audioData beep.Buffer
31 sampleRate beep.SampleRate
32}
33
34var (
35 hasArgs bool
36 didToHandle map[syntax.DID]*syntax.Handle
37 noNeedForFrom bool
38)
39
40func main() {
41 fmt.Println("beep")
42 if len(os.Args) != 1 {
43 hasArgs = true
44 }
45 if len(os.Args) == 2 {
46 noNeedForFrom = true
47 }
48 f, err := os.Open("thread_notification.wav")
49 if err != nil {
50 panic(err)
51 }
52 streamer, format, err := wav.Decode(f)
53 if err != nil {
54 panic(err)
55 }
56 defer streamer.Close()
57 buffer := beep.NewBuffer(format)
58 buffer.Append(streamer)
59 np := &NotificationPlayer{
60 audioData: *buffer,
61 sampleRate: format.SampleRate,
62 }
63 err = speaker.Init(np.sampleRate, np.sampleRate.N(time.Second/10))
64 if err != nil {
65 panic(err)
66 }
67 tp := tea.NewProgram(initialModel())
68 ns := &NotificationSystem{
69 np,
70 tp,
71 }
72 didToHandle = make(map[syntax.DID]*syntax.Handle)
73 go consumeLoop(context.Background(), ns)
74 ns.tp.Run()
75}
76
77func consumeLoop(ctx context.Context, ns *NotificationSystem) {
78 jsServerAddr := os.Getenv("JS_SERVER_ADDR")
79 if jsServerAddr == "" {
80 jsServerAddr = "wss://jetstream.atproto.tools/subscribe"
81 }
82 consumer := NewConsumer(jsServerAddr, ns)
83 for {
84 err := consumer.Consume(ctx)
85 if err != nil {
86 fmt.Printf("error in consumeLoop: %s\n", err.Error())
87 if errors.Is(err, context.Canceled) {
88 fmt.Println("exiting consume loop")
89 return
90 }
91 }
92 }
93}
94
95type Consumer struct {
96 cfg *client.ClientConfig
97 handler *handler
98}
99
100type handler struct {
101 ns *NotificationSystem
102}
103
104func NewConsumer(jsAddr string, ns *NotificationSystem) *Consumer {
105 cfg := client.DefaultClientConfig()
106 if jsAddr != "" {
107 cfg.WebsocketURL = jsAddr
108 }
109 cfg.WantedCollections = []string{
110 "place.stream.chat.message",
111 }
112 cfg.WantedDids = []string{}
113 return &Consumer{
114 cfg: cfg,
115 handler: &handler{ns},
116 }
117}
118
119func (c *Consumer) Consume(ctx context.Context) error {
120 scheduler := sequential.NewScheduler("jetstream_localdev", slog.Default(), c.handler.HandleEvent)
121 defer scheduler.Shutdown()
122 opts := slog.HandlerOptions{
123 Level: slog.LevelError,
124 }
125 handler := slog.NewJSONHandler(os.Stdout, &opts)
126 client, err := client.NewClient(c.cfg, slog.New(handler), scheduler)
127 if err != nil {
128 return errors.New("failed to create client: " + err.Error())
129 }
130 cursor := time.Now().Add(1 * -time.Minute).UnixMicro()
131 err = client.ConnectAndRead(ctx, &cursor)
132 if err != nil {
133 return errors.New("error connecting and reading: " + err.Error())
134 }
135 return nil
136}
137
138type ChatMessage struct {
139 LexiconTypeID string `json:"$type,const=place.stream.chat.message"`
140 CreatedAt string `json:"createdAt"`
141 Streamer string `json:"streamer"`
142 Text string `json:"text"`
143}
144
145func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error {
146 if event.Commit != nil && event.Commit.Collection == "place.stream.chat.message" && event.Commit.Operation == "create" {
147 handle, err := getHandle(event.Did, ctx)
148 if err != nil {
149 panic(err)
150 }
151 var v ChatMessage
152 err = json.Unmarshal(event.Commit.Record, &v)
153 if err != nil {
154 return nil
155 }
156 shouldSend := !hasArgs
157 if hasArgs {
158 for _, streamer := range os.Args[1:] {
159 if streamer == v.Streamer {
160 shouldSend = true
161 }
162 }
163 }
164 if shouldSend {
165 var streamer string
166 if !noNeedForFrom {
167 streamer, err = getHandle(v.Streamer, ctx)
168 if err != nil {
169 panic(err)
170 }
171 }
172 h.ns.Notify(v.Text, handle, streamer)
173 }
174 }
175 return nil
176}
177
178func getHandle(did string, ctx context.Context) (string, error) {
179 sdid, err := syntax.ParseDID(did)
180 if err != nil {
181 return "", err
182 }
183 h, ok := didToHandle[sdid]
184 if ok {
185 return h.String(), nil
186 }
187 dd := identity.DefaultDirectory()
188 id, err := dd.LookupDID(ctx, sdid)
189 if err != nil {
190 return "failed.to.lookup", nil
191 }
192 didToHandle[sdid] = &id.Handle
193 return id.Handle.String(), nil
194}
195
196func (ns *NotificationSystem) Notify(text string, handle string, streamer string) {
197
198 noise := ns.np.audioData.Streamer(0, ns.np.audioData.Len())
199 speaker.Play(noise)
200 ns.tp.Send(ChatMsg{text: &text, handle: &handle, streamer: &streamer})
201}
202
203type model struct {
204 records []*record
205 width int
206 height int
207}
208
209type record struct {
210 handle *string
211 text *string
212 streamer *string
213}
214
215func initialModel() model {
216 return model{
217 records: make([]*record, 0),
218 }
219}
220
221func (m model) Init() tea.Cmd {
222 return nil
223}
224
225//6c67ea
226//15191e
227
228type ChatMsg struct {
229 handle *string
230 text *string
231 streamer *string
232}
233
234func (m model) Update(msg tea.Msg) (tea.Model, tea.Cmd) {
235 switch msg := msg.(type) {
236 case tea.KeyMsg:
237 if msg.String() == "q" || msg.String() == "ctrl+c" {
238
239 return m, tea.Quit
240 }
241 case tea.WindowSizeMsg:
242 m.width = msg.Width
243 m.height = msg.Height
244 case ChatMsg:
245 record := record{
246 text: msg.text,
247 handle: msg.handle,
248 streamer: msg.streamer,
249 }
250 m.records = append(m.records, &record)
251 }
252 return m, nil
253}
254
255func (m model) View() string {
256 s := ""
257 for _, record := range m.records {
258 str := "invalid handle"
259 if record.handle != nil {
260 str = fmt.Sprintf("%s", *record.handle)
261 }
262 bold := lipgloss.NewStyle().Bold(true).Foreground(lipgloss.Color(hashStringToColor(str)))
263 bdy := ""
264 if record.text != nil {
265 bdy = fmt.Sprintf("%s", *record.text)
266 }
267 middleText := "\n"
268 if !noNeedForFrom {
269 boldStrmr := lipgloss.NewStyle().Bold(true).Foreground(lipgloss.Color(hashStringToColor(*record.streamer)))
270 middleText = fmt.Sprintf(" in %s's chat\n", boldStrmr.Render(*record.streamer))
271 }
272 style := lipgloss.NewStyle().Width(m.width)
273 s = s + "\n" + bold.Render(str) + middleText + style.Render(bdy) + "\n"
274 }
275 return s
276}
277
278func hashStringToColor(s string) string {
279 h := fnv.New32a()
280 h.Write([]byte(s))
281 ui := h.Sum32()
282 guess := fmt.Sprintf("#%06x", ui)
283 return guess[0:7]
284}