1
0
mirror of https://github.com/umputun/reproxy.git synced 2024-11-24 08:12:31 +02:00

add ping url and health check

This commit is contained in:
Umputun 2021-04-05 03:37:28 -05:00
parent 5759918cb6
commit 225590da2c
11 changed files with 197 additions and 42 deletions

View File

@ -23,9 +23,10 @@ type Service struct {
// UrlMapper contains all info about source and destination routes // UrlMapper contains all info about source and destination routes
type UrlMapper struct { type UrlMapper struct {
Server string Server string
SrcMatch *regexp.Regexp SrcMatch regexp.Regexp
Dst string Dst string
ProviderID ProviderID ProviderID ProviderID
PingURL string
} }
// Provider defines sources of mappers // Provider defines sources of mappers
@ -107,6 +108,14 @@ func (s *Service) Servers() (servers []string) {
return servers return servers
} }
// Mappers return list of all mappers
func (s *Service) Mappers() (mappers []UrlMapper) {
s.lock.RLock()
defer s.lock.RUnlock()
mappers = append(mappers, s.mappers...)
return mappers
}
func (s *Service) mergeLists() (res []UrlMapper) { func (s *Service) mergeLists() (res []UrlMapper) {
for _, p := range s.providers { for _, p := range s.providers {
lst, err := p.List() lst, err := p.List()

View File

@ -20,8 +20,8 @@ func TestService_Do(t *testing.T) {
}, },
ListFunc: func() ([]UrlMapper, error) { ListFunc: func() ([]UrlMapper, error) {
return []UrlMapper{ return []UrlMapper{
{Server: "*", SrcMatch: regexp.MustCompile("^/api/svc1/(.*)"), Dst: "http://127.0.0.1:8080/blah1/$1"}, {Server: "*", SrcMatch: *regexp.MustCompile("^/api/svc1/(.*)"), Dst: "http://127.0.0.1:8080/blah1/$1"},
{Server: "*", SrcMatch: regexp.MustCompile("^/api/svc2/(.*)"), Dst: "http://127.0.0.2:8080/blah2/$1/abc"}, {Server: "*", SrcMatch: *regexp.MustCompile("^/api/svc2/(.*)"), Dst: "http://127.0.0.2:8080/blah2/$1/abc"},
}, nil }, nil
}, },
IDFunc: func() ProviderID { IDFunc: func() ProviderID {
@ -34,7 +34,7 @@ func TestService_Do(t *testing.T) {
}, },
ListFunc: func() ([]UrlMapper, error) { ListFunc: func() ([]UrlMapper, error) {
return []UrlMapper{ return []UrlMapper{
{Server: "localhost", SrcMatch: regexp.MustCompile("/api/svc3/xyz"), Dst: "http://127.0.0.3:8080/blah3/xyz"}, {Server: "localhost", SrcMatch: *regexp.MustCompile("/api/svc3/xyz"), Dst: "http://127.0.0.3:8080/blah3/xyz"},
}, nil }, nil
}, },
IDFunc: func() ProviderID { IDFunc: func() ProviderID {
@ -74,8 +74,8 @@ func TestService_Match(t *testing.T) {
}, },
ListFunc: func() ([]UrlMapper, error) { ListFunc: func() ([]UrlMapper, error) {
return []UrlMapper{ return []UrlMapper{
{SrcMatch: regexp.MustCompile("^/api/svc1/(.*)"), Dst: "http://127.0.0.1:8080/blah1/$1"}, {SrcMatch: *regexp.MustCompile("^/api/svc1/(.*)"), Dst: "http://127.0.0.1:8080/blah1/$1"},
{Server: "m.example.com", SrcMatch: regexp.MustCompile("^/api/svc2/(.*)"), {Server: "m.example.com", SrcMatch: *regexp.MustCompile("^/api/svc2/(.*)"),
Dst: "http://127.0.0.2:8080/blah2/$1/abc"}, Dst: "http://127.0.0.2:8080/blah2/$1/abc"},
}, nil }, nil
}, },
@ -89,7 +89,7 @@ func TestService_Match(t *testing.T) {
}, },
ListFunc: func() ([]UrlMapper, error) { ListFunc: func() ([]UrlMapper, error) {
return []UrlMapper{ return []UrlMapper{
{SrcMatch: regexp.MustCompile("/api/svc3/xyz"), Dst: "http://127.0.0.3:8080/blah3/xyz"}, {SrcMatch: *regexp.MustCompile("/api/svc3/xyz"), Dst: "http://127.0.0.3:8080/blah3/xyz"},
}, nil }, nil
}, },
IDFunc: func() ProviderID { IDFunc: func() ProviderID {
@ -137,8 +137,8 @@ func TestService_Servers(t *testing.T) {
}, },
ListFunc: func() ([]UrlMapper, error) { ListFunc: func() ([]UrlMapper, error) {
return []UrlMapper{ return []UrlMapper{
{SrcMatch: regexp.MustCompile("^/api/svc1/(.*)"), Dst: "http://127.0.0.1:8080/blah1/$1"}, {SrcMatch: *regexp.MustCompile("^/api/svc1/(.*)"), Dst: "http://127.0.0.1:8080/blah1/$1"},
{Server: "m.example.com", SrcMatch: regexp.MustCompile("^/api/svc2/(.*)"), {Server: "m.example.com", SrcMatch: *regexp.MustCompile("^/api/svc2/(.*)"),
Dst: "http://127.0.0.2:8080/blah2/$1/abc"}, Dst: "http://127.0.0.2:8080/blah2/$1/abc"},
}, nil }, nil
}, },
@ -152,7 +152,7 @@ func TestService_Servers(t *testing.T) {
}, },
ListFunc: func() ([]UrlMapper, error) { ListFunc: func() ([]UrlMapper, error) {
return []UrlMapper{ return []UrlMapper{
{Server: "xx.reproxy.io", SrcMatch: regexp.MustCompile("/api/svc3/xyz"), Dst: "http://127.0.0.3:8080/blah3/xyz"}, {Server: "xx.reproxy.io", SrcMatch: *regexp.MustCompile("/api/svc3/xyz"), Dst: "http://127.0.0.3:8080/blah3/xyz"},
}, nil }, nil
}, },
IDFunc: func() ProviderID { IDFunc: func() ProviderID {

View File

@ -77,22 +77,29 @@ func (d *Docker) List() ([]discovery.UrlMapper, error) {
for _, c := range containers { for _, c := range containers {
srcURL := fmt.Sprintf("^/api/%s/(.*)", c.Name) srcURL := fmt.Sprintf("^/api/%s/(.*)", c.Name)
destURL := fmt.Sprintf("http://%s:%d/$1", c.IP, c.Port) destURL := fmt.Sprintf("http://%s:%d/$1", c.IP, c.Port)
pingURL := fmt.Sprintf("http://%s:%d/ping", c.IP, c.Port)
server := "*" server := "*"
if v, ok := c.Labels["dpx.route"]; ok {
if v, ok := c.Labels["reproxy.route"]; ok {
srcURL = v srcURL = v
} }
if v, ok := c.Labels["dpx.dest"]; ok { if v, ok := c.Labels["reproxy.dest"]; ok {
destURL = fmt.Sprintf("http://%s:%d%s", c.IP, c.Port, v) destURL = fmt.Sprintf("http://%s:%d%s", c.IP, c.Port, v)
} }
if v, ok := c.Labels["dpx.server"]; ok { if v, ok := c.Labels["reproxy.server"]; ok {
server = v server = v
} }
srcRegex, err := regexp.Compile(srcURL) srcRegex, err := regexp.Compile(srcURL)
if v, ok := c.Labels["reproxy.ping"]; ok {
pingURL = v
}
if err != nil { if err != nil {
return nil, errors.Wrapf(err, "invalid src regex %s", srcURL) return nil, errors.Wrapf(err, "invalid src regex %s", srcURL)
} }
res = append(res, discovery.UrlMapper{Server: server, SrcMatch: srcRegex, Dst: destURL}) res = append(res, discovery.UrlMapper{Server: server, SrcMatch: *srcRegex, Dst: destURL, PingURL: pingURL})
} }
return res, nil return res, nil
} }

View File

@ -21,7 +21,8 @@ func TestDocker_List(t *testing.T) {
Ports: []dclient.APIPort{ Ports: []dclient.APIPort{
{PrivatePort: 12345}, {PrivatePort: 12345},
}, },
Labels: map[string]string{"dpx.route": "^/api/123/(.*)", "dpx.dest": "/blah/$1", "dpx.server": "example.com"}, Labels: map[string]string{"reproxy.route": "^/api/123/(.*)", "reproxy.dest": "/blah/$1",
"reproxy.server": "example.com", "reproxy.ping": "http://localhost/ping"},
}, },
{Names: []string{"c2"}, Status: "start", {Names: []string{"c2"}, Status: "start",
Networks: dclient.NetworkList{ Networks: dclient.NetworkList{
@ -52,9 +53,11 @@ func TestDocker_List(t *testing.T) {
assert.Equal(t, "^/api/123/(.*)", res[0].SrcMatch.String()) assert.Equal(t, "^/api/123/(.*)", res[0].SrcMatch.String())
assert.Equal(t, "http://127.0.0.2:12345/blah/$1", res[0].Dst) assert.Equal(t, "http://127.0.0.2:12345/blah/$1", res[0].Dst)
assert.Equal(t, "example.com", res[0].Server) assert.Equal(t, "example.com", res[0].Server)
assert.Equal(t, "http://localhost/ping", res[0].PingURL)
assert.Equal(t, "^/api/c2/(.*)", res[1].SrcMatch.String()) assert.Equal(t, "^/api/c2/(.*)", res[1].SrcMatch.String())
assert.Equal(t, "http://127.0.0.3:12346/$1", res[1].Dst) assert.Equal(t, "http://127.0.0.3:12346/$1", res[1].Dst)
assert.Equal(t, "http://127.0.0.3:12346/ping", res[1].PingURL)
assert.Equal(t, "*", res[1].Server) assert.Equal(t, "*", res[1].Server)
} }

View File

@ -69,6 +69,7 @@ func (d *File) List() (res []discovery.UrlMapper, err error) {
var fileConf map[string][]struct { var fileConf map[string][]struct {
SourceRoute string `yaml:"route"` SourceRoute string `yaml:"route"`
Dest string `yaml:"dest"` Dest string `yaml:"dest"`
Ping string `yaml:"ping"`
} }
fh, err := os.Open(d.FileName) fh, err := os.Open(d.FileName)
if err != nil { if err != nil {
@ -90,7 +91,8 @@ func (d *File) List() (res []discovery.UrlMapper, err error) {
if srv == "default" { if srv == "default" {
srv = "*" srv = "*"
} }
res = append(res, discovery.UrlMapper{Server: srv, SrcMatch: rx, Dst: f.Dest}) mapper := discovery.UrlMapper{Server: srv, SrcMatch: *rx, Dst: f.Dest, PingURL: f.Ping}
res = append(res, mapper)
} }
} }
return res, nil return res, nil

View File

@ -60,5 +60,8 @@ func TestFile_List(t *testing.T) {
assert.Equal(t, 3, len(res)) assert.Equal(t, 3, len(res))
assert.Equal(t, "^/api/svc1/(.*)", res[0].SrcMatch.String()) assert.Equal(t, "^/api/svc1/(.*)", res[0].SrcMatch.String())
assert.Equal(t, "http://127.0.0.3:8080/blah3/xyz", res[1].Dst) assert.Equal(t, "http://127.0.0.3:8080/blah3/xyz", res[1].Dst)
assert.Equal(t, "http://127.0.0.3:8080/ping", res[1].PingURL)
assert.Equal(t, "http://127.0.0.2:8080/blah2/$1/abc", res[2].Dst) assert.Equal(t, "http://127.0.0.2:8080/blah2/$1/abc", res[2].Dst)
assert.Equal(t, "", res[2].PingURL)
} }

View File

@ -12,7 +12,7 @@ import (
// Static provider, rules are server,from,to // Static provider, rules are server,from,to
type Static struct { type Static struct {
Rules []string // each rule is 2 or 3 elements comma separated. [server,]source url,destination Rules []string // each rule is 5 elements comma separated. server,source url,destination,ping
} }
// Events returns channel updating once // Events returns channel updating once
@ -27,21 +27,20 @@ func (s *Static) List() (res []discovery.UrlMapper, err error) {
parse := func(inp string) (discovery.UrlMapper, error) { parse := func(inp string) (discovery.UrlMapper, error) {
elems := strings.Split(inp, ",") elems := strings.Split(inp, ",")
switch len(elems) { if len(elems) != 4 {
case 2: return discovery.UrlMapper{}, errors.Errorf("invalid rule %q", inp)
rx, err := regexp.Compile(strings.TrimSpace(elems[0]))
if err != nil {
return discovery.UrlMapper{}, errors.Wrapf(err, "can't parse regex %s", elems[0])
}
return discovery.UrlMapper{Server: "*", SrcMatch: rx, Dst: strings.TrimSpace(elems[1])}, nil
case 3:
rx, err := regexp.Compile(strings.TrimSpace(elems[1]))
if err != nil {
return discovery.UrlMapper{}, errors.Wrapf(err, "can't parse regex %s", elems[1])
}
return discovery.UrlMapper{Server: strings.TrimSpace(elems[0]), SrcMatch: rx, Dst: strings.TrimSpace(elems[2])}, nil
} }
return discovery.UrlMapper{}, errors.Errorf("can't parse entry %s", inp) rx, err := regexp.Compile(strings.TrimSpace(elems[1]))
if err != nil {
return discovery.UrlMapper{}, errors.Wrapf(err, "can't parse regex %s", elems[1])
}
return discovery.UrlMapper{
Server: strings.TrimSpace(elems[0]),
SrcMatch: *rx,
Dst: strings.TrimSpace(elems[2]),
PingURL: strings.TrimSpace(elems[3]),
}, nil
} }
for _, r := range s.Rules { for _, r := range s.Rules {

View File

@ -11,15 +11,15 @@ import (
func TestStatic_List(t *testing.T) { func TestStatic_List(t *testing.T) {
tbl := []struct { tbl := []struct {
rule string rule string
server, src, dst string server, src, dst, ping string
err bool err bool
}{ }{
{"example.com,123,456", "example.com", "123", "456", false}, {"example.com,123,456, ping ", "example.com", "123", "456", "ping", false},
{"*,123,456", "*", "123", "456", false}, {"*,123,456,", "*", "123", "456", "", false},
{"123,456", "*", "123", "456", false}, {"123,456", "", "", "", "", true},
{"123", "", "", "", true}, {"123", "", "", "", "", true},
{"example.com , 123, 456 ", "example.com", "123", "456", false}, {"example.com , 123, 456 ,ping", "example.com", "123", "456", "ping", false},
} }
for i, tt := range tbl { for i, tt := range tbl {
@ -35,6 +35,7 @@ func TestStatic_List(t *testing.T) {
assert.Equal(t, tt.server, res[0].Server) assert.Equal(t, tt.server, res[0].Server)
assert.Equal(t, tt.src, res[0].SrcMatch.String()) assert.Equal(t, tt.src, res[0].SrcMatch.String())
assert.Equal(t, tt.dst, res[0].Dst) assert.Equal(t, tt.dst, res[0].Dst)
assert.Equal(t, tt.ping, res[0].PingURL)
}) })
} }

View File

@ -1,5 +1,5 @@
default: default:
- {route: "^/api/svc1/(.*)", dest: "http://127.0.0.1:8080/blah1/$1"} - {route: "^/api/svc1/(.*)", dest: "http://127.0.0.1:8080/blah1/$1"}
- {route: "/api/svc3/xyz", dest: "http://127.0.0.3:8080/blah3/xyz"} - {route: "/api/svc3/xyz", dest: "http://127.0.0.3:8080/blah3/xyz", "ping": "http://127.0.0.3:8080/ping"}
srv.example.com: srv.example.com:
- {route: "^/api/svc2/(.*)", dest: "http://127.0.0.2:8080/blah2/$1/abc"} - {route: "^/api/svc2/(.*)", dest: "http://127.0.0.2:8080/blah2/$1/abc"}

View File

@ -2,6 +2,7 @@ package proxy
import ( import (
"context" "context"
"fmt"
"net" "net"
"net/http" "net/http"
"net/http/httputil" "net/http/httputil"
@ -9,6 +10,8 @@ import (
"regexp" "regexp"
"strconv" "strconv"
"strings" "strings"
"sync"
"sync/atomic"
"time" "time"
"github.com/go-pkgz/lgr" "github.com/go-pkgz/lgr"
@ -17,6 +20,7 @@ import (
R "github.com/go-pkgz/rest" R "github.com/go-pkgz/rest"
"github.com/go-pkgz/rest/logger" "github.com/go-pkgz/rest/logger"
"github.com/pkg/errors" "github.com/pkg/errors"
"github.com/umputun/reproxy/app/discovery"
) )
// Http is a proxy server for both http and https // Http is a proxy server for both http and https
@ -38,6 +42,7 @@ type Http struct {
type Matcher interface { type Matcher interface {
Match(srv, src string) (string, bool) Match(srv, src string) (string, bool)
Servers() (servers []string) Servers() (servers []string)
Mappers() (mappers []discovery.UrlMapper)
} }
// Run the lister and request's router, activate rest server // Run the lister and request's router, activate rest server
@ -67,6 +72,7 @@ func (h *Http) Run(ctx context.Context) error {
R.Recoverer(lgr.Default()), R.Recoverer(lgr.Default()),
R.AppInfo("dpx", "umputun", h.Version), R.AppInfo("dpx", "umputun", h.Version),
R.Ping, R.Ping,
h.healthMiddleware,
logger.New(logger.Prefix("[DEBUG] PROXY")).Handler, logger.New(logger.Prefix("[DEBUG] PROXY")).Handler,
R.SizeLimit(h.MaxBodySize), R.SizeLimit(h.MaxBodySize),
R.Headers(h.ProxyHeaders...), R.Headers(h.ProxyHeaders...),
@ -214,3 +220,73 @@ func (h *Http) setXRealIP(r *http.Request) {
} }
r.Header.Add("X-Real-IP", ip) r.Header.Add("X-Real-IP", ip)
} }
func (h *Http) healthMiddleware(next http.Handler) http.Handler {
fn := func(w http.ResponseWriter, r *http.Request) {
if r.Method == "GET" && strings.HasSuffix(strings.ToLower(r.URL.Path), "/health") {
h.healthHandler(w, r)
return
}
next.ServeHTTP(w, r)
}
return http.HandlerFunc(fn)
}
func (h *Http) healthHandler(w http.ResponseWriter, r *http.Request) {
// runs pings in parallel
check := func(mappers []discovery.UrlMapper) (ok bool, valid int, total int, errs []string) {
outCh := make(chan error, 8)
var pinged int32
var wg sync.WaitGroup
for _, m := range mappers {
if m.PingURL == "" {
continue
}
wg.Add(1)
go func(m discovery.UrlMapper) {
defer wg.Done()
atomic.AddInt32(&pinged, 1)
client := http.Client{Timeout: 100 * time.Millisecond}
resp, err := client.Get(m.PingURL)
if err != nil {
log.Printf("[WARN] failed to ping for health %s, %v", m.PingURL, err)
outCh <- fmt.Errorf("%s, %v", m.PingURL, err)
return
}
if resp.StatusCode != http.StatusOK {
log.Printf("[WARN] failed ping status for health %s (%s)", m.PingURL, resp.Status)
outCh <- fmt.Errorf("%s, %s", m.PingURL, resp.Status)
return
}
}(m)
}
go func() {
wg.Wait()
close(outCh)
}()
for e := range outCh {
errs = append(errs, e.Error())
}
return len(errs) == 0, int(atomic.LoadInt32(&pinged)) - len(errs), len(mappers), errs
}
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
ok, valid, total, errs := check(h.Mappers())
if !ok {
w.WriteHeader(http.StatusExpectationFailed)
_, err := fmt.Fprintf(w, `{"status": "failed", "passed": %d, "failed":%d, "errors": "%+v"}`, valid, total-valid, errs)
if err != nil {
log.Printf("[WARN] failed %v", err)
}
return
}
w.WriteHeader(http.StatusOK)
_, err := fmt.Fprintf(w, `{"status": "ok", "services": %d}`, valid)
if err != nil {
log.Printf("[WARN] failed to send halth, %v", err)
}
}

View File

@ -2,6 +2,7 @@ package proxy
import ( import (
"context" "context"
"encoding/json"
"fmt" "fmt"
"io" "io"
"math/rand" "math/rand"
@ -33,8 +34,8 @@ func TestHttp_Do(t *testing.T) {
svc := discovery.NewService([]discovery.Provider{ svc := discovery.NewService([]discovery.Provider{
&provider.Static{Rules: []string{ &provider.Static{Rules: []string{
"localhost,^/api/(.*)," + ds.URL + "/123/$1", "localhost,^/api/(.*)," + ds.URL + "/123/$1,",
"127.0.0.1,^/api/(.*)," + ds.URL + "/567/$1", "127.0.0.1,^/api/(.*)," + ds.URL + "/567/$1,",
}, },
}}) }})
@ -110,3 +111,57 @@ func TestHttp_toHttp(t *testing.T) {
} }
} }
func TestHttp_healthHandler(t *testing.T) {
port := rand.Intn(10000) + 40000
h := Http{TimeOut: 200 * time.Millisecond, Address: fmt.Sprintf("127.0.0.1:%d", port)}
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
ds := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
t.Logf("req: %v", r)
w.Header().Add("h1", "v1")
fmt.Fprintf(w, "response %s", r.URL.String())
}))
ps := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
t.Logf("req: %v", r)
if r.URL.Path == "/123/ping" {
return
}
w.WriteHeader(http.StatusBadRequest)
}))
svc := discovery.NewService([]discovery.Provider{
&provider.Static{Rules: []string{
"localhost,^/api/(.*)," + ds.URL + "/123/$1," + ps.URL + "/123/ping",
"127.0.0.1,^/api/(.*)," + ds.URL + "/567/$1," + ps.URL + "/567/ping",
},
}})
go func() {
_ = svc.Run(context.Background())
}()
h.Matcher = svc
go func() {
_ = h.Run(ctx)
}()
time.Sleep(10 * time.Millisecond)
client := http.Client{}
req, err := http.NewRequest("GET", "http://127.0.0.1:"+strconv.Itoa(port)+"/health", nil)
require.NoError(t, err)
resp, err := client.Do(req)
require.NoError(t, err)
defer resp.Body.Close()
assert.Equal(t, http.StatusExpectationFailed, resp.StatusCode)
res := map[string]interface{}{}
err = json.NewDecoder(resp.Body).Decode(&res)
require.NoError(t, err)
assert.Equal(t, "failed", res["status"])
assert.Equal(t, 1., res["passed"])
assert.Equal(t, 1., res["failed"])
assert.Contains(t, res["errors"], "400 Bad Request")
}