Allocate clientstream after getTransport
This commit is contained in:
65
stream.go
65
stream.go
@ -117,35 +117,25 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
|||||||
if cc.dopts.cp != nil {
|
if cc.dopts.cp != nil {
|
||||||
callHdr.SendCompress = cc.dopts.cp.Type()
|
callHdr.SendCompress = cc.dopts.cp.Type()
|
||||||
}
|
}
|
||||||
cs := &clientStream{
|
var trInfo traceInfo
|
||||||
opts: opts,
|
if EnableTracing {
|
||||||
c: c,
|
trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
|
||||||
desc: desc,
|
trInfo.firstLine.client = true
|
||||||
codec: cc.dopts.codec,
|
|
||||||
cp: cc.dopts.cp,
|
|
||||||
dc: cc.dopts.dc,
|
|
||||||
tracing: EnableTracing,
|
|
||||||
}
|
|
||||||
if cc.dopts.cp != nil {
|
|
||||||
callHdr.SendCompress = cc.dopts.cp.Type()
|
|
||||||
cs.cbuf = new(bytes.Buffer)
|
|
||||||
}
|
|
||||||
if cs.tracing {
|
|
||||||
cs.trInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
|
|
||||||
cs.trInfo.firstLine.client = true
|
|
||||||
if deadline, ok := ctx.Deadline(); ok {
|
if deadline, ok := ctx.Deadline(); ok {
|
||||||
cs.trInfo.firstLine.deadline = deadline.Sub(time.Now())
|
trInfo.firstLine.deadline = deadline.Sub(time.Now())
|
||||||
}
|
}
|
||||||
cs.trInfo.tr.LazyLog(&cs.trInfo.firstLine, false)
|
trInfo.tr.LazyLog(&trInfo.firstLine, false)
|
||||||
ctx = trace.NewContext(ctx, cs.trInfo.tr)
|
ctx = trace.NewContext(ctx, trInfo.tr)
|
||||||
|
defer func() {
|
||||||
|
if err != nil {
|
||||||
|
// Need to call tr.finish() if error is returned.
|
||||||
|
// Because tr will not be returned to caller.
|
||||||
|
trInfo.tr.LazyPrintf("RPC: [%v]", err)
|
||||||
|
trInfo.tr.SetError()
|
||||||
|
trInfo.tr.Finish()
|
||||||
|
}
|
||||||
|
}()
|
||||||
}
|
}
|
||||||
defer func() {
|
|
||||||
if err != nil {
|
|
||||||
// Need to call cs.finish() if error is returned.
|
|
||||||
// Because cs will not be returned to caller.
|
|
||||||
cs.finish(err)
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
gopts := BalancerGetOptions{
|
gopts := BalancerGetOptions{
|
||||||
BlockingWait: !c.failFast,
|
BlockingWait: !c.failFast,
|
||||||
}
|
}
|
||||||
@ -182,10 +172,25 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
|||||||
}
|
}
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
cs.put = put
|
cs := &clientStream{
|
||||||
cs.t = t
|
opts: opts,
|
||||||
cs.s = s
|
c: c,
|
||||||
cs.p = &parser{r: s}
|
desc: desc,
|
||||||
|
codec: cc.dopts.codec,
|
||||||
|
cp: cc.dopts.cp,
|
||||||
|
dc: cc.dopts.dc,
|
||||||
|
|
||||||
|
put: put,
|
||||||
|
t: t,
|
||||||
|
s: s,
|
||||||
|
p: &parser{r: s},
|
||||||
|
|
||||||
|
tracing: EnableTracing,
|
||||||
|
trInfo: trInfo,
|
||||||
|
}
|
||||||
|
if cc.dopts.cp != nil {
|
||||||
|
cs.cbuf = new(bytes.Buffer)
|
||||||
|
}
|
||||||
// Listen on ctx.Done() to detect cancellation and s.Done() to detect normal termination
|
// Listen on ctx.Done() to detect cancellation and s.Done() to detect normal termination
|
||||||
// when there is no pending I/O operations on this stream.
|
// when there is no pending I/O operations on this stream.
|
||||||
go func() {
|
go func() {
|
||||||
|
Reference in New Issue
Block a user