1
0
mirror of https://github.com/go-kratos/kratos.git synced 2025-01-18 03:21:57 +02:00

fix(transport/http): fixed the problem of getting empty node list (#1206)

* fix(transport/http): fixed the problem of getting empty node list

* fix http blocking

Co-authored-by: longXboy <longxboyhi@gmail.com>
This commit is contained in:
包子 2021-07-18 07:46:03 +08:00 committed by GitHub
parent b2ba585b2e
commit 5f678de2cc
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 71 additions and 5 deletions

View File

@ -7,6 +7,7 @@ import (
"github.com/go-kratos/etcd/registry"
"github.com/go-kratos/kratos/examples/helloworld/helloworld"
"github.com/go-kratos/kratos/v2/transport/grpc"
"github.com/go-kratos/kratos/v2/transport/http"
clientv3 "go.etcd.io/etcd/client/v3"
)
@ -18,6 +19,11 @@ func main() {
panic(err)
}
r := registry.New(cli)
callGRPC(r)
callHTTP(r)
}
func callGRPC(r *registry.Registry) {
conn, err := grpc.DialInsecure(
context.Background(),
grpc.WithEndpoint("discovery:///helloworld"),
@ -33,3 +39,21 @@ func main() {
}
log.Printf("[grpc] SayHello %+v\n", reply)
}
func callHTTP(r *registry.Registry) {
conn, err := http.NewClient(
context.Background(),
http.WithEndpoint("discovery:///helloworld"),
http.WithDiscovery(r),
http.WithBlock(),
)
if err != nil {
log.Fatal(err)
}
client := helloworld.NewGreeterHTTPClient(conn)
reply, err := client.SayHello(context.Background(), &helloworld.HelloRequest{Name: "kratos"})
if err != nil {
log.Fatal(err)
}
log.Printf("[http] SayHello %+v\n", reply)
}

View File

@ -8,7 +8,9 @@ import (
"github.com/go-kratos/etcd/registry"
pb "github.com/go-kratos/kratos/examples/helloworld/helloworld"
"github.com/go-kratos/kratos/v2"
"github.com/go-kratos/kratos/v2/middleware/recovery"
"github.com/go-kratos/kratos/v2/transport/grpc"
"github.com/go-kratos/kratos/v2/transport/http"
etcd "go.etcd.io/etcd/client/v3"
)
@ -34,13 +36,20 @@ func main() {
grpcSrv := grpc.NewServer(
grpc.Address(":9000"),
)
httpSrv := http.NewServer(
http.Address(":8000"),
http.Middleware(
recovery.Recovery(),
),
)
s := &server{}
pb.RegisterGreeterServer(grpcSrv, s)
pb.RegisterGreeterHTTPServer(httpSrv, s)
app := kratos.New(
kratos.Name("helloworld"),
kratos.Server(
grpcSrv,
httpSrv,
),
kratos.Registrar(r),
)

View File

@ -45,6 +45,7 @@ type clientOptions struct {
balancer balancer.Balancer
discovery registry.Discovery
middleware []middleware.Middleware
block bool
}
// WithTransport with client transport.
@ -119,6 +120,13 @@ func WithBalancer(b balancer.Balancer) ClientOption {
}
}
// WithBlock with client block.
func WithBlock() ClientOption {
return func(o *clientOptions) {
o.block = true
}
}
// Client is an HTTP client.
type Client struct {
opts clientOptions
@ -148,7 +156,7 @@ func NewClient(ctx context.Context, opts ...ClientOption) (*Client, error) {
var r *resolver
if options.discovery != nil {
if target.Scheme == "discovery" {
if r, err = newResolver(ctx, options.discovery, target, options.balancer); err != nil {
if r, err = newResolver(ctx, options.discovery, target, options.balancer, options.block); err != nil {
return nil, fmt.Errorf("[http client] new resolver failed!err: %v", options.endpoint)
}
} else if _, _, err := host.ExtractHostPort(options.endpoint); err != nil {

View File

@ -44,7 +44,7 @@ type resolver struct {
logger *log.Helper
}
func newResolver(ctx context.Context, discovery registry.Discovery, target *Target, updater Updater) (*resolver, error) {
func newResolver(ctx context.Context, discovery registry.Discovery, target *Target, updater Updater, block bool) (*resolver, error) {
watcher, err := discovery.Watch(ctx, target.Endpoint)
if err != nil {
return nil, err
@ -54,18 +54,26 @@ func newResolver(ctx context.Context, discovery registry.Discovery, target *Targ
watcher: watcher,
logger: log.NewHelper(log.DefaultLogger),
}
done := make(chan error, 1)
go func() {
for {
var executed bool
services, err := watcher.Next()
if err != nil {
r.logger.Errorf("http client watch services got unexpected error:=%v", err)
r.logger.Errorf("http client watch service %v got unexpected error:=%v", target, err)
if block {
select {
case done <- err:
default:
}
}
return
}
var nodes []*registry.ServiceInstance
for _, in := range services {
_, endpoint, err := parseEndpoint(in.Endpoints)
if err != nil {
r.logger.Errorf("Failed to parse discovery endpoint: %v error %v", in.Endpoints, err)
r.logger.Errorf("Failed to parse (%v) discovery endpoint: %v error %v", target, in.Endpoints, err)
continue
}
if endpoint == "" {
@ -79,8 +87,25 @@ func newResolver(ctx context.Context, discovery registry.Discovery, target *Targ
r.nodes = nodes
r.lock.Unlock()
}
if block && !executed {
executed = true
done <- nil
}
}
}()
if block {
select {
case e := <-done:
if e != nil {
watcher.Stop()
}
return r, e
case <-ctx.Done():
r.logger.Errorf("http client watch service %v reaching context deadline!", target)
watcher.Stop()
return nil, ctx.Err()
}
}
return r, nil
}