Monorepo for Tangled tangled.org

appview/notify: add Push event and webhook notifier

Add Push method to Notifier interface for git push events. Implement
WebhookNotifier that sends webhook payloads with HMAC-SHA256 signatures
for authentication. Supports push events with delivery tracking and
retry logic (3 attempts with exponential backoff).

Signed-off-by: Anirudh Oppiliappan <anirudh@tangled.org>

anirudh.fi 9ba02abc 1c4085b4

verified
+257
+4
appview/notify/db/db.go
··· 354 354 // no-op 355 355 } 356 356 357 + func (n *databaseNotifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) { 358 + // no-op for now; webhooks are handled by the webhook notifier 359 + } 360 + 357 361 func (n *databaseNotifier) NewIssueState(ctx context.Context, actor syntax.DID, issue *models.Issue) { 358 362 l := log.FromContext(ctx) 359 363
+5
appview/notify/logging_notifier.go
··· 103 103 ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "DeleteString")) 104 104 l.inner.DeleteString(ctx, did, rkey) 105 105 } 106 + 107 + func (l *loggingNotifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) { 108 + ctx = tlog.IntoContext(ctx, tlog.SubLogger(l.logger, "Push")) 109 + l.inner.Push(ctx, repo, ref, oldSha, newSha, committerDid) 110 + }
+4
appview/notify/merged_notifier.go
··· 93 93 func (m *mergedNotifier) DeleteString(ctx context.Context, did, rkey string) { 94 94 m.fanout(func(n Notifier) { n.DeleteString(ctx, did, rkey) }) 95 95 } 96 + 97 + func (m *mergedNotifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) { 98 + m.fanout(func(n Notifier) { n.Push(ctx, repo, ref, oldSha, newSha, committerDid) }) 99 + }
+5
appview/notify/notifier.go
··· 30 30 NewString(ctx context.Context, s *models.String) 31 31 EditString(ctx context.Context, s *models.String) 32 32 DeleteString(ctx context.Context, did, rkey string) 33 + 34 + Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) 33 35 } 34 36 35 37 // BaseNotifier is a listener that does nothing ··· 61 63 func (m *BaseNotifier) NewString(ctx context.Context, s *models.String) {} 62 64 func (m *BaseNotifier) EditString(ctx context.Context, s *models.String) {} 63 65 func (m *BaseNotifier) DeleteString(ctx context.Context, did, rkey string) {} 66 + 67 + func (m *BaseNotifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) { 68 + }
+239
appview/notify/webhook_notifier.go
··· 1 + package notify 2 + 3 + import ( 4 + "bytes" 5 + "context" 6 + "crypto/hmac" 7 + "crypto/sha256" 8 + "encoding/hex" 9 + "encoding/json" 10 + "fmt" 11 + "io" 12 + "log/slog" 13 + "net/http" 14 + "time" 15 + 16 + "github.com/avast/retry-go/v4" 17 + "github.com/google/uuid" 18 + "tangled.org/core/appview/db" 19 + "tangled.org/core/appview/models" 20 + "tangled.org/core/log" 21 + ) 22 + 23 + type WebhookNotifier struct { 24 + BaseNotifier 25 + db *db.DB 26 + logger *slog.Logger 27 + client *http.Client 28 + } 29 + 30 + func NewWebhookNotifier(database *db.DB) *WebhookNotifier { 31 + return &WebhookNotifier{ 32 + db: database, 33 + logger: log.New("webhook-notifier"), 34 + client: &http.Client{ 35 + Timeout: 30 * time.Second, 36 + }, 37 + } 38 + } 39 + 40 + // Push implements the Notifier interface for git push events 41 + func (w *WebhookNotifier) Push(ctx context.Context, repo *models.Repo, ref, oldSha, newSha, committerDid string) { 42 + webhooks, err := db.GetActiveWebhooksForRepo(w.db, repo.RepoAt()) 43 + if err != nil { 44 + w.logger.Error("failed to get webhooks for repo", "repo", repo.RepoAt(), "err", err) 45 + return 46 + } 47 + 48 + // check if any webhooks are subscribed to push events 49 + var pushWebhooks []models.Webhook 50 + for _, webhook := range webhooks { 51 + if webhook.HasEvent(models.WebhookEventPush) { 52 + pushWebhooks = append(pushWebhooks, webhook) 53 + } 54 + } 55 + 56 + if len(pushWebhooks) == 0 { 57 + return 58 + } 59 + 60 + payload, err := w.buildPushPayload(repo, ref, oldSha, newSha, committerDid) 61 + if err != nil { 62 + w.logger.Error("failed to build push payload", "repo", repo.RepoAt(), "err", err) 63 + return 64 + } 65 + 66 + // Send webhooks 67 + for _, webhook := range pushWebhooks { 68 + go w.sendWebhook(ctx, webhook, string(models.WebhookEventPush), payload) 69 + } 70 + } 71 + 72 + // buildPushPayload creates the webhook payload 73 + func (w *WebhookNotifier) buildPushPayload(repo *models.Repo, ref, oldSha, newSha, committerDid string) (*models.WebhookPayload, error) { 74 + owner := repo.Did 75 + 76 + pusher := committerDid 77 + if committerDid == "" { 78 + pusher = owner 79 + } 80 + 81 + // Build repository object 82 + repository := models.WebhookRepository{ 83 + Name: repo.Name, 84 + FullName: fmt.Sprintf("%s/%s", repo.Did, repo.Name), 85 + Description: repo.Description, 86 + Fork: repo.Source != "", 87 + HtmlUrl: fmt.Sprintf("https://%s/%s/%s", repo.Knot, repo.Did, repo.Name), 88 + CloneUrl: fmt.Sprintf("https://%s/%s/%s", repo.Knot, repo.Did, repo.Name), 89 + SshUrl: fmt.Sprintf("ssh://git@%s/%s/%s", repo.Knot, repo.Did, repo.Name), 90 + CreatedAt: repo.Created.Format(time.RFC3339), 91 + UpdatedAt: repo.Created.Format(time.RFC3339), 92 + Owner: models.WebhookUser{ 93 + Did: owner, 94 + }, 95 + } 96 + 97 + // Add optional fields 98 + if repo.Website != "" { 99 + repository.Website = repo.Website 100 + } 101 + if repo.RepoStats != nil { 102 + repository.StarsCount = repo.RepoStats.StarCount 103 + repository.OpenIssues = repo.RepoStats.IssueCount.Open 104 + } 105 + 106 + // Build payload 107 + payload := &models.WebhookPayload{ 108 + Ref: ref, 109 + Before: oldSha, 110 + After: newSha, 111 + Repository: repository, 112 + Pusher: models.WebhookUser{ 113 + Did: pusher, 114 + }, 115 + } 116 + 117 + return payload, nil 118 + } 119 + 120 + // sendWebhook sends the webhook http request 121 + func (w *WebhookNotifier) sendWebhook(ctx context.Context, webhook models.Webhook, event string, payload *models.WebhookPayload) { 122 + deliveryId := uuid.New().String() 123 + 124 + payloadBytes, err := json.Marshal(payload) 125 + if err != nil { 126 + w.logger.Error("failed to marshal webhook payload", "webhook_id", webhook.Id, "err", err) 127 + return 128 + } 129 + 130 + req, err := http.NewRequestWithContext(ctx, "POST", webhook.Url, bytes.NewReader(payloadBytes)) 131 + if err != nil { 132 + w.logger.Error("failed to create webhook request", "webhook_id", webhook.Id, "err", err) 133 + return 134 + } 135 + 136 + shortSha := payload.After[:7] 137 + 138 + req.Header.Set("Content-Type", "application/json") 139 + req.Header.Set("User-Agent", "Tangled-Hook/"+shortSha) 140 + req.Header.Set("X-Tangled-Event", event) 141 + req.Header.Set("X-Tangled-Hook-ID", fmt.Sprintf("%d", webhook.Id)) 142 + req.Header.Set("X-Tangled-Delivery", deliveryId) 143 + req.Header.Set("X-Tangled-Repo", payload.Repository.FullName) 144 + 145 + if webhook.Secret != "" { 146 + signature := w.computeSignature(payloadBytes, webhook.Secret) 147 + req.Header.Set("X-Tangled-Signature-256", "sha256="+signature) 148 + } 149 + 150 + delivery := &models.WebhookDelivery{ 151 + WebhookId: webhook.Id, 152 + Event: event, 153 + DeliveryId: deliveryId, 154 + Url: webhook.Url, 155 + RequestBody: string(payloadBytes), 156 + } 157 + 158 + // retry webhook delivery with exponential backoff 159 + retryOpts := []retry.Option{ 160 + retry.Attempts(3), 161 + retry.Delay(1 * time.Second), 162 + retry.MaxDelay(10 * time.Second), 163 + retry.DelayType(retry.BackOffDelay), 164 + retry.LastErrorOnly(true), 165 + retry.OnRetry(func(n uint, err error) { 166 + w.logger.Info("retrying webhook delivery", 167 + "webhook_id", webhook.Id, 168 + "attempt", n+1, 169 + "err", err) 170 + }), 171 + retry.Context(ctx), 172 + retry.RetryIf(func(err error) bool { 173 + // only retry on network errors or 5xx responses 174 + if err != nil { 175 + return true 176 + } 177 + return false 178 + }), 179 + } 180 + 181 + var resp *http.Response 182 + err = retry.Do(func() error { 183 + var err error 184 + resp, err = w.client.Do(req) 185 + if err != nil { 186 + return err 187 + } 188 + 189 + // retry on 5xx server errors 190 + if resp.StatusCode >= 500 { 191 + defer resp.Body.Close() 192 + return fmt.Errorf("server error: %d", resp.StatusCode) 193 + } 194 + 195 + return nil 196 + }, retryOpts...) 197 + 198 + if err != nil { 199 + w.logger.Error("webhook request failed after retries", "webhook_id", webhook.Id, "err", err) 200 + delivery.Success = false 201 + delivery.ResponseBody = err.Error() 202 + } else { 203 + defer resp.Body.Close() 204 + 205 + delivery.ResponseCode = resp.StatusCode 206 + delivery.Success = resp.StatusCode >= 200 && resp.StatusCode < 300 207 + 208 + // Read response body (limit to 10KB) 209 + bodyBytes, err := io.ReadAll(io.LimitReader(resp.Body, 10*1024)) 210 + if err != nil { 211 + w.logger.Warn("failed to read webhook response body", "webhook_id", webhook.Id, "err", err) 212 + } else { 213 + delivery.ResponseBody = string(bodyBytes) 214 + } 215 + 216 + if !delivery.Success { 217 + w.logger.Warn("webhook delivery failed", 218 + "webhook_id", webhook.Id, 219 + "status", resp.StatusCode, 220 + "url", webhook.Url) 221 + } else { 222 + w.logger.Info("webhook delivered successfully", 223 + "webhook_id", webhook.Id, 224 + "url", webhook.Url, 225 + "delivery_id", deliveryId) 226 + } 227 + } 228 + 229 + if err := db.AddWebhookDelivery(w.db, delivery); err != nil { 230 + w.logger.Error("failed to record webhook delivery", "webhook_id", webhook.Id, "err", err) 231 + } 232 + } 233 + 234 + // computeSignature computes HMAC-SHA256 signature for the payload 235 + func (w *WebhookNotifier) computeSignature(payload []byte, secret string) string { 236 + mac := hmac.New(sha256.New, []byte(secret)) 237 + mac.Write(payload) 238 + return hex.EncodeToString(mac.Sum(nil)) 239 + }