loading up the forgejo repo on tangled to test page performance
at forgejo 263 lines 7.8 kB view raw
1// Copyright 2017 The Gitea Authors. All rights reserved. 2// SPDX-License-Identifier: MIT 3 4package webhook 5 6import ( 7 "context" 8 "errors" 9 "time" 10 11 "forgejo.org/models/db" 12 "forgejo.org/modules/json" 13 "forgejo.org/modules/log" 14 "forgejo.org/modules/setting" 15 "forgejo.org/modules/timeutil" 16 webhook_module "forgejo.org/modules/webhook" 17 18 gouuid "github.com/google/uuid" 19 "xorm.io/builder" 20) 21 22// ___ ___ __ ___________ __ 23// / | \ ____ ____ | | _\__ ___/____ _____| | __ 24// / ~ \/ _ \ / _ \| |/ / | | \__ \ / ___/ |/ / 25// \ Y ( <_> | <_> ) < | | / __ \_\___ \| < 26// \___|_ / \____/ \____/|__|_ \ |____| (____ /____ >__|_ \ 27// \/ \/ \/ \/ \/ 28 29// HookRequest represents hook task request information. 30type HookRequest struct { 31 URL string `json:"url"` 32 HTTPMethod string `json:"http_method"` 33 Headers map[string]string `json:"headers"` 34 Body string `json:"body"` 35} 36 37// HookResponse represents hook task response information. 38type HookResponse struct { 39 Status int `json:"status"` 40 Headers map[string]string `json:"headers"` 41 Body string `json:"body"` 42} 43 44// HookTask represents a hook task. 45type HookTask struct { 46 ID int64 `xorm:"pk autoincr"` 47 HookID int64 `xorm:"index"` 48 UUID string `xorm:"unique"` 49 PayloadContent string `xorm:"LONGTEXT"` 50 // PayloadVersion number to allow for smooth version upgrades: 51 // - PayloadVersion 1: PayloadContent contains the JSON as sent to the URL 52 // - PayloadVersion 2: PayloadContent contains the original event 53 PayloadVersion int `xorm:"DEFAULT 1"` 54 55 EventType webhook_module.HookEventType 56 IsDelivered bool 57 Delivered timeutil.TimeStampNano 58 59 // History info. 60 IsSucceed bool 61 RequestContent string `xorm:"LONGTEXT"` 62 RequestInfo *HookRequest `xorm:"-"` 63 ResponseContent string `xorm:"LONGTEXT"` 64 ResponseInfo *HookResponse `xorm:"-"` 65} 66 67func init() { 68 db.RegisterModel(new(HookTask)) 69} 70 71// BeforeUpdate will be invoked by XORM before updating a record 72// representing this object 73func (t *HookTask) BeforeUpdate() { 74 if t.RequestInfo != nil { 75 t.RequestContent = t.simpleMarshalJSON(t.RequestInfo) 76 } 77 if t.ResponseInfo != nil { 78 t.ResponseContent = t.simpleMarshalJSON(t.ResponseInfo) 79 } 80} 81 82// AfterLoad updates the webhook object upon setting a column 83func (t *HookTask) AfterLoad() { 84 if len(t.RequestContent) == 0 { 85 return 86 } 87 88 t.RequestInfo = &HookRequest{} 89 if err := json.Unmarshal([]byte(t.RequestContent), t.RequestInfo); err != nil { 90 log.Error("Unmarshal RequestContent[%d]: %v", t.ID, err) 91 } 92 93 if len(t.ResponseContent) > 0 { 94 t.ResponseInfo = &HookResponse{} 95 if err := json.Unmarshal([]byte(t.ResponseContent), t.ResponseInfo); err != nil { 96 log.Error("Unmarshal ResponseContent[%d]: %v", t.ID, err) 97 } 98 } 99} 100 101func (t *HookTask) simpleMarshalJSON(v any) string { 102 p, err := json.Marshal(v) 103 if err != nil { 104 log.Error("Marshal [%d]: %v", t.ID, err) 105 } 106 return string(p) 107} 108 109// HookTasks returns a list of hook tasks by given conditions, order by ID desc. 110func HookTasks(ctx context.Context, hookID int64, page int) ([]*HookTask, error) { 111 tasks := make([]*HookTask, 0, setting.Webhook.PagingNum) 112 return tasks, db.GetEngine(ctx). 113 Limit(setting.Webhook.PagingNum, (page-1)*setting.Webhook.PagingNum). 114 Where("hook_id=?", hookID). 115 Desc("id"). 116 Find(&tasks) 117} 118 119// CreateHookTask creates a new hook task, 120// it handles conversion from Payload to PayloadContent. 121func CreateHookTask(ctx context.Context, t *HookTask) (*HookTask, error) { 122 t.UUID = gouuid.New().String() 123 if t.Delivered == 0 { 124 t.Delivered = timeutil.TimeStampNanoNow() 125 } 126 if t.PayloadVersion == 0 { 127 return nil, errors.New("missing HookTask.PayloadVersion") 128 } 129 return t, db.Insert(ctx, t) 130} 131 132func GetHookTaskByID(ctx context.Context, id int64) (*HookTask, error) { 133 t := &HookTask{} 134 135 has, err := db.GetEngine(ctx).ID(id).Get(t) 136 if err != nil { 137 return nil, err 138 } 139 if !has { 140 return nil, ErrHookTaskNotExist{ 141 TaskID: id, 142 } 143 } 144 return t, nil 145} 146 147// UpdateHookTask updates information of hook task. 148func UpdateHookTask(ctx context.Context, t *HookTask) error { 149 _, err := db.GetEngine(ctx).ID(t.ID).AllCols().Update(t) 150 return err 151} 152 153// ReplayHookTask copies a hook task to get re-delivered 154func ReplayHookTask(ctx context.Context, hookID int64, uuid string) (*HookTask, error) { 155 task, exist, err := db.Get[HookTask](ctx, builder.Eq{"hook_id": hookID, "uuid": uuid}) 156 if err != nil { 157 return nil, err 158 } else if !exist { 159 return nil, ErrHookTaskNotExist{ 160 HookID: hookID, 161 UUID: uuid, 162 } 163 } 164 165 return CreateHookTask(ctx, &HookTask{ 166 HookID: task.HookID, 167 PayloadContent: task.PayloadContent, 168 EventType: task.EventType, 169 PayloadVersion: task.PayloadVersion, 170 }) 171} 172 173// FindUndeliveredHookTaskIDs will find the next 100 undelivered hook tasks with ID greater than the provided lowerID 174func FindUndeliveredHookTaskIDs(ctx context.Context, lowerID int64) ([]int64, error) { 175 const batchSize = 100 176 177 tasks := make([]int64, 0, batchSize) 178 return tasks, db.GetEngine(ctx). 179 Select("id"). 180 Table(new(HookTask)). 181 Where("is_delivered=?", false). 182 And("id > ?", lowerID). 183 Asc("id"). 184 Limit(batchSize). 185 Find(&tasks) 186} 187 188func MarkTaskDelivered(ctx context.Context, task *HookTask) (bool, error) { 189 count, err := db.GetEngine(ctx).ID(task.ID).Where("is_delivered = ?", false).Cols("is_delivered").Update(&HookTask{ 190 ID: task.ID, 191 IsDelivered: true, 192 }) 193 194 return count != 0, err 195} 196 197// CleanupHookTaskTable deletes rows from hook_task as needed. 198func CleanupHookTaskTable(ctx context.Context, cleanupType HookTaskCleanupType, olderThan time.Duration, numberToKeep int) error { 199 log.Trace("Doing: CleanupHookTaskTable") 200 201 switch cleanupType { 202 case OlderThan: 203 deleteOlderThan := time.Now().Add(-olderThan).UnixNano() 204 deletes, err := db.GetEngine(ctx). 205 Where("is_delivered = ? and delivered < ?", true, deleteOlderThan). 206 Delete(new(HookTask)) 207 if err != nil { 208 return err 209 } 210 log.Trace("Deleted %d rows from hook_task", deletes) 211 case PerWebhook: 212 hookIDs := make([]int64, 0, 10) 213 err := db.GetEngine(ctx). 214 Table("webhook"). 215 Where("id > 0"). 216 Cols("id"). 217 Find(&hookIDs) 218 if err != nil { 219 return err 220 } 221 for _, hookID := range hookIDs { 222 select { 223 case <-ctx.Done(): 224 return db.ErrCancelledf("Before deleting hook_task records for hook id %d", hookID) 225 default: 226 } 227 if err = deleteDeliveredHookTasksByWebhook(ctx, hookID, numberToKeep); err != nil { 228 return err 229 } 230 } 231 } 232 log.Trace("Finished: CleanupHookTaskTable") 233 return nil 234} 235 236func deleteDeliveredHookTasksByWebhook(ctx context.Context, hookID int64, numberDeliveriesToKeep int) error { 237 log.Trace("Deleting hook_task rows for webhook %d, keeping the most recent %d deliveries", hookID, numberDeliveriesToKeep) 238 deliveryDates := make([]int64, 0, 10) 239 err := db.GetEngine(ctx).Table("hook_task"). 240 Where("hook_task.hook_id = ? AND hook_task.is_delivered = ? AND hook_task.delivered is not null", hookID, true). 241 Cols("hook_task.delivered"). 242 Join("INNER", "webhook", "hook_task.hook_id = webhook.id"). 243 OrderBy("hook_task.delivered desc"). 244 Limit(1, numberDeliveriesToKeep). 245 Find(&deliveryDates) 246 if err != nil { 247 return err 248 } 249 250 if len(deliveryDates) > 0 { 251 deletes, err := db.GetEngine(ctx). 252 Where("hook_id = ? and is_delivered = ? and delivered <= ?", hookID, true, deliveryDates[0]). 253 Delete(new(HookTask)) 254 if err != nil { 255 return err 256 } 257 log.Trace("Deleted %d hook_task rows for webhook %d", deletes, hookID) 258 } else { 259 log.Trace("No hook_task rows to delete for webhook %d", hookID) 260 } 261 262 return nil 263}