1
0
mirror of https://github.com/MontFerret/ferret.git synced 2025-11-29 22:17:29 +02:00
Files
ferret/pkg/drivers/cdp/events/source.go
Tim Voronov bd6463fa29 Bench (#683)
* Extended public API

* Refactored event loop

* Refactored parser

* Fixed unit tests

* Added --dry-run opt

* Added total alloc

* Added profiler

* Fixed driver registration
2021-10-16 17:24:54 -04:00

95 lines
1.9 KiB
Go

package events
import (
"context"
"github.com/mafredri/cdp/rpcc"
)
type (
	// ID is the integer identifier carried by every Event.
	ID int

	// Event is a single item produced by a Source: the identifier of the
	// event paired with an arbitrary payload.
	Event struct {
		ID   ID
		Data interface{}
	}

	// Source is an rpcc.Stream that can additionally hand out its messages
	// wrapped as Event values through Recv.
	Source interface {
		rpcc.Stream
		Recv() (Event, error)
	}

	// SourceFactory is a function that builds a Source for the given context.
	SourceFactory func(ctx context.Context) (Source, error)

	// StreamFactory is a function that builds an rpcc.Stream for the given
	// context.
	StreamFactory func(ctx context.Context) (rpcc.Stream, error)

	// DataStreamReceiver is a function that is handed a stream and returns a
	// value (or an error). StreamSource.Recv stores the returned value in
	// Event.Data.
	DataStreamReceiver func(stream rpcc.Stream) (interface{}, error)

	// StreamSource is a helper for building custom event sources. It holds an
	// event ID, an rpcc.Stream that Ready, RecvMsg and Close delegate to, and
	// the receiver function that Recv uses to obtain the event payload.
	StreamSource struct {
		eventID  ID
		stream   rpcc.Stream
		receiver DataStreamReceiver
	}
)
// Error holds the value returned by New("error").
// NOTE(review): presumably the ID used for error events; New is declared
// outside this file, so confirm against its definition.
var Error = New("error")
// NewStreamSource creates a new custom event source backed by an rpcc.Stream.
//
// eventID is the ID stamped on every Event returned by Recv.
// stream is the underlying event stream that the source delegates to.
// receiver is the function Recv calls with the stream to obtain the payload.
func NewStreamSource(
	eventID ID,
	stream rpcc.Stream,
	receiver DataStreamReceiver,
) Source {
	return &StreamSource{
		eventID:  eventID,
		stream:   stream,
		receiver: receiver,
	}
}
// ID returns the event ID stored in this source.
func (src *StreamSource) ID() ID {
	id := src.eventID

	return id
}
// Ready returns the channel reported by the underlying stream's Ready method.
func (src *StreamSource) Ready() <-chan struct{} {
	ready := src.stream.Ready()

	return ready
}
// RecvMsg forwards m to the underlying stream's RecvMsg and returns its
// error unchanged.
func (src *StreamSource) RecvMsg(m interface{}) error {
	err := src.stream.RecvMsg(m)

	return err
}
// Close closes the underlying stream and returns its error unchanged.
func (src *StreamSource) Close() error {
	err := src.stream.Close()

	return err
}
// Recv calls the receiver with the underlying stream and wraps the value it
// returns in an Event tagged with this source's event ID.
//
// If the receiver fails, the zero Event and the receiver's error are returned
// as-is.
func (src *StreamSource) Recv() (Event, error) {
	var evt Event

	payload, err := src.receiver(src.stream)
	if err != nil {
		// evt is still the zero value here.
		return evt, err
	}

	evt.ID = src.eventID
	evt.Data = payload

	return evt, nil
}
// NewStreamSourceFactory returns a SourceFactory that, on each call, uses
// factory to build a stream for the supplied context and wraps it with
// NewStreamSource using the given eventID and receiver.
//
// An error from factory is returned unchanged together with a nil Source.
func NewStreamSourceFactory(eventID ID, factory StreamFactory, receiver DataStreamReceiver) SourceFactory {
	create := func(ctx context.Context) (Source, error) {
		s, err := factory(ctx)
		if err != nil {
			return nil, err
		}

		return NewStreamSource(eventID, s, receiver), nil
	}

	return create
}