mirror of
https://github.com/umputun/reproxy.git
synced 2025-09-16 08:46:17 +02:00
Refactor
Thx for the feedback. * Add missing test * Decouple containerInfo from docker client * Move refresh interval var to `Docker` * Minor things
This commit is contained in:
@@ -13,7 +13,6 @@ import (
|
||||
"time"
|
||||
|
||||
log "github.com/go-pkgz/lgr"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
"github.com/umputun/reproxy/app/discovery"
|
||||
)
|
||||
@@ -27,9 +26,10 @@ import (
|
||||
// Alternatively labels can alter this. reproxy.route sets source route, and reproxy.dest sets the destination.
|
||||
// Optional reproxy.server enforces match by server name (hostname) and reproxy.ping sets the health check url
|
||||
type Docker struct {
|
||||
DockerClient DockerClient
|
||||
Excludes []string
|
||||
AutoAPI bool
|
||||
DockerClient DockerClient
|
||||
Excludes []string
|
||||
AutoAPI bool
|
||||
RefreshInterval time.Duration
|
||||
}
|
||||
|
||||
// DockerClient defines interface listing containers and subscribing to events
|
||||
@@ -39,19 +39,23 @@ type DockerClient interface {
|
||||
|
||||
// containerInfo is simplified view of container metadata
|
||||
type containerInfo struct {
|
||||
ID string `json:"Id"`
|
||||
Name string `json:"-"`
|
||||
State string `json:"State"`
|
||||
Labels map[string]string `json:"Labels"`
|
||||
TS time.Time `json:"-"`
|
||||
IP string `json:"-"`
|
||||
Ports []int `json:"-"`
|
||||
ID string
|
||||
Name string
|
||||
State string
|
||||
Labels map[string]string
|
||||
TS time.Time
|
||||
IP string
|
||||
Ports []int
|
||||
}
|
||||
|
||||
// Events gets eventsCh with all containers-related docker events events
|
||||
func (d *Docker) Events(ctx context.Context) (res <-chan discovery.ProviderID) {
|
||||
eventsCh := make(chan discovery.ProviderID)
|
||||
go d.events(ctx, eventsCh)
|
||||
go func() {
|
||||
if err := d.events(ctx, eventsCh); err != context.Canceled {
|
||||
log.Printf("[ERROR] unexpected docker client exit reason: %s", err)
|
||||
}
|
||||
}()
|
||||
return eventsCh
|
||||
}
|
||||
|
||||
@@ -123,7 +127,7 @@ func (d *Docker) List() ([]discovery.URLMapper, error) {
|
||||
|
||||
srcRegex, err := regexp.Compile(srcURL)
|
||||
if err != nil {
|
||||
return nil, errors.Wrapf(err, "invalid src regex %s", srcURL)
|
||||
return nil, fmt.Errorf("invalid src regex: %w", err)
|
||||
}
|
||||
|
||||
// docker server label may have multiple, comma separated servers
|
||||
@@ -155,7 +159,7 @@ func (d *Docker) matchedPort(c containerInfo) (port int, err error) {
|
||||
if v, ok := c.Labels["reproxy.port"]; ok {
|
||||
rp, err := strconv.Atoi(v)
|
||||
if err != nil {
|
||||
return 0, errors.Wrapf(err, "invalid reproxy.port %s", v)
|
||||
return 0, fmt.Errorf("invalid reproxy port %s: %w", v, err)
|
||||
}
|
||||
for _, p := range c.Ports {
|
||||
// set port to reproxy.port if matched with one of exposed
|
||||
@@ -168,13 +172,10 @@ func (d *Docker) matchedPort(c containerInfo) (port int, err error) {
|
||||
return port, nil
|
||||
}
|
||||
|
||||
// changed in tests
|
||||
var dockerPollingInterval = time.Second * 10
|
||||
|
||||
// events starts monitoring changes in running containers and sends refresh
|
||||
// notification to eventsCh when change(s) are detected. Blocks caller
|
||||
func (d *Docker) events(ctx context.Context, eventsCh chan<- discovery.ProviderID) error {
|
||||
ticker := time.NewTicker(dockerPollingInterval)
|
||||
ticker := time.NewTicker(d.RefreshInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
// Keep track of running containers
|
||||
@@ -226,7 +227,7 @@ func (d *Docker) events(ctx context.Context, eventsCh chan<- discovery.ProviderI
|
||||
func (d *Docker) listContainers() (res []containerInfo, err error) {
|
||||
containers, err := d.DockerClient.ListContainers()
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "can't list containers")
|
||||
return nil, fmt.Errorf("can't list containers: %w", err)
|
||||
}
|
||||
|
||||
log.Printf("[DEBUG] total containers = %d", len(containers))
|
||||
@@ -276,7 +277,7 @@ func NewDockerClient(host, network string) DockerClient {
|
||||
var schemaRegex = regexp.MustCompile("^(?:([a-z0-9]+)://)?(.*)$")
|
||||
parts := schemaRegex.FindStringSubmatch(host)
|
||||
proto, addr := parts[1], parts[2]
|
||||
log.Printf("[DEBUG] Configuring docker client to talk to %s via %s", addr, proto)
|
||||
log.Printf("[DEBUG] configuring docker client to talk to %s via %s", addr, proto)
|
||||
|
||||
client := http.Client{
|
||||
Transport: &http.Transport{
|
||||
@@ -291,52 +292,57 @@ func NewDockerClient(host, network string) DockerClient {
|
||||
}
|
||||
|
||||
func (d *dockerClient) ListContainers() ([]containerInfo, error) {
|
||||
const APIVersion = "v1.41"
|
||||
|
||||
const dockerAPIVersion = "v1.41"
|
||||
|
||||
resp, err := d.client.Get(fmt.Sprintf("http://localhost/%s/containers/json", dockerAPIVersion))
|
||||
resp, err := d.client.Get(fmt.Sprintf("http://localhost/%s/containers/json", APIVersion))
|
||||
if err != nil {
|
||||
return nil, errors.Wrap(err, "failed connection to docker socket")
|
||||
return nil, fmt.Errorf("failed connection to docker socket: %w", err)
|
||||
}
|
||||
|
||||
defer resp.Body.Close()
|
||||
|
||||
response := []struct {
|
||||
containerInfo
|
||||
ID string `json:"Id"`
|
||||
Name string
|
||||
State string
|
||||
Labels map[string]string
|
||||
Created int64
|
||||
NetworkSettings struct {
|
||||
Networks map[string]struct {
|
||||
IPAddress string
|
||||
}
|
||||
}
|
||||
Names []string
|
||||
ExposedPorts []struct{ PrivatePort int } `json:"Ports"`
|
||||
Names []string
|
||||
Ports []struct{ PrivatePort int } `json:"Ports"`
|
||||
}{}
|
||||
|
||||
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
|
||||
return nil, errors.Wrap(err, "failed to parse response from docker daemon")
|
||||
return nil, fmt.Errorf("failed to parse response from docker daemon: %w", err)
|
||||
}
|
||||
|
||||
containers := make([]containerInfo, len(response))
|
||||
|
||||
for i, c := range response {
|
||||
// fill remaining fields
|
||||
c.TS = time.Unix(c.Created, 0)
|
||||
for i, resp := range response {
|
||||
c := containerInfo{}
|
||||
|
||||
for k, v := range c.NetworkSettings.Networks {
|
||||
c.ID = resp.ID
|
||||
c.Name = strings.TrimPrefix(resp.Names[0], "/")
|
||||
c.State = resp.State
|
||||
c.Labels = resp.Labels
|
||||
c.TS = time.Unix(resp.Created, 0)
|
||||
|
||||
for k, v := range resp.NetworkSettings.Networks {
|
||||
if d.network == "" || k == d.network { // match on network name if defined
|
||||
c.IP = v.IPAddress
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
c.Name = strings.TrimPrefix(c.Names[0], "/")
|
||||
|
||||
for _, p := range c.ExposedPorts {
|
||||
for _, p := range resp.Ports {
|
||||
c.Ports = append(c.Ports, p.PrivatePort)
|
||||
}
|
||||
|
||||
containers[i] = c.containerInfo
|
||||
containers[i] = c
|
||||
}
|
||||
|
||||
return containers, nil
|
||||
|
@@ -69,14 +69,61 @@ func TestDocker_List(t *testing.T) {
|
||||
assert.Equal(t, "*", res[2].Server)
|
||||
}
|
||||
|
||||
func TestDocker_ListWithAutoAPI(t *testing.T) {
|
||||
dclient := &DockerClientMock{
|
||||
ListContainersFunc: func() ([]containerInfo, error) {
|
||||
return []containerInfo{
|
||||
{
|
||||
Name: "c1", State: "running", IP: "127.0.0.2", Ports: []int{1345, 12345},
|
||||
Labels: map[string]string{"reproxy.route": "^/api/123/(.*)", "reproxy.dest": "/blah/$1",
|
||||
"reproxy.port": "12345", "reproxy.server": "example.com, example2.com", "reproxy.ping": "/ping"},
|
||||
},
|
||||
{
|
||||
Name: "c2", State: "running", IP: "127.0.0.3", Ports: []int{12346},
|
||||
},
|
||||
{
|
||||
Name: "c3", State: "stopped",
|
||||
},
|
||||
{
|
||||
Name: "c4", State: "running", IP: "127.0.0.122", Ports: []int{2345},
|
||||
Labels: map[string]string{"reproxy.enabled": "false"},
|
||||
},
|
||||
}, nil
|
||||
},
|
||||
}
|
||||
|
||||
d := Docker{DockerClient: dclient, AutoAPI: true}
|
||||
res, err := d.List()
|
||||
require.NoError(t, err)
|
||||
require.Equal(t, 3, len(res))
|
||||
|
||||
assert.Equal(t, "^/api/123/(.*)", res[0].SrcMatch.String())
|
||||
assert.Equal(t, "http://127.0.0.2:12345/blah/$1", res[0].Dst)
|
||||
assert.Equal(t, "example.com", res[0].Server)
|
||||
assert.Equal(t, "http://127.0.0.2:12345/ping", res[0].PingURL)
|
||||
|
||||
assert.Equal(t, "^/api/123/(.*)", res[1].SrcMatch.String())
|
||||
assert.Equal(t, "http://127.0.0.2:12345/blah/$1", res[1].Dst)
|
||||
assert.Equal(t, "example2.com", res[1].Server)
|
||||
assert.Equal(t, "http://127.0.0.2:12345/ping", res[1].PingURL)
|
||||
|
||||
assert.Equal(t, "^/api/c2/(.*)", res[2].SrcMatch.String())
|
||||
assert.Equal(t, "http://127.0.0.3:12346/$1", res[2].Dst)
|
||||
assert.Equal(t, "http://127.0.0.3:12346/ping", res[2].PingURL)
|
||||
assert.Equal(t, "*", res[2].Server)
|
||||
}
|
||||
|
||||
func TestDocker_refresh(t *testing.T) {
|
||||
containers := make(chan []containerInfo)
|
||||
|
||||
d := Docker{DockerClient: &DockerClientMock{
|
||||
ListContainersFunc: func() ([]containerInfo, error) {
|
||||
return <-containers, nil
|
||||
d := Docker{
|
||||
DockerClient: &DockerClientMock{
|
||||
ListContainersFunc: func() ([]containerInfo, error) {
|
||||
return <-containers, nil
|
||||
},
|
||||
},
|
||||
}}
|
||||
RefreshInterval: time.Nanosecond,
|
||||
}
|
||||
|
||||
events := make(chan discovery.ProviderID)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
|
||||
@@ -96,7 +143,6 @@ func TestDocker_refresh(t *testing.T) {
|
||||
}
|
||||
|
||||
go func() {
|
||||
dockerPollingInterval = time.Microsecond
|
||||
if err := d.events(ctx, events); err != context.Canceled {
|
||||
log.Fatal(err)
|
||||
}
|
||||
@@ -152,5 +198,5 @@ func TestDockerClient(t *testing.T) {
|
||||
assert.Equal(t, []int{80}, c[0].Ports)
|
||||
assert.Equal(t, time.Unix(1618417435, 0), c[0].TS)
|
||||
|
||||
assert.Equal(t, "", c[1].IP)
|
||||
assert.Empty(t, c[1].IP)
|
||||
}
|
||||
|
@@ -207,8 +207,11 @@ func makeProviders() ([]discovery.Provider, error) {
|
||||
if opts.Docker.AutoAPI {
|
||||
log.Printf("[INFO] auto-api enabled for docker")
|
||||
}
|
||||
|
||||
const refreshInterval = time.Second * 10 // seems like a reasonable default
|
||||
|
||||
res = append(res, &provider.Docker{DockerClient: client, Excludes: opts.Docker.Excluded,
|
||||
AutoAPI: opts.Docker.AutoAPI})
|
||||
AutoAPI: opts.Docker.AutoAPI, RefreshInterval: refreshInterval})
|
||||
}
|
||||
|
||||
if len(res) == 0 && opts.Assets.Location == "" {
|
||||
|
Reference in New Issue
Block a user