mirror of
https://github.com/go-micro/go-micro.git
synced 2025-06-12 22:07:47 +02:00
fix: easy lint fixes to api/ (#2567)
This commit is contained in:
@ -28,6 +28,7 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
// Handler is the name of this handler.
|
||||
Handler = "rpc"
|
||||
packageID = "go.micro.api"
|
||||
)
|
||||
@ -76,6 +77,7 @@ func strategy(services []*registry.Service) selector.Strategy {
|
||||
func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
logger := h.opts.Logger
|
||||
bsize := handler.DefaultMaxRecvSize
|
||||
|
||||
if h.opts.MaxRecvSize > 0 {
|
||||
bsize = h.opts.MaxRecvSize
|
||||
}
|
||||
@ -83,6 +85,7 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
r.Body = http.MaxBytesReader(w, r.Body, bsize)
|
||||
|
||||
defer r.Body.Close()
|
||||
|
||||
var service *router.Route
|
||||
|
||||
if h.opts.Router != nil {
|
||||
@ -93,8 +96,10 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
if werr != nil {
|
||||
logger.Log(log.ErrorLevel, werr)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
service = s
|
||||
} else {
|
||||
// we have no way of routing the request
|
||||
@ -105,18 +110,18 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
return
|
||||
}
|
||||
|
||||
ct := r.Header.Get("Content-Type")
|
||||
contentType := r.Header.Get("Content-Type")
|
||||
|
||||
// Strip charset from Content-Type (like `application/json; charset=UTF-8`)
|
||||
if idx := strings.IndexRune(ct, ';'); idx >= 0 {
|
||||
ct = ct[:idx]
|
||||
if idx := strings.IndexRune(contentType, ';'); idx >= 0 {
|
||||
contentType = contentType[:idx]
|
||||
}
|
||||
|
||||
// micro client
|
||||
c := h.opts.Client
|
||||
myClient := h.opts.Client
|
||||
|
||||
// create context
|
||||
cx := ctx.FromRequest(r)
|
||||
myContext := ctx.FromRequest(r)
|
||||
// get context from http handler wrappers
|
||||
md, ok := metadata.FromContext(r.Context())
|
||||
if !ok {
|
||||
@ -132,23 +137,24 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
|
||||
// merge context with overwrite
|
||||
cx = metadata.MergeContext(cx, md, true)
|
||||
myContext = metadata.MergeContext(myContext, md, true)
|
||||
|
||||
// set merged context to request
|
||||
*r = *r.Clone(cx)
|
||||
*r = *r.Clone(myContext)
|
||||
// if stream we currently only support json
|
||||
if isStream(r, service) {
|
||||
// drop older context as it can have timeouts and create new
|
||||
// md, _ := metadata.FromContext(cx)
|
||||
// serveWebsocket(context.TODO(), w, r, service, c)
|
||||
if err := serveWebsocket(cx, w, r, service, c); err != nil {
|
||||
if err := serveWebsocket(myContext, w, r, service, myClient); err != nil {
|
||||
logger.Log(log.ErrorLevel, err)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// create strategy
|
||||
so := selector.WithStrategy(strategy(service.Versions))
|
||||
mySelector := selector.WithStrategy(strategy(service.Versions))
|
||||
|
||||
// walk the standard call path
|
||||
// get payload
|
||||
@ -157,6 +163,7 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
if werr := writeError(w, r, err); werr != nil {
|
||||
logger.Log(log.ErrorLevel, werr)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@ -164,7 +171,7 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
|
||||
switch {
|
||||
// proto codecs
|
||||
case hasCodec(ct, protoCodecs):
|
||||
case hasCodec(contentType, protoCodecs):
|
||||
request := &proto.Message{}
|
||||
// if the extracted payload isn't empty lets use it
|
||||
if len(br) > 0 {
|
||||
@ -174,18 +181,19 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
// create request/response
|
||||
response := &proto.Message{}
|
||||
|
||||
req := c.NewRequest(
|
||||
req := myClient.NewRequest(
|
||||
service.Service,
|
||||
service.Endpoint.Name,
|
||||
request,
|
||||
client.WithContentType(ct),
|
||||
client.WithContentType(contentType),
|
||||
)
|
||||
|
||||
// make the call
|
||||
if err := c.Call(cx, req, response, client.WithSelectOption(so)); err != nil {
|
||||
if err := myClient.Call(myContext, req, response, client.WithSelectOption(mySelector)); err != nil {
|
||||
if werr := writeError(w, r, err); werr != nil {
|
||||
logger.Log(log.ErrorLevel, werr)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@ -195,13 +203,14 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
if werr := writeError(w, r, err); werr != nil {
|
||||
logger.Log(log.ErrorLevel, werr)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
default:
|
||||
// if json codec is not present set to json
|
||||
if !hasCodec(ct, jsonCodecs) {
|
||||
ct = "application/json"
|
||||
if !hasCodec(contentType, jsonCodecs) {
|
||||
contentType = "application/json"
|
||||
}
|
||||
|
||||
// default to trying json
|
||||
@ -214,17 +223,18 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
// create request/response
|
||||
var response json.RawMessage
|
||||
|
||||
req := c.NewRequest(
|
||||
req := myClient.NewRequest(
|
||||
service.Service,
|
||||
service.Endpoint.Name,
|
||||
&request,
|
||||
client.WithContentType(ct),
|
||||
client.WithContentType(contentType),
|
||||
)
|
||||
// make the call
|
||||
if err := c.Call(cx, req, &response, client.WithSelectOption(so)); err != nil {
|
||||
if err := myClient.Call(myContext, req, &response, client.WithSelectOption(mySelector)); err != nil {
|
||||
if werr := writeError(w, r, err); werr != nil {
|
||||
logger.Log(log.ErrorLevel, werr)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
@ -234,6 +244,7 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
if werr := writeError(w, r, err); werr != nil {
|
||||
logger.Log(log.ErrorLevel, werr)
|
||||
}
|
||||
|
||||
return
|
||||
}
|
||||
}
|
||||
@ -244,8 +255,8 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
||||
}
|
||||
}
|
||||
|
||||
func (rh *rpcHandler) String() string {
|
||||
return "rpc"
|
||||
func (h *rpcHandler) String() string {
|
||||
return Handler
|
||||
}
|
||||
|
||||
func hasCodec(ct string, codecs []string) bool {
|
||||
@ -254,6 +265,7 @@ func hasCodec(ct string, codecs []string) bool {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
@ -266,37 +278,44 @@ func requestPayload(r *http.Request) ([]byte, error) {
|
||||
// we have to decode json-rpc and proto-rpc because we suck
|
||||
// well actually because there's no proxy codec right now
|
||||
|
||||
ct := r.Header.Get("Content-Type")
|
||||
myCt := r.Header.Get("Content-Type")
|
||||
|
||||
switch {
|
||||
case strings.Contains(ct, "application/json-rpc"):
|
||||
case strings.Contains(myCt, "application/json-rpc"):
|
||||
msg := codec.Message{
|
||||
Type: codec.Request,
|
||||
Header: make(map[string]string),
|
||||
}
|
||||
|
||||
c := jsonrpc.NewCodec(&buffer{r.Body})
|
||||
if err = c.ReadHeader(&msg, codec.Request); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var raw json.RawMessage
|
||||
if err = c.ReadBody(&raw); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return ([]byte)(raw), nil
|
||||
case strings.Contains(ct, "application/proto-rpc"), strings.Contains(ct, "application/octet-stream"):
|
||||
case strings.Contains(myCt, "application/proto-rpc"), strings.Contains(myCt, "application/octet-stream"):
|
||||
msg := codec.Message{
|
||||
Type: codec.Request,
|
||||
Header: make(map[string]string),
|
||||
}
|
||||
|
||||
c := protorpc.NewCodec(&buffer{r.Body})
|
||||
if err = c.ReadHeader(&msg, codec.Request); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
var raw proto.Message
|
||||
if err = c.ReadBody(&raw); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return raw.Marshal()
|
||||
case strings.Contains(ct, "application/www-x-form-urlencoded"):
|
||||
case strings.Contains(myCt, "application/www-x-form-urlencoded"):
|
||||
if err := r.ParseForm(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -314,6 +333,7 @@ func requestPayload(r *http.Request) ([]byte, error) {
|
||||
|
||||
// otherwise as per usual
|
||||
ctx := r.Context()
|
||||
|
||||
// dont user meadata.FromContext as it mangles names
|
||||
md, ok := metadata.FromContext(ctx)
|
||||
if !ok {
|
||||
@ -330,9 +350,11 @@ func requestPayload(r *http.Request) ([]byte, error) {
|
||||
// filter own keys
|
||||
if strings.HasPrefix(k, "x-api-field-") {
|
||||
matches[strings.TrimPrefix(k, "x-api-field-")] = v
|
||||
|
||||
delete(md, k)
|
||||
} else if k == "x-api-body" {
|
||||
bodydst = v
|
||||
|
||||
delete(md, k)
|
||||
}
|
||||
}
|
||||
@ -343,10 +365,12 @@ func requestPayload(r *http.Request) ([]byte, error) {
|
||||
// get fields from url values
|
||||
if len(r.URL.RawQuery) > 0 {
|
||||
umd := make(map[string]interface{})
|
||||
|
||||
err = qson.Unmarshal(&umd, r.URL.RawQuery)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
for k, v := range umd {
|
||||
matches[k] = v
|
||||
}
|
||||
@ -361,24 +385,29 @@ func requestPayload(r *http.Request) ([]byte, error) {
|
||||
req[k] = v
|
||||
continue
|
||||
}
|
||||
|
||||
em := make(map[string]interface{})
|
||||
em[ps[len(ps)-1]] = v
|
||||
|
||||
for i := len(ps) - 2; i > 0; i-- {
|
||||
nm := make(map[string]interface{})
|
||||
nm[ps[i]] = em
|
||||
em = nm
|
||||
}
|
||||
|
||||
if vm, ok := req[ps[0]]; ok {
|
||||
// nested map
|
||||
nm := vm.(map[string]interface{})
|
||||
for vk, vv := range em {
|
||||
nm[vk] = vv
|
||||
}
|
||||
|
||||
req[ps[0]] = nm
|
||||
} else {
|
||||
req[ps[0]] = em
|
||||
}
|
||||
}
|
||||
|
||||
pathbuf := []byte("{}")
|
||||
if len(req) > 0 {
|
||||
pathbuf, err = json.Marshal(req)
|
||||
@ -388,41 +417,48 @@ func requestPayload(r *http.Request) ([]byte, error) {
|
||||
}
|
||||
|
||||
urlbuf := []byte("{}")
|
||||
|
||||
out, err := jsonpatch.MergeMergePatches(urlbuf, pathbuf)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
switch r.Method {
|
||||
case "GET":
|
||||
case http.MethodGet:
|
||||
// empty response
|
||||
if strings.Contains(ct, "application/json") && string(out) == "{}" {
|
||||
if strings.Contains(myCt, "application/json") && string(out) == "{}" {
|
||||
return out, nil
|
||||
} else if string(out) == "{}" && !strings.Contains(ct, "application/json") {
|
||||
} else if string(out) == "{}" && !strings.Contains(myCt, "application/json") {
|
||||
return []byte{}, nil
|
||||
}
|
||||
|
||||
return out, nil
|
||||
case "PATCH", "POST", "PUT", "DELETE":
|
||||
case http.MethodPatch, http.MethodPost, http.MethodPut, http.MethodDelete:
|
||||
bodybuf := []byte("{}")
|
||||
buf := bufferPool.Get()
|
||||
|
||||
defer bufferPool.Put(buf)
|
||||
if _, err := buf.ReadFrom(r.Body); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if b := buf.Bytes(); len(b) > 0 {
|
||||
bodybuf = b
|
||||
}
|
||||
|
||||
if bodydst == "" || bodydst == "*" {
|
||||
if out, err = jsonpatch.MergeMergePatches(out, bodybuf); err == nil {
|
||||
return out, nil
|
||||
}
|
||||
}
|
||||
|
||||
var jsonbody map[string]interface{}
|
||||
if json.Valid(bodybuf) {
|
||||
if err = json.Unmarshal(bodybuf, &jsonbody); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
dstmap := make(map[string]interface{})
|
||||
ps := strings.Split(bodydst, ".")
|
||||
if len(ps) == 1 {
|
||||
@ -463,33 +499,35 @@ func requestPayload(r *http.Request) ([]byte, error) {
|
||||
return []byte{}, nil
|
||||
}
|
||||
|
||||
func writeError(w http.ResponseWriter, r *http.Request, err error) error {
|
||||
func writeError(rsp http.ResponseWriter, req *http.Request, err error) error {
|
||||
ce := errors.Parse(err.Error())
|
||||
|
||||
switch ce.Code {
|
||||
case 0:
|
||||
// assuming it's totally screwed
|
||||
ce.Code = 500
|
||||
ce.Code = http.StatusInternalServerError
|
||||
ce.Id = packageID
|
||||
ce.Status = http.StatusText(500)
|
||||
ce.Status = http.StatusText(http.StatusInternalServerError)
|
||||
ce.Detail = "error during request: " + ce.Detail
|
||||
w.WriteHeader(500)
|
||||
|
||||
rsp.WriteHeader(http.StatusInternalServerError)
|
||||
default:
|
||||
w.WriteHeader(int(ce.Code))
|
||||
rsp.WriteHeader(int(ce.Code))
|
||||
}
|
||||
|
||||
// response content type
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
rsp.Header().Set("Content-Type", "application/json")
|
||||
|
||||
// Set trailers
|
||||
if strings.Contains(r.Header.Get("Content-Type"), "application/grpc") {
|
||||
w.Header().Set("Trailer", "grpc-status")
|
||||
w.Header().Set("Trailer", "grpc-message")
|
||||
w.Header().Set("grpc-status", "13")
|
||||
w.Header().Set("grpc-message", ce.Detail)
|
||||
if strings.Contains(req.Header.Get("Content-Type"), "application/grpc") {
|
||||
rsp.Header().Set("Trailer", "grpc-status")
|
||||
rsp.Header().Set("Trailer", "grpc-message")
|
||||
rsp.Header().Set("grpc-status", "13")
|
||||
rsp.Header().Set("grpc-message", ce.Detail)
|
||||
}
|
||||
|
||||
_, werr := w.Write([]byte(ce.Error()))
|
||||
_, werr := rsp.Write([]byte(ce.Error()))
|
||||
|
||||
return werr
|
||||
}
|
||||
|
||||
@ -512,11 +550,14 @@ func writeResponse(w http.ResponseWriter, r *http.Request, rsp []byte) error {
|
||||
|
||||
// write response
|
||||
_, err := w.Write(rsp)
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
// NewHandler returns a new RPC handler.
|
||||
func NewHandler(opts ...handler.Option) handler.Handler {
|
||||
options := handler.NewOptions(opts...)
|
||||
|
||||
return &rpcHandler{
|
||||
opts: options,
|
||||
}
|
||||
|
Reference in New Issue
Block a user