diff --git a/internal/http/http.go b/internal/http/http.go index 77a1bb4e8..4c0721e1a 100644 --- a/internal/http/http.go +++ b/internal/http/http.go @@ -13,11 +13,11 @@ import ( "github.com/apex/log" "github.com/pkg/errors" - "golang.org/x/sync/errgroup" "github.com/goreleaser/goreleaser/config" "github.com/goreleaser/goreleaser/context" "github.com/goreleaser/goreleaser/internal/artifact" + "github.com/goreleaser/goreleaser/internal/semerrgroup" "github.com/goreleaser/goreleaser/pipeline" ) @@ -157,15 +157,10 @@ func Upload(ctx *context.Context, puts []config.Put, kind string, check Response } func runPipeByFilter(ctx *context.Context, put config.Put, filter artifact.Filter, kind string, check ResponseChecker) error { - sem := make(chan bool, ctx.Parallelism) - var g errgroup.Group + var g = semerrgroup.New(ctx.Parallelism) for _, artifact := range ctx.Artifacts.Filter(filter).List() { - sem <- true artifact := artifact g.Go(func() error { - defer func() { - <-sem - }() return uploadAsset(ctx, put, artifact, kind, check) }) } diff --git a/internal/semerrgroup/sem.go b/internal/semerrgroup/sem.go new file mode 100644 index 000000000..e2855c401 --- /dev/null +++ b/internal/semerrgroup/sem.go @@ -0,0 +1,35 @@ +// Package semerrgroup wraps a error group with a semaphore with configurable +// size, so you can control the number of tasks being executed simultaneously. +package semerrgroup + +import "golang.org/x/sync/errgroup" + +// Group is the Semphore ErrorGroup itself +type Group struct { + ch chan bool + g errgroup.Group +} + +// New returns a new Group of a given size. +func New(size int) *Group { + return &Group{ + ch: make(chan bool, size), + g: errgroup.Group{}, + } +} + +// Go execs one function respecting the group and semaphore. +func (s *Group) Go(fn func() error) { + s.g.Go(func() error { + s.ch <- true + defer func() { + <-s.ch + }() + return fn() + }) +} + +// Wait waits for the group to complete and return an error if any. +func (s *Group) Wait() error { + return s.g.Wait() +} diff --git a/internal/semerrgroup/sem_test.go b/internal/semerrgroup/sem_test.go new file mode 100644 index 000000000..aa8a5f98a --- /dev/null +++ b/internal/semerrgroup/sem_test.go @@ -0,0 +1,26 @@ +package semerrgroup + +import ( + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" +) + +func TestSemaphore(t *testing.T) { + var g = New(4) + var lock sync.Mutex + var counter int + for i := 0; i < 10; i++ { + g.Go(func() error { + time.Sleep(10 * time.Millisecond) + lock.Lock() + counter++ + lock.Unlock() + return nil + }) + } + require.NoError(t, g.Wait()) + require.Equal(t, counter, 10) +} diff --git a/pipeline/build/build.go b/pipeline/build/build.go index 9e73e428b..49373d6ec 100644 --- a/pipeline/build/build.go +++ b/pipeline/build/build.go @@ -10,7 +10,6 @@ import ( "github.com/apex/log" "github.com/pkg/errors" - "golang.org/x/sync/errgroup" builders "github.com/goreleaser/goreleaser/build" "github.com/goreleaser/goreleaser/config" @@ -18,6 +17,7 @@ import ( // langs to init _ "github.com/goreleaser/goreleaser/internal/builders/golang" + "github.com/goreleaser/goreleaser/internal/semerrgroup" "github.com/goreleaser/goreleaser/internal/tmpl" ) @@ -69,16 +69,11 @@ func runPipeOnBuild(ctx *context.Context, build config.Build) error { if err := runHook(ctx, build.Env, build.Hooks.Pre); err != nil { return errors.Wrap(err, "pre hook failed") } - sem := make(chan bool, ctx.Parallelism) - var g errgroup.Group + var g = semerrgroup.New(ctx.Parallelism) for _, target := range build.Targets { - sem <- true target := target build := build g.Go(func() error { - defer func() { - <-sem - }() return doBuild(ctx, build, target) }) } diff --git a/pipeline/checksums/checksums.go b/pipeline/checksums/checksums.go index d985fac82..3b9b88389 100644 --- a/pipeline/checksums/checksums.go +++ b/pipeline/checksums/checksums.go @@ -8,11 +8,11 @@ import ( "path/filepath" "github.com/apex/log" - "golang.org/x/sync/errgroup" "github.com/goreleaser/goreleaser/checksum" "github.com/goreleaser/goreleaser/context" "github.com/goreleaser/goreleaser/internal/artifact" + "github.com/goreleaser/goreleaser/internal/semerrgroup" "github.com/goreleaser/goreleaser/internal/tmpl" ) @@ -47,8 +47,7 @@ func (Pipe) Run(ctx *context.Context) (err error) { } defer file.Close() // nolint: errcheck - var g errgroup.Group - var semaphore = make(chan bool, ctx.Parallelism) + var g = semerrgroup.New(ctx.Parallelism) for _, artifact := range ctx.Artifacts.Filter( artifact.Or( artifact.ByType(artifact.UploadableArchive), @@ -56,12 +55,8 @@ func (Pipe) Run(ctx *context.Context) (err error) { artifact.ByType(artifact.LinuxPackage), ), ).List() { - semaphore <- true artifact := artifact g.Go(func() error { - defer func() { - <-semaphore - }() return checksums(file, artifact) }) } diff --git a/pipeline/docker/docker.go b/pipeline/docker/docker.go index af9bd7e61..9bb70a4cd 100644 --- a/pipeline/docker/docker.go +++ b/pipeline/docker/docker.go @@ -10,12 +10,12 @@ import ( "github.com/apex/log" "github.com/pkg/errors" - "golang.org/x/sync/errgroup" "github.com/goreleaser/goreleaser/config" "github.com/goreleaser/goreleaser/context" "github.com/goreleaser/goreleaser/internal/artifact" "github.com/goreleaser/goreleaser/internal/deprecate" + "github.com/goreleaser/goreleaser/internal/semerrgroup" "github.com/goreleaser/goreleaser/internal/tmpl" "github.com/goreleaser/goreleaser/pipeline" ) @@ -78,16 +78,11 @@ func (Pipe) Run(ctx *context.Context) error { } func doRun(ctx *context.Context) error { - var g errgroup.Group - sem := make(chan bool, ctx.Parallelism) + var g = semerrgroup.New(ctx.Parallelism) for i, docker := range ctx.Config.Dockers { docker := docker seed := i - sem <- true g.Go(func() error { - defer func() { - <-sem - }() log.WithField("docker", docker).Debug("looking for binaries matching") var binaries = ctx.Artifacts.Filter( artifact.And( diff --git a/pipeline/nfpm/nfpm.go b/pipeline/nfpm/nfpm.go index 4ae7dcb6d..1ef491fa9 100644 --- a/pipeline/nfpm/nfpm.go +++ b/pipeline/nfpm/nfpm.go @@ -5,8 +5,6 @@ import ( "os" "path/filepath" - "golang.org/x/sync/errgroup" - "github.com/apex/log" "github.com/goreleaser/nfpm" "github.com/imdario/mergo" @@ -21,6 +19,7 @@ import ( "github.com/goreleaser/goreleaser/context" "github.com/goreleaser/goreleaser/internal/artifact" "github.com/goreleaser/goreleaser/internal/linux" + "github.com/goreleaser/goreleaser/internal/semerrgroup" "github.com/goreleaser/goreleaser/internal/tmpl" "github.com/goreleaser/goreleaser/pipeline" ) @@ -62,18 +61,13 @@ func doRun(ctx *context.Context) error { artifact.ByType(artifact.Binary), artifact.ByGoos("linux"), )).GroupByPlatform() - var g errgroup.Group - sem := make(chan bool, ctx.Parallelism) + var g = semerrgroup.New(ctx.Parallelism) for _, format := range ctx.Config.NFPM.Formats { for platform, artifacts := range linuxBinaries { - sem <- true format := format arch := linux.Arch(platform) artifacts := artifacts g.Go(func() error { - defer func() { - <-sem - }() return create(ctx, format, arch, artifacts) }) } diff --git a/pipeline/release/release.go b/pipeline/release/release.go index b344556da..c97b63f4f 100644 --- a/pipeline/release/release.go +++ b/pipeline/release/release.go @@ -4,11 +4,11 @@ import ( "os" "github.com/apex/log" - "golang.org/x/sync/errgroup" "github.com/goreleaser/goreleaser/context" "github.com/goreleaser/goreleaser/internal/artifact" "github.com/goreleaser/goreleaser/internal/client" + "github.com/goreleaser/goreleaser/internal/semerrgroup" "github.com/goreleaser/goreleaser/pipeline" ) @@ -65,8 +65,7 @@ func doRun(ctx *context.Context, c client.Client) error { if err != nil { return err } - var g errgroup.Group - sem := make(chan bool, ctx.Parallelism) + var g = semerrgroup.New(ctx.Parallelism) for _, artifact := range ctx.Artifacts.Filter( artifact.Or( artifact.ByType(artifact.UploadableArchive), @@ -76,12 +75,8 @@ func doRun(ctx *context.Context, c client.Client) error { artifact.ByType(artifact.LinuxPackage), ), ).List() { - sem <- true artifact := artifact g.Go(func() error { - defer func() { - <-sem - }() return upload(ctx, c, releaseID, artifact) }) } diff --git a/pipeline/s3/s3.go b/pipeline/s3/s3.go index 1ef2a65f1..fbadfb671 100644 --- a/pipeline/s3/s3.go +++ b/pipeline/s3/s3.go @@ -13,8 +13,8 @@ import ( "github.com/goreleaser/goreleaser/config" "github.com/goreleaser/goreleaser/context" "github.com/goreleaser/goreleaser/internal/artifact" + "github.com/goreleaser/goreleaser/internal/semerrgroup" "github.com/goreleaser/goreleaser/internal/tmpl" - "golang.org/x/sync/errgroup" ) // Pipe for Artifactory @@ -44,15 +44,10 @@ func (Pipe) Default(ctx *context.Context) error { // Run the pipe func (Pipe) Run(ctx *context.Context) error { - var g errgroup.Group - sem := make(chan bool, ctx.Parallelism) + var g = semerrgroup.New(ctx.Parallelism) for _, conf := range ctx.Config.S3 { conf := conf - sem <- true g.Go(func() error { - defer func() { - <-sem - }() return upload(ctx, conf) }) } @@ -79,8 +74,7 @@ func upload(ctx *context.Context, conf config.S3) error { return err } - var g errgroup.Group - sem := make(chan bool, ctx.Parallelism) + var g = semerrgroup.New(ctx.Parallelism) for _, artifact := range ctx.Artifacts.Filter( artifact.Or( artifact.ByType(artifact.UploadableArchive), @@ -90,12 +84,8 @@ func upload(ctx *context.Context, conf config.S3) error { artifact.ByType(artifact.LinuxPackage), ), ).List() { - sem <- true artifact := artifact g.Go(func() error { - defer func() { - <-sem - }() f, err := os.Open(artifact.Path) if err != nil { return err diff --git a/pipeline/snapcraft/snapcraft.go b/pipeline/snapcraft/snapcraft.go index 0ad77d1fe..4afeeb992 100644 --- a/pipeline/snapcraft/snapcraft.go +++ b/pipeline/snapcraft/snapcraft.go @@ -11,12 +11,12 @@ import ( "strings" "github.com/apex/log" - "golang.org/x/sync/errgroup" yaml "gopkg.in/yaml.v2" "github.com/goreleaser/goreleaser/context" "github.com/goreleaser/goreleaser/internal/artifact" "github.com/goreleaser/goreleaser/internal/linux" + "github.com/goreleaser/goreleaser/internal/semerrgroup" "github.com/goreleaser/goreleaser/internal/tmpl" "github.com/goreleaser/goreleaser/pipeline" ) @@ -83,15 +83,13 @@ func (Pipe) Run(ctx *context.Context) error { return ErrNoSnapcraft } - var g errgroup.Group - sem := make(chan bool, ctx.Parallelism) + var g = semerrgroup.New(ctx.Parallelism) for platform, binaries := range ctx.Artifacts.Filter( artifact.And( artifact.ByGoos("linux"), artifact.ByType(artifact.Binary), ), ).GroupByPlatform() { - sem <- true arch := linux.Arch(platform) if arch == "armel" { log.WithField("arch", arch).Warn("ignored unsupported arch") @@ -99,9 +97,6 @@ func (Pipe) Run(ctx *context.Context) error { } binaries := binaries g.Go(func() error { - go func() { - <-sem - }() return create(ctx, arch, binaries) }) }