From 9807728f9cdaecac92b641adcdeef17cd7aaa155 Mon Sep 17 00:00:00 2001 From: DarthSim Date: Thu, 31 Jan 2019 14:00:31 +0600 Subject: [PATCH] Calibrating default buffer size; Prometheus metrics for calibration --- bufpool.go | 59 ++++++++++++++++++++++++++++----------------------- prometheus.go | 26 ++++++++++++++++++++++- 2 files changed, 57 insertions(+), 28 deletions(-) diff --git a/bufpool.go b/bufpool.go index bb4e6c21..ddfe5bc9 100644 --- a/bufpool.go +++ b/bufpool.go @@ -13,32 +13,27 @@ func (p intSlice) Less(i, j int) bool { return p[i] < p[j] } func (p intSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } type bufPool struct { - mutex sync.Mutex - name string - size int - buffers []*bytes.Buffer + name string + defaultSize int + maxSize int + buffers []*bytes.Buffer calls intSlice callInd int - throughput int + mutex sync.Mutex } -func newBufPool(name string, n int, size int) *bufPool { +func newBufPool(name string, n int, defaultSize int) *bufPool { pool := bufPool{ - name: name, - size: size, - buffers: make([]*bytes.Buffer, n), - calls: make(intSlice, 1024), + name: name, + defaultSize: defaultSize, + buffers: make([]*bytes.Buffer, n), + calls: make(intSlice, 1024), } for i := range pool.buffers { - pool.buffers[i] = pool.new() - } - - if pool.size > 0 { - // Get the real cap - pool.size = pool.buffers[0].Cap() + pool.buffers[i] = new(bytes.Buffer) } return &pool @@ -49,8 +44,8 @@ func (p *bufPool) new() *bytes.Buffer { buf = new(bytes.Buffer) - if p.size > 0 { - buf.Grow(p.size) + if p.defaultSize > 0 { + buf.Grow(p.defaultSize) } return buf @@ -63,20 +58,28 @@ func (p *bufPool) calibrateAndClean() { score := p.calls[pos] p.callInd = 0 - p.throughput = 64 + p.maxSize = 64 for { - if p.throughput > score { + if p.maxSize > score { break } - p.throughput <<= 1 + p.maxSize <<= 1 } + p.defaultSize = maxInt(p.defaultSize, p.calls[0]) + p.maxSize = maxInt(p.defaultSize, p.maxSize) + for i, buf := range p.buffers { - if buf != nil && buf.Cap() > p.throughput { + if buf != nil && buf.Cap() > p.maxSize { p.buffers[i] = nil } } + + if prometheusEnabled { + setPrometheusBufferDefaultSize(p.name, p.defaultSize) + setPrometheusBufferMaxSize(p.name, p.maxSize) + } } func (p *bufPool) Get(size int) *bytes.Buffer { @@ -113,13 +116,15 @@ func (p *bufPool) Get(size int) *bytes.Buffer { p.buffers[maxInd] = nil } else { // We didn't find buffers at all - return p.new() + buf = new(bytes.Buffer) } buf.Reset() - if size > 0 && size > buf.Cap() { - buf.Grow(size) + growSize := maxInt(size, p.defaultSize) + + if growSize > buf.Cap() { + buf.Grow(growSize) } return buf @@ -138,7 +143,7 @@ func (p *bufPool) Put(buf *bytes.Buffer) { } } - if p.throughput > 0 && buf.Cap() > p.throughput { + if p.maxSize > 0 && buf.Cap() > p.maxSize { return } @@ -147,7 +152,7 @@ func (p *bufPool) Put(buf *bytes.Buffer) { p.buffers[i] = buf if prometheusEnabled && buf.Cap() > 0 { - observeBufferSize(p.name, buf.Cap()) + observePrometheusBufferSize(p.name, buf.Cap()) } return diff --git a/prometheus.go b/prometheus.go index ac2b79d4..02a973f8 100644 --- a/prometheus.go +++ b/prometheus.go @@ -17,6 +17,8 @@ var ( prometheusDownloadDuration prometheus.Histogram prometheusProcessingDuration prometheus.Histogram prometheusBufferSize *prometheus.HistogramVec + prometheusBufferDefaultSize *prometheus.GaugeVec + prometheusBufferMaxSize *prometheus.GaugeVec ) func initPrometheus() { @@ -54,6 +56,16 @@ func initPrometheus() { Help: "A histogram of the buffer size in megabytes.", }, []string{"type"}) + prometheusBufferDefaultSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "buffer_default_size_megabytes", + Help: "A gauge of the buffer default size in megabytes.", + }, []string{"type"}) + + prometheusBufferMaxSize = prometheus.NewGaugeVec(prometheus.GaugeOpts{ + Name: "buffer_max_size_megabytes", + Help: "A gauge of the buffer max size in megabytes.", + }, []string{"type"}) + prometheus.MustRegister( prometheusRequestsTotal, prometheusErrorsTotal, @@ -61,6 +73,8 @@ func initPrometheus() { prometheusDownloadDuration, prometheusProcessingDuration, prometheusBufferSize, + prometheusBufferDefaultSize, + prometheusBufferMaxSize, ) prometheusEnabled = true @@ -89,7 +103,17 @@ func incrementPrometheusErrorsTotal(t string) { prometheusErrorsTotal.With(prometheus.Labels{"type": t}).Inc() } -func observeBufferSize(t string, cap int) { +func observePrometheusBufferSize(t string, cap int) { size := float64(cap) / 1024.0 / 1024.0 prometheusBufferSize.With(prometheus.Labels{"type": t}).Observe(size) } + +func setPrometheusBufferDefaultSize(t string, cap int) { + size := float64(cap) / 1024.0 / 1024.0 + prometheusBufferDefaultSize.With(prometheus.Labels{"type": t}).Set(size) +} + +func setPrometheusBufferMaxSize(t string, cap int) { + size := float64(cap) / 1024.0 / 1024.0 + prometheusBufferMaxSize.With(prometheus.Labels{"type": t}).Set(size) +}