mirror of
https://github.com/go-micro/go-micro.git
synced 2024-12-30 10:10:44 +02:00
fix: linting issues (#2566)
This commit is contained in:
parent
85c0b0b8eb
commit
010b1d9f11
15
.github/ISSUE_TEMPLATE/bug_report.md
vendored
15
.github/ISSUE_TEMPLATE/bug_report.md
vendored
@ -2,23 +2,24 @@
|
||||
name: Bug report
|
||||
about: For reporting bugs in go-micro
|
||||
title: "[BUG]"
|
||||
labels: ''
|
||||
assignees: ''
|
||||
|
||||
labels: ""
|
||||
assignees: ""
|
||||
---
|
||||
|
||||
**Describe the bug**
|
||||
## Describe the bug
|
||||
|
||||
1. What are you trying to do?
|
||||
2. What did you expect to happen?
|
||||
3. What happens instead?
|
||||
|
||||
**How to reproduce the bug:**
|
||||
## How to reproduce the bug
|
||||
|
||||
If possible, please include a minimal code snippet here.
|
||||
|
||||
**Environment:**
|
||||
## Environment
|
||||
|
||||
Go Version: please paste `go version` output here
|
||||
```
|
||||
|
||||
```go
|
||||
please paste `go env` output here
|
||||
```
|
||||
|
@ -2,9 +2,8 @@
|
||||
name: Feature request / Enhancement
|
||||
about: If you have a need not served by go-micro
|
||||
title: "[FEATURE]"
|
||||
labels: ''
|
||||
assignees: ''
|
||||
|
||||
labels: ""
|
||||
assignees: ""
|
||||
---
|
||||
|
||||
**Is your feature request related to a problem? Please describe.**
|
||||
|
7
.github/ISSUE_TEMPLATE/question.md
vendored
7
.github/ISSUE_TEMPLATE/question.md
vendored
@ -1,10 +1,9 @@
|
||||
---
|
||||
name: Question
|
||||
about: Ask a question about go-micro
|
||||
title: ''
|
||||
labels: ''
|
||||
assignees: ''
|
||||
|
||||
title: ""
|
||||
labels: ""
|
||||
assignees: ""
|
||||
---
|
||||
|
||||
Before asking, please check if your question has already been answered:
|
||||
|
5
.github/PULL_REQUEST_TEMPLATE.md
vendored
5
.github/PULL_REQUEST_TEMPLATE.md
vendored
@ -1,4 +1,5 @@
|
||||
## Pull Request template
|
||||
# Pull Request template
|
||||
|
||||
Please, go through these steps before clicking submit on this PR.
|
||||
|
||||
1. Make sure this PR targets the `develop` branch. We follow the git-flow branching model.
|
||||
@ -7,4 +8,4 @@ Please, go through these steps before clicking submit on this PR.
|
||||
4. Make sure you have some relevant tests.
|
||||
5. Put `closes #XXXX` in your comment to auto-close the issue that your PR fixes (if applicable).
|
||||
|
||||
**PLEASE REMOVE THIS TEMPLATE BEFORE SUBMITTING**
|
||||
## PLEASE REMOVE THIS TEMPLATE BEFORE SUBMITTING
|
||||
|
90
.github/workflows/tests.yaml
vendored
90
.github/workflows/tests.yaml
vendored
@ -2,64 +2,64 @@ name: Run Tests
|
||||
on:
|
||||
push:
|
||||
branches:
|
||||
- '**'
|
||||
- "**"
|
||||
pull_request:
|
||||
types:
|
||||
- opened
|
||||
- reopened
|
||||
- opened
|
||||
- reopened
|
||||
branches:
|
||||
- '**'
|
||||
- "**"
|
||||
jobs:
|
||||
golangci:
|
||||
name: Lint
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: 1.19
|
||||
check-latest: true
|
||||
cache: true
|
||||
- name: golangci-lint
|
||||
uses: golangci/golangci-lint-action@v3
|
||||
- uses: actions/checkout@v3
|
||||
- uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: 1.19
|
||||
check-latest: true
|
||||
cache: true
|
||||
- name: golangci-lint
|
||||
uses: golangci/golangci-lint-action@v3
|
||||
unittests:
|
||||
name: Unit Tests
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: 1.19
|
||||
check-latest: true
|
||||
cache: true
|
||||
- name: Get dependencies
|
||||
run: |
|
||||
go install github.com/kyoh86/richgo@latest
|
||||
go get -v -t -d ./...
|
||||
- name: Run tests
|
||||
id: tests
|
||||
run: richgo test -v -race -cover ./...
|
||||
env:
|
||||
IN_TRAVIS_CI: yes
|
||||
RICHGO_FORCE_COLOR: 1
|
||||
- uses: actions/checkout@v3
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: 1.19
|
||||
check-latest: true
|
||||
cache: true
|
||||
- name: Get dependencies
|
||||
run: |
|
||||
go install github.com/kyoh86/richgo@latest
|
||||
go get -v -t -d ./...
|
||||
- name: Run tests
|
||||
id: tests
|
||||
run: richgo test -v -race -cover ./...
|
||||
env:
|
||||
IN_TRAVIS_CI: yes
|
||||
RICHGO_FORCE_COLOR: 1
|
||||
summary:
|
||||
name: Summary Report
|
||||
runs-on: ubuntu-latest
|
||||
steps:
|
||||
- uses: actions/checkout@v3
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: 1.19
|
||||
check-latest: true
|
||||
cache: true
|
||||
- name: Get dependencies
|
||||
run: |
|
||||
go install github.com/mfridman/tparse@latest
|
||||
go get -v -t -d ./...
|
||||
- name: Run tests
|
||||
id: tests
|
||||
run: go test -v -race -cover -json ./... | tparse -notests -format=markdown >> $GITHUB_STEP_SUMMARY
|
||||
env:
|
||||
IN_TRAVIS_CI: yes
|
||||
- uses: actions/checkout@v3
|
||||
- name: Set up Go
|
||||
uses: actions/setup-go@v3
|
||||
with:
|
||||
go-version: 1.19
|
||||
check-latest: true
|
||||
cache: true
|
||||
- name: Get dependencies
|
||||
run: |
|
||||
go install github.com/mfridman/tparse@latest
|
||||
go get -v -t -d ./...
|
||||
- name: Run tests
|
||||
id: tests
|
||||
run: go test -v -race -cover -json ./... | tparse -notests -format=markdown >> $GITHUB_STEP_SUMMARY
|
||||
env:
|
||||
IN_TRAVIS_CI: yes
|
||||
|
1
.gitignore
vendored
1
.gitignore
vendored
@ -1,6 +1,7 @@
|
||||
# Develop tools
|
||||
/.vscode/
|
||||
/.idea/
|
||||
/.trunk
|
||||
|
||||
# Binaries for programs and plugins
|
||||
*.exe
|
||||
|
@ -20,7 +20,8 @@ run:
|
||||
# won't be reported. Default value is empty list, but there is
|
||||
# no need to include all autogenerated files, we confidently recognize
|
||||
# autogenerated files. If it's not please let us know.
|
||||
skip-files: []
|
||||
skip-files:
|
||||
[]
|
||||
# - .*\\.pb\\.go$
|
||||
|
||||
allow-parallel-runners: true
|
||||
@ -186,7 +187,7 @@ linters:
|
||||
|
||||
# Can be considered to be enabled
|
||||
- gochecknoinits
|
||||
- gochecknoglobals # RIP
|
||||
- gochecknoglobals # RIP
|
||||
- dogsled
|
||||
- wrapcheck
|
||||
- paralleltest
|
||||
@ -210,6 +211,15 @@ issues:
|
||||
# - path: internal/app/machined/pkg/system/services
|
||||
# linters:
|
||||
# - dupl
|
||||
exclude-rules:
|
||||
- path: _test\.go
|
||||
linters:
|
||||
- gocyclo
|
||||
- dupl
|
||||
- gosec
|
||||
- funlen
|
||||
- varnamelen
|
||||
- wsl
|
||||
|
||||
# Independently from option `exclude` we use default exclude patterns,
|
||||
# it can be disabled by this option. To list all
|
||||
|
7
.trunk/.gitignore
vendored
Normal file
7
.trunk/.gitignore
vendored
Normal file
@ -0,0 +1,7 @@
|
||||
*out
|
||||
*logs
|
||||
*actions
|
||||
*notifications
|
||||
plugins
|
||||
user_trunk.yaml
|
||||
user.yaml
|
10
.trunk/config/.markdownlint.yaml
Normal file
10
.trunk/config/.markdownlint.yaml
Normal file
@ -0,0 +1,10 @@
|
||||
# Autoformatter friendly markdownlint config (all formatting rules disabled)
|
||||
default: true
|
||||
blank_lines: false
|
||||
bullet: false
|
||||
html: false
|
||||
indentation: false
|
||||
line_length: false
|
||||
spaces: false
|
||||
url: false
|
||||
whitespace: false
|
28
.trunk/trunk.yaml
Normal file
28
.trunk/trunk.yaml
Normal file
@ -0,0 +1,28 @@
|
||||
version: 0.1
|
||||
actions:
|
||||
enabled:
|
||||
- trunk-announce
|
||||
- trunk-cache-prune
|
||||
- trunk-check-pre-push
|
||||
- trunk-fmt-pre-commit
|
||||
- trunk-upgrade-available
|
||||
runtimes:
|
||||
enabled:
|
||||
- go@1.18.3
|
||||
- node@16.14.2
|
||||
lint:
|
||||
enabled:
|
||||
- actionlint@1.6.20
|
||||
- git-diff-check
|
||||
- gitleaks@8.13.0
|
||||
- gofmt@1.16.7
|
||||
- golangci-lint@1.49.0
|
||||
- markdownlint@0.32.2
|
||||
- prettier@2.7.1
|
||||
cli:
|
||||
version: 0.18.1-beta
|
||||
plugins:
|
||||
sources:
|
||||
- id: trunk
|
||||
ref: v0.0.4
|
||||
uri: https://github.com/trunk-io/plugins
|
@ -1,5 +1,4 @@
|
||||
# Go Micro [![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![Go.Dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/go-micro.dev/v4?tab=doc) [![Go Report Card](https://goreportcard.com/badge/github.com/go-micro/go-micro)](https://goreportcard.com/report/github.com/go-micro/go-micro) [![](https://dcbadge.vercel.app/api/server/qV3HvnEJfB?style=flat-square&theme=default-inverted)](https://discord.gg/qV3HvnEJfB)
|
||||
|
||||
# Go Micro [![License](https://img.shields.io/:license-apache-blue.svg)](https://opensource.org/licenses/Apache-2.0) [![Go.Dev reference](https://img.shields.io/badge/go.dev-reference-007d9c?logo=go&logoColor=white&style=flat-square)](https://pkg.go.dev/go-micro.dev/v4?tab=doc) [![Go Report Card](https://goreportcard.com/badge/github.com/go-micro/go-micro)](https://goreportcard.com/report/github.com/go-micro/go-micro) [![Discord](https://dcbadge.vercel.app/api/server/qV3HvnEJfB?style=flat-square&theme=default-inverted)](https://discord.gg/qV3HvnEJfB)
|
||||
|
||||
Go Micro is a framework for distributed systems development.
|
||||
|
||||
|
@ -12,7 +12,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/gorilla/websocket"
|
||||
|
||||
"go-micro.dev/v4/logger"
|
||||
)
|
||||
|
||||
|
@ -11,7 +11,6 @@ import (
|
||||
|
||||
jsonpatch "github.com/evanphx/json-patch/v5"
|
||||
"github.com/oxtoacart/bpool"
|
||||
|
||||
"go-micro.dev/v4/api/handler"
|
||||
"go-micro.dev/v4/api/internal/proto"
|
||||
"go-micro.dev/v4/api/router"
|
||||
@ -458,7 +457,6 @@ func requestPayload(r *http.Request) ([]byte, error) {
|
||||
return out, nil
|
||||
}
|
||||
|
||||
//fallback to previous unknown behaviour
|
||||
return bodybuf, nil
|
||||
}
|
||||
|
||||
|
@ -12,7 +12,6 @@ import (
|
||||
"github.com/gobwas/httphead"
|
||||
"github.com/gobwas/ws"
|
||||
"github.com/gobwas/ws/wsutil"
|
||||
|
||||
"go-micro.dev/v4/api/router"
|
||||
"go-micro.dev/v4/client"
|
||||
raw "go-micro.dev/v4/codec/bytes"
|
||||
|
@ -342,6 +342,5 @@ func NewRouter(opts ...router.Option) *staticRouter {
|
||||
eps: make(map[string]*endpoint),
|
||||
}
|
||||
// go r.watch()
|
||||
//go r.refresh()
|
||||
return r
|
||||
}
|
||||
|
@ -7,10 +7,9 @@ import (
|
||||
"net"
|
||||
"os"
|
||||
|
||||
"golang.org/x/crypto/acme/autocert"
|
||||
|
||||
"go-micro.dev/v4/api/server/acme"
|
||||
log "go-micro.dev/v4/logger"
|
||||
"golang.org/x/crypto/acme/autocert"
|
||||
)
|
||||
|
||||
// autoCertACME is the ACME provider from golang.org/x/crypto/acme/autocert.
|
||||
|
@ -2,7 +2,6 @@ package acme
|
||||
|
||||
import (
|
||||
"github.com/go-acme/lego/v4/challenge"
|
||||
|
||||
"go-micro.dev/v4/logger"
|
||||
)
|
||||
|
||||
|
@ -9,7 +9,6 @@ import (
|
||||
"sync"
|
||||
|
||||
"github.com/gorilla/handlers"
|
||||
|
||||
"go-micro.dev/v4/api/server"
|
||||
"go-micro.dev/v4/api/server/cors"
|
||||
log "go-micro.dev/v4/logger"
|
||||
|
@ -4,11 +4,10 @@ import (
|
||||
"crypto/tls"
|
||||
"net/http"
|
||||
|
||||
"go-micro.dev/v4/api/server/cors"
|
||||
"go-micro.dev/v4/logger"
|
||||
|
||||
"go-micro.dev/v4/api/resolver"
|
||||
"go-micro.dev/v4/api/server/acme"
|
||||
"go-micro.dev/v4/api/server/cors"
|
||||
"go-micro.dev/v4/logger"
|
||||
)
|
||||
|
||||
type Option func(o *Options)
|
||||
|
@ -18,6 +18,7 @@ type Broker interface {
|
||||
// message and optional Ack method to acknowledge receipt of the message.
|
||||
type Handler func(Event) error
|
||||
|
||||
// Message is a message send/received from the broker.
|
||||
type Message struct {
|
||||
Header map[string]string
|
||||
Body []byte
|
||||
@ -39,7 +40,8 @@ type Subscriber interface {
|
||||
}
|
||||
|
||||
var (
|
||||
DefaultBroker Broker = NewBroker()
|
||||
// DefaultBroker is the default Broker.
|
||||
DefaultBroker = NewBroker()
|
||||
)
|
||||
|
||||
func Init(opts ...Option) error {
|
||||
@ -62,6 +64,7 @@ func Subscribe(topic string, handler Handler, opts ...SubscribeOption) (Subscrib
|
||||
return DefaultBroker.Subscribe(topic, handler, opts...)
|
||||
}
|
||||
|
||||
// String returns the name of the Broker.
|
||||
func String() string {
|
||||
return DefaultBroker.String()
|
||||
}
|
||||
|
@ -1,4 +1,4 @@
|
||||
// Package http provides a http based message broker
|
||||
// Package broker provides a http based message broker
|
||||
package broker
|
||||
|
||||
import (
|
||||
@ -16,8 +16,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"golang.org/x/net/http2"
|
||||
|
||||
"go-micro.dev/v4/codec/json"
|
||||
merr "go-micro.dev/v4/errors"
|
||||
"go-micro.dev/v4/registry"
|
||||
@ -25,6 +23,7 @@ import (
|
||||
maddr "go-micro.dev/v4/util/addr"
|
||||
mnet "go-micro.dev/v4/util/net"
|
||||
mls "go-micro.dev/v4/util/tls"
|
||||
"golang.org/x/net/http2"
|
||||
)
|
||||
|
||||
// HTTP Broker is a point to point async broker.
|
||||
|
@ -56,19 +56,19 @@ func newTestRegistry() registry.Registry {
|
||||
return registry.NewMemoryRegistry(registry.Services(testData))
|
||||
}
|
||||
|
||||
func sub(be *testing.B, c int) {
|
||||
be.StopTimer()
|
||||
func sub(b *testing.B, c int) {
|
||||
b.StopTimer()
|
||||
m := newTestRegistry()
|
||||
|
||||
b := broker.NewBroker(broker.Registry(m))
|
||||
brker := broker.NewBroker(broker.Registry(m))
|
||||
topic := uuid.New().String()
|
||||
|
||||
if err := b.Init(); err != nil {
|
||||
be.Fatalf("Unexpected init error: %v", err)
|
||||
if err := brker.Init(); err != nil {
|
||||
b.Fatalf("Unexpected init error: %v", err)
|
||||
}
|
||||
|
||||
if err := b.Connect(); err != nil {
|
||||
be.Fatalf("Unexpected connect error: %v", err)
|
||||
if err := brker.Connect(); err != nil {
|
||||
b.Fatalf("Unexpected connect error: %v", err)
|
||||
}
|
||||
|
||||
msg := &broker.Message{
|
||||
@ -82,52 +82,54 @@ func sub(be *testing.B, c int) {
|
||||
done := make(chan bool, c)
|
||||
|
||||
for i := 0; i < c; i++ {
|
||||
sub, err := b.Subscribe(topic, func(p broker.Event) error {
|
||||
sub, err := brker.Subscribe(topic, func(p broker.Event) error {
|
||||
done <- true
|
||||
m := p.Message()
|
||||
|
||||
if string(m.Body) != string(msg.Body) {
|
||||
be.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body))
|
||||
b.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body))
|
||||
}
|
||||
|
||||
return nil
|
||||
}, broker.Queue("shared"))
|
||||
if err != nil {
|
||||
be.Fatalf("Unexpected subscribe error: %v", err)
|
||||
b.Fatalf("Unexpected subscribe error: %v", err)
|
||||
}
|
||||
subs = append(subs, sub)
|
||||
}
|
||||
|
||||
for i := 0; i < be.N; i++ {
|
||||
be.StartTimer()
|
||||
if err := b.Publish(topic, msg); err != nil {
|
||||
be.Fatalf("Unexpected publish error: %v", err)
|
||||
for i := 0; i < b.N; i++ {
|
||||
b.StartTimer()
|
||||
if err := brker.Publish(topic, msg); err != nil {
|
||||
b.Fatalf("Unexpected publish error: %v", err)
|
||||
}
|
||||
<-done
|
||||
be.StopTimer()
|
||||
b.StopTimer()
|
||||
}
|
||||
|
||||
for _, sub := range subs {
|
||||
sub.Unsubscribe()
|
||||
if err := sub.Unsubscribe(); err != nil {
|
||||
b.Fatalf("Unexpected unsubscribe error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := b.Disconnect(); err != nil {
|
||||
be.Fatalf("Unexpected disconnect error: %v", err)
|
||||
if err := brker.Disconnect(); err != nil {
|
||||
b.Fatalf("Unexpected disconnect error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func pub(be *testing.B, c int) {
|
||||
be.StopTimer()
|
||||
func pub(b *testing.B, c int) {
|
||||
b.StopTimer()
|
||||
m := newTestRegistry()
|
||||
b := broker.NewBroker(broker.Registry(m))
|
||||
brk := broker.NewBroker(broker.Registry(m))
|
||||
topic := uuid.New().String()
|
||||
|
||||
if err := b.Init(); err != nil {
|
||||
be.Fatalf("Unexpected init error: %v", err)
|
||||
if err := brk.Init(); err != nil {
|
||||
b.Fatalf("Unexpected init error: %v", err)
|
||||
}
|
||||
|
||||
if err := b.Connect(); err != nil {
|
||||
be.Fatalf("Unexpected connect error: %v", err)
|
||||
if err := brk.Connect(); err != nil {
|
||||
b.Fatalf("Unexpected connect error: %v", err)
|
||||
}
|
||||
|
||||
msg := &broker.Message{
|
||||
@ -139,27 +141,27 @@ func pub(be *testing.B, c int) {
|
||||
|
||||
done := make(chan bool, c*4)
|
||||
|
||||
sub, err := b.Subscribe(topic, func(p broker.Event) error {
|
||||
sub, err := brk.Subscribe(topic, func(p broker.Event) error {
|
||||
done <- true
|
||||
m := p.Message()
|
||||
if string(m.Body) != string(msg.Body) {
|
||||
be.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body))
|
||||
b.Fatalf("Unexpected msg %s, expected %s", string(m.Body), string(msg.Body))
|
||||
}
|
||||
return nil
|
||||
}, broker.Queue("shared"))
|
||||
if err != nil {
|
||||
be.Fatalf("Unexpected subscribe error: %v", err)
|
||||
b.Fatalf("Unexpected subscribe error: %v", err)
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
ch := make(chan int, c*4)
|
||||
be.StartTimer()
|
||||
b.StartTimer()
|
||||
|
||||
for i := 0; i < c; i++ {
|
||||
go func() {
|
||||
for range ch {
|
||||
if err := b.Publish(topic, msg); err != nil {
|
||||
be.Fatalf("Unexpected publish error: %v", err)
|
||||
if err := brk.Publish(topic, msg); err != nil {
|
||||
b.Fatalf("Unexpected publish error: %v", err)
|
||||
}
|
||||
select {
|
||||
case <-done:
|
||||
@ -170,19 +172,19 @@ func pub(be *testing.B, c int) {
|
||||
}()
|
||||
}
|
||||
|
||||
for i := 0; i < be.N; i++ {
|
||||
for i := 0; i < b.N; i++ {
|
||||
wg.Add(1)
|
||||
ch <- i
|
||||
}
|
||||
|
||||
wg.Wait()
|
||||
be.StopTimer()
|
||||
b.StopTimer()
|
||||
sub.Unsubscribe()
|
||||
close(ch)
|
||||
close(done)
|
||||
|
||||
if err := b.Disconnect(); err != nil {
|
||||
be.Fatalf("Unexpected disconnect error: %v", err)
|
||||
if err := brk.Disconnect(); err != nil {
|
||||
b.Fatalf("Unexpected disconnect error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
@ -226,7 +228,9 @@ func TestBroker(t *testing.T) {
|
||||
}
|
||||
|
||||
<-done
|
||||
sub.Unsubscribe()
|
||||
if err := sub.Unsubscribe(); err != nil {
|
||||
t.Fatalf("Unexpected unsubscribe error: %v", err)
|
||||
}
|
||||
|
||||
if err := b.Disconnect(); err != nil {
|
||||
t.Fatalf("Unexpected disconnect error: %v", err)
|
||||
@ -282,7 +286,9 @@ func TestConcurrentSubBroker(t *testing.T) {
|
||||
wg.Wait()
|
||||
|
||||
for _, sub := range subs {
|
||||
sub.Unsubscribe()
|
||||
if err := sub.Unsubscribe(); err != nil {
|
||||
t.Fatalf("Unexpected unsubscribe error: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
if err := b.Disconnect(); err != nil {
|
||||
@ -336,7 +342,9 @@ func TestConcurrentPubBroker(t *testing.T) {
|
||||
|
||||
wg.Wait()
|
||||
|
||||
sub.Unsubscribe()
|
||||
if err := sub.Unsubscribe(); err != nil {
|
||||
t.Fatalf("Unexpected unsubscribe error: %v", err)
|
||||
}
|
||||
|
||||
if err := b.Disconnect(); err != nil {
|
||||
t.Fatalf("Unexpected disconnect error: %v", err)
|
||||
|
@ -8,7 +8,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
|
||||
log "go-micro.dev/v4/logger"
|
||||
maddr "go-micro.dev/v4/util/addr"
|
||||
mnet "go-micro.dev/v4/util/net"
|
||||
|
@ -8,17 +8,17 @@ Go Config makes this easy, pluggable and mergeable. You'll never have to deal wi
|
||||
## Features
|
||||
|
||||
- **Dynamic Loading** - Load configuration from multiple source as and when needed. Go Config manages watching config sources
|
||||
in the background and automatically merges and updates an in memory view.
|
||||
in the background and automatically merges and updates an in memory view.
|
||||
|
||||
- **Pluggable Sources** - Choose from any number of sources to load and merge config. The backend source is abstracted away into
|
||||
a standard format consumed internally and decoded via encoders. Sources can be env vars, flags, file, etcd, k8s configmap, etc.
|
||||
a standard format consumed internally and decoded via encoders. Sources can be env vars, flags, file, etcd, k8s configmap, etc.
|
||||
|
||||
- **Mergeable Config** - If you specify multiple sources of config, regardless of format, they will be merged and presented in
|
||||
a single view. This massively simplifies priority order loading and changes based on environment.
|
||||
a single view. This massively simplifies priority order loading and changes based on environment.
|
||||
|
||||
- **Observe Changes** - Optionally watch the config for changes to specific values. Hot reload your app using Go Config's watcher.
|
||||
You don't have to handle ad-hoc hup reloading or whatever else, just keep reading the config and watch for changes if you need
|
||||
to be notified.
|
||||
You don't have to handle ad-hoc hup reloading or whatever else, just keep reading the config and watch for changes if you need
|
||||
to be notified.
|
||||
|
||||
- **Sane Defaults** - In case config loads badly or is completely wiped away for some unknown reason, you can specify fallback
|
||||
values when accessing any config values directly. This ensures you'll always be reading some sane default in the event of a problem.
|
||||
values when accessing any config values directly. This ensures you'll always be reading some sane default in the event of a problem.
|
||||
|
@ -1,4 +1,4 @@
|
||||
// package loader manages loading from multiple sources
|
||||
// Package loader manages loading from multiple sources
|
||||
package loader
|
||||
|
||||
import (
|
||||
@ -42,6 +42,7 @@ type Snapshot struct {
|
||||
Version string
|
||||
}
|
||||
|
||||
// Options contains all options for a config loader.
|
||||
type Options struct {
|
||||
Reader reader.Reader
|
||||
Source []source.Source
|
||||
@ -52,6 +53,7 @@ type Options struct {
|
||||
WithWatcherDisabled bool
|
||||
}
|
||||
|
||||
// Option is a helper for a single option.
|
||||
type Option func(o *Options)
|
||||
|
||||
// Copy snapshot.
|
||||
|
@ -2,11 +2,11 @@
|
||||
package box
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"go-micro.dev/v4/config/secrets"
|
||||
naclbox "golang.org/x/crypto/nacl/box"
|
||||
|
||||
"crypto/rand"
|
||||
)
|
||||
|
||||
const keyLength = 32
|
||||
|
@ -3,11 +3,11 @@
|
||||
package secretbox
|
||||
|
||||
import (
|
||||
"crypto/rand"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
"go-micro.dev/v4/config/secrets"
|
||||
"golang.org/x/crypto/nacl/secretbox"
|
||||
|
||||
"crypto/rand"
|
||||
)
|
||||
|
||||
const keyLength = 32
|
||||
|
@ -27,10 +27,10 @@ Becomes
|
||||
|
||||
```json
|
||||
{
|
||||
"database": {
|
||||
"address": "127.0.0.1",
|
||||
"port": 3306
|
||||
}
|
||||
"database": {
|
||||
"address": "127.0.0.1",
|
||||
"port": 3306
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
|
42
config/source/env/README.md
vendored
42
config/source/env/README.md
vendored
@ -8,10 +8,9 @@ We expect environment variables to be in the standard format of FOO=bar
|
||||
|
||||
Keys are converted to lowercase and split on underscore.
|
||||
|
||||
### Format example
|
||||
|
||||
### Example
|
||||
|
||||
```
|
||||
```bash
|
||||
DATABASE_ADDRESS=127.0.0.1
|
||||
DATABASE_PORT=3306
|
||||
```
|
||||
@ -20,10 +19,10 @@ Becomes
|
||||
|
||||
```json
|
||||
{
|
||||
"database": {
|
||||
"address": "127.0.0.1",
|
||||
"port": 3306
|
||||
}
|
||||
"database": {
|
||||
"address": "127.0.0.1",
|
||||
"port": 3306
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
@ -31,18 +30,18 @@ Becomes
|
||||
|
||||
Environment variables can be namespaced so we only have access to a subset. Two options are available:
|
||||
|
||||
```
|
||||
```go
|
||||
WithPrefix(p ...string)
|
||||
WithStrippedPrefix(p ...string)
|
||||
```
|
||||
|
||||
The former will preserve the prefix and make it a top level key in the config. The latter eliminates the prefix, reducing the nesting by one.
|
||||
|
||||
#### Example:
|
||||
### Prefixes example
|
||||
|
||||
Given ENVs of:
|
||||
|
||||
```
|
||||
```bash
|
||||
APP_DATABASE_ADDRESS=127.0.0.1
|
||||
APP_DATABASE_PORT=3306
|
||||
VAULT_ADDR=vault:1337
|
||||
@ -50,7 +49,7 @@ VAULT_ADDR=vault:1337
|
||||
|
||||
and a source initialized as follows:
|
||||
|
||||
```
|
||||
```go
|
||||
src := env.NewSource(
|
||||
env.WithPrefix("VAULT"),
|
||||
env.WithStrippedPrefix("APP"),
|
||||
@ -59,27 +58,26 @@ src := env.NewSource(
|
||||
|
||||
The resulting config will be:
|
||||
|
||||
```
|
||||
```json
|
||||
{
|
||||
"database": {
|
||||
"address": "127.0.0.1",
|
||||
"port": 3306
|
||||
},
|
||||
"vault": {
|
||||
"addr": "vault:1337"
|
||||
}
|
||||
"database": {
|
||||
"address": "127.0.0.1",
|
||||
"port": 3306
|
||||
},
|
||||
"vault": {
|
||||
"addr": "vault:1337"
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
|
||||
## New Source
|
||||
|
||||
Specify source with data
|
||||
|
||||
```go
|
||||
src := env.NewSource(
|
||||
// optionally specify prefix
|
||||
env.WithPrefix("MICRO"),
|
||||
// optionally specify prefix
|
||||
env.WithPrefix("MICRO"),
|
||||
)
|
||||
```
|
||||
|
||||
|
1
config/source/env/options.go
vendored
1
config/source/env/options.go
vendored
@ -2,7 +2,6 @@ package env
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"strings"
|
||||
|
||||
"go-micro.dev/v4/config/source"
|
||||
|
@ -12,16 +12,16 @@ A config file format in json
|
||||
|
||||
```json
|
||||
{
|
||||
"hosts": {
|
||||
"database": {
|
||||
"address": "10.0.0.1",
|
||||
"port": 3306
|
||||
},
|
||||
"cache": {
|
||||
"address": "10.0.0.2",
|
||||
"port": 6379
|
||||
}
|
||||
"hosts": {
|
||||
"database": {
|
||||
"address": "10.0.0.1",
|
||||
"port": 3306
|
||||
},
|
||||
"cache": {
|
||||
"address": "10.0.0.2",
|
||||
"port": 6379
|
||||
}
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
@ -39,7 +39,7 @@ fileSource := file.NewSource(
|
||||
|
||||
To load different file formats e.g yaml, toml, xml simply specify them with their extension
|
||||
|
||||
```
|
||||
```go
|
||||
fileSource := file.NewSource(
|
||||
file.WithPath("/tmp/config.yaml"),
|
||||
)
|
||||
@ -47,12 +47,12 @@ fileSource := file.NewSource(
|
||||
|
||||
If you want to specify a file without extension, ensure you set the encoder to the same format
|
||||
|
||||
```
|
||||
```go
|
||||
e := toml.NewEncoder()
|
||||
|
||||
fileSource := file.NewSource(
|
||||
file.WithPath("/tmp/config"),
|
||||
source.WithEncoder(e),
|
||||
source.WithEncoder(e),
|
||||
)
|
||||
```
|
||||
|
||||
@ -67,4 +67,3 @@ conf := config.NewConfig()
|
||||
// Load file source
|
||||
conf.Load(fileSource)
|
||||
```
|
||||
|
||||
|
@ -8,7 +8,7 @@ We expect the use of the `flag` package. Upper case flags will be lower cased. D
|
||||
|
||||
### Example
|
||||
|
||||
```
|
||||
```go
|
||||
dbAddress := flag.String("database_address", "127.0.0.1", "the db address")
|
||||
dbPort := flag.Int("database_port", 3306, "the db port)
|
||||
```
|
||||
@ -17,10 +17,10 @@ Becomes
|
||||
|
||||
```json
|
||||
{
|
||||
"database": {
|
||||
"address": "127.0.0.1",
|
||||
"port": 3306
|
||||
}
|
||||
"database": {
|
||||
"address": "127.0.0.1",
|
||||
"port": 3306
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
|
@ -69,12 +69,14 @@ func (d *Debug) Trace(ctx context.Context, req *proto.TraceRequest, rsp *proto.T
|
||||
|
||||
for _, t := range traces {
|
||||
var typ proto.SpanType
|
||||
|
||||
switch t.Type {
|
||||
case trace.SpanTypeRequestInbound:
|
||||
typ = proto.SpanType_INBOUND
|
||||
case trace.SpanTypeRequestOutbound:
|
||||
typ = proto.SpanType_OUTBOUND
|
||||
}
|
||||
|
||||
rsp.Spans = append(rsp.Spans, &proto.Span{
|
||||
Trace: t.Trace,
|
||||
Id: t.Id,
|
||||
|
@ -8,7 +8,6 @@ import (
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/pkg/errors"
|
||||
|
||||
log "go-micro.dev/v4/logger"
|
||||
"go-micro.dev/v4/store"
|
||||
)
|
||||
|
@ -5,7 +5,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
|
||||
log "go-micro.dev/v4/logger"
|
||||
"go-micro.dev/v4/store"
|
||||
)
|
||||
|
4
registry/cache/README.md
vendored
4
registry/cache/README.md
vendored
@ -6,7 +6,7 @@ If you're looking for caching in your microservices use the [selector](https://m
|
||||
|
||||
## Interface
|
||||
|
||||
```
|
||||
```go
|
||||
// Cache is the registry cache interface
|
||||
type Cache interface {
|
||||
// embed the registry interface
|
||||
@ -18,7 +18,7 @@ type Cache interface {
|
||||
|
||||
## Usage
|
||||
|
||||
```
|
||||
```go
|
||||
import (
|
||||
"github.com/micro/go-micro/registry"
|
||||
"github.com/micro/go-micro/registry/cache"
|
||||
|
3
registry/cache/cache.go
vendored
3
registry/cache/cache.go
vendored
@ -7,11 +7,10 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/sync/singleflight"
|
||||
|
||||
log "go-micro.dev/v4/logger"
|
||||
"go-micro.dev/v4/registry"
|
||||
util "go-micro.dev/v4/util/registry"
|
||||
"golang.org/x/sync/singleflight"
|
||||
)
|
||||
|
||||
// Cache is the registry cache interface.
|
||||
|
@ -16,7 +16,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
|
||||
log "go-micro.dev/v4/logger"
|
||||
"go-micro.dev/v4/util/mdns"
|
||||
)
|
||||
|
@ -5,7 +5,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
|
||||
log "go-micro.dev/v4/logger"
|
||||
)
|
||||
|
||||
|
@ -13,7 +13,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/nxadm/tail"
|
||||
|
||||
log "go-micro.dev/v4/logger"
|
||||
"go-micro.dev/v4/runtime/local/git"
|
||||
)
|
||||
|
@ -20,7 +20,6 @@ import (
|
||||
|
||||
"go-micro.dev/v4/codec"
|
||||
merrors "go-micro.dev/v4/errors"
|
||||
|
||||
log "go-micro.dev/v4/logger"
|
||||
)
|
||||
|
||||
|
@ -15,7 +15,6 @@ import (
|
||||
"go-micro.dev/v4/broker"
|
||||
"go-micro.dev/v4/codec"
|
||||
raw "go-micro.dev/v4/codec/bytes"
|
||||
|
||||
log "go-micro.dev/v4/logger"
|
||||
"go-micro.dev/v4/metadata"
|
||||
"go-micro.dev/v4/registry"
|
||||
|
@ -8,7 +8,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
|
||||
"go-micro.dev/v4/codec"
|
||||
log "go-micro.dev/v4/logger"
|
||||
"go-micro.dev/v4/registry"
|
||||
|
@ -22,6 +22,7 @@ func getURL(addr string) (*url.URL, error) {
|
||||
Host: addr,
|
||||
},
|
||||
}
|
||||
|
||||
return http.ProxyFromEnvironment(r)
|
||||
}
|
||||
|
||||
@ -37,6 +38,7 @@ func (p *pbuffer) Read(b []byte) (int, error) {
|
||||
func proxyDial(conn net.Conn, addr string, proxyURL *url.URL) (_ net.Conn, err error) {
|
||||
defer func() {
|
||||
if err != nil {
|
||||
// trunk-ignore(golangci-lint/errcheck)
|
||||
conn.Close()
|
||||
}
|
||||
}()
|
||||
@ -56,20 +58,26 @@ func proxyDial(conn net.Conn, addr string, proxyURL *url.URL) (_ net.Conn, err e
|
||||
}
|
||||
|
||||
if err := r.Write(conn); err != nil {
|
||||
return nil, fmt.Errorf("failed to write the HTTP request: %v", err)
|
||||
return nil, fmt.Errorf("failed to write the HTTP request: %w", err)
|
||||
}
|
||||
|
||||
br := bufio.NewReader(conn)
|
||||
|
||||
rsp, err := http.ReadResponse(br, r)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("reading server HTTP response: %v", err)
|
||||
return nil, fmt.Errorf("reading server HTTP response: %w", err)
|
||||
}
|
||||
defer rsp.Body.Close()
|
||||
|
||||
defer func() {
|
||||
err = rsp.Body.Close()
|
||||
}()
|
||||
|
||||
if rsp.StatusCode != http.StatusOK {
|
||||
dump, err := httputil.DumpResponse(rsp, true)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to do connect handshake, status code: %s", rsp.Status)
|
||||
}
|
||||
|
||||
return nil, fmt.Errorf("failed to do connect handshake, response: %q", dump)
|
||||
}
|
||||
|
||||
|
@ -12,13 +12,12 @@ import (
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/http2/h2c"
|
||||
|
||||
maddr "go-micro.dev/v4/util/addr"
|
||||
"go-micro.dev/v4/util/buf"
|
||||
mnet "go-micro.dev/v4/util/net"
|
||||
mls "go-micro.dev/v4/util/tls"
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/http2/h2c"
|
||||
)
|
||||
|
||||
type httpTransport struct {
|
||||
@ -92,7 +91,7 @@ func (h *httpTransportClient) Send(m *Message) error {
|
||||
defer b.Close()
|
||||
|
||||
req := &http.Request{
|
||||
Method: "POST",
|
||||
Method: http.MethodPost,
|
||||
URL: &url.URL{
|
||||
Scheme: "http",
|
||||
Host: h.addr,
|
||||
@ -109,7 +108,9 @@ func (h *httpTransportClient) Send(m *Message) error {
|
||||
h.Unlock()
|
||||
return io.EOF
|
||||
}
|
||||
|
||||
h.bl = append(h.bl, req)
|
||||
|
||||
select {
|
||||
case h.r <- h.bl[0]:
|
||||
h.bl = h.bl[1:]
|
||||
@ -120,18 +121,22 @@ func (h *httpTransportClient) Send(m *Message) error {
|
||||
|
||||
// set timeout if its greater than 0
|
||||
if h.ht.opts.Timeout > time.Duration(0) {
|
||||
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
|
||||
if err := h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return req.Write(h.conn)
|
||||
}
|
||||
|
||||
func (h *httpTransportClient) Recv(m *Message) error {
|
||||
if m == nil {
|
||||
// Recv receives a message.
|
||||
func (h *httpTransportClient) Recv(msg *Message) error {
|
||||
if msg == nil {
|
||||
return errors.New("message passed in is nil")
|
||||
}
|
||||
|
||||
var r *http.Request
|
||||
var req *http.Request
|
||||
|
||||
if !h.dialOpts.Stream {
|
||||
rc, ok := <-h.r
|
||||
if !ok {
|
||||
@ -140,27 +145,34 @@ func (h *httpTransportClient) Recv(m *Message) error {
|
||||
h.Unlock()
|
||||
return io.EOF
|
||||
}
|
||||
|
||||
rc = h.bl[0]
|
||||
h.bl = h.bl[1:]
|
||||
h.Unlock()
|
||||
}
|
||||
r = rc
|
||||
|
||||
req = rc
|
||||
}
|
||||
|
||||
// set timeout if its greater than 0
|
||||
if h.ht.opts.Timeout > time.Duration(0) {
|
||||
h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout))
|
||||
if err := h.conn.SetDeadline(time.Now().Add(h.ht.opts.Timeout)); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
h.Lock()
|
||||
defer h.Unlock()
|
||||
|
||||
if h.closed {
|
||||
return io.EOF
|
||||
}
|
||||
rsp, err := http.ReadResponse(h.buff, r)
|
||||
|
||||
rsp, err := http.ReadResponse(h.buff, req)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer rsp.Body.Close()
|
||||
|
||||
b, err := io.ReadAll(rsp.Body)
|
||||
@ -168,21 +180,21 @@ func (h *httpTransportClient) Recv(m *Message) error {
|
||||
return err
|
||||
}
|
||||
|
||||
if rsp.StatusCode != 200 {
|
||||
if rsp.StatusCode != http.StatusOK {
|
||||
return errors.New(rsp.Status + ": " + string(b))
|
||||
}
|
||||
|
||||
m.Body = b
|
||||
msg.Body = b
|
||||
|
||||
if m.Header == nil {
|
||||
m.Header = make(map[string]string, len(rsp.Header))
|
||||
if msg.Header == nil {
|
||||
msg.Header = make(map[string]string, len(rsp.Header))
|
||||
}
|
||||
|
||||
for k, v := range rsp.Header {
|
||||
if len(v) > 0 {
|
||||
m.Header[k] = v[0]
|
||||
msg.Header[k] = v[0]
|
||||
} else {
|
||||
m.Header[k] = ""
|
||||
msg.Header[k] = ""
|
||||
}
|
||||
}
|
||||
|
||||
@ -198,8 +210,10 @@ func (h *httpTransportClient) Close() error {
|
||||
h.Unlock()
|
||||
close(h.r)
|
||||
})
|
||||
|
||||
return h.conn.Close()
|
||||
}
|
||||
|
||||
err := h.conn.Close()
|
||||
h.once.Do(func() {
|
||||
h.Lock()
|
||||
@ -208,6 +222,7 @@ func (h *httpTransportClient) Close() error {
|
||||
h.Unlock()
|
||||
close(h.r)
|
||||
})
|
||||
|
||||
return err
|
||||
}
|
||||
|
||||
@ -219,12 +234,13 @@ func (h *httpTransportSocket) Remote() string {
|
||||
return h.remote
|
||||
}
|
||||
|
||||
func (h *httpTransportSocket) Recv(m *Message) error {
|
||||
if m == nil {
|
||||
func (h *httpTransportSocket) Recv(msg *Message) error {
|
||||
if msg == nil {
|
||||
return errors.New("message passed in is nil")
|
||||
}
|
||||
if m.Header == nil {
|
||||
m.Header = make(map[string]string, len(h.r.Header))
|
||||
|
||||
if msg.Header == nil {
|
||||
msg.Header = make(map[string]string, len(h.r.Header))
|
||||
}
|
||||
|
||||
// process http 1
|
||||
@ -245,6 +261,7 @@ func (h *httpTransportSocket) Recv(m *Message) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
r = rr
|
||||
}
|
||||
|
||||
@ -256,14 +273,15 @@ func (h *httpTransportSocket) Recv(m *Message) error {
|
||||
|
||||
// set body
|
||||
r.Body.Close()
|
||||
m.Body = b
|
||||
|
||||
msg.Body = b
|
||||
|
||||
// set headers
|
||||
for k, v := range r.Header {
|
||||
if len(v) > 0 {
|
||||
m.Header[k] = v[0]
|
||||
msg.Header[k] = v[0]
|
||||
} else {
|
||||
m.Header[k] = ""
|
||||
msg.Header[k] = ""
|
||||
}
|
||||
}
|
||||
|
||||
@ -295,25 +313,25 @@ func (h *httpTransportSocket) Recv(m *Message) error {
|
||||
|
||||
// check if we have data
|
||||
if n > 0 {
|
||||
m.Body = buf[:n]
|
||||
msg.Body = buf[:n]
|
||||
}
|
||||
|
||||
// set headers
|
||||
for k, v := range h.r.Header {
|
||||
if len(v) > 0 {
|
||||
m.Header[k] = v[0]
|
||||
msg.Header[k] = v[0]
|
||||
} else {
|
||||
m.Header[k] = ""
|
||||
msg.Header[k] = ""
|
||||
}
|
||||
}
|
||||
|
||||
// set path
|
||||
m.Header[":path"] = h.r.URL.Path
|
||||
msg.Header[":path"] = h.r.URL.Path
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (h *httpTransportSocket) Send(m *Message) error {
|
||||
func (h *httpTransportSocket) Send(msg *Message) error {
|
||||
if h.r.ProtoMajor == 1 {
|
||||
// make copy of header
|
||||
hdr := make(http.Header)
|
||||
@ -323,16 +341,16 @@ func (h *httpTransportSocket) Send(m *Message) error {
|
||||
|
||||
rsp := &http.Response{
|
||||
Header: hdr,
|
||||
Body: io.NopCloser(bytes.NewReader(m.Body)),
|
||||
Body: io.NopCloser(bytes.NewReader(msg.Body)),
|
||||
Status: "200 OK",
|
||||
StatusCode: 200,
|
||||
StatusCode: http.StatusOK,
|
||||
Proto: "HTTP/1.1",
|
||||
ProtoMajor: 1,
|
||||
ProtoMinor: 1,
|
||||
ContentLength: int64(len(m.Body)),
|
||||
ContentLength: int64(len(msg.Body)),
|
||||
}
|
||||
|
||||
for k, v := range m.Header {
|
||||
for k, v := range msg.Header {
|
||||
rsp.Header.Set(k, v)
|
||||
}
|
||||
|
||||
@ -357,12 +375,12 @@ func (h *httpTransportSocket) Send(m *Message) error {
|
||||
defer h.mtx.RUnlock()
|
||||
|
||||
// set headers
|
||||
for k, v := range m.Header {
|
||||
for k, v := range msg.Header {
|
||||
h.w.Header().Set(k, v)
|
||||
}
|
||||
|
||||
// write request
|
||||
_, err := h.w.Write(m.Body)
|
||||
_, err := h.w.Write(msg.Body)
|
||||
|
||||
// flush the trailers
|
||||
h.w.(http.Flusher).Flush()
|
||||
@ -376,7 +394,7 @@ func (h *httpTransportSocket) error(m *Message) error {
|
||||
Header: make(http.Header),
|
||||
Body: io.NopCloser(bytes.NewReader(m.Body)),
|
||||
Status: "500 Internal Server Error",
|
||||
StatusCode: 500,
|
||||
StatusCode: http.StatusInternalServerError,
|
||||
Proto: "HTTP/1.1",
|
||||
ProtoMajor: 1,
|
||||
ProtoMinor: 1,
|
||||
@ -428,29 +446,29 @@ func (h *httpTransportListener) Accept(fn func(Socket)) error {
|
||||
mux := http.NewServeMux()
|
||||
|
||||
// register our transport handler
|
||||
mux.HandleFunc("/", func(w http.ResponseWriter, r *http.Request) {
|
||||
mux.HandleFunc("/", func(rsp http.ResponseWriter, req *http.Request) {
|
||||
var buf *bufio.ReadWriter
|
||||
var con net.Conn
|
||||
|
||||
// read a regular request
|
||||
if r.ProtoMajor == 1 {
|
||||
b, err := io.ReadAll(r.Body)
|
||||
if req.ProtoMajor == 1 {
|
||||
b, err := io.ReadAll(req.Body)
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
http.Error(rsp, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
r.Body = io.NopCloser(bytes.NewReader(b))
|
||||
req.Body = io.NopCloser(bytes.NewReader(b))
|
||||
// hijack the conn
|
||||
hj, ok := w.(http.Hijacker)
|
||||
hj, ok := rsp.(http.Hijacker)
|
||||
if !ok {
|
||||
// we're screwed
|
||||
http.Error(w, "cannot serve conn", http.StatusInternalServerError)
|
||||
http.Error(rsp, "cannot serve conn", http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
|
||||
conn, bufrw, err := hj.Hijack()
|
||||
if err != nil {
|
||||
http.Error(w, err.Error(), http.StatusInternalServerError)
|
||||
http.Error(rsp, err.Error(), http.StatusInternalServerError)
|
||||
return
|
||||
}
|
||||
defer conn.Close()
|
||||
@ -459,23 +477,23 @@ func (h *httpTransportListener) Accept(fn func(Socket)) error {
|
||||
}
|
||||
|
||||
// buffered reader
|
||||
bufr := bufio.NewReader(r.Body)
|
||||
bufr := bufio.NewReader(req.Body)
|
||||
|
||||
// save the request
|
||||
ch := make(chan *http.Request, 1)
|
||||
ch <- r
|
||||
ch <- req
|
||||
|
||||
// create a new transport socket
|
||||
sock := &httpTransportSocket{
|
||||
ht: h.ht,
|
||||
w: w,
|
||||
r: r,
|
||||
w: rsp,
|
||||
r: req,
|
||||
rw: buf,
|
||||
buf: bufr,
|
||||
ch: ch,
|
||||
conn: con,
|
||||
local: h.Addr(),
|
||||
remote: r.RemoteAddr,
|
||||
remote: req.RemoteAddr,
|
||||
closed: make(chan bool),
|
||||
}
|
||||
|
||||
@ -516,8 +534,10 @@ func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) {
|
||||
opt(&dopts)
|
||||
}
|
||||
|
||||
var conn net.Conn
|
||||
var err error
|
||||
var (
|
||||
conn net.Conn
|
||||
err error
|
||||
)
|
||||
|
||||
// TODO: support dial option here rather than using internal config
|
||||
if h.opts.Secure || h.opts.TLSConfig != nil {
|
||||
@ -527,7 +547,9 @@ func (h *httpTransport) Dial(addr string, opts ...DialOption) (Client, error) {
|
||||
InsecureSkipVerify: true,
|
||||
}
|
||||
}
|
||||
|
||||
config.NextProtos = []string{"http/1.1"}
|
||||
|
||||
conn, err = newConn(func(addr string) (net.Conn, error) {
|
||||
return tls.DialWithDialer(&net.Dialer{Timeout: dopts.Timeout}, "tcp", addr, config)
|
||||
})(addr)
|
||||
@ -559,15 +581,17 @@ func (h *httpTransport) Listen(addr string, opts ...ListenOption) (Listener, err
|
||||
o(&options)
|
||||
}
|
||||
|
||||
var l net.Listener
|
||||
var err error
|
||||
var (
|
||||
list net.Listener
|
||||
err error
|
||||
)
|
||||
|
||||
if listener := getNetListener(&options); listener != nil {
|
||||
fn := func(addr string) (net.Listener, error) {
|
||||
return listener, nil
|
||||
}
|
||||
|
||||
l, err = mnet.Listen(addr, fn)
|
||||
list, err = mnet.Listen(addr, fn)
|
||||
} else if h.opts.Secure || h.opts.TLSConfig != nil {
|
||||
config := h.opts.TLSConfig
|
||||
|
||||
@ -594,13 +618,13 @@ func (h *httpTransport) Listen(addr string, opts ...ListenOption) (Listener, err
|
||||
return tls.Listen("tcp", addr, config)
|
||||
}
|
||||
|
||||
l, err = mnet.Listen(addr, fn)
|
||||
list, err = mnet.Listen(addr, fn)
|
||||
} else {
|
||||
fn := func(addr string) (net.Listener, error) {
|
||||
return net.Listen("tcp", addr)
|
||||
}
|
||||
|
||||
l, err = mnet.Listen(addr, fn)
|
||||
list, err = mnet.Listen(addr, fn)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
@ -609,7 +633,7 @@ func (h *httpTransport) Listen(addr string, opts ...ListenOption) (Listener, err
|
||||
|
||||
return &httpTransportListener{
|
||||
ht: h,
|
||||
listener: l,
|
||||
listener: list,
|
||||
}, nil
|
||||
}
|
||||
|
||||
@ -617,6 +641,7 @@ func (h *httpTransport) Init(opts ...Option) error {
|
||||
for _, o := range opts {
|
||||
o(&h.opts)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -633,5 +658,6 @@ func NewHTTPTransport(opts ...Option) *httpTransport {
|
||||
for _, o := range opts {
|
||||
o(&options)
|
||||
}
|
||||
|
||||
return &httpTransport{opts: options}
|
||||
}
|
||||
|
@ -16,6 +16,7 @@ type Transport interface {
|
||||
String() string
|
||||
}
|
||||
|
||||
// Message is a broker message.
|
||||
type Message struct {
|
||||
Header map[string]string
|
||||
Body []byte
|
||||
|
@ -6,12 +6,11 @@ import (
|
||||
"path/filepath"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
|
||||
"go-micro.dev/v4/errors"
|
||||
log "go-micro.dev/v4/logger"
|
||||
"go-micro.dev/v4/server"
|
||||
proto "go-micro.dev/v4/util/file/proto"
|
||||
"golang.org/x/net/context"
|
||||
)
|
||||
|
||||
// NewHandler is a handler that can be registered with a micro Server.
|
||||
|
@ -9,10 +9,9 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/miekg/dns"
|
||||
"go-micro.dev/v4/logger"
|
||||
"golang.org/x/net/ipv4"
|
||||
"golang.org/x/net/ipv6"
|
||||
|
||||
"go-micro.dev/v4/logger"
|
||||
)
|
||||
|
||||
// ServiceEntry is returned after we query for a service.
|
||||
|
@ -24,6 +24,7 @@ func mergeMap(a map[string]interface{}, b map[string]interface{}) map[string]int
|
||||
a[bK] = bV
|
||||
}
|
||||
}
|
||||
|
||||
return a
|
||||
}
|
||||
|
||||
|
@ -16,13 +16,13 @@ import (
|
||||
var (
|
||||
// ErrInvalidParam is returned when invalid data is provided to the ToJSON or Unmarshal function.
|
||||
// Specifically, this will be returned when there is no equals sign present in the URL query parameter.
|
||||
ErrInvalidParam error = errors.New("qson: invalid url query param provided")
|
||||
ErrInvalidParam = errors.New("qson: invalid url query param provided")
|
||||
|
||||
bracketSplitter *regexp.Regexp
|
||||
)
|
||||
|
||||
func init() {
|
||||
bracketSplitter = regexp.MustCompile("\\[|\\]")
|
||||
bracketSplitter = regexp.MustCompile(`\[|\]`)
|
||||
}
|
||||
|
||||
// Unmarshal will take a dest along with URL
|
||||
@ -41,6 +41,7 @@ func Unmarshal(dst interface{}, query string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return json.Unmarshal(b, dst)
|
||||
}
|
||||
|
||||
@ -56,14 +57,18 @@ func ToJSON(query string) ([]byte, error) {
|
||||
var (
|
||||
builder interface{} = make(map[string]interface{})
|
||||
)
|
||||
|
||||
params := strings.Split(query, "&")
|
||||
|
||||
for _, part := range params {
|
||||
tempMap, err := queryToMap(part)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
builder = merge(builder, tempMap)
|
||||
}
|
||||
|
||||
return json.Marshal(builder)
|
||||
}
|
||||
|
||||
@ -81,10 +86,12 @@ func queryToMap(param string) (map[string]interface{}, error) {
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rawValue, err = url.QueryUnescape(rawValue)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
rawKey, err = url.QueryUnescape(rawKey)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -108,6 +115,7 @@ func queryToMap(param string) (map[string]interface{}, error) {
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
|
||||
return map[string]interface{}{
|
||||
key: value,
|
||||
}, nil
|
||||
@ -120,6 +128,7 @@ func queryToMap(param string) (map[string]interface{}, error) {
|
||||
// and then we set {"a": queryToMap("b[c]", value)}
|
||||
ret := make(map[string]interface{}, 0)
|
||||
ret[key], err = queryToMap(buildNewKey(rawKey) + "=" + rawValue)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@ -132,6 +141,7 @@ func queryToMap(param string) (map[string]interface{}, error) {
|
||||
temp := ret[key].(map[string]interface{})
|
||||
ret[key] = []interface{}{temp[""]}
|
||||
}
|
||||
|
||||
return ret, nil
|
||||
}
|
||||
|
||||
@ -143,6 +153,7 @@ func buildNewKey(origKey string) string {
|
||||
pieces := bracketSplitter.Split(origKey, -1)
|
||||
ret := origKey[len(pieces[0])+1:]
|
||||
ret = ret[:len(pieces[1])] + ret[len(pieces[1])+1:]
|
||||
|
||||
return ret
|
||||
}
|
||||
|
||||
@ -154,5 +165,6 @@ func splitKeyAndValue(param string) (string, string, error) {
|
||||
if li == -1 {
|
||||
return "", "", ErrInvalidParam
|
||||
}
|
||||
|
||||
return param[:li], param[li+1:], nil
|
||||
}
|
||||
|
@ -92,6 +92,7 @@ func Icon(ico string) Option {
|
||||
if o.Metadata == nil {
|
||||
o.Metadata = make(map[string]string)
|
||||
}
|
||||
|
||||
o.Metadata["icon"] = ico
|
||||
}
|
||||
}
|
||||
@ -224,7 +225,8 @@ func AfterStop(fn func() error) Option {
|
||||
}
|
||||
}
|
||||
|
||||
// Secure Use secure communication. If TLSConfig is not specified we use InsecureSkipVerify and generate a self signed cert.
|
||||
// Secure Use secure communication.
|
||||
// If TLSConfig is not specified we use InsecureSkipVerify and generate a self signed cert.
|
||||
func Secure(b bool) Option {
|
||||
return func(o *Options) {
|
||||
o.Secure = b
|
||||
|
@ -13,9 +13,7 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/urfave/cli/v2"
|
||||
|
||||
"go-micro.dev/v4"
|
||||
|
||||
log "go-micro.dev/v4/logger"
|
||||
"go-micro.dev/v4/registry"
|
||||
maddr "go-micro.dev/v4/util/addr"
|
||||
@ -48,13 +46,16 @@ func newService(opts ...Option) Service {
|
||||
ex: make(chan bool),
|
||||
}
|
||||
s.srv = s.genSrv()
|
||||
|
||||
return s
|
||||
}
|
||||
|
||||
func (s *service) genSrv() *registry.Service {
|
||||
var host string
|
||||
var port string
|
||||
var err error
|
||||
var (
|
||||
host string
|
||||
port string
|
||||
err error
|
||||
)
|
||||
|
||||
logger := s.opts.Logger
|
||||
|
||||
@ -155,10 +156,12 @@ func (s *service) register() error {
|
||||
regErr = err
|
||||
// backoff then retry
|
||||
time.Sleep(backoff.Do(i + 1))
|
||||
|
||||
continue
|
||||
}
|
||||
// success so nil error
|
||||
regErr = nil
|
||||
|
||||
break
|
||||
}
|
||||
|
||||
@ -178,6 +181,7 @@ func (s *service) deregister() error {
|
||||
if s.opts.Registry != nil {
|
||||
r = s.opts.Registry
|
||||
}
|
||||
|
||||
return r.Deregister(s.srv)
|
||||
}
|
||||
|
||||
@ -195,24 +199,24 @@ func (s *service) start() error {
|
||||
}
|
||||
}
|
||||
|
||||
l, err := s.listen("tcp", s.opts.Address)
|
||||
listener, err := s.listen("tcp", s.opts.Address)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
logger := s.opts.Logger
|
||||
|
||||
s.opts.Address = l.Addr().String()
|
||||
s.opts.Address = listener.Addr().String()
|
||||
srv := s.genSrv()
|
||||
srv.Endpoints = s.srv.Endpoints
|
||||
s.srv = srv
|
||||
|
||||
var h http.Handler
|
||||
var handler http.Handler
|
||||
|
||||
if s.opts.Handler != nil {
|
||||
h = s.opts.Handler
|
||||
handler = s.opts.Handler
|
||||
} else {
|
||||
h = s.mux
|
||||
handler = s.mux
|
||||
var r sync.Once
|
||||
|
||||
// register the html dir
|
||||
@ -242,9 +246,9 @@ func (s *service) start() error {
|
||||
httpSrv = &http.Server{}
|
||||
}
|
||||
|
||||
httpSrv.Handler = h
|
||||
httpSrv.Handler = handler
|
||||
|
||||
go httpSrv.Serve(l)
|
||||
go httpSrv.Serve(listener)
|
||||
|
||||
for _, fn := range s.opts.AfterStart {
|
||||
if err := fn(); err != nil {
|
||||
@ -257,10 +261,11 @@ func (s *service) start() error {
|
||||
|
||||
go func() {
|
||||
ch := <-s.exit
|
||||
ch <- l.Close()
|
||||
ch <- listener.Close()
|
||||
}()
|
||||
|
||||
logger.Logf(log.InfoLevel, "Listening on %v", l.Addr().String())
|
||||
logger.Logf(log.InfoLevel, "Listening on %v", listener.Addr().String())
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -289,6 +294,7 @@ func (s *service) stop() error {
|
||||
if chErr := <-ch; chErr != nil {
|
||||
return chErr
|
||||
}
|
||||
|
||||
return err
|
||||
}
|
||||
}
|
||||
@ -338,6 +344,7 @@ func (s *service) Handle(pattern string, handler http.Handler) {
|
||||
|
||||
func (s *service) HandleFunc(pattern string, handler func(http.ResponseWriter, *http.Request)) {
|
||||
var seen bool
|
||||
|
||||
s.RLock()
|
||||
for _, ep := range s.srv.Endpoints {
|
||||
if ep.Name == pattern {
|
||||
@ -428,7 +435,9 @@ func (s *service) Init(opts ...Option) error {
|
||||
if s.opts.Service.Name() == "" {
|
||||
serviceOpts = append(serviceOpts, micro.Name(s.opts.Name))
|
||||
}
|
||||
|
||||
serviceOpts = append(serviceOpts, micro.Version(s.opts.Version))
|
||||
|
||||
s.RUnlock()
|
||||
|
||||
s.opts.Service.Init(serviceOpts...)
|
||||
@ -484,6 +493,7 @@ func (s *service) Run() error {
|
||||
if err := s.opts.Service.Options().Profile.Start(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
defer func() {
|
||||
if err := s.opts.Service.Options().Profile.Stop(); err != nil {
|
||||
logger.Log(log.ErrorLevel, err)
|
||||
@ -528,8 +538,10 @@ func (s *service) Options() Options {
|
||||
}
|
||||
|
||||
func (s *service) listen(network, addr string) (net.Listener, error) {
|
||||
var l net.Listener
|
||||
var err error
|
||||
var (
|
||||
listener net.Listener
|
||||
err error
|
||||
)
|
||||
|
||||
// TODO: support use of listen options
|
||||
if s.opts.Secure || s.opts.TLSConfig != nil {
|
||||
@ -555,21 +567,22 @@ func (s *service) listen(network, addr string) (net.Listener, error) {
|
||||
}
|
||||
config = &tls.Config{Certificates: []tls.Certificate{cert}}
|
||||
}
|
||||
|
||||
return tls.Listen(network, addr, config)
|
||||
}
|
||||
|
||||
l, err = mnet.Listen(addr, fn)
|
||||
listener, err = mnet.Listen(addr, fn)
|
||||
} else {
|
||||
fn := func(addr string) (net.Listener, error) {
|
||||
return net.Listen(network, addr)
|
||||
}
|
||||
|
||||
l, err = mnet.Listen(addr, fn)
|
||||
listener, err = mnet.Listen(addr, fn)
|
||||
}
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return l, nil
|
||||
return listener, nil
|
||||
}
|
||||
|
@ -2,7 +2,6 @@ package web_test
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
@ -15,7 +14,6 @@ import (
|
||||
|
||||
func TestWeb(t *testing.T) {
|
||||
for i := 0; i < 10; i++ {
|
||||
fmt.Println("Test nr", i)
|
||||
testFunc()
|
||||
}
|
||||
}
|
||||
@ -24,7 +22,7 @@ func testFunc() {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), time.Millisecond*250)
|
||||
defer cancel()
|
||||
|
||||
s := micro.NewService(
|
||||
service := micro.NewService(
|
||||
micro.Name("test"),
|
||||
micro.Context(ctx),
|
||||
micro.HandleSignal(false),
|
||||
@ -44,18 +42,18 @@ func testFunc() {
|
||||
),
|
||||
)
|
||||
w := web.NewService(
|
||||
web.MicroService(s),
|
||||
web.MicroService(service),
|
||||
web.Context(ctx),
|
||||
web.HandleSignal(false),
|
||||
)
|
||||
// s.Init()
|
||||
//w.Init()
|
||||
// w.Init()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
err := s.Run()
|
||||
err := service.Run()
|
||||
if err != nil {
|
||||
logger.Logf(logger.ErrorLevel, "micro run error: %v", err)
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user