1
0
mirror of https://github.com/MontFerret/ferret.git synced 2024-12-14 11:23:02 +02:00

Refactoring/new event broker (#127)

* Refactored EventBroker
* Improved event loop cancellation
This commit is contained in:
Tim Voronov 2018-10-15 17:17:15 -04:00 committed by GitHub
parent e64eb18ccf
commit 1d6a23fa96
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
6 changed files with 605 additions and 202 deletions

View File

@ -132,8 +132,8 @@ func NewHTMLDocument(
doc.url = url
doc.element = rootElement
broker.AddEventListener("load", doc.handlePageLoad)
broker.AddEventListener("error", doc.handleError)
broker.AddEventListener(events.EventLoad, doc.handlePageLoad)
broker.AddEventListener(events.EventError, doc.handleError)
return doc
}
@ -584,9 +584,9 @@ func (doc *HTMLDocument) WaitForNavigation(timeout values.Int) error {
close(onEvent)
}
defer doc.events.RemoveEventListener("load", listener)
defer doc.events.RemoveEventListener(events.EventLoad, listener)
doc.events.AddEventListener("load", listener)
doc.events.AddEventListener(events.EventLoad, listener)
select {
case <-onEvent:

View File

@ -36,7 +36,7 @@ type (
}
HTMLElement struct {
sync.Mutex
mu sync.Mutex
logger *zerolog.Logger
client *cdp.Client
events *events.EventBroker
@ -164,32 +164,32 @@ func NewHTMLElement(
el.value = values.NewString(value)
el.children = children
broker.AddEventListener("reload", el.handlePageReload)
broker.AddEventListener("attr:modified", el.handleAttrModified)
broker.AddEventListener("attr:removed", el.handleAttrRemoved)
broker.AddEventListener("children:count", el.handleChildrenCountChanged)
broker.AddEventListener("children:inserted", el.handleChildInserted)
broker.AddEventListener("children:deleted", el.handleChildDeleted)
broker.AddEventListener(events.EventReload, el.handlePageReload)
broker.AddEventListener(events.EventAttrModified, el.handleAttrModified)
broker.AddEventListener(events.EventAttrRemoved, el.handleAttrRemoved)
broker.AddEventListener(events.EventChildNodeCountUpdated, el.handleChildrenCountChanged)
broker.AddEventListener(events.EventChildNodeInserted, el.handleChildInserted)
broker.AddEventListener(events.EventChildNodeRemoved, el.handleChildRemoved)
return el
}
func (el *HTMLElement) Close() error {
el.Lock()
defer el.Unlock()
el.mu.Lock()
defer el.mu.Unlock()
// already closed
if !el.connected {
return nil
}
el.connected = false
el.events.RemoveEventListener("reload", el.handlePageReload)
el.events.RemoveEventListener("attr:modified", el.handleAttrModified)
el.events.RemoveEventListener("attr:removed", el.handleAttrRemoved)
el.events.RemoveEventListener("children:count", el.handleChildrenCountChanged)
el.events.RemoveEventListener("children:inserted", el.handleChildInserted)
el.events.RemoveEventListener("children:deleted", el.handleChildDeleted)
el.connected = values.False
el.events.RemoveEventListener(events.EventReload, el.handlePageReload)
el.events.RemoveEventListener(events.EventAttrModified, el.handleAttrModified)
el.events.RemoveEventListener(events.EventAttrRemoved, el.handleAttrRemoved)
el.events.RemoveEventListener(events.EventChildNodeCountUpdated, el.handleChildrenCountChanged)
el.events.RemoveEventListener(events.EventChildNodeInserted, el.handleChildInserted)
el.events.RemoveEventListener(events.EventChildNodeRemoved, el.handleChildRemoved)
return nil
}
@ -243,8 +243,8 @@ func (el *HTMLElement) Unwrap() interface{} {
}
func (el *HTMLElement) Hash() uint64 {
el.Lock()
defer el.Unlock()
el.mu.Lock()
defer el.mu.Unlock()
h := fnv.New64a()
@ -330,6 +330,7 @@ func (el *HTMLElement) GetChildNodes() core.Value {
}
func (el *HTMLElement) GetChildNode(idx values.Int) core.Value {
// TODO: Add lazy loading
val, err := el.loadedChildren.Read()
if err != nil {
@ -603,8 +604,8 @@ func (el *HTMLElement) InnerTextBySelectorAll(selector values.String) *values.Ar
}
func (el *HTMLElement) InnerHTML() values.String {
el.Lock()
defer el.Unlock()
el.mu.Lock()
defer el.mu.Unlock()
return el.innerHTML
}
@ -746,8 +747,8 @@ func (el *HTMLElement) Input(value core.Value, delay values.Int) error {
}
func (el *HTMLElement) IsConnected() values.Boolean {
el.Lock()
defer el.Unlock()
el.mu.Lock()
defer el.mu.Unlock()
return el.connected
}
@ -916,8 +917,8 @@ func (el *HTMLElement) handleChildrenCountChanged(message interface{}) {
return
}
el.Lock()
defer el.Unlock()
el.mu.Lock()
defer el.mu.Unlock()
el.children = createChildrenArray(node.Node.Children)
}
@ -937,8 +938,8 @@ func (el *HTMLElement) handleChildInserted(message interface{}) {
prevID := reply.PreviousNodeID
nextID := reply.Node.NodeID
el.Lock()
defer el.Unlock()
el.mu.Lock()
defer el.mu.Unlock()
for idx, id := range el.children {
if id.nodeID == prevID {
@ -991,7 +992,7 @@ func (el *HTMLElement) handleChildInserted(message interface{}) {
})
}
func (el *HTMLElement) handleChildDeleted(message interface{}) {
func (el *HTMLElement) handleChildRemoved(message interface{}) {
reply, ok := message.(*dom.ChildNodeRemovedReply)
if !ok {
@ -1005,8 +1006,8 @@ func (el *HTMLElement) handleChildDeleted(message interface{}) {
targetIDx := -1
targetID := reply.NodeID
el.Lock()
defer el.Unlock()
el.mu.Lock()
defer el.mu.Unlock()
for idx, id := range el.children {
if id.nodeID == targetID {

View File

@ -3,54 +3,68 @@ package events
import (
"context"
"github.com/MontFerret/ferret/pkg/runtime/core"
"github.com/mafredri/cdp/rpcc"
"github.com/mafredri/cdp/protocol/dom"
"github.com/mafredri/cdp/protocol/page"
"reflect"
"sync"
"time"
)
type (
MessageFactory func() interface{}
EventStream struct {
stream rpcc.Stream
message MessageFactory
}
Event int
EventListener func(message interface{})
EventBroker struct {
sync.Mutex
events map[string]*EventStream
listeners map[string][]EventListener
mu sync.Mutex
listeners map[Event][]EventListener
cancel context.CancelFunc
onLoad page.LoadEventFiredClient
onReload dom.DocumentUpdatedClient
onAttrModified dom.AttributeModifiedClient
onAttrRemoved dom.AttributeRemovedClient
onChildNodeCountUpdated dom.ChildNodeCountUpdatedClient
onChildNodeInserted dom.ChildNodeInsertedClient
onChildNodeRemoved dom.ChildNodeRemovedClient
}
)
func NewEventBroker() *EventBroker {
var (
//revive:disable-next-line var-declaration
EventError Event = 0
EventLoad Event = 1
EventReload Event = 2
EventAttrModified Event = 3
EventAttrRemoved Event = 4
EventChildNodeCountUpdated Event = 5
EventChildNodeInserted Event = 6
EventChildNodeRemoved Event = 7
)
func NewEventBroker(
onLoad page.LoadEventFiredClient,
onReload dom.DocumentUpdatedClient,
onAttrModified dom.AttributeModifiedClient,
onAttrRemoved dom.AttributeRemovedClient,
onChildNodeCountUpdated dom.ChildNodeCountUpdatedClient,
onChildNodeInserted dom.ChildNodeInsertedClient,
onChildNodeRemoved dom.ChildNodeRemovedClient,
) *EventBroker {
broker := new(EventBroker)
broker.events = make(map[string]*EventStream)
broker.listeners = make(map[string][]EventListener)
broker.listeners = make(map[Event][]EventListener)
broker.onLoad = onLoad
broker.onReload = onReload
broker.onAttrModified = onAttrModified
broker.onAttrRemoved = onAttrRemoved
broker.onChildNodeCountUpdated = onChildNodeCountUpdated
broker.onChildNodeInserted = onChildNodeInserted
broker.onChildNodeRemoved = onChildNodeRemoved
return broker
}
func (broker *EventBroker) AddEventStream(name string, stream rpcc.Stream, msg MessageFactory) error {
broker.Lock()
defer broker.Unlock()
_, exists := broker.events[name]
if exists {
return core.Error(core.ErrNotUnique, name)
}
broker.events[name] = &EventStream{stream, msg}
return nil
}
func (broker *EventBroker) AddEventListener(event string, listener EventListener) {
broker.Lock()
defer broker.Unlock()
func (broker *EventBroker) AddEventListener(event Event, listener EventListener) {
broker.mu.Lock()
defer broker.mu.Unlock()
listeners, ok := broker.listeners[event]
@ -61,9 +75,9 @@ func (broker *EventBroker) AddEventListener(event string, listener EventListener
broker.listeners[event] = append(listeners, listener)
}
func (broker *EventBroker) RemoveEventListener(event string, listener EventListener) {
broker.Lock()
defer broker.Unlock()
func (broker *EventBroker) RemoveEventListener(event Event, listener EventListener) {
broker.mu.Lock()
defer broker.mu.Unlock()
idx := -1
@ -91,14 +105,29 @@ func (broker *EventBroker) RemoveEventListener(event string, listener EventListe
if len(listeners) > 1 {
modifiedListeners = append(listeners[:idx], listeners[idx+1:]...)
} else {
modifiedListeners = make([]EventListener, 0, 5)
}
broker.listeners[event] = modifiedListeners
}
func (broker *EventBroker) ListenerCount(event Event) int {
broker.mu.Lock()
defer broker.mu.Unlock()
listeners, ok := broker.listeners[event]
if !ok {
return 0
}
return len(listeners)
}
func (broker *EventBroker) Start() error {
broker.Lock()
defer broker.Unlock()
broker.mu.Lock()
defer broker.mu.Unlock()
if broker.cancel != nil {
return core.Error(core.ErrInvalidOperation, "broker is already started")
@ -108,89 +137,141 @@ func (broker *EventBroker) Start() error {
broker.cancel = cancel
go func() {
counter := 0
eventsCount := len(broker.events)
for {
for name, event := range broker.events {
counter++
select {
case <-ctx.Done():
return
case <-event.stream.Ready():
msg := event.message()
err := event.stream.RecvMsg(msg)
if err != nil {
broker.emit("error", err)
return
}
broker.emit(name, msg)
default:
// we have iterated over all events
// lets pause
if counter == eventsCount {
counter = 0
time.Sleep(DefaultPolling)
}
continue
}
}
}
}()
go broker.runLoop(ctx)
return nil
}
func (broker *EventBroker) Stop() error {
broker.Lock()
defer broker.Unlock()
broker.mu.Lock()
defer broker.mu.Unlock()
if broker.cancel == nil {
return core.Error(core.ErrInvalidOperation, "broker is already stopped")
}
broker.cancel()
broker.cancel = nil
return nil
}
func (broker *EventBroker) Close() error {
broker.Lock()
defer broker.Unlock()
broker.mu.Lock()
defer broker.mu.Unlock()
if broker.cancel != nil {
broker.cancel()
broker.cancel = nil
}
for _, event := range broker.events {
event.stream.Close()
}
broker.onLoad.Close()
broker.onReload.Close()
broker.onAttrModified.Close()
broker.onAttrRemoved.Close()
broker.onChildNodeCountUpdated.Close()
broker.onChildNodeInserted.Close()
broker.onChildNodeRemoved.Close()
return nil
}
func (broker *EventBroker) emit(name string, message interface{}) {
broker.Lock()
func (broker *EventBroker) runLoop(ctx context.Context) {
for {
select {
case <-ctx.Done():
return
case <-broker.onLoad.Ready():
reply, err := broker.onLoad.Recv()
listeners, ok := broker.listeners[name]
broker.emit(EventLoad, reply, err)
default:
}
select {
case <-ctx.Done():
return
case <-broker.onReload.Ready():
reply, err := broker.onReload.Recv()
broker.emit(EventReload, reply, err)
default:
}
select {
case <-ctx.Done():
return
case <-broker.onAttrModified.Ready():
reply, err := broker.onAttrModified.Recv()
broker.emit(EventAttrModified, reply, err)
default:
}
select {
case <-ctx.Done():
return
case <-broker.onAttrRemoved.Ready():
reply, err := broker.onAttrRemoved.Recv()
broker.emit(EventAttrRemoved, reply, err)
default:
}
select {
case <-ctx.Done():
return
case <-broker.onChildNodeCountUpdated.Ready():
reply, err := broker.onChildNodeCountUpdated.Recv()
broker.emit(EventChildNodeCountUpdated, reply, err)
default:
}
select {
case <-ctx.Done():
return
case <-broker.onChildNodeInserted.Ready():
reply, err := broker.onChildNodeInserted.Recv()
broker.emit(EventChildNodeInserted, reply, err)
default:
}
select {
case <-ctx.Done():
return
case <-broker.onChildNodeRemoved.Ready():
reply, err := broker.onChildNodeRemoved.Recv()
broker.emit(EventChildNodeRemoved, reply, err)
default:
}
}
}
func (broker *EventBroker) emit(event Event, message interface{}, err error) {
if err != nil {
event = EventError
message = err
}
broker.mu.Lock()
defer broker.mu.Unlock()
listeners, ok := broker.listeners[event]
if !ok {
broker.Unlock()
return
}
// we copy the list of listeners and unlock the broker before the execution.
// we do it in order to avoid deadlocks during calls of event listeners
snapshot := listeners[:]
broker.Unlock()
snapshot := make([]EventListener, len(listeners))
copy(snapshot, listeners)
go func() {
for _, listener := range snapshot {
listener(message)
}
}()
}

View File

@ -0,0 +1,316 @@
package events_test
import (
"github.com/MontFerret/ferret/pkg/html/dynamic/events"
"github.com/mafredri/cdp/protocol/dom"
"github.com/mafredri/cdp/protocol/page"
. "github.com/smartystreets/goconvey/convey"
"golang.org/x/sync/errgroup"
"testing"
"time"
)
type (
TestEventStream struct {
ready chan struct{}
message chan interface{}
}
TestLoadEventFiredClient struct {
*TestEventStream
}
TestDocumentUpdatedClient struct {
*TestEventStream
}
TestAttributeModifiedClient struct {
*TestEventStream
}
TestAttributeRemovedClient struct {
*TestEventStream
}
TestChildNodeCountUpdatedClient struct {
*TestEventStream
}
TestChildNodeInsertedClient struct {
*TestEventStream
}
TestChildNodeRemovedClient struct {
*TestEventStream
}
TestBroker struct {
*events.EventBroker
OnLoad *TestLoadEventFiredClient
OnReload *TestDocumentUpdatedClient
OnAttrMod *TestAttributeModifiedClient
OnAttrRem *TestAttributeRemovedClient
OnChildNodeCount *TestChildNodeCountUpdatedClient
OnChildNodeIns *TestChildNodeInsertedClient
OnChildNodeRem *TestChildNodeRemovedClient
}
)
func NewTestEventStream() *TestEventStream {
es := new(TestEventStream)
es.ready = make(chan struct{})
es.message = make(chan interface{})
return es
}
func (es *TestEventStream) Ready() <-chan struct{} {
return es.ready
}
func (es *TestEventStream) RecvMsg(i interface{}) error {
// NOT IMPLEMENTED
return nil
}
func (es *TestEventStream) Close() error {
close(es.message)
close(es.ready)
return nil
}
func (es *TestEventStream) Emit(msg interface{}) {
es.ready <- struct{}{}
es.message <- msg
}
func (es *TestLoadEventFiredClient) Recv() (*page.LoadEventFiredReply, error) {
r := <-es.message
reply := r.(*page.LoadEventFiredReply)
return reply, nil
}
func (es *TestLoadEventFiredClient) EmitDefault() {
es.TestEventStream.Emit(&page.LoadEventFiredReply{})
}
func (es *TestDocumentUpdatedClient) Recv() (*dom.DocumentUpdatedReply, error) {
r := <-es.message
reply := r.(*dom.DocumentUpdatedReply)
return reply, nil
}
func (es *TestAttributeModifiedClient) Recv() (*dom.AttributeModifiedReply, error) {
r := <-es.message
reply := r.(*dom.AttributeModifiedReply)
return reply, nil
}
func (es *TestAttributeRemovedClient) Recv() (*dom.AttributeRemovedReply, error) {
r := <-es.message
reply := r.(*dom.AttributeRemovedReply)
return reply, nil
}
func (es *TestChildNodeCountUpdatedClient) Recv() (*dom.ChildNodeCountUpdatedReply, error) {
r := <-es.message
reply := r.(*dom.ChildNodeCountUpdatedReply)
return reply, nil
}
func (es *TestChildNodeInsertedClient) Recv() (*dom.ChildNodeInsertedReply, error) {
r := <-es.message
reply := r.(*dom.ChildNodeInsertedReply)
return reply, nil
}
func (es *TestChildNodeRemovedClient) Recv() (*dom.ChildNodeRemovedReply, error) {
r := <-es.message
reply := r.(*dom.ChildNodeRemovedReply)
return reply, nil
}
func NewTestEventBroker() *TestBroker {
onLoad := &TestLoadEventFiredClient{NewTestEventStream()}
onReload := &TestDocumentUpdatedClient{NewTestEventStream()}
onAttrMod := &TestAttributeModifiedClient{NewTestEventStream()}
onAttrRem := &TestAttributeRemovedClient{NewTestEventStream()}
onChildCount := &TestChildNodeCountUpdatedClient{NewTestEventStream()}
onChildIns := &TestChildNodeInsertedClient{NewTestEventStream()}
onChildRem := &TestChildNodeRemovedClient{NewTestEventStream()}
b := events.NewEventBroker(
onLoad,
onReload,
onAttrMod,
onAttrRem,
onChildCount,
onChildIns,
onChildRem,
)
return &TestBroker{
b,
onLoad,
onReload,
onAttrMod,
onAttrRem,
onChildCount,
onChildIns,
onChildRem,
}
}
func StressTest(h func() error, count int) error {
var err error
for i := 0; i < count; i++ {
err = h()
if err != nil {
return err
}
}
return nil
}
func StressTestAsync(h func() error, count int) error {
var gr errgroup.Group
for i := 0; i < count; i++ {
gr.Go(h)
}
return gr.Wait()
}
func TestEventBroker(t *testing.T) {
Convey(".AddEventListener", t, func() {
Convey("Should add a new listener when not started", func() {
b := NewTestEventBroker()
StressTest(func() error {
b.AddEventListener(events.EventLoad, func(message interface{}) {})
return nil
}, 500)
})
Convey("Should add a new listener when started", func() {
b := NewTestEventBroker()
b.Start()
defer b.Stop()
StressTest(func() error {
b.AddEventListener(events.EventLoad, func(message interface{}) {})
return nil
}, 500)
})
})
Convey(".RemoveEventListener", t, func() {
Convey("Should remove a listener when not started", func() {
b := NewTestEventBroker()
StressTest(func() error {
listener := func(message interface{}) {}
b.AddEventListener(events.EventLoad, listener)
b.RemoveEventListener(events.EventLoad, listener)
So(b.ListenerCount(events.EventLoad), ShouldEqual, 0)
return nil
}, 500)
})
Convey("Should add a new listener when started", func() {
b := NewTestEventBroker()
b.Start()
defer b.Stop()
StressTest(func() error {
listener := func(message interface{}) {}
b.AddEventListener(events.EventLoad, listener)
StressTestAsync(func() error {
b.OnLoad.EmitDefault()
return nil
}, 250)
b.RemoveEventListener(events.EventLoad, listener)
So(b.ListenerCount(events.EventLoad), ShouldEqual, 0)
return nil
}, 250)
})
Convey("Should not call listener once it was removed", func() {
b := NewTestEventBroker()
b.Start()
defer b.Stop()
counter := 0
listener := func(message interface{}) {
counter += 1
}
b.AddEventListener(events.EventLoad, listener)
b.OnLoad.Emit(&page.LoadEventFiredReply{})
time.Sleep(time.Duration(10) * time.Millisecond)
b.RemoveEventListener(events.EventLoad, listener)
StressTestAsync(func() error {
b.OnLoad.Emit(&page.LoadEventFiredReply{})
return nil
}, 250)
So(b.ListenerCount(events.EventLoad), ShouldEqual, 0)
So(counter, ShouldEqual, 1)
})
})
Convey(".Stop", t, func() {
Convey("Should stop emitting events", func() {
b := NewTestEventBroker()
b.Start()
counter := 0
b.AddEventListener(events.EventLoad, func(message interface{}) {
counter++
})
b.OnLoad.EmitDefault()
time.Sleep(time.Duration(5) * time.Millisecond)
b.Stop()
go func() {
b.OnLoad.EmitDefault()
}()
go func() {
b.OnLoad.EmitDefault()
}()
time.Sleep(time.Duration(5) * time.Millisecond)
So(counter, ShouldEqual, 1)
})
})
}

View File

@ -130,96 +130,101 @@ func waitForLoadEvent(ctx context.Context, client *cdp.Client) error {
}
func createEventBroker(client *cdp.Client) (*events.EventBroker, error) {
var err error
var onLoad page.LoadEventFiredClient
var onReload dom.DocumentUpdatedClient
var onAttrModified dom.AttributeModifiedClient
var onAttrRemoved dom.AttributeRemovedClient
var onChildCountUpdated dom.ChildNodeCountUpdatedClient
var onChildNodeInserted dom.ChildNodeInsertedClient
var onChildNodeRemoved dom.ChildNodeRemovedClient
ctx := context.Background()
load, err := client.Page.LoadEventFired(ctx)
onLoad, err = client.Page.LoadEventFired(ctx)
if err != nil {
return nil, err
}
broker := events.NewEventBroker()
broker.AddEventStream("load", load, func() interface{} {
return new(page.LoadEventFiredReply)
})
onReload, err = client.DOM.DocumentUpdated(ctx)
if err != nil {
onLoad.Close()
return nil, err
}
onAttrModified, err = client.DOM.AttributeModified(ctx)
if err != nil {
onLoad.Close()
onReload.Close()
return nil, err
}
onAttrRemoved, err = client.DOM.AttributeRemoved(ctx)
if err != nil {
onLoad.Close()
onReload.Close()
onAttrModified.Close()
return nil, err
}
onChildCountUpdated, err = client.DOM.ChildNodeCountUpdated(ctx)
if err != nil {
onLoad.Close()
onReload.Close()
onAttrModified.Close()
onAttrRemoved.Close()
return nil, err
}
onChildNodeInserted, err = client.DOM.ChildNodeInserted(ctx)
if err != nil {
onLoad.Close()
onReload.Close()
onAttrModified.Close()
onAttrRemoved.Close()
onChildCountUpdated.Close()
return nil, err
}
onChildNodeRemoved, err = client.DOM.ChildNodeRemoved(ctx)
if err != nil {
onLoad.Close()
onReload.Close()
onAttrModified.Close()
onAttrRemoved.Close()
onChildCountUpdated.Close()
onChildNodeInserted.Close()
return nil, err
}
broker := events.NewEventBroker(
onLoad,
onReload,
onAttrModified,
onAttrRemoved,
onChildCountUpdated,
onChildNodeInserted,
onChildNodeRemoved,
)
err = broker.Start()
if err != nil {
broker.Close()
onLoad.Close()
onReload.Close()
onAttrModified.Close()
onAttrRemoved.Close()
onChildCountUpdated.Close()
onChildNodeInserted.Close()
onChildNodeRemoved.Close()
return nil, err
}
destroy, err := client.DOM.DocumentUpdated(ctx)
if err != nil {
broker.Close()
return nil, err
}
broker.AddEventStream("reload", destroy, func() interface{} {
return new(dom.DocumentUpdatedReply)
})
attrModified, err := client.DOM.AttributeModified(ctx)
if err != nil {
broker.Close()
return nil, err
}
broker.AddEventStream("attr:modified", attrModified, func() interface{} {
return new(dom.AttributeModifiedReply)
})
attrRemoved, err := client.DOM.AttributeRemoved(ctx)
if err != nil {
broker.Close()
return nil, err
}
broker.AddEventStream("attr:removed", attrRemoved, func() interface{} {
return new(dom.AttributeRemovedReply)
})
childrenCount, err := client.DOM.ChildNodeCountUpdated(ctx)
if err != nil {
broker.Close()
return nil, err
}
broker.AddEventStream("children:count", childrenCount, func() interface{} {
return new(dom.ChildNodeCountUpdatedReply)
})
childrenInsert, err := client.DOM.ChildNodeInserted(ctx)
if err != nil {
broker.Close()
return nil, err
}
broker.AddEventStream("children:inserted", childrenInsert, func() interface{} {
return new(dom.ChildNodeInsertedReply)
})
childDeleted, err := client.DOM.ChildNodeRemoved(ctx)
if err != nil {
broker.Close()
return nil, err
}
broker.AddEventStream("children:deleted", childDeleted, func() interface{} {
return new(dom.ChildNodeRemovedReply)
})
return broker, nil
}