forked from
juliet.paris/streamplace-spa
minimal streamplace frontend
1import { Client } from "@atcute/client";
2import { OAuthUserAgent } from "@atcute/oauth-browser-client";
3
4export interface Facet {
5 index: { byteStart: number; byteEnd: number };
6 features: Array<
7 | { $type: "app.bsky.richtext.facet#mention"; did: string }
8 | { $type: "app.bsky.richtext.facet#link"; uri: string }
9 >;
10}
11
12export interface ChatMessage {
13 uri: string;
14 cid: string;
15 author: { handle: string; did: string };
16 record: {
17 text: string;
18 facets?: Facet[];
19 reply?: { parent?: { uri: string; cid: string }; root?: { uri: string; cid: string } };
20 };
21 chatProfile?: { color?: { red: number; green: number; blue: number } };
22 indexedAt: string;
23 badges?: Array<{ badgeType: string }>;
24 replyTo?: ChatMessage;
25}
26
27export interface RichTextSegment {
28 text: string;
29 facet?: Facet;
30}
31
32export function segmentRichText(text: string, facets?: Facet[]): RichTextSegment[] {
33 if (!facets || facets.length === 0) {
34 return [{ text }];
35 }
36
37 const encoder = new TextEncoder();
38 const decoder = new TextDecoder();
39 const bytes = encoder.encode(text);
40
41 // Sort facets by byteStart
42 const sorted = [...facets].sort((a, b) => a.index.byteStart - b.index.byteStart);
43
44 const segments: RichTextSegment[] = [];
45 let cursor = 0;
46
47 for (const facet of sorted) {
48 const { byteStart, byteEnd } = facet.index;
49 if (byteStart < cursor || byteEnd > bytes.length) continue;
50
51 // Text before this facet
52 if (byteStart > cursor) {
53 segments.push({ text: decoder.decode(bytes.slice(cursor, byteStart)) });
54 }
55
56 // The facet text
57 segments.push({
58 text: decoder.decode(bytes.slice(byteStart, byteEnd)),
59 facet,
60 });
61
62 cursor = byteEnd;
63 }
64
65 // Remaining text after last facet
66 if (cursor < bytes.length) {
67 segments.push({ text: decoder.decode(bytes.slice(cursor)) });
68 }
69
70 return segments;
71}
72
73export interface StreamInfo {
74 title: string;
75 handle: string;
76}
77
78export interface ChatCallbacks {
79 onMessage: (msg: ChatMessage) => void;
80 onStreamInfo: (info: StreamInfo) => void;
81 onViewerCount: (count: number) => void;
82 onOpen: () => void;
83 onClose: () => void;
84}
85
86export interface ChatConnection {
87 close(): void;
88}
89
90export function connectChatWs(handle: string, callbacks: ChatCallbacks): ChatConnection {
91 const wsUrl = `wss://stream.place/api/websocket/${encodeURIComponent(handle)}`;
92 let ws: WebSocket | undefined;
93 let closed = false;
94 let retryDelay = 1000;
95 let retryTimer: ReturnType<typeof setTimeout> | undefined;
96
97 function connect() {
98 if (closed) return;
99 ws = new WebSocket(wsUrl);
100
101 ws.onopen = () => {
102 retryDelay = 1000;
103 callbacks.onOpen();
104 };
105
106 ws.onclose = () => {
107 callbacks.onClose();
108 scheduleReconnect();
109 };
110
111 ws.onerror = () => {
112 ws?.close();
113 };
114
115 ws.onmessage = (event) => {
116 try {
117 const data = JSON.parse(event.data);
118
119 if (data.$type === "place.stream.chat.defs#messageView") {
120 callbacks.onMessage(data as ChatMessage);
121 } else if (data.$type === "place.stream.livestream#livestreamView") {
122 callbacks.onStreamInfo({
123 title: data.record?.title || "",
124 handle: data.author?.handle || "",
125 });
126 } else if (data.$type === "place.stream.livestream#viewerCount") {
127 callbacks.onViewerCount(data.count ?? 0);
128 }
129 } catch {
130 // ignore non-JSON messages
131 }
132 };
133 }
134
135 function scheduleReconnect() {
136 if (closed) return;
137 retryTimer = setTimeout(() => {
138 retryDelay = Math.min(retryDelay * 2, 30000);
139 connect();
140 }, retryDelay);
141 }
142
143 connect();
144
145 return {
146 close() {
147 closed = true;
148 clearTimeout(retryTimer);
149 ws?.close();
150 },
151 };
152}
153
154// URL regex - matches http:// and https:// URLs
155const URL_RE = /https?:\/\/[^\s\])<>]+/g;
156// Mention regex - matches @handle.tld patterns
157const MENTION_RE = /(?<![.\w])@([\w.-]+\.[\w.-]+)/g;
158
159export function detectFacets(text: string): Facet[] {
160 const encoder = new TextEncoder();
161 const bytes = encoder.encode(text);
162 const facets: Facet[] = [];
163
164 // We need byte offsets, so we convert char indices to byte indices
165 const charToByteOffset = (charIdx: number): number => {
166 return encoder.encode(text.slice(0, charIdx)).byteLength;
167 };
168
169 // Detect URLs
170 for (const match of text.matchAll(URL_RE)) {
171 const start = match.index!;
172 // Trim trailing punctuation that's likely not part of the URL
173 let uri = match[0];
174 while (uri.length > 0 && /[.,;:!?)}\]'"]+$/.test(uri)) {
175 uri = uri.slice(0, -1);
176 }
177 const byteStart = charToByteOffset(start);
178 const byteEnd = charToByteOffset(start + uri.length);
179 if (byteEnd <= bytes.byteLength) {
180 facets.push({
181 index: { byteStart, byteEnd },
182 features: [{ $type: "app.bsky.richtext.facet#link", uri }],
183 });
184 }
185 }
186
187 // Detect mentions
188 for (const match of text.matchAll(MENTION_RE)) {
189 const start = match.index!;
190 const end = start + match[0].length;
191 const byteStart = charToByteOffset(start);
192 const byteEnd = charToByteOffset(end);
193 if (byteEnd <= bytes.byteLength) {
194 // Store the handle; the DID will need to be resolved before sending
195 facets.push({
196 index: { byteStart, byteEnd },
197 features: [{ $type: "app.bsky.richtext.facet#mention", did: match[1] }],
198 });
199 }
200 }
201
202 return facets.sort((a, b) => a.index.byteStart - b.index.byteStart);
203}
204
205export interface ReplyRef {
206 uri: string;
207 cid: string;
208}
209
210export async function sendChatMessage(
211 oauthAgent: OAuthUserAgent,
212 repo: string,
213 streamerDid: string,
214 text: string,
215 resolveHandle?: (handle: string) => Promise<string>,
216 reply?: { root: ReplyRef; parent: ReplyRef },
217): Promise<void> {
218 const rpc = new Client({ handler: oauthAgent });
219
220 let facets = detectFacets(text);
221
222 // Resolve mention handles to DIDs
223 if (resolveHandle) {
224 facets = await Promise.all(
225 facets.map(async (facet) => {
226 const feature = facet.features[0];
227 if (feature.$type === "app.bsky.richtext.facet#mention") {
228 try {
229 const did = await resolveHandle(feature.did);
230 return {
231 ...facet,
232 features: [{ $type: "app.bsky.richtext.facet#mention" as const, did }],
233 };
234 } catch {
235 // If resolution fails, drop the facet
236 return null;
237 }
238 }
239 return facet;
240 }),
241 ).then((results) => results.filter((f): f is Facet => f !== null));
242 }
243
244 const record: Record<string, unknown> = {
245 $type: "place.stream.chat.message",
246 text,
247 streamer: streamerDid,
248 createdAt: new Date().toISOString(),
249 };
250
251 if (facets.length > 0) {
252 record.facets = facets;
253 }
254
255 if (reply) {
256 record.reply = reply;
257 }
258
259 await rpc.post("com.atproto.repo.createRecord", {
260 input: {
261 repo: repo as `did:${string}:${string}`,
262 collection: "place.stream.chat.message",
263 record,
264 },
265 });
266}