2021-06-08 21:26:14 +03:00
|
|
|
package rabbit
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"fmt"
|
|
|
|
"log"
|
|
|
|
|
|
|
|
"github.com/streadway/amqp"
|
|
|
|
)
|
|
|
|
|
|
|
|
// StartConsumer consumes messages from a Rabbit queue with a specified
|
|
|
|
// routing key and passes them to a supplied handler for processing.
|
|
|
|
// The queue is created (or connected to, if exists) and bound to an exchange.
|
|
|
|
// Used for durable queues in the main go routine.
|
|
|
|
func (conn *Conn) StartConsumer(exch, qName, rKey string, handler func(amqp.Delivery) bool) error {
|
2021-06-08 21:41:51 +03:00
|
|
|
// Declare a durable queue
|
2021-06-08 21:26:14 +03:00
|
|
|
_, err := conn.Channel.QueueDeclare(qName, true, false, false, false, nil)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("queue declare: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
err = conn.Channel.QueueBind(qName, "#."+rKey+".#", exch, false, nil)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("queue bind: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Set prefetchCount above zero to limit unacknowledged messages.
|
|
|
|
err = conn.Channel.Qos(0, 0, false)
|
|
|
|
if err != nil {
|
|
|
|
return err
|
|
|
|
}
|
|
|
|
|
|
|
|
// Consume with explicit ack
|
|
|
|
msgs, err := conn.Channel.Consume(qName, "", false, false, false, false, nil)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("consume: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
for msg := range msgs {
|
|
|
|
if handler(msg) {
|
|
|
|
msg.Ack(false)
|
|
|
|
} else {
|
|
|
|
msg.Nack(false, true)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
log.Fatalf("consumer closed")
|
|
|
|
}()
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
|
|
|
|
// StartConsumerTemp consumes messages with a specified routing key
|
|
|
|
// and passes them to a supplied handler for processing.
|
|
|
|
// Creates a separate channel and a temporary queue that will be deleted
|
|
|
|
// when processing ends (i.e. Websocket connection closes).
|
|
|
|
// Used in go routines such as each Websocket handler established
|
|
|
|
// by a front end user.
|
|
|
|
func (conn *Conn) StartConsumerTemp(ctx context.Context, done chan<- bool, exch, rKey string, handler func(amqp.Delivery) error) error {
|
|
|
|
// A separate channel for a consumer in a go routine
|
|
|
|
ch, err := conn.Connection.Channel()
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("open channel: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Declare a non-durable, auto-deleted, exlusive queue with
|
|
|
|
// a generated name.
|
|
|
|
q, err := ch.QueueDeclare("", false, true, true, false, nil)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("queue declare: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
err = ch.QueueBind(q.Name, "#."+rKey+".#", exch, false, nil)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("queue bind: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
// Consume with auto-ack
|
|
|
|
msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("consume: %v", err)
|
|
|
|
}
|
|
|
|
|
|
|
|
go func() {
|
|
|
|
defer ch.Close()
|
|
|
|
Consumer:
|
|
|
|
for {
|
|
|
|
select {
|
|
|
|
case msg := <-msgs:
|
|
|
|
if err := handler(msg); err != nil {
|
|
|
|
done <- true
|
|
|
|
break Consumer
|
|
|
|
}
|
|
|
|
case <-ctx.Done():
|
|
|
|
break Consumer
|
|
|
|
}
|
|
|
|
}
|
|
|
|
}()
|
|
|
|
|
|
|
|
return nil
|
|
|
|
}
|