Finish clientstream when newclientsteram returns error
This commit is contained in:
11
stream.go
11
stream.go
@ -97,11 +97,10 @@ type ClientStream interface {
|
||||
|
||||
// NewClientStream creates a new Stream for the client side. This is called
|
||||
// by generated code.
|
||||
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
|
||||
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (_ ClientStream, err error) {
|
||||
var (
|
||||
t transport.ClientTransport
|
||||
s *transport.Stream
|
||||
err error
|
||||
put func()
|
||||
)
|
||||
c := defaultCallInfo
|
||||
@ -140,6 +139,13 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
||||
cs.trInfo.tr.LazyLog(&cs.trInfo.firstLine, false)
|
||||
ctx = trace.NewContext(ctx, cs.trInfo.tr)
|
||||
}
|
||||
defer func() {
|
||||
if err != nil {
|
||||
// Need to call cs.finish() if error is returned.
|
||||
// Because cs will not be returned to caller.
|
||||
cs.finish(err)
|
||||
}
|
||||
}()
|
||||
gopts := BalancerGetOptions{
|
||||
BlockingWait: !c.failFast,
|
||||
}
|
||||
@ -168,7 +174,6 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
||||
}
|
||||
if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
|
||||
if c.failFast {
|
||||
cs.finish(err)
|
||||
return nil, toRPCErr(err)
|
||||
}
|
||||
continue
|
||||
|
Reference in New Issue
Block a user