Live video on the AT Protocol
79
fork

Configure Feed

Select the types of activity you want to include in your feed.

at v0.7.35 281 lines 8.4 kB view raw
1package statedb 2 3import ( 4 "encoding/json" 5 "errors" 6 "fmt" 7 "time" 8 9 "github.com/google/uuid" 10 "gorm.io/gorm" 11 "stream.place/streamplace/pkg/streamplace" 12) 13 14type Webhook struct { 15 // UUID primary key 16 ID string `gorm:"column:id;primarykey"` 17 UserDID string `gorm:"column:user_did;not null;index"` 18 URL string `gorm:"column:url;not null"` 19 20 Events json.RawMessage `gorm:"column:events;type:json"` 21 Active bool `gorm:"column:active;default:false"` 22 Prefix string `gorm:"column:prefix"` 23 Suffix string `gorm:"column:suffix"` 24 Rewrite json.RawMessage `gorm:"column:rewrite;type:json"` 25 Name string `gorm:"column:name"` 26 Description string `gorm:"column:description"` 27 CreatedAt time.Time `gorm:"column:created_at"` 28 UpdatedAt time.Time `gorm:"column:updated_at"` 29 LastTriggered *time.Time `gorm:"column:last_triggered"` 30 ErrorCount int `gorm:"column:error_count;default:0"` 31} 32 33func (w *Webhook) TableName() string { 34 return "webhooks" 35} 36 37// CreateWebhook creates a new webhook for a user 38func (state *StatefulDB) CreateWebhook(webhook *Webhook) error { 39 if webhook.URL == "" { 40 return fmt.Errorf("webhook URL cannot be empty") 41 } 42 43 // Generate ID if not provided 44 if webhook.ID == "" { 45 uu, err := uuid.NewV7() 46 if err != nil { 47 return fmt.Errorf("failed to generate webhook ID: %w", err) 48 } 49 webhook.ID = uu.String() 50 } 51 52 if webhook.CreatedAt.IsZero() { 53 webhook.CreatedAt = time.Now() 54 } 55 if webhook.UpdatedAt.IsZero() { 56 webhook.UpdatedAt = time.Now() 57 } 58 result := state.DB.Create(webhook) 59 if result.Error != nil { 60 return fmt.Errorf("database create failed - Error: %v, ErrorType: %T, RowsAffected: %d", 61 result.Error, result.Error, result.RowsAffected) 62 } 63 64 return nil 65} 66 67// GetWebhook retrieves a webhook by ID and user DID 68func (state *StatefulDB) GetWebhook(id string, userDID string) (*Webhook, error) { 69 var webhook Webhook 70 err := state.DB.Where("id = ? AND user_did = ?", id, userDID).First(&webhook).Error 71 if err != nil { 72 if errors.Is(err, gorm.ErrRecordNotFound) { 73 return nil, fmt.Errorf("webhook not found") 74 } 75 return nil, err 76 } 77 return &webhook, nil 78} 79 80// ListWebhooks retrieves webhooks for a user with optional filters 81func (state *StatefulDB) ListWebhooks(userDID string, limit int, offset int, filters map[string]any) ([]Webhook, error) { 82 var webhooks []Webhook 83 query := state.DB.Where("user_did = ?", userDID) 84 85 // Apply filters 86 for key, value := range filters { 87 if value != nil { 88 query = query.Where(key+" = ?", value) 89 } 90 } 91 92 err := query.Limit(limit).Offset(offset).Order("created_at DESC").Find(&webhooks).Error 93 return webhooks, err 94} 95 96// UpdateWebhook updates an existing webhook 97func (state *StatefulDB) UpdateWebhook(id string, userDID string, updates map[string]interface{}) (*Webhook, error) { 98 updates["updated_at"] = time.Now() 99 result := state.DB.Model(&Webhook{}).Where("id = ? AND user_did = ?", id, userDID).Updates(updates) 100 if result.Error != nil { 101 return nil, result.Error 102 } 103 if result.RowsAffected == 0 { 104 return nil, fmt.Errorf("webhook not found or access denied") 105 } 106 return state.GetWebhook(id, userDID) 107} 108 109// DeleteWebhook deletes a webhook by ID and user DID 110func (state *StatefulDB) DeleteWebhook(id string, userDID string) error { 111 result := state.DB.Where("id = ? AND user_did = ?", id, userDID).Delete(&Webhook{}) 112 if result.Error != nil { 113 return result.Error 114 } 115 if result.RowsAffected == 0 { 116 return fmt.Errorf("webhook not found or access denied") 117 } 118 return nil 119} 120 121// GetActiveWebhooksForUser retrieves active webhooks for a user filtered by event type 122func (state *StatefulDB) GetActiveWebhooksForUser(userDID string, eventType string) ([]Webhook, error) { 123 var webhooks []Webhook 124 var err error 125 if state.Type == DBTypePostgres { 126 // cast to jsonb and use @> operator here 127 err = state.DB.Where("user_did = ? AND active = ? AND events::jsonb @> ?", 128 userDID, true, fmt.Sprintf(`["%s"]`, eventType)).Find(&webhooks).Error 129 } else { 130 // SQLite: Use JSON_EXTRACT with JSON_EACH to check if array contains the event 131 err = state.DB.Where("user_did = ? AND active = ? AND EXISTS (SELECT 1 FROM json_each(events) WHERE value = ?)", 132 userDID, true, eventType).Find(&webhooks).Error 133 } 134 return webhooks, err 135} 136 137// IncrementWebhookError increments the error count for a webhook 138func (state *StatefulDB) IncrementWebhookError(id string) error { 139 return state.DB.Model(&Webhook{}).Where("id = ?", id).UpdateColumn("error_count", state.DB.Raw("error_count + 1")).Error 140} 141 142// ResetWebhookError resets the error count for a webhook 143func (state *StatefulDB) ResetWebhookError(id string) error { 144 return state.DB.Model(&Webhook{}).Where("id = ?", id).Updates(map[string]interface{}{ 145 "error_count": 0, 146 "last_triggered": time.Now(), 147 }).Error 148} 149 150// ToLexicon converts a database Webhook to a streamplace.ServerDefs_Webhook 151func (w *Webhook) ToLexicon() (*streamplace.ServerDefs_Webhook, error) { 152 var events []string 153 if len(w.Events) > 0 { 154 err := json.Unmarshal(w.Events, &events) 155 if err != nil { 156 return nil, fmt.Errorf("failed to unmarshal events: %w", err) 157 } 158 } 159 160 var rewriteRules []*streamplace.ServerDefs_RewriteRule 161 if len(w.Rewrite) > 0 { 162 var dbRules []map[string]string 163 err := json.Unmarshal(w.Rewrite, &dbRules) 164 if err != nil { 165 return nil, fmt.Errorf("failed to unmarshal rewrite rules: %w", err) 166 } 167 for _, rule := range dbRules { 168 rewriteRules = append(rewriteRules, &streamplace.ServerDefs_RewriteRule{ 169 From: rule["from"], 170 To: rule["to"], 171 }) 172 } 173 } 174 175 createdAt := w.CreatedAt.Format(time.RFC3339) 176 177 webhook := &streamplace.ServerDefs_Webhook{ 178 Id: w.ID, 179 Url: w.URL, 180 Events: events, 181 Active: w.Active, 182 CreatedAt: createdAt, 183 Rewrite: rewriteRules, 184 } 185 186 if w.Prefix != "" { 187 webhook.Prefix = &w.Prefix 188 } 189 if w.Suffix != "" { 190 webhook.Suffix = &w.Suffix 191 } 192 if w.Name != "" { 193 webhook.Name = &w.Name 194 } 195 if w.Description != "" { 196 webhook.Description = &w.Description 197 } 198 if !w.UpdatedAt.IsZero() { 199 updatedAt := w.UpdatedAt.Format(time.RFC3339) 200 webhook.UpdatedAt = &updatedAt 201 } 202 if w.LastTriggered != nil { 203 lastTriggered := w.LastTriggered.Format(time.RFC3339) 204 webhook.LastTriggered = &lastTriggered 205 } 206 if w.ErrorCount > 0 { 207 errorCount := int64(w.ErrorCount) 208 webhook.ErrorCount = &errorCount 209 } 210 211 return webhook, nil 212} 213 214// FromLexiconInput converts a streamplace.ServerCreateWebhook_Input to a database Webhook 215func WebhookFromLexiconInput(input *streamplace.ServerCreateWebhook_Input, userDID, id string) (*Webhook, error) { 216 // Debug log the raw input 217 fmt.Printf("DEBUG: WebhookFromLexiconInput input.Events: %+v (type: %T)\n", input.Events, input.Events) 218 for i, event := range input.Events { 219 fmt.Printf("DEBUG: Event[%d]: %q (type: %T)\n", i, event, event) 220 } 221 222 var eventsJSON json.RawMessage 223 if len(input.Events) > 0 { 224 jsonBytes, err := json.Marshal(input.Events) 225 if err != nil { 226 return nil, fmt.Errorf("failed to marshal events: %w", err) 227 } 228 fmt.Printf("DEBUG: Marshaled events JSON: %q\n", string(jsonBytes)) 229 eventsJSON = json.RawMessage(jsonBytes) 230 } else { 231 // Default to empty array if no events provided 232 eventsJSON = json.RawMessage(`[]`) 233 } 234 235 var rewriteJSON json.RawMessage 236 if len(input.Rewrite) > 0 { 237 dbRules := make([]map[string]string, len(input.Rewrite)) 238 for i, rule := range input.Rewrite { 239 dbRules[i] = map[string]string{ 240 "from": rule.From, 241 "to": rule.To, 242 } 243 } 244 jsonBytes, err := json.Marshal(dbRules) 245 if err != nil { 246 return nil, fmt.Errorf("failed to marshal rewrite rules: %w", err) 247 } 248 rewriteJSON = json.RawMessage(jsonBytes) 249 } 250 251 webhook := &Webhook{ 252 ID: id, 253 UserDID: userDID, 254 URL: input.Url, 255 Events: eventsJSON, 256 Active: true, // Default to true as per database schema 257 CreatedAt: time.Now(), 258 UpdatedAt: time.Now(), 259 Rewrite: rewriteJSON, 260 } 261 262 // if active is provided, use that value 263 if input.Active != nil { 264 webhook.Active = *input.Active 265 } 266 267 if input.Prefix != nil { 268 webhook.Prefix = *input.Prefix 269 } 270 if input.Suffix != nil { 271 webhook.Suffix = *input.Suffix 272 } 273 if input.Name != nil { 274 webhook.Name = *input.Name 275 } 276 if input.Description != nil { 277 webhook.Description = *input.Description 278 } 279 280 return webhook, nil 281}