From 492a280cf6aa3d3002ad4775d98ef965a975217b Mon Sep 17 00:00:00 2001 From: Anton Date: Thu, 30 Dec 2021 14:53:59 +0300 Subject: [PATCH] refactor pusher/puler --- internal/app/app.go | 44 +++++++-------------------------------- internal/pusher/pusher.go | 31 +++++++++++---------------- 2 files changed, 19 insertions(+), 56 deletions(-) diff --git a/internal/app/app.go b/internal/app/app.go index cf9a727..4d92965 100644 --- a/internal/app/app.go +++ b/internal/app/app.go @@ -96,16 +96,16 @@ func Run() error { sigchan := make(chan os.Signal, 1) defer close(sigchan) - errchan := make(chan error, 1) + errchan := make(chan error) defer close(errchan) signal.Notify(sigchan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) switch conf.MODE { case push: - go RunPusher_(ctx, errchan, rcli) + go RunPusher(ctx, errchan, rcli) case pull: - go RunPuller_(ctx, errchan, rcli) + go RunPuller(ctx, errchan, rcli) default: { cancel() @@ -121,37 +121,7 @@ func Run() error { return <-errchan } -func RunPuller(rasapi rascli.Api) error { - - log.Printf("v8-1c-cluster-pde: runing in %v mode", conf.MODE) - promRegistry := prometheus.NewRegistry() - promRegistry.MustRegister(rpHostsCollector.New(rasapi)) - - http.Handle("/metrics", - promhttp.HandlerFor(promRegistry, promhttp.HandlerOpts{}), - ) - - log.Printf("v8-1c-cluster-pde: listen %v", fmt.Sprintf("%s:%s", "", conf.PULL_EXPOSE)) - - err := http.ListenAndServe(fmt.Sprintf("%s:%s", "", conf.PULL_EXPOSE), nil) - if err != nil { - return fmt.Errorf("app: %v", err) - } - - return nil -} - -func RunPusher(rasapi rascli.Api) error { - log.Printf("v8-1c-cluster-pde: runing in %v mode pushgateway %v\n", - conf.MODE, fmt.Sprintf("%s:%s", conf.PUSH_HOST, conf.PUSH_PORT)) - return pusher.New( - rpHostsCollector.New(rasapi), - fmt.Sprintf("%s:%s", conf.PUSH_HOST, conf.PUSH_PORT), - pusher.WithInterval(500), - ).Run(context.Background()) -} - -func RunPuller_(ctx context.Context, errchan chan<- error, rasapi rascli.Api) { +func RunPuller(ctx context.Context, errchan chan<- error, rasapi rascli.Api) { log.Printf("v8-1c-cluster-pde: runing in %v mode", conf.MODE) promRegistry := prometheus.NewRegistry() @@ -179,13 +149,13 @@ func RunPuller_(ctx context.Context, errchan chan<- error, rasapi rascli.Api) { log.Printf("v8-1c-cluster-pde: server shutdown") } -func RunPusher_(ctx context.Context, errchan chan<- error, rasapi rascli.Api) { +func RunPusher(ctx context.Context, errchan chan<- error, rasapi rascli.Api) { log.Printf("v8-1c-cluster-pde: runing in %v mode pushgateway %v\n", conf.MODE, fmt.Sprintf("%s:%s", conf.PUSH_HOST, conf.PUSH_PORT)) - errchan <- pusher.New( + go pusher.New( rpHostsCollector.New(rasapi), fmt.Sprintf("%s:%s", conf.PUSH_HOST, conf.PUSH_PORT), pusher.WithInterval(500), - ).Run(ctx) + ).Run(ctx, errchan) } diff --git a/internal/pusher/pusher.go b/internal/pusher/pusher.go index 06d0eb4..fdd62de 100644 --- a/internal/pusher/pusher.go +++ b/internal/pusher/pusher.go @@ -55,28 +55,21 @@ func New(collector prometheus.Collector, url string, opts ...PusherOption) *Push return p } -func (p *Pusher) Run(ctx context.Context) error { +func (p *Pusher) Run(ctx context.Context, errchan chan<- error) { ticker := time.NewTicker(time.Duration(p.intervalMillis * int(time.Microsecond))) - done := make(chan error) - go func(done chan error) { - Loop: - for { - select { - case <-ticker.C: - err := p.pusher.Push() - if err != nil { - done <- fmt.Errorf("puser: %v", err) - break Loop - } - case <-ctx.Done(): - log.Println("INFO: pusher context complete") - done <- nil +Loop: + for { + select { + case <-ticker.C: + err := p.pusher.Push() + if err != nil { + errchan <- fmt.Errorf("puser: %v", err) break Loop } + case <-ctx.Done(): + log.Println("INFO: pusher context done") + break Loop } - close(done) - }(done) - - return <-done + } }