1
0
mirror of https://github.com/MontFerret/ferret.git synced 2025-02-13 13:58:32 +02:00

Simplified event.Loop API

This commit is contained in:
Tim Voronov 2021-10-23 21:16:50 -04:00
parent 7384d3466d
commit 06b17394ed
3 changed files with 30 additions and 85 deletions

View File

@ -20,7 +20,7 @@ func NewLoop(sources ...SourceFactory) *Loop {
return loop
}
func (loop *Loop) Run(ctx context.Context) (context.CancelFunc, error) {
func (loop *Loop) Run(ctx context.Context) error {
var err error
sources := make([]Source, 0, len(loop.sources))
@ -44,16 +44,14 @@ func (loop *Loop) Run(ctx context.Context) (context.CancelFunc, error) {
src.Close()
}
return nil, err
return err
}
ctx, cancel := context.WithCancel(ctx)
for _, src := range sources {
loop.consume(ctx, src)
}
return cancel, nil
return nil
}
func (loop *Loop) Listeners(eventID ID) int {
@ -137,23 +135,31 @@ func (loop *Loop) consume(ctx context.Context, src Source) {
func (loop *Loop) emit(ctx context.Context, eventID ID, message interface{}) {
loop.mu.Lock()
defer loop.mu.Unlock()
var snapshot []Listener
listeners, exist := loop.listeners[eventID]
if !exist {
return
if exist {
snapshot = make([]Listener, 0, len(listeners))
for _, listener := range listeners {
snapshot = append(snapshot, listener)
}
}
for _, listener := range listeners {
select {
case <-ctx.Done():
loop.mu.Unlock()
for _, listener := range snapshot {
if isCtxDone(ctx) {
return
default:
// if returned false, it means the loops should not call the handler anymore
if !listener.Handler(ctx, message) {
delete(listeners, listener.ID)
}
}
// if returned false,
// the handler must be removed after the call
if !listener.Handler(ctx, message) {
loop.mu.Lock()
delete(loop.listeners[eventID], listener.ID)
loop.mu.Unlock()
}
}
}

View File

@ -133,7 +133,7 @@ func TestLoop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
_, err := loop.Run(ctx)
err := loop.Run(ctx)
defer cancel()
So(err, ShouldBeNil)
@ -178,11 +178,10 @@ func TestLoop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
c, err := loop.Run(ctx)
err := loop.Run(ctx)
defer cancel()
So(err, ShouldBeNil)
So(c, ShouldHaveSameTypeAs, cancel)
test.EmitDefault()
@ -233,7 +232,7 @@ func TestLoop(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
_, err := loop.Run(ctx)
err := loop.Run(ctx)
So(err, ShouldBeNil)
defer cancel()
@ -264,41 +263,7 @@ func TestLoop(t *testing.T) {
}))
ctx, cancel := context.WithCancel(context.Background())
_, err := loop.Run(ctx)
So(err, ShouldBeNil)
for i := 0; i <= eventsToFire; i++ {
time.Sleep(time.Duration(100) * time.Millisecond)
tes.EmitDefault()
}
// Stop the loop
cancel()
time.Sleep(time.Duration(100) * time.Millisecond)
So(tes.IsClosed(), ShouldBeTrue)
})
Convey("Should stop on nested Context.Done", t, func() {
eventsToFire := 5
counter := NewCounter()
var tes *TestEventStream
loop := events.NewLoop(events.NewStreamSourceFactory(TestEvent, func(ctx context.Context) (rpcc.Stream, error) {
tes = NewTestEventStream()
return tes, nil
}, func(stream rpcc.Stream) (interface{}, error) {
return stream.(*TestEventStream).Recv()
}))
loop.AddListener(TestEvent, events.Always(func(ctx context.Context, message interface{}) {
counter.Increase()
}))
cancel, err := loop.Run(context.Background())
err := loop.Run(ctx)
So(err, ShouldBeNil)
for i := 0; i <= eventsToFire; i++ {
@ -385,9 +350,7 @@ func BenchmarkLoop_Start(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
_, err := loop.Run(ctx)
if err != nil {
if err := loop.Run(ctx); err != nil {
panic(err)
}
@ -433,9 +396,7 @@ func BenchmarkLoop_StartAsync(b *testing.B) {
ctx, cancel := context.WithCancel(context.Background())
_, err := loop.Run(ctx)
if err != nil {
if err := loop.Run(ctx); err != nil {
panic(err)
}

View File

@ -35,9 +35,7 @@ type (
client *cdp.Client
headers *drivers.HTTPHeaders
foregroundLoop *events.Loop
stopForegroundLoop context.CancelFunc
backgroundLoop *events.Loop
stopBackgroundLoop context.CancelFunc
cancel context.CancelFunc
responseListenerID events.ListenerID
filterListenerID events.ListenerID
@ -64,14 +62,6 @@ func New(
defer func() {
if err != nil {
m.cancel()
if m.stopForegroundLoop != nil {
m.stopForegroundLoop()
}
if m.stopBackgroundLoop != nil {
m.stopBackgroundLoop()
}
}
}()
@ -124,24 +114,20 @@ func New(
}
}
cancel, err = m.foregroundLoop.Run(ctx)
err = m.foregroundLoop.Run(ctx)
if err != nil {
return nil, err
}
m.stopForegroundLoop = cancel
if m.backgroundLoop != nil {
// run in a separate loop in order to get higher priority
// TODO: Consider adding support of event priorities to EventLoop
cancel, err = m.backgroundLoop.Run(ctx)
err = m.backgroundLoop.Run(ctx)
if err != nil {
return nil, err
}
m.stopBackgroundLoop = cancel
}
return m, nil
@ -158,14 +144,6 @@ func (m *Manager) Close() error {
m.cancel = nil
}
if m.stopForegroundLoop != nil {
m.stopForegroundLoop()
}
if m.stopBackgroundLoop != nil {
m.stopBackgroundLoop()
}
return nil
}