Live video on the AT Protocol
1package statedb
2
3import (
4 "encoding/json"
5 "errors"
6 "fmt"
7 "time"
8
9 "github.com/google/uuid"
10 "gorm.io/gorm"
11 "stream.place/streamplace/pkg/streamplace"
12)
13
// Webhook is the GORM model for one user-registered webhook, stored in the
// "webhooks" table (see TableName).
type Webhook struct {
	// ID is the primary key. CreateWebhook fills it with a UUIDv7 string when
	// the caller leaves it empty.
	ID string `gorm:"column:id;primarykey"`
	// UserDID is the DID of the owning user; the column is indexed and every
	// user-facing query in this file is scoped by it.
	UserDID string `gorm:"column:user_did;not null;index"`
	// URL is the delivery target. CreateWebhook rejects an empty value.
	URL string `gorm:"column:url;not null"`

	// Events holds a JSON array of event-type strings (decoded into []string
	// by ToLexicon and searched by GetActiveWebhooksForUser).
	Events json.RawMessage `gorm:"column:events;type:json"`
	// Active gates GetActiveWebhooksForUser. The column default is false,
	// while WebhookFromLexiconInput sets true unless the input says otherwise.
	Active bool `gorm:"column:active;default:false"`
	// Prefix and Suffix are optional strings copied to/from the lexicon type.
	// NOTE(review): presumably text added around the delivered message —
	// confirm against the delivery code.
	Prefix string `gorm:"column:prefix"`
	Suffix string `gorm:"column:suffix"`
	// Rewrite holds a JSON array of {"from": ..., "to": ...} objects.
	Rewrite json.RawMessage `gorm:"column:rewrite;type:json"`
	// MuteWords holds a JSON array of strings.
	MuteWords json.RawMessage `gorm:"column:mute_words;type:json"`
	// Name and Description are optional human-readable labels.
	Name        string `gorm:"column:name"`
	Description string `gorm:"column:description"`
	// CreatedAt and UpdatedAt are filled by CreateWebhook when zero;
	// UpdateWebhook refreshes UpdatedAt on every update.
	CreatedAt time.Time `gorm:"column:created_at"`
	UpdatedAt time.Time `gorm:"column:updated_at"`
	// LastTriggered is nil until ResetWebhookError stamps it.
	LastTriggered *time.Time `gorm:"column:last_triggered"`
	// ErrorCount is bumped by IncrementWebhookError and zeroed by
	// ResetWebhookError.
	ErrorCount int `gorm:"column:error_count;default:0"`
}
33
34func (w *Webhook) TableName() string {
35 return "webhooks"
36}
37
38// CreateWebhook creates a new webhook for a user
39func (state *StatefulDB) CreateWebhook(webhook *Webhook) error {
40 if webhook.URL == "" {
41 return fmt.Errorf("webhook URL cannot be empty")
42 }
43
44 // Generate ID if not provided
45 if webhook.ID == "" {
46 uu, err := uuid.NewV7()
47 if err != nil {
48 return fmt.Errorf("failed to generate webhook ID: %w", err)
49 }
50 webhook.ID = uu.String()
51 }
52
53 if webhook.CreatedAt.IsZero() {
54 webhook.CreatedAt = time.Now()
55 }
56 if webhook.UpdatedAt.IsZero() {
57 webhook.UpdatedAt = time.Now()
58 }
59 result := state.DB.Create(webhook)
60 if result.Error != nil {
61 return fmt.Errorf("database create failed - Error: %v, ErrorType: %T, RowsAffected: %d",
62 result.Error, result.Error, result.RowsAffected)
63 }
64
65 return nil
66}
67
68// GetWebhook retrieves a webhook by ID and user DID
69func (state *StatefulDB) GetWebhook(id string, userDID string) (*Webhook, error) {
70 var webhook Webhook
71 err := state.DB.Where("id = ? AND user_did = ?", id, userDID).First(&webhook).Error
72 if err != nil {
73 if errors.Is(err, gorm.ErrRecordNotFound) {
74 return nil, fmt.Errorf("webhook not found")
75 }
76 return nil, err
77 }
78 return &webhook, nil
79}
80
81// ListWebhooks retrieves webhooks for a user with optional filters
82func (state *StatefulDB) ListWebhooks(userDID string, limit int, offset int, filters map[string]any) ([]Webhook, error) {
83 var webhooks []Webhook
84 query := state.DB.Where("user_did = ?", userDID)
85
86 // Apply filters
87 for key, value := range filters {
88 if value != nil {
89 query = query.Where(key+" = ?", value)
90 }
91 }
92
93 err := query.Limit(limit).Offset(offset).Order("created_at DESC").Find(&webhooks).Error
94 return webhooks, err
95}
96
97// UpdateWebhook updates an existing webhook
98func (state *StatefulDB) UpdateWebhook(id string, userDID string, updates map[string]interface{}) (*Webhook, error) {
99 updates["updated_at"] = time.Now()
100 result := state.DB.Model(&Webhook{}).Where("id = ? AND user_did = ?", id, userDID).Updates(updates)
101 if result.Error != nil {
102 return nil, result.Error
103 }
104 if result.RowsAffected == 0 {
105 return nil, fmt.Errorf("webhook not found or access denied")
106 }
107 return state.GetWebhook(id, userDID)
108}
109
110// DeleteWebhook deletes a webhook by ID and user DID
111func (state *StatefulDB) DeleteWebhook(id string, userDID string) error {
112 result := state.DB.Where("id = ? AND user_did = ?", id, userDID).Delete(&Webhook{})
113 if result.Error != nil {
114 return result.Error
115 }
116 if result.RowsAffected == 0 {
117 return fmt.Errorf("webhook not found or access denied")
118 }
119 return nil
120}
121
122// GetActiveWebhooksForUser retrieves active webhooks for a user filtered by event type
123func (state *StatefulDB) GetActiveWebhooksForUser(userDID string, eventType string) ([]Webhook, error) {
124 var webhooks []Webhook
125 var err error
126 if state.Type == DBTypePostgres {
127 // cast to jsonb and use @> operator here
128 err = state.DB.Where("user_did = ? AND active = ? AND events::jsonb @> ?",
129 userDID, true, fmt.Sprintf(`["%s"]`, eventType)).Find(&webhooks).Error
130 } else {
131 // SQLite: Use JSON_EXTRACT with JSON_EACH to check if array contains the event
132 err = state.DB.Where("user_did = ? AND active = ? AND EXISTS (SELECT 1 FROM json_each(events) WHERE value = ?)",
133 userDID, true, eventType).Find(&webhooks).Error
134 }
135 return webhooks, err
136}
137
138// IncrementWebhookError increments the error count for a webhook
139func (state *StatefulDB) IncrementWebhookError(id string) error {
140 return state.DB.Model(&Webhook{}).Where("id = ?", id).UpdateColumn("error_count", state.DB.Raw("error_count + 1")).Error
141}
142
143// ResetWebhookError resets the error count for a webhook
144func (state *StatefulDB) ResetWebhookError(id string) error {
145 return state.DB.Model(&Webhook{}).Where("id = ?", id).Updates(map[string]interface{}{
146 "error_count": 0,
147 "last_triggered": time.Now(),
148 }).Error
149}
150
151// ToLexicon converts a database Webhook to a streamplace.ServerDefs_Webhook
152func (w *Webhook) ToLexicon() (*streamplace.ServerDefs_Webhook, error) {
153 var events []string
154 if len(w.Events) > 0 {
155 err := json.Unmarshal(w.Events, &events)
156 if err != nil {
157 return nil, fmt.Errorf("failed to unmarshal events: %w", err)
158 }
159 }
160
161 var rewriteRules []*streamplace.ServerDefs_RewriteRule
162 if len(w.Rewrite) > 0 {
163 var dbRules []map[string]string
164 err := json.Unmarshal(w.Rewrite, &dbRules)
165 if err != nil {
166 return nil, fmt.Errorf("failed to unmarshal rewrite rules: %w", err)
167 }
168 for _, rule := range dbRules {
169 rewriteRules = append(rewriteRules, &streamplace.ServerDefs_RewriteRule{
170 From: rule["from"],
171 To: rule["to"],
172 })
173 }
174 }
175
176 var muteWords []string
177 if len(w.MuteWords) > 0 {
178 err := json.Unmarshal(w.MuteWords, &muteWords)
179 if err != nil {
180 return nil, fmt.Errorf("failed to unmarshal mute words: %w", err)
181 }
182 }
183
184 createdAt := w.CreatedAt.Format(time.RFC3339)
185
186 webhook := &streamplace.ServerDefs_Webhook{
187 Id: w.ID,
188 Url: w.URL,
189 Events: events,
190 Active: w.Active,
191 CreatedAt: createdAt,
192 Rewrite: rewriteRules,
193 }
194
195 if w.Prefix != "" {
196 webhook.Prefix = &w.Prefix
197 }
198 if w.Suffix != "" {
199 webhook.Suffix = &w.Suffix
200 }
201 if w.Name != "" {
202 webhook.Name = &w.Name
203 }
204 if w.Description != "" {
205 webhook.Description = &w.Description
206 }
207 if len(muteWords) > 0 {
208 webhook.MuteWords = muteWords
209 }
210 if !w.UpdatedAt.IsZero() {
211 updatedAt := w.UpdatedAt.Format(time.RFC3339)
212 webhook.UpdatedAt = &updatedAt
213 }
214 if w.LastTriggered != nil {
215 lastTriggered := w.LastTriggered.Format(time.RFC3339)
216 webhook.LastTriggered = &lastTriggered
217 }
218 if w.ErrorCount > 0 {
219 errorCount := int64(w.ErrorCount)
220 webhook.ErrorCount = &errorCount
221 }
222
223 return webhook, nil
224}
225
226// FromLexiconInput converts a streamplace.ServerCreateWebhook_Input to a database Webhook
227func WebhookFromLexiconInput(input *streamplace.ServerCreateWebhook_Input, userDID, id string) (*Webhook, error) {
228 // Debug log the raw input
229 fmt.Printf("DEBUG: WebhookFromLexiconInput input.Events: %+v (type: %T)\n", input.Events, input.Events)
230 for i, event := range input.Events {
231 fmt.Printf("DEBUG: Event[%d]: %q (type: %T)\n", i, event, event)
232 }
233
234 var eventsJSON json.RawMessage
235 if len(input.Events) > 0 {
236 jsonBytes, err := json.Marshal(input.Events)
237 if err != nil {
238 return nil, fmt.Errorf("failed to marshal events: %w", err)
239 }
240 fmt.Printf("DEBUG: Marshaled events JSON: %q\n", string(jsonBytes))
241 eventsJSON = json.RawMessage(jsonBytes)
242 } else {
243 // Default to empty array if no events provided
244 eventsJSON = json.RawMessage(`[]`)
245 }
246
247 var rewriteJSON json.RawMessage
248 if len(input.Rewrite) > 0 {
249 dbRules := make([]map[string]string, len(input.Rewrite))
250 for i, rule := range input.Rewrite {
251 dbRules[i] = map[string]string{
252 "from": rule.From,
253 "to": rule.To,
254 }
255 }
256 jsonBytes, err := json.Marshal(dbRules)
257 if err != nil {
258 return nil, fmt.Errorf("failed to marshal rewrite rules: %w", err)
259 }
260 rewriteJSON = json.RawMessage(jsonBytes)
261 }
262
263 var muteWordsJSON json.RawMessage
264 if len(input.MuteWords) > 0 {
265 jsonBytes, err := json.Marshal(input.MuteWords)
266 if err != nil {
267 return nil, fmt.Errorf("failed to marshal mute words: %w", err)
268 }
269 muteWordsJSON = json.RawMessage(jsonBytes)
270 }
271
272 webhook := &Webhook{
273 ID: id,
274 UserDID: userDID,
275 URL: input.Url,
276 Events: eventsJSON,
277 Active: true, // Default to true as per database schema
278 CreatedAt: time.Now(),
279 UpdatedAt: time.Now(),
280 Rewrite: rewriteJSON,
281 MuteWords: muteWordsJSON,
282 }
283
284 // if active is provided, use that value
285 if input.Active != nil {
286 webhook.Active = *input.Active
287 }
288
289 if input.Prefix != nil {
290 webhook.Prefix = *input.Prefix
291 }
292 if input.Suffix != nil {
293 webhook.Suffix = *input.Suffix
294 }
295 if input.Name != nil {
296 webhook.Name = *input.Name
297 }
298 if input.Description != nil {
299 webhook.Description = *input.Description
300 }
301
302 return webhook, nil
303}