diff --git a/clientconn.go b/clientconn.go index 4933b554..de7d0383 100644 --- a/clientconn.go +++ b/clientconn.go @@ -625,19 +625,23 @@ func (ac *addrConn) transportMonitor() { // the addrConn is idle (i.e., no RPC in flight). case <-ac.shutdownChan: return - case <-t.Done(): + case <-t.GoAway(): + ac.tearDown(errConnDrain) + ac.cc.newAddrConn(ac.addr, true) + return + case <-t.Error(): ac.mu.Lock() if ac.state == Shutdown { // ac.tearDown(...) has been invoked. ac.mu.Unlock() return } - if t.Err() == transport.ErrConnDrain { - ac.mu.Unlock() - ac.tearDown(errConnDrain) - ac.cc.newAddrConn(ac.addr, true) - return - } + //if t.Err() == transport.ErrConnDrain { + // ac.mu.Unlock() + // ac.tearDown(errConnDrain) + // ac.cc.newAddrConn(ac.addr, true) + // return + //} ac.state = TransientFailure ac.stateCV.Broadcast() ac.mu.Unlock() diff --git a/server.go b/server.go index 34e69102..2f847100 100644 --- a/server.go +++ b/server.go @@ -391,6 +391,7 @@ func (s *Server) serveNewHTTP2Transport(c net.Conn, authInfo credentials.AuthInf st.Close() return } + grpclog.Println("DEBUG addConn ... ") s.serveStreams(st) } @@ -808,7 +809,6 @@ func (s *Server) GracefulStop() { s.events = nil } s.mu.Unlock() - } func init() { diff --git a/stream.go b/stream.go index deb8663c..fb7e50f9 100644 --- a/stream.go +++ b/stream.go @@ -184,7 +184,7 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth // when there is no pending I/O operations on this stream. go func() { select { - case <-t.Done(): + case <-t.Error(): // Incur transport error, simply exit. case <-s.Done(): // TODO: The trace of the RPC is terminated here when there is no pending diff --git a/test/end2end_test.go b/test/end2end_test.go index 721e6706..c9b5f539 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -565,22 +565,12 @@ func testTimeoutOnDeadServer(t *testing.T, e env) { awaitNewConnLogOutput() } -func TestFailFast(t *testing.T) { - defer leakCheck(t)() - for _, e := range listTestEnv() { - testFailFast(t, e) - } -} - func TestServerGoAway(t *testing.T) { defer leakCheck(t)() for _, e := range listTestEnv() { if e.name == "handler-tls" { continue } - //if e.name != "tcp-clear" { - // continue - //} testServerGoAway(t, e) } } @@ -621,6 +611,83 @@ func testServerGoAway(t *testing.T, e env) { awaitNewConnLogOutput() } +func TestServerGoAwayPendingRPC(t *testing.T) { + defer leakCheck(t)() + for _, e := range listTestEnv() { + if e.name == "handler-tls" { + continue + } + testServerGoAwayPendingRPC(t, e) + } +} + +func testServerGoAwayPendingRPC(t *testing.T, e env) { + te := newTest(t, e) + te.userAgent = testAppUA + te.declareLogNoise( + "transport: http2Client.notifyError got notified that the client transport was broken EOF", + "grpc: Conn.transportMonitor exits due to: grpc: the client connection is closing", + "grpc: Conn.resetTransport failed to create client transport: connection error", + "grpc: Conn.resetTransport failed to create client transport: connection error: desc = \"transport: dial unix", + ) + te.startServer(&testServer{security: e.security}) + defer te.tearDown() + + cc := te.clientConn() + tc := testpb.NewTestServiceClient(cc) + ctx, cancel := context.WithCancel(context.Background()) + stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false)) + if err != nil { + t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) + } + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil { + t.Fatalf("fadjflajdflkaflj") + } + ch := make(chan struct{}) + go func() { + te.srv.GracefulStop() + close(ch) + }() + for { + ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond) + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil { + continue + } else { + break + } + } + respParam := []*testpb.ResponseParameters{ + { + Size: proto.Int32(1), + }, + } + payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100)) + if err != nil { + t.Fatal(err) + } + req := &testpb.StreamingOutputCallRequest{ + ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(), + ResponseParameters: respParam, + Payload: payload, + } + if err := stream.Send(req); err != nil { + t.Fatalf("%v.Send(%v) = %v, want ", stream, req, err) + } + if _, err := stream.Recv(); err != nil { + t.Fatalf("%v.Recv() = %v, want _, ", stream, err) + } + cancel() + <-ch + awaitNewConnLogOutput() +} + +func TestFailFast(t *testing.T) { + defer leakCheck(t)() + for _, e := range listTestEnv() { + testFailFast(t, e) + } +} + func testFailFast(t *testing.T, e env) { te := newTest(t, e) te.userAgent = testAppUA diff --git a/transport/http2_client.go b/transport/http2_client.go index 2ec703f0..71873ef1 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -71,7 +71,8 @@ type http2Client struct { shutdownChan chan struct{} // errorChan is closed to notify the I/O error to the caller. errorChan chan struct{} - err error + //err error + goAway chan struct{} framer *framer hBuf *bytes.Buffer // the buffer for HPACK encoding @@ -149,6 +150,7 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e writableChan: make(chan int, 1), shutdownChan: make(chan struct{}), errorChan: make(chan struct{}), + goAway: make(chan struct{}), framer: newFramer(conn), hBuf: &buf, hEnc: hpack.NewEncoder(&buf), @@ -408,13 +410,13 @@ func (t *http2Client) CloseStream(s *Stream, err error) { if t.streamsQuota != nil { updateStreams = true } - if t.state == draining && len(t.activeStreams) == 1 { + delete(t.activeStreams, s.id) + if t.state == draining && len(t.activeStreams) == 0 { // The transport is draining and s is the last live stream on t. t.mu.Unlock() t.Close() return } - delete(t.activeStreams, s.id) t.mu.Unlock() if updateStreams { t.streamsQuota.add(1) @@ -485,10 +487,14 @@ func (t *http2Client) GracefulClose() error { } t.state = draining // Notify the streams which were initiated after the server sent GOAWAY. - for i := t.goAwayID + 2; i < t.nextID; i += 2 { - if s, ok := t.activeStreams[i]; ok { - close(s.goAway) + select { + case <-t.goAway: + for i := t.goAwayID + 2; i < t.nextID; i += 2 { + if s, ok := t.activeStreams[i]; ok { + close(s.goAway) + } } + default: } active := len(t.activeStreams) t.mu.Unlock() @@ -736,8 +742,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { t.mu.Lock() if t.state == reachable { t.goAwayID = f.LastStreamID - t.err = ErrConnDrain - close(t.errorChan) + close(t.goAway) } t.mu.Unlock() } @@ -944,14 +949,18 @@ func (t *http2Client) controller() { } } -func (t *http2Client) Done() <-chan struct{} { +func (t *http2Client) Error() <-chan struct{} { return t.errorChan } -func (t *http2Client) Err() error { - return t.err +func (t *http2Client) GoAway() <-chan struct{} { + return t.goAway } +//func (t *http2Client) Err() error { +// return t.err +//} + func (t *http2Client) notifyError(err error) { t.mu.Lock() defer t.mu.Unlock() diff --git a/transport/http2_server.go b/transport/http2_server.go index d7cab4fe..37c9a9ae 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -737,6 +737,9 @@ func (t *http2Server) Close() (err error) { func (t *http2Server) closeStream(s *Stream) { t.mu.Lock() delete(t.activeStreams, s.id) + if t.state == draining && len(t.activeStreams) == 0 { + defer t.Close() + } t.mu.Unlock() // In case stream sending and receiving are invoked in separate // goroutines (e.g., bi-directional streaming), cancel needs to be diff --git a/transport/transport.go b/transport/transport.go index 2372f322..e592bfe9 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -431,7 +431,7 @@ type ClientTransport interface { // this in order to take action (e.g., close the current transport // and create a new one) in error case. It should not return nil // once the transport is initiated. - //Error() <-chan struct{} + Error() <-chan struct{} // Done returns a channel that is closed when some I/O error // happens or ClientTranspor receives the draining signal from the server @@ -439,10 +439,10 @@ type ClientTransport interface { // a goroutine to monitor this in order to take action (e.g., close // the current transport and create a new one) in error case. It should // not return nil once the transport is initiated. - Done() <-chan struct{} + GoAway() <-chan struct{} // Err returns ... - Err() error + //Err() error } // ServerTransport is the common interface for all gRPC server-side transport diff --git a/transport/transport_test.go b/transport/transport_test.go index a1c1cdd3..5a517e0b 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -271,8 +271,8 @@ func TestClientSendAndReceive(t *testing.T) { func TestClientErrorNotify(t *testing.T) { server, ct := setUp(t, 0, math.MaxUint32, normal) go server.stop() - // ct.reader should detect the error and activate ct.Done(). - <-ct.Done() + // ct.reader should detect the error and activate ct.Error(). + <-ct.Error() ct.Close() } @@ -309,7 +309,7 @@ func TestClientMix(t *testing.T) { s.stop() }(s) go func(ct ClientTransport) { - <-ct.Done() + <-ct.Error() ct.Close() }(ct) for i := 0; i < 1000; i++ { @@ -709,7 +709,7 @@ func TestClientWithMisbehavedServer(t *testing.T) { } } // http2Client.errChan is closed due to connection flow control window size violation. - <-conn.Done() + <-conn.Error() ct.Close() server.stop() }