diff --git a/broker/memory/memory.go b/broker/memory/memory.go index fb077faa..5ace9581 100644 --- a/broker/memory/memory.go +++ b/broker/memory/memory.go @@ -72,14 +72,14 @@ func (m *memoryBroker) Init(opts ...broker.Option) error { } func (m *memoryBroker) Publish(topic string, message *broker.Message, opts ...broker.PublishOption) error { - m.Lock() - defer m.Unlock() - + m.RLock() if !m.connected { + m.RUnlock() return errors.New("not connected") } subs, ok := m.Subscribers[topic] + m.RUnlock() if !ok { return nil } @@ -99,12 +99,12 @@ func (m *memoryBroker) Publish(topic string, message *broker.Message, opts ...br } func (m *memoryBroker) Subscribe(topic string, handler broker.Handler, opts ...broker.SubscribeOption) (broker.Subscriber, error) { - m.Lock() - defer m.Unlock() - + m.RLock() if !m.connected { + m.RUnlock() return nil, errors.New("not connected") } + m.RUnlock() var options broker.SubscribeOptions for _, o := range opts { @@ -119,7 +119,9 @@ func (m *memoryBroker) Subscribe(topic string, handler broker.Handler, opts ...b opts: options, } + m.Lock() m.Subscribers[topic] = append(m.Subscribers[topic], sub) + m.Unlock() go func() { <-sub.exit