From 2c54ee703fb0cc23a808eb40099845368c581e41 Mon Sep 17 00:00:00 2001 From: Joel Speed Date: Sun, 14 Feb 2021 17:39:36 +0000 Subject: [PATCH] Add server group implementation for running multiple servers at once --- pkg/http/server_group.go | 35 ++++++++++++ pkg/http/server_group_test.go | 102 ++++++++++++++++++++++++++++++++++ 2 files changed, 137 insertions(+) create mode 100644 pkg/http/server_group.go create mode 100644 pkg/http/server_group_test.go diff --git a/pkg/http/server_group.go b/pkg/http/server_group.go new file mode 100644 index 00000000..261d9837 --- /dev/null +++ b/pkg/http/server_group.go @@ -0,0 +1,35 @@ +package http + +import ( + "context" + + "golang.org/x/sync/errgroup" +) + +// NewServerGroup creates a new Server to start and gracefully stop a collection +// of Servers. +func NewServerGroup(servers ...Server) Server { + return &serverGroup{ + servers: servers, + } +} + +// serverGroup manages the starting and graceful shutdown of a collection of +// servers. +type serverGroup struct { + servers []Server +} + +// Start runs the servers in the server group. +func (s *serverGroup) Start(ctx context.Context) error { + g, groupCtx := errgroup.WithContext(ctx) + + for _, server := range s.servers { + srv := server + g.Go(func() error { + return srv.Start(groupCtx) + }) + } + + return g.Wait() +} diff --git a/pkg/http/server_group_test.go b/pkg/http/server_group_test.go new file mode 100644 index 00000000..97748b4d --- /dev/null +++ b/pkg/http/server_group_test.go @@ -0,0 +1,102 @@ +package http + +import ( + "context" + "errors" + + . "github.com/onsi/ginkgo" + . "github.com/onsi/gomega" +) + +var _ = Describe("Server Group", func() { + var m1, m2, m3 *mockServer + var ctx context.Context + var cancel context.CancelFunc + var group Server + + BeforeEach(func() { + ctx, cancel = context.WithCancel(context.Background()) + + m1 = newMockServer() + m2 = newMockServer() + m3 = newMockServer() + group = NewServerGroup(m1, m2, m3) + }) + + AfterEach(func() { + cancel() + }) + + It("starts each server in the group", func() { + go func() { + defer GinkgoRecover() + Expect(group.Start(ctx)).To(Succeed()) + }() + + Eventually(m1.started).Should(BeClosed(), "mock server 1 not started") + Eventually(m2.started).Should(BeClosed(), "mock server 2 not started") + Eventually(m3.started).Should(BeClosed(), "mock server 3 not started") + }) + + It("stop each server in the group when the context is cancelled", func() { + go func() { + defer GinkgoRecover() + Expect(group.Start(ctx)).To(Succeed()) + }() + + cancel() + Eventually(m1.stopped).Should(BeClosed(), "mock server 1 not stopped") + Eventually(m2.stopped).Should(BeClosed(), "mock server 2 not stopped") + Eventually(m3.stopped).Should(BeClosed(), "mock server 3 not stopped") + }) + + It("stop each server in the group when the an error occurs", func() { + err := errors.New("server error") + go func() { + defer GinkgoRecover() + Expect(group.Start(ctx)).To(MatchError(err)) + }() + + m2.errors <- err + Eventually(m1.stopped).Should(BeClosed(), "mock server 1 not stopped") + Eventually(m2.stopped).Should(BeClosed(), "mock server 2 not stopped") + Eventually(m3.stopped).Should(BeClosed(), "mock server 3 not stopped") + }) +}) + +// mockServer is used to test the server group can start +// and stop multiple servers simultaneously. +type mockServer struct { + started chan struct{} + startClosed bool + stopped chan struct{} + stopClosed bool + errors chan error +} + +func newMockServer() *mockServer { + return &mockServer{ + started: make(chan struct{}), + stopped: make(chan struct{}), + errors: make(chan error), + } +} + +func (m *mockServer) Start(ctx context.Context) error { + if !m.startClosed { + close(m.started) + m.startClosed = true + } + defer func() { + if !m.stopClosed { + close(m.stopped) + m.stopClosed = true + } + }() + select { + case <-ctx.Done(): + return nil + case err := <-m.errors: + return err + } +}