Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.8.2 303 lines 9.0 kB view raw
1package statedb 2 3import ( 4 "encoding/json" 5 "errors" 6 "fmt" 7 "time" 8 9 "github.com/google/uuid" 10 "gorm.io/gorm" 11 "stream.place/streamplace/pkg/streamplace" 12) 13 14type Webhook struct { 15 // UUID primary key 16 ID string `gorm:"column:id;primarykey"` 17 UserDID string `gorm:"column:user_did;not null;index"` 18 URL string `gorm:"column:url;not null"` 19 20 Events json.RawMessage `gorm:"column:events;type:json"` 21 Active bool `gorm:"column:active;default:false"` 22 Prefix string `gorm:"column:prefix"` 23 Suffix string `gorm:"column:suffix"` 24 Rewrite json.RawMessage `gorm:"column:rewrite;type:json"` 25 MuteWords json.RawMessage `gorm:"column:mute_words;type:json"` 26 Name string `gorm:"column:name"` 27 Description string `gorm:"column:description"` 28 CreatedAt time.Time `gorm:"column:created_at"` 29 UpdatedAt time.Time `gorm:"column:updated_at"` 30 LastTriggered *time.Time `gorm:"column:last_triggered"` 31 ErrorCount int `gorm:"column:error_count;default:0"` 32} 33 34func (w *Webhook) TableName() string { 35 return "webhooks" 36} 37 38// CreateWebhook creates a new webhook for a user 39func (state *StatefulDB) CreateWebhook(webhook *Webhook) error { 40 if webhook.URL == "" { 41 return fmt.Errorf("webhook URL cannot be empty") 42 } 43 44 // Generate ID if not provided 45 if webhook.ID == "" { 46 uu, err := uuid.NewV7() 47 if err != nil { 48 return fmt.Errorf("failed to generate webhook ID: %w", err) 49 } 50 webhook.ID = uu.String() 51 } 52 53 if webhook.CreatedAt.IsZero() { 54 webhook.CreatedAt = time.Now() 55 } 56 if webhook.UpdatedAt.IsZero() { 57 webhook.UpdatedAt = time.Now() 58 } 59 result := state.DB.Create(webhook) 60 if result.Error != nil { 61 return fmt.Errorf("database create failed - Error: %v, ErrorType: %T, RowsAffected: %d", 62 result.Error, result.Error, result.RowsAffected) 63 } 64 65 return nil 66} 67 68// GetWebhook retrieves a webhook by ID and user DID 69func (state *StatefulDB) GetWebhook(id string, userDID string) (*Webhook, error) { 70 var webhook Webhook 71 err := state.DB.Where("id = ? AND user_did = ?", id, userDID).First(&webhook).Error 72 if err != nil { 73 if errors.Is(err, gorm.ErrRecordNotFound) { 74 return nil, fmt.Errorf("webhook not found") 75 } 76 return nil, err 77 } 78 return &webhook, nil 79} 80 81// ListWebhooks retrieves webhooks for a user with optional filters 82func (state *StatefulDB) ListWebhooks(userDID string, limit int, offset int, filters map[string]any) ([]Webhook, error) { 83 var webhooks []Webhook 84 query := state.DB.Where("user_did = ?", userDID) 85 86 // Apply filters 87 for key, value := range filters { 88 if value != nil { 89 query = query.Where(key+" = ?", value) 90 } 91 } 92 93 err := query.Limit(limit).Offset(offset).Order("created_at DESC").Find(&webhooks).Error 94 return webhooks, err 95} 96 97// UpdateWebhook updates an existing webhook 98func (state *StatefulDB) UpdateWebhook(id string, userDID string, updates map[string]interface{}) (*Webhook, error) { 99 updates["updated_at"] = time.Now() 100 result := state.DB.Model(&Webhook{}).Where("id = ? AND user_did = ?", id, userDID).Updates(updates) 101 if result.Error != nil { 102 return nil, result.Error 103 } 104 if result.RowsAffected == 0 { 105 return nil, fmt.Errorf("webhook not found or access denied") 106 } 107 return state.GetWebhook(id, userDID) 108} 109 110// DeleteWebhook deletes a webhook by ID and user DID 111func (state *StatefulDB) DeleteWebhook(id string, userDID string) error { 112 result := state.DB.Where("id = ? AND user_did = ?", id, userDID).Delete(&Webhook{}) 113 if result.Error != nil { 114 return result.Error 115 } 116 if result.RowsAffected == 0 { 117 return fmt.Errorf("webhook not found or access denied") 118 } 119 return nil 120} 121 122// GetActiveWebhooksForUser retrieves active webhooks for a user filtered by event type 123func (state *StatefulDB) GetActiveWebhooksForUser(userDID string, eventType string) ([]Webhook, error) { 124 var webhooks []Webhook 125 var err error 126 if state.Type == DBTypePostgres { 127 // cast to jsonb and use @> operator here 128 err = state.DB.Where("user_did = ? AND active = ? AND events::jsonb @> ?", 129 userDID, true, fmt.Sprintf(`["%s"]`, eventType)).Find(&webhooks).Error 130 } else { 131 // SQLite: Use JSON_EXTRACT with JSON_EACH to check if array contains the event 132 err = state.DB.Where("user_did = ? AND active = ? AND EXISTS (SELECT 1 FROM json_each(events) WHERE value = ?)", 133 userDID, true, eventType).Find(&webhooks).Error 134 } 135 return webhooks, err 136} 137 138// IncrementWebhookError increments the error count for a webhook 139func (state *StatefulDB) IncrementWebhookError(id string) error { 140 return state.DB.Model(&Webhook{}).Where("id = ?", id).UpdateColumn("error_count", state.DB.Raw("error_count + 1")).Error 141} 142 143// ResetWebhookError resets the error count for a webhook 144func (state *StatefulDB) ResetWebhookError(id string) error { 145 return state.DB.Model(&Webhook{}).Where("id = ?", id).Updates(map[string]interface{}{ 146 "error_count": 0, 147 "last_triggered": time.Now(), 148 }).Error 149} 150 151// ToLexicon converts a database Webhook to a streamplace.ServerDefs_Webhook 152func (w *Webhook) ToLexicon() (*streamplace.ServerDefs_Webhook, error) { 153 var events []string 154 if len(w.Events) > 0 { 155 err := json.Unmarshal(w.Events, &events) 156 if err != nil { 157 return nil, fmt.Errorf("failed to unmarshal events: %w", err) 158 } 159 } 160 161 var rewriteRules []*streamplace.ServerDefs_RewriteRule 162 if len(w.Rewrite) > 0 { 163 var dbRules []map[string]string 164 err := json.Unmarshal(w.Rewrite, &dbRules) 165 if err != nil { 166 return nil, fmt.Errorf("failed to unmarshal rewrite rules: %w", err) 167 } 168 for _, rule := range dbRules { 169 rewriteRules = append(rewriteRules, &streamplace.ServerDefs_RewriteRule{ 170 From: rule["from"], 171 To: rule["to"], 172 }) 173 } 174 } 175 176 var muteWords []string 177 if len(w.MuteWords) > 0 { 178 err := json.Unmarshal(w.MuteWords, &muteWords) 179 if err != nil { 180 return nil, fmt.Errorf("failed to unmarshal mute words: %w", err) 181 } 182 } 183 184 createdAt := w.CreatedAt.Format(time.RFC3339) 185 186 webhook := &streamplace.ServerDefs_Webhook{ 187 Id: w.ID, 188 Url: w.URL, 189 Events: events, 190 Active: w.Active, 191 CreatedAt: createdAt, 192 Rewrite: rewriteRules, 193 } 194 195 if w.Prefix != "" { 196 webhook.Prefix = &w.Prefix 197 } 198 if w.Suffix != "" { 199 webhook.Suffix = &w.Suffix 200 } 201 if w.Name != "" { 202 webhook.Name = &w.Name 203 } 204 if w.Description != "" { 205 webhook.Description = &w.Description 206 } 207 if len(muteWords) > 0 { 208 webhook.MuteWords = muteWords 209 } 210 if !w.UpdatedAt.IsZero() { 211 updatedAt := w.UpdatedAt.Format(time.RFC3339) 212 webhook.UpdatedAt = &updatedAt 213 } 214 if w.LastTriggered != nil { 215 lastTriggered := w.LastTriggered.Format(time.RFC3339) 216 webhook.LastTriggered = &lastTriggered 217 } 218 if w.ErrorCount > 0 { 219 errorCount := int64(w.ErrorCount) 220 webhook.ErrorCount = &errorCount 221 } 222 223 return webhook, nil 224} 225 226// FromLexiconInput converts a streamplace.ServerCreateWebhook_Input to a database Webhook 227func WebhookFromLexiconInput(input *streamplace.ServerCreateWebhook_Input, userDID, id string) (*Webhook, error) { 228 // Debug log the raw input 229 fmt.Printf("DEBUG: WebhookFromLexiconInput input.Events: %+v (type: %T)\n", input.Events, input.Events) 230 for i, event := range input.Events { 231 fmt.Printf("DEBUG: Event[%d]: %q (type: %T)\n", i, event, event) 232 } 233 234 var eventsJSON json.RawMessage 235 if len(input.Events) > 0 { 236 jsonBytes, err := json.Marshal(input.Events) 237 if err != nil { 238 return nil, fmt.Errorf("failed to marshal events: %w", err) 239 } 240 fmt.Printf("DEBUG: Marshaled events JSON: %q\n", string(jsonBytes)) 241 eventsJSON = json.RawMessage(jsonBytes) 242 } else { 243 // Default to empty array if no events provided 244 eventsJSON = json.RawMessage(`[]`) 245 } 246 247 var rewriteJSON json.RawMessage 248 if len(input.Rewrite) > 0 { 249 dbRules := make([]map[string]string, len(input.Rewrite)) 250 for i, rule := range input.Rewrite { 251 dbRules[i] = map[string]string{ 252 "from": rule.From, 253 "to": rule.To, 254 } 255 } 256 jsonBytes, err := json.Marshal(dbRules) 257 if err != nil { 258 return nil, fmt.Errorf("failed to marshal rewrite rules: %w", err) 259 } 260 rewriteJSON = json.RawMessage(jsonBytes) 261 } 262 263 var muteWordsJSON json.RawMessage 264 if len(input.MuteWords) > 0 { 265 jsonBytes, err := json.Marshal(input.MuteWords) 266 if err != nil { 267 return nil, fmt.Errorf("failed to marshal mute words: %w", err) 268 } 269 muteWordsJSON = json.RawMessage(jsonBytes) 270 } 271 272 webhook := &Webhook{ 273 ID: id, 274 UserDID: userDID, 275 URL: input.Url, 276 Events: eventsJSON, 277 Active: true, // Default to true as per database schema 278 CreatedAt: time.Now(), 279 UpdatedAt: time.Now(), 280 Rewrite: rewriteJSON, 281 MuteWords: muteWordsJSON, 282 } 283 284 // if active is provided, use that value 285 if input.Active != nil { 286 webhook.Active = *input.Active 287 } 288 289 if input.Prefix != nil { 290 webhook.Prefix = *input.Prefix 291 } 292 if input.Suffix != nil { 293 webhook.Suffix = *input.Suffix 294 } 295 if input.Name != nil { 296 webhook.Name = *input.Name 297 } 298 if input.Description != nil { 299 webhook.Description = *input.Description 300 } 301 302 return webhook, nil 303}