diff --git a/bufpool.go b/bufpool.go index 3a7d008d..d099e0a0 100644 --- a/bufpool.go +++ b/bufpool.go @@ -2,32 +2,50 @@ package main import ( "bytes" + "math" + "sort" "sync" ) -type bufPool struct { - mutex sync.Mutex - name string - size int - top *bufPoolEntry -} +type intSlice []int -type bufPoolEntry struct { - buf *bytes.Buffer - next *bufPoolEntry +func (p intSlice) Len() int { return len(p) } +func (p intSlice) Less(i, j int) bool { return p[i] < p[j] } +func (p intSlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } + +type bufPool struct { + mutex sync.Mutex + name string + size int + buffers []*bytes.Buffer + + calls intSlice + callInd int + + throughput int } func newBufPool(name string, n int, size int) *bufPool { - pool := bufPool{name: name, size: size} + pool := bufPool{ + name: name, + size: size, + buffers: make([]*bytes.Buffer, n), + calls: make(intSlice, 1024), + } - for i := 0; i < n; i++ { - pool.grow() + for i := range pool.buffers { + pool.buffers[i] = pool.new() + } + + if pool.size > 0 { + // Get the real cap + pool.size = pool.buffers[0].Cap() } return &pool } -func (p *bufPool) grow() { +func (p *bufPool) new() *bytes.Buffer { var buf *bytes.Buffer buf = new(bytes.Buffer) @@ -36,25 +54,68 @@ func (p *bufPool) grow() { buf.Grow(p.size) } - p.top = &bufPoolEntry{buf: buf, next: p.top} - - if prometheusEnabled { - incrementBuffersTotal(p.name) - } + return buf } -func (p *bufPool) get() *bytes.Buffer { +func (p *bufPool) calibrateAndClean() { + var score float64 + + sort.Sort(p.calls) + + pos := 0.95 * float64(p.callInd+1) + + if pos < 1.0 { + score = float64(p.calls[0]) + } else if pos >= float64(p.callInd) { + score = float64(p.calls[p.callInd-1]) + } else { + lower := float64(p.calls[int(pos)-1]) + upper := float64(p.calls[int(pos)]) + score = lower + (pos-math.Floor(pos))*(upper-lower) + } + + p.throughput = int(score) + p.callInd = 0 +} + +func (p *bufPool) get(size int) *bytes.Buffer { p.mutex.Lock() defer p.mutex.Unlock() - if p.top == nil { - p.grow() + minSize, maxSize, minInd, maxInd := -1, -1, -1, -1 + + for i := 0; i < len(p.buffers); i++ { + if p.buffers[i] != nil { + cap := p.buffers[i].Cap() + + if size > 0 && cap >= size && (minSize > cap || minSize == -1) { + minSize = cap + minInd = i + } + + if cap > maxSize { + maxSize = cap + maxInd = i + } + } } - buf := p.top.buf - buf.Reset() + var buf *bytes.Buffer - p.top = p.top.next + if minInd >= 0 { + // We found buffer with the desired size + buf = p.buffers[minInd] + p.buffers[minInd] = nil + } else if maxInd >= 0 { + // We didn't find buffer with the desired size + buf = p.buffers[maxInd] + p.buffers[maxInd] = nil + } else { + // We didn't find buffers at all + return p.new() + } + + buf.Reset() return buf } @@ -63,9 +124,26 @@ func (p *bufPool) put(buf *bytes.Buffer) { p.mutex.Lock() defer p.mutex.Unlock() - p.top = &bufPoolEntry{buf: buf, next: p.top} + p.calls[p.callInd] = buf.Cap() + p.callInd++ - if prometheusEnabled { - observeBufferSize(p.name, buf.Cap()) + if p.callInd == len(p.calls) { + p.calibrateAndClean() + } + + if p.throughput > 0 && buf.Cap() > p.throughput { + return + } + + for i, b := range p.buffers { + if b == nil { + p.buffers[i] = buf + + if prometheusEnabled { + observeBufferSize(p.name, buf.Cap()) + } + + return + } } } diff --git a/download.go b/download.go index 20f70987..91652df0 100644 --- a/download.go +++ b/download.go @@ -122,11 +122,24 @@ func checkTypeAndDimensions(r io.Reader) (imageType, error) { } func readAndCheckImage(ctx context.Context, res *http.Response) (context.Context, context.CancelFunc, error) { - buf := downloadBufPool.get() + var contentLength int + + if res.ContentLength > 0 { + contentLength = int(res.ContentLength) + } else { + // ContentLength wasn't set properly, trying to parse the header + contentLength, _ = strconv.Atoi(res.Header.Get("Content-Length")) + } + + buf := downloadBufPool.get(contentLength) cancel := func() { downloadBufPool.put(buf) } + if contentLength > buf.Cap() { + buf.Grow(contentLength - buf.Len()) + } + body := res.Body if conf.MaxSrcFileSize > 0 { @@ -138,19 +151,6 @@ func readAndCheckImage(ctx context.Context, res *http.Response) (context.Context return ctx, cancel, err } - var contentLength int - - if res.ContentLength > 0 { - contentLength = int(res.ContentLength) - } else { - // ContentLength wasn't set properly, trying to parse the header - contentLength, _ = strconv.Atoi(res.Header.Get("Content-Length")) - } - - if contentLength > buf.Cap() { - buf.Grow(contentLength - buf.Len()) - } - if _, err = buf.ReadFrom(body); err != nil { return ctx, cancel, newError(404, err.Error(), msgSourceImageIsUnreachable) } diff --git a/server.go b/server.go index 3b34c2c7..02ca0e77 100644 --- a/server.go +++ b/server.go @@ -124,7 +124,7 @@ func respondWithImage(ctx context.Context, reqID string, r *http.Request, rw htt addVaryHeader(rw) if conf.GZipCompression > 0 && strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { - buf := responseGzipBufPool.get() + buf := responseGzipBufPool.get(0) defer responseGzipBufPool.put(buf) gz := responseGzipPool.get(buf)