1
0
mirror of https://github.com/go-micro/go-micro.git synced 2025-01-05 10:20:53 +02:00
go-micro/tests/service.go

396 lines
11 KiB
Go
Raw Normal View History

// Package tests implements a testing framwork, and provides default tests.
package tests
import (
"context"
"fmt"
"sync"
"testing"
"time"
"github.com/pkg/errors"
proto "github.com/go-micro/plugins/v4/server/grpc/proto"
"go-micro.dev/v4"
"go-micro.dev/v4/client"
"go-micro.dev/v4/debug/handler"
pb "go-micro.dev/v4/debug/proto"
)
var (
// ErrNoTests returns no test params are set.
ErrNoTests = errors.New("No tests to run, all values set to 0")
testTopic = "Test-Topic"
errorTopic = "Error-Topic"
)
type parTest func(name string, c client.Client, p, s int, errChan chan error)
type testFunc func(name string, c client.Client, errChan chan error)
// ServiceTestConfig allows you to easily test a service configuration by
// running predefined tests against your custom service. You only need to
// provide a function to create the service, and how many of which test you
// want to run.
//
// The default tests provided, all running with separate parallel routines are:
// - Sequential Call requests
// - Bi-directional streaming
// - Pub/Sub events brokering
//
// You can provide an array of parallel routines to run for the request and
// stream tests. They will be run as matrix tests, so with each possible combination.
// Thus, in total (p * seq) + (p * streams) tests will be run.
type ServiceTestConfig struct {
// Service name to use for the tests
Name string
// NewService function will be called to setup the new service.
// It takes in a list of options, which by default will Context and an
// AfterStart with channel to signal when the service has been started.
NewService func(name string, opts ...micro.Option) (micro.Service, error)
// Parallel is the number of prallell routines to use for the tests.
Parallel []int
// Sequential is the number of sequential requests to send per parallel process.
Sequential []int
// Streams is the nummber of streaming messages to send over the stream per routine.
Streams []int
// PubSub is the number of times to publish messages to the broker per routine.
PubSub []int
mu sync.Mutex
msgCount int
}
// Run will start the benchmark tests.
func (stc *ServiceTestConfig) Run(b *testing.B) {
if err := stc.validate(); err != nil {
b.Fatal("Failed to validate config", err)
}
// Run routines with sequential requests
stc.prepBench(b, "req", stc.runParSeqTest, stc.Sequential)
// Run routines with streams
stc.prepBench(b, "streams", stc.runParStreamTest, stc.Streams)
// Run routines with pub/sub
stc.prepBench(b, "pubsub", stc.runBrokerTest, stc.PubSub)
}
// prepBench will prepare the benmark by setting the right parameters,
// and invoking the test.
func (stc *ServiceTestConfig) prepBench(b *testing.B, tName string, test parTest, seq []int) {
par := stc.Parallel
// No requests needed
if len(seq) == 0 || seq[0] == 0 {
return
}
for _, parallel := range par {
for _, sequential := range seq {
// Create the service name for the test
name := fmt.Sprintf("%s.%dp-%d%s", stc.Name, parallel, sequential, tName)
// Run test with parallel routines making each sequential requests
test := func(name string, c client.Client, errChan chan error) {
test(name, c, parallel, sequential, errChan)
}
benchmark := func(b *testing.B) {
b.ReportAllocs()
stc.runBench(b, name, test)
}
b.Logf("----------- STARTING TEST %s -----------", name)
// Run test, return if it fails
if !b.Run(name, benchmark) {
return
}
}
}
}
// runParSeqTest will make s sequential requests in p parallel routines.
func (stc *ServiceTestConfig) runParSeqTest(name string, c client.Client, p, s int, errChan chan error) {
testParallel(p, func() {
// Make serial requests
for z := 0; z < s; z++ {
if err := testRequest(context.Background(), c, name); err != nil {
errChan <- errors.Wrapf(err, "[%s] Request failed during testRequest", name)
return
}
}
})
}
// Handle is used as a test handler.
func (stc *ServiceTestConfig) Handle(ctx context.Context, msg *proto.Request) error {
stc.mu.Lock()
stc.msgCount++
stc.mu.Unlock()
return nil
}
// HandleError is used as a test handler.
func (stc *ServiceTestConfig) HandleError(ctx context.Context, msg *proto.Request) error {
return errors.New("dummy error")
}
// runBrokerTest will publish messages to the broker to test pub/sub.
func (stc *ServiceTestConfig) runBrokerTest(name string, c client.Client, p, s int, errChan chan error) {
stc.msgCount = 0
testParallel(p, func() {
for z := 0; z < s; z++ {
msg := pb.BusMsg{Msg: "Hello from broker!"}
if err := c.Publish(context.Background(), c.NewMessage(testTopic, &msg)); err != nil {
errChan <- errors.Wrap(err, "failed to publish message to broker")
return
}
msg = pb.BusMsg{Msg: "Some message that will error"}
if err := c.Publish(context.Background(), c.NewMessage(errorTopic, &msg)); err == nil {
errChan <- errors.New("Publish is supposed to return an error, but got no error")
return
}
}
})
if stc.msgCount != s*p {
errChan <- fmt.Errorf("pub/sub does not work properly, invalid message count. Expected %d messaged, but received %d", s*p, stc.msgCount)
return
}
}
// runParStreamTest will start streaming, and send s messages parallel in p routines.
func (stc *ServiceTestConfig) runParStreamTest(name string, c client.Client, p, s int, errChan chan error) {
testParallel(p, func() {
// Create a client service
srv := pb.NewDebugService(name, c)
// Establish a connection to server over which we start streaming
bus, err := srv.MessageBus(context.Background())
if err != nil {
errChan <- errors.Wrap(err, "failed to connect to message bus")
return
}
// Start streaming requests
for z := 0; z < s; z++ {
if err := bus.Send(&pb.BusMsg{Msg: "Hack the world!"}); err != nil {
errChan <- errors.Wrap(err, "failed to send to stream")
return
}
msg, err := bus.Recv()
if err != nil {
errChan <- errors.Wrap(err, "failed to receive message from stream")
return
}
expected := "Request received!"
if msg.Msg != expected {
errChan <- fmt.Errorf("stream returned unexpected mesage. Expected '%s', but got '%s'", expected, msg.Msg)
return
}
}
})
}
// validate will make sure the provided test parameters are a legal combination.
func (stc *ServiceTestConfig) validate() error {
lp, lseq, lstr := len(stc.Parallel), len(stc.Sequential), len(stc.Streams)
if lp == 0 || (lseq == 0 && lstr == 0) {
return ErrNoTests
}
return nil
}
// runBench will create a service with the provided stc.NewService function,
// and run a benchmark on the test function.
func (stc *ServiceTestConfig) runBench(b *testing.B, name string, test testFunc) {
b.StopTimer()
// Channel to signal service has started
started := make(chan struct{})
// Context with cancel to stop the service
ctx, cancel := context.WithCancel(context.Background())
opts := []micro.Option{
micro.Context(ctx),
micro.AfterStart(func() error {
started <- struct{}{}
return nil
}),
}
// Create a new service per test
service, err := stc.NewService(name, opts...)
if err != nil {
b.Fatalf("failed to create service: %v", err)
}
// Register handler
if err := pb.RegisterDebugHandler(service.Server(), handler.NewHandler(service.Client())); err != nil {
b.Fatalf("failed to register handler during initial service setup: %v", err)
}
o := service.Options()
if err := o.Broker.Connect(); err != nil {
b.Fatal(err)
}
// a := new(testService)
if err := o.Server.Subscribe(o.Server.NewSubscriber(testTopic, stc.Handle)); err != nil {
b.Fatalf("[%s] Failed to register subscriber: %v", name, err)
}
if err := o.Server.Subscribe(o.Server.NewSubscriber(errorTopic, stc.HandleError)); err != nil {
b.Fatalf("[%s] Failed to register subscriber: %v", name, err)
}
b.Logf("# == [ Service ] ==================")
b.Logf("# * Server: %s", o.Server.String())
b.Logf("# * Client: %s", o.Client.String())
b.Logf("# * Transport: %s", o.Transport.String())
b.Logf("# * Broker: %s", o.Broker.String())
b.Logf("# * Registry: %s", o.Registry.String())
b.Logf("# * Auth: %s", o.Auth.String())
b.Logf("# * Cache: %s", o.Cache.String())
b.Logf("# * Runtime: %s", o.Runtime.String())
b.Logf("# ================================")
RunBenchmark(b, name, service, test, cancel, started)
}
// RunBenchmark will run benchmarks on a provided service.
//
// A test function can be provided that will be fun b.N times.
func RunBenchmark(b *testing.B, name string, service micro.Service, test testFunc,
cancel context.CancelFunc, started chan struct{}) {
b.StopTimer()
// Receive errors from routines on this channel
errChan := make(chan error, 1)
// Receive singal after service has shutdown
done := make(chan struct{})
// Start the server
go func() {
b.Logf("[%s] Starting server for benchmark", name)
if err := service.Run(); err != nil {
errChan <- errors.Wrapf(err, "[%s] Error occurred during service.Run", name)
}
done <- struct{}{}
}()
sigTerm := make(chan struct{})
// Benchmark routine
go func() {
defer func() {
b.StopTimer()
// Shutdown service
b.Logf("[%s] Shutting down", name)
cancel()
// Wait for service to be fully stopped
<-done
sigTerm <- struct{}{}
}()
// Wait for service to start
<-started
// Give the registry more time to setup
time.Sleep(time.Second)
b.Logf("[%s] Server started", name)
// Make a test call to warm the cache
for i := 0; i < 10; i++ {
if err := testRequest(context.Background(), service.Client(), name); err != nil {
errChan <- errors.Wrapf(err, "[%s] Failure during cache warmup testRequest", name)
}
}
// Check registration
services, err := service.Options().Registry.GetService(name)
if err != nil || len(services) == 0 {
errChan <- fmt.Errorf("service registration must have failed (%d services found), unable to get service: %w", len(services), err)
return
}
// Start benchmark
b.Logf("[%s] Starting benchtest", name)
b.ResetTimer()
b.StartTimer()
// Number of iterations
for i := 0; i < b.N; i++ {
test(name, service.Client(), errChan)
}
}()
// Wait for completion or catch any errors
select {
case err := <-errChan:
b.Fatal(err)
case <-sigTerm:
b.Logf("[%s] Completed benchmark", name)
}
}
// testParallel will run the test function in p parallel routines.
func testParallel(p int, test func()) {
// Waitgroup to wait for requests to finish
wg := sync.WaitGroup{}
// For concurrency
for j := 0; j < p; j++ {
wg.Add(1)
go func() {
defer wg.Done()
test()
}()
}
// Wait for test completion
wg.Wait()
}
// testRequest sends one test request.
// It calls the Debug.Health endpoint, and validates if the response returned
// contains the expected message.
func testRequest(ctx context.Context, c client.Client, name string) error {
req := c.NewRequest(
name,
"Debug.Health",
new(pb.HealthRequest),
)
rsp := new(pb.HealthResponse)
if err := c.Call(ctx, req, rsp); err != nil {
return err
}
if rsp.Status != "ok" {
return errors.New("service response: " + rsp.Status)
}
return nil
}