1// Copyright 2017 The Gitea Authors. All rights reserved.
2// SPDX-License-Identifier: MIT
3
4package webhook
5
6import (
7 "context"
8 "errors"
9 "time"
10
11 "forgejo.org/models/db"
12 "forgejo.org/modules/json"
13 "forgejo.org/modules/log"
14 "forgejo.org/modules/setting"
15 "forgejo.org/modules/timeutil"
16 webhook_module "forgejo.org/modules/webhook"
17
18 gouuid "github.com/google/uuid"
19 "xorm.io/builder"
20)
21
22// ___ ___ __ ___________ __
23// / | \ ____ ____ | | _\__ ___/____ _____| | __
24// / ~ \/ _ \ / _ \| |/ / | | \__ \ / ___/ |/ /
25// \ Y ( <_> | <_> ) < | | / __ \_\___ \| <
26// \___|_ / \____/ \____/|__|_ \ |____| (____ /____ >__|_ \
27// \/ \/ \/ \/ \/
28
// HookRequest is the JSON-serialisable record of the HTTP request associated
// with a hook task. HookTask stores it as a JSON string in RequestContent.
type HookRequest struct {
	// URL is serialised under the "url" key.
	URL string `json:"url"`
	// HTTPMethod is serialised under the "http_method" key.
	HTTPMethod string `json:"http_method"`
	// Headers maps header names to values; serialised under "headers".
	Headers map[string]string `json:"headers"`
	// Body is serialised under the "body" key.
	Body string `json:"body"`
}
36
// HookResponse is the JSON-serialisable record of the HTTP response associated
// with a hook task. HookTask stores it as a JSON string in ResponseContent.
type HookResponse struct {
	// Status is serialised under the "status" key.
	Status int `json:"status"`
	// Headers maps header names to values; serialised under "headers".
	Headers map[string]string `json:"headers"`
	// Body is serialised under the "body" key.
	Body string `json:"body"`
}
43
44// HookTask represents a hook task.
45type HookTask struct {
46 ID int64 `xorm:"pk autoincr"`
47 HookID int64 `xorm:"index"`
48 UUID string `xorm:"unique"`
49 PayloadContent string `xorm:"LONGTEXT"`
50 // PayloadVersion number to allow for smooth version upgrades:
51 // - PayloadVersion 1: PayloadContent contains the JSON as sent to the URL
52 // - PayloadVersion 2: PayloadContent contains the original event
53 PayloadVersion int `xorm:"DEFAULT 1"`
54
55 EventType webhook_module.HookEventType
56 IsDelivered bool
57 Delivered timeutil.TimeStampNano
58
59 // History info.
60 IsSucceed bool
61 RequestContent string `xorm:"LONGTEXT"`
62 RequestInfo *HookRequest `xorm:"-"`
63 ResponseContent string `xorm:"LONGTEXT"`
64 ResponseInfo *HookResponse `xorm:"-"`
65}
66
67func init() {
68 db.RegisterModel(new(HookTask))
69}
70
71// BeforeUpdate will be invoked by XORM before updating a record
72// representing this object
73func (t *HookTask) BeforeUpdate() {
74 if t.RequestInfo != nil {
75 t.RequestContent = t.simpleMarshalJSON(t.RequestInfo)
76 }
77 if t.ResponseInfo != nil {
78 t.ResponseContent = t.simpleMarshalJSON(t.ResponseInfo)
79 }
80}
81
82// AfterLoad updates the webhook object upon setting a column
83func (t *HookTask) AfterLoad() {
84 if len(t.RequestContent) == 0 {
85 return
86 }
87
88 t.RequestInfo = &HookRequest{}
89 if err := json.Unmarshal([]byte(t.RequestContent), t.RequestInfo); err != nil {
90 log.Error("Unmarshal RequestContent[%d]: %v", t.ID, err)
91 }
92
93 if len(t.ResponseContent) > 0 {
94 t.ResponseInfo = &HookResponse{}
95 if err := json.Unmarshal([]byte(t.ResponseContent), t.ResponseInfo); err != nil {
96 log.Error("Unmarshal ResponseContent[%d]: %v", t.ID, err)
97 }
98 }
99}
100
101func (t *HookTask) simpleMarshalJSON(v any) string {
102 p, err := json.Marshal(v)
103 if err != nil {
104 log.Error("Marshal [%d]: %v", t.ID, err)
105 }
106 return string(p)
107}
108
109// HookTasks returns a list of hook tasks by given conditions, order by ID desc.
110func HookTasks(ctx context.Context, hookID int64, page int) ([]*HookTask, error) {
111 tasks := make([]*HookTask, 0, setting.Webhook.PagingNum)
112 return tasks, db.GetEngine(ctx).
113 Limit(setting.Webhook.PagingNum, (page-1)*setting.Webhook.PagingNum).
114 Where("hook_id=?", hookID).
115 Desc("id").
116 Find(&tasks)
117}
118
119// CreateHookTask creates a new hook task,
120// it handles conversion from Payload to PayloadContent.
121func CreateHookTask(ctx context.Context, t *HookTask) (*HookTask, error) {
122 t.UUID = gouuid.New().String()
123 if t.Delivered == 0 {
124 t.Delivered = timeutil.TimeStampNanoNow()
125 }
126 if t.PayloadVersion == 0 {
127 return nil, errors.New("missing HookTask.PayloadVersion")
128 }
129 return t, db.Insert(ctx, t)
130}
131
132func GetHookTaskByID(ctx context.Context, id int64) (*HookTask, error) {
133 t := &HookTask{}
134
135 has, err := db.GetEngine(ctx).ID(id).Get(t)
136 if err != nil {
137 return nil, err
138 }
139 if !has {
140 return nil, ErrHookTaskNotExist{
141 TaskID: id,
142 }
143 }
144 return t, nil
145}
146
147// UpdateHookTask updates information of hook task.
148func UpdateHookTask(ctx context.Context, t *HookTask) error {
149 _, err := db.GetEngine(ctx).ID(t.ID).AllCols().Update(t)
150 return err
151}
152
153// ReplayHookTask copies a hook task to get re-delivered
154func ReplayHookTask(ctx context.Context, hookID int64, uuid string) (*HookTask, error) {
155 task, exist, err := db.Get[HookTask](ctx, builder.Eq{"hook_id": hookID, "uuid": uuid})
156 if err != nil {
157 return nil, err
158 } else if !exist {
159 return nil, ErrHookTaskNotExist{
160 HookID: hookID,
161 UUID: uuid,
162 }
163 }
164
165 return CreateHookTask(ctx, &HookTask{
166 HookID: task.HookID,
167 PayloadContent: task.PayloadContent,
168 EventType: task.EventType,
169 PayloadVersion: task.PayloadVersion,
170 })
171}
172
173// FindUndeliveredHookTaskIDs will find the next 100 undelivered hook tasks with ID greater than the provided lowerID
174func FindUndeliveredHookTaskIDs(ctx context.Context, lowerID int64) ([]int64, error) {
175 const batchSize = 100
176
177 tasks := make([]int64, 0, batchSize)
178 return tasks, db.GetEngine(ctx).
179 Select("id").
180 Table(new(HookTask)).
181 Where("is_delivered=?", false).
182 And("id > ?", lowerID).
183 Asc("id").
184 Limit(batchSize).
185 Find(&tasks)
186}
187
188func MarkTaskDelivered(ctx context.Context, task *HookTask) (bool, error) {
189 count, err := db.GetEngine(ctx).ID(task.ID).Where("is_delivered = ?", false).Cols("is_delivered").Update(&HookTask{
190 ID: task.ID,
191 IsDelivered: true,
192 })
193
194 return count != 0, err
195}
196
197// CleanupHookTaskTable deletes rows from hook_task as needed.
198func CleanupHookTaskTable(ctx context.Context, cleanupType HookTaskCleanupType, olderThan time.Duration, numberToKeep int) error {
199 log.Trace("Doing: CleanupHookTaskTable")
200
201 switch cleanupType {
202 case OlderThan:
203 deleteOlderThan := time.Now().Add(-olderThan).UnixNano()
204 deletes, err := db.GetEngine(ctx).
205 Where("is_delivered = ? and delivered < ?", true, deleteOlderThan).
206 Delete(new(HookTask))
207 if err != nil {
208 return err
209 }
210 log.Trace("Deleted %d rows from hook_task", deletes)
211 case PerWebhook:
212 hookIDs := make([]int64, 0, 10)
213 err := db.GetEngine(ctx).
214 Table("webhook").
215 Where("id > 0").
216 Cols("id").
217 Find(&hookIDs)
218 if err != nil {
219 return err
220 }
221 for _, hookID := range hookIDs {
222 select {
223 case <-ctx.Done():
224 return db.ErrCancelledf("Before deleting hook_task records for hook id %d", hookID)
225 default:
226 }
227 if err = deleteDeliveredHookTasksByWebhook(ctx, hookID, numberToKeep); err != nil {
228 return err
229 }
230 }
231 }
232 log.Trace("Finished: CleanupHookTaskTable")
233 return nil
234}
235
236func deleteDeliveredHookTasksByWebhook(ctx context.Context, hookID int64, numberDeliveriesToKeep int) error {
237 log.Trace("Deleting hook_task rows for webhook %d, keeping the most recent %d deliveries", hookID, numberDeliveriesToKeep)
238 deliveryDates := make([]int64, 0, 10)
239 err := db.GetEngine(ctx).Table("hook_task").
240 Where("hook_task.hook_id = ? AND hook_task.is_delivered = ? AND hook_task.delivered is not null", hookID, true).
241 Cols("hook_task.delivered").
242 Join("INNER", "webhook", "hook_task.hook_id = webhook.id").
243 OrderBy("hook_task.delivered desc").
244 Limit(1, numberDeliveriesToKeep).
245 Find(&deliveryDates)
246 if err != nil {
247 return err
248 }
249
250 if len(deliveryDates) > 0 {
251 deletes, err := db.GetEngine(ctx).
252 Where("hook_id = ? and is_delivered = ? and delivered <= ?", hookID, true, deliveryDates[0]).
253 Delete(new(HookTask))
254 if err != nil {
255 return err
256 }
257 log.Trace("Deleted %d hook_task rows for webhook %d", deletes, hookID)
258 } else {
259 log.Trace("No hook_task rows to delete for webhook %d", hookID)
260 }
261
262 return nil
263}