1
0
mirror of https://github.com/NUTtech/bell.git synced 2024-11-19 20:31:41 +02:00
bell/bell.go
2022-04-29 14:52:11 +03:00

202 lines
4.9 KiB
Go

// Package bell implements a simple event system (bell ringing and listening)
//
// Several listeners can be added for each ringing (handlerFunc).
// Listeners are called in a separate goroutine through an established channel.
// When the bell rings, a message is sequentially transmitted to each listener.
//
// If a channel is closed, the goroutine for that event is terminated.
//
// Example for usage:
// Listen("event_name", func(message Message) { fmt.PrintLn(message) }) - add listener on bell by name "event_name"
// Ring("event_name", "some_data") - Ring on bell (call event "event_name")
package bell
import (
"fmt"
"sync"
)
// Message The message that is passed to the event handler
type Message interface{}
// Events thread safe structure stores events, their handlers and functions for management
type Events struct {
mutex sync.RWMutex
wg sync.WaitGroup
channels map[string][]chan Message
queueSize uint
}
// New constructor for Events
func New() *Events {
return &Events{channels: map[string][]chan Message{}}
}
// Queue set events queue size
func (e *Events) Queue(size uint) *Events {
e.mutex.Lock()
defer e.mutex.Unlock()
e.queueSize = size
return e
}
// ListenN Subscribe on event where
// event - the event name,
// handlerFunc - handler function
// copiesCount - count handlers copies run
func (e *Events) ListenN(event string, handlerFunc func(message Message), copiesCount uint) {
e.mutex.Lock()
defer e.mutex.Unlock()
channel := make(chan Message, e.queueSize)
for i := uint(0); i < copiesCount; i++ {
go func(c chan Message, wg *sync.WaitGroup) {
for {
message, ok := <-c
if !ok {
break
}
handlerFunc(message)
wg.Done()
}
}(channel, &e.wg)
}
e.channels[event] = append(e.channels[event], channel)
}
// Listen Subscribe on event where
// event - the event name,
// handlerFunc - handler function
func (e *Events) Listen(event string, handlerFunc func(message Message)) {
e.ListenN(event, handlerFunc, 1)
}
// Ring Call event there
// event - event name
// message - data that will be passed to the event handler
func (e *Events) Ring(event string, message Message) error {
e.mutex.RLock()
defer e.mutex.RUnlock()
if _, ok := e.channels[event]; !ok {
return fmt.Errorf("channel %s not found", event)
}
for _, c := range e.channels[event] {
e.wg.Add(1)
c <- message
}
return nil
}
// Has Checks if there are listeners for the passed event
func (e *Events) Has(event string) bool {
e.mutex.RLock()
defer e.mutex.RUnlock()
_, ok := e.channels[event]
return ok
}
// List Returns a list of events that listeners are subscribed to
func (e *Events) List() []string {
e.mutex.RLock()
defer e.mutex.RUnlock()
list := make([]string, 0, len(e.channels))
for event := range e.channels {
list = append(list, event)
}
return list
}
// Remove Removes listeners by event name
// Removing listeners closes channels and stops the goroutine.
//
// If you call the function without the "names" parameter, all listeners of all events will be removed.
func (e *Events) Remove(names ...string) {
e.mutex.Lock()
defer e.mutex.Unlock()
if len(names) == 0 {
keys := make([]string, 0, len(e.channels))
for k := range e.channels {
keys = append(keys, k)
}
names = keys
}
for _, name := range names {
for _, channel := range e.channels[name] {
close(channel)
}
delete(e.channels, name)
}
}
// Wait Blocks the thread until all running events are completed
func (e *Events) Wait() {
e.mutex.Lock()
defer e.mutex.Unlock()
e.wg.Wait()
}
// globalState store of global event handlers
var globalState = New()
// ListenN Subscribe on event where
// event - the event name,
// handlerFunc - handler function
// copiesCount - count handlers copies run
func ListenN(event string, handlerFunc func(message Message), copiesCount uint) {
globalState.ListenN(event, handlerFunc, copiesCount)
}
// Listen Subscribe on event where
// event - the event name,
// handlerFunc - handler function
func Listen(event string, handlerFunc func(message Message)) {
globalState.Listen(event, handlerFunc)
}
// Ring Call event there
// event - event name
// message - data that will be passed to the event handler
func Ring(event string, message Message) error {
return globalState.Ring(event, message)
}
// Has Checks if there are listeners for the passed event
func Has(event string) bool {
return globalState.Has(event)
}
// List Returns a list of events that listeners are subscribed to
func List() []string {
return globalState.List()
}
// Remove Removes listeners by event name
// Removing listeners closes channels and stops the goroutine.
//
// If you call the function without the "names" parameter, all listeners of all events will be removed.
func Remove(names ...string) {
globalState.Remove(names...)
}
// Wait Blocks the thread until all running events are completed
func Wait() {
globalState.Wait()
}
// Queue set events queue size
func Queue(size uint) {
globalState.Queue(size)
}