fix comments

This commit is contained in:
Menghan Li
2016-11-07 13:17:59 -08:00
parent a4edc462a2
commit 5938492b60
8 changed files with 47 additions and 66 deletions

14
call.go
View File

@ -50,7 +50,7 @@ import (
// On error, it returns the error and indicates whether the call should be retried. // On error, it returns the error and indicates whether the call should be retried.
// //
// TODO(zhaoq): Check whether the received message sequence is valid. // TODO(zhaoq): Check whether the received message sequence is valid.
// TODO ctx is userCtx, not stream.Context. It is used for stats handling. Change this later if necessary. // TODO ctx is used for stats collection and processing. It is the context passed from the application.
func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) { func recvResponse(ctx context.Context, dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) {
// Try to acquire header metadata from the server if there is any. // Try to acquire header metadata from the server if there is any.
defer func() { defer func() {
@ -118,11 +118,9 @@ func sendRequest(ctx context.Context, codec Codec, compressor Compressor, callHd
if err != nil { if err != nil {
return nil, Errorf(codes.Internal, "grpc: %v", err) return nil, Errorf(codes.Internal, "grpc: %v", err)
} }
if outPayload != nil {
outPayload.SentTime = time.Now()
}
err = t.Write(stream, outBuf, opts) err = t.Write(stream, outBuf, opts)
if outPayload != nil { if err == nil && outPayload != nil {
outPayload.SentTime = time.Now()
stats.Handle(ctx, outPayload) stats.Handle(ctx, outPayload)
} }
// t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method // t.NewStream(...) could lead to an early rejection of the RPC (e.g., the service/method
@ -177,6 +175,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
begin := &stats.Begin{ begin := &stats.Begin{
Client: true, Client: true,
BeginTime: time.Now(), BeginTime: time.Now(),
FailFast: c.failFast,
} }
stats.Handle(ctx, begin) stats.Handle(ctx, begin)
} }
@ -205,9 +204,8 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
) )
// TODO(zhaoq): Need a formal spec of fail-fast. // TODO(zhaoq): Need a formal spec of fail-fast.
callHdr := &transport.CallHdr{ callHdr := &transport.CallHdr{
Host: cc.authority, Host: cc.authority,
Method: method, Method: method,
FailFast: c.failFast,
} }
if cc.dopts.cp != nil { if cc.dopts.cp != nil {
callHdr.SendCompress = cc.dopts.cp.Type() callHdr.SendCompress = cc.dopts.cp.Type()

View File

@ -271,6 +271,7 @@ func encode(c Codec, msg interface{}, cp Compressor, cbuf *bytes.Buffer, outPayl
} }
if outPayload != nil { if outPayload != nil {
outPayload.Payload = msg outPayload.Payload = msg
// TODO truncate large payload.
outPayload.Data = b outPayload.Data = b
outPayload.Length = len(b) outPayload.Length = len(b)
} }
@ -352,6 +353,7 @@ func recv(p *parser, c Codec, s *transport.Stream, dc Decompressor, m interface{
} }
if inPayload != nil { if inPayload != nil {
inPayload.Payload = m inPayload.Payload = m
// TODO truncate large payload.
inPayload.Data = d inPayload.Data = d
inPayload.Length = len(d) inPayload.Length = len(d)
} }

View File

@ -572,11 +572,9 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
// the optimal option. // the optimal option.
grpclog.Fatalf("grpc: Server failed to encode response %v", err) grpclog.Fatalf("grpc: Server failed to encode response %v", err)
} }
if outPayload != nil {
outPayload.SentTime = time.Now()
}
err = t.Write(stream, p, opts) err = t.Write(stream, p, opts)
if outPayload != nil { if err == nil && outPayload != nil {
outPayload.SentTime = time.Now()
stats.Handle(stream.Context(), outPayload) stats.Handle(stream.Context(), outPayload)
} }
return err return err

View File

@ -53,11 +53,14 @@ type RPCStats interface {
} }
// Begin contains stats when an RPC begins. // Begin contains stats when an RPC begins.
// FailFast are only valid if Client is true.
type Begin struct { type Begin struct {
// Client is true if this Begin is from client side. // Client is true if this Begin is from client side.
Client bool Client bool
// BeginTime is the time when the RPC begins. // BeginTime is the time when the RPC begins.
BeginTime time.Time BeginTime time.Time
// FailFast indicates if this RPC is failfast.
FailFast bool
} }
// IsClient indicates if this is from client side. // IsClient indicates if this is from client side.
@ -134,7 +137,7 @@ type OutPayload struct {
func (s *OutPayload) IsClient() bool { return s.Client } func (s *OutPayload) IsClient() bool { return s.Client }
// OutHeader contains stats when a header is sent. // OutHeader contains stats when a header is sent.
// FullMethod, addresses, Encryption and FailFast are only valid if Client is true. // FullMethod, addresses and Encryption are only valid if Client is true.
type OutHeader struct { type OutHeader struct {
// Client is true if this OutHeader is from client side. // Client is true if this OutHeader is from client side.
Client bool Client bool
@ -149,8 +152,6 @@ type OutHeader struct {
LocalAddr net.Addr LocalAddr net.Addr
// Encryption is encrypt method used in the RPC. // Encryption is encrypt method used in the RPC.
Encryption string Encryption string
// FailFast indicates if this RPC is failfast.
FailFast bool
} }
// IsClient indicates if this is from client side. // IsClient indicates if this is from client side.

View File

@ -332,6 +332,11 @@ func checkBegin(t *testing.T, d *gotData, e *expectedData) {
if st.BeginTime.IsZero() { if st.BeginTime.IsZero() {
t.Fatalf("st.BeginTime = %v, want <non-zero>", st.BeginTime) t.Fatalf("st.BeginTime = %v, want <non-zero>", st.BeginTime)
} }
if d.client {
if st.FailFast != e.failfast {
t.Fatalf("st.FailFast = %v, want %v", st.FailFast, e.failfast)
}
}
} }
func checkInHeader(t *testing.T, d *gotData, e *expectedData) { func checkInHeader(t *testing.T, d *gotData, e *expectedData) {
@ -452,9 +457,6 @@ func checkOutHeader(t *testing.T, d *gotData, e *expectedData) {
if st.Encryption != e.encryption { if st.Encryption != e.encryption {
t.Fatalf("st.Encryption = %v, want %v", st.Encryption, e.encryption) t.Fatalf("st.Encryption = %v, want %v", st.Encryption, e.encryption)
} }
if st.FailFast != e.failfast {
t.Fatalf("st.FailFast = %v, want %v", st.FailFast, e.failfast)
}
} }
} }

View File

@ -99,23 +99,6 @@ type ClientStream interface {
// NewClientStream creates a new Stream for the client side. This is called // NewClientStream creates a new Stream for the client side. This is called
// by generated code. // by generated code.
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) { func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
if stats.On() {
begin := &stats.Begin{
Client: true,
BeginTime: time.Now(),
}
stats.Handle(ctx, begin)
}
defer func() {
if err != nil && stats.On() {
// Only handle end stats if err != nil.
end := &stats.End{
Client: true,
Error: err,
}
stats.Handle(ctx, end)
}
}()
if cc.dopts.streamInt != nil { if cc.dopts.streamInt != nil {
return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...) return cc.dopts.streamInt(ctx, desc, cc, method, newClientStream, opts...)
} }
@ -135,10 +118,9 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
} }
} }
callHdr := &transport.CallHdr{ callHdr := &transport.CallHdr{
Host: cc.authority, Host: cc.authority,
Method: method, Method: method,
Flush: desc.ServerStreams && desc.ClientStreams, Flush: desc.ServerStreams && desc.ClientStreams,
FailFast: c.failFast,
} }
if cc.dopts.cp != nil { if cc.dopts.cp != nil {
callHdr.SendCompress = cc.dopts.cp.Type() callHdr.SendCompress = cc.dopts.cp.Type()
@ -162,6 +144,24 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
} }
}() }()
} }
if stats.On() {
begin := &stats.Begin{
Client: true,
BeginTime: time.Now(),
FailFast: c.failFast,
}
stats.Handle(ctx, begin)
}
defer func() {
if err != nil && stats.On() {
// Only handle end stats if err != nil.
end := &stats.End{
Client: true,
Error: err,
}
stats.Handle(ctx, end)
}
}()
gopts := BalancerGetOptions{ gopts := BalancerGetOptions{
BlockingWait: !c.failFast, BlockingWait: !c.failFast,
} }
@ -311,8 +311,9 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
cs.mu.Unlock() cs.mu.Unlock()
} }
defer func() { defer func() {
if err != nil && stats.On() { if err != nil && err != io.EOF && stats.On() {
// Only handle end stats if err != nil. // Only handle end stats if err != nil.
// If err == nil, stats.End will be handled when user calls RecvMsg.
end := &stats.End{ end := &stats.End{
Client: true, Client: true,
Error: err, Error: err,
@ -357,11 +358,9 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
if err != nil { if err != nil {
return Errorf(codes.Internal, "grpc: %v", err) return Errorf(codes.Internal, "grpc: %v", err)
} }
if outPayload != nil {
outPayload.SentTime = time.Now()
}
err = cs.t.Write(cs.s, out, &transport.Options{Last: false}) err = cs.t.Write(cs.s, out, &transport.Options{Last: false})
if outPayload != nil { if err == nil && outPayload != nil {
outPayload.SentTime = time.Now()
stats.Handle(cs.userCtx, outPayload) stats.Handle(cs.userCtx, outPayload)
} }
return err return err
@ -579,13 +578,11 @@ func (ss *serverStream) SendMsg(m interface{}) (err error) {
err = Errorf(codes.Internal, "grpc: %v", err) err = Errorf(codes.Internal, "grpc: %v", err)
return err return err
} }
if outPayload != nil {
outPayload.SentTime = time.Now()
}
if err := ss.t.Write(ss.s, out, &transport.Options{Last: false}); err != nil { if err := ss.t.Write(ss.s, out, &transport.Options{Last: false}); err != nil {
return toRPCErr(err) return toRPCErr(err)
} }
if outPayload != nil { if outPayload != nil {
outPayload.SentTime = time.Now()
stats.Handle(ss.s.Context(), outPayload) stats.Handle(ss.s.Context(), outPayload)
} }
return nil return nil

View File

@ -456,10 +456,9 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
Client: true, Client: true,
WireLength: bufLen, WireLength: bufLen,
FullMethod: callHdr.Method, FullMethod: callHdr.Method,
RemoteAddr: t.RemoteAddr(), RemoteAddr: t.conn.RemoteAddr(),
LocalAddr: t.LocalAddr(), LocalAddr: t.conn.LocalAddr(),
Encryption: callHdr.SendCompress, Encryption: callHdr.SendCompress,
FailFast: callHdr.FailFast,
} }
stats.Handle(s.userCtx, outHeader) stats.Handle(s.userCtx, outHeader)
} }
@ -1105,11 +1104,3 @@ func (t *http2Client) notifyError(err error) {
} }
t.mu.Unlock() t.mu.Unlock()
} }
func (t *http2Client) LocalAddr() net.Addr {
return t.conn.LocalAddr()
}
func (t *http2Client) RemoteAddr() net.Addr {
return t.conn.RemoteAddr()
}

View File

@ -430,9 +430,6 @@ type CallHdr struct {
// only a hint. The transport may modify the flush decision // only a hint. The transport may modify the flush decision
// for performance purposes. // for performance purposes.
Flush bool Flush bool
// FailFast indicates whether the RPC is failfast.
FailFast bool
} }
// ClientTransport is the common interface for all gRPC client-side transport // ClientTransport is the common interface for all gRPC client-side transport
@ -471,11 +468,6 @@ type ClientTransport interface {
// receives the draining signal from the server (e.g., GOAWAY frame in // receives the draining signal from the server (e.g., GOAWAY frame in
// HTTP/2). // HTTP/2).
GoAway() <-chan struct{} GoAway() <-chan struct{}
// LocalAddr returns the local network address.
LocalAddr() net.Addr
// RemoteAddr returns the remote network address.
RemoteAddr() net.Addr
} }
// ServerTransport is the common interface for all gRPC server-side transport // ServerTransport is the common interface for all gRPC server-side transport