mirror of
https://codeberg.org/forgejo/forgejo.git
synced 2026-03-25 17:03:05 -04:00
This reverts commit d4951968f0, #10008.
Since #10008, when Forgejo cancels a job server-side (for example, because of an additional push to an open PR), it immediately archives the job's logs from DBFS to disk. The runner then notices that the job has been cancelled and tries to flush its pending logs to Forgejo. Because the logs have already been archived, every upload attempt is rejected, and the runner logs repeated warnings:
```
forgejo-runner.log:time="2026-02-23T01:32:11+01:00" level=warning msg="uploading final logs failed, but will be retried: already_exists: log file has been archived" task_id=51
forgejo-runner.log:time="2026-02-23T01:32:11+01:00" level=warning msg="uploading final logs failed, but will be retried: already_exists: log file has been archived" task_id=51
forgejo-runner.log:time="2026-02-23T01:32:11+01:00" level=warning msg="uploading final logs failed, but will be retried: already_exists: log file has been archived" task_id=51
forgejo-runner.log:time="2026-02-23T01:32:12+01:00" level=warning msg="uploading final logs failed, but will be retried: already_exists: log file has been archived" task_id=51
forgejo-runner.log:time="2026-02-23T01:32:13+01:00" level=warning msg="uploading final logs failed, but will be retried: already_exists: log file has been archived" task_id=51
forgejo-runner.log:time="2026-02-23T01:32:14+01:00" level=warning msg="uploading final logs failed, but will be retried: already_exists: log file has been archived" task_id=51
forgejo-runner.log:time="2026-02-23T01:32:16+01:00" level=info msg="runner: received shutdown signal"
forgejo-runner.log:time="2026-02-23T01:32:16+01:00" level=info msg="runner: shutdown initiated, waiting [runner].shutdown_timeout=0s for running jobs to complete before shutting down"
forgejo-runner.log:time="2026-02-23T01:32:16+01:00" level=info msg="[poller] shutdown begin, 1 tasks currently running"
forgejo-runner.log:time="2026-02-23T01:32:16+01:00" level=info msg="forcing the jobs to shutdown"
forgejo-runner.log:time="2026-02-23T01:32:18+01:00" level=warning msg="uploading final logs failed, but will be retried: already_exists: log file has been archived" task_id=51
forgejo-runner.log:time="2026-02-23T01:32:24+01:00" level=warning msg="uploading final logs failed, but will be retried: already_exists: log file has been archived" task_id=51
```
This appears to be why the `push-cancel` end-to-end test has been failing since #10008 was merged (see https://code.forgejo.org/forgejo/end-to-end/actions/runs/4985/jobs/8/attempt/1). The `push-cancel` test case itself seems to succeed, but the test process then aborts with `return 1`. The failure does not reproduce locally.
Reviewed-on: https://codeberg.org/forgejo/forgejo/pulls/11462
Reviewed-by: Michael Kriese <michael.kriese@gmx.de>
Co-authored-by: Mathieu Fenniak <mathieu@fenniak.net>
Co-committed-by: Mathieu Fenniak <mathieu@fenniak.net>
357 lines
9.3 KiB
Go
357 lines
9.3 KiB
Go
// Copyright 2022 The Gitea Authors. All rights reserved.
|
|
// SPDX-License-Identifier: MIT
|
|
|
|
package actions
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
|
|
actions_model "forgejo.org/models/actions"
|
|
"forgejo.org/models/db"
|
|
actions_module "forgejo.org/modules/actions"
|
|
"forgejo.org/modules/setting"
|
|
"forgejo.org/modules/timeutil"
|
|
"forgejo.org/modules/util"
|
|
|
|
runnerv1 "code.forgejo.org/forgejo/actions-proto/runner/v1"
|
|
"google.golang.org/protobuf/types/known/structpb"
|
|
"google.golang.org/protobuf/types/known/timestamppb"
|
|
)
|
|
|
|
func PickTask(ctx context.Context, runner *actions_model.ActionRunner, requestKey *string) (*runnerv1.Task, bool, error) {
|
|
var (
|
|
task *runnerv1.Task
|
|
job *actions_model.ActionRunJob
|
|
)
|
|
|
|
if runner.Ephemeral {
|
|
hasRunnerAssignedTask, err := actions_model.HasTaskForRunner(ctx, runner.ID)
|
|
// Let the runner retry the request, do not allow to proceed
|
|
if err != nil {
|
|
return nil, false, err
|
|
}
|
|
|
|
// if runner has task, dont assign new task
|
|
if hasRunnerAssignedTask {
|
|
return nil, false, nil
|
|
}
|
|
}
|
|
|
|
if err := db.WithTx(ctx, func(ctx context.Context) error {
|
|
t, ok, err := actions_model.CreateTaskForRunner(ctx, runner, requestKey)
|
|
if err != nil {
|
|
return fmt.Errorf("CreateTaskForRunner: %w", err)
|
|
}
|
|
if !ok {
|
|
return nil
|
|
}
|
|
|
|
if err := t.LoadAttributes(ctx); err != nil {
|
|
return fmt.Errorf("task LoadAttributes: %w", err)
|
|
}
|
|
job = t.Job
|
|
|
|
secrets, err := getSecretsOfTask(ctx, t)
|
|
if err != nil {
|
|
return fmt.Errorf("GetSecretsOfTask: %w", err)
|
|
}
|
|
|
|
vars, err := actions_model.GetVariablesOfRun(ctx, t.Job.Run)
|
|
if err != nil {
|
|
return fmt.Errorf("GetVariablesOfRun: %w", err)
|
|
}
|
|
|
|
needs, err := findTaskNeeds(ctx, job)
|
|
if err != nil {
|
|
return fmt.Errorf("findTaskNeeds: %w", err)
|
|
}
|
|
|
|
taskContext, err := generateTaskContext(t)
|
|
if err != nil {
|
|
return fmt.Errorf("generateTaskContext: %w", err)
|
|
}
|
|
|
|
task = &runnerv1.Task{
|
|
Id: t.ID,
|
|
WorkflowPayload: t.Job.WorkflowPayload,
|
|
Context: taskContext,
|
|
Secrets: secrets,
|
|
Vars: vars,
|
|
Needs: needs,
|
|
}
|
|
|
|
return nil
|
|
}); err != nil {
|
|
return nil, false, err
|
|
}
|
|
|
|
if task == nil {
|
|
return nil, false, nil
|
|
}
|
|
|
|
CreateCommitStatus(ctx, job)
|
|
|
|
return task, true, nil
|
|
}
|
|
|
|
func RecoverTasks(ctx context.Context, tasks []*actions_model.ActionTask) ([]*runnerv1.Task, error) {
|
|
retval := make([]*runnerv1.Task, len(tasks))
|
|
|
|
err := db.WithTx(ctx, func(ctx context.Context) error {
|
|
for i, t := range tasks {
|
|
// `Token` is stored in the database w/ a one-way hash, so we can't recover it from the original. Instead
|
|
// we generate a new token to create usable runnerv1.Task objects.
|
|
t.GenerateToken()
|
|
if err := t.UpdateToken(ctx); err != nil {
|
|
return fmt.Errorf("UpdateTask failed: %w", err)
|
|
}
|
|
|
|
if err := t.LoadAttributes(ctx); err != nil {
|
|
return fmt.Errorf("task LoadAttributes: %w", err)
|
|
}
|
|
job := t.Job
|
|
|
|
secrets, err := getSecretsOfTask(ctx, t)
|
|
if err != nil {
|
|
return fmt.Errorf("GetSecretsOfTask: %w", err)
|
|
}
|
|
|
|
vars, err := actions_model.GetVariablesOfRun(ctx, t.Job.Run)
|
|
if err != nil {
|
|
return fmt.Errorf("GetVariablesOfRun: %w", err)
|
|
}
|
|
|
|
needs, err := findTaskNeeds(ctx, job)
|
|
if err != nil {
|
|
return fmt.Errorf("findTaskNeeds: %w", err)
|
|
}
|
|
|
|
taskContext, err := generateTaskContext(t)
|
|
if err != nil {
|
|
return fmt.Errorf("generateTaskContext: %w", err)
|
|
}
|
|
|
|
retval[i] = &runnerv1.Task{
|
|
Id: t.ID,
|
|
WorkflowPayload: t.Job.WorkflowPayload,
|
|
Context: taskContext,
|
|
Secrets: secrets,
|
|
Vars: vars,
|
|
Needs: needs,
|
|
}
|
|
}
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return retval, nil
|
|
}
|
|
|
|
func generateTaskContext(t *actions_model.ActionTask) (*structpb.Struct, error) {
|
|
run := t.Job.Run
|
|
gitCtx, err := GenerateGiteaContext(run, t.Job)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
gitCtx["token"] = t.Token
|
|
|
|
enableOpenIDConnect, err := t.Job.EnableOpenIDConnect()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Override the setting from the workflow is this is coming from a fork pull request
|
|
// and this isn't a pull_request_target event.
|
|
if run.IsForkPullRequest && run.TriggerEvent != actions_module.GithubEventPullRequestTarget {
|
|
enableOpenIDConnect = false
|
|
}
|
|
|
|
giteaRuntimeToken, err := CreateAuthorizationToken(t, gitCtx, enableOpenIDConnect)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
gitCtx["gitea_runtime_token"] = giteaRuntimeToken
|
|
|
|
if enableOpenIDConnect {
|
|
gitCtx["forgejo_actions_id_token_request_token"] = giteaRuntimeToken
|
|
// The "placeholder=true" at the end of the URL is meaningless, but we need a param
|
|
// here if we want to match the format used in GitHub actions examples (e.g., to ensure
|
|
// that "ACTIONS_ID_TOKEN_REQUEST_URL&audience=..." will work as expected).
|
|
gitCtx["forgejo_actions_id_token_request_url"] = setting.AppURL + setting.AppSubURL + fmt.Sprintf("api/actions/_apis/pipelines/workflows/%d/idtoken?placeholder=true", t.Job.RunID)
|
|
}
|
|
|
|
return structpb.NewStruct(gitCtx)
|
|
}
|
|
|
|
func findTaskNeeds(ctx context.Context, taskJob *actions_model.ActionRunJob) (map[string]*runnerv1.TaskNeed, error) {
|
|
taskNeeds, err := FindTaskNeeds(ctx, taskJob)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
ret := make(map[string]*runnerv1.TaskNeed, len(taskNeeds))
|
|
for jobID, taskNeed := range taskNeeds {
|
|
ret[jobID] = &runnerv1.TaskNeed{
|
|
Outputs: taskNeed.Outputs,
|
|
Result: runnerv1.Result(taskNeed.Result),
|
|
}
|
|
}
|
|
return ret, nil
|
|
}
|
|
|
|
func StopTask(ctx context.Context, taskID int64, status actions_model.Status) error {
|
|
if !status.IsDone() {
|
|
return fmt.Errorf("cannot stop task with status %v", status)
|
|
}
|
|
e := db.GetEngine(ctx)
|
|
|
|
task := &actions_model.ActionTask{}
|
|
if has, err := e.ID(taskID).Get(task); err != nil {
|
|
return err
|
|
} else if !has {
|
|
return util.ErrNotExist
|
|
}
|
|
if task.Status.IsDone() {
|
|
return nil
|
|
}
|
|
|
|
now := timeutil.TimeStampNow()
|
|
task.Status = status
|
|
task.Stopped = now
|
|
if _, err := UpdateRunJob(ctx, &actions_model.ActionRunJob{
|
|
ID: task.JobID,
|
|
Status: task.Status,
|
|
Stopped: task.Stopped,
|
|
}, nil); err != nil {
|
|
return err
|
|
}
|
|
|
|
if err := actions_model.UpdateTask(ctx, task, "status", "stopped"); err != nil {
|
|
return err
|
|
}
|
|
|
|
runner := &actions_model.ActionRunner{}
|
|
if _, err := e.ID(task.RunnerID).Get(runner); err != nil {
|
|
return fmt.Errorf("failed to find runner assigned to task")
|
|
}
|
|
|
|
if runner.Ephemeral {
|
|
err := actions_model.DeleteRunner(ctx, runner)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to remove ephemeral runner from stopped task: %w", err)
|
|
}
|
|
}
|
|
|
|
if err := task.LoadAttributes(ctx); err != nil {
|
|
return err
|
|
}
|
|
|
|
for _, step := range task.Steps {
|
|
if !step.Status.IsDone() {
|
|
step.Status = status
|
|
if step.Started == 0 {
|
|
step.Started = now
|
|
}
|
|
step.Stopped = now
|
|
}
|
|
if _, err := e.ID(step.ID).Update(step); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// UpdateTaskByState updates the task by the state.
|
|
// It will always update the task if the state is not final, even there is no change.
|
|
// So it will update ActionTask.Updated to avoid the task being judged as a zombie task.
|
|
func UpdateTaskByState(ctx context.Context, runnerID int64, state *runnerv1.TaskState) (*actions_model.ActionTask, error) {
|
|
stepStates := map[int64]*runnerv1.StepState{}
|
|
for _, v := range state.Steps {
|
|
stepStates[v.Id] = v
|
|
}
|
|
|
|
ctx, committer, err := db.TxContext(ctx)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer committer.Close()
|
|
|
|
e := db.GetEngine(ctx)
|
|
|
|
task := &actions_model.ActionTask{}
|
|
if has, err := e.ID(state.Id).Get(task); err != nil {
|
|
return nil, err
|
|
} else if !has {
|
|
return nil, util.ErrNotExist
|
|
} else if runnerID != task.RunnerID {
|
|
return nil, errors.New("invalid runner for task")
|
|
}
|
|
|
|
if task.Status.IsDone() {
|
|
// the state is final, do nothing
|
|
return task, nil
|
|
}
|
|
|
|
// state.Result is not unspecified means the task is finished
|
|
if state.Result != runnerv1.Result_RESULT_UNSPECIFIED {
|
|
task.Status = actions_model.Status(state.Result)
|
|
task.Stopped = timeutil.TimeStamp(state.StoppedAt.AsTime().Unix())
|
|
if err := actions_model.UpdateTask(ctx, task, "status", "stopped"); err != nil {
|
|
return nil, err
|
|
}
|
|
if _, err := UpdateRunJob(ctx, &actions_model.ActionRunJob{
|
|
ID: task.JobID,
|
|
Status: task.Status,
|
|
Stopped: task.Stopped,
|
|
}, nil); err != nil {
|
|
return nil, err
|
|
}
|
|
} else {
|
|
// Force update ActionTask.Updated to avoid the task being judged as a zombie task
|
|
task.Updated = timeutil.TimeStampNow()
|
|
if err := actions_model.UpdateTask(ctx, task, "updated"); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if err := task.LoadAttributes(ctx); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
for _, step := range task.Steps {
|
|
var result runnerv1.Result
|
|
if v, ok := stepStates[step.Index]; ok {
|
|
result = v.Result
|
|
step.LogIndex = v.LogIndex
|
|
step.LogLength = v.LogLength
|
|
step.Started = convertTimestamp(v.StartedAt)
|
|
step.Stopped = convertTimestamp(v.StoppedAt)
|
|
}
|
|
if result != runnerv1.Result_RESULT_UNSPECIFIED {
|
|
step.Status = actions_model.Status(result)
|
|
} else if step.Started != 0 {
|
|
step.Status = actions_model.StatusRunning
|
|
}
|
|
if _, err := e.ID(step.ID).Update(step); err != nil {
|
|
return nil, err
|
|
}
|
|
}
|
|
|
|
if err := committer.Commit(); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return task, nil
|
|
}
|
|
|
|
func convertTimestamp(timestamp *timestamppb.Timestamp) timeutil.TimeStamp {
|
|
if timestamp.GetSeconds() == 0 && timestamp.GetNanos() == 0 {
|
|
return timeutil.TimeStamp(0)
|
|
}
|
|
return timeutil.TimeStamp(timestamp.AsTime().Unix())
|
|
}
|