diff --git a/cmd/kratos/internal/proto/server/server.go b/cmd/kratos/internal/proto/server/server.go index a30f3caa1..f3970e7ea 100644 --- a/cmd/kratos/internal/proto/server/server.go +++ b/cmd/kratos/internal/proto/server/server.go @@ -60,7 +60,8 @@ func run(cmd *cobra.Command, args []string) { for _, e := range s.Elements { r, ok := e.(*proto.RPC) if ok { - cs.Methods = append(cs.Methods, &Method{Service: s.Name, Name: r.Name, Request: r.RequestType, Reply: r.ReturnsType}) + cs.Methods = append(cs.Methods, &Method{Service: s.Name, Name: r.Name, Request: r.RequestType, + Reply: r.ReturnsType, Type: getMethodType(r.StreamsRequest, r.StreamsReturns)}) } } res = append(res, cs) @@ -86,3 +87,16 @@ func run(cmd *cobra.Command, args []string) { fmt.Println(to) } } + +func getMethodType(streamsRequest, streamsReturns bool) MethodType { + if !streamsRequest && !streamsReturns { + return unaryType + } else if streamsRequest && streamsReturns { + return twoWayStreamsType + } else if streamsRequest { + return requestStreamsType + } else if streamsReturns { + return returnsStreamsType + } + return unaryType +} diff --git a/cmd/kratos/internal/proto/server/template.go b/cmd/kratos/internal/proto/server/template.go index 343ef2893..54ca81690 100644 --- a/cmd/kratos/internal/proto/server/template.go +++ b/cmd/kratos/internal/proto/server/template.go @@ -10,7 +10,12 @@ var serviceTemplate = ` package service import ( + {{- if .UseContext }} "context" + {{- end }} + {{- if .UseIO }} + "io" + {{- end }} pb "{{ .Package }}" {{- if .GoogleEmpty }} @@ -28,18 +33,74 @@ func New{{ .Service }}Service() *{{ .Service }}Service { {{- $s1 := "google.protobuf.Empty" }} {{ range .Methods }} +{{- if eq .Type 1 }} func (s *{{ .Service }}Service) {{ .Name }}(ctx context.Context, req {{ if eq .Request $s1 }}*emptypb.Empty{{ else }}*pb.{{ .Request }}{{ end }}) ({{ if eq .Reply $s1 }}*emptypb.Empty{{ else }}*pb.{{ .Reply }}{{ end }}, error) { return {{ if eq .Reply $s1 }}&emptypb.Empty{}{{ else }}&pb.{{ .Reply }}{}{{ end }}, nil } + +{{- else if eq .Type 2 }} +func (s *{{ .Service }}Service) {{ .Name }}(conn pb.{{ .Service }}_{{ .Name }}Server) error { + for { + req, err := conn.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + + err = conn.Send(&pb.{{ .Reply }}{}) + if err != nil { + return err + } + } +} + +{{- else if eq .Type 3 }} +func (s *{{ .Service }}Service) {{ .Name }}(conn pb.{{ .Service }}_{{ .Name }}Server) error { + for { + req, err := conn.Recv() + if err == io.EOF { + return conn.SendAndClose(&pb.{{ .Reply }}{}) + } + if err != nil { + return err + } + } +} + +{{- else if eq .Type 4 }} +func (s *{{ .Service }}Service) {{ .Name }}(req {{ if eq .Request $s1 }}*emptypb.Empty{{ else }}*pb.{{ .Request }}{{ end }}, conn pb.{{ .Service }}_{{ .Name }}Server) error { + for { + err := conn.Send(&pb.{{ .Reply }}{}) + if err != nil { + return err + } + } +} + +{{- end }} {{- end }} ` +type MethodType uint8 + +const ( + unaryType MethodType = 1 + twoWayStreamsType MethodType = 2 + requestStreamsType MethodType = 3 + returnsStreamsType MethodType = 4 +) + // Service is a proto service. type Service struct { Package string Service string Methods []*Method GoogleEmpty bool + + UseIO bool + UseContext bool } // Method is a proto method. @@ -48,14 +109,24 @@ type Method struct { Name string Request string Reply string + + // type: unary or stream + Type MethodType } func (s *Service) execute() ([]byte, error) { buf := new(bytes.Buffer) for _, method := range s.Methods { - if method.Request == "google.protobuf.Empty" || method.Reply == "google.protobuf.Empty" { + if (method.Type == unaryType && (method.Request == "google.protobuf.Empty" || method.Reply == "google.protobuf.Empty")) || + (method.Type == returnsStreamsType && method.Request == "google.protobuf.Empty") { s.GoogleEmpty = true } + if method.Type == twoWayStreamsType || method.Type == returnsStreamsType { + s.UseIO = true + } + if method.Type == unaryType { + s.UseContext = true + } } tmpl, err := template.New("service").Parse(serviceTemplate) if err != nil { diff --git a/examples/stream/client/main.go b/examples/stream/client/main.go new file mode 100644 index 000000000..5086ae7e8 --- /dev/null +++ b/examples/stream/client/main.go @@ -0,0 +1,103 @@ +package main + +import ( + "context" + "fmt" + "io" + "log" + "strconv" + "sync" + "time" + + "github.com/go-kratos/kratos/examples/stream/hello" + "github.com/go-kratos/kratos/v2/middleware/recovery" + transgrpc "github.com/go-kratos/kratos/v2/transport/grpc" +) + +var wg = sync.WaitGroup{} + +func main() { + conn, err := transgrpc.DialInsecure( + context.Background(), + transgrpc.WithEndpoint("127.0.0.1:9001"), + transgrpc.WithMiddleware( + recovery.Recovery(), + ), + ) + if err != nil { + panic(err) + } + defer conn.Close() + + client := hello.NewHelloClient(conn) + + wg.Add(3) + + go getNumber(client) + go uploadLog(client) + go chat(client) + + wg.Wait() +} + +func getNumber(client hello.HelloClient) { + defer wg.Done() + stream, err := client.GetNumber(context.Background(), &hello.GetNumberRequest{Data: "2021/08/01"}) + if err != nil { + log.Fatal(err) + } + for { + res, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + log.Fatalf("ListStr get stream err: %v", err) + } + // 打印返回值 + log.Println(res.Number) + } +} + +func uploadLog(client hello.HelloClient) { + defer wg.Done() + stream, err := client.UploadLog(context.Background()) + if err != nil { + log.Fatal(err) + } + var number int + for { + err := stream.Send(&hello.UploadLogRequest{Log: "log:" + strconv.Itoa(number)}) + if err != nil { + log.Fatalf("ListStr get stream err: %v", err) + } + time.Sleep(time.Millisecond * 50) + number++ + } + return +} + +func chat(client hello.HelloClient) { + defer wg.Done() + stream, err := client.Chat(context.Background()) + if err != nil { + log.Fatal(err) + } + var number int + for { + err = stream.Send(&hello.ChatRequest{UpMsg: "kratos:" + strconv.Itoa(number)}) + if err != nil { + log.Fatalf("ListStr get stream err: %v", err) + } + res, err := stream.Recv() + if err == io.EOF { + break + } + if err != nil { + log.Fatalf("ListStr get stream err: %v", err) + } + fmt.Println(res.DownMsg) + time.Sleep(time.Millisecond * 50) + number++ + } +} diff --git a/examples/stream/hello/hello.pb.go b/examples/stream/hello/hello.pb.go new file mode 100644 index 000000000..b1b001228 --- /dev/null +++ b/examples/stream/hello/hello.pb.go @@ -0,0 +1,479 @@ +// Code generated by protoc-gen-go. DO NOT EDIT. +// versions: +// protoc-gen-go v1.27.1 +// protoc v3.17.3 +// source: stream/hello/hello.proto + +package hello + +import ( + protoreflect "google.golang.org/protobuf/reflect/protoreflect" + protoimpl "google.golang.org/protobuf/runtime/protoimpl" + reflect "reflect" + sync "sync" +) + +const ( + // Verify that this generated code is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(20 - protoimpl.MinVersion) + // Verify that runtime/protoimpl is sufficiently up-to-date. + _ = protoimpl.EnforceVersion(protoimpl.MaxVersion - 20) +) + +type GetNumberRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Data string `protobuf:"bytes,1,opt,name=data,proto3" json:"data,omitempty"` +} + +func (x *GetNumberRequest) Reset() { + *x = GetNumberRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_stream_hello_hello_proto_msgTypes[0] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetNumberRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetNumberRequest) ProtoMessage() {} + +func (x *GetNumberRequest) ProtoReflect() protoreflect.Message { + mi := &file_stream_hello_hello_proto_msgTypes[0] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetNumberRequest.ProtoReflect.Descriptor instead. +func (*GetNumberRequest) Descriptor() ([]byte, []int) { + return file_stream_hello_hello_proto_rawDescGZIP(), []int{0} +} + +func (x *GetNumberRequest) GetData() string { + if x != nil { + return x.Data + } + return "" +} + +type GetNumberReply struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Number int64 `protobuf:"varint,1,opt,name=number,proto3" json:"number,omitempty"` +} + +func (x *GetNumberReply) Reset() { + *x = GetNumberReply{} + if protoimpl.UnsafeEnabled { + mi := &file_stream_hello_hello_proto_msgTypes[1] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *GetNumberReply) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*GetNumberReply) ProtoMessage() {} + +func (x *GetNumberReply) ProtoReflect() protoreflect.Message { + mi := &file_stream_hello_hello_proto_msgTypes[1] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use GetNumberReply.ProtoReflect.Descriptor instead. +func (*GetNumberReply) Descriptor() ([]byte, []int) { + return file_stream_hello_hello_proto_rawDescGZIP(), []int{1} +} + +func (x *GetNumberReply) GetNumber() int64 { + if x != nil { + return x.Number + } + return 0 +} + +type UploadLogRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Log string `protobuf:"bytes,1,opt,name=log,proto3" json:"log,omitempty"` +} + +func (x *UploadLogRequest) Reset() { + *x = UploadLogRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_stream_hello_hello_proto_msgTypes[2] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *UploadLogRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UploadLogRequest) ProtoMessage() {} + +func (x *UploadLogRequest) ProtoReflect() protoreflect.Message { + mi := &file_stream_hello_hello_proto_msgTypes[2] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UploadLogRequest.ProtoReflect.Descriptor instead. +func (*UploadLogRequest) Descriptor() ([]byte, []int) { + return file_stream_hello_hello_proto_rawDescGZIP(), []int{2} +} + +func (x *UploadLogRequest) GetLog() string { + if x != nil { + return x.Log + } + return "" +} + +type UploadLogReply struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + Res string `protobuf:"bytes,1,opt,name=res,proto3" json:"res,omitempty"` +} + +func (x *UploadLogReply) Reset() { + *x = UploadLogReply{} + if protoimpl.UnsafeEnabled { + mi := &file_stream_hello_hello_proto_msgTypes[3] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *UploadLogReply) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*UploadLogReply) ProtoMessage() {} + +func (x *UploadLogReply) ProtoReflect() protoreflect.Message { + mi := &file_stream_hello_hello_proto_msgTypes[3] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use UploadLogReply.ProtoReflect.Descriptor instead. +func (*UploadLogReply) Descriptor() ([]byte, []int) { + return file_stream_hello_hello_proto_rawDescGZIP(), []int{3} +} + +func (x *UploadLogReply) GetRes() string { + if x != nil { + return x.Res + } + return "" +} + +type ChatRequest struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + UpMsg string `protobuf:"bytes,1,opt,name=up_msg,json=upMsg,proto3" json:"up_msg,omitempty"` +} + +func (x *ChatRequest) Reset() { + *x = ChatRequest{} + if protoimpl.UnsafeEnabled { + mi := &file_stream_hello_hello_proto_msgTypes[4] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ChatRequest) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ChatRequest) ProtoMessage() {} + +func (x *ChatRequest) ProtoReflect() protoreflect.Message { + mi := &file_stream_hello_hello_proto_msgTypes[4] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ChatRequest.ProtoReflect.Descriptor instead. +func (*ChatRequest) Descriptor() ([]byte, []int) { + return file_stream_hello_hello_proto_rawDescGZIP(), []int{4} +} + +func (x *ChatRequest) GetUpMsg() string { + if x != nil { + return x.UpMsg + } + return "" +} + +type ChatReply struct { + state protoimpl.MessageState + sizeCache protoimpl.SizeCache + unknownFields protoimpl.UnknownFields + + DownMsg string `protobuf:"bytes,1,opt,name=down_msg,json=downMsg,proto3" json:"down_msg,omitempty"` +} + +func (x *ChatReply) Reset() { + *x = ChatReply{} + if protoimpl.UnsafeEnabled { + mi := &file_stream_hello_hello_proto_msgTypes[5] + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + ms.StoreMessageInfo(mi) + } +} + +func (x *ChatReply) String() string { + return protoimpl.X.MessageStringOf(x) +} + +func (*ChatReply) ProtoMessage() {} + +func (x *ChatReply) ProtoReflect() protoreflect.Message { + mi := &file_stream_hello_hello_proto_msgTypes[5] + if protoimpl.UnsafeEnabled && x != nil { + ms := protoimpl.X.MessageStateOf(protoimpl.Pointer(x)) + if ms.LoadMessageInfo() == nil { + ms.StoreMessageInfo(mi) + } + return ms + } + return mi.MessageOf(x) +} + +// Deprecated: Use ChatReply.ProtoReflect.Descriptor instead. +func (*ChatReply) Descriptor() ([]byte, []int) { + return file_stream_hello_hello_proto_rawDescGZIP(), []int{5} +} + +func (x *ChatReply) GetDownMsg() string { + if x != nil { + return x.DownMsg + } + return "" +} + +var File_stream_hello_hello_proto protoreflect.FileDescriptor + +var file_stream_hello_hello_proto_rawDesc = []byte{ + 0x0a, 0x18, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x2f, 0x68, + 0x65, 0x6c, 0x6c, 0x6f, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x12, 0x0c, 0x73, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x2e, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x22, 0x26, 0x0a, 0x10, 0x47, 0x65, 0x74, 0x4e, + 0x75, 0x6d, 0x62, 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x12, 0x0a, 0x04, + 0x64, 0x61, 0x74, 0x61, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x64, 0x61, 0x74, 0x61, + 0x22, 0x28, 0x0a, 0x0e, 0x47, 0x65, 0x74, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x52, 0x65, 0x70, + 0x6c, 0x79, 0x12, 0x16, 0x0a, 0x06, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, + 0x28, 0x03, 0x52, 0x06, 0x6e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x22, 0x24, 0x0a, 0x10, 0x55, 0x70, + 0x6c, 0x6f, 0x61, 0x64, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x12, 0x10, + 0x0a, 0x03, 0x6c, 0x6f, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x03, 0x6c, 0x6f, 0x67, + 0x22, 0x22, 0x0a, 0x0e, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x4c, 0x6f, 0x67, 0x52, 0x65, 0x70, + 0x6c, 0x79, 0x12, 0x10, 0x0a, 0x03, 0x72, 0x65, 0x73, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, + 0x03, 0x72, 0x65, 0x73, 0x22, 0x24, 0x0a, 0x0b, 0x43, 0x68, 0x61, 0x74, 0x52, 0x65, 0x71, 0x75, + 0x65, 0x73, 0x74, 0x12, 0x15, 0x0a, 0x06, 0x75, 0x70, 0x5f, 0x6d, 0x73, 0x67, 0x18, 0x01, 0x20, + 0x01, 0x28, 0x09, 0x52, 0x05, 0x75, 0x70, 0x4d, 0x73, 0x67, 0x22, 0x26, 0x0a, 0x09, 0x43, 0x68, + 0x61, 0x74, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x12, 0x19, 0x0a, 0x08, 0x64, 0x6f, 0x77, 0x6e, 0x5f, + 0x6d, 0x73, 0x67, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x64, 0x6f, 0x77, 0x6e, 0x4d, + 0x73, 0x67, 0x32, 0xe1, 0x01, 0x0a, 0x05, 0x48, 0x65, 0x6c, 0x6c, 0x6f, 0x12, 0x4b, 0x0a, 0x09, + 0x47, 0x65, 0x74, 0x4e, 0x75, 0x6d, 0x62, 0x65, 0x72, 0x12, 0x1e, 0x2e, 0x73, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x2e, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x4e, 0x75, 0x6d, 0x62, + 0x65, 0x72, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x73, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x2e, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x4e, 0x75, 0x6d, 0x62, + 0x65, 0x72, 0x52, 0x65, 0x70, 0x6c, 0x79, 0x30, 0x01, 0x12, 0x4b, 0x0a, 0x09, 0x55, 0x70, 0x6c, + 0x6f, 0x61, 0x64, 0x4c, 0x6f, 0x67, 0x12, 0x1e, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, + 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x2e, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x4c, 0x6f, 0x67, 0x52, + 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x1c, 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, + 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x2e, 0x55, 0x70, 0x6c, 0x6f, 0x61, 0x64, 0x4c, 0x6f, 0x67, 0x52, + 0x65, 0x70, 0x6c, 0x79, 0x28, 0x01, 0x12, 0x3e, 0x0a, 0x04, 0x43, 0x68, 0x61, 0x74, 0x12, 0x19, + 0x2e, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2e, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x2e, 0x43, 0x68, + 0x61, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x73, 0x74, 0x72, 0x65, + 0x61, 0x6d, 0x2e, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x2e, 0x43, 0x68, 0x61, 0x74, 0x52, 0x65, 0x70, + 0x6c, 0x79, 0x28, 0x01, 0x30, 0x01, 0x42, 0x49, 0x0a, 0x0c, 0x73, 0x74, 0x72, 0x65, 0x61, 0x6d, + 0x2e, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x50, 0x01, 0x5a, 0x37, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, + 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x67, 0x6f, 0x2d, 0x6b, 0x72, 0x61, 0x74, 0x6f, 0x73, 0x2f, 0x6b, + 0x72, 0x61, 0x74, 0x6f, 0x73, 0x2f, 0x65, 0x78, 0x61, 0x6d, 0x70, 0x6c, 0x65, 0x73, 0x2f, 0x73, + 0x74, 0x72, 0x65, 0x61, 0x6d, 0x2f, 0x68, 0x65, 0x6c, 0x6c, 0x6f, 0x3b, 0x68, 0x65, 0x6c, 0x6c, + 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, +} + +var ( + file_stream_hello_hello_proto_rawDescOnce sync.Once + file_stream_hello_hello_proto_rawDescData = file_stream_hello_hello_proto_rawDesc +) + +func file_stream_hello_hello_proto_rawDescGZIP() []byte { + file_stream_hello_hello_proto_rawDescOnce.Do(func() { + file_stream_hello_hello_proto_rawDescData = protoimpl.X.CompressGZIP(file_stream_hello_hello_proto_rawDescData) + }) + return file_stream_hello_hello_proto_rawDescData +} + +var file_stream_hello_hello_proto_msgTypes = make([]protoimpl.MessageInfo, 6) +var file_stream_hello_hello_proto_goTypes = []interface{}{ + (*GetNumberRequest)(nil), // 0: stream.hello.GetNumberRequest + (*GetNumberReply)(nil), // 1: stream.hello.GetNumberReply + (*UploadLogRequest)(nil), // 2: stream.hello.UploadLogRequest + (*UploadLogReply)(nil), // 3: stream.hello.UploadLogReply + (*ChatRequest)(nil), // 4: stream.hello.ChatRequest + (*ChatReply)(nil), // 5: stream.hello.ChatReply +} +var file_stream_hello_hello_proto_depIdxs = []int32{ + 0, // 0: stream.hello.Hello.GetNumber:input_type -> stream.hello.GetNumberRequest + 2, // 1: stream.hello.Hello.UploadLog:input_type -> stream.hello.UploadLogRequest + 4, // 2: stream.hello.Hello.Chat:input_type -> stream.hello.ChatRequest + 1, // 3: stream.hello.Hello.GetNumber:output_type -> stream.hello.GetNumberReply + 3, // 4: stream.hello.Hello.UploadLog:output_type -> stream.hello.UploadLogReply + 5, // 5: stream.hello.Hello.Chat:output_type -> stream.hello.ChatReply + 3, // [3:6] is the sub-list for method output_type + 0, // [0:3] is the sub-list for method input_type + 0, // [0:0] is the sub-list for extension type_name + 0, // [0:0] is the sub-list for extension extendee + 0, // [0:0] is the sub-list for field type_name +} + +func init() { file_stream_hello_hello_proto_init() } +func file_stream_hello_hello_proto_init() { + if File_stream_hello_hello_proto != nil { + return + } + if !protoimpl.UnsafeEnabled { + file_stream_hello_hello_proto_msgTypes[0].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetNumberRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_stream_hello_hello_proto_msgTypes[1].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*GetNumberReply); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_stream_hello_hello_proto_msgTypes[2].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*UploadLogRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_stream_hello_hello_proto_msgTypes[3].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*UploadLogReply); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_stream_hello_hello_proto_msgTypes[4].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ChatRequest); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + file_stream_hello_hello_proto_msgTypes[5].Exporter = func(v interface{}, i int) interface{} { + switch v := v.(*ChatReply); i { + case 0: + return &v.state + case 1: + return &v.sizeCache + case 2: + return &v.unknownFields + default: + return nil + } + } + } + type x struct{} + out := protoimpl.TypeBuilder{ + File: protoimpl.DescBuilder{ + GoPackagePath: reflect.TypeOf(x{}).PkgPath(), + RawDescriptor: file_stream_hello_hello_proto_rawDesc, + NumEnums: 0, + NumMessages: 6, + NumExtensions: 0, + NumServices: 1, + }, + GoTypes: file_stream_hello_hello_proto_goTypes, + DependencyIndexes: file_stream_hello_hello_proto_depIdxs, + MessageInfos: file_stream_hello_hello_proto_msgTypes, + }.Build() + File_stream_hello_hello_proto = out.File + file_stream_hello_hello_proto_rawDesc = nil + file_stream_hello_hello_proto_goTypes = nil + file_stream_hello_hello_proto_depIdxs = nil +} diff --git a/examples/stream/hello/hello_grpc.pb.go b/examples/stream/hello/hello_grpc.pb.go new file mode 100644 index 000000000..11590f889 --- /dev/null +++ b/examples/stream/hello/hello_grpc.pb.go @@ -0,0 +1,266 @@ +// Code generated by protoc-gen-go-grpc. DO NOT EDIT. + +package hello + +import ( + context "context" + grpc "google.golang.org/grpc" + codes "google.golang.org/grpc/codes" + status "google.golang.org/grpc/status" +) + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the grpc package it is being compiled against. +// Requires gRPC-Go v1.32.0 or later. +const _ = grpc.SupportPackageIsVersion7 + +// HelloClient is the client API for Hello service. +// +// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://pkg.go.dev/google.golang.org/grpc/?tab=doc#ClientConn.NewStream. +type HelloClient interface { + GetNumber(ctx context.Context, in *GetNumberRequest, opts ...grpc.CallOption) (Hello_GetNumberClient, error) + UploadLog(ctx context.Context, opts ...grpc.CallOption) (Hello_UploadLogClient, error) + Chat(ctx context.Context, opts ...grpc.CallOption) (Hello_ChatClient, error) +} + +type helloClient struct { + cc grpc.ClientConnInterface +} + +func NewHelloClient(cc grpc.ClientConnInterface) HelloClient { + return &helloClient{cc} +} + +func (c *helloClient) GetNumber(ctx context.Context, in *GetNumberRequest, opts ...grpc.CallOption) (Hello_GetNumberClient, error) { + stream, err := c.cc.NewStream(ctx, &Hello_ServiceDesc.Streams[0], "/stream.hello.Hello/GetNumber", opts...) + if err != nil { + return nil, err + } + x := &helloGetNumberClient{stream} + if err := x.ClientStream.SendMsg(in); err != nil { + return nil, err + } + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + return x, nil +} + +type Hello_GetNumberClient interface { + Recv() (*GetNumberReply, error) + grpc.ClientStream +} + +type helloGetNumberClient struct { + grpc.ClientStream +} + +func (x *helloGetNumberClient) Recv() (*GetNumberReply, error) { + m := new(GetNumberReply) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *helloClient) UploadLog(ctx context.Context, opts ...grpc.CallOption) (Hello_UploadLogClient, error) { + stream, err := c.cc.NewStream(ctx, &Hello_ServiceDesc.Streams[1], "/stream.hello.Hello/UploadLog", opts...) + if err != nil { + return nil, err + } + x := &helloUploadLogClient{stream} + return x, nil +} + +type Hello_UploadLogClient interface { + Send(*UploadLogRequest) error + CloseAndRecv() (*UploadLogReply, error) + grpc.ClientStream +} + +type helloUploadLogClient struct { + grpc.ClientStream +} + +func (x *helloUploadLogClient) Send(m *UploadLogRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *helloUploadLogClient) CloseAndRecv() (*UploadLogReply, error) { + if err := x.ClientStream.CloseSend(); err != nil { + return nil, err + } + m := new(UploadLogReply) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func (c *helloClient) Chat(ctx context.Context, opts ...grpc.CallOption) (Hello_ChatClient, error) { + stream, err := c.cc.NewStream(ctx, &Hello_ServiceDesc.Streams[2], "/stream.hello.Hello/Chat", opts...) + if err != nil { + return nil, err + } + x := &helloChatClient{stream} + return x, nil +} + +type Hello_ChatClient interface { + Send(*ChatRequest) error + Recv() (*ChatReply, error) + grpc.ClientStream +} + +type helloChatClient struct { + grpc.ClientStream +} + +func (x *helloChatClient) Send(m *ChatRequest) error { + return x.ClientStream.SendMsg(m) +} + +func (x *helloChatClient) Recv() (*ChatReply, error) { + m := new(ChatReply) + if err := x.ClientStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// HelloServer is the service API for Hello service. +// All implementations must embed UnimplementedHelloServer +// for forward compatibility +type HelloServer interface { + GetNumber(*GetNumberRequest, Hello_GetNumberServer) error + UploadLog(Hello_UploadLogServer) error + Chat(Hello_ChatServer) error + mustEmbedUnimplementedHelloServer() +} + +// UnimplementedHelloServer must be embedded to have forward compatible implementations. +type UnimplementedHelloServer struct { +} + +func (UnimplementedHelloServer) GetNumber(*GetNumberRequest, Hello_GetNumberServer) error { + return status.Errorf(codes.Unimplemented, "method GetNumber not implemented") +} +func (UnimplementedHelloServer) UploadLog(Hello_UploadLogServer) error { + return status.Errorf(codes.Unimplemented, "method UploadLog not implemented") +} +func (UnimplementedHelloServer) Chat(Hello_ChatServer) error { + return status.Errorf(codes.Unimplemented, "method Chat not implemented") +} +func (UnimplementedHelloServer) mustEmbedUnimplementedHelloServer() {} + +// UnsafeHelloServer may be embedded to opt out of forward compatibility for this service. +// Use of this interface is not recommended, as added methods to HelloServer will +// result in compilation errors. +type UnsafeHelloServer interface { + mustEmbedUnimplementedHelloServer() +} + +func RegisterHelloServer(s grpc.ServiceRegistrar, srv HelloServer) { + s.RegisterService(&Hello_ServiceDesc, srv) +} + +func _Hello_GetNumber_Handler(srv interface{}, stream grpc.ServerStream) error { + m := new(GetNumberRequest) + if err := stream.RecvMsg(m); err != nil { + return err + } + return srv.(HelloServer).GetNumber(m, &helloGetNumberServer{stream}) +} + +type Hello_GetNumberServer interface { + Send(*GetNumberReply) error + grpc.ServerStream +} + +type helloGetNumberServer struct { + grpc.ServerStream +} + +func (x *helloGetNumberServer) Send(m *GetNumberReply) error { + return x.ServerStream.SendMsg(m) +} + +func _Hello_UploadLog_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(HelloServer).UploadLog(&helloUploadLogServer{stream}) +} + +type Hello_UploadLogServer interface { + SendAndClose(*UploadLogReply) error + Recv() (*UploadLogRequest, error) + grpc.ServerStream +} + +type helloUploadLogServer struct { + grpc.ServerStream +} + +func (x *helloUploadLogServer) SendAndClose(m *UploadLogReply) error { + return x.ServerStream.SendMsg(m) +} + +func (x *helloUploadLogServer) Recv() (*UploadLogRequest, error) { + m := new(UploadLogRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +func _Hello_Chat_Handler(srv interface{}, stream grpc.ServerStream) error { + return srv.(HelloServer).Chat(&helloChatServer{stream}) +} + +type Hello_ChatServer interface { + Send(*ChatReply) error + Recv() (*ChatRequest, error) + grpc.ServerStream +} + +type helloChatServer struct { + grpc.ServerStream +} + +func (x *helloChatServer) Send(m *ChatReply) error { + return x.ServerStream.SendMsg(m) +} + +func (x *helloChatServer) Recv() (*ChatRequest, error) { + m := new(ChatRequest) + if err := x.ServerStream.RecvMsg(m); err != nil { + return nil, err + } + return m, nil +} + +// Hello_ServiceDesc is the grpc.ServiceDesc for Hello service. +// It's only intended for direct use with grpc.RegisterService, +// and not to be introspected or modified (even as a copy) +var Hello_ServiceDesc = grpc.ServiceDesc{ + ServiceName: "stream.hello.Hello", + HandlerType: (*HelloServer)(nil), + Methods: []grpc.MethodDesc{}, + Streams: []grpc.StreamDesc{ + { + StreamName: "GetNumber", + Handler: _Hello_GetNumber_Handler, + ServerStreams: true, + }, + { + StreamName: "UploadLog", + Handler: _Hello_UploadLog_Handler, + ClientStreams: true, + }, + { + StreamName: "Chat", + Handler: _Hello_Chat_Handler, + ServerStreams: true, + ClientStreams: true, + }, + }, + Metadata: "stream/hello/hello.proto", +} diff --git a/examples/stream/server/main.go b/examples/stream/server/main.go new file mode 100644 index 000000000..b25612175 --- /dev/null +++ b/examples/stream/server/main.go @@ -0,0 +1,31 @@ +package main + +import ( + "log" + + "github.com/go-kratos/kratos/examples/stream/hello" + "github.com/go-kratos/kratos/examples/stream/service" + "github.com/go-kratos/kratos/v2" + "github.com/go-kratos/kratos/v2/middleware/recovery" + "github.com/go-kratos/kratos/v2/transport/grpc" +) + +func main() { + grpcSrv := grpc.NewServer( + grpc.Address(":9001"), + grpc.Middleware( + recovery.Recovery(), + ), + ) + hello.RegisterHelloServer(grpcSrv, service.NewHelloService()) + + app := kratos.New( + kratos.Name("hello"), + kratos.Server( + grpcSrv, + ), + ) + if err := app.Run(); err != nil { + log.Fatal(err) + } +} diff --git a/examples/stream/service/hello.go b/examples/stream/service/hello.go new file mode 100644 index 000000000..b0b531aef --- /dev/null +++ b/examples/stream/service/hello.go @@ -0,0 +1,58 @@ +package service + +import ( + "fmt" + "io" + "time" + + pb "github.com/go-kratos/kratos/examples/stream/hello" +) + +type HelloService struct { + pb.UnimplementedHelloServer +} + +func NewHelloService() *HelloService { + return &HelloService{} +} + +func (s *HelloService) GetNumber(req *pb.GetNumberRequest, conn pb.Hello_GetNumberServer) error { + var number int64 + for { + fmt.Println(req.Data) + err := conn.Send(&pb.GetNumberReply{Number: number}) + if err != nil { + return err + } + number++ + time.Sleep(time.Second) + } +} +func (s *HelloService) UploadLog(conn pb.Hello_UploadLogServer) error { + for { + req, err := conn.Recv() + if err == io.EOF { + return conn.SendAndClose(&pb.UploadLogReply{Res: "ok"}) + } + if err != nil { + return err + } + fmt.Println(req.Log) + } +} +func (s *HelloService) Chat(conn pb.Hello_ChatServer) error { + for { + req, err := conn.Recv() + if err == io.EOF { + return nil + } + if err != nil { + return err + } + + err = conn.Send(&pb.ChatReply{DownMsg: "hello " + req.UpMsg}) + if err != nil { + return err + } + } +}