From 1d6a23fa967643a737cd052234d480052d3ec2d9 Mon Sep 17 00:00:00 2001 From: Tim Voronov Date: Mon, 15 Oct 2018 17:17:15 -0400 Subject: [PATCH] Refactoring/new event broker (#127) * Refactored EventBroker * Improved event loop cancellation --- pkg/html/dynamic/document.go | 8 +- pkg/html/dynamic/element.go | 59 ++--- pkg/html/dynamic/events/broker.go | 261 +++++++++++++------- pkg/html/dynamic/events/broker_test.go | 316 +++++++++++++++++++++++++ pkg/html/dynamic/helpers.go | 161 +++++++------ pkg/stdlib/strings/lib.go | 2 +- 6 files changed, 605 insertions(+), 202 deletions(-) create mode 100644 pkg/html/dynamic/events/broker_test.go diff --git a/pkg/html/dynamic/document.go b/pkg/html/dynamic/document.go index 9b4426d8..6e8a1ef8 100644 --- a/pkg/html/dynamic/document.go +++ b/pkg/html/dynamic/document.go @@ -132,8 +132,8 @@ func NewHTMLDocument( doc.url = url doc.element = rootElement - broker.AddEventListener("load", doc.handlePageLoad) - broker.AddEventListener("error", doc.handleError) + broker.AddEventListener(events.EventLoad, doc.handlePageLoad) + broker.AddEventListener(events.EventError, doc.handleError) return doc } @@ -584,9 +584,9 @@ func (doc *HTMLDocument) WaitForNavigation(timeout values.Int) error { close(onEvent) } - defer doc.events.RemoveEventListener("load", listener) + defer doc.events.RemoveEventListener(events.EventLoad, listener) - doc.events.AddEventListener("load", listener) + doc.events.AddEventListener(events.EventLoad, listener) select { case <-onEvent: diff --git a/pkg/html/dynamic/element.go b/pkg/html/dynamic/element.go index 5a415f3d..6e296f00 100644 --- a/pkg/html/dynamic/element.go +++ b/pkg/html/dynamic/element.go @@ -36,7 +36,7 @@ type ( } HTMLElement struct { - sync.Mutex + mu sync.Mutex logger *zerolog.Logger client *cdp.Client events *events.EventBroker @@ -164,32 +164,32 @@ func NewHTMLElement( el.value = values.NewString(value) el.children = children - broker.AddEventListener("reload", el.handlePageReload) - broker.AddEventListener("attr:modified", el.handleAttrModified) - broker.AddEventListener("attr:removed", el.handleAttrRemoved) - broker.AddEventListener("children:count", el.handleChildrenCountChanged) - broker.AddEventListener("children:inserted", el.handleChildInserted) - broker.AddEventListener("children:deleted", el.handleChildDeleted) + broker.AddEventListener(events.EventReload, el.handlePageReload) + broker.AddEventListener(events.EventAttrModified, el.handleAttrModified) + broker.AddEventListener(events.EventAttrRemoved, el.handleAttrRemoved) + broker.AddEventListener(events.EventChildNodeCountUpdated, el.handleChildrenCountChanged) + broker.AddEventListener(events.EventChildNodeInserted, el.handleChildInserted) + broker.AddEventListener(events.EventChildNodeRemoved, el.handleChildRemoved) return el } func (el *HTMLElement) Close() error { - el.Lock() - defer el.Unlock() + el.mu.Lock() + defer el.mu.Unlock() // already closed if !el.connected { return nil } - el.connected = false - el.events.RemoveEventListener("reload", el.handlePageReload) - el.events.RemoveEventListener("attr:modified", el.handleAttrModified) - el.events.RemoveEventListener("attr:removed", el.handleAttrRemoved) - el.events.RemoveEventListener("children:count", el.handleChildrenCountChanged) - el.events.RemoveEventListener("children:inserted", el.handleChildInserted) - el.events.RemoveEventListener("children:deleted", el.handleChildDeleted) + el.connected = values.False + el.events.RemoveEventListener(events.EventReload, el.handlePageReload) + el.events.RemoveEventListener(events.EventAttrModified, el.handleAttrModified) + el.events.RemoveEventListener(events.EventAttrRemoved, el.handleAttrRemoved) + el.events.RemoveEventListener(events.EventChildNodeCountUpdated, el.handleChildrenCountChanged) + el.events.RemoveEventListener(events.EventChildNodeInserted, el.handleChildInserted) + el.events.RemoveEventListener(events.EventChildNodeRemoved, el.handleChildRemoved) return nil } @@ -243,8 +243,8 @@ func (el *HTMLElement) Unwrap() interface{} { } func (el *HTMLElement) Hash() uint64 { - el.Lock() - defer el.Unlock() + el.mu.Lock() + defer el.mu.Unlock() h := fnv.New64a() @@ -330,6 +330,7 @@ func (el *HTMLElement) GetChildNodes() core.Value { } func (el *HTMLElement) GetChildNode(idx values.Int) core.Value { + // TODO: Add lazy loading val, err := el.loadedChildren.Read() if err != nil { @@ -603,8 +604,8 @@ func (el *HTMLElement) InnerTextBySelectorAll(selector values.String) *values.Ar } func (el *HTMLElement) InnerHTML() values.String { - el.Lock() - defer el.Unlock() + el.mu.Lock() + defer el.mu.Unlock() return el.innerHTML } @@ -746,8 +747,8 @@ func (el *HTMLElement) Input(value core.Value, delay values.Int) error { } func (el *HTMLElement) IsConnected() values.Boolean { - el.Lock() - defer el.Unlock() + el.mu.Lock() + defer el.mu.Unlock() return el.connected } @@ -916,8 +917,8 @@ func (el *HTMLElement) handleChildrenCountChanged(message interface{}) { return } - el.Lock() - defer el.Unlock() + el.mu.Lock() + defer el.mu.Unlock() el.children = createChildrenArray(node.Node.Children) } @@ -937,8 +938,8 @@ func (el *HTMLElement) handleChildInserted(message interface{}) { prevID := reply.PreviousNodeID nextID := reply.Node.NodeID - el.Lock() - defer el.Unlock() + el.mu.Lock() + defer el.mu.Unlock() for idx, id := range el.children { if id.nodeID == prevID { @@ -991,7 +992,7 @@ func (el *HTMLElement) handleChildInserted(message interface{}) { }) } -func (el *HTMLElement) handleChildDeleted(message interface{}) { +func (el *HTMLElement) handleChildRemoved(message interface{}) { reply, ok := message.(*dom.ChildNodeRemovedReply) if !ok { @@ -1005,8 +1006,8 @@ func (el *HTMLElement) handleChildDeleted(message interface{}) { targetIDx := -1 targetID := reply.NodeID - el.Lock() - defer el.Unlock() + el.mu.Lock() + defer el.mu.Unlock() for idx, id := range el.children { if id.nodeID == targetID { diff --git a/pkg/html/dynamic/events/broker.go b/pkg/html/dynamic/events/broker.go index 6f3e68a4..b0b80e84 100644 --- a/pkg/html/dynamic/events/broker.go +++ b/pkg/html/dynamic/events/broker.go @@ -3,54 +3,68 @@ package events import ( "context" "github.com/MontFerret/ferret/pkg/runtime/core" - "github.com/mafredri/cdp/rpcc" + "github.com/mafredri/cdp/protocol/dom" + "github.com/mafredri/cdp/protocol/page" "reflect" "sync" - "time" ) type ( - MessageFactory func() interface{} - EventStream struct { - stream rpcc.Stream - message MessageFactory - } + Event int + EventListener func(message interface{}) EventBroker struct { - sync.Mutex - events map[string]*EventStream - listeners map[string][]EventListener - cancel context.CancelFunc + mu sync.Mutex + listeners map[Event][]EventListener + cancel context.CancelFunc + onLoad page.LoadEventFiredClient + onReload dom.DocumentUpdatedClient + onAttrModified dom.AttributeModifiedClient + onAttrRemoved dom.AttributeRemovedClient + onChildNodeCountUpdated dom.ChildNodeCountUpdatedClient + onChildNodeInserted dom.ChildNodeInsertedClient + onChildNodeRemoved dom.ChildNodeRemovedClient } ) -func NewEventBroker() *EventBroker { +var ( + //revive:disable-next-line var-declaration + EventError Event = 0 + EventLoad Event = 1 + EventReload Event = 2 + EventAttrModified Event = 3 + EventAttrRemoved Event = 4 + EventChildNodeCountUpdated Event = 5 + EventChildNodeInserted Event = 6 + EventChildNodeRemoved Event = 7 +) + +func NewEventBroker( + onLoad page.LoadEventFiredClient, + onReload dom.DocumentUpdatedClient, + onAttrModified dom.AttributeModifiedClient, + onAttrRemoved dom.AttributeRemovedClient, + onChildNodeCountUpdated dom.ChildNodeCountUpdatedClient, + onChildNodeInserted dom.ChildNodeInsertedClient, + onChildNodeRemoved dom.ChildNodeRemovedClient, +) *EventBroker { broker := new(EventBroker) - broker.events = make(map[string]*EventStream) - broker.listeners = make(map[string][]EventListener) + broker.listeners = make(map[Event][]EventListener) + broker.onLoad = onLoad + broker.onReload = onReload + broker.onAttrModified = onAttrModified + broker.onAttrRemoved = onAttrRemoved + broker.onChildNodeCountUpdated = onChildNodeCountUpdated + broker.onChildNodeInserted = onChildNodeInserted + broker.onChildNodeRemoved = onChildNodeRemoved return broker } -func (broker *EventBroker) AddEventStream(name string, stream rpcc.Stream, msg MessageFactory) error { - broker.Lock() - defer broker.Unlock() - - _, exists := broker.events[name] - - if exists { - return core.Error(core.ErrNotUnique, name) - } - - broker.events[name] = &EventStream{stream, msg} - - return nil -} - -func (broker *EventBroker) AddEventListener(event string, listener EventListener) { - broker.Lock() - defer broker.Unlock() +func (broker *EventBroker) AddEventListener(event Event, listener EventListener) { + broker.mu.Lock() + defer broker.mu.Unlock() listeners, ok := broker.listeners[event] @@ -61,9 +75,9 @@ func (broker *EventBroker) AddEventListener(event string, listener EventListener broker.listeners[event] = append(listeners, listener) } -func (broker *EventBroker) RemoveEventListener(event string, listener EventListener) { - broker.Lock() - defer broker.Unlock() +func (broker *EventBroker) RemoveEventListener(event Event, listener EventListener) { + broker.mu.Lock() + defer broker.mu.Unlock() idx := -1 @@ -91,14 +105,29 @@ func (broker *EventBroker) RemoveEventListener(event string, listener EventListe if len(listeners) > 1 { modifiedListeners = append(listeners[:idx], listeners[idx+1:]...) + } else { + modifiedListeners = make([]EventListener, 0, 5) } broker.listeners[event] = modifiedListeners } +func (broker *EventBroker) ListenerCount(event Event) int { + broker.mu.Lock() + defer broker.mu.Unlock() + + listeners, ok := broker.listeners[event] + + if !ok { + return 0 + } + + return len(listeners) +} + func (broker *EventBroker) Start() error { - broker.Lock() - defer broker.Unlock() + broker.mu.Lock() + defer broker.mu.Unlock() if broker.cancel != nil { return core.Error(core.ErrInvalidOperation, "broker is already started") @@ -108,89 +137,141 @@ func (broker *EventBroker) Start() error { broker.cancel = cancel - go func() { - counter := 0 - eventsCount := len(broker.events) - - for { - for name, event := range broker.events { - counter++ - - select { - case <-ctx.Done(): - return - case <-event.stream.Ready(): - msg := event.message() - err := event.stream.RecvMsg(msg) - - if err != nil { - broker.emit("error", err) - - return - } - - broker.emit(name, msg) - default: - // we have iterated over all events - // lets pause - if counter == eventsCount { - counter = 0 - time.Sleep(DefaultPolling) - } - - continue - } - } - } - }() + go broker.runLoop(ctx) return nil } func (broker *EventBroker) Stop() error { - broker.Lock() - defer broker.Unlock() + broker.mu.Lock() + defer broker.mu.Unlock() if broker.cancel == nil { return core.Error(core.ErrInvalidOperation, "broker is already stopped") } broker.cancel() + broker.cancel = nil return nil } func (broker *EventBroker) Close() error { - broker.Lock() - defer broker.Unlock() + broker.mu.Lock() + defer broker.mu.Unlock() if broker.cancel != nil { broker.cancel() + broker.cancel = nil } - for _, event := range broker.events { - event.stream.Close() - } + broker.onLoad.Close() + broker.onReload.Close() + broker.onAttrModified.Close() + broker.onAttrRemoved.Close() + broker.onChildNodeCountUpdated.Close() + broker.onChildNodeInserted.Close() + broker.onChildNodeRemoved.Close() return nil } -func (broker *EventBroker) emit(name string, message interface{}) { - broker.Lock() +func (broker *EventBroker) runLoop(ctx context.Context) { + for { + select { + case <-ctx.Done(): + return + case <-broker.onLoad.Ready(): + reply, err := broker.onLoad.Recv() - listeners, ok := broker.listeners[name] + broker.emit(EventLoad, reply, err) + default: + } + + select { + case <-ctx.Done(): + return + case <-broker.onReload.Ready(): + reply, err := broker.onReload.Recv() + + broker.emit(EventReload, reply, err) + default: + } + + select { + case <-ctx.Done(): + return + case <-broker.onAttrModified.Ready(): + reply, err := broker.onAttrModified.Recv() + + broker.emit(EventAttrModified, reply, err) + default: + } + + select { + case <-ctx.Done(): + return + case <-broker.onAttrRemoved.Ready(): + reply, err := broker.onAttrRemoved.Recv() + + broker.emit(EventAttrRemoved, reply, err) + default: + } + + select { + case <-ctx.Done(): + return + case <-broker.onChildNodeCountUpdated.Ready(): + reply, err := broker.onChildNodeCountUpdated.Recv() + + broker.emit(EventChildNodeCountUpdated, reply, err) + default: + + } + + select { + case <-ctx.Done(): + return + case <-broker.onChildNodeInserted.Ready(): + reply, err := broker.onChildNodeInserted.Recv() + + broker.emit(EventChildNodeInserted, reply, err) + default: + } + + select { + case <-ctx.Done(): + return + case <-broker.onChildNodeRemoved.Ready(): + reply, err := broker.onChildNodeRemoved.Recv() + + broker.emit(EventChildNodeRemoved, reply, err) + default: + } + } +} + +func (broker *EventBroker) emit(event Event, message interface{}, err error) { + if err != nil { + event = EventError + message = err + } + + broker.mu.Lock() + defer broker.mu.Unlock() + + listeners, ok := broker.listeners[event] if !ok { - broker.Unlock() return } - // we copy the list of listeners and unlock the broker before the execution. - // we do it in order to avoid deadlocks during calls of event listeners - snapshot := listeners[:] - broker.Unlock() + snapshot := make([]EventListener, len(listeners)) + copy(snapshot, listeners) - for _, listener := range snapshot { - listener(message) - } + go func() { + for _, listener := range snapshot { + listener(message) + } + }() } diff --git a/pkg/html/dynamic/events/broker_test.go b/pkg/html/dynamic/events/broker_test.go new file mode 100644 index 00000000..5dfb539a --- /dev/null +++ b/pkg/html/dynamic/events/broker_test.go @@ -0,0 +1,316 @@ +package events_test + +import ( + "github.com/MontFerret/ferret/pkg/html/dynamic/events" + "github.com/mafredri/cdp/protocol/dom" + "github.com/mafredri/cdp/protocol/page" + . "github.com/smartystreets/goconvey/convey" + "golang.org/x/sync/errgroup" + "testing" + "time" +) + +type ( + TestEventStream struct { + ready chan struct{} + message chan interface{} + } + + TestLoadEventFiredClient struct { + *TestEventStream + } + + TestDocumentUpdatedClient struct { + *TestEventStream + } + + TestAttributeModifiedClient struct { + *TestEventStream + } + + TestAttributeRemovedClient struct { + *TestEventStream + } + + TestChildNodeCountUpdatedClient struct { + *TestEventStream + } + + TestChildNodeInsertedClient struct { + *TestEventStream + } + + TestChildNodeRemovedClient struct { + *TestEventStream + } + + TestBroker struct { + *events.EventBroker + OnLoad *TestLoadEventFiredClient + OnReload *TestDocumentUpdatedClient + OnAttrMod *TestAttributeModifiedClient + OnAttrRem *TestAttributeRemovedClient + OnChildNodeCount *TestChildNodeCountUpdatedClient + OnChildNodeIns *TestChildNodeInsertedClient + OnChildNodeRem *TestChildNodeRemovedClient + } +) + +func NewTestEventStream() *TestEventStream { + es := new(TestEventStream) + es.ready = make(chan struct{}) + es.message = make(chan interface{}) + return es +} + +func (es *TestEventStream) Ready() <-chan struct{} { + return es.ready +} + +func (es *TestEventStream) RecvMsg(i interface{}) error { + // NOT IMPLEMENTED + return nil +} + +func (es *TestEventStream) Close() error { + close(es.message) + close(es.ready) + return nil +} + +func (es *TestEventStream) Emit(msg interface{}) { + es.ready <- struct{}{} + es.message <- msg +} + +func (es *TestLoadEventFiredClient) Recv() (*page.LoadEventFiredReply, error) { + r := <-es.message + reply := r.(*page.LoadEventFiredReply) + + return reply, nil +} + +func (es *TestLoadEventFiredClient) EmitDefault() { + es.TestEventStream.Emit(&page.LoadEventFiredReply{}) +} + +func (es *TestDocumentUpdatedClient) Recv() (*dom.DocumentUpdatedReply, error) { + r := <-es.message + reply := r.(*dom.DocumentUpdatedReply) + + return reply, nil +} + +func (es *TestAttributeModifiedClient) Recv() (*dom.AttributeModifiedReply, error) { + r := <-es.message + reply := r.(*dom.AttributeModifiedReply) + + return reply, nil +} + +func (es *TestAttributeRemovedClient) Recv() (*dom.AttributeRemovedReply, error) { + r := <-es.message + reply := r.(*dom.AttributeRemovedReply) + + return reply, nil +} + +func (es *TestChildNodeCountUpdatedClient) Recv() (*dom.ChildNodeCountUpdatedReply, error) { + r := <-es.message + reply := r.(*dom.ChildNodeCountUpdatedReply) + + return reply, nil +} + +func (es *TestChildNodeInsertedClient) Recv() (*dom.ChildNodeInsertedReply, error) { + r := <-es.message + reply := r.(*dom.ChildNodeInsertedReply) + + return reply, nil +} + +func (es *TestChildNodeRemovedClient) Recv() (*dom.ChildNodeRemovedReply, error) { + r := <-es.message + reply := r.(*dom.ChildNodeRemovedReply) + + return reply, nil +} + +func NewTestEventBroker() *TestBroker { + onLoad := &TestLoadEventFiredClient{NewTestEventStream()} + onReload := &TestDocumentUpdatedClient{NewTestEventStream()} + onAttrMod := &TestAttributeModifiedClient{NewTestEventStream()} + onAttrRem := &TestAttributeRemovedClient{NewTestEventStream()} + onChildCount := &TestChildNodeCountUpdatedClient{NewTestEventStream()} + onChildIns := &TestChildNodeInsertedClient{NewTestEventStream()} + onChildRem := &TestChildNodeRemovedClient{NewTestEventStream()} + + b := events.NewEventBroker( + onLoad, + onReload, + onAttrMod, + onAttrRem, + onChildCount, + onChildIns, + onChildRem, + ) + + return &TestBroker{ + b, + onLoad, + onReload, + onAttrMod, + onAttrRem, + onChildCount, + onChildIns, + onChildRem, + } +} + +func StressTest(h func() error, count int) error { + var err error + + for i := 0; i < count; i++ { + err = h() + + if err != nil { + return err + } + } + + return nil +} + +func StressTestAsync(h func() error, count int) error { + var gr errgroup.Group + + for i := 0; i < count; i++ { + gr.Go(h) + } + + return gr.Wait() +} + +func TestEventBroker(t *testing.T) { + Convey(".AddEventListener", t, func() { + Convey("Should add a new listener when not started", func() { + b := NewTestEventBroker() + + StressTest(func() error { + b.AddEventListener(events.EventLoad, func(message interface{}) {}) + + return nil + }, 500) + }) + + Convey("Should add a new listener when started", func() { + b := NewTestEventBroker() + b.Start() + defer b.Stop() + + StressTest(func() error { + b.AddEventListener(events.EventLoad, func(message interface{}) {}) + + return nil + }, 500) + }) + }) + + Convey(".RemoveEventListener", t, func() { + Convey("Should remove a listener when not started", func() { + b := NewTestEventBroker() + + StressTest(func() error { + listener := func(message interface{}) {} + + b.AddEventListener(events.EventLoad, listener) + b.RemoveEventListener(events.EventLoad, listener) + + So(b.ListenerCount(events.EventLoad), ShouldEqual, 0) + + return nil + }, 500) + }) + + Convey("Should add a new listener when started", func() { + b := NewTestEventBroker() + b.Start() + defer b.Stop() + + StressTest(func() error { + listener := func(message interface{}) {} + + b.AddEventListener(events.EventLoad, listener) + + StressTestAsync(func() error { + b.OnLoad.EmitDefault() + + return nil + }, 250) + + b.RemoveEventListener(events.EventLoad, listener) + + So(b.ListenerCount(events.EventLoad), ShouldEqual, 0) + + return nil + }, 250) + + }) + + Convey("Should not call listener once it was removed", func() { + b := NewTestEventBroker() + b.Start() + defer b.Stop() + + counter := 0 + listener := func(message interface{}) { + counter += 1 + } + + b.AddEventListener(events.EventLoad, listener) + b.OnLoad.Emit(&page.LoadEventFiredReply{}) + + time.Sleep(time.Duration(10) * time.Millisecond) + b.RemoveEventListener(events.EventLoad, listener) + + StressTestAsync(func() error { + b.OnLoad.Emit(&page.LoadEventFiredReply{}) + + return nil + }, 250) + + So(b.ListenerCount(events.EventLoad), ShouldEqual, 0) + So(counter, ShouldEqual, 1) + }) + }) + + Convey(".Stop", t, func() { + Convey("Should stop emitting events", func() { + b := NewTestEventBroker() + b.Start() + + counter := 0 + b.AddEventListener(events.EventLoad, func(message interface{}) { + counter++ + }) + + b.OnLoad.EmitDefault() + + time.Sleep(time.Duration(5) * time.Millisecond) + + b.Stop() + + go func() { + b.OnLoad.EmitDefault() + }() + + go func() { + b.OnLoad.EmitDefault() + }() + + time.Sleep(time.Duration(5) * time.Millisecond) + + So(counter, ShouldEqual, 1) + }) + }) +} diff --git a/pkg/html/dynamic/helpers.go b/pkg/html/dynamic/helpers.go index 5ffcd58f..62c03587 100644 --- a/pkg/html/dynamic/helpers.go +++ b/pkg/html/dynamic/helpers.go @@ -130,96 +130,101 @@ func waitForLoadEvent(ctx context.Context, client *cdp.Client) error { } func createEventBroker(client *cdp.Client) (*events.EventBroker, error) { + var err error + var onLoad page.LoadEventFiredClient + var onReload dom.DocumentUpdatedClient + var onAttrModified dom.AttributeModifiedClient + var onAttrRemoved dom.AttributeRemovedClient + var onChildCountUpdated dom.ChildNodeCountUpdatedClient + var onChildNodeInserted dom.ChildNodeInsertedClient + var onChildNodeRemoved dom.ChildNodeRemovedClient ctx := context.Background() - load, err := client.Page.LoadEventFired(ctx) + + onLoad, err = client.Page.LoadEventFired(ctx) if err != nil { return nil, err } - broker := events.NewEventBroker() - broker.AddEventStream("load", load, func() interface{} { - return new(page.LoadEventFiredReply) - }) + onReload, err = client.DOM.DocumentUpdated(ctx) + + if err != nil { + onLoad.Close() + return nil, err + } + + onAttrModified, err = client.DOM.AttributeModified(ctx) + + if err != nil { + onLoad.Close() + onReload.Close() + return nil, err + } + + onAttrRemoved, err = client.DOM.AttributeRemoved(ctx) + + if err != nil { + onLoad.Close() + onReload.Close() + onAttrModified.Close() + return nil, err + } + + onChildCountUpdated, err = client.DOM.ChildNodeCountUpdated(ctx) + + if err != nil { + onLoad.Close() + onReload.Close() + onAttrModified.Close() + onAttrRemoved.Close() + return nil, err + } + + onChildNodeInserted, err = client.DOM.ChildNodeInserted(ctx) + + if err != nil { + onLoad.Close() + onReload.Close() + onAttrModified.Close() + onAttrRemoved.Close() + onChildCountUpdated.Close() + return nil, err + } + + onChildNodeRemoved, err = client.DOM.ChildNodeRemoved(ctx) + + if err != nil { + onLoad.Close() + onReload.Close() + onAttrModified.Close() + onAttrRemoved.Close() + onChildCountUpdated.Close() + onChildNodeInserted.Close() + return nil, err + } + + broker := events.NewEventBroker( + onLoad, + onReload, + onAttrModified, + onAttrRemoved, + onChildCountUpdated, + onChildNodeInserted, + onChildNodeRemoved, + ) err = broker.Start() if err != nil { - broker.Close() - + onLoad.Close() + onReload.Close() + onAttrModified.Close() + onAttrRemoved.Close() + onChildCountUpdated.Close() + onChildNodeInserted.Close() + onChildNodeRemoved.Close() return nil, err } - destroy, err := client.DOM.DocumentUpdated(ctx) - - if err != nil { - broker.Close() - return nil, err - } - - broker.AddEventStream("reload", destroy, func() interface{} { - return new(dom.DocumentUpdatedReply) - }) - - attrModified, err := client.DOM.AttributeModified(ctx) - - if err != nil { - broker.Close() - - return nil, err - } - - broker.AddEventStream("attr:modified", attrModified, func() interface{} { - return new(dom.AttributeModifiedReply) - }) - - attrRemoved, err := client.DOM.AttributeRemoved(ctx) - - if err != nil { - broker.Close() - - return nil, err - } - - broker.AddEventStream("attr:removed", attrRemoved, func() interface{} { - return new(dom.AttributeRemovedReply) - }) - - childrenCount, err := client.DOM.ChildNodeCountUpdated(ctx) - - if err != nil { - broker.Close() - - return nil, err - } - - broker.AddEventStream("children:count", childrenCount, func() interface{} { - return new(dom.ChildNodeCountUpdatedReply) - }) - - childrenInsert, err := client.DOM.ChildNodeInserted(ctx) - - if err != nil { - broker.Close() - - return nil, err - } - - broker.AddEventStream("children:inserted", childrenInsert, func() interface{} { - return new(dom.ChildNodeInsertedReply) - }) - - childDeleted, err := client.DOM.ChildNodeRemoved(ctx) - - if err != nil { - broker.Close() - - return nil, err - } - - broker.AddEventStream("children:deleted", childDeleted, func() interface{} { - return new(dom.ChildNodeRemovedReply) - }) - return broker, nil } diff --git a/pkg/stdlib/strings/lib.go b/pkg/stdlib/strings/lib.go index 8ae9c877..dfe5101d 100644 --- a/pkg/stdlib/strings/lib.go +++ b/pkg/stdlib/strings/lib.go @@ -31,7 +31,7 @@ func NewLib() map[string]core.Function { "SUBSTITUTE": Substitute, "SUBSTRING": Substring, "TO_BASE64": ToBase64, - "FROM_BASE64": FromBase64, + "FROM_BASE64": FromBase64, "TRIM": Trim, "UPPER": Upper, }