From 3cf90d78254dc3b760c0b19c755870d2c87b340a Mon Sep 17 00:00:00 2001 From: Asim Date: Sun, 8 Nov 2015 23:16:14 +0000 Subject: [PATCH] Use direct reply queue --- transport/rabbitmq/connection.go | 17 ++++++----------- transport/rabbitmq/rabbitmq.go | 8 +++++--- 2 files changed, 11 insertions(+), 14 deletions(-) diff --git a/transport/rabbitmq/connection.go b/transport/rabbitmq/connection.go index 296e8d50..a9da4df9 100644 --- a/transport/rabbitmq/connection.go +++ b/transport/rabbitmq/connection.go @@ -18,12 +18,11 @@ var ( ) type rabbitMQConn struct { - Connection *amqp.Connection - Channel *rabbitMQChannel - ExchangeChannel *rabbitMQChannel - notify chan bool - exchange string - url string + Connection *amqp.Connection + Channel *rabbitMQChannel + notify chan bool + exchange string + url string connected bool @@ -111,10 +110,6 @@ func (r *rabbitMQConn) tryToConnect() error { return err } r.Channel.DeclareExchange(r.exchange) - r.ExchangeChannel, err = newRabbitChannel(r.Connection) - if err != nil { - return err - } return nil } @@ -143,5 +138,5 @@ func (r *rabbitMQConn) Consume(queue string) (<-chan amqp.Delivery, error) { } func (r *rabbitMQConn) Publish(exchange, key string, msg amqp.Publishing) error { - return r.ExchangeChannel.Publish(exchange, key, msg) + return r.Channel.Publish(exchange, key, msg) } diff --git a/transport/rabbitmq/rabbitmq.go b/transport/rabbitmq/rabbitmq.go index 6b8d9e52..c1081751 100644 --- a/transport/rabbitmq/rabbitmq.go +++ b/transport/rabbitmq/rabbitmq.go @@ -12,6 +12,10 @@ import ( "github.com/myodc/go-micro/transport" ) +const ( + directReplyQueue = "amq.rabbitmq.reply-to" +) + type rmqtport struct { conn *rabbitMQConn addrs []string @@ -236,12 +240,10 @@ func (r *rmqtport) Listen(addr string) (transport.Listener, error) { } func NewTransport(addrs []string, opt ...transport.Option) transport.Transport { - id, _ := uuid.NewV4() - return &rmqtport{ conn: newRabbitMQConn("", addrs), addrs: addrs, - replyTo: id.String(), + replyTo: directReplyQueue, inflight: make(map[string]chan amqp.Delivery), } }