diff --git a/api/api.go b/api/api.go index 7226b808..f2853921 100644 --- a/api/api.go +++ b/api/api.go @@ -32,12 +32,12 @@ type Api interface { // Options are API options. type Options struct { - // Address of the server - Address string // Router for resolving routes Router router.Router // Client to use for RPC Client client.Client + // Address of the server + Address string } // Option type are API option args. diff --git a/api/client/client.go b/api/client/client.go index 3a8fea37..4681a65a 100644 --- a/api/client/client.go +++ b/api/client/client.go @@ -47,13 +47,13 @@ type Request struct { // Response is the response of the generic `api-client` call. type Response struct { // json and base64 encoded response body - Body string `json:"body"` - // error fields. Error json example - // {"id":"go.micro.client","code":500,"detail":"malformed method name: \"\"","status":"Internal Server Error"} - Code int `json:"code"` + Body string `json:"body"` ID string `json:"id"` Detail string `json:"detail"` Status string `json:"status"` + // error fields. Error json example + // {"id":"go.micro.client","code":500,"detail":"malformed method name: \"\"","status":"Internal Server Error"} + Code int `json:"code"` } // Client enables generic calls to micro. diff --git a/api/handler/options.go b/api/handler/options.go index a0fd70bd..5ea9779d 100644 --- a/api/handler/options.go +++ b/api/handler/options.go @@ -13,11 +13,11 @@ var ( // Options is the list of api Options. type Options struct { - MaxRecvSize int64 - Namespace string Router router.Router Client client.Client Logger logger.Logger + Namespace string + MaxRecvSize int64 } // Option is a api Option. diff --git a/api/resolver/resolver.go b/api/resolver/resolver.go index 3a1e7549..6842613f 100644 --- a/api/resolver/resolver.go +++ b/api/resolver/resolver.go @@ -31,8 +31,8 @@ type Endpoint struct { // Options is a struct of available options. type Options struct { - Handler string Namespace func(*http.Request) string + Handler string } // Option is a helper for a single option. diff --git a/api/router/options.go b/api/router/options.go index 30968f05..904e6c8a 100644 --- a/api/router/options.go +++ b/api/router/options.go @@ -9,10 +9,10 @@ import ( // Options is a struct of options available. type Options struct { - Handler string Registry registry.Registry Resolver resolver.Resolver Logger logger.Logger + Handler string } // Option is a helper for a single options. diff --git a/api/router/registry/registry.go b/api/router/registry/registry.go index d1fbd1db..9abc142a 100644 --- a/api/router/registry/registry.go +++ b/api/router/registry/registry.go @@ -27,16 +27,17 @@ type endpoint struct { // router is the default router. type registryRouter struct { - exit chan bool opts router.Options // registry cache rc cache.Cache - sync.RWMutex - eps map[string]*router.Route + exit chan bool + eps map[string]*router.Route // compiled regexp for host and path ceps map[string]*endpoint + + sync.RWMutex } func (r *registryRouter) isStopped() bool { diff --git a/api/router/static/static.go b/api/router/static/static.go index 96b393d4..5a2f8158 100644 --- a/api/router/static/static.go +++ b/api/router/static/static.go @@ -25,10 +25,10 @@ type endpoint struct { // Router is the default router. type Router struct { - exit chan bool opts router.Options + exit chan bool + eps map[string]*endpoint sync.RWMutex - eps map[string]*endpoint } func (r *Router) isStopd() bool { diff --git a/api/router/util/compile.go b/api/router/util/compile.go index c4fd50c0..956b269a 100644 --- a/api/router/util/compile.go +++ b/api/router/util/compile.go @@ -9,18 +9,18 @@ const ( // Template is a compiled representation of path templates. type Template struct { - // Version is the version number of the format. - Version int + // Verb is a VERB part in the template. + Verb string + // Original template (example: /v1/a_bit_of_everything) + Template string // OpCodes is a sequence of operations. OpCodes []int // Pool is a constant pool Pool []string - // Verb is a VERB part in the template. - Verb string // Fields is a list of field paths bound in this template. Fields []string - // Original template (example: /v1/a_bit_of_everything) - Template string + // Version is the version number of the format. + Version int } // Compiler compiles utilities representation of path templates into marshallable operations. @@ -30,13 +30,14 @@ type Compiler interface { } type op struct { - // code is the opcode of the operation - code OpCode // str is a string operand of the code. // operand is ignored if str is not empty. str string + // code is the opcode of the operation + code OpCode + // operand is a numeric operand of the code. operand int } diff --git a/api/router/util/parse.go b/api/router/util/parse.go index 013b8aea..4b6f5449 100644 --- a/api/router/util/parse.go +++ b/api/router/util/parse.go @@ -107,9 +107,9 @@ func tokenize(path string) (tokens []string, verb string) { // parser is a parser of the template syntax defined in github.com/googleapis/googleapis/google/api/http.proto. type parser struct { + logger log.Logger tokens []string accepted []string - logger log.Logger } // topLevelSegments is the target of this parser. diff --git a/api/router/util/runtime.go b/api/router/util/runtime.go index caac8a79..aaae845f 100644 --- a/api/router/util/runtime.go +++ b/api/router/util/runtime.go @@ -24,6 +24,8 @@ type rop struct { // Pattern is a template pattern of http request paths defined in github.com/googleapis/googleapis/google/api/http.proto. type Pattern struct { + // verb is the VERB part of the path pattern. It is empty if the pattern does not have VERB part. + verb string // ops is a list of operations ops []rop // pool is a constant pool indexed by the operands or vars. @@ -34,16 +36,14 @@ type Pattern struct { stacksize int // tailLen is the length of the fixed-size segments after a deep wildcard tailLen int - // verb is the VERB part of the path pattern. It is empty if the pattern does not have VERB part. - verb string // assumeColonVerb indicates whether a path suffix after a final // colon may only be interpreted as a verb. assumeColonVerb bool } type patternOptions struct { - assumeColonVerb bool logger log.Logger + assumeColonVerb bool } // PatternOpt is an option for creating Patterns. diff --git a/api/router/util/types.go b/api/router/util/types.go index 2bf8607a..04e0e53e 100644 --- a/api/router/util/types.go +++ b/api/router/util/types.go @@ -9,9 +9,9 @@ import ( ) type template struct { - segments []segment verb string template string + segments []segment } type segment interface { diff --git a/api/server/acme/options.go b/api/server/acme/options.go index 045c2208..88bf21d1 100644 --- a/api/server/acme/options.go +++ b/api/server/acme/options.go @@ -10,17 +10,9 @@ type Option func(o *Options) // Options represents various options you can present to ACME providers. type Options struct { - // AcceptTLS must be set to true to indicate that you have read your - // provider's terms of service. - AcceptToS bool - // CA is the CA to use - CA string // ChallengeProvider is a go-acme/lego challenge provider. Set this if you // want to use DNS Challenges. Otherwise, tls-alpn-01 will be used ChallengeProvider challenge.Provider - // Issue certificates for domains on demand. Otherwise, certs will be - // retrieved / issued on start-up. - OnDemand bool // Cache is a storage interface. Most ACME libraries have an cache, but // there's no defined interface, so if you consume this option // sanity check it before using. @@ -28,6 +20,14 @@ type Options struct { // Logger is the underling logging framework Logger logger.Logger + // CA is the CA to use + CA string + // AcceptTLS must be set to true to indicate that you have read your + // provider's terms of service. + AcceptToS bool + // Issue certificates for domains on demand. Otherwise, certs will be + // retrieved / issued on start-up. + OnDemand bool } // AcceptToS indicates whether you accept your CA's terms of service. diff --git a/api/server/cors/cors.go b/api/server/cors/cors.go index c4cd4db4..9d1e74d5 100644 --- a/api/server/cors/cors.go +++ b/api/server/cors/cors.go @@ -6,9 +6,9 @@ import ( type Config struct { AllowOrigin string - AllowCredentials bool AllowMethods string AllowHeaders string + AllowCredentials bool } // CombinedCORSHandler wraps a server and provides CORS headers. diff --git a/api/server/http/http.go b/api/server/http/http.go index e95c9d25..b7d3ab88 100644 --- a/api/server/http/http.go +++ b/api/server/http/http.go @@ -15,12 +15,13 @@ import ( ) type httpServer struct { - mux *http.ServeMux opts server.Options - mtx sync.RWMutex - address string + mux *http.ServeMux exit chan chan error + address string + + mtx sync.RWMutex } func NewServer(address string, opts ...server.Option) server.Server { diff --git a/api/server/options.go b/api/server/options.go index 53676ffa..1f3c4ede 100644 --- a/api/server/options.go +++ b/api/server/options.go @@ -13,16 +13,16 @@ import ( type Option func(o *Options) type Options struct { + ACMEProvider acme.Provider + Resolver resolver.Resolver + Logger logger.Logger + CORSConfig *cors.Config + TLSConfig *tls.Config + ACMEHosts []string + Wrappers []Wrapper EnableACME bool EnableCORS bool - CORSConfig *cors.Config - ACMEProvider acme.Provider EnableTLS bool - ACMEHosts []string - TLSConfig *tls.Config - Resolver resolver.Resolver - Wrappers []Wrapper - Logger logger.Logger } type Wrapper func(h http.Handler) http.Handler diff --git a/auth/auth.go b/auth/auth.go index 99140102..fb192a3b 100644 --- a/auth/auth.go +++ b/auth/auth.go @@ -53,30 +53,30 @@ type Rules interface { // Account provided by an auth provider. type Account struct { + // Any other associated metadata + Metadata map[string]string `json:"metadata"` // ID of the account e.g. email ID string `json:"id"` // Type of the account, e.g. service Type string `json:"type"` // Issuer of the account Issuer string `json:"issuer"` - // Any other associated metadata - Metadata map[string]string `json:"metadata"` - // Scopes the account has access to - Scopes []string `json:"scopes"` // Secret for the account, e.g. the password Secret string `json:"secret"` + // Scopes the account has access to + Scopes []string `json:"scopes"` } // Token can be short or long lived. type Token struct { - // The token to be used for accessing resources - AccessToken string `json:"access_token"` - // RefreshToken to be used to generate a new token - RefreshToken string `json:"refresh_token"` // Time of token creation Created time.Time `json:"created"` // Time of token expiry Expiry time.Time `json:"expiry"` + // The token to be used for accessing resources + AccessToken string `json:"access_token"` + // RefreshToken to be used to generate a new token + RefreshToken string `json:"refresh_token"` } // Expired returns a boolean indicating if the token needs to be refreshed. @@ -106,13 +106,13 @@ const ( // Rule is used to verify access to a resource. type Rule struct { + // Resource the rule applies to + Resource *Resource // ID of the rule, e.g. "public" ID string // Scope the rule requires, a blank scope indicates open to the public and * indicates the rule // applies to any valid account Scope string - // Resource the rule applies to - Resource *Resource // Access determines if the rule grants or denies access to the resource Access Access // Priority the rule should take when verifying a request, the higher the value the sooner the diff --git a/auth/options.go b/auth/options.go index 82e27c81..04a35654 100644 --- a/auth/options.go +++ b/auth/options.go @@ -20,22 +20,22 @@ func NewOptions(opts ...Option) Options { } type Options struct { + // Logger is the underline logger + Logger logger.Logger + // Token is the services token used to authenticate itself + Token *Token // Namespace the service belongs to Namespace string // ID is the services auth ID ID string // Secret is used to authenticate the service Secret string - // Token is the services token used to authenticate itself - Token *Token // PublicKey for decoding JWTs PublicKey string // PrivateKey for encoding JWTs PrivateKey string // Addrs sets the addresses of auth Addrs []string - // Logger is the underline logger - Logger logger.Logger } type Option func(o *Options) @@ -93,14 +93,14 @@ func ClientToken(token *Token) Option { type GenerateOptions struct { // Metadata associated with the account Metadata map[string]string - // Scopes the account has access too - Scopes []string // Provider of the account, e.g. oauth Provider string // Type of the account, e.g. user Type string // Secret used to authenticate the account Secret string + // Scopes the account has access too + Scopes []string } type GenerateOption func(o *GenerateOptions) diff --git a/broker/http.go b/broker/http.go index c1497f5e..ef43f22d 100644 --- a/broker/http.go +++ b/broker/http.go @@ -29,38 +29,40 @@ import ( // HTTP Broker is a point to point async broker. type httpBroker struct { - id string - address string - opts Options + opts Options + + r registry.Registry mux *http.ServeMux - c *http.Client - r registry.Registry - - sync.RWMutex + c *http.Client subscribers map[string][]*httpSubscriber - running bool exit chan chan error + inbox map[string][][]byte + id string + address string + + sync.RWMutex + // offline message inbox - mtx sync.RWMutex - inbox map[string][][]byte + mtx sync.RWMutex + running bool } type httpSubscriber struct { opts SubscribeOptions - id string - topic string fn Handler svc *registry.Service hb *httpBroker + id string + topic string } type httpEvent struct { + err error m *Message t string - err error } var ( diff --git a/broker/memory.go b/broker/memory.go index 26361cf4..270d2c60 100644 --- a/broker/memory.go +++ b/broker/memory.go @@ -16,25 +16,26 @@ import ( type memoryBroker struct { opts *Options + Subscribers map[string][]*memorySubscriber + addr string sync.RWMutex - connected bool - Subscribers map[string][]*memorySubscriber + connected bool } type memoryEvent struct { - opts *Options - topic string err error message interface{} + opts *Options + topic string } type memorySubscriber struct { - id string - topic string + opts SubscribeOptions exit chan bool handler Handler - opts SubscribeOptions + id string + topic string } func (m *memoryBroker) Options() Options { diff --git a/broker/options.go b/broker/options.go index 32cec6f4..72a6c8db 100644 --- a/broker/options.go +++ b/broker/options.go @@ -10,23 +10,24 @@ import ( ) type Options struct { - Addrs []string - Secure bool - Codec codec.Marshaler + Codec codec.Marshaler // Logger is the underlying logger Logger logger.Logger + // Registry used for clustering + Registry registry.Registry + // Other options for implementations of the interface + // can be stored in a context + Context context.Context + // Handler executed when error happens in broker mesage // processing ErrorHandler Handler TLSConfig *tls.Config - // Registry used for clustering - Registry registry.Registry - // Other options for implementations of the interface - // can be stored in a context - Context context.Context + Addrs []string + Secure bool } type PublishOptions struct { @@ -36,17 +37,18 @@ type PublishOptions struct { } type SubscribeOptions struct { - // AutoAck defaults to true. When a handler returns - // with a nil error the message is acked. - AutoAck bool + + // Other options for implementations of the interface + // can be stored in a context + Context context.Context // Subscribers with the same queue name // will create a shared subscription where each // receives a subset of messages. Queue string - // Other options for implementations of the interface - // can be stored in a context - Context context.Context + // AutoAck defaults to true. When a handler returns + // with a nil error the message is acked. + AutoAck bool } type Option func(*Options) diff --git a/cache/memory.go b/cache/memory.go index 86f6a408..2ee07faf 100644 --- a/cache/memory.go +++ b/cache/memory.go @@ -8,9 +8,9 @@ import ( type memCache struct { opts Options - sync.RWMutex items map[string]Item + sync.RWMutex } func (c *memCache) Get(ctx context.Context, key string) (interface{}, time.Time, error) { diff --git a/cache/options.go b/cache/options.go index 41ad7168..2e85f625 100644 --- a/cache/options.go +++ b/cache/options.go @@ -9,14 +9,14 @@ import ( // Options represents the options for the cache. type Options struct { - Expiration time.Duration - Items map[string]Item - // Address represents the address or other connection information of the cache service. - Address string // Context should contain all implementation specific options, using context.WithValue. Context context.Context // Logger is the be used logger Logger logger.Logger + Items map[string]Item + // Address represents the address or other connection information of the cache service. + Address string + Expiration time.Duration } // Option manipulates the Options passed. diff --git a/client/options.go b/client/options.go index 35005bf8..d662a819 100644 --- a/client/options.go +++ b/client/options.go @@ -31,31 +31,19 @@ var ( // Options are the Client options. type Options struct { - // Used to select codec - ContentType string - // Plugged interfaces - Broker broker.Broker - Codecs map[string]codec.NewCodec - Registry registry.Registry - Selector selector.Selector - Transport transport.Transport + // Default Call Options + CallOptions CallOptions // Router sets the router Router Router - // Connection Pool - PoolSize int - PoolTTL time.Duration + Registry registry.Registry + Selector selector.Selector + Transport transport.Transport - // Response cache - Cache *Cache - - // Middleware for client - Wrappers []Wrapper - - // Default Call Options - CallOptions CallOptions + // Plugged interfaces + Broker broker.Broker // Logger is the underline logger Logger logger.Logger @@ -63,22 +51,40 @@ type Options struct { // Other options for implementations of the interface // can be stored in a context Context context.Context + Codecs map[string]codec.NewCodec + + // Response cache + Cache *Cache + + // Used to select codec + ContentType string + + // Middleware for client + Wrappers []Wrapper + + // Connection Pool + PoolSize int + PoolTTL time.Duration } // CallOptions are options used to make calls to a server. type CallOptions struct { + + // Other options for implementations of the interface + // can be stored in a context + Context context.Context + // Backoff func + Backoff BackoffFunc + // Check if retriable func + Retry RetryFunc SelectOptions []selector.SelectOption // Address of remote hosts Address []string - // Backoff func - Backoff BackoffFunc - // Check if retriable func - Retry RetryFunc - // Number of Call attempts - Retries int - // Transport Dial Timeout. Used for initial dial to establish a connection. - DialTimeout time.Duration + + // Middleware for low level call func + CallWrappers []CallWrapper + // ConnectionTimeout of one request to the server. // Set this lower than the RequestTimeout to enbale retries on connection timeout. ConnectionTimeout time.Duration @@ -86,27 +92,24 @@ type CallOptions struct { RequestTimeout time.Duration // Stream timeout for the stream StreamTimeout time.Duration - // Use the services own auth token - ServiceToken bool // Duration to cache the response for CacheExpiry time.Duration + // Transport Dial Timeout. Used for initial dial to establish a connection. + DialTimeout time.Duration + // Number of Call attempts + Retries int + // Use the services own auth token + ServiceToken bool // ConnClose sets the Connection: close header. ConnClose bool - - // Middleware for low level call func - CallWrappers []CallWrapper - - // Other options for implementations of the interface - // can be stored in a context - Context context.Context } type PublishOptions struct { - // Exchange is the routing exchange for the message - Exchange string // Other options for implementations of the interface // can be stored in a context Context context.Context + // Exchange is the routing exchange for the message + Exchange string } type MessageOptions struct { @@ -114,12 +117,12 @@ type MessageOptions struct { } type RequestOptions struct { - ContentType string - Stream bool // Other options for implementations of the interface // can be stored in a context - Context context.Context + Context context.Context + ContentType string + Stream bool } // NewOptions creates new Client options. diff --git a/client/rpc_client.go b/client/rpc_client.go index ad36e21d..ac179fb0 100644 --- a/client/rpc_client.go +++ b/client/rpc_client.go @@ -30,11 +30,12 @@ const ( ) type rpcClient struct { - seq uint64 - once atomic.Value opts Options + once atomic.Value pool pool.Pool + seq uint64 + mu sync.RWMutex } diff --git a/client/rpc_message.go b/client/rpc_message.go index 418c756f..0fb7e100 100644 --- a/client/rpc_message.go +++ b/client/rpc_message.go @@ -1,9 +1,9 @@ package client type message struct { + payload interface{} topic string contentType string - payload interface{} } func newMessage(topic string, payload interface{}, contentType string, opts ...MessageOption) Message { diff --git a/client/rpc_request.go b/client/rpc_request.go index 0c56b790..499ae5cc 100644 --- a/client/rpc_request.go +++ b/client/rpc_request.go @@ -5,13 +5,13 @@ import ( ) type rpcRequest struct { + opts RequestOptions + codec codec.Codec + body interface{} service string method string endpoint string contentType string - codec codec.Codec - body interface{} - opts RequestOptions } func newRequest(service, endpoint string, request interface{}, contentType string, reqOpts ...RequestOption) Request { diff --git a/client/rpc_response.go b/client/rpc_response.go index de7c9a00..cae0f262 100644 --- a/client/rpc_response.go +++ b/client/rpc_response.go @@ -6,10 +6,10 @@ import ( ) type rpcResponse struct { - header map[string]string - body []byte socket transport.Socket codec codec.Codec + header map[string]string + body []byte } func (r *rpcResponse) Codec() codec.Reader { diff --git a/client/rpc_stream.go b/client/rpc_stream.go index b3226b0c..b87b2b69 100644 --- a/client/rpc_stream.go +++ b/client/rpc_stream.go @@ -11,22 +11,23 @@ import ( // Implements the streamer interface. type rpcStream struct { - sync.RWMutex - id string - closed chan bool - // Indicates whether connection should be closed directly. - close bool err error request Request response Response codec codec.Codec context context.Context - // signal whether we should send EOS - sendEOS bool + closed chan bool // release releases the connection back to the pool release func(err error) + id string + sync.RWMutex + // Indicates whether connection should be closed directly. + close bool + + // signal whether we should send EOS + sendEOS bool } func (r *rpcStream) isClosed() bool { diff --git a/codec/codec.go b/codec/codec.go index 8266f2f3..fd725aa3 100644 --- a/codec/codec.go +++ b/codec/codec.go @@ -55,14 +55,15 @@ type Marshaler interface { // the communication, likely followed by the body. // In the case of an error, body may be nil. type Message struct { + + // The values read from the socket + Header map[string]string Id string - Type MessageType Target string Method string Endpoint string Error string - // The values read from the socket - Header map[string]string - Body []byte + Body []byte + Type MessageType } diff --git a/codec/jsonrpc/client.go b/codec/jsonrpc/client.go index 6b3e3284..935720a3 100644 --- a/codec/jsonrpc/client.go +++ b/codec/jsonrpc/client.go @@ -10,22 +10,24 @@ import ( ) type clientCodec struct { - dec *json.Decoder // for reading JSON values - enc *json.Encoder // for writing JSON values - c io.Closer // temporary work space req clientRequest resp clientResponse - sync.Mutex + c io.Closer + + dec *json.Decoder // for reading JSON values + enc *json.Encoder // for writing JSON values pending map[interface{}]string + + sync.Mutex } type clientRequest struct { - Method string `json:"method"` Params [1]interface{} `json:"params"` ID interface{} `json:"id"` + Method string `json:"method"` } type clientResponse struct { diff --git a/codec/jsonrpc/jsonrpc.go b/codec/jsonrpc/jsonrpc.go index a4b25497..cd1bef49 100644 --- a/codec/jsonrpc/jsonrpc.go +++ b/codec/jsonrpc/jsonrpc.go @@ -11,11 +11,11 @@ import ( ) type jsonCodec struct { - buf *bytes.Buffer - mt codec.MessageType rwc io.ReadWriteCloser + buf *bytes.Buffer c *clientCodec s *serverCodec + mt codec.MessageType } func (j *jsonCodec) Close() error { diff --git a/codec/jsonrpc/server.go b/codec/jsonrpc/server.go index 8ed5d889..e43df15a 100644 --- a/codec/jsonrpc/server.go +++ b/codec/jsonrpc/server.go @@ -19,9 +19,9 @@ type serverCodec struct { } type serverRequest struct { - Method string `json:"method"` - Params *json.RawMessage `json:"params"` ID interface{} `json:"id"` + Params *json.RawMessage `json:"params"` + Method string `json:"method"` } type serverResponse struct { diff --git a/codec/protorpc/protorpc.go b/codec/protorpc/protorpc.go index 030093b4..f5b66be2 100644 --- a/codec/protorpc/protorpc.go +++ b/codec/protorpc/protorpc.go @@ -17,10 +17,10 @@ type flusher interface { } type protoCodec struct { - sync.Mutex rwc io.ReadWriteCloser - mt codec.MessageType buf *bytes.Buffer + mt codec.MessageType + sync.Mutex } func (c *protoCodec) Close() error { diff --git a/config/config.go b/config/config.go index ba4b6c4d..3b2347a0 100644 --- a/config/config.go +++ b/config/config.go @@ -37,11 +37,12 @@ type Watcher interface { type Options struct { Loader loader.Loader Reader reader.Reader - Source []source.Source // for alternative data Context context.Context + Source []source.Source + WithWatcherDisabled bool } diff --git a/config/default.go b/config/default.go index 4d813d69..4d873c70 100644 --- a/config/default.go +++ b/config/default.go @@ -13,21 +13,21 @@ import ( ) type config struct { + // the current values + vals reader.Values exit chan bool + // the current snapshot + snap *loader.Snapshot opts Options sync.RWMutex - // the current snapshot - snap *loader.Snapshot - // the current values - vals reader.Values } type watcher struct { lw loader.Watcher rd reader.Reader - path []string value reader.Value + path []string } func newConfig(opts ...Option) (Config, error) { diff --git a/config/loader/loader.go b/config/loader/loader.go index eff82147..a36fd5d5 100644 --- a/config/loader/loader.go +++ b/config/loader/loader.go @@ -45,11 +45,12 @@ type Snapshot struct { // Options contains all options for a config loader. type Options struct { Reader reader.Reader - Source []source.Source // for alternative data Context context.Context + Source []source.Source + WithWatcherDisabled bool } diff --git a/config/loader/memory/memory.go b/config/loader/memory/memory.go index 25d10ace..ff96ea55 100644 --- a/config/loader/memory/memory.go +++ b/config/loader/memory/memory.go @@ -17,34 +17,35 @@ import ( ) type memory struct { - exit chan bool - opts loader.Options - - sync.RWMutex - // the current snapshot - snap *loader.Snapshot // the current values vals reader.Values + exit chan bool + // the current snapshot + snap *loader.Snapshot + + watchers *list.List + opts loader.Options + // all the sets sets []*source.ChangeSet // all the sources sources []source.Source - watchers *list.List + sync.RWMutex } type updateValue struct { - version string value reader.Value + version string } type watcher struct { - exit chan bool - path []string value reader.Value reader reader.Reader version atomic.Value + exit chan bool updates chan updateValue + path []string } func (w *watcher) getVersion() string { diff --git a/config/secrets/secrets.go b/config/secrets/secrets.go index 56c3ec93..bdab5d02 100644 --- a/config/secrets/secrets.go +++ b/config/secrets/secrets.go @@ -18,14 +18,14 @@ type Secrets interface { } type Options struct { + // Context for other opts + Context context.Context // Key is a symmetric key for encoding Key []byte // Private key for decoding PrivateKey []byte // Public key for encoding PublicKey []byte - // Context for other opts - Context context.Context } // Option sets options. diff --git a/config/source/env/env.go b/config/source/env/env.go index b7b0ff51..e30b08db 100644 --- a/config/source/env/env.go +++ b/config/source/env/env.go @@ -15,9 +15,9 @@ var ( ) type env struct { + opts source.Options prefixes []string strippedPrefixes []string - opts source.Options } func (e *env) Read() (*source.ChangeSet, error) { diff --git a/config/source/file/file.go b/config/source/file/file.go index a612d47a..b800c5c2 100644 --- a/config/source/file/file.go +++ b/config/source/file/file.go @@ -10,9 +10,9 @@ import ( ) type file struct { + opts source.Options fs fs.FS path string - opts source.Options } var ( diff --git a/config/source/memory/memory.go b/config/source/memory/memory.go index 8aa0776b..9cfcca45 100644 --- a/config/source/memory/memory.go +++ b/config/source/memory/memory.go @@ -10,9 +10,9 @@ import ( ) type memory struct { - sync.RWMutex ChangeSet *source.ChangeSet Watchers map[string]*watcher + sync.RWMutex } func (s *memory) Read() (*source.ChangeSet, error) { diff --git a/config/source/memory/watcher.go b/config/source/memory/watcher.go index e3d5011c..8d9ef7f8 100644 --- a/config/source/memory/watcher.go +++ b/config/source/memory/watcher.go @@ -5,9 +5,9 @@ import ( ) type watcher struct { - Id string Updates chan *source.ChangeSet Source *memory + Id string } func (w *watcher) Next() (*source.ChangeSet, error) { diff --git a/config/source/source.go b/config/source/source.go index 398a4bae..d1ce2c66 100644 --- a/config/source/source.go +++ b/config/source/source.go @@ -21,11 +21,11 @@ type Source interface { // ChangeSet represents a set of changes from a source. type ChangeSet struct { - Data []byte + Timestamp time.Time Checksum string Format string Source string - Timestamp time.Time + Data []byte } // Watcher watches a source for changes. diff --git a/debug/log/options.go b/debug/log/options.go index e03a70b0..770d252f 100644 --- a/debug/log/options.go +++ b/debug/log/options.go @@ -7,12 +7,12 @@ type Option func(*Options) // Options are logger options. type Options struct { + // Format specifies the output format + Format FormatFunc // Name of the log Name string // Size is the size of ring buffer Size int - // Format specifies the output format - Format FormatFunc } // Name of the log. diff --git a/debug/log/os.go b/debug/log/os.go index 55cc4bdf..0a880bae 100644 --- a/debug/log/os.go +++ b/debug/log/os.go @@ -10,11 +10,11 @@ import ( // Should stream from OS. type osLog struct { format FormatFunc - once sync.Once - - sync.RWMutex buffer *ring.Buffer subs map[string]*osStream + + sync.RWMutex + once sync.Once } type osStream struct { diff --git a/debug/profile/http/http.go b/debug/profile/http/http.go index 8931487e..d28ad12d 100644 --- a/debug/profile/http/http.go +++ b/debug/profile/http/http.go @@ -11,9 +11,9 @@ import ( ) type httpProfile struct { + server *http.Server sync.Mutex running bool - server *http.Server } var ( diff --git a/debug/profile/pprof/pprof.go b/debug/profile/pprof/pprof.go index d77e2c7d..354fbe6d 100644 --- a/debug/profile/pprof/pprof.go +++ b/debug/profile/pprof/pprof.go @@ -12,15 +12,15 @@ import ( ) type profiler struct { - opts profile.Options - - sync.Mutex - running bool // where the cpu profile is written cpuFile *os.File // where the mem profile is written memFile *os.File + opts profile.Options + + sync.Mutex + running bool } func (p *profiler) Start() error { diff --git a/debug/trace/default.go b/debug/trace/default.go index 18c81e25..c780485d 100644 --- a/debug/trace/default.go +++ b/debug/trace/default.go @@ -9,10 +9,10 @@ import ( ) type memTracer struct { - opts Options // ring buffer of traces buffer *ring.Buffer + opts Options } func (t *memTracer) Read(opts ...ReadOption) ([]*Span, error) { diff --git a/debug/trace/trace.go b/debug/trace/trace.go index d43f9e79..81d8624a 100644 --- a/debug/trace/trace.go +++ b/debug/trace/trace.go @@ -36,6 +36,10 @@ const ( // Span is used to record an entry. type Span struct { + // Start time + Started time.Time + // associated data + Metadata map[string]string // Id of the trace Trace string // name of the span @@ -44,12 +48,8 @@ type Span struct { Id string // parent span id Parent string - // Start time - Started time.Time // Duration in nano seconds Duration time.Duration - // associated data - Metadata map[string]string // Type Type SpanType } diff --git a/events/events.go b/events/events.go index 8c2dbf88..c2877571 100644 --- a/events/events.go +++ b/events/events.go @@ -38,19 +38,19 @@ type NackFunc func() error // Event is the object returned by the broker when you subscribe to a topic. type Event struct { - // ID to uniquely identify the event - ID string - // Topic of event, e.g. "registry.service.created" - Topic string // Timestamp of the event Timestamp time.Time // Metadata contains the values the event was indexed by Metadata map[string]string - // Payload contains the encoded message - Payload []byte ackFunc AckFunc nackFunc NackFunc + // ID to uniquely identify the event + ID string + // Topic of event, e.g. "registry.service.created" + Topic string + // Payload contains the encoded message + Payload []byte } // Unmarshal the events message into an object. diff --git a/events/memory.go b/events/memory.go index 210ecfaf..e3b19cf8 100644 --- a/events/memory.go +++ b/events/memory.go @@ -21,15 +21,16 @@ func NewStream(opts ...Option) (Stream, error) { } type subscriber struct { - Group string - Topic string Channel chan Event - sync.RWMutex retryMap map[string]int + Group string + Topic string retryLimit int - autoAck bool ackWait time.Duration + + sync.RWMutex + autoAck bool } type mem struct { diff --git a/events/options.go b/events/options.go index d0ff17eb..aa952a0d 100644 --- a/events/options.go +++ b/events/options.go @@ -25,9 +25,9 @@ func NewOptions(opts ...Option) *Options { } type StoreOptions struct { - TTL time.Duration Backup Backup Logger logger.Logger + TTL time.Duration } type StoreOption func(o *StoreOptions) @@ -66,21 +66,21 @@ func WithTimestamp(t time.Time) PublishOption { // ConsumeOptions contains all the options which can be provided when subscribing to a topic. type ConsumeOptions struct { - // Group is the name of the consumer group, if two consumers have the same group the events - // are distributed between them - Group string // Offset is the time from which the messages should be consumed from. If not provided then // the messages will be consumed starting from the moment the Subscription starts. Offset time.Time + // Group is the name of the consumer group, if two consumers have the same group the events + // are distributed between them + Group string + AckWait time.Duration + // RetryLimit indicates number of times a message is retried + RetryLimit int // AutoAck if true (default true), automatically acknowledges every message so it will not be redelivered. // If false specifies that each message need ts to be manually acknowledged by the subscriber. // If processing is successful the message should be ack'ed to remove the message from the stream. // If processing is unsuccessful the message should be nack'ed (negative acknowledgement) which will mean it will // remain on the stream to be processed again. AutoAck bool - AckWait time.Duration - // RetryLimit indicates number of times a message is retried - RetryLimit int // CustomRetries indicates whether to use RetryLimit CustomRetries bool } diff --git a/logger/default.go b/logger/default.go index ddd56bea..407811b8 100644 --- a/logger/default.go +++ b/logger/default.go @@ -23,8 +23,8 @@ func init() { } type defaultLogger struct { - sync.RWMutex opts Options + sync.RWMutex } // Init (opts...) should only overwrite provided options. diff --git a/logger/options.go b/logger/options.go index 505fea1b..48acd878 100644 --- a/logger/options.go +++ b/logger/options.go @@ -8,16 +8,16 @@ import ( type Option func(*Options) type Options struct { - // The logging level the logger should log at. default is `InfoLevel` - Level Level - // fields to always be logged - Fields map[string]interface{} // It's common to set this to a file, or leave it default which is `os.Stderr` Out io.Writer - // Caller skip frame count for file:line info - CallerSkipCount int // Alternative options Context context.Context + // fields to always be logged + Fields map[string]interface{} + // Caller skip frame count for file:line info + CallerSkipCount int + // The logging level the logger should log at. default is `InfoLevel` + Level Level } // WithFields set default fields for the logger. diff --git a/options.go b/options.go index ca1a74b8..174155e8 100644 --- a/options.go +++ b/options.go @@ -24,29 +24,31 @@ import ( // Options for micro service. type Options struct { - Auth auth.Auth - Broker broker.Broker - Cache cache.Cache - Cmd cmd.Cmd - Config config.Config - Client client.Client - Server server.Server - Store store.Store - Registry registry.Registry - Runtime runtime.Runtime - Transport transport.Transport - Profile profile.Profile - Logger logger.Logger - // Before and After funcs - BeforeStart []func() error - BeforeStop []func() error - AfterStart []func() error - AfterStop []func() error + Registry registry.Registry + Store store.Store + Auth auth.Auth + Cmd cmd.Cmd + Config config.Config + Client client.Client + Server server.Server // Other options for implementations of the interface // can be stored in a context Context context.Context + Cache cache.Cache + Runtime runtime.Runtime + Profile profile.Profile + Transport transport.Transport + Logger logger.Logger + Broker broker.Broker + // Before and After funcs + BeforeStart []func() error + AfterStart []func() error + AfterStop []func() error + + BeforeStop []func() error + Signal bool } diff --git a/registry/cache/cache.go b/registry/cache/cache.go index 693248a6..5cbd051e 100644 --- a/registry/cache/cache.go +++ b/registry/cache/cache.go @@ -23,20 +23,23 @@ type Cache interface { } type Options struct { + Logger log.Logger // TTL is the cache TTL TTL time.Duration - - Logger log.Logger } type Option func(o *Options) type cache struct { - registry.Registry opts Options - // registry cache - sync.RWMutex + registry.Registry + // status of the registry + // used to hold onto the cache + // in failure state + status error + // used to prevent cache breakdwon + sg singleflight.Group cache map[string][]*registry.Service ttls map[string]time.Time watched map[string]bool @@ -46,12 +49,9 @@ type cache struct { // indicate whether its running watchedRunning map[string]bool - // status of the registry - // used to hold onto the cache - // in failure state - status error - // used to prevent cache breakdwon - sg singleflight.Group + + // registry cache + sync.RWMutex } var ( diff --git a/registry/mdns_registry.go b/registry/mdns_registry.go index 4172b38a..d87324e5 100644 --- a/registry/mdns_registry.go +++ b/registry/mdns_registry.go @@ -26,43 +26,43 @@ var ( ) type mdnsTxt struct { + Metadata map[string]string Service string Version string Endpoints []*Endpoint - Metadata map[string]string } type mdnsEntry struct { - id string node *mdns.Server + id string } type mdnsRegistry struct { - opts *Options - // the mdns domain - domain string - - sync.Mutex + opts *Options services map[string][]*mdnsEntry - mtx sync.RWMutex - // watchers watchers map[string]*mdnsWatcher // listener listener chan *mdns.ServiceEntry + // the mdns domain + domain string + + mtx sync.RWMutex + + sync.Mutex } type mdnsWatcher struct { - id string wo WatchOptions ch chan *mdns.ServiceEntry exit chan struct{} - // the mdns domain - domain string // the registry registry *mdnsRegistry + id string + // the mdns domain + domain string } func encode(txt *mdnsTxt) ([]string, error) { diff --git a/registry/memory.go b/registry/memory.go index 695a2b06..bd15aeb1 100644 --- a/registry/memory.go +++ b/registry/memory.go @@ -14,9 +14,9 @@ var ( ) type node struct { - *Node - TTL time.Duration LastSeen time.Time + *Node + TTL time.Duration } type record struct { @@ -30,9 +30,10 @@ type record struct { type memRegistry struct { options *Options - sync.RWMutex records map[string]map[string]*record watchers map[string]*memWatcher + + sync.RWMutex } func NewMemoryRegistry(opts ...Option) Registry { diff --git a/registry/memory_watcher.go b/registry/memory_watcher.go index 8ecaf106..5cd110db 100644 --- a/registry/memory_watcher.go +++ b/registry/memory_watcher.go @@ -5,10 +5,10 @@ import ( ) type memWatcher struct { - id string wo WatchOptions res chan *Result exit chan bool + id string } func (m *memWatcher) Next() (*Result, error) { diff --git a/registry/options.go b/registry/options.go index 38359d89..932706df 100644 --- a/registry/options.go +++ b/registry/options.go @@ -9,30 +9,30 @@ import ( ) type Options struct { + Logger logger.Logger + // Other options for implementations of the interface + // can be stored in a context + Context context.Context + TLSConfig *tls.Config Addrs []string Timeout time.Duration Secure bool - TLSConfig *tls.Config - Logger logger.Logger - // Other options for implementations of the interface - // can be stored in a context - Context context.Context } type RegisterOptions struct { - TTL time.Duration // Other options for implementations of the interface // can be stored in a context Context context.Context + TTL time.Duration } type WatchOptions struct { - // Specify a service to watch - // If blank, the watch is for all services - Service string // Other options for implementations of the interface // can be stored in a context Context context.Context + // Specify a service to watch + // If blank, the watch is for all services + Service string } type DeregisterOptions struct { diff --git a/registry/registry.go b/registry/registry.go index f27d3be2..2ba5d56d 100644 --- a/registry/registry.go +++ b/registry/registry.go @@ -37,16 +37,16 @@ type Service struct { } type Node struct { + Metadata map[string]string `json:"metadata"` Id string `json:"id"` Address string `json:"address"` - Metadata map[string]string `json:"metadata"` } type Endpoint struct { - Name string `json:"name"` Request *Value `json:"request"` Response *Value `json:"response"` Metadata map[string]string `json:"metadata"` + Name string `json:"name"` } type Value struct { diff --git a/registry/watcher.go b/registry/watcher.go index c9d073ed..d689cb14 100644 --- a/registry/watcher.go +++ b/registry/watcher.go @@ -13,8 +13,8 @@ type Watcher interface { // Result is returned by a call to Next on // the watcher. Actions can be create, update, delete. type Result struct { - Action string Service *Service + Action string } // EventType defines registry event type. @@ -45,12 +45,12 @@ func (t EventType) String() string { // Event is registry event. type Event struct { - // Id is registry id - Id string - // Type defines type of event - Type EventType // Timestamp is event timestamp Timestamp time.Time // Service is registry service Service *Service + // Id is registry id + Id string + // Type defines type of event + Type EventType } diff --git a/runtime/default.go b/runtime/default.go index a0883313..9b4ceb0a 100644 --- a/runtime/default.go +++ b/runtime/default.go @@ -21,18 +21,18 @@ import ( const defaultNamespace = "default" type runtime struct { - sync.RWMutex // options configure runtime options *Options // used to stop the runtime closed chan bool // used to start new services start chan *service - // indicates if we're running - running bool // namespaces stores services grouped by namespace, e.g. namespaces["foo"]["go.micro.auth:latest"] // would return the latest version of go.micro.auth from the foo namespace namespaces map[string]map[string]*service + sync.RWMutex + // indicates if we're running + running bool } // NewRuntime creates new local runtime and returns it. @@ -434,13 +434,13 @@ func (r *runtime) Logs(s *Service, options ...LogsOption) (LogStream, error) { } type logStream struct { + err error + logger log.Logger tail *tail.Tail - service string stream chan LogRecord + stop chan bool + service string sync.Mutex - stop chan bool - err error - logger log.Logger } func (l *logStream) Chan() chan LogRecord { diff --git a/runtime/kubernetes/kubernetes.go b/runtime/kubernetes/kubernetes.go index 8b9fa281..50e44c13 100644 --- a/runtime/kubernetes/kubernetes.go +++ b/runtime/kubernetes/kubernetes.go @@ -15,17 +15,17 @@ import ( type action int type kubernetes struct { - sync.RWMutex - // options configure runtime - options *runtime.Options - // indicates if we're running - running bool - // used to stop the runtime - closed chan bool // client is kubernetes client client client.Client + // options configure runtime + options *runtime.Options + // used to stop the runtime + closed chan bool // namespaces which exist namespaces []client.Namespace + sync.RWMutex + // indicates if we're running + running bool } // namespaceExists returns a boolean indicating if a namespace exists. @@ -346,12 +346,12 @@ func (k *kubernetes) Logs(s *runtime.Service, options ...runtime.LogsOption) (ru } type kubeStream struct { + err error // the k8s log stream stream chan runtime.LogRecord + stop chan bool // the stop chan sync.Mutex - stop chan bool - err error } func (k *kubeStream) Error() error { diff --git a/runtime/kubernetes/logs.go b/runtime/kubernetes/logs.go index c7c81314..0d9af67a 100644 --- a/runtime/kubernetes/logs.go +++ b/runtime/kubernetes/logs.go @@ -15,9 +15,9 @@ import ( ) type klog struct { + options runtime.LogsOptions client client.Client serviceName string - options runtime.LogsOptions } func (k *klog) podLogStream(podName string, stream *kubeStream) error { diff --git a/runtime/local/build/build.go b/runtime/local/build/build.go index bbc9e335..0c518d6d 100644 --- a/runtime/local/build/build.go +++ b/runtime/local/build/build.go @@ -15,20 +15,20 @@ type Builder interface { // Source is the source of a build. type Source struct { - // Language is the language of code - Language string // Location of the source Repository *source.Repository + // Language is the language of code + Language string } // Package is micro service package. type Package struct { + // Source of the binary + Source *Source // Name of the binary Name string // Location of the binary Path string // Type of binary Type string - // Source of the binary - Source *Source } diff --git a/runtime/local/build/docker/docker.go b/runtime/local/build/docker/docker.go index e0e246fe..e207e90b 100644 --- a/runtime/local/build/docker/docker.go +++ b/runtime/local/build/docker/docker.go @@ -14,8 +14,8 @@ import ( ) type Builder struct { - Options build.Options Client *docker.Client + Options build.Options } func (d *Builder) Build(s *build.Source) (*build.Package, error) { diff --git a/runtime/local/git/git.go b/runtime/local/git/git.go index 2c034144..0705d795 100644 --- a/runtime/local/git/git.go +++ b/runtime/local/git/git.go @@ -205,8 +205,6 @@ const defaultRepo = "github.com/micro/services" // Source is not just git related @todo move. type Source struct { - // is it a local folder intended for a local runtime? - Local bool // absolute path to service folder in local mode FullPath string // path of folder to repo root @@ -220,6 +218,8 @@ type Source struct { // dir to repo root // blank for non local LocalRepoRoot string + // is it a local folder intended for a local runtime? + Local bool } // Name to be passed to RPC call runtime.Create Update Delete diff --git a/runtime/local/process/process.go b/runtime/local/process/process.go index 65e901cf..2b74fcf2 100644 --- a/runtime/local/process/process.go +++ b/runtime/local/process/process.go @@ -22,22 +22,22 @@ type Process interface { type Executable struct { // Package containing executable Package *build.Package + // Initial working directory + Dir string // The env variables Env []string // Args to pass Args []string - // Initial working directory - Dir string } // PID is the running process. type PID struct { - // ID of the process - ID string // Stdin Input io.Writer // Stdout Output io.Reader // Stderr Error io.Reader + // ID of the process + ID string } diff --git a/runtime/options.go b/runtime/options.go index a33683ba..ce5078bd 100644 --- a/runtime/options.go +++ b/runtime/options.go @@ -14,16 +14,16 @@ type Option func(o *Options) type Options struct { // Scheduler for updates Scheduler Scheduler + // Client to use when making requests + Client client.Client + // Logger underline logger + Logger logger.Logger // Service type to manage Type string // Source of the services repository Source string // Base image to use Image string - // Client to use when making requests - Client client.Client - // Logger underline logger - Logger logger.Logger } func NewOptions(opts ...Option) *Options { @@ -86,28 +86,30 @@ type ReadOption func(o *ReadOptions) // CreateOptions configure runtime services. type CreateOptions struct { + // Log output + Output io.Writer + // Specify the context to use + Context context.Context + // Type of service to create + Type string + // Specify the image to use + Image string + // Namespace to create the service in + Namespace string // Command to execut Command []string // Args to pass into command Args []string // Environment to configure Env []string - // Log output - Output io.Writer - // Type of service to create - Type string // Retries before failing deploy Retries int - // Specify the image to use - Image string - // Namespace to create the service in - Namespace string - // Specify the context to use - Context context.Context } // ReadOptions queries runtime services. type ReadOptions struct { + // Specify the context to use + Context context.Context // Service name Service string // Version queries services with given version @@ -116,8 +118,6 @@ type ReadOptions struct { Type string // Namespace the service is running in Namespace string - // Specify the context to use - Context context.Context } // CreateType sets the type of service to create. @@ -222,10 +222,10 @@ func ReadContext(ctx context.Context) ReadOption { type UpdateOption func(o *UpdateOptions) type UpdateOptions struct { - // Namespace the service is running in - Namespace string // Specify the context to use Context context.Context + // Namespace the service is running in + Namespace string } // UpdateNamespace sets the namespace. @@ -245,10 +245,10 @@ func UpdateContext(ctx context.Context) UpdateOption { type DeleteOption func(o *DeleteOptions) type DeleteOptions struct { - // Namespace the service is running in - Namespace string // Specify the context to use Context context.Context + // Namespace the service is running in + Namespace string } // DeleteNamespace sets the namespace. @@ -270,14 +270,14 @@ type LogsOption func(o *LogsOptions) // LogsOptions configure runtime logging. type LogsOptions struct { + // Specify the context to use + Context context.Context + // Namespace the service is running in + Namespace string // How many existing lines to show Count int64 // Stream new lines? Stream bool - // Namespace the service is running in - Namespace string - // Specify the context to use - Context context.Context } // LogsExistingCount confiures how many existing lines to show. diff --git a/runtime/runtime.go b/runtime/runtime.go index f54ca542..425a3754 100644 --- a/runtime/runtime.go +++ b/runtime/runtime.go @@ -45,8 +45,8 @@ type LogStream interface { } type LogRecord struct { - Message string Metadata map[string]string + Message string } // Scheduler is a runtime service scheduler. @@ -85,26 +85,26 @@ func (t EventType) String() string { // Event is notification event. type Event struct { - // ID of the event - ID string - // Type is event type - Type EventType // Timestamp is event timestamp Timestamp time.Time // Service the event relates to Service *Service // Options to use when processing the event Options *CreateOptions + // ID of the event + ID string + // Type is event type + Type EventType } // Service is runtime service. type Service struct { + // Metadata stores metadata + Metadata map[string]string // Name of the service Name string // Version of the service Version string // url location of source Source string - // Metadata stores metadata - Metadata map[string]string } diff --git a/runtime/service.go b/runtime/service.go index 2be41a8b..dd192617 100644 --- a/runtime/service.go +++ b/runtime/service.go @@ -15,29 +15,32 @@ import ( ) type service struct { - sync.RWMutex - - running bool - closed chan bool - err error updated time.Time - retries int - maxRetries int + // to be used logger + Logger log.Logger // output for logs output io.Writer - // service to manage - *Service + err error // process creator Process *proc.Process + closed chan bool + + // service to manage + *Service // Exec Exec *process.Executable // process pid PID *process.PID - // to be used logger - Logger log.Logger + + retries int + maxRetries int + + sync.RWMutex + + running bool } func newService(s *Service, c CreateOptions, l log.Logger) *service { diff --git a/selector/options.go b/selector/options.go index bea8ff2b..264bd4fc 100644 --- a/selector/options.go +++ b/selector/options.go @@ -19,12 +19,13 @@ type Options struct { } type SelectOptions struct { - Filters []Filter - Strategy Strategy // Other options for implementations of the interface // can be stored in a context - Context context.Context + Context context.Context + Strategy Strategy + + Filters []Filter } type Option func(*Options) diff --git a/server/handler.go b/server/handler.go index 12096434..cf5fa7b6 100644 --- a/server/handler.go +++ b/server/handler.go @@ -5,19 +5,19 @@ import "context" type HandlerOption func(*HandlerOptions) type HandlerOptions struct { - Internal bool Metadata map[string]map[string]string + Internal bool } type SubscriberOption func(*SubscriberOptions) type SubscriberOptions struct { + Context context.Context + Queue string // AutoAck defaults to true. When a handler returns // with a nil error the message is acked. AutoAck bool - Queue string Internal bool - Context context.Context } // EndpointMetadata is a Handler option that allows metadata to be added to diff --git a/server/mock/mock.go b/server/mock/mock.go index a6e65e84..f1093bc2 100644 --- a/server/mock/mock.go +++ b/server/mock/mock.go @@ -9,11 +9,11 @@ import ( ) type MockServer struct { - sync.Mutex - Running bool Opts server.Options Handlers map[string]server.Handler Subscribers map[string][]server.Subscriber + sync.Mutex + Running bool } var ( diff --git a/server/mock/mock_handler.go b/server/mock/mock_handler.go index 97925ec5..1ffdb978 100644 --- a/server/mock/mock_handler.go +++ b/server/mock/mock_handler.go @@ -6,9 +6,9 @@ import ( ) type MockHandler struct { - Id string Opts server.HandlerOptions Hdlr interface{} + Id string } func (m *MockHandler) Name() string { diff --git a/server/mock/mock_subscriber.go b/server/mock/mock_subscriber.go index 5ceafb6b..7c21cf90 100644 --- a/server/mock/mock_subscriber.go +++ b/server/mock/mock_subscriber.go @@ -6,9 +6,9 @@ import ( ) type MockSubscriber struct { - Id string Opts server.SubscriberOptions Sub interface{} + Id string } func (m *MockSubscriber) Topic() string { diff --git a/server/options.go b/server/options.go index 7bdd04c6..f4dc7311 100644 --- a/server/options.go +++ b/server/options.go @@ -40,38 +40,41 @@ func WithRouterLogger(l logger.Logger) RouterOption { } type Options struct { - Codecs map[string]codec.NewCodec - Broker broker.Broker - Registry registry.Registry - Tracer trace.Tracer - Transport transport.Transport - Metadata map[string]string - Name string - Address string - Advertise string - Id string - Version string - HdlrWrappers []HandlerWrapper - SubWrappers []SubscriberWrapper - ListenOptions []transport.ListenOption - Logger logger.Logger + Logger logger.Logger - // RegisterCheck runs a check function before registering the service - RegisterCheck func(context.Context) error - // The register expiry time - RegisterTTL time.Duration - // The interval on which to register - RegisterInterval time.Duration - - // The router for requests - Router Router - - // TLSConfig specifies tls.Config for secure serving - TLSConfig *tls.Config + Broker broker.Broker + Registry registry.Registry + Tracer trace.Tracer + Transport transport.Transport // Other options for implementations of the interface // can be stored in a context Context context.Context + + // The router for requests + Router Router + + // RegisterCheck runs a check function before registering the service + RegisterCheck func(context.Context) error + Metadata map[string]string + + // TLSConfig specifies tls.Config for secure serving + TLSConfig *tls.Config + + Codecs map[string]codec.NewCodec + Name string + Id string + Version string + Advertise string + Address string + HdlrWrappers []HandlerWrapper + ListenOptions []transport.ListenOption + SubWrappers []SubscriberWrapper + // The interval on which to register + RegisterInterval time.Duration + + // The register expiry time + RegisterTTL time.Duration } // NewOptions creates new server options. diff --git a/server/rpc_codec.go b/server/rpc_codec.go index 4f3a19a0..410a62fb 100644 --- a/server/rpc_codec.go +++ b/server/rpc_codec.go @@ -19,22 +19,23 @@ import ( ) type rpcCodec struct { - socket transport.Socket - codec codec.Codec - protocol string + socket transport.Socket + codec codec.Codec req *transport.Message buf *readWriteCloser + first chan bool + protocol string + // check if we're the first sync.RWMutex - first chan bool } type readWriteCloser struct { - sync.RWMutex wbuf *bytes.Buffer rbuf *bytes.Buffer + sync.RWMutex } var ( diff --git a/server/rpc_handler.go b/server/rpc_handler.go index 702e1c6c..2064f137 100644 --- a/server/rpc_handler.go +++ b/server/rpc_handler.go @@ -7,10 +7,10 @@ import ( ) type RpcHandler struct { - name string handler interface{} - endpoints []*registry.Endpoint opts HandlerOptions + name string + endpoints []*registry.Endpoint } func NewRpcHandler(handler interface{}, opts ...HandlerOption) Handler { diff --git a/server/rpc_request.go b/server/rpc_request.go index 8d55db46..7d9deedd 100644 --- a/server/rpc_request.go +++ b/server/rpc_request.go @@ -9,26 +9,26 @@ import ( ) type rpcRequest struct { + socket transport.Socket + codec codec.Codec + rawBody interface{} + header map[string]string service string method string endpoint string contentType string - socket transport.Socket - codec codec.Codec - header map[string]string body []byte - rawBody interface{} stream bool first bool } type rpcMessage struct { - topic string - contentType string payload interface{} header map[string]string - body []byte codec codec.NewCodec + topic string + contentType string + body []byte } func (r *rpcRequest) Codec() codec.Reader { diff --git a/server/rpc_router.go b/server/rpc_router.go index c0a80a22..1788db76 100644 --- a/server/rpc_router.go +++ b/server/rpc_router.go @@ -26,19 +26,19 @@ var ( ) type methodType struct { - sync.Mutex // protects counters - method reflect.Method ArgType reflect.Type ReplyType reflect.Type ContextType reflect.Type + method reflect.Method + sync.Mutex // protects counters stream bool } type service struct { - name string // name of service - rcvr reflect.Value // receiver of methods for the service typ reflect.Type // type of the receiver method map[string]*methodType // registered methods + rcvr reflect.Value // receiver of methods for the service + name string // name of service } type request struct { @@ -53,25 +53,29 @@ type response struct { // router represents an RPC router. type router struct { - name string - ops RouterOptions + ops RouterOptions - mu sync.Mutex // protects the serviceMap serviceMap map[string]*service - reqLock sync.Mutex // protects freeReq freeReq *request - respLock sync.Mutex // protects freeResp freeResp *response + subscribers map[string][]*subscriber + name string + // handler wrappers hdlrWrappers []HandlerWrapper // subscriber wrappers subWrappers []SubscriberWrapper - su sync.RWMutex - subscribers map[string][]*subscriber + su sync.RWMutex + + mu sync.Mutex // protects the serviceMap + + reqLock sync.Mutex // protects freeReq + + respLock sync.Mutex // protects freeResp } // rpcRouter encapsulates functions that become a Router. diff --git a/server/rpc_server.go b/server/rpc_server.go index cc0ed070..7a28bfb8 100644 --- a/server/rpc_server.go +++ b/server/rpc_server.go @@ -27,25 +27,26 @@ import ( ) type rpcServer struct { + opts Options + // Subscribe to service name + subscriber broker.Subscriber // Goal: // router Router router *router exit chan chan error - sync.RWMutex - opts Options handlers map[string]Handler subscribers map[Subscriber][]broker.Subscriber - // Marks the serve as started - started bool - // Used for first registration - registered bool - // Subscribe to service name - subscriber broker.Subscriber // Graceful exit wg *sync.WaitGroup // Cached service rsvc *registry.Service + + sync.RWMutex + // Marks the serve as started + started bool + // Used for first registration + registered bool } // NewRPCServer will create a new default RPC server. diff --git a/server/rpc_stream.go b/server/rpc_stream.go index 760f6dcd..0735ddd6 100644 --- a/server/rpc_stream.go +++ b/server/rpc_stream.go @@ -11,13 +11,13 @@ import ( // Implements the Streamer interface. type rpcStream struct { - sync.RWMutex - id string - closed bool err error request Request codec codec.Codec context context.Context + id string + sync.RWMutex + closed bool } func (r *rpcStream) Context() context.Context { diff --git a/server/rpc_util.go b/server/rpc_util.go index a5499370..d69e51de 100644 --- a/server/rpc_util.go +++ b/server/rpc_util.go @@ -6,10 +6,10 @@ import ( // waitgroup for global management of connections. type waitGroup struct { - // local waitgroup - lg sync.WaitGroup // global waitgroup gg *sync.WaitGroup + // local waitgroup + lg sync.WaitGroup } // NewWaitGroup returns a new double waitgroup for global management of processes. diff --git a/server/subscriber.go b/server/subscriber.go index 1993e482..47327257 100644 --- a/server/subscriber.go +++ b/server/subscriber.go @@ -12,19 +12,19 @@ const ( ) type handler struct { - method reflect.Value reqType reflect.Type ctxType reflect.Type + method reflect.Value } type subscriber struct { - topic string - rcvr reflect.Value + opts SubscriberOptions typ reflect.Type subscriber interface{} + rcvr reflect.Value + topic string handlers []*handler endpoints []*registry.Endpoint - opts SubscriberOptions } func newSubscriber(topic string, sub interface{}, opts ...SubscriberOption) Subscriber { diff --git a/store/memory.go b/store/memory.go index 6c14da95..8b84809c 100644 --- a/store/memory.go +++ b/store/memory.go @@ -32,10 +32,10 @@ type memoryStore struct { } type storeRecord struct { + expiresAt time.Time + metadata map[string]interface{} key string value []byte - metadata map[string]interface{} - expiresAt time.Time } func (m *memoryStore) key(prefix, key string) string { diff --git a/store/options.go b/store/options.go index ed07c690..f7ffc477 100644 --- a/store/options.go +++ b/store/options.go @@ -10,20 +10,20 @@ import ( // Options contains configuration for the Store. type Options struct { - // Nodes contains the addresses or other connection information of the backing storage. - // For example, an etcd implementation would contain the nodes of the cluster. - // A SQL implementation could contain one or more connection strings. - Nodes []string - // Database allows multiple isolated stores to be kept in one backend, if supported. - Database string - // Table is analogous to a table in database backends or a key prefix in KV backends - Table string // Context should contain all implementation specific options, using context.WithValue. Context context.Context // Client to use for RPC Client client.Client // Logger is the underline logger Logger logger.Logger + // Database allows multiple isolated stores to be kept in one backend, if supported. + Database string + // Table is analogous to a table in database backends or a key prefix in KV backends + Table string + // Nodes contains the addresses or other connection information of the backing storage. + // For example, an etcd implementation would contain the nodes of the cluster. + // A SQL implementation could contain one or more connection strings. + Nodes []string } // Option sets values in Options. @@ -127,9 +127,9 @@ func ReadOffset(o uint) ReadOption { // WriteOptions configures an individual Write operation // If Expiry and TTL are set TTL takes precedence. type WriteOptions struct { - Database, Table string // Expiry is the time the record expires - Expiry time.Time + Expiry time.Time + Database, Table string // TTL is the time until the record expires TTL time.Duration } diff --git a/store/store.go b/store/store.go index babe5b0e..1a8a0686 100644 --- a/store/store.go +++ b/store/store.go @@ -36,12 +36,12 @@ type Store interface { // Record is an item stored or retrieved from a Store. type Record struct { + // Any associated metadata for indexing + Metadata map[string]interface{} `json:"metadata"` // The key to store the record Key string `json:"key"` // The value within the record Value []byte `json:"value"` - // Any associated metadata for indexing - Metadata map[string]interface{} `json:"metadata"` // Time to expire a record: TODO: change to timestamp Expiry time.Duration `json:"expiry,omitempty"` } diff --git a/sync/sync.go b/sync/sync.go index 38a8994b..c81f13bf 100644 --- a/sync/sync.go +++ b/sync/sync.go @@ -39,11 +39,11 @@ type Leader interface { } type Options struct { - Nodes []string - Prefix string - TLSConfig *tls.Config Context context.Context Logger logger.Logger + TLSConfig *tls.Config + Prefix string + Nodes []string } type Option func(o *Options) diff --git a/transport/http_client.go b/transport/http_client.go index 86f11ef5..a051f69d 100644 --- a/transport/http_client.go +++ b/transport/http_client.go @@ -17,23 +17,25 @@ import ( ) type httpTransportClient struct { - ht *httpTransport - addr string - conn net.Conn dialOpts DialOptions - once sync.Once + conn net.Conn + ht *httpTransport + + // request must be stored for response processing + req chan *http.Request + buff *bufio.Reader + addr string + + // local/remote ip + local string + remote string + reqList []*http.Request sync.RWMutex - // request must be stored for response processing - req chan *http.Request - reqList []*http.Request - buff *bufio.Reader - closed bool + once sync.Once - // local/remote ip - local string - remote string + closed bool } func (h *httpTransportClient) Local() string { diff --git a/transport/http_socket.go b/transport/http_socket.go index 294c517b..7db4b27a 100644 --- a/transport/http_socket.go +++ b/transport/http_socket.go @@ -13,15 +13,14 @@ import ( ) type httpTransportSocket struct { - ht *httpTransport - w http.ResponseWriter - r *http.Request - rw *bufio.ReadWriter - - mtx sync.RWMutex + w http.ResponseWriter // the hijacked when using http 1 conn net.Conn + ht *httpTransport + r *http.Request + rw *bufio.ReadWriter + // for the first request ch chan *http.Request @@ -33,6 +32,8 @@ type httpTransportSocket struct { // local/remote ip local string remote string + + mtx sync.RWMutex } func (h *httpTransportSocket) Local() string { diff --git a/transport/memory.go b/transport/memory.go index 4cfe47a3..76a568cb 100644 --- a/transport/memory.go +++ b/transport/memory.go @@ -16,8 +16,7 @@ import ( ) type memorySocket struct { - // True server mode, False client mode - server bool + ctx context.Context // Client receiver of io.Pipe with gob crecv *gob.Decoder // Client sender of the io.Pipe with gob @@ -36,7 +35,8 @@ type memorySocket struct { // for send/recv Timeout timeout time.Duration - ctx context.Context + // True server mode, False client mode + server bool } type memoryClient struct { @@ -45,19 +45,19 @@ type memoryClient struct { } type memoryListener struct { - addr string + lopts ListenOptions + ctx context.Context exit chan bool conn chan *memorySocket - lopts ListenOptions topts Options + addr string sync.RWMutex - ctx context.Context } type memoryTransport struct { - opts Options - sync.RWMutex listeners map[string]*memoryListener + opts Options + sync.RWMutex } func (ms *memorySocket) Recv(m *Message) error { diff --git a/transport/options.go b/transport/options.go index 2f5c894b..20fef7b8 100644 --- a/transport/options.go +++ b/transport/options.go @@ -15,39 +15,30 @@ var ( ) type Options struct { - // Addrs is the list of intermediary addresses to connect to - Addrs []string // Codec is the codec interface to use where headers are not supported // by the transport and the entire payload must be encoded Codec codec.Marshaler - // Secure tells the transport to secure the connection. - // In the case TLSConfig is not specified best effort self-signed - // certs should be used - Secure bool - // TLSConfig to secure the connection. The assumption is that this - // is mTLS keypair - TLSConfig *tls.Config - // Timeout sets the timeout for Send/Recv - Timeout time.Duration // Other options for implementations of the interface // can be stored in a context Context context.Context // Logger is the underline logger Logger logger.Logger + // TLSConfig to secure the connection. The assumption is that this + // is mTLS keypair + TLSConfig *tls.Config + // Addrs is the list of intermediary addresses to connect to + Addrs []string + // Timeout sets the timeout for Send/Recv + Timeout time.Duration // BuffSizeH2 is the HTTP2 buffer size BuffSizeH2 int + // Secure tells the transport to secure the connection. + // In the case TLSConfig is not specified best effort self-signed + // certs should be used + Secure bool } type DialOptions struct { - // Tells the transport this is a streaming connection with - // multiple calls to send/recv and that send may not even be called - Stream bool - // Timeout for dialing - Timeout time.Duration - // ConnClose sets the Connection header to close - ConnClose bool - // InsecureSkipVerify skip TLS verification. - InsecureSkipVerify bool // TODO: add tls options when dialing // Currently set in global options @@ -55,6 +46,15 @@ type DialOptions struct { // Other options for implementations of the interface // can be stored in a context Context context.Context + // Timeout for dialing + Timeout time.Duration + // Tells the transport this is a streaming connection with + // multiple calls to send/recv and that send may not even be called + Stream bool + // ConnClose sets the Connection header to close + ConnClose bool + // InsecureSkipVerify skip TLS verification. + InsecureSkipVerify bool } type ListenOptions struct { diff --git a/util/cmd/options.go b/util/cmd/options.go index 68a784a1..0595f5fa 100644 --- a/util/cmd/options.go +++ b/util/cmd/options.go @@ -19,28 +19,31 @@ import ( ) type Options struct { - // For the Command Line itself - Name string - Description string - Version string - // We need pointers to things so we can swap them out if needed. - Broker *broker.Broker - Registry *registry.Registry - Selector *selector.Selector + // Other options for implementations of the interface + // can be stored in a context + Context context.Context + Auth *auth.Auth + Selector *selector.Selector + Profile *profile.Profile + + Registry *registry.Registry + + Brokers map[string]func(...broker.Option) broker.Broker Transport *transport.Transport Cache *cache.Cache Config *config.Config Client *client.Client Server *server.Server Runtime *runtime.Runtime - Store *store.Store + Caches map[string]func(...cache.Option) cache.Cache Tracer *trace.Tracer - Auth *auth.Auth - Profile *profile.Profile + Profiles map[string]func(...profile.Option) profile.Profile - Brokers map[string]func(...broker.Option) broker.Broker - Caches map[string]func(...cache.Option) cache.Cache + // We need pointers to things so we can swap them out if needed. + Broker *broker.Broker + Auths map[string]func(...auth.Option) auth.Auth + Store *store.Store Configs map[string]func(...config.Option) (config.Config, error) Clients map[string]func(...client.Option) client.Client Registries map[string]func(...registry.Option) registry.Registry @@ -50,12 +53,11 @@ type Options struct { Runtimes map[string]func(...runtime.Option) runtime.Runtime Stores map[string]func(...store.Option) store.Store Tracers map[string]func(...trace.Option) trace.Tracer - Auths map[string]func(...auth.Option) auth.Auth - Profiles map[string]func(...profile.Option) profile.Profile + Version string - // Other options for implementations of the interface - // can be stored in a context - Context context.Context + // For the Command Line itself + Name string + Description string } // Command line Name. diff --git a/util/file/handler.go b/util/file/handler.go index de7502e2..3c2fce06 100644 --- a/util/file/handler.go +++ b/util/file/handler.go @@ -30,9 +30,9 @@ func RegisterHandler(s server.Server, readDir string) { } type handler struct { - readDir string - session *session logger log.Logger + session *session + readDir string } func (h *handler) Open(ctx context.Context, req *proto.OpenRequest, rsp *proto.OpenResponse) error { @@ -120,9 +120,9 @@ func (h *handler) Write(ctx context.Context, req *proto.WriteRequest, rsp *proto } type session struct { - sync.Mutex files map[int64]*os.File counter int64 + sync.Mutex } func (s *session) Add(file *os.File) int64 { diff --git a/util/kubernetes/api/request.go b/util/kubernetes/api/request.go index 513a7cb1..77991307 100644 --- a/util/kubernetes/api/request.go +++ b/util/kubernetes/api/request.go @@ -16,20 +16,20 @@ import ( // Request is used to construct a http request for the k8s API. type Request struct { // the request context - context context.Context - client *http.Client - header http.Header - params url.Values - method string - host string - namespace string + context context.Context + body io.Reader - resource string + err error + client *http.Client + header http.Header + params url.Values resourceName *string subResource *string - body io.Reader + method string + host string + namespace string - err error + resource string } // Params is the object to pass in to set parameters @@ -246,10 +246,10 @@ func (r *Request) Raw() (*http.Response, error) { // Options ... type Options struct { - Host string - Namespace string BearerToken *string Client *http.Client + Host string + Namespace string } // NewRequest creates a k8s api request. diff --git a/util/kubernetes/client/options.go b/util/kubernetes/client/options.go index 96fa6697..9e60f549 100644 --- a/util/kubernetes/client/options.go +++ b/util/kubernetes/client/options.go @@ -5,8 +5,8 @@ type CreateOptions struct { } type GetOptions struct { - Namespace string Labels map[string]string + Namespace string } type UpdateOptions struct { Namespace string @@ -19,13 +19,13 @@ type ListOptions struct { } type LogOptions struct { - Namespace string Params map[string]string + Namespace string } type WatchOptions struct { - Namespace string Params map[string]string + Namespace string } type CreateOption func(*CreateOptions) diff --git a/util/kubernetes/client/types.go b/util/kubernetes/client/types.go index 8a68a891..e54c1aa0 100644 --- a/util/kubernetes/client/types.go +++ b/util/kubernetes/client/types.go @@ -3,9 +3,9 @@ package client // ContainerPort. type ContainerPort struct { Name string `json:"name,omitempty"` + Protocol string `json:"protocol,omitempty"` HostPort int `json:"hostPort,omitempty"` ContainerPort int `json:"containerPort"` - Protocol string `json:"protocol,omitempty"` } // EnvVar is environment variable. @@ -32,9 +32,9 @@ type Container struct { // DeploymentSpec defines micro deployment spec. type DeploymentSpec struct { - Replicas int `json:"replicas,omitempty"` Selector *LabelSelector `json:"selector"` Template *Template `json:"template,omitempty"` + Replicas int `json:"replicas,omitempty"` } // DeploymentCondition describes the state of deployment. @@ -47,12 +47,12 @@ type DeploymentCondition struct { // DeploymentStatus is returned when querying deployment. type DeploymentStatus struct { + Conditions []DeploymentCondition `json:"conditions,omitempty"` Replicas int `json:"replicas,omitempty"` UpdatedReplicas int `json:"updatedReplicas,omitempty"` ReadyReplicas int `json:"readyReplicas,omitempty"` AvailableReplicas int `json:"availableReplicas,omitempty"` UnavailableReplicas int `json:"unavailableReplicas,omitempty"` - Conditions []DeploymentCondition `json:"conditions,omitempty"` } // Deployment is Kubernetes deployment. @@ -84,17 +84,17 @@ type LoadBalancerStatus struct { // Metadata defines api object metadata. type Metadata struct { + Labels map[string]string `json:"labels,omitempty"` + Annotations map[string]string `json:"annotations,omitempty"` Name string `json:"name,omitempty"` Namespace string `json:"namespace,omitempty"` Version string `json:"version,omitempty"` - Labels map[string]string `json:"labels,omitempty"` - Annotations map[string]string `json:"annotations,omitempty"` } // PodSpec is a pod. type PodSpec struct { - Containers []Container `json:"containers"` ServiceAccountName string `json:"serviceAccountName"` + Containers []Container `json:"containers"` } // PodList. @@ -111,11 +111,11 @@ type Pod struct { // PodStatus. type PodStatus struct { - Conditions []PodCondition `json:"conditions,omitempty"` - Containers []ContainerStatus `json:"containerStatuses"` PodIP string `json:"podIP"` Phase string `json:"phase"` Reason string `json:"reason"` + Conditions []PodCondition `json:"conditions,omitempty"` + Containers []ContainerStatus `json:"containerStatuses"` } // PodCondition describes the state of pod. @@ -137,16 +137,16 @@ type ContainerState struct { // Resource is API resource. type Resource struct { + Value interface{} Name string Kind string - Value interface{} } // ServicePort configures service ports. type ServicePort struct { Name string `json:"name,omitempty"` - Port int `json:"port"` Protocol string `json:"protocol,omitempty"` + Port int `json:"port"` } // ServiceSpec provides service configuration. @@ -197,9 +197,9 @@ type ImagePullSecret struct { // Secret. type Secret struct { - Type string `json:"type,omitempty"` Data map[string]string `json:"data"` Metadata *Metadata `json:"metadata"` + Type string `json:"type,omitempty"` } // ServiceAccount. diff --git a/util/mdns/client.go b/util/mdns/client.go index 4e2a7acc..d2741003 100644 --- a/util/mdns/client.go +++ b/util/mdns/client.go @@ -18,16 +18,17 @@ import ( type ServiceEntry struct { Name string Host string + Info string AddrV4 net.IP AddrV6 net.IP - Port int - Info string InfoFields []string - TTL int - Type uint16 Addr net.IP // @Deprecated + Port int + TTL int + Type uint16 + hasTXT bool sent bool } @@ -39,13 +40,13 @@ func (s *ServiceEntry) complete() bool { // QueryParam is used to customize how a Lookup is performed. type QueryParam struct { - Service string // Service to lookup - Domain string // Lookup domain, default "local" - Type uint16 // Lookup type, defaults to dns.TypePTR Context context.Context // Context - Timeout time.Duration // Lookup timeout, default 1 second. Ignored if Context is provided Interface *net.Interface // Multicast interface to use Entries chan<- *ServiceEntry // Entries Channel + Service string // Service to lookup + Domain string // Lookup domain, default "local" + Timeout time.Duration // Lookup timeout, default 1 second. Ignored if Context is provided + Type uint16 // Lookup type, defaults to dns.TypePTR WantUnicastResponse bool // Unicast response desired, as per 5.4 in RFC } @@ -170,9 +171,10 @@ type client struct { ipv4MulticastConn *net.UDPConn ipv6MulticastConn *net.UDPConn - closed bool closedCh chan struct{} // TODO(reddaly): This doesn't appear to be used. closeLock sync.Mutex + + closed bool } // NewClient creates a new mdns Client that can be used to query diff --git a/util/mdns/server.go b/util/mdns/server.go index 2e68fcdb..4b2c1f28 100644 --- a/util/mdns/server.go +++ b/util/mdns/server.go @@ -53,11 +53,12 @@ type Config struct { // is used. Iface *net.Interface + // GetMachineIP is a function to return the IP of the local machine + GetMachineIP GetMachineIP + // Port If it is not 0, replace the port 5353 with this port number. Port int - // GetMachineIP is a function to return the IP of the local machine - GetMachineIP GetMachineIP // LocalhostChecking if enabled asks the server to also send responses to 0.0.0.0 if the target IP // is this host (as defined by GetMachineIP). Useful in case machine is on a VPN which blocks comms on non standard ports LocalhostChecking bool @@ -71,12 +72,14 @@ type Server struct { ipv4List *net.UDPConn ipv6List *net.UDPConn - shutdown bool - shutdownCh chan struct{} - shutdownLock sync.Mutex - wg sync.WaitGroup + shutdownCh chan struct{} outboundIP net.IP + wg sync.WaitGroup + + shutdownLock sync.Mutex + + shutdown bool } // NewServer is used to create a new mDNS server from a config. diff --git a/util/mdns/zone.go b/util/mdns/zone.go index d5bfbe14..b7898c9d 100644 --- a/util/mdns/zone.go +++ b/util/mdns/zone.go @@ -28,13 +28,13 @@ type MDNSService struct { Service string // Service name (e.g. "_http._tcp.") Domain string // If blank, assumes "local" HostName string // Host machine DNS name (e.g. "mymachine.net.") - Port int // Service Port + serviceAddr string // Fully qualified service address + instanceAddr string // Fully qualified instance address + enumAddr string // _services._dns-sd._udp. IPs []net.IP // IP addresses for the service's host TXT []string // Service TXT records + Port int // Service Port TTL uint32 - serviceAddr string // Fully qualified service address - instanceAddr string // Fully qualified instance address - enumAddr string // _services._dns-sd._udp. } // validateFQDN returns an error if the passed string is not a fully qualified diff --git a/util/pki/certoptions.go b/util/pki/certoptions.go index 2a9e0cf1..abd11d94 100644 --- a/util/pki/certoptions.go +++ b/util/pki/certoptions.go @@ -11,17 +11,18 @@ import ( // CertOptions are passed to cert options. type CertOptions struct { - IsCA bool - Subject pkix.Name - DNSNames []string - IPAddresses []net.IP - SerialNumber *big.Int - NotBefore time.Time - NotAfter time.Time + NotBefore time.Time + NotAfter time.Time - Parent *x509.Certificate - Pub ed25519.PublicKey - Priv ed25519.PrivateKey + SerialNumber *big.Int + + Parent *x509.Certificate + Subject pkix.Name + DNSNames []string + IPAddresses []net.IP + Pub ed25519.PublicKey + Priv ed25519.PrivateKey + IsCA bool } // CertOption sets CertOptions. diff --git a/util/pool/default.go b/util/pool/default.go index f5d57f65..2ac1212b 100644 --- a/util/pool/default.go +++ b/util/pool/default.go @@ -10,18 +10,19 @@ import ( ) type pool struct { - size int - ttl time.Duration - tr transport.Transport + tr transport.Transport + + conns map[string][]*poolConn + size int + ttl time.Duration sync.Mutex - conns map[string][]*poolConn } type poolConn struct { - transport.Client - id string created time.Time + transport.Client + id string } func newPool(options Options) *pool { diff --git a/util/ring/buffer.go b/util/ring/buffer.go index 344c971c..658735d4 100644 --- a/util/ring/buffer.go +++ b/util/ring/buffer.go @@ -10,11 +10,11 @@ import ( // Buffer is ring buffer. type Buffer struct { - size int + streams map[string]*Stream + vals []*Entry + size int sync.RWMutex - vals []*Entry - streams map[string]*Stream } // Entry is ring buffer data entry. @@ -25,12 +25,12 @@ type Entry struct { // Stream is used to stream the buffer. type Stream struct { - // Id of the stream - Id string // Buffered entries Entries chan *Entry // Stop channel Stop chan bool + // Id of the stream + Id string } // Put adds a new value to ring buffer. diff --git a/util/socket/pool.go b/util/socket/pool.go index e2125dc0..080699b0 100644 --- a/util/socket/pool.go +++ b/util/socket/pool.go @@ -5,8 +5,8 @@ import ( ) type Pool struct { - sync.RWMutex pool map[string]*Socket + sync.RWMutex } func (p *Pool) Get(id string) (*Socket, bool) { diff --git a/util/socket/socket.go b/util/socket/socket.go index 03e4b6b8..d9cb86a8 100644 --- a/util/socket/socket.go +++ b/util/socket/socket.go @@ -9,17 +9,17 @@ import ( // Socket is our pseudo socket for transport.Socket. type Socket struct { - id string // closed closed chan bool - // remote addr - remote string - // local addr - local string // send chan send chan *transport.Message // recv chan recv chan *transport.Message + id string + // remote addr + remote string + // local addr + local string } func (s *Socket) SetLocal(l string) { diff --git a/util/stream/stream.go b/util/stream/stream.go index 61cc2170..3dfec0cb 100644 --- a/util/stream/stream.go +++ b/util/stream/stream.go @@ -21,9 +21,10 @@ type Stream interface { type stream struct { Stream - sync.RWMutex err error request *request + + sync.RWMutex } type request struct { diff --git a/util/sync/manager.go b/util/sync/manager.go index 051e1605..87e8434a 100644 --- a/util/sync/manager.go +++ b/util/sync/manager.go @@ -8,9 +8,9 @@ import ( ) type operation struct { - operation action - record *store.Record deadline time.Time + record *store.Record + operation action retries int maxiumum int } diff --git a/util/sync/sync.go b/util/sync/sync.go index 14fb3d8e..dff7d21d 100644 --- a/util/sync/sync.go +++ b/util/sync/sync.go @@ -21,9 +21,9 @@ type Sync interface { type syncStore struct { storeOpts store.Options - syncOpts Options pendingWrites []*deque.Deque pendingWriteTickers []*time.Ticker + syncOpts Options sync.RWMutex } @@ -108,7 +108,7 @@ func (c *syncStore) Sync() error { } type internalRecord struct { + expiresAt time.Time key string value []byte - expiresAt time.Time } diff --git a/util/wrapper/wrapper.go b/util/wrapper/wrapper.go index ce6f5abe..de9ee0df 100644 --- a/util/wrapper/wrapper.go +++ b/util/wrapper/wrapper.go @@ -69,8 +69,9 @@ func HandlerStats(stats stats.Stats) server.HandlerWrapper { type traceWrapper struct { client.Client - name string trace trace.Tracer + + name string } func (c *traceWrapper) Call(ctx context.Context, req client.Request, rsp interface{}, opts ...client.CallOption) error { diff --git a/web/options.go b/web/options.go index 367ebe32..6a6b35ce 100644 --- a/web/options.go +++ b/web/options.go @@ -14,41 +14,48 @@ import ( // Options for web. type Options struct { - Name string - Version string - Id string - Metadata map[string]string - Address string - Advertise string - - Action func(*cli.Context) - Flags []cli.Flag - - RegisterTTL time.Duration - RegisterInterval time.Duration - - // RegisterCheck runs a check function before registering the service - RegisterCheck func(context.Context) error - - Server *http.Server Handler http.Handler + Logger logger.Logger + + Service micro.Service + + Registry registry.Registry + // Alternative Options Context context.Context - Registry registry.Registry - Service micro.Service - Logger logger.Logger + Action func(*cli.Context) + Metadata map[string]string + TLSConfig *tls.Config + + Server *http.Server + + // RegisterCheck runs a check function before registering the service + RegisterCheck func(context.Context) error + + Version string + + // Static directory + StaticDir string + + Advertise string + + Address string + Name string + Id string + Flags []cli.Flag - Secure bool - TLSConfig *tls.Config BeforeStart []func() error BeforeStop []func() error AfterStart []func() error AfterStop []func() error - // Static directory - StaticDir string + RegisterInterval time.Duration + + RegisterTTL time.Duration + + Secure bool Signal bool } diff --git a/web/service.go b/web/service.go index 871d45f0..74bde1f2 100644 --- a/web/service.go +++ b/web/service.go @@ -25,16 +25,16 @@ import ( ) type service struct { - opts Options - mux *http.ServeMux srv *registry.Service + exit chan chan error + ex chan bool + opts Options + sync.RWMutex running bool static bool - exit chan chan error - ex chan bool } func newService(opts ...Option) Service {