mirror of
https://github.com/go-kratos/kratos.git
synced 2025-01-24 03:46:37 +02:00
9743ad8d32
* fix registry. 1. When etcd stops, the application cannot stop. 2. Stop consul first, then stop app. Then start consul, the registered service remains in consul registry as an unhealthy status. Co-authored-by: corel <corelchen@qq.com>
183 lines
4.0 KiB
Go
183 lines
4.0 KiB
Go
package kratos
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"os"
|
|
"os/signal"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/go-kratos/kratos/v2/log"
|
|
"github.com/go-kratos/kratos/v2/registry"
|
|
"github.com/go-kratos/kratos/v2/transport"
|
|
|
|
"github.com/google/uuid"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
// AppInfo is application context value.
|
|
type AppInfo interface {
|
|
ID() string
|
|
Name() string
|
|
Version() string
|
|
Metadata() map[string]string
|
|
Endpoint() []string
|
|
}
|
|
|
|
// App is an application components lifecycle manager.
|
|
type App struct {
|
|
opts options
|
|
ctx context.Context
|
|
cancel func()
|
|
instance *registry.ServiceInstance
|
|
}
|
|
|
|
// New create an application lifecycle manager.
|
|
func New(opts ...Option) *App {
|
|
o := options{
|
|
ctx: context.Background(),
|
|
logger: log.NewHelper(log.DefaultLogger),
|
|
sigs: []os.Signal{syscall.SIGTERM, syscall.SIGQUIT, syscall.SIGINT},
|
|
registrarTimeout: 10 * time.Second,
|
|
}
|
|
if id, err := uuid.NewUUID(); err == nil {
|
|
o.id = id.String()
|
|
}
|
|
for _, opt := range opts {
|
|
opt(&o)
|
|
}
|
|
ctx, cancel := context.WithCancel(o.ctx)
|
|
return &App{
|
|
ctx: ctx,
|
|
cancel: cancel,
|
|
opts: o,
|
|
}
|
|
}
|
|
|
|
// ID returns app instance id.
|
|
func (a *App) ID() string { return a.opts.id }
|
|
|
|
// Name returns service name.
|
|
func (a *App) Name() string { return a.opts.name }
|
|
|
|
// Version returns app version.
|
|
func (a *App) Version() string { return a.opts.version }
|
|
|
|
// Metadata returns service metadata.
|
|
func (a *App) Metadata() map[string]string { return a.opts.metadata }
|
|
|
|
// Endpoint returns endpoints.
|
|
func (a *App) Endpoint() []string {
|
|
if a.instance == nil {
|
|
return []string{}
|
|
}
|
|
return a.instance.Endpoints
|
|
}
|
|
|
|
// Run executes all OnStart hooks registered with the application's Lifecycle.
|
|
func (a *App) Run() error {
|
|
instance, err := a.buildInstance()
|
|
if err != nil {
|
|
return err
|
|
}
|
|
ctx := NewContext(a.ctx, a)
|
|
eg, ctx := errgroup.WithContext(ctx)
|
|
wg := sync.WaitGroup{}
|
|
for _, srv := range a.opts.servers {
|
|
srv := srv
|
|
eg.Go(func() error {
|
|
<-ctx.Done() // wait for stop signal
|
|
return srv.Stop(ctx)
|
|
})
|
|
wg.Add(1)
|
|
eg.Go(func() error {
|
|
wg.Done()
|
|
return srv.Start(ctx)
|
|
})
|
|
}
|
|
wg.Wait()
|
|
if a.opts.registrar != nil {
|
|
rctx, rcancel := context.WithTimeout(a.opts.ctx, a.opts.registrarTimeout)
|
|
defer rcancel()
|
|
if err := a.opts.registrar.Register(rctx, instance); err != nil {
|
|
return err
|
|
}
|
|
a.instance = instance
|
|
}
|
|
c := make(chan os.Signal, 1)
|
|
signal.Notify(c, a.opts.sigs...)
|
|
eg.Go(func() error {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-c:
|
|
err := a.Stop()
|
|
if err != nil {
|
|
a.opts.logger.Errorf("failed to stop app: %v", err)
|
|
return err
|
|
}
|
|
}
|
|
}
|
|
})
|
|
if err := eg.Wait(); err != nil && !errors.Is(err, context.Canceled) {
|
|
return err
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// Stop gracefully stops the application.
|
|
func (a *App) Stop() error {
|
|
if a.opts.registrar != nil && a.instance != nil {
|
|
ctx, cancel := context.WithTimeout(a.opts.ctx, a.opts.registrarTimeout)
|
|
defer cancel()
|
|
if err := a.opts.registrar.Deregister(ctx, a.instance); err != nil {
|
|
return err
|
|
}
|
|
}
|
|
if a.cancel != nil {
|
|
a.cancel()
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (a *App) buildInstance() (*registry.ServiceInstance, error) {
|
|
endpoints := make([]string, 0) //nolint:gomnd
|
|
for _, e := range a.opts.endpoints {
|
|
endpoints = append(endpoints, e.String())
|
|
}
|
|
if len(endpoints) == 0 {
|
|
for _, srv := range a.opts.servers {
|
|
if r, ok := srv.(transport.Endpointer); ok {
|
|
e, err := r.Endpoint()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
endpoints = append(endpoints, e.String())
|
|
}
|
|
}
|
|
}
|
|
return ®istry.ServiceInstance{
|
|
ID: a.opts.id,
|
|
Name: a.opts.name,
|
|
Version: a.opts.version,
|
|
Metadata: a.opts.metadata,
|
|
Endpoints: endpoints,
|
|
}, nil
|
|
}
|
|
|
|
type appKey struct{}
|
|
|
|
// NewContext returns a new Context that carries value.
|
|
func NewContext(ctx context.Context, s AppInfo) context.Context {
|
|
return context.WithValue(ctx, appKey{}, s)
|
|
}
|
|
|
|
// FromContext returns the Transport value stored in ctx, if any.
|
|
func FromContext(ctx context.Context) (s AppInfo, ok bool) {
|
|
s, ok = ctx.Value(appKey{}).(AppInfo)
|
|
return
|
|
}
|