reworked websocket connection

This commit is contained in:
knoxfighter 2020-12-06 17:08:53 +01:00
parent 17c398acb0
commit 54bc8bd354
14 changed files with 497 additions and 226 deletions

View File

@ -216,7 +216,7 @@ func LogTail(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json;charset=UTF-8")
config := bootstrap.GetConfig()
resp, err = factorio.TailLog(config.FactorioLog)
resp, err = factorio.TailLog()
if err != nil {
resp = fmt.Sprintf("Could not tail %s: %s", config.FactorioLog, err)
return
@ -257,7 +257,7 @@ func StartServer(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json;charset=UTF-8")
if server.Running {
if server.GetRunning() {
resp = "Factorio server is already running"
w.WriteHeader(http.StatusConflict)
return
@ -299,7 +299,7 @@ func StartServer(w http.ResponseWriter, r *http.Request) {
timeout := 0
for timeout <= 3 {
time.Sleep(1 * time.Second)
if server.Running {
if server.GetRunning() {
log.Printf("Running Factorio server detected")
break
} else {
@ -309,7 +309,7 @@ func StartServer(w http.ResponseWriter, r *http.Request) {
timeout++
}
if server.Running == false {
if server.GetRunning() == false {
resp = fmt.Sprintf("Error starting Factorio server: %s", err)
log.Println(resp)
w.WriteHeader(http.StatusInternalServerError)
@ -329,7 +329,7 @@ func StopServer(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json;charset=UTF-8")
var server = factorio.GetFactorioServer()
if server.Running {
if server.GetRunning() {
err := server.Stop()
if err != nil {
resp = fmt.Sprintf("Error stopping factorio server: %s", err)
@ -356,7 +356,7 @@ func KillServer(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json;charset=UTF-8")
var server = factorio.GetFactorioServer()
if server.Running {
if server.GetRunning() {
err := server.Kill()
if err != nil {
resp = fmt.Sprintf("Error killing factorio server: %s", err)
@ -381,7 +381,7 @@ func CheckServer(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json;charset=UTF-8")
var server = factorio.GetFactorioServer()
if server.Running {
if server.GetRunning() {
resp["status"] = "running"
resp["port"] = strconv.Itoa(server.Port)
resp["savefile"] = server.Savefile

View File

@ -1,22 +1,13 @@
package api
import (
"github.com/mroote/factorio-server-manager/api/websocket"
"log"
"net/http"
"github.com/gorilla/mux"
"github.com/gorilla/websocket"
)
var upgrader = websocket.Upgrader{
ReadBufferSize: 1024,
WriteBufferSize: 1024,
//TODO Proper origin check
CheckOrigin: func(r *http.Request) bool { return true },
}
type Handler func(*Client, interface{})
type Route struct {
Name string
Method string
@ -26,13 +17,8 @@ type Route struct {
type Routes []Route
type WSRouter struct {
rules map[string]Handler
}
func NewRouter() *mux.Router {
r := mux.NewRouter().StrictSlash(true)
ws := NewWSRouter()
// API subrouter
// Serves all JSON REST handlers prefixed with /api
@ -57,10 +43,15 @@ func NewRouter() *mux.Router {
r.Path("/ws").
Methods("GET").
Name("Websocket").
Handler(AuthorizeHandler(ws))
ws.Handle("command send", CommandSend)
ws.Handle("log subscribe", LogSubscribe)
ws.Handle("server status subscribe", ServerStatusSubscribe)
Handler(
AuthorizeHandler(
http.HandlerFunc(
func(w http.ResponseWriter, r *http.Request) {
websocket.ServeWs(w, r)
},
),
),
)
// Serves the frontend application from the app directory
// Uses basic file server to serve index.html and Javascript application
@ -125,34 +116,6 @@ func AuthorizeHandler(h http.Handler) http.Handler {
})
}
func NewWSRouter() *WSRouter {
return &WSRouter{
rules: make(map[string]Handler),
}
}
func (ws *WSRouter) Handle(msgName string, handler Handler) {
ws.rules[msgName] = handler
}
func (ws *WSRouter) FindHandler(msgName string) (Handler, bool) {
handler, found := ws.rules[msgName]
return handler, found
}
func (ws *WSRouter) ServeHTTP(w http.ResponseWriter, r *http.Request) {
socket, err := upgrader.Upgrade(w, r, nil)
if err != nil {
w.WriteHeader(http.StatusInternalServerError)
log.Printf("Error opening ws connection: %s", err)
return
}
client := NewClient(socket, ws.FindHandler)
defer client.Close()
go client.Write()
client.Read()
}
// Defines all API REST endpoints
// All routes are prefixed with /api
var apiRoutes = Routes{

View File

@ -0,0 +1,171 @@
package websocket
import (
"log"
"net/http"
"time"
"github.com/gorilla/websocket"
)
const (
// Timeout for sending a message
writeWait = 10 * time.Second
// Timeout between the answer of two pong messages
pongWait = 60 * time.Second
// Period in which a new ping message is sent. Must be less than pongWait.
pingPeriod = (pongWait * 9) / 10
// Maximum message size sent from a client
maxMessageSize = 2048
)
// The upgrader to upgrade from http to ws protocol
var upgrader = websocket.Upgrader{
ReadBufferSize: 4096,
WriteBufferSize: 4096,
}
// The websocket client, that is the middleman between a websocket connection and the hub.
// It manages every communication between the hub and the websocket connection.
type wsClient struct {
// The hub this client is registered to.
hub *wsHub
// The websocket connection.
conn *websocket.Conn
// channel to send messages to the websocket connection.
send chan wsMessage
}
// read messages from the websocket connection, choose what has to be done with it and execute that action
// messages with room name, send to the room
// messages with empty room name and controls set, will execute that control, nothing will be sent to other clients
// messages with empty room name and empty controls will send the message to all clients registered in the hub.
//
// This pump has to be executed in a goroutine!
func (client *wsClient) readPump() {
// When this pump closes, unregister the client and close the websocket connection
defer func() {
client.hub.unregister <- client
client.conn.Close()
}()
// Setup some websocket connection settings
client.conn.SetReadLimit(maxMessageSize)
client.conn.SetReadDeadline(time.Now().Add(pongWait))
client.conn.SetPongHandler(func(string) error {
client.conn.SetReadDeadline(time.Now().Add(pongWait))
return nil
})
for {
// wait and read the next incoming message on the websocket.
var message wsMessage
err := client.conn.ReadJSON(&message)
if err != nil {
if websocket.IsUnexpectedCloseError(err, websocket.CloseGoingAway, websocket.CloseAbnormalClosure) {
log.Printf("error: %v", err)
}
break
}
if message.RoomName == "" {
// controls messages will not sent to other clients, they only are relevant for the server
if message.Controls != (WsControls{}) {
// this message is a control message, do its job!
switch message.Controls.Type {
case "subscribe":
room := client.hub.GetRoom(message.Controls.Value)
room.register <- client
case "unsubscribe":
room := client.hub.GetRoom(message.Controls.Value)
room.unregister <- client
default:
for _, handler := range client.hub.controlHandlers {
go handler(message.Controls)
}
}
} else {
client.hub.broadcast <- message
}
} else {
// Send the message to the defined room
room := client.hub.GetRoom(message.RoomName)
room.send <- message
}
}
}
// write message to the websocket connection.
// messages from client.send channel are sent
// Also starts a timer ticker to send ping messages
//
// This pump has to be executed in a goroutine!
func (client *wsClient) writePump() {
// setup ping message ticker
ticker := time.NewTicker(pingPeriod)
// stop the ticker and close the websocket connection, when this pump is finished
defer func() {
ticker.Stop()
client.conn.Close()
}()
for {
select {
case message, ok := <-client.send:
// Setup timeout
client.conn.SetWriteDeadline(time.Now().Add(writeWait))
if !ok {
// The hub closed the channel. Therefore notify the client.
client.conn.WriteMessage(websocket.CloseMessage, []byte{})
return
}
// send the message as json
err := client.conn.WriteJSON(message)
if err != nil {
return
}
case <-ticker.C:
// Setup timeout
client.conn.SetWriteDeadline(time.Now().Add(writeWait))
// send a ping message
if err := client.conn.WriteMessage(websocket.PingMessage, nil); err != nil {
return
}
}
}
}
// serveWs is the http handler to upgrade from http to ws..
// Also the startup point for a client
func ServeWs(w http.ResponseWriter, r *http.Request) {
// upgrade the connection
conn, err := upgrader.Upgrade(w, r, nil)
if err != nil {
log.Println(err)
return
}
// setup the client
client := &wsClient{
hub: WebsocketHub,
conn: conn,
send: make(chan wsMessage, 256),
}
// register this client in the hub
client.hub.register <- client
// start the pipes for the new client in goroutines
go client.writePump()
go client.readPump()
}

200
src/api/websocket/wshub.go Normal file
View File

@ -0,0 +1,200 @@
package websocket
import (
"reflect"
)
// the hub, that is exported and can be used anywhere to work with the websocket
var WebsocketHub *wsHub
// a controlHandler is used to determine, if something has to be done, on a specific command.
// register a handler with `wsHub.RegisterControlHandler`
// unregister a handler with `wsHub.UnregisterControlHandler`
type controlHandler func(controls WsControls)
// The type for of control messages.
// Type ans Value both have to be set, if controls are sent.
// Currently supported Type:
// - `subscribe` - Value used as room name
// - `unsubscribe` - Value used as room name
// - `command` - Value contains the command to execute
type WsControls struct {
Type string `json:"type"`
Value string `json:"value"`
}
// the main message of our websocket protocol.
// if the room_name is an empty string, this will be sent as broadcast
// if controls is not empty, this will not be sent anywhere, but used as commands, to join/leave rooms and to send commands to the factorio server
type wsMessage struct {
RoomName string `json:"room_name"`
Message interface{} `json:"message,omitempty"`
Controls WsControls `json:"controls,omitempty"`
}
type wsRoom struct {
// same as the key of the map in the wsHub
name string
// the wsHub this room is part of
hub *wsHub
// clients that are in this room. This list is a sublist of the one inside the hub
clients map[*wsClient]bool
// register a client to this room
register chan *wsClient
// unregister a client from this room, if no clients remain, this room will be deleted
unregister chan *wsClient
// send a message to all clients in this room
send chan wsMessage
}
// Hub is the basic setup of the server.
// It contains everything needed for the websocket to run.
// Only the controlHandler Subscriptions are public, everything else can be controlled with the functions and the wsClient.
type wsHub struct {
// list of all connected clients
clients map[*wsClient]bool
// Messages that should be sent to ALL clients
broadcast chan wsMessage
// a list of all rooms
rooms map[string]*wsRoom
// register a client to this hub
register chan *wsClient
// unregister a client from this hub
unregister chan *wsClient
// run a control message on all registered controlHandler
runControl chan WsControls
// list of all registered controlHandlers
controlHandlers map[reflect.Value]controlHandler
// register a controlHandler
RegisterControlHandler chan controlHandler
// unregister a controlHandler
UnregisterControlHandler chan controlHandler
}
// initialize and run the mein websocket hub.
func init() {
WebsocketHub = &wsHub{
broadcast: make(chan wsMessage),
register: make(chan *wsClient),
rooms: make(map[string]*wsRoom),
unregister: make(chan *wsClient),
clients: make(map[*wsClient]bool),
runControl: make(chan WsControls),
controlHandlers: make(map[reflect.Value]controlHandler),
RegisterControlHandler: make(chan controlHandler),
UnregisterControlHandler: make(chan controlHandler),
}
go WebsocketHub.run()
}
// remove a client from this hub and all of its rooms
func (hub *wsHub) removeClient(client *wsClient) {
delete(hub.clients, client)
close(client.send)
for _, room := range hub.rooms {
room.unregister <- client
}
}
// run starts a websocket hub, this has to be done in a subroutine `go hub.run()`
func (hub *wsHub) run() {
for {
select {
case client := <-hub.register:
hub.clients[client] = true
case client := <-hub.unregister:
if _, ok := hub.clients[client]; ok {
hub.removeClient(client)
}
case message := <-hub.broadcast:
for client := range hub.clients {
select {
case client.send <- message:
default:
hub.removeClient(client)
}
}
case function := <-hub.RegisterControlHandler:
hub.controlHandlers[reflect.ValueOf(function)] = function
case function := <-hub.UnregisterControlHandler:
delete(hub.controlHandlers, reflect.ValueOf(function))
}
}
}
// Broadcast a message to all connected clients (only clients connected to this room).
func (hub *wsHub) Broadcast(message interface{}) {
hub.broadcast <- wsMessage{
RoomName: "",
Message: message,
}
}
// get a websocket room or create it, if it doesn't exist yet.
// Also starts the rooms subroutine `wsRoom.run()`
func (hub *wsHub) GetRoom(name string) *wsRoom {
if room, ok := hub.rooms[name]; ok {
return room
} else {
room := &wsRoom{
name: name,
hub: hub,
clients: make(map[*wsClient]bool),
register: make(chan *wsClient),
unregister: make(chan *wsClient),
send: make(chan wsMessage),
}
hub.rooms[name] = room
go room.run()
return room
}
}
// run starts a websocket room. This has to be run as a subroutine `go room.run()`
func (room *wsRoom) run() {
for {
select {
case client := <-room.register:
room.clients[client] = true
case client := <-room.unregister:
if _, ok := room.clients[client]; ok {
delete(room.clients, client)
if len(room.clients) == 0 {
// remove this room
delete(room.hub.rooms, room.name)
return
}
}
case message := <-room.send:
for client := range room.clients {
select {
case client.send <- message:
default:
room.unregister <- client
}
}
}
}
}
// Send a message into this room.
func (room *wsRoom) Send(message interface{}) {
room.send <- wsMessage{
RoomName: room.name,
Message: message,
}
}

View File

@ -1,58 +0,0 @@
package api
import (
"github.com/gorilla/websocket"
)
type Message struct {
Name string `json:"name"`
Data interface{} `json:"data"`
}
type FindHandler func(string) (Handler, bool)
type Client struct {
send chan Message
socket *websocket.Conn
findHandler FindHandler
stopChannels map[int]chan bool
id string
}
func (client *Client) Read() {
var message Message
for {
if err := client.socket.ReadJSON(&message); err != nil {
break
}
if handler, found := client.findHandler(message.Name); found {
handler(client, message.Data)
}
}
client.socket.Close()
}
func (client *Client) Write() {
for msg := range client.send {
if err := client.socket.WriteJSON(msg); err != nil {
break
}
}
client.socket.Close()
}
func (client *Client) Close() {
for _, ch := range client.stopChannels {
ch <- true
}
close(client.send)
}
func NewClient(socket *websocket.Conn, findHandler FindHandler) *Client {
return &Client{
send: make(chan Message),
socket: socket,
findHandler: findHandler,
stopChannels: make(map[int]chan bool),
}
}

View File

@ -1,91 +0,0 @@
package api
import (
"github.com/mroote/factorio-server-manager/bootstrap"
"github.com/mroote/factorio-server-manager/factorio"
"log"
"path/filepath"
"time"
"github.com/hpcloud/tail"
)
func IsClosed(ch <-chan Message) bool {
select {
case <-ch:
return true
default:
}
return false
}
func LogSubscribe(client *Client, data interface{}) {
go func() {
config := bootstrap.GetConfig()
logfile := filepath.Join(config.FactorioDir, "factorio-server-console.log")
t, err := tail.TailFile(logfile, tail.Config{Follow: true, Poll: true})
if err != nil {
log.Printf("Error subscribing to tail log %s", err)
return
}
for line := range t.Lines {
if !IsClosed(client.send) {
client.send <- Message{"log update", line.Text}
} else {
log.Printf("Channel was closed")
return
}
}
}()
}
func CommandSend(client *Client, data interface{}) {
server := factorio.GetFactorioServer()
if server.Running {
go func() {
log.Printf("Received command: %v", data)
reqId, err := server.Rcon.Write(data.(string))
if err != nil {
log.Printf("Error sending rcon command: %s", err)
return
}
log.Printf("Command send to Factorio: %s, with rcon request id: %v", data, reqId)
if !IsClosed(client.send) {
client.send <- Message{"receive command", data}
} else {
log.Printf("Channel was closed")
return
}
}()
}
}
func ServerStatusSubscribe(client *Client, data interface{}) {
var server = factorio.GetFactorioServer()
log.Printf("Subcribed to Server Status")
go func() {
isRunning := server.Running
// always check if status has changed
for {
if isRunning != server.Running {
isRunning = server.Running
log.Printf("Server Status has changed")
if IsClosed(client.send) {
log.Printf("Channel was closed")
return
}
client.send <- Message{"status update", "Server status has changed"}
}
time.Sleep(2 * time.Second)
}
}()
}

View File

@ -1,13 +1,12 @@
package factorio
import (
"github.com/hpcloud/tail"
"github.com/mroote/factorio-server-manager/bootstrap"
"log"
"github.com/hpcloud/tail"
)
func TailLog(filename string) ([]string, error) {
func TailLog() ([]string, error) {
result := []string{}
config := bootstrap.GetConfig()

View File

@ -3,6 +3,7 @@ package factorio
import (
"bufio"
"encoding/json"
"github.com/mroote/factorio-server-manager/api/websocket"
"github.com/mroote/factorio-server-manager/bootstrap"
"io"
"io/ioutil"
@ -24,7 +25,7 @@ type Server struct {
Latency int `json:"latency"`
BindIP string `json:"bindip"`
Port int `json:"port"`
Running bool `json:"running"`
running bool `json:"running"`
Version Version `json:"fac_version"`
BaseModVersion string `json:"base_mod_version"`
StdOut io.ReadCloser `json:"-"`
@ -38,8 +39,20 @@ type Server struct {
var instantiated Server
var once sync.Once
func (server *Server) autostart() {
func (server *Server) SetRunning(newState bool) {
if server.running != newState {
log.Println("new state, will also send to correct room")
server.running = newState
wsRoom := websocket.WebsocketHub.GetRoom("server_status")
wsRoom.Send("Server status has changed")
}
}
func (server *Server) GetRunning() bool {
return server.running
}
func (server *Server) autostart() {
var err error
if server.BindIP == "" {
server.BindIP = "0.0.0.0"
@ -193,6 +206,7 @@ func NewFactorioServer() (err error) {
SetFactorioServer(server)
// autostart factorio is configured to do so
if config.Autostart == "true" {
go instantiated.autostart()
}
@ -274,12 +288,12 @@ func (server *Server) Run() error {
log.Printf("Factorio process failed to start: %s", err)
return err
}
server.Running = true
server.SetRunning(true)
err = server.Cmd.Wait()
if err != nil {
log.Printf("Factorio process exited with error: %s", err)
server.Running = false
server.SetRunning(false)
return err
}
@ -294,6 +308,9 @@ func (server *Server) parseRunningCommand(std io.ReadCloser) (err error) {
log.Printf("Error: %s", err)
}
wsRoom := websocket.WebsocketHub.GetRoom("gamelog")
go wsRoom.Send(stdScanner.Text())
line := strings.Fields(stdScanner.Text())
// Ensure logline slice is in bounds
if len(line) > 1 {
@ -352,3 +369,27 @@ func (server *Server) checkLogError(logline []string) error {
return nil
}
func init() {
websocket.WebsocketHub.RegisterControlHandler <- serverWebsocketControl
}
// react to websocket control messages and run the command if it is requested
func serverWebsocketControl(controls websocket.WsControls) {
log.Println(controls)
if controls.Type == "command" {
command := controls.Value
server := GetFactorioServer()
if server.GetRunning() {
log.Printf("Received command: %v", command)
reqId, err := server.Rcon.Write(command)
if err != nil {
log.Printf("Error sending rcon command: %s", err)
return
}
log.Printf("Command send to Factorio: %s, with rcon request id: %v", command, reqId)
}
}
}

View File

@ -14,13 +14,13 @@ func (server *Server) Kill() error {
err := server.Cmd.Process.Signal(os.Kill)
if err != nil {
if err.Error() == "os: process already finished" {
server.Running = false
server.SetRunning(false)
return err
}
log.Printf("Error sending SIGKILL to Factorio process: %s", err)
return err
}
server.Running = false
server.SetRunning(false)
log.Printf("Sent SIGKILL to Factorio process. Factorio forced to exit.")
err = server.Rcon.Close()
@ -35,13 +35,13 @@ func (server *Server) Stop() error {
err := server.Cmd.Process.Signal(os.Interrupt)
if err != nil {
if err.Error() == "os: process already finished" {
server.Running = false
server.SetRunning(false)
return err
}
log.Printf("Error sending SIGINT to Factorio process: %s", err)
return err
}
server.Running = false
server.SetRunning(false)
log.Printf("Sent SIGINT to Factorio process. Factorio shutting down...")
err = server.Rcon.Close()

View File

@ -46,13 +46,13 @@ func (server *Server) Kill() error {
err := server.Cmd.Process.Signal(os.Kill)
if err != nil {
if err.Error() == "os: process already finished" {
server.Running = false
server.SetRunning(false)
return err
}
log.Printf("Error sending SIGKILL to Factorio process: %s", err)
return err
}
server.Running = false
server.SetRunning(false)
log.Println("Sent SIGKILL to Factorio process. Factorio forced to exit.")
return nil
@ -77,6 +77,6 @@ func (server *Server) Stop() error {
// Re-enable handling of CTRL+C after we're sure that the factrio server is shut down.
setCtrlHandlingIsDisabledForThisProcess(false)
server.Running = false
server.SetRunning(false)
return nil
}

View File

@ -26,8 +26,9 @@ func main() {
// Initialize authentication system
api.GetAuth()
// Initialize HTTP router
// Initialize HTTP router -- also initializes websocket
router := api.NewRouter()
log.Printf("Starting server on: %s:%s", config.ServerIP, config.ServerPort)
log.Fatal(http.ListenAndServe(config.ServerIP+":"+config.ServerPort, router))

View File

@ -39,7 +39,7 @@ const App = () => {
await updateServerStatus();
socket.emit('server status subscribe');
socket.on('status update', updateServerStatus)
socket.on('server_status', updateServerStatus)
}
},[]);

View File

@ -14,12 +14,13 @@ const Console = ({serverStatus}) => {
setLogs(lines => [...lines, line]);
}
socket.on('log update', appendLog)
socket.on('gamelog', appendLog)
socket.emit('log subscribe')
consoleInput.current?.focus();
return () => {
socket.off('log update', appendLog);
socket.off('gamelog', appendLog);
socket.emit("log unsubscribe")
}
}, []);

View File

@ -6,20 +6,64 @@ const socket = new WebSocket(ws_scheme + "://" + window.location.host + "/ws");
const bus = new EventEmitter();
bus.on('log subscribe', () => {
socket.send(JSON.stringify({name: 'log subscribe'}));
socket.send(
JSON.stringify(
{
room_name: "",
controls: {
type: "subscribe",
value: "gamelog"
}
}
)
);
});
bus.on('log unsubscribe', () => {
socket.send(
JSON.stringify(
{
room_name: "",
controls: {
type: "unsubscribe",
value: "gamelog"
}
}
)
);
})
bus.on('server status subscribe', () => {
socket.send(JSON.stringify({name: 'server status subscribe'}));
socket.send(
JSON.stringify(
{
room_name: "",
controls: {
type: "subscribe",
value: "server_status"
}
}
)
);
});
bus.on('command send', command => {
socket.send(JSON.stringify({name: 'command send', data: command}));
socket.send(
JSON.stringify(
{
room_name: "",
controls: {
type: "command",
value: command
}
}
)
);
});
socket.onmessage = e => {
const {name, data} = JSON.parse(e.data)
bus.emit(name, data);
const {room_name, message} = JSON.parse(e.data);
bus.emit(room_name, message);
}
export default bus;