mirror of
https://github.com/go-kratos/kratos.git
synced 2025-01-24 03:46:37 +02:00
fix trace pending
This commit is contained in:
parent
9636e6bcf0
commit
b0159d695e
7
pkg/cache/redis/errors.go
vendored
7
pkg/cache/redis/errors.go
vendored
@ -20,6 +20,9 @@ func formatErr(err error, name, addr string) string {
|
||||
case strings.HasPrefix(es, "read"):
|
||||
return "read timeout"
|
||||
case strings.HasPrefix(es, "dial"):
|
||||
if strings.Contains(es, "connection refused") {
|
||||
return "connection refused"
|
||||
}
|
||||
return "dial timeout"
|
||||
case strings.HasPrefix(es, "write"):
|
||||
return "write timeout"
|
||||
@ -29,6 +32,10 @@ func formatErr(err error, name, addr string) string {
|
||||
return "reset"
|
||||
case strings.Contains(es, "broken"):
|
||||
return "broken pipe"
|
||||
case strings.Contains(es, "pool exhausted"):
|
||||
return "pool exhausted"
|
||||
case strings.Contains(es, "pool closed"):
|
||||
return "pool closed"
|
||||
default:
|
||||
return "unexpected err"
|
||||
}
|
||||
|
52
pkg/cache/redis/trace.go
vendored
52
pkg/cache/redis/trace.go
vendored
@ -22,13 +22,14 @@ var _internalTags = []trace.Tag{
|
||||
}
|
||||
|
||||
type traceConn struct {
|
||||
// tr for pipeline, if tr != nil meaning on pipeline
|
||||
// tr parent trace.
|
||||
tr trace.Trace
|
||||
// trPipe for pipeline, if trPipe != nil meaning on pipeline.
|
||||
trPipe trace.Trace
|
||||
|
||||
// connTag include e.g. ip,port
|
||||
connTags []trace.Tag
|
||||
|
||||
ctx context.Context
|
||||
|
||||
// origin redis conn
|
||||
Conn
|
||||
pending int
|
||||
@ -39,9 +40,15 @@ type traceConn struct {
|
||||
func (t *traceConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
|
||||
statement := getStatement(commandName, args...)
|
||||
defer t.slowLog(statement, time.Now())
|
||||
|
||||
// NOTE: ignored empty commandName
|
||||
// current sdk will Do empty command after pipeline finished
|
||||
if t.tr == nil || commandName == "" {
|
||||
if commandName == "" {
|
||||
t.pending = 0
|
||||
t.trPipe = nil
|
||||
return t.Conn.Do(commandName, args...)
|
||||
}
|
||||
if t.tr == nil {
|
||||
return t.Conn.Do(commandName, args...)
|
||||
}
|
||||
tr := t.tr.Fork("", "Redis:"+commandName)
|
||||
@ -60,18 +67,19 @@ func (t *traceConn) Send(commandName string, args ...interface{}) (err error) {
|
||||
if t.tr == nil {
|
||||
return t.Conn.Send(commandName, args...)
|
||||
}
|
||||
if t.pending == 1 {
|
||||
t.tr = t.tr.Fork("", "Redis:Pipeline")
|
||||
t.tr.SetTag(_internalTags...)
|
||||
t.tr.SetTag(t.connTags...)
|
||||
|
||||
if t.trPipe == nil {
|
||||
t.trPipe = t.tr.Fork("", "Redis:Pipeline")
|
||||
t.trPipe.SetTag(_internalTags...)
|
||||
t.trPipe.SetTag(t.connTags...)
|
||||
}
|
||||
t.tr.SetLog(
|
||||
t.trPipe.SetLog(
|
||||
trace.Log(trace.LogEvent, "Send"),
|
||||
trace.Log("db.statement", statement),
|
||||
)
|
||||
if err = t.Conn.Send(commandName, args...); err != nil {
|
||||
t.tr.SetTag(trace.TagBool(trace.TagError, true))
|
||||
t.tr.SetLog(
|
||||
t.trPipe.SetTag(trace.TagBool(trace.TagError, true))
|
||||
t.trPipe.SetLog(
|
||||
trace.Log(trace.LogEvent, "Send Fail"),
|
||||
trace.Log(trace.LogMessage, err.Error()),
|
||||
)
|
||||
@ -81,14 +89,14 @@ func (t *traceConn) Send(commandName string, args ...interface{}) (err error) {
|
||||
|
||||
func (t *traceConn) Flush() error {
|
||||
defer t.slowLog("Flush", time.Now())
|
||||
if t.tr == nil {
|
||||
if t.trPipe == nil {
|
||||
return t.Conn.Flush()
|
||||
}
|
||||
t.tr.SetLog(trace.Log(trace.LogEvent, "Flush"))
|
||||
t.trPipe.SetLog(trace.Log(trace.LogEvent, "Flush"))
|
||||
err := t.Conn.Flush()
|
||||
if err != nil {
|
||||
t.tr.SetTag(trace.TagBool(trace.TagError, true))
|
||||
t.tr.SetLog(
|
||||
t.trPipe.SetTag(trace.TagBool(trace.TagError, true))
|
||||
t.trPipe.SetLog(
|
||||
trace.Log(trace.LogEvent, "Flush Fail"),
|
||||
trace.Log(trace.LogMessage, err.Error()),
|
||||
)
|
||||
@ -98,14 +106,14 @@ func (t *traceConn) Flush() error {
|
||||
|
||||
func (t *traceConn) Receive() (reply interface{}, err error) {
|
||||
defer t.slowLog("Receive", time.Now())
|
||||
if t.tr == nil {
|
||||
if t.trPipe == nil {
|
||||
return t.Conn.Receive()
|
||||
}
|
||||
t.tr.SetLog(trace.Log(trace.LogEvent, "Receive"))
|
||||
t.trPipe.SetLog(trace.Log(trace.LogEvent, "Receive"))
|
||||
reply, err = t.Conn.Receive()
|
||||
if err != nil {
|
||||
t.tr.SetTag(trace.TagBool(trace.TagError, true))
|
||||
t.tr.SetLog(
|
||||
t.trPipe.SetTag(trace.TagBool(trace.TagError, true))
|
||||
t.trPipe.SetLog(
|
||||
trace.Log(trace.LogEvent, "Receive Fail"),
|
||||
trace.Log(trace.LogMessage, err.Error()),
|
||||
)
|
||||
@ -114,8 +122,8 @@ func (t *traceConn) Receive() (reply interface{}, err error) {
|
||||
t.pending--
|
||||
}
|
||||
if t.pending == 0 {
|
||||
t.tr.Finish(nil)
|
||||
t.tr = nil
|
||||
t.trPipe.Finish(nil)
|
||||
t.trPipe = nil
|
||||
}
|
||||
return reply, err
|
||||
}
|
||||
@ -123,6 +131,8 @@ func (t *traceConn) Receive() (reply interface{}, err error) {
|
||||
func (t *traceConn) WithContext(ctx context.Context) Conn {
|
||||
t.Conn = t.Conn.WithContext(ctx)
|
||||
t.tr, _ = trace.FromContext(ctx)
|
||||
t.pending = 0
|
||||
t.trPipe = nil
|
||||
return t
|
||||
}
|
||||
|
||||
|
20
pkg/cache/redis/trace_test.go
vendored
20
pkg/cache/redis/trace_test.go
vendored
@ -190,3 +190,23 @@ func BenchmarkTraceConn(b *testing.B) {
|
||||
c2.Close()
|
||||
}
|
||||
}
|
||||
|
||||
func TestTraceConnPending(t *testing.T) {
|
||||
c, err := DialDefaultServer()
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
tc := &traceConn{
|
||||
Conn: c,
|
||||
connTags: []trace.Tag{trace.TagString(trace.TagPeerAddress, "abc")},
|
||||
slowLogThreshold: time.Duration(1 * time.Second),
|
||||
}
|
||||
err = tc.Send("SET", "a", "x")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
tc.Close()
|
||||
assert.Equal(t, 1, tc.pending)
|
||||
tc.Do("")
|
||||
assert.Equal(t, 0, tc.pending)
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user