From 39a7dc2ad1195e7310fdcae74b4cab4b7c3795bc Mon Sep 17 00:00:00 2001 From: Carlos Alexandro Becker Date: Wed, 29 Apr 2020 22:35:48 -0300 Subject: [PATCH] fix: blob data race (#1480) Signed-off-by: Carlos Alexandro Becker --- internal/pipe/blob/blob.go | 6 +- internal/pipe/blob/upload.go | 106 ++++++++++++++++++++++------------- 2 files changed, 68 insertions(+), 44 deletions(-) diff --git a/internal/pipe/blob/blob.go b/internal/pipe/blob/blob.go index f19babfbd..50cdb1643 100644 --- a/internal/pipe/blob/blob.go +++ b/internal/pipe/blob/blob.go @@ -37,16 +37,12 @@ func (Pipe) Publish(ctx *context.Context) error { if len(ctx.Config.Blobs) == 0 { return pipe.Skip("Blob section is not configured") } - var up uploader = productionUploader{} - if ctx.SkipPublish { - up = skipUploader{} - } var g = semerrgroup.New(ctx.Parallelism) for _, conf := range ctx.Config.Blobs { conf := conf g.Go(func() error { - return doUpload(ctx, conf, up) + return doUpload(ctx, conf) }) } return g.Wait() diff --git a/internal/pipe/blob/upload.go b/internal/pipe/blob/upload.go index 816791594..266e08d74 100644 --- a/internal/pipe/blob/upload.go +++ b/internal/pipe/blob/upload.go @@ -2,6 +2,7 @@ package blob import ( "fmt" + "io" "io/ioutil" "net/url" "path/filepath" @@ -61,7 +62,7 @@ func urlFor(ctx *context.Context, conf config.Blob) (string, error) { // Takes goreleaser context(which includes artificats) and bucketURL for // upload to destination (eg: gs://gorelease-bucket) using the given uploader // implementation -func doUpload(ctx *context.Context, conf config.Blob, up uploader) error { +func doUpload(ctx *context.Context, conf config.Blob) error { folder, err := tmpl.New(ctx).Apply(conf.Folder) if err != nil { return err @@ -84,6 +85,12 @@ func doUpload(ctx *context.Context, conf config.Blob, up uploader) error { filter = artifact.And(filter, artifact.ByIDs(conf.IDs...)) } + var up = newUploader(ctx) + if err := up.Open(ctx, bucketURL); err != nil { + return handleError(err, bucketURL) + } + defer up.Close() + var g = semerrgroup.New(ctx.Parallelism) for _, artifact := range ctx.Artifacts.Filter(filter).List() { artifact := artifact @@ -94,25 +101,8 @@ func doUpload(ctx *context.Context, conf config.Blob, up uploader) error { return err } - if err := up.Upload(ctx, bucketURL, filepath.Join(folder, artifact.Name), data); err != nil { - switch { - case errorContains(err, "NoSuchBucket", "ContainerNotFound", "notFound"): - return errors.Wrapf(err, "provided bucket does not exist: %s", bucketURL) - case errorContains(err, "NoCredentialProviders"): - return errors.Wrapf(err, "check credentials and access to bucket: %s", bucketURL) - case errorContains(err, "InvalidAccessKeyId"): - return errors.Wrap(err, "aws access key id you provided does not exist in our records") - case errorContains(err, "AuthenticationFailed"): - return errors.Wrap(err, "azure storage key you provided is not valid") - case errorContains(err, "invalid_grant"): - return errors.Wrap(err, "google app credentials you provided is not valid") - case errorContains(err, "no such host"): - return errors.Wrap(err, "azure storage account you provided is not valid") - case errorContains(err, "ServiceCode=ResourceNotFound"): - return errors.Wrapf(err, "missing azure storage key for provided bucket %s", bucketURL) - default: - return errors.Wrap(err, "failed to write to bucket") - } + if err := up.Upload(ctx, filepath.Join(folder, artifact.Name), data); err != nil { + return handleError(err, bucketURL) } return err }) @@ -120,6 +110,34 @@ func doUpload(ctx *context.Context, conf config.Blob, up uploader) error { return g.Wait() } +func handleError(err error, url string) error { + switch { + case errorContains(err, "NoSuchBucket", "ContainerNotFound", "notFound"): + return errors.Wrapf(err, "provided bucket does not exist: %s", url) + case errorContains(err, "NoCredentialProviders"): + return errors.Wrapf(err, "check credentials and access to bucket: %s", url) + case errorContains(err, "InvalidAccessKeyId"): + return errors.Wrap(err, "aws access key id you provided does not exist in our records") + case errorContains(err, "AuthenticationFailed"): + return errors.Wrap(err, "azure storage key you provided is not valid") + case errorContains(err, "invalid_grant"): + return errors.Wrap(err, "google app credentials you provided is not valid") + case errorContains(err, "no such host"): + return errors.Wrap(err, "azure storage account you provided is not valid") + case errorContains(err, "ServiceCode=ResourceNotFound"): + return errors.Wrapf(err, "missing azure storage key for provided bucket %s", url) + default: + return errors.Wrap(err, "failed to write to bucket") + } +} + +func newUploader(ctx *context.Context) uploader { + if ctx.SkipPublish { + return &skipUploader{} + } + return &productionUploader{} +} + func getData(ctx *context.Context, conf config.Blob, path string) ([]byte, error) { data, err := ioutil.ReadFile(path) if err != nil { @@ -142,41 +160,51 @@ func getData(ctx *context.Context, conf config.Blob, path string) ([]byte, error // uploader implements upload type uploader interface { - Upload(ctx *context.Context, url, path string, data []byte) error + io.Closer + Open(ctx *context.Context, url string) error + Upload(ctx *context.Context, path string, data []byte) error } // skipUploader is used when --skip-upload is set and will just log // things without really doing anything type skipUploader struct{} -func (u skipUploader) Upload(_ *context.Context, url, path string, _ []byte) error { - log.WithFields(log.Fields{ - "bucket": url, - "path": path, - }).Warn("doUpload skipped because skip-publish is set") +func (u *skipUploader) Close() error { return nil } +func (u *skipUploader) Open(_ *context.Context, _ string) error { return nil } + +func (u *skipUploader) Upload(_ *context.Context, path string, _ []byte) error { + log.WithField("path", path).Warn("upload skipped because skip-publish is set") return nil } // productionUploader actually do upload to -type productionUploader struct{} +type productionUploader struct { + bucket *blob.Bucket +} -func (u productionUploader) Upload(ctx *context.Context, url, path string, data []byte) (err error) { +func (u *productionUploader) Close() error { + if u.bucket == nil { + return nil + } + return u.bucket.Close() +} +func (u *productionUploader) Open(ctx *context.Context, bucket string) error { log.WithFields(log.Fields{ - "bucket": url, - "path": path, - }).Info("uploading") + "bucket": bucket, + }).Debug("uploading") - // TODO: its not so great that we open one connection for each file - conn, err := blob.OpenBucket(ctx, url) + conn, err := blob.OpenBucket(ctx, bucket) if err != nil { return err } - defer func() { - if cerr := conn.Close(); err == nil { - err = cerr - } - }() - w, err := conn.NewWriter(ctx, path, nil) + u.bucket = conn + return nil +} + +func (u *productionUploader) Upload(ctx *context.Context, path string, data []byte) (err error) { + log.WithField("path", path).Info("uploading") + + w, err := u.bucket.NewWriter(ctx, path, nil) if err != nil { return err }