mirror of
https://github.com/go-micro/go-micro.git
synced 2025-01-23 17:53:05 +02:00
269 lines
4.6 KiB
Go
269 lines
4.6 KiB
Go
// Package broker provides a go-micro/broker handler
|
|
package broker
|
|
|
|
import (
|
|
"encoding/json"
|
|
"io/ioutil"
|
|
"net/http"
|
|
"net/url"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/gorilla/websocket"
|
|
"github.com/micro/go-micro/api/handler"
|
|
"github.com/micro/go-micro/broker"
|
|
"github.com/micro/go-micro/util/log"
|
|
)
|
|
|
|
const (
|
|
Handler = "broker"
|
|
|
|
pingTime = (readDeadline * 9) / 10
|
|
readLimit = 16384
|
|
readDeadline = 60 * time.Second
|
|
writeDeadline = 10 * time.Second
|
|
)
|
|
|
|
type brokerHandler struct {
|
|
opts handler.Options
|
|
u websocket.Upgrader
|
|
}
|
|
|
|
type conn struct {
|
|
b broker.Broker
|
|
cType string
|
|
topic string
|
|
queue string
|
|
exit chan bool
|
|
|
|
sync.Mutex
|
|
ws *websocket.Conn
|
|
}
|
|
|
|
var (
|
|
once sync.Once
|
|
contentType = "text/plain"
|
|
)
|
|
|
|
func checkOrigin(r *http.Request) bool {
|
|
origin := r.Header["Origin"]
|
|
if len(origin) == 0 {
|
|
return true
|
|
}
|
|
u, err := url.Parse(origin[0])
|
|
if err != nil {
|
|
return false
|
|
}
|
|
return u.Host == r.Host
|
|
}
|
|
|
|
func (c *conn) close() {
|
|
select {
|
|
case <-c.exit:
|
|
return
|
|
default:
|
|
close(c.exit)
|
|
}
|
|
}
|
|
|
|
func (c *conn) readLoop() {
|
|
defer func() {
|
|
c.close()
|
|
c.ws.Close()
|
|
}()
|
|
|
|
// set read limit/deadline
|
|
c.ws.SetReadLimit(readLimit)
|
|
c.ws.SetReadDeadline(time.Now().Add(readDeadline))
|
|
|
|
// set close handler
|
|
ch := c.ws.CloseHandler()
|
|
c.ws.SetCloseHandler(func(code int, text string) error {
|
|
err := ch(code, text)
|
|
c.close()
|
|
return err
|
|
})
|
|
|
|
// set pong handler
|
|
c.ws.SetPongHandler(func(string) error {
|
|
c.ws.SetReadDeadline(time.Now().Add(readDeadline))
|
|
return nil
|
|
})
|
|
|
|
for {
|
|
_, message, err := c.ws.ReadMessage()
|
|
if err != nil {
|
|
return
|
|
}
|
|
c.b.Publish(c.topic, &broker.Message{
|
|
Header: map[string]string{"Content-Type": c.cType},
|
|
Body: message,
|
|
})
|
|
}
|
|
}
|
|
|
|
func (c *conn) write(mType int, data []byte) error {
|
|
c.Lock()
|
|
c.ws.SetWriteDeadline(time.Now().Add(writeDeadline))
|
|
err := c.ws.WriteMessage(mType, data)
|
|
c.Unlock()
|
|
return err
|
|
}
|
|
|
|
func (c *conn) writeLoop() {
|
|
ticker := time.NewTicker(pingTime)
|
|
|
|
var opts []broker.SubscribeOption
|
|
|
|
if len(c.queue) > 0 {
|
|
opts = append(opts, broker.Queue(c.queue))
|
|
}
|
|
|
|
subscriber, err := c.b.Subscribe(c.topic, func(p broker.Event) error {
|
|
b, err := json.Marshal(p.Message())
|
|
if err != nil {
|
|
return nil
|
|
}
|
|
return c.write(websocket.TextMessage, b)
|
|
}, opts...)
|
|
|
|
defer func() {
|
|
subscriber.Unsubscribe()
|
|
ticker.Stop()
|
|
c.ws.Close()
|
|
}()
|
|
|
|
if err != nil {
|
|
log.Log(err.Error())
|
|
return
|
|
}
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
if err := c.write(websocket.PingMessage, []byte{}); err != nil {
|
|
return
|
|
}
|
|
case <-c.exit:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (b *brokerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
|
|
br := b.opts.Service.Client().Options().Broker
|
|
|
|
// Setup the broker
|
|
once.Do(func() {
|
|
br.Init()
|
|
br.Connect()
|
|
})
|
|
|
|
// Parse
|
|
r.ParseForm()
|
|
topic := r.Form.Get("topic")
|
|
|
|
// Can't do anything without a topic
|
|
if len(topic) == 0 {
|
|
http.Error(w, "Topic not specified", 400)
|
|
return
|
|
}
|
|
|
|
// Post assumed to be Publish
|
|
if r.Method == "POST" {
|
|
// Create a broker message
|
|
msg := &broker.Message{
|
|
Header: make(map[string]string),
|
|
}
|
|
|
|
// Set header
|
|
for k, v := range r.Header {
|
|
msg.Header[k] = strings.Join(v, ", ")
|
|
}
|
|
|
|
// Read body
|
|
b, err := ioutil.ReadAll(r.Body)
|
|
if err != nil {
|
|
http.Error(w, err.Error(), 500)
|
|
return
|
|
}
|
|
|
|
// Set body
|
|
msg.Body = b
|
|
|
|
// Publish
|
|
br.Publish(topic, msg)
|
|
return
|
|
}
|
|
|
|
// now back to our regularly scheduled programming
|
|
|
|
if r.Method != "GET" {
|
|
http.Error(w, "Method not allowed", 405)
|
|
return
|
|
}
|
|
|
|
queue := r.Form.Get("queue")
|
|
|
|
ws, err := b.u.Upgrade(w, r, nil)
|
|
if err != nil {
|
|
log.Log(err.Error())
|
|
return
|
|
}
|
|
|
|
cType := r.Header.Get("Content-Type")
|
|
if len(cType) == 0 {
|
|
cType = contentType
|
|
}
|
|
|
|
c := &conn{
|
|
b: br,
|
|
cType: cType,
|
|
topic: topic,
|
|
queue: queue,
|
|
exit: make(chan bool),
|
|
ws: ws,
|
|
}
|
|
|
|
go c.writeLoop()
|
|
c.readLoop()
|
|
}
|
|
|
|
func (b *brokerHandler) String() string {
|
|
return "broker"
|
|
}
|
|
|
|
func NewHandler(opts ...handler.Option) handler.Handler {
|
|
return &brokerHandler{
|
|
u: websocket.Upgrader{
|
|
CheckOrigin: func(r *http.Request) bool {
|
|
return true
|
|
},
|
|
ReadBufferSize: 1024,
|
|
WriteBufferSize: 1024,
|
|
},
|
|
opts: handler.NewOptions(opts...),
|
|
}
|
|
}
|
|
|
|
func WithCors(cors map[string]bool, opts ...handler.Option) handler.Handler {
|
|
return &brokerHandler{
|
|
u: websocket.Upgrader{
|
|
CheckOrigin: func(r *http.Request) bool {
|
|
if origin := r.Header.Get("Origin"); cors[origin] {
|
|
return true
|
|
} else if len(origin) > 0 && cors["*"] {
|
|
return true
|
|
} else if checkOrigin(r) {
|
|
return true
|
|
}
|
|
return false
|
|
},
|
|
ReadBufferSize: 1024,
|
|
WriteBufferSize: 1024,
|
|
},
|
|
opts: handler.NewOptions(opts...),
|
|
}
|
|
}
|