Live video on the AT Protocol
at next 303 lines 9.0 kB view raw
1package statedb 2 3import ( 4 "encoding/json" 5 "errors" 6 "fmt" 7 "time" 8 9 "github.com/google/uuid" 10 "gorm.io/gorm" 11 "stream.place/streamplace/pkg/streamplace" 12) 13 14type Webhook struct { 15 // UUID primary key 16 ID string `gorm:"column:id;primarykey"` 17 UserDID string `gorm:"column:user_did;not null;index"` 18 URL string `gorm:"column:url;not null"` 19 20 Events json.RawMessage `gorm:"column:events;type:json"` 21 Active bool `gorm:"column:active;default:false"` 22 Prefix string `gorm:"column:prefix"` 23 Suffix string `gorm:"column:suffix"` 24 Rewrite json.RawMessage `gorm:"column:rewrite;type:json"` 25 MuteWords json.RawMessage `gorm:"column:mute_words;type:json"` 26 Name string `gorm:"column:name"` 27 Description string `gorm:"column:description"` 28 CreatedAt time.Time `gorm:"column:created_at"` 29 UpdatedAt time.Time `gorm:"column:updated_at"` 30 LastTriggered *time.Time `gorm:"column:last_triggered"` 31 ErrorCount int `gorm:"column:error_count;default:0"` 32} 33 34func (w *Webhook) TableName() string { 35 return "webhooks" 36} 37 38// CreateWebhook creates a new webhook for a user 39func (state *StatefulDB) CreateWebhook(webhook *Webhook) error { 40 if webhook.URL == "" { 41 return fmt.Errorf("webhook URL cannot be empty") 42 } 43 44 // Generate ID if not provided 45 if webhook.ID == "" { 46 uu, err := uuid.NewV7() 47 if err != nil { 48 return fmt.Errorf("failed to generate webhook ID: %w", err) 49 } 50 webhook.ID = uu.String() 51 } 52 53 if webhook.CreatedAt.IsZero() { 54 webhook.CreatedAt = time.Now() 55 } 56 if webhook.UpdatedAt.IsZero() { 57 webhook.UpdatedAt = time.Now() 58 } 59 result := state.DB.Create(webhook) 60 if result.Error != nil { 61 return fmt.Errorf("database create failed - Error: %v, ErrorType: %T, RowsAffected: %d", 62 result.Error, result.Error, result.RowsAffected) 63 } 64 65 return nil 66} 67 68// GetWebhook retrieves a webhook by ID and user DID 69func (state *StatefulDB) GetWebhook(id string, userDID string) (*Webhook, error) { 70 var webhook Webhook 71 err := state.DB.Where("id = ? AND user_did = ?", id, userDID).First(&webhook).Error 72 if err != nil { 73 if errors.Is(err, gorm.ErrRecordNotFound) { 74 return nil, fmt.Errorf("webhook not found") 75 } 76 return nil, err 77 } 78 return &webhook, nil 79} 80 81// ListWebhooks retrieves webhooks for a user with optional filters 82func (state *StatefulDB) ListWebhooks(userDID string, limit int, offset int, filters map[string]any) ([]Webhook, error) { 83 var webhooks []Webhook 84 query := state.DB.Where("user_did = ?", userDID) 85 86 // Apply filters 87 for key, value := range filters { 88 if value != nil { 89 query = query.Where(key+" = ?", value) 90 } 91 } 92 93 err := query.Limit(limit).Offset(offset).Order("created_at DESC").Find(&webhooks).Error 94 return webhooks, err 95} 96 97// UpdateWebhook updates an existing webhook 98func (state *StatefulDB) UpdateWebhook(id string, userDID string, updates map[string]interface{}) (*Webhook, error) { 99 updates["updated_at"] = time.Now() 100 result := state.DB.Model(&Webhook{}).Where("id = ? AND user_did = ?", id, userDID).Updates(updates) 101 if result.Error != nil { 102 return nil, result.Error 103 } 104 if result.RowsAffected == 0 { 105 return nil, fmt.Errorf("webhook not found or access denied") 106 } 107 return state.GetWebhook(id, userDID) 108} 109 110// DeleteWebhook deletes a webhook by ID and user DID 111func (state *StatefulDB) DeleteWebhook(id string, userDID string) error { 112 result := state.DB.Where("id = ? AND user_did = ?", id, userDID).Delete(&Webhook{}) 113 if result.Error != nil { 114 return result.Error 115 } 116 if result.RowsAffected == 0 { 117 return fmt.Errorf("webhook not found or access denied") 118 } 119 return nil 120} 121 122// GetActiveWebhooksForUser retrieves active webhooks for a user filtered by event type 123func (state *StatefulDB) GetActiveWebhooksForUser(userDID string, eventType string) ([]Webhook, error) { 124 var webhooks []Webhook 125 var err error 126 if state.Type == DBTypePostgres { 127 // cast to jsonb and use @> operator here 128 err = state.DB.Where("user_did = ? AND active = ? AND events::jsonb @> ?", 129 userDID, true, fmt.Sprintf(`["%s"]`, eventType)).Find(&webhooks).Error 130 } else { 131 // SQLite: Use JSON_EXTRACT with JSON_EACH to check if array contains the event 132 err = state.DB.Where("user_did = ? AND active = ? AND EXISTS (SELECT 1 FROM json_each(events) WHERE value = ?)", 133 userDID, true, eventType).Find(&webhooks).Error 134 } 135 return webhooks, err 136} 137 138// IncrementWebhookError increments the error count for a webhook 139func (state *StatefulDB) IncrementWebhookError(id string) error { 140 return state.DB.Model(&Webhook{}).Where("id = ?", id).UpdateColumn("error_count", state.DB.Raw("error_count + 1")).Error 141} 142 143// ResetWebhookError resets the error count for a webhook 144func (state *StatefulDB) ResetWebhookError(id string) error { 145 return state.DB.Model(&Webhook{}).Where("id = ?", id).Updates(map[string]interface{}{ 146 "error_count": 0, 147 "last_triggered": time.Now(), 148 }).Error 149} 150 151// ToLexicon converts a database Webhook to a streamplace.ServerDefs_Webhook 152func (w *Webhook) ToLexicon() (*streamplace.ServerDefs_Webhook, error) { 153 var events []string 154 if len(w.Events) > 0 { 155 err := json.Unmarshal(w.Events, &events) 156 if err != nil { 157 return nil, fmt.Errorf("failed to unmarshal events: %w", err) 158 } 159 } 160 161 var rewriteRules []*streamplace.ServerDefs_RewriteRule 162 if len(w.Rewrite) > 0 { 163 var dbRules []map[string]string 164 err := json.Unmarshal(w.Rewrite, &dbRules) 165 if err != nil { 166 return nil, fmt.Errorf("failed to unmarshal rewrite rules: %w", err) 167 } 168 for _, rule := range dbRules { 169 rewriteRules = append(rewriteRules, &streamplace.ServerDefs_RewriteRule{ 170 From: rule["from"], 171 To: rule["to"], 172 }) 173 } 174 } 175 176 var muteWords []string 177 if len(w.MuteWords) > 0 { 178 err := json.Unmarshal(w.MuteWords, &muteWords) 179 if err != nil { 180 return nil, fmt.Errorf("failed to unmarshal mute words: %w", err) 181 } 182 } 183 184 createdAt := w.CreatedAt.Format(time.RFC3339) 185 186 webhook := &streamplace.ServerDefs_Webhook{ 187 Id: w.ID, 188 Url: w.URL, 189 Events: events, 190 Active: w.Active, 191 CreatedAt: createdAt, 192 Rewrite: rewriteRules, 193 } 194 195 if w.Prefix != "" { 196 webhook.Prefix = &w.Prefix 197 } 198 if w.Suffix != "" { 199 webhook.Suffix = &w.Suffix 200 } 201 if w.Name != "" { 202 webhook.Name = &w.Name 203 } 204 if w.Description != "" { 205 webhook.Description = &w.Description 206 } 207 if len(muteWords) > 0 { 208 webhook.MuteWords = muteWords 209 } 210 if !w.UpdatedAt.IsZero() { 211 updatedAt := w.UpdatedAt.Format(time.RFC3339) 212 webhook.UpdatedAt = &updatedAt 213 } 214 if w.LastTriggered != nil { 215 lastTriggered := w.LastTriggered.Format(time.RFC3339) 216 webhook.LastTriggered = &lastTriggered 217 } 218 if w.ErrorCount > 0 { 219 errorCount := int64(w.ErrorCount) 220 webhook.ErrorCount = &errorCount 221 } 222 223 return webhook, nil 224} 225 226// FromLexiconInput converts a streamplace.ServerCreateWebhook_Input to a database Webhook 227func WebhookFromLexiconInput(input *streamplace.ServerCreateWebhook_Input, userDID, id string) (*Webhook, error) { 228 // Debug log the raw input 229 fmt.Printf("DEBUG: WebhookFromLexiconInput input.Events: %+v (type: %T)\n", input.Events, input.Events) 230 for i, event := range input.Events { 231 fmt.Printf("DEBUG: Event[%d]: %q (type: %T)\n", i, event, event) 232 } 233 234 var eventsJSON json.RawMessage 235 if len(input.Events) > 0 { 236 jsonBytes, err := json.Marshal(input.Events) 237 if err != nil { 238 return nil, fmt.Errorf("failed to marshal events: %w", err) 239 } 240 fmt.Printf("DEBUG: Marshaled events JSON: %q\n", string(jsonBytes)) 241 eventsJSON = json.RawMessage(jsonBytes) 242 } else { 243 // Default to empty array if no events provided 244 eventsJSON = json.RawMessage(`[]`) 245 } 246 247 var rewriteJSON json.RawMessage 248 if len(input.Rewrite) > 0 { 249 dbRules := make([]map[string]string, len(input.Rewrite)) 250 for i, rule := range input.Rewrite { 251 dbRules[i] = map[string]string{ 252 "from": rule.From, 253 "to": rule.To, 254 } 255 } 256 jsonBytes, err := json.Marshal(dbRules) 257 if err != nil { 258 return nil, fmt.Errorf("failed to marshal rewrite rules: %w", err) 259 } 260 rewriteJSON = json.RawMessage(jsonBytes) 261 } 262 263 var muteWordsJSON json.RawMessage 264 if len(input.MuteWords) > 0 { 265 jsonBytes, err := json.Marshal(input.MuteWords) 266 if err != nil { 267 return nil, fmt.Errorf("failed to marshal mute words: %w", err) 268 } 269 muteWordsJSON = json.RawMessage(jsonBytes) 270 } 271 272 webhook := &Webhook{ 273 ID: id, 274 UserDID: userDID, 275 URL: input.Url, 276 Events: eventsJSON, 277 Active: true, // Default to true as per database schema 278 CreatedAt: time.Now(), 279 UpdatedAt: time.Now(), 280 Rewrite: rewriteJSON, 281 MuteWords: muteWordsJSON, 282 } 283 284 // if active is provided, use that value 285 if input.Active != nil { 286 webhook.Active = *input.Active 287 } 288 289 if input.Prefix != nil { 290 webhook.Prefix = *input.Prefix 291 } 292 if input.Suffix != nil { 293 webhook.Suffix = *input.Suffix 294 } 295 if input.Name != nil { 296 webhook.Name = *input.Name 297 } 298 if input.Description != nil { 299 webhook.Description = *input.Description 300 } 301 302 return webhook, nil 303}