Merge branch 'master' of https://github.com/grpc/grpc-go
This commit is contained in:
57
stream.go
57
stream.go
@ -36,6 +36,7 @@ package grpc
|
||||
import (
|
||||
"errors"
|
||||
"io"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
@ -101,10 +102,11 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
||||
Method: method,
|
||||
}
|
||||
cs := &clientStream{
|
||||
desc: desc,
|
||||
codec: cc.dopts.codec,
|
||||
desc: desc,
|
||||
codec: cc.dopts.codec,
|
||||
tracing: EnableTracing,
|
||||
}
|
||||
if EnableTracing {
|
||||
if cs.tracing {
|
||||
cs.traceInfo.tr = trace.New("Sent."+methodFamily(method), method)
|
||||
cs.traceInfo.firstLine.client = true
|
||||
if deadline, ok := ctx.Deadline(); ok {
|
||||
@ -128,11 +130,17 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
||||
|
||||
// clientStream implements a client side Stream.
|
||||
type clientStream struct {
|
||||
t transport.ClientTransport
|
||||
s *transport.Stream
|
||||
p *parser
|
||||
desc *StreamDesc
|
||||
codec Codec
|
||||
t transport.ClientTransport
|
||||
s *transport.Stream
|
||||
p *parser
|
||||
desc *StreamDesc
|
||||
codec Codec
|
||||
|
||||
tracing bool // set to EnableTracing when the clientStream is created.
|
||||
|
||||
mu sync.Mutex // protects traceInfo
|
||||
// traceInfo.tr is set when the clientStream is created (if EnableTracing is true),
|
||||
// and is set to nil when the clientStream's finish method is called.
|
||||
traceInfo traceInfo
|
||||
}
|
||||
|
||||
@ -155,6 +163,13 @@ func (cs *clientStream) Trailer() metadata.MD {
|
||||
}
|
||||
|
||||
func (cs *clientStream) SendMsg(m interface{}) (err error) {
|
||||
if cs.tracing {
|
||||
cs.mu.Lock()
|
||||
if cs.traceInfo.tr != nil {
|
||||
cs.traceInfo.tr.LazyLog(payload{m}, true)
|
||||
}
|
||||
cs.mu.Unlock()
|
||||
}
|
||||
defer func() {
|
||||
if err == nil || err == io.EOF {
|
||||
return
|
||||
@ -175,12 +190,8 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
|
||||
err = recv(cs.p, cs.codec, m)
|
||||
defer func() {
|
||||
// err != nil indicates the termination of the stream.
|
||||
if EnableTracing && err != nil {
|
||||
if err != io.EOF {
|
||||
cs.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
||||
cs.traceInfo.tr.SetError()
|
||||
}
|
||||
cs.traceInfo.tr.Finish()
|
||||
if err != nil {
|
||||
cs.finish(err)
|
||||
}
|
||||
}()
|
||||
if err == nil {
|
||||
@ -226,6 +237,24 @@ func (cs *clientStream) CloseSend() (err error) {
|
||||
return
|
||||
}
|
||||
|
||||
func (cs *clientStream) finish(err error) {
|
||||
if !cs.tracing {
|
||||
return
|
||||
}
|
||||
cs.mu.Lock()
|
||||
defer cs.mu.Unlock()
|
||||
if cs.traceInfo.tr != nil {
|
||||
if err == nil || err == io.EOF {
|
||||
cs.traceInfo.tr.LazyPrintf("RPC: [OK]")
|
||||
} else {
|
||||
cs.traceInfo.tr.LazyPrintf("RPC: [%v]", err)
|
||||
cs.traceInfo.tr.SetError()
|
||||
}
|
||||
cs.traceInfo.tr.Finish()
|
||||
cs.traceInfo.tr = nil
|
||||
}
|
||||
}
|
||||
|
||||
// ServerStream defines the interface a server stream has to satisfy.
|
||||
type ServerStream interface {
|
||||
// SendHeader sends the header metadata. It should not be called
|
||||
|
Reference in New Issue
Block a user