add stats.Begin and stats.End

This commit is contained in:
Menghan Li
2016-11-04 15:30:23 -07:00
parent 8a126b020f
commit 1d2a929ae5
5 changed files with 170 additions and 71 deletions

38
call.go
View File

@ -169,25 +169,21 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
}
}()
}
var (
err error
t transport.ClientTransport
stream *transport.Stream
// Record the put handler from Balancer.Get(...). It is called once the
// RPC has completed or failed.
put func()
)
if stats.On() {
begin := &stats.Begin{
Client: true,
BeginTime: time.Now(),
}
stats.Handle(ctx, begin)
}
defer func() {
if e != nil && stats.On() {
errorStats := &stats.RPCErr{
Client: true,
Error: e,
}
if stream != nil {
stats.Handle(stream.Context(), errorStats)
} else {
stats.Handle(ctx, errorStats)
if stats.On() {
end := &stats.End{
Client: true,
EndTime: time.Now(),
Error: e,
}
stats.Handle(ctx, end)
}
}()
topts := &transport.Options{
@ -195,6 +191,14 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
Delay: false,
}
for {
var (
err error
t transport.ClientTransport
stream *transport.Stream
// Record the put handler from Balancer.Get(...). It is called once the
// RPC has completed or failed.
put func()
)
// TODO(zhaoq): Need a formal spec of fail-fast.
callHdr := &transport.CallHdr{
Host: cc.authority,

View File

@ -582,12 +582,23 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
}
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc, trInfo *traceInfo) (err error) {
if stats.On() {
begin := &stats.Begin{
BeginTime: time.Now(),
}
stats.Handle(stream.Context(), begin)
}
defer func() {
if stats.On() && err != nil && err != io.EOF {
errorStats := &stats.RPCErr{
Error: toRPCErr(err),
if stats.On() {
var e error
if err != nil && err != io.EOF {
e = toRPCErr(err)
}
stats.Handle(stream.Context(), errorStats)
end := &stats.End{
EndTime: time.Now(),
Error: e,
}
stats.Handle(stream.Context(), end)
}
}()
if trInfo != nil {
@ -741,12 +752,23 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
}
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc, trInfo *traceInfo) (err error) {
if stats.On() {
begin := &stats.Begin{
BeginTime: time.Now(),
}
stats.Handle(stream.Context(), begin)
}
defer func() {
if stats.On() && err != nil && err != io.EOF {
errorStats := &stats.RPCErr{
Error: toRPCErr(err),
if stats.On() {
var e error
if err != nil && err != io.EOF {
e = toRPCErr(err)
}
stats.Handle(stream.Context(), errorStats)
end := &stats.End{
EndTime: time.Now(),
Error: e,
}
stats.Handle(stream.Context(), end)
}
}()
if s.opts.cp != nil {
@ -831,12 +853,6 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.SetError()
}
errDesc := fmt.Sprintf("malformed method name: %q", stream.Method())
if stats.On() {
errorStats := &stats.RPCErr{
Error: Errorf(codes.InvalidArgument, errDesc),
}
stats.Handle(stream.Context(), errorStats)
}
if err := t.WriteStatus(stream, codes.InvalidArgument, errDesc); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
@ -858,12 +874,6 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.SetError()
}
errDesc := fmt.Sprintf("unknown service %v", service)
if stats.On() {
errorStats := &stats.RPCErr{
Error: Errorf(codes.InvalidArgument, errDesc),
}
stats.Handle(stream.Context(), errorStats)
}
if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
@ -890,12 +900,6 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str
trInfo.tr.SetError()
}
errDesc := fmt.Sprintf("unknown method %v", method)
if stats.On() {
errorStats := &stats.RPCErr{
Error: Errorf(codes.InvalidArgument, errDesc),
}
stats.Handle(stream.Context(), errorStats)
}
if err := t.WriteStatus(stream, codes.Unimplemented, errDesc); err != nil {
if trInfo != nil {
trInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)

View File

@ -52,6 +52,17 @@ type RPCStats interface {
IsClient() bool
}
// Begin contains stats when an RPC begins.
type Begin struct {
// Client is true if this Begin is from client side.
Client bool
// BeginTime is the time when the RPC begins.
BeginTime time.Time
}
// IsClient indicates if this is from client side.
func (s *Begin) IsClient() bool { return s.Client }
// InPayload contains the information for an incoming payload.
type InPayload struct {
// Client is true if this InPayload is from client side.
@ -156,16 +167,18 @@ type OutTrailer struct {
// IsClient indicates if this is from client side.
func (s *OutTrailer) IsClient() bool { return s.Client }
// RPCErr contains stats when an error happens.
type RPCErr struct {
// Client is true if this RPCErr is from client side.
// End contains stats when an RPC ends.
type End struct {
// Client is true if this End is from client side.
Client bool
// EndTime is the time when the RPC ends.
EndTime time.Time
// Error is the error just happened. Its type is gRPC error.
Error error
}
// IsClient indicates if this is from client side.
func (s *RPCErr) IsClient() bool { return s.Client }
func (s *End) IsClient() bool { return s.Client }
var (
on = new(int32)

View File

@ -307,7 +307,8 @@ type gotData struct {
}
const (
inits int = iota
begin int = iota
end
inpay
inheader
intrailer
@ -317,6 +318,22 @@ const (
errors
)
func checkBegin(t *testing.T, d *gotData, e *expectedData) {
var (
ok bool
st *stats.Begin
)
if st, ok = d.s.(*stats.Begin); !ok {
t.Fatalf("got %T, want Begin", d.s)
}
if d.ctx == nil {
t.Fatalf("d.ctx = nil, want <non-nil>")
}
if st.BeginTime.IsZero() {
t.Fatalf("st.BeginTime = %v, want <non-zero>", st.BeginTime)
}
}
func checkInHeader(t *testing.T, d *gotData, e *expectedData) {
var (
ok bool
@ -509,17 +526,20 @@ func checkOutTrailer(t *testing.T, d *gotData, e *expectedData) {
}
}
func checkErrorStats(t *testing.T, d *gotData, e *expectedData) {
func checkEnd(t *testing.T, d *gotData, e *expectedData) {
var (
ok bool
st *stats.RPCErr
st *stats.End
)
if st, ok = d.s.(*stats.RPCErr); !ok {
t.Fatalf("got %T, want ErrorStats", d.s)
if st, ok = d.s.(*stats.End); !ok {
t.Fatalf("got %T, want End", d.s)
}
if d.ctx == nil {
t.Fatalf("d.ctx = nil, want <non-nil>")
}
if st.EndTime.IsZero() {
t.Fatalf("st.EndTime = %v, want <non-zero>", st.EndTime)
}
if grpc.Code(st.Error) != grpc.Code(e.err) || grpc.ErrorDesc(st.Error) != grpc.ErrorDesc(e.err) {
t.Fatalf("st.Error = %v, want %v", st.Error, e.err)
}
@ -559,10 +579,12 @@ func TestServerStatsUnaryRPC(t *testing.T) {
checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
checkInHeader,
checkBegin,
checkInPayload,
checkOutHeader,
checkOutPayload,
checkOutTrailer,
checkEnd,
}
if len(got) != len(checkFuncs) {
@ -611,10 +633,11 @@ func TestServerStatsUnaryRPCError(t *testing.T) {
checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
checkInHeader,
checkBegin,
checkInPayload,
checkOutHeader,
checkOutTrailer,
checkErrorStats,
checkEnd,
}
if len(got) != len(checkFuncs) {
@ -664,6 +687,7 @@ func TestServerStatsStreamingRPC(t *testing.T) {
checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
checkInHeader,
checkBegin,
checkOutHeader,
}
ioPayFuncs := []func(t *testing.T, d *gotData, e *expectedData){
@ -673,7 +697,7 @@ func TestServerStatsStreamingRPC(t *testing.T) {
for i := 0; i < count; i++ {
checkFuncs = append(checkFuncs, ioPayFuncs...)
}
checkFuncs = append(checkFuncs, checkOutTrailer)
checkFuncs = append(checkFuncs, checkOutTrailer, checkEnd)
if len(got) != len(checkFuncs) {
t.Fatalf("got %v stats, want %v stats", len(got), len(checkFuncs))
@ -723,10 +747,11 @@ func TestServerStatsStreamingRPCError(t *testing.T) {
checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
checkInHeader,
checkBegin,
checkOutHeader,
checkInPayload,
checkOutTrailer,
checkErrorStats,
checkEnd,
}
if len(got) != len(checkFuncs) {
@ -780,11 +805,13 @@ func TestClientStatsUnaryRPC(t *testing.T) {
}
checkFuncs := map[int]*checkFuncWithCount{
begin: {checkBegin, 1},
outheader: {checkOutHeader, 1},
outpay: {checkOutPayload, 1},
inheader: {checkInHeader, 1},
inpay: {checkInPayload, 1},
intrailer: {checkInTrailer, 1},
end: {checkEnd, 1},
}
var expectLen int
@ -798,6 +825,12 @@ func TestClientStatsUnaryRPC(t *testing.T) {
for _, s := range got {
mu.Lock()
switch s.s.(type) {
case *stats.Begin:
if checkFuncs[begin].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
}
checkFuncs[begin].f(t, s, expect)
checkFuncs[begin].c--
case *stats.OutHeader:
if checkFuncs[outheader].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
@ -828,6 +861,12 @@ func TestClientStatsUnaryRPC(t *testing.T) {
}
checkFuncs[intrailer].f(t, s, expect)
checkFuncs[intrailer].c--
case *stats.End:
if checkFuncs[end].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
}
checkFuncs[end].f(t, s, expect)
checkFuncs[end].c--
default:
t.Fatalf("unexpected stats: %T", s)
}
@ -871,11 +910,12 @@ func TestClientStatsUnaryRPCError(t *testing.T) {
}
checkFuncs := []func(t *testing.T, d *gotData, e *expectedData){
checkBegin,
checkOutHeader,
checkOutPayload,
checkInHeader,
checkInTrailer,
checkErrorStats,
checkEnd,
}
if len(got) != len(checkFuncs) {
@ -898,6 +938,7 @@ func TestClientStatsStreamingRPC(t *testing.T) {
mu.Lock()
defer mu.Unlock()
if s.IsClient() {
// t.Logf(" == %T %v", s, s.IsClient())
got = append(got, &gotData{ctx, true, s})
}
})
@ -926,11 +967,13 @@ func TestClientStatsStreamingRPC(t *testing.T) {
}
checkFuncs := map[int]*checkFuncWithCount{
begin: {checkBegin, 1},
outheader: {checkOutHeader, 1},
outpay: {checkOutPayload, count},
inheader: {checkInHeader, 1},
inpay: {checkInPayload, count},
intrailer: {checkInTrailer, 1},
end: {checkEnd, 1},
}
var expectLen int
@ -944,6 +987,12 @@ func TestClientStatsStreamingRPC(t *testing.T) {
for _, s := range got {
mu.Lock()
switch s.s.(type) {
case *stats.Begin:
if checkFuncs[begin].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
}
checkFuncs[begin].f(t, s, expect)
checkFuncs[begin].c--
case *stats.OutHeader:
if checkFuncs[outheader].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
@ -974,6 +1023,12 @@ func TestClientStatsStreamingRPC(t *testing.T) {
}
checkFuncs[intrailer].f(t, s, expect)
checkFuncs[intrailer].c--
case *stats.End:
if checkFuncs[end].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
}
checkFuncs[end].f(t, s, expect)
checkFuncs[end].c--
default:
t.Fatalf("unexpected stats: %T", s)
}
@ -1019,11 +1074,12 @@ func TestClientStatsStreamingRPCError(t *testing.T) {
}
checkFuncs := map[int]*checkFuncWithCount{
begin: {checkBegin, 1},
outheader: {checkOutHeader, 1},
outpay: {checkOutPayload, 1},
inheader: {checkInHeader, 1},
intrailer: {checkInTrailer, 1},
errors: {checkErrorStats, 1},
errors: {checkEnd, 1},
}
var expectLen int
@ -1037,6 +1093,12 @@ func TestClientStatsStreamingRPCError(t *testing.T) {
for _, s := range got {
mu.Lock()
switch s.s.(type) {
case *stats.Begin:
if checkFuncs[begin].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
}
checkFuncs[begin].f(t, s, expect)
checkFuncs[begin].c--
case *stats.OutHeader:
if checkFuncs[outheader].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
@ -1067,7 +1129,7 @@ func TestClientStatsStreamingRPCError(t *testing.T) {
}
checkFuncs[intrailer].f(t, s, expect)
checkFuncs[intrailer].c--
case *stats.RPCErr:
case *stats.End:
if checkFuncs[errors].c <= 0 {
t.Fatalf("unexpected stats: %T", s)
}

View File

@ -99,13 +99,21 @@ type ClientStream interface {
// NewClientStream creates a new Stream for the client side. This is called
// by generated code.
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
if stats.On() {
begin := &stats.Begin{
Client: true,
BeginTime: time.Now(),
}
stats.Handle(ctx, begin)
}
defer func() {
if err != nil && stats.On() {
errorStats := &stats.RPCErr{
// Only handle end stats if err != nil.
end := &stats.End{
Client: true,
Error: err,
}
stats.Handle(ctx, errorStats)
stats.Handle(ctx, end)
}
}()
if cc.dopts.streamInt != nil {
@ -266,11 +274,13 @@ func (cs *clientStream) Context() context.Context {
func (cs *clientStream) Header() (_ metadata.MD, err error) {
defer func() {
if err != nil && stats.On() {
errorStats := &stats.RPCErr{
Client: true,
Error: err,
// Only handle end stats if err != nil.
end := &stats.End{
Client: true,
EndTime: time.Now(),
Error: err,
}
stats.Handle(cs.s.Context(), errorStats)
stats.Handle(cs.s.Context(), end)
}
}()
m, err := cs.s.Header()
@ -296,11 +306,12 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
}
defer func() {
if err != nil && stats.On() {
errorStats := &stats.RPCErr{
// Only handle end stats if err != nil.
end := &stats.End{
Client: true,
Error: err,
}
stats.Handle(cs.s.Context(), errorStats)
stats.Handle(cs.s.Context(), end)
}
}()
defer func() {
@ -350,12 +361,17 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
func (cs *clientStream) RecvMsg(m interface{}) (err error) {
defer func() {
if err != nil && err != io.EOF && stats.On() {
errorStats := &stats.RPCErr{
Client: true,
Error: err,
if err != nil && stats.On() {
var e error
if err != nil && err != io.EOF {
e = toRPCErr(err)
}
stats.Handle(cs.s.Context(), errorStats)
end := &stats.End{
Client: true,
EndTime: time.Now(),
Error: e,
}
stats.Handle(cs.s.Context(), end)
}
}()
var inStats *stats.InPayload