diff --git a/cmd/backend/backend.go b/cmd/backend/backend.go index ff6e5e4..b04aac8 100644 --- a/cmd/backend/backend.go +++ b/cmd/backend/backend.go @@ -32,14 +32,14 @@ func main() { log.Fatalf("declare exchange: %s", err) } - // Start a Rabbit consumer with a message processing handler. - conn.StartConsumer(conf.Exchange, conf.QueueBack, conf.KeyBack, receiveMessages) + // Start a Rabbit consumer with a handler for printing messages. + conn.StartConsumer(conf.Exchange, conf.QueueBack, conf.KeyBack, printMessages) publishInput(conn) } -// receiveMessages prints messages to stdout. -func receiveMessages(d amqp.Delivery) bool { +// printMessages prints messages to stdout. +func printMessages(d amqp.Delivery) bool { var message models.Message err := json.Unmarshal(d.Body, &message) if err != nil { @@ -55,7 +55,6 @@ func receiveMessages(d amqp.Delivery) bool { // a Rabbit exchange with the front-end and database routing keys. func publishInput(conn *rabbit.Conn) { reader := bufio.NewReader(os.Stdin) - for { input, _ := reader.ReadString('\n') input = strings.TrimSpace(input) @@ -68,6 +67,7 @@ func publishInput(conn *rabbit.Conn) { if err != nil { log.Fatalf("marshal message: %s", err) } + err = conn.Publish(conf.Exchange, conf.KeyFront+"."+conf.KeyDB, message) if err != nil { log.Fatalf("publish message: %s", err) diff --git a/internal/rabbit/consume.go b/internal/rabbit/consume.go index e63c483..a9542ec 100644 --- a/internal/rabbit/consume.go +++ b/internal/rabbit/consume.go @@ -13,6 +13,7 @@ import ( // The queue is created (or connected to, if exists) and bound to an exchange. // Used for durable queues in the main go routine. func (conn *Conn) StartConsumer(exch, qName, rKey string, handler func(amqp.Delivery) bool) error { + // Declare a durable queue _, err := conn.Channel.QueueDeclare(qName, true, false, false, false, nil) if err != nil { return fmt.Errorf("queue declare: %v", err) diff --git a/internal/rabbit/publish.go b/internal/rabbit/publish.go index ffbad88..2500101 100644 --- a/internal/rabbit/publish.go +++ b/internal/rabbit/publish.go @@ -6,8 +6,8 @@ import ( "github.com/streadway/amqp" ) -// Publish publishes a message to a Rabbit exchange using a channel -// opened upon connection to Rabbit. For use in the main go routine. +// Publish publishes a message to a Rabbit exchange using the main channel. +// For use in the main go routine. func (conn Conn) Publish(exch, rKey string, message []byte) error { return PublishInChannel(conn.Channel, exch, rKey, message) }