diff --git a/stats/handlers.go b/stats/handlers.go index d41c5244..ce47786d 100644 --- a/stats/handlers.go +++ b/stats/handlers.go @@ -140,12 +140,6 @@ func Start() { atomic.StoreInt32(on, 1) } -// Stop stops the stats collection and processing. -// Stop does not unregister the handlers. -func Stop() { - atomic.StoreInt32(on, 0) -} - // On indicates whether the stats collection and processing is on. func On() bool { return atomic.CompareAndSwapInt32(on, 1, 1) diff --git a/stats/stats_test.go b/stats/stats_test.go index 1761e794..a1e116de 100644 --- a/stats/stats_test.go +++ b/stats/stats_test.go @@ -40,6 +40,7 @@ import ( "reflect" "sync" "testing" + "time" "github.com/golang/protobuf/proto" "golang.org/x/net/context" @@ -53,46 +54,6 @@ func init() { grpc.EnableTracing = false } -func TestStartStop(t *testing.T) { - stats.RegisterRPCHandler(nil) - stats.RegisterConnHandler(nil) - stats.Start() - if stats.On() { - t.Fatalf("stats.Start() with nil handler, stats.On() = true, want false") - } - - stats.RegisterRPCHandler(func(ctx context.Context, s stats.RPCStats) {}) - stats.RegisterConnHandler(nil) - stats.Start() - if !stats.On() { - t.Fatalf("stats.Start() with non-nil handler, stats.On() = false, want true") - } - stats.Stop() - - stats.RegisterRPCHandler(nil) - stats.RegisterConnHandler(func(ctx context.Context, s stats.ConnStats) {}) - stats.Start() - if !stats.On() { - t.Fatalf("stats.Start() with non-nil conn handler, stats.On() = false, want true") - } - stats.Stop() - - stats.RegisterRPCHandler(func(ctx context.Context, s stats.RPCStats) {}) - stats.RegisterConnHandler(func(ctx context.Context, s stats.ConnStats) {}) - if stats.On() { - t.Fatalf("after stats.RegisterRPCHandler(), stats.On() = true, want false") - } - stats.Start() - if !stats.On() { - t.Fatalf("after stats.Start(_), stats.On() = false, want true") - } - - stats.Stop() - if stats.On() { - t.Fatalf("after stats.Stop(), stats.On() = true, want false") - } -} - type connCtxKey struct{} type rpcCtxKey struct{} @@ -200,9 +161,6 @@ type test struct { t *testing.T compress string - ctx context.Context // valid for life of test, before tearDown - cancel context.CancelFunc - testServer testpb.TestServiceServer // nil means none // srv and srvAddr are set once startServer is called. srv *grpc.Server @@ -212,10 +170,6 @@ type test struct { } func (te *test) tearDown() { - if te.cancel != nil { - te.cancel() - te.cancel = nil - } if te.cc != nil { te.cc.Close() te.cc = nil @@ -223,12 +177,15 @@ func (te *test) tearDown() { te.srv.Stop() } +type testConfig struct { + compress string +} + // newTest returns a new test using the provided testing.T and // environment. It is returned with default values. Tests should // modify it before calling its startServer and clientConn methods. -func newTest(t *testing.T, compress string) *test { - te := &test{t: t, compress: compress} - te.ctx, te.cancel = context.WithCancel(context.Background()) +func newTest(t *testing.T, tc *testConfig) *test { + te := &test{t: t, compress: tc.compress} return te } @@ -266,7 +223,7 @@ func (te *test) clientConn() *grpc.ClientConn { if te.cc != nil { return te.cc } - opts := []grpc.DialOption{grpc.WithInsecure()} + opts := []grpc.DialOption{grpc.WithInsecure(), grpc.WithBlock()} if te.compress == "gzip" { opts = append(opts, grpc.WithCompressor(grpc.NewGZIPCompressor()), @@ -283,9 +240,10 @@ func (te *test) clientConn() *grpc.ClientConn { } type rpcConfig struct { - count int // Number of requests and responses for streaming RPCs. - success bool // Whether the RPC should succeed or return error. - failfast bool + count int // Number of requests and responses for streaming RPCs. + success bool // Whether the RPC should succeed or return error. + failfast bool + streaming bool // Whether the rpc should be a streaming RPC. } func (te *test) doUnaryCall(c *rpcConfig) (*testpb.SimpleRequest, *testpb.SimpleResponse, error) { @@ -335,14 +293,14 @@ func (te *test) doFullDuplexCallRoundtrip(c *rpcConfig) ([]*testpb.SimpleRequest } resps = append(resps, resp) } - if err = stream.CloseSend(); err != nil { + if err = stream.CloseSend(); err != nil && err != io.EOF { return reqs, resps, err } if _, err = stream.Recv(); err != io.EOF { return reqs, resps, err } - return reqs, resps, err + return reqs, resps, nil } type expectedData struct { @@ -667,26 +625,34 @@ func tagRPCCtx(ctx context.Context, info *stats.RPCTagInfo) context.Context { return context.WithValue(ctx, rpcCtxKey{}, info) } +func checkConnStats(t *testing.T, got []*gotData) { + if len(got) <= 0 || len(got)%2 != 0 { + for i, g := range got { + t.Errorf(" - %v, %T = %+v, ctx: %v", i, g.s, g.s, g.ctx) + } + t.Fatalf("got %v stats, want even positive number", len(got)) + } + // The first conn stats must be a ConnBegin. + checkConnBegin(t, got[0], nil) + // The last conn stats must be a ConnEnd. + checkConnEnd(t, got[len(got)-1], nil) +} + func checkServerStats(t *testing.T, got []*gotData, expect *expectedData, checkFuncs []func(t *testing.T, d *gotData, e *expectedData)) { if len(got) != len(checkFuncs) { + for i, g := range got { + t.Errorf(" - %v, %T", i, g.s) + } t.Fatalf("got %v stats, want %v stats", len(got), len(checkFuncs)) } - var ( - rpcctx context.Context - connctx context.Context - ) + var rpcctx context.Context for i := 0; i < len(got); i++ { if _, ok := got[i].s.(stats.RPCStats); ok { if rpcctx != nil && got[i].ctx != rpcctx { t.Fatalf("got different contexts with stats %T", got[i].s) } rpcctx = got[i].ctx - } else { - if connctx != nil && got[i].ctx != connctx { - t.Fatalf("got different contexts with stats %T", got[i].s) - } - connctx = got[i].ctx } } @@ -695,49 +661,92 @@ func checkServerStats(t *testing.T, got []*gotData, expect *expectedData, checkF } } -func TestServerStatsUnaryRPC(t *testing.T) { +func testServerStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs []func(t *testing.T, d *gotData, e *expectedData)) { var ( - mu sync.Mutex - got []*gotData + mu sync.Mutex + gotRPC []*gotData + gotConn []*gotData ) stats.RegisterRPCHandler(func(ctx context.Context, s stats.RPCStats) { mu.Lock() defer mu.Unlock() if !s.IsClient() { - got = append(got, &gotData{ctx, false, s}) + gotRPC = append(gotRPC, &gotData{ctx, false, s}) } }) stats.RegisterConnHandler(func(ctx context.Context, s stats.ConnStats) { mu.Lock() defer mu.Unlock() if !s.IsClient() { - got = append(got, &gotData{ctx, false, s}) + gotConn = append(gotConn, &gotData{ctx, false, s}) } }) stats.RegisterConnTagger(tagConnCtx) stats.RegisterRPCTagger(tagRPCCtx) stats.Start() - defer stats.Stop() - te := newTest(t, "") + te := newTest(t, tc) te.startServer(&testServer{}) defer te.tearDown() - req, resp, err := te.doUnaryCall(&rpcConfig{success: true}) - if err != nil { - t.Fatalf(err.Error()) + var ( + reqs []*testpb.SimpleRequest + resps []*testpb.SimpleResponse + err error + ) + if !cc.streaming { + req, resp, e := te.doUnaryCall(cc) + reqs = []*testpb.SimpleRequest{req} + resps = []*testpb.SimpleResponse{resp} + err = e + } else { + reqs, resps, err = te.doFullDuplexCallRoundtrip(cc) } + if cc.success != (err == nil) { + t.Fatalf("cc.success: %v, got error: %v", cc.success, err) + } + te.cc.Close() te.srv.GracefulStop() // Wait for the server to stop. - expect := &expectedData{ - method: "/grpc.testing.TestService/UnaryCall", - serverAddr: te.srvAddr, - requests: []*testpb.SimpleRequest{req}, - responses: []*testpb.SimpleResponse{resp}, + for { + mu.Lock() + if len(gotRPC) >= len(checkFuncs) { + mu.Unlock() + break + } + mu.Unlock() + time.Sleep(10 * time.Millisecond) } - checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){ - checkConnBegin, + for { + mu.Lock() + if _, ok := gotConn[len(gotConn)-1].s.(*stats.ConnEnd); ok { + mu.Unlock() + break + } + mu.Unlock() + time.Sleep(10 * time.Millisecond) + } + + expect := &expectedData{ + serverAddr: te.srvAddr, + compression: tc.compress, + requests: reqs, + responses: resps, + err: err, + } + if !cc.streaming { + expect.method = "/grpc.testing.TestService/UnaryCall" + } else { + expect.method = "/grpc.testing.TestService/FullDuplexCall" + } + + checkConnStats(t, gotConn) + checkServerStats(t, gotRPC, expect, checkFuncs) +} + +func TestServerStatsUnaryRPC(t *testing.T) { + testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: true}, []func(t *testing.T, d *gotData, e *expectedData){ checkInHeader, checkBegin, checkInPayload, @@ -745,113 +754,23 @@ func TestServerStatsUnaryRPC(t *testing.T) { checkOutPayload, checkOutTrailer, checkEnd, - checkConnEnd, - } - - checkServerStats(t, got, expect, checkFuncs) + }) } func TestServerStatsUnaryRPCError(t *testing.T) { - var ( - mu sync.Mutex - got []*gotData - ) - stats.RegisterRPCHandler(func(ctx context.Context, s stats.RPCStats) { - mu.Lock() - defer mu.Unlock() - if !s.IsClient() { - got = append(got, &gotData{ctx, false, s}) - } - }) - stats.RegisterConnHandler(func(ctx context.Context, s stats.ConnStats) { - mu.Lock() - defer mu.Unlock() - if !s.IsClient() { - got = append(got, &gotData{ctx, false, s}) - } - }) - stats.RegisterConnTagger(tagConnCtx) - stats.RegisterRPCTagger(tagRPCCtx) - stats.Start() - defer stats.Stop() - - te := newTest(t, "") - te.startServer(&testServer{}) - defer te.tearDown() - - req, resp, err := te.doUnaryCall(&rpcConfig{success: false}) - if err == nil { - t.Fatalf("got error ; want ") - } - te.srv.GracefulStop() // Wait for the server to stop. - - expect := &expectedData{ - method: "/grpc.testing.TestService/UnaryCall", - serverAddr: te.srvAddr, - requests: []*testpb.SimpleRequest{req}, - responses: []*testpb.SimpleResponse{resp}, - err: err, - } - - checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){ - checkConnBegin, + testServerStats(t, &testConfig{compress: ""}, &rpcConfig{success: false}, []func(t *testing.T, d *gotData, e *expectedData){ checkInHeader, checkBegin, checkInPayload, checkOutHeader, checkOutTrailer, checkEnd, - checkConnEnd, - } - - checkServerStats(t, got, expect, checkFuncs) + }) } func TestServerStatsStreamingRPC(t *testing.T) { - var ( - mu sync.Mutex - got []*gotData - ) - stats.RegisterRPCHandler(func(ctx context.Context, s stats.RPCStats) { - mu.Lock() - defer mu.Unlock() - if !s.IsClient() { - got = append(got, &gotData{ctx, false, s}) - } - }) - stats.RegisterConnHandler(func(ctx context.Context, s stats.ConnStats) { - mu.Lock() - defer mu.Unlock() - if !s.IsClient() { - got = append(got, &gotData{ctx, false, s}) - } - }) - stats.RegisterConnTagger(tagConnCtx) - stats.RegisterRPCTagger(tagRPCCtx) - stats.Start() - defer stats.Stop() - - te := newTest(t, "gzip") - te.startServer(&testServer{}) - defer te.tearDown() - count := 5 - reqs, resps, err := te.doFullDuplexCallRoundtrip(&rpcConfig{count: count, success: true}) - if err == nil { - t.Fatalf(err.Error()) - } - te.srv.GracefulStop() // Wait for the server to stop. - - expect := &expectedData{ - method: "/grpc.testing.TestService/FullDuplexCall", - serverAddr: te.srvAddr, - compression: "gzip", - requests: reqs, - responses: resps, - } - checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){ - checkConnBegin, checkInHeader, checkBegin, checkOutHeader, @@ -866,68 +785,20 @@ func TestServerStatsStreamingRPC(t *testing.T) { checkFuncs = append(checkFuncs, checkOutTrailer, checkEnd, - checkConnEnd, ) - - checkServerStats(t, got, expect, checkFuncs) + testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, streaming: true}, checkFuncs) } func TestServerStatsStreamingRPCError(t *testing.T) { - var ( - mu sync.Mutex - got []*gotData - ) - stats.RegisterRPCHandler(func(ctx context.Context, s stats.RPCStats) { - mu.Lock() - defer mu.Unlock() - if !s.IsClient() { - got = append(got, &gotData{ctx, false, s}) - } - }) - stats.RegisterConnHandler(func(ctx context.Context, s stats.ConnStats) { - mu.Lock() - defer mu.Unlock() - if !s.IsClient() { - got = append(got, &gotData{ctx, false, s}) - } - }) - stats.RegisterConnTagger(tagConnCtx) - stats.RegisterRPCTagger(tagRPCCtx) - stats.Start() - defer stats.Stop() - - te := newTest(t, "gzip") - te.startServer(&testServer{}) - defer te.tearDown() - count := 5 - reqs, resps, err := te.doFullDuplexCallRoundtrip(&rpcConfig{count: count, success: false}) - if err == nil { - t.Fatalf("got error ; want ") - } - te.srv.GracefulStop() // Wait for the server to stop. - - expect := &expectedData{ - method: "/grpc.testing.TestService/FullDuplexCall", - serverAddr: te.srvAddr, - compression: "gzip", - requests: reqs, - responses: resps, - err: err, - } - - checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){ - checkConnBegin, + testServerStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, streaming: true}, []func(t *testing.T, d *gotData, e *expectedData){ checkInHeader, checkBegin, checkOutHeader, checkInPayload, checkOutTrailer, checkEnd, - checkConnEnd, - } - - checkServerStats(t, got, expect, checkFuncs) + }) } type checkFuncWithCount struct { @@ -941,24 +812,19 @@ func checkClientStats(t *testing.T, got []*gotData, expect *expectedData, checkF expectLen += v.c } if len(got) != expectLen { + for i, g := range got { + t.Errorf(" - %v, %T", i, g.s) + } t.Fatalf("got %v stats, want %v stats", len(got), expectLen) } - var ( - rpcctx context.Context - connctx context.Context - ) + var rpcctx context.Context for i := 0; i < len(got); i++ { if _, ok := got[i].s.(stats.RPCStats); ok { if rpcctx != nil && got[i].ctx != rpcctx { t.Fatalf("got different contexts with stats %T", got[i].s) } rpcctx = got[i].ctx - } else { - if connctx != nil && got[i].ctx != connctx { - t.Fatalf("got different contexts with stats %T", got[i].s) - } - connctx = got[i].ctx } } @@ -1024,51 +890,97 @@ func checkClientStats(t *testing.T, got []*gotData, expect *expectedData, checkF } } -func TestClientStatsUnaryRPC(t *testing.T) { +func testClientStats(t *testing.T, tc *testConfig, cc *rpcConfig, checkFuncs map[int]*checkFuncWithCount) { var ( - mu sync.Mutex - got []*gotData + mu sync.Mutex + gotRPC []*gotData + gotConn []*gotData ) stats.RegisterRPCHandler(func(ctx context.Context, s stats.RPCStats) { mu.Lock() defer mu.Unlock() if s.IsClient() { - got = append(got, &gotData{ctx, true, s}) + gotRPC = append(gotRPC, &gotData{ctx, true, s}) } }) stats.RegisterConnHandler(func(ctx context.Context, s stats.ConnStats) { mu.Lock() defer mu.Unlock() if s.IsClient() { - got = append(got, &gotData{ctx, true, s}) + gotConn = append(gotConn, &gotData{ctx, true, s}) } }) stats.RegisterConnTagger(tagConnCtx) stats.RegisterRPCTagger(tagRPCCtx) stats.Start() - defer stats.Stop() - te := newTest(t, "") + te := newTest(t, tc) te.startServer(&testServer{}) defer te.tearDown() - failfast := false - req, resp, err := te.doUnaryCall(&rpcConfig{success: true, failfast: failfast}) - if err != nil { - t.Fatalf(err.Error()) + var ( + reqs []*testpb.SimpleRequest + resps []*testpb.SimpleResponse + err error + ) + if !cc.streaming { + req, resp, e := te.doUnaryCall(cc) + reqs = []*testpb.SimpleRequest{req} + resps = []*testpb.SimpleResponse{resp} + err = e + } else { + reqs, resps, err = te.doFullDuplexCallRoundtrip(cc) } + if cc.success != (err == nil) { + t.Fatalf("cc.success: %v, got error: %v", cc.success, err) + } + te.cc.Close() te.srv.GracefulStop() // Wait for the server to stop. - expect := &expectedData{ - method: "/grpc.testing.TestService/UnaryCall", - serverAddr: te.srvAddr, - requests: []*testpb.SimpleRequest{req}, - responses: []*testpb.SimpleResponse{resp}, - failfast: failfast, + lenRPCStats := 0 + for _, v := range checkFuncs { + lenRPCStats += v.c + } + for { + mu.Lock() + if len(gotRPC) >= lenRPCStats { + mu.Unlock() + break + } + mu.Unlock() + time.Sleep(10 * time.Millisecond) } - checkFuncs := map[int]*checkFuncWithCount{ - connbegin: {checkConnBegin, 1}, + for { + mu.Lock() + if _, ok := gotConn[len(gotConn)-1].s.(*stats.ConnEnd); ok { + mu.Unlock() + break + } + mu.Unlock() + time.Sleep(10 * time.Millisecond) + } + + expect := &expectedData{ + serverAddr: te.srvAddr, + compression: tc.compress, + requests: reqs, + responses: resps, + failfast: cc.failfast, + err: err, + } + if !cc.streaming { + expect.method = "/grpc.testing.TestService/UnaryCall" + } else { + expect.method = "/grpc.testing.TestService/FullDuplexCall" + } + + checkConnStats(t, gotConn) + checkClientStats(t, gotRPC, expect, checkFuncs) +} + +func TestClientStatsUnaryRPC(t *testing.T) { + testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: true, failfast: false}, map[int]*checkFuncWithCount{ begin: {checkBegin, 1}, outHeader: {checkOutHeader, 1}, outPayload: {checkOutPayload, 1}, @@ -1076,117 +988,23 @@ func TestClientStatsUnaryRPC(t *testing.T) { inPayload: {checkInPayload, 1}, inTrailer: {checkInTrailer, 1}, end: {checkEnd, 1}, - connend: {checkConnEnd, 1}, - } - - checkClientStats(t, got, expect, checkFuncs) + }) } func TestClientStatsUnaryRPCError(t *testing.T) { - var ( - mu sync.Mutex - got []*gotData - ) - stats.RegisterRPCHandler(func(ctx context.Context, s stats.RPCStats) { - mu.Lock() - defer mu.Unlock() - if s.IsClient() { - got = append(got, &gotData{ctx, true, s}) - } - }) - stats.RegisterConnHandler(func(ctx context.Context, s stats.ConnStats) { - mu.Lock() - defer mu.Unlock() - if s.IsClient() { - got = append(got, &gotData{ctx, true, s}) - } - }) - stats.RegisterConnTagger(tagConnCtx) - stats.RegisterRPCTagger(tagRPCCtx) - stats.Start() - defer stats.Stop() - - te := newTest(t, "") - te.startServer(&testServer{}) - defer te.tearDown() - - failfast := true - req, resp, err := te.doUnaryCall(&rpcConfig{success: false, failfast: failfast}) - if err == nil { - t.Fatalf("got error ; want ") - } - te.srv.GracefulStop() // Wait for the server to stop. - - expect := &expectedData{ - method: "/grpc.testing.TestService/UnaryCall", - serverAddr: te.srvAddr, - requests: []*testpb.SimpleRequest{req}, - responses: []*testpb.SimpleResponse{resp}, - err: err, - failfast: failfast, - } - - checkFuncs := map[int]*checkFuncWithCount{ - connbegin: {checkConnBegin, 1}, + testClientStats(t, &testConfig{compress: ""}, &rpcConfig{success: false, failfast: false}, map[int]*checkFuncWithCount{ begin: {checkBegin, 1}, outHeader: {checkOutHeader, 1}, outPayload: {checkOutPayload, 1}, inHeader: {checkInHeader, 1}, inTrailer: {checkInTrailer, 1}, end: {checkEnd, 1}, - connend: {checkConnEnd, 1}, - } - - checkClientStats(t, got, expect, checkFuncs) + }) } func TestClientStatsStreamingRPC(t *testing.T) { - var ( - mu sync.Mutex - got []*gotData - ) - stats.RegisterRPCHandler(func(ctx context.Context, s stats.RPCStats) { - mu.Lock() - defer mu.Unlock() - if s.IsClient() { - got = append(got, &gotData{ctx, true, s}) - } - }) - stats.RegisterConnHandler(func(ctx context.Context, s stats.ConnStats) { - mu.Lock() - defer mu.Unlock() - if s.IsClient() { - got = append(got, &gotData{ctx, true, s}) - } - }) - stats.RegisterConnTagger(tagConnCtx) - stats.RegisterRPCTagger(tagRPCCtx) - stats.Start() - defer stats.Stop() - - te := newTest(t, "gzip") - te.startServer(&testServer{}) - defer te.tearDown() - count := 5 - failfast := false - reqs, resps, err := te.doFullDuplexCallRoundtrip(&rpcConfig{count: count, success: true, failfast: failfast}) - if err == nil { - t.Fatalf(err.Error()) - } - te.srv.GracefulStop() // Wait for the server to stop. - - expect := &expectedData{ - method: "/grpc.testing.TestService/FullDuplexCall", - serverAddr: te.srvAddr, - compression: "gzip", - requests: reqs, - responses: resps, - failfast: failfast, - } - - checkFuncs := map[int]*checkFuncWithCount{ - connbegin: {checkConnBegin, 1}, + testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: true, failfast: false, streaming: true}, map[int]*checkFuncWithCount{ begin: {checkBegin, 1}, outHeader: {checkOutHeader, 1}, outPayload: {checkOutPayload, count}, @@ -1194,68 +1012,17 @@ func TestClientStatsStreamingRPC(t *testing.T) { inPayload: {checkInPayload, count}, inTrailer: {checkInTrailer, 1}, end: {checkEnd, 1}, - connend: {checkConnEnd, 1}, - } - - checkClientStats(t, got, expect, checkFuncs) + }) } func TestClientStatsStreamingRPCError(t *testing.T) { - var ( - mu sync.Mutex - got []*gotData - ) - stats.RegisterRPCHandler(func(ctx context.Context, s stats.RPCStats) { - mu.Lock() - defer mu.Unlock() - if s.IsClient() { - got = append(got, &gotData{ctx, true, s}) - } - }) - stats.RegisterConnHandler(func(ctx context.Context, s stats.ConnStats) { - mu.Lock() - defer mu.Unlock() - if s.IsClient() { - got = append(got, &gotData{ctx, true, s}) - } - }) - stats.RegisterConnTagger(tagConnCtx) - stats.RegisterRPCTagger(tagRPCCtx) - stats.Start() - defer stats.Stop() - - te := newTest(t, "gzip") - te.startServer(&testServer{}) - defer te.tearDown() - count := 5 - failfast := true - reqs, resps, err := te.doFullDuplexCallRoundtrip(&rpcConfig{count: count, success: false, failfast: failfast}) - if err == nil { - t.Fatalf("got error ; want ") - } - te.srv.GracefulStop() // Wait for the server to stop. - - expect := &expectedData{ - method: "/grpc.testing.TestService/FullDuplexCall", - serverAddr: te.srvAddr, - compression: "gzip", - requests: reqs, - responses: resps, - err: err, - failfast: failfast, - } - - checkFuncs := map[int]*checkFuncWithCount{ - connbegin: {checkConnBegin, 1}, + testClientStats(t, &testConfig{compress: "gzip"}, &rpcConfig{count: count, success: false, failfast: false, streaming: true}, map[int]*checkFuncWithCount{ begin: {checkBegin, 1}, outHeader: {checkOutHeader, 1}, outPayload: {checkOutPayload, 1}, inHeader: {checkInHeader, 1}, inTrailer: {checkInTrailer, 1}, end: {checkEnd, 1}, - connend: {checkConnEnd, 1}, - } - - checkClientStats(t, got, expect, checkFuncs) + }) } diff --git a/transport/http2_client.go b/transport/http2_client.go index 3936b9cf..605b1e5a 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -209,6 +209,16 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( maxStreams: math.MaxInt32, streamSendQuota: defaultWindowSize, } + if stats.On() { + t.ctx = stats.TagConn(t.ctx, &stats.ConnTagInfo{ + RemoteAddr: t.remoteAddr, + LocalAddr: t.localAddr, + }) + connBegin := &stats.ConnBegin{ + Client: true, + } + stats.HandleConn(t.ctx, connBegin) + } // Start the reader goroutine for incoming message. Each transport has // a dedicated goroutine which reads HTTP2 frame from network. Then it // dispatches the frame to the corresponding stream entity. @@ -244,16 +254,6 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( } go t.controller() t.writableChan <- 0 - if stats.On() { - t.ctx = stats.TagConn(t.ctx, &stats.ConnTagInfo{ - RemoteAddr: t.remoteAddr, - LocalAddr: t.localAddr, - }) - connBegin := &stats.ConnBegin{ - Client: true, - } - stats.HandleConn(t.ctx, connBegin) - } return t, nil }