mirror of
https://github.com/go-micro/go-micro.git
synced 2025-09-16 08:36:30 +02:00
Add header suppor for Kafka broker plugin; (#2470)
This commit is contained in:
@@ -80,6 +80,21 @@ func (h *consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, cl
|
||||
continue
|
||||
}
|
||||
|
||||
if p.m.Body == nil {
|
||||
p.m.Body = msg.Value
|
||||
}
|
||||
// if we don't have headers, create empty map
|
||||
if m.Header == nil {
|
||||
m.Header = make(map[string]string)
|
||||
}
|
||||
for _, header := range msg.Headers {
|
||||
m.Header[string(header.Key)] = string(header.Value)
|
||||
}
|
||||
m.Header["Micro-Topic"] = msg.Topic // only for RPC server, it somehow inspect Header for topic
|
||||
if _, ok := m.Header["Content-Type"]; !ok {
|
||||
m.Header["Content-Type"] = "application/json" // default to json codec
|
||||
}
|
||||
|
||||
err := h.handler(p)
|
||||
if err == nil && h.subopts.AutoAck {
|
||||
sess.MarkMessage(msg, "")
|
||||
|
Reference in New Issue
Block a user