Live video on the AT Protocol
at eli/docker-deployment-docs 268 lines 8.3 kB view raw
1package spxrpc 2 3import ( 4 "context" 5 "encoding/json" 6 "net/http" 7 "net/url" 8 "strconv" 9 "strings" 10 11 "github.com/labstack/echo/v4" 12 "github.com/streamplace/oatproxy/pkg/oatproxy" 13 "stream.place/streamplace/pkg/log" 14 "stream.place/streamplace/pkg/statedb" 15 placestreamtypes "stream.place/streamplace/pkg/streamplace" 16) 17 18func (s *Server) handlePlaceStreamServerCreateWebhook(ctx context.Context, input *placestreamtypes.ServerCreateWebhook_Input) (*placestreamtypes.ServerCreateWebhook_Output, error) { 19 // Get authenticated user 20 session, _ := oatproxy.GetOAuthSession(ctx) 21 if session == nil { 22 return nil, echo.NewHTTPError(http.StatusUnauthorized, "oauth session not found") 23 } 24 25 // Validate input 26 if input.Url == "" { 27 return nil, echo.NewHTTPError(http.StatusBadRequest, "URL is required") 28 } 29 if len(input.Events) == 0 { 30 return nil, echo.NewHTTPError(http.StatusBadRequest, "At least one event type is required") 31 } 32 33 // Validate URL format 34 if _, err := url.Parse(input.Url); err != nil { 35 return nil, echo.NewHTTPError(http.StatusBadRequest, "Invalid URL format") 36 } 37 // Convert input to database model using the conversion function 38 webhook, err := statedb.WebhookFromLexiconInput(input, session.DID, "") // ID will be generated by the database 39 if err != nil { 40 log.Error(ctx, "failed to convert input to webhook", "err", err) 41 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to create webhook") 42 } 43 44 // Create webhook 45 err = s.statefulDB.CreateWebhook(webhook) 46 if err != nil { 47 log.Error(ctx, "failed to create webhook", "err", err) 48 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to create webhook") 49 } 50 51 // Convert to API response 52 apiWebhook, err := webhook.ToLexicon() 53 if err != nil { 54 log.Error(ctx, "failed to convert webhook to API format", "err", err) 55 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to format webhook response") 56 } 57 58 return &placestreamtypes.ServerCreateWebhook_Output{ 59 Webhook: apiWebhook, 60 }, nil 61} 62 63func (s *Server) handlePlaceStreamServerListWebhooks(ctx context.Context, active *bool, cursor string, event string, limit int) (*placestreamtypes.ServerListWebhooks_Output, error) { 64 // Get authenticated user 65 session, _ := oatproxy.GetOAuthSession(ctx) 66 if session == nil { 67 return nil, echo.NewHTTPError(http.StatusUnauthorized, "oauth session not found") 68 } 69 70 // Set default limit 71 if limit <= 0 || limit > 100 { 72 limit = 50 73 } 74 75 // Parse cursor for offset 76 offset := 0 77 if cursor != "" { 78 var err error 79 offset, err = strconv.Atoi(cursor) 80 if err != nil { 81 return nil, echo.NewHTTPError(http.StatusBadRequest, "Invalid cursor") 82 } 83 } 84 85 // Build filters 86 filters := make(map[string]interface{}) 87 if active != nil { 88 filters["active"] = *active 89 } 90 91 // Get webhooks 92 webhooks, err := s.statefulDB.ListWebhooks(session.DID, limit+1, offset, filters) 93 if err != nil { 94 log.Error(ctx, "failed to list webhooks", "err", err) 95 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to list webhooks") 96 } 97 98 // Filter by event type if specified 99 if event != "" { 100 filtered := make([]statedb.Webhook, 0) 101 for _, w := range webhooks { 102 var events []string 103 if err := json.Unmarshal(w.Events, &events); err == nil { 104 for _, e := range events { 105 if e == event { 106 filtered = append(filtered, w) 107 break 108 } 109 } 110 } 111 } 112 webhooks = filtered 113 } 114 115 // Check if there are more results 116 var nextCursor *string 117 if len(webhooks) > limit { 118 webhooks = webhooks[:limit] 119 next := strconv.Itoa(offset + limit) 120 nextCursor = &next 121 } 122 123 // Convert to API format 124 apiWebhooks := make([]*placestreamtypes.ServerDefs_Webhook, len(webhooks)) 125 for i, webhook := range webhooks { 126 apiWebhook, err := webhook.ToLexicon() 127 if err != nil { 128 log.Error(ctx, "failed to convert webhook to API format", "err", err) 129 continue 130 } 131 apiWebhooks[i] = apiWebhook 132 } 133 134 return &placestreamtypes.ServerListWebhooks_Output{ 135 Webhooks: apiWebhooks, 136 Cursor: nextCursor, 137 }, nil 138} 139 140func (s *Server) handlePlaceStreamServerGetWebhook(ctx context.Context, id string) (*placestreamtypes.ServerGetWebhook_Output, error) { 141 // Get authenticated user 142 session, _ := oatproxy.GetOAuthSession(ctx) 143 if session == nil { 144 return nil, echo.NewHTTPError(http.StatusUnauthorized, "oauth session not found") 145 } 146 147 // Get webhook 148 webhook, err := s.statefulDB.GetWebhook(id, session.DID) 149 if err != nil { 150 if strings.Contains(err.Error(), "record not found") { 151 return nil, echo.NewHTTPError(http.StatusNotFound, "Webhook not found") 152 } 153 log.Error(ctx, "failed to get webhook", "err", err) 154 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to get webhook") 155 } 156 157 // Convert to API format 158 apiWebhook, err := webhook.ToLexicon() 159 if err != nil { 160 log.Error(ctx, "failed to convert webhook to API format", "err", err) 161 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to format webhook response") 162 } 163 164 return &placestreamtypes.ServerGetWebhook_Output{ 165 Webhook: apiWebhook, 166 }, nil 167} 168 169func (s *Server) handlePlaceStreamServerUpdateWebhook(ctx context.Context, input *placestreamtypes.ServerUpdateWebhook_Input) (*placestreamtypes.ServerUpdateWebhook_Output, error) { 170 // Get authenticated user 171 session, _ := oatproxy.GetOAuthSession(ctx) 172 if session == nil { 173 return nil, echo.NewHTTPError(http.StatusUnauthorized, "oauth session not found") 174 } 175 176 // Validate URL if provided 177 if input.Url != nil { 178 if _, err := url.Parse(*input.Url); err != nil { 179 return nil, echo.NewHTTPError(http.StatusBadRequest, "Invalid URL format") 180 } 181 } 182 183 // Build updates map 184 updates := make(map[string]interface{}) 185 if input.Url != nil { 186 updates["url"] = *input.Url 187 } 188 if input.Events != nil { 189 eventsJSON, err := json.Marshal(input.Events) 190 if err != nil { 191 return nil, echo.NewHTTPError(http.StatusBadRequest, "Invalid events format") 192 } 193 updates["events"] = json.RawMessage(eventsJSON) 194 } 195 if input.Active != nil { 196 updates["active"] = *input.Active 197 } 198 if input.Prefix != nil { 199 updates["prefix"] = *input.Prefix 200 } 201 if input.Suffix != nil { 202 updates["suffix"] = *input.Suffix 203 } 204 if input.Rewrite != nil { 205 rewriteJSON, err := json.Marshal(input.Rewrite) 206 if err != nil { 207 return nil, echo.NewHTTPError(http.StatusBadRequest, "Invalid rewrite rules format") 208 } 209 updates["rewrite"] = json.RawMessage(rewriteJSON) 210 } 211 if input.Name != nil { 212 updates["name"] = *input.Name 213 } 214 if input.Description != nil { 215 updates["description"] = *input.Description 216 } 217 if input.MuteWords != nil { 218 muteWordsJSON, err := json.Marshal(input.MuteWords) 219 if err != nil { 220 return nil, echo.NewHTTPError(http.StatusBadRequest, "Invalid mute words format") 221 } 222 updates["mute_words"] = json.RawMessage(muteWordsJSON) 223 } 224 225 if len(updates) == 0 { 226 return nil, echo.NewHTTPError(http.StatusBadRequest, "No fields to update") 227 } 228 229 // Update webhook 230 webhook, err := s.statefulDB.UpdateWebhook(input.Id, session.DID, updates) 231 if err != nil { 232 if strings.Contains(err.Error(), "record not found") { 233 return nil, echo.NewHTTPError(http.StatusNotFound, "Webhook not found") 234 } 235 log.Error(ctx, "failed to update webhook", "err", err) 236 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to update webhook") 237 } 238 239 // Convert to API format 240 apiWebhook, err := webhook.ToLexicon() 241 if err != nil { 242 log.Error(ctx, "failed to convert webhook to API format", "err", err) 243 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to format webhook response") 244 } 245 246 return &placestreamtypes.ServerUpdateWebhook_Output{ 247 Webhook: apiWebhook, 248 }, nil 249} 250 251func (s *Server) handlePlaceStreamServerDeleteWebhook(ctx context.Context, input *placestreamtypes.ServerDeleteWebhook_Input) (*placestreamtypes.ServerDeleteWebhook_Output, error) { 252 // Get authenticated user 253 session, _ := oatproxy.GetOAuthSession(ctx) 254 if session == nil { 255 return nil, echo.NewHTTPError(http.StatusUnauthorized, "oauth session not found") 256 } 257 258 // Delete webhook 259 err := s.statefulDB.DeleteWebhook(input.Id, session.DID) 260 if err != nil { 261 log.Error(ctx, "failed to delete webhook", "err", err) 262 return nil, echo.NewHTTPError(http.StatusInternalServerError, "Failed to delete webhook") 263 } 264 265 return &placestreamtypes.ServerDeleteWebhook_Output{ 266 Success: true, 267 }, nil 268}