From f4f746489cbc45b83ddbe7a099046103fbfa2d9d Mon Sep 17 00:00:00 2001 From: DarthSim Date: Thu, 17 Jan 2019 14:51:19 +0600 Subject: [PATCH] Optimized memory usage; Reducing memory fragmentation --- bufpool.go | 62 ++++++++++++++++++++++++++++++++++++++++++++++++++++ config.go | 25 +++++++++++++++++++++ download.go | 14 +++++------- gzip.go | 26 ---------------------- gzippool.go | 63 +++++++++++++++++++++++++++++++++++++++++++++++++++++ main.go | 17 +++++++++++++++ process.go | 27 ++++++++++++++--------- server.go | 29 +++++++++++++----------- vips.h | 2 +- 9 files changed, 206 insertions(+), 59 deletions(-) create mode 100644 bufpool.go delete mode 100644 gzip.go create mode 100644 gzippool.go diff --git a/bufpool.go b/bufpool.go new file mode 100644 index 00000000..51731963 --- /dev/null +++ b/bufpool.go @@ -0,0 +1,62 @@ +package main + +import ( + "bytes" + "sync" +) + +type bufPool struct { + mutex sync.Mutex + size int + top *bufPoolEntry +} + +type bufPoolEntry struct { + buf *bytes.Buffer + next *bufPoolEntry +} + +func newBufPool(n int, size int) *bufPool { + pool := bufPool{size: size} + + for i := 0; i < n; i++ { + pool.grow() + } + + return &pool +} + +func (p *bufPool) grow() { + var buf *bytes.Buffer + + if p.size == 0 { + buf = new(bytes.Buffer) + } else { + buf = bytes.NewBuffer(make([]byte, p.size, p.size)) + } + + p.top = &bufPoolEntry{buf: buf, next: p.top} +} + +func (p *bufPool) get() *bytes.Buffer { + p.mutex.Lock() + defer p.mutex.Unlock() + + if p.top == nil { + p.grow() + } + + buf := p.top.buf + buf.Reset() + + p.top = p.top.next + + return buf +} + +func (p *bufPool) put(buf *bytes.Buffer) { + p.mutex.Lock() + defer p.mutex.Unlock() + + p.top = &bufPoolEntry{buf: buf, next: p.top} +} diff --git a/config.go b/config.go index 746bbb4f..218ee56b 100644 --- a/config.go +++ b/config.go @@ -193,6 +193,10 @@ type config struct { SentryDSN string SentryEnvironment string SentryRelease string + + FreeMemoryInterval int + DownloadBufferSize int + GZipBufferSize int } var conf = config{ @@ -217,6 +221,7 @@ var conf = config{ HoneybadgerEnv: "production", SentryEnvironment: "production", SentryRelease: fmt.Sprintf("imgproxy/%s", version), + FreeMemoryInterval: 10, } func init() { @@ -308,6 +313,10 @@ func init() { strEnvConfig(&conf.SentryEnvironment, "IMGPROXY_SENTRY_ENVIRONMENT") strEnvConfig(&conf.SentryRelease, "IMGPROXY_SENTRY_RELEASE") + intEnvConfig(&conf.FreeMemoryInterval, "IMGPROXY_FREE_MEMORY_INTERVAL") + intEnvConfig(&conf.DownloadBufferSize, "IMGPROXY_DOWNLOAD_BUFFER_SIZE") + intEnvConfig(&conf.GZipBufferSize, "IMGPROXY_GZIP_BUFFER_SIZE") + if len(conf.Keys) != len(conf.Salts) { logFatal("Number of keys and number of salts should be equal. Keys: %d, salts: %d", len(conf.Keys), len(conf.Salts)) } @@ -410,6 +419,22 @@ func init() { logFatal("Can't use the same binding for the main server and Prometheus") } + if conf.FreeMemoryInterval <= 0 { + logFatal("Free memory interval should be greater than zero") + } + + if conf.DownloadBufferSize < 0 { + logFatal("Download buffer size should be greater than or quual to 0") + } else if conf.DownloadBufferSize > int(^uint32(0)) { + logFatal("Download buffer size can't be creater than %d", ^uint32(0)) + } + + if conf.GZipBufferSize < 0 { + logFatal("GZip buffer size should be greater than or quual to 0") + } else if conf.GZipBufferSize > int(^uint32(0)) { + logFatal("GZip buffer size can't be creater than %d", ^uint32(0)) + } + initDownloading() initNewrelic() initPrometheus() diff --git a/download.go b/download.go index 9be2c008..44acc75c 100644 --- a/download.go +++ b/download.go @@ -9,7 +9,6 @@ import ( "io" "io/ioutil" "net/http" - "sync" "time" _ "image/gif" @@ -31,11 +30,7 @@ var ( const msgSourceImageIsUnreachable = "Source image is unreachable" -var downloadBufPool = sync.Pool{ - New: func() interface{} { - return new(bytes.Buffer) - }, -} +var downloadBufPool *bufPool func initDownloading() { transport := &http.Transport{ @@ -62,6 +57,8 @@ func initDownloading() { Timeout: time.Duration(conf.DownloadTimeout) * time.Second, Transport: transport, } + + downloadBufPool = newBufPool(conf.Concurrency, conf.DownloadBufferSize) } func checkDimensions(width, height int) error { @@ -95,10 +92,9 @@ func checkTypeAndDimensions(r io.Reader) (imageType, error) { } func readAndCheckImage(ctx context.Context, res *http.Response) (context.Context, context.CancelFunc, error) { - buf := downloadBufPool.Get().(*bytes.Buffer) + buf := downloadBufPool.get() cancel := func() { - buf.Reset() - downloadBufPool.Put(buf) + downloadBufPool.put(buf) } imgtype, err := checkTypeAndDimensions(io.TeeReader(res.Body, buf)) diff --git a/gzip.go b/gzip.go deleted file mode 100644 index b37ccb4c..00000000 --- a/gzip.go +++ /dev/null @@ -1,26 +0,0 @@ -package main - -import ( - "compress/gzip" - "io" - "os" - "sync" -) - -var nullwriter, _ = os.Open("/dev/null") - -var gzipPool = sync.Pool{ - New: func() interface{} { - gz, _ := gzip.NewWriterLevel(nullwriter, conf.GZipCompression) - return gz - }, -} - -func gzipData(data []byte, w io.Writer) { - gz := gzipPool.Get().(*gzip.Writer) - defer gzipPool.Put(gz) - - gz.Reset(w) - gz.Write(data) - gz.Close() -} diff --git a/gzippool.go b/gzippool.go new file mode 100644 index 00000000..906a7dc6 --- /dev/null +++ b/gzippool.go @@ -0,0 +1,63 @@ +package main + +import ( + "compress/gzip" + "io" + "io/ioutil" + "sync" +) + +type gzipPool struct { + mutex sync.Mutex + top *gzipPoolEntry +} + +type gzipPoolEntry struct { + gz *gzip.Writer + next *gzipPoolEntry +} + +func newGzipPool(n int) *gzipPool { + pool := new(gzipPool) + + for i := 0; i < n; i++ { + pool.grow() + } + + return pool +} + +func (p *gzipPool) grow() { + gz, err := gzip.NewWriterLevel(ioutil.Discard, conf.GZipCompression) + if err != nil { + logFatal("Can't init GZip compression: %s", err) + } + + p.top = &gzipPoolEntry{ + gz: gz, + next: p.top, + } +} + +func (p *gzipPool) get(w io.Writer) *gzip.Writer { + p.mutex.Lock() + defer p.mutex.Unlock() + + if p.top == nil { + p.grow() + } + + gz := p.top.gz + gz.Reset(w) + + p.top = p.top.next + + return gz +} + +func (p *gzipPool) put(gz *gzip.Writer) { + p.mutex.Lock() + defer p.mutex.Unlock() + + p.top = &gzipPoolEntry{gz: gz, next: p.top} +} diff --git a/main.go b/main.go index c6ef0afb..7006cd12 100644 --- a/main.go +++ b/main.go @@ -3,6 +3,9 @@ package main import ( "os" "os/signal" + "runtime" + "runtime/debug" + "time" "net/http" _ "net/http/pprof" @@ -13,6 +16,20 @@ const version = "2.1.5" type ctxKey string func main() { + go func() { + var logMemStats = len(os.Getenv("IMGPROXY_LOG_MEM_STATS")) > 0 + + for range time.Tick(time.Duration(conf.FreeMemoryInterval) * time.Second) { + debug.FreeOSMemory() + + if logMemStats { + var m runtime.MemStats + runtime.ReadMemStats(&m) + logNotice("[MEMORY USAGE] Sys: %d; HeapIdle: %d; HeapInuse: %d", m.Sys/1024/1024, m.HeapIdle/1024/1024, m.HeapInuse/1024/1024) + } + } + }() + if len(os.Getenv("IMGPROXY_PPROF_BIND")) > 0 { go func() { http.ListenAndServe(os.Getenv("IMGPROXY_PPROF_BIND"), nil) diff --git a/process.go b/process.go index ce059d5a..a9dd87b4 100644 --- a/process.go +++ b/process.go @@ -491,7 +491,7 @@ func transformGif(ctx context.Context, img **C.struct__VipsImage, po *processing return nil } -func processImage(ctx context.Context) ([]byte, error) { +func processImage(ctx context.Context) ([]byte, context.CancelFunc, error) { runtime.LockOSThread() defer runtime.UnlockOSThread() @@ -511,7 +511,7 @@ func processImage(ctx context.Context) ([]byte, error) { imgtype := getImageType(ctx) if po.Gravity.Type == gravitySmart && !vipsSupportSmartcrop { - return nil, errSmartCropNotSupported + return nil, func() {}, errSmartCropNotSupported } if po.Format == imageTypeUnknown { @@ -524,17 +524,17 @@ func processImage(ctx context.Context) ([]byte, error) { img, err := vipsLoadImage(data, imgtype, 1, 1.0, po.Format == imageTypeGIF) if err != nil { - return nil, err + return nil, func() {}, err } defer C.clear_image(&img) if imgtype == imageTypeGIF && po.Format == imageTypeGIF && vipsIsAnimatedGif(img) { if err := transformGif(ctx, &img, po); err != nil { - return nil, err + return nil, func() {}, err } } else { if err := transformImage(ctx, &img, data, po, imgtype); err != nil { - return nil, err + return nil, func() {}, err } } @@ -542,7 +542,7 @@ func processImage(ctx context.Context) ([]byte, error) { if po.Format == imageTypeGIF { if err := vipsCastUchar(&img); err != nil { - return nil, err + return nil, func() {}, err } checkTimeout(ctx) } @@ -645,9 +645,8 @@ func vipsLoadImage(data []byte, imgtype imageType, shrink int, svgScale float64, return img, nil } -func vipsSaveImage(img *C.struct__VipsImage, imgtype imageType, quality int) ([]byte, error) { +func vipsSaveImage(img *C.struct__VipsImage, imgtype imageType, quality int) ([]byte, context.CancelFunc, error) { var ptr unsafe.Pointer - defer C.g_free_go(&ptr) err := C.int(0) @@ -669,10 +668,18 @@ func vipsSaveImage(img *C.struct__VipsImage, imgtype imageType, quality int) ([] err = C.vips_icosave_go(img, &ptr, &imgsize) } if err != 0 { - return nil, vipsError() + return nil, func() {}, vipsError() } - return C.GoBytes(ptr, C.int(imgsize)), nil + const maxBufSize = ^uint32(0) + + b := (*[maxBufSize]byte)(ptr)[:int(imgsize):int(imgsize)] + + cancel := func() { + C.g_free_go(&ptr) + } + + return b, cancel, nil } func vipsArrayjoin(in []*C.struct__VipsImage, out **C.struct__VipsImage) error { diff --git a/server.go b/server.go index 52c940b9..1d9de92d 100644 --- a/server.go +++ b/server.go @@ -1,7 +1,6 @@ package main import ( - "bytes" "context" "crypto/subtle" "fmt" @@ -11,7 +10,6 @@ import ( "path/filepath" "strconv" "strings" - "sync" "time" nanoid "github.com/matoous/go-nanoid" @@ -46,13 +44,10 @@ var ( errInvalidMethod = newError(422, "Invalid request method", "Method doesn't allowed") errInvalidSecret = newError(403, "Invalid secret", "Forbidden") -) -var responseBufPool = sync.Pool{ - New: func() interface{} { - return new(bytes.Buffer) - }, -} + responseGzipBufPool *bufPool + responseGzipPool *gzipPool +) type httpHandler struct { sem chan struct{} @@ -73,6 +68,11 @@ func startServer() *http.Server { MaxHeaderBytes: 1 << 20, } + if conf.GZipCompression > 0 { + responseGzipBufPool = newBufPool(conf.Concurrency, conf.GZipBufferSize) + responseGzipPool = newGzipPool(conf.Concurrency) + } + go func() { logNotice("Starting server at %s", conf.Bind) if err := s.Serve(netutil.LimitListener(l, conf.MaxClients)); err != nil && err != http.ErrServerClosed { @@ -122,12 +122,14 @@ func respondWithImage(ctx context.Context, reqID string, r *http.Request, rw htt rw.Header().Set("Content-Disposition", contentDisposition(getImageURL(ctx), po.Format)) if conf.GZipCompression > 0 && strings.Contains(r.Header.Get("Accept-Encoding"), "gzip") { - buf := responseBufPool.Get().(*bytes.Buffer) - defer responseBufPool.Put(buf) + buf := responseGzipBufPool.get() + defer responseGzipBufPool.put(buf) - buf.Reset() + gz := responseGzipPool.get(buf) + defer responseGzipPool.put(gz) - gzipData(data, buf) + gz.Write(data) + gz.Close() rw.Header().Set("Content-Encoding", "gzip") rw.Header().Set("Content-Length", strconv.Itoa(buf.Len())) @@ -279,7 +281,8 @@ func (h *httpHandler) ServeHTTP(rw http.ResponseWriter, r *http.Request) { checkTimeout(ctx) - imageData, err := processImage(ctx) + imageData, processcancel, err := processImage(ctx) + defer processcancel() if err != nil { if newRelicEnabled { sendErrorToNewRelic(ctx, err) diff --git a/vips.h b/vips.h index ae1c60b8..96103be4 100644 --- a/vips.h +++ b/vips.h @@ -325,6 +325,6 @@ vips_icosave_go(VipsImage *in, void **buf, size_t *len) { void vips_cleanup() { - vips_thread_shutdown(); vips_error_clear(); + vips_thread_shutdown(); }