remove stats.Stop() and fix flaky stats test (#1016)

This commit is contained in:
Menghan Li
2016-12-07 17:33:01 -08:00
committed by Qi Zhao
parent 708a7f9f32
commit 7484960149
3 changed files with 183 additions and 422 deletions

View File

@ -140,12 +140,6 @@ func Start() {
atomic.StoreInt32(on, 1) atomic.StoreInt32(on, 1)
} }
// Stop stops the stats collection and processing.
// Stop does not unregister the handlers.
func Stop() {
atomic.StoreInt32(on, 0)
}
// On indicates whether the stats collection and processing is on. // On indicates whether the stats collection and processing is on.
func On() bool { func On() bool {
return atomic.CompareAndSwapInt32(on, 1, 1) return atomic.CompareAndSwapInt32(on, 1, 1)

View File

@ -40,6 +40,7 @@ import (
"reflect" "reflect"
"sync" "sync"
"testing" "testing"
"time"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -53,46 +54,6 @@ func init() {
grpc.EnableTracing = false grpc.EnableTracing = false
} }
func TestStartStop(t *testing.T) {
stats.RegisterRPCHandler(nil)
stats.RegisterConnHandler(nil)
stats.Start()
if stats.On() {
t.Fatalf("stats.Start() with nil handler, stats.On() = true, want false")
}
stats.RegisterRPCHandler(func(ctx context.Context, s stats.RPCStats) {})
stats.RegisterConnHandler(nil)
stats.Start()
if !stats.On() {
t.Fatalf("stats.Start() with non-nil handler, stats.On() = false, want true")
}
stats.Stop()
stats.RegisterRPCHandler(nil)
stats.RegisterConnHandler(func(ctx context.Context, s stats.ConnStats) {})
stats.Start()
if !stats.On() {
t.Fatalf("stats.Start() with non-nil conn handler, stats.On() = false, want true")
}
stats.Stop()
stats.RegisterRPCHandler(func(ctx context.Context, s stats.RPCStats) {})
stats.RegisterConnHandler(func(ctx context.Context, s stats.ConnStats) {})
if stats.On() {
t.Fatalf("after stats.RegisterRPCHandler(), stats.On() = true, want false")
}
stats.Start()
if !stats.On() {
t.Fatalf("after stats.Start(_), stats.On() = false, want true")
}
stats.Stop()
if stats.On() {
t.Fatalf("after stats.Stop(), stats.On() = true, want false")
}
}
type connCtxKey struct{} type connCtxKey struct{}
type rpcCtxKey struct{} type rpcCtxKey struct{}
@ -200,9 +161,6 @@ type test struct {
t *testing.T t *testing.T
compress string compress string
ctx context.Context // valid for life of test, before tearDown
cancel context.CancelFunc
testServer testpb.TestServiceServer // nil means none testServer testpb.TestServiceServer // nil means none
// srv and srvAddr are set once startServer is called. // srv and srvAddr are set once startServer is called.
srv *grpc.Server srv *grpc.Server
@ -212,10 +170,6 @@ type test struct {
} }
func (te *test) tearDown() { func (te *test) tearDown() {
if te.cancel != nil {
te.cancel()
te.cancel = nil
}
if te.cc != nil { if te.cc != nil {
te.cc.Close() te.cc.Close()
te.cc = nil te.cc = nil
@ -223,12 +177,15 @@ func (te *test) tearDown() {
te.srv.Stop() te.srv.Stop()
} }
type testConfig struct {
compress string
}
// newTest returns a new test using the provided testing.T and // newTest returns a new test using the provided testing.T and
// environment. It is returned with default values. Tests should // environment. It is returned with default values. Tests should
// modify it before calling its startServer and clientConn methods. // modify it before calling its startServer and clientConn methods.
func newTest(t *testing.T, compress string) *test { func newTest(t *testing.T, tc *testConfig) *test {
te := &test{t: t, compress: compress} te := &test{t: t, compress: tc.compress}
te.ctx, te.cancel = context.WithCancel(context.Background())
return te return te
} }
@ -266,7 +223,7 @@ func (te *test) clientConn() *grpc.ClientConn {
if te.cc != nil { if te.cc != nil {
return te.cc return te.cc
} }
opts := []grpc.DialOption{grpc.WithInsecure()} opts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()}
if te.compress == "gzip" { if te.compress == "gzip" {
opts = append(opts, opts = append(opts,
grpc.WithCompressor(grpc.NewGZIPCompressor()), grpc.WithCompressor(grpc.NewGZIPCompressor()),
@ -283,9 +240,10 @@ func (te *test) clientConn() *grpc.ClientConn {
} }
type rpcConfig struct { type rpcConfig struct {
count int // Number of requests and responses for streaming RPCs. count int // Number of requests and responses for streaming RPCs.
success bool // Whether the RPC should succeed or return error. success bool // Whether the RPC should succeed or return error.
failfast bool failfast bool
streaming bool // Whether the rpc should be a streaming RPC.
} }
func (te *test) doUnaryCall(c *rpcConfig) (*testpb.SimpleRequest, *testpb.SimpleResponse, error) { func (te *test) doUnaryCall(c *rpcConfig) (*testpb.SimpleRequest, *testpb.SimpleResponse, error) {
@ -335,14 +293,14 @@ func (te *test) doFullDuplexCallRoundtrip(c *rpcConfig) ([]*testpb.SimpleRequest
} }
resps = append(resps, resp) resps = append(resps, resp)
} }
if err = stream.CloseSend(); err != nil { if err = stream.CloseSend(); err != nil && err != io.EOF {
return reqs, resps, err return reqs, resps, err
} }
if _, err = stream.Recv(); err != io.EOF { if _, err = stream.Recv(); err != io.EOF {
return reqs, resps, err return reqs, resps, err
} }
return reqs, resps, err return reqs, resps, nil
} }
type expectedData struct { type expectedData struct {
@ -667,26 +625,34 @@ func tagRPCCtx(ctx context.Context, info *stats.RPCTagInfo) context.Context {
return context.WithValue(ctx, rpcCtxKey{}, info) return context.WithValue(ctx, rpcCtxKey{}, info)
} }
func checkConnStats(t *testing.T, got []*gotData) {
if len(got) <= 0 || len(got)%2 != 0 {
for i, g := range got {
t.Errorf(" - %v, %T = %+v, ctx: %v", i, g.s, g.s, g.ctx)
}
t.Fatalf("got %v stats, want even positive number", len(got))
}
// The first conn stats must be a ConnBegin.
checkConnBegin(t, got[0], nil)
// The last conn stats must be a ConnEnd.
checkConnEnd(t, got[len(got)-1], nil)
}
func checkServerStats(t *testing.T, got []*gotData, expect *expectedData, checkFuncs []func(t *testing.T, d *gotData, e *expectedData)) { func checkServerStats(t *testing.T, got []*gotData, expect *expectedData, checkFuncs []func(t *testing.T, d *gotData, e *expectedData)) {
if len(got) != len(checkFuncs) { if len(got) != len(checkFuncs) {
for i, g := range got {
t.Errorf(" - %v, %T", i, g.s)
}
t.Fatalf("got %v stats, want %v stats", len(got), len(checkFuncs)) t.Fatalf("got %v stats, want %v stats", len(got), len(checkFuncs))
} }
var ( var rpcctx context.Context
rpcctx context.Context
connctx context.Context
)
for i := 0; i < len(got); i++ { for i := 0; i < len(got); i++ {
if _, ok := got[i].s.(stats.RPCStats); ok { if _, ok := got[i].s.(stats.RPCStats); ok {
if rpcctx != nil && got[i].ctx != rpcctx { if rpcctx != nil && got[i].ctx != rpcctx {
t.Fatalf("got different contexts with stats %T", got[i].s) t.Fatalf("got different contexts with stats %T", got[i].s)
} }
rpcctx = got[i].ctx rpcctx = got[i].ctx
} else {
if connctx != nil && got[i].ctx != connctx {
t.Fatalf("got different contexts with stats %T", got[i].s)
}
connctx = got[i].ctx
} }
} }
@ -695,49 +661,92 @@ func checkServerStats(t *testing.T, got []*gotData, expect *expectedData, checkF
} }
} }
func TestServerStatsUnaryRPC(t *testing.T) { func testServerStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs []func(t *testing.T, d *gotData, e *expectedData)) {
var ( var (
mu sync.Mutex mu sync.Mutex
got []*gotData gotRPC []*gotData
gotConn []*gotData
) )
stats.RegisterRPCHandler(func(ctx context.Context, s stats.RPCStats) { stats.RegisterRPCHandler(func(ctx context.Context, s stats.RPCStats) {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
if !s.IsClient() { if !s.IsClient() {
got = append(got, &gotData{ctx, false, s}) gotRPC = append(gotRPC, &gotData{ctx, false, s})
} }
}) })
stats.RegisterConnHandler(func(ctx context.Context, s stats.ConnStats) { stats.RegisterConnHandler(func(ctx context.Context, s stats.ConnStats) {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
if !s.IsClient() { if !s.IsClient() {
got = append(got, &gotData{ctx, false, s}) gotConn = append(gotConn, &gotData{ctx, false, s})
} }
}) })
stats.RegisterConnTagger(tagConnCtx) stats.RegisterConnTagger(tagConnCtx)
stats.RegisterRPCTagger(tagRPCCtx) stats.RegisterRPCTagger(tagRPCCtx)
stats.Start() stats.Start()
defer stats.Stop()
te := newTest(t, "") te := newTest(t, tc)
te.startServer(&testServer{}) te.startServer(&testServer{})
defer te.tearDown() defer te.tearDown()
req, resp, err := te.doUnaryCall(&rpcConfig{success: true}) var (
if err != nil { reqs []*testpb.SimpleRequest
t.Fatalf(err.Error()) resps []*testpb.SimpleResponse
err error
)
if !cc.streaming {
req, resp, e := te.doUnaryCall(cc)
reqs = []*testpb.SimpleRequest{req}
resps = []*testpb.SimpleResponse{resp}
err = e
} else {
reqs, resps, err = te.doFullDuplexCallRoundtrip(cc)
} }
if cc.success != (err == nil) {
t.Fatalf("cc.success: %v, got error: %v", cc.success, err)
}
te.cc.Close()
te.srv.GracefulStop() // Wait for the server to stop. te.srv.GracefulStop() // Wait for the server to stop.
expect := &expectedData{ for {
method: "/grpc.testing.TestService/UnaryCall", mu.Lock()
serverAddr: te.srvAddr, if len(gotRPC) >= len(checkFuncs) {
requests: []*testpb.SimpleRequest{req}, mu.Unlock()
responses: []*testpb.SimpleResponse{resp}, break
}
mu.Unlock()
time.Sleep(10 * time.Millisecond)
} }
checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){ for {
checkConnBegin, mu.Lock()
if _, ok := gotConn[len(gotConn)-1].s.(*stats.ConnEnd); ok {
mu.Unlock()
break
}
mu.Unlock()
time.Sleep(10 * time.Millisecond)
}
expect := &expectedData{
serverAddr: te.srvAddr,
compression: tc.compress,
requests: reqs,
responses: resps,
err: err,
}
if !cc.streaming {
expect.method = "/grpc.testing.TestService/UnaryCall"
} else {
expect.method = "/grpc.testing.TestService/FullDuplexCall"
}
checkConnStats(t, gotConn)
checkServerStats(t, gotRPC, expect, checkFuncs)
}
func TestServerStatsUnaryRPC(t *testing.T) {
testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: true}, []func(t *testing.T, d *gotData, e *expectedData){
checkInHeader, checkInHeader,
checkBegin, checkBegin,
checkInPayload, checkInPayload,
@ -745,113 +754,23 @@ func TestServerStatsUnaryRPC(t *testing.T) {
checkOutPayload, checkOutPayload,
checkOutTrailer, checkOutTrailer,
checkEnd, checkEnd,
checkConnEnd, })
}
checkServerStats(t, got, expect, checkFuncs)
} }
func TestServerStatsUnaryRPCError(t *testing.T) { func TestServerStatsUnaryRPCError(t *testing.T) {
var ( testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: false}, []func(t *testing.T, d *gotData, e *expectedData){
mu sync.Mutex
got []*gotData
)
stats.RegisterRPCHandler(func(ctx context.Context, s stats.RPCStats) {
mu.Lock()
defer mu.Unlock()
if !s.IsClient() {
got = append(got, &gotData{ctx, false, s})
}
})
stats.RegisterConnHandler(func(ctx context.Context, s stats.ConnStats) {
mu.Lock()
defer mu.Unlock()
if !s.IsClient() {
got = append(got, &gotData{ctx, false, s})
}
})
stats.RegisterConnTagger(tagConnCtx)
stats.RegisterRPCTagger(tagRPCCtx)
stats.Start()
defer stats.Stop()
te := newTest(t, "")
te.startServer(&testServer{})
defer te.tearDown()
req, resp, err := te.doUnaryCall(&rpcConfig{success: false})
if err == nil {
t.Fatalf("got error <nil>; want <non-nil>")
}
te.srv.GracefulStop() // Wait for the server to stop.
expect := &expectedData{
method: "/grpc.testing.TestService/UnaryCall",
serverAddr: te.srvAddr,
requests: []*testpb.SimpleRequest{req},
responses: []*testpb.SimpleResponse{resp},
err: err,
}
checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
checkConnBegin,
checkInHeader, checkInHeader,
checkBegin, checkBegin,
checkInPayload, checkInPayload,
checkOutHeader, checkOutHeader,
checkOutTrailer, checkOutTrailer,
checkEnd, checkEnd,
checkConnEnd, })
}
checkServerStats(t, got, expect, checkFuncs)
} }
func TestServerStatsStreamingRPC(t *testing.T) { func TestServerStatsStreamingRPC(t *testing.T) {
var (
mu sync.Mutex
got []*gotData
)
stats.RegisterRPCHandler(func(ctx context.Context, s stats.RPCStats) {
mu.Lock()
defer mu.Unlock()
if !s.IsClient() {
got = append(got, &gotData{ctx, false, s})
}
})
stats.RegisterConnHandler(func(ctx context.Context, s stats.ConnStats) {
mu.Lock()
defer mu.Unlock()
if !s.IsClient() {
got = append(got, &gotData{ctx, false, s})
}
})
stats.RegisterConnTagger(tagConnCtx)
stats.RegisterRPCTagger(tagRPCCtx)
stats.Start()
defer stats.Stop()
te := newTest(t, "gzip")
te.startServer(&testServer{})
defer te.tearDown()
count := 5 count := 5
reqs, resps, err := te.doFullDuplexCallRoundtrip(&rpcConfig{count: count, success: true})
if err == nil {
t.Fatalf(err.Error())
}
te.srv.GracefulStop() // Wait for the server to stop.
expect := &expectedData{
method: "/grpc.testing.TestService/FullDuplexCall",
serverAddr: te.srvAddr,
compression: "gzip",
requests: reqs,
responses: resps,
}
checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){ checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
checkConnBegin,
checkInHeader, checkInHeader,
checkBegin, checkBegin,
checkOutHeader, checkOutHeader,
@ -866,68 +785,20 @@ func TestServerStatsStreamingRPC(t *testing.T) {
checkFuncs = append(checkFuncs, checkFuncs = append(checkFuncs,
checkOutTrailer, checkOutTrailer,
checkEnd, checkEnd,
checkConnEnd,
) )
testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, streaming: true}, checkFuncs)
checkServerStats(t, got, expect, checkFuncs)
} }
func TestServerStatsStreamingRPCError(t *testing.T) { func TestServerStatsStreamingRPCError(t *testing.T) {
var (
mu sync.Mutex
got []*gotData
)
stats.RegisterRPCHandler(func(ctx context.Context, s stats.RPCStats) {
mu.Lock()
defer mu.Unlock()
if !s.IsClient() {
got = append(got, &gotData{ctx, false, s})
}
})
stats.RegisterConnHandler(func(ctx context.Context, s stats.ConnStats) {
mu.Lock()
defer mu.Unlock()
if !s.IsClient() {
got = append(got, &gotData{ctx, false, s})
}
})
stats.RegisterConnTagger(tagConnCtx)
stats.RegisterRPCTagger(tagRPCCtx)
stats.Start()
defer stats.Stop()
te := newTest(t, "gzip")
te.startServer(&testServer{})
defer te.tearDown()
count := 5 count := 5
reqs, resps, err := te.doFullDuplexCallRoundtrip(&rpcConfig{count: count, success: false}) testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, streaming: true}, []func(t *testing.T, d *gotData, e *expectedData){
if err == nil {
t.Fatalf("got error <nil>; want <non-nil>")
}
te.srv.GracefulStop() // Wait for the server to stop.
expect := &expectedData{
method: "/grpc.testing.TestService/FullDuplexCall",
serverAddr: te.srvAddr,
compression: "gzip",
requests: reqs,
responses: resps,
err: err,
}
checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
checkConnBegin,
checkInHeader, checkInHeader,
checkBegin, checkBegin,
checkOutHeader, checkOutHeader,
checkInPayload, checkInPayload,
checkOutTrailer, checkOutTrailer,
checkEnd, checkEnd,
checkConnEnd, })
}
checkServerStats(t, got, expect, checkFuncs)
} }
type checkFuncWithCount struct { type checkFuncWithCount struct {
@ -941,24 +812,19 @@ func checkClientStats(t *testing.T, got []*gotData, expect *expectedData, checkF
expectLen += v.c expectLen += v.c
} }
if len(got) != expectLen { if len(got) != expectLen {
for i, g := range got {
t.Errorf(" - %v, %T", i, g.s)
}
t.Fatalf("got %v stats, want %v stats", len(got), expectLen) t.Fatalf("got %v stats, want %v stats", len(got), expectLen)
} }
var ( var rpcctx context.Context
rpcctx context.Context
connctx context.Context
)
for i := 0; i < len(got); i++ { for i := 0; i < len(got); i++ {
if _, ok := got[i].s.(stats.RPCStats); ok { if _, ok := got[i].s.(stats.RPCStats); ok {
if rpcctx != nil && got[i].ctx != rpcctx { if rpcctx != nil && got[i].ctx != rpcctx {
t.Fatalf("got different contexts with stats %T", got[i].s) t.Fatalf("got different contexts with stats %T", got[i].s)
} }
rpcctx = got[i].ctx rpcctx = got[i].ctx
} else {
if connctx != nil && got[i].ctx != connctx {
t.Fatalf("got different contexts with stats %T", got[i].s)
}
connctx = got[i].ctx
} }
} }
@ -1024,51 +890,97 @@ func checkClientStats(t *testing.T, got []*gotData, expect *expectedData, checkF
} }
} }
func TestClientStatsUnaryRPC(t *testing.T) { func testClientStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs map[int]*checkFuncWithCount) {
var ( var (
mu sync.Mutex mu sync.Mutex
got []*gotData gotRPC []*gotData
gotConn []*gotData
) )
stats.RegisterRPCHandler(func(ctx context.Context, s stats.RPCStats) { stats.RegisterRPCHandler(func(ctx context.Context, s stats.RPCStats) {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
if s.IsClient() { if s.IsClient() {
got = append(got, &gotData{ctx, true, s}) gotRPC = append(gotRPC, &gotData{ctx, true, s})
} }
}) })
stats.RegisterConnHandler(func(ctx context.Context, s stats.ConnStats) { stats.RegisterConnHandler(func(ctx context.Context, s stats.ConnStats) {
mu.Lock() mu.Lock()
defer mu.Unlock() defer mu.Unlock()
if s.IsClient() { if s.IsClient() {
got = append(got, &gotData{ctx, true, s}) gotConn = append(gotConn, &gotData{ctx, true, s})
} }
}) })
stats.RegisterConnTagger(tagConnCtx) stats.RegisterConnTagger(tagConnCtx)
stats.RegisterRPCTagger(tagRPCCtx) stats.RegisterRPCTagger(tagRPCCtx)
stats.Start() stats.Start()
defer stats.Stop()
te := newTest(t, "") te := newTest(t, tc)
te.startServer(&testServer{}) te.startServer(&testServer{})
defer te.tearDown() defer te.tearDown()
failfast := false var (
req, resp, err := te.doUnaryCall(&rpcConfig{success: true, failfast: failfast}) reqs []*testpb.SimpleRequest
if err != nil { resps []*testpb.SimpleResponse
t.Fatalf(err.Error()) err error
)
if !cc.streaming {
req, resp, e := te.doUnaryCall(cc)
reqs = []*testpb.SimpleRequest{req}
resps = []*testpb.SimpleResponse{resp}
err = e
} else {
reqs, resps, err = te.doFullDuplexCallRoundtrip(cc)
} }
if cc.success != (err == nil) {
t.Fatalf("cc.success: %v, got error: %v", cc.success, err)
}
te.cc.Close()
te.srv.GracefulStop() // Wait for the server to stop. te.srv.GracefulStop() // Wait for the server to stop.
expect := &expectedData{ lenRPCStats := 0
method: "/grpc.testing.TestService/UnaryCall", for _, v := range checkFuncs {
serverAddr: te.srvAddr, lenRPCStats += v.c
requests: []*testpb.SimpleRequest{req}, }
responses: []*testpb.SimpleResponse{resp}, for {
failfast: failfast, mu.Lock()
if len(gotRPC) >= lenRPCStats {
mu.Unlock()
break
}
mu.Unlock()
time.Sleep(10 * time.Millisecond)
} }
checkFuncs := map[int]*checkFuncWithCount{ for {
connbegin: {checkConnBegin, 1}, mu.Lock()
if _, ok := gotConn[len(gotConn)-1].s.(*stats.ConnEnd); ok {
mu.Unlock()
break
}
mu.Unlock()
time.Sleep(10 * time.Millisecond)
}
expect := &expectedData{
serverAddr: te.srvAddr,
compression: tc.compress,
requests: reqs,
responses: resps,
failfast: cc.failfast,
err: err,
}
if !cc.streaming {
expect.method = "/grpc.testing.TestService/UnaryCall"
} else {
expect.method = "/grpc.testing.TestService/FullDuplexCall"
}
checkConnStats(t, gotConn)
checkClientStats(t, gotRPC, expect, checkFuncs)
}
func TestClientStatsUnaryRPC(t *testing.T) {
testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, failfast: false}, map[int]*checkFuncWithCount{
begin: {checkBegin, 1}, begin: {checkBegin, 1},
outHeader: {checkOutHeader, 1}, outHeader: {checkOutHeader, 1},
outPayload: {checkOutPayload, 1}, outPayload: {checkOutPayload, 1},
@ -1076,117 +988,23 @@ func TestClientStatsUnaryRPC(t *testing.T) {
inPayload: {checkInPayload, 1}, inPayload: {checkInPayload, 1},
inTrailer: {checkInTrailer, 1}, inTrailer: {checkInTrailer, 1},
end: {checkEnd, 1}, end: {checkEnd, 1},
connend: {checkConnEnd, 1}, })
}
checkClientStats(t, got, expect, checkFuncs)
} }
func TestClientStatsUnaryRPCError(t *testing.T) { func TestClientStatsUnaryRPCError(t *testing.T) {
var ( testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, failfast: false}, map[int]*checkFuncWithCount{
mu sync.Mutex
got []*gotData
)
stats.RegisterRPCHandler(func(ctx context.Context, s stats.RPCStats) {
mu.Lock()
defer mu.Unlock()
if s.IsClient() {
got = append(got, &gotData{ctx, true, s})
}
})
stats.RegisterConnHandler(func(ctx context.Context, s stats.ConnStats) {
mu.Lock()
defer mu.Unlock()
if s.IsClient() {
got = append(got, &gotData{ctx, true, s})
}
})
stats.RegisterConnTagger(tagConnCtx)
stats.RegisterRPCTagger(tagRPCCtx)
stats.Start()
defer stats.Stop()
te := newTest(t, "")
te.startServer(&testServer{})
defer te.tearDown()
failfast := true
req, resp, err := te.doUnaryCall(&rpcConfig{success: false, failfast: failfast})
if err == nil {
t.Fatalf("got error <nil>; want <non-nil>")
}
te.srv.GracefulStop() // Wait for the server to stop.
expect := &expectedData{
method: "/grpc.testing.TestService/UnaryCall",
serverAddr: te.srvAddr,
requests: []*testpb.SimpleRequest{req},
responses: []*testpb.SimpleResponse{resp},
err: err,
failfast: failfast,
}
checkFuncs := map[int]*checkFuncWithCount{
connbegin: {checkConnBegin, 1},
begin: {checkBegin, 1}, begin: {checkBegin, 1},
outHeader: {checkOutHeader, 1}, outHeader: {checkOutHeader, 1},
outPayload: {checkOutPayload, 1}, outPayload: {checkOutPayload, 1},
inHeader: {checkInHeader, 1}, inHeader: {checkInHeader, 1},
inTrailer: {checkInTrailer, 1}, inTrailer: {checkInTrailer, 1},
end: {checkEnd, 1}, end: {checkEnd, 1},
connend: {checkConnEnd, 1}, })
}
checkClientStats(t, got, expect, checkFuncs)
} }
func TestClientStatsStreamingRPC(t *testing.T) { func TestClientStatsStreamingRPC(t *testing.T) {
var (
mu sync.Mutex
got []*gotData
)
stats.RegisterRPCHandler(func(ctx context.Context, s stats.RPCStats) {
mu.Lock()
defer mu.Unlock()
if s.IsClient() {
got = append(got, &gotData{ctx, true, s})
}
})
stats.RegisterConnHandler(func(ctx context.Context, s stats.ConnStats) {
mu.Lock()
defer mu.Unlock()
if s.IsClient() {
got = append(got, &gotData{ctx, true, s})
}
})
stats.RegisterConnTagger(tagConnCtx)
stats.RegisterRPCTagger(tagRPCCtx)
stats.Start()
defer stats.Stop()
te := newTest(t, "gzip")
te.startServer(&testServer{})
defer te.tearDown()
count := 5 count := 5
failfast := false testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, streaming: true}, map[int]*checkFuncWithCount{
reqs, resps, err := te.doFullDuplexCallRoundtrip(&rpcConfig{count: count, success: true, failfast: failfast})
if err == nil {
t.Fatalf(err.Error())
}
te.srv.GracefulStop() // Wait for the server to stop.
expect := &expectedData{
method: "/grpc.testing.TestService/FullDuplexCall",
serverAddr: te.srvAddr,
compression: "gzip",
requests: reqs,
responses: resps,
failfast: failfast,
}
checkFuncs := map[int]*checkFuncWithCount{
connbegin: {checkConnBegin, 1},
begin: {checkBegin, 1}, begin: {checkBegin, 1},
outHeader: {checkOutHeader, 1}, outHeader: {checkOutHeader, 1},
outPayload: {checkOutPayload, count}, outPayload: {checkOutPayload, count},
@ -1194,68 +1012,17 @@ func TestClientStatsStreamingRPC(t *testing.T) {
inPayload: {checkInPayload, count}, inPayload: {checkInPayload, count},
inTrailer: {checkInTrailer, 1}, inTrailer: {checkInTrailer, 1},
end: {checkEnd, 1}, end: {checkEnd, 1},
connend: {checkConnEnd, 1}, })
}
checkClientStats(t, got, expect, checkFuncs)
} }
func TestClientStatsStreamingRPCError(t *testing.T) { func TestClientStatsStreamingRPCError(t *testing.T) {
var (
mu sync.Mutex
got []*gotData
)
stats.RegisterRPCHandler(func(ctx context.Context, s stats.RPCStats) {
mu.Lock()
defer mu.Unlock()
if s.IsClient() {
got = append(got, &gotData{ctx, true, s})
}
})
stats.RegisterConnHandler(func(ctx context.Context, s stats.ConnStats) {
mu.Lock()
defer mu.Unlock()
if s.IsClient() {
got = append(got, &gotData{ctx, true, s})
}
})
stats.RegisterConnTagger(tagConnCtx)
stats.RegisterRPCTagger(tagRPCCtx)
stats.Start()
defer stats.Stop()
te := newTest(t, "gzip")
te.startServer(&testServer{})
defer te.tearDown()
count := 5 count := 5
failfast := true testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, streaming: true}, map[int]*checkFuncWithCount{
reqs, resps, err := te.doFullDuplexCallRoundtrip(&rpcConfig{count: count, success: false, failfast: failfast})
if err == nil {
t.Fatalf("got error <nil>; want <non-nil>")
}
te.srv.GracefulStop() // Wait for the server to stop.
expect := &expectedData{
method: "/grpc.testing.TestService/FullDuplexCall",
serverAddr: te.srvAddr,
compression: "gzip",
requests: reqs,
responses: resps,
err: err,
failfast: failfast,
}
checkFuncs := map[int]*checkFuncWithCount{
connbegin: {checkConnBegin, 1},
begin: {checkBegin, 1}, begin: {checkBegin, 1},
outHeader: {checkOutHeader, 1}, outHeader: {checkOutHeader, 1},
outPayload: {checkOutPayload, 1}, outPayload: {checkOutPayload, 1},
inHeader: {checkInHeader, 1}, inHeader: {checkInHeader, 1},
inTrailer: {checkInTrailer, 1}, inTrailer: {checkInTrailer, 1},
end: {checkEnd, 1}, end: {checkEnd, 1},
connend: {checkConnEnd, 1}, })
}
checkClientStats(t, got, expect, checkFuncs)
} }

View File

@ -209,6 +209,16 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
maxStreams: math.MaxInt32, maxStreams: math.MaxInt32,
streamSendQuota: defaultWindowSize, streamSendQuota: defaultWindowSize,
} }
if stats.On() {
t.ctx = stats.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
})
connBegin := &stats.ConnBegin{
Client: true,
}
stats.HandleConn(t.ctx, connBegin)
}
// Start the reader goroutine for incoming message. Each transport has // Start the reader goroutine for incoming message. Each transport has
// a dedicated goroutine which reads HTTP2 frame from network. Then it // a dedicated goroutine which reads HTTP2 frame from network. Then it
// dispatches the frame to the corresponding stream entity. // dispatches the frame to the corresponding stream entity.
@ -244,16 +254,6 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
} }
go t.controller() go t.controller()
t.writableChan <- 0 t.writableChan <- 0
if stats.On() {
t.ctx = stats.TagConn(t.ctx, &stats.ConnTagInfo{
RemoteAddr: t.remoteAddr,
LocalAddr: t.localAddr,
})
connBegin := &stats.ConnBegin{
Client: true,
}
stats.HandleConn(t.ctx, connBegin)
}
return t, nil return t, nil
} }