at main 5.2 kB view raw
1package monitor 2 3import ( 4 "sync" 5 "time" 6) 7 8type JobStatus struct { 9 Name string `json:"name"` 10 Status string `json:"status"` // "idle", "running", "completed", "error" 11 StartTime time.Time `json:"start_time,omitempty"` 12 LastRun time.Time `json:"last_run,omitempty"` 13 Duration time.Duration `json:"duration,omitempty"` 14 Progress *Progress `json:"progress,omitempty"` 15 Error string `json:"error,omitempty"` 16 NextRun time.Time `json:"next_run,omitempty"` 17 RunCount int64 `json:"run_count"` 18 SuccessCount int64 `json:"success_count"` 19 ErrorCount int64 `json:"error_count"` 20} 21 22type Progress struct { 23 Current int `json:"current"` 24 Total int `json:"total"` 25 Percent float64 `json:"percent"` 26 Message string `json:"message,omitempty"` 27} 28 29type WorkerStatus struct { 30 ID int `json:"id"` 31 Status string `json:"status"` // "idle", "working" 32 CurrentTask string `json:"current_task,omitempty"` 33 StartedAt time.Time `json:"started_at,omitempty"` 34 Duration time.Duration `json:"duration,omitempty"` 35} 36 37type Tracker struct { 38 mu sync.RWMutex 39 jobs map[string]*JobStatus 40 workers map[string][]WorkerStatus // key is job name 41} 42 43var globalTracker *Tracker 44 45func init() { 46 globalTracker = &Tracker{ 47 jobs: make(map[string]*JobStatus), 48 workers: make(map[string][]WorkerStatus), 49 } 50} 51 52func GetTracker() *Tracker { 53 return globalTracker 54} 55 56// Job status methods 57func (t *Tracker) RegisterJob(name string) { 58 t.mu.Lock() 59 defer t.mu.Unlock() 60 61 t.jobs[name] = &JobStatus{ 62 Name: name, 63 Status: "idle", 64 } 65} 66 67func (t *Tracker) StartJob(name string) { 68 t.mu.Lock() 69 defer t.mu.Unlock() 70 71 if job, exists := t.jobs[name]; exists { 72 job.Status = "running" 73 job.StartTime = time.Now() 74 job.Error = "" 75 job.RunCount++ 76 } 77} 78 79func (t *Tracker) CompleteJob(name string, err error) { 80 t.mu.Lock() 81 defer t.mu.Unlock() 82 83 if job, exists := t.jobs[name]; exists { 84 job.LastRun = time.Now() 85 job.Duration = time.Since(job.StartTime) 86 87 if err != nil { 88 job.Status = "error" 89 job.Error = err.Error() 90 job.ErrorCount++ 91 } else { 92 job.Status = "completed" 93 job.SuccessCount++ 94 } 95 96 job.Progress = nil // Clear progress 97 } 98} 99 100func (t *Tracker) UpdateProgress(name string, current, total int, message string) { 101 t.mu.Lock() 102 defer t.mu.Unlock() 103 104 if job, exists := t.jobs[name]; exists { 105 var percent float64 106 if total > 0 { 107 percent = float64(current) / float64(total) * 100 108 } 109 110 job.Progress = &Progress{ 111 Current: current, 112 Total: total, 113 Percent: percent, 114 Message: message, 115 } 116 } 117} 118 119func (t *Tracker) SetNextRun(name string, nextRun time.Time) { 120 t.mu.Lock() 121 defer t.mu.Unlock() 122 123 if job, exists := t.jobs[name]; exists { 124 job.NextRun = nextRun 125 } 126} 127 128func (t *Tracker) GetJobStatus(name string) *JobStatus { 129 t.mu.RLock() 130 defer t.mu.RUnlock() 131 132 if job, exists := t.jobs[name]; exists { 133 // Create a copy 134 jobCopy := *job 135 if job.Progress != nil { 136 progressCopy := *job.Progress 137 jobCopy.Progress = &progressCopy 138 } 139 140 // Calculate duration for running jobs 141 if jobCopy.Status == "running" { 142 jobCopy.Duration = time.Since(jobCopy.StartTime) 143 } 144 145 return &jobCopy 146 } 147 return nil 148} 149 150func (t *Tracker) GetAllJobs() map[string]*JobStatus { 151 t.mu.RLock() 152 defer t.mu.RUnlock() 153 154 result := make(map[string]*JobStatus) 155 for name, job := range t.jobs { 156 jobCopy := *job 157 if job.Progress != nil { 158 progressCopy := *job.Progress 159 jobCopy.Progress = &progressCopy 160 } 161 162 // Calculate duration for running jobs 163 if jobCopy.Status == "running" { 164 jobCopy.Duration = time.Since(jobCopy.StartTime) 165 } 166 167 result[name] = &jobCopy 168 } 169 return result 170} 171 172// Worker status methods 173func (t *Tracker) InitWorkers(jobName string, count int) { 174 t.mu.Lock() 175 defer t.mu.Unlock() 176 177 workers := make([]WorkerStatus, count) 178 for i := 0; i < count; i++ { 179 workers[i] = WorkerStatus{ 180 ID: i + 1, 181 Status: "idle", 182 } 183 } 184 t.workers[jobName] = workers 185} 186 187func (t *Tracker) StartWorker(jobName string, workerID int, task string) { 188 t.mu.Lock() 189 defer t.mu.Unlock() 190 191 if workers, exists := t.workers[jobName]; exists && workerID > 0 && workerID <= len(workers) { 192 workers[workerID-1].Status = "working" 193 workers[workerID-1].CurrentTask = task 194 workers[workerID-1].StartedAt = time.Now() 195 } 196} 197 198func (t *Tracker) CompleteWorker(jobName string, workerID int) { 199 t.mu.Lock() 200 defer t.mu.Unlock() 201 202 if workers, exists := t.workers[jobName]; exists && workerID > 0 && workerID <= len(workers) { 203 workers[workerID-1].Status = "idle" 204 workers[workerID-1].CurrentTask = "" 205 workers[workerID-1].Duration = time.Since(workers[workerID-1].StartedAt) 206 workers[workerID-1].StartedAt = time.Time{} 207 } 208} 209 210func (t *Tracker) GetWorkers(jobName string) []WorkerStatus { 211 t.mu.RLock() 212 defer t.mu.RUnlock() 213 214 if workers, exists := t.workers[jobName]; exists { 215 // Create a copy with calculated durations 216 result := make([]WorkerStatus, len(workers)) 217 for i, w := range workers { 218 result[i] = w 219 if w.Status == "working" && !w.StartedAt.IsZero() { 220 result[i].Duration = time.Since(w.StartedAt) 221 } 222 } 223 return result 224 } 225 return nil 226}