mirror of
https://github.com/go-micro/go-micro.git
synced 2024-12-12 08:23:58 +02:00
132 lines
2.3 KiB
Go
132 lines
2.3 KiB
Go
// Package socket provides a pseudo socket
|
|
package socket
|
|
|
|
import (
|
|
"io"
|
|
|
|
"github.com/micro/go-micro/transport"
|
|
)
|
|
|
|
// Socket is our pseudo socket for transport.Socket
|
|
type Socket struct {
|
|
id string
|
|
// closed
|
|
closed chan bool
|
|
// remote addr
|
|
remote string
|
|
// local addr
|
|
local string
|
|
// send chan
|
|
send chan *transport.Message
|
|
// recv chan
|
|
recv chan *transport.Message
|
|
}
|
|
|
|
func (s *Socket) SetLocal(l string) {
|
|
s.local = l
|
|
}
|
|
|
|
func (s *Socket) SetRemote(r string) {
|
|
s.remote = r
|
|
}
|
|
|
|
// Accept passes a message to the socket which will be processed by the call to Recv
|
|
func (s *Socket) Accept(m *transport.Message) error {
|
|
select {
|
|
case s.recv <- m:
|
|
return nil
|
|
case <-s.closed:
|
|
return io.EOF
|
|
}
|
|
}
|
|
|
|
// Process takes the next message off the send queue created by a call to Send
|
|
func (s *Socket) Process(m *transport.Message) error {
|
|
select {
|
|
case msg := <-s.send:
|
|
*m = *msg
|
|
case <-s.closed:
|
|
// see if we need to drain
|
|
select {
|
|
case msg := <-s.send:
|
|
*m = *msg
|
|
return nil
|
|
default:
|
|
return io.EOF
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (s *Socket) Remote() string {
|
|
return s.remote
|
|
}
|
|
|
|
func (s *Socket) Local() string {
|
|
return s.local
|
|
}
|
|
|
|
func (s *Socket) Send(m *transport.Message) error {
|
|
// make copy
|
|
msg := &transport.Message{
|
|
Header: make(map[string]string),
|
|
Body: make([]byte, len(m.Body)),
|
|
}
|
|
|
|
// copy headers
|
|
for k, v := range m.Header {
|
|
msg.Header[k] = v
|
|
}
|
|
|
|
// copy body
|
|
copy(msg.Body, m.Body)
|
|
|
|
// send a message
|
|
select {
|
|
case s.send <- msg:
|
|
case <-s.closed:
|
|
return io.EOF
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *Socket) Recv(m *transport.Message) error {
|
|
// receive a message
|
|
select {
|
|
case msg := <-s.recv:
|
|
// set message
|
|
*m = *msg
|
|
case <-s.closed:
|
|
return io.EOF
|
|
}
|
|
|
|
// return nil
|
|
return nil
|
|
}
|
|
|
|
// Close closes the socket
|
|
func (s *Socket) Close() error {
|
|
select {
|
|
case <-s.closed:
|
|
// no op
|
|
default:
|
|
close(s.closed)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// New returns a new pseudo socket which can be used in the place of a transport socket.
|
|
// Messages are sent to the socket via Accept and receives from the socket via Process.
|
|
// SetLocal/SetRemote should be called before using the socket.
|
|
func New(id string) *Socket {
|
|
return &Socket{
|
|
id: id,
|
|
closed: make(chan bool),
|
|
local: "local",
|
|
remote: "remote",
|
|
send: make(chan *transport.Message, 128),
|
|
recv: make(chan *transport.Message, 128),
|
|
}
|
|
}
|