diff --git a/stream.go b/stream.go index fb7e50f9..008ad1e2 100644 --- a/stream.go +++ b/stream.go @@ -165,7 +165,7 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth put() put = nil } - if _, ok := err.(transport.ConnectionError); ok { + if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain { if c.failFast { cs.finish(err) return nil, toRPCErr(err) diff --git a/transport/http2_client.go b/transport/http2_client.go index b6b744d5..6c72431a 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -284,6 +284,10 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea t.mu.Unlock() return nil, ErrConnClosing } + if t.state == draining { + t.mu.Unlock() + return nil, ErrStreamDrain + } if t.state != reachable { t.mu.Unlock() return nil, ErrConnClosing @@ -310,7 +314,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea t.mu.Lock() if t.state == draining { t.mu.Unlock() - // t has been drained. Return the quota. t.streamsQuota.add(1) // Need to make t writable again so that the rpc in flight can still proceed. t.writableChan <- 0