1
0
mirror of https://github.com/ebosas/microservices.git synced 2024-11-16 10:08:29 +02:00

Reorganize code, directories

This commit is contained in:
ebosas 2021-06-08 21:26:14 +03:00
parent 0641de3cc1
commit 4a783cf5f1
40 changed files with 570 additions and 554 deletions

View File

@ -1,3 +1,3 @@
*/node_modules
*/build
server/static
web/*/node_modules
web/*/build
cmd/*/static/build

5
.gitignore vendored
View File

@ -1,3 +1,4 @@
node_modules
build
server/static
web/**/build
cmd/**/build
#.env

View File

@ -7,11 +7,11 @@ A basic example of microservice architecture which demonstrates communication be
* Uses WebSocket to talk to the front end
* Stores data in PostgreSQL
* Uses React for front end development
* Builds and runs the application with Docker
* Builds and runs with Docker
## Running the code
## Usage
To run the example, clone the Github repository and start the services with Docker Compose. Once Docker finishes downloading and building images, the front end is accessible by visiting `localhost:8080` in a browser.
To run the example, clone the Github repository and start the services using Docker Compose. Once Docker finishes downloading and building images, the front end is accessible by visiting `localhost:8080`.
```bash
git clone https://github.com/ebosas/microservices
@ -34,7 +34,8 @@ docker attach microservices_backend
To inspect the database, launch a new container that will connect to our Postgres database. Then enter the password `demopsw` (see the `.env` file).
```bash
docker run -it --rm --network microservices_network postgres:13-alpine psql -h postgres -U postgres -d microservices
docker run -it --rm --network microservices_network \
postgres:13-alpine psql -h postgres -U postgres -d microservices
```
Select everything from the messages table:

View File

@ -1,9 +1,9 @@
FROM golang:1.16-alpine AS backend
WORKDIR /go/src/app
COPY backend/go.mod backend/go.sum ./
COPY go.* .
COPY internal ./internal
RUN go mod download
COPY backend .
# Info about flags: https://golang.org/cmd/link/
COPY cmd/backend .
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags '-s' -o backend .
FROM scratch

View File

@ -1,8 +0,0 @@
module github.com/ebosas/microservices/backend
go 1.16
require (
github.com/ebosas/microservices/config v0.0.0-20210601112602-4c18303242d8
github.com/streadway/amqp v1.0.0
)

View File

@ -1,4 +0,0 @@
github.com/ebosas/microservices/config v0.0.0-20210601112602-4c18303242d8 h1:7k9LIaw7438zV/2ZzL5rjMhhDC0+KpDiXpMkIHXoZOk=
github.com/ebosas/microservices/config v0.0.0-20210601112602-4c18303242d8/go.mod h1:5Zi9BVqQ5wo4Az0osVj3uYCMTzIWPCHcOrgXXxH6Ih4=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=

View File

@ -1,154 +0,0 @@
package main
import (
"bufio"
"encoding/json"
"fmt"
"log"
"os"
"strings"
"time"
"github.com/ebosas/microservices/config"
"github.com/streadway/amqp"
)
type Message struct {
Text string `json:"text"`
Source string `json:"source"`
Time int64 `json:"time"`
}
var conf = config.New()
func main() {
log.SetFlags(0)
log.Print("Running backend")
conn, err := amqp.Dial(conf.RabbitURL)
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
err = ch.ExchangeDeclare(
conf.Exchange, // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("Failed to declare an exchange: %s", err)
}
_, err = ch.QueueDeclare(
conf.QueueBack, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("Failed to declare a queue: %s", err)
}
err = ch.QueueBind(
conf.QueueBack, // queue name
fmt.Sprintf("#.%s.#", conf.KeyBack), // routing key
conf.Exchange, // exchange
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("Failed to bind a queue: %s", err)
}
msgs, err := ch.Consume(
conf.QueueBack, // queue name
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
log.Fatalf("Failed to register a consumer: %s", err)
}
// Reads messages from the back end queue
// and prints them to the terminal
go func() {
for msg := range msgs {
var message Message
err := json.Unmarshal(msg.Body, &message)
if err != nil {
log.Fatalf("Failed to unmarshal a message: %s", err)
}
log.Printf("[Received] %s", string(message.Text))
msg.Ack(false)
}
}()
publishInput(conn)
}
// publishInput reads user input, marshals to json,
// and publishes to RabbitMQ with the front end
// and database keys
func publishInput(c *amqp.Connection) {
ch, err := c.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
reader := bufio.NewReader(os.Stdin)
for {
input, _ := reader.ReadString('\n')
input = strings.TrimSpace(input)
if input == "" {
continue
}
message, err := json.Marshal(
Message{
input,
"back",
time.Now().UnixNano() / int64(1e6),
},
)
if err != nil {
log.Fatalf("Failed to marshal a message: %s", err)
}
err = ch.Publish(
conf.Exchange, // exchane name
conf.KeyFront+"."+conf.KeyDB, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Timestamp: time.Now(),
ContentType: "text/plain",
Body: message,
},
)
if err != nil {
log.Fatalf("Failed to publish a message: %s", err)
}
}
}

76
cmd/backend/backend.go Normal file
View File

@ -0,0 +1,76 @@
package main
import (
"bufio"
"encoding/json"
"fmt"
"log"
"os"
"strings"
"time"
"github.com/ebosas/microservices/internal/config"
"github.com/ebosas/microservices/internal/models"
"github.com/ebosas/microservices/internal/rabbit"
"github.com/streadway/amqp"
)
var conf = config.New()
func main() {
fmt.Println("Running backend service")
// Establish a Rabbit connection.
conn, err := rabbit.GetConn(conf.RabbitURL)
if err != nil {
log.Fatalf("rabbit connection: %s", err)
}
defer conn.Close()
err = conn.DeclareTopicExchange(conf.Exchange)
if err != nil {
log.Fatalf("declare exchange: %s", err)
}
// Start a Rabbit consumer with a message processing handler.
conn.StartConsumer(conf.Exchange, conf.QueueBack, conf.KeyBack, receiveMessages)
publishInput(conn)
}
// receiveMessages prints messages to stdout.
func receiveMessages(d amqp.Delivery) bool {
var message models.Message
err := json.Unmarshal(d.Body, &message)
if err != nil {
log.Fatalf("unmarshal message: %s", err)
}
fmt.Printf("> %s\n", string(message.Text))
return true
}
// publishInput reads user input, marshals to json, and publishes to
// a Rabbit exchange with the front-end and database routing key.
func publishInput(conn *rabbit.Conn) {
reader := bufio.NewReader(os.Stdin)
for {
input, _ := reader.ReadString('\n')
input = strings.TrimSpace(input)
if input == "" {
continue
}
inputTime := time.Now().UnixNano() / int64(1e6) // in miliseconds
inputMsg := models.Message{Text: input, Source: "back", Time: inputTime}
message, err := json.Marshal(inputMsg)
if err != nil {
log.Fatalf("marshal message: %s", err)
}
err = conn.Publish(conf.Exchange, conf.KeyFront+"."+conf.KeyDB, message)
if err != nil {
log.Fatalf("publish message: %s", err)
}
}
}

62
cmd/database/database.go Normal file
View File

@ -0,0 +1,62 @@
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"github.com/ebosas/microservices/internal/config"
"github.com/ebosas/microservices/internal/models"
"github.com/ebosas/microservices/internal/rabbit"
"github.com/jackc/pgx/v4"
"github.com/streadway/amqp"
)
var conf = config.New()
func main() {
fmt.Println("Running database service")
// Postgres connection
connP, err := pgx.Connect(context.Background(), conf.PostgresURL)
if err != nil {
log.Fatalf("postgres connection: %s", err)
}
defer connP.Close(context.Background())
// Rabbit connection
connR, err := rabbit.GetConn(conf.RabbitURL)
if err != nil {
log.Fatalf("rabbit connection: %s", err)
}
defer connR.Close()
err = connR.DeclareTopicExchange(conf.Exchange)
if err != nil {
log.Fatalf("declare exchange: %s", err)
}
// Start a Rabbit consumer with a message processing handler.
connR.StartConsumer(conf.Exchange, conf.QueueDB, conf.KeyDB, func(d amqp.Delivery) bool {
return insertToDB(d, connP)
})
select {}
}
// insertToDB inserts a Rabbit message into a Postgres database.
func insertToDB(d amqp.Delivery, connP *pgx.Conn) bool {
var message models.Message
err := json.Unmarshal(d.Body, &message)
if err != nil {
log.Fatalf("unmarshal message: %s", err)
}
_, err = connP.Exec(context.Background(), "insert into messages (message, created) values ($1, to_timestamp($2))", message.Text, message.Time/1000)
if err != nil {
log.Fatalf("insert into database: %s", err)
}
return true
}

147
cmd/server/server.go Normal file
View File

@ -0,0 +1,147 @@
package main
import (
"context"
"embed"
"fmt"
"html/template"
"log"
"net/http"
"time"
"github.com/ebosas/microservices/internal/config"
"github.com/ebosas/microservices/internal/rabbit"
iwebsocket "github.com/ebosas/microservices/internal/websocket"
"github.com/gorilla/websocket"
"github.com/streadway/amqp"
)
//go:embed template
var filesTempl embed.FS
//go:embed static
var filesStatic embed.FS
var (
conf = config.New()
upgrader = websocket.Upgrader{} // use default options
)
func main() {
fmt.Println("Running server")
// Establish a Rabbit connection.
conn, err := rabbit.GetConn(conf.RabbitURL)
if err != nil {
log.Fatalf("rabbit connection: %s", err)
}
defer conn.Close()
err = conn.DeclareTopicExchange(conf.Exchange)
if err != nil {
log.Fatalf("declare exchange: %s", err)
}
http.Handle("/static/", http.FileServer(http.FS(filesStatic)))
http.HandleFunc("/", handleHome)
http.HandleFunc("/ws", handleWebsocketConn(conn))
log.Fatal(http.ListenAndServe(conf.ServerAddr, nil))
}
func handleHome(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/" {
handleNotFound(w)
return
}
t, _ := template.ParseFS(filesTempl, "template/template.html")
t.Execute(w, nil)
}
// handleWebsocketConn passes a Rabbit connection to the Websocket handler.
func handleWebsocketConn(conn *rabbit.Conn) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
handleWebsocket(w, r, conn)
}
}
// handleWebsocket starts two message consumers/readers, one consuming
// a Rabbit queue and another reading from a Websocket connection.
// Each consumer receives a message handler to relay messages –
// from Rabbit to Websocket and vice versa.
func handleWebsocket(w http.ResponseWriter, r *http.Request, conn *rabbit.Conn) {
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("upgrade websocket: %s", err)
return
}
defer ws.Close()
// A separate channel for a publisher in a go routine.
ch, err := conn.Connection.Channel()
if err != nil {
log.Printf("open channel: %s", err)
return
}
defer ch.Close()
// done closes a websocket connection, if there's an error in
// either a Rabbit consumer or Websocket reader.
done := make(chan bool)
// cancel terminates all spawned go routinges if one of them is done.
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start a Rabbit consumer
err = conn.StartConsumerTemp(ctx, done, conf.Exchange, conf.KeyFront, handleWriteWebsocket(ws))
if err != nil {
log.Printf("start temp consumer: %s", err)
return
}
// Start a websocket consumer/reader
err = iwebsocket.StartReader(ctx, done, ws, handlePublishRabbit(ch))
if err != nil {
log.Printf("start websocket reader: %s", err)
return
}
<-done
}
// handleWriteWebsocket writes a Rabbit message to Websocket.
// A Rabbit consumer only passes a message. So, a Websocket connection is
// additionally passed using a closure.
func handleWriteWebsocket(ws *websocket.Conn) func(d amqp.Delivery) error {
return func(d amqp.Delivery) error {
ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
// TODO: check msg
err := ws.WriteMessage(websocket.TextMessage, []byte(d.Body))
if err != nil {
return fmt.Errorf("write websocket: %v", err)
}
return nil
}
}
// handlePublishRabbit publishes a Websocket message to a Rabbit
// exchange with the the back-end and database routing key.
// A Websocket reader only passes a message. So, a Rabbit channel is
// additionally passed using a closure.
func handlePublishRabbit(ch *amqp.Channel) func(msg []byte) error {
return func(msg []byte) error {
// TODO: check msg
err := rabbit.PublishInChannel(ch, conf.Exchange, conf.KeyBack+"."+conf.KeyDB, msg)
if err != nil {
// TODO: log error, not common event
return fmt.Errorf("publish rabbit: %v", err)
}
return nil
}
}
// handleNotFound handles 404
func handleNotFound(w http.ResponseWriter) {
w.WriteHeader(http.StatusNotFound)
t, _ := template.ParseFS(filesTempl, "template/404.html")
t.Execute(w, nil)
}

View File

@ -0,0 +1,11 @@
<!doctype html>
<html lang="en">
<head>
<title>Page not found</title>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
</head>
<body>
<div>404 Not Found</div>
</body>
</html>

View File

@ -4,7 +4,7 @@
<title>Microservices</title>
<meta charset="utf-8">
<meta name="viewport" content="width=device-width, initial-scale=1">
<link rel="stylesheet" href="static/style.css">
<link rel="stylesheet" href="static/build/style.css">
</head>
<body>
<div id="root" class="vh-100">
@ -29,6 +29,6 @@
</div>
</div>
<!-- <script src="http://127.0.0.1:8000/index.js"></script> -->
<script src="static/index.js"></script>
<script src="static/build/index.js"></script>
</body>
</html>

View File

@ -1,3 +0,0 @@
module github.com/ebosas/microservices/config
go 1.16

View File

@ -1,9 +1,9 @@
FROM golang:1.16-alpine AS database
WORKDIR /go/src/app
COPY database/go.mod database/go.sum ./
COPY go.* .
COPY internal ./internal
RUN go mod download
COPY database .
# Info about flags: https://golang.org/cmd/link/
COPY cmd/database .
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags '-s' -o database .
FROM scratch

View File

@ -1,9 +0,0 @@
module github.com/ebosas/microservices/database
go 1.16
require (
github.com/ebosas/microservices/config v0.0.0-20210602114357-512ceebf6344
github.com/jackc/pgx/v4 v4.11.0
github.com/streadway/amqp v1.0.0
)

View File

@ -1,112 +0,0 @@
package main
import (
"context"
"encoding/json"
"fmt"
"log"
"github.com/ebosas/microservices/config"
"github.com/jackc/pgx/v4"
"github.com/streadway/amqp"
)
type Message struct {
Text string `json:"text"`
Source string `json:"source"`
Time int64 `json:"time"`
}
var conf = config.New()
func main() {
log.SetFlags(0)
log.Print("Running database")
// Postgres connection
connP, err := pgx.Connect(context.Background(), conf.PostgresURL)
if err != nil {
log.Fatalf("Failed to connect to Postgres: %s", err)
}
defer connP.Close(context.Background())
// Amqp connection
connA, err := amqp.Dial(conf.RabbitURL)
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer connA.Close()
ch, err := connA.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
err = ch.ExchangeDeclare(
conf.Exchange, // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("Failed to declare an exchange: %s", err)
}
_, err = ch.QueueDeclare(
conf.QueueDB, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("Failed to declare a queue: %s", err)
}
err = ch.QueueBind(
conf.QueueDB, // queue name
fmt.Sprintf("#.%s.#", conf.KeyDB), // routing key
conf.Exchange, // exchange
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("Failed to bind a queue: %s", err)
}
msgs, err := ch.Consume(
conf.QueueDB, // queue name
"", // consumer
false, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
log.Fatalf("Failed to register a consumer: %s", err)
}
for msg := range msgs {
var message Message
err := json.Unmarshal(msg.Body, &message)
if err != nil {
log.Printf("Failed to unmarshal a message: %s", err)
break
}
// Insert a message from RabbitMQ to Postgres
_, err = connP.Exec(context.Background(), "insert into messages (message, created) values ($1, to_timestamp($2))", message.Text, message.Time/1000)
if err != nil {
log.Printf("Failed to insert into a database: %s", err)
break
}
msg.Ack(false)
}
}

View File

@ -11,7 +11,7 @@ services:
ports:
- 5432:5432
volumes:
- ./database/structure.sql:/docker-entrypoint-initdb.d/structure.sql
- ./init/database.sql:/docker-entrypoint-initdb.d/database.sql
networks:
default:
name: microservices_network

View File

@ -5,9 +5,9 @@ services:
- 5672:5672
- 15672:15672
healthcheck:
# Info: https://www.rabbitmq.com/monitoring.html#health-checks
# test: ["CMD", "rabbitmqctl", "status"]
# Rabbit health check info https://www.rabbitmq.com/monitoring.html#health-checks
test: ["CMD", "rabbitmq-diagnostics", "-q", "status"]
# test: ["CMD", "rabbitmqctl", "status"] # alternative
interval: 10s
timeout: 10s
retries: 5
@ -18,7 +18,7 @@ services:
ports:
- 5432:5432
volumes:
- ./database/structure.sql:/docker-entrypoint-initdb.d/structure.sql
- ./init/database.sql:/docker-entrypoint-initdb.d/database.sql
healthcheck:
test: ["CMD-SHELL", "pg_isready -U postgres"]
interval: 10s

9
go.mod Normal file
View File

@ -0,0 +1,9 @@
module github.com/ebosas/microservices
go 1.16
require (
github.com/gorilla/websocket v1.4.2
github.com/jackc/pgx/v4 v4.11.0
github.com/streadway/amqp v1.0.0
)

View File

@ -49,8 +49,6 @@ github.com/dustin/go-humanize v0.0.0-20171111073723-bb3d318650d4/go.mod h1:Htrtb
github.com/eapache/go-resiliency v1.1.0/go.mod h1:kFI+JgMyC7bLPUVY133qvEBtVayf5mFgVsvEsIPBvNs=
github.com/eapache/go-xerial-snappy v0.0.0-20180814174437-776d5712da21/go.mod h1:+020luEh2TKB4/GOp8oxxtq0Daoen/Cii55CzbTV6DU=
github.com/eapache/queue v1.1.0/go.mod h1:6eCeP0CKFpHLu8blIFXhExK/dRa7WDZfr6jVFPTqq+I=
github.com/ebosas/microservices/config v0.0.0-20210602114357-512ceebf6344 h1:kdAJSegfNAfaWSov0t07dJf31hHcQBVMOnrYVtQfnBw=
github.com/ebosas/microservices/config v0.0.0-20210602114357-512ceebf6344/go.mod h1:5Zi9BVqQ5wo4Az0osVj3uYCMTzIWPCHcOrgXXxH6Ih4=
github.com/edsrzf/mmap-go v1.0.0/go.mod h1:YO35OhQPt3KJa3ryjFM5Bs14WD66h8eGKpfaBNrHW5M=
github.com/envoyproxy/go-control-plane v0.6.9/go.mod h1:SBwIajubJHhxtWwsL9s8ss4safvEdbitLhGGK48rN6g=
github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4=
@ -95,6 +93,8 @@ github.com/gorilla/context v1.1.1/go.mod h1:kBGZzfjB9CEq2AlWe17Uuf7NDRt0dE0s8S51
github.com/gorilla/mux v1.6.2/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/mux v1.7.3/go.mod h1:1lud6UwP+6orDFRuTfBEV8e9/aOM/c4fVVCaMa2zaAs=
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.1-0.20190118093823-f849b5445de4/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0/go.mod h1:8NvIoxWQoOIhqOTXgfV/d3M/q6VIi02HzZEHgUlZvzk=
github.com/grpc-ecosystem/grpc-gateway v1.9.5/go.mod h1:vNeuVxBJEsws4ogUvrchl83t/GYV9WGTSLVdBhOQFDY=

View File

@ -6,14 +6,16 @@ type Config struct {
ServerAddr string
PostgresURL string
RabbitURL string
Exchange string
QueueBack string
Exchange string // Rabbit exchange name
QueueBack string // queue name
QueueDB string
KeyFront string
KeyFront string // routing key name
KeyBack string
KeyDB string
}
// New returns configuration variables from the environment.
// These are passed by Docker from the .env file.
func New() *Config {
return &Config{
ServerAddr: getEnv("SERVER_ADDR", "localhost:8080"),
@ -32,6 +34,5 @@ func getEnv(key string, defaultVal string) string {
if value, exists := os.LookupEnv(key); exists {
return value
}
return defaultVal
}

View File

@ -0,0 +1,8 @@
package models
// Message is used to marshal/unmarshal Rabbit messages.
type Message struct {
Text string `json:"text"`
Source string `json:"source"`
Time int64 `json:"time"`
}

30
internal/rabbit/conn.go Normal file
View File

@ -0,0 +1,30 @@
package rabbit
import "github.com/streadway/amqp"
// Conn returns a Rabbit connecton. Also, a channel to be used
// in the main go routine.
type Conn struct {
Connection *amqp.Connection
Channel *amqp.Channel
}
// GetConn established a Rabbit connection.
func GetConn(rabbitURL string) (*Conn, error) {
conn, err := amqp.Dial(rabbitURL)
if err != nil {
return &Conn{}, err
}
ch, err := conn.Channel()
return &Conn{
Connection: conn,
Channel: ch,
}, err
}
// Close closes the Rabbit connection.
// All resources associated with the connection, including channels,
// will also be closed.
func (conn *Conn) Close() error {
return conn.Connection.Close()
}

100
internal/rabbit/consume.go Normal file
View File

@ -0,0 +1,100 @@
package rabbit
import (
"context"
"fmt"
"log"
"github.com/streadway/amqp"
)
// StartConsumer consumes messages from a Rabbit queue with a specified
// routing key and passes them to a supplied handler for processing.
// The queue is created (or connected to, if exists) and bound to an exchange.
// Used for durable queues in the main go routine.
func (conn *Conn) StartConsumer(exch, qName, rKey string, handler func(amqp.Delivery) bool) error {
_, err := conn.Channel.QueueDeclare(qName, true, false, false, false, nil)
if err != nil {
return fmt.Errorf("queue declare: %v", err)
}
err = conn.Channel.QueueBind(qName, "#."+rKey+".#", exch, false, nil)
if err != nil {
return fmt.Errorf("queue bind: %v", err)
}
// Set prefetchCount above zero to limit unacknowledged messages.
err = conn.Channel.Qos(0, 0, false)
if err != nil {
return err
}
// Consume with explicit ack
msgs, err := conn.Channel.Consume(qName, "", false, false, false, false, nil)
if err != nil {
return fmt.Errorf("consume: %v", err)
}
go func() {
for msg := range msgs {
if handler(msg) {
msg.Ack(false)
} else {
msg.Nack(false, true)
}
}
log.Fatalf("consumer closed")
}()
return nil
}
// StartConsumerTemp consumes messages with a specified routing key
// and passes them to a supplied handler for processing.
// Creates a separate channel and a temporary queue that will be deleted
// when processing ends (i.e. Websocket connection closes).
// Used in go routines such as each Websocket handler established
// by a front end user.
func (conn *Conn) StartConsumerTemp(ctx context.Context, done chan<- bool, exch, rKey string, handler func(amqp.Delivery) error) error {
// A separate channel for a consumer in a go routine
ch, err := conn.Connection.Channel()
if err != nil {
return fmt.Errorf("open channel: %v", err)
}
// Declare a non-durable, auto-deleted, exlusive queue with
// a generated name.
q, err := ch.QueueDeclare("", false, true, true, false, nil)
if err != nil {
return fmt.Errorf("queue declare: %v", err)
}
err = ch.QueueBind(q.Name, "#."+rKey+".#", exch, false, nil)
if err != nil {
return fmt.Errorf("queue bind: %v", err)
}
// Consume with auto-ack
msgs, err := ch.Consume(q.Name, "", true, false, false, false, nil)
if err != nil {
return fmt.Errorf("consume: %v", err)
}
go func() {
defer ch.Close()
Consumer:
for {
select {
case msg := <-msgs:
if err := handler(msg); err != nil {
done <- true
break Consumer
}
case <-ctx.Done():
break Consumer
}
}
}()
return nil
}

View File

@ -0,0 +1,14 @@
package rabbit
// DeclareTopicExchange declares an exchange of type 'topic'.
func (conn *Conn) DeclareTopicExchange(name string) error {
return conn.Channel.ExchangeDeclare(
name, // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
}

View File

@ -0,0 +1,30 @@
package rabbit
import (
"time"
"github.com/streadway/amqp"
)
// Publish publishes a message to a Rabbit exchange using a channel
// opened upon connection to Rabbit. For use in the main go routine.
func (conn Conn) Publish(exch, rKey string, message []byte) error {
return PublishInChannel(conn.Channel, exch, rKey, message)
}
// PublishInChannel publishes a message to a Rabbit exchange using
// a provided channel. For use in go routines.
func PublishInChannel(ch *amqp.Channel, exch, rKey string, message []byte) error {
return ch.Publish(
exch, // exchane name
rKey, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
ContentType: "application/json",
DeliveryMode: amqp.Persistent,
Timestamp: time.Now(),
Body: message,
},
)
}

View File

@ -0,0 +1,41 @@
package websocket
import (
"context"
"github.com/gorilla/websocket"
)
// StartReader reads messages from a Websocket connection and passes
// them to a supplied handler for processing.
func StartReader(ctx context.Context, done chan<- bool, ws *websocket.Conn, handler func([]byte) error) error {
msgs := make(chan []byte)
go func() {
Reader:
for {
_, message, err := ws.ReadMessage()
if err != nil {
// log.Printf("read websocket: %s", err)
done <- true
break Reader
}
msgs <- message
}
}()
go func() {
Consumer:
for {
select {
case msg := <-msgs:
err := handler(msg)
if err != nil {
done <- true
break Consumer
}
case <-ctx.Done():
break Consumer
}
}
}()
return nil
}

View File

@ -1,28 +1,29 @@
FROM node:14-alpine AS react
WORKDIR /usr/src/app
COPY react/package*.json .
COPY web/react/package*.json .
RUN npm install
COPY react .
COPY web/react .
RUN npm run build
FROM node:14-alpine AS bootstrap
WORKDIR /usr/src/app
COPY bootstrap/package*.json .
COPY web/bootstrap/package*.json .
RUN npm install
COPY server/template.html ./ref/
COPY cmd/server/template ./ref/
COPY --from=react /usr/src/app/build ./ref/
COPY bootstrap .
COPY web/bootstrap .
RUN npm run css
# Build container for server
FROM golang:1.16-alpine AS server
WORKDIR /go/src/app
COPY server/go.mod server/go.sum ./
COPY go.* .
COPY internal ./internal
RUN go mod download
COPY --from=react /usr/src/app/build ./static/
COPY --from=bootstrap /usr/src/app/build ./static/
COPY server .
# Info about flags: https://golang.org/cmd/link/
COPY --from=react /usr/src/app/build ./static/build/
COPY --from=bootstrap /usr/src/app/build ./static/build/
COPY cmd/server .
# Flag info https://golang.org/cmd/link/
RUN CGO_ENABLED=0 GOOS=linux go build -ldflags '-s' -o server .
FROM scratch

View File

@ -1,9 +0,0 @@
module github.com/ebosas/microservices/server
go 1.16
require (
github.com/ebosas/microservices/config v0.0.0-20210601112602-4c18303242d8
github.com/gorilla/websocket v1.4.2
github.com/streadway/amqp v1.0.0
)

View File

@ -1,6 +0,0 @@
github.com/ebosas/microservices/config v0.0.0-20210601112602-4c18303242d8 h1:7k9LIaw7438zV/2ZzL5rjMhhDC0+KpDiXpMkIHXoZOk=
github.com/ebosas/microservices/config v0.0.0-20210601112602-4c18303242d8/go.mod h1:5Zi9BVqQ5wo4Az0osVj3uYCMTzIWPCHcOrgXXxH6Ih4=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/streadway/amqp v1.0.0 h1:kuuDrUJFZL1QYL9hUNuCxNObNzB0bV/ZG5jV3RWAQgo=
github.com/streadway/amqp v1.0.0/go.mod h1:AZpEONHx3DKn8O/DFsRAY58/XVQiIPMTMB1SddzLXVw=

View File

@ -1,210 +0,0 @@
package main
import (
"embed"
"fmt"
"html/template"
"log"
"net/http"
"time"
"github.com/ebosas/microservices/config"
"github.com/gorilla/websocket"
"github.com/streadway/amqp"
)
//go:embed template.html
var files embed.FS
//go:embed static
var static embed.FS
var (
conf = config.New()
upgrader = websocket.Upgrader{} // use default options
)
func main() {
log.SetFlags(0)
log.Print("Running server")
conn, err := amqp.Dial(conf.RabbitURL)
if err != nil {
log.Fatalf("Failed to connect to RabbitMQ: %s", err)
}
defer conn.Close()
ch, err := conn.Channel()
if err != nil {
log.Fatalf("Failed to open a channel: %s", err)
}
defer ch.Close()
err = ch.ExchangeDeclare(
conf.Exchange, // name
"topic", // type
true, // durable
false, // auto-deleted
false, // internal
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("Failed to declare an exchange: %s", err)
}
_, err = ch.QueueDeclare(
conf.QueueBack, // name
true, // durable
false, // delete when unused
false, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("Failed to declare a backend queue: %s", err)
}
err = ch.QueueBind(
conf.QueueBack, // queue name
fmt.Sprintf("#.%s.#", conf.KeyBack), // routing key
conf.Exchange, // exchange
false, // no-wait
nil, // arguments
)
if err != nil {
log.Fatalf("Failed to bind a backend queue: %s", err)
}
http.Handle("/static/", http.FileServer(http.FS(static)))
http.HandleFunc("/", handleHome)
http.HandleFunc("/ws", func(w http.ResponseWriter, r *http.Request) {
handleWs(w, r, conn)
})
log.Fatal(http.ListenAndServe(conf.ServerAddr, nil))
}
func handleHome(w http.ResponseWriter, r *http.Request) {
t, _ := template.ParseFS(files, "template.html")
t.Execute(w, nil)
}
func handleWs(w http.ResponseWriter, r *http.Request, c *amqp.Connection) {
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("Failed to upgrade WebSocket: %s", err)
return
}
defer ws.Close()
done := make(chan bool)
go wsWriter(ws, c, done)
go wsReader(ws, c, done)
<-done
}
// wsWriter reads messages from RabbitMQ
// and writes to websocket
func wsWriter(ws *websocket.Conn, c *amqp.Connection, done chan bool) {
defer func() {
done <- true
}()
ch, err := c.Channel()
if err != nil {
log.Printf("Failed to open a channel: %s", err)
return
}
defer ch.Close()
q, err := ch.QueueDeclare(
"", // name
false, // durable
true, // delete when unused
true, // exclusive
false, // no-wait
nil, // arguments
)
if err != nil {
log.Printf("Failed to create a frontend queue: %s", err)
return
}
err = ch.QueueBind(
q.Name, // queue name
fmt.Sprintf("#.%s.#", conf.KeyFront), // routing key
conf.Exchange, // exchange
false, // no-wait
nil, // arguments
)
if err != nil {
log.Printf("Failed to bind a frontend queue: %s", err)
return
}
msgs, err := ch.Consume(
q.Name, // queue name
"", // consumer
true, // auto-ack
false, // exclusive
false, // no-local
false, // no-wait
nil, // args
)
if err != nil {
log.Printf("Failed to register a consumer: %s", err)
return
}
for msg := range msgs {
ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
err = ws.WriteMessage(websocket.TextMessage, []byte(msg.Body))
if err != nil {
log.Printf("Failed to write to WebSocket: %s", err)
break
}
}
}
// wsReader reads messages from websocket
// and publishes to RabbitMQ
func wsReader(ws *websocket.Conn, c *amqp.Connection, done chan bool) {
defer func() {
done <- true
}()
ch, err := c.Channel()
if err != nil {
log.Printf("Failed to open a channel: %s", err)
return
}
defer ch.Close()
for {
_, message, err := ws.ReadMessage()
if err != nil {
log.Printf("Failed to read a message: %s", err)
break
}
err = ch.Publish(
conf.Exchange, // exchane name
conf.KeyBack+"."+conf.KeyDB, // routing key
false, // mandatory
false, // immediate
amqp.Publishing{
Timestamp: time.Now(),
ContentType: "text/plain",
Body: []byte(message),
},
)
if err != nil {
log.Printf("Failed to publish a message: %s", err)
break
}
}
}

View File

@ -42,8 +42,7 @@ function App() {
}
}, []);
// Sets a received message handler,
// only once
// Sets a received message handler, only once.
React.useEffect(() => {
if (!ws.current) return;