Live video on the AT Protocol
1package spxrpc
2
3import (
4 "context"
5 "encoding/json"
6 "net/http"
7 "net/url"
8 "strconv"
9 "strings"
10
11 "github.com/labstack/echo/v4"
12 "github.com/streamplace/oatproxy/pkg/oatproxy"
13 "stream.place/streamplace/pkg/log"
14 "stream.place/streamplace/pkg/statedb"
15 placestreamtypes "stream.place/streamplace/pkg/streamplace"
16)
17
18func (s *Server) handlePlaceStreamServerCreateWebhook(ctx context.Context, input *placestreamtypes.ServerCreateWebhook_Input) (*placestreamtypes.ServerCreateWebhook_Output, error) {
19 // Get authenticated user
20 session, _ := oatproxy.GetOAuthSession(ctx)
21 if session == nil {
22 return nil, echo.NewHTTPError(http.StatusUnauthorized, "oauth session not found")
23 }
24
25 // Validate input
26 if input.Url == "" {
27 return nil, echo.NewHTTPError(http.StatusBadRequest, "URL is required")
28 }
29 if len(input.Events) == 0 {
30 return nil, echo.NewHTTPError(http.StatusBadRequest, "At least one event type is required")
31 }
32
33 // Validate URL format
34 if _, err := url.Parse(input.Url); err != nil {
35 return nil, echo.NewHTTPError(http.StatusBadRequest, "Invalid URL format")
36 }
37 // Convert input to database model using the conversion function
38 webhook, err := statedb.WebhookFromLexiconInput(input, session.DID, "") // ID will be generated by the database
39 if err != nil {
40 log.Error(ctx, "failed to convert input to webhook", "err", err)
41 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to create webhook")
42 }
43
44 // Create webhook
45 err = s.statefulDB.CreateWebhook(webhook)
46 if err != nil {
47 log.Error(ctx, "failed to create webhook", "err", err)
48 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to create webhook")
49 }
50
51 // Convert to API response
52 apiWebhook, err := webhook.ToLexicon()
53 if err != nil {
54 log.Error(ctx, "failed to convert webhook to API format", "err", err)
55 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to format webhook response")
56 }
57
58 return &placestreamtypes.ServerCreateWebhook_Output{
59 Webhook: apiWebhook,
60 }, nil
61}
62
63func (s *Server) handlePlaceStreamServerListWebhooks(ctx context.Context, active *bool, cursor string, event string, limit int) (*placestreamtypes.ServerListWebhooks_Output, error) {
64 // Get authenticated user
65 session, _ := oatproxy.GetOAuthSession(ctx)
66 if session == nil {
67 return nil, echo.NewHTTPError(http.StatusUnauthorized, "oauth session not found")
68 }
69
70 // Set default limit
71 if limit <= 0 || limit > 100 {
72 limit = 50
73 }
74
75 // Parse cursor for offset
76 offset := 0
77 if cursor != "" {
78 var err error
79 offset, err = strconv.Atoi(cursor)
80 if err != nil {
81 return nil, echo.NewHTTPError(http.StatusBadRequest, "Invalid cursor")
82 }
83 }
84
85 // Build filters
86 filters := make(map[string]interface{})
87 if active != nil {
88 filters["active"] = *active
89 }
90
91 // Get webhooks
92 webhooks, err := s.statefulDB.ListWebhooks(session.DID, limit+1, offset, filters)
93 if err != nil {
94 log.Error(ctx, "failed to list webhooks", "err", err)
95 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to list webhooks")
96 }
97
98 // Filter by event type if specified
99 if event != "" {
100 filtered := make([]statedb.Webhook, 0)
101 for _, w := range webhooks {
102 var events []string
103 if err := json.Unmarshal(w.Events, &events); err == nil {
104 for _, e := range events {
105 if e == event {
106 filtered = append(filtered, w)
107 break
108 }
109 }
110 }
111 }
112 webhooks = filtered
113 }
114
115 // Check if there are more results
116 var nextCursor *string
117 if len(webhooks) > limit {
118 webhooks = webhooks[:limit]
119 next := strconv.Itoa(offset + limit)
120 nextCursor = &next
121 }
122
123 // Convert to API format
124 apiWebhooks := make([]*placestreamtypes.ServerDefs_Webhook, len(webhooks))
125 for i, webhook := range webhooks {
126 apiWebhook, err := webhook.ToLexicon()
127 if err != nil {
128 log.Error(ctx, "failed to convert webhook to API format", "err", err)
129 continue
130 }
131 apiWebhooks[i] = apiWebhook
132 }
133
134 return &placestreamtypes.ServerListWebhooks_Output{
135 Webhooks: apiWebhooks,
136 Cursor: nextCursor,
137 }, nil
138}
139
140func (s *Server) handlePlaceStreamServerGetWebhook(ctx context.Context, id string) (*placestreamtypes.ServerGetWebhook_Output, error) {
141 // Get authenticated user
142 session, _ := oatproxy.GetOAuthSession(ctx)
143 if session == nil {
144 return nil, echo.NewHTTPError(http.StatusUnauthorized, "oauth session not found")
145 }
146
147 // Get webhook
148 webhook, err := s.statefulDB.GetWebhook(id, session.DID)
149 if err != nil {
150 if strings.Contains(err.Error(), "record not found") {
151 return nil, echo.NewHTTPError(http.StatusNotFound, "Webhook not found")
152 }
153 log.Error(ctx, "failed to get webhook", "err", err)
154 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to get webhook")
155 }
156
157 // Convert to API format
158 apiWebhook, err := webhook.ToLexicon()
159 if err != nil {
160 log.Error(ctx, "failed to convert webhook to API format", "err", err)
161 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to format webhook response")
162 }
163
164 return &placestreamtypes.ServerGetWebhook_Output{
165 Webhook: apiWebhook,
166 }, nil
167}
168
169func (s *Server) handlePlaceStreamServerUpdateWebhook(ctx context.Context, input *placestreamtypes.ServerUpdateWebhook_Input) (*placestreamtypes.ServerUpdateWebhook_Output, error) {
170 // Get authenticated user
171 session, _ := oatproxy.GetOAuthSession(ctx)
172 if session == nil {
173 return nil, echo.NewHTTPError(http.StatusUnauthorized, "oauth session not found")
174 }
175
176 // Validate URL if provided
177 if input.Url != nil {
178 if _, err := url.Parse(*input.Url); err != nil {
179 return nil, echo.NewHTTPError(http.StatusBadRequest, "Invalid URL format")
180 }
181 }
182
183 // Build updates map
184 updates := make(map[string]interface{})
185 if input.Url != nil {
186 updates["url"] = *input.Url
187 }
188 if input.Events != nil {
189 eventsJSON, err := json.Marshal(input.Events)
190 if err != nil {
191 return nil, echo.NewHTTPError(http.StatusBadRequest, "Invalid events format")
192 }
193 updates["events"] = json.RawMessage(eventsJSON)
194 }
195 if input.Active != nil {
196 updates["active"] = *input.Active
197 }
198 if input.Prefix != nil {
199 updates["prefix"] = *input.Prefix
200 }
201 if input.Suffix != nil {
202 updates["suffix"] = *input.Suffix
203 }
204 if input.Rewrite != nil {
205 rewriteJSON, err := json.Marshal(input.Rewrite)
206 if err != nil {
207 return nil, echo.NewHTTPError(http.StatusBadRequest, "Invalid rewrite rules format")
208 }
209 updates["rewrite"] = json.RawMessage(rewriteJSON)
210 }
211 if input.Name != nil {
212 updates["name"] = *input.Name
213 }
214 if input.Description != nil {
215 updates["description"] = *input.Description
216 }
217 if input.MuteWords != nil {
218 muteWordsJSON, err := json.Marshal(input.MuteWords)
219 if err != nil {
220 return nil, echo.NewHTTPError(http.StatusBadRequest, "Invalid mute words format")
221 }
222 updates["mute_words"] = json.RawMessage(muteWordsJSON)
223 }
224
225 if len(updates) == 0 {
226 return nil, echo.NewHTTPError(http.StatusBadRequest, "No fields to update")
227 }
228
229 // Update webhook
230 webhook, err := s.statefulDB.UpdateWebhook(input.Id, session.DID, updates)
231 if err != nil {
232 if strings.Contains(err.Error(), "record not found") {
233 return nil, echo.NewHTTPError(http.StatusNotFound, "Webhook not found")
234 }
235 log.Error(ctx, "failed to update webhook", "err", err)
236 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to update webhook")
237 }
238
239 // Convert to API format
240 apiWebhook, err := webhook.ToLexicon()
241 if err != nil {
242 log.Error(ctx, "failed to convert webhook to API format", "err", err)
243 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to format webhook response")
244 }
245
246 return &placestreamtypes.ServerUpdateWebhook_Output{
247 Webhook: apiWebhook,
248 }, nil
249}
250
251func (s *Server) handlePlaceStreamServerDeleteWebhook(ctx context.Context, input *placestreamtypes.ServerDeleteWebhook_Input) (*placestreamtypes.ServerDeleteWebhook_Output, error) {
252 // Get authenticated user
253 session, _ := oatproxy.GetOAuthSession(ctx)
254 if session == nil {
255 return nil, echo.NewHTTPError(http.StatusUnauthorized, "oauth session not found")
256 }
257
258 // Delete webhook
259 err := s.statefulDB.DeleteWebhook(input.Id, session.DID)
260 if err != nil {
261 log.Error(ctx, "failed to delete webhook", "err", err)
262 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to delete webhook")
263 }
264
265 return &placestreamtypes.ServerDeleteWebhook_Output{
266 Success: true,
267 }, nil
268}