mirror of
https://github.com/umputun/reproxy.git
synced 2025-11-29 22:08:14 +02:00
simplify docker events with filters
This commit is contained in:
@@ -31,7 +31,7 @@ type Docker struct {
|
||||
// DockerClient defines interface listing containers and subscribing to events
|
||||
type DockerClient interface {
|
||||
ListContainers(opts dc.ListContainersOptions) ([]dc.APIContainers, error)
|
||||
AddEventListener(listener chan<- *dc.APIEvents) error
|
||||
AddEventListenerWithOptions(options dc.EventsOptions, listener chan<- *dc.APIEvents) error
|
||||
}
|
||||
|
||||
// containerInfo is simplified docker.APIEvents for containers only
|
||||
@@ -44,24 +44,19 @@ type containerInfo struct {
|
||||
Port int
|
||||
}
|
||||
|
||||
var (
|
||||
upStatuses = []string{"start", "restart"}
|
||||
downStatuses = []string{"die", "destroy", "stop", "pause"}
|
||||
)
|
||||
|
||||
// Channel gets eventsCh with all containers events
|
||||
// Events gets eventsCh with all containers-related docker events events
|
||||
func (d *Docker) Events(ctx context.Context) (res <-chan struct{}) {
|
||||
eventsCh := make(chan struct{})
|
||||
go func() {
|
||||
defer close(eventsCh)
|
||||
// loop over to recover from failed events call
|
||||
for {
|
||||
err := d.events(ctx, d.DockerClient, eventsCh) // publish events to eventsCh
|
||||
err := d.events(ctx, d.DockerClient, eventsCh) // publish events to eventsCh in a blocking loop
|
||||
if err == context.Canceled || err == context.DeadlineExceeded {
|
||||
close(eventsCh)
|
||||
return
|
||||
}
|
||||
log.Printf("[WARN] docker events listener failed, restarted, %v", err)
|
||||
time.Sleep(100 * time.Millisecond) // prevent busy loop on restart event listener
|
||||
log.Printf("[WARN] docker events listener failed (restarted), %v", err)
|
||||
time.Sleep(1 * time.Second) // prevent busy loop on restart of event listener
|
||||
}
|
||||
}()
|
||||
return eventsCh
|
||||
@@ -111,10 +106,14 @@ func (d *Docker) ID() discovery.ProviderID { return discovery.PIDocker }
|
||||
// filters everything except "container" type, detects stop/start events and publishes signals to eventsCh
|
||||
func (d *Docker) events(ctx context.Context, client DockerClient, eventsCh chan struct{}) error {
|
||||
dockerEventsCh := make(chan *dc.APIEvents)
|
||||
if err := client.AddEventListener(dockerEventsCh); err != nil {
|
||||
err := client.AddEventListenerWithOptions(dc.EventsOptions{
|
||||
Filters: map[string][]string{"type": {"container"}, "event": {"start", "die", "destroy", "restart", "pause"}}},
|
||||
dockerEventsCh)
|
||||
if err != nil {
|
||||
return errors.Wrap(err, "can't add even listener")
|
||||
}
|
||||
|
||||
eventsCh <- struct{}{} // initial emmit
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
@@ -123,12 +122,6 @@ func (d *Docker) events(ctx context.Context, client DockerClient, eventsCh chan
|
||||
if !ok {
|
||||
return errors.New("events closed")
|
||||
}
|
||||
if ev.Type != "container" {
|
||||
continue
|
||||
}
|
||||
if !contains(ev.Status, upStatuses) && !contains(ev.Status, downStatuses) {
|
||||
continue
|
||||
}
|
||||
log.Printf("[DEBUG] api event %+v", ev)
|
||||
containerName := strings.TrimPrefix(ev.Actor.Attributes["name"], "/")
|
||||
|
||||
@@ -158,7 +151,7 @@ func (d *Docker) listContainers() (res []containerInfo, err error) {
|
||||
log.Printf("[DEBUG] total containers = %d", len(containers))
|
||||
|
||||
for _, c := range containers {
|
||||
if !contains(c.Status, upStatuses) {
|
||||
if !contains(c.State, []string{"running"}) {
|
||||
continue
|
||||
}
|
||||
containerName := strings.TrimPrefix(c.Names[0], "/")
|
||||
|
||||
@@ -15,8 +15,8 @@ import (
|
||||
//
|
||||
// // make and configure a mocked DockerClient
|
||||
// mockedDockerClient := &DockerClientMock{
|
||||
// AddEventListenerFunc: func(listener chan<- *dclient.APIEvents) error {
|
||||
// panic("mock out the AddEventListener method")
|
||||
// AddEventListenerWithOptionsFunc: func(options dclient.EventsOptions, listener chan<- *dclient.APIEvents) error {
|
||||
// panic("mock out the AddEventListenerWithOptions method")
|
||||
// },
|
||||
// ListContainersFunc: func(opts dclient.ListContainersOptions) ([]dclient.APIContainers, error) {
|
||||
// panic("mock out the ListContainers method")
|
||||
@@ -28,16 +28,18 @@ import (
|
||||
//
|
||||
// }
|
||||
type DockerClientMock struct {
|
||||
// AddEventListenerFunc mocks the AddEventListener method.
|
||||
AddEventListenerFunc func(listener chan<- *dclient.APIEvents) error
|
||||
// AddEventListenerWithOptionsFunc mocks the AddEventListenerWithOptions method.
|
||||
AddEventListenerWithOptionsFunc func(options dclient.EventsOptions, listener chan<- *dclient.APIEvents) error
|
||||
|
||||
// ListContainersFunc mocks the ListContainers method.
|
||||
ListContainersFunc func(opts dclient.ListContainersOptions) ([]dclient.APIContainers, error)
|
||||
|
||||
// calls tracks calls to the methods.
|
||||
calls struct {
|
||||
// AddEventListener holds details about calls to the AddEventListener method.
|
||||
AddEventListener []struct {
|
||||
// AddEventListenerWithOptions holds details about calls to the AddEventListenerWithOptions method.
|
||||
AddEventListenerWithOptions []struct {
|
||||
// Options is the options argument value.
|
||||
Options dclient.EventsOptions
|
||||
// Listener is the listener argument value.
|
||||
Listener chan<- *dclient.APIEvents
|
||||
}
|
||||
@@ -47,38 +49,42 @@ type DockerClientMock struct {
|
||||
Opts dclient.ListContainersOptions
|
||||
}
|
||||
}
|
||||
lockAddEventListener sync.RWMutex
|
||||
lockListContainers sync.RWMutex
|
||||
lockAddEventListenerWithOptions sync.RWMutex
|
||||
lockListContainers sync.RWMutex
|
||||
}
|
||||
|
||||
// AddEventListener calls AddEventListenerFunc.
|
||||
func (mock *DockerClientMock) AddEventListener(listener chan<- *dclient.APIEvents) error {
|
||||
if mock.AddEventListenerFunc == nil {
|
||||
panic("DockerClientMock.AddEventListenerFunc: method is nil but DockerClient.AddEventListener was just called")
|
||||
// AddEventListenerWithOptions calls AddEventListenerWithOptionsFunc.
|
||||
func (mock *DockerClientMock) AddEventListenerWithOptions(options dclient.EventsOptions, listener chan<- *dclient.APIEvents) error {
|
||||
if mock.AddEventListenerWithOptionsFunc == nil {
|
||||
panic("DockerClientMock.AddEventListenerWithOptionsFunc: method is nil but DockerClient.AddEventListenerWithOptions was just called")
|
||||
}
|
||||
callInfo := struct {
|
||||
Options dclient.EventsOptions
|
||||
Listener chan<- *dclient.APIEvents
|
||||
}{
|
||||
Options: options,
|
||||
Listener: listener,
|
||||
}
|
||||
mock.lockAddEventListener.Lock()
|
||||
mock.calls.AddEventListener = append(mock.calls.AddEventListener, callInfo)
|
||||
mock.lockAddEventListener.Unlock()
|
||||
return mock.AddEventListenerFunc(listener)
|
||||
mock.lockAddEventListenerWithOptions.Lock()
|
||||
mock.calls.AddEventListenerWithOptions = append(mock.calls.AddEventListenerWithOptions, callInfo)
|
||||
mock.lockAddEventListenerWithOptions.Unlock()
|
||||
return mock.AddEventListenerWithOptionsFunc(options, listener)
|
||||
}
|
||||
|
||||
// AddEventListenerCalls gets all the calls that were made to AddEventListener.
|
||||
// AddEventListenerWithOptionsCalls gets all the calls that were made to AddEventListenerWithOptions.
|
||||
// Check the length with:
|
||||
// len(mockedDockerClient.AddEventListenerCalls())
|
||||
func (mock *DockerClientMock) AddEventListenerCalls() []struct {
|
||||
// len(mockedDockerClient.AddEventListenerWithOptionsCalls())
|
||||
func (mock *DockerClientMock) AddEventListenerWithOptionsCalls() []struct {
|
||||
Options dclient.EventsOptions
|
||||
Listener chan<- *dclient.APIEvents
|
||||
} {
|
||||
var calls []struct {
|
||||
Options dclient.EventsOptions
|
||||
Listener chan<- *dclient.APIEvents
|
||||
}
|
||||
mock.lockAddEventListener.RLock()
|
||||
calls = mock.calls.AddEventListener
|
||||
mock.lockAddEventListener.RUnlock()
|
||||
mock.lockAddEventListenerWithOptions.RLock()
|
||||
calls = mock.calls.AddEventListenerWithOptions
|
||||
mock.lockAddEventListenerWithOptions.RUnlock()
|
||||
return calls
|
||||
}
|
||||
|
||||
|
||||
@@ -5,39 +5,39 @@ import (
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
dclient "github.com/fsouza/go-dockerclient"
|
||||
dc "github.com/fsouza/go-dockerclient"
|
||||
"github.com/stretchr/testify/assert"
|
||||
"github.com/stretchr/testify/require"
|
||||
)
|
||||
|
||||
func TestDocker_List(t *testing.T) {
|
||||
dc := &DockerClientMock{
|
||||
ListContainersFunc: func(opts dclient.ListContainersOptions) ([]dclient.APIContainers, error) {
|
||||
return []dclient.APIContainers{
|
||||
{Names: []string{"c1"}, Status: "start",
|
||||
Networks: dclient.NetworkList{
|
||||
Networks: map[string]dclient.ContainerNetwork{"default": {IPAddress: "127.0.0.2"}},
|
||||
dclient := &DockerClientMock{
|
||||
ListContainersFunc: func(opts dc.ListContainersOptions) ([]dc.APIContainers, error) {
|
||||
return []dc.APIContainers{
|
||||
{Names: []string{"c1"}, State: "running",
|
||||
Networks: dc.NetworkList{
|
||||
Networks: map[string]dc.ContainerNetwork{"bridge": {IPAddress: "127.0.0.2"}},
|
||||
},
|
||||
Ports: []dclient.APIPort{
|
||||
Ports: []dc.APIPort{
|
||||
{PrivatePort: 12345},
|
||||
},
|
||||
Labels: map[string]string{"reproxy.route": "^/api/123/(.*)", "reproxy.dest": "/blah/$1",
|
||||
"reproxy.server": "example.com", "reproxy.ping": "http://localhost/ping"},
|
||||
},
|
||||
{Names: []string{"c2"}, Status: "start",
|
||||
Networks: dclient.NetworkList{
|
||||
Networks: map[string]dclient.ContainerNetwork{"default": {IPAddress: "127.0.0.3"}},
|
||||
{Names: []string{"c2"}, State: "running",
|
||||
Networks: dc.NetworkList{
|
||||
Networks: map[string]dc.ContainerNetwork{"bridge": {IPAddress: "127.0.0.3"}},
|
||||
},
|
||||
Ports: []dclient.APIPort{
|
||||
Ports: []dc.APIPort{
|
||||
{PrivatePort: 12346},
|
||||
},
|
||||
},
|
||||
{Names: []string{"c3"}, Status: "stop"},
|
||||
{Names: []string{"c4"}, Status: "start",
|
||||
Networks: dclient.NetworkList{
|
||||
Networks: map[string]dclient.ContainerNetwork{"other": {IPAddress: "127.0.0.2"}},
|
||||
{Names: []string{"c3"}, State: "stopped"},
|
||||
{Names: []string{"c4"}, State: "running",
|
||||
Networks: dc.NetworkList{
|
||||
Networks: map[string]dc.ContainerNetwork{"other": {IPAddress: "127.0.0.2"}},
|
||||
},
|
||||
Ports: []dclient.APIPort{
|
||||
Ports: []dc.APIPort{
|
||||
{PrivatePort: 12345},
|
||||
},
|
||||
},
|
||||
@@ -45,10 +45,10 @@ func TestDocker_List(t *testing.T) {
|
||||
},
|
||||
}
|
||||
|
||||
d := Docker{DockerClient: dc, Network: "default"}
|
||||
d := Docker{DockerClient: dclient, Network: "bridge"}
|
||||
res, err := d.List()
|
||||
require.NoError(t, err)
|
||||
assert.Equal(t, 2, len(res))
|
||||
require.Equal(t, 2, len(res))
|
||||
|
||||
assert.Equal(t, "^/api/123/(.*)", res[0].SrcMatch.String())
|
||||
assert.Equal(t, "http://127.0.0.2:12345/blah/$1", res[0].Dst)
|
||||
@@ -63,15 +63,15 @@ func TestDocker_List(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestDocker_Events(t *testing.T) {
|
||||
dc := &DockerClientMock{
|
||||
AddEventListenerFunc: func(listener chan<- *dclient.APIEvents) error {
|
||||
dclient := &DockerClientMock{
|
||||
AddEventListenerWithOptionsFunc: func(options dc.EventsOptions, listener chan<- *dc.APIEvents) error {
|
||||
go func() {
|
||||
time.Sleep(30 * time.Millisecond)
|
||||
listener <- &dclient.APIEvents{Type: "container", Status: "start",
|
||||
Actor: dclient.APIActor{Attributes: map[string]string{"name": "/c1"}}}
|
||||
listener <- &dc.APIEvents{Type: "container", Status: "start",
|
||||
Actor: dc.APIActor{Attributes: map[string]string{"name": "/c1"}}}
|
||||
time.Sleep(30 * time.Millisecond)
|
||||
listener <- &dclient.APIEvents{Type: "bad", Status: "start",
|
||||
Actor: dclient.APIActor{Attributes: map[string]string{"name": "/c2"}}}
|
||||
listener <- &dc.APIEvents{Type: "container", Status: "start",
|
||||
Actor: dc.APIActor{Attributes: map[string]string{"name": "/c2"}}}
|
||||
}()
|
||||
return nil
|
||||
},
|
||||
@@ -80,7 +80,7 @@ func TestDocker_Events(t *testing.T) {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 200*time.Millisecond)
|
||||
defer cancel()
|
||||
|
||||
d := Docker{DockerClient: dc}
|
||||
d := Docker{DockerClient: dclient}
|
||||
ch := d.Events(ctx)
|
||||
|
||||
events := 0
|
||||
@@ -88,5 +88,5 @@ func TestDocker_Events(t *testing.T) {
|
||||
t.Log("event")
|
||||
events++
|
||||
}
|
||||
assert.Equal(t, 1, events)
|
||||
assert.Equal(t, 2+1, events, "initial event plus 2 more")
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user