This commit is contained in:
iamqizhao
2016-07-21 18:12:01 -07:00
parent 9ad4c58355
commit 046e606dc5
8 changed files with 34 additions and 46 deletions

View File

@ -636,12 +636,6 @@ func (ac *addrConn) transportMonitor() {
ac.mu.Unlock() ac.mu.Unlock()
return return
} }
//if t.Err() == transport.ErrConnDrain {
// ac.mu.Unlock()
// ac.tearDown(errConnDrain)
// ac.cc.newAddrConn(ac.addr, true)
// return
//}
ac.state = TransientFailure ac.state = TransientFailure
ac.stateCV.Broadcast() ac.stateCV.Broadcast()
ac.mu.Unlock() ac.mu.Unlock()

View File

@ -385,12 +385,6 @@ func toRPCErr(err error) error {
desc: e.Desc, desc: e.Desc,
} }
case transport.ConnectionError: case transport.ConnectionError:
if err == transport.ErrConnDrain {
return &rpcError{
code: codes.Unavailable,
desc: e.Desc,
}
}
return &rpcError{ return &rpcError{
code: codes.Internal, code: codes.Internal,
desc: e.Desc, desc: e.Desc,

View File

@ -93,6 +93,8 @@ type Server struct {
lis map[net.Listener]bool lis map[net.Listener]bool
conns map[io.Closer]bool conns map[io.Closer]bool
drain bool drain bool
// A CondVar to let GracefulStop() blocks until all the pending RPCs are finished
// and all the transport goes away.
cv *sync.Cond cv *sync.Cond
m map[string]*service // service name -> service info m map[string]*service // service name -> service info
events trace.EventLog events trace.EventLog
@ -391,7 +393,6 @@ func (s *Server) serveNewHTTP2Transport(c net.Conn, authInfo credentials.AuthInf
st.Close() st.Close()
return return
} }
grpclog.Println("DEBUG addConn ... ")
s.serveStreams(st) s.serveStreams(st)
} }
@ -790,6 +791,8 @@ func (s *Server) Stop() {
s.mu.Unlock() s.mu.Unlock()
} }
// GracefulStop stops the gRPC server gracefully. It stops the server to accept new
// connections and RPCs and blocks until all the pending RPCs are finished.
func (s *Server) GracefulStop() { func (s *Server) GracefulStop() {
s.mu.Lock() s.mu.Lock()
s.drain = true s.drain = true

View File

@ -589,6 +589,7 @@ func testServerGoAway(t *testing.T, e env) {
cc := te.clientConn() cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
// Finish an RPC to make sure the connection is good.
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil { if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err) t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, <nil>", err)
} }
@ -597,6 +598,7 @@ func testServerGoAway(t *testing.T, e env) {
te.srv.GracefulStop() te.srv.GracefulStop()
close(ch) close(ch)
}() }()
// Loop until the server side GoAway signal is propagated to the client.
for { for {
ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond) ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond)
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil { if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil {
@ -604,6 +606,7 @@ func testServerGoAway(t *testing.T, e env) {
} }
break break
} }
// A new RPC should fail with Unavailable error.
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err == nil || grpc.Code(err) != codes.Unavailable { if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}); err == nil || grpc.Code(err) != codes.Unavailable {
t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, error code: %d", err, codes.Unavailable) t.Fatalf("TestService/EmptyCall(_, _) = _, %v, want _, error code: %d", err, codes.Unavailable)
} }
@ -640,6 +643,7 @@ func testServerGoAwayPendingRPC(t *testing.T, e env) {
if err != nil { if err != nil {
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
} }
// Finish an RPC to make sure the connection is good.
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil { if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
t.Fatalf("fadjflajdflkaflj") t.Fatalf("fadjflajdflkaflj")
} }
@ -648,13 +652,13 @@ func testServerGoAwayPendingRPC(t *testing.T, e env) {
te.srv.GracefulStop() te.srv.GracefulStop()
close(ch) close(ch)
}() }()
// Loop until the server side GoAway signal is propagated to the client.
for { for {
ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond) ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond)
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil { if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil {
continue continue
} else {
break
} }
break
} }
respParam := []*testpb.ResponseParameters{ respParam := []*testpb.ResponseParameters{
{ {
@ -670,6 +674,7 @@ func testServerGoAwayPendingRPC(t *testing.T, e env) {
ResponseParameters: respParam, ResponseParameters: respParam,
Payload: payload, Payload: payload,
} }
// The existing RPC should be still good to proceed.
if err := stream.Send(req); err != nil { if err := stream.Send(req); err != nil {
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err) t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
} }

View File

@ -371,6 +371,7 @@ func (ht *serverHandlerTransport) runStream() {
} }
func (ht *serverHandlerTransport) GoAway() { func (ht *serverHandlerTransport) GoAway() {
panic("not implemented")
} }
// mapRecvMsgError returns the non-nil err into the appropriate // mapRecvMsgError returns the non-nil err into the appropriate

View File

@ -71,7 +71,8 @@ type http2Client struct {
shutdownChan chan struct{} shutdownChan chan struct{}
// errorChan is closed to notify the I/O error to the caller. // errorChan is closed to notify the I/O error to the caller.
errorChan chan struct{} errorChan chan struct{}
//err error // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor)
// that the server sent GoAway on this transport.
goAway chan struct{} goAway chan struct{}
framer *framer framer *framer
@ -99,6 +100,7 @@ type http2Client struct {
maxStreams int maxStreams int
// the per-stream outbound flow control window size set by the peer. // the per-stream outbound flow control window size set by the peer.
streamSendQuota uint32 streamSendQuota uint32
// goAwayID records the Last-Stream-ID in the GoAway frame from the server.
goAwayID uint32 goAwayID uint32
} }
@ -957,10 +959,6 @@ func (t *http2Client) GoAway() <-chan struct{} {
return t.goAway return t.goAway
} }
//func (t *http2Client) Err() error {
// return t.err
//}
func (t *http2Client) notifyError(err error) { func (t *http2Client) notifyError(err error) {
t.mu.Lock() t.mu.Lock()
defer t.mu.Unlock() defer t.mu.Unlock()

View File

@ -196,11 +196,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
s.recvCompress = state.encoding s.recvCompress = state.encoding
s.method = state.method s.method = state.method
t.mu.Lock() t.mu.Lock()
if t.state == draining {
t.mu.Unlock()
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeRefusedStream})
return
}
if t.state != reachable { if t.state != reachable {
t.mu.Unlock() t.mu.Unlock()
return return
@ -292,8 +287,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream)) {
case *http2.WindowUpdateFrame: case *http2.WindowUpdateFrame:
t.handleWindowUpdate(frame) t.handleWindowUpdate(frame)
case *http2.GoAwayFrame: case *http2.GoAwayFrame:
t.Close() // TODO: Handle GoAway from the client appropriately.
break
default: default:
grpclog.Printf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame) grpclog.Printf("transport: http2Server.HandleStreams found unhandled frame type %v.", frame)
} }

View File

@ -174,7 +174,7 @@ type Stream struct {
cancel context.CancelFunc cancel context.CancelFunc
// done is closed when the final status arrives. // done is closed when the final status arrives.
done chan struct{} done chan struct{}
// goAway // goAway is closed when the server sent GoAways signal before this stream was initiated.
goAway chan struct{} goAway chan struct{}
// method records the associated RPC method of the stream. // method records the associated RPC method of the stream.
method string method string
@ -221,10 +221,14 @@ func (s *Stream) SetSendCompress(str string) {
s.sendCompress = str s.sendCompress = str
} }
// Done returns a chanel which is closed when it receives the final status
// from the server.
func (s *Stream) Done() <-chan struct{} { func (s *Stream) Done() <-chan struct{} {
return s.done return s.done
} }
// GoAway returns a channel which is closed when the server sent GoAways signal
// before this stream was initiated.
func (s *Stream) GoAway() <-chan struct{} { func (s *Stream) GoAway() <-chan struct{} {
return s.goAway return s.goAway
} }
@ -433,16 +437,10 @@ type ClientTransport interface {
// once the transport is initiated. // once the transport is initiated.
Error() <-chan struct{} Error() <-chan struct{}
// Done returns a channel that is closed when some I/O error // GoAway returns a channel that is closed when ClientTranspor
// happens or ClientTranspor receives the draining signal from the server // receives the draining signal from the server (e.g., GOAWAY frame in
// (e.g., GOAWAY frame in HTTP/2). Typically the caller should have // HTTP/2).
// a goroutine to monitor this in order to take action (e.g., close
// the current transport and create a new one) in error case. It should
// not return nil once the transport is initiated.
GoAway() <-chan struct{} GoAway() <-chan struct{}
// Err returns ...
//Err() error
} }
// ServerTransport is the common interface for all gRPC server-side transport // ServerTransport is the common interface for all gRPC server-side transport
@ -475,7 +473,7 @@ type ServerTransport interface {
// RemoteAddr returns the remote network address. // RemoteAddr returns the remote network address.
RemoteAddr() net.Addr RemoteAddr() net.Addr
// GoAway ... // GoAway notifies the client this ServerTransport stops accepting new RPCs.
GoAway() GoAway()
} }
@ -504,11 +502,12 @@ func (e ConnectionError) Error() string {
return fmt.Sprintf("connection error: desc = %q", e.Desc) return fmt.Sprintf("connection error: desc = %q", e.Desc)
} }
// ErrConnClosing indicates that the transport is closing.
var ( var (
// ErrConnClosing indicates that the transport is closing.
ErrConnClosing = ConnectionError{Desc: "transport is closing"} ErrConnClosing = ConnectionError{Desc: "transport is closing"}
ErrConnDrain = ConnectionError{Desc: "transport is being drained"} // ErrStreamDrain indicates that the stream is rejected by the server because
ErrStreamDrain = StreamErrorf(codes.Unavailable, "afjlalf") // the server stops accepting new RPCs.
ErrStreamDrain = StreamErrorf(codes.Unavailable, "the server stops accepting new RPCs")
) )
// StreamError is an error that only affects one stream within a connection. // StreamError is an error that only affects one stream within a connection.