diff --git a/app/discovery/provider/docker.go b/app/discovery/provider/docker.go index 89ec11d..d8dfd5f 100644 --- a/app/discovery/provider/docker.go +++ b/app/discovery/provider/docker.go @@ -31,7 +31,7 @@ type Docker struct { // DockerClient defines interface listing containers and subscribing to events type DockerClient interface { ListContainers(opts dc.ListContainersOptions) ([]dc.APIContainers, error) - AddEventListener(listener chan<- *dc.APIEvents) error + AddEventListenerWithOptions(options dc.EventsOptions, listener chan<- *dc.APIEvents) error } // containerInfo is simplified docker.APIEvents for containers only @@ -44,24 +44,19 @@ type containerInfo struct { Port int } -var ( - upStatuses = []string{"start", "restart"} - downStatuses = []string{"die", "destroy", "stop", "pause"} -) - -// Channel gets eventsCh with all containers events +// Events gets eventsCh with all containers-related docker events events func (d *Docker) Events(ctx context.Context) (res <-chan struct{}) { eventsCh := make(chan struct{}) go func() { + defer close(eventsCh) // loop over to recover from failed events call for { - err := d.events(ctx, d.DockerClient, eventsCh) // publish events to eventsCh + err := d.events(ctx, d.DockerClient, eventsCh) // publish events to eventsCh in a blocking loop if err == context.Canceled || err == context.DeadlineExceeded { - close(eventsCh) return } - log.Printf("[WARN] docker events listener failed, restarted, %v", err) - time.Sleep(100 * time.Millisecond) // prevent busy loop on restart event listener + log.Printf("[WARN] docker events listener failed (restarted), %v", err) + time.Sleep(1 * time.Second) // prevent busy loop on restart of event listener } }() return eventsCh @@ -111,10 +106,14 @@ func (d *Docker) ID() discovery.ProviderID { return discovery.PIDocker } // filters everything except "container" type, detects stop/start events and publishes signals to eventsCh func (d *Docker) events(ctx context.Context, client DockerClient, eventsCh chan struct{}) error { dockerEventsCh := make(chan *dc.APIEvents) - if err := client.AddEventListener(dockerEventsCh); err != nil { + err := client.AddEventListenerWithOptions(dc.EventsOptions{ + Filters: map[string][]string{"type": {"container"}, "event": {"start", "die", "destroy", "restart", "pause"}}}, + dockerEventsCh) + if err != nil { return errors.Wrap(err, "can't add even listener") } + eventsCh <- struct{}{} // initial emmit for { select { case <-ctx.Done(): @@ -123,12 +122,6 @@ func (d *Docker) events(ctx context.Context, client DockerClient, eventsCh chan if !ok { return errors.New("events closed") } - if ev.Type != "container" { - continue - } - if !contains(ev.Status, upStatuses) && !contains(ev.Status, downStatuses) { - continue - } log.Printf("[DEBUG] api event %+v", ev) containerName := strings.TrimPrefix(ev.Actor.Attributes["name"], "/") @@ -158,7 +151,7 @@ func (d *Docker) listContainers() (res []containerInfo, err error) { log.Printf("[DEBUG] total containers = %d", len(containers)) for _, c := range containers { - if !contains(c.Status, upStatuses) { + if !contains(c.State, []string{"running"}) { continue } containerName := strings.TrimPrefix(c.Names[0], "/") diff --git a/app/discovery/provider/docker_client_mock.go b/app/discovery/provider/docker_client_mock.go index 6d1c97a..dadd9ce 100644 --- a/app/discovery/provider/docker_client_mock.go +++ b/app/discovery/provider/docker_client_mock.go @@ -15,8 +15,8 @@ import ( // // // make and configure a mocked DockerClient // mockedDockerClient := &DockerClientMock{ -// AddEventListenerFunc: func(listener chan<- *dclient.APIEvents) error { -// panic("mock out the AddEventListener method") +// AddEventListenerWithOptionsFunc: func(options dclient.EventsOptions, listener chan<- *dclient.APIEvents) error { +// panic("mock out the AddEventListenerWithOptions method") // }, // ListContainersFunc: func(opts dclient.ListContainersOptions) ([]dclient.APIContainers, error) { // panic("mock out the ListContainers method") @@ -28,16 +28,18 @@ import ( // // } type DockerClientMock struct { - // AddEventListenerFunc mocks the AddEventListener method. - AddEventListenerFunc func(listener chan<- *dclient.APIEvents) error + // AddEventListenerWithOptionsFunc mocks the AddEventListenerWithOptions method. + AddEventListenerWithOptionsFunc func(options dclient.EventsOptions, listener chan<- *dclient.APIEvents) error // ListContainersFunc mocks the ListContainers method. ListContainersFunc func(opts dclient.ListContainersOptions) ([]dclient.APIContainers, error) // calls tracks calls to the methods. calls struct { - // AddEventListener holds details about calls to the AddEventListener method. - AddEventListener []struct { + // AddEventListenerWithOptions holds details about calls to the AddEventListenerWithOptions method. + AddEventListenerWithOptions []struct { + // Options is the options argument value. + Options dclient.EventsOptions // Listener is the listener argument value. Listener chan<- *dclient.APIEvents } @@ -47,38 +49,42 @@ type DockerClientMock struct { Opts dclient.ListContainersOptions } } - lockAddEventListener sync.RWMutex - lockListContainers sync.RWMutex + lockAddEventListenerWithOptions sync.RWMutex + lockListContainers sync.RWMutex } -// AddEventListener calls AddEventListenerFunc. -func (mock *DockerClientMock) AddEventListener(listener chan<- *dclient.APIEvents) error { - if mock.AddEventListenerFunc == nil { - panic("DockerClientMock.AddEventListenerFunc: method is nil but DockerClient.AddEventListener was just called") +// AddEventListenerWithOptions calls AddEventListenerWithOptionsFunc. +func (mock *DockerClientMock) AddEventListenerWithOptions(options dclient.EventsOptions, listener chan<- *dclient.APIEvents) error { + if mock.AddEventListenerWithOptionsFunc == nil { + panic("DockerClientMock.AddEventListenerWithOptionsFunc: method is nil but DockerClient.AddEventListenerWithOptions was just called") } callInfo := struct { + Options dclient.EventsOptions Listener chan<- *dclient.APIEvents }{ + Options: options, Listener: listener, } - mock.lockAddEventListener.Lock() - mock.calls.AddEventListener = append(mock.calls.AddEventListener, callInfo) - mock.lockAddEventListener.Unlock() - return mock.AddEventListenerFunc(listener) + mock.lockAddEventListenerWithOptions.Lock() + mock.calls.AddEventListenerWithOptions = append(mock.calls.AddEventListenerWithOptions, callInfo) + mock.lockAddEventListenerWithOptions.Unlock() + return mock.AddEventListenerWithOptionsFunc(options, listener) } -// AddEventListenerCalls gets all the calls that were made to AddEventListener. +// AddEventListenerWithOptionsCalls gets all the calls that were made to AddEventListenerWithOptions. // Check the length with: -// len(mockedDockerClient.AddEventListenerCalls()) -func (mock *DockerClientMock) AddEventListenerCalls() []struct { +// len(mockedDockerClient.AddEventListenerWithOptionsCalls()) +func (mock *DockerClientMock) AddEventListenerWithOptionsCalls() []struct { + Options dclient.EventsOptions Listener chan<- *dclient.APIEvents } { var calls []struct { + Options dclient.EventsOptions Listener chan<- *dclient.APIEvents } - mock.lockAddEventListener.RLock() - calls = mock.calls.AddEventListener - mock.lockAddEventListener.RUnlock() + mock.lockAddEventListenerWithOptions.RLock() + calls = mock.calls.AddEventListenerWithOptions + mock.lockAddEventListenerWithOptions.RUnlock() return calls } diff --git a/app/discovery/provider/docker_test.go b/app/discovery/provider/docker_test.go index 96ff96f..e5da43d 100644 --- a/app/discovery/provider/docker_test.go +++ b/app/discovery/provider/docker_test.go @@ -5,39 +5,39 @@ import ( "testing" "time" - dclient "github.com/fsouza/go-dockerclient" + dc "github.com/fsouza/go-dockerclient" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) func TestDocker_List(t *testing.T) { - dc := &DockerClientMock{ - ListContainersFunc: func(opts dclient.ListContainersOptions) ([]dclient.APIContainers, error) { - return []dclient.APIContainers{ - {Names: []string{"c1"}, Status: "start", - Networks: dclient.NetworkList{ - Networks: map[string]dclient.ContainerNetwork{"default": {IPAddress: "127.0.0.2"}}, + dclient := &DockerClientMock{ + ListContainersFunc: func(opts dc.ListContainersOptions) ([]dc.APIContainers, error) { + return []dc.APIContainers{ + {Names: []string{"c1"}, State: "running", + Networks: dc.NetworkList{ + Networks: map[string]dc.ContainerNetwork{"bridge": {IPAddress: "127.0.0.2"}}, }, - Ports: []dclient.APIPort{ + Ports: []dc.APIPort{ {PrivatePort: 12345}, }, Labels: map[string]string{"reproxy.route": "^/api/123/(.*)", "reproxy.dest": "/blah/$1", "reproxy.server": "example.com", "reproxy.ping": "http://localhost/ping"}, }, - {Names: []string{"c2"}, Status: "start", - Networks: dclient.NetworkList{ - Networks: map[string]dclient.ContainerNetwork{"default": {IPAddress: "127.0.0.3"}}, + {Names: []string{"c2"}, State: "running", + Networks: dc.NetworkList{ + Networks: map[string]dc.ContainerNetwork{"bridge": {IPAddress: "127.0.0.3"}}, }, - Ports: []dclient.APIPort{ + Ports: []dc.APIPort{ {PrivatePort: 12346}, }, }, - {Names: []string{"c3"}, Status: "stop"}, - {Names: []string{"c4"}, Status: "start", - Networks: dclient.NetworkList{ - Networks: map[string]dclient.ContainerNetwork{"other": {IPAddress: "127.0.0.2"}}, + {Names: []string{"c3"}, State: "stopped"}, + {Names: []string{"c4"}, State: "running", + Networks: dc.NetworkList{ + Networks: map[string]dc.ContainerNetwork{"other": {IPAddress: "127.0.0.2"}}, }, - Ports: []dclient.APIPort{ + Ports: []dc.APIPort{ {PrivatePort: 12345}, }, }, @@ -45,10 +45,10 @@ func TestDocker_List(t *testing.T) { }, } - d := Docker{DockerClient: dc, Network: "default"} + d := Docker{DockerClient: dclient, Network: "bridge"} res, err := d.List() require.NoError(t, err) - assert.Equal(t, 2, len(res)) + require.Equal(t, 2, len(res)) assert.Equal(t, "^/api/123/(.*)", res[0].SrcMatch.String()) assert.Equal(t, "http://127.0.0.2:12345/blah/$1", res[0].Dst) @@ -63,15 +63,15 @@ func TestDocker_List(t *testing.T) { } func TestDocker_Events(t *testing.T) { - dc := &DockerClientMock{ - AddEventListenerFunc: func(listener chan<- *dclient.APIEvents) error { + dclient := &DockerClientMock{ + AddEventListenerWithOptionsFunc: func(options dc.EventsOptions, listener chan<- *dc.APIEvents) error { go func() { time.Sleep(30 * time.Millisecond) - listener <- &dclient.APIEvents{Type: "container", Status: "start", - Actor: dclient.APIActor{Attributes: map[string]string{"name": "/c1"}}} + listener <- &dc.APIEvents{Type: "container", Status: "start", + Actor: dc.APIActor{Attributes: map[string]string{"name": "/c1"}}} time.Sleep(30 * time.Millisecond) - listener <- &dclient.APIEvents{Type: "bad", Status: "start", - Actor: dclient.APIActor{Attributes: map[string]string{"name": "/c2"}}} + listener <- &dc.APIEvents{Type: "container", Status: "start", + Actor: dc.APIActor{Attributes: map[string]string{"name": "/c2"}}} }() return nil }, @@ -80,7 +80,7 @@ func TestDocker_Events(t *testing.T) { ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond) defer cancel() - d := Docker{DockerClient: dc} + d := Docker{DockerClient: dclient} ch := d.Events(ctx) events := 0 @@ -88,5 +88,5 @@ func TestDocker_Events(t *testing.T) { t.Log("event") events++ } - assert.Equal(t, 1, events) + assert.Equal(t, 2+1, events, "initial event plus 2 more") }