Monorepo for Tangled tangled.org

appview/notify: add Push event and webhook notifier #1067

merged opened by anirudh.fi targeting master from icy/qlyxxp

Add Push method to Notifier interface for git push events. Implement WebhookNotifier that sends webhook payloads with HMAC-SHA256 signatures for authentication. Supports push events with delivery tracking and retry logic (3 attempts with exponential backoff).

Signed-off-by: Anirudh Oppiliappan anirudh@tangled.org

Labels

None yet.

assignee

None yet.

Participants 2
AT URI
at://did:plc:hwevmowznbiukdf6uk5dwrrq/sh.tangled.repo.pull/3menyebs7qn22
+262
Diff #1
+4
appview/notify/db/db.go
··· 354 354 // no-op 355 355 } 356 356 357 + func (n *databaseNotifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) { 358 + // no-op for now; webhooks are handled by the webhook notifier 359 + } 360 + 357 361 func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) { 358 362 l := log.FromContext(ctx) 359 363
+5
appview/notify/logging_notifier.go
··· 103 103 ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "DeleteString")) 104 104 l.inner.DeleteString(ctx, did, rkey) 105 105 } 106 + 107 + func (l *loggingNotifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) { 108 + ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "Push")) 109 + l.inner.Push(ctx, repo, ref, oldSha, newSha, committerDid) 110 + }
+4
appview/notify/merged_notifier.go
··· 93 93 func (m *mergedNotifier) DeleteString(ctx context.Context, did, rkey string) { 94 94 m.fanout(func(n Notifier) { n.DeleteString(ctx, did, rkey) }) 95 95 } 96 + 97 + func (m *mergedNotifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) { 98 + m.fanout(func(n Notifier) { n.Push(ctx, repo, ref, oldSha, newSha, committerDid) }) 99 + }
+5
appview/notify/notifier.go
··· 30 30 NewString(ctx context.Context, s *models.String) 31 31 EditString(ctx context.Context, s *models.String) 32 32 DeleteString(ctx context.Context, did, rkey string) 33 + 34 + Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) 33 35 } 34 36 35 37 // BaseNotifier is a listener that does nothing ··· 61 63 func (m *BaseNotifier) NewString(ctx context.Context, s *models.String) {} 62 64 func (m *BaseNotifier) EditString(ctx context.Context, s *models.String) {} 63 65 func (m *BaseNotifier) DeleteString(ctx context.Context, did, rkey string) {} 66 + 67 + func (m *BaseNotifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) { 68 + }
+244
appview/notify/webhook_notifier.go
··· 1 + package notify 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "crypto/hmac" 7 + "crypto/sha256" 8 + "encoding/hex" 9 + "encoding/json" 10 + "fmt" 11 + "io" 12 + "log/slog" 13 + "net/http" 14 + "slices" 15 + "time" 16 + 17 + "github.com/avast/retry-go/v4" 18 + "github.com/google/uuid" 19 + "tangled.org/core/appview/db" 20 + "tangled.org/core/appview/models" 21 + "tangled.org/core/log" 22 + ) 23 + 24 + type WebhookNotifier struct { 25 + BaseNotifier 26 + db *db.DB 27 + logger *slog.Logger 28 + client *http.Client 29 + } 30 + 31 + func NewWebhookNotifier(database *db.DB) *WebhookNotifier { 32 + return &WebhookNotifier{ 33 + db: database, 34 + logger: log.New("webhook-notifier"), 35 + client: &http.Client{ 36 + Timeout: 30 * time.Second, 37 + }, 38 + } 39 + } 40 + 41 + // Push implements the Notifier interface for git push events 42 + func (w *WebhookNotifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) { 43 + webhooks, err := db.GetActiveWebhooksForRepo(w.db, repo.RepoAt()) 44 + if err != nil { 45 + w.logger.Error("failed to get webhooks for repo", "repo", repo.RepoAt(), "err", err) 46 + return 47 + } 48 + 49 + // check if any webhooks are subscribed to push events 50 + var pushWebhooks []models.Webhook 51 + for _, webhook := range webhooks { 52 + if slices.Contains(webhook.Events, "push") { 53 + pushWebhooks = append(pushWebhooks, webhook) 54 + } 55 + } 56 + 57 + if len(pushWebhooks) == 0 { 58 + return 59 + } 60 + 61 + payload, err := w.buildPushPayload(repo, ref, oldSha, newSha, committerDid) 62 + if err != nil { 63 + w.logger.Error("failed to build push payload", "repo", repo.RepoAt(), "err", err) 64 + return 65 + } 66 + 67 + // Send webhooks 68 + for _, webhook := range pushWebhooks { 69 + go w.sendWebhook(ctx, webhook, "push", payload) 70 + } 71 + } 72 + 73 + // buildPushPayload creates a the webhook payload 74 + func (w *WebhookNotifier) buildPushPayload(repo *models.Repo, ref, oldSha, newSha, committerDid string) (map[string]interface{}, error) { 75 + owner := repo.Did 76 + 77 + pusher := committerDid 78 + if committerDid == "" { 79 + pusher = owner 80 + } 81 + 82 + // build repository object 83 + repository := map[string]any{ 84 + "name": repo.Name, 85 + "full_name": fmt.Sprintf("%s/%s", repo.Did, repo.Name), 86 + "description": repo.Description, 87 + "fork": repo.Source != "", 88 + "html_url": fmt.Sprintf("https://%s/%s/%s", repo.Knot, repo.Did, repo.Name), 89 + "clone_url": fmt.Sprintf("https://%s/%s/%s", repo.Knot, repo.Did, repo.Name), 90 + "ssh_url": fmt.Sprintf("ssh://git@%s/%s/%s", repo.Knot, repo.Did, repo.Name), 91 + "created_at": repo.Created.Format(time.RFC3339), 92 + "updated_at": repo.Created.Format(time.RFC3339), 93 + } 94 + 95 + // add optional fields if available 96 + if repo.Website != "" { 97 + repository["website"] = repo.Website 98 + } 99 + if repo.RepoStats != nil { 100 + repository["stars_count"] = repo.RepoStats.StarCount 101 + repository["open_issues_count"] = repo.RepoStats.IssueCount.Open 102 + } 103 + 104 + ownerObj := map[string]any{ 105 + "did": owner, 106 + } 107 + 108 + repository["owner"] = ownerObj 109 + 110 + pusherObj := map[string]interface{}{ 111 + "did": pusher, 112 + } 113 + 114 + // final payload 115 + payload := map[string]interface{}{ 116 + "ref": ref, 117 + "before": oldSha, 118 + "after": newSha, 119 + "repository": repository, 120 + "pusher": pusherObj, 121 + } 122 + 123 + return payload, nil 124 + } 125 + 126 + // sendWebhook sends the webhook http request 127 + func (w *WebhookNotifier) sendWebhook(ctx context.Context, webhook models.Webhook, event string, payload map[string]interface{}) { 128 + deliveryId := uuid.New().String() 129 + 130 + payloadBytes, err := json.Marshal(payload) 131 + if err != nil { 132 + w.logger.Error("failed to marshal webhook payload", "webhook_id", webhook.Id, "err", err) 133 + return 134 + } 135 + 136 + req, err := http.NewRequestWithContext(ctx, "POST", webhook.Url, bytes.NewReader(payloadBytes)) 137 + if err != nil { 138 + w.logger.Error("failed to create webhook request", "webhook_id", webhook.Id, "err", err) 139 + return 140 + } 141 + 142 + shortSha := payload["after"].(string)[:7] 143 + 144 + req.Header.Set("Content-Type", "application/json") 145 + req.Header.Set("User-Agent", "Tangled-Hook/"+shortSha) 146 + req.Header.Set("X-Tangled-Event", event) 147 + req.Header.Set("X-Tangled-Hook-ID", fmt.Sprintf("%d", webhook.Id)) 148 + req.Header.Set("X-Tangled-Delivery", deliveryId) 149 + 150 + if webhook.Secret != "" { 151 + signature := w.computeSignature(payloadBytes, webhook.Secret) 152 + req.Header.Set("X-Tangled-Signature-256", "sha256="+signature) 153 + } 154 + 155 + delivery := &models.WebhookDelivery{ 156 + WebhookId: webhook.Id, 157 + Event: event, 158 + DeliveryId: deliveryId, 159 + Url: webhook.Url, 160 + RequestBody: string(payloadBytes), 161 + } 162 + 163 + // retry webhook delivery with exponential backoff 164 + retryOpts := []retry.Option{ 165 + retry.Attempts(3), 166 + retry.Delay(1 * time.Second), 167 + retry.MaxDelay(10 * time.Second), 168 + retry.DelayType(retry.BackOffDelay), 169 + retry.LastErrorOnly(true), 170 + retry.OnRetry(func(n uint, err error) { 171 + w.logger.Info("retrying webhook delivery", 172 + "webhook_id", webhook.Id, 173 + "attempt", n+1, 174 + "err", err) 175 + }), 176 + retry.Context(ctx), 177 + retry.RetryIf(func(err error) bool { 178 + // only retry on network errors or 5xx responses 179 + if err != nil { 180 + return true 181 + } 182 + return false 183 + }), 184 + } 185 + 186 + var resp *http.Response 187 + err = retry.Do(func() error { 188 + var err error 189 + resp, err = w.client.Do(req) 190 + if err != nil { 191 + return err 192 + } 193 + 194 + // retry on 5xx server errors 195 + if resp.StatusCode >= 500 { 196 + defer resp.Body.Close() 197 + return fmt.Errorf("server error: %d", resp.StatusCode) 198 + } 199 + 200 + return nil 201 + }, retryOpts...) 202 + 203 + if err != nil { 204 + w.logger.Error("webhook request failed after retries", "webhook_id", webhook.Id, "err", err) 205 + delivery.Success = false 206 + delivery.ResponseBody = err.Error() 207 + } else { 208 + defer resp.Body.Close() 209 + 210 + delivery.ResponseCode = resp.StatusCode 211 + delivery.Success = resp.StatusCode >= 200 && resp.StatusCode < 300 212 + 213 + // Read response body (limit to 10KB) 214 + bodyBytes, err := io.ReadAll(io.LimitReader(resp.Body, 10*1024)) 215 + if err != nil { 216 + w.logger.Warn("failed to read webhook response body", "webhook_id", webhook.Id, "err", err) 217 + } else { 218 + delivery.ResponseBody = string(bodyBytes) 219 + } 220 + 221 + if !delivery.Success { 222 + w.logger.Warn("webhook delivery failed", 223 + "webhook_id", webhook.Id, 224 + "status", resp.StatusCode, 225 + "url", webhook.Url) 226 + } else { 227 + w.logger.Info("webhook delivered successfully", 228 + "webhook_id", webhook.Id, 229 + "url", webhook.Url, 230 + "delivery_id", deliveryId) 231 + } 232 + } 233 + 234 + if err := db.AddWebhookDelivery(w.db, delivery); err != nil { 235 + w.logger.Error("failed to record webhook delivery", "webhook_id", webhook.Id, "err", err) 236 + } 237 + } 238 + 239 + // computeSignature computes HMAC-SHA256 signature for the payload 240 + func (w *WebhookNotifier) computeSignature(payload []byte, secret string) string { 241 + mac := hmac.New(sha256.New, []byte(secret)) 242 + mac.Write(payload) 243 + return hex.EncodeToString(mac.Sum(nil)) 244 + }

History

6 rounds 3 comments
sign up or login to add to the discussion
1 commit
expand
appview/notify: add Push event and webhook notifier
3/3 success
expand
expand 0 comments
pull request successfully merged
1 commit
expand
appview/notify: add Push event and webhook notifier
3/3 success
expand
expand 0 comments
1 commit
expand
appview/notify: add Push event and webhook notifier
3/3 success
expand
expand 0 comments
1 commit
expand
appview/notify: add Push event and webhook notifier
3/3 success
expand
expand 0 comments
1 commit
expand
appview/notify: add Push event and webhook notifier
3/3 success
expand
expand 3 comments
  • would be nice to have a strongly typed payload object here
  • the value for user-agent seems a bit strange, its Tangled-Hook/<short-sha>, would it be better to identify the repo here?

changeset lgtm otherwise!

GitHub does the same for their hook version. I suppose we could have a separate header with the repository info?

makes sense, let's do the same then

1 commit
expand
appview/notify: add Push event and webhook notifier
3/3 success
expand
expand 0 comments