mirror of
				https://github.com/go-micro/go-micro.git
				synced 2025-10-30 23:27:41 +02:00 
			
		
		
		
	Merge branch 'auth-interface-update' of https://github.com/micro/go-micro into auth-interface-update
This commit is contained in:
		| @@ -118,6 +118,17 @@ func (h *rpcHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) { | ||||
|  | ||||
| 	// create context | ||||
| 	cx := ctx.FromRequest(r) | ||||
| 	// get context from http handler wrappers | ||||
| 	md, ok := r.Context().Value(metadata.MetadataKey{}).(metadata.Metadata) | ||||
| 	if !ok { | ||||
| 		md = make(metadata.Metadata) | ||||
| 	} | ||||
|  | ||||
| 	// merge context with overwrite | ||||
| 	cx = metadata.MergeContext(cx, md, true) | ||||
|  | ||||
| 	// set merged context to request | ||||
| 	*r = *r.Clone(cx) | ||||
|  | ||||
| 	// if stream we currently only support json | ||||
| 	if isStream(r, service) { | ||||
| @@ -284,26 +295,43 @@ func requestPayload(r *http.Request) ([]byte, error) { | ||||
| 	if !ok { | ||||
| 		md = make(map[string]string) | ||||
| 	} | ||||
|  | ||||
| 	// allocate maximum | ||||
| 	matches := make(map[string]string, len(md)) | ||||
| 	matches := make(map[string]interface{}, len(md)) | ||||
|  | ||||
| 	// get fields from url path | ||||
| 	for k, v := range md { | ||||
| 		// filter own keys | ||||
| 		if strings.HasPrefix(k, "x-api-field-") { | ||||
| 			matches[strings.TrimPrefix(k, "x-api-field-")] = v | ||||
| 			delete(md, k) | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	// map of all fields | ||||
| 	req := make(map[string]interface{}, len(md)) | ||||
|  | ||||
| 	// get fields from url values | ||||
| 	if len(r.URL.RawQuery) > 0 { | ||||
| 		umd := make(map[string]interface{}) | ||||
| 		err = qson.Unmarshal(&umd, r.URL.RawQuery) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 		for k, v := range umd { | ||||
| 			matches[k] = v | ||||
| 		} | ||||
| 		delete(md, k) | ||||
| 	} | ||||
|  | ||||
| 	// restore context without fields | ||||
| 	ctx = metadata.NewContext(ctx, md) | ||||
| 	*r = *r.WithContext(ctx) | ||||
| 	req := make(map[string]interface{}, len(md)) | ||||
| 	*r = *r.Clone(metadata.NewContext(ctx, md)) | ||||
|  | ||||
| 	for k, v := range matches { | ||||
| 		ps := strings.Split(k, ".") | ||||
| 		if len(ps) == 1 { | ||||
| 			req[k] = v | ||||
| 			continue | ||||
| 		} | ||||
|  | ||||
| 		em := make(map[string]interface{}) | ||||
| 		em[ps[len(ps)-1]] = v | ||||
| 		for i := len(ps) - 2; i > 0; i-- { | ||||
| @@ -311,7 +339,16 @@ func requestPayload(r *http.Request) ([]byte, error) { | ||||
| 			nm[ps[i]] = em | ||||
| 			em = nm | ||||
| 		} | ||||
| 		req[ps[0]] = em | ||||
| 		if vm, ok := req[ps[0]]; ok { | ||||
| 			// nested map | ||||
| 			nm := vm.(map[string]interface{}) | ||||
| 			for vk, vv := range em { | ||||
| 				nm[vk] = vv | ||||
| 			} | ||||
| 			req[ps[0]] = nm | ||||
| 		} else { | ||||
| 			req[ps[0]] = em | ||||
| 		} | ||||
| 	} | ||||
| 	pathbuf := []byte("{}") | ||||
| 	if len(req) > 0 { | ||||
| @@ -320,14 +357,8 @@ func requestPayload(r *http.Request) ([]byte, error) { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	} | ||||
| 	urlbuf := []byte("{}") | ||||
| 	if len(r.URL.RawQuery) > 0 { | ||||
| 		urlbuf, err = qson.ToJSON(r.URL.RawQuery) | ||||
| 		if err != nil { | ||||
| 			return nil, err | ||||
| 		} | ||||
| 	} | ||||
|  | ||||
| 	urlbuf := []byte("{}") | ||||
| 	out, err := jsonpatch.MergeMergePatches(urlbuf, pathbuf) | ||||
| 	if err != nil { | ||||
| 		return nil, err | ||||
|   | ||||
| @@ -255,14 +255,14 @@ func (r *staticRouter) endpoint(req *http.Request) (*endpoint, error) { | ||||
| 			} | ||||
| 			pMatch = true | ||||
| 			ctx := req.Context() | ||||
| 			md, ok := metadata.FromContext(ctx) | ||||
| 			md, ok := ctx.Value(metadata.MetadataKey{}).(metadata.Metadata) | ||||
| 			if !ok { | ||||
| 				md = make(metadata.Metadata) | ||||
| 			} | ||||
| 			for k, v := range matches { | ||||
| 				md[fmt.Sprintf("x-api-field-%s", k)] = v | ||||
| 			} | ||||
| 			*req = *req.WithContext(context.WithValue(ctx, metadata.MetadataKey{}, md)) | ||||
| 			*req = *req.Clone(context.WithValue(ctx, metadata.MetadataKey{}, md)) | ||||
| 			break pathLoop | ||||
| 		} | ||||
| 		if !pMatch { | ||||
|   | ||||
| @@ -221,7 +221,9 @@ func (g *grpcClient) stream(ctx context.Context, node *registry.Node, req client | ||||
| 	} | ||||
|  | ||||
| 	// set timeout in nanoseconds | ||||
| 	header["timeout"] = fmt.Sprintf("%d", opts.RequestTimeout) | ||||
| 	if opts.StreamTimeout > time.Duration(0) { | ||||
| 		header["timeout"] = fmt.Sprintf("%d", opts.StreamTimeout) | ||||
| 	} | ||||
| 	// set the content type for the request | ||||
| 	header["x-content-type"] = req.ContentType() | ||||
|  | ||||
|   | ||||
| @@ -57,6 +57,8 @@ type CallOptions struct { | ||||
| 	Retries int | ||||
| 	// Request/Response timeout | ||||
| 	RequestTimeout time.Duration | ||||
| 	// Stream timeout for the stream | ||||
| 	StreamTimeout time.Duration | ||||
| 	// Use the services own auth token | ||||
| 	ServiceToken bool | ||||
|  | ||||
| @@ -227,6 +229,13 @@ func RequestTimeout(d time.Duration) Option { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // StreamTimeout sets the stream timeout | ||||
| func StreamTimeout(d time.Duration) Option { | ||||
| 	return func(o *Options) { | ||||
| 		o.CallOptions.StreamTimeout = d | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // Transport dial timeout | ||||
| func DialTimeout(d time.Duration) Option { | ||||
| 	return func(o *Options) { | ||||
| @@ -295,6 +304,13 @@ func WithRequestTimeout(d time.Duration) CallOption { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // WithStreamTimeout sets the stream timeout | ||||
| func WithStreamTimeout(d time.Duration) CallOption { | ||||
| 	return func(o *CallOptions) { | ||||
| 		o.StreamTimeout = d | ||||
| 	} | ||||
| } | ||||
|  | ||||
| // WithDialTimeout is a CallOption which overrides that which | ||||
| // set in Options.CallOptions | ||||
| func WithDialTimeout(d time.Duration) CallOption { | ||||
|   | ||||
| @@ -198,7 +198,9 @@ func (r *rpcClient) stream(ctx context.Context, node *registry.Node, req Request | ||||
| 	} | ||||
|  | ||||
| 	// set timeout in nanoseconds | ||||
| 	msg.Header["Timeout"] = fmt.Sprintf("%d", opts.RequestTimeout) | ||||
| 	if opts.StreamTimeout > time.Duration(0) { | ||||
| 		msg.Header["Timeout"] = fmt.Sprintf("%d", opts.StreamTimeout) | ||||
| 	} | ||||
| 	// set the content type for the request | ||||
| 	msg.Header["Content-Type"] = req.ContentType() | ||||
| 	// set the accept header | ||||
|   | ||||
| @@ -16,6 +16,8 @@ type Config interface { | ||||
| 	reader.Values | ||||
| 	// Init the config | ||||
| 	Init(opts ...Option) error | ||||
| 	// Options in the config | ||||
| 	Options() Options | ||||
| 	// Stop the config loader/watcher | ||||
| 	Close() error | ||||
| 	// Load config sources | ||||
|   | ||||
| @@ -67,6 +67,10 @@ func (c *config) Init(opts ...Option) error { | ||||
| 	return nil | ||||
| } | ||||
|  | ||||
| func (c *config) Options() Options { | ||||
| 	return c.opts | ||||
| } | ||||
|  | ||||
| func (c *config) run() { | ||||
| 	watch := func(w loader.Watcher) error { | ||||
| 		for { | ||||
|   | ||||
| @@ -25,6 +25,13 @@ func (md Metadata) Get(key string) (string, bool) { | ||||
| 	return val, ok | ||||
| } | ||||
|  | ||||
| func (md Metadata) Delete(key string) { | ||||
| 	// delete key as-is | ||||
| 	delete(md, key) | ||||
| 	// delete also Title key | ||||
| 	delete(md, strings.Title(key)) | ||||
| } | ||||
|  | ||||
| // Copy makes a copy of the metadata | ||||
| func Copy(md Metadata) Metadata { | ||||
| 	cmd := make(Metadata) | ||||
| @@ -34,13 +41,22 @@ func Copy(md Metadata) Metadata { | ||||
| 	return cmd | ||||
| } | ||||
|  | ||||
| // Delete key from metadata | ||||
| func Delete(ctx context.Context, k string) context.Context { | ||||
| 	return Set(ctx, k, "") | ||||
| } | ||||
|  | ||||
| // Set add key with val to metadata | ||||
| func Set(ctx context.Context, k, v string) context.Context { | ||||
| 	md, ok := FromContext(ctx) | ||||
| 	if !ok { | ||||
| 		md = make(Metadata) | ||||
| 	} | ||||
| 	md[k] = v | ||||
| 	if v == "" { | ||||
| 		delete(md, k) | ||||
| 	} else { | ||||
| 		md[k] = v | ||||
| 	} | ||||
| 	return context.WithValue(ctx, MetadataKey{}, md) | ||||
| } | ||||
|  | ||||
| @@ -96,8 +112,10 @@ func MergeContext(ctx context.Context, patchMd Metadata, overwrite bool) context | ||||
| 	for k, v := range patchMd { | ||||
| 		if _, ok := cmd[k]; ok && !overwrite { | ||||
| 			// skip | ||||
| 		} else { | ||||
| 		} else if v != "" { | ||||
| 			cmd[k] = v | ||||
| 		} else { | ||||
| 			delete(cmd, k) | ||||
| 		} | ||||
| 	} | ||||
| 	return context.WithValue(ctx, MetadataKey{}, cmd) | ||||
|   | ||||
| @@ -18,6 +18,27 @@ func TestMetadataSet(t *testing.T) { | ||||
| 	} | ||||
| } | ||||
|  | ||||
| func TestMetadataDelete(t *testing.T) { | ||||
| 	md := Metadata{ | ||||
| 		"Foo": "bar", | ||||
| 		"Baz": "empty", | ||||
| 	} | ||||
|  | ||||
| 	ctx := NewContext(context.TODO(), md) | ||||
| 	ctx = Delete(ctx, "Baz") | ||||
|  | ||||
| 	emd, ok := FromContext(ctx) | ||||
| 	if !ok { | ||||
| 		t.Fatal("key Key not found") | ||||
| 	} | ||||
|  | ||||
| 	_, ok = emd["Baz"] | ||||
| 	if ok { | ||||
| 		t.Fatal("key Baz not deleted") | ||||
| 	} | ||||
|  | ||||
| } | ||||
|  | ||||
| func TestMetadataCopy(t *testing.T) { | ||||
| 	md := Metadata{ | ||||
| 		"Foo": "bar", | ||||
|   | ||||
		Reference in New Issue
	
	Block a user