1
0
mirror of https://github.com/umputun/reproxy.git synced 2025-09-16 08:46:17 +02:00

add automatic health-checker

This commit is contained in:
nikolay.bystritskiy
2021-05-07 17:36:07 +02:00
committed by Umputun
parent 04a65d61bf
commit 973d6c9a14
5 changed files with 142 additions and 61 deletions

View File

@@ -36,6 +36,8 @@ type URLMapper struct {
AssetsLocation string
AssetsWebRoot string
dead bool
}
// Provider defines sources of mappers
@@ -147,6 +149,36 @@ func (s *Service) Match(srv, src string) (string, MatchType, bool) {
return src, MTProxy, false
}
// ScheduleHealthCheck starts background loop with health-check
func (s *Service) ScheduleHealthCheck(ctx context.Context, interval time.Duration) {
log.Printf("health-check scheduled every %s seconds", interval)
go func() {
hloop:
for {
timer := time.NewTimer(interval)
select {
case <-timer.C:
s.lock.RLock()
cres := CheckHealth(s.Mappers())
s.lock.RUnlock()
sort.SliceStable(cres.mappers, func(i, j int) bool {
return cres.mappers[j].dead
})
s.lock.Lock()
s.mappers = make(map[string][]URLMapper)
for _, m := range cres.mappers {
s.mappers[m.Server] = append(s.mappers[m.Server], m)
}
s.lock.Unlock()
case <-ctx.Done():
break hloop
}
}
}()
}
// Servers return list of all servers, skips "*" (catch-all/default)
func (s *Service) Servers() (servers []string) {
s.lock.RLock()

97
app/discovery/health.go Normal file
View File

@@ -0,0 +1,97 @@
package discovery
import (
"fmt"
"log"
"net/http"
"strings"
"sync"
"time"
)
type CheckResult struct {
Ok bool
Valid int
Total int
Errs []string
mappers []URLMapper
}
func CheckHealth(mappers []URLMapper) CheckResult {
const concurrent = 8
sema := make(chan struct{}, concurrent) // limit health check to 8 concurrent calls
// runs pings in parallel
type mapperError struct {
mapper URLMapper
err error
}
outCh := make(chan mapperError, concurrent)
services, pinged := 0, 0
var wg sync.WaitGroup
for _, m := range mappers {
if m.MatchType != MTProxy {
continue
}
services++
if m.PingURL == "" {
continue
}
pinged++
wg.Add(1)
go func(m URLMapper) {
sema <- struct{}{}
defer func() {
<-sema
wg.Done()
}()
m.dead = false
errMsg, err := ping(m)
if err != nil {
m.dead = true
log.Print(errMsg)
}
outCh <- mapperError{m, err}
}(m)
}
go func() {
wg.Wait()
close(outCh)
}()
res := CheckResult{}
for m := range outCh {
if m.err != nil {
res.Errs = append(res.Errs, m.err.Error())
}
res.mappers = append(res.mappers, m.mapper)
}
res.Ok = len(res.Errs) == 0
res.Valid = pinged - len(res.Errs)
res.Total = services
return res
}
func ping(m URLMapper) (string, error) {
client := http.Client{Timeout: 500 * time.Millisecond}
resp, err := client.Get(m.PingURL)
if err != nil {
errMsg := strings.Replace(err.Error(), "\"", "", -1)
errMsg = fmt.Sprintf("[WARN] failed to ping for health %s, %s", m.PingURL, errMsg)
return errMsg, fmt.Errorf("%s %s: %s, %v", m.Server, m.SrcMatch.String(), m.PingURL, errMsg)
}
if resp.StatusCode != http.StatusOK {
errMsg := fmt.Sprintf("[WARN] failed ping status for health %s (%s)", m.PingURL, resp.Status)
return errMsg, fmt.Errorf("%s %s: %s, %s", m.Server, m.SrcMatch.String(), m.PingURL, resp.Status)
}
return "", err
}

View File

@@ -0,0 +1 @@
package discovery

View File

@@ -107,6 +107,11 @@ var opts struct {
Signature bool `long:"signature" env:"SIGNATURE" description:"enable reproxy signature headers"`
Dbg bool `long:"dbg" env:"DEBUG" description:"debug mode"`
HealthCheck struct {
Enabled bool `long:"health-check" env:"HEALTH_CHECK" description:"enable automatic health-check"`
Interval time.Duration `long:"health-check-interval" env:"HEALTH_CHECK_INTERVAL" default:"300s" description:"automatic health-check interval"`
}
}
var revision = "unknown"
@@ -163,6 +168,9 @@ func run() error {
}
}()
}
if opts.HealthCheck.Enabled {
svc.ScheduleHealthCheck(context.Background(), opts.HealthCheck.Interval)
}
sslConfig, sslErr := makeSSLConfig()
if sslErr != nil {

View File

@@ -4,12 +4,9 @@ import (
"fmt"
"net/http"
"strings"
"sync"
"time"
log "github.com/go-pkgz/lgr"
"github.com/go-pkgz/rest"
"github.com/umputun/reproxy/app/discovery"
)
@@ -25,63 +22,9 @@ func (h *Http) healthMiddleware(next http.Handler) http.Handler {
}
func (h *Http) healthHandler(w http.ResponseWriter, _ *http.Request) {
const concurrent = 8
sema := make(chan struct{}, concurrent) // limit health check to 8 concurrent calls
// runs pings in parallel
check := func(mappers []discovery.URLMapper) (ok bool, valid int, total int, errs []string) {
outCh := make(chan error, concurrent)
services, pinged := 0, 0
var wg sync.WaitGroup
for _, m := range mappers {
if m.MatchType != discovery.MTProxy {
continue
}
services++
if m.PingURL == "" {
continue
}
pinged++
wg.Add(1)
go func(m discovery.URLMapper) {
sema <- struct{}{}
defer func() {
<-sema
wg.Done()
}()
client := http.Client{Timeout: 500 * time.Millisecond}
resp, err := client.Get(m.PingURL)
if err != nil {
errMsg := strings.Replace(err.Error(), "\"", "", -1)
log.Printf("[WARN] failed to ping for health %s, %s", m.PingURL, errMsg)
outCh <- fmt.Errorf("%s %s: %s, %v", m.Server, m.SrcMatch.String(), m.PingURL, errMsg)
return
}
if resp.StatusCode != http.StatusOK {
log.Printf("[WARN] failed ping status for health %s (%s)", m.PingURL, resp.Status)
outCh <- fmt.Errorf("%s %s: %s, %s", m.Server, m.SrcMatch.String(), m.PingURL, resp.Status)
return
}
}(m)
}
go func() {
wg.Wait()
close(outCh)
}()
for e := range outCh {
errs = append(errs, e.Error())
}
return len(errs) == 0, pinged - len(errs), services, errs
}
w.Header().Set("Content-Type", "application/json; charset=UTF-8")
ok, valid, total, errs := check(h.Mappers())
if !ok {
res := discovery.CheckHealth(h.Mappers())
if !res.Ok {
w.WriteHeader(http.StatusExpectationFailed)
errResp := struct {
@@ -90,13 +33,13 @@ func (h *Http) healthHandler(w http.ResponseWriter, _ *http.Request) {
Passed int `json:"passed,omitempty"`
Failed int `json:"failed,omitempty"`
Errors []string `json:"errors,omitempty"`
}{Status: "failed", Services: total, Passed: valid, Failed: len(errs), Errors: errs}
}{Status: "failed", Services: res.Total, Passed: res.Valid, Failed: len(res.Errs), Errors: res.Errs}
rest.RenderJSON(w, errResp)
return
}
w.WriteHeader(http.StatusOK)
_, err := fmt.Fprintf(w, `{"status": "ok", "services": %d}`, valid)
_, err := fmt.Fprintf(w, `{"status": "ok", "services": %d}`, res.Valid)
if err != nil {
log.Printf("[WARN] failed to send health, %v", err)
}