1
0
mirror of https://github.com/go-micro/go-micro.git synced 2024-12-24 10:07:04 +02:00

broker memory: fix issue with publish/subscribe

mutex locking have errors, so when two service (one pub, other sub)
try to use this broker it waits for mutex release and nothing works

Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
This commit is contained in:
Vasiliy Tolstov 2019-07-12 23:47:37 +03:00
parent 3ccb900bca
commit 008749b2b0

View File

@ -72,14 +72,14 @@ func (m *memoryBroker) Init(opts ...broker.Option) error {
}
func (m *memoryBroker) Publish(topic string, message *broker.Message, opts ...broker.PublishOption) error {
m.Lock()
defer m.Unlock()
m.RLock()
if !m.connected {
m.RUnlock()
return errors.New("not connected")
}
subs, ok := m.Subscribers[topic]
m.RUnlock()
if !ok {
return nil
}
@ -99,12 +99,12 @@ func (m *memoryBroker) Publish(topic string, message *broker.Message, opts ...br
}
func (m *memoryBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) {
m.Lock()
defer m.Unlock()
m.RLock()
if !m.connected {
m.RUnlock()
return nil, errors.New("not connected")
}
m.RUnlock()
var options broker.SubscribeOptions
for _, o := range opts {
@ -119,7 +119,9 @@ func (m *memoryBroker) Subscribe(topic string, handler broker.Handler, opts ...b
opts: options,
}
m.Lock()
m.Subscribers[topic] = append(m.Subscribers[topic], sub)
m.Unlock()
go func() {
<-sub.exit