From 225590da2c1c79ae7d9cea7bd38b2bfbfda44dee Mon Sep 17 00:00:00 2001 From: Umputun Date: Mon, 5 Apr 2021 03:37:28 -0500 Subject: [PATCH] add ping url and health check --- app/discovery/discovery.go | 11 +++- app/discovery/discovery_test.go | 18 ++--- app/discovery/provider/docker.go | 15 +++-- app/discovery/provider/docker_test.go | 5 +- app/discovery/provider/file.go | 4 +- app/discovery/provider/file_test.go | 3 + app/discovery/provider/static.go | 29 ++++----- app/discovery/provider/static_test.go | 17 ++--- app/discovery/provider/testdata/config.yml | 2 +- app/proxy/proxy.go | 76 ++++++++++++++++++++++ app/proxy/proxy_test.go | 59 ++++++++++++++++- 11 files changed, 197 insertions(+), 42 deletions(-) diff --git a/app/discovery/discovery.go b/app/discovery/discovery.go index 264143d..039f9fb 100644 --- a/app/discovery/discovery.go +++ b/app/discovery/discovery.go @@ -23,9 +23,10 @@ type Service struct { // UrlMapper contains all info about source and destination routes type UrlMapper struct { Server string - SrcMatch *regexp.Regexp + SrcMatch regexp.Regexp Dst string ProviderID ProviderID + PingURL string } // Provider defines sources of mappers @@ -107,6 +108,14 @@ func (s *Service) Servers() (servers []string) { return servers } +// Mappers return list of all mappers +func (s *Service) Mappers() (mappers []UrlMapper) { + s.lock.RLock() + defer s.lock.RUnlock() + mappers = append(mappers, s.mappers...) + return mappers +} + func (s *Service) mergeLists() (res []UrlMapper) { for _, p := range s.providers { lst, err := p.List() diff --git a/app/discovery/discovery_test.go b/app/discovery/discovery_test.go index b441667..859af82 100644 --- a/app/discovery/discovery_test.go +++ b/app/discovery/discovery_test.go @@ -20,8 +20,8 @@ func TestService_Do(t *testing.T) { }, ListFunc: func() ([]UrlMapper, error) { return []UrlMapper{ - {Server: "*", SrcMatch: regexp.MustCompile("^/api/svc1/(.*)"), Dst: "http://127.0.0.1:8080/blah1/$1"}, - {Server: "*", SrcMatch: regexp.MustCompile("^/api/svc2/(.*)"), Dst: "http://127.0.0.2:8080/blah2/$1/abc"}, + {Server: "*", SrcMatch: *regexp.MustCompile("^/api/svc1/(.*)"), Dst: "http://127.0.0.1:8080/blah1/$1"}, + {Server: "*", SrcMatch: *regexp.MustCompile("^/api/svc2/(.*)"), Dst: "http://127.0.0.2:8080/blah2/$1/abc"}, }, nil }, IDFunc: func() ProviderID { @@ -34,7 +34,7 @@ func TestService_Do(t *testing.T) { }, ListFunc: func() ([]UrlMapper, error) { return []UrlMapper{ - {Server: "localhost", SrcMatch: regexp.MustCompile("/api/svc3/xyz"), Dst: "http://127.0.0.3:8080/blah3/xyz"}, + {Server: "localhost", SrcMatch: *regexp.MustCompile("/api/svc3/xyz"), Dst: "http://127.0.0.3:8080/blah3/xyz"}, }, nil }, IDFunc: func() ProviderID { @@ -74,8 +74,8 @@ func TestService_Match(t *testing.T) { }, ListFunc: func() ([]UrlMapper, error) { return []UrlMapper{ - {SrcMatch: regexp.MustCompile("^/api/svc1/(.*)"), Dst: "http://127.0.0.1:8080/blah1/$1"}, - {Server: "m.example.com", SrcMatch: regexp.MustCompile("^/api/svc2/(.*)"), + {SrcMatch: *regexp.MustCompile("^/api/svc1/(.*)"), Dst: "http://127.0.0.1:8080/blah1/$1"}, + {Server: "m.example.com", SrcMatch: *regexp.MustCompile("^/api/svc2/(.*)"), Dst: "http://127.0.0.2:8080/blah2/$1/abc"}, }, nil }, @@ -89,7 +89,7 @@ func TestService_Match(t *testing.T) { }, ListFunc: func() ([]UrlMapper, error) { return []UrlMapper{ - {SrcMatch: regexp.MustCompile("/api/svc3/xyz"), Dst: "http://127.0.0.3:8080/blah3/xyz"}, + {SrcMatch: *regexp.MustCompile("/api/svc3/xyz"), Dst: "http://127.0.0.3:8080/blah3/xyz"}, }, nil }, IDFunc: func() ProviderID { @@ -137,8 +137,8 @@ func TestService_Servers(t *testing.T) { }, ListFunc: func() ([]UrlMapper, error) { return []UrlMapper{ - {SrcMatch: regexp.MustCompile("^/api/svc1/(.*)"), Dst: "http://127.0.0.1:8080/blah1/$1"}, - {Server: "m.example.com", SrcMatch: regexp.MustCompile("^/api/svc2/(.*)"), + {SrcMatch: *regexp.MustCompile("^/api/svc1/(.*)"), Dst: "http://127.0.0.1:8080/blah1/$1"}, + {Server: "m.example.com", SrcMatch: *regexp.MustCompile("^/api/svc2/(.*)"), Dst: "http://127.0.0.2:8080/blah2/$1/abc"}, }, nil }, @@ -152,7 +152,7 @@ func TestService_Servers(t *testing.T) { }, ListFunc: func() ([]UrlMapper, error) { return []UrlMapper{ - {Server: "xx.reproxy.io", SrcMatch: regexp.MustCompile("/api/svc3/xyz"), Dst: "http://127.0.0.3:8080/blah3/xyz"}, + {Server: "xx.reproxy.io", SrcMatch: *regexp.MustCompile("/api/svc3/xyz"), Dst: "http://127.0.0.3:8080/blah3/xyz"}, }, nil }, IDFunc: func() ProviderID { diff --git a/app/discovery/provider/docker.go b/app/discovery/provider/docker.go index da2468a..280ee75 100644 --- a/app/discovery/provider/docker.go +++ b/app/discovery/provider/docker.go @@ -77,22 +77,29 @@ func (d *Docker) List() ([]discovery.UrlMapper, error) { for _, c := range containers { srcURL := fmt.Sprintf("^/api/%s/(.*)", c.Name) destURL := fmt.Sprintf("http://%s:%d/$1", c.IP, c.Port) + pingURL := fmt.Sprintf("http://%s:%d/ping", c.IP, c.Port) server := "*" - if v, ok := c.Labels["dpx.route"]; ok { + + if v, ok := c.Labels["reproxy.route"]; ok { srcURL = v } - if v, ok := c.Labels["dpx.dest"]; ok { + if v, ok := c.Labels["reproxy.dest"]; ok { destURL = fmt.Sprintf("http://%s:%d%s", c.IP, c.Port, v) } - if v, ok := c.Labels["dpx.server"]; ok { + if v, ok := c.Labels["reproxy.server"]; ok { server = v } srcRegex, err := regexp.Compile(srcURL) + + if v, ok := c.Labels["reproxy.ping"]; ok { + pingURL = v + } + if err != nil { return nil, errors.Wrapf(err, "invalid src regex %s", srcURL) } - res = append(res, discovery.UrlMapper{Server: server, SrcMatch: srcRegex, Dst: destURL}) + res = append(res, discovery.UrlMapper{Server: server, SrcMatch: *srcRegex, Dst: destURL, PingURL: pingURL}) } return res, nil } diff --git a/app/discovery/provider/docker_test.go b/app/discovery/provider/docker_test.go index 9f39681..96ff96f 100644 --- a/app/discovery/provider/docker_test.go +++ b/app/discovery/provider/docker_test.go @@ -21,7 +21,8 @@ func TestDocker_List(t *testing.T) { Ports: []dclient.APIPort{ {PrivatePort: 12345}, }, - Labels: map[string]string{"dpx.route": "^/api/123/(.*)", "dpx.dest": "/blah/$1", "dpx.server": "example.com"}, + Labels: map[string]string{"reproxy.route": "^/api/123/(.*)", "reproxy.dest": "/blah/$1", + "reproxy.server": "example.com", "reproxy.ping": "http://localhost/ping"}, }, {Names: []string{"c2"}, Status: "start", Networks: dclient.NetworkList{ @@ -52,9 +53,11 @@ func TestDocker_List(t *testing.T) { assert.Equal(t, "^/api/123/(.*)", res[0].SrcMatch.String()) assert.Equal(t, "http://127.0.0.2:12345/blah/$1", res[0].Dst) assert.Equal(t, "example.com", res[0].Server) + assert.Equal(t, "http://localhost/ping", res[0].PingURL) assert.Equal(t, "^/api/c2/(.*)", res[1].SrcMatch.String()) assert.Equal(t, "http://127.0.0.3:12346/$1", res[1].Dst) + assert.Equal(t, "http://127.0.0.3:12346/ping", res[1].PingURL) assert.Equal(t, "*", res[1].Server) } diff --git a/app/discovery/provider/file.go b/app/discovery/provider/file.go index 0e7f3e3..6268671 100644 --- a/app/discovery/provider/file.go +++ b/app/discovery/provider/file.go @@ -69,6 +69,7 @@ func (d *File) List() (res []discovery.UrlMapper, err error) { var fileConf map[string][]struct { SourceRoute string `yaml:"route"` Dest string `yaml:"dest"` + Ping string `yaml:"ping"` } fh, err := os.Open(d.FileName) if err != nil { @@ -90,7 +91,8 @@ func (d *File) List() (res []discovery.UrlMapper, err error) { if srv == "default" { srv = "*" } - res = append(res, discovery.UrlMapper{Server: srv, SrcMatch: rx, Dst: f.Dest}) + mapper := discovery.UrlMapper{Server: srv, SrcMatch: *rx, Dst: f.Dest, PingURL: f.Ping} + res = append(res, mapper) } } return res, nil diff --git a/app/discovery/provider/file_test.go b/app/discovery/provider/file_test.go index 6aeee63..9e91ece 100644 --- a/app/discovery/provider/file_test.go +++ b/app/discovery/provider/file_test.go @@ -60,5 +60,8 @@ func TestFile_List(t *testing.T) { assert.Equal(t, 3, len(res)) assert.Equal(t, "^/api/svc1/(.*)", res[0].SrcMatch.String()) assert.Equal(t, "http://127.0.0.3:8080/blah3/xyz", res[1].Dst) + assert.Equal(t, "http://127.0.0.3:8080/ping", res[1].PingURL) assert.Equal(t, "http://127.0.0.2:8080/blah2/$1/abc", res[2].Dst) + assert.Equal(t, "", res[2].PingURL) + } diff --git a/app/discovery/provider/static.go b/app/discovery/provider/static.go index b1e30f4..a3ef564 100644 --- a/app/discovery/provider/static.go +++ b/app/discovery/provider/static.go @@ -12,7 +12,7 @@ import ( // Static provider, rules are server,from,to type Static struct { - Rules []string // each rule is 2 or 3 elements comma separated. [server,]source url,destination + Rules []string // each rule is 5 elements comma separated. server,source url,destination,ping } // Events returns channel updating once @@ -27,21 +27,20 @@ func (s *Static) List() (res []discovery.UrlMapper, err error) { parse := func(inp string) (discovery.UrlMapper, error) { elems := strings.Split(inp, ",") - switch len(elems) { - case 2: - rx, err := regexp.Compile(strings.TrimSpace(elems[0])) - if err != nil { - return discovery.UrlMapper{}, errors.Wrapf(err, "can't parse regex %s", elems[0]) - } - return discovery.UrlMapper{Server: "*", SrcMatch: rx, Dst: strings.TrimSpace(elems[1])}, nil - case 3: - rx, err := regexp.Compile(strings.TrimSpace(elems[1])) - if err != nil { - return discovery.UrlMapper{}, errors.Wrapf(err, "can't parse regex %s", elems[1]) - } - return discovery.UrlMapper{Server: strings.TrimSpace(elems[0]), SrcMatch: rx, Dst: strings.TrimSpace(elems[2])}, nil + if len(elems) != 4 { + return discovery.UrlMapper{}, errors.Errorf("invalid rule %q", inp) } - return discovery.UrlMapper{}, errors.Errorf("can't parse entry %s", inp) + rx, err := regexp.Compile(strings.TrimSpace(elems[1])) + if err != nil { + return discovery.UrlMapper{}, errors.Wrapf(err, "can't parse regex %s", elems[1]) + } + + return discovery.UrlMapper{ + Server: strings.TrimSpace(elems[0]), + SrcMatch: *rx, + Dst: strings.TrimSpace(elems[2]), + PingURL: strings.TrimSpace(elems[3]), + }, nil } for _, r := range s.Rules { diff --git a/app/discovery/provider/static_test.go b/app/discovery/provider/static_test.go index 95a35fd..e397ee6 100644 --- a/app/discovery/provider/static_test.go +++ b/app/discovery/provider/static_test.go @@ -11,15 +11,15 @@ import ( func TestStatic_List(t *testing.T) { tbl := []struct { - rule string - server, src, dst string - err bool + rule string + server, src, dst, ping string + err bool }{ - {"example.com,123,456", "example.com", "123", "456", false}, - {"*,123,456", "*", "123", "456", false}, - {"123,456", "*", "123", "456", false}, - {"123", "", "", "", true}, - {"example.com , 123, 456 ", "example.com", "123", "456", false}, + {"example.com,123,456, ping ", "example.com", "123", "456", "ping", false}, + {"*,123,456,", "*", "123", "456", "", false}, + {"123,456", "", "", "", "", true}, + {"123", "", "", "", "", true}, + {"example.com , 123, 456 ,ping", "example.com", "123", "456", "ping", false}, } for i, tt := range tbl { @@ -35,6 +35,7 @@ func TestStatic_List(t *testing.T) { assert.Equal(t, tt.server, res[0].Server) assert.Equal(t, tt.src, res[0].SrcMatch.String()) assert.Equal(t, tt.dst, res[0].Dst) + assert.Equal(t, tt.ping, res[0].PingURL) }) } diff --git a/app/discovery/provider/testdata/config.yml b/app/discovery/provider/testdata/config.yml index 9498b6c..6d4bb1d 100644 --- a/app/discovery/provider/testdata/config.yml +++ b/app/discovery/provider/testdata/config.yml @@ -1,5 +1,5 @@ default: - {route: "^/api/svc1/(.*)", dest: "http://127.0.0.1:8080/blah1/$1"} - - {route: "/api/svc3/xyz", dest: "http://127.0.0.3:8080/blah3/xyz"} + - {route: "/api/svc3/xyz", dest: "http://127.0.0.3:8080/blah3/xyz", "ping": "http://127.0.0.3:8080/ping"} srv.example.com: - {route: "^/api/svc2/(.*)", dest: "http://127.0.0.2:8080/blah2/$1/abc"} diff --git a/app/proxy/proxy.go b/app/proxy/proxy.go index a8e5f9f..111db4e 100644 --- a/app/proxy/proxy.go +++ b/app/proxy/proxy.go @@ -2,6 +2,7 @@ package proxy import ( "context" + "fmt" "net" "net/http" "net/http/httputil" @@ -9,6 +10,8 @@ import ( "regexp" "strconv" "strings" + "sync" + "sync/atomic" "time" "github.com/go-pkgz/lgr" @@ -17,6 +20,7 @@ import ( R "github.com/go-pkgz/rest" "github.com/go-pkgz/rest/logger" "github.com/pkg/errors" + "github.com/umputun/reproxy/app/discovery" ) // Http is a proxy server for both http and https @@ -38,6 +42,7 @@ type Http struct { type Matcher interface { Match(srv, src string) (string, bool) Servers() (servers []string) + Mappers() (mappers []discovery.UrlMapper) } // Run the lister and request's router, activate rest server @@ -67,6 +72,7 @@ func (h *Http) Run(ctx context.Context) error { R.Recoverer(lgr.Default()), R.AppInfo("dpx", "umputun", h.Version), R.Ping, + h.healthMiddleware, logger.New(logger.Prefix("[DEBUG] PROXY")).Handler, R.SizeLimit(h.MaxBodySize), R.Headers(h.ProxyHeaders...), @@ -214,3 +220,73 @@ func (h *Http) setXRealIP(r *http.Request) { } r.Header.Add("X-Real-IP", ip) } + +func (h *Http) healthMiddleware(next http.Handler) http.Handler { + fn := func(w http.ResponseWriter, r *http.Request) { + if r.Method == "GET" && strings.HasSuffix(strings.ToLower(r.URL.Path), "/health") { + h.healthHandler(w, r) + return + } + next.ServeHTTP(w, r) + } + return http.HandlerFunc(fn) +} + +func (h *Http) healthHandler(w http.ResponseWriter, r *http.Request) { + + // runs pings in parallel + check := func(mappers []discovery.UrlMapper) (ok bool, valid int, total int, errs []string) { + outCh := make(chan error, 8) + var pinged int32 + var wg sync.WaitGroup + for _, m := range mappers { + if m.PingURL == "" { + continue + } + wg.Add(1) + go func(m discovery.UrlMapper) { + defer wg.Done() + + atomic.AddInt32(&pinged, 1) + client := http.Client{Timeout: 100 * time.Millisecond} + resp, err := client.Get(m.PingURL) + if err != nil { + log.Printf("[WARN] failed to ping for health %s, %v", m.PingURL, err) + outCh <- fmt.Errorf("%s, %v", m.PingURL, err) + return + } + if resp.StatusCode != http.StatusOK { + log.Printf("[WARN] failed ping status for health %s (%s)", m.PingURL, resp.Status) + outCh <- fmt.Errorf("%s, %s", m.PingURL, resp.Status) + return + } + }(m) + } + + go func() { + wg.Wait() + close(outCh) + }() + + for e := range outCh { + errs = append(errs, e.Error()) + } + return len(errs) == 0, int(atomic.LoadInt32(&pinged)) - len(errs), len(mappers), errs + } + + w.Header().Set("Content-Type", "application/json; charset=UTF-8") + ok, valid, total, errs := check(h.Mappers()) + if !ok { + w.WriteHeader(http.StatusExpectationFailed) + _, err := fmt.Fprintf(w, `{"status": "failed", "passed": %d, "failed":%d, "errors": "%+v"}`, valid, total-valid, errs) + if err != nil { + log.Printf("[WARN] failed %v", err) + } + return + } + w.WriteHeader(http.StatusOK) + _, err := fmt.Fprintf(w, `{"status": "ok", "services": %d}`, valid) + if err != nil { + log.Printf("[WARN] failed to send halth, %v", err) + } +} diff --git a/app/proxy/proxy_test.go b/app/proxy/proxy_test.go index 4593c82..84e91b3 100644 --- a/app/proxy/proxy_test.go +++ b/app/proxy/proxy_test.go @@ -2,6 +2,7 @@ package proxy import ( "context" + "encoding/json" "fmt" "io" "math/rand" @@ -33,8 +34,8 @@ func TestHttp_Do(t *testing.T) { svc := discovery.NewService([]discovery.Provider{ &provider.Static{Rules: []string{ - "localhost,^/api/(.*)," + ds.URL + "/123/$1", - "127.0.0.1,^/api/(.*)," + ds.URL + "/567/$1", + "localhost,^/api/(.*)," + ds.URL + "/123/$1,", + "127.0.0.1,^/api/(.*)," + ds.URL + "/567/$1,", }, }}) @@ -110,3 +111,57 @@ func TestHttp_toHttp(t *testing.T) { } } + +func TestHttp_healthHandler(t *testing.T) { + port := rand.Intn(10000) + 40000 + h := Http{TimeOut: 200 * time.Millisecond, Address: fmt.Sprintf("127.0.0.1:%d", port)} + ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond) + defer cancel() + + ds := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Logf("req: %v", r) + w.Header().Add("h1", "v1") + fmt.Fprintf(w, "response %s", r.URL.String()) + })) + + ps := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + t.Logf("req: %v", r) + if r.URL.Path == "/123/ping" { + return + } + w.WriteHeader(http.StatusBadRequest) + })) + + svc := discovery.NewService([]discovery.Provider{ + &provider.Static{Rules: []string{ + "localhost,^/api/(.*)," + ds.URL + "/123/$1," + ps.URL + "/123/ping", + "127.0.0.1,^/api/(.*)," + ds.URL + "/567/$1," + ps.URL + "/567/ping", + }, + }}) + + go func() { + _ = svc.Run(context.Background()) + }() + + h.Matcher = svc + go func() { + _ = h.Run(ctx) + }() + time.Sleep(10 * time.Millisecond) + + client := http.Client{} + req, err := http.NewRequest("GET", "http://127.0.0.1:"+strconv.Itoa(port)+"/health", nil) + require.NoError(t, err) + resp, err := client.Do(req) + require.NoError(t, err) + defer resp.Body.Close() + assert.Equal(t, http.StatusExpectationFailed, resp.StatusCode) + + res := map[string]interface{}{} + err = json.NewDecoder(resp.Body).Decode(&res) + require.NoError(t, err) + assert.Equal(t, "failed", res["status"]) + assert.Equal(t, 1., res["passed"]) + assert.Equal(t, 1., res["failed"]) + assert.Contains(t, res["errors"], "400 Bad Request") +}