mirror of
https://github.com/go-micro/go-micro.git
synced 2025-01-23 17:53:05 +02:00
f23638c036
Signed-off-by: Vasiliy Tolstov <v.tolstov@unistack.org>
228 lines
4.2 KiB
Go
228 lines
4.2 KiB
Go
// Package broker provides a distributed task manager built on the micro broker
|
|
package broker
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"math/rand"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/google/uuid"
|
|
"github.com/micro/go-micro/v2/broker"
|
|
"github.com/micro/go-micro/v2/sync/task"
|
|
)
|
|
|
|
type brokerKey struct{}
|
|
|
|
// Task is a broker task
|
|
type Task struct {
|
|
// a micro broker
|
|
Broker broker.Broker
|
|
// Options
|
|
Options task.Options
|
|
|
|
mtx sync.RWMutex
|
|
status string
|
|
}
|
|
|
|
func returnError(err error, ch chan error) {
|
|
select {
|
|
case ch <- err:
|
|
default:
|
|
}
|
|
}
|
|
|
|
func (t *Task) Run(c task.Command) error {
|
|
// connect
|
|
t.Broker.Connect()
|
|
// unique id for this runner
|
|
id := uuid.New().String()
|
|
// topic of the command
|
|
topic := fmt.Sprintf("task.%s", c.Name)
|
|
|
|
// global error
|
|
errCh := make(chan error, t.Options.Pool)
|
|
|
|
// subscribe for distributed work
|
|
workFn := func(p broker.Event) error {
|
|
msg := p.Message()
|
|
|
|
// get command name
|
|
command := msg.Header["Command"]
|
|
|
|
// check the command is what we expect
|
|
if command != c.Name {
|
|
returnError(errors.New("received unknown command: "+command), errCh)
|
|
return nil
|
|
}
|
|
|
|
// new task created
|
|
switch msg.Header["Status"] {
|
|
case "start":
|
|
// artificially delay start of processing
|
|
time.Sleep(time.Millisecond * time.Duration(10+rand.Intn(100)))
|
|
|
|
// execute the function
|
|
err := c.Func()
|
|
|
|
status := "done"
|
|
errors := ""
|
|
|
|
if err != nil {
|
|
status = "error"
|
|
errors = err.Error()
|
|
}
|
|
|
|
// create response
|
|
msg := &broker.Message{
|
|
Header: map[string]string{
|
|
"Command": c.Name,
|
|
"Error": errors,
|
|
"Id": id,
|
|
"Status": status,
|
|
"Timestamp": fmt.Sprintf("%d", time.Now().Unix()),
|
|
},
|
|
// Body is nil, may be used in future
|
|
}
|
|
|
|
// publish end of task
|
|
if err := t.Broker.Publish(topic, msg); err != nil {
|
|
returnError(err, errCh)
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// subscribe for the pool size
|
|
for i := 0; i < t.Options.Pool; i++ {
|
|
err := func() error {
|
|
// subscribe to work
|
|
subWork, err := t.Broker.Subscribe(topic, workFn, broker.Queue(fmt.Sprintf("work.%d", i)))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// unsubscribe on completion
|
|
defer subWork.Unsubscribe()
|
|
|
|
return nil
|
|
}()
|
|
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
|
|
// subscribe to all status messages
|
|
subStatus, err := t.Broker.Subscribe(topic, func(p broker.Event) error {
|
|
msg := p.Message()
|
|
|
|
// get command name
|
|
command := msg.Header["Command"]
|
|
|
|
// check the command is what we expect
|
|
if command != c.Name {
|
|
return nil
|
|
}
|
|
|
|
// check task status
|
|
switch msg.Header["Status"] {
|
|
// task is complete
|
|
case "done":
|
|
errCh <- nil
|
|
// someone failed
|
|
case "error":
|
|
returnError(errors.New(msg.Header["Error"]), errCh)
|
|
}
|
|
|
|
return nil
|
|
})
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer subStatus.Unsubscribe()
|
|
|
|
// a new task
|
|
msg := &broker.Message{
|
|
Header: map[string]string{
|
|
"Command": c.Name,
|
|
"Id": id,
|
|
"Status": "start",
|
|
"Timestamp": fmt.Sprintf("%d", time.Now().Unix()),
|
|
},
|
|
}
|
|
|
|
// artificially delay the start of the task
|
|
time.Sleep(time.Millisecond * time.Duration(10+rand.Intn(100)))
|
|
|
|
// publish the task
|
|
if err := t.Broker.Publish(topic, msg); err != nil {
|
|
return err
|
|
}
|
|
|
|
var gerrors []string
|
|
|
|
// wait for all responses
|
|
for i := 0; i < t.Options.Pool; i++ {
|
|
// check errors
|
|
err := <-errCh
|
|
|
|
// append to errors
|
|
if err != nil {
|
|
gerrors = append(gerrors, err.Error())
|
|
}
|
|
}
|
|
|
|
// return the errors
|
|
if len(gerrors) > 0 {
|
|
return errors.New("errors: " + strings.Join(gerrors, "\n"))
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (t *Task) Status() string {
|
|
t.mtx.RLock()
|
|
defer t.mtx.RUnlock()
|
|
return t.status
|
|
}
|
|
|
|
// Broker sets the micro broker
|
|
func WithBroker(b broker.Broker) task.Option {
|
|
return func(o *task.Options) {
|
|
if o.Context == nil {
|
|
o.Context = context.Background()
|
|
}
|
|
o.Context = context.WithValue(o.Context, brokerKey{}, b)
|
|
}
|
|
}
|
|
|
|
// NewTask returns a new broker task
|
|
func NewTask(opts ...task.Option) task.Task {
|
|
options := task.Options{
|
|
Context: context.Background(),
|
|
}
|
|
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
if options.Pool == 0 {
|
|
options.Pool = 1
|
|
}
|
|
|
|
b, ok := options.Context.Value(brokerKey{}).(broker.Broker)
|
|
if !ok {
|
|
b = broker.DefaultBroker
|
|
}
|
|
|
|
return &Task{
|
|
Broker: b,
|
|
Options: options,
|
|
}
|
|
}
|