1
0
mirror of https://github.com/umputun/reproxy.git synced 2025-11-23 22:04:57 +02:00
Files
reproxy/app/discovery/provider/docker.go

392 lines
10 KiB
Go
Raw Normal View History

2021-04-01 02:37:28 -05:00
package provider
import (
"context"
"encoding/json"
2021-04-01 02:37:28 -05:00
"fmt"
"net"
"net/http"
2021-04-01 02:37:28 -05:00
"regexp"
"sort"
"strconv"
2021-04-01 02:37:28 -05:00
"strings"
"time"
log "github.com/go-pkgz/lgr"
2021-04-03 14:23:23 -05:00
"github.com/umputun/reproxy/app/discovery"
2021-04-01 02:37:28 -05:00
)
//go:generate moq -out docker_client_mock.go -skip-ensure -fmt goimports . DockerClient
2021-04-07 21:52:14 -05:00
// Docker provider watches compatible for stop/start changes from containers and maps by
// default from ^/api/%s/(.*) to http://%s:%d/$1, i.e. http://example.com/api/my_container/something
// will be mapped to http://172.17.42.1:8080/something. Ip will be the internal ip of the container and port exposed
// in the Dockerfile.
// Alternatively labels can alter this. reproxy.route sets source route, and reproxy.dest sets the destination.
2021-04-07 21:52:14 -05:00
// Optional reproxy.server enforces match by server name (hostname) and reproxy.ping sets the health check url
2021-04-01 02:37:28 -05:00
type Docker struct {
DockerClient DockerClient
Excludes []string
AutoAPI bool
2021-04-28 14:00:38 -05:00
APIPrefix string
RefreshInterval time.Duration
2021-04-01 02:37:28 -05:00
}
// DockerClient defines interface listing containers and subscribing to events
type DockerClient interface {
ListContainers() ([]containerInfo, error)
2021-04-01 02:37:28 -05:00
}
// containerInfo is simplified view of container metadata
2021-04-01 02:37:28 -05:00
type containerInfo struct {
ID string
Name string
State string
Labels map[string]string
TS time.Time
IP string
Ports []int
2021-04-01 02:37:28 -05:00
}
2021-04-08 22:06:16 -05:00
// Events gets eventsCh with all containers-related docker events events
func (d *Docker) Events(ctx context.Context) (res <-chan discovery.ProviderID) {
eventsCh := make(chan discovery.ProviderID)
go func() {
if err := d.events(ctx, eventsCh); err != context.Canceled {
log.Printf("[ERROR] unexpected docker client exit reason: %s", err)
}
}()
2021-04-01 02:37:28 -05:00
return eventsCh
}
// List all containers and make url mappers
// If AutoAPI enabled all each container and set all params, if not - allow only container with reproxy.* labels
2021-04-09 15:05:22 -05:00
func (d *Docker) List() ([]discovery.URLMapper, error) {
containers, err := d.listContainers(true)
2021-04-01 02:37:28 -05:00
if err != nil {
return nil, err
}
2021-04-09 15:05:22 -05:00
res := make([]discovery.URLMapper, 0, len(containers))
2021-04-01 02:37:28 -05:00
for _, c := range containers {
enabled, explicit := false, false
2021-04-28 14:00:38 -05:00
srcURL := fmt.Sprintf("^/%s/(.*)", c.Name) // default destination
if d.APIPrefix != "" {
prefix := strings.TrimLeft(d.APIPrefix, "/")
prefix = strings.TrimRight(prefix, "/")
srcURL = fmt.Sprintf("^/%s/%s/(.*)", prefix, c.Name) // default destination with api prefix
}
if d.AutoAPI {
enabled = true
}
port, err := d.matchedPort(c)
if err != nil {
log.Printf("[DEBUG] container %s disabled, %v", c.Name, err)
continue
}
destURL := fmt.Sprintf("http://%s:%d/$1", c.IP, port)
pingURL := fmt.Sprintf("http://%s:%d/ping", c.IP, port)
2021-04-02 00:07:36 -05:00
server := "*"
assetsWebRoot, assetsLocation := "", ""
2021-04-05 03:37:28 -05:00
// we don't care about value because disabled will be filtered before
if _, ok := c.Labels["reproxy.enabled"]; ok {
enabled, explicit = true, true
}
2021-04-05 03:37:28 -05:00
if v, ok := c.Labels["reproxy.route"]; ok {
enabled, explicit = true, true
2021-04-01 02:37:28 -05:00
srcURL = v
}
2021-04-05 03:37:28 -05:00
if v, ok := c.Labels["reproxy.dest"]; ok {
enabled, explicit = true, true
destURL = fmt.Sprintf("http://%s:%d%s", c.IP, port, v)
2021-04-01 02:37:28 -05:00
}
2021-04-05 03:37:28 -05:00
if v, ok := c.Labels["reproxy.server"]; ok {
enabled = true
2021-04-02 00:07:36 -05:00
server = v
}
2021-04-05 03:37:28 -05:00
if v, ok := c.Labels["reproxy.ping"]; ok {
enabled = true
pingURL = fmt.Sprintf("http://%s:%d%s", c.IP, port, v)
2021-04-05 03:37:28 -05:00
}
if v, ok := c.Labels["reproxy.assets"]; ok {
if ae := strings.Split(v, ":"); len(ae) == 2 {
enabled = true
assetsWebRoot = ae[0]
assetsLocation = ae[1]
}
}
if !enabled {
log.Printf("[DEBUG] container %s disabled", c.Name)
continue
}
srcRegex, err := regexp.Compile(srcURL)
2021-04-01 02:37:28 -05:00
if err != nil {
return nil, fmt.Errorf("invalid src regex: %w", err)
2021-04-01 02:37:28 -05:00
}
// docker server label may have multiple, comma separated servers
for _, srv := range strings.Split(server, ",") {
mp := discovery.URLMapper{Server: strings.TrimSpace(srv), SrcMatch: *srcRegex, Dst: destURL,
PingURL: pingURL, ProviderID: discovery.PIDocker, MatchType: discovery.MTProxy}
// for assets we add the second proxy mapping only if explicitly requested
if assetsWebRoot != "" && explicit {
mp.MatchType = discovery.MTProxy
res = append(res, mp)
}
if assetsWebRoot != "" {
mp.MatchType = discovery.MTStatic
mp.AssetsWebRoot = assetsWebRoot
mp.AssetsLocation = assetsLocation
}
res = append(res, mp)
}
2021-04-01 02:37:28 -05:00
}
// sort by len(SrcMatch) to have shorter matches after longer
// this way we can handle possible conflicts with more detailed match triggered before less detailed
sort.Slice(res, func(i, j int) bool {
return len(res[i].SrcMatch.String()) > len(res[j].SrcMatch.String())
})
2021-04-01 02:37:28 -05:00
return res, nil
}
func (d *Docker) matchedPort(c containerInfo) (port int, err error) {
port = c.Ports[0] // by default use the first exposed port
if v, ok := c.Labels["reproxy.port"]; ok {
rp, err := strconv.Atoi(v)
if err != nil {
return 0, fmt.Errorf("invalid reproxy port %s: %w", v, err)
}
for _, p := range c.Ports {
// set port to reproxy.port if matched with one of exposed
if p == rp {
port = rp
break
}
}
}
return port, nil
}
// events starts monitoring changes in running containers and sends refresh
// notification to eventsCh when change(s) are detected. Blocks caller
func (d *Docker) events(ctx context.Context, eventsCh chan<- discovery.ProviderID) error {
ticker := time.NewTicker(d.RefreshInterval)
defer ticker.Stop()
2021-04-01 02:37:28 -05:00
// Keep track of running containers
saved := make(map[string]containerInfo)
update := func() {
containers, err := d.listContainers(false)
if err != nil {
log.Printf("[ERROR] failed to fetch running containers: %s", err)
return
}
refresh := false
seen := make(map[string]bool)
for _, c := range containers {
old, exists := saved[c.ID]
if !exists || c.IP != old.IP || c.State != old.State || !c.TS.Equal(old.TS) {
refresh = true
}
seen[c.ID] = true
}
if len(saved) != len(seen) || refresh {
log.Printf("[INFO] changes in running containers detected: refreshing routes")
for k := range saved {
delete(saved, k)
}
for _, c := range containers {
saved[c.ID] = c
}
eventsCh <- discovery.PIDocker
}
}
update() // Refresh immediately
2021-04-01 02:37:28 -05:00
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
update()
2021-04-01 02:37:28 -05:00
}
}
}
func (d *Docker) listContainers(allowLogging bool) (res []containerInfo, err error) {
containers, err := d.DockerClient.ListContainers()
2021-04-01 02:37:28 -05:00
if err != nil {
return nil, fmt.Errorf("can't list containers: %w", err)
2021-04-01 02:37:28 -05:00
}
if allowLogging {
log.Printf("[DEBUG] total containers = %d", len(containers))
}
2021-04-01 02:37:28 -05:00
for _, c := range containers {
if c.State != "running" {
if allowLogging {
log.Printf("[DEBUG] skip container %s due to state %s", c.Name, c.State)
}
2021-04-01 02:37:28 -05:00
continue
}
if discovery.Contains(c.Name, d.Excludes) || strings.EqualFold(c.Name, "reproxy") {
if allowLogging {
log.Printf("[DEBUG] container %s excluded", c.Name)
}
2021-04-01 02:37:28 -05:00
continue
}
if v, ok := c.Labels["reproxy.enabled"]; ok {
if strings.EqualFold(v, "false") || strings.EqualFold(v, "no") || v == "0" {
if allowLogging {
log.Printf("[DEBUG] skip container %s due to reproxy.enabled=%s", c.Name, v)
}
continue
}
}
if c.IP == "" {
if allowLogging {
log.Printf("[DEBUG] skip container %s, no ip on defined networks", c.Name)
}
continue
}
if len(c.Ports) == 0 {
if allowLogging {
log.Printf("[DEBUG] skip container %s, no exposed ports", c.Name)
}
continue
}
if allowLogging {
log.Printf("[DEBUG] running container added, %+v", c)
}
res = append(res, c)
2021-04-01 02:37:28 -05:00
}
if allowLogging {
log.Print("[DEBUG] completed list")
}
2021-04-01 02:37:28 -05:00
return res, nil
}
type dockerClient struct {
client http.Client
network string // network for IP selection
}
// NewDockerClient constructs docker client for given host and network
func NewDockerClient(host, network string) DockerClient {
var schemaRegex = regexp.MustCompile("^(?:([a-z0-9]+)://)?(.*)$")
parts := schemaRegex.FindStringSubmatch(host)
proto, addr := parts[1], parts[2]
log.Printf("[DEBUG] configuring docker client to talk to %s via %s", addr, proto)
client := http.Client{
Transport: &http.Transport{
DialContext: func(_ context.Context, _, _ string) (net.Conn, error) {
return net.Dial(proto, addr)
},
},
Timeout: time.Second * 5,
}
return &dockerClient{client, network}
}
func (d *dockerClient) ListContainers() ([]containerInfo, error) {
2021-04-17 16:34:49 +03:00
// Minimum API version that returns attached networks
// docs.docker.com/engine/api/version-history/#v122-api-changes
const APIVersion = "v1.22"
resp, err := d.client.Get(fmt.Sprintf("http://localhost/%s/containers/json", APIVersion))
if err != nil {
return nil, fmt.Errorf("failed connection to docker socket: %w", err)
}
defer resp.Body.Close()
2021-04-17 16:36:23 +03:00
if resp.StatusCode != http.StatusOK {
e := struct {
Message string `json:"message"`
}{}
if err := json.NewDecoder(resp.Body).Decode(&e); err != nil {
return nil, fmt.Errorf("failed to parse error from docker daemon: %w", err)
}
return nil, fmt.Errorf("unexpected error from docker daemon: %s", e.Message)
}
2021-04-17 22:55:32 -05:00
var response []struct {
ID string `json:"Id"`
Name string
State string
Labels map[string]string
Created int64
NetworkSettings struct {
Networks map[string]struct {
IPAddress string
}
}
Names []string
Ports []struct{ PrivatePort int } `json:"Ports"`
}
if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
return nil, fmt.Errorf("failed to parse response from docker daemon: %w", err)
}
containers := make([]containerInfo, len(response))
for i, resp := range response {
c := containerInfo{}
c.ID = resp.ID
c.Name = strings.TrimPrefix(resp.Names[0], "/")
c.State = resp.State
c.Labels = resp.Labels
c.TS = time.Unix(resp.Created, 0)
for k, v := range resp.NetworkSettings.Networks {
if d.network == "" || k == d.network { // match on network name if defined
c.IP = v.IPAddress
break
}
}
for _, p := range resp.Ports {
c.Ports = append(c.Ports, p.PrivatePort)
}
containers[i] = c
}
return containers, nil
}