From b6d005b21e948bca4605c29db343bdda4a6b1b59 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=8C=85=E5=AD=90?= Date: Thu, 22 Jul 2021 18:03:19 +0800 Subject: [PATCH] chore(examples/event): examples of increasing use of event (#1228) * chore(examples/event): examples of increasing use of event --- examples/event/event/event.go | 21 +++++ examples/event/kafka/kafka.go | 134 ++++++++++++++++++++++++++++++++ examples/event/receiver/main.go | 37 +++++++++ examples/event/sender/main.go | 33 ++++++++ examples/go.mod | 1 + examples/go.sum | 16 ++++ 6 files changed, 242 insertions(+) create mode 100644 examples/event/event/event.go create mode 100644 examples/event/kafka/kafka.go create mode 100644 examples/event/receiver/main.go create mode 100644 examples/event/sender/main.go diff --git a/examples/event/event/event.go b/examples/event/event/event.go new file mode 100644 index 000000000..65bd560f8 --- /dev/null +++ b/examples/event/event/event.go @@ -0,0 +1,21 @@ +package event + +import "context" + +type Message interface { + Key() string + Value() []byte + Header() map[string]string +} + +type Handler func(context.Context, Message) error + +type Sender interface { + Send(ctx context.Context, msg Message) error + Close() error +} + +type Receiver interface { + Receive(ctx context.Context, handler Handler) error + Close() error +} diff --git a/examples/event/kafka/kafka.go b/examples/event/kafka/kafka.go new file mode 100644 index 000000000..65730469e --- /dev/null +++ b/examples/event/kafka/kafka.go @@ -0,0 +1,134 @@ +package kafka + +import ( + "context" + "github.com/go-kratos/kratos/examples/event/event" + "github.com/segmentio/kafka-go" + "github.com/segmentio/kafka-go/protocol" + "log" +) + +var _ event.Sender = (*kafkaSender)(nil) +var _ event.Receiver = (*kafkaReceiver)(nil) +var _ event.Message = (*Message)(nil) + +type Message struct { + key string + value []byte + header map[string]string +} + +func (m *Message) Key() string { + return m.key +} +func (m *Message) Value() []byte { + return m.value +} +func (m *Message) Header() map[string]string { + return m.header +} + +func NewMessage(key string, value []byte, header map[string]string) event.Message { + return &Message{ + key: key, + value: value, + header: header, + } +} + +type kafkaSender struct { + writer *kafka.Writer + topic string +} + +func (s *kafkaSender) Send(ctx context.Context, message event.Message) error { + var h []kafka.Header + if len(message.Header()) > 0 { + for k, v := range message.Header() { + h = append(h, protocol.Header{ + Key: k, + Value: []byte(v), + }) + } + } + err := s.writer.WriteMessages(ctx, kafka.Message{ + Key: []byte(message.Key()), + Value: message.Value(), + Headers: h, + }) + if err != nil { + return err + } + return nil +} + +func (s *kafkaSender) Close() error { + err := s.writer.Close() + if err != nil { + return err + } + return nil +} + +func NewKafkaSender(address []string, topic string) (event.Sender, error) { + + w := &kafka.Writer{ + Topic: topic, + Addr: kafka.TCP(address...), + Balancer: &kafka.LeastBytes{}, + } + return &kafkaSender{writer: w, topic: topic}, nil +} + +type kafkaReceiver struct { + reader *kafka.Reader + topic string +} + +func (k *kafkaReceiver) Receive(ctx context.Context, handler event.Handler) error { + go func() { + for { + m, err := k.reader.FetchMessage(context.Background()) + if err != nil { + break + } + h := make(map[string]string) + if len(m.Headers) > 0 { + for _, header := range m.Headers { + h[header.Key] = string(header.Value) + } + } + err = handler(context.Background(), &Message{ + key: string(m.Key), + value: m.Value, + header: h, + }) + if err != nil { + log.Fatal("message handling exception:", err) + } + if err := k.reader.CommitMessages(ctx, m); err != nil { + log.Fatal("failed to commit messages:", err) + } + } + }() + return nil +} + +func (k *kafkaReceiver) Close() error { + err := k.reader.Close() + if err != nil { + return err + } + return nil +} + +func NewKafkaReceiver(address []string, topic string) (event.Receiver, error) { + r := kafka.NewReader(kafka.ReaderConfig{ + Brokers: address, + GroupID: "group-a", + Topic: topic, + MinBytes: 10e3, // 10KB + MaxBytes: 10e6, // 10MB + }) + return &kafkaReceiver{reader: r, topic: topic}, nil +} diff --git a/examples/event/receiver/main.go b/examples/event/receiver/main.go new file mode 100644 index 000000000..ae0435059 --- /dev/null +++ b/examples/event/receiver/main.go @@ -0,0 +1,37 @@ +package main + +import ( + "context" + "fmt" + "os" + "os/signal" + "syscall" + + "github.com/go-kratos/kratos/examples/event/event" + "github.com/go-kratos/kratos/examples/event/kafka" +) + +func main() { + sigs := make(chan os.Signal, 1) + signal.Notify(sigs, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM, syscall.SIGQUIT) + receiver, err := kafka.NewKafkaReceiver([]string{"localhost:9092"}, "kratos") + if err != nil { + panic(err) + } + receive(receiver) + select { + case <-sigs: + _ = receiver.Close() + } +} + +func receive(receiver event.Receiver) { + fmt.Println("start receiver") + err := receiver.Receive(context.Background(), func(ctx context.Context, message event.Message) error { + fmt.Printf("key:%s, value:%s, header:%s\n", message.Key(), message.Value(), message.Header()) + return nil + }) + if err != nil { + return + } +} diff --git a/examples/event/sender/main.go b/examples/event/sender/main.go new file mode 100644 index 000000000..07fffe4ae --- /dev/null +++ b/examples/event/sender/main.go @@ -0,0 +1,33 @@ +package main + +import ( + "context" + "fmt" + "github.com/go-kratos/kratos/examples/event/event" + "github.com/go-kratos/kratos/examples/event/kafka" +) + +func main() { + sender, err := kafka.NewKafkaSender([]string{"localhost:9092"}, "kratos") + if err != nil { + panic(err) + } + + for i := 0; i < 50; i++ { + send(sender) + } + + _ = sender.Close() +} + +func send(sender event.Sender) { + msg := kafka.NewMessage("kratos", []byte("hello world"), map[string]string{ + "user": "kratos", + "phone": "123456", + }) + err := sender.Send(context.Background(), msg) + if err != nil { + panic(err) + } + fmt.Printf("key:%s, value:%s, header:%s\n", msg.Key(), msg.Value(), msg.Header()) +} diff --git a/examples/go.mod b/examples/go.mod index f001540d2..4a15f8a6a 100644 --- a/examples/go.mod +++ b/examples/go.mod @@ -28,6 +28,7 @@ require ( github.com/nacos-group/nacos-sdk-go v1.0.7 github.com/nicksnyder/go-i18n/v2 v2.1.2 github.com/prometheus/client_golang v1.9.0 + github.com/segmentio/kafka-go v0.4.17 github.com/sirupsen/logrus v1.8.1 github.com/stretchr/testify v1.7.0 go.etcd.io/etcd/client/v3 v3.5.0-beta.4 diff --git a/examples/go.sum b/examples/go.sum index 535639015..fffa0fe8b 100644 --- a/examples/go.sum +++ b/examples/go.sum @@ -126,6 +126,7 @@ github.com/dgryski/go-sip13 v0.0.0-20181026042036-e10d5fee7954/go.mod h1:vAd38F8 github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk= github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs= +github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21 h1:YEetp8/yCZMuEPMUDHG0CW/brkkEp8mzqk2+ODEitlw= github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU= github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I= github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M= @@ -149,6 +150,8 @@ github.com/felixge/httpsnoop v1.0.1 h1:lvB5Jl89CsZtGIWuTcDM1E/vkVs49/Ml7JJe07l8S github.com/felixge/httpsnoop v1.0.1/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/franela/goblin v0.0.0-20200105215937-c9ffbefa60db/go.mod h1:7dvUGVsVBjqR7JHJk0brhHOZYGmfBYOrK0ZhYMEtBr4= github.com/franela/goreq v0.0.0-20171204163338-bcd34c9993f8/go.mod h1:ZhphrRTfi2rbfLwlschooIH4+wKKDR4Pdxhh+TRoA20= +github.com/frankban/quicktest v1.11.3 h1:8sXhOn0uLys67V8EsXLc6eszDs8VXWxL3iRvebPhedY= +github.com/frankban/quicktest v1.11.3/go.mod h1:wRf/ReqHper53s+kmmSZizM8NamnL3IM0I9ntUbOk+k= github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo= github.com/fsnotify/fsnotify v1.4.9 h1:hsms1Qyu0jgnwNXIxa+/V/PDsU6CfLf6CNO8H7IWoS4= github.com/fsnotify/fsnotify v1.4.9/go.mod h1:znqG4EE+3YCdAaPaxE2ZRY/06pZUdp0tY4IgpuI1SZQ= @@ -257,6 +260,8 @@ github.com/golang/protobuf v1.5.0/go.mod h1:FsONVRAS9T7sI+LIUmWTfcYkHO4aIWwzhcaS github.com/golang/protobuf v1.5.2 h1:ROPKBNFfQgOUMifHyP+KYbvpjbdoFNs+aK7DXlji0Tw= github.com/golang/protobuf v1.5.2/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/golang/snappy v0.0.0-20180518054509-2e65f85255db/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= +github.com/golang/snappy v0.0.1 h1:Qgr9rKW7uDUkrbSmQeiDsGa8SjGyCOGtuasMWwvp2P4= +github.com/golang/snappy v0.0.1/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0 h1:0udJVsspx3VBr5FwtLhQQtuAsVc79tTq0ocGIPAU6qo= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -401,6 +406,8 @@ github.com/kisielk/errcheck v1.1.0/go.mod h1:EZBBE59ingxPouuu3KfxchcWSUPOHkagtvW github.com/kisielk/errcheck v1.2.0/go.mod h1:/BMXB+zMLi60iA8Vv6Ksmxu/1UDYcXs4uQLJ+jE2L00= github.com/kisielk/errcheck v1.5.0/go.mod h1:pFxgyoBC7bSaBwPgfKdkLd5X25qrDl4LWUI2bnpBCr8= github.com/kisielk/gotool v1.0.0/go.mod h1:XhKaO+MFFWcvkIS/tQcRk01m1F5IRFswLeQ+oQHNcck= +github.com/klauspost/compress v1.9.8 h1:VMAMUUOh+gaxKTMk+zqbjsSjsIcUcL/LF4o63i82QyA= +github.com/klauspost/compress v1.9.8/go.mod h1:RyIbtBH6LamlWaDj8nUwkbUhJ87Yi3uG0guNDohfE1A= github.com/konsorten/go-windows-terminal-sequences v1.0.1/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/konsorten/go-windows-terminal-sequences v1.0.3/go.mod h1:T0+1ngSBFLxvqU3pZ+m/2kptfBszLMUkC4ZK/EgS/cQ= github.com/kr/fs v0.1.0/go.mod h1:FFnZGqtBN9Gxj7eW1uZ42v5BccTP0vu6NEaFoC2HwRg= @@ -527,6 +534,8 @@ github.com/pelletier/go-toml v1.2.0/go.mod h1:5z9KED0ma1S8pY6P1sdut58dfprrGBbd/9 github.com/performancecopilot/speed v3.0.0+incompatible/go.mod h1:/CLtqpZ5gBg1M9iaPbIdPPGyKcA8hKdoy6hAWba7Yac= github.com/pierrec/lz4 v1.0.2-0.20190131084431-473cd7ce01a1/go.mod h1:3/3N9NVKO0jef7pBehbT1qWhCMrIgbYNnFAZCqQ5LRc= github.com/pierrec/lz4 v2.0.5+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= +github.com/pierrec/lz4 v2.6.0+incompatible h1:Ix9yFKn1nSPBLFl/yZknTp8TU5G4Ps0JDmguYK6iH1A= +github.com/pierrec/lz4 v2.6.0+incompatible/go.mod h1:pdkljMzZIN41W+lC3N2tnIh5sFi+IEE17M5jbnwPHcY= github.com/pkg/errors v0.8.0/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.8.1/go.mod h1:bwawxfHBFNV+L2hUp1rHADufV3IMtnDRdf1r5NINEl0= github.com/pkg/errors v0.9.1 h1:FEBLx1zS214owpjy7qsBeixbURkuhQAwrK5UwLGTwt4= @@ -584,6 +593,8 @@ github.com/ryanuber/columnize v2.1.0+incompatible/go.mod h1:sm1tb6uqfes/u+d4ooFo github.com/samuel/go-zookeeper v0.0.0-20190923202752-2cc03de413da/go.mod h1:gi+0XIa01GRL2eRQVjQkKGqKF3SF9vZR/HnPullcV2E= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUtVbo7ada43DJhG55ua/hjS5I= github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= +github.com/segmentio/kafka-go v0.4.17 h1:IyqRstL9KUTDb3kyGPOOa5VffokKWSEzN6geJ92dSDY= +github.com/segmentio/kafka-go v0.4.17/go.mod h1:19+Eg7KwrNKy/PFhiIthEPkO8k+ac7/ZYXwYM9Df10w= github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeVRXNmyL/1OwPU0+IJeTBvfc= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= @@ -644,6 +655,10 @@ github.com/valyala/bytebufferpool v1.0.0/go.mod h1:6bBcMArwyJ5K/AmCkWv1jt77kVWyC github.com/valyala/fasttemplate v1.0.1/go.mod h1:UQGH1tvbgY+Nz5t2n7tXsz52dQxojPUpymEIMZ47gx8= github.com/valyala/fasttemplate v1.2.1 h1:TVEnxayobAdVkhQfrfes2IzOB6o+z4roRkPF52WA1u4= github.com/valyala/fasttemplate v1.2.1/go.mod h1:KHLXt3tVN2HBp8eijSv/kGJopbvo7S+qRAEEKiv+SiQ= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c h1:u40Z8hqBAAQyv+vATcGgV0YCnDjqSL7/q/JyPhhJSPk= +github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c/go.mod h1:lB8K/P019DLNhemzwFU4jHLhdvlE6uDZjXFejJXr49I= +github.com/xdg/stringprep v1.0.0 h1:d9X0esnoa3dFsV0FG35rAT0RIhYFlPq7MiP+DW89La0= +github.com/xdg/stringprep v1.0.0/go.mod h1:Jhud4/sHMO4oL310DaZAKk9ZaJ08SJfe+sJh0HrGL1Y= github.com/xiang90/probing v0.0.0-20190116061207-43a291ad63a2/go.mod h1:UETIi67q53MR2AWcXfiuqkDkRtnGDLqkBTpCHuJHxtU= github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= github.com/yuin/goldmark v1.1.27/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74= @@ -721,6 +736,7 @@ go.uber.org/zap v1.18.1/go.mod h1:xg/QME4nWcxGxrpdeYfq7UvYrLh66cuVKdrbD1XF/NI= golang.org/x/crypto v0.0.0-20180904163835-0709b304e793/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20181029021203-45a5f77698d3/go.mod h1:6SG95UA2DQfeDnfUPMdvaQW0Q7yPrPDi9nlGo2tz2b4= golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w= +golang.org/x/crypto v0.0.0-20190506204251-e1dfcc566284/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190510104115-cbcb75029529/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190605123033-f99c8df09eb5/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI= golang.org/x/crypto v0.0.0-20190701094942-4def268fd1a4/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=