-92
models/actions/task.go
-92
models/actions/task.go
···
17
17
"forgejo.org/modules/timeutil"
18
18
"forgejo.org/modules/util"
19
19
20
-
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
21
20
lru "github.com/hashicorp/golang-lru/v2"
22
21
"github.com/nektos/act/pkg/jobparser"
23
-
"google.golang.org/protobuf/types/known/timestamppb"
24
22
"xorm.io/builder"
25
23
)
26
24
···
337
335
return err
338
336
}
339
337
340
-
// UpdateTaskByState updates the task by the state.
341
-
// It will always update the task if the state is not final, even there is no change.
342
-
// So it will update ActionTask.Updated to avoid the task being judged as a zombie task.
343
-
func UpdateTaskByState(ctx context.Context, runnerID int64, state *runnerv1.TaskState) (*ActionTask, error) {
344
-
stepStates := map[int64]*runnerv1.StepState{}
345
-
for _, v := range state.Steps {
346
-
stepStates[v.Id] = v
347
-
}
348
-
349
-
ctx, commiter, err := db.TxContext(ctx)
350
-
if err != nil {
351
-
return nil, err
352
-
}
353
-
defer commiter.Close()
354
-
355
-
e := db.GetEngine(ctx)
356
-
357
-
task := &ActionTask{}
358
-
if has, err := e.ID(state.Id).Get(task); err != nil {
359
-
return nil, err
360
-
} else if !has {
361
-
return nil, util.ErrNotExist
362
-
} else if runnerID != task.RunnerID {
363
-
return nil, fmt.Errorf("invalid runner for task")
364
-
}
365
-
366
-
if task.Status.IsDone() {
367
-
// the state is final, do nothing
368
-
return task, nil
369
-
}
370
-
371
-
// state.Result is not unspecified means the task is finished
372
-
if state.Result != runnerv1.Result_RESULT_UNSPECIFIED {
373
-
task.Status = Status(state.Result)
374
-
task.Stopped = timeutil.TimeStamp(state.StoppedAt.AsTime().Unix())
375
-
if err := UpdateTask(ctx, task, "status", "stopped"); err != nil {
376
-
return nil, err
377
-
}
378
-
if _, err := UpdateRunJob(ctx, &ActionRunJob{
379
-
ID: task.JobID,
380
-
Status: task.Status,
381
-
Stopped: task.Stopped,
382
-
}, nil); err != nil {
383
-
return nil, err
384
-
}
385
-
} else {
386
-
// Force update ActionTask.Updated to avoid the task being judged as a zombie task
387
-
task.Updated = timeutil.TimeStampNow()
388
-
if err := UpdateTask(ctx, task, "updated"); err != nil {
389
-
return nil, err
390
-
}
391
-
}
392
-
393
-
if err := task.LoadAttributes(ctx); err != nil {
394
-
return nil, err
395
-
}
396
-
397
-
for _, step := range task.Steps {
398
-
var result runnerv1.Result
399
-
if v, ok := stepStates[step.Index]; ok {
400
-
result = v.Result
401
-
step.LogIndex = v.LogIndex
402
-
step.LogLength = v.LogLength
403
-
step.Started = convertTimestamp(v.StartedAt)
404
-
step.Stopped = convertTimestamp(v.StoppedAt)
405
-
}
406
-
if result != runnerv1.Result_RESULT_UNSPECIFIED {
407
-
step.Status = Status(result)
408
-
} else if step.Started != 0 {
409
-
step.Status = StatusRunning
410
-
}
411
-
if _, err := e.ID(step.ID).Update(step); err != nil {
412
-
return nil, err
413
-
}
414
-
}
415
-
416
-
if err := commiter.Commit(); err != nil {
417
-
return nil, err
418
-
}
419
-
420
-
return task, nil
421
-
}
422
-
423
338
func FindOldTasksToExpire(ctx context.Context, olderThan timeutil.TimeStamp, limit int) ([]*ActionTask, error) {
424
339
e := db.GetEngine(ctx)
425
340
···
428
343
return tasks, e.Where("stopped > 0 AND stopped < ? AND log_expired = ?", olderThan, false).
429
344
Limit(limit).
430
345
Find(&tasks)
431
-
}
432
-
433
-
func convertTimestamp(timestamp *timestamppb.Timestamp) timeutil.TimeStamp {
434
-
if timestamp.GetSeconds() == 0 && timestamp.GetNanos() == 0 {
435
-
return timeutil.TimeStamp(0)
436
-
}
437
-
return timeutil.TimeStamp(timestamp.AsTime().Unix())
438
346
}
439
347
440
348
func logFileName(repoFullName string, taskID int64) string {
+1
-1
routers/api/actions/runner/runner.go
+1
-1
routers/api/actions/runner/runner.go
···
178
178
) (*connect.Response[runnerv1.UpdateTaskResponse], error) {
179
179
runner := GetRunner(ctx)
180
180
181
-
task, err := actions_model.UpdateTaskByState(ctx, runner.ID, req.Msg.State)
181
+
task, err := actions_service.UpdateTaskByState(ctx, runner.ID, req.Msg.State)
182
182
if err != nil {
183
183
return nil, connect.NewError(connect.CodeInternal, fmt.Errorf("update task: %w", err))
184
184
}
+91
services/actions/task.go
+91
services/actions/task.go
···
15
15
16
16
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
17
17
"google.golang.org/protobuf/types/known/structpb"
18
+
"google.golang.org/protobuf/types/known/timestamppb"
18
19
)
19
20
20
21
func PickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv1.Task, bool, error) {
···
158
159
159
160
return nil
160
161
}
162
+
163
+
// UpdateTaskByState updates the task by the state.
164
+
// It will always update the task if the state is not final, even there is no change.
165
+
// So it will update ActionTask.Updated to avoid the task being judged as a zombie task.
166
+
func UpdateTaskByState(ctx context.Context, runnerID int64, state *runnerv1.TaskState) (*actions_model.ActionTask, error) {
167
+
stepStates := map[int64]*runnerv1.StepState{}
168
+
for _, v := range state.Steps {
169
+
stepStates[v.Id] = v
170
+
}
171
+
172
+
ctx, commiter, err := db.TxContext(ctx)
173
+
if err != nil {
174
+
return nil, err
175
+
}
176
+
defer commiter.Close()
177
+
178
+
e := db.GetEngine(ctx)
179
+
180
+
task := &actions_model.ActionTask{}
181
+
if has, err := e.ID(state.Id).Get(task); err != nil {
182
+
return nil, err
183
+
} else if !has {
184
+
return nil, util.ErrNotExist
185
+
} else if runnerID != task.RunnerID {
186
+
return nil, fmt.Errorf("invalid runner for task")
187
+
}
188
+
189
+
if task.Status.IsDone() {
190
+
// the state is final, do nothing
191
+
return task, nil
192
+
}
193
+
194
+
// state.Result is not unspecified means the task is finished
195
+
if state.Result != runnerv1.Result_RESULT_UNSPECIFIED {
196
+
task.Status = actions_model.Status(state.Result)
197
+
task.Stopped = timeutil.TimeStamp(state.StoppedAt.AsTime().Unix())
198
+
if err := actions_model.UpdateTask(ctx, task, "status", "stopped"); err != nil {
199
+
return nil, err
200
+
}
201
+
if _, err := actions_model.UpdateRunJob(ctx, &actions_model.ActionRunJob{
202
+
ID: task.JobID,
203
+
Status: task.Status,
204
+
Stopped: task.Stopped,
205
+
}, nil); err != nil {
206
+
return nil, err
207
+
}
208
+
} else {
209
+
// Force update ActionTask.Updated to avoid the task being judged as a zombie task
210
+
task.Updated = timeutil.TimeStampNow()
211
+
if err := actions_model.UpdateTask(ctx, task, "updated"); err != nil {
212
+
return nil, err
213
+
}
214
+
}
215
+
216
+
if err := task.LoadAttributes(ctx); err != nil {
217
+
return nil, err
218
+
}
219
+
220
+
for _, step := range task.Steps {
221
+
var result runnerv1.Result
222
+
if v, ok := stepStates[step.Index]; ok {
223
+
result = v.Result
224
+
step.LogIndex = v.LogIndex
225
+
step.LogLength = v.LogLength
226
+
step.Started = convertTimestamp(v.StartedAt)
227
+
step.Stopped = convertTimestamp(v.StoppedAt)
228
+
}
229
+
if result != runnerv1.Result_RESULT_UNSPECIFIED {
230
+
step.Status = actions_model.Status(result)
231
+
} else if step.Started != 0 {
232
+
step.Status = actions_model.StatusRunning
233
+
}
234
+
if _, err := e.ID(step.ID).Update(step); err != nil {
235
+
return nil, err
236
+
}
237
+
}
238
+
239
+
if err := commiter.Commit(); err != nil {
240
+
return nil, err
241
+
}
242
+
243
+
return task, nil
244
+
}
245
+
246
+
func convertTimestamp(timestamp *timestamppb.Timestamp) timeutil.TimeStamp {
247
+
if timestamp.GetSeconds() == 0 && timestamp.GetNanos() == 0 {
248
+
return timeutil.TimeStamp(0)
249
+
}
250
+
return timeutil.TimeStamp(timestamp.AsTime().Unix())
251
+
}