mirror of
https://github.com/go-kratos/kratos.git
synced 2025-01-24 03:46:37 +02:00
commit
5949c6debf
27
pkg/cache/redis/commandinfo_test.go
vendored
Normal file
27
pkg/cache/redis/commandinfo_test.go
vendored
Normal file
@ -0,0 +1,27 @@
|
||||
package redis
|
||||
|
||||
import "testing"
|
||||
|
||||
func TestLookupCommandInfo(t *testing.T) {
|
||||
for _, n := range []string{"watch", "WATCH", "wAtch"} {
|
||||
if LookupCommandInfo(n) == (CommandInfo{}) {
|
||||
t.Errorf("LookupCommandInfo(%q) = CommandInfo{}, expected non-zero value", n)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func benchmarkLookupCommandInfo(b *testing.B, names ...string) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
for _, c := range names {
|
||||
LookupCommandInfo(c)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkLookupCommandInfoCorrectCase(b *testing.B) {
|
||||
benchmarkLookupCommandInfo(b, "watch", "WATCH", "monitor", "MONITOR")
|
||||
}
|
||||
|
||||
func BenchmarkLookupCommandInfoMixedCase(b *testing.B) {
|
||||
benchmarkLookupCommandInfo(b, "wAtch", "WeTCH", "monItor", "MONiTOR")
|
||||
}
|
62
pkg/cache/redis/conn.go
vendored
62
pkg/cache/redis/conn.go
vendored
@ -30,6 +30,33 @@ import (
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
// Conn represents a connection to a Redis server.
|
||||
type Conn interface {
|
||||
// Close closes the connection.
|
||||
Close() error
|
||||
|
||||
// Err returns a non-nil value if the connection is broken. The returned
|
||||
// value is either the first non-nil value returned from the underlying
|
||||
// network connection or a protocol parsing error. Applications should
|
||||
// close broken connections.
|
||||
Err() error
|
||||
|
||||
// Do sends a command to the server and returns the received reply.
|
||||
Do(commandName string, args ...interface{}) (reply interface{}, err error)
|
||||
|
||||
// Send writes the command to the client's output buffer.
|
||||
Send(commandName string, args ...interface{}) error
|
||||
|
||||
// Flush flushes the output buffer to the Redis server.
|
||||
Flush() error
|
||||
|
||||
// Receive receives a single reply from the Redis server
|
||||
Receive() (reply interface{}, err error)
|
||||
|
||||
// WithContext returns Conn with the input ctx.
|
||||
WithContext(ctx context.Context) Conn
|
||||
}
|
||||
|
||||
// conn is the low-level implementation of Conn
|
||||
type conn struct {
|
||||
// Shared
|
||||
@ -38,6 +65,8 @@ type conn struct {
|
||||
err error
|
||||
conn net.Conn
|
||||
|
||||
ctx context.Context
|
||||
|
||||
// Read
|
||||
readTimeout time.Duration
|
||||
br *bufio.Reader
|
||||
@ -226,6 +255,7 @@ func NewConn(c *Config) (cn Conn, err error) {
|
||||
|
||||
func (c *conn) Close() error {
|
||||
c.mu.Lock()
|
||||
c.ctx = nil
|
||||
err := c.err
|
||||
if c.err == nil {
|
||||
c.err = errors.New("redigo: closed")
|
||||
@ -295,7 +325,7 @@ func (c *conn) writeFloat64(n float64) error {
|
||||
|
||||
func (c *conn) writeCommand(cmd string, args []interface{}) (err error) {
|
||||
if c.writeTimeout != 0 {
|
||||
c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
|
||||
c.conn.SetWriteDeadline(shrinkDeadline(c.ctx, c.writeTimeout))
|
||||
}
|
||||
c.writeLen('*', 1+len(args))
|
||||
err = c.writeString(cmd)
|
||||
@ -478,7 +508,7 @@ func (c *conn) Send(cmd string, args ...interface{}) (err error) {
|
||||
|
||||
func (c *conn) Flush() (err error) {
|
||||
if c.writeTimeout != 0 {
|
||||
c.conn.SetWriteDeadline(time.Now().Add(c.writeTimeout))
|
||||
c.conn.SetWriteDeadline(shrinkDeadline(c.ctx, c.writeTimeout))
|
||||
}
|
||||
if err = c.bw.Flush(); err != nil {
|
||||
c.fatal(err)
|
||||
@ -488,7 +518,7 @@ func (c *conn) Flush() (err error) {
|
||||
|
||||
func (c *conn) Receive() (reply interface{}, err error) {
|
||||
if c.readTimeout != 0 {
|
||||
c.conn.SetReadDeadline(time.Now().Add(c.readTimeout))
|
||||
c.conn.SetReadDeadline(shrinkDeadline(c.ctx, c.readTimeout))
|
||||
}
|
||||
if reply, err = c.readReply(); err != nil {
|
||||
return nil, c.fatal(err)
|
||||
@ -511,7 +541,7 @@ func (c *conn) Receive() (reply interface{}, err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) {
|
||||
func (c *conn) Do(cmd string, args ...interface{}) (reply interface{}, err error) {
|
||||
c.mu.Lock()
|
||||
pending := c.pending
|
||||
c.pending = 0
|
||||
@ -519,7 +549,7 @@ func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) {
|
||||
if cmd == "" && pending == 0 {
|
||||
return nil, nil
|
||||
}
|
||||
var err error
|
||||
|
||||
if cmd != "" {
|
||||
err = c.writeCommand(cmd, args)
|
||||
}
|
||||
@ -530,7 +560,7 @@ func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) {
|
||||
return nil, c.fatal(err)
|
||||
}
|
||||
if c.readTimeout != 0 {
|
||||
c.conn.SetReadDeadline(time.Now().Add(c.readTimeout))
|
||||
c.conn.SetReadDeadline(shrinkDeadline(c.ctx, c.readTimeout))
|
||||
}
|
||||
if cmd == "" {
|
||||
reply := make([]interface{}, pending)
|
||||
@ -548,7 +578,6 @@ func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) {
|
||||
return reply, nil
|
||||
}
|
||||
|
||||
var reply interface{}
|
||||
for i := 0; i <= pending; i++ {
|
||||
var e error
|
||||
if reply, e = c.readReply(); e != nil {
|
||||
@ -561,5 +590,20 @@ func (c *conn) Do(cmd string, args ...interface{}) (interface{}, error) {
|
||||
return reply, err
|
||||
}
|
||||
|
||||
// WithContext FIXME: implement WithContext
|
||||
func (c *conn) WithContext(ctx context.Context) Conn { return c }
|
||||
func (c *conn) copy() *conn {
|
||||
return &conn{
|
||||
pending: c.pending,
|
||||
err: c.err,
|
||||
conn: c.conn,
|
||||
bw: c.bw,
|
||||
br: c.br,
|
||||
readTimeout: c.readTimeout,
|
||||
writeTimeout: c.writeTimeout,
|
||||
}
|
||||
}
|
||||
|
||||
func (c *conn) WithContext(ctx context.Context) Conn {
|
||||
c2 := c.copy()
|
||||
c2.ctx = ctx
|
||||
return c2
|
||||
}
|
||||
|
670
pkg/cache/redis/conn_test.go
vendored
Normal file
670
pkg/cache/redis/conn_test.go
vendored
Normal file
@ -0,0 +1,670 @@
|
||||
// Copyright 2012 Gary Burd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"): you may
|
||||
// not use this file except in compliance with the License. You may obtain
|
||||
// a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
// License for the specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package redis
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"io"
|
||||
"math"
|
||||
"net"
|
||||
"os"
|
||||
"reflect"
|
||||
"strings"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
type tConn struct {
|
||||
io.Reader
|
||||
io.Writer
|
||||
}
|
||||
|
||||
func (*tConn) Close() error { return nil }
|
||||
func (*tConn) LocalAddr() net.Addr { return nil }
|
||||
func (*tConn) RemoteAddr() net.Addr { return nil }
|
||||
func (*tConn) SetDeadline(t time.Time) error { return nil }
|
||||
func (*tConn) SetReadDeadline(t time.Time) error { return nil }
|
||||
func (*tConn) SetWriteDeadline(t time.Time) error { return nil }
|
||||
|
||||
func dialTestConn(r io.Reader, w io.Writer) DialOption {
|
||||
return DialNetDial(func(net, addr string) (net.Conn, error) {
|
||||
return &tConn{Reader: r, Writer: w}, nil
|
||||
})
|
||||
}
|
||||
|
||||
var writeTests = []struct {
|
||||
args []interface{}
|
||||
expected string
|
||||
}{
|
||||
{
|
||||
[]interface{}{"SET", "key", "value"},
|
||||
"*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n",
|
||||
},
|
||||
{
|
||||
[]interface{}{"SET", "key", "value"},
|
||||
"*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$5\r\nvalue\r\n",
|
||||
},
|
||||
{
|
||||
[]interface{}{"SET", "key", byte(100)},
|
||||
"*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$3\r\n100\r\n",
|
||||
},
|
||||
{
|
||||
[]interface{}{"SET", "key", 100},
|
||||
"*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$3\r\n100\r\n",
|
||||
},
|
||||
{
|
||||
[]interface{}{"SET", "key", int64(math.MinInt64)},
|
||||
"*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$20\r\n-9223372036854775808\r\n",
|
||||
},
|
||||
{
|
||||
[]interface{}{"SET", "key", float64(1349673917.939762)},
|
||||
"*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$21\r\n1.349673917939762e+09\r\n",
|
||||
},
|
||||
{
|
||||
[]interface{}{"SET", "key", ""},
|
||||
"*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$0\r\n\r\n",
|
||||
},
|
||||
{
|
||||
[]interface{}{"SET", "key", nil},
|
||||
"*3\r\n$3\r\nSET\r\n$3\r\nkey\r\n$0\r\n\r\n",
|
||||
},
|
||||
{
|
||||
[]interface{}{"ECHO", true, false},
|
||||
"*3\r\n$4\r\nECHO\r\n$1\r\n1\r\n$1\r\n0\r\n",
|
||||
},
|
||||
}
|
||||
|
||||
func TestWrite(t *testing.T) {
|
||||
for _, tt := range writeTests {
|
||||
var buf bytes.Buffer
|
||||
c, _ := Dial("", "", dialTestConn(nil, &buf))
|
||||
err := c.Send(tt.args[0].(string), tt.args[1:]...)
|
||||
if err != nil {
|
||||
t.Errorf("Send(%v) returned error %v", tt.args, err)
|
||||
continue
|
||||
}
|
||||
c.Flush()
|
||||
actual := buf.String()
|
||||
if actual != tt.expected {
|
||||
t.Errorf("Send(%v) = %q, want %q", tt.args, actual, tt.expected)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var errorSentinel = &struct{}{}
|
||||
|
||||
var readTests = []struct {
|
||||
reply string
|
||||
expected interface{}
|
||||
}{
|
||||
{
|
||||
"+OK\r\n",
|
||||
"OK",
|
||||
},
|
||||
{
|
||||
"+PONG\r\n",
|
||||
"PONG",
|
||||
},
|
||||
{
|
||||
"@OK\r\n",
|
||||
errorSentinel,
|
||||
},
|
||||
{
|
||||
"$6\r\nfoobar\r\n",
|
||||
[]byte("foobar"),
|
||||
},
|
||||
{
|
||||
"$-1\r\n",
|
||||
nil,
|
||||
},
|
||||
{
|
||||
":1\r\n",
|
||||
int64(1),
|
||||
},
|
||||
{
|
||||
":-2\r\n",
|
||||
int64(-2),
|
||||
},
|
||||
{
|
||||
"*0\r\n",
|
||||
[]interface{}{},
|
||||
},
|
||||
{
|
||||
"*-1\r\n",
|
||||
nil,
|
||||
},
|
||||
{
|
||||
"*4\r\n$3\r\nfoo\r\n$3\r\nbar\r\n$5\r\nHello\r\n$5\r\nWorld\r\n",
|
||||
[]interface{}{[]byte("foo"), []byte("bar"), []byte("Hello"), []byte("World")},
|
||||
},
|
||||
{
|
||||
"*3\r\n$3\r\nfoo\r\n$-1\r\n$3\r\nbar\r\n",
|
||||
[]interface{}{[]byte("foo"), nil, []byte("bar")},
|
||||
},
|
||||
|
||||
{
|
||||
// "x" is not a valid length
|
||||
"$x\r\nfoobar\r\n",
|
||||
errorSentinel,
|
||||
},
|
||||
{
|
||||
// -2 is not a valid length
|
||||
"$-2\r\n",
|
||||
errorSentinel,
|
||||
},
|
||||
{
|
||||
// "x" is not a valid integer
|
||||
":x\r\n",
|
||||
errorSentinel,
|
||||
},
|
||||
{
|
||||
// missing \r\n following value
|
||||
"$6\r\nfoobar",
|
||||
errorSentinel,
|
||||
},
|
||||
{
|
||||
// short value
|
||||
"$6\r\nxx",
|
||||
errorSentinel,
|
||||
},
|
||||
{
|
||||
// long value
|
||||
"$6\r\nfoobarx\r\n",
|
||||
errorSentinel,
|
||||
},
|
||||
}
|
||||
|
||||
func TestRead(t *testing.T) {
|
||||
for _, tt := range readTests {
|
||||
c, _ := Dial("", "", dialTestConn(strings.NewReader(tt.reply), nil))
|
||||
actual, err := c.Receive()
|
||||
if tt.expected == errorSentinel {
|
||||
if err == nil {
|
||||
t.Errorf("Receive(%q) did not return expected error", tt.reply)
|
||||
}
|
||||
} else {
|
||||
if err != nil {
|
||||
t.Errorf("Receive(%q) returned error %v", tt.reply, err)
|
||||
continue
|
||||
}
|
||||
if !reflect.DeepEqual(actual, tt.expected) {
|
||||
t.Errorf("Receive(%q) = %v, want %v", tt.reply, actual, tt.expected)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var testCommands = []struct {
|
||||
args []interface{}
|
||||
expected interface{}
|
||||
}{
|
||||
{
|
||||
[]interface{}{"PING"},
|
||||
"PONG",
|
||||
},
|
||||
{
|
||||
[]interface{}{"SET", "foo", "bar"},
|
||||
"OK",
|
||||
},
|
||||
{
|
||||
[]interface{}{"GET", "foo"},
|
||||
[]byte("bar"),
|
||||
},
|
||||
{
|
||||
[]interface{}{"GET", "nokey"},
|
||||
nil,
|
||||
},
|
||||
{
|
||||
[]interface{}{"MGET", "nokey", "foo"},
|
||||
[]interface{}{nil, []byte("bar")},
|
||||
},
|
||||
{
|
||||
[]interface{}{"INCR", "mycounter"},
|
||||
int64(1),
|
||||
},
|
||||
{
|
||||
[]interface{}{"LPUSH", "mylist", "foo"},
|
||||
int64(1),
|
||||
},
|
||||
{
|
||||
[]interface{}{"LPUSH", "mylist", "bar"},
|
||||
int64(2),
|
||||
},
|
||||
{
|
||||
[]interface{}{"LRANGE", "mylist", 0, -1},
|
||||
[]interface{}{[]byte("bar"), []byte("foo")},
|
||||
},
|
||||
{
|
||||
[]interface{}{"MULTI"},
|
||||
"OK",
|
||||
},
|
||||
{
|
||||
[]interface{}{"LRANGE", "mylist", 0, -1},
|
||||
"QUEUED",
|
||||
},
|
||||
{
|
||||
[]interface{}{"PING"},
|
||||
"QUEUED",
|
||||
},
|
||||
{
|
||||
[]interface{}{"EXEC"},
|
||||
[]interface{}{
|
||||
[]interface{}{[]byte("bar"), []byte("foo")},
|
||||
"PONG",
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
func TestDoCommands(t *testing.T) {
|
||||
c, err := DialDefaultServer()
|
||||
if err != nil {
|
||||
t.Fatalf("error connection to database, %v", err)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
for _, cmd := range testCommands {
|
||||
actual, err := c.Do(cmd.args[0].(string), cmd.args[1:]...)
|
||||
if err != nil {
|
||||
t.Errorf("Do(%v) returned error %v", cmd.args, err)
|
||||
continue
|
||||
}
|
||||
if !reflect.DeepEqual(actual, cmd.expected) {
|
||||
t.Errorf("Do(%v) = %v, want %v", cmd.args, actual, cmd.expected)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestPipelineCommands(t *testing.T) {
|
||||
c, err := DialDefaultServer()
|
||||
if err != nil {
|
||||
t.Fatalf("error connection to database, %v", err)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
for _, cmd := range testCommands {
|
||||
if err := c.Send(cmd.args[0].(string), cmd.args[1:]...); err != nil {
|
||||
t.Fatalf("Send(%v) returned error %v", cmd.args, err)
|
||||
}
|
||||
}
|
||||
if err := c.Flush(); err != nil {
|
||||
t.Errorf("Flush() returned error %v", err)
|
||||
}
|
||||
for _, cmd := range testCommands {
|
||||
actual, err := c.Receive()
|
||||
if err != nil {
|
||||
t.Fatalf("Receive(%v) returned error %v", cmd.args, err)
|
||||
}
|
||||
if !reflect.DeepEqual(actual, cmd.expected) {
|
||||
t.Errorf("Receive(%v) = %v, want %v", cmd.args, actual, cmd.expected)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBlankCommmand(t *testing.T) {
|
||||
c, err := DialDefaultServer()
|
||||
if err != nil {
|
||||
t.Fatalf("error connection to database, %v", err)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
for _, cmd := range testCommands {
|
||||
if err = c.Send(cmd.args[0].(string), cmd.args[1:]...); err != nil {
|
||||
t.Fatalf("Send(%v) returned error %v", cmd.args, err)
|
||||
}
|
||||
}
|
||||
reply, err := Values(c.Do(""))
|
||||
if err != nil {
|
||||
t.Fatalf("Do() returned error %v", err)
|
||||
}
|
||||
if len(reply) != len(testCommands) {
|
||||
t.Fatalf("len(reply)=%d, want %d", len(reply), len(testCommands))
|
||||
}
|
||||
for i, cmd := range testCommands {
|
||||
actual := reply[i]
|
||||
if !reflect.DeepEqual(actual, cmd.expected) {
|
||||
t.Errorf("Receive(%v) = %v, want %v", cmd.args, actual, cmd.expected)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestRecvBeforeSend(t *testing.T) {
|
||||
c, err := DialDefaultServer()
|
||||
if err != nil {
|
||||
t.Fatalf("error connection to database, %v", err)
|
||||
}
|
||||
defer c.Close()
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
c.Receive()
|
||||
close(done)
|
||||
}()
|
||||
time.Sleep(time.Millisecond)
|
||||
c.Send("PING")
|
||||
c.Flush()
|
||||
<-done
|
||||
_, err = c.Do("")
|
||||
if err != nil {
|
||||
t.Fatalf("error=%v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestError(t *testing.T) {
|
||||
c, err := DialDefaultServer()
|
||||
if err != nil {
|
||||
t.Fatalf("error connection to database, %v", err)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
c.Do("SET", "key", "val")
|
||||
_, err = c.Do("HSET", "key", "fld", "val")
|
||||
if err == nil {
|
||||
t.Errorf("Expected err for HSET on string key.")
|
||||
}
|
||||
if c.Err() != nil {
|
||||
t.Errorf("Conn has Err()=%v, expect nil", c.Err())
|
||||
}
|
||||
_, err = c.Do("SET", "key", "val")
|
||||
if err != nil {
|
||||
t.Errorf("Do(SET, key, val) returned error %v, expected nil.", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestReadTimeout(t *testing.T) {
|
||||
l, err := net.Listen("tcp", "127.0.0.1:0")
|
||||
if err != nil {
|
||||
t.Fatalf("net.Listen returned %v", err)
|
||||
}
|
||||
defer l.Close()
|
||||
|
||||
go func() {
|
||||
for {
|
||||
c, err1 := l.Accept()
|
||||
if err1 != nil {
|
||||
return
|
||||
}
|
||||
go func() {
|
||||
time.Sleep(time.Second)
|
||||
c.Write([]byte("+OK\r\n"))
|
||||
c.Close()
|
||||
}()
|
||||
}
|
||||
}()
|
||||
|
||||
// Do
|
||||
|
||||
c1, err := Dial(l.Addr().Network(), l.Addr().String(), DialReadTimeout(time.Millisecond))
|
||||
if err != nil {
|
||||
t.Fatalf("Dial returned %v", err)
|
||||
}
|
||||
defer c1.Close()
|
||||
|
||||
_, err = c1.Do("PING")
|
||||
if err == nil {
|
||||
t.Fatalf("c1.Do() returned nil, expect error")
|
||||
}
|
||||
if c1.Err() == nil {
|
||||
t.Fatalf("c1.Err() = nil, expect error")
|
||||
}
|
||||
|
||||
// Send/Flush/Receive
|
||||
|
||||
c2, err := Dial(l.Addr().Network(), l.Addr().String(), DialReadTimeout(time.Millisecond))
|
||||
if err != nil {
|
||||
t.Fatalf("Dial returned %v", err)
|
||||
}
|
||||
defer c2.Close()
|
||||
|
||||
c2.Send("PING")
|
||||
c2.Flush()
|
||||
_, err = c2.Receive()
|
||||
if err == nil {
|
||||
t.Fatalf("c2.Receive() returned nil, expect error")
|
||||
}
|
||||
if c2.Err() == nil {
|
||||
t.Fatalf("c2.Err() = nil, expect error")
|
||||
}
|
||||
}
|
||||
|
||||
var dialErrors = []struct {
|
||||
rawurl string
|
||||
expectedError string
|
||||
}{
|
||||
{
|
||||
"localhost",
|
||||
"invalid redis URL scheme",
|
||||
},
|
||||
// The error message for invalid hosts is diffferent in different
|
||||
// versions of Go, so just check that there is an error message.
|
||||
{
|
||||
"redis://weird url",
|
||||
"",
|
||||
},
|
||||
{
|
||||
"redis://foo:bar:baz",
|
||||
"",
|
||||
},
|
||||
{
|
||||
"http://www.google.com",
|
||||
"invalid redis URL scheme: http",
|
||||
},
|
||||
{
|
||||
"redis://localhost:6379/abc123",
|
||||
"invalid database: abc123",
|
||||
},
|
||||
}
|
||||
|
||||
func TestDialURLErrors(t *testing.T) {
|
||||
for _, d := range dialErrors {
|
||||
_, err := DialURL(d.rawurl)
|
||||
if err == nil || !strings.Contains(err.Error(), d.expectedError) {
|
||||
t.Errorf("DialURL did not return expected error (expected %v to contain %s)", err, d.expectedError)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestDialURLPort(t *testing.T) {
|
||||
checkPort := func(network, address string) (net.Conn, error) {
|
||||
if address != "localhost:6379" {
|
||||
t.Errorf("DialURL did not set port to 6379 by default (got %v)", address)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
_, err := DialURL("redis://localhost", DialNetDial(checkPort))
|
||||
if err != nil {
|
||||
t.Error("dial error:", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDialURLHost(t *testing.T) {
|
||||
checkHost := func(network, address string) (net.Conn, error) {
|
||||
if address != "localhost:6379" {
|
||||
t.Errorf("DialURL did not set host to localhost by default (got %v)", address)
|
||||
}
|
||||
return nil, nil
|
||||
}
|
||||
_, err := DialURL("redis://:6379", DialNetDial(checkHost))
|
||||
if err != nil {
|
||||
t.Error("dial error:", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDialURLPassword(t *testing.T) {
|
||||
var buf bytes.Buffer
|
||||
_, err := DialURL("redis://x:abc123@localhost", dialTestConn(strings.NewReader("+OK\r\n"), &buf))
|
||||
if err != nil {
|
||||
t.Error("dial error:", err)
|
||||
}
|
||||
expected := "*2\r\n$4\r\nAUTH\r\n$6\r\nabc123\r\n"
|
||||
actual := buf.String()
|
||||
if actual != expected {
|
||||
t.Errorf("commands = %q, want %q", actual, expected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestDialURLDatabase(t *testing.T) {
|
||||
var buf bytes.Buffer
|
||||
_, err := DialURL("redis://localhost/3", dialTestConn(strings.NewReader("+OK\r\n"), &buf))
|
||||
if err != nil {
|
||||
t.Error("dial error:", err)
|
||||
}
|
||||
expected := "*2\r\n$6\r\nSELECT\r\n$1\r\n3\r\n"
|
||||
actual := buf.String()
|
||||
if actual != expected {
|
||||
t.Errorf("commands = %q, want %q", actual, expected)
|
||||
}
|
||||
}
|
||||
|
||||
// Connect to local instance of Redis running on the default port.
|
||||
func ExampleDial() {
|
||||
c, err := Dial("tcp", ":6379")
|
||||
if err != nil {
|
||||
// handle error
|
||||
}
|
||||
defer c.Close()
|
||||
}
|
||||
|
||||
// Connect to remote instance of Redis using a URL.
|
||||
func ExampleDialURL() {
|
||||
c, err := DialURL(os.Getenv("REDIS_URL"))
|
||||
if err != nil {
|
||||
// handle connection error
|
||||
}
|
||||
defer c.Close()
|
||||
}
|
||||
|
||||
// TextExecError tests handling of errors in a transaction. See
|
||||
// http://io/topics/transactions for information on how Redis handles
|
||||
// errors in a transaction.
|
||||
func TestExecError(t *testing.T) {
|
||||
c, err := DialDefaultServer()
|
||||
if err != nil {
|
||||
t.Fatalf("error connection to database, %v", err)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
// Execute commands that fail before EXEC is called.
|
||||
|
||||
c.Do("DEL", "k0")
|
||||
c.Do("ZADD", "k0", 0, 0)
|
||||
c.Send("MULTI")
|
||||
c.Send("NOTACOMMAND", "k0", 0, 0)
|
||||
c.Send("ZINCRBY", "k0", 0, 0)
|
||||
v, err := c.Do("EXEC")
|
||||
if err == nil {
|
||||
t.Fatalf("EXEC returned values %v, expected error", v)
|
||||
}
|
||||
|
||||
// Execute commands that fail after EXEC is called. The first command
|
||||
// returns an error.
|
||||
|
||||
c.Do("DEL", "k1")
|
||||
c.Do("ZADD", "k1", 0, 0)
|
||||
c.Send("MULTI")
|
||||
c.Send("HSET", "k1", 0, 0)
|
||||
c.Send("ZINCRBY", "k1", 0, 0)
|
||||
v, err = c.Do("EXEC")
|
||||
if err != nil {
|
||||
t.Fatalf("EXEC returned error %v", err)
|
||||
}
|
||||
|
||||
vs, err := Values(v, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("Values(v) returned error %v", err)
|
||||
}
|
||||
|
||||
if len(vs) != 2 {
|
||||
t.Fatalf("len(vs) == %d, want 2", len(vs))
|
||||
}
|
||||
|
||||
if _, ok := vs[0].(error); !ok {
|
||||
t.Fatalf("first result is type %T, expected error", vs[0])
|
||||
}
|
||||
|
||||
if _, ok := vs[1].([]byte); !ok {
|
||||
t.Fatalf("second result is type %T, expected []byte", vs[1])
|
||||
}
|
||||
|
||||
// Execute commands that fail after EXEC is called. The second command
|
||||
// returns an error.
|
||||
|
||||
c.Do("ZADD", "k2", 0, 0)
|
||||
c.Send("MULTI")
|
||||
c.Send("ZINCRBY", "k2", 0, 0)
|
||||
c.Send("HSET", "k2", 0, 0)
|
||||
v, err = c.Do("EXEC")
|
||||
if err != nil {
|
||||
t.Fatalf("EXEC returned error %v", err)
|
||||
}
|
||||
|
||||
vs, err = Values(v, nil)
|
||||
if err != nil {
|
||||
t.Fatalf("Values(v) returned error %v", err)
|
||||
}
|
||||
|
||||
if len(vs) != 2 {
|
||||
t.Fatalf("len(vs) == %d, want 2", len(vs))
|
||||
}
|
||||
|
||||
if _, ok := vs[0].([]byte); !ok {
|
||||
t.Fatalf("first result is type %T, expected []byte", vs[0])
|
||||
}
|
||||
|
||||
if _, ok := vs[1].(error); !ok {
|
||||
t.Fatalf("second result is type %T, expected error", vs[2])
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkDoEmpty(b *testing.B) {
|
||||
c, err := DialDefaultServer()
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
defer c.Close()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
if _, err := c.Do(""); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkDoPing(b *testing.B) {
|
||||
c, err := DialDefaultServer()
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
defer c.Close()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
if _, err := c.Do("PING"); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkConn(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
c, err := DialDefaultServer()
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
c2 := c.WithContext(context.TODO())
|
||||
if _, err := c2.Do("PING"); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
c2.Close()
|
||||
}
|
||||
}
|
18
pkg/cache/redis/log.go
vendored
18
pkg/cache/redis/log.go
vendored
@ -16,16 +16,18 @@ package redis
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"fmt"
|
||||
"log"
|
||||
)
|
||||
|
||||
// NewLoggingConn returns a logging wrapper around a connection.
|
||||
// ATTENTION: ONLY use loggingConn in developing, DO NOT use this in production.
|
||||
func NewLoggingConn(conn Conn, logger *log.Logger, prefix string) Conn {
|
||||
if prefix != "" {
|
||||
prefix = prefix + "."
|
||||
}
|
||||
return &loggingConn{conn, logger, prefix}
|
||||
return &loggingConn{Conn: conn, logger: logger, prefix: prefix}
|
||||
}
|
||||
|
||||
type loggingConn struct {
|
||||
@ -98,16 +100,16 @@ func (c *loggingConn) print(method, commandName string, args []interface{}, repl
|
||||
c.logger.Output(3, buf.String())
|
||||
}
|
||||
|
||||
func (c *loggingConn) Do(commandName string, args ...interface{}) (interface{}, error) {
|
||||
reply, err := c.Conn.Do(commandName, args...)
|
||||
func (c *loggingConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
|
||||
reply, err = c.Conn.Do(commandName, args...)
|
||||
c.print("Do", commandName, args, reply, err)
|
||||
return reply, err
|
||||
}
|
||||
|
||||
func (c *loggingConn) Send(commandName string, args ...interface{}) error {
|
||||
err := c.Conn.Send(commandName, args...)
|
||||
func (c *loggingConn) Send(commandName string, args ...interface{}) (err error) {
|
||||
err = c.Conn.Send(commandName, args...)
|
||||
c.print("Send", commandName, args, nil, err)
|
||||
return err
|
||||
return
|
||||
}
|
||||
|
||||
func (c *loggingConn) Receive() (interface{}, error) {
|
||||
@ -115,3 +117,7 @@ func (c *loggingConn) Receive() (interface{}, error) {
|
||||
c.print("Receive", "", nil, reply, err)
|
||||
return reply, err
|
||||
}
|
||||
|
||||
func (c *loggingConn) WithContext(ctx context.Context) Conn {
|
||||
return c
|
||||
}
|
||||
|
67
pkg/cache/redis/main_test.go
vendored
Normal file
67
pkg/cache/redis/main_test.go
vendored
Normal file
@ -0,0 +1,67 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"os"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/bilibili/kratos/pkg/container/pool"
|
||||
"github.com/bilibili/kratos/pkg/testing/lich"
|
||||
xtime "github.com/bilibili/kratos/pkg/time"
|
||||
)
|
||||
|
||||
var (
|
||||
testRedisAddr string
|
||||
testPool *Pool
|
||||
testConfig *Config
|
||||
)
|
||||
|
||||
func setupTestConfig(addr string) {
|
||||
c := getTestConfig(addr)
|
||||
c.Config = &pool.Config{
|
||||
Active: 20,
|
||||
Idle: 2,
|
||||
IdleTimeout: xtime.Duration(90 * time.Second),
|
||||
}
|
||||
testConfig = c
|
||||
}
|
||||
|
||||
func getTestConfig(addr string) *Config {
|
||||
return &Config{
|
||||
Name: "test",
|
||||
Proto: "tcp",
|
||||
Addr: addr,
|
||||
DialTimeout: xtime.Duration(time.Second),
|
||||
ReadTimeout: xtime.Duration(time.Second),
|
||||
WriteTimeout: xtime.Duration(time.Second),
|
||||
}
|
||||
}
|
||||
|
||||
func setupTestPool() {
|
||||
testPool = NewPool(testConfig)
|
||||
}
|
||||
|
||||
// DialDefaultServer starts the test server if not already started and dials a
|
||||
// connection to the server.
|
||||
func DialDefaultServer() (Conn, error) {
|
||||
c, err := Dial("tcp", testRedisAddr, DialReadTimeout(1*time.Second), DialWriteTimeout(1*time.Second))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
c.Do("FLUSHDB")
|
||||
return c, nil
|
||||
}
|
||||
|
||||
func TestMain(m *testing.M) {
|
||||
flag.Set("f", "./test/docker-compose.yaml")
|
||||
if err := lich.Setup(); err != nil {
|
||||
panic(err)
|
||||
}
|
||||
defer lich.Teardown()
|
||||
testRedisAddr = "localhost:6379"
|
||||
setupTestConfig(testRedisAddr)
|
||||
setupTestPool()
|
||||
ret := m.Run()
|
||||
os.Exit(ret)
|
||||
}
|
4
pkg/cache/redis/metrics.go
vendored
4
pkg/cache/redis/metrics.go
vendored
@ -1,6 +1,8 @@
|
||||
package redis
|
||||
|
||||
import "github.com/bilibili/kratos/pkg/stat/metric"
|
||||
import (
|
||||
"github.com/bilibili/kratos/pkg/stat/metric"
|
||||
)
|
||||
|
||||
const namespace = "redis_client"
|
||||
|
||||
|
4
pkg/cache/redis/mock.go
vendored
4
pkg/cache/redis/mock.go
vendored
@ -1,8 +1,6 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
)
|
||||
import "context"
|
||||
|
||||
// MockErr for unit test.
|
||||
type MockErr struct {
|
||||
|
85
pkg/cache/redis/pipeline.go
vendored
Normal file
85
pkg/cache/redis/pipeline.go
vendored
Normal file
@ -0,0 +1,85 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
)
|
||||
|
||||
type Pipeliner interface {
|
||||
// Send writes the command to the client's output buffer.
|
||||
Send(commandName string, args ...interface{})
|
||||
|
||||
// Exec executes all commands and get replies.
|
||||
Exec(ctx context.Context) (rs *Replies, err error)
|
||||
}
|
||||
|
||||
var (
|
||||
ErrNoReply = errors.New("redis: no reply in result set")
|
||||
)
|
||||
|
||||
type pipeliner struct {
|
||||
pool *Pool
|
||||
cmds []*cmd
|
||||
}
|
||||
|
||||
type Replies struct {
|
||||
replies []*reply
|
||||
}
|
||||
|
||||
type reply struct {
|
||||
reply interface{}
|
||||
err error
|
||||
}
|
||||
|
||||
func (rs *Replies) Next() bool {
|
||||
return len(rs.replies) > 0
|
||||
}
|
||||
|
||||
func (rs *Replies) Scan() (reply interface{}, err error) {
|
||||
if !rs.Next() {
|
||||
return nil, ErrNoReply
|
||||
}
|
||||
reply, err = rs.replies[0].reply, rs.replies[0].err
|
||||
rs.replies = rs.replies[1:]
|
||||
return
|
||||
}
|
||||
|
||||
type cmd struct {
|
||||
commandName string
|
||||
args []interface{}
|
||||
}
|
||||
|
||||
func (p *pipeliner) Send(commandName string, args ...interface{}) {
|
||||
p.cmds = append(p.cmds, &cmd{commandName: commandName, args: args})
|
||||
return
|
||||
}
|
||||
|
||||
func (p *pipeliner) Exec(ctx context.Context) (rs *Replies, err error) {
|
||||
n := len(p.cmds)
|
||||
if n == 0 {
|
||||
return &Replies{}, nil
|
||||
}
|
||||
c := p.pool.Get(ctx)
|
||||
defer c.Close()
|
||||
for len(p.cmds) > 0 {
|
||||
cmd := p.cmds[0]
|
||||
p.cmds = p.cmds[1:]
|
||||
if err := c.Send(cmd.commandName, cmd.args...); err != nil {
|
||||
p.cmds = p.cmds[:0]
|
||||
return nil, err
|
||||
}
|
||||
}
|
||||
if err = c.Flush(); err != nil {
|
||||
p.cmds = p.cmds[:0]
|
||||
return nil, err
|
||||
}
|
||||
rps := make([]*reply, 0, n)
|
||||
for i := 0; i < n; i++ {
|
||||
rp, err := c.Receive()
|
||||
rps = append(rps, &reply{reply: rp, err: err})
|
||||
}
|
||||
rs = &Replies{
|
||||
replies: rps,
|
||||
}
|
||||
return
|
||||
}
|
96
pkg/cache/redis/pipeline_test.go
vendored
Normal file
96
pkg/cache/redis/pipeline_test.go
vendored
Normal file
@ -0,0 +1,96 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/bilibili/kratos/pkg/container/pool"
|
||||
xtime "github.com/bilibili/kratos/pkg/time"
|
||||
)
|
||||
|
||||
func TestRedis_Pipeline(t *testing.T) {
|
||||
conf := &Config{
|
||||
Name: "test",
|
||||
Proto: "tcp",
|
||||
Addr: testRedisAddr,
|
||||
DialTimeout: xtime.Duration(1 * time.Second),
|
||||
ReadTimeout: xtime.Duration(1 * time.Second),
|
||||
WriteTimeout: xtime.Duration(1 * time.Second),
|
||||
}
|
||||
conf.Config = &pool.Config{
|
||||
Active: 10,
|
||||
Idle: 2,
|
||||
IdleTimeout: xtime.Duration(90 * time.Second),
|
||||
}
|
||||
|
||||
r := NewRedis(conf)
|
||||
r.Do(context.TODO(), "FLUSHDB")
|
||||
|
||||
p := r.Pipeline()
|
||||
|
||||
for _, cmd := range testCommands {
|
||||
p.Send(cmd.args[0].(string), cmd.args[1:]...)
|
||||
}
|
||||
|
||||
replies, err := p.Exec(context.TODO())
|
||||
|
||||
i := 0
|
||||
for replies.Next() {
|
||||
cmd := testCommands[i]
|
||||
actual, err := replies.Scan()
|
||||
if err != nil {
|
||||
t.Fatalf("Receive(%v) returned error %v", cmd.args, err)
|
||||
}
|
||||
if !reflect.DeepEqual(actual, cmd.expected) {
|
||||
t.Errorf("Receive(%v) = %v, want %v", cmd.args, actual, cmd.expected)
|
||||
}
|
||||
i++
|
||||
}
|
||||
err = r.Close()
|
||||
if err != nil {
|
||||
t.Errorf("Close() error %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func ExamplePipeliner() {
|
||||
r := NewRedis(testConfig)
|
||||
defer r.Close()
|
||||
|
||||
pip := r.Pipeline()
|
||||
pip.Send("SET", "hello", "world")
|
||||
pip.Send("GET", "hello")
|
||||
replies, err := pip.Exec(context.TODO())
|
||||
if err != nil {
|
||||
fmt.Printf("%#v\n", err)
|
||||
}
|
||||
for replies.Next() {
|
||||
s, err := String(replies.Scan())
|
||||
if err != nil {
|
||||
fmt.Printf("err %#v\n", err)
|
||||
}
|
||||
fmt.Printf("%#v\n", s)
|
||||
}
|
||||
// Output:
|
||||
// "OK"
|
||||
// "world"
|
||||
}
|
||||
|
||||
func BenchmarkRedisPipelineExec(b *testing.B) {
|
||||
r := NewRedis(testConfig)
|
||||
defer r.Close()
|
||||
|
||||
r.Do(context.TODO(), "SET", "abcde", "fghiasdfasdf")
|
||||
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
p := r.Pipeline()
|
||||
p.Send("GET", "abcde")
|
||||
_, err := p.Exec(context.TODO())
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
31
pkg/cache/redis/pool.go
vendored
31
pkg/cache/redis/pool.go
vendored
@ -45,24 +45,14 @@ type Pool struct {
|
||||
statfunc func(name, addr, cmd string, t time.Time, err error) func()
|
||||
}
|
||||
|
||||
// Config client settings.
|
||||
type Config struct {
|
||||
*pool.Config
|
||||
|
||||
Name string // redis name, for trace
|
||||
Proto string
|
||||
Addr string
|
||||
Auth string
|
||||
DialTimeout xtime.Duration
|
||||
ReadTimeout xtime.Duration
|
||||
WriteTimeout xtime.Duration
|
||||
}
|
||||
|
||||
// NewPool creates a new pool.
|
||||
func NewPool(c *Config, options ...DialOption) (p *Pool) {
|
||||
if c.DialTimeout <= 0 || c.ReadTimeout <= 0 || c.WriteTimeout <= 0 {
|
||||
panic("must config redis timeout")
|
||||
}
|
||||
if c.SlowLog <= 0 {
|
||||
c.SlowLog = xtime.Duration(250 * time.Millisecond)
|
||||
}
|
||||
ops := []DialOption{
|
||||
DialConnectTimeout(time.Duration(c.DialTimeout)),
|
||||
DialReadTimeout(time.Duration(c.ReadTimeout)),
|
||||
@ -71,12 +61,18 @@ func NewPool(c *Config, options ...DialOption) (p *Pool) {
|
||||
}
|
||||
ops = append(ops, options...)
|
||||
p1 := pool.NewSlice(c.Config)
|
||||
|
||||
// new pool
|
||||
p1.New = func(ctx context.Context) (io.Closer, error) {
|
||||
conn, err := Dial(c.Proto, c.Addr, ops...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &traceConn{Conn: conn, connTags: []trace.Tag{trace.TagString(trace.TagPeerAddress, c.Addr)}}, nil
|
||||
return &traceConn{
|
||||
Conn: conn,
|
||||
connTags: []trace.Tag{trace.TagString(trace.TagPeerAddress, c.Addr)},
|
||||
slowLogThreshold: time.Duration(c.SlowLog),
|
||||
}, nil
|
||||
}
|
||||
p = &Pool{Slice: p1, c: c, statfunc: pstat}
|
||||
return
|
||||
@ -93,7 +89,7 @@ func (p *Pool) Get(ctx context.Context) Conn {
|
||||
return errorConnection{err}
|
||||
}
|
||||
c1, _ := c.(Conn)
|
||||
return &pooledConnection{p: p, c: c1.WithContext(ctx), ctx: ctx, now: beginTime}
|
||||
return &pooledConnection{p: p, c: c1.WithContext(ctx), rc: c1, now: beginTime}
|
||||
}
|
||||
|
||||
// Close releases the resources used by the pool.
|
||||
@ -103,12 +99,12 @@ func (p *Pool) Close() error {
|
||||
|
||||
type pooledConnection struct {
|
||||
p *Pool
|
||||
rc Conn
|
||||
c Conn
|
||||
state int
|
||||
|
||||
now time.Time
|
||||
cmds []string
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
var (
|
||||
@ -180,7 +176,7 @@ func (pc *pooledConnection) Close() error {
|
||||
}
|
||||
}
|
||||
_, err := c.Do("")
|
||||
pc.p.Slice.Put(context.Background(), c, pc.state != 0 || c.Err() != nil)
|
||||
pc.p.Slice.Put(context.Background(), pc.rc, pc.state != 0 || c.Err() != nil)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -227,7 +223,6 @@ func (pc *pooledConnection) Receive() (reply interface{}, err error) {
|
||||
}
|
||||
|
||||
func (pc *pooledConnection) WithContext(ctx context.Context) Conn {
|
||||
pc.ctx = ctx
|
||||
return pc
|
||||
}
|
||||
|
||||
|
540
pkg/cache/redis/pool_test.go
vendored
Normal file
540
pkg/cache/redis/pool_test.go
vendored
Normal file
@ -0,0 +1,540 @@
|
||||
// Copyright 2011 Gary Burd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"): you may
|
||||
// not use this file except in compliance with the License. You may obtain
|
||||
// a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
// License for the specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"io"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/bilibili/kratos/pkg/container/pool"
|
||||
)
|
||||
|
||||
type poolTestConn struct {
|
||||
d *poolDialer
|
||||
err error
|
||||
c Conn
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
func (c *poolTestConn) Flush() error {
|
||||
return c.c.Flush()
|
||||
}
|
||||
|
||||
func (c *poolTestConn) Receive() (reply interface{}, err error) {
|
||||
return c.c.Receive()
|
||||
}
|
||||
|
||||
func (c *poolTestConn) WithContext(ctx context.Context) Conn {
|
||||
c.c.WithContext(ctx)
|
||||
c.ctx = ctx
|
||||
return c
|
||||
}
|
||||
|
||||
func (c *poolTestConn) Close() error {
|
||||
c.d.mu.Lock()
|
||||
c.d.open--
|
||||
c.d.mu.Unlock()
|
||||
return c.c.Close()
|
||||
}
|
||||
|
||||
func (c *poolTestConn) Err() error { return c.err }
|
||||
|
||||
func (c *poolTestConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
|
||||
if commandName == "ERR" {
|
||||
c.err = args[0].(error)
|
||||
commandName = "PING"
|
||||
}
|
||||
if commandName != "" {
|
||||
c.d.commands = append(c.d.commands, commandName)
|
||||
}
|
||||
return c.c.Do(commandName, args...)
|
||||
}
|
||||
|
||||
func (c *poolTestConn) Send(commandName string, args ...interface{}) error {
|
||||
c.d.commands = append(c.d.commands, commandName)
|
||||
return c.c.Send(commandName, args...)
|
||||
}
|
||||
|
||||
type poolDialer struct {
|
||||
mu sync.Mutex
|
||||
t *testing.T
|
||||
dialed int
|
||||
open int
|
||||
commands []string
|
||||
dialErr error
|
||||
}
|
||||
|
||||
func (d *poolDialer) dial() (Conn, error) {
|
||||
d.mu.Lock()
|
||||
d.dialed += 1
|
||||
dialErr := d.dialErr
|
||||
d.mu.Unlock()
|
||||
if dialErr != nil {
|
||||
return nil, d.dialErr
|
||||
}
|
||||
c, err := DialDefaultServer()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
d.mu.Lock()
|
||||
d.open += 1
|
||||
d.mu.Unlock()
|
||||
return &poolTestConn{d: d, c: c}, nil
|
||||
}
|
||||
|
||||
func (d *poolDialer) check(message string, p *Pool, dialed, open int) {
|
||||
d.mu.Lock()
|
||||
if d.dialed != dialed {
|
||||
d.t.Errorf("%s: dialed=%d, want %d", message, d.dialed, dialed)
|
||||
}
|
||||
if d.open != open {
|
||||
d.t.Errorf("%s: open=%d, want %d", message, d.open, open)
|
||||
}
|
||||
// if active := p.ActiveCount(); active != open {
|
||||
// d.t.Errorf("%s: active=%d, want %d", message, active, open)
|
||||
// }
|
||||
d.mu.Unlock()
|
||||
}
|
||||
|
||||
func TestPoolReuse(t *testing.T) {
|
||||
d := poolDialer{t: t}
|
||||
p := NewPool(testConfig)
|
||||
p.Slice.New = func(ctx context.Context) (io.Closer, error) {
|
||||
return d.dial()
|
||||
}
|
||||
var err error
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
c1 := p.Get(context.TODO())
|
||||
c1.Do("PING")
|
||||
c2 := p.Get(context.TODO())
|
||||
c2.Do("PING")
|
||||
c1.Close()
|
||||
c2.Close()
|
||||
|
||||
}
|
||||
|
||||
d.check("before close", p, 2, 2)
|
||||
err = p.Close()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
d.check("after close", p, 2, 0)
|
||||
}
|
||||
|
||||
func TestPoolMaxIdle(t *testing.T) {
|
||||
d := poolDialer{t: t}
|
||||
p := NewPool(testConfig)
|
||||
p.Slice.New = func(ctx context.Context) (io.Closer, error) {
|
||||
return d.dial()
|
||||
}
|
||||
defer p.Close()
|
||||
|
||||
for i := 0; i < 10; i++ {
|
||||
c1 := p.Get(context.TODO())
|
||||
c1.Do("PING")
|
||||
c2 := p.Get(context.TODO())
|
||||
c2.Do("PING")
|
||||
c3 := p.Get(context.TODO())
|
||||
c3.Do("PING")
|
||||
c1.Close()
|
||||
c2.Close()
|
||||
c3.Close()
|
||||
}
|
||||
d.check("before close", p, 12, 2)
|
||||
p.Close()
|
||||
d.check("after close", p, 12, 0)
|
||||
}
|
||||
|
||||
func TestPoolError(t *testing.T) {
|
||||
d := poolDialer{t: t}
|
||||
p := NewPool(testConfig)
|
||||
p.Slice.New = func(ctx context.Context) (io.Closer, error) {
|
||||
return d.dial()
|
||||
}
|
||||
defer p.Close()
|
||||
|
||||
c := p.Get(context.TODO())
|
||||
c.Do("ERR", io.EOF)
|
||||
if c.Err() == nil {
|
||||
t.Errorf("expected c.Err() != nil")
|
||||
}
|
||||
c.Close()
|
||||
|
||||
c = p.Get(context.TODO())
|
||||
c.Do("ERR", io.EOF)
|
||||
c.Close()
|
||||
|
||||
d.check(".", p, 2, 0)
|
||||
}
|
||||
|
||||
func TestPoolClose(t *testing.T) {
|
||||
d := poolDialer{t: t}
|
||||
p := NewPool(testConfig)
|
||||
p.Slice.New = func(ctx context.Context) (io.Closer, error) {
|
||||
return d.dial()
|
||||
}
|
||||
defer p.Close()
|
||||
|
||||
c1 := p.Get(context.TODO())
|
||||
c1.Do("PING")
|
||||
c2 := p.Get(context.TODO())
|
||||
c2.Do("PING")
|
||||
c3 := p.Get(context.TODO())
|
||||
c3.Do("PING")
|
||||
|
||||
c1.Close()
|
||||
if _, err := c1.Do("PING"); err == nil {
|
||||
t.Errorf("expected error after connection closed")
|
||||
}
|
||||
|
||||
c2.Close()
|
||||
c2.Close()
|
||||
|
||||
p.Close()
|
||||
|
||||
d.check("after pool close", p, 3, 1)
|
||||
|
||||
if _, err := c1.Do("PING"); err == nil {
|
||||
t.Errorf("expected error after connection and pool closed")
|
||||
}
|
||||
|
||||
c3.Close()
|
||||
|
||||
d.check("after conn close", p, 3, 0)
|
||||
|
||||
c1 = p.Get(context.TODO())
|
||||
if _, err := c1.Do("PING"); err == nil {
|
||||
t.Errorf("expected error after pool closed")
|
||||
}
|
||||
}
|
||||
|
||||
func TestPoolConcurrenSendReceive(t *testing.T) {
|
||||
p := NewPool(testConfig)
|
||||
p.Slice.New = func(ctx context.Context) (io.Closer, error) {
|
||||
return DialDefaultServer()
|
||||
}
|
||||
defer p.Close()
|
||||
|
||||
c := p.Get(context.TODO())
|
||||
done := make(chan error, 1)
|
||||
go func() {
|
||||
_, err := c.Receive()
|
||||
done <- err
|
||||
}()
|
||||
c.Send("PING")
|
||||
c.Flush()
|
||||
err := <-done
|
||||
if err != nil {
|
||||
t.Fatalf("Receive() returned error %v", err)
|
||||
}
|
||||
_, err = c.Do("")
|
||||
if err != nil {
|
||||
t.Fatalf("Do() returned error %v", err)
|
||||
}
|
||||
c.Close()
|
||||
}
|
||||
|
||||
func TestPoolMaxActive(t *testing.T) {
|
||||
d := poolDialer{t: t}
|
||||
conf := getTestConfig(testRedisAddr)
|
||||
conf.Config = &pool.Config{
|
||||
Active: 2,
|
||||
Idle: 2,
|
||||
}
|
||||
p := NewPool(conf)
|
||||
p.Slice.New = func(ctx context.Context) (io.Closer, error) {
|
||||
return d.dial()
|
||||
}
|
||||
defer p.Close()
|
||||
|
||||
c1 := p.Get(context.TODO())
|
||||
c1.Do("PING")
|
||||
c2 := p.Get(context.TODO())
|
||||
c2.Do("PING")
|
||||
|
||||
d.check("1", p, 2, 2)
|
||||
|
||||
c3 := p.Get(context.TODO())
|
||||
if _, err := c3.Do("PING"); err != pool.ErrPoolExhausted {
|
||||
t.Errorf("expected pool exhausted")
|
||||
}
|
||||
|
||||
c3.Close()
|
||||
d.check("2", p, 2, 2)
|
||||
c2.Close()
|
||||
d.check("3", p, 2, 2)
|
||||
|
||||
c3 = p.Get(context.TODO())
|
||||
if _, err := c3.Do("PING"); err != nil {
|
||||
t.Errorf("expected good channel, err=%v", err)
|
||||
}
|
||||
c3.Close()
|
||||
|
||||
d.check("4", p, 2, 2)
|
||||
}
|
||||
|
||||
func TestPoolMonitorCleanup(t *testing.T) {
|
||||
d := poolDialer{t: t}
|
||||
p := NewPool(testConfig)
|
||||
p.Slice.New = func(ctx context.Context) (io.Closer, error) {
|
||||
return d.dial()
|
||||
}
|
||||
defer p.Close()
|
||||
c := p.Get(context.TODO())
|
||||
c.Send("MONITOR")
|
||||
c.Close()
|
||||
|
||||
d.check("", p, 1, 0)
|
||||
}
|
||||
|
||||
func TestPoolPubSubCleanup(t *testing.T) {
|
||||
d := poolDialer{t: t}
|
||||
p := NewPool(testConfig)
|
||||
p.Slice.New = func(ctx context.Context) (io.Closer, error) {
|
||||
return d.dial()
|
||||
}
|
||||
defer p.Close()
|
||||
|
||||
c := p.Get(context.TODO())
|
||||
c.Send("SUBSCRIBE", "x")
|
||||
c.Close()
|
||||
|
||||
want := []string{"SUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE", "ECHO"}
|
||||
if !reflect.DeepEqual(d.commands, want) {
|
||||
t.Errorf("got commands %v, want %v", d.commands, want)
|
||||
}
|
||||
d.commands = nil
|
||||
|
||||
c = p.Get(context.TODO())
|
||||
c.Send("PSUBSCRIBE", "x*")
|
||||
c.Close()
|
||||
|
||||
want = []string{"PSUBSCRIBE", "UNSUBSCRIBE", "PUNSUBSCRIBE", "ECHO"}
|
||||
if !reflect.DeepEqual(d.commands, want) {
|
||||
t.Errorf("got commands %v, want %v", d.commands, want)
|
||||
}
|
||||
d.commands = nil
|
||||
}
|
||||
|
||||
func TestPoolTransactionCleanup(t *testing.T) {
|
||||
d := poolDialer{t: t}
|
||||
p := NewPool(testConfig)
|
||||
p.Slice.New = func(ctx context.Context) (io.Closer, error) {
|
||||
return d.dial()
|
||||
}
|
||||
defer p.Close()
|
||||
|
||||
c := p.Get(context.TODO())
|
||||
c.Do("WATCH", "key")
|
||||
c.Do("PING")
|
||||
c.Close()
|
||||
|
||||
want := []string{"WATCH", "PING", "UNWATCH"}
|
||||
if !reflect.DeepEqual(d.commands, want) {
|
||||
t.Errorf("got commands %v, want %v", d.commands, want)
|
||||
}
|
||||
d.commands = nil
|
||||
|
||||
c = p.Get(context.TODO())
|
||||
c.Do("WATCH", "key")
|
||||
c.Do("UNWATCH")
|
||||
c.Do("PING")
|
||||
c.Close()
|
||||
|
||||
want = []string{"WATCH", "UNWATCH", "PING"}
|
||||
if !reflect.DeepEqual(d.commands, want) {
|
||||
t.Errorf("got commands %v, want %v", d.commands, want)
|
||||
}
|
||||
d.commands = nil
|
||||
|
||||
c = p.Get(context.TODO())
|
||||
c.Do("WATCH", "key")
|
||||
c.Do("MULTI")
|
||||
c.Do("PING")
|
||||
c.Close()
|
||||
|
||||
want = []string{"WATCH", "MULTI", "PING", "DISCARD"}
|
||||
if !reflect.DeepEqual(d.commands, want) {
|
||||
t.Errorf("got commands %v, want %v", d.commands, want)
|
||||
}
|
||||
d.commands = nil
|
||||
|
||||
c = p.Get(context.TODO())
|
||||
c.Do("WATCH", "key")
|
||||
c.Do("MULTI")
|
||||
c.Do("DISCARD")
|
||||
c.Do("PING")
|
||||
c.Close()
|
||||
|
||||
want = []string{"WATCH", "MULTI", "DISCARD", "PING"}
|
||||
if !reflect.DeepEqual(d.commands, want) {
|
||||
t.Errorf("got commands %v, want %v", d.commands, want)
|
||||
}
|
||||
d.commands = nil
|
||||
|
||||
c = p.Get(context.TODO())
|
||||
c.Do("WATCH", "key")
|
||||
c.Do("MULTI")
|
||||
c.Do("EXEC")
|
||||
c.Do("PING")
|
||||
c.Close()
|
||||
|
||||
want = []string{"WATCH", "MULTI", "EXEC", "PING"}
|
||||
if !reflect.DeepEqual(d.commands, want) {
|
||||
t.Errorf("got commands %v, want %v", d.commands, want)
|
||||
}
|
||||
d.commands = nil
|
||||
}
|
||||
|
||||
func startGoroutines(p *Pool, cmd string, args ...interface{}) chan error {
|
||||
errs := make(chan error, 10)
|
||||
for i := 0; i < cap(errs); i++ {
|
||||
go func() {
|
||||
c := p.Get(context.TODO())
|
||||
_, err := c.Do(cmd, args...)
|
||||
errs <- err
|
||||
c.Close()
|
||||
}()
|
||||
}
|
||||
|
||||
// Wait for goroutines to block.
|
||||
time.Sleep(time.Second / 4)
|
||||
|
||||
return errs
|
||||
}
|
||||
|
||||
func TestWaitPoolDialError(t *testing.T) {
|
||||
testErr := errors.New("test")
|
||||
d := poolDialer{t: t}
|
||||
config1 := testConfig
|
||||
config1.Config = &pool.Config{
|
||||
Active: 1,
|
||||
Idle: 1,
|
||||
Wait: true,
|
||||
}
|
||||
p := NewPool(config1)
|
||||
p.Slice.New = func(ctx context.Context) (io.Closer, error) {
|
||||
return d.dial()
|
||||
}
|
||||
defer p.Close()
|
||||
|
||||
c := p.Get(context.TODO())
|
||||
errs := startGoroutines(p, "ERR", testErr)
|
||||
d.check("before close", p, 1, 1)
|
||||
|
||||
d.dialErr = errors.New("dial")
|
||||
c.Close()
|
||||
|
||||
nilCount := 0
|
||||
errCount := 0
|
||||
timeout := time.After(2 * time.Second)
|
||||
for i := 0; i < cap(errs); i++ {
|
||||
select {
|
||||
case err := <-errs:
|
||||
switch err {
|
||||
case nil:
|
||||
nilCount++
|
||||
case d.dialErr:
|
||||
errCount++
|
||||
default:
|
||||
t.Fatalf("expected dial error or nil, got %v", err)
|
||||
}
|
||||
case <-timeout:
|
||||
t.Logf("Wait all the time and timeout %d", i)
|
||||
return
|
||||
}
|
||||
}
|
||||
if nilCount != 1 {
|
||||
t.Errorf("expected one nil error, got %d", nilCount)
|
||||
}
|
||||
if errCount != cap(errs)-1 {
|
||||
t.Errorf("expected %d dial erors, got %d", cap(errs)-1, errCount)
|
||||
}
|
||||
d.check("done", p, cap(errs), 0)
|
||||
}
|
||||
|
||||
func BenchmarkPoolGet(b *testing.B) {
|
||||
b.StopTimer()
|
||||
p := NewPool(testConfig)
|
||||
c := p.Get(context.Background())
|
||||
if err := c.Err(); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
c.Close()
|
||||
defer p.Close()
|
||||
b.StartTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
c := p.Get(context.Background())
|
||||
c.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkPoolGetErr(b *testing.B) {
|
||||
b.StopTimer()
|
||||
p := NewPool(testConfig)
|
||||
c := p.Get(context.Background())
|
||||
if err := c.Err(); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
c.Close()
|
||||
defer p.Close()
|
||||
b.StartTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
c = p.Get(context.Background())
|
||||
if err := c.Err(); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
c.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkPoolGetPing(b *testing.B) {
|
||||
b.StopTimer()
|
||||
p := NewPool(testConfig)
|
||||
c := p.Get(context.Background())
|
||||
if err := c.Err(); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
c.Close()
|
||||
defer p.Close()
|
||||
b.StartTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
c := p.Get(context.Background())
|
||||
if _, err := c.Do("PING"); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
c.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkPooledConn(b *testing.B) {
|
||||
p := NewPool(testConfig)
|
||||
defer p.Close()
|
||||
for i := 0; i < b.N; i++ {
|
||||
ctx := context.TODO()
|
||||
c := p.Get(ctx)
|
||||
c2 := c.WithContext(context.TODO())
|
||||
if _, err := c2.Do("PING"); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
c2.Close()
|
||||
}
|
||||
}
|
146
pkg/cache/redis/pubsub_test.go
vendored
Normal file
146
pkg/cache/redis/pubsub_test.go
vendored
Normal file
@ -0,0 +1,146 @@
|
||||
// Copyright 2012 Gary Burd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"): you may
|
||||
// not use this file except in compliance with the License. You may obtain
|
||||
// a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
// License for the specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package redis
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"sync"
|
||||
"testing"
|
||||
)
|
||||
|
||||
func publish(channel, value interface{}) {
|
||||
c, err := dial()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
defer c.Close()
|
||||
c.Do("PUBLISH", channel, value)
|
||||
}
|
||||
|
||||
// Applications can receive pushed messages from one goroutine and manage subscriptions from another goroutine.
|
||||
func ExamplePubSubConn() {
|
||||
c, err := dial()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
defer c.Close()
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(2)
|
||||
|
||||
psc := PubSubConn{Conn: c}
|
||||
|
||||
// This goroutine receives and prints pushed notifications from the server.
|
||||
// The goroutine exits when the connection is unsubscribed from all
|
||||
// channels or there is an error.
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for {
|
||||
switch n := psc.Receive().(type) {
|
||||
case Message:
|
||||
fmt.Printf("Message: %s %s\n", n.Channel, n.Data)
|
||||
case PMessage:
|
||||
fmt.Printf("PMessage: %s %s %s\n", n.Pattern, n.Channel, n.Data)
|
||||
case Subscription:
|
||||
fmt.Printf("Subscription: %s %s %d\n", n.Kind, n.Channel, n.Count)
|
||||
if n.Count == 0 {
|
||||
return
|
||||
}
|
||||
case error:
|
||||
fmt.Printf("error: %v\n", n)
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// This goroutine manages subscriptions for the connection.
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
|
||||
psc.Subscribe("example")
|
||||
psc.PSubscribe("p*")
|
||||
|
||||
// The following function calls publish a message using another
|
||||
// connection to the Redis server.
|
||||
publish("example", "hello")
|
||||
publish("example", "world")
|
||||
publish("pexample", "foo")
|
||||
publish("pexample", "bar")
|
||||
|
||||
// Unsubscribe from all connections. This will cause the receiving
|
||||
// goroutine to exit.
|
||||
psc.Unsubscribe()
|
||||
psc.PUnsubscribe()
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
|
||||
// Output:
|
||||
// Subscription: subscribe example 1
|
||||
// Subscription: psubscribe p* 2
|
||||
// Message: example hello
|
||||
// Message: example world
|
||||
// PMessage: p* pexample foo
|
||||
// PMessage: p* pexample bar
|
||||
// Subscription: unsubscribe example 1
|
||||
// Subscription: punsubscribe p* 0
|
||||
}
|
||||
|
||||
func expectPushed(t *testing.T, c PubSubConn, message string, expected interface{}) {
|
||||
actual := c.Receive()
|
||||
if !reflect.DeepEqual(actual, expected) {
|
||||
t.Errorf("%s = %v, want %v", message, actual, expected)
|
||||
}
|
||||
}
|
||||
|
||||
func TestPushed(t *testing.T) {
|
||||
pc, err := DialDefaultServer()
|
||||
if err != nil {
|
||||
t.Fatalf("error connection to database, %v", err)
|
||||
}
|
||||
defer pc.Close()
|
||||
|
||||
sc, err := DialDefaultServer()
|
||||
if err != nil {
|
||||
t.Fatalf("error connection to database, %v", err)
|
||||
}
|
||||
defer sc.Close()
|
||||
|
||||
c := PubSubConn{Conn: sc}
|
||||
|
||||
c.Subscribe("c1")
|
||||
expectPushed(t, c, "Subscribe(c1)", Subscription{Kind: "subscribe", Channel: "c1", Count: 1})
|
||||
c.Subscribe("c2")
|
||||
expectPushed(t, c, "Subscribe(c2)", Subscription{Kind: "subscribe", Channel: "c2", Count: 2})
|
||||
c.PSubscribe("p1")
|
||||
expectPushed(t, c, "PSubscribe(p1)", Subscription{Kind: "psubscribe", Channel: "p1", Count: 3})
|
||||
c.PSubscribe("p2")
|
||||
expectPushed(t, c, "PSubscribe(p2)", Subscription{Kind: "psubscribe", Channel: "p2", Count: 4})
|
||||
c.PUnsubscribe()
|
||||
expectPushed(t, c, "Punsubscribe(p1)", Subscription{Kind: "punsubscribe", Channel: "p1", Count: 3})
|
||||
expectPushed(t, c, "Punsubscribe()", Subscription{Kind: "punsubscribe", Channel: "p2", Count: 2})
|
||||
|
||||
pc.Do("PUBLISH", "c1", "hello")
|
||||
expectPushed(t, c, "PUBLISH c1 hello", Message{Channel: "c1", Data: []byte("hello")})
|
||||
|
||||
c.Ping("hello")
|
||||
expectPushed(t, c, `Ping("hello")`, Pong{"hello"})
|
||||
|
||||
c.Conn.Send("PING")
|
||||
c.Conn.Flush()
|
||||
expectPushed(t, c, `Send("PING")`, Pong{})
|
||||
}
|
75
pkg/cache/redis/redis.go
vendored
75
pkg/cache/redis/redis.go
vendored
@ -16,6 +16,9 @@ package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
|
||||
"github.com/bilibili/kratos/pkg/container/pool"
|
||||
xtime "github.com/bilibili/kratos/pkg/time"
|
||||
)
|
||||
|
||||
// Error represents an error returned in a command reply.
|
||||
@ -23,29 +26,53 @@ type Error string
|
||||
|
||||
func (err Error) Error() string { return string(err) }
|
||||
|
||||
// Conn represents a connection to a Redis server.
|
||||
type Conn interface {
|
||||
// Close closes the connection.
|
||||
Close() error
|
||||
// Config client settings.
|
||||
type Config struct {
|
||||
*pool.Config
|
||||
|
||||
// Err returns a non-nil value if the connection is broken. The returned
|
||||
// value is either the first non-nil value returned from the underlying
|
||||
// network connection or a protocol parsing error. Applications should
|
||||
// close broken connections.
|
||||
Err() error
|
||||
|
||||
// Do sends a command to the server and returns the received reply.
|
||||
Do(commandName string, args ...interface{}) (reply interface{}, err error)
|
||||
|
||||
// Send writes the command to the client's output buffer.
|
||||
Send(commandName string, args ...interface{}) error
|
||||
|
||||
// Flush flushes the output buffer to the Redis server.
|
||||
Flush() error
|
||||
|
||||
// Receive receives a single reply from the Redis server
|
||||
Receive() (reply interface{}, err error)
|
||||
|
||||
// WithContext
|
||||
WithContext(ctx context.Context) Conn
|
||||
Name string // redis name, for trace
|
||||
Proto string
|
||||
Addr string
|
||||
Auth string
|
||||
DialTimeout xtime.Duration
|
||||
ReadTimeout xtime.Duration
|
||||
WriteTimeout xtime.Duration
|
||||
SlowLog xtime.Duration
|
||||
}
|
||||
|
||||
type Redis struct {
|
||||
pool *Pool
|
||||
conf *Config
|
||||
}
|
||||
|
||||
func NewRedis(c *Config, options ...DialOption) *Redis {
|
||||
return &Redis{
|
||||
pool: NewPool(c, options...),
|
||||
conf: c,
|
||||
}
|
||||
}
|
||||
|
||||
// Do gets a new conn from pool, then execute Do with this conn, finally close this conn.
|
||||
// ATTENTION: Don't use this method with transaction command like MULTI etc. Because every Do will close conn automatically, use r.Conn to get a raw conn for this situation.
|
||||
func (r *Redis) Do(ctx context.Context, commandName string, args ...interface{}) (reply interface{}, err error) {
|
||||
conn := r.pool.Get(ctx)
|
||||
defer conn.Close()
|
||||
reply, err = conn.Do(commandName, args...)
|
||||
return
|
||||
}
|
||||
|
||||
// Close closes connection pool
|
||||
func (r *Redis) Close() error {
|
||||
return r.pool.Close()
|
||||
}
|
||||
|
||||
// Conn direct gets a connection
|
||||
func (r *Redis) Conn(ctx context.Context) Conn {
|
||||
return r.pool.Get(ctx)
|
||||
}
|
||||
|
||||
func (r *Redis) Pipeline() (p Pipeliner) {
|
||||
return &pipeliner{
|
||||
pool: r.pool,
|
||||
}
|
||||
}
|
||||
|
324
pkg/cache/redis/redis_test.go
vendored
Normal file
324
pkg/cache/redis/redis_test.go
vendored
Normal file
@ -0,0 +1,324 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/bilibili/kratos/pkg/container/pool"
|
||||
xtime "github.com/bilibili/kratos/pkg/time"
|
||||
)
|
||||
|
||||
func TestRedis(t *testing.T) {
|
||||
testSet(t, testPool)
|
||||
testSend(t, testPool)
|
||||
testGet(t, testPool)
|
||||
testErr(t, testPool)
|
||||
if err := testPool.Close(); err != nil {
|
||||
t.Errorf("redis: close error(%v)", err)
|
||||
}
|
||||
conn, err := NewConn(testConfig)
|
||||
if err != nil {
|
||||
t.Errorf("redis: new conn error(%v)", err)
|
||||
}
|
||||
if err := conn.Close(); err != nil {
|
||||
t.Errorf("redis: close error(%v)", err)
|
||||
}
|
||||
}
|
||||
|
||||
func testSet(t *testing.T, p *Pool) {
|
||||
var (
|
||||
key = "test"
|
||||
value = "test"
|
||||
conn = p.Get(context.TODO())
|
||||
)
|
||||
defer conn.Close()
|
||||
if reply, err := conn.Do("set", key, value); err != nil {
|
||||
t.Errorf("redis: conn.Do(SET, %s, %s) error(%v)", key, value, err)
|
||||
} else {
|
||||
t.Logf("redis: set status: %s", reply)
|
||||
}
|
||||
}
|
||||
|
||||
func testSend(t *testing.T, p *Pool) {
|
||||
var (
|
||||
key = "test"
|
||||
value = "test"
|
||||
expire = 1000
|
||||
conn = p.Get(context.TODO())
|
||||
)
|
||||
defer conn.Close()
|
||||
if err := conn.Send("SET", key, value); err != nil {
|
||||
t.Errorf("redis: conn.Send(SET, %s, %s) error(%v)", key, value, err)
|
||||
}
|
||||
if err := conn.Send("EXPIRE", key, expire); err != nil {
|
||||
t.Errorf("redis: conn.Send(EXPIRE key(%s) expire(%d)) error(%v)", key, expire, err)
|
||||
}
|
||||
if err := conn.Flush(); err != nil {
|
||||
t.Errorf("redis: conn.Flush error(%v)", err)
|
||||
}
|
||||
for i := 0; i < 2; i++ {
|
||||
if _, err := conn.Receive(); err != nil {
|
||||
t.Errorf("redis: conn.Receive error(%v)", err)
|
||||
return
|
||||
}
|
||||
}
|
||||
t.Logf("redis: set value: %s", value)
|
||||
}
|
||||
|
||||
func testGet(t *testing.T, p *Pool) {
|
||||
var (
|
||||
key = "test"
|
||||
conn = p.Get(context.TODO())
|
||||
)
|
||||
defer conn.Close()
|
||||
if reply, err := conn.Do("GET", key); err != nil {
|
||||
t.Errorf("redis: conn.Do(GET, %s) error(%v)", key, err)
|
||||
} else {
|
||||
t.Logf("redis: get value: %s", reply)
|
||||
}
|
||||
}
|
||||
|
||||
func testErr(t *testing.T, p *Pool) {
|
||||
conn := p.Get(context.TODO())
|
||||
if err := conn.Close(); err != nil {
|
||||
t.Errorf("redis: close error(%v)", err)
|
||||
}
|
||||
if err := conn.Err(); err == nil {
|
||||
t.Errorf("redis: err not nil")
|
||||
} else {
|
||||
t.Logf("redis: err: %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkRedis(b *testing.B) {
|
||||
conf := &Config{
|
||||
Name: "test",
|
||||
Proto: "tcp",
|
||||
Addr: testRedisAddr,
|
||||
DialTimeout: xtime.Duration(time.Second),
|
||||
ReadTimeout: xtime.Duration(time.Second),
|
||||
WriteTimeout: xtime.Duration(time.Second),
|
||||
}
|
||||
conf.Config = &pool.Config{
|
||||
Active: 10,
|
||||
Idle: 5,
|
||||
IdleTimeout: xtime.Duration(90 * time.Second),
|
||||
}
|
||||
benchmarkPool := NewPool(conf)
|
||||
|
||||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
conn := benchmarkPool.Get(context.TODO())
|
||||
if err := conn.Close(); err != nil {
|
||||
b.Errorf("redis: close error(%v)", err)
|
||||
}
|
||||
}
|
||||
})
|
||||
if err := benchmarkPool.Close(); err != nil {
|
||||
b.Errorf("redis: close error(%v)", err)
|
||||
}
|
||||
}
|
||||
|
||||
var testRedisCommands = []struct {
|
||||
args []interface{}
|
||||
expected interface{}
|
||||
}{
|
||||
{
|
||||
[]interface{}{"PING"},
|
||||
"PONG",
|
||||
},
|
||||
{
|
||||
[]interface{}{"SET", "foo", "bar"},
|
||||
"OK",
|
||||
},
|
||||
{
|
||||
[]interface{}{"GET", "foo"},
|
||||
[]byte("bar"),
|
||||
},
|
||||
{
|
||||
[]interface{}{"GET", "nokey"},
|
||||
nil,
|
||||
},
|
||||
{
|
||||
[]interface{}{"MGET", "nokey", "foo"},
|
||||
[]interface{}{nil, []byte("bar")},
|
||||
},
|
||||
{
|
||||
[]interface{}{"INCR", "mycounter"},
|
||||
int64(1),
|
||||
},
|
||||
{
|
||||
[]interface{}{"LPUSH", "mylist", "foo"},
|
||||
int64(1),
|
||||
},
|
||||
{
|
||||
[]interface{}{"LPUSH", "mylist", "bar"},
|
||||
int64(2),
|
||||
},
|
||||
{
|
||||
[]interface{}{"LRANGE", "mylist", 0, -1},
|
||||
[]interface{}{[]byte("bar"), []byte("foo")},
|
||||
},
|
||||
}
|
||||
|
||||
func TestNewRedis(t *testing.T) {
|
||||
type args struct {
|
||||
c *Config
|
||||
options []DialOption
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
args args
|
||||
wantErr bool
|
||||
}{
|
||||
{
|
||||
"new_redis",
|
||||
args{
|
||||
testConfig,
|
||||
make([]DialOption, 0),
|
||||
},
|
||||
false,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
r := NewRedis(tt.args.c, tt.args.options...)
|
||||
if r == nil {
|
||||
t.Errorf("NewRedis() error, got nil")
|
||||
return
|
||||
}
|
||||
err := r.Close()
|
||||
if err != nil {
|
||||
t.Errorf("Close() error %v", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func TestRedis_Do(t *testing.T) {
|
||||
r := NewRedis(testConfig)
|
||||
r.Do(context.TODO(), "FLUSHDB")
|
||||
|
||||
for _, cmd := range testRedisCommands {
|
||||
actual, err := r.Do(context.TODO(), cmd.args[0].(string), cmd.args[1:]...)
|
||||
if err != nil {
|
||||
t.Errorf("Do(%v) returned error %v", cmd.args, err)
|
||||
continue
|
||||
}
|
||||
if !reflect.DeepEqual(actual, cmd.expected) {
|
||||
t.Errorf("Do(%v) = %v, want %v", cmd.args, actual, cmd.expected)
|
||||
}
|
||||
}
|
||||
err := r.Close()
|
||||
if err != nil {
|
||||
t.Errorf("Close() error %v", err)
|
||||
}
|
||||
}
|
||||
|
||||
func TestRedis_Conn(t *testing.T) {
|
||||
|
||||
type args struct {
|
||||
ctx context.Context
|
||||
}
|
||||
tests := []struct {
|
||||
name string
|
||||
p *Redis
|
||||
args args
|
||||
wantErr bool
|
||||
g int
|
||||
c int
|
||||
}{
|
||||
{
|
||||
"Close",
|
||||
NewRedis(&Config{
|
||||
Config: &pool.Config{
|
||||
Active: 1,
|
||||
Idle: 1,
|
||||
},
|
||||
Name: "test_get",
|
||||
Proto: "tcp",
|
||||
Addr: testRedisAddr,
|
||||
DialTimeout: xtime.Duration(time.Second),
|
||||
ReadTimeout: xtime.Duration(time.Second),
|
||||
WriteTimeout: xtime.Duration(time.Second),
|
||||
}),
|
||||
args{context.TODO()},
|
||||
false,
|
||||
3,
|
||||
3,
|
||||
},
|
||||
{
|
||||
"CloseExceededPoolSize",
|
||||
NewRedis(&Config{
|
||||
Config: &pool.Config{
|
||||
Active: 1,
|
||||
Idle: 1,
|
||||
},
|
||||
Name: "test_get_out",
|
||||
Proto: "tcp",
|
||||
Addr: testRedisAddr,
|
||||
DialTimeout: xtime.Duration(time.Second),
|
||||
ReadTimeout: xtime.Duration(time.Second),
|
||||
WriteTimeout: xtime.Duration(time.Second),
|
||||
}),
|
||||
args{context.TODO()},
|
||||
true,
|
||||
5,
|
||||
3,
|
||||
},
|
||||
}
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
for i := 1; i <= tt.g; i++ {
|
||||
got := tt.p.Conn(tt.args.ctx)
|
||||
if err := got.Close(); err != nil {
|
||||
if !tt.wantErr {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
if i <= tt.c {
|
||||
if err := got.Close(); err != nil {
|
||||
t.Error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkRedisDoPing(b *testing.B) {
|
||||
r := NewRedis(testConfig)
|
||||
defer r.Close()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
if _, err := r.Do(context.Background(), "PING"); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkRedisDoSET(b *testing.B) {
|
||||
r := NewRedis(testConfig)
|
||||
defer r.Close()
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
if _, err := r.Do(context.Background(), "SET", "a", "b"); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func BenchmarkRedisDoGET(b *testing.B) {
|
||||
r := NewRedis(testConfig)
|
||||
defer r.Close()
|
||||
r.Do(context.Background(), "SET", "a", "b")
|
||||
b.ResetTimer()
|
||||
for i := 0; i < b.N; i++ {
|
||||
if _, err := r.Do(context.Background(), "GET", "b"); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
}
|
||||
}
|
179
pkg/cache/redis/reply_test.go
vendored
Normal file
179
pkg/cache/redis/reply_test.go
vendored
Normal file
@ -0,0 +1,179 @@
|
||||
// Copyright 2012 Gary Burd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"): you may
|
||||
// not use this file except in compliance with the License. You may obtain
|
||||
// a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
// License for the specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package redis
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
|
||||
"github.com/pkg/errors"
|
||||
)
|
||||
|
||||
type valueError struct {
|
||||
v interface{}
|
||||
err error
|
||||
}
|
||||
|
||||
func ve(v interface{}, err error) valueError {
|
||||
return valueError{v, err}
|
||||
}
|
||||
|
||||
var replyTests = []struct {
|
||||
name interface{}
|
||||
actual valueError
|
||||
expected valueError
|
||||
}{
|
||||
{
|
||||
"ints([v1, v2])",
|
||||
ve(Ints([]interface{}{[]byte("4"), []byte("5")}, nil)),
|
||||
ve([]int{4, 5}, nil),
|
||||
},
|
||||
{
|
||||
"ints(nil)",
|
||||
ve(Ints(nil, nil)),
|
||||
ve([]int(nil), ErrNil),
|
||||
},
|
||||
{
|
||||
"strings([v1, v2])",
|
||||
ve(Strings([]interface{}{[]byte("v1"), []byte("v2")}, nil)),
|
||||
ve([]string{"v1", "v2"}, nil),
|
||||
},
|
||||
{
|
||||
"strings(nil)",
|
||||
ve(Strings(nil, nil)),
|
||||
ve([]string(nil), ErrNil),
|
||||
},
|
||||
{
|
||||
"byteslices([v1, v2])",
|
||||
ve(ByteSlices([]interface{}{[]byte("v1"), []byte("v2")}, nil)),
|
||||
ve([][]byte{[]byte("v1"), []byte("v2")}, nil),
|
||||
},
|
||||
{
|
||||
"byteslices(nil)",
|
||||
ve(ByteSlices(nil, nil)),
|
||||
ve([][]byte(nil), ErrNil),
|
||||
},
|
||||
{
|
||||
"values([v1, v2])",
|
||||
ve(Values([]interface{}{[]byte("v1"), []byte("v2")}, nil)),
|
||||
ve([]interface{}{[]byte("v1"), []byte("v2")}, nil),
|
||||
},
|
||||
{
|
||||
"values(nil)",
|
||||
ve(Values(nil, nil)),
|
||||
ve([]interface{}(nil), ErrNil),
|
||||
},
|
||||
{
|
||||
"float64(1.0)",
|
||||
ve(Float64([]byte("1.0"), nil)),
|
||||
ve(float64(1.0), nil),
|
||||
},
|
||||
{
|
||||
"float64(nil)",
|
||||
ve(Float64(nil, nil)),
|
||||
ve(float64(0.0), ErrNil),
|
||||
},
|
||||
{
|
||||
"uint64(1)",
|
||||
ve(Uint64(int64(1), nil)),
|
||||
ve(uint64(1), nil),
|
||||
},
|
||||
{
|
||||
"uint64(-1)",
|
||||
ve(Uint64(int64(-1), nil)),
|
||||
ve(uint64(0), errNegativeInt),
|
||||
},
|
||||
}
|
||||
|
||||
func TestReply(t *testing.T) {
|
||||
for _, rt := range replyTests {
|
||||
if errors.Cause(rt.actual.err) != rt.expected.err {
|
||||
t.Errorf("%s returned err %v, want %v", rt.name, rt.actual.err, rt.expected.err)
|
||||
continue
|
||||
}
|
||||
if !reflect.DeepEqual(rt.actual.v, rt.expected.v) {
|
||||
t.Errorf("%s=%+v, want %+v", rt.name, rt.actual.v, rt.expected.v)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// dial wraps DialDefaultServer() with a more suitable function name for examples.
|
||||
func dial() (Conn, error) {
|
||||
return DialDefaultServer()
|
||||
}
|
||||
|
||||
func ExampleBool() {
|
||||
c, err := dial()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
c.Do("SET", "foo", 1)
|
||||
exists, _ := Bool(c.Do("EXISTS", "foo"))
|
||||
fmt.Printf("%#v\n", exists)
|
||||
// Output:
|
||||
// true
|
||||
}
|
||||
|
||||
func ExampleInt() {
|
||||
c, err := dial()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
c.Do("SET", "k1", 1)
|
||||
n, _ := Int(c.Do("GET", "k1"))
|
||||
fmt.Printf("%#v\n", n)
|
||||
n, _ = Int(c.Do("INCR", "k1"))
|
||||
fmt.Printf("%#v\n", n)
|
||||
// Output:
|
||||
// 1
|
||||
// 2
|
||||
}
|
||||
|
||||
func ExampleInts() {
|
||||
c, err := dial()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
c.Do("SADD", "set_with_integers", 4, 5, 6)
|
||||
ints, _ := Ints(c.Do("SMEMBERS", "set_with_integers"))
|
||||
fmt.Printf("%#v\n", ints)
|
||||
// Output:
|
||||
// []int{4, 5, 6}
|
||||
}
|
||||
|
||||
func ExampleString() {
|
||||
c, err := dial()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
c.Do("SET", "hello", "world")
|
||||
s, _ := String(c.Do("GET", "hello"))
|
||||
fmt.Printf("%#v", s)
|
||||
// Output:
|
||||
// "world"
|
||||
}
|
438
pkg/cache/redis/scan_test.go
vendored
Normal file
438
pkg/cache/redis/scan_test.go
vendored
Normal file
@ -0,0 +1,438 @@
|
||||
// Copyright 2012 Gary Burd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"): you may
|
||||
// not use this file except in compliance with the License. You may obtain
|
||||
// a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
// License for the specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package redis
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"math"
|
||||
"reflect"
|
||||
"testing"
|
||||
)
|
||||
|
||||
var scanConversionTests = []struct {
|
||||
src interface{}
|
||||
dest interface{}
|
||||
}{
|
||||
{[]byte("-inf"), math.Inf(-1)},
|
||||
{[]byte("+inf"), math.Inf(1)},
|
||||
{[]byte("0"), float64(0)},
|
||||
{[]byte("3.14159"), float64(3.14159)},
|
||||
{[]byte("3.14"), float32(3.14)},
|
||||
{[]byte("-100"), int(-100)},
|
||||
{[]byte("101"), int(101)},
|
||||
{int64(102), int(102)},
|
||||
{[]byte("103"), uint(103)},
|
||||
{int64(104), uint(104)},
|
||||
{[]byte("105"), int8(105)},
|
||||
{int64(106), int8(106)},
|
||||
{[]byte("107"), uint8(107)},
|
||||
{int64(108), uint8(108)},
|
||||
{[]byte("0"), false},
|
||||
{int64(0), false},
|
||||
{[]byte("f"), false},
|
||||
{[]byte("1"), true},
|
||||
{int64(1), true},
|
||||
{[]byte("t"), true},
|
||||
{"hello", "hello"},
|
||||
{[]byte("hello"), "hello"},
|
||||
{[]byte("world"), []byte("world")},
|
||||
{[]interface{}{[]byte("foo")}, []interface{}{[]byte("foo")}},
|
||||
{[]interface{}{[]byte("foo")}, []string{"foo"}},
|
||||
{[]interface{}{[]byte("hello"), []byte("world")}, []string{"hello", "world"}},
|
||||
{[]interface{}{[]byte("bar")}, [][]byte{[]byte("bar")}},
|
||||
{[]interface{}{[]byte("1")}, []int{1}},
|
||||
{[]interface{}{[]byte("1"), []byte("2")}, []int{1, 2}},
|
||||
{[]interface{}{[]byte("1"), []byte("2")}, []float64{1, 2}},
|
||||
{[]interface{}{[]byte("1")}, []byte{1}},
|
||||
{[]interface{}{[]byte("1")}, []bool{true}},
|
||||
}
|
||||
|
||||
func TestScanConversion(t *testing.T) {
|
||||
for _, tt := range scanConversionTests {
|
||||
values := []interface{}{tt.src}
|
||||
dest := reflect.New(reflect.TypeOf(tt.dest))
|
||||
values, err := Scan(values, dest.Interface())
|
||||
if err != nil {
|
||||
t.Errorf("Scan(%v) returned error %v", tt, err)
|
||||
continue
|
||||
}
|
||||
if !reflect.DeepEqual(tt.dest, dest.Elem().Interface()) {
|
||||
t.Errorf("Scan(%v) returned %v values: %v, want %v", tt, dest.Elem().Interface(), values, tt.dest)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var scanConversionErrorTests = []struct {
|
||||
src interface{}
|
||||
dest interface{}
|
||||
}{
|
||||
{[]byte("1234"), byte(0)},
|
||||
{int64(1234), byte(0)},
|
||||
{[]byte("-1"), byte(0)},
|
||||
{int64(-1), byte(0)},
|
||||
{[]byte("junk"), false},
|
||||
{Error("blah"), false},
|
||||
}
|
||||
|
||||
func TestScanConversionError(t *testing.T) {
|
||||
for _, tt := range scanConversionErrorTests {
|
||||
values := []interface{}{tt.src}
|
||||
dest := reflect.New(reflect.TypeOf(tt.dest))
|
||||
values, err := Scan(values, dest.Interface())
|
||||
if err == nil {
|
||||
t.Errorf("Scan(%v) did not return error values: %v", tt, values)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func ExampleScan() {
|
||||
c, err := dial()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
c.Send("HMSET", "album:1", "title", "Red", "rating", 5)
|
||||
c.Send("HMSET", "album:2", "title", "Earthbound", "rating", 1)
|
||||
c.Send("HMSET", "album:3", "title", "Beat")
|
||||
c.Send("LPUSH", "albums", "1")
|
||||
c.Send("LPUSH", "albums", "2")
|
||||
c.Send("LPUSH", "albums", "3")
|
||||
values, err := Values(c.Do("SORT", "albums",
|
||||
"BY", "album:*->rating",
|
||||
"GET", "album:*->title",
|
||||
"GET", "album:*->rating"))
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
|
||||
for len(values) > 0 {
|
||||
var title string
|
||||
rating := -1 // initialize to illegal value to detect nil.
|
||||
values, err = Scan(values, &title, &rating)
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
if rating == -1 {
|
||||
fmt.Println(title, "not-rated")
|
||||
} else {
|
||||
fmt.Println(title, rating)
|
||||
}
|
||||
}
|
||||
// Output:
|
||||
// Beat not-rated
|
||||
// Earthbound 1
|
||||
// Red 5
|
||||
}
|
||||
|
||||
type s0 struct {
|
||||
X int
|
||||
Y int `redis:"y"`
|
||||
Bt bool
|
||||
}
|
||||
|
||||
type s1 struct {
|
||||
X int `redis:"-"`
|
||||
I int `redis:"i"`
|
||||
U uint `redis:"u"`
|
||||
S string `redis:"s"`
|
||||
P []byte `redis:"p"`
|
||||
B bool `redis:"b"`
|
||||
Bt bool
|
||||
Bf bool
|
||||
s0
|
||||
}
|
||||
|
||||
var scanStructTests = []struct {
|
||||
title string
|
||||
reply []string
|
||||
value interface{}
|
||||
}{
|
||||
{"basic",
|
||||
[]string{"i", "-1234", "u", "5678", "s", "hello", "p", "world", "b", "t", "Bt", "1", "Bf", "0", "X", "123", "y", "456"},
|
||||
&s1{I: -1234, U: 5678, S: "hello", P: []byte("world"), B: true, Bt: true, Bf: false, s0: s0{X: 123, Y: 456}},
|
||||
},
|
||||
}
|
||||
|
||||
func TestScanStruct(t *testing.T) {
|
||||
for _, tt := range scanStructTests {
|
||||
|
||||
var reply []interface{}
|
||||
for _, v := range tt.reply {
|
||||
reply = append(reply, []byte(v))
|
||||
}
|
||||
|
||||
value := reflect.New(reflect.ValueOf(tt.value).Type().Elem())
|
||||
|
||||
if err := ScanStruct(reply, value.Interface()); err != nil {
|
||||
t.Fatalf("ScanStruct(%s) returned error %v", tt.title, err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(value.Interface(), tt.value) {
|
||||
t.Fatalf("ScanStruct(%s) returned %v, want %v", tt.title, value.Interface(), tt.value)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func TestBadScanStructArgs(t *testing.T) {
|
||||
x := []interface{}{"A", "b"}
|
||||
test := func(v interface{}) {
|
||||
if err := ScanStruct(x, v); err == nil {
|
||||
t.Errorf("Expect error for ScanStruct(%T, %T)", x, v)
|
||||
}
|
||||
}
|
||||
|
||||
test(nil)
|
||||
|
||||
var v0 *struct{}
|
||||
test(v0)
|
||||
|
||||
var v1 int
|
||||
test(&v1)
|
||||
|
||||
x = x[:1]
|
||||
v2 := struct{ A string }{}
|
||||
test(&v2)
|
||||
}
|
||||
|
||||
var scanSliceTests = []struct {
|
||||
src []interface{}
|
||||
fieldNames []string
|
||||
ok bool
|
||||
dest interface{}
|
||||
}{
|
||||
{
|
||||
[]interface{}{[]byte("1"), nil, []byte("-1")},
|
||||
nil,
|
||||
true,
|
||||
[]int{1, 0, -1},
|
||||
},
|
||||
{
|
||||
[]interface{}{[]byte("1"), nil, []byte("2")},
|
||||
nil,
|
||||
true,
|
||||
[]uint{1, 0, 2},
|
||||
},
|
||||
{
|
||||
[]interface{}{[]byte("-1")},
|
||||
nil,
|
||||
false,
|
||||
[]uint{1},
|
||||
},
|
||||
{
|
||||
[]interface{}{[]byte("hello"), nil, []byte("world")},
|
||||
nil,
|
||||
true,
|
||||
[][]byte{[]byte("hello"), nil, []byte("world")},
|
||||
},
|
||||
{
|
||||
[]interface{}{[]byte("hello"), nil, []byte("world")},
|
||||
nil,
|
||||
true,
|
||||
[]string{"hello", "", "world"},
|
||||
},
|
||||
{
|
||||
[]interface{}{[]byte("a1"), []byte("b1"), []byte("a2"), []byte("b2")},
|
||||
nil,
|
||||
true,
|
||||
[]struct{ A, B string }{{"a1", "b1"}, {"a2", "b2"}},
|
||||
},
|
||||
{
|
||||
[]interface{}{[]byte("a1"), []byte("b1")},
|
||||
nil,
|
||||
false,
|
||||
[]struct{ A, B, C string }{{"a1", "b1", ""}},
|
||||
},
|
||||
{
|
||||
[]interface{}{[]byte("a1"), []byte("b1"), []byte("a2"), []byte("b2")},
|
||||
nil,
|
||||
true,
|
||||
[]*struct{ A, B string }{{"a1", "b1"}, {"a2", "b2"}},
|
||||
},
|
||||
{
|
||||
[]interface{}{[]byte("a1"), []byte("b1"), []byte("a2"), []byte("b2")},
|
||||
[]string{"A", "B"},
|
||||
true,
|
||||
[]struct{ A, C, B string }{{"a1", "", "b1"}, {"a2", "", "b2"}},
|
||||
},
|
||||
{
|
||||
[]interface{}{[]byte("a1"), []byte("b1"), []byte("a2"), []byte("b2")},
|
||||
nil,
|
||||
false,
|
||||
[]struct{}{},
|
||||
},
|
||||
}
|
||||
|
||||
func TestScanSlice(t *testing.T) {
|
||||
for _, tt := range scanSliceTests {
|
||||
|
||||
typ := reflect.ValueOf(tt.dest).Type()
|
||||
dest := reflect.New(typ)
|
||||
|
||||
err := ScanSlice(tt.src, dest.Interface(), tt.fieldNames...)
|
||||
if tt.ok != (err == nil) {
|
||||
t.Errorf("ScanSlice(%v, []%s, %v) returned error %v", tt.src, typ, tt.fieldNames, err)
|
||||
continue
|
||||
}
|
||||
if tt.ok && !reflect.DeepEqual(dest.Elem().Interface(), tt.dest) {
|
||||
t.Errorf("ScanSlice(src, []%s) returned %#v, want %#v", typ, dest.Elem().Interface(), tt.dest)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func ExampleScanSlice() {
|
||||
c, err := dial()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
c.Send("HMSET", "album:1", "title", "Red", "rating", 5)
|
||||
c.Send("HMSET", "album:2", "title", "Earthbound", "rating", 1)
|
||||
c.Send("HMSET", "album:3", "title", "Beat", "rating", 4)
|
||||
c.Send("LPUSH", "albums", "1")
|
||||
c.Send("LPUSH", "albums", "2")
|
||||
c.Send("LPUSH", "albums", "3")
|
||||
values, err := Values(c.Do("SORT", "albums",
|
||||
"BY", "album:*->rating",
|
||||
"GET", "album:*->title",
|
||||
"GET", "album:*->rating"))
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
|
||||
var albums []struct {
|
||||
Title string
|
||||
Rating int
|
||||
}
|
||||
if err := ScanSlice(values, &albums); err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
fmt.Printf("%v\n", albums)
|
||||
// Output:
|
||||
// [{Earthbound 1} {Beat 4} {Red 5}]
|
||||
}
|
||||
|
||||
var argsTests = []struct {
|
||||
title string
|
||||
actual Args
|
||||
expected Args
|
||||
}{
|
||||
{"struct ptr",
|
||||
Args{}.AddFlat(&struct {
|
||||
I int `redis:"i"`
|
||||
U uint `redis:"u"`
|
||||
S string `redis:"s"`
|
||||
P []byte `redis:"p"`
|
||||
M map[string]string `redis:"m"`
|
||||
Bt bool
|
||||
Bf bool
|
||||
}{
|
||||
-1234, 5678, "hello", []byte("world"), map[string]string{"hello": "world"}, true, false,
|
||||
}),
|
||||
Args{"i", int(-1234), "u", uint(5678), "s", "hello", "p", []byte("world"), "m", map[string]string{"hello": "world"}, "Bt", true, "Bf", false},
|
||||
},
|
||||
{"struct",
|
||||
Args{}.AddFlat(struct{ I int }{123}),
|
||||
Args{"I", 123},
|
||||
},
|
||||
{"slice",
|
||||
Args{}.Add(1).AddFlat([]string{"a", "b", "c"}).Add(2),
|
||||
Args{1, "a", "b", "c", 2},
|
||||
},
|
||||
{"struct omitempty",
|
||||
Args{}.AddFlat(&struct {
|
||||
I int `redis:"i,omitempty"`
|
||||
U uint `redis:"u,omitempty"`
|
||||
S string `redis:"s,omitempty"`
|
||||
P []byte `redis:"p,omitempty"`
|
||||
M map[string]string `redis:"m,omitempty"`
|
||||
Bt bool `redis:"Bt,omitempty"`
|
||||
Bf bool `redis:"Bf,omitempty"`
|
||||
}{
|
||||
0, 0, "", []byte{}, map[string]string{}, true, false,
|
||||
}),
|
||||
Args{"Bt", true},
|
||||
},
|
||||
}
|
||||
|
||||
func TestArgs(t *testing.T) {
|
||||
for _, tt := range argsTests {
|
||||
if !reflect.DeepEqual(tt.actual, tt.expected) {
|
||||
t.Fatalf("%s is %v, want %v", tt.title, tt.actual, tt.expected)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func ExampleArgs() {
|
||||
c, err := dial()
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
var p1, p2 struct {
|
||||
Title string `redis:"title"`
|
||||
Author string `redis:"author"`
|
||||
Body string `redis:"body"`
|
||||
}
|
||||
|
||||
p1.Title = "Example"
|
||||
p1.Author = "Gary"
|
||||
p1.Body = "Hello"
|
||||
|
||||
if _, err := c.Do("HMSET", Args{}.Add("id1").AddFlat(&p1)...); err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
|
||||
m := map[string]string{
|
||||
"title": "Example2",
|
||||
"author": "Steve",
|
||||
"body": "Map",
|
||||
}
|
||||
|
||||
if _, err := c.Do("HMSET", Args{}.Add("id2").AddFlat(m)...); err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
|
||||
for _, id := range []string{"id1", "id2"} {
|
||||
|
||||
v, err := Values(c.Do("HGETALL", id))
|
||||
if err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
|
||||
if err := ScanStruct(v, &p2); err != nil {
|
||||
fmt.Println(err)
|
||||
return
|
||||
}
|
||||
|
||||
fmt.Printf("%+v\n", p2)
|
||||
}
|
||||
|
||||
// Output:
|
||||
// {Title:Example Author:Gary Body:Hello}
|
||||
// {Title:Example2 Author:Steve Body:Map}
|
||||
}
|
103
pkg/cache/redis/script_test.go
vendored
Normal file
103
pkg/cache/redis/script_test.go
vendored
Normal file
@ -0,0 +1,103 @@
|
||||
// Copyright 2012 Gary Burd
|
||||
//
|
||||
// Licensed under the Apache License, Version 2.0 (the "License"): you may
|
||||
// not use this file except in compliance with the License. You may obtain
|
||||
// a copy of the License at
|
||||
//
|
||||
// http://www.apache.org/licenses/LICENSE-2.0
|
||||
//
|
||||
// Unless required by applicable law or agreed to in writing, software
|
||||
// distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
|
||||
// WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
|
||||
// License for the specific language governing permissions and limitations
|
||||
// under the License.
|
||||
|
||||
package redis
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"reflect"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
func ExampleScript() {
|
||||
c, err := Dial("tcp", ":6379")
|
||||
if err != nil {
|
||||
// handle error
|
||||
}
|
||||
defer c.Close()
|
||||
// Initialize a package-level variable with a script.
|
||||
var getScript = NewScript(1, `return call('get', KEYS[1])`)
|
||||
|
||||
// In a function, use the script Do method to evaluate the script. The Do
|
||||
// method optimistically uses the EVALSHA command. If the script is not
|
||||
// loaded, then the Do method falls back to the EVAL command.
|
||||
if _, err = getScript.Do(c, "foo"); err != nil {
|
||||
// handle error
|
||||
}
|
||||
}
|
||||
|
||||
func TestScript(t *testing.T) {
|
||||
c, err := DialDefaultServer()
|
||||
if err != nil {
|
||||
t.Fatalf("error connection to database, %v", err)
|
||||
}
|
||||
defer c.Close()
|
||||
|
||||
// To test fall back in Do, we make script unique by adding comment with current time.
|
||||
script := fmt.Sprintf("--%d\nreturn {KEYS[1],KEYS[2],ARGV[1],ARGV[2]}", time.Now().UnixNano())
|
||||
s := NewScript(2, script)
|
||||
reply := []interface{}{[]byte("key1"), []byte("key2"), []byte("arg1"), []byte("arg2")}
|
||||
|
||||
v, err := s.Do(c, "key1", "key2", "arg1", "arg2")
|
||||
if err != nil {
|
||||
t.Errorf("s.Do(c, ...) returned %v", err)
|
||||
}
|
||||
|
||||
if !reflect.DeepEqual(v, reply) {
|
||||
t.Errorf("s.Do(c, ..); = %v, want %v", v, reply)
|
||||
}
|
||||
|
||||
err = s.Load(c)
|
||||
if err != nil {
|
||||
t.Errorf("s.Load(c) returned %v", err)
|
||||
}
|
||||
|
||||
err = s.SendHash(c, "key1", "key2", "arg1", "arg2")
|
||||
if err != nil {
|
||||
t.Errorf("s.SendHash(c, ...) returned %v", err)
|
||||
}
|
||||
|
||||
err = c.Flush()
|
||||
if err != nil {
|
||||
t.Errorf("c.Flush() returned %v", err)
|
||||
}
|
||||
|
||||
v, err = c.Receive()
|
||||
if err != nil {
|
||||
t.Errorf("c.Receive() returned %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(v, reply) {
|
||||
t.Errorf("s.SendHash(c, ..); c.Receive() = %v, want %v", v, reply)
|
||||
}
|
||||
|
||||
err = s.Send(c, "key1", "key2", "arg1", "arg2")
|
||||
if err != nil {
|
||||
t.Errorf("s.Send(c, ...) returned %v", err)
|
||||
}
|
||||
|
||||
err = c.Flush()
|
||||
if err != nil {
|
||||
t.Errorf("c.Flush() returned %v", err)
|
||||
}
|
||||
|
||||
v, err = c.Receive()
|
||||
if err != nil {
|
||||
t.Errorf("c.Receive() returned %v", err)
|
||||
}
|
||||
if !reflect.DeepEqual(v, reply) {
|
||||
t.Errorf("s.Send(c, ..); c.Receive() = %v, want %v", v, reply)
|
||||
}
|
||||
|
||||
}
|
12
pkg/cache/redis/test/docker-compose.yaml
vendored
Normal file
12
pkg/cache/redis/test/docker-compose.yaml
vendored
Normal file
@ -0,0 +1,12 @@
|
||||
version: "3.7"
|
||||
|
||||
services:
|
||||
redis:
|
||||
image: redis
|
||||
ports:
|
||||
- 6379:6379
|
||||
healthcheck:
|
||||
test: ["CMD", "redis-cli","ping"]
|
||||
interval: 20s
|
||||
timeout: 1s
|
||||
retries: 20
|
44
pkg/cache/redis/trace.go
vendored
44
pkg/cache/redis/trace.go
vendored
@ -10,10 +10,9 @@ import (
|
||||
)
|
||||
|
||||
const (
|
||||
_traceComponentName = "pkg/cache/redis"
|
||||
_traceComponentName = "library/cache/redis"
|
||||
_tracePeerService = "redis"
|
||||
_traceSpanKind = "client"
|
||||
_slowLogDuration = time.Millisecond * 250
|
||||
)
|
||||
|
||||
var _internalTags = []trace.Tag{
|
||||
@ -24,26 +23,28 @@ var _internalTags = []trace.Tag{
|
||||
|
||||
type traceConn struct {
|
||||
// tr for pipeline, if tr != nil meaning on pipeline
|
||||
tr trace.Trace
|
||||
ctx context.Context
|
||||
tr trace.Trace
|
||||
// connTag include e.g. ip,port
|
||||
connTags []trace.Tag
|
||||
|
||||
ctx context.Context
|
||||
|
||||
// origin redis conn
|
||||
Conn
|
||||
pending int
|
||||
// TODO: split slow log from trace.
|
||||
slowLogThreshold time.Duration
|
||||
}
|
||||
|
||||
func (t *traceConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
|
||||
statement := getStatement(commandName, args...)
|
||||
defer slowLog(statement, time.Now())
|
||||
root, ok := trace.FromContext(t.ctx)
|
||||
defer t.slowLog(statement, time.Now())
|
||||
// NOTE: ignored empty commandName
|
||||
// current sdk will Do empty command after pipeline finished
|
||||
if !ok || commandName == "" {
|
||||
if t.tr == nil || commandName == "" {
|
||||
return t.Conn.Do(commandName, args...)
|
||||
}
|
||||
tr := root.Fork("", "Redis:"+commandName)
|
||||
tr := t.tr.Fork("", "Redis:"+commandName)
|
||||
tr.SetTag(_internalTags...)
|
||||
tr.SetTag(t.connTags...)
|
||||
tr.SetTag(trace.TagString(trace.TagDBStatement, statement))
|
||||
@ -52,16 +53,15 @@ func (t *traceConn) Do(commandName string, args ...interface{}) (reply interface
|
||||
return
|
||||
}
|
||||
|
||||
func (t *traceConn) Send(commandName string, args ...interface{}) error {
|
||||
func (t *traceConn) Send(commandName string, args ...interface{}) (err error) {
|
||||
statement := getStatement(commandName, args...)
|
||||
defer slowLog(statement, time.Now())
|
||||
defer t.slowLog(statement, time.Now())
|
||||
t.pending++
|
||||
root, ok := trace.FromContext(t.ctx)
|
||||
if !ok {
|
||||
if t.tr == nil {
|
||||
return t.Conn.Send(commandName, args...)
|
||||
}
|
||||
if t.tr == nil {
|
||||
t.tr = root.Fork("", "Redis:Pipeline")
|
||||
if t.pending == 1 {
|
||||
t.tr = t.tr.Fork("", "Redis:Pipeline")
|
||||
t.tr.SetTag(_internalTags...)
|
||||
t.tr.SetTag(t.connTags...)
|
||||
}
|
||||
@ -69,8 +69,7 @@ func (t *traceConn) Send(commandName string, args ...interface{}) error {
|
||||
trace.Log(trace.LogEvent, "Send"),
|
||||
trace.Log("db.statement", statement),
|
||||
)
|
||||
err := t.Conn.Send(commandName, args...)
|
||||
if err != nil {
|
||||
if err = t.Conn.Send(commandName, args...); err != nil {
|
||||
t.tr.SetTag(trace.TagBool(trace.TagError, true))
|
||||
t.tr.SetLog(
|
||||
trace.Log(trace.LogEvent, "Send Fail"),
|
||||
@ -81,7 +80,7 @@ func (t *traceConn) Send(commandName string, args ...interface{}) error {
|
||||
}
|
||||
|
||||
func (t *traceConn) Flush() error {
|
||||
defer slowLog("Flush", time.Now())
|
||||
defer t.slowLog("Flush", time.Now())
|
||||
if t.tr == nil {
|
||||
return t.Conn.Flush()
|
||||
}
|
||||
@ -98,7 +97,7 @@ func (t *traceConn) Flush() error {
|
||||
}
|
||||
|
||||
func (t *traceConn) Receive() (reply interface{}, err error) {
|
||||
defer slowLog("Receive", time.Now())
|
||||
defer t.slowLog("Receive", time.Now())
|
||||
if t.tr == nil {
|
||||
return t.Conn.Receive()
|
||||
}
|
||||
@ -122,13 +121,16 @@ func (t *traceConn) Receive() (reply interface{}, err error) {
|
||||
}
|
||||
|
||||
func (t *traceConn) WithContext(ctx context.Context) Conn {
|
||||
t.ctx = ctx
|
||||
t.Conn = t.Conn.WithContext(ctx)
|
||||
if root, ok := trace.FromContext(ctx); ok {
|
||||
t.tr = root
|
||||
}
|
||||
return t
|
||||
}
|
||||
|
||||
func slowLog(statement string, now time.Time) {
|
||||
func (t *traceConn) slowLog(statement string, now time.Time) {
|
||||
du := time.Since(now)
|
||||
if du > _slowLogDuration {
|
||||
if du > t.slowLogThreshold {
|
||||
log.Warn("%s slow log statement: %s time: %v", _tracePeerService, statement, du)
|
||||
}
|
||||
}
|
||||
|
192
pkg/cache/redis/trace_test.go
vendored
Normal file
192
pkg/cache/redis/trace_test.go
vendored
Normal file
@ -0,0 +1,192 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/bilibili/kratos/pkg/net/trace"
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
const testTraceSlowLogThreshold = time.Duration(250 * time.Millisecond)
|
||||
|
||||
type mockTrace struct {
|
||||
tags []trace.Tag
|
||||
logs []trace.LogField
|
||||
perr *error
|
||||
operationName string
|
||||
finished bool
|
||||
}
|
||||
|
||||
func (m *mockTrace) Fork(serviceName string, operationName string) trace.Trace {
|
||||
m.operationName = operationName
|
||||
return m
|
||||
}
|
||||
func (m *mockTrace) Follow(serviceName string, operationName string) trace.Trace {
|
||||
panic("not implemented")
|
||||
}
|
||||
func (m *mockTrace) Finish(err *error) {
|
||||
m.perr = err
|
||||
m.finished = true
|
||||
}
|
||||
func (m *mockTrace) SetTag(tags ...trace.Tag) trace.Trace {
|
||||
m.tags = append(m.tags, tags...)
|
||||
return m
|
||||
}
|
||||
func (m *mockTrace) SetLog(logs ...trace.LogField) trace.Trace {
|
||||
m.logs = append(m.logs, logs...)
|
||||
return m
|
||||
}
|
||||
func (m *mockTrace) Visit(fn func(k, v string)) {}
|
||||
func (m *mockTrace) SetTitle(title string) {}
|
||||
func (m *mockTrace) TraceID() string { return "" }
|
||||
|
||||
type mockConn struct{}
|
||||
|
||||
func (c *mockConn) Close() error { return nil }
|
||||
func (c *mockConn) Err() error { return nil }
|
||||
func (c *mockConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
|
||||
return nil, nil
|
||||
}
|
||||
func (c *mockConn) Send(commandName string, args ...interface{}) error { return nil }
|
||||
func (c *mockConn) Flush() error { return nil }
|
||||
func (c *mockConn) Receive() (reply interface{}, err error) { return nil, nil }
|
||||
func (c *mockConn) WithContext(context.Context) Conn { return c }
|
||||
|
||||
func TestTraceDo(t *testing.T) {
|
||||
tr := &mockTrace{}
|
||||
ctx := trace.NewContext(context.Background(), tr)
|
||||
tc := &traceConn{Conn: &mockConn{}, slowLogThreshold: testTraceSlowLogThreshold}
|
||||
conn := tc.WithContext(ctx)
|
||||
|
||||
conn.Do("GET", "test")
|
||||
|
||||
assert.Equal(t, "Redis:GET", tr.operationName)
|
||||
assert.NotEmpty(t, tr.tags)
|
||||
assert.True(t, tr.finished)
|
||||
}
|
||||
|
||||
func TestTraceDoErr(t *testing.T) {
|
||||
tr := &mockTrace{}
|
||||
ctx := trace.NewContext(context.Background(), tr)
|
||||
tc := &traceConn{Conn: MockErr{Error: fmt.Errorf("hhhhhhh")},
|
||||
slowLogThreshold: testTraceSlowLogThreshold}
|
||||
conn := tc.WithContext(ctx)
|
||||
|
||||
conn.Do("GET", "test")
|
||||
|
||||
assert.Equal(t, "Redis:GET", tr.operationName)
|
||||
assert.True(t, tr.finished)
|
||||
assert.NotNil(t, *tr.perr)
|
||||
}
|
||||
|
||||
func TestTracePipeline(t *testing.T) {
|
||||
tr := &mockTrace{}
|
||||
ctx := trace.NewContext(context.Background(), tr)
|
||||
tc := &traceConn{Conn: &mockConn{}, slowLogThreshold: testTraceSlowLogThreshold}
|
||||
conn := tc.WithContext(ctx)
|
||||
|
||||
N := 2
|
||||
for i := 0; i < N; i++ {
|
||||
conn.Send("GET", "hello, world")
|
||||
}
|
||||
conn.Flush()
|
||||
for i := 0; i < N; i++ {
|
||||
conn.Receive()
|
||||
}
|
||||
|
||||
assert.Equal(t, "Redis:Pipeline", tr.operationName)
|
||||
assert.NotEmpty(t, tr.tags)
|
||||
assert.NotEmpty(t, tr.logs)
|
||||
assert.True(t, tr.finished)
|
||||
}
|
||||
|
||||
func TestTracePipelineErr(t *testing.T) {
|
||||
tr := &mockTrace{}
|
||||
ctx := trace.NewContext(context.Background(), tr)
|
||||
tc := &traceConn{Conn: MockErr{Error: fmt.Errorf("hahah")},
|
||||
slowLogThreshold: testTraceSlowLogThreshold}
|
||||
conn := tc.WithContext(ctx)
|
||||
|
||||
N := 2
|
||||
for i := 0; i < N; i++ {
|
||||
conn.Send("GET", "hello, world")
|
||||
}
|
||||
conn.Flush()
|
||||
for i := 0; i < N; i++ {
|
||||
conn.Receive()
|
||||
}
|
||||
|
||||
assert.Equal(t, "Redis:Pipeline", tr.operationName)
|
||||
assert.NotEmpty(t, tr.tags)
|
||||
assert.NotEmpty(t, tr.logs)
|
||||
assert.True(t, tr.finished)
|
||||
var isError bool
|
||||
for _, tag := range tr.tags {
|
||||
if tag.Key == "error" {
|
||||
isError = true
|
||||
}
|
||||
}
|
||||
assert.True(t, isError)
|
||||
}
|
||||
|
||||
func TestSendStatement(t *testing.T) {
|
||||
tr := &mockTrace{}
|
||||
ctx := trace.NewContext(context.Background(), tr)
|
||||
tc := &traceConn{Conn: MockErr{Error: fmt.Errorf("hahah")},
|
||||
slowLogThreshold: testTraceSlowLogThreshold}
|
||||
conn := tc.WithContext(ctx)
|
||||
conn.Send("SET", "hello", "test")
|
||||
conn.Flush()
|
||||
conn.Receive()
|
||||
|
||||
assert.Equal(t, "Redis:Pipeline", tr.operationName)
|
||||
assert.NotEmpty(t, tr.tags)
|
||||
assert.NotEmpty(t, tr.logs)
|
||||
assert.Equal(t, "event", tr.logs[0].Key)
|
||||
assert.Equal(t, "Send", tr.logs[0].Value)
|
||||
assert.Equal(t, "db.statement", tr.logs[1].Key)
|
||||
assert.Equal(t, "SET hello", tr.logs[1].Value)
|
||||
assert.True(t, tr.finished)
|
||||
var isError bool
|
||||
for _, tag := range tr.tags {
|
||||
if tag.Key == "error" {
|
||||
isError = true
|
||||
}
|
||||
}
|
||||
assert.True(t, isError)
|
||||
}
|
||||
|
||||
func TestDoStatement(t *testing.T) {
|
||||
tr := &mockTrace{}
|
||||
ctx := trace.NewContext(context.Background(), tr)
|
||||
tc := &traceConn{Conn: MockErr{Error: fmt.Errorf("hahah")},
|
||||
slowLogThreshold: testTraceSlowLogThreshold}
|
||||
conn := tc.WithContext(ctx)
|
||||
conn.Do("SET", "hello", "test")
|
||||
|
||||
assert.Equal(t, "Redis:SET", tr.operationName)
|
||||
assert.Equal(t, "SET hello", tr.tags[len(tr.tags)-1].Value)
|
||||
assert.True(t, tr.finished)
|
||||
}
|
||||
|
||||
func BenchmarkTraceConn(b *testing.B) {
|
||||
for i := 0; i < b.N; i++ {
|
||||
c, err := DialDefaultServer()
|
||||
if err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
t := &traceConn{
|
||||
Conn: c,
|
||||
connTags: []trace.Tag{trace.TagString(trace.TagPeerAddress, "abc")},
|
||||
slowLogThreshold: time.Duration(1 * time.Second),
|
||||
}
|
||||
c2 := t.WithContext(context.TODO())
|
||||
if _, err := c2.Do("PING"); err != nil {
|
||||
b.Fatal(err)
|
||||
}
|
||||
c2.Close()
|
||||
}
|
||||
}
|
17
pkg/cache/redis/util.go
vendored
Normal file
17
pkg/cache/redis/util.go
vendored
Normal file
@ -0,0 +1,17 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"time"
|
||||
)
|
||||
|
||||
func shrinkDeadline(ctx context.Context, timeout time.Duration) time.Time {
|
||||
var timeoutTime = time.Now().Add(timeout)
|
||||
if ctx == nil {
|
||||
return timeoutTime
|
||||
}
|
||||
if deadline, ok := ctx.Deadline(); ok && timeoutTime.After(deadline) {
|
||||
return deadline
|
||||
}
|
||||
return timeoutTime
|
||||
}
|
37
pkg/cache/redis/util_test.go
vendored
Normal file
37
pkg/cache/redis/util_test.go
vendored
Normal file
@ -0,0 +1,37 @@
|
||||
package redis
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/stretchr/testify/assert"
|
||||
)
|
||||
|
||||
func TestShrinkDeadline(t *testing.T) {
|
||||
t.Run("test not deadline", func(t *testing.T) {
|
||||
timeout := time.Second
|
||||
timeoutTime := time.Now().Add(timeout)
|
||||
tm := shrinkDeadline(context.Background(), timeout)
|
||||
assert.True(t, tm.After(timeoutTime))
|
||||
})
|
||||
t.Run("test big deadline", func(t *testing.T) {
|
||||
timeout := time.Second
|
||||
timeoutTime := time.Now().Add(timeout)
|
||||
deadlineTime := time.Now().Add(2 * time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||
defer cancel()
|
||||
|
||||
tm := shrinkDeadline(ctx, timeout)
|
||||
assert.True(t, tm.After(timeoutTime) && tm.Before(deadlineTime))
|
||||
})
|
||||
t.Run("test small deadline", func(t *testing.T) {
|
||||
timeout := time.Second
|
||||
deadlineTime := time.Now().Add(500 * time.Millisecond)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
|
||||
defer cancel()
|
||||
|
||||
tm := shrinkDeadline(ctx, timeout)
|
||||
assert.True(t, tm.After(deadlineTime) && tm.Before(time.Now().Add(timeout)))
|
||||
})
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user