2021-06-08 21:26:14 +03:00
|
|
|
package main
|
|
|
|
|
|
|
|
import (
|
|
|
|
"context"
|
|
|
|
"embed"
|
|
|
|
"fmt"
|
|
|
|
"html/template"
|
|
|
|
"log"
|
|
|
|
"net/http"
|
|
|
|
"time"
|
|
|
|
|
2021-10-07 16:23:34 +03:00
|
|
|
"github.com/ebosas/microservices/internal/cache"
|
2021-06-08 21:26:14 +03:00
|
|
|
"github.com/ebosas/microservices/internal/config"
|
|
|
|
"github.com/ebosas/microservices/internal/rabbit"
|
|
|
|
iwebsocket "github.com/ebosas/microservices/internal/websocket"
|
2021-10-07 16:23:34 +03:00
|
|
|
"github.com/go-redis/redis/v8"
|
2021-06-08 21:26:14 +03:00
|
|
|
"github.com/gorilla/websocket"
|
|
|
|
"github.com/streadway/amqp"
|
|
|
|
)
|
|
|
|
|
|
|
|
//go:embed template
|
|
|
|
var filesTempl embed.FS
|
|
|
|
|
|
|
|
//go:embed static
|
|
|
|
var filesStatic embed.FS
|
|
|
|
|
|
|
|
var (
|
|
|
|
conf = config.New()
|
|
|
|
upgrader = websocket.Upgrader{} // use default options
|
|
|
|
)
|
|
|
|
|
|
|
|
func main() {
|
2021-06-11 14:21:50 +03:00
|
|
|
fmt.Println("[Server]")
|
2021-06-08 21:26:14 +03:00
|
|
|
|
|
|
|
// Establish a Rabbit connection.
|
2021-10-07 16:23:34 +03:00
|
|
|
connMQ, err := rabbit.GetConn(conf.RabbitURL)
|
2021-06-08 21:26:14 +03:00
|
|
|
if err != nil {
|
|
|
|
log.Fatalf("rabbit connection: %s", err)
|
|
|
|
}
|
2021-10-07 16:23:34 +03:00
|
|
|
defer connMQ.Close()
|
2021-06-08 21:26:14 +03:00
|
|
|
|
2021-10-07 16:23:34 +03:00
|
|
|
err = connMQ.DeclareTopicExchange(conf.Exchange)
|
2021-06-08 21:26:14 +03:00
|
|
|
if err != nil {
|
|
|
|
log.Fatalf("declare exchange: %s", err)
|
|
|
|
}
|
|
|
|
|
2021-10-07 16:23:34 +03:00
|
|
|
// Redis connection
|
|
|
|
connR := redis.NewClient(&redis.Options{
|
|
|
|
Addr: conf.RedisURL,
|
|
|
|
Password: "", // no password set
|
|
|
|
DB: 0, // use default DB
|
|
|
|
})
|
|
|
|
|
2021-06-08 21:26:14 +03:00
|
|
|
http.Handle("/static/", http.FileServer(http.FS(filesStatic)))
|
|
|
|
http.HandleFunc("/", handleHome)
|
2021-10-07 16:23:34 +03:00
|
|
|
http.HandleFunc("/messages", handleMessages(connR))
|
|
|
|
http.HandleFunc("/ws", handleWebsocketConn(connMQ))
|
|
|
|
http.HandleFunc("/api/cache", handleAPICache(connR)) // defined in api.go
|
2021-06-08 21:26:14 +03:00
|
|
|
log.Fatal(http.ListenAndServe(conf.ServerAddr, nil))
|
|
|
|
}
|
|
|
|
|
2021-10-07 16:23:34 +03:00
|
|
|
// handleHome handles the home page.
|
2021-06-08 21:26:14 +03:00
|
|
|
func handleHome(w http.ResponseWriter, r *http.Request) {
|
|
|
|
if r.URL.Path != "/" {
|
|
|
|
handleNotFound(w)
|
|
|
|
return
|
|
|
|
}
|
2021-10-07 16:23:34 +03:00
|
|
|
t := template.Must(template.ParseFS(filesTempl, "template/template.html", "template/navbar.html", "template/home.html"))
|
|
|
|
t.ExecuteTemplate(w, "layout", nil)
|
|
|
|
}
|
|
|
|
|
|
|
|
// handleMessages handles the messages page.
|
|
|
|
func handleMessages(cr *redis.Client) func(w http.ResponseWriter, r *http.Request) {
|
|
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
2021-10-08 21:40:10 +03:00
|
|
|
cacheData, cacheJSON, err := cache.GetCache(cr)
|
2021-10-07 16:23:34 +03:00
|
|
|
if err != nil {
|
|
|
|
log.Printf("get cache: %s", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2021-10-08 13:25:43 +03:00
|
|
|
data := map[string]interface{}{
|
2021-10-08 21:40:10 +03:00
|
|
|
"Data": cacheData,
|
2021-10-08 13:25:43 +03:00
|
|
|
"Json": cacheJSON,
|
|
|
|
}
|
|
|
|
|
2021-10-08 21:40:10 +03:00
|
|
|
funcMap := template.FuncMap{"ftime": formatTime}
|
2021-10-07 16:23:34 +03:00
|
|
|
t := template.Must(template.New("").Funcs(funcMap).ParseFS(filesTempl, "template/template.html", "template/navbar.html", "template/messages.html"))
|
|
|
|
t.ExecuteTemplate(w, "layout", data)
|
|
|
|
}
|
2021-06-08 21:26:14 +03:00
|
|
|
}
|
|
|
|
|
|
|
|
// handleWebsocketConn passes a Rabbit connection to the Websocket handler.
|
|
|
|
func handleWebsocketConn(conn *rabbit.Conn) func(w http.ResponseWriter, r *http.Request) {
|
|
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
|
|
handleWebsocket(w, r, conn)
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
2021-06-11 14:21:50 +03:00
|
|
|
// handleWebsocket starts two message readers (consumers), one consuming
|
2021-06-08 21:26:14 +03:00
|
|
|
// a Rabbit queue and another reading from a Websocket connection.
|
|
|
|
// Each consumer receives a message handler to relay messages –
|
|
|
|
// from Rabbit to Websocket and vice versa.
|
|
|
|
func handleWebsocket(w http.ResponseWriter, r *http.Request, conn *rabbit.Conn) {
|
|
|
|
ws, err := upgrader.Upgrade(w, r, nil)
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("upgrade websocket: %s", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
defer ws.Close()
|
|
|
|
|
|
|
|
// A separate channel for a publisher in a go routine.
|
|
|
|
ch, err := conn.Connection.Channel()
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("open channel: %s", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
defer ch.Close()
|
|
|
|
|
2021-06-11 14:21:50 +03:00
|
|
|
// done and cancel() makes sure all spawned go routines are
|
|
|
|
// terminated if any one of them is finished.
|
2021-06-08 21:26:14 +03:00
|
|
|
done := make(chan bool)
|
|
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
|
|
defer cancel()
|
|
|
|
|
|
|
|
// Start a Rabbit consumer
|
|
|
|
err = conn.StartConsumerTemp(ctx, done, conf.Exchange, conf.KeyFront, handleWriteWebsocket(ws))
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("start temp consumer: %s", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
2021-06-11 14:21:50 +03:00
|
|
|
// Start a websocket reader (consumer)
|
2021-06-08 21:26:14 +03:00
|
|
|
err = iwebsocket.StartReader(ctx, done, ws, handlePublishRabbit(ch))
|
|
|
|
if err != nil {
|
|
|
|
log.Printf("start websocket reader: %s", err)
|
|
|
|
return
|
|
|
|
}
|
|
|
|
|
|
|
|
<-done
|
|
|
|
}
|
|
|
|
|
|
|
|
// handleWriteWebsocket writes a Rabbit message to Websocket.
|
|
|
|
// A Rabbit consumer only passes a message. So, a Websocket connection is
|
|
|
|
// additionally passed using a closure.
|
|
|
|
func handleWriteWebsocket(ws *websocket.Conn) func(d amqp.Delivery) error {
|
|
|
|
return func(d amqp.Delivery) error {
|
|
|
|
ws.SetWriteDeadline(time.Now().Add(10 * time.Second))
|
|
|
|
// TODO: check msg
|
|
|
|
err := ws.WriteMessage(websocket.TextMessage, []byte(d.Body))
|
|
|
|
if err != nil {
|
|
|
|
return fmt.Errorf("write websocket: %v", err)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// handlePublishRabbit publishes a Websocket message to a Rabbit
|
2021-06-08 21:30:53 +03:00
|
|
|
// exchange with the the back-end and database routing keys.
|
2021-06-08 21:26:14 +03:00
|
|
|
// A Websocket reader only passes a message. So, a Rabbit channel is
|
|
|
|
// additionally passed using a closure.
|
|
|
|
func handlePublishRabbit(ch *amqp.Channel) func(msg []byte) error {
|
|
|
|
return func(msg []byte) error {
|
|
|
|
// TODO: check msg
|
2021-10-05 16:10:14 +03:00
|
|
|
key := conf.KeyBack + "." + conf.KeyDB + "." + conf.KeyCache
|
|
|
|
err := rabbit.PublishInChannel(ch, conf.Exchange, key, msg)
|
2021-06-08 21:26:14 +03:00
|
|
|
if err != nil {
|
2021-06-11 14:21:50 +03:00
|
|
|
// TODO: log error
|
2021-06-08 21:26:14 +03:00
|
|
|
return fmt.Errorf("publish rabbit: %v", err)
|
|
|
|
}
|
|
|
|
return nil
|
|
|
|
}
|
|
|
|
}
|
|
|
|
|
|
|
|
// handleNotFound handles 404
|
|
|
|
func handleNotFound(w http.ResponseWriter) {
|
|
|
|
w.WriteHeader(http.StatusNotFound)
|
|
|
|
t, _ := template.ParseFS(filesTempl, "template/404.html")
|
|
|
|
t.Execute(w, nil)
|
|
|
|
}
|
2021-10-07 16:23:34 +03:00
|
|
|
|
|
|
|
// formatTime returns time formatted for display.
|
|
|
|
func formatTime(timestamp int64) string {
|
|
|
|
t := time.Unix(timestamp/1000, 0)
|
|
|
|
|
|
|
|
format := "3:04pm, Jan 2"
|
|
|
|
if t.Day() == time.Now().Day() {
|
|
|
|
format = "3:04pm"
|
|
|
|
}
|
|
|
|
|
|
|
|
return t.Format(format)
|
|
|
|
}
|