rss email digests over ssh because you're a cool kid herald.dunkirk.sh
go rss rss-reader ssh charm
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: security / reliability fixes

dunkirk.sh d4cea711 dfd5418d

verified
+226 -158
+9
scheduler/fetch.go
··· 122 122 func FetchFeeds(ctx context.Context, feeds []*store.Feed) []*FetchResult { 123 123 results := make([]*FetchResult, len(feeds)) 124 124 var wg sync.WaitGroup 125 + 126 + // Limit concurrent fetches to 10 to avoid overwhelming the system 127 + maxConcurrent := 10 128 + if len(feeds) < maxConcurrent { 129 + maxConcurrent = len(feeds) 130 + } 131 + semaphore := make(chan struct{}, maxConcurrent) 125 132 126 133 for i, feed := range feeds { 127 134 wg.Add(1) 128 135 go func(idx int, f *store.Feed) { 129 136 defer wg.Done() 137 + semaphore <- struct{}{} // Acquire 138 + defer func() { <-semaphore }() // Release 130 139 results[idx] = FetchFeed(ctx, f) 131 140 }(i, feed) 132 141 }
+101 -153
scheduler/scheduler.go
··· 79 79 80 80 results := FetchFeeds(ctx, feeds) 81 81 82 + feedGroups, totalNew, err := s.collectNewItems(ctx, results) 83 + if err != nil { 84 + return 0, err 85 + } 86 + 87 + if totalNew > 0 { 88 + if err := s.sendDigestAndMarkSeen(ctx, cfg, feedGroups, totalNew, results); err != nil { 89 + return 0, err 90 + } 91 + s.logger.Info("email sent", "to", cfg.Email, "items", totalNew) 92 + } 93 + 94 + // Update feed metadata 95 + for _, result := range results { 96 + if result.ETag != "" || result.LastModified != "" { 97 + if err := s.store.UpdateFeedFetched(ctx, result.FeedID, result.ETag, result.LastModified); err != nil { 98 + s.logger.Warn("failed to update feed fetched", "err", err) 99 + } 100 + } 101 + } 102 + 103 + now := time.Now() 104 + nextRun, err := gronx.NextTick(cfg.CronExpr, false) 105 + if err != nil { 106 + return totalNew, fmt.Errorf("calculate next run: %w", err) 107 + } 108 + 109 + if err := s.store.UpdateLastRun(ctx, cfg.ID, now, nextRun); err != nil { 110 + return totalNew, fmt.Errorf("update last run: %w", err) 111 + } 112 + 113 + _ = s.store.AddLog(ctx, cfg.ID, "info", fmt.Sprintf("Processed: %d new items, next run: %s", totalNew, nextRun.Format(time.RFC3339))) 114 + 115 + return totalNew, nil 116 + } 117 + 118 + func (s *Scheduler) collectNewItems(ctx context.Context, results []*FetchResult) ([]email.FeedGroup, int, error) { 82 119 var feedGroups []email.FeedGroup 83 120 totalNew := 0 84 121 threeMonthsAgo := time.Now().AddDate(0, -3, 0) ··· 125 162 }) 126 163 totalNew += len(newItems) 127 164 } 128 - 129 - if result.ETag != "" || result.LastModified != "" { 130 - if err := s.store.UpdateFeedFetched(ctx, result.FeedID, result.ETag, result.LastModified); err != nil { 131 - s.logger.Warn("failed to update feed fetched", "err", err) 132 - } 133 - } 134 165 } 135 166 136 167 if feedErrors == len(results) { 137 - return 0, fmt.Errorf("all feeds failed to fetch") 168 + return nil, 0, fmt.Errorf("all feeds failed to fetch") 138 169 } 139 170 140 - if totalNew > 0 { 141 - digestData := &email.DigestData{ 142 - ConfigName: cfg.Filename, 143 - TotalItems: totalNew, 144 - FeedGroups: feedGroups, 145 - } 171 + return feedGroups, totalNew, nil 172 + } 146 173 147 - inline := cfg.InlineContent 148 - if totalNew > 5 { 149 - inline = false 150 - } 174 + func (s *Scheduler) sendDigestAndMarkSeen(ctx context.Context, cfg *store.Config, feedGroups []email.FeedGroup, totalNew int, results []*FetchResult) error { 175 + digestData := &email.DigestData{ 176 + ConfigName: cfg.Filename, 177 + TotalItems: totalNew, 178 + FeedGroups: feedGroups, 179 + } 151 180 152 - htmlBody, textBody, err := email.RenderDigest(digestData, inline) 153 - if err != nil { 154 - return 0, fmt.Errorf("render digest: %w", err) 155 - } 181 + inline := cfg.InlineContent 182 + if totalNew > 5 { 183 + inline = false 184 + } 156 185 157 - unsubToken, err := s.store.GetOrCreateUnsubscribeToken(ctx, cfg.ID) 158 - if err != nil { 159 - s.logger.Warn("failed to create unsubscribe token", "err", err) 160 - unsubToken = "" 161 - } 186 + htmlBody, textBody, err := email.RenderDigest(digestData, inline) 187 + if err != nil { 188 + return fmt.Errorf("render digest: %w", err) 189 + } 162 190 163 - // Get user fingerprint for dashboard URL 164 - user, err := s.store.GetUserByID(ctx, cfg.UserID) 165 - dashboardURL := "" 166 - if err == nil { 167 - dashboardURL = s.originURL + "/" + user.PubkeyFP 168 - } else { 169 - s.logger.Warn("failed to get user for dashboard URL", "err", err) 170 - } 191 + unsubToken, err := s.store.GetOrCreateUnsubscribeToken(ctx, cfg.ID) 192 + if err != nil { 193 + s.logger.Warn("failed to create unsubscribe token", "err", err) 194 + unsubToken = "" 195 + } 171 196 172 - subject := "feed digest" 173 - if err := s.mailer.Send(cfg.Email, subject, htmlBody, textBody, unsubToken, dashboardURL); err != nil { 174 - return 0, fmt.Errorf("send email: %w", err) 175 - } 197 + user, err := s.store.GetUserByID(ctx, cfg.UserID) 198 + dashboardURL := "" 199 + if err == nil { 200 + dashboardURL = s.originURL + "/" + user.PubkeyFP 201 + } else { 202 + s.logger.Warn("failed to get user for dashboard URL", "err", err) 203 + } 176 204 177 - s.logger.Info("email sent", "to", cfg.Email, "items", totalNew) 205 + // Begin transaction to mark items seen 206 + tx, err := s.store.BeginTx(ctx) 207 + if err != nil { 208 + return fmt.Errorf("begin transaction: %w", err) 209 + } 210 + defer tx.Rollback() 178 211 179 - for _, result := range results { 180 - if result.Error != nil { 181 - continue 182 - } 183 - for _, item := range result.Items { 184 - if err := s.store.MarkItemSeen(ctx, result.FeedID, item.GUID, item.Title, item.Link); err != nil { 185 - s.logger.Warn("failed to mark item seen", "err", err) 186 - } 212 + // Mark items seen BEFORE sending email 213 + for _, result := range results { 214 + if result.Error != nil { 215 + continue 216 + } 217 + for _, item := range result.Items { 218 + if err := s.store.MarkItemSeenTx(ctx, tx, result.FeedID, item.GUID, item.Title, item.Link); err != nil { 219 + s.logger.Warn("failed to mark item seen", "err", err) 187 220 } 188 221 } 189 222 } 190 223 191 - now := time.Now() 192 - nextRun, err := gronx.NextTick(cfg.CronExpr, false) 193 - if err != nil { 194 - return totalNew, fmt.Errorf("calculate next run: %w", err) 224 + // Send email - if this fails, transaction will rollback 225 + subject := "feed digest" 226 + if err := s.mailer.Send(cfg.Email, subject, htmlBody, textBody, unsubToken, dashboardURL); err != nil { 227 + return fmt.Errorf("send email: %w", err) 195 228 } 196 229 197 - if err := s.store.UpdateLastRun(ctx, cfg.ID, now, nextRun); err != nil { 198 - return totalNew, fmt.Errorf("update last run: %w", err) 230 + // Commit transaction only after successful email send 231 + if err := tx.Commit(); err != nil { 232 + return fmt.Errorf("commit transaction: %w", err) 199 233 } 200 234 201 - _ = s.store.AddLog(ctx, cfg.ID, "info", fmt.Sprintf("Processed: %d new items, next run: %s", totalNew, nextRun.Format(time.RFC3339))) 202 - 203 - return totalNew, nil 235 + return nil 204 236 } 205 237 206 238 func (s *Scheduler) processConfig(ctx context.Context, cfg *store.Config) error { ··· 218 250 219 251 results := FetchFeeds(ctx, feeds) 220 252 221 - var feedGroups []email.FeedGroup 222 - totalNew := 0 223 - threeMonthsAgo := time.Now().AddDate(0, -3, 0) 224 - 225 - for _, result := range results { 226 - if result.Error != nil { 227 - s.logger.Warn("feed fetch error", "feed_id", result.FeedID, "url", result.FeedURL, "err", result.Error) 228 - continue 229 - } 230 - 231 - var newItems []email.FeedItem 232 - for _, item := range result.Items { 233 - // Skip items older than 3 months 234 - if !item.Published.IsZero() && item.Published.Before(threeMonthsAgo) { 235 - continue 236 - } 237 - 238 - seen, err := s.store.IsItemSeen(ctx, result.FeedID, item.GUID) 239 - if err != nil { 240 - s.logger.Warn("failed to check if item seen", "err", err) 241 - continue 242 - } 253 + feedGroups, totalNew, err := s.collectNewItems(ctx, results) 254 + if err != nil { 255 + s.logger.Warn("failed to collect items", "config_id", cfg.ID, "err", err) 256 + } 243 257 244 - if !seen { 245 - newItems = append(newItems, email.FeedItem{ 246 - Title: item.Title, 247 - Link: item.Link, 248 - Content: item.Content, 249 - Published: item.Published, 250 - }) 251 - } 258 + if totalNew > 0 { 259 + if err := s.sendDigestAndMarkSeen(ctx, cfg, feedGroups, totalNew, results); err != nil { 260 + return fmt.Errorf("send digest: %w", err) 252 261 } 262 + s.logger.Info("email sent", "to", cfg.Email, "items", totalNew) 263 + } else { 264 + s.logger.Info("no new items", "config_id", cfg.ID) 265 + } 253 266 254 - if len(newItems) > 0 { 255 - feedName := result.FeedName 256 - if feedName == "" { 257 - feedName = result.FeedURL 258 - } 259 - feedGroups = append(feedGroups, email.FeedGroup{ 260 - FeedName: feedName, 261 - FeedURL: result.FeedURL, 262 - Items: newItems, 263 - }) 264 - totalNew += len(newItems) 265 - } 266 - 267 + // Update feed metadata 268 + for _, result := range results { 267 269 if result.ETag != "" || result.LastModified != "" { 268 270 if err := s.store.UpdateFeedFetched(ctx, result.FeedID, result.ETag, result.LastModified); err != nil { 269 271 s.logger.Warn("failed to update feed fetched", "err", err) 270 - } 271 - } 272 - } 273 - 274 - if totalNew == 0 { 275 - s.logger.Info("no new items", "config_id", cfg.ID) 276 - } else { 277 - digestData := &email.DigestData{ 278 - ConfigName: cfg.Filename, 279 - TotalItems: totalNew, 280 - FeedGroups: feedGroups, 281 - } 282 - 283 - // Auto-disable inline content if more than 5 items 284 - inline := cfg.InlineContent 285 - if totalNew > 5 { 286 - inline = false 287 - } 288 - 289 - htmlBody, textBody, err := email.RenderDigest(digestData, inline) 290 - if err != nil { 291 - return fmt.Errorf("render digest: %w", err) 292 - } 293 - 294 - unsubToken, err := s.store.GetOrCreateUnsubscribeToken(ctx, cfg.ID) 295 - if err != nil { 296 - s.logger.Warn("failed to create unsubscribe token", "err", err) 297 - unsubToken = "" 298 - } 299 - 300 - // Get user fingerprint for dashboard URL 301 - user, err := s.store.GetUserByID(ctx, cfg.UserID) 302 - dashboardURL := "" 303 - if err == nil { 304 - dashboardURL = s.originURL + "/" + user.PubkeyFP 305 - } else { 306 - s.logger.Warn("failed to get user for dashboard URL", "err", err) 307 - } 308 - 309 - subject := "feed digest" 310 - if err := s.mailer.Send(cfg.Email, subject, htmlBody, textBody, unsubToken, dashboardURL); err != nil { 311 - return fmt.Errorf("send email: %w", err) 312 - } 313 - 314 - s.logger.Info("email sent", "to", cfg.Email, "items", totalNew) 315 - 316 - for _, result := range results { 317 - if result.Error != nil { 318 - continue 319 - } 320 - for _, item := range result.Items { 321 - if err := s.store.MarkItemSeen(ctx, result.FeedID, item.GUID, item.Title, item.Link); err != nil { 322 - s.logger.Warn("failed to mark item seen", "err", err) 323 - } 324 272 } 325 273 } 326 274 }
+15 -3
ssh/scp.go
··· 131 131 } 132 132 133 133 ctx := s.Context() 134 - if err := h.store.DeleteConfig(ctx, user.ID, name); err != nil { 134 + 135 + // Use transaction for config update 136 + tx, err := h.store.BeginTx(ctx) 137 + if err != nil { 138 + return 0, fmt.Errorf("begin transaction: %w", err) 139 + } 140 + defer tx.Rollback() 141 + 142 + if err := h.store.DeleteConfigTx(ctx, tx, user.ID, name); err != nil { 135 143 h.logger.Debug("no existing config to delete", "filename", name) 136 144 } else { 137 145 h.logger.Debug("deleted existing config", "filename", name) 138 146 } 139 147 140 - cfg, err := h.store.CreateConfig(ctx, user.ID, name, parsed.Email, parsed.CronExpr, parsed.Digest, parsed.Inline, string(content), nextRun) 148 + cfg, err := h.store.CreateConfigTx(ctx, tx, user.ID, name, parsed.Email, parsed.CronExpr, parsed.Digest, parsed.Inline, string(content), nextRun) 141 149 if err != nil { 142 150 return 0, fmt.Errorf("failed to save config: %w", err) 143 151 } 144 152 145 153 for _, feed := range parsed.Feeds { 146 - if _, err := h.store.CreateFeed(ctx, cfg.ID, feed.URL, feed.Name); err != nil { 154 + if _, err := h.store.CreateFeedTx(ctx, tx, cfg.ID, feed.URL, feed.Name); err != nil { 147 155 return 0, fmt.Errorf("failed to save feed: %w", err) 148 156 } 157 + } 158 + 159 + if err := tx.Commit(); err != nil { 160 + return 0, fmt.Errorf("commit transaction: %w", err) 149 161 } 150 162 151 163 h.logger.Info("config uploaded", "user_id", user.ID, "filename", name, "feeds", len(parsed.Feeds), "next_run", nextRun)
+48
store/configs.go
··· 52 52 }, nil 53 53 } 54 54 55 + func (db *DB) CreateConfigTx(ctx context.Context, tx *sql.Tx, userID int64, filename, email, cronExpr string, digest, inline bool, rawText string, nextRun time.Time) (*Config, error) { 56 + result, err := tx.ExecContext(ctx, 57 + `INSERT INTO configs (user_id, filename, email, cron_expr, digest, inline_content, raw_text, next_run) 58 + VALUES (?, ?, ?, ?, ?, ?, ?, ?)`, 59 + userID, filename, email, cronExpr, digest, inline, rawText, nextRun, 60 + ) 61 + if err != nil { 62 + return nil, fmt.Errorf("insert config: %w", err) 63 + } 64 + 65 + id, err := result.LastInsertId() 66 + if err != nil { 67 + return nil, fmt.Errorf("get last insert id: %w", err) 68 + } 69 + 70 + return &Config{ 71 + ID: id, 72 + UserID: userID, 73 + Filename: filename, 74 + Email: email, 75 + CronExpr: cronExpr, 76 + Digest: digest, 77 + InlineContent: inline, 78 + RawText: rawText, 79 + NextRun: sql.NullTime{Time: nextRun, Valid: true}, 80 + CreatedAt: time.Now(), 81 + }, nil 82 + } 83 + 84 + func (db *DB) DeleteConfigTx(ctx context.Context, tx *sql.Tx, userID int64, filename string) error { 85 + result, err := tx.ExecContext(ctx, 86 + `DELETE FROM configs WHERE user_id = ? AND filename = ?`, 87 + userID, filename, 88 + ) 89 + if err != nil { 90 + return fmt.Errorf("delete config: %w", err) 91 + } 92 + 93 + n, err := result.RowsAffected() 94 + if err != nil { 95 + return fmt.Errorf("rows affected: %w", err) 96 + } 97 + if n == 0 { 98 + return sql.ErrNoRows 99 + } 100 + return nil 101 + } 102 + 55 103 func (db *DB) GetConfig(ctx context.Context, userID int64, filename string) (*Config, error) { 56 104 var cfg Config 57 105 err := db.QueryRowContext(ctx,
+6 -2
store/db.go
··· 13 13 } 14 14 15 15 func Open(path string) (*DB, error) { 16 - db, err := sql.Open("sqlite3", path+"?_foreign_keys=on") 16 + db, err := sql.Open("sqlite3", path+"?_foreign_keys=on&_journal_mode=WAL&_busy_timeout=5000") 17 17 if err != nil { 18 18 return nil, fmt.Errorf("open database: %w", err) 19 19 } 20 + 21 + // SQLite works best with single writer for WAL mode 22 + db.SetMaxOpenConns(1) 23 + db.SetMaxIdleConns(1) 20 24 21 25 if err := db.Ping(); err != nil { 22 26 return nil, fmt.Errorf("ping database: %w", err) ··· 90 94 ); 91 95 92 96 CREATE INDEX IF NOT EXISTS idx_configs_user_id ON configs(user_id); 93 - CREATE INDEX IF NOT EXISTS idx_configs_next_run ON configs(next_run); 97 + CREATE INDEX IF NOT EXISTS idx_configs_active_next_run ON configs(next_run) WHERE next_run IS NOT NULL; 94 98 CREATE INDEX IF NOT EXISTS idx_feeds_config_id ON feeds(config_id); 95 99 CREATE INDEX IF NOT EXISTS idx_seen_items_feed_id ON seen_items(feed_id); 96 100 CREATE INDEX IF NOT EXISTS idx_logs_config_id ON logs(config_id);
+27
store/feeds.go
··· 44 44 }, nil 45 45 } 46 46 47 + func (db *DB) CreateFeedTx(ctx context.Context, tx *sql.Tx, configID int64, url, name string) (*Feed, error) { 48 + var nameVal sql.NullString 49 + if name != "" { 50 + nameVal = sql.NullString{String: name, Valid: true} 51 + } 52 + 53 + result, err := tx.ExecContext(ctx, 54 + `INSERT INTO feeds (config_id, url, name) VALUES (?, ?, ?)`, 55 + configID, url, nameVal, 56 + ) 57 + if err != nil { 58 + return nil, fmt.Errorf("insert feed: %w", err) 59 + } 60 + 61 + id, err := result.LastInsertId() 62 + if err != nil { 63 + return nil, fmt.Errorf("get last insert id: %w", err) 64 + } 65 + 66 + return &Feed{ 67 + ID: id, 68 + ConfigID: configID, 69 + URL: url, 70 + Name: nameVal, 71 + }, nil 72 + } 73 + 47 74 func (db *DB) GetFeedsByConfig(ctx context.Context, configID int64) ([]*Feed, error) { 48 75 rows, err := db.QueryContext(ctx, 49 76 `SELECT id, config_id, url, name, last_fetched, etag, last_modified
+20
store/items.go
··· 52 52 return true, nil 53 53 } 54 54 55 + func (db *DB) MarkItemSeenTx(ctx context.Context, tx *sql.Tx, feedID int64, guid, title, link string) error { 56 + var titleVal, linkVal sql.NullString 57 + if title != "" { 58 + titleVal = sql.NullString{String: title, Valid: true} 59 + } 60 + if link != "" { 61 + linkVal = sql.NullString{String: link, Valid: true} 62 + } 63 + 64 + _, err := tx.ExecContext(ctx, 65 + `INSERT INTO seen_items (feed_id, guid, title, link) VALUES (?, ?, ?, ?) 66 + ON CONFLICT(feed_id, guid) DO UPDATE SET title = excluded.title, link = excluded.link`, 67 + feedID, guid, titleVal, linkVal, 68 + ) 69 + if err != nil { 70 + return fmt.Errorf("mark item seen: %w", err) 71 + } 72 + return nil 73 + } 74 + 55 75 func (db *DB) GetSeenItems(ctx context.Context, feedID int64, limit int) ([]*SeenItem, error) { 56 76 rows, err := db.QueryContext(ctx, 57 77 `SELECT id, feed_id, guid, title, link, seen_at