Live video on the AT Protocol
at eli/deterministic-muxing 281 lines 8.4 kB view raw
1package statedb 2 3import ( 4 "encoding/json" 5 "errors" 6 "fmt" 7 "time" 8 9 "github.com/google/uuid" 10 "gorm.io/gorm" 11 "stream.place/streamplace/pkg/streamplace" 12) 13 14type Webhook struct { 15 // UUID primary key 16 ID string `gorm:"column:id;primarykey"` 17 UserDID string `gorm:"column:user_did;not null;index"` 18 URL string `gorm:"column:url;not null"` 19 20 Events json.RawMessage `gorm:"column:events;type:json"` 21 Active bool `gorm:"column:active;default:false"` 22 Prefix string `gorm:"column:prefix"` 23 Suffix string `gorm:"column:suffix"` 24 Rewrite json.RawMessage `gorm:"column:rewrite;type:json"` 25 Name string `gorm:"column:name"` 26 Description string `gorm:"column:description"` 27 CreatedAt time.Time `gorm:"column:created_at"` 28 UpdatedAt time.Time `gorm:"column:updated_at"` 29 LastTriggered *time.Time `gorm:"column:last_triggered"` 30 ErrorCount int `gorm:"column:error_count;default:0"` 31} 32 33func (w *Webhook) TableName() string { 34 return "webhooks" 35} 36 37// CreateWebhook creates a new webhook for a user 38func (state *StatefulDB) CreateWebhook(webhook *Webhook) error { 39 if webhook.URL == "" { 40 return fmt.Errorf("webhook URL cannot be empty") 41 } 42 43 // Generate ID if not provided 44 if webhook.ID == "" { 45 uu, err := uuid.NewV7() 46 if err != nil { 47 return fmt.Errorf("failed to generate webhook ID: %w", err) 48 } 49 webhook.ID = uu.String() 50 } 51 52 if webhook.CreatedAt.IsZero() { 53 webhook.CreatedAt = time.Now() 54 } 55 if webhook.UpdatedAt.IsZero() { 56 webhook.UpdatedAt = time.Now() 57 } 58 result := state.DB.Create(webhook) 59 if result.Error != nil { 60 return fmt.Errorf("database create failed - Error: %v, ErrorType: %T, RowsAffected: %d", 61 result.Error, result.Error, result.RowsAffected) 62 } 63 64 return nil 65} 66 67// GetWebhook retrieves a webhook by ID and user DID 68func (state *StatefulDB) GetWebhook(id string, userDID string) (*Webhook, error) { 69 var webhook Webhook 70 err := state.DB.Where("id = ? AND user_did = ?", id, userDID).First(&webhook).Error 71 if err != nil { 72 if errors.Is(err, gorm.ErrRecordNotFound) { 73 return nil, fmt.Errorf("webhook not found") 74 } 75 return nil, err 76 } 77 return &webhook, nil 78} 79 80// ListWebhooks retrieves webhooks for a user with optional filters 81func (state *StatefulDB) ListWebhooks(userDID string, limit int, offset int, filters map[string]any) ([]Webhook, error) { 82 var webhooks []Webhook 83 query := state.DB.Where("user_did = ?", userDID) 84 85 // Apply filters 86 for key, value := range filters { 87 if value != nil { 88 query = query.Where(key+" = ?", value) 89 } 90 } 91 92 err := query.Limit(limit).Offset(offset).Order("created_at DESC").Find(&webhooks).Error 93 return webhooks, err 94} 95 96// UpdateWebhook updates an existing webhook 97func (state *StatefulDB) UpdateWebhook(id string, userDID string, updates map[string]interface{}) (*Webhook, error) { 98 updates["updated_at"] = time.Now() 99 result := state.DB.Model(&Webhook{}).Where("id = ? AND user_did = ?", id, userDID).Updates(updates) 100 if result.Error != nil { 101 return nil, result.Error 102 } 103 if result.RowsAffected == 0 { 104 return nil, fmt.Errorf("webhook not found or access denied") 105 } 106 return state.GetWebhook(id, userDID) 107} 108 109// DeleteWebhook deletes a webhook by ID and user DID 110func (state *StatefulDB) DeleteWebhook(id string, userDID string) error { 111 result := state.DB.Where("id = ? AND user_did = ?", id, userDID).Delete(&Webhook{}) 112 if result.Error != nil { 113 return result.Error 114 } 115 if result.RowsAffected == 0 { 116 return fmt.Errorf("webhook not found or access denied") 117 } 118 return nil 119} 120 121// GetActiveWebhooksForUser retrieves active webhooks for a user filtered by event type 122func (state *StatefulDB) GetActiveWebhooksForUser(userDID string, eventType string) ([]Webhook, error) { 123 var webhooks []Webhook 124 var err error 125 if state.Type == DBTypePostgres { 126 // cast to jsonb and use @> operator here 127 err = state.DB.Where("user_did = ? AND active = ? AND events::jsonb @> ?", 128 userDID, true, fmt.Sprintf(`["%s"]`, eventType)).Find(&webhooks).Error 129 } else { 130 // SQLite: Use JSON_EXTRACT with JSON_EACH to check if array contains the event 131 err = state.DB.Where("user_did = ? AND active = ? AND EXISTS (SELECT 1 FROM json_each(events) WHERE value = ?)", 132 userDID, true, eventType).Find(&webhooks).Error 133 } 134 return webhooks, err 135} 136 137// IncrementWebhookError increments the error count for a webhook 138func (state *StatefulDB) IncrementWebhookError(id string) error { 139 return state.DB.Model(&Webhook{}).Where("id = ?", id).UpdateColumn("error_count", state.DB.Raw("error_count + 1")).Error 140} 141 142// ResetWebhookError resets the error count for a webhook 143func (state *StatefulDB) ResetWebhookError(id string) error { 144 return state.DB.Model(&Webhook{}).Where("id = ?", id).Updates(map[string]interface{}{ 145 "error_count": 0, 146 "last_triggered": time.Now(), 147 }).Error 148} 149 150// ToLexicon converts a database Webhook to a streamplace.ServerDefs_Webhook 151func (w *Webhook) ToLexicon() (*streamplace.ServerDefs_Webhook, error) { 152 var events []string 153 if len(w.Events) > 0 { 154 err := json.Unmarshal(w.Events, &events) 155 if err != nil { 156 return nil, fmt.Errorf("failed to unmarshal events: %w", err) 157 } 158 } 159 160 var rewriteRules []*streamplace.ServerDefs_RewriteRule 161 if len(w.Rewrite) > 0 { 162 var dbRules []map[string]string 163 err := json.Unmarshal(w.Rewrite, &dbRules) 164 if err != nil { 165 return nil, fmt.Errorf("failed to unmarshal rewrite rules: %w", err) 166 } 167 for _, rule := range dbRules { 168 rewriteRules = append(rewriteRules, &streamplace.ServerDefs_RewriteRule{ 169 From: rule["from"], 170 To: rule["to"], 171 }) 172 } 173 } 174 175 createdAt := w.CreatedAt.Format(time.RFC3339) 176 177 webhook := &streamplace.ServerDefs_Webhook{ 178 Id: w.ID, 179 Url: w.URL, 180 Events: events, 181 Active: w.Active, 182 CreatedAt: createdAt, 183 Rewrite: rewriteRules, 184 } 185 186 if w.Prefix != "" { 187 webhook.Prefix = &w.Prefix 188 } 189 if w.Suffix != "" { 190 webhook.Suffix = &w.Suffix 191 } 192 if w.Name != "" { 193 webhook.Name = &w.Name 194 } 195 if w.Description != "" { 196 webhook.Description = &w.Description 197 } 198 if !w.UpdatedAt.IsZero() { 199 updatedAt := w.UpdatedAt.Format(time.RFC3339) 200 webhook.UpdatedAt = &updatedAt 201 } 202 if w.LastTriggered != nil { 203 lastTriggered := w.LastTriggered.Format(time.RFC3339) 204 webhook.LastTriggered = &lastTriggered 205 } 206 if w.ErrorCount > 0 { 207 errorCount := int64(w.ErrorCount) 208 webhook.ErrorCount = &errorCount 209 } 210 211 return webhook, nil 212} 213 214// FromLexiconInput converts a streamplace.ServerCreateWebhook_Input to a database Webhook 215func WebhookFromLexiconInput(input *streamplace.ServerCreateWebhook_Input, userDID, id string) (*Webhook, error) { 216 // Debug log the raw input 217 fmt.Printf("DEBUG: WebhookFromLexiconInput input.Events: %+v (type: %T)\n", input.Events, input.Events) 218 for i, event := range input.Events { 219 fmt.Printf("DEBUG: Event[%d]: %q (type: %T)\n", i, event, event) 220 } 221 222 var eventsJSON json.RawMessage 223 if len(input.Events) > 0 { 224 jsonBytes, err := json.Marshal(input.Events) 225 if err != nil { 226 return nil, fmt.Errorf("failed to marshal events: %w", err) 227 } 228 fmt.Printf("DEBUG: Marshaled events JSON: %q\n", string(jsonBytes)) 229 eventsJSON = json.RawMessage(jsonBytes) 230 } else { 231 // Default to empty array if no events provided 232 eventsJSON = json.RawMessage(`[]`) 233 } 234 235 var rewriteJSON json.RawMessage 236 if len(input.Rewrite) > 0 { 237 dbRules := make([]map[string]string, len(input.Rewrite)) 238 for i, rule := range input.Rewrite { 239 dbRules[i] = map[string]string{ 240 "from": rule.From, 241 "to": rule.To, 242 } 243 } 244 jsonBytes, err := json.Marshal(dbRules) 245 if err != nil { 246 return nil, fmt.Errorf("failed to marshal rewrite rules: %w", err) 247 } 248 rewriteJSON = json.RawMessage(jsonBytes) 249 } 250 251 webhook := &Webhook{ 252 ID: id, 253 UserDID: userDID, 254 URL: input.Url, 255 Events: eventsJSON, 256 Active: true, // Default to true as per database schema 257 CreatedAt: time.Now(), 258 UpdatedAt: time.Now(), 259 Rewrite: rewriteJSON, 260 } 261 262 // if active is provided, use that value 263 if input.Active != nil { 264 webhook.Active = *input.Active 265 } 266 267 if input.Prefix != nil { 268 webhook.Prefix = *input.Prefix 269 } 270 if input.Suffix != nil { 271 webhook.Suffix = *input.Suffix 272 } 273 if input.Name != nil { 274 webhook.Name = *input.Name 275 } 276 if input.Description != nil { 277 webhook.Description = *input.Description 278 } 279 280 return webhook, nil 281}