1
0
mirror of https://github.com/ebosas/microservices.git synced 2025-06-06 22:16:11 +02:00
microservices/cmd/database/database.go

75 lines
2.0 KiB
Go
Raw Normal View History

2021-06-08 21:26:14 +03:00
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"github.com/ebosas/microservices/internal/config"
"github.com/ebosas/microservices/internal/models"
"github.com/ebosas/microservices/internal/rabbit"
"github.com/jackc/pgx/v4"
"github.com/streadway/amqp"
)
var conf = config.New()
func main() {
2021-06-11 14:22:51 +03:00
fmt.Println("[Database service]")
2021-06-08 21:26:14 +03:00
// Postgres connection
2021-10-05 16:10:14 +03:00
connPG, err := pgx.Connect(context.Background(), conf.PostgresURL)
2021-06-08 21:26:14 +03:00
if err != nil {
log.Fatalf("postgres connection: %s", err)
}
2021-10-05 16:10:14 +03:00
defer connPG.Close(context.Background())
2021-06-08 21:26:14 +03:00
2021-10-05 16:10:14 +03:00
// RabbitMQ connection
connMQ, err := rabbit.GetConn(conf.RabbitURL)
2021-06-08 21:26:14 +03:00
if err != nil {
log.Fatalf("rabbit connection: %s", err)
}
2021-10-05 16:10:14 +03:00
defer connMQ.Close()
2021-06-08 21:26:14 +03:00
2021-10-05 16:10:14 +03:00
err = connMQ.DeclareTopicExchange(conf.Exchange)
2021-06-08 21:26:14 +03:00
if err != nil {
log.Fatalf("declare exchange: %s", err)
}
// Start a Rabbit consumer with a message processing handler.
2021-10-05 16:10:14 +03:00
connMQ.StartConsumer(conf.Exchange, conf.QueueDB, conf.KeyDB, func(d amqp.Delivery) bool {
return insertToDB(d, connPG)
2021-06-08 21:26:14 +03:00
})
select {}
}
// insertToDB inserts a Rabbit message into a Postgres database.
2021-06-11 14:22:51 +03:00
func insertToDB(d amqp.Delivery, c *pgx.Conn) bool {
2021-06-08 21:26:14 +03:00
var message models.Message
err := json.Unmarshal(d.Body, &message)
if err != nil {
log.Fatalf("unmarshal message: %s", err)
}
2021-06-11 14:22:51 +03:00
_, err = c.Exec(context.Background(), "insert into messages (message, created) values ($1, to_timestamp($2))", message.Text, message.Time/1000)
2021-06-08 21:26:14 +03:00
if err != nil {
log.Fatalf("insert into database: %s", err)
}
2021-10-06 15:30:52 +03:00
// An alternative query that returns the id of the inserted row.
// var id int64
// err = c.QueryRow(context.Background(), "insert into messages (message, created) values ($1, to_timestamp($2)) returning id", message.Text, message.Time/1000).Scan(&id)
// if err != nil {
// log.Fatalf("insert into database: %s", err)
// }
// fmt.Println(id)
// For cache, could send messages from here instead
// of doing it from the server and backend services.
2021-10-05 16:10:14 +03:00
// err = <Rabbit conn>.Publish(conf.Exchange, conf.KeyCache, d.Body)
2021-06-08 21:26:14 +03:00
return true
}