From df734bd111efdc5d436fee291ef07b6cbeff6d8c Mon Sep 17 00:00:00 2001 From: Carlos Alexandro Becker Date: Sat, 11 Jun 2022 16:54:55 -0300 Subject: [PATCH] feat: update x/sync (#3138) * feat: update x/sync Signed-off-by: Carlos A Becker * test: fixes Signed-off-by: Carlos A Becker * fix: revert Signed-off-by: Carlos A Becker * test: fix race condition Signed-off-by: Carlos A Becker --- go.mod | 2 +- go.sum | 3 +- internal/semerrgroup/sem.go | 57 ++-------------------- internal/semerrgroup/sem_test.go | 83 +++++++++++++++++++------------- 4 files changed, 55 insertions(+), 90 deletions(-) diff --git a/go.mod b/go.mod index 6a5921f38..3638601ca 100644 --- a/go.mod +++ b/go.mod @@ -36,7 +36,7 @@ require ( gocloud.dev v0.24.0 golang.org/x/crypto v0.0.0-20220307211146-efcb8507fb70 golang.org/x/oauth2 v0.0.0-20220411215720-9780585627b5 - golang.org/x/sync v0.0.0-20210220032951-036812b2e83c + golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 gopkg.in/mail.v2 v2.3.1 gopkg.in/yaml.v3 v3.0.1 ) diff --git a/go.sum b/go.sum index 6c1e0d948..2480810e3 100644 --- a/go.sum +++ b/go.sum @@ -844,8 +844,9 @@ golang.org/x/sync v0.0.0-20200317015054-43a5402ce75a/go.mod h1:RxMgew5VJxzue5/jJ golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sync v0.0.0-20201207232520-09787c993a3a/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= -golang.org/x/sync v0.0.0-20210220032951-036812b2e83c h1:5KslGYwFpkhGh+Q16bwMP3cOontH8FOep7tGV86Y7SQ= golang.org/x/sync v0.0.0-20210220032951-036812b2e83c/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= +golang.org/x/sync v0.0.0-20220513210516-0976fa681c29 h1:w8s32wxx3sY+OjLlv9qltkLU5yvJzxjjgiHWLjdIcw4= +golang.org/x/sync v0.0.0-20220513210516-0976fa681c29/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM= golang.org/x/sys v0.0.0-20180823144017-11551d06cbcc/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180830151530-49385e6e1522/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= golang.org/x/sys v0.0.0-20180905080454-ebe1bf3edb33/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY= diff --git a/internal/semerrgroup/sem.go b/internal/semerrgroup/sem.go index 323ed4ce3..74a2f35b3 100644 --- a/internal/semerrgroup/sem.go +++ b/internal/semerrgroup/sem.go @@ -17,60 +17,9 @@ type Group interface { // New returns a new Group of a given size. func New(size int) Group { - if size == 1 { - return &serialGroup{} - } - return ¶llelGroup{ - ch: make(chan bool, size), - g: errgroup.Group{}, - } -} - -var _ Group = ¶llelGroup{} - -type parallelGroup struct { - ch chan bool - g errgroup.Group -} - -// Go execs one function respecting the group and semaphore. -func (s *parallelGroup) Go(fn func() error) { - s.g.Go(func() error { - s.ch <- true - defer func() { - <-s.ch - }() - return fn() - }) -} - -// Wait waits for the group to complete and return an error if any. -func (s *parallelGroup) Wait() error { - return s.g.Wait() -} - -var _ Group = &serialGroup{} - -type serialGroup struct { - err error - errOnce sync.Once -} - -// Go execs runs `fn` and saves the result if no error has been encountered. -func (s *serialGroup) Go(fn func() error) { - if s.err != nil { - return - } - if err := fn(); err != nil { - s.errOnce.Do(func() { - s.err = err - }) - } -} - -// Wait waits for Go to complete and returns the first error encountered. -func (s *serialGroup) Wait() error { - return s.err + var g errgroup.Group + g.SetLimit(size) + return &g } var _ Group = &skipAwareGroup{} diff --git a/internal/semerrgroup/sem_test.go b/internal/semerrgroup/sem_test.go index d2c0758b5..c664ee7bd 100644 --- a/internal/semerrgroup/sem_test.go +++ b/internal/semerrgroup/sem_test.go @@ -11,20 +11,24 @@ import ( ) func TestSemaphore(t *testing.T) { - g := New(4) - var lock sync.Mutex - var counter int - for i := 0; i < 10; i++ { - g.Go(func() error { - time.Sleep(10 * time.Millisecond) - lock.Lock() - counter++ - lock.Unlock() - return nil + for _, i := range []int{1, 4} { + t.Run(fmt.Sprintf("limit-%d", i), func(t *testing.T) { + g := New(i) + var lock sync.Mutex + var counter int + for i := 0; i < 10; i++ { + g.Go(func() error { + time.Sleep(10 * time.Millisecond) + lock.Lock() + counter++ + lock.Unlock() + return nil + }) + } + require.NoError(t, g.Wait()) + require.Equal(t, counter, 10) }) } - require.NoError(t, g.Wait()) - require.Equal(t, counter, 10) } func TestSemaphoreOrder(t *testing.T) { @@ -42,35 +46,46 @@ func TestSemaphoreOrder(t *testing.T) { require.Equal(t, []int{0, 1, 2, 3, 4, 5, 6, 7, 8, 9}, output) } -func TestSemaphoreOrderError(t *testing.T) { - g := New(1) - output := []int{} - for i := 0; i < 10; i++ { - i := i - g.Go(func() error { - output = append(output, i) - return fmt.Errorf("fake err") +func TestSemaphoreError(t *testing.T) { + for _, i := range []int{1, 4} { + t.Run(fmt.Sprintf("limit-%d", i), func(t *testing.T) { + g := New(i) + var lock sync.Mutex + output := []int{} + for i := 0; i < 10; i++ { + i := i + g.Go(func() error { + lock.Lock() + defer lock.Unlock() + output = append(output, i) + return fmt.Errorf("fake err") + }) + } + require.EqualError(t, g.Wait(), "fake err") + require.Len(t, output, 10) }) } - require.EqualError(t, g.Wait(), "fake err") - require.Equal(t, []int{0}, output) } func TestSemaphoreSkipAware(t *testing.T) { - g := NewSkipAware(New(1)) - var lock sync.Mutex - var counter int - for i := 0; i < 10; i++ { - g.Go(func() error { - time.Sleep(10 * time.Millisecond) - lock.Lock() - counter++ - lock.Unlock() - return pipe.Skip("fake skip") + for _, i := range []int{1, 4} { + t.Run(fmt.Sprintf("limit-%d", i), func(t *testing.T) { + g := NewSkipAware(New(i)) + var lock sync.Mutex + var counter int + for i := 0; i < 10; i++ { + g.Go(func() error { + time.Sleep(10 * time.Millisecond) + lock.Lock() + counter++ + lock.Unlock() + return pipe.Skip("fake skip") + }) + } + require.EqualError(t, g.Wait(), "fake skip") + require.Equal(t, counter, 10) }) } - require.EqualError(t, g.Wait(), "fake skip") - require.Equal(t, counter, 10) } func TestSemaphoreSkipAndRealError(t *testing.T) {