1
0
mirror of https://github.com/imgproxy/imgproxy.git synced 2025-12-23 22:11:10 +02:00
Files
imgproxy/handlers/stream/handler.go
2025-10-01 20:05:06 +02:00

195 lines
5.1 KiB
Go

package stream
import (
"context"
"io"
"log/slog"
"net/http"
"sync"
"github.com/imgproxy/imgproxy/v3/cookies"
"github.com/imgproxy/imgproxy/v3/fetcher"
"github.com/imgproxy/imgproxy/v3/httpheaders"
"github.com/imgproxy/imgproxy/v3/ierrors"
"github.com/imgproxy/imgproxy/v3/monitoring"
"github.com/imgproxy/imgproxy/v3/options"
"github.com/imgproxy/imgproxy/v3/options/keys"
"github.com/imgproxy/imgproxy/v3/server"
)
// streamBufferSize is the length, in bytes, of each buffer created by
// streamBufPool and used while copying image data.
const streamBufferSize = 4096

// categoryStreaming is the error category attached to errors returned by the
// streaming handler.
const categoryStreaming = "streaming"
// streamBufPool recycles the scratch buffers used for streaming.
// Every entry is a pointer to a byte slice of streamBufferSize bytes, so
// callers must type-assert the result of Get to *[]byte.
var streamBufPool = sync.Pool{
	New: func() any {
		scratch := make([]byte, streamBufferSize)
		return &scratch
	},
}
// HandlerContext provides access to shared handler dependencies.
//
// It is embedded in both Handler and request, so its methods can be called
// directly on either of them.
type HandlerContext interface {
	// Fetcher returns the fetcher used to build the request for the source image.
	Fetcher() *fetcher.Fetcher
	// Cookies returns the helper that builds a cookie jar from the user request.
	Cookies() *cookies.Cookies
	// Monitoring returns the monitoring facade used to track in-progress
	// images and to open streaming segments.
	Monitoring() *monitoring.Monitoring
}
// Handler handles image passthrough requests, allowing images to be streamed
// directly to the client. Instances are created with New.
type Handler struct {
	// Shared dependencies; embedded so that its methods are promoted onto Handler.
	HandlerContext
	// Configuration for the streamer. It supplies the request and response
	// header passthrough lists and is validated by New.
	config *Config
}
// request holds the parameters and state for a single streaming request.
// Handler.Execute builds one per call and runs its execute method.
type request struct {
	// Shared dependencies, copied from the owning Handler.
	HandlerContext
	// Owning handler; used to reach its config.
	handler *Handler
	// Incoming user request. Passthrough headers and cookies are taken from
	// it, and it is handed to the response logger.
	imageRequest *http.Request
	// URL of the image to fetch; also used as the canonical URL and logged.
	imageURL string
	// Request ID passed to the response logger.
	reqID string
	// Processing options; read for expiry, filename and attachment settings.
	opts *options.Options
	// Response writer that receives the headers, the status code and the body.
	rw server.ResponseWriter
}
// New builds a streaming Handler from the shared dependencies in hCtx and the
// given config.
//
// The config is validated first; when validation fails its error is returned
// unchanged and no Handler is created.
func New(hCtx HandlerContext, config *Config) (*Handler, error) {
	err := config.Validate()
	if err != nil {
		return nil, err
	}

	h := new(Handler)
	h.HandlerContext = hCtx
	h.config = config

	return h, nil
}
// Execute handles one image passthrough request, streaming the image at
// imageURL directly to rw.
//
// userRequest is the incoming request whose headers and cookies may be passed
// through, reqID identifies the request in the response log, and o carries
// the processing options. The per-request state is bundled into a request
// value and the result of its execute method is returned.
func (s *Handler) Execute(
	ctx context.Context,
	userRequest *http.Request,
	imageURL string,
	reqID string,
	o *options.Options,
	rw server.ResponseWriter,
) error {
	var req request

	req.HandlerContext = s.HandlerContext
	req.handler = s
	req.imageRequest = userRequest
	req.imageURL = imageURL
	req.reqID = reqID
	req.opts = o
	req.rw = rw

	return req.execute(ctx)
}
// execute runs the passthrough for a single request: it fetches the source
// image, copies the response metadata to the response writer and then streams
// the body.
//
// Failures while building the cookie jar, building the fetch request or
// sending it are returned wrapped with the categoryStreaming category. Once
// the status code has been written the method always returns nil; a failure
// while copying the body is handled inside streamData.
func (s *request) execute(ctx context.Context) error {
	// Count the image as in progress for the whole duration of the call.
	s.Monitoring().Stats().IncImagesInProgress()
	defer s.Monitoring().Stats().DecImagesInProgress()

	// The segment is opened right away; only its finisher is deferred.
	finishSegment := s.Monitoring().StartStreamingSegment(ctx)
	defer finishSegment()

	// Headers of the user request that are forwarded to the image source.
	upstreamHeaders := s.getImageRequestHeaders()

	jar, err := s.getCookieJar()
	if err != nil {
		return ierrors.Wrap(err, 0, ierrors.WithCategory(categoryStreaming))
	}

	// Prepare the fetch request. It may be non-nil even when an error is
	// reported, so the cancel is registered before the error is inspected.
	fetchReq, err := s.Fetcher().BuildRequest(ctx, s.imageURL, upstreamHeaders, jar)
	if fetchReq != nil {
		defer fetchReq.Cancel()
	}
	if err != nil {
		return ierrors.Wrap(err, 0, ierrors.WithCategory(categoryStreaming))
	}

	// Perform the fetch; the same "clean up first, then check" order applies
	// to the response body.
	upstream, err := fetchReq.Send()
	if upstream != nil {
		defer upstream.Body.Close()
	}
	if err != nil {
		return ierrors.Wrap(err, 0, ierrors.WithCategory(categoryStreaming))
	}

	// Response headers. The order of these calls is kept as is.
	// NOTE(review): the priority of the passthrough headers relative to the
	// setters that follow is an open question carried over from the original
	// code ("This is lowest as it was").
	s.rw.SetOriginHeaders(upstream.Header)
	s.rw.Passthrough(s.handler.config.PassthroughResponseHeaders...)
	s.rw.SetContentLength(int(upstream.ContentLength))
	s.rw.SetCanonical(s.imageURL)
	s.rw.SetExpires(s.opts.GetTime(keys.Expires))

	// Content-Disposition is derived from the path of the fetched URL.
	s.setContentDisposition(fetchReq.URL().Path, upstream)

	// Mirror the status code of the source response.
	s.rw.WriteHeader(upstream.StatusCode)

	// Finally, copy the body to the client.
	s.streamData(upstream)

	return nil
}
// getCookieJar builds the cookie jar for the image request from the user
// request by delegating to Cookies().JarFromRequest. Both the jar and the
// error are returned exactly as that call produced them.
func (s *request) getCookieJar() (http.CookieJar, error) {
	cookieSource := s.Cookies()

	return cookieSource.JarFromRequest(s.imageRequest)
}
// getImageRequestHeaders creates a fresh http.Header and fills it from the
// user request, using the handler's PassthroughRequestHeaders setting to
// select what is copied. The user request itself is only read.
func (s *request) getImageRequestHeaders() http.Header {
	passthrough := s.handler.config.PassthroughRequestHeaders
	headers := http.Header{}

	httpheaders.CopyFromRequest(s.imageRequest, headers, passthrough)

	return headers
}
// setContentDisposition sets the Content-Disposition of the response when the
// source answered with a 2xx status; for any other status it does nothing.
//
// imagePath is the path of the fetched image URL and serverResponse is the
// response received from the image source. The filename and the attachment
// flag are read from the processing options, and the content type is taken
// from the source response headers.
func (s *request) setContentDisposition(imagePath string, serverResponse *http.Response) {
	status := serverResponse.StatusCode
	if success := status >= 200 && status < 300; !success {
		return
	}

	contentType := serverResponse.Header.Get(httpheaders.ContentType)
	filename := s.opts.GetString(keys.Filename, "")
	asAttachment := s.opts.GetBool(keys.ReturnAttachment, false)

	s.rw.SetContentDisposition(imagePath, filename, "", contentType, asAttachment)
}
// streamData copies the body of res to the response writer through a buffer
// borrowed from streamBufPool, then logs the response.
//
// The response is logged with the source status code and a nil error whether
// or not the copy succeeded. If the copy failed, the method panics with
// http.ErrAbortHandler instead of returning; the pooled buffer is still given
// back because the Put is deferred.
func (s *request) streamData(res *http.Response) {
	scratch := streamBufPool.Get().(*[]byte)
	defer streamBufPool.Put(scratch)

	_, copyErr := io.CopyBuffer(s.rw, res.Body, *scratch)

	server.LogResponse(
		s.reqID,
		s.imageRequest,
		res.StatusCode,
		nil,
		slog.String("image_url", s.imageURL),
		slog.Any("processing_options", s.opts),
	)

	if copyErr == nil {
		return
	}

	// Abort the handler without reporting the copy error through the usual
	// logging path (net/http documents ErrAbortHandler as the sentinel panic
	// value for this).
	panic(http.ErrAbortHandler)
}