mirror of
https://github.com/go-micro/go-micro.git
synced 2025-01-11 17:18:28 +02:00
321 lines
5.0 KiB
Go
321 lines
5.0 KiB
Go
package runtime
|
|
|
|
import (
|
|
"errors"
|
|
"io"
|
|
"strings"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/micro/go-micro/runtime/package"
|
|
"github.com/micro/go-micro/runtime/process"
|
|
proc "github.com/micro/go-micro/runtime/process/os"
|
|
"github.com/micro/go-micro/util/log"
|
|
)
|
|
|
|
type runtime struct {
|
|
sync.RWMutex
|
|
// used to stop the runtime
|
|
closed chan bool
|
|
// used to start new services
|
|
start chan *service
|
|
// indicates if we're running
|
|
running bool
|
|
// the service map
|
|
services map[string]*service
|
|
}
|
|
|
|
type service struct {
|
|
sync.RWMutex
|
|
|
|
running bool
|
|
closed chan bool
|
|
err error
|
|
|
|
// output for logs
|
|
output io.Writer
|
|
|
|
// service to manage
|
|
*Service
|
|
// process creator
|
|
Process *proc.Process
|
|
// Exec
|
|
Exec *process.Executable
|
|
// process pid
|
|
PID *process.PID
|
|
}
|
|
|
|
func newRuntime() *runtime {
|
|
return &runtime{
|
|
closed: make(chan bool),
|
|
start: make(chan *service, 128),
|
|
services: make(map[string]*service),
|
|
}
|
|
}
|
|
|
|
func newService(s *Service, c CreateOptions) *service {
|
|
var exec string
|
|
var args []string
|
|
|
|
if len(s.Exec) > 0 {
|
|
parts := strings.Split(s.Exec, " ")
|
|
exec = parts[0]
|
|
args = []string{}
|
|
|
|
if len(parts) > 1 {
|
|
args = parts[1:]
|
|
}
|
|
} else {
|
|
// set command
|
|
exec = c.Command[0]
|
|
// set args
|
|
if len(c.Command) > 1 {
|
|
args = c.Command[1:]
|
|
}
|
|
}
|
|
|
|
return &service{
|
|
Service: s,
|
|
Process: new(proc.Process),
|
|
Exec: &process.Executable{
|
|
Binary: &packager.Binary{
|
|
Name: s.Name,
|
|
Path: exec,
|
|
},
|
|
Env: c.Env,
|
|
Args: args,
|
|
},
|
|
output: c.Output,
|
|
}
|
|
}
|
|
|
|
func (s *service) streamOutput() {
|
|
go io.Copy(s.output, s.PID.Output)
|
|
go io.Copy(s.output, s.PID.Error)
|
|
}
|
|
|
|
func (s *service) Running() bool {
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
return s.running
|
|
}
|
|
|
|
func (s *service) Start() error {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
|
|
if s.running {
|
|
return nil
|
|
}
|
|
|
|
// reset
|
|
s.err = nil
|
|
s.closed = make(chan bool)
|
|
|
|
// TODO: pull source & build binary
|
|
log.Debugf("Runtime service %s forking new process\n")
|
|
p, err := s.Process.Fork(s.Exec)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
// set the pid
|
|
s.PID = p
|
|
// set to running
|
|
s.running = true
|
|
|
|
if s.output != nil {
|
|
s.streamOutput()
|
|
}
|
|
|
|
// wait and watch
|
|
go s.Wait()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *service) Stop() error {
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
|
|
select {
|
|
case <-s.closed:
|
|
return nil
|
|
default:
|
|
close(s.closed)
|
|
s.running = false
|
|
return s.Process.Kill(s.PID)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (s *service) Error() error {
|
|
s.RLock()
|
|
defer s.RUnlock()
|
|
return s.err
|
|
}
|
|
|
|
func (s *service) Wait() {
|
|
// wait for process to exit
|
|
err := s.Process.Wait(s.PID)
|
|
|
|
s.Lock()
|
|
defer s.Unlock()
|
|
|
|
// save the error
|
|
if err != nil {
|
|
s.err = err
|
|
}
|
|
|
|
// no longer running
|
|
s.running = false
|
|
}
|
|
|
|
func (r *runtime) run() {
|
|
r.RLock()
|
|
closed := r.closed
|
|
r.RUnlock()
|
|
|
|
t := time.NewTicker(time.Second * 5)
|
|
defer t.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-t.C:
|
|
// check running services
|
|
r.RLock()
|
|
for _, service := range r.services {
|
|
if service.Running() {
|
|
continue
|
|
}
|
|
|
|
// TODO: check service error
|
|
log.Debugf("Runtime starting %s", service.Name)
|
|
if err := service.Start(); err != nil {
|
|
log.Debugf("Runtime error starting %s: %v", service.Name, err)
|
|
}
|
|
}
|
|
r.RUnlock()
|
|
case service := <-r.start:
|
|
if service.Running() {
|
|
continue
|
|
}
|
|
|
|
// TODO: check service error
|
|
log.Debugf("Starting %s", service.Name)
|
|
if err := service.Start(); err != nil {
|
|
log.Debugf("Runtime error starting %s: %v", service.Name, err)
|
|
}
|
|
case <-closed:
|
|
// TODO: stop all the things
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (r *runtime) Create(s *Service, opts ...CreateOption) error {
|
|
r.Lock()
|
|
defer r.Unlock()
|
|
|
|
if _, ok := r.services[s.Name]; ok {
|
|
return errors.New("service already registered")
|
|
}
|
|
|
|
var options CreateOptions
|
|
for _, o := range opts {
|
|
o(&options)
|
|
}
|
|
|
|
if len(s.Exec) == 0 && len(options.Command) == 0 {
|
|
return errors.New("missing exec command")
|
|
}
|
|
|
|
// save service
|
|
r.services[s.Name] = newService(s, options)
|
|
|
|
// push into start queue
|
|
r.start <- r.services[s.Name]
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *runtime) Delete(s *Service) error {
|
|
r.Lock()
|
|
defer r.Unlock()
|
|
|
|
if s, ok := r.services[s.Name]; ok {
|
|
delete(r.services, s.Name)
|
|
return s.Stop()
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *runtime) Update(s *Service) error {
|
|
// delete the service
|
|
if err := r.Delete(s); err != nil {
|
|
return err
|
|
}
|
|
|
|
// create new service
|
|
return r.Create(s)
|
|
}
|
|
|
|
func (r *runtime) List() ([]*Service, error) {
|
|
var services []*Service
|
|
r.RLock()
|
|
defer r.RUnlock()
|
|
|
|
for _, service := range r.services {
|
|
services = append(services, service.Service)
|
|
}
|
|
|
|
return services, nil
|
|
}
|
|
|
|
func (r *runtime) Start() error {
|
|
r.Lock()
|
|
defer r.Unlock()
|
|
|
|
// already running
|
|
if r.running {
|
|
return nil
|
|
}
|
|
|
|
// set running
|
|
r.running = true
|
|
r.closed = make(chan bool)
|
|
|
|
go r.run()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (r *runtime) Stop() error {
|
|
r.Lock()
|
|
defer r.Unlock()
|
|
|
|
if !r.running {
|
|
return nil
|
|
}
|
|
|
|
select {
|
|
case <-r.closed:
|
|
return nil
|
|
default:
|
|
close(r.closed)
|
|
|
|
// set not running
|
|
r.running = false
|
|
|
|
// stop all the services
|
|
for _, service := range r.services {
|
|
log.Debugf("Runtime stopping %s", service.Name)
|
|
service.Stop()
|
|
}
|
|
}
|
|
|
|
return nil
|
|
}
|