1
0
mirror of https://github.com/go-task/task.git synced 2024-12-12 10:45:49 +02:00

Adding a --concurrency (-C) flag

This commit is contained in:
Ross Hammermeister 2020-06-12 12:09:53 -06:00 committed by Andrey Nering
parent f0cd7d27fb
commit c6ecf70377
6 changed files with 119 additions and 22 deletions

View File

@ -1,5 +1,11 @@
# Changelog # Changelog
## Unreleased
- Add a `--concurrency` (alias `-C`) flag, to limit the number of tasks that
run concurrently. This is useful for heavy workloads.
([#345](https://github.com/go-task/task/pull/345)).
## v3.2.2 - 2021-01-12 ## v3.2.2 - 2021-01-12
- Improve performance of `--list` and `--summary` by skipping running shell - Improve performance of `--list` and `--summary` by skipping running shell

View File

@ -65,6 +65,7 @@ func main() {
dry bool dry bool
summary bool summary bool
parallel bool parallel bool
concurrency int
dir string dir string
entrypoint string entrypoint string
output string output string
@ -87,6 +88,7 @@ func main() {
pflag.StringVarP(&entrypoint, "taskfile", "t", "", `choose which Taskfile to run. Defaults to "Taskfile.yml"`) pflag.StringVarP(&entrypoint, "taskfile", "t", "", `choose which Taskfile to run. Defaults to "Taskfile.yml"`)
pflag.StringVarP(&output, "output", "o", "", "sets output style: [interleaved|group|prefixed]") pflag.StringVarP(&output, "output", "o", "", "sets output style: [interleaved|group|prefixed]")
pflag.BoolVarP(&color, "color", "c", true, "colored output") pflag.BoolVarP(&color, "color", "c", true, "colored output")
pflag.IntVarP(&concurrency, "concurrency", "C", 0, "limit number tasks to run concurrently")
pflag.Parse() pflag.Parse()
if versionFlag { if versionFlag {
@ -122,16 +124,17 @@ func main() {
} }
e := task.Executor{ e := task.Executor{
Force: force, Force: force,
Watch: watch, Watch: watch,
Verbose: verbose, Verbose: verbose,
Silent: silent, Silent: silent,
Dir: dir, Dir: dir,
Dry: dry, Dry: dry,
Entrypoint: entrypoint, Entrypoint: entrypoint,
Summary: summary, Summary: summary,
Parallel: parallel, Parallel: parallel,
Color: color, Color: color,
Concurrency: concurrency,
Stdin: os.Stdin, Stdin: os.Stdin,
Stdout: os.Stdout, Stdout: os.Stdout,

25
concurrency.go Normal file
View File

@ -0,0 +1,25 @@
package task
func (e *Executor) acquireConcurrencyLimit() func() {
if e.concurrencySemaphore == nil {
return emptyFunc
}
e.concurrencySemaphore <- struct{}{}
return func() {
<-e.concurrencySemaphore
}
}
func (e *Executor) releaseConcurrencyLimit() func() {
if e.concurrencySemaphore == nil {
return emptyFunc
}
<-e.concurrencySemaphore
return func() {
e.concurrencySemaphore <- struct{}{}
}
}
func emptyFunc() {}

39
task.go
View File

@ -32,16 +32,17 @@ const (
type Executor struct { type Executor struct {
Taskfile *taskfile.Taskfile Taskfile *taskfile.Taskfile
Dir string Dir string
Entrypoint string Entrypoint string
Force bool Force bool
Watch bool Watch bool
Verbose bool Verbose bool
Silent bool Silent bool
Dry bool Dry bool
Summary bool Summary bool
Parallel bool Parallel bool
Color bool Color bool
Concurrency int
Stdin io.Reader Stdin io.Reader
Stdout io.Writer Stdout io.Writer
@ -54,8 +55,9 @@ type Executor struct {
taskvars *taskfile.Vars taskvars *taskfile.Vars
taskCallCount map[string]*int32 concurrencySemaphore chan struct{}
mkdirMutexMap map[string]*sync.Mutex taskCallCount map[string]*int32
mkdirMutexMap map[string]*sync.Mutex
} }
// Run runs Task // Run runs Task
@ -247,6 +249,10 @@ func (e *Executor) Setup() error {
e.taskCallCount[k] = new(int32) e.taskCallCount[k] = new(int32)
e.mkdirMutexMap[k] = &sync.Mutex{} e.mkdirMutexMap[k] = &sync.Mutex{}
} }
if e.Concurrency > 0 {
e.concurrencySemaphore = make(chan struct{}, e.Concurrency)
}
return nil return nil
} }
@ -260,6 +266,9 @@ func (e *Executor) RunTask(ctx context.Context, call taskfile.Call) error {
return &MaximumTaskCallExceededError{task: call.Task} return &MaximumTaskCallExceededError{task: call.Task}
} }
release := e.acquireConcurrencyLimit()
defer release()
if err := e.runDeps(ctx, t); err != nil { if err := e.runDeps(ctx, t); err != nil {
return err return err
} }
@ -324,6 +333,9 @@ func (e *Executor) mkdir(t *taskfile.Task) error {
func (e *Executor) runDeps(ctx context.Context, t *taskfile.Task) error { func (e *Executor) runDeps(ctx context.Context, t *taskfile.Task) error {
g, ctx := errgroup.WithContext(ctx) g, ctx := errgroup.WithContext(ctx)
reacquire := e.releaseConcurrencyLimit()
defer reacquire()
for _, d := range t.Deps { for _, d := range t.Deps {
d := d d := d
@ -344,6 +356,9 @@ func (e *Executor) runCommand(ctx context.Context, t *taskfile.Task, call taskfi
switch { switch {
case cmd.Task != "": case cmd.Task != "":
reacquire := e.releaseConcurrencyLimit()
defer reacquire()
err := e.RunTask(ctx, taskfile.Call{Task: cmd.Task, Vars: cmd.Vars}) err := e.RunTask(ctx, taskfile.Call{Task: cmd.Task, Vars: cmd.Vars})
if err != nil { if err != nil {
return err return err

View File

@ -171,6 +171,22 @@ func TestVarsInvalidTmpl(t *testing.T) {
assert.EqualError(t, e.Run(context.Background(), taskfile.Call{Task: target}), expectError, "e.Run(target)") assert.EqualError(t, e.Run(context.Background(), taskfile.Call{Task: target}), expectError, "e.Run(target)")
} }
func TestConcurrency(t *testing.T) {
const (
dir = "testdata/concurrency"
target = "default"
)
e := &task.Executor{
Dir: dir,
Stdout: ioutil.Discard,
Stderr: ioutil.Discard,
Concurrency: 1,
}
assert.NoError(t, e.Setup(), "e.Setup()")
assert.NoError(t, e.Run(context.Background(), taskfile.Call{Task: target}), "e.Run(target)")
}
func TestParams(t *testing.T) { func TestParams(t *testing.T) {
tt := fileContentTest{ tt := fileContentTest{
Dir: "testdata/params", Dir: "testdata/params",

32
testdata/concurrency/Taskfile.yml vendored Normal file
View File

@ -0,0 +1,32 @@
version: '2'
tasks:
default:
deps:
- t1
t1:
deps:
- t3
- t4
cmds:
- task: t2
- echo done 1
t2:
deps:
- t5
- t6
cmds:
- echo done 2
t3:
cmds:
- echo done 3
t4:
cmds:
- echo done 4
t5:
cmds:
- echo done 5
t6:
cmds:
- echo done 6