Merge pull request #790 from iamqizhao/race-fix
Fix some goaway related test flakiness
This commit is contained in:
11
call.go
11
call.go
@ -51,6 +51,13 @@ import (
|
||||
func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) error {
|
||||
// Try to acquire header metadata from the server if there is any.
|
||||
var err error
|
||||
defer func() {
|
||||
if err != nil {
|
||||
if _, ok := err.(transport.ConnectionError); !ok {
|
||||
t.CloseStream(stream, err)
|
||||
}
|
||||
}
|
||||
}()
|
||||
c.headerMD, err = stream.Header()
|
||||
if err != nil {
|
||||
return err
|
||||
@ -190,20 +197,18 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
|
||||
}
|
||||
return toRPCErr(err)
|
||||
}
|
||||
// Receive the response
|
||||
err = recvResponse(cc.dopts, t, &c, stream, reply)
|
||||
if err != nil {
|
||||
if put != nil {
|
||||
put()
|
||||
put = nil
|
||||
}
|
||||
if _, ok := err.(transport.ConnectionError); ok {
|
||||
if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
|
||||
if c.failFast {
|
||||
return toRPCErr(err)
|
||||
}
|
||||
continue
|
||||
}
|
||||
t.CloseStream(stream, err)
|
||||
return toRPCErr(err)
|
||||
}
|
||||
if c.traceInfo.tr != nil {
|
||||
|
@ -165,7 +165,7 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
||||
put()
|
||||
put = nil
|
||||
}
|
||||
if _, ok := err.(transport.ConnectionError); ok {
|
||||
if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain {
|
||||
if c.failFast {
|
||||
cs.finish(err)
|
||||
return nil, toRPCErr(err)
|
||||
|
@ -566,12 +566,6 @@ func TestServerGoAway(t *testing.T) {
|
||||
func testServerGoAway(t *testing.T, e env) {
|
||||
te := newTest(t, e)
|
||||
te.userAgent = testAppUA
|
||||
te.declareLogNoise(
|
||||
"transport: http2Client.notifyError got notified that the client transport was broken EOF",
|
||||
"grpc: Conn.transportMonitor exits due to: grpc: the client connection is closing",
|
||||
"grpc: Conn.resetTransport failed to create client transport: connection error",
|
||||
"grpc: Conn.resetTransport failed to create client transport: connection error: desc = \"transport: dial unix",
|
||||
)
|
||||
te.startServer(&testServer{security: e.security})
|
||||
defer te.tearDown()
|
||||
|
||||
|
@ -284,6 +284,10 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
||||
t.mu.Unlock()
|
||||
return nil, ErrConnClosing
|
||||
}
|
||||
if t.state == draining {
|
||||
t.mu.Unlock()
|
||||
return nil, ErrStreamDrain
|
||||
}
|
||||
if t.state != reachable {
|
||||
t.mu.Unlock()
|
||||
return nil, ErrConnClosing
|
||||
@ -308,6 +312,15 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
||||
return nil, err
|
||||
}
|
||||
t.mu.Lock()
|
||||
if t.state == draining {
|
||||
t.mu.Unlock()
|
||||
if checkStreamsQuota {
|
||||
t.streamsQuota.add(1)
|
||||
}
|
||||
// Need to make t writable again so that the rpc in flight can still proceed.
|
||||
t.writableChan <- 0
|
||||
return nil, ErrStreamDrain
|
||||
}
|
||||
if t.state != reachable {
|
||||
t.mu.Unlock()
|
||||
return nil, ErrConnClosing
|
||||
@ -419,7 +432,9 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
|
||||
delete(t.activeStreams, s.id)
|
||||
if t.state == draining && len(t.activeStreams) == 0 {
|
||||
// The transport is draining and s is the last live stream on t.
|
||||
defer t.Close()
|
||||
t.mu.Unlock()
|
||||
t.Close()
|
||||
return
|
||||
}
|
||||
t.mu.Unlock()
|
||||
if updateStreams {
|
||||
@ -481,7 +496,7 @@ func (t *http2Client) Close() (err error) {
|
||||
|
||||
func (t *http2Client) GracefulClose() error {
|
||||
t.mu.Lock()
|
||||
if t.state == closing {
|
||||
if t.state == closing || t.state == unreachable {
|
||||
t.mu.Unlock()
|
||||
return nil
|
||||
}
|
||||
|
@ -240,6 +240,8 @@ func (s *Stream) Header() (metadata.MD, error) {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return nil, ContextErr(s.ctx.Err())
|
||||
case <-s.goAway:
|
||||
return nil, ErrStreamDrain
|
||||
case <-s.headerChan:
|
||||
return s.header.Copy(), nil
|
||||
}
|
||||
|
@ -368,8 +368,8 @@ func TestGracefulClose(t *testing.T) {
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
if _, err := ct.NewStream(context.Background(), callHdr); err != ErrConnClosing {
|
||||
t.Errorf("%v.NewStream(_, _) = _, %v, want _, %v", ct, err, ErrConnClosing)
|
||||
if _, err := ct.NewStream(context.Background(), callHdr); err != ErrStreamDrain {
|
||||
t.Errorf("%v.NewStream(_, _) = _, %v, want _, %v", ct, err, ErrStreamDrain)
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
Reference in New Issue
Block a user