From 1d2a929ae5668ce161c533580a996df492c74d7c Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Fri, 4 Nov 2016 15:30:23 -0700 Subject: [PATCH] add stats.Begin and stats.End --- call.go | 38 +++++++++++--------- server.go | 56 ++++++++++++++++-------------- stats/stats.go | 21 +++++++++--- stats/stats_test.go | 84 +++++++++++++++++++++++++++++++++++++++------ stream.go | 42 ++++++++++++++++------- 5 files changed, 170 insertions(+), 71 deletions(-) diff --git a/call.go b/call.go index 3ef343a7..ebfffdd6 100644 --- a/call.go +++ b/call.go @@ -169,25 +169,21 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli } }() } - var ( - err error - t transport.ClientTransport - stream *transport.Stream - // Record the put handler from Balancer.Get(...). It is called once the - // RPC has completed or failed. - put func() - ) + if stats.On() { + begin := &stats.Begin{ + Client: true, + BeginTime: time.Now(), + } + stats.Handle(ctx, begin) + } defer func() { - if e != nil && stats.On() { - errorStats := &stats.RPCErr{ - Client: true, - Error: e, - } - if stream != nil { - stats.Handle(stream.Context(), errorStats) - } else { - stats.Handle(ctx, errorStats) + if stats.On() { + end := &stats.End{ + Client: true, + EndTime: time.Now(), + Error: e, } + stats.Handle(ctx, end) } }() topts := &transport.Options{ @@ -195,6 +191,14 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli Delay: false, } for { + var ( + err error + t transport.ClientTransport + stream *transport.Stream + // Record the put handler from Balancer.Get(...). It is called once the + // RPC has completed or failed. + put func() + ) // TODO(zhaoq): Need a formal spec of fail-fast. callHdr := &transport.CallHdr{ Host: cc.authority, diff --git a/server.go b/server.go index d65ce579..bac34637 100644 --- a/server.go +++ b/server.go @@ -582,12 +582,23 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str } func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) { + if stats.On() { + begin := &stats.Begin{ + BeginTime: time.Now(), + } + stats.Handle(stream.Context(), begin) + } defer func() { - if stats.On() && err != nil && err != io.EOF { - errorStats := &stats.RPCErr{ - Error: toRPCErr(err), + if stats.On() { + var e error + if err != nil && err != io.EOF { + e = toRPCErr(err) } - stats.Handle(stream.Context(), errorStats) + end := &stats.End{ + EndTime: time.Now(), + Error: e, + } + stats.Handle(stream.Context(), end) } }() if trInfo != nil { @@ -741,12 +752,23 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) { + if stats.On() { + begin := &stats.Begin{ + BeginTime: time.Now(), + } + stats.Handle(stream.Context(), begin) + } defer func() { - if stats.On() && err != nil && err != io.EOF { - errorStats := &stats.RPCErr{ - Error: toRPCErr(err), + if stats.On() { + var e error + if err != nil && err != io.EOF { + e = toRPCErr(err) } - stats.Handle(stream.Context(), errorStats) + end := &stats.End{ + EndTime: time.Now(), + Error: e, + } + stats.Handle(stream.Context(), end) } }() if s.opts.cp != nil { @@ -831,12 +853,6 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str trInfo.tr.SetError() } errDesc := fmt.Sprintf("malformed method name: %q", stream.Method()) - if stats.On() { - errorStats := &stats.RPCErr{ - Error: Errorf(codes.InvalidArgument, errDesc), - } - stats.Handle(stream.Context(), errorStats) - } if err := t.WriteStatus(stream, codes.InvalidArgument, errDesc); err != nil { if trInfo != nil { trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) @@ -858,12 +874,6 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str trInfo.tr.SetError() } errDesc := fmt.Sprintf("unknown service %v", service) - if stats.On() { - errorStats := &stats.RPCErr{ - Error: Errorf(codes.InvalidArgument, errDesc), - } - stats.Handle(stream.Context(), errorStats) - } if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil { if trInfo != nil { trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) @@ -890,12 +900,6 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str trInfo.tr.SetError() } errDesc := fmt.Sprintf("unknown method %v", method) - if stats.On() { - errorStats := &stats.RPCErr{ - Error: Errorf(codes.InvalidArgument, errDesc), - } - stats.Handle(stream.Context(), errorStats) - } if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil { if trInfo != nil { trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) diff --git a/stats/stats.go b/stats/stats.go index 73394e26..e7094c1c 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -52,6 +52,17 @@ type RPCStats interface { IsClient() bool } +// Begin contains stats when an RPC begins. +type Begin struct { + // Client is true if this Begin is from client side. + Client bool + // BeginTime is the time when the RPC begins. + BeginTime time.Time +} + +// IsClient indicates if this is from client side. +func (s *Begin) IsClient() bool { return s.Client } + // InPayload contains the information for an incoming payload. type InPayload struct { // Client is true if this InPayload is from client side. @@ -156,16 +167,18 @@ type OutTrailer struct { // IsClient indicates if this is from client side. func (s *OutTrailer) IsClient() bool { return s.Client } -// RPCErr contains stats when an error happens. -type RPCErr struct { - // Client is true if this RPCErr is from client side. +// End contains stats when an RPC ends. +type End struct { + // Client is true if this End is from client side. Client bool + // EndTime is the time when the RPC ends. + EndTime time.Time // Error is the error just happened. Its type is gRPC error. Error error } // IsClient indicates if this is from client side. -func (s *RPCErr) IsClient() bool { return s.Client } +func (s *End) IsClient() bool { return s.Client } var ( on = new(int32) diff --git a/stats/stats_test.go b/stats/stats_test.go index 70ca6099..d0643a1f 100644 --- a/stats/stats_test.go +++ b/stats/stats_test.go @@ -307,7 +307,8 @@ type gotData struct { } const ( - inits int = iota + begin int = iota + end inpay inheader intrailer @@ -317,6 +318,22 @@ const ( errors ) +func checkBegin(t *testing.T, d *gotData, e *expectedData) { + var ( + ok bool + st *stats.Begin + ) + if st, ok = d.s.(*stats.Begin); !ok { + t.Fatalf("got %T, want Begin", d.s) + } + if d.ctx == nil { + t.Fatalf("d.ctx = nil, want ") + } + if st.BeginTime.IsZero() { + t.Fatalf("st.BeginTime = %v, want ", st.BeginTime) + } +} + func checkInHeader(t *testing.T, d *gotData, e *expectedData) { var ( ok bool @@ -509,17 +526,20 @@ func checkOutTrailer(t *testing.T, d *gotData, e *expectedData) { } } -func checkErrorStats(t *testing.T, d *gotData, e *expectedData) { +func checkEnd(t *testing.T, d *gotData, e *expectedData) { var ( ok bool - st *stats.RPCErr + st *stats.End ) - if st, ok = d.s.(*stats.RPCErr); !ok { - t.Fatalf("got %T, want ErrorStats", d.s) + if st, ok = d.s.(*stats.End); !ok { + t.Fatalf("got %T, want End", d.s) } if d.ctx == nil { t.Fatalf("d.ctx = nil, want ") } + if st.EndTime.IsZero() { + t.Fatalf("st.EndTime = %v, want ", st.EndTime) + } if grpc.Code(st.Error) != grpc.Code(e.err) || grpc.ErrorDesc(st.Error) != grpc.ErrorDesc(e.err) { t.Fatalf("st.Error = %v, want %v", st.Error, e.err) } @@ -559,10 +579,12 @@ func TestServerStatsUnaryRPC(t *testing.T) { checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){ checkInHeader, + checkBegin, checkInPayload, checkOutHeader, checkOutPayload, checkOutTrailer, + checkEnd, } if len(got) != len(checkFuncs) { @@ -611,10 +633,11 @@ func TestServerStatsUnaryRPCError(t *testing.T) { checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){ checkInHeader, + checkBegin, checkInPayload, checkOutHeader, checkOutTrailer, - checkErrorStats, + checkEnd, } if len(got) != len(checkFuncs) { @@ -664,6 +687,7 @@ func TestServerStatsStreamingRPC(t *testing.T) { checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){ checkInHeader, + checkBegin, checkOutHeader, } ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){ @@ -673,7 +697,7 @@ func TestServerStatsStreamingRPC(t *testing.T) { for i := 0; i < count; i++ { checkFuncs = append(checkFuncs, ioPayFuncs...) } - checkFuncs = append(checkFuncs, checkOutTrailer) + checkFuncs = append(checkFuncs, checkOutTrailer, checkEnd) if len(got) != len(checkFuncs) { t.Fatalf("got %v stats, want %v stats", len(got), len(checkFuncs)) @@ -723,10 +747,11 @@ func TestServerStatsStreamingRPCError(t *testing.T) { checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){ checkInHeader, + checkBegin, checkOutHeader, checkInPayload, checkOutTrailer, - checkErrorStats, + checkEnd, } if len(got) != len(checkFuncs) { @@ -780,11 +805,13 @@ func TestClientStatsUnaryRPC(t *testing.T) { } checkFuncs := map[int]*checkFuncWithCount{ + begin: {checkBegin, 1}, outheader: {checkOutHeader, 1}, outpay: {checkOutPayload, 1}, inheader: {checkInHeader, 1}, inpay: {checkInPayload, 1}, intrailer: {checkInTrailer, 1}, + end: {checkEnd, 1}, } var expectLen int @@ -798,6 +825,12 @@ func TestClientStatsUnaryRPC(t *testing.T) { for _, s := range got { mu.Lock() switch s.s.(type) { + case *stats.Begin: + if checkFuncs[begin].c <= 0 { + t.Fatalf("unexpected stats: %T", s) + } + checkFuncs[begin].f(t, s, expect) + checkFuncs[begin].c-- case *stats.OutHeader: if checkFuncs[outheader].c <= 0 { t.Fatalf("unexpected stats: %T", s) @@ -828,6 +861,12 @@ func TestClientStatsUnaryRPC(t *testing.T) { } checkFuncs[intrailer].f(t, s, expect) checkFuncs[intrailer].c-- + case *stats.End: + if checkFuncs[end].c <= 0 { + t.Fatalf("unexpected stats: %T", s) + } + checkFuncs[end].f(t, s, expect) + checkFuncs[end].c-- default: t.Fatalf("unexpected stats: %T", s) } @@ -871,11 +910,12 @@ func TestClientStatsUnaryRPCError(t *testing.T) { } checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){ + checkBegin, checkOutHeader, checkOutPayload, checkInHeader, checkInTrailer, - checkErrorStats, + checkEnd, } if len(got) != len(checkFuncs) { @@ -898,6 +938,7 @@ func TestClientStatsStreamingRPC(t *testing.T) { mu.Lock() defer mu.Unlock() if s.IsClient() { + // t.Logf(" == %T %v", s, s.IsClient()) got = append(got, &gotData{ctx, true, s}) } }) @@ -926,11 +967,13 @@ func TestClientStatsStreamingRPC(t *testing.T) { } checkFuncs := map[int]*checkFuncWithCount{ + begin: {checkBegin, 1}, outheader: {checkOutHeader, 1}, outpay: {checkOutPayload, count}, inheader: {checkInHeader, 1}, inpay: {checkInPayload, count}, intrailer: {checkInTrailer, 1}, + end: {checkEnd, 1}, } var expectLen int @@ -944,6 +987,12 @@ func TestClientStatsStreamingRPC(t *testing.T) { for _, s := range got { mu.Lock() switch s.s.(type) { + case *stats.Begin: + if checkFuncs[begin].c <= 0 { + t.Fatalf("unexpected stats: %T", s) + } + checkFuncs[begin].f(t, s, expect) + checkFuncs[begin].c-- case *stats.OutHeader: if checkFuncs[outheader].c <= 0 { t.Fatalf("unexpected stats: %T", s) @@ -974,6 +1023,12 @@ func TestClientStatsStreamingRPC(t *testing.T) { } checkFuncs[intrailer].f(t, s, expect) checkFuncs[intrailer].c-- + case *stats.End: + if checkFuncs[end].c <= 0 { + t.Fatalf("unexpected stats: %T", s) + } + checkFuncs[end].f(t, s, expect) + checkFuncs[end].c-- default: t.Fatalf("unexpected stats: %T", s) } @@ -1019,11 +1074,12 @@ func TestClientStatsStreamingRPCError(t *testing.T) { } checkFuncs := map[int]*checkFuncWithCount{ + begin: {checkBegin, 1}, outheader: {checkOutHeader, 1}, outpay: {checkOutPayload, 1}, inheader: {checkInHeader, 1}, intrailer: {checkInTrailer, 1}, - errors: {checkErrorStats, 1}, + errors: {checkEnd, 1}, } var expectLen int @@ -1037,6 +1093,12 @@ func TestClientStatsStreamingRPCError(t *testing.T) { for _, s := range got { mu.Lock() switch s.s.(type) { + case *stats.Begin: + if checkFuncs[begin].c <= 0 { + t.Fatalf("unexpected stats: %T", s) + } + checkFuncs[begin].f(t, s, expect) + checkFuncs[begin].c-- case *stats.OutHeader: if checkFuncs[outheader].c <= 0 { t.Fatalf("unexpected stats: %T", s) @@ -1067,7 +1129,7 @@ func TestClientStatsStreamingRPCError(t *testing.T) { } checkFuncs[intrailer].f(t, s, expect) checkFuncs[intrailer].c-- - case *stats.RPCErr: + case *stats.End: if checkFuncs[errors].c <= 0 { t.Fatalf("unexpected stats: %T", s) } diff --git a/stream.go b/stream.go index 9d9226a4..08a93476 100644 --- a/stream.go +++ b/stream.go @@ -99,13 +99,21 @@ type ClientStream interface { // NewClientStream creates a new Stream for the client side. This is called // by generated code. func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { + if stats.On() { + begin := &stats.Begin{ + Client: true, + BeginTime: time.Now(), + } + stats.Handle(ctx, begin) + } defer func() { if err != nil && stats.On() { - errorStats := &stats.RPCErr{ + // Only handle end stats if err != nil. + end := &stats.End{ Client: true, Error: err, } - stats.Handle(ctx, errorStats) + stats.Handle(ctx, end) } }() if cc.dopts.streamInt != nil { @@ -266,11 +274,13 @@ func (cs *clientStream) Context() context.Context { func (cs *clientStream) Header() (_ metadata.MD, err error) { defer func() { if err != nil && stats.On() { - errorStats := &stats.RPCErr{ - Client: true, - Error: err, + // Only handle end stats if err != nil. + end := &stats.End{ + Client: true, + EndTime: time.Now(), + Error: err, } - stats.Handle(cs.s.Context(), errorStats) + stats.Handle(cs.s.Context(), end) } }() m, err := cs.s.Header() @@ -296,11 +306,12 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { } defer func() { if err != nil && stats.On() { - errorStats := &stats.RPCErr{ + // Only handle end stats if err != nil. + end := &stats.End{ Client: true, Error: err, } - stats.Handle(cs.s.Context(), errorStats) + stats.Handle(cs.s.Context(), end) } }() defer func() { @@ -350,12 +361,17 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { func (cs *clientStream) RecvMsg(m interface{}) (err error) { defer func() { - if err != nil && err != io.EOF && stats.On() { - errorStats := &stats.RPCErr{ - Client: true, - Error: err, + if err != nil && stats.On() { + var e error + if err != nil && err != io.EOF { + e = toRPCErr(err) } - stats.Handle(cs.s.Context(), errorStats) + end := &stats.End{ + Client: true, + EndTime: time.Now(), + Error: e, + } + stats.Handle(cs.s.Context(), end) } }() var inStats *stats.InPayload