Mirror of https://github.com/goreleaser/goreleaser.git
fix: blob data race (#1480)
Signed-off-by: Carlos Alexandro Becker <caarlos0@gmail.com>
parent: 0f7ff6247b
commit: 39a7dc2ad1
@@ -37,16 +37,12 @@ func (Pipe) Publish(ctx *context.Context) error {
 	if len(ctx.Config.Blobs) == 0 {
 		return pipe.Skip("Blob section is not configured")
 	}
-	var up uploader = productionUploader{}
-	if ctx.SkipPublish {
-		up = skipUploader{}
-	}
 
 	var g = semerrgroup.New(ctx.Parallelism)
 	for _, conf := range ctx.Config.Blobs {
 		conf := conf
 		g.Go(func() error {
-			return doUpload(ctx, conf, up)
+			return doUpload(ctx, conf)
 		})
 	}
 	return g.Wait()
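The conf := conf line in the hunk above is the pre-Go 1.22 idiom for giving each goroutine its own copy of the loop variable; without it, every closure would read the single variable the loop keeps rewriting, which is both a correctness bug and exactly the kind of shared access the race detector flags. A minimal standalone sketch of the pattern, using golang.org/x/sync/errgroup as a stand-in for goreleaser's internal semerrgroup package (an assumption for illustration; semerrgroup.New(ctx.Parallelism) additionally bounds how many goroutines run at once, which this sketch omits):

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

func main() {
	blobs := []string{"s3://bucket-a", "gs://bucket-b", "azblob://bucket-c"}

	var g errgroup.Group
	for _, conf := range blobs {
		conf := conf // copy the loop variable so each closure captures its own value
		g.Go(func() error {
			fmt.Println("uploading artifacts for", conf)
			return nil
		})
	}
	if err := g.Wait(); err != nil {
		fmt.Println("upload failed:", err)
	}
}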
@@ -2,6 +2,7 @@ package blob
 
 import (
 	"fmt"
+	"io"
 	"io/ioutil"
 	"net/url"
 	"path/filepath"
@@ -61,7 +62,7 @@ func urlFor(ctx *context.Context, conf config.Blob) (string, error) {
 // Takes goreleaser context(which includes artificats) and bucketURL for
 // upload to destination (eg: gs://gorelease-bucket) using the given uploader
 // implementation
-func doUpload(ctx *context.Context, conf config.Blob, up uploader) error {
+func doUpload(ctx *context.Context, conf config.Blob) error {
 	folder, err := tmpl.New(ctx).Apply(conf.Folder)
 	if err != nil {
 		return err
@@ -84,6 +85,12 @@ func doUpload(ctx *context.Context, conf config.Blob, up uploader) error {
 		filter = artifact.And(filter, artifact.ByIDs(conf.IDs...))
 	}
 
+	var up = newUploader(ctx)
+	if err := up.Open(ctx, bucketURL); err != nil {
+		return handleError(err, bucketURL)
+	}
+	defer up.Close()
+
 	var g = semerrgroup.New(ctx.Parallelism)
 	for _, artifact := range ctx.Artifacts.Filter(filter).List() {
 		artifact := artifact
@@ -94,25 +101,8 @@ func doUpload(ctx *context.Context, conf config.Blob, up uploader) error {
 				return err
 			}
 
-			if err := up.Upload(ctx, bucketURL, filepath.Join(folder, artifact.Name), data); err != nil {
-				switch {
-				case errorContains(err, "NoSuchBucket", "ContainerNotFound", "notFound"):
-					return errors.Wrapf(err, "provided bucket does not exist: %s", bucketURL)
-				case errorContains(err, "NoCredentialProviders"):
-					return errors.Wrapf(err, "check credentials and access to bucket: %s", bucketURL)
-				case errorContains(err, "InvalidAccessKeyId"):
-					return errors.Wrap(err, "aws access key id you provided does not exist in our records")
-				case errorContains(err, "AuthenticationFailed"):
-					return errors.Wrap(err, "azure storage key you provided is not valid")
-				case errorContains(err, "invalid_grant"):
-					return errors.Wrap(err, "google app credentials you provided is not valid")
-				case errorContains(err, "no such host"):
-					return errors.Wrap(err, "azure storage account you provided is not valid")
-				case errorContains(err, "ServiceCode=ResourceNotFound"):
-					return errors.Wrapf(err, "missing azure storage key for provided bucket %s", bucketURL)
-				default:
-					return errors.Wrap(err, "failed to write to bucket")
-				}
+			if err := up.Upload(ctx, filepath.Join(folder, artifact.Name), data); err != nil {
+				return handleError(err, bucketURL)
 			}
 			return err
 		})
@@ -120,6 +110,34 @@ func doUpload(ctx *context.Context, conf config.Blob, up uploader) error {
 	return g.Wait()
 }
 
+func handleError(err error, url string) error {
+	switch {
+	case errorContains(err, "NoSuchBucket", "ContainerNotFound", "notFound"):
+		return errors.Wrapf(err, "provided bucket does not exist: %s", url)
+	case errorContains(err, "NoCredentialProviders"):
+		return errors.Wrapf(err, "check credentials and access to bucket: %s", url)
+	case errorContains(err, "InvalidAccessKeyId"):
+		return errors.Wrap(err, "aws access key id you provided does not exist in our records")
+	case errorContains(err, "AuthenticationFailed"):
+		return errors.Wrap(err, "azure storage key you provided is not valid")
+	case errorContains(err, "invalid_grant"):
+		return errors.Wrap(err, "google app credentials you provided is not valid")
+	case errorContains(err, "no such host"):
+		return errors.Wrap(err, "azure storage account you provided is not valid")
+	case errorContains(err, "ServiceCode=ResourceNotFound"):
+		return errors.Wrapf(err, "missing azure storage key for provided bucket %s", url)
+	default:
+		return errors.Wrap(err, "failed to write to bucket")
+	}
+}
+
+func newUploader(ctx *context.Context) uploader {
+	if ctx.SkipPublish {
+		return &skipUploader{}
+	}
+	return &productionUploader{}
+}
+
 func getData(ctx *context.Context, conf config.Blob, path string) ([]byte, error) {
 	data, err := ioutil.ReadFile(path)
 	if err != nil {
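The handleError helper introduced above dispatches on errorContains, which is defined elsewhere in the same file and is not touched by this diff. A minimal sketch of what such a helper presumably looks like, shown only as an assumption about its behavior, is a substring match over the error text:

package blob

import "strings"

// errorContains reports whether err's message contains any of the given
// substrings. Hypothetical reconstruction: the real helper lives elsewhere
// in this file and is not part of the diff shown here.
func errorContains(err error, subs ...string) bool {
	for _, sub := range subs {
		if strings.Contains(err.Error(), sub) {
			return true
		}
	}
	return false
}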
@@ -142,41 +160,51 @@ func getData(ctx *context.Context, conf config.Blob, path string) ([]byte, error
 
 // uploader implements upload
 type uploader interface {
-	Upload(ctx *context.Context, url, path string, data []byte) error
+	io.Closer
+	Open(ctx *context.Context, url string) error
+	Upload(ctx *context.Context, path string, data []byte) error
 }
 
 // skipUploader is used when --skip-upload is set and will just log
 // things without really doing anything
 type skipUploader struct{}
 
-func (u skipUploader) Upload(_ *context.Context, url, path string, _ []byte) error {
-	log.WithFields(log.Fields{
-		"bucket": url,
-		"path":   path,
-	}).Warn("doUpload skipped because skip-publish is set")
+func (u *skipUploader) Close() error                            { return nil }
+func (u *skipUploader) Open(_ *context.Context, _ string) error { return nil }
+
+func (u *skipUploader) Upload(_ *context.Context, path string, _ []byte) error {
+	log.WithField("path", path).Warn("upload skipped because skip-publish is set")
 	return nil
 }
 
 // productionUploader actually do upload to
-type productionUploader struct{}
+type productionUploader struct {
+	bucket *blob.Bucket
+}
 
-func (u productionUploader) Upload(ctx *context.Context, url, path string, data []byte) (err error) {
+func (u *productionUploader) Close() error {
+	if u.bucket == nil {
+		return nil
+	}
+	return u.bucket.Close()
+}
+func (u *productionUploader) Open(ctx *context.Context, bucket string) error {
 	log.WithFields(log.Fields{
-		"bucket": url,
-		"path":   path,
-	}).Info("uploading")
+		"bucket": bucket,
+	}).Debug("uploading")
 
-	// TODO: its not so great that we open one connection for each file
-	conn, err := blob.OpenBucket(ctx, url)
+	conn, err := blob.OpenBucket(ctx, bucket)
 	if err != nil {
 		return err
 	}
-	defer func() {
-		if cerr := conn.Close(); err == nil {
-			err = cerr
-		}
-	}()
-	w, err := conn.NewWriter(ctx, path, nil)
+	u.bucket = conn
+	return nil
+}
+
+func (u *productionUploader) Upload(ctx *context.Context, path string, data []byte) (err error) {
+	log.WithField("path", path).Info("uploading")
+
+	w, err := u.bucket.NewWriter(ctx, path, nil)
 	if err != nil {
 		return err
 	}
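Taken together, the new uploader follows an open-once, upload-many, close-once lifecycle: doUpload opens the bucket a single time, every per-artifact goroutine reuses the held *blob.Bucket, and the connection is closed once via defer, replacing the per-file OpenBucket/Close that the removed TODO complained about. A self-contained toy illustration of that calling shape, not code from the repository and using the standard context package instead of goreleaser's internal one:

package main

import (
	"context"
	"fmt"
)

// toyUploader mimics the shape of the new uploader interface:
// Open once, Upload many times, Close once.
type toyUploader struct {
	bucket string
	opened bool
}

func (u *toyUploader) Open(_ context.Context, bucket string) error {
	u.bucket = bucket
	u.opened = true
	return nil
}

func (u *toyUploader) Upload(_ context.Context, path string, data []byte) error {
	if !u.opened {
		return fmt.Errorf("bucket %q not opened", u.bucket)
	}
	fmt.Printf("uploading %d bytes to %s/%s\n", len(data), u.bucket, path)
	return nil
}

func (u *toyUploader) Close() error {
	u.opened = false
	return nil
}

func main() {
	ctx := context.Background()

	up := &toyUploader{}
	if err := up.Open(ctx, "s3://example-bucket"); err != nil {
		panic(err)
	}
	defer up.Close()

	// Reuse the single opened connection for every artifact,
	// instead of opening and closing one per file.
	for _, name := range []string{"app_linux_amd64.tar.gz", "checksums.txt"} {
		if err := up.Upload(ctx, "v1.0.0/"+name, []byte("fake artifact data")); err != nil {
			panic(err)
		}
	}
}

In the real pipe, newUploader picks between skipUploader and productionUploader, but both now satisfy the same Open/Upload/Close contract.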