+2
-2
config.yaml
+2
-2
config.yaml
+227
internal/monitor/tracker.go
+227
internal/monitor/tracker.go
···
···
1
+
package monitor
2
+
package monitor
3
+
4
+
import (
5
+
"sync"
6
+
"time"
7
+
)
8
+
9
+
// JobStatus is a point-in-time snapshot of one named job's lifecycle
// state and cumulative run statistics, shaped for JSON status output.
type JobStatus struct {
	Name         string        `json:"name"`
	Status       string        `json:"status"` // "idle", "running", "completed", "error"
	StartTime    time.Time     `json:"start_time,omitempty"` // when the current (or most recent) run began
	LastRun      time.Time     `json:"last_run,omitempty"`   // when the job last finished, success or error
	Duration     time.Duration `json:"duration,omitempty"`   // elapsed time of the current or last run
	Progress     *Progress     `json:"progress,omitempty"`   // nil when no run is in flight
	Error        string        `json:"error,omitempty"`      // message from the most recent failed run; cleared on start
	NextRun      time.Time     `json:"next_run,omitempty"`   // next scheduled run, if the scheduler reported one
	RunCount     int64         `json:"run_count"`            // total runs started
	SuccessCount int64         `json:"success_count"`        // runs that finished without error
	ErrorCount   int64         `json:"error_count"`          // runs that finished with an error
}
22
+
23
+
// Progress describes how far along a running job is.
type Progress struct {
	Current int     `json:"current"`           // units of work completed so far
	Total   int     `json:"total"`             // total units expected; see Percent for the <= 0 case
	Percent float64 `json:"percent"`           // Current/Total*100; left at 0 when Total <= 0
	Message string  `json:"message,omitempty"` // optional human-readable status line
}
29
+
30
+
// WorkerStatus is a snapshot of one worker in a job's worker pool.
type WorkerStatus struct {
	ID          int           `json:"id"`     // 1-based worker ID within the pool
	Status      string        `json:"status"` // "idle", "working"
	CurrentTask string        `json:"current_task,omitempty"` // task label while working; cleared when idle
	StartedAt   time.Time     `json:"started_at,omitempty"`   // when the current task began; zero when idle
	Duration    time.Duration `json:"duration,omitempty"`     // elapsed time on the current or last task
}
37
+
38
+
// Tracker is a concurrency-safe registry of job and worker statuses.
// All access to the maps goes through mu; read accessors hand out
// copies so callers never hold references into shared state.
type Tracker struct {
	mu      sync.RWMutex
	jobs    map[string]*JobStatus
	workers map[string][]WorkerStatus // key is job name
}
43
+
44
+
var globalTracker *Tracker
45
+
46
+
func init() {
47
+
globalTracker = &Tracker{
48
+
jobs: make(map[string]*JobStatus),
49
+
workers: make(map[string][]WorkerStatus),
50
+
}
51
+
}
52
+
53
+
func GetTracker() *Tracker {
54
+
return globalTracker
55
+
}
56
+
57
+
// Job status methods
58
+
func (t *Tracker) RegisterJob(name string) {
59
+
t.mu.Lock()
60
+
defer t.mu.Unlock()
61
+
62
+
t.jobs[name] = &JobStatus{
63
+
Name: name,
64
+
Status: "idle",
65
+
}
66
+
}
67
+
68
+
func (t *Tracker) StartJob(name string) {
69
+
t.mu.Lock()
70
+
defer t.mu.Unlock()
71
+
72
+
if job, exists := t.jobs[name]; exists {
73
+
job.Status = "running"
74
+
job.StartTime = time.Now()
75
+
job.Error = ""
76
+
job.RunCount++
77
+
}
78
+
}
79
+
80
+
func (t *Tracker) CompleteJob(name string, err error) {
81
+
t.mu.Lock()
82
+
defer t.mu.Unlock()
83
+
84
+
if job, exists := t.jobs[name]; exists {
85
+
job.LastRun = time.Now()
86
+
job.Duration = time.Since(job.StartTime)
87
+
88
+
if err != nil {
89
+
job.Status = "error"
90
+
job.Error = err.Error()
91
+
job.ErrorCount++
92
+
} else {
93
+
job.Status = "completed"
94
+
job.SuccessCount++
95
+
}
96
+
97
+
job.Progress = nil // Clear progress
98
+
}
99
+
}
100
+
101
+
func (t *Tracker) UpdateProgress(name string, current, total int, message string) {
102
+
t.mu.Lock()
103
+
defer t.mu.Unlock()
104
+
105
+
if job, exists := t.jobs[name]; exists {
106
+
var percent float64
107
+
if total > 0 {
108
+
percent = float64(current) / float64(total) * 100
109
+
}
110
+
111
+
job.Progress = &Progress{
112
+
Current: current,
113
+
Total: total,
114
+
Percent: percent,
115
+
Message: message,
116
+
}
117
+
}
118
+
}
119
+
120
+
func (t *Tracker) SetNextRun(name string, nextRun time.Time) {
121
+
t.mu.Lock()
122
+
defer t.mu.Unlock()
123
+
124
+
if job, exists := t.jobs[name]; exists {
125
+
job.NextRun = nextRun
126
+
}
127
+
}
128
+
129
+
func (t *Tracker) GetJobStatus(name string) *JobStatus {
130
+
t.mu.RLock()
131
+
defer t.mu.RUnlock()
132
+
133
+
if job, exists := t.jobs[name]; exists {
134
+
// Create a copy
135
+
jobCopy := *job
136
+
if job.Progress != nil {
137
+
progressCopy := *job.Progress
138
+
jobCopy.Progress = &progressCopy
139
+
}
140
+
141
+
// Calculate duration for running jobs
142
+
if jobCopy.Status == "running" {
143
+
jobCopy.Duration = time.Since(jobCopy.StartTime)
144
+
}
145
+
146
+
return &jobCopy
147
+
}
148
+
return nil
149
+
}
150
+
151
+
func (t *Tracker) GetAllJobs() map[string]*JobStatus {
152
+
t.mu.RLock()
153
+
defer t.mu.RUnlock()
154
+
155
+
result := make(map[string]*JobStatus)
156
+
for name, job := range t.jobs {
157
+
jobCopy := *job
158
+
if job.Progress != nil {
159
+
progressCopy := *job.Progress
160
+
jobCopy.Progress = &progressCopy
161
+
}
162
+
163
+
// Calculate duration for running jobs
164
+
if jobCopy.Status == "running" {
165
+
jobCopy.Duration = time.Since(jobCopy.StartTime)
166
+
}
167
+
168
+
result[name] = &jobCopy
169
+
}
170
+
return result
171
+
}
172
+
173
+
// Worker status methods
174
+
func (t *Tracker) InitWorkers(jobName string, count int) {
175
+
t.mu.Lock()
176
+
defer t.mu.Unlock()
177
+
178
+
workers := make([]WorkerStatus, count)
179
+
for i := 0; i < count; i++ {
180
+
workers[i] = WorkerStatus{
181
+
ID: i + 1,
182
+
Status: "idle",
183
+
}
184
+
}
185
+
t.workers[jobName] = workers
186
+
}
187
+
188
+
func (t *Tracker) StartWorker(jobName string, workerID int, task string) {
189
+
t.mu.Lock()
190
+
defer t.mu.Unlock()
191
+
192
+
if workers, exists := t.workers[jobName]; exists && workerID > 0 && workerID <= len(workers) {
193
+
workers[workerID-1].Status = "working"
194
+
workers[workerID-1].CurrentTask = task
195
+
workers[workerID-1].StartedAt = time.Now()
196
+
}
197
+
}
198
+
199
+
func (t *Tracker) CompleteWorker(jobName string, workerID int) {
200
+
t.mu.Lock()
201
+
defer t.mu.Unlock()
202
+
203
+
if workers, exists := t.workers[jobName]; exists && workerID > 0 && workerID <= len(workers) {
204
+
workers[workerID-1].Status = "idle"
205
+
workers[workerID-1].CurrentTask = ""
206
+
workers[workerID-1].Duration = time.Since(workers[workerID-1].StartedAt)
207
+
workers[workerID-1].StartedAt = time.Time{}
208
+
}
209
+
}
210
+
211
+
func (t *Tracker) GetWorkers(jobName string) []WorkerStatus {
212
+
t.mu.RLock()
213
+
defer t.mu.RUnlock()
214
+
215
+
if workers, exists := t.workers[jobName]; exists {
216
+
// Create a copy with calculated durations
217
+
result := make([]WorkerStatus, len(workers))
218
+
for i, w := range workers {
219
+
result[i] = w
220
+
if w.Status == "working" && !w.StartedAt.IsZero() {
221
+
result[i].Duration = time.Since(w.StartedAt)
222
+
}
223
+
}
224
+
return result
225
+
}
226
+
return nil
227
+
}
+12
-8
internal/pds/scanner.go
+12
-8
internal/pds/scanner.go
···
34
startTime := time.Now()
35
log.Info("Starting PDS availability scan...")
36
37
-
// Get only PDS endpoints
38
servers, err := s.db.GetEndpoints(ctx, &storage.EndpointFilter{
39
-
Type: "pds",
40
})
41
if err != nil {
42
return err
43
}
44
45
-
// 2. ADD THIS BLOCK TO SHUFFLE THE LIST
46
if len(servers) > 0 {
47
-
// Create a new random source to avoid using the global one
48
r := rand.New(rand.NewSource(time.Now().UnixNano()))
49
-
// Shuffle the servers slice in place
50
r.Shuffle(len(servers), func(i, j int) {
51
servers[i], servers[j] = servers[j], servers[i]
52
})
53
log.Info("Randomized scan order for %d PDS servers...", len(servers))
54
-
} else {
55
-
log.Info("Scanning 0 PDS servers...")
56
-
return nil // No need to continue if there are no servers
57
}
58
59
// Worker pool
···
34
startTime := time.Now()
35
log.Info("Starting PDS availability scan...")
36
37
+
// Get only PDS endpoints that need checking (stale or never checked)
38
servers, err := s.db.GetEndpoints(ctx, &storage.EndpointFilter{
39
+
Type: "pds",
40
+
OnlyStale: true, // NEW: Only get stale endpoints
41
+
RecheckInterval: s.config.RecheckInterval, // NEW: Use recheck interval from config
42
})
43
if err != nil {
44
return err
45
}
46
47
+
if len(servers) == 0 {
48
+
log.Info("No endpoints need scanning at this time")
49
+
return nil
50
+
}
51
+
52
+
log.Info("Found %d endpoints that need scanning (not checked in last %s)", len(servers), s.config.RecheckInterval)
53
+
54
+
// Shuffle the servers slice in place
55
if len(servers) > 0 {
56
r := rand.New(rand.NewSource(time.Now().UnixNano()))
57
r.Shuffle(len(servers), func(i, j int) {
58
servers[i], servers[j] = servers[j], servers[i]
59
})
60
log.Info("Randomized scan order for %d PDS servers...", len(servers))
61
}
62
63
// Worker pool
+5
internal/storage/postgres.go
+5
internal/storage/postgres.go
···
302
args = append(args, statusInt)
303
argIdx++
304
}
305
+
306
+
// NEW: Filter for stale endpoints only
307
+
if filter.OnlyStale && filter.RecheckInterval > 0 {
308
+
query += fmt.Sprintf(" AND (last_checked IS NULL OR last_checked < NOW() - INTERVAL '%d seconds')", int(filter.RecheckInterval.Seconds()))
309
+
}
310
}
311
312
query += " ORDER BY id DESC"
+7
-5
internal/storage/types.go
+7
-5
internal/storage/types.go
···
72
73
// EndpointFilter for querying endpoints
type EndpointFilter struct {
	Type            string        // "pds", "labeler", etc.
	Status          string        // endpoint status to match; presumably empty means any — verify against query builder
	MinUserCount    int64         // minimum user count threshold — assumed 0 disables it; TODO confirm
	OnlyStale       bool          // NEW: Only return endpoints that need re-checking
	RecheckInterval time.Duration // NEW: How long before an endpoint is considered stale (stale filter applies only when OnlyStale and this is > 0)
	Limit           int           // max rows to return — assumed 0 means unlimited; TODO confirm
	Offset          int           // rows to skip, for pagination
}
83
84
// EndpointStats contains aggregate statistics about endpoints