rss email digests over ssh because you're a cool kid herald.dunkirk.sh
go rss rss-reader ssh charm

fix: resolve database deadlock in sendDigestAndMarkSeen

- Root cause: RecordEmailSend() called while transaction tx was open
- With SetMaxOpenConns(1), this caused self-deadlock (same goroutine
trying to acquire connection it already holds)
- Solution: Added GenerateTrackingToken() and RecordEmailSendTx(tx)
that accepts transaction parameter
- Generate token before transaction, record within transaction
- Removed debug logging from email/send.go (no longer needed)

The hang occurred at scheduler/scheduler.go:412 where RecordEmailSend
tried to use db.Exec() while the transaction had the only connection
locked. SQLite's busy_timeout doesn't help with self-deadlock because
it's the same connection context.

dunkirk.sh 72d4a4e8 ad56e000

verified
Changed files
+239 -35
config
email
scheduler
ssh
store
web
+5
config/app.go
··· 19 19 HostKeyPath string `yaml:"host_key_path"` 20 20 DBPath string `yaml:"db_path"` 21 21 Origin string `yaml:"origin"` 22 + LogLevel string `yaml:"log_level"` 22 23 SMTP SMTPConfig `yaml:"smtp"` 23 24 AllowAllKeys bool `yaml:"allow_all_keys"` 24 25 AllowedKeys []string `yaml:"allowed_keys"` ··· 44 45 HostKeyPath: "./host_key", 45 46 DBPath: "./herald.db", 46 47 Origin: "http://localhost:8080", 48 + LogLevel: "info", 47 49 SMTP: SMTPConfig{ 48 50 Host: "localhost", 49 51 Port: 587, ··· 164 166 } 165 167 if v := os.Getenv("HERALD_ORIGIN"); v != "" { 166 168 cfg.Origin = v 169 + } 170 + if v := os.Getenv("HERALD_LOG_LEVEL"); v != "" { 171 + cfg.LogLevel = v 167 172 } 168 173 }
+62 -2
email/send.go
··· 257 257 return m.sendWithTLS(addr, auth, to, messageBytes) 258 258 } 259 259 260 - return smtp.SendMail(addr, auth, m.cfg.From, []string{to}, messageBytes) 260 + return m.sendWithSTARTTLS(addr, auth, to, messageBytes) 261 261 } 262 262 263 263 func encodeQuotedPrintable(s string) string { ··· 274 274 MinVersion: tls.VersionTLS12, 275 275 } 276 276 277 - conn, err := tls.Dial("tcp", addr, tlsConfig) 277 + dialer := &net.Dialer{Timeout: 30 * time.Second} 278 + conn, err := tls.DialWithDialer(dialer, "tcp", addr, tlsConfig) 278 279 if err != nil { 279 280 return fmt.Errorf("TLS dial: %w", err) 280 281 } 282 + if err := conn.SetDeadline(time.Now().Add(30 * time.Second)); err != nil { 283 + _ = conn.Close() 284 + return fmt.Errorf("set deadline: %w", err) 285 + } 281 286 defer func() { _ = conn.Close() }() 282 287 283 288 client, err := smtp.NewClient(conn, m.cfg.Host) ··· 285 290 return fmt.Errorf("SMTP client: %w", err) 286 291 } 287 292 defer func() { _ = client.Close() }() 293 + 294 + if auth != nil { 295 + if err = client.Auth(auth); err != nil { 296 + return fmt.Errorf("auth: %w", err) 297 + } 298 + } 299 + 300 + if err = client.Mail(m.cfg.From); err != nil { 301 + return fmt.Errorf("mail from: %w", err) 302 + } 303 + 304 + if err = client.Rcpt(to); err != nil { 305 + return fmt.Errorf("rcpt to: %w", err) 306 + } 307 + 308 + w, err := client.Data() 309 + if err != nil { 310 + return fmt.Errorf("data: %w", err) 311 + } 312 + 313 + if _, err = w.Write(msg); err != nil { 314 + return fmt.Errorf("write: %w", err) 315 + } 316 + 317 + if err = w.Close(); err != nil { 318 + return fmt.Errorf("close data: %w", err) 319 + } 320 + 321 + return client.Quit() 322 + } 323 + 324 + func (m *Mailer) sendWithSTARTTLS(addr string, auth smtp.Auth, to string, msg []byte) error { 325 + dialer := &net.Dialer{Timeout: 30 * time.Second} 326 + conn, err := dialer.Dial("tcp", addr) 327 + if err != nil { 328 + return fmt.Errorf("dial: %w", err) 329 + } 330 + if err := conn.SetDeadline(time.Now().Add(30 * time.Second)); err != nil { 331 + _ = conn.Close() 332 + return fmt.Errorf("set deadline: %w", err) 333 + } 334 + defer func() { _ = conn.Close() }() 335 + 336 + client, err := smtp.NewClient(conn, m.cfg.Host) 337 + if err != nil { 338 + return fmt.Errorf("SMTP client: %w", err) 339 + } 340 + defer func() { _ = client.Close() }() 341 + 342 + if err = client.StartTLS(&tls.Config{ 343 + ServerName: m.cfg.Host, 344 + MinVersion: tls.VersionTLS12, 345 + }); err != nil { 346 + return fmt.Errorf("STARTTLS: %w", err) 347 + } 288 348 289 349 if auth != nil { 290 350 if err = client.Auth(auth); err != nil {
+14
main.go
··· 130 130 return fmt.Errorf("failed to load config: %w", err) 131 131 } 132 132 133 + // Set log level from config 134 + level := log.InfoLevel 135 + switch strings.ToLower(cfg.LogLevel) { 136 + case "debug": 137 + level = log.DebugLevel 138 + case "info": 139 + level = log.InfoLevel 140 + case "warn": 141 + level = log.WarnLevel 142 + case "error": 143 + level = log.ErrorLevel 144 + } 145 + logger.SetLevel(level) 146 + 133 147 logger.Info("starting herald", 134 148 "ssh_port", cfg.SSHPort, 135 149 "http_port", cfg.HTTPPort,
+11 -5
scheduler/fetch.go
··· 5 5 "context" 6 6 "net/http" 7 7 "sync" 8 + "sync/atomic" 8 9 "time" 9 10 10 11 "github.com/kierank/herald/store" ··· 12 13 ) 13 14 14 15 const ( 15 - feedFetchTimeout = 30 * time.Second 16 - maxConcurrentFetch = 10 16 + feedFetchTimeout = 15 * time.Second 17 + maxConcurrentFetch = 30 17 18 ) 18 19 19 20 type FetchResult struct { ··· 63 64 } 64 65 65 66 client := &http.Client{ 66 - Timeout: 30 * time.Second, 67 + Timeout: 15 * time.Second, 67 68 } 68 69 69 70 resp, err := client.Do(req) ··· 125 126 return result 126 127 } 127 128 128 - func FetchFeeds(ctx context.Context, feeds []*store.Feed) []*FetchResult { 129 + func FetchFeeds(ctx context.Context, feeds []*store.Feed, progress *atomic.Int32) []*FetchResult { 129 130 results := make([]*FetchResult, len(feeds)) 130 131 var wg sync.WaitGroup 131 132 ··· 138 139 for i, feed := range feeds { 139 140 wg.Add(1) 140 141 go func(idx int, f *store.Feed) { 141 - defer wg.Done() 142 + defer func() { 143 + if progress != nil { 144 + progress.Add(1) 145 + } 146 + wg.Done() 147 + }() 142 148 semaphore <- struct{}{} // Acquire 143 149 defer func() { <-semaphore }() // Release 144 150 results[idx] = FetchFeed(ctx, f)
+72 -17
scheduler/scheduler.go
··· 3 3 import ( 4 4 "context" 5 5 "fmt" 6 + "sync/atomic" 6 7 "time" 7 8 8 9 "github.com/adhocore/gronx" ··· 31 32 inactivityThreshold = 90 // days without opens 32 33 minSendsBeforeDeactivate = 3 // minimum sends before considering deactivation 33 34 ) 35 + 36 + // RunStats contains detailed statistics from a feed fetch run 37 + type RunStats struct { 38 + TotalFeeds int 39 + FetchedFeeds int 40 + FailedFeeds int 41 + NewItems int 42 + EmailSent bool 43 + } 34 44 35 45 type Scheduler struct { 36 46 store *store.DB ··· 184 194 } 185 195 } 186 196 187 - func (s *Scheduler) RunNow(ctx context.Context, configID int64) (int, error) { 197 + func (s *Scheduler) RunNow(ctx context.Context, configID int64, progress *atomic.Int32) (*RunStats, error) { 188 198 cfg, err := s.store.GetConfigByID(ctx, configID) 189 199 if err != nil { 190 - return 0, fmt.Errorf("get config: %w", err) 200 + return nil, fmt.Errorf("get config: %w", err) 191 201 } 192 202 193 203 feeds, err := s.store.GetFeedsByConfig(ctx, cfg.ID) 194 204 if err != nil { 195 - return 0, fmt.Errorf("get feeds: %w", err) 205 + return nil, fmt.Errorf("get feeds: %w", err) 196 206 } 197 207 198 208 if len(feeds) == 0 { 199 - return 0, fmt.Errorf("no feeds configured") 209 + return nil, fmt.Errorf("no feeds configured") 200 210 } 201 211 202 - results := FetchFeeds(ctx, feeds) 212 + stats := &RunStats{ 213 + TotalFeeds: len(feeds), 214 + } 215 + 216 + results := FetchFeeds(ctx, feeds, progress) 217 + s.logger.Debug("RunNow: fetching complete", "total", len(feeds)) 218 + 219 + // Count successful and failed fetches 220 + for _, result := range results { 221 + if result.Error != nil { 222 + stats.FailedFeeds++ 223 + } else { 224 + stats.FetchedFeeds++ 225 + } 226 + } 227 + s.logger.Debug("RunNow: counting complete", "fetched", stats.FetchedFeeds, "failed", stats.FailedFeeds) 203 228 204 229 feedGroups, totalNew, err := s.collectNewItems(ctx, results) 230 + s.logger.Debug("RunNow: collectNewItems complete", "totalNew", totalNew, "err", err) 205 231 if err != nil { 206 - return 0, err 232 + return stats, err 207 233 } 208 234 235 + stats.NewItems = totalNew 236 + 209 237 if totalNew > 0 { 238 + s.logger.Debug("RunNow: starting email send") 210 239 if err := s.sendDigestAndMarkSeen(ctx, cfg, feedGroups, totalNew, results); err != nil { 211 - return 0, err 240 + s.logger.Error("RunNow: sendDigestAndMarkSeen failed", "err", err) 241 + return stats, err 212 242 } 243 + stats.EmailSent = true 213 244 s.logger.Info("email sent", "to", cfg.Email, "items", totalNew) 214 245 } 246 + s.logger.Debug("RunNow: email phase complete") 215 247 216 248 // Update feed metadata 249 + s.logger.Debug("RunNow: updating feed metadata", "count", len(results)) 217 250 for _, result := range results { 218 251 if result.ETag != "" || result.LastModified != "" { 219 252 if err := s.store.UpdateFeedFetched(ctx, result.FeedID, result.ETag, result.LastModified); err != nil { ··· 221 254 } 222 255 } 223 256 } 257 + s.logger.Debug("RunNow: feed metadata updated") 224 258 259 + s.logger.Debug("RunNow: calculating next run") 225 260 now := time.Now() 226 261 nextRun, err := gronx.NextTick(cfg.CronExpr, false) 227 262 if err != nil { 228 - return totalNew, fmt.Errorf("calculate next run: %w", err) 263 + return stats, fmt.Errorf("calculate next run: %w", err) 229 264 } 265 + s.logger.Debug("RunNow: updating last run", "nextRun", nextRun) 230 266 231 267 if err := s.store.UpdateLastRun(ctx, cfg.ID, now, nextRun); err != nil { 232 - return totalNew, fmt.Errorf("update last run: %w", err) 268 + return stats, fmt.Errorf("update last run: %w", err) 233 269 } 234 270 235 271 _ = s.store.AddLog(ctx, cfg.ID, "info", fmt.Sprintf("Processed: %d new items, next run: %s", totalNew, nextRun.Format(time.RFC3339))) 236 272 237 - return totalNew, nil 273 + s.logger.Debug("RunNow: complete") 274 + return stats, nil 238 275 } 239 276 240 277 func (s *Scheduler) collectNewItems(ctx context.Context, results []*FetchResult) ([]email.FeedGroup, int, error) { ··· 305 342 } 306 343 307 344 func (s *Scheduler) sendDigestAndMarkSeen(ctx context.Context, cfg *store.Config, feedGroups []email.FeedGroup, totalNew int, results []*FetchResult) error { 345 + s.logger.Debug("sendDigestAndMarkSeen: start", "totalNew", totalNew) 308 346 digestData := &email.DigestData{ 309 347 ConfigName: cfg.Filename, 310 348 TotalItems: totalNew, ··· 316 354 inline = false 317 355 } 318 356 357 + s.logger.Debug("sendDigestAndMarkSeen: rendering digest") 319 358 htmlBody, textBody, err := email.RenderDigest(digestData, inline) 320 359 if err != nil { 321 360 return fmt.Errorf("render digest: %w", err) 322 361 } 362 + s.logger.Debug("sendDigestAndMarkSeen: digest rendered") 323 363 324 364 unsubToken, err := s.store.GetOrCreateUnsubscribeToken(ctx, cfg.ID) 325 365 if err != nil { 326 366 s.logger.Warn("failed to create unsubscribe token", "err", err) 327 367 unsubToken = "" 328 368 } 369 + s.logger.Debug("sendDigestAndMarkSeen: got unsub token") 329 370 330 371 user, err := s.store.GetUserByID(ctx, cfg.UserID) 331 372 dashboardURL := "" ··· 334 375 } else { 335 376 s.logger.Warn("failed to get user for dashboard URL", "err", err) 336 377 } 378 + s.logger.Debug("sendDigestAndMarkSeen: got dashboard URL") 337 379 338 380 // Rate limit email sending per user 339 381 if !s.rateLimiter.Allow(fmt.Sprintf("email:%d", cfg.UserID)) { 340 382 return fmt.Errorf("rate limit exceeded for email sending") 341 383 } 384 + s.logger.Debug("sendDigestAndMarkSeen: rate limit ok") 342 385 343 386 // Begin transaction to mark items seen 344 387 tx, err := s.store.BeginTx(ctx) ··· 346 389 return fmt.Errorf("begin transaction: %w", err) 347 390 } 348 391 defer func() { _ = tx.Rollback() }() 392 + s.logger.Debug("sendDigestAndMarkSeen: transaction started") 349 393 350 394 // Mark items seen BEFORE sending email 351 395 for _, result := range results { ··· 358 402 } 359 403 } 360 404 } 405 + s.logger.Debug("sendDigestAndMarkSeen: items marked seen") 361 406 362 - // Send email - if this fails, transaction will rollback 363 - subject := "feed digest" 364 - 365 - // Record email send with tracking 366 - trackingToken, err := s.store.RecordEmailSend(cfg.ID, cfg.Email, subject, true) 407 + // Generate tracking token BEFORE recording (needed for email pixel URL) 408 + trackingToken, err := s.store.GenerateTrackingToken() 367 409 if err != nil { 368 - s.logger.Warn("failed to record email send", "err", err) 410 + s.logger.Warn("failed to generate tracking token", "err", err) 369 411 trackingToken = "" 370 412 } 413 + s.logger.Debug("sendDigestAndMarkSeen: generated tracking token") 414 + 415 + // Record email send with tracking (within transaction) 416 + subject := "feed digest" 417 + s.logger.Debug("sendDigestAndMarkSeen: recording email send") 418 + if err := s.store.RecordEmailSendTx(tx, cfg.ID, cfg.Email, subject, trackingToken); err != nil { 419 + s.logger.Warn("failed to record email send", "err", err) 420 + } 421 + s.logger.Debug("sendDigestAndMarkSeen: recorded email send") 371 422 423 + // Send email - if this fails, transaction will rollback 424 + s.logger.Debug("sendDigestAndMarkSeen: calling mailer.Send", "to", cfg.Email) 372 425 if err := s.mailer.Send(cfg.Email, subject, htmlBody, textBody, unsubToken, dashboardURL, trackingToken); err != nil { 426 + s.logger.Error("sendDigestAndMarkSeen: mailer.Send failed", "err", err) 373 427 return fmt.Errorf("send email: %w", err) 374 428 } 429 + s.logger.Debug("sendDigestAndMarkSeen: mailer.Send returned successfully") 375 430 376 431 // Commit transaction only after successful email send 377 432 if err := tx.Commit(); err != nil { ··· 394 449 return nil 395 450 } 396 451 397 - results := FetchFeeds(ctx, feeds) 452 + results := FetchFeeds(ctx, feeds, nil) // No progress tracking for background jobs 398 453 399 454 feedGroups, totalNew, err := s.collectNewItems(ctx, results) 400 455 if err != nil {
+42 -11
ssh/commands.go
··· 5 5 "fmt" 6 6 "io" 7 7 "strings" 8 + "sync/atomic" 8 9 "time" 9 10 10 11 "github.com/charmbracelet/lipgloss" ··· 175 176 return 176 177 } 177 178 178 - // Simple spinner animation 179 + // Get feed count for progress display 180 + feeds, err := st.GetFeedsByConfig(ctx, cfg.ID) 181 + if err != nil { 182 + println(sess, errorStyle.Render("Error: "+err.Error())) 183 + return 184 + } 185 + totalFeeds := len(feeds) 186 + 187 + // Progress tracking 188 + var progress atomic.Int32 179 189 spinChars := []string{"⠋", "⠙", "⠹", "⠸", "⠼", "⠴", "⠦", "⠧", "⠇", "⠏"} 180 190 done := make(chan struct{}) 181 191 result := make(chan struct { 182 - items int 192 + stats *scheduler.RunStats 183 193 err error 184 194 }) 185 195 186 - // Spinner goroutine 196 + // Spinner goroutine with real-time progress 187 197 go func() { 188 198 i := 0 189 199 for { ··· 191 201 case <-done: 192 202 return 193 203 default: 194 - printf(sess, "\r%s Fetching feeds...", spinChars[i%len(spinChars)]) 204 + completed := progress.Load() 205 + printf(sess, "\r%s Fetching feeds... %d/%d", spinChars[i%len(spinChars)], completed, totalFeeds) 195 206 i++ 196 207 time.Sleep(80 * time.Millisecond) 197 208 } ··· 200 211 201 212 // Work goroutine 202 213 go func() { 203 - newItems, err := sched.RunNow(ctx, cfg.ID) 214 + stats, err := sched.RunNow(ctx, cfg.ID, &progress) 204 215 result <- struct { 205 - items int 216 + stats *scheduler.RunStats 206 217 err error 207 - }{items: newItems, err: err} 218 + }{stats: stats, err: err} 208 219 }() 209 220 210 221 // Wait for result ··· 217 228 return 218 229 } 219 230 220 - if res.items == 0 { 221 - println(sess, dimStyle.Render("No new items found.")) 222 - } else { 223 - println(sess, successStyle.Render(fmt.Sprintf("Sent %d new item(s) to %s", res.items, cfg.Email))) 231 + // Display detailed stats 232 + if res.stats != nil { 233 + if res.stats.FailedFeeds > 0 { 234 + printf(sess, "%s Fetched %d/%d feeds (%d failed)\n", 235 + dimStyle.Render("⚠"), 236 + res.stats.FetchedFeeds, 237 + res.stats.TotalFeeds, 238 + res.stats.FailedFeeds) 239 + } else { 240 + printf(sess, "%s Fetched %d/%d feeds\n", 241 + successStyle.Render("✓"), 242 + res.stats.FetchedFeeds, 243 + res.stats.TotalFeeds) 244 + } 245 + 246 + if res.stats.NewItems == 0 { 247 + println(sess, dimStyle.Render("No new items found.")) 248 + } else { 249 + if res.stats.EmailSent { 250 + println(sess, successStyle.Render(fmt.Sprintf("Sent %d new item(s) to %s", res.stats.NewItems, cfg.Email))) 251 + } else { 252 + println(sess, dimStyle.Render(fmt.Sprintf("Found %d new item(s) but did not send email", res.stats.NewItems))) 253 + } 254 + } 224 255 } 225 256 } 226 257
+20
store/tracking.go
··· 42 42 return trackingToken, nil 43 43 } 44 44 45 + // RecordEmailSendTx records an email send within an existing transaction 46 + func (db *DB) RecordEmailSendTx(tx *sql.Tx, configID int64, recipient, subject, trackingToken string) error { 47 + query := `INSERT INTO email_sends (config_id, recipient, subject, tracking_token) 48 + VALUES (?, ?, ?, ?)` 49 + _, err := tx.Exec(query, configID, recipient, subject, sql.NullString{String: trackingToken, Valid: trackingToken != ""}) 50 + if err != nil { 51 + return fmt.Errorf("insert email send: %w", err) 52 + } 53 + return nil 54 + } 55 + 45 56 // MarkEmailBounced marks an email as bounced 46 57 func (db *DB) MarkEmailBounced(configID int64, recipient, reason string) error { 47 58 query := `UPDATE email_sends ··· 168 179 return 0, fmt.Errorf("cleanup old sends: %w", err) 169 180 } 170 181 return result.RowsAffected() 182 + } 183 + 184 + // GenerateTrackingToken generates a secure random tracking token 185 + func (db *DB) GenerateTrackingToken() (string, error) { 186 + b := make([]byte, 24) 187 + if _, err := rand.Read(b); err != nil { 188 + return "", err 189 + } 190 + return base64.URLEncoding.EncodeToString(b), nil 171 191 } 172 192 173 193 func generateTrackingToken() (string, error) {
+13
web/handlers.go
··· 1 1 package web 2 2 3 3 import ( 4 + "context" 4 5 "database/sql" 5 6 "encoding/json" 6 7 "encoding/xml" ··· 89 90 if errors.Is(err, sql.ErrNoRows) { 90 91 s.handle404(w, r) 91 92 return 93 + } 94 + if errors.Is(err, context.Canceled) { 95 + return // Client disconnected, don't log as error 92 96 } 93 97 s.logger.Warn("get user", "err", err) 94 98 http.Error(w, "Internal Server Error", http.StatusInternalServerError) ··· 228 232 if errors.Is(err, sql.ErrNoRows) { 229 233 s.handle404(w, r) 230 234 return 235 + } 236 + if errors.Is(err, context.Canceled) { 237 + return // Client disconnected 231 238 } 232 239 s.logger.Warn("get user", "err", err) 233 240 http.Error(w, "Internal Server Error", http.StatusInternalServerError) ··· 358 365 s.handle404(w, r) 359 366 return 360 367 } 368 + if errors.Is(err, context.Canceled) { 369 + return // Client disconnected 370 + } 361 371 s.logger.Warn("get user", "err", err) 362 372 http.Error(w, "Internal Server Error", http.StatusInternalServerError) 363 373 return ··· 463 473 if errors.Is(err, sql.ErrNoRows) { 464 474 s.handle404(w, r) 465 475 return 476 + } 477 + if errors.Is(err, context.Canceled) { 478 + return // Client disconnected 466 479 } 467 480 s.logger.Warn("get user", "err", err) 468 481 http.Error(w, "Internal Server Error", http.StatusInternalServerError)