From f921887ab5c08d234a3e38d95e919b2e609c120f Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 27 Jul 2016 11:36:22 -0700 Subject: [PATCH 01/14] Tune some clean up code --- transport/http2_client.go | 4 +--- 1 file changed, 1 insertion(+), 3 deletions(-) diff --git a/transport/http2_client.go b/transport/http2_client.go index 51cf1792..036da819 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -418,9 +418,7 @@ func (t *http2Client) CloseStream(s *Stream, err error) { delete(t.activeStreams, s.id) if t.state == draining && len(t.activeStreams) == 0 { // The transport is draining and s is the last live stream on t. - t.mu.Unlock() - t.Close() - return + defer t.Close() } t.mu.Unlock() if updateStreams { From 110450d45ec9b606e8af18809e57c42770a3d2df Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 27 Jul 2016 17:27:10 -0700 Subject: [PATCH 02/14] fix races introduce by goaway --- clientconn.go | 6 +- server.go | 4 +- test/end2end_test.go | 140 ++++++++++++++++++++++++++++++++++++++ transport/http2_client.go | 17 +++-- transport/http2_server.go | 5 ++ 5 files changed, 166 insertions(+), 6 deletions(-) diff --git a/clientconn.go b/clientconn.go index 3206d674..01e3ef5f 100644 --- a/clientconn.go +++ b/clientconn.go @@ -296,6 +296,8 @@ const ( TransientFailure // Shutdown indicates the ClientConn has started shutting down. Shutdown + // Drain + Drain ) func (s ConnectivityState) String() string { @@ -310,6 +312,8 @@ func (s ConnectivityState) String() string { return "TRANSIENT_FAILURE" case Shutdown: return "SHUTDOWN" + case Drain: + return "DRAIN" default: panic(fmt.Sprintf("unknown connectivity state: %d", s)) } @@ -632,7 +636,7 @@ func (ac *addrConn) transportMonitor() { case <-t.Error(): ac.mu.Lock() if ac.state == Shutdown { - // ac.tearDown(...) has been invoked. + // ac has been shutdown. ac.mu.Unlock() return } diff --git a/server.go b/server.go index 1a250c79..fbf96bf7 100644 --- a/server.go +++ b/server.go @@ -774,6 +774,8 @@ func (s *Server) Stop() { s.lis = nil st := s.conns s.conns = nil + // interrupt GracefulStop if Stop and GracefulStop are called concurrently. + s.cv.Signal() s.mu.Unlock() for lis := range listeners { @@ -803,13 +805,13 @@ func (s *Server) GracefulStop() { for lis := range s.lis { lis.Close() } + s.lis = nil for c := range s.conns { c.(transport.ServerTransport).Drain() } for len(s.conns) != 0 { s.cv.Wait() } - s.lis = nil s.conns = nil if s.events != nil { s.events.Finish() diff --git a/test/end2end_test.go b/test/end2end_test.go index cdbc4c55..5fd61d5c 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -686,6 +686,146 @@ func testServerGoAwayPendingRPC(t *testing.T, e env) { awaitNewConnLogOutput() } +func TestConcurrentClientConnCloseAndServerGoAway(t *testing.T) { + defer leakCheck(t)() + for _, e := range listTestEnv() { + if e.name == "handler-tls" { + continue + } + testConcurrentClientConnCloseAndServerGoAway(t, e) + } +} + +func testConcurrentClientConnCloseAndServerGoAway(t *testing.T, e env) { + te := newTest(t, e) + te.userAgent = testAppUA + te.declareLogNoise( + "transport: http2Client.notifyError got notified that the client transport was broken EOF", + "grpc: Conn.transportMonitor exits due to: grpc: the client connection is closing", + "grpc: Conn.resetTransport failed to create client transport: connection error", + "grpc: Conn.resetTransport failed to create client transport: connection error: desc = \"transport: dial unix", + ) + te.startServer(&testServer{security: e.security}) + defer te.tearDown() + + cc := te.clientConn() + tc := testpb.NewTestServiceClient(cc) + stream, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false)) + if err != nil { + t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) + } + // Finish an RPC to make sure the connection is good. + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil { + t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, ", tc, err) + } + ch := make(chan struct{}) + go func() { + te.srv.GracefulStop() + close(ch) + }() + // Loop until the server side GoAway signal is propagated to the client. + for { + ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond) + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil { + continue + } + break + } + // Stop the server and close all the connections. + te.srv.Stop() + respParam := []*testpb.ResponseParameters{ + { + Size: proto.Int32(1), + }, + } + payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100)) + if err != nil { + t.Fatal(err) + } + req := &testpb.StreamingOutputCallRequest{ + ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(), + ResponseParameters: respParam, + Payload: payload, + } + if err := stream.Send(req); err == nil { + if _, err := stream.Recv(); err == nil { + t.Fatalf("%v.Recv() = _, %v, want _, ", stream, err) + } + } + <-ch + awaitNewConnLogOutput() +} + +func TestConcurrentServerStopAndGoAway(t *testing.T) { + defer leakCheck(t)() + for _, e := range listTestEnv() { + if e.name == "handler-tls" { + continue + } + testConcurrentServerStopAndGoAway(t, e) + } +} + +func testConcurrentServerStopAndGoAway(t *testing.T, e env) { + te := newTest(t, e) + te.userAgent = testAppUA + te.declareLogNoise( + "transport: http2Client.notifyError got notified that the client transport was broken EOF", + "grpc: Conn.transportMonitor exits due to: grpc: the client connection is closing", + "grpc: Conn.resetTransport failed to create client transport: connection error", + "grpc: Conn.resetTransport failed to create client transport: connection error: desc = \"transport: dial unix", + ) + te.startServer(&testServer{security: e.security}) + defer te.tearDown() + + cc := te.clientConn() + tc := testpb.NewTestServiceClient(cc) + stream, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false)) + if err != nil { + t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) + } + // Finish an RPC to make sure the connection is good. + if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil { + t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, ", tc, err) + } + ch := make(chan struct{}) + go func() { + te.srv.GracefulStop() + close(ch) + }() + // Loop until the server side GoAway signal is propagated to the client. + for { + ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond) + if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil { + continue + } + break + } + // Stop the server and close all the connections. + te.srv.Stop() + respParam := []*testpb.ResponseParameters{ + { + Size: proto.Int32(1), + }, + } + payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100)) + if err != nil { + t.Fatal(err) + } + req := &testpb.StreamingOutputCallRequest{ + ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(), + ResponseParameters: respParam, + Payload: payload, + } + if err := stream.Send(req); err == nil { + if _, err := stream.Recv(); err == nil { + t.Fatalf("%v.Recv() = _, %v, want _, ", stream, err) + } + } + <-ch + awaitNewConnLogOutput() +} + func TestFailFast(t *testing.T) { defer leakCheck(t)() for _, e := range listTestEnv() { diff --git a/transport/http2_client.go b/transport/http2_client.go index 036da819..0426cc93 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -454,7 +454,7 @@ func (t *http2Client) Close() (err error) { t.mu.Unlock() return } - if t.state == reachable { + if t.state == reachable || t.state == draining { close(t.errorChan) } t.state = closing @@ -856,7 +856,11 @@ func (t *http2Client) reader() { // Check the validity of server preface. frame, err := t.framer.readFrame() if err != nil { - t.notifyError(err) + if t.state == draining { + t.Close() + } else { + t.notifyError(err) + } return } sf, ok := frame.(*http2.SettingsFrame) @@ -884,7 +888,12 @@ func (t *http2Client) reader() { continue } else { // Transport error. - t.notifyError(err) + if t.state == draining { + // A network error happened after the connection is drained. Fail the connection immediately. + t.Close() + } else { + t.notifyError(err) + } return } } @@ -993,7 +1002,7 @@ func (t *http2Client) notifyError(err error) { t.mu.Lock() defer t.mu.Unlock() // make sure t.errorChan is closed only once. - if t.state == reachable { + if t.state == reachable || t.state == draining { t.state = unreachable close(t.errorChan) grpclog.Printf("transport: http2Client.notifyError got notified that the client transport was broken %v.", err) diff --git a/transport/http2_server.go b/transport/http2_server.go index 2322c938..357f01ee 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -680,6 +680,11 @@ func (t *http2Server) controller() { t.framer.writeRSTStream(true, i.streamID, i.code) case *goAway: t.mu.Lock() + if t.state == closing { + t.mu.Unlock() + // The transport is closing. + return + } sid := t.maxStreamID t.state = draining t.mu.Unlock() From 63fcabeec516f9bb6c735dd3bd2f45469f9271db Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 27 Jul 2016 17:41:43 -0700 Subject: [PATCH 03/14] some polish --- clientconn.go | 4 ---- test/end2end_test.go | 4 ++-- 2 files changed, 2 insertions(+), 6 deletions(-) diff --git a/clientconn.go b/clientconn.go index 01e3ef5f..bb809e52 100644 --- a/clientconn.go +++ b/clientconn.go @@ -296,8 +296,6 @@ const ( TransientFailure // Shutdown indicates the ClientConn has started shutting down. Shutdown - // Drain - Drain ) func (s ConnectivityState) String() string { @@ -312,8 +310,6 @@ func (s ConnectivityState) String() string { return "TRANSIENT_FAILURE" case Shutdown: return "SHUTDOWN" - case Drain: - return "DRAIN" default: panic(fmt.Sprintf("unknown connectivity state: %d", s)) } diff --git a/test/end2end_test.go b/test/end2end_test.go index 5fd61d5c..57a47482 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -2282,8 +2282,8 @@ func leakCheck(t testing.TB) func() { } return func() { // Loop, waiting for goroutines to shut down. - // Wait up to 5 seconds, but finish as quickly as possible. - deadline := time.Now().Add(5 * time.Second) + // Wait up to 10 seconds, but finish as quickly as possible. + deadline := time.Now().Add(10 * time.Second) for { var leaked []string for _, g := range interestingGoroutines() { From 1faf2ca61b75684b703bba486a1f7ae2dc330bdd Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 27 Jul 2016 19:46:34 -0700 Subject: [PATCH 04/14] fix the race on t.state --- clientconn.go | 20 ++++++++++++++++++++ transport/http2_client.go | 22 +++++++++------------- 2 files changed, 29 insertions(+), 13 deletions(-) diff --git a/clientconn.go b/clientconn.go index bb809e52..0e9b6fb4 100644 --- a/clientconn.go +++ b/clientconn.go @@ -624,12 +624,32 @@ func (ac *addrConn) transportMonitor() { // shutdownChan is needed to detect the teardown when // the addrConn is idle (i.e., no RPC in flight). case <-ac.shutdownChan: + select { + case <-t.Error(): + t.Close() + default: + } return case <-t.GoAway(): + select { + case <-t.Error(): + t.Close() + return + default: + } ac.tearDown(errConnDrain) ac.cc.newAddrConn(ac.addr, true) return case <-t.Error(): + select { + case <-ac.shutdownChan: + t.Close() + return + case <-t.GoAway(): + t.Close() + return + default: + } ac.mu.Lock() if ac.state == Shutdown { // ac has been shutdown. diff --git a/transport/http2_client.go b/transport/http2_client.go index 0426cc93..974d9227 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -856,11 +856,7 @@ func (t *http2Client) reader() { // Check the validity of server preface. frame, err := t.framer.readFrame() if err != nil { - if t.state == draining { - t.Close() - } else { - t.notifyError(err) - } + t.notifyError(err) return } sf, ok := frame.(*http2.SettingsFrame) @@ -888,12 +884,7 @@ func (t *http2Client) reader() { continue } else { // Transport error. - if t.state == draining { - // A network error happened after the connection is drained. Fail the connection immediately. - t.Close() - } else { - t.notifyError(err) - } + t.notifyError(err) return } } @@ -1000,11 +991,16 @@ func (t *http2Client) GoAway() <-chan struct{} { func (t *http2Client) notifyError(err error) { t.mu.Lock() - defer t.mu.Unlock() // make sure t.errorChan is closed only once. - if t.state == reachable || t.state == draining { + if t.state == draining { + t.mu.Unlock() + t.Close() + return + } + if t.state == reachable { t.state = unreachable close(t.errorChan) grpclog.Printf("transport: http2Client.notifyError got notified that the client transport was broken %v.", err) } + t.mu.Unlock() } From cd4ca4d8086b17a665ceea274f20bae70be22711 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 28 Jul 2016 11:07:42 -0700 Subject: [PATCH 05/14] Tune transport Monitor, modify the test and add more comments --- clientconn.go | 16 +++++++++++----- test/end2end_test.go | 39 ++++----------------------------------- 2 files changed, 15 insertions(+), 40 deletions(-) diff --git a/clientconn.go b/clientconn.go index 0e9b6fb4..55ce8e37 100644 --- a/clientconn.go +++ b/clientconn.go @@ -68,7 +68,7 @@ var ( // errCredentialsConflict indicates that grpc.WithTransportCredentials() // and grpc.WithInsecure() are both called for a connection. errCredentialsConflict = errors.New("grpc: transport credentials are set for an insecure connection (grpc.WithTransportCredentials() and grpc.WithInsecure() are both called)") - // errNetworkIP indicates that the connection is down due to some network I/O error. + // errNetworkIO indicates that the connection is down due to some network I/O error. errNetworkIO = errors.New("grpc: failed with network I/O error") // errConnDrain indicates that the connection starts to be drained and does not accept any new RPCs. errConnDrain = errors.New("grpc: the connection is drained") @@ -631,13 +631,18 @@ func (ac *addrConn) transportMonitor() { } return case <-t.GoAway(): + // If GoAway happens without any network I/O error, ac is closed without shutting down the + // underlying transport (the transport will be closed when all the pending RPCs finished or + // failed.). + // If GoAway and some network I/O error happen concurrently, ac and its underlying transport + // are closed. + // In both cases, a new ac is created. select { case <-t.Error(): - t.Close() - return + ac.tearDown(errNetworkIO) default: + ac.tearDown(errConnDrain) } - ac.tearDown(errConnDrain) ac.cc.newAddrConn(ac.addr, true) return case <-t.Error(): @@ -646,7 +651,8 @@ func (ac *addrConn) transportMonitor() { t.Close() return case <-t.GoAway(): - t.Close() + ac.tearDown(errNetworkIO) + ac.cc.newAddrConn(ac.addr, true) return default: } diff --git a/test/end2end_test.go b/test/end2end_test.go index 57a47482..17c9e11d 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -710,50 +710,19 @@ func testConcurrentClientConnCloseAndServerGoAway(t *testing.T, e env) { cc := te.clientConn() tc := testpb.NewTestServiceClient(cc) - stream, err := tc.FullDuplexCall(context.Background(), grpc.FailFast(false)) - if err != nil { - t.Fatalf("%v.FullDuplexCall(_) = _, %v, want ", tc, err) - } - // Finish an RPC to make sure the connection is good. if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil { t.Fatalf("%v.EmptyCall(_, _, _) = _, %v, want _, ", tc, err) } ch := make(chan struct{}) + // Close ClientConn and Server concurrently. go func() { te.srv.GracefulStop() close(ch) }() - // Loop until the server side GoAway signal is propagated to the client. - for { - ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond) - if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil { - continue - } - break - } - // Stop the server and close all the connections. - te.srv.Stop() - respParam := []*testpb.ResponseParameters{ - { - Size: proto.Int32(1), - }, - } - payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100)) - if err != nil { - t.Fatal(err) - } - req := &testpb.StreamingOutputCallRequest{ - ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(), - ResponseParameters: respParam, - Payload: payload, - } - if err := stream.Send(req); err == nil { - if _, err := stream.Recv(); err == nil { - t.Fatalf("%v.Recv() = _, %v, want _, ", stream, err) - } - } + go func() { + cc.Close() + }() <-ch - awaitNewConnLogOutput() } func TestConcurrentServerStopAndGoAway(t *testing.T) { From 096e703537634de157fd23d981d24944a5c35aba Mon Sep 17 00:00:00 2001 From: Tamir Duberstein Date: Thu, 28 Jul 2016 17:00:23 -0400 Subject: [PATCH 06/14] WithDialer doesn't support cancellation This maintains cancellation for the default dialer, but external users will have to opt in via a future API. --- clientconn.go | 6 ++++-- test/end2end_test.go | 10 ++-------- 2 files changed, 6 insertions(+), 10 deletions(-) diff --git a/clientconn.go b/clientconn.go index 214fb900..b7b8f48f 100644 --- a/clientconn.go +++ b/clientconn.go @@ -196,9 +196,11 @@ func WithTimeout(d time.Duration) DialOption { } // WithDialer returns a DialOption that specifies a function to use for dialing network addresses. -func WithDialer(f func(string, time.Duration, <-chan struct{}) (net.Conn, error)) DialOption { +func WithDialer(f func(string, time.Duration) (net.Conn, error)) DialOption { return func(o *dialOptions) { - o.copts.Dialer = f + o.copts.Dialer = func(addr string, timeout time.Duration, _ <-chan struct{}) (net.Conn, error) { + return f(addr, timeout) + } } } diff --git a/test/end2end_test.go b/test/end2end_test.go index 15460824..ce88d951 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -314,13 +314,7 @@ func (e env) runnable() bool { return true } -func (e env) dialer(addr string, timeout time.Duration, cancel <-chan struct{}) (net.Conn, error) { - // NB: Go 1.6 added a Cancel field on net.Dialer, which would allow this - // to be written as - // - // `(&net.Dialer{Cancel: cancel, Timeout: timeout}).Dial(e.network, addr)` - // - // but that would break compatibility with earlier Go versions. +func (e env) dialer(addr string, timeout time.Duration) (net.Conn, error) { return net.DialTimeout(e.network, addr, timeout) } @@ -511,7 +505,7 @@ func (te *test) declareLogNoise(phrases ...string) { } func (te *test) withServerTester(fn func(st *serverTester)) { - c, err := te.e.dialer(te.srvAddr, 10*time.Second, nil) + c, err := te.e.dialer(te.srvAddr, 10*time.Second) if err != nil { te.t.Fatal(err) } From b2f30e082e413423186459dd9677bd00f11c2ef1 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 28 Jul 2016 16:32:18 -0700 Subject: [PATCH 07/14] fix some goaway related test flakiness --- transport/http2_client.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/transport/http2_client.go b/transport/http2_client.go index 62a6fa28..b6b744d5 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -308,6 +308,14 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea return nil, err } t.mu.Lock() + if t.state == draining { + t.mu.Unlock() + // t has been drained. Return the quota. + t.streamsQuota.add(1) + // Need to make t writable again so that the rpc in flight can still proceed. + t.writableChan <- 0 + return nil, ErrStreamDrain + } if t.state != reachable { t.mu.Unlock() return nil, ErrConnClosing From 80572b27392bc0d2a9d9bef89c4a7f89db3aa17c Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 28 Jul 2016 16:45:48 -0700 Subject: [PATCH 08/14] fix the streaming rpc case --- stream.go | 2 +- transport/http2_client.go | 5 ++++- 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/stream.go b/stream.go index fb7e50f9..008ad1e2 100644 --- a/stream.go +++ b/stream.go @@ -165,7 +165,7 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth put() put = nil } - if _, ok := err.(transport.ConnectionError); ok { + if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain { if c.failFast { cs.finish(err) return nil, toRPCErr(err) diff --git a/transport/http2_client.go b/transport/http2_client.go index b6b744d5..6c72431a 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -284,6 +284,10 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea t.mu.Unlock() return nil, ErrConnClosing } + if t.state == draining { + t.mu.Unlock() + return nil, ErrStreamDrain + } if t.state != reachable { t.mu.Unlock() return nil, ErrConnClosing @@ -310,7 +314,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea t.mu.Lock() if t.state == draining { t.mu.Unlock() - // t has been drained. Return the quota. t.streamsQuota.add(1) // Need to make t writable again so that the rpc in flight can still proceed. t.writableChan <- 0 From 32df3a68d07358f29f2d775e546d83014677b17f Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 28 Jul 2016 18:32:51 -0700 Subject: [PATCH 09/14] more fix --- call.go | 11 ++++++++--- transport/http2_client.go | 4 +++- transport/transport.go | 2 ++ transport/transport_test.go | 4 ++-- 4 files changed, 15 insertions(+), 6 deletions(-) diff --git a/call.go b/call.go index 27cf6411..e4e7771f 100644 --- a/call.go +++ b/call.go @@ -51,6 +51,13 @@ import ( func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) error { // Try to acquire header metadata from the server if there is any. var err error + defer func() { + if err != nil { + if _, ok := err.(transport.ConnectionError); !ok { + t.CloseStream(stream, err) + } + } + }() c.headerMD, err = stream.Header() if err != nil { return err @@ -190,20 +197,18 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli } return toRPCErr(err) } - // Receive the response err = recvResponse(cc.dopts, t, &c, stream, reply) if err != nil { if put != nil { put() put = nil } - if _, ok := err.(transport.ConnectionError); ok { + if _, ok := err.(transport.ConnectionError); ok || err == transport.ErrStreamDrain { if c.failFast { return toRPCErr(err) } continue } - t.CloseStream(stream, err) return toRPCErr(err) } if c.traceInfo.tr != nil { diff --git a/transport/http2_client.go b/transport/http2_client.go index 6c72431a..d561ab66 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -430,7 +430,9 @@ func (t *http2Client) CloseStream(s *Stream, err error) { delete(t.activeStreams, s.id) if t.state == draining && len(t.activeStreams) == 0 { // The transport is draining and s is the last live stream on t. - defer t.Close() + t.mu.Unlock() + t.Close() + return } t.mu.Unlock() if updateStreams { diff --git a/transport/transport.go b/transport/transport.go index 86c8fcd6..10b4a2e2 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -240,6 +240,8 @@ func (s *Stream) Header() (metadata.MD, error) { select { case <-s.ctx.Done(): return nil, ContextErr(s.ctx.Err()) + case <-s.goAway: + return nil, ErrStreamDrain case <-s.headerChan: return s.header.Copy(), nil } diff --git a/transport/transport_test.go b/transport/transport_test.go index 6f9cc50c..f4af68b7 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -368,8 +368,8 @@ func TestGracefulClose(t *testing.T) { wg.Add(1) go func() { defer wg.Done() - if _, err := ct.NewStream(context.Background(), callHdr); err != ErrConnClosing { - t.Errorf("%v.NewStream(_, _) = _, %v, want _, %v", ct, err, ErrConnClosing) + if _, err := ct.NewStream(context.Background(), callHdr); err != ErrStreamDrain { + t.Errorf("%v.NewStream(_, _) = _, %v, want _, %v", ct, err, ErrStreamDrain) } }() } From d4acbe9e3c1f57bcf8c336149ada155ff9512733 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 28 Jul 2016 18:42:25 -0700 Subject: [PATCH 10/14] a minor fix --- transport/http2_client.go | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/transport/http2_client.go b/transport/http2_client.go index d561ab66..cf392b1c 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -314,7 +314,9 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea t.mu.Lock() if t.state == draining { t.mu.Unlock() - t.streamsQuota.add(1) + if checkStreamsQuota { + t.streamsQuota.add(1) + } // Need to make t writable again so that the rpc in flight can still proceed. t.writableChan <- 0 return nil, ErrStreamDrain From 5876dcba5ee97d0cbb1aafabfee8c8ce55d3ca45 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 28 Jul 2016 18:57:53 -0700 Subject: [PATCH 11/14] a minor fix --- test/end2end_test.go | 6 ------ transport/http2_client.go | 2 +- 2 files changed, 1 insertion(+), 7 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 11e215d7..0bbc3d90 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -566,12 +566,6 @@ func TestServerGoAway(t *testing.T) { func testServerGoAway(t *testing.T, e env) { te := newTest(t, e) te.userAgent = testAppUA - te.declareLogNoise( - "transport: http2Client.notifyError got notified that the client transport was broken EOF", - "grpc: Conn.transportMonitor exits due to: grpc: the client connection is closing", - "grpc: Conn.resetTransport failed to create client transport: connection error", - "grpc: Conn.resetTransport failed to create client transport: connection error: desc = \"transport: dial unix", - ) te.startServer(&testServer{security: e.security}) defer te.tearDown() diff --git a/transport/http2_client.go b/transport/http2_client.go index cf392b1c..6a34c7e5 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -496,7 +496,7 @@ func (t *http2Client) Close() (err error) { func (t *http2Client) GracefulClose() error { t.mu.Lock() - if t.state == closing { + if t.state == closing || t.state == unreachable { t.mu.Unlock() return nil } From a09c6888cc0200cd23e3a86dae9c79f7adfc5dc2 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 28 Jul 2016 19:47:38 -0700 Subject: [PATCH 12/14] fix another data race along with goaway --- clientconn.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/clientconn.go b/clientconn.go index b3b2ad40..6e018133 100644 --- a/clientconn.go +++ b/clientconn.go @@ -367,6 +367,7 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error { addr: addr, dopts: cc.dopts, } + ac.stateCV = sync.NewCond(&ac.mu) ac.dopts.copts.Cancel = make(chan struct{}) if EnableTracing { ac.events = trace.NewEventLog("grpc.ClientConn", ac.addr.Addr) @@ -400,7 +401,6 @@ func (cc *ClientConn) newAddrConn(addr Address, skipWait bool) error { // ii) a buggy Balancer notifies duplicated Addresses. stale.tearDown(errConnDrain) } - ac.stateCV = sync.NewCond(&ac.mu) // skipWait may overwrite the decision in ac.dopts.block. if ac.dopts.block && !skipWait { if err := ac.resetTransport(false); err != nil { From 9102dd469116c3790db1a00e14619de613957cbf Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E9=99=88=E5=B0=8F=E7=8E=89?= Date: Fri, 29 Jul 2016 11:29:02 +0800 Subject: [PATCH 13/14] fix gotutorial.md error link fix route_guide proto link --- examples/gotutorial.md | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/examples/gotutorial.md b/examples/gotutorial.md index 39833fdf..25c0a2df 100644 --- a/examples/gotutorial.md +++ b/examples/gotutorial.md @@ -28,12 +28,12 @@ Then change your current directory to `grpc-go/examples/route_guide`: $ cd $GOPATH/src/google.golang.org/grpc/examples/route_guide ``` -You also should have the relevant tools installed to generate the server and client interface code - if you don't already, follow the setup instructions in [the Go quick start guide](examples/). +You also should have the relevant tools installed to generate the server and client interface code - if you don't already, follow the setup instructions in [the Go quick start guide](https://github.com/grpc/grpc-go/tree/master/examples/). ## Defining the service -Our first step (as you'll know from the [quick start](http://www.grpc.io/docs/#quick-start)) is to define the gRPC *service* and the method *request* and *response* types using [protocol buffers] (https://developers.google.com/protocol-buffers/docs/overview). You can see the complete .proto file in [`examples/route_guide/proto/route_guide.proto`](examples/route_guide/proto/route_guide.proto). +Our first step (as you'll know from the [quick start](http://www.grpc.io/docs/#quick-start)) is to define the gRPC *service* and the method *request* and *response* types using [protocol buffers] (https://developers.google.com/protocol-buffers/docs/overview). You can see the complete .proto file in [examples/route_guide/routeguide/route_guide.proto](https://github.com/grpc/grpc-go/tree/master/examples/route_guide/routeguide/route_guide.proto). To define a service, you specify a named `service` in your .proto file: From 069ed42115cb160fc413114f8cd4474165204780 Mon Sep 17 00:00:00 2001 From: Tamir Duberstein Date: Fri, 29 Jul 2016 12:26:26 -0400 Subject: [PATCH 14/14] Fix deadlock when GracefulStop()ing multiple times Fixes #793. --- server.go | 2 +- test/end2end_test.go | 21 +++++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/server.go b/server.go index a87ce932..23c4629e 100644 --- a/server.go +++ b/server.go @@ -798,7 +798,7 @@ func (s *Server) Stop() { func (s *Server) GracefulStop() { s.mu.Lock() if s.drain == true || s.conns == nil { - s.mu.Lock() + s.mu.Unlock() return } s.drain = true diff --git a/test/end2end_test.go b/test/end2end_test.go index 0bbc3d90..a272a91c 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -553,6 +553,27 @@ func testTimeoutOnDeadServer(t *testing.T, e env) { awaitNewConnLogOutput() } +func TestServerGracefulStopIdempotent(t *testing.T) { + defer leakCheck(t)() + for _, e := range listTestEnv() { + if e.name == "handler-tls" { + continue + } + testServerGracefulStopIdempotent(t, e) + } +} + +func testServerGracefulStopIdempotent(t *testing.T, e env) { + te := newTest(t, e) + te.userAgent = testAppUA + te.startServer(&testServer{security: e.security}) + defer te.tearDown() + + for i := 0; i < 3; i++ { + te.srv.GracefulStop() + } +} + func TestServerGoAway(t *testing.T) { defer leakCheck(t)() for _, e := range listTestEnv() {