2021-06-08 21:26:14 +03:00
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"github.com/ebosas/microservices/internal/config"
"github.com/ebosas/microservices/internal/models"
"github.com/ebosas/microservices/internal/rabbit"
"github.com/jackc/pgx/v4"
"github.com/streadway/amqp"
)
var conf = config . New ( )
func main ( ) {
2021-06-11 14:22:51 +03:00
fmt . Println ( "[Database service]" )
2021-06-08 21:26:14 +03:00
// Postgres connection
2021-10-05 16:10:14 +03:00
connPG , err := pgx . Connect ( context . Background ( ) , conf . PostgresURL )
2021-06-08 21:26:14 +03:00
if err != nil {
log . Fatalf ( "postgres connection: %s" , err )
}
2021-10-05 16:10:14 +03:00
defer connPG . Close ( context . Background ( ) )
2021-06-08 21:26:14 +03:00
2021-10-27 09:32:24 +03:00
// The table is not created when deployed on AWS RDS.
_ , err = connPG . Exec ( context . Background ( ) , "create table if not exists messages (id serial primary key, message text not null, created timestamp not null)" )
if err != nil {
log . Fatalf ( "create table: %s" , err )
}
2021-10-05 16:10:14 +03:00
// RabbitMQ connection
connMQ , err := rabbit . GetConn ( conf . RabbitURL )
2021-06-08 21:26:14 +03:00
if err != nil {
log . Fatalf ( "rabbit connection: %s" , err )
}
2021-10-05 16:10:14 +03:00
defer connMQ . Close ( )
2021-06-08 21:26:14 +03:00
2021-10-05 16:10:14 +03:00
err = connMQ . DeclareTopicExchange ( conf . Exchange )
2021-06-08 21:26:14 +03:00
if err != nil {
log . Fatalf ( "declare exchange: %s" , err )
}
// Start a Rabbit consumer with a message processing handler.
2021-10-05 16:10:14 +03:00
connMQ . StartConsumer ( conf . Exchange , conf . QueueDB , conf . KeyDB , func ( d amqp . Delivery ) bool {
return insertToDB ( d , connPG )
2021-06-08 21:26:14 +03:00
} )
select { }
}
// insertToDB inserts a Rabbit message into a Postgres database.
2021-06-11 14:22:51 +03:00
func insertToDB ( d amqp . Delivery , c * pgx . Conn ) bool {
2021-06-08 21:26:14 +03:00
var message models . Message
err := json . Unmarshal ( d . Body , & message )
if err != nil {
log . Fatalf ( "unmarshal message: %s" , err )
}
2021-06-11 14:22:51 +03:00
_ , err = c . Exec ( context . Background ( ) , "insert into messages (message, created) values ($1, to_timestamp($2))" , message . Text , message . Time / 1000 )
2021-06-08 21:26:14 +03:00
if err != nil {
log . Fatalf ( "insert into database: %s" , err )
}
2021-10-10 18:49:39 +03:00
// // An alternative query that returns the id of the inserted row.
2021-10-06 15:30:52 +03:00
// var id int64
// err = c.QueryRow(context.Background(), "insert into messages (message, created) values ($1, to_timestamp($2)) returning id", message.Text, message.Time/1000).Scan(&id)
// if err != nil {
// log.Fatalf("insert into database: %s", err)
// }
// fmt.Println(id)
2021-06-08 21:26:14 +03:00
return true
}