From 973d6c9a149d9e0d834b9341be8b4f898494cd5c Mon Sep 17 00:00:00 2001
From: "nikolay.bystritskiy"
Date: Fri, 7 May 2021 17:36:07 +0200
Subject: [PATCH] add automatic health-checker

---
 app/discovery/discovery.go   | 32 ++++++++++++
 app/discovery/health.go      | 97 ++++++++++++++++++++++++++++++++++++
 app/discovery/health_test.go |  1 +
 app/main.go                  |  8 +++
 app/proxy/health.go          | 65 ++----------------------
 5 files changed, 142 insertions(+), 61 deletions(-)
 create mode 100644 app/discovery/health.go
 create mode 100644 app/discovery/health_test.go

diff --git a/app/discovery/discovery.go b/app/discovery/discovery.go
index 41fc4e8..2506363 100644
--- a/app/discovery/discovery.go
+++ b/app/discovery/discovery.go
@@ -36,6 +36,8 @@ type URLMapper struct {
 
 	AssetsLocation string
 	AssetsWebRoot  string
+
+	dead bool
 }
 
 // Provider defines sources of mappers
@@ -147,6 +149,36 @@ func (s *Service) Match(srv, src string) (string, MatchType, bool) {
 	return src, MTProxy, false
 }
 
+// ScheduleHealthCheck starts a background loop that runs CheckHealth on all mappers every interval
+// until ctx is done; failed mappers are marked dead and moved behind live ones, order kept otherwise.
+func (s *Service) ScheduleHealthCheck(ctx context.Context, interval time.Duration) {
+	log.Printf("health-check scheduled every %s", interval)
+	go func() {
+		timer := time.NewTimer(interval)
+		defer timer.Stop()
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-timer.C:
+			}
+			// pings are slow, so s.lock is not held here; Mappers() is expected to lock on its own (TODO confirm)
+			cres := CheckHealth(s.Mappers())
+			sort.SliceStable(cres.mappers, func(i, j int) bool {
+				return !cres.mappers[i].dead && cres.mappers[j].dead // strict "live before dead"
+			})
+			refreshed := make(map[string][]URLMapper)
+			for _, m := range cres.mappers {
+				refreshed[m.Server] = append(refreshed[m.Server], m)
+			}
+			s.lock.Lock()
+			s.mappers = refreshed
+			s.lock.Unlock()
+			timer.Reset(interval) // next run is counted from the end of this one
+		}
+	}()
+}
+
 // Servers return list of all servers, skips "*" (catch-all/default)
 func (s *Service) Servers() (servers []string) {
 	s.lock.RLock()
diff --git a/app/discovery/health.go b/app/discovery/health.go
new file mode 100644
index 0000000..f561a58
--- /dev/null
+++ b/app/discovery/health.go
@@ -0,0 +1,97 @@
+package discovery
+
+import (
+	"fmt"
+	"log"
+	"net/http"
+	"strings"
+	"sync"
+	"time"
+)
+
+// CheckResult is the outcome of one health-check pass over a list of mappers.
+type CheckResult struct {
+	Ok      bool        // true when no ping failed
+	Valid   int         // number of pinged mappers that passed
+	Total   int         // number of proxy mappers, with or without a ping url
+	Errs    []string    // one message per failed ping, in input order
+	mappers []URLMapper // copy of the input, same order, with the dead flag refreshed
+}
+
+// CheckHealth pings every proxy mapper that has a PingURL, at most 8 at a time, and reports the
+// outcome. Mappers that are not pinged are still kept in the result, so it can replace the input.
+func CheckHealth(mappers []URLMapper) CheckResult {
+	const concurrent = 8
+	sema := make(chan struct{}, concurrent) // limit health check to 8 concurrent calls
+
+	// work on a copy; every goroutine writes only to its own index, so no locking is needed
+	checked := make([]URLMapper, len(mappers))
+	copy(checked, mappers)
+	pingErrs := make([]error, len(checked))
+
+	services, pinged := 0, 0
+	var wg sync.WaitGroup
+	for i := range checked {
+		if checked[i].MatchType != MTProxy {
+			continue
+		}
+		services++
+		if checked[i].PingURL == "" {
+			continue
+		}
+		pinged++
+		wg.Add(1)
+
+		go func(i int) {
+			sema <- struct{}{}
+			defer func() {
+				<-sema
+				wg.Done()
+			}()
+
+			errMsg, err := ping(checked[i])
+			if err != nil {
+				log.Print(errMsg)
+			}
+			checked[i].dead = err != nil
+			pingErrs[i] = err
+		}(i)
+	}
+	wg.Wait()
+
+	// collect failures in input order, so the report is stable between runs
+	res := CheckResult{Total: services, mappers: checked}
+	for _, err := range pingErrs {
+		if err != nil {
+			res.Errs = append(res.Errs, err.Error())
+		}
+	}
+	res.Ok = len(res.Errs) == 0
+	res.Valid = pinged - len(res.Errs)
+
+	return res
+}
+
+// ping requests m.PingURL with a 500ms timeout; any transport error or non-200 status is a failure.
+// On failure it returns a ready-to-log warning and an error describing the mapper; on success
+// it returns an empty message and nil.
+func ping(m URLMapper) (string, error) {
+	client := http.Client{Timeout: 500 * time.Millisecond}
+
+	resp, err := client.Get(m.PingURL)
+	if err != nil {
+		// the error carries only the cleaned-up cause, the log prefix stays in the message
+		cause := strings.ReplaceAll(err.Error(), "\"", "")
+		errMsg := fmt.Sprintf("[WARN] failed to ping for health %s, %s", m.PingURL, cause)
+		return errMsg, fmt.Errorf("%s %s: %s, %v", m.Server, m.SrcMatch.String(), m.PingURL, cause)
+	}
+	// close the body, otherwise every health-check leaks a connection
+	defer resp.Body.Close() // nolint
+
+	if resp.StatusCode != http.StatusOK {
+		errMsg := fmt.Sprintf("[WARN] failed ping status for health %s (%s)", m.PingURL, resp.Status)
+		return errMsg, fmt.Errorf("%s %s: %s, %s", m.Server, m.SrcMatch.String(), m.PingURL, resp.Status)
+	}
+
+	return "", nil
+}
diff --git a/app/discovery/health_test.go b/app/discovery/health_test.go
new file mode 100644
index 0000000..5844159 --- /dev/null +++ b/app/discovery/health_test.go @@ -0,0 +1 @@ +package discovery diff --git a/app/main.go b/app/main.go index b119871..d3c68db 100644 --- a/app/main.go +++ b/app/main.go @@ -107,6 +107,11 @@ var opts struct { Signature bool `long:"signature" env:"SIGNATURE" description:"enable reproxy signature headers"` Dbg bool `long:"dbg" env:"DEBUG" description:"debug mode"` + + HealthCheck struct { + Enabled bool `long:"health-check" env:"HEALTH_CHECK" description:"enable automatic health-check"` + Interval time.Duration `long:"health-check-interval" env:"HEALTH_CHECK_INTERVAL" default:"300s" description:"automatic health-check interval"` + } } var revision = "unknown" @@ -163,6 +168,9 @@ func run() error { } }() } + if opts.HealthCheck.Enabled { + svc.ScheduleHealthCheck(context.Background(), opts.HealthCheck.Interval) + } sslConfig, sslErr := makeSSLConfig() if sslErr != nil { diff --git a/app/proxy/health.go b/app/proxy/health.go index eb5aa93..67654d3 100644 --- a/app/proxy/health.go +++ b/app/proxy/health.go @@ -4,12 +4,9 @@ import ( "fmt" "net/http" "strings" - "sync" - "time" log "github.com/go-pkgz/lgr" "github.com/go-pkgz/rest" - "github.com/umputun/reproxy/app/discovery" ) @@ -25,63 +22,9 @@ func (h *Http) healthMiddleware(next http.Handler) http.Handler { } func (h *Http) healthHandler(w http.ResponseWriter, _ *http.Request) { - - const concurrent = 8 - sema := make(chan struct{}, concurrent) // limit health check to 8 concurrent calls - - // runs pings in parallel - check := func(mappers []discovery.URLMapper) (ok bool, valid int, total int, errs []string) { - outCh := make(chan error, concurrent) - services, pinged := 0, 0 - var wg sync.WaitGroup - for _, m := range mappers { - if m.MatchType != discovery.MTProxy { - continue - } - services++ - if m.PingURL == "" { - continue - } - pinged++ - wg.Add(1) - - go func(m discovery.URLMapper) { - sema <- struct{}{} - defer func() { - <-sema - wg.Done() - }() - - client := 
http.Client{Timeout: 500 * time.Millisecond} - resp, err := client.Get(m.PingURL) - if err != nil { - errMsg := strings.Replace(err.Error(), "\"", "", -1) - log.Printf("[WARN] failed to ping for health %s, %s", m.PingURL, errMsg) - outCh <- fmt.Errorf("%s %s: %s, %v", m.Server, m.SrcMatch.String(), m.PingURL, errMsg) - return - } - if resp.StatusCode != http.StatusOK { - log.Printf("[WARN] failed ping status for health %s (%s)", m.PingURL, resp.Status) - outCh <- fmt.Errorf("%s %s: %s, %s", m.Server, m.SrcMatch.String(), m.PingURL, resp.Status) - return - } - }(m) - } - - go func() { - wg.Wait() - close(outCh) - }() - - for e := range outCh { - errs = append(errs, e.Error()) - } - return len(errs) == 0, pinged - len(errs), services, errs - } - w.Header().Set("Content-Type", "application/json; charset=UTF-8") - ok, valid, total, errs := check(h.Mappers()) - if !ok { + res := discovery.CheckHealth(h.Mappers()) + if !res.Ok { w.WriteHeader(http.StatusExpectationFailed) errResp := struct { @@ -90,13 +33,13 @@ func (h *Http) healthHandler(w http.ResponseWriter, _ *http.Request) { Passed int `json:"passed,omitempty"` Failed int `json:"failed,omitempty"` Errors []string `json:"errors,omitempty"` - }{Status: "failed", Services: total, Passed: valid, Failed: len(errs), Errors: errs} + }{Status: "failed", Services: res.Total, Passed: res.Valid, Failed: len(res.Errs), Errors: res.Errs} rest.RenderJSON(w, errResp) return } w.WriteHeader(http.StatusOK) - _, err := fmt.Fprintf(w, `{"status": "ok", "services": %d}`, valid) + _, err := fmt.Fprintf(w, `{"status": "ok", "services": %d}`, res.Valid) if err != nil { log.Printf("[WARN] failed to send health, %v", err) }