// Live video on the AT Protocol
1package statedb
2
3import (
4 "encoding/json"
5 "errors"
6 "fmt"
7 "time"
8
9 "github.com/google/uuid"
10 "gorm.io/gorm"
11 "stream.place/streamplace/pkg/streamplace"
12)
13
// Webhook is the database row for a user-registered webhook. It is stored in
// the "webhooks" table (see TableName) and converted to and from the lexicon
// types by ToLexicon and WebhookFromLexiconInput.
type Webhook struct {
	// UUID primary key. CreateWebhook generates a UUIDv7 when this is empty.
	ID string `gorm:"column:id;primarykey"`

	// UserDID is the DID of the owning user; the per-user queries in this
	// file all filter on it.
	UserDID string `gorm:"column:user_did;not null;index"`

	// URL is the webhook target. CreateWebhook rejects an empty value.
	URL string `gorm:"column:url;not null"`

	// Events is raw JSON; ToLexicon decodes it as an array of strings and
	// GetActiveWebhooksForUser matches an event type against its elements.
	Events json.RawMessage `gorm:"column:events;type:json"`

	// Active marks the webhook as enabled. GetActiveWebhooksForUser only
	// returns rows where it is true. The column default is false.
	Active bool `gorm:"column:active;default:false"`

	// Prefix is an optional string; ToLexicon omits it when empty.
	// NOTE(review): presumably prepended to the delivered message; confirm
	// against the delivery code.
	Prefix string `gorm:"column:prefix"`

	// Suffix is an optional string; ToLexicon omits it when empty.
	// NOTE(review): presumably appended to the delivered message; confirm
	// against the delivery code.
	Suffix string `gorm:"column:suffix"`

	// Rewrite is raw JSON; ToLexicon decodes it as an array of objects with
	// "from" and "to" string keys.
	Rewrite json.RawMessage `gorm:"column:rewrite;type:json"`

	// Name is an optional label; ToLexicon omits it when empty.
	Name string `gorm:"column:name"`

	// Description is an optional free-text field; ToLexicon omits it when
	// empty.
	Description string `gorm:"column:description"`

	// CreatedAt is set to the current time by CreateWebhook when zero.
	CreatedAt time.Time `gorm:"column:created_at"`

	// UpdatedAt is set to the current time by CreateWebhook when zero and
	// refreshed by UpdateWebhook.
	UpdatedAt time.Time `gorm:"column:updated_at"`

	// LastTriggered is nil until set; ResetWebhookError writes the current
	// time to it.
	LastTriggered *time.Time `gorm:"column:last_triggered"`

	// ErrorCount is incremented by IncrementWebhookError and set back to 0
	// by ResetWebhookError.
	ErrorCount int `gorm:"column:error_count;default:0"`
}
32
33func (w *Webhook) TableName() string {
34 return "webhooks"
35}
36
37// CreateWebhook creates a new webhook for a user
38func (state *StatefulDB) CreateWebhook(webhook *Webhook) error {
39 if webhook.URL == "" {
40 return fmt.Errorf("webhook URL cannot be empty")
41 }
42
43 // Generate ID if not provided
44 if webhook.ID == "" {
45 uu, err := uuid.NewV7()
46 if err != nil {
47 return fmt.Errorf("failed to generate webhook ID: %w", err)
48 }
49 webhook.ID = uu.String()
50 }
51
52 if webhook.CreatedAt.IsZero() {
53 webhook.CreatedAt = time.Now()
54 }
55 if webhook.UpdatedAt.IsZero() {
56 webhook.UpdatedAt = time.Now()
57 }
58 result := state.DB.Create(webhook)
59 if result.Error != nil {
60 return fmt.Errorf("database create failed - Error: %v, ErrorType: %T, RowsAffected: %d",
61 result.Error, result.Error, result.RowsAffected)
62 }
63
64 return nil
65}
66
67// GetWebhook retrieves a webhook by ID and user DID
68func (state *StatefulDB) GetWebhook(id string, userDID string) (*Webhook, error) {
69 var webhook Webhook
70 err := state.DB.Where("id = ? AND user_did = ?", id, userDID).First(&webhook).Error
71 if err != nil {
72 if errors.Is(err, gorm.ErrRecordNotFound) {
73 return nil, fmt.Errorf("webhook not found")
74 }
75 return nil, err
76 }
77 return &webhook, nil
78}
79
80// ListWebhooks retrieves webhooks for a user with optional filters
81func (state *StatefulDB) ListWebhooks(userDID string, limit int, offset int, filters map[string]any) ([]Webhook, error) {
82 var webhooks []Webhook
83 query := state.DB.Where("user_did = ?", userDID)
84
85 // Apply filters
86 for key, value := range filters {
87 if value != nil {
88 query = query.Where(key+" = ?", value)
89 }
90 }
91
92 err := query.Limit(limit).Offset(offset).Order("created_at DESC").Find(&webhooks).Error
93 return webhooks, err
94}
95
96// UpdateWebhook updates an existing webhook
97func (state *StatefulDB) UpdateWebhook(id string, userDID string, updates map[string]interface{}) (*Webhook, error) {
98 updates["updated_at"] = time.Now()
99 result := state.DB.Model(&Webhook{}).Where("id = ? AND user_did = ?", id, userDID).Updates(updates)
100 if result.Error != nil {
101 return nil, result.Error
102 }
103 if result.RowsAffected == 0 {
104 return nil, fmt.Errorf("webhook not found or access denied")
105 }
106 return state.GetWebhook(id, userDID)
107}
108
109// DeleteWebhook deletes a webhook by ID and user DID
110func (state *StatefulDB) DeleteWebhook(id string, userDID string) error {
111 result := state.DB.Where("id = ? AND user_did = ?", id, userDID).Delete(&Webhook{})
112 if result.Error != nil {
113 return result.Error
114 }
115 if result.RowsAffected == 0 {
116 return fmt.Errorf("webhook not found or access denied")
117 }
118 return nil
119}
120
121// GetActiveWebhooksForUser retrieves active webhooks for a user filtered by event type
122func (state *StatefulDB) GetActiveWebhooksForUser(userDID string, eventType string) ([]Webhook, error) {
123 var webhooks []Webhook
124 var err error
125 if state.Type == DBTypePostgres {
126 // cast to jsonb and use @> operator here
127 err = state.DB.Where("user_did = ? AND active = ? AND events::jsonb @> ?",
128 userDID, true, fmt.Sprintf(`["%s"]`, eventType)).Find(&webhooks).Error
129 } else {
130 // SQLite: Use JSON_EXTRACT with JSON_EACH to check if array contains the event
131 err = state.DB.Where("user_did = ? AND active = ? AND EXISTS (SELECT 1 FROM json_each(events) WHERE value = ?)",
132 userDID, true, eventType).Find(&webhooks).Error
133 }
134 return webhooks, err
135}
136
137// IncrementWebhookError increments the error count for a webhook
138func (state *StatefulDB) IncrementWebhookError(id string) error {
139 return state.DB.Model(&Webhook{}).Where("id = ?", id).UpdateColumn("error_count", state.DB.Raw("error_count + 1")).Error
140}
141
142// ResetWebhookError resets the error count for a webhook
143func (state *StatefulDB) ResetWebhookError(id string) error {
144 return state.DB.Model(&Webhook{}).Where("id = ?", id).Updates(map[string]interface{}{
145 "error_count": 0,
146 "last_triggered": time.Now(),
147 }).Error
148}
149
150// ToLexicon converts a database Webhook to a streamplace.ServerDefs_Webhook
151func (w *Webhook) ToLexicon() (*streamplace.ServerDefs_Webhook, error) {
152 var events []string
153 if len(w.Events) > 0 {
154 err := json.Unmarshal(w.Events, &events)
155 if err != nil {
156 return nil, fmt.Errorf("failed to unmarshal events: %w", err)
157 }
158 }
159
160 var rewriteRules []*streamplace.ServerDefs_RewriteRule
161 if len(w.Rewrite) > 0 {
162 var dbRules []map[string]string
163 err := json.Unmarshal(w.Rewrite, &dbRules)
164 if err != nil {
165 return nil, fmt.Errorf("failed to unmarshal rewrite rules: %w", err)
166 }
167 for _, rule := range dbRules {
168 rewriteRules = append(rewriteRules, &streamplace.ServerDefs_RewriteRule{
169 From: rule["from"],
170 To: rule["to"],
171 })
172 }
173 }
174
175 createdAt := w.CreatedAt.Format(time.RFC3339)
176
177 webhook := &streamplace.ServerDefs_Webhook{
178 Id: w.ID,
179 Url: w.URL,
180 Events: events,
181 Active: w.Active,
182 CreatedAt: createdAt,
183 Rewrite: rewriteRules,
184 }
185
186 if w.Prefix != "" {
187 webhook.Prefix = &w.Prefix
188 }
189 if w.Suffix != "" {
190 webhook.Suffix = &w.Suffix
191 }
192 if w.Name != "" {
193 webhook.Name = &w.Name
194 }
195 if w.Description != "" {
196 webhook.Description = &w.Description
197 }
198 if !w.UpdatedAt.IsZero() {
199 updatedAt := w.UpdatedAt.Format(time.RFC3339)
200 webhook.UpdatedAt = &updatedAt
201 }
202 if w.LastTriggered != nil {
203 lastTriggered := w.LastTriggered.Format(time.RFC3339)
204 webhook.LastTriggered = &lastTriggered
205 }
206 if w.ErrorCount > 0 {
207 errorCount := int64(w.ErrorCount)
208 webhook.ErrorCount = &errorCount
209 }
210
211 return webhook, nil
212}
213
214// FromLexiconInput converts a streamplace.ServerCreateWebhook_Input to a database Webhook
215func WebhookFromLexiconInput(input *streamplace.ServerCreateWebhook_Input, userDID, id string) (*Webhook, error) {
216 // Debug log the raw input
217 fmt.Printf("DEBUG: WebhookFromLexiconInput input.Events: %+v (type: %T)\n", input.Events, input.Events)
218 for i, event := range input.Events {
219 fmt.Printf("DEBUG: Event[%d]: %q (type: %T)\n", i, event, event)
220 }
221
222 var eventsJSON json.RawMessage
223 if len(input.Events) > 0 {
224 jsonBytes, err := json.Marshal(input.Events)
225 if err != nil {
226 return nil, fmt.Errorf("failed to marshal events: %w", err)
227 }
228 fmt.Printf("DEBUG: Marshaled events JSON: %q\n", string(jsonBytes))
229 eventsJSON = json.RawMessage(jsonBytes)
230 } else {
231 // Default to empty array if no events provided
232 eventsJSON = json.RawMessage(`[]`)
233 }
234
235 var rewriteJSON json.RawMessage
236 if len(input.Rewrite) > 0 {
237 dbRules := make([]map[string]string, len(input.Rewrite))
238 for i, rule := range input.Rewrite {
239 dbRules[i] = map[string]string{
240 "from": rule.From,
241 "to": rule.To,
242 }
243 }
244 jsonBytes, err := json.Marshal(dbRules)
245 if err != nil {
246 return nil, fmt.Errorf("failed to marshal rewrite rules: %w", err)
247 }
248 rewriteJSON = json.RawMessage(jsonBytes)
249 }
250
251 webhook := &Webhook{
252 ID: id,
253 UserDID: userDID,
254 URL: input.Url,
255 Events: eventsJSON,
256 Active: true, // Default to true as per database schema
257 CreatedAt: time.Now(),
258 UpdatedAt: time.Now(),
259 Rewrite: rewriteJSON,
260 }
261
262 // if active is provided, use that value
263 if input.Active != nil {
264 webhook.Active = *input.Active
265 }
266
267 if input.Prefix != nil {
268 webhook.Prefix = *input.Prefix
269 }
270 if input.Suffix != nil {
271 webhook.Suffix = *input.Suffix
272 }
273 if input.Name != nil {
274 webhook.Name = *input.Name
275 }
276 if input.Description != nil {
277 webhook.Description = *input.Description
278 }
279
280 return webhook, nil
281}