diff --git a/plugins/broker/rabbitmq/options.go b/plugins/broker/rabbitmq/options.go index 97c45b58..b45f6353 100644 --- a/plugins/broker/rabbitmq/options.go +++ b/plugins/broker/rabbitmq/options.go @@ -2,6 +2,7 @@ package rabbitmq import ( "context" + "time" "github.com/asim/go-micro/v3/broker" ) @@ -15,6 +16,16 @@ type exchangeKey struct{} type requeueOnErrorKey struct{} type deliveryMode struct{} type priorityKey struct{} +type contentType struct{} +type contentEncoding struct{} +type correlationID struct{} +type replyTo struct{} +type expiration struct{} +type messageID struct{} +type timestamp struct{} +type typeMsg struct{} +type userID struct{} +type appID struct{} type externalAuth struct{} type durableExchange struct{} @@ -68,6 +79,56 @@ func Priority(value uint8) broker.PublishOption { return setPublishOption(priorityKey{}, value) } +// ContentType sets a property MIME content type for publishing +func ContentType(value string) broker.PublishOption { + return setPublishOption(contentType{}, value) +} + +// ContentEncoding sets a property MIME content encoding for publishing +func ContentEncoding(value string) broker.PublishOption { + return setPublishOption(contentEncoding{}, value) +} + +// CorrelationID sets a property correlation ID for publishing +func CorrelationID(value string) broker.PublishOption { + return setPublishOption(correlationID{}, value) +} + +// ReplyTo sets a property address to to reply to (ex: RPC) for publishing +func ReplyTo(value string) broker.PublishOption { + return setPublishOption(replyTo{}, value) +} + +// Expiration sets a property message expiration spec for publishing +func Expiration(value string) broker.PublishOption { + return setPublishOption(expiration{}, value) +} + +// MessageId sets a property message identifier for publishing +func MessageId(value string) broker.PublishOption { + return setPublishOption(messageID{}, value) +} + +// Timestamp sets a property message timestamp for publishing +func Timestamp(value time.Time) broker.PublishOption { + return setPublishOption(timestamp{}, value) +} + +// TypeMsg sets a property message type name for publishing +func TypeMsg(value string) broker.PublishOption { + return setPublishOption(typeMsg{}, value) +} + +// UserID sets a property user id for publishing +func UserID(value string) broker.PublishOption { + return setPublishOption(userID{}, value) +} + +// AppID sets a property application id for publishing +func AppID(value string) broker.PublishOption { + return setPublishOption(appID{}, value) +} + func ExternalAuth() broker.Option { return setBrokerOption(externalAuth{}, ExternalAuthentication{}) } diff --git a/plugins/broker/rabbitmq/rabbitmq.go b/plugins/broker/rabbitmq/rabbitmq.go index 54ab5a61..4acc0f49 100644 --- a/plugins/broker/rabbitmq/rabbitmq.go +++ b/plugins/broker/rabbitmq/rabbitmq.go @@ -162,6 +162,47 @@ func (r *rbroker) Publish(topic string, msg *broker.Message, opts ...broker.Publ if value, ok := options.Context.Value(priorityKey{}).(uint8); ok { m.Priority = value } + + if value, ok := options.Context.Value(contentType{}).(string); ok { + m.ContentType = value + } + + if value, ok := options.Context.Value(contentEncoding{}).(string); ok { + m.ContentEncoding = value + } + + if value, ok := options.Context.Value(correlationID{}).(string); ok { + m.CorrelationId = value + } + + if value, ok := options.Context.Value(replyTo{}).(string); ok { + m.ReplyTo = value + } + + if value, ok := options.Context.Value(expiration{}).(string); ok { + m.Expiration = value + } + + if value, ok := options.Context.Value(messageID{}).(string); ok { + m.MessageId = value + } + + if value, ok := options.Context.Value(timestamp{}).(time.Time); ok { + m.Timestamp = value + } + + if value, ok := options.Context.Value(typeMsg{}).(string); ok { + m.Type = value + } + + if value, ok := options.Context.Value(userID{}).(string); ok { + m.UserId = value + } + + if value, ok := options.Context.Value(appID{}).(string); ok { + m.AppId = value + } + } for k, v := range msg.Header {