1
0
mirror of https://github.com/go-micro/go-micro.git synced 2024-11-24 08:02:32 +02:00

remove runtime that no one uses

This commit is contained in:
asim 2024-07-07 18:28:38 +01:00
parent 627066baf9
commit 29d79d748d
23 changed files with 0 additions and 3488 deletions

View File

@ -1,661 +0,0 @@
package runtime
import (
"archive/tar"
"compress/gzip"
"errors"
"fmt"
"io"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/nxadm/tail"
log "go-micro.dev/v5/logger"
"go-micro.dev/v5/runtime/local/git"
)
// defaultNamespace to use if not provided as an option.
const defaultNamespace = "default"
type runtime struct {
// options configure runtime
options *Options
// used to stop the runtime
closed chan bool
// used to start new services
start chan *service
// namespaces stores services grouped by namespace, e.g. namespaces["foo"]["go.micro.auth:latest"]
// would return the latest version of go.micro.auth from the foo namespace
namespaces map[string]map[string]*service
sync.RWMutex
// indicates if we're running
running bool
}
// NewRuntime creates new local runtime and returns it.
func NewRuntime(opts ...Option) Runtime {
// get default options
options := NewOptions(opts...)
// make the logs directory
path := filepath.Join(os.TempDir(), "micro", "logs")
_ = os.MkdirAll(path, 0755)
return &runtime{
options: options,
closed: make(chan bool),
start: make(chan *service, 128),
namespaces: make(map[string]map[string]*service),
}
}
// @todo move this to runtime default.
func (r *runtime) checkoutSourceIfNeeded(s *Service) error {
// Runtime service like config have no source.
// Skip checkout in that case
if len(s.Source) == 0 {
return nil
}
// @todo make this come from config
cpath := filepath.Join(os.TempDir(), "micro", "uploads", s.Source)
path := strings.ReplaceAll(cpath, ".tar.gz", "")
if ex, _ := exists(cpath); ex {
err := os.RemoveAll(path)
if err != nil {
return err
}
err = os.MkdirAll(path, 0777)
if err != nil {
return err
}
err = uncompress(cpath, path)
if err != nil {
return err
}
s.Source = path
return nil
}
source, err := git.ParseSourceLocal("", s.Source)
if err != nil {
return err
}
source.Ref = s.Version
err = git.CheckoutSource(os.TempDir(), source)
if err != nil {
return err
}
s.Source = source.FullPath
return nil
}
// modified version of: https://gist.github.com/mimoo/25fc9716e0f1353791f5908f94d6e726
func uncompress(src string, dst string) error {
file, err := os.OpenFile(src, os.O_RDWR|os.O_CREATE, 0666)
defer file.Close()
if err != nil {
return err
}
// ungzip
zr, err := gzip.NewReader(file)
if err != nil {
return err
}
// untar
tr := tar.NewReader(zr)
// uncompress each element
for {
header, err := tr.Next()
if err == io.EOF {
break // End of archive
}
if err != nil {
return err
}
target := header.Name
// validate name against path traversal
if !validRelPath(header.Name) {
return fmt.Errorf("tar contained invalid name error %q\n", target)
}
// add dst + re-format slashes according to system
target = filepath.Join(dst, header.Name)
// if no join is needed, replace with ToSlash:
// target = filepath.ToSlash(header.Name)
// check the type
switch header.Typeflag {
// if its a dir and it doesn't exist create it (with 0755 permission)
case tar.TypeDir:
if _, err := os.Stat(target); err != nil {
// @todo think about this:
// if we don't nuke the folder, we might end up with files from
// the previous decompress.
if err := os.MkdirAll(target, 0755); err != nil {
return err
}
}
// if it's a file create it (with same permission)
case tar.TypeReg:
// the truncating is probably unnecessary due to the `RemoveAll` of folders
// above
fileToWrite, err := os.OpenFile(target, os.O_TRUNC|os.O_CREATE|os.O_RDWR, os.FileMode(header.Mode))
if err != nil {
return err
}
// copy over contents
if _, err := io.Copy(fileToWrite, tr); err != nil {
return err
}
// manually close here after each file operation; defering would cause each file close
// to wait until all operations have completed.
fileToWrite.Close()
}
}
return nil
}
// check for path traversal and correct forward slashes.
func validRelPath(p string) bool {
if p == "" || strings.Contains(p, `\`) || strings.HasPrefix(p, "/") || strings.Contains(p, "../") {
return false
}
return true
}
// Init initializes runtime options.
func (r *runtime) Init(opts ...Option) error {
r.Lock()
defer r.Unlock()
for _, o := range opts {
o(r.options)
}
return nil
}
// run runs the runtime management loop.
func (r *runtime) run(events <-chan Event) {
t := time.NewTicker(time.Second * 5)
defer t.Stop()
logger := r.options.Logger
// process event processes an incoming event
processEvent := func(event Event, service *service, ns string) error {
// get current vals
r.RLock()
name := service.Name
updated := service.updated
r.RUnlock()
// only process if the timestamp is newer
if !event.Timestamp.After(updated) {
return nil
}
logger.Logf(log.DebugLevel, "Runtime updating service %s in %v namespace", name, ns)
// this will cause a delete followed by created
if err := r.Update(service.Service, UpdateNamespace(ns)); err != nil {
return err
}
// update the local timestamp
r.Lock()
service.updated = updated
r.Unlock()
return nil
}
for {
select {
case <-t.C:
// check running services
r.RLock()
for _, sevices := range r.namespaces {
for _, service := range sevices {
if !service.ShouldStart() {
continue
}
// TODO: check service error
logger.Logf(log.DebugLevel, "Runtime starting %s", service.Name)
if err := service.Start(); err != nil {
logger.Logf(log.DebugLevel, "Runtime error starting %s: %v", service.Name, err)
}
}
}
r.RUnlock()
case service := <-r.start:
if !service.ShouldStart() {
continue
}
// TODO: check service error
logger.Logf(log.DebugLevel, "Runtime starting service %s", service.Name)
if err := service.Start(); err != nil {
logger.Logf(log.DebugLevel, "Runtime error starting service %s: %v", service.Name, err)
}
case event := <-events:
logger.Logf(log.DebugLevel, "Runtime received notification event: %v", event)
// NOTE: we only handle Update events for now
switch event.Type {
case Update:
if event.Service != nil {
ns := defaultNamespace
if event.Options != nil && len(event.Options.Namespace) > 0 {
ns = event.Options.Namespace
}
r.RLock()
if _, ok := r.namespaces[ns]; !ok {
logger.Logf(log.DebugLevel, "Runtime unknown namespace: %s", ns)
r.RUnlock()
continue
}
service, ok := r.namespaces[ns][fmt.Sprintf("%v:%v", event.Service.Name, event.Service.Version)]
r.RUnlock()
if !ok {
logger.Logf(log.DebugLevel, "Runtime unknown service: %s", event.Service)
}
if err := processEvent(event, service, ns); err != nil {
logger.Logf(log.DebugLevel, "Runtime error updating service %s: %v", event.Service, err)
}
continue
}
r.RLock()
namespaces := r.namespaces
r.RUnlock()
// if blank service was received we update all services
for ns, services := range namespaces {
for _, service := range services {
if err := processEvent(event, service, ns); err != nil {
logger.Logf(log.DebugLevel, "Runtime error updating service %s: %v", service.Name, err)
}
}
}
}
case <-r.closed:
logger.Logf(log.DebugLevel, "Runtime stopped")
return
}
}
}
func logFile(serviceName string) string {
// make the directory
name := strings.Replace(serviceName, "/", "-", -1)
path := filepath.Join(os.TempDir(), "micro", "logs")
return filepath.Join(path, fmt.Sprintf("%v.log", name))
}
func serviceKey(s *Service) string {
return fmt.Sprintf("%v:%v", s.Name, s.Version)
}
// Create creates a new service which is then started by runtime.
func (r *runtime) Create(s *Service, opts ...CreateOption) error {
err := r.checkoutSourceIfNeeded(s)
if err != nil {
return err
}
r.Lock()
defer r.Unlock()
var options CreateOptions
for _, o := range opts {
o(&options)
}
if len(options.Namespace) == 0 {
options.Namespace = defaultNamespace
}
if len(options.Command) == 0 {
options.Command = []string{"go"}
options.Args = []string{"run", "."}
}
if _, ok := r.namespaces[options.Namespace]; !ok {
r.namespaces[options.Namespace] = make(map[string]*service)
}
if _, ok := r.namespaces[options.Namespace][serviceKey(s)]; ok {
return errors.New("service already running")
}
// create new service
service := newService(s, options, r.options.Logger)
f, err := os.OpenFile(logFile(service.Name), os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
r.options.Logger.Log(log.FatalLevel, err)
}
if service.output != nil {
service.output = io.MultiWriter(service.output, f)
} else {
service.output = f
}
// start the service
if err := service.Start(); err != nil {
return err
}
// save service
r.namespaces[options.Namespace][serviceKey(s)] = service
return nil
}
// exists returns whether the given file or directory exists.
func exists(path string) (bool, error) {
_, err := os.Stat(path)
if err == nil {
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return true, err
}
// @todo: Getting existing lines is not supported yet.
// The reason for this is because it's hard to calculate line offset
// as opposed to character offset.
// This logger streams by default and only supports the `StreamCount` option.
func (r *runtime) Logs(s *Service, options ...LogsOption) (LogStream, error) {
lopts := LogsOptions{}
for _, o := range options {
o(&lopts)
}
ret := &logStream{
service: s.Name,
stream: make(chan LogRecord),
stop: make(chan bool),
logger: r.options.Logger,
}
fpath := logFile(s.Name)
if ex, err := exists(fpath); err != nil {
return nil, err
} else if !ex {
return nil, fmt.Errorf("Log file %v does not exists", fpath)
}
// have to check file size to avoid too big of a seek
fi, err := os.Stat(fpath)
if err != nil {
return nil, err
}
size := fi.Size()
whence := 2
// Multiply by length of an average line of log in bytes
offset := lopts.Count * 200
if offset > size {
offset = size
}
offset *= -1
t, err := tail.TailFile(fpath, tail.Config{Follow: lopts.Stream, Location: &tail.SeekInfo{
Whence: whence,
Offset: int64(offset),
}, Logger: tail.DiscardingLogger})
if err != nil {
return nil, err
}
ret.tail = t
go func() {
for {
select {
case line, ok := <-t.Lines:
if !ok {
ret.Stop()
return
}
ret.stream <- LogRecord{Message: line.Text}
case <-ret.stop:
return
}
}
}()
return ret, nil
}
type logStream struct {
err error
logger log.Logger
tail *tail.Tail
stream chan LogRecord
stop chan bool
service string
sync.Mutex
}
func (l *logStream) Chan() chan LogRecord {
return l.stream
}
func (l *logStream) Error() error {
return l.err
}
func (l *logStream) Stop() error {
l.Lock()
defer l.Unlock()
select {
case <-l.stop:
return nil
default:
close(l.stop)
close(l.stream)
err := l.tail.Stop()
if err != nil {
l.logger.Logf(log.ErrorLevel, "Error stopping tail: %v", err)
return err
}
}
return nil
}
// Read returns all instances of requested service
// If no service name is provided we return all the track services.
func (r *runtime) Read(opts ...ReadOption) ([]*Service, error) {
r.Lock()
defer r.Unlock()
gopts := ReadOptions{}
for _, o := range opts {
o(&gopts)
}
if len(gopts.Namespace) == 0 {
gopts.Namespace = defaultNamespace
}
save := func(k, v string) bool {
if len(k) == 0 {
return true
}
return k == v
}
//nolint:prealloc
var services []*Service
if _, ok := r.namespaces[gopts.Namespace]; !ok {
return make([]*Service, 0), nil
}
for _, service := range r.namespaces[gopts.Namespace] {
if !save(gopts.Service, service.Name) {
continue
}
if !save(gopts.Version, service.Version) {
continue
}
// TODO deal with service type
// no version has sbeen requested, just append the service
services = append(services, service.Service)
}
return services, nil
}
// Update attempts to update the service.
func (r *runtime) Update(s *Service, opts ...UpdateOption) error {
var options UpdateOptions
for _, o := range opts {
o(&options)
}
if len(options.Namespace) == 0 {
options.Namespace = defaultNamespace
}
err := r.checkoutSourceIfNeeded(s)
if err != nil {
return err
}
r.Lock()
srvs, ok := r.namespaces[options.Namespace]
r.Unlock()
if !ok {
return errors.New("Service not found")
}
r.Lock()
service, ok := srvs[serviceKey(s)]
r.Unlock()
if !ok {
return errors.New("Service not found")
}
if err := service.Stop(); err != nil && err.Error() != "no such process" {
r.options.Logger.Logf(log.ErrorLevel, "Error stopping service %s: %s", service.Name, err)
return err
}
return service.Start()
}
// Delete removes the service from the runtime and stops it.
func (r *runtime) Delete(s *Service, opts ...DeleteOption) error {
r.Lock()
defer r.Unlock()
var options DeleteOptions
for _, o := range opts {
o(&options)
}
if len(options.Namespace) == 0 {
options.Namespace = defaultNamespace
}
srvs, ok := r.namespaces[options.Namespace]
if !ok {
return nil
}
r.options.Logger.Logf(log.DebugLevel, "Runtime deleting service %s", s.Name)
service, ok := srvs[serviceKey(s)]
if !ok {
return nil
}
// check if running
if !service.Running() {
delete(srvs, service.key())
r.namespaces[options.Namespace] = srvs
return nil
}
// otherwise stop it
if err := service.Stop(); err != nil {
return err
}
// delete it
delete(srvs, service.key())
r.namespaces[options.Namespace] = srvs
return nil
}
// Start starts the runtime.
func (r *runtime) Start() error {
r.Lock()
defer r.Unlock()
// already running
if r.running {
return nil
}
// set running
r.running = true
r.closed = make(chan bool)
var events <-chan Event
if r.options.Scheduler != nil {
var err error
events, err = r.options.Scheduler.Notify()
if err != nil {
// TODO: should we bail here?
r.options.Logger.Logf(log.DebugLevel, "Runtime failed to start update notifier")
}
}
go r.run(events)
return nil
}
// Stop stops the runtime.
func (r *runtime) Stop() error {
r.Lock()
defer r.Unlock()
if !r.running {
return nil
}
select {
case <-r.closed:
return nil
default:
close(r.closed)
// set not running
r.running = false
// stop all the services
for _, services := range r.namespaces {
for _, service := range services {
r.options.Logger.Logf(log.DebugLevel, "Runtime stopping %s", service.Name)
service.Stop()
}
}
// stop the scheduler
if r.options.Scheduler != nil {
return r.options.Scheduler.Close()
}
}
return nil
}
// String implements stringer interface.
func (r *runtime) String() string {
return "local"
}

View File

@ -1,627 +0,0 @@
// Package kubernetes implements kubernetes micro runtime
package kubernetes
import (
"fmt"
"sync"
"time"
log "go-micro.dev/v5/logger"
"go-micro.dev/v5/runtime"
"go-micro.dev/v5/util/kubernetes/client"
)
// action to take on runtime service.
type action int
type kubernetes struct {
// client is kubernetes client
client client.Client
// options configure runtime
options *runtime.Options
// used to stop the runtime
closed chan bool
// namespaces which exist
namespaces []client.Namespace
sync.RWMutex
// indicates if we're running
running bool
}
// namespaceExists returns a boolean indicating if a namespace exists.
func (k *kubernetes) namespaceExists(name string) (bool, error) {
// populate the cache
if k.namespaces == nil {
namespaceList := new(client.NamespaceList)
resource := &client.Resource{Kind: "namespace", Value: namespaceList}
if err := k.client.List(resource); err != nil {
return false, err
}
k.namespaces = namespaceList.Items
}
// check if the namespace exists in the cache
for _, n := range k.namespaces {
if n.Metadata.Name == name {
return true, nil
}
}
return false, nil
}
// createNamespace creates a new k8s namespace.
func (k *kubernetes) createNamespace(namespace string) error {
ns := client.Namespace{Metadata: &client.Metadata{Name: namespace}}
err := k.client.Create(&client.Resource{Kind: "namespace", Value: ns})
// add to cache
if err == nil && k.namespaces != nil {
k.namespaces = append(k.namespaces, ns)
}
return err
}
// getService queries kubernetes for micro service
// NOTE: this function is not thread-safe.
func (k *kubernetes) getService(labels map[string]string, opts ...client.GetOption) ([]*service, error) {
// get the service status
serviceList := new(client.ServiceList)
r := &client.Resource{
Kind: "service",
Value: serviceList,
}
opts = append(opts, client.GetLabels(labels))
// get the service from k8s
if err := k.client.Get(r, opts...); err != nil {
return nil, err
}
// get the deployment status
depList := new(client.DeploymentList)
d := &client.Resource{
Kind: "deployment",
Value: depList,
}
if err := k.client.Get(d, opts...); err != nil {
return nil, err
}
// get the pods from k8s
podList := new(client.PodList)
p := &client.Resource{
Kind: "pod",
Value: podList,
}
if err := k.client.Get(p, opts...); err != nil {
return nil, err
}
// service map
svcMap := make(map[string]*service)
// collect info from kubernetes service
for _, kservice := range serviceList.Items {
// name of the service
name := kservice.Metadata.Labels["name"]
// version of the service
version := kservice.Metadata.Labels["version"]
srv := &service{
Service: &runtime.Service{
Name: name,
Version: version,
Metadata: make(map[string]string),
},
kservice: &kservice,
}
// set the address
address := kservice.Spec.ClusterIP
port := kservice.Spec.Ports[0]
srv.Service.Metadata["address"] = fmt.Sprintf("%s:%d", address, port.Port)
// set the type of service
srv.Service.Metadata["type"] = kservice.Metadata.Labels["micro"]
// copy annotations metadata into service metadata
for k, v := range kservice.Metadata.Annotations {
srv.Service.Metadata[k] = v
}
// save as service
svcMap[name+version] = srv
}
// collect additional info from kubernetes deployment
for _, kdep := range depList.Items {
// name of the service
name := kdep.Metadata.Labels["name"]
// versio of the service
version := kdep.Metadata.Labels["version"]
// access existing service map based on name + version
if svc, ok := svcMap[name+version]; ok {
// we're expecting our own service name in metadata
if _, ok := kdep.Metadata.Annotations["name"]; !ok {
continue
}
// set the service name, version and source
// based on existing annotations we stored
svc.Service.Name = kdep.Metadata.Annotations["name"]
svc.Service.Version = kdep.Metadata.Annotations["version"]
svc.Service.Source = kdep.Metadata.Annotations["source"]
// delete from metadata
delete(kdep.Metadata.Annotations, "name")
delete(kdep.Metadata.Annotations, "version")
delete(kdep.Metadata.Annotations, "source")
// copy all annotations metadata into service metadata
for k, v := range kdep.Metadata.Annotations {
svc.Service.Metadata[k] = v
}
// parse out deployment status and inject into service metadata
if len(kdep.Status.Conditions) > 0 {
svc.Status(kdep.Status.Conditions[0].Type, nil)
svc.Metadata["started"] = kdep.Status.Conditions[0].LastUpdateTime
} else {
svc.Status("n/a", nil)
}
// get the real status
for _, item := range podList.Items {
var status string
// check the name
if item.Metadata.Labels["name"] != name {
continue
}
// check the version
if item.Metadata.Labels["version"] != version {
continue
}
switch item.Status.Phase {
case "Failed":
status = item.Status.Reason
default:
status = item.Status.Phase
}
// skip if we can't get the container
if len(item.Status.Containers) == 0 {
continue
}
// now try get a deeper status
state := item.Status.Containers[0].State
// set start time
if state.Running != nil {
svc.Metadata["started"] = state.Running.Started
}
// set status from waiting
if v := state.Waiting; v != nil {
if len(v.Reason) > 0 {
status = v.Reason
}
}
// TODO: set from terminated
svc.Status(status, nil)
}
// save deployment
svc.kdeploy = &kdep
}
}
// collect all the services and return
services := make([]*service, 0, len(serviceList.Items))
for _, service := range svcMap {
services = append(services, service)
}
return services, nil
}
// run runs the runtime management loop.
func (k *kubernetes) run(events <-chan runtime.Event) {
t := time.NewTicker(time.Second * 10)
defer t.Stop()
logger := k.options.Logger
for {
select {
case <-t.C:
// TODO: figure out what to do here
// - do we even need the ticker for k8s services?
case event := <-events:
// NOTE: we only handle Update events for now
logger.Logf(log.DebugLevel, "Runtime received notification event: %v", event)
switch event.Type {
case runtime.Update:
// only process if there's an actual service
// we do not update all the things individually
if event.Service == nil {
continue
}
// format the name
name := client.Format(event.Service.Name)
// set the default labels
labels := map[string]string{
"micro": k.options.Type,
"name": name,
}
if len(event.Service.Version) > 0 {
labels["version"] = event.Service.Version
}
// get the deployment status
deployed := new(client.DeploymentList)
// get the existing service rather than creating a new one
err := k.client.Get(&client.Resource{
Kind: "deployment",
Value: deployed,
}, client.GetLabels(labels))
if err != nil {
logger.Logf(log.DebugLevel, "Runtime update failed to get service %s: %v", event.Service, err)
continue
}
// technically we should not receive multiple versions but hey ho
for _, service := range deployed.Items {
// check the name matches
if service.Metadata.Name != name {
continue
}
// update build time annotation
if service.Spec.Template.Metadata.Annotations == nil {
service.Spec.Template.Metadata.Annotations = make(map[string]string)
}
// update the build time
service.Spec.Template.Metadata.Annotations["updated"] = fmt.Sprintf("%d", event.Timestamp.Unix())
logger.Logf(log.DebugLevel, "Runtime updating service: %s deployment: %s", event.Service, service.Metadata.Name)
if err := k.client.Update(deploymentResource(&service)); err != nil {
logger.Logf(log.DebugLevel, "Runtime failed to update service %s: %v", event.Service, err)
continue
}
}
}
case <-k.closed:
logger.Logf(log.DebugLevel, "Runtime stopped")
return
}
}
}
// Init initializes runtime options.
func (k *kubernetes) Init(opts ...runtime.Option) error {
k.Lock()
defer k.Unlock()
for _, o := range opts {
o(k.options)
}
return nil
}
func (k *kubernetes) Logs(s *runtime.Service, options ...runtime.LogsOption) (runtime.LogStream, error) {
klo := newLog(k.client, s.Name, options...)
stream, err := klo.Stream()
if err != nil {
return nil, err
}
// If requested, also read existing records and stream those too
if klo.options.Count > 0 {
go func() {
records, err := klo.Read()
if err != nil {
k.options.Logger.Logf(log.ErrorLevel, "Failed to get logs for service '%v' from k8s: %v", err)
return
}
// @todo: this might actually not run before podLogStream starts
// and might cause out of order log retrieval at the receiving end.
// A better approach would probably to suppor this inside the `klog.Stream` method.
for _, record := range records {
stream.Chan() <- record
}
}()
}
return stream, nil
}
type kubeStream struct {
err error
// the k8s log stream
stream chan runtime.LogRecord
stop chan bool
// the stop chan
sync.Mutex
}
func (k *kubeStream) Error() error {
return k.err
}
func (k *kubeStream) Chan() chan runtime.LogRecord {
return k.stream
}
func (k *kubeStream) Stop() error {
k.Lock()
defer k.Unlock()
select {
case <-k.stop:
return nil
default:
close(k.stop)
close(k.stream)
}
return nil
}
// Creates a service.
func (k *kubernetes) Create(s *runtime.Service, opts ...runtime.CreateOption) error {
k.Lock()
defer k.Unlock()
options := runtime.CreateOptions{
Type: k.options.Type,
Namespace: client.DefaultNamespace,
}
for _, o := range opts {
o(&options)
}
// default type if it doesn't exist
if len(options.Type) == 0 {
options.Type = k.options.Type
}
// default the source if it doesn't exist
if len(s.Source) == 0 {
s.Source = k.options.Source
}
// ensure the namespace exists
namespace := client.SerializeResourceName(options.Namespace)
// only do this if the namespace is not default
if namespace != "default" {
if exist, err := k.namespaceExists(namespace); err == nil && !exist {
if err := k.createNamespace(namespace); err != nil {
return err
}
} else if err != nil {
return err
}
}
// determine the image from the source and options
options.Image = k.getImage(s, options)
// create new service
service := newService(s, options, k.options.Logger)
// start the service
return service.Start(k.client, client.CreateNamespace(options.Namespace))
}
// Read returns all instances of given service.
func (k *kubernetes) Read(opts ...runtime.ReadOption) ([]*runtime.Service, error) {
k.Lock()
defer k.Unlock()
// set the default labels
labels := map[string]string{}
options := runtime.ReadOptions{
Namespace: client.DefaultNamespace,
}
for _, o := range opts {
o(&options)
}
if len(options.Service) > 0 {
labels["name"] = client.Format(options.Service)
}
// add version to labels if a version has been supplied
if len(options.Version) > 0 {
labels["version"] = options.Version
}
if len(options.Type) > 0 {
labels["micro"] = options.Type
}
srvs, err := k.getService(labels, client.GetNamespace(options.Namespace))
if err != nil {
return nil, err
}
var services []*runtime.Service
for _, service := range srvs {
services = append(services, service.Service)
}
return services, nil
}
// Update the service in place.
func (k *kubernetes) Update(s *runtime.Service, opts ...runtime.UpdateOption) error {
options := runtime.UpdateOptions{
Namespace: client.DefaultNamespace,
}
for _, o := range opts {
o(&options)
}
labels := map[string]string{}
if len(s.Name) > 0 {
labels["name"] = client.Format(s.Name)
}
if len(s.Version) > 0 {
labels["version"] = s.Version
}
// get the existing service
services, err := k.getService(labels)
if err != nil {
return err
}
// update the relevant services
for _, service := range services {
// nil check
if service.kdeploy.Metadata == nil || service.kdeploy.Metadata.Annotations == nil {
md := new(client.Metadata)
md.Annotations = make(map[string]string)
service.kdeploy.Metadata = md
}
// update metadata
for k, v := range s.Metadata {
service.kdeploy.Metadata.Annotations[k] = v
}
// update build time annotation
service.kdeploy.Spec.Template.Metadata.Annotations["updated"] = fmt.Sprintf("%d", time.Now().Unix())
// update the service
if err := service.Update(k.client, client.UpdateNamespace(options.Namespace)); err != nil {
return err
}
}
return nil
}
// Delete removes a service.
func (k *kubernetes) Delete(s *runtime.Service, opts ...runtime.DeleteOption) error {
options := runtime.DeleteOptions{
Namespace: client.DefaultNamespace,
}
for _, o := range opts {
o(&options)
}
k.Lock()
defer k.Unlock()
// create new kubernetes micro service
service := newService(s, runtime.CreateOptions{
Type: k.options.Type,
Namespace: options.Namespace,
}, k.options.Logger)
return service.Stop(k.client, client.DeleteNamespace(options.Namespace))
}
// Start starts the runtime.
func (k *kubernetes) Start() error {
k.Lock()
defer k.Unlock()
// already running
if k.running {
return nil
}
// set running
k.running = true
k.closed = make(chan bool)
var events <-chan runtime.Event
if k.options.Scheduler != nil {
var err error
events, err = k.options.Scheduler.Notify()
if err != nil {
// TODO: should we bail here?
k.options.Logger.Logf(log.DebugLevel, "Runtime failed to start update notifier")
}
}
go k.run(events)
return nil
}
// Stop shuts down the runtime.
func (k *kubernetes) Stop() error {
k.Lock()
defer k.Unlock()
if !k.running {
return nil
}
select {
case <-k.closed:
return nil
default:
close(k.closed)
// set not running
k.running = false
// stop the scheduler
if k.options.Scheduler != nil {
return k.options.Scheduler.Close()
}
}
return nil
}
// String implements stringer interface.
func (k *kubernetes) String() string {
return "kubernetes"
}
// NewRuntime creates new kubernetes runtime.
func NewRuntime(opts ...runtime.Option) runtime.Runtime {
// get default options
// Create labels with type "micro": "service"
mtops := append([]runtime.Option{runtime.WithType("service")}, opts...)
options := runtime.NewOptions(mtops...)
// kubernetes client
client := client.NewClusterClient()
return &kubernetes{
options: options,
closed: make(chan bool),
client: client,
}
}
func (k *kubernetes) getImage(s *runtime.Service, options runtime.CreateOptions) string {
// use the image when its specified
if len(options.Image) > 0 {
return options.Image
}
if len(k.options.Image) > 0 {
return k.options.Image
}
return ""
}

View File

@ -1,204 +0,0 @@
// Package kubernetes taken from https://github.com/micro/go-micro/blob/master/debug/log/kubernetes/kubernetes.go
// There are some modifications compared to the other files as
// this package doesn't provide write functionality.
// With the write functionality gone, structured logs also go away.
package kubernetes
import (
"bufio"
"strconv"
"time"
"go-micro.dev/v5/logger"
"go-micro.dev/v5/runtime"
"go-micro.dev/v5/util/kubernetes/client"
)
type klog struct {
options runtime.LogsOptions
client client.Client
serviceName string
}
func (k *klog) podLogStream(podName string, stream *kubeStream) error {
p := make(map[string]string)
p["follow"] = "true"
opts := []client.LogOption{
client.LogParams(p),
client.LogNamespace(k.options.Namespace),
}
// get the logs for the pod
body, err := k.client.Log(&client.Resource{
Name: podName,
Kind: "pod",
}, opts...)
if err != nil {
stream.err = err
if err := stream.Stop(); err != nil {
stream.err = err
return err
}
return err
}
s := bufio.NewScanner(body)
defer body.Close()
for {
select {
case <-stream.stop:
return stream.Error()
default:
if s.Scan() {
record := runtime.LogRecord{
Message: s.Text(),
}
stream.stream <- record
} else {
// TODO: is there a blocking call
// rather than a sleep loop?
time.Sleep(time.Second)
}
}
}
}
func (k *klog) getMatchingPods() ([]string, error) {
r := &client.Resource{
Kind: "pod",
Value: new(client.PodList),
}
l := make(map[string]string)
l["name"] = client.Format(k.serviceName)
// TODO: specify micro:service
// l["micro"] = "service"
opts := []client.GetOption{
client.GetLabels(l),
client.GetNamespace(k.options.Namespace),
}
if err := k.client.Get(r, opts...); err != nil {
return nil, err
}
var matches []string
podList, ok := r.Value.(*client.PodList)
if !ok {
logger.DefaultLogger.Log(logger.ErrorLevel, "Failed to cast to *client.PodList")
}
for _, p := range podList.Items {
// find labels that match the name
if p.Metadata.Labels["name"] == client.Format(k.serviceName) {
matches = append(matches, p.Metadata.Name)
}
}
return matches, nil
}
func (k *klog) Read() ([]runtime.LogRecord, error) {
pods, err := k.getMatchingPods()
if err != nil {
return nil, err
}
var records []runtime.LogRecord
for _, pod := range pods {
logParams := make(map[string]string)
// if !opts.Since.Equal(time.Time{}) {
// logParams["sinceSeconds"] = strconv.Itoa(int(time.Since(opts.Since).Seconds()))
//}
if k.options.Count != 0 {
logParams["tailLines"] = strconv.Itoa(int(k.options.Count))
}
if k.options.Stream {
logParams["follow"] = "true"
}
opts := []client.LogOption{
client.LogParams(logParams),
client.LogNamespace(k.options.Namespace),
}
logs, err := k.client.Log(&client.Resource{
Name: pod,
Kind: "pod",
}, opts...)
if err != nil {
return nil, err
}
defer logs.Close()
s := bufio.NewScanner(logs)
for s.Scan() {
record := runtime.LogRecord{
Message: s.Text(),
}
// record.Metadata["pod"] = pod
records = append(records, record)
}
}
// sort the records
// sort.Slice(records, func(i, j int) bool { return records[i].Timestamp.Before(records[j].Timestamp) })
return records, nil
}
func (k *klog) Stream() (runtime.LogStream, error) {
// find the matching pods
pods, err := k.getMatchingPods()
if err != nil {
return nil, err
}
stream := &kubeStream{
stream: make(chan runtime.LogRecord),
stop: make(chan bool),
}
// stream from the individual pods
for _, pod := range pods {
go func(podName string) {
err := k.podLogStream(podName, stream)
if err != nil {
logger.DefaultLogger.Log(logger.ErrorLevel, err)
}
}(pod)
}
return stream, nil
}
// NewLog returns a configured Kubernetes logger.
func newLog(c client.Client, serviceName string, opts ...runtime.LogsOption) *klog {
options := runtime.LogsOptions{
Namespace: client.DefaultNamespace,
}
for _, o := range opts {
o(&options)
}
klog := &klog{
serviceName: serviceName,
client: c,
options: options,
}
return klog
}

View File

@ -1,188 +0,0 @@
package kubernetes
import (
"encoding/json"
"strings"
"time"
log "go-micro.dev/v5/logger"
"go-micro.dev/v5/runtime"
"go-micro.dev/v5/util/kubernetes/api"
"go-micro.dev/v5/util/kubernetes/client"
)
type service struct {
// service to manage
*runtime.Service
// Kubernetes service
kservice *client.Service
// Kubernetes deployment
kdeploy *client.Deployment
// to be used logger
logger log.Logger
}
func parseError(err error) *api.Status {
status := new(api.Status)
json.Unmarshal([]byte(err.Error()), &status)
return status
}
func newService(s *runtime.Service, c runtime.CreateOptions, l log.Logger) *service {
// use pre-formatted name/version
name := client.Format(s.Name)
version := client.Format(s.Version)
kservice := client.NewService(name, version, c.Type, c.Namespace)
kdeploy := client.NewDeployment(name, version, c.Type, c.Namespace)
// ensure the metadata is set
if kdeploy.Spec.Template.Metadata.Annotations == nil {
kdeploy.Spec.Template.Metadata.Annotations = make(map[string]string)
}
// create if non existent
if s.Metadata == nil {
s.Metadata = make(map[string]string)
}
// add the service metadata to the k8s labels, do this first so we
// don't override any labels used by the runtime, e.g. name
for k, v := range s.Metadata {
kdeploy.Metadata.Annotations[k] = v
}
// attach our values to the deployment; name, version, source
kdeploy.Metadata.Annotations["name"] = s.Name
kdeploy.Metadata.Annotations["version"] = s.Version
kdeploy.Metadata.Annotations["source"] = s.Source
// associate owner:group to be later augmented
kdeploy.Metadata.Annotations["owner"] = "micro"
kdeploy.Metadata.Annotations["group"] = "micro"
// update the deployment is a custom source is provided
if len(c.Image) > 0 {
for i := range kdeploy.Spec.Template.PodSpec.Containers {
kdeploy.Spec.Template.PodSpec.Containers[i].Image = c.Image
kdeploy.Spec.Template.PodSpec.Containers[i].Command = []string{}
kdeploy.Spec.Template.PodSpec.Containers[i].Args = []string{}
}
}
// define the environment values used by the container
env := make([]client.EnvVar, 0, len(c.Env))
for _, evar := range c.Env {
evarPair := strings.Split(evar, "=")
env = append(env, client.EnvVar{Name: evarPair[0], Value: evarPair[1]})
}
// if environment has been supplied update deployment default environment
if len(env) > 0 {
kdeploy.Spec.Template.PodSpec.Containers[0].Env = append(kdeploy.Spec.Template.PodSpec.Containers[0].Env, env...)
}
// set the command if specified
if len(c.Command) > 0 {
kdeploy.Spec.Template.PodSpec.Containers[0].Command = c.Command
}
if len(c.Args) > 0 {
kdeploy.Spec.Template.PodSpec.Containers[0].Args = c.Args
}
return &service{
Service: s,
kservice: kservice,
kdeploy: kdeploy,
logger: log.LoggerOrDefault(l),
}
}
func deploymentResource(d *client.Deployment) *client.Resource {
return &client.Resource{
Name: d.Metadata.Name,
Kind: "deployment",
Value: d,
}
}
func serviceResource(s *client.Service) *client.Resource {
return &client.Resource{
Name: s.Metadata.Name,
Kind: "service",
Value: s,
}
}
// Start starts the Kubernetes service. It creates new kubernetes deployment and service API objects.
func (s *service) Start(k client.Client, opts ...client.CreateOption) error {
// create deployment first; if we fail, we dont create service
if err := k.Create(deploymentResource(s.kdeploy), opts...); err != nil {
s.logger.Logf(log.DebugLevel, "Runtime failed to create deployment: %v", err)
s.Status("error", err)
v := parseError(err)
if v.Reason == "AlreadyExists" {
return runtime.ErrAlreadyExists
}
return err
}
// create service now that the deployment has been created
if err := k.Create(serviceResource(s.kservice), opts...); err != nil {
s.logger.Logf(log.DebugLevel, "Runtime failed to create service: %v", err)
s.Status("error", err)
v := parseError(err)
if v.Reason == "AlreadyExists" {
return runtime.ErrAlreadyExists
}
return err
}
s.Status("started", nil)
return nil
}
func (s *service) Stop(k client.Client, opts ...client.DeleteOption) error {
// first attempt to delete service
if err := k.Delete(serviceResource(s.kservice), opts...); err != nil {
s.logger.Logf(log.DebugLevel, "Runtime failed to delete service: %v", err)
s.Status("error", err)
return err
}
// delete deployment once the service has been deleted
if err := k.Delete(deploymentResource(s.kdeploy), opts...); err != nil {
s.logger.Logf(log.DebugLevel, "Runtime failed to delete deployment: %v", err)
s.Status("error", err)
return err
}
s.Status("stopped", nil)
return nil
}
func (s *service) Update(k client.Client, opts ...client.UpdateOption) error {
if err := k.Update(deploymentResource(s.kdeploy), opts...); err != nil {
s.logger.Logf(log.DebugLevel, "Runtime failed to update deployment: %v", err)
s.Status("error", err)
return err
}
if err := k.Update(serviceResource(s.kservice), opts...); err != nil {
s.logger.Logf(log.DebugLevel, "Runtime failed to update service: %v", err)
return err
}
return nil
}
func (s *service) Status(status string, err error) {
s.Metadata["lastStatusUpdate"] = time.Now().Format(time.RFC3339)
if err == nil {
s.Metadata["status"] = status
delete(s.Metadata, "error")
return
}
s.Metadata["status"] = "error"
s.Metadata["error"] = err.Error()
}

View File

@ -1,34 +0,0 @@
// Package build builds a micro runtime package
package build
import (
"go-micro.dev/v5/runtime/local/source"
)
// Builder builds binaries.
type Builder interface {
// Build builds a package
Build(*Source) (*Package, error)
// Clean deletes the package
Clean(*Package) error
}
// Source is the source of a build.
type Source struct {
// Location of the source
Repository *source.Repository
// Language is the language of code
Language string
}
// Package is micro service package.
type Package struct {
// Source of the binary
Source *Source
// Name of the binary
Name string
// Location of the binary
Path string
// Type of binary
Type string
}

View File

@ -1,94 +0,0 @@
// Package docker builds docker images
package docker
import (
"archive/tar"
"bytes"
"io"
"os"
"path/filepath"
docker "github.com/fsouza/go-dockerclient"
"go-micro.dev/v5/logger"
"go-micro.dev/v5/runtime/local/build"
)
type Builder struct {
Client *docker.Client
Options build.Options
}
func (d *Builder) Build(s *build.Source) (*build.Package, error) {
image := filepath.Join(s.Repository.Path, s.Repository.Name)
buf := new(bytes.Buffer)
tw := tar.NewWriter(buf)
defer tw.Close()
dockerFile := "Dockerfile"
// open docker file
f, err := os.Open(filepath.Join(s.Repository.Path, s.Repository.Name, dockerFile))
if err != nil {
return nil, err
}
defer f.Close()
// read docker file
by, err := io.ReadAll(f)
if err != nil {
return nil, err
}
tarHeader := &tar.Header{
Name: dockerFile,
Size: int64(len(by)),
}
err = tw.WriteHeader(tarHeader)
if err != nil {
return nil, err
}
_, err = tw.Write(by)
if err != nil {
return nil, err
}
tr := bytes.NewReader(buf.Bytes())
err = d.Client.BuildImage(docker.BuildImageOptions{
Name: image,
Dockerfile: dockerFile,
InputStream: tr,
OutputStream: io.Discard,
RmTmpContainer: true,
SuppressOutput: true,
})
if err != nil {
return nil, err
}
return &build.Package{
Name: image,
Path: image,
Type: "docker",
Source: s,
}, nil
}
func (d *Builder) Clean(b *build.Package) error {
image := filepath.Join(b.Path, b.Name)
return d.Client.RemoveImage(image)
}
func NewBuilder(opts ...build.Option) build.Builder {
options := build.Options{}
for _, o := range opts {
o(&options)
}
endpoint := "unix:///var/run/docker.sock"
client, err := docker.NewClient(endpoint)
if err != nil {
logger.Log(logger.FatalLevel, err)
}
return &Builder{
Options: options,
Client: client,
}
}

View File

@ -1,70 +0,0 @@
// Package golang is a go package manager
package golang
import (
"os"
"os/exec"
"path/filepath"
"go-micro.dev/v5/runtime/local/build"
)
type Builder struct {
Options build.Options
Cmd string
Path string
}
// whichGo locates the go command.
func whichGo() string {
// check GOROOT
if gr := os.Getenv("GOROOT"); len(gr) > 0 {
return filepath.Join(gr, "bin", "go")
}
// check path
for _, p := range filepath.SplitList(os.Getenv("PATH")) {
bin := filepath.Join(p, "go")
if _, err := os.Stat(bin); err == nil {
return bin
}
}
// best effort
return "go"
}
func (g *Builder) Build(s *build.Source) (*build.Package, error) {
binary := filepath.Join(g.Path, s.Repository.Name)
source := filepath.Join(s.Repository.Path, s.Repository.Name)
cmd := exec.Command(g.Cmd, "build", "-o", binary, source)
if err := cmd.Run(); err != nil {
return nil, err
}
return &build.Package{
Name: s.Repository.Name,
Path: binary,
Type: "go",
Source: s,
}, nil
}
func (g *Builder) Clean(b *build.Package) error {
binary := filepath.Join(b.Path, b.Name)
return os.Remove(binary)
}
func NewBuild(opts ...build.Option) build.Builder {
options := build.Options{
Path: os.TempDir(),
}
for _, o := range opts {
o(&options)
}
return &Builder{
Options: options,
Cmd: whichGo(),
Path: options.Path,
}
}

View File

@ -1,15 +0,0 @@
package build
type Options struct {
// local path to download source
Path string
}
type Option func(o *Options)
// Local path for repository.
func Path(p string) Option {
return func(o *Options) {
o.Path = p
}
}

View File

@ -1,341 +0,0 @@
package git
import (
"errors"
"fmt"
"os"
"os/exec"
"path/filepath"
"regexp"
"strings"
"github.com/go-git/go-git/v5"
"github.com/go-git/go-git/v5/config"
"github.com/go-git/go-git/v5/plumbing"
)
type Gitter interface {
Clone(repo string) error
FetchAll(repo string) error
Checkout(repo, branchOrCommit string) error
RepoDir(repo string) string
}
type libGitter struct {
folder string
}
func (g libGitter) Clone(repo string) error {
fold := filepath.Join(g.folder, dirifyRepo(repo))
exists, err := pathExists(fold)
if err != nil {
return err
}
if exists {
return nil
}
_, err = git.PlainClone(fold, false, &git.CloneOptions{
URL: repo,
Progress: os.Stdout,
})
return err
}
func (g libGitter) FetchAll(repo string) error {
repos, err := git.PlainOpen(filepath.Join(g.folder, dirifyRepo(repo)))
if err != nil {
return err
}
remotes, err := repos.Remotes()
if err != nil {
return err
}
err = remotes[0].Fetch(&git.FetchOptions{
RefSpecs: []config.RefSpec{"refs/*:refs/*", "HEAD:refs/heads/HEAD"},
Progress: os.Stdout,
Depth: 1,
})
if err != nil && err != git.NoErrAlreadyUpToDate {
return err
}
return nil
}
func (g libGitter) Checkout(repo, branchOrCommit string) error {
if branchOrCommit == "latest" {
branchOrCommit = "master"
}
repos, err := git.PlainOpen(filepath.Join(g.folder, dirifyRepo(repo)))
if err != nil {
return err
}
worktree, err := repos.Worktree()
if err != nil {
return err
}
if plumbing.IsHash(branchOrCommit) {
return worktree.Checkout(&git.CheckoutOptions{
Hash: plumbing.NewHash(branchOrCommit),
Force: true,
})
}
return worktree.Checkout(&git.CheckoutOptions{
Branch: plumbing.NewBranchReferenceName(branchOrCommit),
Force: true,
})
}
func (g libGitter) RepoDir(repo string) string {
return filepath.Join(g.folder, dirifyRepo(repo))
}
type binaryGitter struct {
folder string
}
func (g binaryGitter) Clone(repo string) error {
fold := filepath.Join(g.folder, dirifyRepo(repo), ".git")
exists, err := pathExists(fold)
if err != nil {
return err
}
if exists {
return nil
}
fold = filepath.Join(g.folder, dirifyRepo(repo))
cmd := exec.Command("git", "clone", repo, ".")
err = os.MkdirAll(fold, 0777)
if err != nil {
return err
}
cmd.Dir = fold
_, err = cmd.Output()
if err != nil {
return err
}
return err
}
func (g binaryGitter) FetchAll(repo string) error {
cmd := exec.Command("git", "fetch", "--all")
cmd.Dir = filepath.Join(g.folder, dirifyRepo(repo))
outp, err := cmd.CombinedOutput()
if err != nil {
return errors.New(string(outp))
}
return err
}
func (g binaryGitter) Checkout(repo, branchOrCommit string) error {
if branchOrCommit == "latest" {
branchOrCommit = "master"
}
cmd := exec.Command("git", "checkout", "-f", branchOrCommit)
cmd.Dir = filepath.Join(g.folder, dirifyRepo(repo))
outp, err := cmd.CombinedOutput()
if err != nil {
return errors.New(string(outp))
}
return nil
}
func (g binaryGitter) RepoDir(repo string) string {
return filepath.Join(g.folder, dirifyRepo(repo))
}
func NewGitter(folder string) Gitter {
if commandExists("git") {
return binaryGitter{folder}
}
return libGitter{folder}
}
func commandExists(cmd string) bool {
_, err := exec.LookPath(cmd)
return err == nil
}
func dirifyRepo(s string) string {
s = strings.ReplaceAll(s, "https://", "")
s = strings.ReplaceAll(s, "/", "-")
return s
}
// exists returns whether the given file or directory exists.
func pathExists(path string) (bool, error) {
_, err := os.Stat(path)
if err == nil {
return true, nil
}
if os.IsNotExist(err) {
return false, nil
}
return true, err
}
// GetRepoRoot determines the repo root from a full path.
// Returns empty string and no error if not found.
func GetRepoRoot(fullPath string) (string, error) {
// traverse parent directories
prev := fullPath
for {
current := prev
exists, err := pathExists(filepath.Join(current, ".git"))
if err != nil {
return "", err
}
if exists {
return current, nil
}
prev = filepath.Dir(current)
// reached top level, see:
// https://play.golang.org/p/rDgVdk3suzb
if current == prev {
break
}
}
return "", nil
}
const defaultRepo = "github.com/micro/services"
// Source is not just git related @todo move.
type Source struct {
// absolute path to service folder in local mode
FullPath string
// path of folder to repo root
// be it local or github repo
Folder string
// github ref
Ref string
// for cloning purposes
// blank for local
Repo string
// dir to repo root
// blank for non local
LocalRepoRoot string
// is it a local folder intended for a local runtime?
Local bool
}
// Name to be passed to RPC call runtime.Create Update Delete
// eg: `helloworld/api`, `crufter/myrepo/helloworld/api`, `localfolder`
func (s *Source) RuntimeName() string {
if s.Repo == "github.com/micro/services" || s.Repo == "" {
return s.Folder
}
return fmt.Sprintf("%v/%v", strings.ReplaceAll(s.Repo, "github.com/", ""), s.Folder)
}
// Source to be passed to RPC call runtime.Create Update Delete
// eg: `helloworld`, `github.com/crufter/myrepo/helloworld`, `/path/to/localrepo/localfolder`
func (s *Source) RuntimeSource() string {
if s.Local {
return s.FullPath
}
if s.Repo == "github.com/micro/services" || s.Repo == "" {
return s.Folder
}
return fmt.Sprintf("%v/%v", s.Repo, s.Folder)
}
// ParseSource parses a `micro run/update/kill` source.
func ParseSource(source string) (*Source, error) {
// If github is not present, we got a shorthand for `micro/services`
if !strings.Contains(source, "github.com") {
source = "github.com/micro/services/" + source
}
if !strings.Contains(source, "@") {
source += "@latest"
}
ret := &Source{}
refs := strings.Split(source, "@")
ret.Ref = refs[1]
parts := strings.Split(refs[0], "/")
ret.Repo = strings.Join(parts[0:3], "/")
if len(parts) > 1 {
ret.Folder = strings.Join(parts[3:], "/")
}
return ret, nil
}
// ParseSourceLocal detects and handles local pathes too
// workdir should be used only from the CLI @todo better interface for this function.
// PathExistsFunc exists only for testing purposes, to make the function side effect free.
func ParseSourceLocal(workDir, source string, pathExistsFunc ...func(path string) (bool, error)) (*Source, error) {
var pexists func(string) (bool, error)
if len(pathExistsFunc) == 0 {
pexists = pathExists
} else {
pexists = pathExistsFunc[0]
}
var localFullPath string
if len(workDir) > 0 {
localFullPath = filepath.Join(workDir, source)
} else {
localFullPath = source
}
if exists, err := pexists(localFullPath); err == nil && exists {
localRepoRoot, err := GetRepoRoot(localFullPath)
if err != nil {
return nil, err
}
var folder string
// If the local repo root is a top level folder, we are not in a git repo.
// In this case, we should take the last folder as folder name.
if localRepoRoot == "" {
folder = filepath.Base(localFullPath)
} else {
folder = strings.ReplaceAll(localFullPath, localRepoRoot+string(filepath.Separator), "")
}
return &Source{
Local: true,
Folder: folder,
FullPath: localFullPath,
LocalRepoRoot: localRepoRoot,
Ref: "latest", // @todo consider extracting branch from git here
}, nil
}
return ParseSource(source)
}
// CheckoutSource for the local runtime server
// folder is the folder to check out the source code to
// Modifies source path to set it to checked out repo absolute path locally.
func CheckoutSource(folder string, source *Source) error {
// if it's a local folder, do nothing
if exists, err := pathExists(source.FullPath); err == nil && exists {
return nil
}
gitter := NewGitter(folder)
repo := source.Repo
if !strings.Contains(repo, "https://") {
repo = "https://" + repo
}
// Always clone, it's idempotent and only clones if needed
err := gitter.Clone(repo)
if err != nil {
return err
}
source.FullPath = filepath.Join(gitter.RepoDir(source.Repo), source.Folder)
return gitter.Checkout(repo, source.Ref)
}
// code below is not used yet
var nameExtractRegexp = regexp.MustCompile(`((micro|web)\.Name\(")(.*)("\))`)
func extractServiceName(fileContent []byte) string {
hits := nameExtractRegexp.FindAll(fileContent, 1)
if len(hits) == 0 {
return ""
}
hit := string(hits[0])
return strings.Split(hit, "\"")[1]
}

View File

@ -1,133 +0,0 @@
package git
import (
"testing"
)
type parseCase struct {
source string
expected *Source
}
func TestParseSource(t *testing.T) {
cases := []parseCase{
{
source: "helloworld",
expected: &Source{
Repo: "github.com/micro/services",
Folder: "helloworld",
Ref: "latest",
},
},
{
source: "github.com/micro/services/helloworld",
expected: &Source{
Repo: "github.com/micro/services",
Folder: "helloworld",
Ref: "latest",
},
},
{
source: "github.com/micro/services/helloworld@v1.12.1",
expected: &Source{
Repo: "github.com/micro/services",
Folder: "helloworld",
Ref: "v1.12.1",
},
},
{
source: "github.com/micro/services/helloworld@branchname",
expected: &Source{
Repo: "github.com/micro/services",
Folder: "helloworld",
Ref: "branchname",
},
},
{
source: "github.com/crufter/reponame/helloworld@branchname",
expected: &Source{
Repo: "github.com/crufter/reponame",
Folder: "helloworld",
Ref: "branchname",
},
},
}
for i, c := range cases {
result, err := ParseSource(c.source)
if err != nil {
t.Fatalf("Failed case %v: %v", i, err)
}
if result.Folder != c.expected.Folder {
t.Fatalf("Folder does not match for '%v', expected '%v', got '%v'", i, c.expected.Folder, result.Folder)
}
if result.Repo != c.expected.Repo {
t.Fatalf("Repo address does not match for '%v', expected '%v', got '%v'", i, c.expected.Repo, result.Repo)
}
if result.Ref != c.expected.Ref {
t.Fatalf("Ref does not match for '%v', expected '%v', got '%v'", i, c.expected.Ref, result.Ref)
}
}
}
type localParseCase struct {
source string
expected *Source
workDir string
pathExists bool
}
func TestLocalParseSource(t *testing.T) {
cases := []localParseCase{
{
source: ".",
expected: &Source{
Folder: "folder2",
Ref: "latest",
},
workDir: "/folder1/folder2",
pathExists: true,
},
}
for i, c := range cases {
result, err := ParseSourceLocal(c.workDir, c.source, func(s string) (bool, error) {
return c.pathExists, nil
})
if err != nil {
t.Fatalf("Failed case %v: %v", i, err)
}
if result.Folder != c.expected.Folder {
t.Fatalf("Folder does not match for '%v', expected '%v', got '%v'", i, c.expected.Folder, result.Folder)
}
if result.Repo != c.expected.Repo {
t.Fatalf("Repo address does not match for '%v', expected '%v', got '%v'", i, c.expected.Repo, result.Repo)
}
if result.Ref != c.expected.Ref {
t.Fatalf("Ref does not match for '%v', expected '%v', got '%v'", i, c.expected.Ref, result.Ref)
}
}
}
type nameCase struct {
fileContent string
expected string
}
func TestServiceNameExtract(t *testing.T) {
cases := []nameCase{
{
fileContent: `func main() {
// New Service
service := micro.NewService(
micro.Name("go.micro.service.helloworld"),
micro.Version("latest"),
)`,
expected: "go.micro.service.helloworld",
},
}
for i, c := range cases {
result := extractServiceName([]byte(c.fileContent))
if result != c.expected {
t.Fatalf("Case %v, expected: %v, got: %v", i, c.expected, result)
}
}
}

View File

@ -1,11 +0,0 @@
// Package local provides a local runtime
package local
import (
"go-micro.dev/v5/runtime"
)
// NewRuntime returns a new local runtime.
func NewRuntime(opts ...runtime.Option) runtime.Runtime {
return runtime.NewRuntime(opts...)
}

View File

@ -1,5 +0,0 @@
package process
type Options struct{}
type Option func(o *Options)

View File

@ -1,95 +0,0 @@
//go:build !windows
// +build !windows
// Package os runs processes locally
package os
import (
"fmt"
"os"
"os/exec"
"strconv"
"syscall"
"go-micro.dev/v5/runtime/local/process"
)
func (p *Process) Exec(exe *process.Executable) error {
cmd := exec.Command(exe.Package.Path)
cmd.Dir = exe.Dir
return cmd.Run()
}
func (p *Process) Fork(exe *process.Executable) (*process.PID, error) {
// create command
cmd := exec.Command(exe.Package.Path, exe.Args...)
cmd.Dir = exe.Dir
// set env vars
cmd.Env = append(cmd.Env, os.Environ()...)
cmd.Env = append(cmd.Env, exe.Env...)
// create process group
cmd.SysProcAttr = &syscall.SysProcAttr{Setpgid: true}
in, err := cmd.StdinPipe()
if err != nil {
return nil, err
}
out, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
er, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
// start the process
if err := cmd.Start(); err != nil {
return nil, err
}
return &process.PID{
ID: fmt.Sprintf("%d", cmd.Process.Pid),
Input: in,
Output: out,
Error: er,
}, nil
}
func (p *Process) Kill(pid *process.PID) error {
id, err := strconv.Atoi(pid.ID)
if err != nil {
return err
}
if _, err := os.FindProcess(id); err != nil {
return err
}
// now kill it
// using -ve PID kills the process group which we created in Fork()
return syscall.Kill(-id, syscall.SIGTERM)
}
func (p *Process) Wait(pid *process.PID) error {
id, err := strconv.Atoi(pid.ID)
if err != nil {
return err
}
pr, err := os.FindProcess(id)
if err != nil {
return err
}
ps, err := pr.Wait()
if err != nil {
return err
}
if ps.Success() {
return nil
}
return fmt.Errorf(ps.String())
}

View File

@ -1,89 +0,0 @@
// Package os runs processes locally
package os
import (
"fmt"
"os"
"os/exec"
"strconv"
"go-micro.dev/v5/runtime/local/process"
)
func (p *Process) Exec(exe *process.Executable) error {
cmd := exec.Command(exe.Package.Path)
return cmd.Run()
}
func (p *Process) Fork(exe *process.Executable) (*process.PID, error) {
// create command
cmd := exec.Command(exe.Package.Path, exe.Args...)
// set env vars
cmd.Env = append(cmd.Env, exe.Env...)
in, err := cmd.StdinPipe()
if err != nil {
return nil, err
}
out, err := cmd.StdoutPipe()
if err != nil {
return nil, err
}
er, err := cmd.StderrPipe()
if err != nil {
return nil, err
}
// start the process
if err := cmd.Start(); err != nil {
return nil, err
}
return &process.PID{
ID: fmt.Sprintf("%d", cmd.Process.Pid),
Input: in,
Output: out,
Error: er,
}, nil
}
func (p *Process) Kill(pid *process.PID) error {
id, err := strconv.Atoi(pid.ID)
if err != nil {
return err
}
pr, err := os.FindProcess(id)
if err != nil {
return err
}
// now kill it
err = pr.Kill()
// return the kill error
return err
}
func (p *Process) Wait(pid *process.PID) error {
id, err := strconv.Atoi(pid.ID)
if err != nil {
return err
}
pr, err := os.FindProcess(id)
if err != nil {
return err
}
ps, err := pr.Wait()
if err != nil {
return err
}
if ps.Success() {
return nil
}
return fmt.Errorf(ps.String())
}

View File

@ -1,12 +0,0 @@
// Package os runs processes locally
package os
import (
"go-micro.dev/v5/runtime/local/process"
)
type Process struct{}
func NewProcess(opts ...process.Option) process.Process {
return &Process{}
}

View File

@ -1,43 +0,0 @@
// Package process executes a binary
package process
import (
"io"
"go-micro.dev/v5/runtime/local/build"
)
// Process manages a running process.
type Process interface {
// Executes a process to completion
Exec(*Executable) error
// Creates a new process
Fork(*Executable) (*PID, error)
// Kills the process
Kill(*PID) error
// Waits for a process to exit
Wait(*PID) error
}
type Executable struct {
// Package containing executable
Package *build.Package
// Initial working directory
Dir string
// The env variables
Env []string
// Args to pass
Args []string
}
// PID is the running process.
type PID struct {
// Stdin
Input io.Writer
// Stdout
Output io.Reader
// Stderr
Error io.Reader
// ID of the process
ID string
}

View File

@ -1,87 +0,0 @@
// Package git provides a git source
package git
import (
"os"
"path/filepath"
"strings"
"github.com/go-git/go-git/v5"
"go-micro.dev/v5/runtime/local/source"
)
// Source retrieves source code
// An empty struct can be used.
type Source struct {
Options source.Options
}
func (g *Source) Fetch(url string) (*source.Repository, error) {
purl := url
if parts := strings.Split(url, "://"); len(parts) > 1 {
purl = parts[len(parts)-1]
}
name := filepath.Base(url)
path := filepath.Join(g.Options.Path, purl)
_, err := git.PlainClone(path, false, &git.CloneOptions{
URL: url,
})
if err == nil {
return &source.Repository{
Name: name,
Path: path,
URL: url,
}, nil
}
// repo already exists
if err != git.ErrRepositoryAlreadyExists {
return nil, err
}
// open repo
re, err := git.PlainOpen(path)
if err != nil {
return nil, err
}
// update it
if err := re.Fetch(nil); err != nil {
return nil, err
}
return &source.Repository{
Name: name,
Path: path,
URL: url,
}, nil
}
func (g *Source) Commit(r *source.Repository) error {
repo := filepath.Join(r.Path)
re, err := git.PlainOpen(repo)
if err != nil {
return err
}
return re.Push(nil)
}
func (g *Source) String() string {
return "git"
}
func NewSource(opts ...source.Option) *Source {
options := source.Options{
Path: os.TempDir(),
}
for _, o := range opts {
o(&options)
}
return &Source{
Options: options,
}
}

View File

@ -1,93 +0,0 @@
// Package golang is a source for Go
package golang
import (
"os"
"os/exec"
"path/filepath"
"strings"
"go-micro.dev/v5/runtime/local/source"
)
type Source struct {
Options source.Options
// Go Command
Cmd string
Path string
}
func (g *Source) Fetch(url string) (*source.Repository, error) {
purl := url
if parts := strings.Split(url, "://"); len(parts) > 1 {
purl = parts[len(parts)-1]
}
// name of repo
name := filepath.Base(url)
// local path of repo
path := filepath.Join(g.Path, purl)
args := []string{"get", "-d", url, path}
cmd := exec.Command(g.Cmd, args...)
if err := cmd.Run(); err != nil {
return nil, err
}
return &source.Repository{
Name: name,
Path: path,
URL: url,
}, nil
}
// Commit is not yet supported.
func (g *Source) Commit(r *source.Repository) error {
return nil
}
func (g *Source) String() string {
return "golang"
}
// whichGo locates the go command.
func whichGo() string {
// check GOROOT
if gr := os.Getenv("GOROOT"); len(gr) > 0 {
return filepath.Join(gr, "bin", "go")
}
// check path
for _, p := range filepath.SplitList(os.Getenv("PATH")) {
bin := filepath.Join(p, "go")
if _, err := os.Stat(bin); err == nil {
return bin
}
}
// best effort
return "go"
}
func NewSource(opts ...source.Option) source.Source {
options := source.Options{
Path: os.TempDir(),
}
for _, o := range opts {
o(&options)
}
cmd := whichGo()
path := options.Path
// point of no return
if len(cmd) == 0 {
panic("Could not find Go executable")
}
return &Source{
Options: options,
Cmd: cmd,
Path: path,
}
}

View File

@ -1,15 +0,0 @@
package source
type Options struct {
// local path to download source
Path string
}
type Option func(o *Options)
// Local path for repository.
func Path(p string) Option {
return func(o *Options) {
o.Path = p
}
}

View File

@ -1,22 +0,0 @@
// Package source retrieves source code
package source
// Source retrieves source code.
type Source interface {
// Fetch repo from a url
Fetch(url string) (*Repository, error)
// Commit and upload repo
Commit(*Repository) error
// The sourcerer
String() string
}
// Repository is the source repository.
type Repository struct {
// Name or repo
Name string
// Local path where repo is stored
Path string
// URL from which repo was retrieved
URL string
}

View File

@ -1,309 +0,0 @@
package runtime
import (
"context"
"io"
"go-micro.dev/v5/client"
"go-micro.dev/v5/logger"
)
type Option func(o *Options)
// Options configure runtime.
type Options struct {
// Scheduler for updates
Scheduler Scheduler
// Client to use when making requests
Client client.Client
// Logger underline logger
Logger logger.Logger
// Service type to manage
Type string
// Source of the services repository
Source string
// Base image to use
Image string
}
func NewOptions(opts ...Option) *Options {
options := &Options{
Logger: logger.DefaultLogger,
}
for _, o := range opts {
o(options)
}
return options
}
// WithSource sets the base image / repository.
func WithSource(src string) Option {
return func(o *Options) {
o.Source = src
}
}
// WithScheduler specifies a scheduler for updates.
func WithScheduler(n Scheduler) Option {
return func(o *Options) {
o.Scheduler = n
}
}
// WithType sets the service type to manage.
func WithType(t string) Option {
return func(o *Options) {
o.Type = t
}
}
// WithImage sets the image to use.
func WithImage(t string) Option {
return func(o *Options) {
o.Image = t
}
}
// WithClient sets the client to use.
func WithClient(c client.Client) Option {
return func(o *Options) {
o.Client = c
}
}
// WithLogger sets the underline logger.
func WithLogger(l logger.Logger) Option {
return func(o *Options) {
o.Logger = l
}
}
type CreateOption func(o *CreateOptions)
type ReadOption func(o *ReadOptions)
// CreateOptions configure runtime services.
type CreateOptions struct {
// Log output
Output io.Writer
// Specify the context to use
Context context.Context
// Type of service to create
Type string
// Specify the image to use
Image string
// Namespace to create the service in
Namespace string
// Command to execut
Command []string
// Args to pass into command
Args []string
// Environment to configure
Env []string
// Retries before failing deploy
Retries int
}
// ReadOptions queries runtime services.
type ReadOptions struct {
// Specify the context to use
Context context.Context
// Service name
Service string
// Version queries services with given version
Version string
// Type of service
Type string
// Namespace the service is running in
Namespace string
}
// CreateType sets the type of service to create.
func CreateType(t string) CreateOption {
return func(o *CreateOptions) {
o.Type = t
}
}
// CreateImage sets the image to use.
func CreateImage(img string) CreateOption {
return func(o *CreateOptions) {
o.Image = img
}
}
// CreateNamespace sets the namespace.
func CreateNamespace(ns string) CreateOption {
return func(o *CreateOptions) {
o.Namespace = ns
}
}
// CreateContext sets the context.
func CreateContext(ctx context.Context) CreateOption {
return func(o *CreateOptions) {
o.Context = ctx
}
}
// WithCommand specifies the command to execute.
func WithCommand(cmd ...string) CreateOption {
return func(o *CreateOptions) {
// set command
o.Command = cmd
}
}
// WithArgs specifies the command to execute.
func WithArgs(args ...string) CreateOption {
return func(o *CreateOptions) {
// set command
o.Args = args
}
}
func WithRetries(retries int) CreateOption {
return func(o *CreateOptions) {
o.Retries = retries
}
}
// WithEnv sets the created service environment.
func WithEnv(env []string) CreateOption {
return func(o *CreateOptions) {
o.Env = env
}
}
// WithOutput sets the arg output.
func WithOutput(out io.Writer) CreateOption {
return func(o *CreateOptions) {
o.Output = out
}
}
// ReadService returns services with the given name.
func ReadService(service string) ReadOption {
return func(o *ReadOptions) {
o.Service = service
}
}
// ReadVersion confifgures service version.
func ReadVersion(version string) ReadOption {
return func(o *ReadOptions) {
o.Version = version
}
}
// ReadType returns services of the given type.
func ReadType(t string) ReadOption {
return func(o *ReadOptions) {
o.Type = t
}
}
// ReadNamespace sets the namespace.
func ReadNamespace(ns string) ReadOption {
return func(o *ReadOptions) {
o.Namespace = ns
}
}
// ReadContext sets the context.
func ReadContext(ctx context.Context) ReadOption {
return func(o *ReadOptions) {
o.Context = ctx
}
}
type UpdateOption func(o *UpdateOptions)
type UpdateOptions struct {
// Specify the context to use
Context context.Context
// Namespace the service is running in
Namespace string
}
// UpdateNamespace sets the namespace.
func UpdateNamespace(ns string) UpdateOption {
return func(o *UpdateOptions) {
o.Namespace = ns
}
}
// UpdateContext sets the context.
func UpdateContext(ctx context.Context) UpdateOption {
return func(o *UpdateOptions) {
o.Context = ctx
}
}
type DeleteOption func(o *DeleteOptions)
type DeleteOptions struct {
// Specify the context to use
Context context.Context
// Namespace the service is running in
Namespace string
}
// DeleteNamespace sets the namespace.
func DeleteNamespace(ns string) DeleteOption {
return func(o *DeleteOptions) {
o.Namespace = ns
}
}
// DeleteContext sets the context.
func DeleteContext(ctx context.Context) DeleteOption {
return func(o *DeleteOptions) {
o.Context = ctx
}
}
// LogsOption configures runtime logging.
type LogsOption func(o *LogsOptions)
// LogsOptions configure runtime logging.
type LogsOptions struct {
// Specify the context to use
Context context.Context
// Namespace the service is running in
Namespace string
// How many existing lines to show
Count int64
// Stream new lines?
Stream bool
}
// LogsExistingCount confiures how many existing lines to show.
func LogsCount(count int64) LogsOption {
return func(l *LogsOptions) {
l.Count = count
}
}
// LogsStream configures whether to stream new lines.
func LogsStream(stream bool) LogsOption {
return func(l *LogsOptions) {
l.Stream = stream
}
}
// LogsNamespace sets the namespace.
func LogsNamespace(ns string) LogsOption {
return func(o *LogsOptions) {
o.Namespace = ns
}
}
// LogsContext sets the context.
func LogsContext(ctx context.Context) LogsOption {
return func(o *LogsOptions) {
o.Context = ctx
}
}

View File

@ -1,110 +0,0 @@
// Package runtime is a service runtime manager
package runtime
import (
"errors"
"time"
)
var (
// DefaultRuntime is default micro runtime.
DefaultRuntime Runtime = NewRuntime()
// DefaultName is default runtime service name.
DefaultName = "go.micro.runtime"
ErrAlreadyExists = errors.New("already exists")
)
// Runtime is a service runtime manager.
type Runtime interface {
// Init initializes runtime
Init(...Option) error
// Create registers a service
Create(*Service, ...CreateOption) error
// Read returns the service
Read(...ReadOption) ([]*Service, error)
// Update the service in place
Update(*Service, ...UpdateOption) error
// Remove a service
Delete(*Service, ...DeleteOption) error
// Logs returns the logs for a service
Logs(*Service, ...LogsOption) (LogStream, error)
// Start starts the runtime
Start() error
// Stop shuts down the runtime
Stop() error
// String describes runtime
String() string
}
// Stream returns a log stream.
type LogStream interface {
Error() error
Chan() chan LogRecord
Stop() error
}
type LogRecord struct {
Metadata map[string]string
Message string
}
// Scheduler is a runtime service scheduler.
type Scheduler interface {
// Notify publishes schedule events
Notify() (<-chan Event, error)
// Close stops the scheduler
Close() error
}
// EventType defines schedule event.
type EventType int
const (
// Create is emitted when a new build has been craeted.
Create EventType = iota
// Update is emitted when a new update become available.
Update
// Delete is emitted when a build has been deleted.
Delete
)
// String returns human readable event type.
func (t EventType) String() string {
switch t {
case Create:
return "create"
case Delete:
return "delete"
case Update:
return "update"
default:
return "unknown"
}
}
// Event is notification event.
type Event struct {
// Timestamp is event timestamp
Timestamp time.Time
// Service the event relates to
Service *Service
// Options to use when processing the event
Options *CreateOptions
// ID of the event
ID string
// Type is event type
Type EventType
}
// Service is runtime service.
type Service struct {
// Metadata stores metadata
Metadata map[string]string
// Name of the service
Name string
// Version of the service
Version string
// url location of source
Source string
}

View File

@ -1,230 +0,0 @@
package runtime
import (
"fmt"
"io"
"strconv"
"strings"
"sync"
"time"
log "go-micro.dev/v5/logger"
"go-micro.dev/v5/runtime/local/build"
"go-micro.dev/v5/runtime/local/process"
proc "go-micro.dev/v5/runtime/local/process/os"
)
type service struct {
updated time.Time
// to be used logger
Logger log.Logger
// output for logs
output io.Writer
err error
// process creator
Process *proc.Process
closed chan bool
// service to manage
*Service
// Exec
Exec *process.Executable
// process pid
PID *process.PID
retries int
maxRetries int
sync.RWMutex
running bool
}
func newService(s *Service, c CreateOptions, l log.Logger) *service {
var exec string
var args []string
// set command
exec = strings.Join(c.Command, " ")
args = c.Args
return &service{
Service: s,
Process: new(proc.Process),
Exec: &process.Executable{
Package: &build.Package{
Name: s.Name,
Path: exec,
},
Env: c.Env,
Args: args,
Dir: s.Source,
},
Logger: log.LoggerOrDefault(l),
closed: make(chan bool),
output: c.Output,
updated: time.Now(),
maxRetries: c.Retries,
}
}
func (s *service) streamOutput() {
go io.Copy(s.output, s.PID.Output)
go io.Copy(s.output, s.PID.Error)
}
func (s *service) shouldStart() bool {
if s.running {
return false
}
return s.retries <= s.maxRetries
}
func (s *service) key() string {
return fmt.Sprintf("%v:%v", s.Name, s.Version)
}
func (s *service) ShouldStart() bool {
s.RLock()
defer s.RUnlock()
return s.shouldStart()
}
func (s *service) Running() bool {
s.RLock()
defer s.RUnlock()
return s.running
}
// Start starts the service.
func (s *service) Start() error {
s.Lock()
defer s.Unlock()
if !s.shouldStart() {
return nil
}
// reset
s.err = nil
s.closed = make(chan bool)
s.retries = 0
if s.Metadata == nil {
s.Metadata = make(map[string]string)
}
s.Status("starting", nil)
// TODO: pull source & build binary
s.Logger.Log(log.DebugLevel, "Runtime service %s forking new process", s.Service.Name)
p, err := s.Process.Fork(s.Exec)
if err != nil {
s.Status("error", err)
return err
}
// set the pid
s.PID = p
// set to running
s.running = true
// set status
s.Status("running", nil)
// set started
s.Metadata["started"] = time.Now().Format(time.RFC3339)
if s.output != nil {
s.streamOutput()
}
// wait and watch
go s.Wait()
return nil
}
// Status updates the status of the service. Assumes it's called under a lock as it mutates state.
func (s *service) Status(status string, err error) {
s.Metadata["lastStatusUpdate"] = time.Now().Format(time.RFC3339)
s.Metadata["status"] = status
if err == nil {
delete(s.Metadata, "error")
return
}
s.Metadata["error"] = err.Error()
}
// Stop stops the service.
func (s *service) Stop() error {
s.Lock()
defer s.Unlock()
select {
case <-s.closed:
return nil
default:
close(s.closed)
s.running = false
s.retries = 0
if s.PID == nil {
return nil
}
// set status
s.Status("stopping", nil)
// kill the process
err := s.Process.Kill(s.PID)
if err == nil {
// wait for it to exit
s.Process.Wait(s.PID)
}
// set status
s.Status("stopped", err)
// return the kill error
return err
}
}
// Error returns the last error service has returned.
func (s *service) Error() error {
s.RLock()
defer s.RUnlock()
return s.err
}
// Wait waits for the service to finish running.
func (s *service) Wait() {
// wait for process to exit
s.RLock()
thisPID := s.PID
s.RUnlock()
err := s.Process.Wait(thisPID)
s.Lock()
defer s.Unlock()
if s.PID.ID != thisPID.ID {
// trying to update when it's already been switched out, ignore
s.Logger.Logf(log.WarnLevel, "Trying to update a process status but PID doesn't match. Old %s, New %s. Skipping update.", thisPID.ID, s.PID.ID)
return
}
// save the error
if err != nil {
s.Logger.Logf(log.ErrorLevel, "Service %s terminated with error %s", s.Name, err)
s.retries++
s.Status("error", err)
s.Metadata["retries"] = strconv.Itoa(s.retries)
s.err = err
} else {
s.Status("done", nil)
}
// no longer running
s.running = false
}