1
0
mirror of https://github.com/ebosas/microservices.git synced 2025-08-24 20:08:55 +02:00
This commit is contained in:
ebosas
2021-06-03 11:54:22 +03:00
parent 5ef4a3fe6a
commit 8a9a37cdf2
3 changed files with 16 additions and 17 deletions

View File

@@ -23,6 +23,7 @@ var conf = config.New()
func main() { func main() {
log.SetFlags(0) log.SetFlags(0)
log.Print("Running backend")
conn, err := amqp.Dial(conf.RabbitURL) conn, err := amqp.Dial(conf.RabbitURL)
if err != nil { if err != nil {
@@ -85,6 +86,8 @@ func main() {
log.Fatalf("Failed to register a consumer: %s", err) log.Fatalf("Failed to register a consumer: %s", err)
} }
// Reads messages from the back end queue
// and prints them to the terminal
go func() { go func() {
for msg := range msgs { for msg := range msgs {
var message Message var message Message
@@ -101,9 +104,9 @@ func main() {
publishInput(conn) publishInput(conn)
} }
// publishInput reads user input from stdin, // publishInput reads user input, marshals to json,
// marshals as json messages, and publishes them // and publishes to RabbitMQ with the front end
// to a RabbitMQ exchange // and database keys
func publishInput(c *amqp.Connection) { func publishInput(c *amqp.Connection) {
ch, err := c.Channel() ch, err := c.Channel()
if err != nil { if err != nil {

View File

@@ -20,6 +20,9 @@ type Message struct {
var conf = config.New() var conf = config.New()
func main() { func main() {
log.SetFlags(0)
log.Print("Running database")
// Postgres connection // Postgres connection
connP, err := pgx.Connect(context.Background(), conf.PostgresURL) connP, err := pgx.Connect(context.Background(), conf.PostgresURL)
if err != nil { if err != nil {
@@ -97,7 +100,7 @@ func main() {
break break
} }
// Insert a message from Rabbit to Postgres // Insert a message from RabbitMQ to Postgres
_, err = connP.Exec(context.Background(), "insert into messages (message, created) values ($1, to_timestamp($2))", message.Text, message.Time/1000) _, err = connP.Exec(context.Background(), "insert into messages (message, created) values ($1, to_timestamp($2))", message.Text, message.Time/1000)
if err != nil { if err != nil {
log.Printf("Failed to insert into a database: %s", err) log.Printf("Failed to insert into a database: %s", err)

View File

@@ -26,6 +26,7 @@ var (
func main() { func main() {
log.SetFlags(0) log.SetFlags(0)
log.Print("Running server")
conn, err := amqp.Dial(conf.RabbitURL) conn, err := amqp.Dial(conf.RabbitURL)
if err != nil { if err != nil {
@@ -75,21 +76,15 @@ func main() {
log.Fatalf("Failed to bind a backend queue: %s", err) log.Fatalf("Failed to bind a backend queue: %s", err)
} }
// files := http.FileServer(http.Dir("./static"))
// http.Handle("/static/", http.StripPrefix("/static/", files))
// http.Handle("/static/", http.StripPrefix("/static/", http.FileServer(http.FS(static))))
http.Handle("/static/", http.FileServer(http.FS(static))) http.Handle("/static/", http.FileServer(http.FS(static)))
http.HandleFunc("/", handleHome) http.HandleFunc("/", handleHome)
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) { http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
handleWs(w, r, conn) handleWs(w, r, conn)
}) })
log.Fatal(http.ListenAndServe(conf.ServerAddr, nil)) log.Fatal(http.ListenAndServe(conf.ServerAddr, nil))
} }
func handleHome(w http.ResponseWriter, r *http.Request) { func handleHome(w http.ResponseWriter, r *http.Request) {
// t, _ := template.ParseFiles("template.html")
t, _ := template.ParseFS(files, "template.html") t, _ := template.ParseFS(files, "template.html")
t.Execute(w, nil) t.Execute(w, nil)
} }
@@ -110,8 +105,8 @@ func handleWs(w http.ResponseWriter, r *http.Request, c *amqp.Connection) {
<-done <-done
} }
// wsWriter reads messages from a Rabbit exchange // wsWriter reads messages from RabbitMQ
// and writes to a websocket // and writes to websocket
func wsWriter(ws *websocket.Conn, c *amqp.Connection, done chan bool) { func wsWriter(ws *websocket.Conn, c *amqp.Connection, done chan bool) {
defer func() { defer func() {
done <- true done <- true
@@ -152,7 +147,7 @@ func wsWriter(ws *websocket.Conn, c *amqp.Connection, done chan bool) {
msgs, err := ch.Consume( msgs, err := ch.Consume(
q.Name, // queue name q.Name, // queue name
"", // consumer "", // consumer
false, // auto-ack true, // auto-ack
false, // exclusive false, // exclusive
false, // no-local false, // no-local
false, // no-wait false, // no-wait
@@ -170,14 +165,12 @@ func wsWriter(ws *websocket.Conn, c *amqp.Connection, done chan bool) {
log.Printf("Failed to write to WebSocket: %s", err) log.Printf("Failed to write to WebSocket: %s", err)
break break
} }
msg.Ack(false)
} }
} }
// wsReader reads messages from a websocket and writes // wsReader reads messages from websocket
// to a Rabbit exchange // and publishes to RabbitMQ
func wsReader(ws *websocket.Conn, c *amqp.Connection, done chan bool) { func wsReader(ws *websocket.Conn, c *amqp.Connection, done chan bool) {
defer func() { defer func() {
done <- true done <- true