1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-08-04 21:42:57 +02:00

fix natsjs syntax error, remove TODOs and enable tests (#2446)

This commit is contained in:
Willy Kloucek
2022-03-11 09:03:35 +01:00
committed by GitHub
parent a2f6fac852
commit e5a35d38f9
2 changed files with 3 additions and 17 deletions

View File

@ -16,8 +16,7 @@ import (
)
const (
defaultClusterID = "micro"
reconnectLoopDuration = 10 * time.Second
defaultClusterID = "micro"
)
// NewStream returns an initialized nats stream or an error if the connection to the nats
@ -111,13 +110,8 @@ func (s *stream) Publish(topic string, msg interface{}, opts ...events.PublishOp
return errors.Wrap(err, "Error encoding event")
}
pubOpts := []nats.PubOpt{
// TODO: to make de-duplication work, we need to pass the event from the outside as an option
// nats.MsgId(event.ID), // event de-duplication
}
// publish the event to the topic's channel
if _, err := s.natsJetStreamCtx.PublishAsync(event.Topic, bytes, pubOpts...); err != nil {
if _, err := s.natsJetStreamCtx.PublishAsync(event.Topic, bytes); err != nil {
return errors.Wrap(err, "Error publishing message to topic")
}
@ -146,14 +140,6 @@ func (s *stream) Consume(topic string, opts ...events.ConsumeOption) (<-chan eve
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()
// TODO: not supported by go-micro interface
// would need a event.InProgressFunc{} to be
// called periodically
//err := m.InProgress(nats.Context(ctx))
//if err != nil {
// return
}
// decode the message
var evt events.Event
if err := json.Unmarshal(m.Data, &evt); err != nil {

View File

@ -19,7 +19,7 @@ type Payload struct {
Name string
}
func SingleEvent(t *testing.T) {
func TestSingleEvent(t *testing.T) {
ctx, cancel := context.WithCancel(context.TODO())
defer cancel()