1
0
mirror of https://github.com/ebosas/microservices.git synced 2025-06-06 22:16:11 +02:00

207 lines
5.5 KiB
Go
Raw Normal View History

2021-06-08 21:26:14 +03:00
package main
import (
"context"
"embed"
2021-10-08 13:25:43 +03:00
"encoding/json"
2021-06-08 21:26:14 +03:00
"fmt"
"html/template"
"log"
"net/http"
"time"
2021-10-07 16:23:34 +03:00
"github.com/ebosas/microservices/internal/cache"
2021-06-08 21:26:14 +03:00
"github.com/ebosas/microservices/internal/config"
"github.com/ebosas/microservices/internal/rabbit"
iwebsocket "github.com/ebosas/microservices/internal/websocket"
2021-10-07 16:23:34 +03:00
"github.com/go-redis/redis/v8"
2021-06-08 21:26:14 +03:00
"github.com/gorilla/websocket"
"github.com/streadway/amqp"
)
//go:embed template
var filesTempl embed.FS
//go:embed static
var filesStatic embed.FS
var (
conf = config.New()
upgrader = websocket.Upgrader{} // use default options
)
func main() {
2021-06-11 14:21:50 +03:00
fmt.Println("[Server]")
2021-06-08 21:26:14 +03:00
// Establish a Rabbit connection.
2021-10-07 16:23:34 +03:00
connMQ, err := rabbit.GetConn(conf.RabbitURL)
2021-06-08 21:26:14 +03:00
if err != nil {
log.Fatalf("rabbit connection: %s", err)
}
2021-10-07 16:23:34 +03:00
defer connMQ.Close()
2021-06-08 21:26:14 +03:00
2021-10-07 16:23:34 +03:00
err = connMQ.DeclareTopicExchange(conf.Exchange)
2021-06-08 21:26:14 +03:00
if err != nil {
log.Fatalf("declare exchange: %s", err)
}
2021-10-07 16:23:34 +03:00
// Redis connection
connR := redis.NewClient(&redis.Options{
Addr: conf.RedisURL,
Password: "", // no password set
DB: 0, // use default DB
})
2021-06-08 21:26:14 +03:00
http.Handle("/static/", http.FileServer(http.FS(filesStatic)))
http.HandleFunc("/", handleHome)
2021-10-07 16:23:34 +03:00
http.HandleFunc("/messages", handleMessages(connR))
http.HandleFunc("/ws", handleWebsocketConn(connMQ))
http.HandleFunc("/api/cache", handleAPICache(connR)) // defined in api.go
2021-06-08 21:26:14 +03:00
log.Fatal(http.ListenAndServe(conf.ServerAddr, nil))
}
2021-10-07 16:23:34 +03:00
// handleHome handles the home page.
2021-06-08 21:26:14 +03:00
func handleHome(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/" {
handleNotFound(w)
return
}
2021-10-07 16:23:34 +03:00
t := template.Must(template.ParseFS(filesTempl, "template/template.html", "template/navbar.html", "template/home.html"))
t.ExecuteTemplate(w, "layout", nil)
}
// handleMessages handles the messages page.
func handleMessages(cr *redis.Client) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
2021-10-08 13:25:43 +03:00
cacheJSON, err := cache.GetCacheJSON(cr)
2021-10-07 16:23:34 +03:00
if err != nil {
log.Printf("get cache: %s", err)
return
}
2021-10-08 13:25:43 +03:00
var cached cache.Cache
err = json.Unmarshal([]byte(cacheJSON), &cached)
if err != nil {
log.Printf("unmarshal cache: %s", err)
return
}
data := map[string]interface{}{
"Data": cached,
"Json": cacheJSON,
}
// data := struct {
// Data cache.Cache
// Json string
// }{cached, "Hello"}
// fmt.Println(data.Json)
2021-10-07 16:23:34 +03:00
funcMap := template.FuncMap{"fdate": formatTime}
t := template.Must(template.New("").Funcs(funcMap).ParseFS(filesTempl, "template/template.html", "template/navbar.html", "template/messages.html"))
t.ExecuteTemplate(w, "layout", data)
}
2021-06-08 21:26:14 +03:00
}
// handleWebsocketConn passes a Rabbit connection to the Websocket handler.
func handleWebsocketConn(conn *rabbit.Conn) func(w http.ResponseWriter, r *http.Request) {
return func(w http.ResponseWriter, r *http.Request) {
handleWebsocket(w, r, conn)
}
}
2021-06-11 14:21:50 +03:00
// handleWebsocket starts two message readers (consumers), one consuming
2021-06-08 21:26:14 +03:00
// a Rabbit queue and another reading from a Websocket connection.
// Each consumer receives a message handler to relay messages –
// from Rabbit to Websocket and vice versa.
func handleWebsocket(w http.ResponseWriter, r *http.Request, conn *rabbit.Conn) {
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Printf("upgrade websocket: %s", err)
return
}
defer ws.Close()
// A separate channel for a publisher in a go routine.
ch, err := conn.Connection.Channel()
if err != nil {
log.Printf("open channel: %s", err)
return
}
defer ch.Close()
2021-06-11 14:21:50 +03:00
// done and cancel() makes sure all spawned go routines are
// terminated if any one of them is finished.
2021-06-08 21:26:14 +03:00
done := make(chan bool)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Start a Rabbit consumer
err = conn.StartConsumerTemp(ctx, done, conf.Exchange, conf.KeyFront, handleWriteWebsocket(ws))
if err != nil {
log.Printf("start temp consumer: %s", err)
return
}
2021-06-11 14:21:50 +03:00
// Start a websocket reader (consumer)
2021-06-08 21:26:14 +03:00
err = iwebsocket.StartReader(ctx, done, ws, handlePublishRabbit(ch))
if err != nil {
log.Printf("start websocket reader: %s", err)
return
}
<-done
}
// handleWriteWebsocket writes a Rabbit message to Websocket.
// A Rabbit consumer only passes a message. So, a Websocket connection is
// additionally passed using a closure.
func handleWriteWebsocket(ws *websocket.Conn) func(d amqp.Delivery) error {
return func(d amqp.Delivery) error {
ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
// TODO: check msg
err := ws.WriteMessage(websocket.TextMessage, []byte(d.Body))
if err != nil {
return fmt.Errorf("write websocket: %v", err)
}
return nil
}
}
// handlePublishRabbit publishes a Websocket message to a Rabbit
2021-06-08 21:30:53 +03:00
// exchange with the the back-end and database routing keys.
2021-06-08 21:26:14 +03:00
// A Websocket reader only passes a message. So, a Rabbit channel is
// additionally passed using a closure.
func handlePublishRabbit(ch *amqp.Channel) func(msg []byte) error {
return func(msg []byte) error {
// TODO: check msg
2021-10-05 16:10:14 +03:00
key := conf.KeyBack + "." + conf.KeyDB + "." + conf.KeyCache
err := rabbit.PublishInChannel(ch, conf.Exchange, key, msg)
2021-06-08 21:26:14 +03:00
if err != nil {
2021-06-11 14:21:50 +03:00
// TODO: log error
2021-06-08 21:26:14 +03:00
return fmt.Errorf("publish rabbit: %v", err)
}
return nil
}
}
// handleNotFound handles 404
func handleNotFound(w http.ResponseWriter) {
w.WriteHeader(http.StatusNotFound)
t, _ := template.ParseFS(filesTempl, "template/404.html")
t.Execute(w, nil)
}
2021-10-07 16:23:34 +03:00
// formatTime returns time formatted for display.
func formatTime(timestamp int64) string {
t := time.Unix(timestamp/1000, 0)
format := "3:04pm, Jan 2"
if t.Day() == time.Now().Day() {
format = "3:04pm"
}
return t.Format(format)
}