diff --git a/plugins/broker/kafka/options.go b/plugins/broker/kafka/options.go index 986d5f85..6a46fa86 100644 --- a/plugins/broker/kafka/options.go +++ b/plugins/broker/kafka/options.go @@ -80,6 +80,21 @@ func (h *consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, cl continue } + if p.m.Body == nil { + p.m.Body = msg.Value + } + // if we don't have headers, create empty map + if m.Header == nil { + m.Header = make(map[string]string) + } + for _, header := range msg.Headers { + m.Header[string(header.Key)] = string(header.Value) + } + m.Header["Micro-Topic"] = msg.Topic // only for RPC server, it somehow inspect Header for topic + if _, ok := m.Header["Content-Type"]; !ok { + m.Header["Content-Type"] = "application/json" // default to json codec + } + err := h.handler(p) if err == nil && h.subopts.AutoAck { sess.MarkMessage(msg, "")