stream.place chat terminal ui
at main 6.4 kB view raw
1package main 2 3import ( 4 "context" 5 "encoding/json" 6 "errors" 7 "fmt" 8 "github.com/bluesky-social/indigo/atproto/identity" 9 "github.com/bluesky-social/indigo/atproto/syntax" 10 "github.com/bluesky-social/jetstream/pkg/client" 11 "github.com/bluesky-social/jetstream/pkg/client/schedulers/sequential" 12 "github.com/bluesky-social/jetstream/pkg/models" 13 tea "github.com/charmbracelet/bubbletea" 14 "github.com/charmbracelet/lipgloss" 15 "github.com/gopxl/beep" 16 "github.com/gopxl/beep/speaker" 17 "github.com/gopxl/beep/wav" 18 "hash/fnv" 19 "log/slog" 20 "os" 21 "time" 22) 23 24type NotificationSystem struct { 25 np *NotificationPlayer 26 tp *tea.Program 27} 28 29type NotificationPlayer struct { 30 audioData beep.Buffer 31 sampleRate beep.SampleRate 32} 33 34var ( 35 hasArgs bool 36 didToHandle map[syntax.DID]*syntax.Handle 37 noNeedForFrom bool 38) 39 40func main() { 41 fmt.Println("beep") 42 if len(os.Args) != 1 { 43 hasArgs = true 44 } 45 if len(os.Args) == 2 { 46 noNeedForFrom = true 47 } 48 f, err := os.Open("thread_notification.wav") 49 if err != nil { 50 panic(err) 51 } 52 streamer, format, err := wav.Decode(f) 53 if err != nil { 54 panic(err) 55 } 56 defer streamer.Close() 57 buffer := beep.NewBuffer(format) 58 buffer.Append(streamer) 59 np := &NotificationPlayer{ 60 audioData: *buffer, 61 sampleRate: format.SampleRate, 62 } 63 err = speaker.Init(np.sampleRate, np.sampleRate.N(time.Second/10)) 64 if err != nil { 65 panic(err) 66 } 67 tp := tea.NewProgram(initialModel()) 68 ns := &NotificationSystem{ 69 np, 70 tp, 71 } 72 didToHandle = make(map[syntax.DID]*syntax.Handle) 73 go consumeLoop(context.Background(), ns) 74 ns.tp.Run() 75} 76 77func consumeLoop(ctx context.Context, ns *NotificationSystem) { 78 jsServerAddr := os.Getenv("JS_SERVER_ADDR") 79 if jsServerAddr == "" { 80 jsServerAddr = "wss://jetstream.atproto.tools/subscribe" 81 } 82 consumer := NewConsumer(jsServerAddr, ns) 83 for { 84 err := consumer.Consume(ctx) 85 if err != nil { 86 fmt.Printf("error in consumeLoop: %s\n", err.Error()) 87 if errors.Is(err, context.Canceled) { 88 fmt.Println("exiting consume loop") 89 return 90 } 91 } 92 } 93} 94 95type Consumer struct { 96 cfg *client.ClientConfig 97 handler *handler 98} 99 100type handler struct { 101 ns *NotificationSystem 102} 103 104func NewConsumer(jsAddr string, ns *NotificationSystem) *Consumer { 105 cfg := client.DefaultClientConfig() 106 if jsAddr != "" { 107 cfg.WebsocketURL = jsAddr 108 } 109 cfg.WantedCollections = []string{ 110 "place.stream.chat.message", 111 } 112 cfg.WantedDids = []string{} 113 return &Consumer{ 114 cfg: cfg, 115 handler: &handler{ns}, 116 } 117} 118 119func (c *Consumer) Consume(ctx context.Context) error { 120 scheduler := sequential.NewScheduler("jetstream_localdev", slog.Default(), c.handler.HandleEvent) 121 defer scheduler.Shutdown() 122 opts := slog.HandlerOptions{ 123 Level: slog.LevelError, 124 } 125 handler := slog.NewJSONHandler(os.Stdout, &opts) 126 client, err := client.NewClient(c.cfg, slog.New(handler), scheduler) 127 if err != nil { 128 return errors.New("failed to create client: " + err.Error()) 129 } 130 cursor := time.Now().Add(1 * -time.Minute).UnixMicro() 131 err = client.ConnectAndRead(ctx, &cursor) 132 if err != nil { 133 return errors.New("error connecting and reading: " + err.Error()) 134 } 135 return nil 136} 137 138type ChatMessage struct { 139 LexiconTypeID string `json:"$type,const=place.stream.chat.message"` 140 CreatedAt string `json:"createdAt"` 141 Streamer string `json:"streamer"` 142 Text string `json:"text"` 143} 144 145func (h *handler) HandleEvent(ctx context.Context, event *models.Event) error { 146 if event.Commit != nil && event.Commit.Collection == "place.stream.chat.message" && event.Commit.Operation == "create" { 147 handle, err := getHandle(event.Did, ctx) 148 if err != nil { 149 panic(err) 150 } 151 var v ChatMessage 152 err = json.Unmarshal(event.Commit.Record, &v) 153 if err != nil { 154 return nil 155 } 156 shouldSend := !hasArgs 157 if hasArgs { 158 for _, streamer := range os.Args[1:] { 159 if streamer == v.Streamer { 160 shouldSend = true 161 } 162 } 163 } 164 if shouldSend { 165 var streamer string 166 if !noNeedForFrom { 167 streamer, err = getHandle(v.Streamer, ctx) 168 if err != nil { 169 panic(err) 170 } 171 } 172 h.ns.Notify(v.Text, handle, streamer) 173 } 174 } 175 return nil 176} 177 178func getHandle(did string, ctx context.Context) (string, error) { 179 sdid, err := syntax.ParseDID(did) 180 if err != nil { 181 return "", err 182 } 183 h, ok := didToHandle[sdid] 184 if ok { 185 return h.String(), nil 186 } 187 dd := identity.DefaultDirectory() 188 id, err := dd.LookupDID(ctx, sdid) 189 if err != nil { 190 return "failed.to.lookup", nil 191 } 192 didToHandle[sdid] = &id.Handle 193 return id.Handle.String(), nil 194} 195 196func (ns *NotificationSystem) Notify(text string, handle string, streamer string) { 197 198 noise := ns.np.audioData.Streamer(0, ns.np.audioData.Len()) 199 speaker.Play(noise) 200 ns.tp.Send(ChatMsg{text: &text, handle: &handle, streamer: &streamer}) 201} 202 203type model struct { 204 records []*record 205 width int 206 height int 207} 208 209type record struct { 210 handle *string 211 text *string 212 streamer *string 213} 214 215func initialModel() model { 216 return model{ 217 records: make([]*record, 0), 218 } 219} 220 221func (m model) Init() tea.Cmd { 222 return nil 223} 224 225//6c67ea 226//15191e 227 228type ChatMsg struct { 229 handle *string 230 text *string 231 streamer *string 232} 233 234func (m model) Update(msg tea.Msg) (tea.Model, tea.Cmd) { 235 switch msg := msg.(type) { 236 case tea.KeyMsg: 237 if msg.String() == "q" || msg.String() == "ctrl+c" { 238 239 return m, tea.Quit 240 } 241 case tea.WindowSizeMsg: 242 m.width = msg.Width 243 m.height = msg.Height 244 case ChatMsg: 245 record := record{ 246 text: msg.text, 247 handle: msg.handle, 248 streamer: msg.streamer, 249 } 250 m.records = append(m.records, &record) 251 } 252 return m, nil 253} 254 255func (m model) View() string { 256 s := "" 257 for _, record := range m.records { 258 str := "invalid handle" 259 if record.handle != nil { 260 str = fmt.Sprintf("%s", *record.handle) 261 } 262 bold := lipgloss.NewStyle().Bold(true).Foreground(lipgloss.Color(hashStringToColor(str))) 263 bdy := "" 264 if record.text != nil { 265 bdy = fmt.Sprintf("%s", *record.text) 266 } 267 middleText := "\n" 268 if !noNeedForFrom { 269 boldStrmr := lipgloss.NewStyle().Bold(true).Foreground(lipgloss.Color(hashStringToColor(*record.streamer))) 270 middleText = fmt.Sprintf(" in %s's chat\n", boldStrmr.Render(*record.streamer)) 271 } 272 style := lipgloss.NewStyle().Width(m.width) 273 s = s + "\n" + bold.Render(str) + middleText + style.Render(bdy) + "\n" 274 } 275 return s 276} 277 278func hashStringToColor(s string) string { 279 h := fnv.New32a() 280 h.Write([]byte(s)) 281 ui := h.Sum32() 282 guess := fmt.Sprintf("#%06x", ui) 283 return guess[0:7] 284}