add tracing for streaming rpc
This commit is contained in:
		
							
								
								
									
										3
									
								
								call.go
									
									
									
									
									
								
							
							
						
						
									
										3
									
								
								call.go
									
									
									
									
									
								
							| @ -102,9 +102,6 @@ type callInfo struct { | ||||
| 	traceInfo traceInfo // in trace.go | ||||
| } | ||||
|  | ||||
| // EnableTracing controls whether to trace RPCs using the golang.org/x/net/trace package. | ||||
| // This should only be set before any RPCs are sent or received by this program. | ||||
| var EnableTracing = true | ||||
|  | ||||
| // Invoke is called by the generated code. It sends the RPC request on the | ||||
| // wire and returns after response is received. | ||||
|  | ||||
							
								
								
									
										49
									
								
								stream.go
									
									
									
									
									
								
							
							
						
						
									
										49
									
								
								stream.go
									
									
									
									
									
								
							| @ -36,8 +36,10 @@ package grpc | ||||
| import ( | ||||
| 	"errors" | ||||
| 	"io" | ||||
| 	"time" | ||||
|  | ||||
| 	"golang.org/x/net/context" | ||||
| 	"golang.org/x/net/trace" | ||||
| 	"google.golang.org/grpc/codes" | ||||
| 	"google.golang.org/grpc/metadata" | ||||
| 	"google.golang.org/grpc/transport" | ||||
| @ -94,6 +96,7 @@ type ClientStream interface { | ||||
| // by generated code. | ||||
| func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { | ||||
| 	// TODO(zhaoq): CallOption is omitted. Add support when it is needed. | ||||
| 	var trInfo traceInfo | ||||
| 	callHdr := &transport.CallHdr{ | ||||
| 		Host:   cc.authority, | ||||
| 		Method: method, | ||||
| @ -102,26 +105,36 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth | ||||
| 	if err != nil { | ||||
| 		return nil, toRPCErr(err) | ||||
| 	} | ||||
| 	if EnableTracing { | ||||
| 		trInfo.tr = trace.New("Sent."+methodFamily(desc.StreamName), desc.StreamName) | ||||
| 		trInfo.firstLine.client = true | ||||
| 		if deadline, ok := ctx.Deadline(); ok { | ||||
| 			trInfo.firstLine.deadline = deadline.Sub(time.Now()) | ||||
| 		} | ||||
| 		trInfo.tr.LazyLog(&trInfo.firstLine, false) | ||||
| 	} | ||||
| 	s, err := t.NewStream(ctx, callHdr) | ||||
| 	if err != nil { | ||||
| 		return nil, toRPCErr(err) | ||||
| 	} | ||||
| 	return &clientStream{ | ||||
| 		t:     t, | ||||
| 		s:     s, | ||||
| 		p:     &parser{s: s}, | ||||
| 		desc:  desc, | ||||
| 		codec: cc.dopts.codec, | ||||
| 		t:         t, | ||||
| 		s:         s, | ||||
| 		p:         &parser{s: s}, | ||||
| 		desc:      desc, | ||||
| 		codec:     cc.dopts.codec, | ||||
| 		traceInfo: trInfo, | ||||
| 	}, nil | ||||
| } | ||||
|  | ||||
| // clientStream implements a client side Stream. | ||||
| type clientStream struct { | ||||
| 	t     transport.ClientTransport | ||||
| 	s     *transport.Stream | ||||
| 	p     *parser | ||||
| 	desc  *StreamDesc | ||||
| 	codec Codec | ||||
| 	t         transport.ClientTransport | ||||
| 	s         *transport.Stream | ||||
| 	p         *parser | ||||
| 	desc      *StreamDesc | ||||
| 	codec     Codec | ||||
| 	traceInfo traceInfo | ||||
| } | ||||
|  | ||||
| func (cs *clientStream) Context() context.Context { | ||||
| @ -152,6 +165,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { | ||||
| 		} | ||||
| 		err = toRPCErr(err) | ||||
| 	}() | ||||
|  | ||||
| 	out, err := encode(cs.codec, m, compressionNone) | ||||
| 	if err != nil { | ||||
| 		return transport.StreamErrorf(codes.Internal, "grpc: %v", err) | ||||
| @ -172,23 +186,38 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) { | ||||
| 			return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>")) | ||||
| 		} | ||||
| 		if err == io.EOF { | ||||
| 			if EnableTracing { | ||||
| 				defer cs.traceInfo.tr.Finish() | ||||
| 			} | ||||
| 			if cs.s.StatusCode() == codes.OK { | ||||
| 				return nil | ||||
| 			} | ||||
| 			return Errorf(cs.s.StatusCode(), cs.s.StatusDesc()) | ||||
| 		} | ||||
| 		if EnableTracing { | ||||
| 			cs.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) | ||||
| 			cs.traceInfo.tr.SetError() | ||||
| 		} | ||||
| 		return toRPCErr(err) | ||||
| 	} | ||||
| 	if _, ok := err.(transport.ConnectionError); !ok { | ||||
| 		cs.t.CloseStream(cs.s, err) | ||||
| 	} | ||||
| 	if err == io.EOF { | ||||
| 		if EnableTracing { | ||||
| 			cs.traceInfo.tr.Finish() | ||||
| 		} | ||||
|  | ||||
| 		if cs.s.StatusCode() == codes.OK { | ||||
| 			// Returns io.EOF to indicate the end of the stream. | ||||
| 			return | ||||
| 		} | ||||
| 		return Errorf(cs.s.StatusCode(), cs.s.StatusDesc()) | ||||
| 	} | ||||
| 	if EnableTracing { | ||||
| 		cs.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) | ||||
| 		cs.traceInfo.tr.SetError() | ||||
| 	} | ||||
| 	return toRPCErr(err) | ||||
| } | ||||
|  | ||||
|  | ||||
							
								
								
									
										4
									
								
								trace.go
									
									
									
									
									
								
							
							
						
						
									
										4
									
								
								trace.go
									
									
									
									
									
								
							| @ -44,6 +44,10 @@ import ( | ||||
| 	"golang.org/x/net/trace" | ||||
| ) | ||||
|  | ||||
| // EnableTracing controls whether to trace RPCs using the golang.org/x/net/trace package. | ||||
| // This should only be set before any RPCs are sent or received by this program. | ||||
| var EnableTracing = true | ||||
|  | ||||
| // methodFamily returns the trace family for the given method. | ||||
| // It turns "/pkg.Service/GetFoo" into "pkg.Service". | ||||
| func methodFamily(m string) string { | ||||
|  | ||||
		Reference in New Issue
	
	Block a user
	 yangzhouhan
					yangzhouhan