mirror of
https://github.com/go-gitea/gitea
synced 2024-12-22 04:37:49 +01:00
support concurrency
This commit is contained in:
parent
2d7e6e9482
commit
130f2a2222
@ -20,7 +20,6 @@ import (
|
|||||||
"code.gitea.io/gitea/modules/util"
|
"code.gitea.io/gitea/modules/util"
|
||||||
webhook_module "code.gitea.io/gitea/modules/webhook"
|
webhook_module "code.gitea.io/gitea/modules/webhook"
|
||||||
|
|
||||||
"github.com/nektos/act/pkg/jobparser"
|
|
||||||
"xorm.io/builder"
|
"xorm.io/builder"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -47,6 +46,8 @@ type ActionRun struct {
|
|||||||
TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
|
TriggerEvent string // the trigger event defined in the `on` configuration of the triggered workflow
|
||||||
Status Status `xorm:"index"`
|
Status Status `xorm:"index"`
|
||||||
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
|
Version int `xorm:"version default 0"` // Status could be updated concomitantly, so an optimistic lock is needed
|
||||||
|
ConcurrencyGroup string
|
||||||
|
ConcurrencyCancel bool
|
||||||
// Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
|
// Started and Stopped is used for recording last run time, if rerun happened, they will be reset to 0
|
||||||
Started timeutil.TimeStamp
|
Started timeutil.TimeStamp
|
||||||
Stopped timeutil.TimeStamp
|
Stopped timeutil.TimeStamp
|
||||||
@ -168,7 +169,7 @@ func (run *ActionRun) IsSchedule() bool {
|
|||||||
return run.ScheduleID > 0
|
return run.ScheduleID > 0
|
||||||
}
|
}
|
||||||
|
|
||||||
func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
|
func UpdateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) error {
|
||||||
_, err := db.GetEngine(ctx).ID(repo.ID).
|
_, err := db.GetEngine(ctx).ID(repo.ID).
|
||||||
SetExpr("num_action_runs",
|
SetExpr("num_action_runs",
|
||||||
builder.Select("count(*)").From("action_run").
|
builder.Select("count(*)").From("action_run").
|
||||||
@ -196,13 +197,20 @@ func updateRepoRunsNumbers(ctx context.Context, repo *repo_model.Repository) err
|
|||||||
// It's useful when a new run is triggered, and all previous runs needn't be continued anymore.
|
// It's useful when a new run is triggered, and all previous runs needn't be continued anymore.
|
||||||
func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
|
func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID string, event webhook_module.HookEventType) error {
|
||||||
// Find all runs in the specified repository, reference, and workflow with non-final status
|
// Find all runs in the specified repository, reference, and workflow with non-final status
|
||||||
runs, total, err := db.FindAndCount[ActionRun](ctx, FindRunOptions{
|
opts := &FindRunOptions{
|
||||||
RepoID: repoID,
|
RepoID: repoID,
|
||||||
Ref: ref,
|
Ref: ref,
|
||||||
WorkflowID: workflowID,
|
WorkflowID: workflowID,
|
||||||
TriggerEvent: event,
|
TriggerEvent: event,
|
||||||
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
|
Status: []Status{StatusRunning, StatusWaiting, StatusBlocked},
|
||||||
})
|
}
|
||||||
|
return CancelPreviousJobsWithOpts(ctx, opts)
|
||||||
|
}
|
||||||
|
|
||||||
|
// CancelPreviousJobs cancels all previous jobs with opts
|
||||||
|
func CancelPreviousJobsWithOpts(ctx context.Context, opts *FindRunOptions) error {
|
||||||
|
// Find all runs by opts
|
||||||
|
runs, total, err := db.FindAndCount[ActionRun](ctx, opts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -222,38 +230,8 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
|
|||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// Iterate over each job and attempt to cancel it.
|
if err := CancelJobs(ctx, jobs); err != nil {
|
||||||
for _, job := range jobs {
|
return err
|
||||||
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
|
|
||||||
status := job.Status
|
|
||||||
if status.IsDone() {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
|
|
||||||
if job.TaskID == 0 {
|
|
||||||
job.Status = StatusCancelled
|
|
||||||
job.Stopped = timeutil.TimeStampNow()
|
|
||||||
|
|
||||||
// Update the job's status and stopped time in the database.
|
|
||||||
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
|
|
||||||
if n == 0 {
|
|
||||||
return fmt.Errorf("job has changed, try again")
|
|
||||||
}
|
|
||||||
|
|
||||||
// Continue with the next job.
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
// If the job has an associated task, try to stop the task, effectively cancelling the job.
|
|
||||||
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -261,80 +239,41 @@ func CancelPreviousJobs(ctx context.Context, repoID int64, ref, workflowID strin
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// InsertRun inserts a run
|
func CancelJobs(ctx context.Context, jobs []*ActionRunJob) error {
|
||||||
// The title will be cut off at 255 characters if it's longer than 255 characters.
|
// Iterate over each job and attempt to cancel it.
|
||||||
func InsertRun(ctx context.Context, run *ActionRun, jobs []*jobparser.SingleWorkflow) error {
|
for _, job := range jobs {
|
||||||
ctx, committer, err := db.TxContext(ctx)
|
// Skip jobs that are already in a terminal state (completed, cancelled, etc.).
|
||||||
if err != nil {
|
status := job.Status
|
||||||
return err
|
if status.IsDone() {
|
||||||
}
|
continue
|
||||||
defer committer.Close()
|
|
||||||
|
|
||||||
index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
run.Index = index
|
|
||||||
run.Title, _ = util.SplitStringAtByteN(run.Title, 255)
|
|
||||||
|
|
||||||
if err := db.Insert(ctx, run); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if run.Repo == nil {
|
|
||||||
repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID)
|
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
}
|
||||||
run.Repo = repo
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
|
// If the job has no associated task (probably an error), set its status to 'Cancelled' and stop it.
|
||||||
return err
|
if job.TaskID == 0 {
|
||||||
}
|
job.Status = StatusCancelled
|
||||||
|
job.Stopped = timeutil.TimeStampNow()
|
||||||
|
|
||||||
runJobs := make([]*ActionRunJob, 0, len(jobs))
|
// Update the job's status and stopped time in the database.
|
||||||
var hasWaiting bool
|
n, err := UpdateRunJob(ctx, job, builder.Eq{"task_id": 0}, "status", "stopped")
|
||||||
for _, v := range jobs {
|
if err != nil {
|
||||||
id, job := v.Job()
|
return err
|
||||||
needs := job.Needs()
|
}
|
||||||
if err := v.SetJob(id, job.EraseNeeds()); err != nil {
|
|
||||||
return err
|
// If the update affected 0 rows, it means the job has changed in the meantime, so we need to try again.
|
||||||
|
if n == 0 {
|
||||||
|
return fmt.Errorf("job has changed, try again")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Continue with the next job.
|
||||||
|
continue
|
||||||
}
|
}
|
||||||
payload, _ := v.Marshal()
|
|
||||||
status := StatusWaiting
|
|
||||||
if len(needs) > 0 || run.NeedApproval {
|
|
||||||
status = StatusBlocked
|
|
||||||
} else {
|
|
||||||
hasWaiting = true
|
|
||||||
}
|
|
||||||
job.Name, _ = util.SplitStringAtByteN(job.Name, 255)
|
|
||||||
runJobs = append(runJobs, &ActionRunJob{
|
|
||||||
RunID: run.ID,
|
|
||||||
RepoID: run.RepoID,
|
|
||||||
OwnerID: run.OwnerID,
|
|
||||||
CommitSHA: run.CommitSHA,
|
|
||||||
IsForkPullRequest: run.IsForkPullRequest,
|
|
||||||
Name: job.Name,
|
|
||||||
WorkflowPayload: payload,
|
|
||||||
JobID: id,
|
|
||||||
Needs: needs,
|
|
||||||
RunsOn: job.RunsOn(),
|
|
||||||
Status: status,
|
|
||||||
})
|
|
||||||
}
|
|
||||||
if err := db.Insert(ctx, runJobs); err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
// if there is a job in the waiting status, increase tasks version.
|
// If the job has an associated task, try to stop the task, effectively cancelling the job.
|
||||||
if hasWaiting {
|
if err := StopTask(ctx, job.TaskID, StatusCancelled); err != nil {
|
||||||
if err := IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil {
|
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
return nil
|
||||||
return committer.Commit()
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func GetRunByID(ctx context.Context, id int64) (*ActionRun, error) {
|
func GetRunByID(ctx context.Context, id int64) (*ActionRun, error) {
|
||||||
@ -426,7 +365,7 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
|
|||||||
}
|
}
|
||||||
run.Repo = repo
|
run.Repo = repo
|
||||||
}
|
}
|
||||||
if err := updateRepoRunsNumbers(ctx, run.Repo); err != nil {
|
if err := UpdateRepoRunsNumbers(ctx, run.Repo); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -435,3 +374,38 @@ func UpdateRun(ctx context.Context, run *ActionRun, cols ...string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type ActionRunIndex db.ResourceIndex
|
type ActionRunIndex db.ResourceIndex
|
||||||
|
|
||||||
|
func CancelConcurrentJobs(ctx context.Context, actionRunJob *ActionRunJob) error {
|
||||||
|
// cancel previous jobs in the same concurrency group
|
||||||
|
previousJobs, err := db.Find[ActionRunJob](ctx, FindRunJobOptions{
|
||||||
|
RepoID: actionRunJob.RepoID,
|
||||||
|
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
|
||||||
|
Statuses: []Status{
|
||||||
|
StatusRunning,
|
||||||
|
StatusWaiting,
|
||||||
|
StatusBlocked,
|
||||||
|
},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("find previous jobs: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return CancelJobs(ctx, previousJobs)
|
||||||
|
}
|
||||||
|
|
||||||
|
func ShouldJobBeBlockedByConcurrentJobs(ctx context.Context, actionRunJob *ActionRunJob) (bool, error) {
|
||||||
|
if actionRunJob.ConcurrencyCancel {
|
||||||
|
return false, CancelConcurrentJobs(ctx, actionRunJob)
|
||||||
|
}
|
||||||
|
|
||||||
|
concurrentJobsNum, err := db.Count[ActionRunJob](ctx, FindRunJobOptions{
|
||||||
|
RepoID: actionRunJob.RepoID,
|
||||||
|
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
|
||||||
|
Statuses: []Status{StatusRunning, StatusWaiting},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("count waiting jobs: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return concurrentJobsNum > 0, nil
|
||||||
|
}
|
||||||
|
@ -33,10 +33,16 @@ type ActionRunJob struct {
|
|||||||
RunsOn []string `xorm:"JSON TEXT"`
|
RunsOn []string `xorm:"JSON TEXT"`
|
||||||
TaskID int64 // the latest task of the job
|
TaskID int64 // the latest task of the job
|
||||||
Status Status `xorm:"index"`
|
Status Status `xorm:"index"`
|
||||||
Started timeutil.TimeStamp
|
|
||||||
Stopped timeutil.TimeStamp
|
RawConcurrencyGroup string // raw concurrency.group
|
||||||
Created timeutil.TimeStamp `xorm:"created"`
|
RawConcurrencyCancel string // raw concurrency.cancel-in-progress
|
||||||
Updated timeutil.TimeStamp `xorm:"updated index"`
|
ConcurrencyGroup string // evaluated concurrency.group
|
||||||
|
ConcurrencyCancel bool // evaluated concurrency.cancel-in-progress
|
||||||
|
|
||||||
|
Started timeutil.TimeStamp
|
||||||
|
Stopped timeutil.TimeStamp
|
||||||
|
Created timeutil.TimeStamp `xorm:"created"`
|
||||||
|
Updated timeutil.TimeStamp `xorm:"updated index"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
|
@ -46,14 +46,21 @@ func (jobs ActionJobList) LoadAttributes(ctx context.Context, withRepo bool) err
|
|||||||
return jobs.LoadRuns(ctx, withRepo)
|
return jobs.LoadRuns(ctx, withRepo)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func GetRunsByIDs(ctx context.Context, runIDs []int64) (RunList, error) {
|
||||||
|
runList := make(RunList, 0, len(runIDs))
|
||||||
|
err := db.GetEngine(ctx).In("id", runIDs).Find(&runList)
|
||||||
|
return runList, err
|
||||||
|
}
|
||||||
|
|
||||||
type FindRunJobOptions struct {
|
type FindRunJobOptions struct {
|
||||||
db.ListOptions
|
db.ListOptions
|
||||||
RunID int64
|
RunID int64
|
||||||
RepoID int64
|
RepoID int64
|
||||||
OwnerID int64
|
OwnerID int64
|
||||||
CommitSHA string
|
CommitSHA string
|
||||||
Statuses []Status
|
Statuses []Status
|
||||||
UpdatedBefore timeutil.TimeStamp
|
UpdatedBefore timeutil.TimeStamp
|
||||||
|
ConcurrencyGroup string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (opts FindRunJobOptions) ToConds() builder.Cond {
|
func (opts FindRunJobOptions) ToConds() builder.Cond {
|
||||||
@ -76,5 +83,8 @@ func (opts FindRunJobOptions) ToConds() builder.Cond {
|
|||||||
if opts.UpdatedBefore > 0 {
|
if opts.UpdatedBefore > 0 {
|
||||||
cond = cond.And(builder.Lt{"updated": opts.UpdatedBefore})
|
cond = cond.And(builder.Lt{"updated": opts.UpdatedBefore})
|
||||||
}
|
}
|
||||||
|
if opts.ConcurrencyGroup != "" {
|
||||||
|
cond = cond.And(builder.Eq{"concurrency_group": opts.ConcurrencyGroup})
|
||||||
|
}
|
||||||
return cond
|
return cond
|
||||||
}
|
}
|
||||||
|
@ -63,14 +63,16 @@ func (runs RunList) LoadRepos(ctx context.Context) error {
|
|||||||
|
|
||||||
type FindRunOptions struct {
|
type FindRunOptions struct {
|
||||||
db.ListOptions
|
db.ListOptions
|
||||||
RepoID int64
|
RepoID int64
|
||||||
OwnerID int64
|
OwnerID int64
|
||||||
WorkflowID string
|
WorkflowID string
|
||||||
Ref string // the commit/tag/… that caused this workflow
|
Ref string // the commit/tag/… that caused this workflow
|
||||||
TriggerUserID int64
|
TriggerUserID int64
|
||||||
TriggerEvent webhook_module.HookEventType
|
TriggerEvent webhook_module.HookEventType
|
||||||
Approved bool // not util.OptionalBool, it works only when it's true
|
Approved bool // not util.OptionalBool, it works only when it's true
|
||||||
Status []Status
|
Status []Status
|
||||||
|
SortType string
|
||||||
|
ConcurrencyGroup string
|
||||||
}
|
}
|
||||||
|
|
||||||
func (opts FindRunOptions) ToConds() builder.Cond {
|
func (opts FindRunOptions) ToConds() builder.Cond {
|
||||||
@ -99,11 +101,21 @@ func (opts FindRunOptions) ToConds() builder.Cond {
|
|||||||
if opts.TriggerEvent != "" {
|
if opts.TriggerEvent != "" {
|
||||||
cond = cond.And(builder.Eq{"trigger_event": opts.TriggerEvent})
|
cond = cond.And(builder.Eq{"trigger_event": opts.TriggerEvent})
|
||||||
}
|
}
|
||||||
|
if len(opts.ConcurrencyGroup) > 0 {
|
||||||
|
cond = cond.And(builder.Eq{"concurrency_group": opts.ConcurrencyGroup})
|
||||||
|
}
|
||||||
return cond
|
return cond
|
||||||
}
|
}
|
||||||
|
|
||||||
func (opts FindRunOptions) ToOrders() string {
|
func (opts FindRunOptions) ToOrders() string {
|
||||||
return "`id` DESC"
|
switch opts.SortType {
|
||||||
|
case "oldest":
|
||||||
|
return "created ASC"
|
||||||
|
case "newest":
|
||||||
|
return "created DESC"
|
||||||
|
default:
|
||||||
|
return "`id` DESC"
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type StatusInfo struct {
|
type StatusInfo struct {
|
||||||
|
@ -369,6 +369,7 @@ func prepareMigrationTasks() []*migration {
|
|||||||
newMigration(309, "Improve Notification table indices", v1_23.ImproveNotificationTableIndices),
|
newMigration(309, "Improve Notification table indices", v1_23.ImproveNotificationTableIndices),
|
||||||
newMigration(310, "Add Priority to ProtectedBranch", v1_23.AddPriorityToProtectedBranch),
|
newMigration(310, "Add Priority to ProtectedBranch", v1_23.AddPriorityToProtectedBranch),
|
||||||
newMigration(311, "Add TimeEstimate to Issue table", v1_23.AddTimeEstimateColumnToIssueTable),
|
newMigration(311, "Add TimeEstimate to Issue table", v1_23.AddTimeEstimateColumnToIssueTable),
|
||||||
|
// TODO: AddActionsConcurrency
|
||||||
}
|
}
|
||||||
return preparedMigrations
|
return preparedMigrations
|
||||||
}
|
}
|
||||||
|
28
models/migrations/v1_23/v312.go
Normal file
28
models/migrations/v1_23/v312.go
Normal file
@ -0,0 +1,28 @@
|
|||||||
|
// Copyright 2024 The Gitea Authors. All rights reserved.
|
||||||
|
// SPDX-License-Identifier: MIT
|
||||||
|
|
||||||
|
package v1_23 //nolint
|
||||||
|
|
||||||
|
import (
|
||||||
|
"xorm.io/xorm"
|
||||||
|
)
|
||||||
|
|
||||||
|
func AddActionsConcurrency(x *xorm.Engine) error {
|
||||||
|
type ActionRun struct {
|
||||||
|
ConcurrencyGroup string
|
||||||
|
ConcurrencyCancel bool
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := x.Sync(new(ActionRun)); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
type ActionRunJob struct {
|
||||||
|
RawConcurrencyGroup string
|
||||||
|
RawConcurrencyCancel string
|
||||||
|
ConcurrencyGroup string
|
||||||
|
ConcurrencyCancel bool
|
||||||
|
}
|
||||||
|
|
||||||
|
return x.Sync(new(ActionRunJob))
|
||||||
|
}
|
@ -8,14 +8,8 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
actions_model "code.gitea.io/gitea/models/actions"
|
actions_model "code.gitea.io/gitea/models/actions"
|
||||||
"code.gitea.io/gitea/models/db"
|
|
||||||
secret_model "code.gitea.io/gitea/models/secret"
|
secret_model "code.gitea.io/gitea/models/secret"
|
||||||
actions_module "code.gitea.io/gitea/modules/actions"
|
|
||||||
"code.gitea.io/gitea/modules/container"
|
|
||||||
"code.gitea.io/gitea/modules/git"
|
|
||||||
"code.gitea.io/gitea/modules/json"
|
|
||||||
"code.gitea.io/gitea/modules/log"
|
"code.gitea.io/gitea/modules/log"
|
||||||
"code.gitea.io/gitea/modules/setting"
|
|
||||||
"code.gitea.io/gitea/services/actions"
|
"code.gitea.io/gitea/services/actions"
|
||||||
|
|
||||||
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
|
runnerv1 "code.gitea.io/actions-proto-go/runner/v1"
|
||||||
@ -65,82 +59,16 @@ func pickTask(ctx context.Context, runner *actions_model.ActionRunner) (*runnerv
|
|||||||
}
|
}
|
||||||
|
|
||||||
func generateTaskContext(t *actions_model.ActionTask) *structpb.Struct {
|
func generateTaskContext(t *actions_model.ActionTask) *structpb.Struct {
|
||||||
event := map[string]any{}
|
|
||||||
_ = json.Unmarshal([]byte(t.Job.Run.EventPayload), &event)
|
|
||||||
|
|
||||||
// TriggerEvent is added in https://github.com/go-gitea/gitea/pull/25229
|
|
||||||
// This fallback is for the old ActionRun that doesn't have the TriggerEvent field
|
|
||||||
// and should be removed in 1.22
|
|
||||||
eventName := t.Job.Run.TriggerEvent
|
|
||||||
if eventName == "" {
|
|
||||||
eventName = t.Job.Run.Event.Event()
|
|
||||||
}
|
|
||||||
|
|
||||||
baseRef := ""
|
|
||||||
headRef := ""
|
|
||||||
ref := t.Job.Run.Ref
|
|
||||||
sha := t.Job.Run.CommitSHA
|
|
||||||
if pullPayload, err := t.Job.Run.GetPullRequestEventPayload(); err == nil && pullPayload.PullRequest != nil && pullPayload.PullRequest.Base != nil && pullPayload.PullRequest.Head != nil {
|
|
||||||
baseRef = pullPayload.PullRequest.Base.Ref
|
|
||||||
headRef = pullPayload.PullRequest.Head.Ref
|
|
||||||
|
|
||||||
// if the TriggerEvent is pull_request_target, ref and sha need to be set according to the base of pull request
|
|
||||||
// In GitHub's documentation, ref should be the branch or tag that triggered workflow. But when the TriggerEvent is pull_request_target,
|
|
||||||
// the ref will be the base branch.
|
|
||||||
if t.Job.Run.TriggerEvent == actions_module.GithubEventPullRequestTarget {
|
|
||||||
ref = git.BranchPrefix + pullPayload.PullRequest.Base.Name
|
|
||||||
sha = pullPayload.PullRequest.Base.Sha
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
refName := git.RefName(ref)
|
|
||||||
|
|
||||||
giteaRuntimeToken, err := actions.CreateAuthorizationToken(t.ID, t.Job.RunID, t.JobID)
|
giteaRuntimeToken, err := actions.CreateAuthorizationToken(t.ID, t.Job.RunID, t.JobID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("actions.CreateAuthorizationToken failed: %v", err)
|
log.Error("actions.CreateAuthorizationToken failed: %v", err)
|
||||||
}
|
}
|
||||||
|
|
||||||
taskContext, err := structpb.NewStruct(map[string]any{
|
ghCtx := actions.GenerateGitContext(t.Job.Run, t.Job)
|
||||||
// standard contexts, see https://docs.github.com/en/actions/learn-github-actions/contexts#github-context
|
ghCtx["token"] = t.Token
|
||||||
"action": "", // string, The name of the action currently running, or the id of a step. GitHub removes special characters, and uses the name __run when the current step runs a script without an id. If you use the same action more than once in the same job, the name will include a suffix with the sequence number with underscore before it. For example, the first script you run will have the name __run, and the second script will be named __run_2. Similarly, the second invocation of actions/checkout will be actionscheckout2.
|
ghCtx["gitea_runtime_token"] = giteaRuntimeToken
|
||||||
"action_path": "", // string, The path where an action is located. This property is only supported in composite actions. You can use this path to access files located in the same repository as the action.
|
|
||||||
"action_ref": "", // string, For a step executing an action, this is the ref of the action being executed. For example, v2.
|
|
||||||
"action_repository": "", // string, For a step executing an action, this is the owner and repository name of the action. For example, actions/checkout.
|
|
||||||
"action_status": "", // string, For a composite action, the current result of the composite action.
|
|
||||||
"actor": t.Job.Run.TriggerUser.Name, // string, The username of the user that triggered the initial workflow run. If the workflow run is a re-run, this value may differ from github.triggering_actor. Any workflow re-runs will use the privileges of github.actor, even if the actor initiating the re-run (github.triggering_actor) has different privileges.
|
|
||||||
"api_url": setting.AppURL + "api/v1", // string, The URL of the GitHub REST API.
|
|
||||||
"base_ref": baseRef, // string, The base_ref or target branch of the pull request in a workflow run. This property is only available when the event that triggers a workflow run is either pull_request or pull_request_target.
|
|
||||||
"env": "", // string, Path on the runner to the file that sets environment variables from workflow commands. This file is unique to the current step and is a different file for each step in a job. For more information, see "Workflow commands for GitHub Actions."
|
|
||||||
"event": event, // object, The full event webhook payload. You can access individual properties of the event using this context. This object is identical to the webhook payload of the event that triggered the workflow run, and is different for each event. The webhooks for each GitHub Actions event is linked in "Events that trigger workflows." For example, for a workflow run triggered by the push event, this object contains the contents of the push webhook payload.
|
|
||||||
"event_name": eventName, // string, The name of the event that triggered the workflow run.
|
|
||||||
"event_path": "", // string, The path to the file on the runner that contains the full event webhook payload.
|
|
||||||
"graphql_url": "", // string, The URL of the GitHub GraphQL API.
|
|
||||||
"head_ref": headRef, // string, The head_ref or source branch of the pull request in a workflow run. This property is only available when the event that triggers a workflow run is either pull_request or pull_request_target.
|
|
||||||
"job": fmt.Sprint(t.JobID), // string, The job_id of the current job.
|
|
||||||
"ref": ref, // string, The fully-formed ref of the branch or tag that triggered the workflow run. For workflows triggered by push, this is the branch or tag ref that was pushed. For workflows triggered by pull_request, this is the pull request merge branch. For workflows triggered by release, this is the release tag created. For other triggers, this is the branch or tag ref that triggered the workflow run. This is only set if a branch or tag is available for the event type. The ref given is fully-formed, meaning that for branches the format is refs/heads/<branch_name>, for pull requests it is refs/pull/<pr_number>/merge, and for tags it is refs/tags/<tag_name>. For example, refs/heads/feature-branch-1.
|
|
||||||
"ref_name": refName.ShortName(), // string, The short ref name of the branch or tag that triggered the workflow run. This value matches the branch or tag name shown on GitHub. For example, feature-branch-1.
|
|
||||||
"ref_protected": false, // boolean, true if branch protections are configured for the ref that triggered the workflow run.
|
|
||||||
"ref_type": refName.RefType(), // string, The type of ref that triggered the workflow run. Valid values are branch or tag.
|
|
||||||
"path": "", // string, Path on the runner to the file that sets system PATH variables from workflow commands. This file is unique to the current step and is a different file for each step in a job. For more information, see "Workflow commands for GitHub Actions."
|
|
||||||
"repository": t.Job.Run.Repo.OwnerName + "/" + t.Job.Run.Repo.Name, // string, The owner and repository name. For example, Codertocat/Hello-World.
|
|
||||||
"repository_owner": t.Job.Run.Repo.OwnerName, // string, The repository owner's name. For example, Codertocat.
|
|
||||||
"repositoryUrl": t.Job.Run.Repo.HTMLURL(), // string, The Git URL to the repository. For example, git://github.com/codertocat/hello-world.git.
|
|
||||||
"retention_days": "", // string, The number of days that workflow run logs and artifacts are kept.
|
|
||||||
"run_id": fmt.Sprint(t.Job.RunID), // string, A unique number for each workflow run within a repository. This number does not change if you re-run the workflow run.
|
|
||||||
"run_number": fmt.Sprint(t.Job.Run.Index), // string, A unique number for each run of a particular workflow in a repository. This number begins at 1 for the workflow's first run, and increments with each new run. This number does not change if you re-run the workflow run.
|
|
||||||
"run_attempt": fmt.Sprint(t.Job.Attempt), // string, A unique number for each attempt of a particular workflow run in a repository. This number begins at 1 for the workflow run's first attempt, and increments with each re-run.
|
|
||||||
"secret_source": "Actions", // string, The source of a secret used in a workflow. Possible values are None, Actions, Dependabot, or Codespaces.
|
|
||||||
"server_url": setting.AppURL, // string, The URL of the GitHub server. For example: https://github.com.
|
|
||||||
"sha": sha, // string, The commit SHA that triggered the workflow. The value of this commit SHA depends on the event that triggered the workflow. For more information, see "Events that trigger workflows." For example, ffac537e6cbbf934b08745a378932722df287a53.
|
|
||||||
"token": t.Token, // string, A token to authenticate on behalf of the GitHub App installed on your repository. This is functionally equivalent to the GITHUB_TOKEN secret. For more information, see "Automatic token authentication."
|
|
||||||
"triggering_actor": "", // string, The username of the user that initiated the workflow run. If the workflow run is a re-run, this value may differ from github.actor. Any workflow re-runs will use the privileges of github.actor, even if the actor initiating the re-run (github.triggering_actor) has different privileges.
|
|
||||||
"workflow": t.Job.Run.WorkflowID, // string, The name of the workflow. If the workflow file doesn't specify a name, the value of this property is the full path of the workflow file in the repository.
|
|
||||||
"workspace": "", // string, The default working directory on the runner for steps, and the default location of your repository when using the checkout action.
|
|
||||||
|
|
||||||
// additional contexts
|
taskContext, err := structpb.NewStruct(ghCtx)
|
||||||
"gitea_default_actions_url": setting.Actions.DefaultActionsURL.URL(),
|
|
||||||
"gitea_runtime_token": giteaRuntimeToken,
|
|
||||||
})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("structpb.NewStruct failed: %v", err)
|
log.Error("structpb.NewStruct failed: %v", err)
|
||||||
}
|
}
|
||||||
@ -149,52 +77,16 @@ func generateTaskContext(t *actions_model.ActionTask) *structpb.Struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func findTaskNeeds(ctx context.Context, task *actions_model.ActionTask) (map[string]*runnerv1.TaskNeed, error) {
|
func findTaskNeeds(ctx context.Context, task *actions_model.ActionTask) (map[string]*runnerv1.TaskNeed, error) {
|
||||||
if err := task.LoadAttributes(ctx); err != nil {
|
taskNeeds, err := actions.FindTaskNeeds(ctx, task)
|
||||||
return nil, fmt.Errorf("LoadAttributes: %w", err)
|
|
||||||
}
|
|
||||||
if len(task.Job.Needs) == 0 {
|
|
||||||
return nil, nil
|
|
||||||
}
|
|
||||||
needs := container.SetOf(task.Job.Needs...)
|
|
||||||
|
|
||||||
jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: task.Job.RunID})
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("FindRunJobs: %w", err)
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
jobIDJobs := make(map[string][]*actions_model.ActionRunJob)
|
ret := make(map[string]*runnerv1.TaskNeed, len(taskNeeds))
|
||||||
for _, job := range jobs {
|
for jobID, taskNeed := range taskNeeds {
|
||||||
jobIDJobs[job.JobID] = append(jobIDJobs[job.JobID], job)
|
|
||||||
}
|
|
||||||
|
|
||||||
ret := make(map[string]*runnerv1.TaskNeed, len(needs))
|
|
||||||
for jobID, jobsWithSameID := range jobIDJobs {
|
|
||||||
if !needs.Contains(jobID) {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
var jobOutputs map[string]string
|
|
||||||
for _, job := range jobsWithSameID {
|
|
||||||
if job.TaskID == 0 || !job.Status.IsDone() {
|
|
||||||
// it shouldn't happen, or the job has been rerun
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
got, err := actions_model.FindTaskOutputByTaskID(ctx, job.TaskID)
|
|
||||||
if err != nil {
|
|
||||||
return nil, fmt.Errorf("FindTaskOutputByTaskID: %w", err)
|
|
||||||
}
|
|
||||||
outputs := make(map[string]string, len(got))
|
|
||||||
for _, v := range got {
|
|
||||||
outputs[v.OutputKey] = v.OutputValue
|
|
||||||
}
|
|
||||||
if len(jobOutputs) == 0 {
|
|
||||||
jobOutputs = outputs
|
|
||||||
} else {
|
|
||||||
jobOutputs = mergeTwoOutputs(outputs, jobOutputs)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
ret[jobID] = &runnerv1.TaskNeed{
|
ret[jobID] = &runnerv1.TaskNeed{
|
||||||
Outputs: jobOutputs,
|
Outputs: taskNeed.Outputs,
|
||||||
Result: runnerv1.Result(actions_model.AggregateJobStatus(jobsWithSameID)),
|
Result: runnerv1.Result(taskNeed.Result),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -561,6 +561,7 @@ func Approve(ctx *context_module.Context) {
|
|||||||
}
|
}
|
||||||
for _, job := range jobs {
|
for _, job := range jobs {
|
||||||
if len(job.Needs) == 0 && job.Status.IsBlocked() {
|
if len(job.Needs) == 0 && job.Status.IsBlocked() {
|
||||||
|
// TODO: check concurrency
|
||||||
job.Status = actions_model.StatusWaiting
|
job.Status = actions_model.StatusWaiting
|
||||||
_, err := actions_model.UpdateRunJob(ctx, job, nil, "status")
|
_, err := actions_model.UpdateRunJob(ctx, job, nil, "status")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -826,9 +827,10 @@ func Run(ctx *context_module.Context) {
|
|||||||
|
|
||||||
// find workflow from commit
|
// find workflow from commit
|
||||||
var workflows []*jobparser.SingleWorkflow
|
var workflows []*jobparser.SingleWorkflow
|
||||||
|
var content []byte
|
||||||
for _, entry := range entries {
|
for _, entry := range entries {
|
||||||
if entry.Name() == workflowID {
|
if entry.Name() == workflowID {
|
||||||
content, err := actions.GetContentFromEntry(entry)
|
content, err = actions.GetContentFromEntry(entry)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
ctx.Error(http.StatusInternalServerError, err.Error())
|
ctx.Error(http.StatusInternalServerError, err.Error())
|
||||||
return
|
return
|
||||||
@ -914,7 +916,7 @@ func Run(ctx *context_module.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Insert the action run and its associated jobs into the database
|
// Insert the action run and its associated jobs into the database
|
||||||
if err := actions_model.InsertRun(ctx, run, workflows); err != nil {
|
if err := actions_service.InsertRun(ctx, run, workflows); err != nil {
|
||||||
ctx.ServerError("workflow", err)
|
ctx.ServerError("workflow", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
37
services/actions/concurrency.go
Normal file
37
services/actions/concurrency.go
Normal file
@ -0,0 +1,37 @@
|
|||||||
|
// Copyright 2024 The Gitea Authors. All rights reserved.
|
||||||
|
// SPDX-License-Identifier: MIT
|
||||||
|
|
||||||
|
package actions
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
actions_model "code.gitea.io/gitea/models/actions"
|
||||||
|
|
||||||
|
"github.com/nektos/act/pkg/jobparser"
|
||||||
|
act_model "github.com/nektos/act/pkg/model"
|
||||||
|
)
|
||||||
|
|
||||||
|
func evaluateJobConcurrency(run *actions_model.ActionRun, actionRunJob *actions_model.ActionRunJob, vars map[string]string, jobResults map[string]*jobparser.JobResult) (string, bool, error) {
|
||||||
|
rawConcurrency := &act_model.RawConcurrency{
|
||||||
|
Group: actionRunJob.RawConcurrencyGroup,
|
||||||
|
CancelInProgress: actionRunJob.RawConcurrencyCancel,
|
||||||
|
}
|
||||||
|
|
||||||
|
gitCtx := jobparser.ToGitContext(GenerateGitContext(run, actionRunJob))
|
||||||
|
|
||||||
|
actWorkflow, err := act_model.ReadWorkflow(bytes.NewReader(actionRunJob.WorkflowPayload))
|
||||||
|
if err != nil {
|
||||||
|
return "", false, fmt.Errorf("read workflow: %w", err)
|
||||||
|
}
|
||||||
|
actJob := actWorkflow.GetJob(actionRunJob.JobID)
|
||||||
|
|
||||||
|
if len(jobResults) == 0 {
|
||||||
|
jobResults = map[string]*jobparser.JobResult{actionRunJob.JobID: {}}
|
||||||
|
}
|
||||||
|
|
||||||
|
concurrencyGroup, concurrencyCancel := jobparser.EvaluateJobConcurrency(rawConcurrency, actionRunJob.JobID, actJob, gitCtx, vars, jobResults)
|
||||||
|
|
||||||
|
return concurrencyGroup, concurrencyCancel, nil
|
||||||
|
}
|
@ -10,7 +10,9 @@ import (
|
|||||||
|
|
||||||
actions_model "code.gitea.io/gitea/models/actions"
|
actions_model "code.gitea.io/gitea/models/actions"
|
||||||
"code.gitea.io/gitea/models/db"
|
"code.gitea.io/gitea/models/db"
|
||||||
|
"code.gitea.io/gitea/modules/container"
|
||||||
"code.gitea.io/gitea/modules/graceful"
|
"code.gitea.io/gitea/modules/graceful"
|
||||||
|
"code.gitea.io/gitea/modules/log"
|
||||||
"code.gitea.io/gitea/modules/queue"
|
"code.gitea.io/gitea/modules/queue"
|
||||||
|
|
||||||
"github.com/nektos/act/pkg/jobparser"
|
"github.com/nektos/act/pkg/jobparser"
|
||||||
@ -37,25 +39,116 @@ func jobEmitterQueueHandler(items ...*jobUpdate) []*jobUpdate {
|
|||||||
ctx := graceful.GetManager().ShutdownContext()
|
ctx := graceful.GetManager().ShutdownContext()
|
||||||
var ret []*jobUpdate
|
var ret []*jobUpdate
|
||||||
for _, update := range items {
|
for _, update := range items {
|
||||||
if err := checkJobsOfRun(ctx, update.RunID); err != nil {
|
if err := checkJobsByRunID(ctx, update.RunID); err != nil {
|
||||||
ret = append(ret, update)
|
ret = append(ret, update)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
func checkJobsOfRun(ctx context.Context, runID int64) error {
|
func checkJobsByRunID(ctx context.Context, runID int64) error {
|
||||||
jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: runID})
|
run, err := actions_model.GetRunByID(ctx, runID)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("get action run: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return db.WithTx(ctx, func(ctx context.Context) error {
|
||||||
|
// check jobs of the current run
|
||||||
|
if err := checkJobsOfRun(ctx, run); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// check run (workflow-level) concurrency
|
||||||
|
concurrentRunIDs := make(container.Set[int64])
|
||||||
|
if len(run.ConcurrencyGroup) > 0 && !run.ConcurrencyCancel {
|
||||||
|
concurrentRuns, err := db.Find[actions_model.ActionRun](ctx, actions_model.FindRunOptions{
|
||||||
|
RepoID: run.RepoID,
|
||||||
|
ConcurrencyGroup: run.ConcurrencyGroup,
|
||||||
|
Status: []actions_model.Status{actions_model.StatusBlocked},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, cRun := range concurrentRuns {
|
||||||
|
concurrentRunIDs.Add(cRun.ID)
|
||||||
|
if cRun.NeedApproval {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := checkJobsOfRun(ctx, cRun); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
updatedRun, err := actions_model.GetRunByID(ctx, cRun.ID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if updatedRun.Status == actions_model.StatusWaiting {
|
||||||
|
// only run one blocked action run in the same concurrency group
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// check job concurrency
|
||||||
|
concurrentJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, job := range concurrentJobs {
|
||||||
|
if job.Status.IsDone() && len(job.ConcurrencyGroup) > 0 && !job.ConcurrencyCancel {
|
||||||
|
concurrentJobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{
|
||||||
|
RepoID: job.RepoID,
|
||||||
|
ConcurrencyGroup: job.ConcurrencyGroup,
|
||||||
|
Statuses: []actions_model.Status{actions_model.StatusBlocked},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
for _, cJob := range concurrentJobs {
|
||||||
|
if concurrentRunIDs.Contains(cJob.RunID) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
cRun, err := actions_model.GetRunByID(ctx, cJob.RunID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if cRun.NeedApproval {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if err := checkJobsOfRun(ctx, cRun); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
updatedJob, err := actions_model.GetRunJobByID(ctx, cJob.ID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if updatedJob.Status == actions_model.StatusWaiting {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func checkJobsOfRun(ctx context.Context, run *actions_model.ActionRun) error {
|
||||||
|
jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: run.ID})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
vars, err := actions_model.GetVariablesOfRun(ctx, run)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("get run %d variables: %w", run.ID, err)
|
||||||
|
}
|
||||||
|
|
||||||
if err := db.WithTx(ctx, func(ctx context.Context) error {
|
if err := db.WithTx(ctx, func(ctx context.Context) error {
|
||||||
idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs))
|
|
||||||
for _, job := range jobs {
|
for _, job := range jobs {
|
||||||
idToJobs[job.JobID] = append(idToJobs[job.JobID], job)
|
job.Run = run
|
||||||
}
|
}
|
||||||
|
|
||||||
updates := newJobStatusResolver(jobs).Resolve()
|
updates := newJobStatusResolver(jobs, vars).Resolve(ctx)
|
||||||
for _, job := range jobs {
|
for _, job := range jobs {
|
||||||
if status, ok := updates[job.ID]; ok {
|
if status, ok := updates[job.ID]; ok {
|
||||||
job.Status = status
|
job.Status = status
|
||||||
@ -78,9 +171,10 @@ type jobStatusResolver struct {
|
|||||||
statuses map[int64]actions_model.Status
|
statuses map[int64]actions_model.Status
|
||||||
needs map[int64][]int64
|
needs map[int64][]int64
|
||||||
jobMap map[int64]*actions_model.ActionRunJob
|
jobMap map[int64]*actions_model.ActionRunJob
|
||||||
|
vars map[string]string
|
||||||
}
|
}
|
||||||
|
|
||||||
func newJobStatusResolver(jobs actions_model.ActionJobList) *jobStatusResolver {
|
func newJobStatusResolver(jobs actions_model.ActionJobList, vars map[string]string) *jobStatusResolver {
|
||||||
idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs))
|
idToJobs := make(map[string][]*actions_model.ActionRunJob, len(jobs))
|
||||||
jobMap := make(map[int64]*actions_model.ActionRunJob)
|
jobMap := make(map[int64]*actions_model.ActionRunJob)
|
||||||
for _, job := range jobs {
|
for _, job := range jobs {
|
||||||
@ -102,13 +196,14 @@ func newJobStatusResolver(jobs actions_model.ActionJobList) *jobStatusResolver {
|
|||||||
statuses: statuses,
|
statuses: statuses,
|
||||||
needs: needs,
|
needs: needs,
|
||||||
jobMap: jobMap,
|
jobMap: jobMap,
|
||||||
|
vars: vars,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *jobStatusResolver) Resolve() map[int64]actions_model.Status {
|
func (r *jobStatusResolver) Resolve(ctx context.Context) map[int64]actions_model.Status {
|
||||||
ret := map[int64]actions_model.Status{}
|
ret := map[int64]actions_model.Status{}
|
||||||
for i := 0; i < len(r.statuses); i++ {
|
for i := 0; i < len(r.statuses); i++ {
|
||||||
updated := r.resolve()
|
updated := r.resolve(ctx)
|
||||||
if len(updated) == 0 {
|
if len(updated) == 0 {
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
@ -120,7 +215,7 @@ func (r *jobStatusResolver) Resolve() map[int64]actions_model.Status {
|
|||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
func (r *jobStatusResolver) resolve() map[int64]actions_model.Status {
|
func (r *jobStatusResolver) resolve(ctx context.Context) map[int64]actions_model.Status {
|
||||||
ret := map[int64]actions_model.Status{}
|
ret := map[int64]actions_model.Status{}
|
||||||
for id, status := range r.statuses {
|
for id, status := range r.statuses {
|
||||||
if status != actions_model.StatusBlocked {
|
if status != actions_model.StatusBlocked {
|
||||||
@ -137,6 +232,17 @@ func (r *jobStatusResolver) resolve() map[int64]actions_model.Status {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
if allDone {
|
if allDone {
|
||||||
|
// check concurrency
|
||||||
|
blockedByJobConcurrency, err := checkConcurrencyForJobWithNeeds(ctx, r.jobMap[id], r.vars)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("Check run %d job %d concurrency: %v. This job will stay blocked.")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if blockedByJobConcurrency {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
if allSucceed {
|
if allSucceed {
|
||||||
ret[id] = actions_model.StatusWaiting
|
ret[id] = actions_model.StatusWaiting
|
||||||
} else {
|
} else {
|
||||||
@ -160,3 +266,59 @@ func (r *jobStatusResolver) resolve() map[int64]actions_model.Status {
|
|||||||
}
|
}
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func checkConcurrencyForJobWithNeeds(ctx context.Context, actionRunJob *actions_model.ActionRunJob, vars map[string]string) (bool, error) {
|
||||||
|
if len(actionRunJob.RawConcurrencyGroup) == 0 {
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
if err := actionRunJob.LoadRun(ctx); err != nil {
|
||||||
|
return false, err
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(actionRunJob.ConcurrencyGroup) == 0 {
|
||||||
|
// empty concurrency group means the raw concurrency has not been evaluated
|
||||||
|
task, err := actions_model.GetTaskByID(ctx, actionRunJob.TaskID)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("get task by id: %w", err)
|
||||||
|
}
|
||||||
|
taskNeeds, err := FindTaskNeeds(ctx, task)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("find task needs: %w", err)
|
||||||
|
}
|
||||||
|
jobResults := make(map[string]*jobparser.JobResult, len(taskNeeds))
|
||||||
|
for jobID, taskNeed := range taskNeeds {
|
||||||
|
jobResult := &jobparser.JobResult{
|
||||||
|
Result: taskNeed.Result.String(),
|
||||||
|
Outputs: taskNeed.Outputs,
|
||||||
|
}
|
||||||
|
jobResults[jobID] = jobResult
|
||||||
|
}
|
||||||
|
|
||||||
|
actionRunJob.ConcurrencyGroup, actionRunJob.ConcurrencyCancel, err = evaluateJobConcurrency(actionRunJob.Run, actionRunJob, vars, jobResults)
|
||||||
|
if err != nil {
|
||||||
|
return false, fmt.Errorf("evaluate job concurrency: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := actions_model.UpdateRunJob(ctx, &actions_model.ActionRunJob{
|
||||||
|
ID: actionRunJob.ID,
|
||||||
|
ConcurrencyGroup: actionRunJob.ConcurrencyGroup,
|
||||||
|
ConcurrencyCancel: actionRunJob.ConcurrencyCancel,
|
||||||
|
}, nil); err != nil {
|
||||||
|
return false, fmt.Errorf("update run job: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(actionRunJob.ConcurrencyGroup) == 0 {
|
||||||
|
// the job should not be blocked by concurrency if its concurrency group is empty
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if actionRunJob.ConcurrencyCancel {
|
||||||
|
if err := actions_model.CancelConcurrentJobs(ctx, actionRunJob); err != nil {
|
||||||
|
return false, fmt.Errorf("cancel concurrent jobs: %w", err)
|
||||||
|
}
|
||||||
|
return false, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
return actions_model.ShouldJobBeBlockedByConcurrentJobs(ctx, actionRunJob)
|
||||||
|
}
|
||||||
|
@ -4,6 +4,7 @@
|
|||||||
package actions
|
package actions
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"context"
|
||||||
"testing"
|
"testing"
|
||||||
|
|
||||||
actions_model "code.gitea.io/gitea/models/actions"
|
actions_model "code.gitea.io/gitea/models/actions"
|
||||||
@ -129,8 +130,8 @@ jobs:
|
|||||||
}
|
}
|
||||||
for _, tt := range tests {
|
for _, tt := range tests {
|
||||||
t.Run(tt.name, func(t *testing.T) {
|
t.Run(tt.name, func(t *testing.T) {
|
||||||
r := newJobStatusResolver(tt.jobs)
|
r := newJobStatusResolver(tt.jobs, nil)
|
||||||
assert.Equal(t, tt.want, r.Resolve())
|
assert.Equal(t, tt.want, r.Resolve(context.Background()))
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -332,27 +332,27 @@ func handleWorkflows(
|
|||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
|
wfRawConcurrency, err := jobparser.ReadWorkflowRawConcurrency(dwf.Content)
|
||||||
|
if err != nil {
|
||||||
|
log.Error("ReadWorkflowRawConcurrency: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if wfRawConcurrency != nil {
|
||||||
|
wfGitCtx := jobparser.ToGitContext(GenerateGitContext(run, nil))
|
||||||
|
wfConcurrencyGroup, wfConcurrencyCancel := jobparser.EvaluateWorkflowConcurrency(wfRawConcurrency, wfGitCtx, vars)
|
||||||
|
if len(wfConcurrencyGroup) > 0 {
|
||||||
|
run.ConcurrencyGroup = wfConcurrencyGroup
|
||||||
|
run.ConcurrencyCancel = wfConcurrencyCancel
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
jobs, err := jobparser.Parse(dwf.Content, jobparser.WithVars(vars))
|
jobs, err := jobparser.Parse(dwf.Content, jobparser.WithVars(vars))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
log.Error("jobparser.Parse: %v", err)
|
log.Error("jobparser.Parse: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
// cancel running jobs if the event is push or pull_request_sync
|
if err := InsertRun(ctx, run, jobs); err != nil {
|
||||||
if run.Event == webhook_module.HookEventPush ||
|
|
||||||
run.Event == webhook_module.HookEventPullRequestSync {
|
|
||||||
if err := actions_model.CancelPreviousJobs(
|
|
||||||
ctx,
|
|
||||||
run.RepoID,
|
|
||||||
run.Ref,
|
|
||||||
run.WorkflowID,
|
|
||||||
run.Event,
|
|
||||||
); err != nil {
|
|
||||||
log.Error("CancelPreviousJobs: %v", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := actions_model.InsertRun(ctx, run, jobs); err != nil {
|
|
||||||
log.Error("InsertRun: %v", err)
|
log.Error("InsertRun: %v", err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
160
services/actions/run.go
Normal file
160
services/actions/run.go
Normal file
@ -0,0 +1,160 @@
|
|||||||
|
// Copyright 2024 The Gitea Authors. All rights reserved.
|
||||||
|
// SPDX-License-Identifier: MIT
|
||||||
|
|
||||||
|
package actions
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
actions_model "code.gitea.io/gitea/models/actions"
|
||||||
|
"code.gitea.io/gitea/models/db"
|
||||||
|
repo_model "code.gitea.io/gitea/models/repo"
|
||||||
|
"code.gitea.io/gitea/modules/util"
|
||||||
|
|
||||||
|
"github.com/nektos/act/pkg/jobparser"
|
||||||
|
)
|
||||||
|
|
||||||
|
// InsertRun inserts a run
|
||||||
|
// The title will be cut off at 255 characters if it's longer than 255 characters.
|
||||||
|
func InsertRun(ctx context.Context, run *actions_model.ActionRun, jobs []*jobparser.SingleWorkflow) error {
|
||||||
|
ctx, committer, err := db.TxContext(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
defer committer.Close()
|
||||||
|
|
||||||
|
index, err := db.GetNextResourceIndex(ctx, "action_run_index", run.RepoID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
run.Index = index
|
||||||
|
run.Title, _ = util.SplitStringAtByteN(run.Title, 255)
|
||||||
|
|
||||||
|
// check workflow concurrency
|
||||||
|
if len(run.ConcurrencyGroup) > 0 {
|
||||||
|
if run.ConcurrencyCancel {
|
||||||
|
if err := actions_model.CancelPreviousJobsWithOpts(ctx, &actions_model.FindRunOptions{
|
||||||
|
RepoID: run.RepoID,
|
||||||
|
ConcurrencyGroup: run.ConcurrencyGroup,
|
||||||
|
Status: []actions_model.Status{
|
||||||
|
actions_model.StatusRunning,
|
||||||
|
actions_model.StatusWaiting,
|
||||||
|
actions_model.StatusBlocked,
|
||||||
|
},
|
||||||
|
}); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
concurrentRunsNum, err := db.Count[actions_model.ActionRun](ctx, &actions_model.FindRunOptions{
|
||||||
|
RepoID: run.RepoID,
|
||||||
|
ConcurrencyGroup: run.ConcurrencyGroup,
|
||||||
|
Status: []actions_model.Status{actions_model.StatusWaiting, actions_model.StatusRunning},
|
||||||
|
})
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if concurrentRunsNum > 0 {
|
||||||
|
run.Status = actions_model.StatusBlocked
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := db.Insert(ctx, run); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if run.Repo == nil {
|
||||||
|
repo, err := repo_model.GetRepositoryByID(ctx, run.RepoID)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
run.Repo = repo
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := actions_model.UpdateRepoRunsNumbers(ctx, run.Repo); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// query vars for evaluating job concurrency groups
|
||||||
|
vars, err := actions_model.GetVariablesOfRun(ctx, run)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("get run %d variables: %w", run.ID, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
runJobs := make([]*actions_model.ActionRunJob, 0, len(jobs))
|
||||||
|
var hasWaiting bool
|
||||||
|
for _, v := range jobs {
|
||||||
|
id, job := v.Job()
|
||||||
|
needs := job.Needs()
|
||||||
|
if err := v.SetJob(id, job.EraseNeeds()); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
payload, _ := v.Marshal()
|
||||||
|
status := actions_model.StatusWaiting
|
||||||
|
if len(needs) > 0 || run.NeedApproval || run.Status == actions_model.StatusBlocked {
|
||||||
|
status = actions_model.StatusBlocked
|
||||||
|
} else {
|
||||||
|
hasWaiting = true
|
||||||
|
}
|
||||||
|
job.Name, _ = util.SplitStringAtByteN(job.Name, 255)
|
||||||
|
runJob := &actions_model.ActionRunJob{
|
||||||
|
RunID: run.ID,
|
||||||
|
RepoID: run.RepoID,
|
||||||
|
OwnerID: run.OwnerID,
|
||||||
|
CommitSHA: run.CommitSHA,
|
||||||
|
IsForkPullRequest: run.IsForkPullRequest,
|
||||||
|
Name: job.Name,
|
||||||
|
WorkflowPayload: payload,
|
||||||
|
JobID: id,
|
||||||
|
Needs: needs,
|
||||||
|
RunsOn: job.RunsOn(),
|
||||||
|
Status: status,
|
||||||
|
}
|
||||||
|
|
||||||
|
// check job concurrency
|
||||||
|
if job.RawConcurrency != nil && len(job.RawConcurrency.Group) > 0 {
|
||||||
|
runJob.RawConcurrencyGroup = job.RawConcurrency.Group
|
||||||
|
runJob.RawConcurrencyCancel = job.RawConcurrency.CancelInProgress
|
||||||
|
// we do not need to evaluate job concurrency if the job is blocked
|
||||||
|
// because it will be checked by job emitter
|
||||||
|
if runJob.Status != actions_model.StatusBlocked {
|
||||||
|
var err error
|
||||||
|
runJob.ConcurrencyGroup, runJob.ConcurrencyCancel, err = evaluateJobConcurrency(run, runJob, vars, map[string]*jobparser.JobResult{})
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("evaluate job concurrency: %w", err)
|
||||||
|
}
|
||||||
|
if len(runJob.ConcurrencyGroup) > 0 {
|
||||||
|
// check if the job should be blocked by job concurrency
|
||||||
|
shouldBlock, err := actions_model.ShouldJobBeBlockedByConcurrentJobs(ctx, runJob)
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
if shouldBlock {
|
||||||
|
runJob.Status = actions_model.StatusBlocked
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
runJobs = append(runJobs, runJob)
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := db.Insert(ctx, runJobs); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
run.Status = actions_model.AggregateJobStatus(runJobs)
|
||||||
|
if err := actions_model.UpdateRun(ctx, run, "status"); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
// if there is a job in the waiting status, increase tasks version.
|
||||||
|
if hasWaiting {
|
||||||
|
if err := actions_model.IncreaseTaskVersion(ctx, run.OwnerID, run.RepoID); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return committer.Commit()
|
||||||
|
}
|
@ -145,7 +145,7 @@ func CreateScheduleTask(ctx context.Context, cron *actions_model.ActionSchedule)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Insert the action run and its associated jobs into the database
|
// Insert the action run and its associated jobs into the database
|
||||||
if err := actions_model.InsertRun(ctx, run, workflows); err != nil {
|
if err := InsertRun(ctx, run, workflows); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
171
services/actions/utils.go
Normal file
171
services/actions/utils.go
Normal file
@ -0,0 +1,171 @@
|
|||||||
|
// Copyright 2024 The Gitea Authors. All rights reserved.
|
||||||
|
// SPDX-License-Identifier: MIT
|
||||||
|
|
||||||
|
package actions
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
actions_model "code.gitea.io/gitea/models/actions"
|
||||||
|
"code.gitea.io/gitea/models/db"
|
||||||
|
actions_module "code.gitea.io/gitea/modules/actions"
|
||||||
|
"code.gitea.io/gitea/modules/container"
|
||||||
|
"code.gitea.io/gitea/modules/git"
|
||||||
|
"code.gitea.io/gitea/modules/json"
|
||||||
|
"code.gitea.io/gitea/modules/setting"
|
||||||
|
)
|
||||||
|
|
||||||
|
// GenerateGitContext generate the git context without token and gitea_runtime_token
|
||||||
|
// job can be nil when generating context for interpolating workflow-level expressions
|
||||||
|
func GenerateGitContext(run *actions_model.ActionRun, job *actions_model.ActionRunJob) map[string]any {
|
||||||
|
event := map[string]any{}
|
||||||
|
_ = json.Unmarshal([]byte(run.EventPayload), &event)
|
||||||
|
|
||||||
|
// TriggerEvent is added in https://github.com/go-gitea/gitea/pull/25229
|
||||||
|
// This fallback is for the old ActionRun that doesn't have the TriggerEvent field
|
||||||
|
// and should be removed in 1.22
|
||||||
|
eventName := run.TriggerEvent
|
||||||
|
if eventName == "" {
|
||||||
|
eventName = run.Event.Event()
|
||||||
|
}
|
||||||
|
|
||||||
|
baseRef := ""
|
||||||
|
headRef := ""
|
||||||
|
ref := run.Ref
|
||||||
|
sha := run.CommitSHA
|
||||||
|
if pullPayload, err := run.GetPullRequestEventPayload(); err == nil && pullPayload.PullRequest != nil && pullPayload.PullRequest.Base != nil && pullPayload.PullRequest.Head != nil {
|
||||||
|
baseRef = pullPayload.PullRequest.Base.Ref
|
||||||
|
headRef = pullPayload.PullRequest.Head.Ref
|
||||||
|
|
||||||
|
// if the TriggerEvent is pull_request_target, ref and sha need to be set according to the base of pull request
|
||||||
|
// In GitHub's documentation, ref should be the branch or tag that triggered workflow. But when the TriggerEvent is pull_request_target,
|
||||||
|
// the ref will be the base branch.
|
||||||
|
if run.TriggerEvent == actions_module.GithubEventPullRequestTarget {
|
||||||
|
ref = git.BranchPrefix + pullPayload.PullRequest.Base.Name
|
||||||
|
sha = pullPayload.PullRequest.Base.Sha
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
refName := git.RefName(ref)
|
||||||
|
|
||||||
|
gitContext := map[string]any{
|
||||||
|
// standard contexts, see https://docs.github.com/en/actions/learn-github-actions/contexts#github-context
|
||||||
|
"action": "", // string, The name of the action currently running, or the id of a step. GitHub removes special characters, and uses the name __run when the current step runs a script without an id. If you use the same action more than once in the same job, the name will include a suffix with the sequence number with underscore before it. For example, the first script you run will have the name __run, and the second script will be named __run_2. Similarly, the second invocation of actions/checkout will be actionscheckout2.
|
||||||
|
"action_path": "", // string, The path where an action is located. This property is only supported in composite actions. You can use this path to access files located in the same repository as the action.
|
||||||
|
"action_ref": "", // string, For a step executing an action, this is the ref of the action being executed. For example, v2.
|
||||||
|
"action_repository": "", // string, For a step executing an action, this is the owner and repository name of the action. For example, actions/checkout.
|
||||||
|
"action_status": "", // string, For a composite action, the current result of the composite action.
|
||||||
|
"actor": run.TriggerUser.Name, // string, The username of the user that triggered the initial workflow run. If the workflow run is a re-run, this value may differ from github.triggering_actor. Any workflow re-runs will use the privileges of github.actor, even if the actor initiating the re-run (github.triggering_actor) has different privileges.
|
||||||
|
"api_url": setting.AppURL + "api/v1", // string, The URL of the GitHub REST API.
|
||||||
|
"base_ref": baseRef, // string, The base_ref or target branch of the pull request in a workflow run. This property is only available when the event that triggers a workflow run is either pull_request or pull_request_target.
|
||||||
|
"env": "", // string, Path on the runner to the file that sets environment variables from workflow commands. This file is unique to the current step and is a different file for each step in a job. For more information, see "Workflow commands for GitHub Actions."
|
||||||
|
"event": event, // object, The full event webhook payload. You can access individual properties of the event using this context. This object is identical to the webhook payload of the event that triggered the workflow run, and is different for each event. The webhooks for each GitHub Actions event is linked in "Events that trigger workflows." For example, for a workflow run triggered by the push event, this object contains the contents of the push webhook payload.
|
||||||
|
"event_name": eventName, // string, The name of the event that triggered the workflow run.
|
||||||
|
"event_path": "", // string, The path to the file on the runner that contains the full event webhook payload.
|
||||||
|
"graphql_url": "", // string, The URL of the GitHub GraphQL API.
|
||||||
|
"head_ref": headRef, // string, The head_ref or source branch of the pull request in a workflow run. This property is only available when the event that triggers a workflow run is either pull_request or pull_request_target.
|
||||||
|
"job": "", // string, The job_id of the current job.
|
||||||
|
"ref": ref, // string, The fully-formed ref of the branch or tag that triggered the workflow run. For workflows triggered by push, this is the branch or tag ref that was pushed. For workflows triggered by pull_request, this is the pull request merge branch. For workflows triggered by release, this is the release tag created. For other triggers, this is the branch or tag ref that triggered the workflow run. This is only set if a branch or tag is available for the event type. The ref given is fully-formed, meaning that for branches the format is refs/heads/<branch_name>, for pull requests it is refs/pull/<pr_number>/merge, and for tags it is refs/tags/<tag_name>. For example, refs/heads/feature-branch-1.
|
||||||
|
"ref_name": refName.ShortName(), // string, The short ref name of the branch or tag that triggered the workflow run. This value matches the branch or tag name shown on GitHub. For example, feature-branch-1.
|
||||||
|
"ref_protected": false, // boolean, true if branch protections are configured for the ref that triggered the workflow run.
|
||||||
|
"ref_type": refName.RefType(), // string, The type of ref that triggered the workflow run. Valid values are branch or tag.
|
||||||
|
"path": "", // string, Path on the runner to the file that sets system PATH variables from workflow commands. This file is unique to the current step and is a different file for each step in a job. For more information, see "Workflow commands for GitHub Actions."
|
||||||
|
"repository": run.Repo.OwnerName + "/" + run.Repo.Name, // string, The owner and repository name. For example, Codertocat/Hello-World.
|
||||||
|
"repository_owner": run.Repo.OwnerName, // string, The repository owner's name. For example, Codertocat.
|
||||||
|
"repositoryUrl": run.Repo.HTMLURL(), // string, The Git URL to the repository. For example, git://github.com/codertocat/hello-world.git.
|
||||||
|
"retention_days": "", // string, The number of days that workflow run logs and artifacts are kept.
|
||||||
|
"run_id": "", // string, A unique number for each workflow run within a repository. This number does not change if you re-run the workflow run.
|
||||||
|
"run_number": fmt.Sprint(run.Index), // string, A unique number for each run of a particular workflow in a repository. This number begins at 1 for the workflow's first run, and increments with each new run. This number does not change if you re-run the workflow run.
|
||||||
|
"run_attempt": "", // string, A unique number for each attempt of a particular workflow run in a repository. This number begins at 1 for the workflow run's first attempt, and increments with each re-run.
|
||||||
|
"secret_source": "Actions", // string, The source of a secret used in a workflow. Possible values are None, Actions, Dependabot, or Codespaces.
|
||||||
|
"server_url": setting.AppURL, // string, The URL of the GitHub server. For example: https://github.com.
|
||||||
|
"sha": sha, // string, The commit SHA that triggered the workflow. The value of this commit SHA depends on the event that triggered the workflow. For more information, see "Events that trigger workflows." For example, ffac537e6cbbf934b08745a378932722df287a53.
|
||||||
|
"triggering_actor": "", // string, The username of the user that initiated the workflow run. If the workflow run is a re-run, this value may differ from github.actor. Any workflow re-runs will use the privileges of github.actor, even if the actor initiating the re-run (github.triggering_actor) has different privileges.
|
||||||
|
"workflow": run.WorkflowID, // string, The name of the workflow. If the workflow file doesn't specify a name, the value of this property is the full path of the workflow file in the repository.
|
||||||
|
"workspace": "", // string, The default working directory on the runner for steps, and the default location of your repository when using the checkout action.
|
||||||
|
|
||||||
|
// additional contexts
|
||||||
|
"gitea_default_actions_url": setting.Actions.DefaultActionsURL.URL(),
|
||||||
|
}
|
||||||
|
|
||||||
|
if job != nil {
|
||||||
|
gitContext["job"] = job.JobID
|
||||||
|
gitContext["run_id"] = job.RunID
|
||||||
|
gitContext["run_attempt"] = job.Attempt
|
||||||
|
}
|
||||||
|
|
||||||
|
return gitContext
|
||||||
|
}
|
||||||
|
|
||||||
|
type TaskNeed struct {
|
||||||
|
Result actions_model.Status
|
||||||
|
Outputs map[string]string
|
||||||
|
}
|
||||||
|
|
||||||
|
func FindTaskNeeds(ctx context.Context, task *actions_model.ActionTask) (map[string]*TaskNeed, error) {
|
||||||
|
if err := task.LoadAttributes(ctx); err != nil {
|
||||||
|
return nil, fmt.Errorf("LoadAttributes: %w", err)
|
||||||
|
}
|
||||||
|
if len(task.Job.Needs) == 0 {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
needs := container.SetOf(task.Job.Needs...)
|
||||||
|
|
||||||
|
jobs, err := db.Find[actions_model.ActionRunJob](ctx, actions_model.FindRunJobOptions{RunID: task.Job.RunID})
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("FindRunJobs: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
jobIDJobs := make(map[string][]*actions_model.ActionRunJob)
|
||||||
|
for _, job := range jobs {
|
||||||
|
jobIDJobs[job.JobID] = append(jobIDJobs[job.JobID], job)
|
||||||
|
}
|
||||||
|
|
||||||
|
ret := make(map[string]*TaskNeed, len(needs))
|
||||||
|
for jobID, jobsWithSameID := range jobIDJobs {
|
||||||
|
if !needs.Contains(jobID) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
var jobOutputs map[string]string
|
||||||
|
for _, job := range jobsWithSameID {
|
||||||
|
if job.TaskID == 0 || !job.Status.IsDone() {
|
||||||
|
// it shouldn't happen, or the job has been rerun
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
got, err := actions_model.FindTaskOutputByTaskID(ctx, job.TaskID)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("FindTaskOutputByTaskID: %w", err)
|
||||||
|
}
|
||||||
|
outputs := make(map[string]string, len(got))
|
||||||
|
for _, v := range got {
|
||||||
|
outputs[v.OutputKey] = v.OutputValue
|
||||||
|
}
|
||||||
|
if len(jobOutputs) == 0 {
|
||||||
|
jobOutputs = outputs
|
||||||
|
} else {
|
||||||
|
jobOutputs = mergeTwoOutputs(outputs, jobOutputs)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
ret[jobID] = &TaskNeed{
|
||||||
|
Outputs: jobOutputs,
|
||||||
|
Result: actions_model.AggregateJobStatus(jobsWithSameID),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// mergeTwoOutputs merges two outputs from two different ActionRunJobs
|
||||||
|
// Values with the same output name may be overridden. The user should ensure the output names are unique.
|
||||||
|
// See https://docs.github.com/en/actions/writing-workflows/workflow-syntax-for-github-actions#using-job-outputs-in-a-matrix-job
|
||||||
|
func mergeTwoOutputs(o1, o2 map[string]string) map[string]string {
|
||||||
|
ret := make(map[string]string, len(o1))
|
||||||
|
for k1, v1 := range o1 {
|
||||||
|
if len(v1) > 0 {
|
||||||
|
ret[k1] = v1
|
||||||
|
} else {
|
||||||
|
ret[k1] = o2[k1]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ret
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user