1
0
mirror of https://github.com/go-kratos/kratos.git synced 2025-01-10 00:29:01 +02:00

Merge pull request #629 from Windfarer/fix_redis_trace

fix trace pending count
This commit is contained in:
Terry.Mao 2020-09-17 19:42:38 +08:00 committed by GitHub
commit 5af9f75477
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 58 additions and 21 deletions

View File

@ -20,6 +20,9 @@ func formatErr(err error, name, addr string) string {
case strings.HasPrefix(es, "read"):
return "read timeout"
case strings.HasPrefix(es, "dial"):
if strings.Contains(es, "connection refused") {
return "connection refused"
}
return "dial timeout"
case strings.HasPrefix(es, "write"):
return "write timeout"
@ -29,6 +32,10 @@ func formatErr(err error, name, addr string) string {
return "reset"
case strings.Contains(es, "broken"):
return "broken pipe"
case strings.Contains(es, "pool exhausted"):
return "pool exhausted"
case strings.Contains(es, "pool closed"):
return "pool closed"
default:
return "unexpected err"
}

View File

@ -22,13 +22,14 @@ var _internalTags = []trace.Tag{
}
type traceConn struct {
// tr for pipeline, if tr != nil meaning on pipeline
// tr parent trace.
tr trace.Trace
// trPipe for pipeline, if trPipe != nil meaning on pipeline.
trPipe trace.Trace
// connTag include e.g. ip,port
connTags []trace.Tag
ctx context.Context
// origin redis conn
Conn
pending int
@ -39,9 +40,15 @@ type traceConn struct {
func (t *traceConn) Do(commandName string, args ...interface{}) (reply interface{}, err error) {
statement := getStatement(commandName, args...)
defer t.slowLog(statement, time.Now())
// NOTE: ignored empty commandName
// current sdk will Do empty command after pipeline finished
if t.tr == nil || commandName == "" {
if commandName == "" {
t.pending = 0
t.trPipe = nil
return t.Conn.Do(commandName, args...)
}
if t.tr == nil {
return t.Conn.Do(commandName, args...)
}
tr := t.tr.Fork("", "Redis:"+commandName)
@ -60,18 +67,19 @@ func (t *traceConn) Send(commandName string, args ...interface{}) (err error) {
if t.tr == nil {
return t.Conn.Send(commandName, args...)
}
if t.pending == 1 {
t.tr = t.tr.Fork("", "Redis:Pipeline")
t.tr.SetTag(_internalTags...)
t.tr.SetTag(t.connTags...)
if t.trPipe == nil {
t.trPipe = t.tr.Fork("", "Redis:Pipeline")
t.trPipe.SetTag(_internalTags...)
t.trPipe.SetTag(t.connTags...)
}
t.tr.SetLog(
t.trPipe.SetLog(
trace.Log(trace.LogEvent, "Send"),
trace.Log("db.statement", statement),
)
if err = t.Conn.Send(commandName, args...); err != nil {
t.tr.SetTag(trace.TagBool(trace.TagError, true))
t.tr.SetLog(
t.trPipe.SetTag(trace.TagBool(trace.TagError, true))
t.trPipe.SetLog(
trace.Log(trace.LogEvent, "Send Fail"),
trace.Log(trace.LogMessage, err.Error()),
)
@ -81,14 +89,14 @@ func (t *traceConn) Send(commandName string, args ...interface{}) (err error) {
func (t *traceConn) Flush() error {
defer t.slowLog("Flush", time.Now())
if t.tr == nil {
if t.trPipe == nil {
return t.Conn.Flush()
}
t.tr.SetLog(trace.Log(trace.LogEvent, "Flush"))
t.trPipe.SetLog(trace.Log(trace.LogEvent, "Flush"))
err := t.Conn.Flush()
if err != nil {
t.tr.SetTag(trace.TagBool(trace.TagError, true))
t.tr.SetLog(
t.trPipe.SetTag(trace.TagBool(trace.TagError, true))
t.trPipe.SetLog(
trace.Log(trace.LogEvent, "Flush Fail"),
trace.Log(trace.LogMessage, err.Error()),
)
@ -98,14 +106,14 @@ func (t *traceConn) Flush() error {
func (t *traceConn) Receive() (reply interface{}, err error) {
defer t.slowLog("Receive", time.Now())
if t.tr == nil {
if t.trPipe == nil {
return t.Conn.Receive()
}
t.tr.SetLog(trace.Log(trace.LogEvent, "Receive"))
t.trPipe.SetLog(trace.Log(trace.LogEvent, "Receive"))
reply, err = t.Conn.Receive()
if err != nil {
t.tr.SetTag(trace.TagBool(trace.TagError, true))
t.tr.SetLog(
t.trPipe.SetTag(trace.TagBool(trace.TagError, true))
t.trPipe.SetLog(
trace.Log(trace.LogEvent, "Receive Fail"),
trace.Log(trace.LogMessage, err.Error()),
)
@ -114,8 +122,8 @@ func (t *traceConn) Receive() (reply interface{}, err error) {
t.pending--
}
if t.pending == 0 {
t.tr.Finish(nil)
t.tr = nil
t.trPipe.Finish(nil)
t.trPipe = nil
}
return reply, err
}
@ -123,6 +131,8 @@ func (t *traceConn) Receive() (reply interface{}, err error) {
func (t *traceConn) WithContext(ctx context.Context) Conn {
t.Conn = t.Conn.WithContext(ctx)
t.tr, _ = trace.FromContext(ctx)
t.pending = 0
t.trPipe = nil
return t
}

View File

@ -190,3 +190,23 @@ func BenchmarkTraceConn(b *testing.B) {
c2.Close()
}
}
func TestTraceConnPending(t *testing.T) {
c, err := DialDefaultServer()
if err != nil {
t.Fatal(err)
}
tc := &traceConn{
Conn: c,
connTags: []trace.Tag{trace.TagString(trace.TagPeerAddress, "abc")},
slowLogThreshold: time.Duration(1 * time.Second),
}
err = tc.Send("SET", "a", "x")
if err != nil {
t.Fatal(err)
}
tc.Close()
assert.Equal(t, 1, tc.pending)
tc.Do("")
assert.Equal(t, 0, tc.pending)
}