diff --git a/plugins/events/natsjs/nats.go b/plugins/events/natsjs/nats.go index 8c03ee44..914032f6 100644 --- a/plugins/events/natsjs/nats.go +++ b/plugins/events/natsjs/nats.go @@ -16,8 +16,7 @@ import ( ) const ( - defaultClusterID = "micro" - reconnectLoopDuration = 10 * time.Second + defaultClusterID = "micro" ) // NewStream returns an initialized nats stream or an error if the connection to the nats @@ -111,13 +110,8 @@ func (s *stream) Publish(topic string, msg interface{}, opts ...events.PublishOp return errors.Wrap(err, "Error encoding event") } - pubOpts := []nats.PubOpt{ - // TODO: to make de-duplication work, we need to pass the event from the outside as an option - // nats.MsgId(event.ID), // event de-duplication - } - // publish the event to the topic's channel - if _, err := s.natsJetStreamCtx.PublishAsync(event.Topic, bytes, pubOpts...); err != nil { + if _, err := s.natsJetStreamCtx.PublishAsync(event.Topic, bytes); err != nil { return errors.Wrap(err, "Error publishing message to topic") } @@ -146,14 +140,6 @@ func (s *stream) Consume(topic string, opts ...events.ConsumeOption) (<-chan eve ctx, cancel := context.WithCancel(context.TODO()) defer cancel() - // TODO: not supported by go-micro interface - // would need a event.InProgressFunc{} to be - // called periodically - //err := m.InProgress(nats.Context(ctx)) - //if err != nil { - // return - } - // decode the message var evt events.Event if err := json.Unmarshal(m.Data, &evt); err != nil { diff --git a/plugins/events/natsjs/nats_test.go b/plugins/events/natsjs/nats_test.go index 6940f531..014a4444 100644 --- a/plugins/events/natsjs/nats_test.go +++ b/plugins/events/natsjs/nats_test.go @@ -19,7 +19,7 @@ type Payload struct { Name string } -func SingleEvent(t *testing.T) { +func TestSingleEvent(t *testing.T) { ctx, cancel := context.WithCancel(context.TODO()) defer cancel()