Job monitoring

Changed files
+154 -40
+51
internal/api/handlers.go
···
     "time"

     "github.com/atscan/atscanner/internal/log"
+    "github.com/atscan/atscanner/internal/monitor"
     "github.com/atscan/atscanner/internal/plc"
     "github.com/atscan/atscanner/internal/storage"
     "github.com/gorilla/mux"
···

 func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
     newResponse(w).json(map[string]string{"status": "ok"})
+}
+
+func (s *Server) handleGetJobStatus(w http.ResponseWriter, r *http.Request) {
+    resp := newResponse(w)
+    tracker := monitor.GetTracker()
+
+    jobs := tracker.GetAllJobs()
+
+    result := make(map[string]interface{})
+    for name, job := range jobs {
+        jobData := map[string]interface{}{
+            "name": job.Name,
+            "status": job.Status,
+            "run_count": job.RunCount,
+            "success_count": job.SuccessCount,
+            "error_count": job.ErrorCount,
+        }
+
+        if !job.LastRun.IsZero() {
+            jobData["last_run"] = job.LastRun
+            jobData["last_duration"] = job.Duration.String()
+        }
+
+        if !job.NextRun.IsZero() {
+            jobData["next_run"] = job.NextRun
+            jobData["next_run_in"] = time.Until(job.NextRun).Round(time.Second).String()
+        }
+
+        if job.Status == "running" {
+            jobData["running_for"] = job.Duration.Round(time.Second).String()
+
+            if job.Progress != nil {
+                jobData["progress"] = job.Progress
+            }
+
+            // Add worker status
+            workers := tracker.GetWorkers(name)
+            if len(workers) > 0 {
+                jobData["workers"] = workers
+            }
+        }
+
+        if job.Error != "" {
+            jobData["error"] = job.Error
+        }
+
+        result[name] = jobData
+    }
+
+    resp.json(result)
 }

 // ===== UTILITY FUNCTIONS =====
+3
internal/api/server.go
···
     // Metrics routes
     api.HandleFunc("/metrics/plc", s.handleGetPLCMetrics).Methods("GET")

+    // Job status endpoint
+    api.HandleFunc("/jobs", s.handleGetJobStatus).Methods("GET")
+
     // Health check
     s.router.HandleFunc("/health", s.handleHealth).Methods("GET")
 }
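For reference, the new endpoint can be polled with a plain HTTP GET. Below is a minimal client sketch; the /api prefix and the localhost:8080 address are assumptions for illustration, since the mount point of the api subrouter and the listen address are not shown in this diff.

package main

import (
    "encoding/json"
    "fmt"
    "net/http"
)

func main() {
    // Assumed base URL and prefix; adjust to the actual listen address and router mount point.
    resp, err := http.Get("http://localhost:8080/api/jobs")
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()

    // handleGetJobStatus returns a JSON object keyed by job name.
    var jobs map[string]map[string]interface{}
    if err := json.NewDecoder(resp.Body).Decode(&jobs); err != nil {
        panic(err)
    }

    for name, job := range jobs {
        fmt.Printf("%s: status=%v runs=%v\n", name, job["status"], job["run_count"])
    }
}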
+25 -26
internal/monitor/tracker.go
···
-package monitor
 package monitor

 import (
···
 }

 type Progress struct {
-    Current int `json:"current"`
-    Total int `json:"total"`
+    Current int     `json:"current"`
+    Total   int     `json:"total"`
     Percent float64 `json:"percent"`
-    Message string `json:"message,omitempty"`
+    Message string  `json:"message,omitempty"`
 }

 type WorkerStatus struct {
-    ID int `json:"id"`
-    Status string `json:"status"` // "idle", "working"
-    CurrentTask string `json:"current_task,omitempty"`
-    StartedAt time.Time `json:"started_at,omitempty"`
+    ID          int       `json:"id"`
+    Status      string    `json:"status"` // "idle", "working"
+    CurrentTask string    `json:"current_task,omitempty"`
+    StartedAt   time.Time `json:"started_at,omitempty"`
     Duration time.Duration `json:"duration,omitempty"`
 }
···
 func (t *Tracker) RegisterJob(name string) {
     t.mu.Lock()
     defer t.mu.Unlock()
-
+
     t.jobs[name] = &JobStatus{
         Name: name,
         Status: "idle",
···
 func (t *Tracker) StartJob(name string) {
     t.mu.Lock()
     defer t.mu.Unlock()
-
+
     if job, exists := t.jobs[name]; exists {
         job.Status = "running"
         job.StartTime = time.Now()
···
 func (t *Tracker) CompleteJob(name string, err error) {
     t.mu.Lock()
     defer t.mu.Unlock()
-
+
     if job, exists := t.jobs[name]; exists {
         job.LastRun = time.Now()
         job.Duration = time.Since(job.StartTime)
-
+
         if err != nil {
             job.Status = "error"
             job.Error = err.Error()
···
             job.Status = "completed"
             job.SuccessCount++
         }
-
+
         job.Progress = nil // Clear progress
     }
 }
···
 func (t *Tracker) UpdateProgress(name string, current, total int, message string) {
     t.mu.Lock()
     defer t.mu.Unlock()
-
+
     if job, exists := t.jobs[name]; exists {
         var percent float64
         if total > 0 {
             percent = float64(current) / float64(total) * 100
         }
-
+
         job.Progress = &Progress{
             Current: current,
             Total: total,
···
 func (t *Tracker) SetNextRun(name string, nextRun time.Time) {
     t.mu.Lock()
     defer t.mu.Unlock()
-
+
     if job, exists := t.jobs[name]; exists {
         job.NextRun = nextRun
     }
···
 func (t *Tracker) GetJobStatus(name string) *JobStatus {
     t.mu.RLock()
     defer t.mu.RUnlock()
-
+
     if job, exists := t.jobs[name]; exists {
         // Create a copy
         jobCopy := *job
···
             progressCopy := *job.Progress
             jobCopy.Progress = &progressCopy
         }
-
+
         // Calculate duration for running jobs
         if jobCopy.Status == "running" {
             jobCopy.Duration = time.Since(jobCopy.StartTime)
         }
-
+
         return &jobCopy
     }
     return nil
···
 func (t *Tracker) GetAllJobs() map[string]*JobStatus {
     t.mu.RLock()
     defer t.mu.RUnlock()
-
+
     result := make(map[string]*JobStatus)
     for name, job := range t.jobs {
         jobCopy := *job
···
             progressCopy := *job.Progress
             jobCopy.Progress = &progressCopy
         }
-
+
         // Calculate duration for running jobs
         if jobCopy.Status == "running" {
             jobCopy.Duration = time.Since(jobCopy.StartTime)
         }
-
+
         result[name] = &jobCopy
     }
     return result
···
 func (t *Tracker) InitWorkers(jobName string, count int) {
     t.mu.Lock()
     defer t.mu.Unlock()
-
+
     workers := make([]WorkerStatus, count)
     for i := 0; i < count; i++ {
         workers[i] = WorkerStatus{
···
 func (t *Tracker) StartWorker(jobName string, workerID int, task string) {
     t.mu.Lock()
     defer t.mu.Unlock()
-
+
     if workers, exists := t.workers[jobName]; exists && workerID > 0 && workerID <= len(workers) {
         workers[workerID-1].Status = "working"
         workers[workerID-1].CurrentTask = task
···
 func (t *Tracker) CompleteWorker(jobName string, workerID int) {
     t.mu.Lock()
     defer t.mu.Unlock()
-
+
     if workers, exists := t.workers[jobName]; exists && workerID > 0 && workerID <= len(workers) {
         workers[workerID-1].Status = "idle"
         workers[workerID-1].CurrentTask = ""
···
 func (t *Tracker) GetWorkers(jobName string) []WorkerStatus {
     t.mu.RLock()
     defer t.mu.RUnlock()
-
+
     if workers, exists := t.workers[jobName]; exists {
         // Create a copy with calculated durations
         result := make([]WorkerStatus, len(workers))
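Taken together, tracker.go exposes a small lifecycle API (RegisterJob, StartJob, UpdateProgress, SetNextRun, CompleteJob, plus the worker helpers). A minimal sketch of how a periodic job might drive it, using only the methods shown above; the job name and workload size are illustrative:

package main

import (
    "fmt"

    "github.com/atscan/atscanner/internal/monitor"
)

func main() {
    tracker := monitor.GetTracker()

    // Registration normally happens once, when the job is added to the scheduler.
    tracker.RegisterJob("example_job")

    tracker.StartJob("example_job")

    total := 50 // illustrative workload size
    for i := 1; i <= total; i++ {
        // ... one unit of work would go here ...
        tracker.UpdateProgress("example_job", i, total, fmt.Sprintf("processed %d/%d", i, total))
    }

    // A nil error marks the run "completed"; a non-nil error marks it "error".
    tracker.CompleteJob("example_job", nil)
}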
+49 -12
internal/pds/scanner.go
···
     "fmt"
     "math/rand"
     "sync"
+    "sync/atomic"
     "time"

     "github.com/acarl005/stripansi"
     "github.com/atscan/atscanner/internal/config"
     "github.com/atscan/atscanner/internal/ipinfo"
     "github.com/atscan/atscanner/internal/log"
+    "github.com/atscan/atscanner/internal/monitor"
     "github.com/atscan/atscanner/internal/storage"
 )

···
     startTime := time.Now()
     log.Info("Starting PDS availability scan...")

-    // Get only PDS endpoints that need checking (stale or never checked)
+    // Get only PDS endpoints that need checking
     servers, err := s.db.GetEndpoints(ctx, &storage.EndpointFilter{
         Type: "pds",
-        OnlyStale: true, // NEW: Only get stale endpoints
-        RecheckInterval: s.config.RecheckInterval, // NEW: Use recheck interval from config
+        OnlyStale: true,
+        RecheckInterval: s.config.RecheckInterval,
     })
     if err != nil {
         return err
···

     if len(servers) == 0 {
         log.Info("No endpoints need scanning at this time")
+        monitor.GetTracker().UpdateProgress("pds_scan", 0, 0, "No endpoints need scanning")
         return nil
     }

-    log.Info("Found %d endpoints that need scanning (not checked in last %s)", len(servers), s.config.RecheckInterval)
+    log.Info("Found %d endpoints that need scanning", len(servers))
+    monitor.GetTracker().UpdateProgress("pds_scan", 0, len(servers), "Preparing to scan")

-    // Shuffle the servers slice in place
+    // Shuffle servers
     if len(servers) > 0 {
         r := rand.New(rand.NewSource(time.Now().UnixNano()))
         r.Shuffle(len(servers), func(i, j int) {
             servers[i], servers[j] = servers[j], servers[i]
         })
-        log.Info("Randomized scan order for %d PDS servers...", len(servers))
     }

-    // Worker pool
-    jobs := make(chan *storage.Endpoint, len(servers))
+    // Initialize workers in tracker
+    monitor.GetTracker().InitWorkers("pds_scan", s.config.Workers)
+
+    // Worker pool with progress tracking
+    jobs := make(chan *workerJob, len(servers))
     var wg sync.WaitGroup
+    var completed int32

     for i := 0; i < s.config.Workers; i++ {
         wg.Add(1)
-        go func() {
+        workerID := i + 1
+        go func(id int) {
             defer wg.Done()
-            s.worker(ctx, jobs)
-        }()
+            s.workerWithProgress(ctx, id, jobs, &completed, len(servers))
+        }(workerID)
     }

     // Send jobs
     for _, server := range servers {
-        jobs <- server
+        jobs <- &workerJob{endpoint: server}
     }
     close(jobs)

···
     wg.Wait()

     log.Info("PDS scan completed in %v", time.Since(startTime))
+    monitor.GetTracker().UpdateProgress("pds_scan", len(servers), len(servers), "Completed")

     return nil
+}
+
+type workerJob struct {
+    endpoint *storage.Endpoint
+}
+
+func (s *Scanner) workerWithProgress(ctx context.Context, workerID int, jobs <-chan *workerJob, completed *int32, total int) {
+    for job := range jobs {
+        select {
+        case <-ctx.Done():
+            return
+        default:
+            // Update worker status
+            monitor.GetTracker().StartWorker("pds_scan", workerID, job.endpoint.Endpoint)
+
+            // Scan endpoint
+            s.scanAndSaveEndpoint(ctx, job.endpoint)
+
+            // Update progress
+            atomic.AddInt32(completed, 1)
+            current := atomic.LoadInt32(completed)
+            monitor.GetTracker().UpdateProgress("pds_scan", int(current), total,
+                fmt.Sprintf("Scanned %d/%d endpoints", current, total))
+
+            // Mark worker as idle
+            monitor.GetTracker().CompleteWorker("pds_scan", workerID)
+        }
+    }
 }

 func (s *Scanner) worker(ctx context.Context, jobs <-chan *storage.Endpoint) {
+26 -2
internal/worker/scheduler.go
···
     "time"

     "github.com/atscan/atscanner/internal/log"
+    "github.com/atscan/atscanner/internal/monitor"
 )

 type Job struct {
···
         Interval: interval,
         Fn: fn,
     })
+
+    // Register job with tracker
+    monitor.GetTracker().RegisterJob(name)
 }

 func (s *Scheduler) Start(ctx context.Context) {
···

     // Run immediately
     log.Info("Starting job: %s", job.Name)
-    job.Fn()
+    s.executeJob(job)

     for {
+        // Set next run time
+        monitor.GetTracker().SetNextRun(job.Name, time.Now().Add(job.Interval))
+
         select {
         case <-ctx.Done():
             log.Info("Stopping job: %s", job.Name)
             return
         case <-ticker.C:
             log.Info("Running job: %s", job.Name)
-            job.Fn()
+            s.executeJob(job)
         }
     }
 }
+
+func (s *Scheduler) executeJob(job *Job) {
+    monitor.GetTracker().StartJob(job.Name)
+
+    // Run job and capture any panic
+    func() {
+        defer func() {
+            if r := recover(); r != nil {
+                log.Error("Job %s panicked: %v", job.Name, r)
+                monitor.GetTracker().CompleteJob(job.Name, nil)
+            }
+        }()
+
+        job.Fn()
+        monitor.GetTracker().CompleteJob(job.Name, nil)
+    }()
+}
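The scheduler now reports every run to the monitor tracker, so anything added via AddJob becomes visible at GET /jobs. One detail worth noting: a recovered panic is passed to CompleteJob with a nil error, so a panicked run is recorded as a successful completion. Below is a rough wiring sketch; NewScheduler and the exact AddJob signature (name, interval, fn) do not appear in this diff and are assumptions:

package main

import (
    "context"
    "time"

    "github.com/atscan/atscanner/internal/log"
    "github.com/atscan/atscanner/internal/worker"
)

func main() {
    ctx, cancel := context.WithCancel(context.Background())
    defer cancel()

    // Assumed constructor; only AddJob and Start(ctx) appear in the diff above.
    s := worker.NewScheduler()

    // The job is registered with the monitor tracker inside AddJob.
    s.AddJob("pds_scan", 5*time.Minute, func() {
        log.Info("pds_scan tick") // a real job would invoke the PDS scanner here
    })

    // Start runs each job immediately and then on its interval; every run is
    // tracked and shows up in the /jobs endpoint.
    s.Start(ctx)

    // Keep the process alive if Start returns instead of blocking.
    <-ctx.Done()
}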