1
0
mirror of https://github.com/Chipazawra/v8-1c-cluster-pde.git synced 2025-01-14 04:35:54 +02:00

refactor pusher/puler

This commit is contained in:
Anton 2021-12-30 14:53:59 +03:00
parent 95ce08a734
commit 492a280cf6
2 changed files with 19 additions and 56 deletions

View File

@ -96,16 +96,16 @@ func Run() error {
sigchan := make(chan os.Signal, 1) sigchan := make(chan os.Signal, 1)
defer close(sigchan) defer close(sigchan)
errchan := make(chan error, 1) errchan := make(chan error)
defer close(errchan) defer close(errchan)
signal.Notify(sigchan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM) signal.Notify(sigchan, os.Interrupt, syscall.SIGINT, syscall.SIGTERM)
switch conf.MODE { switch conf.MODE {
case push: case push:
go RunPusher_(ctx, errchan, rcli) go RunPusher(ctx, errchan, rcli)
case pull: case pull:
go RunPuller_(ctx, errchan, rcli) go RunPuller(ctx, errchan, rcli)
default: default:
{ {
cancel() cancel()
@ -121,37 +121,7 @@ func Run() error {
return <-errchan return <-errchan
} }
func RunPuller(rasapi rascli.Api) error { func RunPuller(ctx context.Context, errchan chan<- error, rasapi rascli.Api) {
log.Printf("v8-1c-cluster-pde: runing in %v mode", conf.MODE)
promRegistry := prometheus.NewRegistry()
promRegistry.MustRegister(rpHostsCollector.New(rasapi))
http.Handle("/metrics",
promhttp.HandlerFor(promRegistry, promhttp.HandlerOpts{}),
)
log.Printf("v8-1c-cluster-pde: listen %v", fmt.Sprintf("%s:%s", "", conf.PULL_EXPOSE))
err := http.ListenAndServe(fmt.Sprintf("%s:%s", "", conf.PULL_EXPOSE), nil)
if err != nil {
return fmt.Errorf("app: %v", err)
}
return nil
}
func RunPusher(rasapi rascli.Api) error {
log.Printf("v8-1c-cluster-pde: runing in %v mode pushgateway %v\n",
conf.MODE, fmt.Sprintf("%s:%s", conf.PUSH_HOST, conf.PUSH_PORT))
return pusher.New(
rpHostsCollector.New(rasapi),
fmt.Sprintf("%s:%s", conf.PUSH_HOST, conf.PUSH_PORT),
pusher.WithInterval(500),
).Run(context.Background())
}
func RunPuller_(ctx context.Context, errchan chan<- error, rasapi rascli.Api) {
log.Printf("v8-1c-cluster-pde: runing in %v mode", conf.MODE) log.Printf("v8-1c-cluster-pde: runing in %v mode", conf.MODE)
promRegistry := prometheus.NewRegistry() promRegistry := prometheus.NewRegistry()
@ -179,13 +149,13 @@ func RunPuller_(ctx context.Context, errchan chan<- error, rasapi rascli.Api) {
log.Printf("v8-1c-cluster-pde: server shutdown") log.Printf("v8-1c-cluster-pde: server shutdown")
} }
func RunPusher_(ctx context.Context, errchan chan<- error, rasapi rascli.Api) { func RunPusher(ctx context.Context, errchan chan<- error, rasapi rascli.Api) {
log.Printf("v8-1c-cluster-pde: runing in %v mode pushgateway %v\n", log.Printf("v8-1c-cluster-pde: runing in %v mode pushgateway %v\n",
conf.MODE, fmt.Sprintf("%s:%s", conf.PUSH_HOST, conf.PUSH_PORT)) conf.MODE, fmt.Sprintf("%s:%s", conf.PUSH_HOST, conf.PUSH_PORT))
errchan <- pusher.New( go pusher.New(
rpHostsCollector.New(rasapi), rpHostsCollector.New(rasapi),
fmt.Sprintf("%s:%s", conf.PUSH_HOST, conf.PUSH_PORT), fmt.Sprintf("%s:%s", conf.PUSH_HOST, conf.PUSH_PORT),
pusher.WithInterval(500), pusher.WithInterval(500),
).Run(ctx) ).Run(ctx, errchan)
} }

View File

@ -55,28 +55,21 @@ func New(collector prometheus.Collector, url string, opts ...PusherOption) *Push
return p return p
} }
func (p *Pusher) Run(ctx context.Context) error { func (p *Pusher) Run(ctx context.Context, errchan chan<- error) {
ticker := time.NewTicker(time.Duration(p.intervalMillis * int(time.Microsecond))) ticker := time.NewTicker(time.Duration(p.intervalMillis * int(time.Microsecond)))
done := make(chan error) Loop:
go func(done chan error) { for {
Loop: select {
for { case <-ticker.C:
select { err := p.pusher.Push()
case <-ticker.C: if err != nil {
err := p.pusher.Push() errchan <- fmt.Errorf("puser: %v", err)
if err != nil {
done <- fmt.Errorf("puser: %v", err)
break Loop
}
case <-ctx.Done():
log.Println("INFO: pusher context complete")
done <- nil
break Loop break Loop
} }
case <-ctx.Done():
log.Println("INFO: pusher context done")
break Loop
} }
close(done) }
}(done)
return <-done
} }