Live video on the AT Protocol

add teleport arrival backend, add cancelling teleport

authored by Natalie B. and committed by Eli Mallon cb275890 e8d41ec8

+768 -21
+2 -1
js/components/src/lib/slash-commands.ts
··· 44 44 const command = commands.get(commandName); 45 45 if (!command) { 46 46 return { 47 - handled: true, 47 + // for now - return false 48 + handled: false, 48 49 error: `Unknown command: /${commandName}`, 49 50 }; 50 51 }
+23 -1
js/components/src/lib/slash-commands/teleport.ts
··· 1 1 import { PlaceStreamLiveTeleport, StreamplaceAgent } from "streamplace"; 2 2 import { registerSlashCommand, SlashCommandResult } from "../slash-commands"; 3 3 4 + export async function deleteTeleport( 5 + pdsAgent: StreamplaceAgent, 6 + userDID: string, 7 + uri: string, 8 + ) { 9 + const rkey = uri.split("/").pop(); 10 + if (!rkey) { 11 + throw new Error("No rkey found in teleport URI"); 12 + } 13 + return await pdsAgent.com.atproto.repo.deleteRecord({ 14 + repo: userDID, 15 + collection: "place.stream.live.teleport", 16 + rkey: rkey, 17 + }); 18 + } 19 + 4 20 export function registerTeleportCommand( 5 21 pdsAgent: StreamplaceAgent, 6 22 userDID: string, 23 + setActiveTeleportUri?: (uri: string | null) => void, 7 24 ) { 8 25 registerSlashCommand({ 9 26 name: "teleport", ··· 72 89 }; 73 90 74 91 try { 75 - await pdsAgent.com.atproto.repo.createRecord({ 92 + const result = await pdsAgent.com.atproto.repo.createRecord({ 76 93 repo: userDID, 77 94 collection: "place.stream.live.teleport", 78 95 record, 79 96 }); 97 + 98 + // store the URI in the livestream store 99 + if (setActiveTeleportUri) { 100 + setActiveTeleportUri(result.data.uri); 101 + } 80 102 81 103 return { handled: true }; 82 104 } catch (err) {
+35 -3
js/components/src/livestream-provider/index.tsx
··· 1 1 import React, { useContext, useEffect, useRef } from "react"; 2 2 import { useAvatars } from "../hooks"; 3 + import { deleteTeleport } from "../lib/slash-commands/teleport"; 3 4 import { StreamNotifications } from "../lib/stream-notifications"; 4 5 import { 5 6 LivestreamContext, 6 7 makeLivestreamStore, 7 8 useLivestreamStore, 8 9 } from "../livestream-store"; 10 + import { useDID, usePDSAgent } from "../streamplace-store"; 9 11 import { useLivestreamWebsocket } from "./websocket"; 10 12 11 13 export function LivestreamProvider({ ··· 50 52 onTeleport?: (targetHandle: string, targetDID: string) => void; 51 53 }) { 52 54 const activeTeleport = useLivestreamStore((state) => state.activeTeleport); 55 + const activeTeleportUri = useLivestreamStore( 56 + (state) => state.activeTeleportUri, 57 + ); 53 58 const profile = useAvatars(activeTeleport ? [activeTeleport.streamer] : []); 59 + const pdsAgent = usePDSAgent(); 60 + const userDID = useDID(); 61 + const prevActiveTeleportRef = useRef(activeTeleport); 54 62 55 63 useEffect(() => { 56 64 if (!activeTeleport || !profile[activeTeleport.streamer]) return; ··· 70 78 targetHandle: targetHandle, 71 79 targetDID: activeTeleport.streamer, 72 80 countdown: countdown, 73 - onCancel: () => { 74 - console.log("Teleport cancelled by user"); 81 + onCancel: async () => { 82 + if (activeTeleportUri && pdsAgent && userDID) { 83 + try { 84 + await deleteTeleport(pdsAgent, userDID, activeTeleportUri); 85 + } catch (err) { 86 + console.error("Failed to delete teleport:", err); 87 + } 88 + } 75 89 }, 76 90 onAutoDismiss: () => { 77 91 console.log("Teleport dismissed bestie!"); ··· 81 95 } 82 96 }, 83 97 }); 84 - }, [activeTeleport, profile, onTeleport]); 98 + }, [ 99 + activeTeleport, 100 + activeTeleportUri, 101 + profile, 102 + onTeleport, 103 + pdsAgent, 104 + userDID, 105 + ]); 106 + 107 + useEffect(() => { 108 + if ( 109 + prevActiveTeleportRef.current && 110 + !activeTeleport && 111 + !activeTeleportUri 112 + ) { 113 + StreamNotifications.teleportCancelled(); 114 + } 115 + prevActiveTeleportRef.current = activeTeleport; 116 + }, [activeTeleport, activeTeleportUri]); 85 117 86 118 return <></>; 87 119 }
+2
js/components/src/livestream-store/livestream-state.tsx
··· 24 24 streamKey: string | null; 25 25 setStreamKey: (key: string | null) => void; 26 26 activeTeleport: PlaceStreamLiveTeleport.Record | null; 27 + activeTeleportUri: string | null; 28 + setActiveTeleportUri: (uri: string | null) => void; 27 29 websocketConnected: boolean; 28 30 hasReceivedSegment: boolean; 29 31 moderationPermissions: PlaceStreamModerationPermission.Record[];
+2
js/components/src/livestream-store/livestream-store.tsx
··· 23 23 recentSegments: [], 24 24 problems: [], 25 25 activeTeleport: null, 26 + activeTeleportUri: null, 27 + setActiveTeleportUri: (uri) => set({ activeTeleportUri: uri }), 26 28 websocketConnected: false, 27 29 hasReceivedSegment: false, 28 30 moderationPermissions: [],
+19
js/components/src/livestream-store/websocket-consumer.tsx
··· 122 122 pendingHides: newPendingHides, 123 123 }; 124 124 state = reduceChat(state, [], [], [hiddenMessageUri]); 125 + } else if (PlaceStreamLiveTeleport.isRecord(message)) { 126 + const teleportRecord = message as PlaceStreamLiveTeleport.Record; 127 + state = { 128 + ...state, 129 + activeTeleport: teleportRecord, 130 + }; 131 + } else if (PlaceStreamLivestream.isTeleportArrival(message)) { 132 + const arrival = message as PlaceStreamLivestream.TeleportArrival; 133 + // when receiving a teleportArrival, we're the target 134 + // the source is teleporting to us 135 + console.log("Received teleport arrival", arrival); 136 + // TODO: show notification or UI for incoming teleport 137 + } else if (PlaceStreamLivestream.isTeleportCanceled(message)) { 138 + // teleport was canceled (deleted or denied) 139 + state = { 140 + ...state, 141 + activeTeleport: null, 142 + activeTeleportUri: null, 143 + }; 125 144 } 126 145 } 127 146 }
+1
js/components/src/streamplace-provider/index.tsx
··· 19 19 url: string; 20 20 oauthSession?: SessionManager | null; 21 21 }) { 22 + console.log("yeh"); 22 23 // todo: handle url changes? 23 24 const store = useRef(makeStreamplaceStore({ url })).current; 24 25
+99
js/docs/src/content/docs/lex-reference/live/place-stream-live-denyteleport.md
··· 1 + --- 2 + title: place.stream.live.denyTeleport 3 + description: Reference for the place.stream.live.denyTeleport lexicon 4 + --- 5 + 6 + **Lexicon Version:** 1 7 + 8 + ## Definitions 9 + 10 + <a name="main"></a> 11 + 12 + ### `main` 13 + 14 + **Type:** `procedure` 15 + 16 + Deny an incoming teleport request. 17 + 18 + **Parameters:** _(None defined)_ 19 + 20 + **Input:** 21 + 22 + - **Encoding:** `application/json` 23 + - **Schema:** 24 + 25 + **Schema Type:** `object` 26 + 27 + | Name | Type | Req'd | Description | Constraints | 28 + | ----- | -------- | ----- | --------------------------------------- | ---------------- | 29 + | `uri` | `string` | ✅ | The URI of the teleport record to deny. | Format: `at-uri` | 30 + 31 + **Output:** 32 + 33 + - **Encoding:** `application/json` 34 + - **Schema:** 35 + 36 + **Schema Type:** `object` 37 + 38 + | Name | Type | Req'd | Description | Constraints | 39 + | --------- | --------- | ----- | --------------------------------------------- | ----------- | 40 + | `success` | `boolean` | ✅ | Whether the teleport was successfully denied. | | 41 + 42 + **Possible Errors:** 43 + 44 + - `TeleportNotFound`: The specified teleport was not found. 45 + - `Unauthorized`: The authenticated user is not the target of this teleport. 46 + 47 + --- 48 + 49 + ## Lexicon Source 50 + 51 + ```json 52 + { 53 + "lexicon": 1, 54 + "id": "place.stream.live.denyTeleport", 55 + "defs": { 56 + "main": { 57 + "type": "procedure", 58 + "description": "Deny an incoming teleport request.", 59 + "input": { 60 + "encoding": "application/json", 61 + "schema": { 62 + "type": "object", 63 + "required": ["uri"], 64 + "properties": { 65 + "uri": { 66 + "type": "string", 67 + "format": "at-uri", 68 + "description": "The URI of the teleport record to deny." 69 + } 70 + } 71 + } 72 + }, 73 + "output": { 74 + "encoding": "application/json", 75 + "schema": { 76 + "type": "object", 77 + "required": ["success"], 78 + "properties": { 79 + "success": { 80 + "type": "boolean", 81 + "description": "Whether the teleport was successfully denied." 82 + } 83 + } 84 + } 85 + }, 86 + "errors": [ 87 + { 88 + "name": "TeleportNotFound", 89 + "description": "The specified teleport was not found." 90 + }, 91 + { 92 + "name": "Unauthorized", 93 + "description": "The authenticated user is not the target of this teleport." 94 + } 95 + ] 96 + } 97 + } 98 + } 99 + ```
+66
js/docs/src/content/docs/lex-reference/live/place-stream-live-teleport.md
··· 1 + --- 2 + title: place.stream.live.teleport 3 + description: Reference for the place.stream.live.teleport lexicon 4 + --- 5 + 6 + **Lexicon Version:** 1 7 + 8 + ## Definitions 9 + 10 + <a name="main"></a> 11 + 12 + ### `main` 13 + 14 + **Type:** `record` 15 + 16 + Record defining a 'teleport', that is active during a certain time. 17 + 18 + **Record Key:** `tid` 19 + 20 + **Record Properties:** 21 + 22 + | Name | Type | Req'd | Description | Constraints | 23 + | ----------------- | --------- | ----- | ---------------------------------------------------------------------------------------------------------------------------------------------------------- | ---------------------- | 24 + | `streamer` | `string` | ✅ | The DID of the streamer to teleport to. | Format: `did` | 25 + | `startsAt` | `string` | ✅ | The time the teleport becomes active. | Format: `datetime` | 26 + | `durationSeconds` | `integer` | ❌ | The time limit in seconds for the teleport. If not set, the teleport is permanent. Must be at least 60 seconds, and no more than 32,400 seconds (9 hours). | Min: 60<br/>Max: 32400 | 27 + 28 + --- 29 + 30 + ## Lexicon Source 31 + 32 + ```json 33 + { 34 + "lexicon": 1, 35 + "id": "place.stream.live.teleport", 36 + "defs": { 37 + "main": { 38 + "type": "record", 39 + "key": "tid", 40 + "description": "Record defining a 'teleport', that is active during a certain time.", 41 + "record": { 42 + "type": "object", 43 + "required": ["streamer", "startsAt"], 44 + "properties": { 45 + "streamer": { 46 + "type": "string", 47 + "format": "did", 48 + "description": "The DID of the streamer to teleport to." 49 + }, 50 + "startsAt": { 51 + "type": "string", 52 + "format": "datetime", 53 + "description": "The time the teleport becomes active." 54 + }, 55 + "durationSeconds": { 56 + "type": "integer", 57 + "description": "The time limit in seconds for the teleport. If not set, the teleport is permanent. Must be at least 60 seconds, and no more than 32,400 seconds (9 hours).", 58 + "minimum": 60, 59 + "maximum": 32400 60 + } 61 + } 62 + } 63 + } 64 + } 65 + } 66 + ```
+71
js/docs/src/content/docs/lex-reference/openapi.json
··· 517 517 } 518 518 } 519 519 }, 520 + "/xrpc/place.stream.live.denyTeleport": { 521 + "post": { 522 + "summary": "Deny an incoming teleport request.", 523 + "operationId": "place.stream.live.denyTeleport", 524 + "tags": ["place.stream.live"], 525 + "responses": { 526 + "200": { 527 + "description": "Success", 528 + "content": { 529 + "application/json": { 530 + "schema": { 531 + "type": "object", 532 + "properties": { 533 + "success": { 534 + "type": "boolean", 535 + "description": "Whether the teleport was successfully denied." 536 + } 537 + }, 538 + "required": ["success"] 539 + } 540 + } 541 + } 542 + }, 543 + "400": { 544 + "description": "Bad Request", 545 + "content": { 546 + "application/json": { 547 + "schema": { 548 + "type": "object", 549 + "required": ["error", "message"], 550 + "properties": { 551 + "error": { 552 + "type": "string", 553 + "oneOf": [ 554 + { 555 + "const": "TeleportNotFound" 556 + }, 557 + { 558 + "const": "Unauthorized" 559 + } 560 + ] 561 + }, 562 + "message": { 563 + "type": "string" 564 + } 565 + } 566 + } 567 + } 568 + } 569 + } 570 + }, 571 + "requestBody": { 572 + "required": true, 573 + "content": { 574 + "application/json": { 575 + "schema": { 576 + "type": "object", 577 + "properties": { 578 + "uri": { 579 + "type": "string", 580 + "description": "The URI of the teleport record to deny.", 581 + "format": "uri" 582 + } 583 + }, 584 + "required": ["uri"] 585 + } 586 + } 587 + } 588 + } 589 + } 590 + }, 520 591 "/xrpc/place.stream.multistream.createTarget": { 521 592 "post": { 522 593 "summary": "Create a new target for rebroadcasting a Streamplace stream.",
+78 -3
js/docs/src/content/docs/lex-reference/place-stream-livestream.md
··· 79 79 80 80 --- 81 81 82 + <a name="teleportarrival"></a> 83 + 84 + ### `teleportArrival` 85 + 86 + **Type:** `object` 87 + 88 + **Properties:** 89 + 90 + | Name | Type | Req'd | Description | Constraints | 91 + | ------------- | ------------------------------------------------------------------------------------------------------------------------------------------------ | ----- | -------------------------------------------------- | ------------------ | 92 + | `teleportUri` | `string` | ✅ | The URI of the teleport record | Format: `at-uri` | 93 + | `source` | [`app.bsky.actor.defs#profileViewBasic`](https://github.com/bluesky-social/atproto/tree/main/lexicons/app/bsky/actor/defs.json#profileViewBasic) | ✅ | The streamer who is teleporting their viewers here | | 94 + | `viewerCount` | `integer` | ✅ | How many viewers are arriving from this teleport | | 95 + | `startsAt` | `string` | ✅ | When this teleport started | Format: `datetime` | 96 + 97 + --- 98 + 99 + <a name="teleportcanceled"></a> 100 + 101 + ### `teleportCanceled` 102 + 103 + **Type:** `object` 104 + 105 + **Properties:** 106 + 107 + | Name | Type | Req'd | Description | Constraints | 108 + | ------------- | -------- | ----- | ------------------------------------------------ | ------------------------------------ | 109 + | `teleportUri` | `string` | ✅ | The URI of the teleport record that was canceled | Format: `at-uri` | 110 + | `reason` | `string` | ✅ | Why this teleport was canceled | Enum: `deleted`, `denied`, `expired` | 111 + 112 + --- 113 + 82 114 <a name="streamplaceanything"></a> 83 115 84 116 ### `streamplaceAnything` ··· 87 119 88 120 **Properties:** 89 121 90 - | Name | Type | Req'd | Description | Constraints | 91 - | ------------ | ------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----- | ----------- | ----------- | 92 - | `livestream` | Union of:<br/>&nbsp;&nbsp;[`#livestreamView`](#livestreamview)<br/>&nbsp;&nbsp;[`#viewerCount`](#viewercount)<br/>&nbsp;&nbsp;[`place.stream.defs#blockView`](/lex-reference/place-stream-defs#blockview)<br/>&nbsp;&nbsp;[`place.stream.defs#renditions`](/lex-reference/place-stream-defs#renditions)<br/>&nbsp;&nbsp;[`place.stream.defs#rendition`](/lex-reference/place-stream-defs#rendition)<br/>&nbsp;&nbsp;[`place.stream.chat.defs#messageView`](/lex-reference/place-stream-chat-defs#messageview) | ✅ | | | 122 + | Name | Type | Req'd | Description | Constraints | 123 + | ------------ | ----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- | ----- | ----------- | ----------- | 124 + | `livestream` | Union of:<br/>&nbsp;&nbsp;[`#livestreamView`](#livestreamview)<br/>&nbsp;&nbsp;[`#viewerCount`](#viewercount)<br/>&nbsp;&nbsp;[`#teleportArrival`](#teleportarrival)<br/>&nbsp;&nbsp;[`#teleportCanceled`](#teleportcanceled)<br/>&nbsp;&nbsp;[`place.stream.defs#blockView`](/lex-reference/place-stream-defs#blockview)<br/>&nbsp;&nbsp;[`place.stream.defs#renditions`](/lex-reference/place-stream-defs#renditions)<br/>&nbsp;&nbsp;[`place.stream.defs#rendition`](/lex-reference/place-stream-defs#rendition)<br/>&nbsp;&nbsp;[`place.stream.chat.defs#messageView`](/lex-reference/place-stream-chat-defs#messageview) | ✅ | | | 93 125 94 126 --- 95 127 ··· 199 231 } 200 232 } 201 233 }, 234 + "teleportArrival": { 235 + "type": "object", 236 + "required": ["teleportUri", "source", "viewerCount", "startsAt"], 237 + "properties": { 238 + "teleportUri": { 239 + "type": "string", 240 + "format": "at-uri", 241 + "description": "The URI of the teleport record" 242 + }, 243 + "source": { 244 + "type": "ref", 245 + "ref": "app.bsky.actor.defs#profileViewBasic", 246 + "description": "The streamer who is teleporting their viewers here" 247 + }, 248 + "viewerCount": { 249 + "type": "integer", 250 + "description": "How many viewers are arriving from this teleport" 251 + }, 252 + "startsAt": { 253 + "type": "string", 254 + "format": "datetime", 255 + "description": "When this teleport started" 256 + } 257 + } 258 + }, 259 + "teleportCanceled": { 260 + "type": "object", 261 + "required": ["teleportUri", "reason"], 262 + "properties": { 263 + "teleportUri": { 264 + "type": "string", 265 + "format": "at-uri", 266 + "description": "The URI of the teleport record that was canceled" 267 + }, 268 + "reason": { 269 + "type": "string", 270 + "enum": ["deleted", "denied", "expired"], 271 + "description": "Why this teleport was canceled" 272 + } 273 + } 274 + }, 202 275 "streamplaceAnything": { 203 276 "type": "object", 204 277 "required": ["livestream"], ··· 208 281 "refs": [ 209 282 "#livestreamView", 210 283 "#viewerCount", 284 + "#teleportArrival", 285 + "#teleportCanceled", 211 286 "place.stream.defs#blockView", 212 287 "place.stream.defs#renditions", 213 288 "place.stream.defs#rendition",
+47
lexicons/place/stream/live/denyTeleport.json
··· 1 + { 2 + "lexicon": 1, 3 + "id": "place.stream.live.denyTeleport", 4 + "defs": { 5 + "main": { 6 + "type": "procedure", 7 + "description": "Deny an incoming teleport request.", 8 + "input": { 9 + "encoding": "application/json", 10 + "schema": { 11 + "type": "object", 12 + "required": ["uri"], 13 + "properties": { 14 + "uri": { 15 + "type": "string", 16 + "format": "at-uri", 17 + "description": "The URI of the teleport record to deny." 18 + } 19 + } 20 + } 21 + }, 22 + "output": { 23 + "encoding": "application/json", 24 + "schema": { 25 + "type": "object", 26 + "required": ["success"], 27 + "properties": { 28 + "success": { 29 + "type": "boolean", 30 + "description": "Whether the teleport was successfully denied." 31 + } 32 + } 33 + } 34 + }, 35 + "errors": [ 36 + { 37 + "name": "TeleportNotFound", 38 + "description": "The specified teleport was not found." 39 + }, 40 + { 41 + "name": "Unauthorized", 42 + "description": "The authenticated user is not the target of this teleport." 43 + } 44 + ] 45 + } 46 + } 47 + }
+43
lexicons/place/stream/livestream.json
··· 88 88 "count": { "type": "integer" } 89 89 } 90 90 }, 91 + "teleportArrival": { 92 + "type": "object", 93 + "required": ["teleportUri", "source", "viewerCount", "startsAt"], 94 + "properties": { 95 + "teleportUri": { 96 + "type": "string", 97 + "format": "at-uri", 98 + "description": "The URI of the teleport record" 99 + }, 100 + "source": { 101 + "type": "ref", 102 + "ref": "app.bsky.actor.defs#profileViewBasic", 103 + "description": "The streamer who is teleporting their viewers here" 104 + }, 105 + "viewerCount": { 106 + "type": "integer", 107 + "description": "How many viewers are arriving from this teleport" 108 + }, 109 + "startsAt": { 110 + "type": "string", 111 + "format": "datetime", 112 + "description": "When this teleport started" 113 + } 114 + } 115 + }, 116 + "teleportCanceled": { 117 + "type": "object", 118 + "required": ["teleportUri", "reason"], 119 + "properties": { 120 + "teleportUri": { 121 + "type": "string", 122 + "format": "at-uri", 123 + "description": "The URI of the teleport record that was canceled" 124 + }, 125 + "reason": { 126 + "type": "string", 127 + "enum": ["deleted", "denied", "expired"], 128 + "description": "Why this teleport was canceled" 129 + } 130 + } 131 + }, 91 132 "streamplaceAnything": { 92 133 "type": "object", 93 134 "required": ["livestream"], ··· 97 138 "refs": [ 98 139 "#livestreamView", 99 140 "#viewerCount", 141 + "#teleportArrival", 142 + "#teleportCanceled", 100 143 "place.stream.defs#blockView", 101 144 "place.stream.defs#renditions", 102 145 "place.stream.defs#rendition",
+29
pkg/api/websocket.go
··· 7 7 "net/http" 8 8 "time" 9 9 10 + bsky "github.com/bluesky-social/indigo/api/bsky" 10 11 "github.com/google/uuid" 11 12 "github.com/gorilla/websocket" 12 13 "github.com/julienschmidt/httprouter" ··· 238 239 } 239 240 for _, message := range messages { 240 241 initialBurst <- message 242 + } 243 + }() 244 + 245 + go func() { 246 + teleports, err := a.Model.GetActiveTeleportsToRepo(repoDID) 247 + if err != nil { 248 + log.Error(ctx, "could not get active teleports", "error", err) 249 + return 250 + } 251 + log.Log(ctx, "found active teleports in initial burst", "count", len(teleports), "targetDID", repoDID) 252 + for _, tp := range teleports { 253 + if tp.Repo == nil { 254 + log.Error(ctx, "teleport repo is nil", "uri", tp.URI) 255 + continue 256 + } 257 + viewerCount := a.Bus.GetViewerCount(tp.RepoDID) 258 + arrivalMsg := streamplace.Livestream_TeleportArrival{ 259 + LexiconTypeID: "place.stream.livestream#teleportArrival", 260 + TeleportUri: tp.URI, 261 + Source: &bsky.ActorDefs_ProfileViewBasic{ 262 + Did: tp.RepoDID, 263 + Handle: tp.Repo.Handle, 264 + }, 265 + ViewerCount: int64(viewerCount), 266 + StartsAt: tp.StartsAt.Format(time.RFC3339), 267 + } 268 + log.Log(ctx, "sending teleport arrival in initial burst", "from", tp.RepoDID, "to", repoDID) 269 + initialBurst <- arrivalMsg 241 270 } 242 271 }() 243 272
+8
pkg/atproto/firehose.go
··· 305 305 atsync.Bus.Publish(msg.StreamerRepoDID, mv) 306 306 } 307 307 308 + if collection.String() == constants.PLACE_STREAM_LIVE_TELEPORT { 309 + log.Warn(ctx, "deleting teleport", "userDID", evt.Repo, "uri", uri) 310 + err := atsync.Model.DeleteTeleport(ctx, uri) 311 + if err != nil { 312 + log.Error(ctx, "failed to delete teleport", "err", err) 313 + } 314 + } 315 + 308 316 if collection.String() == constants.PLACE_STREAM_MODERATION_PERMISSION { 309 317 log.Debug(ctx, "deleting moderation delegation", "userDID", evt.Repo, "rkey", rkey.String()) 310 318 err := atsync.Model.DeleteModerationDelegation(ctx, rkey.String())
+43
pkg/atproto/sync.go
··· 404 404 } 405 405 go atsync.Bus.Publish(userDID, rec) 406 406 407 + // schedule arrival notification 10 seconds after startsAt 408 + arrivalTime := startsAt.Add(10 * time.Second) 409 + waitDuration := time.Until(arrivalTime) 410 + if waitDuration < 0 { 411 + waitDuration = 0 412 + } 413 + 414 + time.AfterFunc(waitDuration, func() { 415 + // verify the teleport still exists 416 + existingTp, err := atsync.Model.GetTeleportByURI(aturi.String()) 417 + if err != nil { 418 + log.Error(ctx, "failed to get teleport by uri", "err", err) 419 + return 420 + } 421 + if existingTp == nil || existingTp.Denied { 422 + log.Debug(ctx, "teleport no longer active, skipping arrival notification", "uri", aturi.String()) 423 + return 424 + } 425 + 426 + // get the source profile 427 + sourceRepo, err := atsync.Model.GetRepo(userDID) 428 + if err != nil { 429 + log.Error(ctx, "failed to get source repo", "err", err) 430 + return 431 + } 432 + 433 + viewerCount := atsync.Bus.GetViewerCount(userDID) 434 + 435 + arrivalMsg := &streamplace.Livestream_TeleportArrival{ 436 + LexiconTypeID: "place.stream.livestream#teleportArrival", 437 + TeleportUri: aturi.String(), 438 + Source: &bsky.ActorDefs_ProfileViewBasic{ 439 + Did: userDID, 440 + Handle: sourceRepo.Handle, 441 + }, 442 + ViewerCount: int64(viewerCount), 443 + StartsAt: rec.StartsAt, 444 + } 445 + 446 + log.Log(ctx, "sending teleport arrival notification", "from", userDID, "to", rec.Streamer, "uri", aturi.String()) 447 + atsync.Bus.Publish(rec.Streamer, arrivalMsg) 448 + }) 449 + 407 450 case *streamplace.Key: 408 451 log.Debug(ctx, "creating key", "key", rec) 409 452 time, err := aqtime.FromString(rec.CreatedAt)
+1
pkg/constants/constants.go
··· 5 5 var PLACE_STREAM_CHAT_MESSAGE = "place.stream.chat.message" //nolint:all 6 6 var PLACE_STREAM_CHAT_PROFILE = "place.stream.chat.profile" //nolint:all 7 7 var PLACE_STREAM_SERVER_SETTINGS = "place.stream.server.settings" //nolint:all 8 + var PLACE_STREAM_LIVE_TELEPORT = "place.stream.live.teleport" //nolint:all 8 9 var PLACE_STREAM_MODERATION_PERMISSION = "place.stream.moderation.permission" //nolint:all 9 10 var STREAMPLACE_SIGNING_KEY = "signingKey" //nolint:all 10 11 var APP_BSKY_GRAPH_FOLLOW = "app.bsky.graph.follow" //nolint:all
+4
pkg/model/model.go
··· 77 77 CreateTeleport(ctx context.Context, tp *Teleport) error 78 78 GetLatestTeleportForRepo(repoDID string) (*Teleport, error) 79 79 GetActiveTeleportsForRepo(repoDID string) ([]Teleport, error) 80 + GetActiveTeleportsToRepo(targetDID string) ([]Teleport, error) 81 + GetTeleportByURI(uri string) (*Teleport, error) 82 + DeleteTeleport(ctx context.Context, uri string) error 83 + DenyTeleport(ctx context.Context, uri string) error 80 84 81 85 CreateBlock(ctx context.Context, block *Block) error 82 86 GetBlock(ctx context.Context, rkey string) (*Block, error)
+47
pkg/model/teleport.go
··· 18 18 Teleport *[]byte `json:"teleport"` 19 19 RepoDID string `json:"repoDID" gorm:"column:repo_did;index:idx_repo_starts,priority:1"` 20 20 TargetDID string `json:"targetDID" gorm:"column:target_did;index:idx_target_did"` 21 + Denied bool `json:"denied" gorm:"column:denied;default:false"` 21 22 Repo *Repo `json:"repo,omitempty" gorm:"foreignKey:DID;references:RepoDID"` 22 23 Target *Repo `json:"target,omitempty" gorm:"foreignKey:DID;references:TargetDID"` 23 24 } ··· 53 54 Preload("Repo"). 54 55 Preload("Target"). 55 56 Where("repo_did = ?", repoDID). 57 + Where("denied = ?", false). 56 58 Where("starts_at <= ?", now). 57 59 Where("(duration_seconds IS NULL OR DATE_ADD(starts_at, INTERVAL duration_seconds SECOND) > ?)", now). 58 60 Order("starts_at DESC"). ··· 65 67 } 66 68 return teleports, nil 67 69 } 70 + 71 + func (m *DBModel) GetActiveTeleportsToRepo(targetDID string) ([]Teleport, error) { 72 + now := time.Now() 73 + var teleports []Teleport 74 + err := m.DB. 75 + Preload("Repo"). 76 + Preload("Target"). 77 + Where("target_did = ?", targetDID). 78 + Where("denied = ?", false). 79 + Where("starts_at <= ?", now). 80 + Where("(duration_seconds IS NULL OR datetime(starts_at, '+' || duration_seconds || ' seconds') > ?)", now). 81 + Order("starts_at DESC"). 82 + Find(&teleports).Error 83 + if errors.Is(err, gorm.ErrRecordNotFound) { 84 + return nil, nil 85 + } 86 + if err != nil { 87 + return nil, fmt.Errorf("error retrieving active teleports to repo: %w", err) 88 + } 89 + return teleports, nil 90 + } 91 + 92 + func (m *DBModel) GetTeleportByURI(uri string) (*Teleport, error) { 93 + var teleport Teleport 94 + err := m.DB. 95 + Preload("Repo"). 96 + Preload("Target"). 97 + Where("uri = ?", uri). 98 + First(&teleport).Error 99 + if errors.Is(err, gorm.ErrRecordNotFound) { 100 + return nil, nil 101 + } 102 + if err != nil { 103 + return nil, fmt.Errorf("error retrieving teleport by uri: %w", err) 104 + } 105 + return &teleport, nil 106 + } 107 + 108 + func (m *DBModel) DeleteTeleport(ctx context.Context, uri string) error { 109 + return m.DB.Where("uri = ?", uri).Delete(&Teleport{}).Error 110 + } 111 + 112 + func (m *DBModel) DenyTeleport(ctx context.Context, uri string) error { 113 + return m.DB.Model(&Teleport{}).Where("uri = ?", uri).Update("denied", true).Error 114 + }
+45
pkg/spxrpc/place_stream_live.go
··· 9 9 "github.com/bluesky-social/indigo/lex/util" 10 10 "github.com/gorilla/websocket" 11 11 "github.com/labstack/echo/v4" 12 + "github.com/streamplace/oatproxy/pkg/oatproxy" 12 13 "stream.place/streamplace/pkg/log" 13 14 "stream.place/streamplace/pkg/spid" 14 15 "stream.place/streamplace/pkg/spmetrics" 15 16 16 17 placestreamtypes "stream.place/streamplace/pkg/streamplace" 17 18 ) 19 + 20 + func (s *Server) handlePlaceStreamLiveDenyTeleport(ctx context.Context, input *placestreamtypes.LiveDenyTeleport_Input) (*placestreamtypes.LiveDenyTeleport_Output, error) { 21 + session, _ := oatproxy.GetOAuthSession(ctx) 22 + if session == nil { 23 + return nil, echo.NewHTTPError(http.StatusUnauthorized, "oauth session not found") 24 + } 25 + 26 + if input.Uri == "" { 27 + return nil, echo.NewHTTPError(http.StatusBadRequest, "URI is required") 28 + } 29 + 30 + teleport, err := s.model.GetTeleportByURI(input.Uri) 31 + if err != nil { 32 + log.Error(ctx, "failed to get teleport", "err", err) 33 + return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to retrieve teleport") 34 + } 35 + 36 + if teleport == nil { 37 + return nil, echo.NewHTTPError(http.StatusNotFound, "Teleport not found") 38 + } 39 + 40 + if teleport.TargetDID != session.DID { 41 + return nil, echo.NewHTTPError(http.StatusForbidden, "You are not the target of this teleport") 42 + } 43 + 44 + err = s.model.DenyTeleport(ctx, input.Uri) 45 + if err != nil { 46 + log.Error(ctx, "failed to deny teleport", "err", err) 47 + return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to deny teleport") 48 + } 49 + 50 + cancelMsg := &placestreamtypes.Livestream_TeleportCanceled{ 51 + LexiconTypeID: "place.stream.livestream#teleportCanceled", 52 + TeleportUri: input.Uri, 53 + Reason: "denied", 54 + } 55 + 56 + s.bus.Publish(teleport.RepoDID, cancelMsg) 57 + s.bus.Publish(teleport.TargetDID, cancelMsg) 58 + 59 + return &placestreamtypes.LiveDenyTeleport_Output{ 60 + Success: true, 61 + }, nil 62 + } 18 63 19 64 var replicationUpgrader = websocket.Upgrader{ 20 65 ReadBufferSize: 1024,
+19
pkg/spxrpc/stubs.go
··· 268 268 e.POST("/xrpc/place.stream.branding.updateBlob", s.HandlePlaceStreamBrandingUpdateBlob) 269 269 e.GET("/xrpc/place.stream.broadcast.getBroadcaster", s.HandlePlaceStreamBroadcastGetBroadcaster) 270 270 e.GET("/xrpc/place.stream.graph.getFollowingUser", s.HandlePlaceStreamGraphGetFollowingUser) 271 + e.POST("/xrpc/place.stream.live.denyTeleport", s.HandlePlaceStreamLiveDenyTeleport) 271 272 e.GET("/xrpc/place.stream.live.getLiveUsers", s.HandlePlaceStreamLiveGetLiveUsers) 272 273 e.GET("/xrpc/place.stream.live.getProfileCard", s.HandlePlaceStreamLiveGetProfileCard) 273 274 e.GET("/xrpc/place.stream.live.getRecommendations", s.HandlePlaceStreamLiveGetRecommendations) ··· 378 379 var handleErr error 379 380 // func (s *Server) handlePlaceStreamGraphGetFollowingUser(ctx context.Context,subjectDID string,userDID string) (*placestream.GraphGetFollowingUser_Output, error) 380 381 out, handleErr = s.handlePlaceStreamGraphGetFollowingUser(ctx, subjectDID, userDID) 382 + if handleErr != nil { 383 + return handleErr 384 + } 385 + return c.JSON(200, out) 386 + } 387 + 388 + func (s *Server) HandlePlaceStreamLiveDenyTeleport(c echo.Context) error { 389 + ctx, span := otel.Tracer("server").Start(c.Request().Context(), "HandlePlaceStreamLiveDenyTeleport") 390 + defer span.End() 391 + 392 + var body placestream.LiveDenyTeleport_Input 393 + if err := c.Bind(&body); err != nil { 394 + return err 395 + } 396 + var out *placestream.LiveDenyTeleport_Output 397 + var handleErr error 398 + // func (s *Server) handlePlaceStreamLiveDenyTeleport(ctx context.Context,body *placestream.LiveDenyTeleport_Input) (*placestream.LiveDenyTeleport_Output, error) 399 + out, handleErr = s.handlePlaceStreamLiveDenyTeleport(ctx, &body) 381 400 if handleErr != nil { 382 401 return handleErr 383 402 }
+33
pkg/streamplace/livedenyTeleport.go
··· 1 + // Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. 2 + 3 + // Lexicon schema: place.stream.live.denyTeleport 4 + 5 + package streamplace 6 + 7 + import ( 8 + "context" 9 + 10 + lexutil "github.com/bluesky-social/indigo/lex/util" 11 + ) 12 + 13 + // LiveDenyTeleport_Input is the input argument to a place.stream.live.denyTeleport call. 14 + type LiveDenyTeleport_Input struct { 15 + // uri: The URI of the teleport record to deny. 16 + Uri string `json:"uri" cborgen:"uri"` 17 + } 18 + 19 + // LiveDenyTeleport_Output is the output of a place.stream.live.denyTeleport call. 20 + type LiveDenyTeleport_Output struct { 21 + // success: Whether the teleport was successfully denied. 22 + Success bool `json:"success" cborgen:"success"` 23 + } 24 + 25 + // LiveDenyTeleport calls the XRPC method "place.stream.live.denyTeleport". 26 + func LiveDenyTeleport(ctx context.Context, c lexutil.LexClient, input *LiveDenyTeleport_Input) (*LiveDenyTeleport_Output, error) { 27 + var out LiveDenyTeleport_Output 28 + if err := c.LexDo(ctx, lexutil.Procedure, "application/json", "place.stream.live.denyTeleport", nil, input, &out); err != nil { 29 + return nil, err 30 + } 31 + 32 + return &out, nil 33 + }
+7 -7
pkg/streamplace/liveteleport.go
··· 1 1 // Code generated by cmd/lexgen (see Makefile's lexgen); DO NOT EDIT. 2 2 3 - package streamplace 3 + // Lexicon schema: place.stream.live.teleport 4 4 5 - // schema: place.stream.live.teleport 5 + package streamplace 6 6 7 7 import ( 8 - "github.com/bluesky-social/indigo/lex/util" 8 + lexutil "github.com/bluesky-social/indigo/lex/util" 9 9 ) 10 10 11 11 func init() { 12 - util.RegisterType("place.stream.live.teleport", &LiveTeleport{}) 13 - } // 14 - // RECORDTYPE: LiveTeleport 12 + lexutil.RegisterType("place.stream.live.teleport", &LiveTeleport{}) 13 + } 14 + 15 15 type LiveTeleport struct { 16 - LexiconTypeID string `json:"$type,const=place.stream.live.teleport" cborgen:"$type,const=place.stream.live.teleport"` 16 + LexiconTypeID string `json:"$type" cborgen:"$type,const=place.stream.live.teleport"` 17 17 // durationSeconds: The time limit in seconds for the teleport. If not set, the teleport is permanent. Must be at least 60 seconds, and no more than 32,400 seconds (9 hours). 18 18 DurationSeconds *int64 `json:"durationSeconds,omitempty" cborgen:"durationSeconds,omitempty"` 19 19 // startsAt: The time the teleport becomes active.
+44 -6
pkg/streamplace/streamlivestream.go
··· 59 59 } 60 60 61 61 type Livestream_StreamplaceAnything_Livestream struct { 62 - Livestream_LivestreamView *Livestream_LivestreamView 63 - Livestream_ViewerCount *Livestream_ViewerCount 64 - Defs_BlockView *Defs_BlockView 65 - Defs_Renditions *Defs_Renditions 66 - Defs_Rendition *Defs_Rendition 67 - ChatDefs_MessageView *ChatDefs_MessageView 62 + Livestream_LivestreamView *Livestream_LivestreamView 63 + Livestream_ViewerCount *Livestream_ViewerCount 64 + Livestream_TeleportArrival *Livestream_TeleportArrival 65 + Livestream_TeleportCanceled *Livestream_TeleportCanceled 66 + Defs_BlockView *Defs_BlockView 67 + Defs_Renditions *Defs_Renditions 68 + Defs_Rendition *Defs_Rendition 69 + ChatDefs_MessageView *ChatDefs_MessageView 68 70 } 69 71 70 72 func (t *Livestream_StreamplaceAnything_Livestream) MarshalJSON() ([]byte, error) { ··· 75 77 if t.Livestream_ViewerCount != nil { 76 78 t.Livestream_ViewerCount.LexiconTypeID = "place.stream.livestream#viewerCount" 77 79 return json.Marshal(t.Livestream_ViewerCount) 80 + } 81 + if t.Livestream_TeleportArrival != nil { 82 + t.Livestream_TeleportArrival.LexiconTypeID = "place.stream.livestream#teleportArrival" 83 + return json.Marshal(t.Livestream_TeleportArrival) 84 + } 85 + if t.Livestream_TeleportCanceled != nil { 86 + t.Livestream_TeleportCanceled.LexiconTypeID = "place.stream.livestream#teleportCanceled" 87 + return json.Marshal(t.Livestream_TeleportCanceled) 78 88 } 79 89 if t.Defs_BlockView != nil { 80 90 t.Defs_BlockView.LexiconTypeID = "place.stream.defs#blockView" ··· 108 118 case "place.stream.livestream#viewerCount": 109 119 t.Livestream_ViewerCount = new(Livestream_ViewerCount) 110 120 return json.Unmarshal(b, t.Livestream_ViewerCount) 121 + case "place.stream.livestream#teleportArrival": 122 + t.Livestream_TeleportArrival = new(Livestream_TeleportArrival) 123 + return json.Unmarshal(b, t.Livestream_TeleportArrival) 124 + case "place.stream.livestream#teleportCanceled": 125 + t.Livestream_TeleportCanceled = new(Livestream_TeleportCanceled) 126 + return json.Unmarshal(b, t.Livestream_TeleportCanceled) 111 127 case "place.stream.defs#blockView": 112 128 t.Defs_BlockView = new(Defs_BlockView) 113 129 return json.Unmarshal(b, t.Defs_BlockView) ··· 123 139 default: 124 140 return nil 125 141 } 142 + } 143 + 144 + // Livestream_TeleportArrival is a "teleportArrival" in the place.stream.livestream schema. 145 + type Livestream_TeleportArrival struct { 146 + LexiconTypeID string `json:"$type" cborgen:"$type,const=place.stream.livestream#teleportArrival"` 147 + // source: The streamer who is teleporting their viewers here 148 + Source *appbsky.ActorDefs_ProfileViewBasic `json:"source" cborgen:"source"` 149 + // startsAt: When this teleport started 150 + StartsAt string `json:"startsAt" cborgen:"startsAt"` 151 + // teleportUri: The URI of the teleport record 152 + TeleportUri string `json:"teleportUri" cborgen:"teleportUri"` 153 + // viewerCount: How many viewers are arriving from this teleport 154 + ViewerCount int64 `json:"viewerCount" cborgen:"viewerCount"` 155 + } 156 + 157 + // Livestream_TeleportCanceled is a "teleportCanceled" in the place.stream.livestream schema. 158 + type Livestream_TeleportCanceled struct { 159 + LexiconTypeID string `json:"$type" cborgen:"$type,const=place.stream.livestream#teleportCanceled"` 160 + // reason: Why this teleport was canceled 161 + Reason string `json:"reason" cborgen:"reason"` 162 + // teleportUri: The URI of the teleport record that was canceled 163 + TeleportUri string `json:"teleportUri" cborgen:"teleportUri"` 126 164 } 127 165 128 166 // Livestream_ViewerCount is a "viewerCount" in the place.stream.livestream schema.