Make it work for streaming
This commit is contained in:
@ -625,19 +625,23 @@ func (ac *addrConn) transportMonitor() {
|
||||
// the addrConn is idle (i.e., no RPC in flight).
|
||||
case <-ac.shutdownChan:
|
||||
return
|
||||
case <-t.Done():
|
||||
case <-t.GoAway():
|
||||
ac.tearDown(errConnDrain)
|
||||
ac.cc.newAddrConn(ac.addr, true)
|
||||
return
|
||||
case <-t.Error():
|
||||
ac.mu.Lock()
|
||||
if ac.state == Shutdown {
|
||||
// ac.tearDown(...) has been invoked.
|
||||
ac.mu.Unlock()
|
||||
return
|
||||
}
|
||||
if t.Err() == transport.ErrConnDrain {
|
||||
ac.mu.Unlock()
|
||||
ac.tearDown(errConnDrain)
|
||||
ac.cc.newAddrConn(ac.addr, true)
|
||||
return
|
||||
}
|
||||
//if t.Err() == transport.ErrConnDrain {
|
||||
// ac.mu.Unlock()
|
||||
// ac.tearDown(errConnDrain)
|
||||
// ac.cc.newAddrConn(ac.addr, true)
|
||||
// return
|
||||
//}
|
||||
ac.state = TransientFailure
|
||||
ac.stateCV.Broadcast()
|
||||
ac.mu.Unlock()
|
||||
|
@ -391,6 +391,7 @@ func (s *Server) serveNewHTTP2Transport(c net.Conn, authInfo credentials.AuthInf
|
||||
st.Close()
|
||||
return
|
||||
}
|
||||
grpclog.Println("DEBUG addConn ... ")
|
||||
s.serveStreams(st)
|
||||
}
|
||||
|
||||
@ -808,7 +809,6 @@ func (s *Server) GracefulStop() {
|
||||
s.events = nil
|
||||
}
|
||||
s.mu.Unlock()
|
||||
|
||||
}
|
||||
|
||||
func init() {
|
||||
|
@ -184,7 +184,7 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
||||
// when there is no pending I/O operations on this stream.
|
||||
go func() {
|
||||
select {
|
||||
case <-t.Done():
|
||||
case <-t.Error():
|
||||
// Incur transport error, simply exit.
|
||||
case <-s.Done():
|
||||
// TODO: The trace of the RPC is terminated here when there is no pending
|
||||
|
@ -565,22 +565,12 @@ func testTimeoutOnDeadServer(t *testing.T, e env) {
|
||||
awaitNewConnLogOutput()
|
||||
}
|
||||
|
||||
func TestFailFast(t *testing.T) {
|
||||
defer leakCheck(t)()
|
||||
for _, e := range listTestEnv() {
|
||||
testFailFast(t, e)
|
||||
}
|
||||
}
|
||||
|
||||
func TestServerGoAway(t *testing.T) {
|
||||
defer leakCheck(t)()
|
||||
for _, e := range listTestEnv() {
|
||||
if e.name == "handler-tls" {
|
||||
continue
|
||||
}
|
||||
//if e.name != "tcp-clear" {
|
||||
// continue
|
||||
//}
|
||||
testServerGoAway(t, e)
|
||||
}
|
||||
}
|
||||
@ -621,6 +611,83 @@ func testServerGoAway(t *testing.T, e env) {
|
||||
awaitNewConnLogOutput()
|
||||
}
|
||||
|
||||
func TestServerGoAwayPendingRPC(t *testing.T) {
|
||||
defer leakCheck(t)()
|
||||
for _, e := range listTestEnv() {
|
||||
if e.name == "handler-tls" {
|
||||
continue
|
||||
}
|
||||
testServerGoAwayPendingRPC(t, e)
|
||||
}
|
||||
}
|
||||
|
||||
func testServerGoAwayPendingRPC(t *testing.T, e env) {
|
||||
te := newTest(t, e)
|
||||
te.userAgent = testAppUA
|
||||
te.declareLogNoise(
|
||||
"transport: http2Client.notifyError got notified that the client transport was broken EOF",
|
||||
"grpc: Conn.transportMonitor exits due to: grpc: the client connection is closing",
|
||||
"grpc: Conn.resetTransport failed to create client transport: connection error",
|
||||
"grpc: Conn.resetTransport failed to create client transport: connection error: desc = \"transport: dial unix",
|
||||
)
|
||||
te.startServer(&testServer{security: e.security})
|
||||
defer te.tearDown()
|
||||
|
||||
cc := te.clientConn()
|
||||
tc := testpb.NewTestServiceClient(cc)
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
stream, err := tc.FullDuplexCall(ctx, grpc.FailFast(false))
|
||||
if err != nil {
|
||||
t.Fatalf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
|
||||
}
|
||||
if _, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.FailFast(false)); err != nil {
|
||||
t.Fatalf("fadjflajdflkaflj")
|
||||
}
|
||||
ch := make(chan struct{})
|
||||
go func() {
|
||||
te.srv.GracefulStop()
|
||||
close(ch)
|
||||
}()
|
||||
for {
|
||||
ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond)
|
||||
if _, err := tc.EmptyCall(ctx, &testpb.Empty{}, grpc.FailFast(false)); err == nil {
|
||||
continue
|
||||
} else {
|
||||
break
|
||||
}
|
||||
}
|
||||
respParam := []*testpb.ResponseParameters{
|
||||
{
|
||||
Size: proto.Int32(1),
|
||||
},
|
||||
}
|
||||
payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, int32(100))
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
req := &testpb.StreamingOutputCallRequest{
|
||||
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
|
||||
ResponseParameters: respParam,
|
||||
Payload: payload,
|
||||
}
|
||||
if err := stream.Send(req); err != nil {
|
||||
t.Fatalf("%v.Send(%v) = %v, want <nil>", stream, req, err)
|
||||
}
|
||||
if _, err := stream.Recv(); err != nil {
|
||||
t.Fatalf("%v.Recv() = %v, want _, <nil>", stream, err)
|
||||
}
|
||||
cancel()
|
||||
<-ch
|
||||
awaitNewConnLogOutput()
|
||||
}
|
||||
|
||||
func TestFailFast(t *testing.T) {
|
||||
defer leakCheck(t)()
|
||||
for _, e := range listTestEnv() {
|
||||
testFailFast(t, e)
|
||||
}
|
||||
}
|
||||
|
||||
func testFailFast(t *testing.T, e env) {
|
||||
te := newTest(t, e)
|
||||
te.userAgent = testAppUA
|
||||
|
@ -71,7 +71,8 @@ type http2Client struct {
|
||||
shutdownChan chan struct{}
|
||||
// errorChan is closed to notify the I/O error to the caller.
|
||||
errorChan chan struct{}
|
||||
err error
|
||||
//err error
|
||||
goAway chan struct{}
|
||||
|
||||
framer *framer
|
||||
hBuf *bytes.Buffer // the buffer for HPACK encoding
|
||||
@ -149,6 +150,7 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e
|
||||
writableChan: make(chan int, 1),
|
||||
shutdownChan: make(chan struct{}),
|
||||
errorChan: make(chan struct{}),
|
||||
goAway: make(chan struct{}),
|
||||
framer: newFramer(conn),
|
||||
hBuf: &buf,
|
||||
hEnc: hpack.NewEncoder(&buf),
|
||||
@ -408,13 +410,13 @@ func (t *http2Client) CloseStream(s *Stream, err error) {
|
||||
if t.streamsQuota != nil {
|
||||
updateStreams = true
|
||||
}
|
||||
if t.state == draining && len(t.activeStreams) == 1 {
|
||||
delete(t.activeStreams, s.id)
|
||||
if t.state == draining && len(t.activeStreams) == 0 {
|
||||
// The transport is draining and s is the last live stream on t.
|
||||
t.mu.Unlock()
|
||||
t.Close()
|
||||
return
|
||||
}
|
||||
delete(t.activeStreams, s.id)
|
||||
t.mu.Unlock()
|
||||
if updateStreams {
|
||||
t.streamsQuota.add(1)
|
||||
@ -485,11 +487,15 @@ func (t *http2Client) GracefulClose() error {
|
||||
}
|
||||
t.state = draining
|
||||
// Notify the streams which were initiated after the server sent GOAWAY.
|
||||
select {
|
||||
case <-t.goAway:
|
||||
for i := t.goAwayID + 2; i < t.nextID; i += 2 {
|
||||
if s, ok := t.activeStreams[i]; ok {
|
||||
close(s.goAway)
|
||||
}
|
||||
}
|
||||
default:
|
||||
}
|
||||
active := len(t.activeStreams)
|
||||
t.mu.Unlock()
|
||||
if active == 0 {
|
||||
@ -736,8 +742,7 @@ func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
|
||||
t.mu.Lock()
|
||||
if t.state == reachable {
|
||||
t.goAwayID = f.LastStreamID
|
||||
t.err = ErrConnDrain
|
||||
close(t.errorChan)
|
||||
close(t.goAway)
|
||||
}
|
||||
t.mu.Unlock()
|
||||
}
|
||||
@ -944,14 +949,18 @@ func (t *http2Client) controller() {
|
||||
}
|
||||
}
|
||||
|
||||
func (t *http2Client) Done() <-chan struct{} {
|
||||
func (t *http2Client) Error() <-chan struct{} {
|
||||
return t.errorChan
|
||||
}
|
||||
|
||||
func (t *http2Client) Err() error {
|
||||
return t.err
|
||||
func (t *http2Client) GoAway() <-chan struct{} {
|
||||
return t.goAway
|
||||
}
|
||||
|
||||
//func (t *http2Client) Err() error {
|
||||
// return t.err
|
||||
//}
|
||||
|
||||
func (t *http2Client) notifyError(err error) {
|
||||
t.mu.Lock()
|
||||
defer t.mu.Unlock()
|
||||
|
@ -737,6 +737,9 @@ func (t *http2Server) Close() (err error) {
|
||||
func (t *http2Server) closeStream(s *Stream) {
|
||||
t.mu.Lock()
|
||||
delete(t.activeStreams, s.id)
|
||||
if t.state == draining && len(t.activeStreams) == 0 {
|
||||
defer t.Close()
|
||||
}
|
||||
t.mu.Unlock()
|
||||
// In case stream sending and receiving are invoked in separate
|
||||
// goroutines (e.g., bi-directional streaming), cancel needs to be
|
||||
|
@ -431,7 +431,7 @@ type ClientTransport interface {
|
||||
// this in order to take action (e.g., close the current transport
|
||||
// and create a new one) in error case. It should not return nil
|
||||
// once the transport is initiated.
|
||||
//Error() <-chan struct{}
|
||||
Error() <-chan struct{}
|
||||
|
||||
// Done returns a channel that is closed when some I/O error
|
||||
// happens or ClientTranspor receives the draining signal from the server
|
||||
@ -439,10 +439,10 @@ type ClientTransport interface {
|
||||
// a goroutine to monitor this in order to take action (e.g., close
|
||||
// the current transport and create a new one) in error case. It should
|
||||
// not return nil once the transport is initiated.
|
||||
Done() <-chan struct{}
|
||||
GoAway() <-chan struct{}
|
||||
|
||||
// Err returns ...
|
||||
Err() error
|
||||
//Err() error
|
||||
}
|
||||
|
||||
// ServerTransport is the common interface for all gRPC server-side transport
|
||||
|
@ -271,8 +271,8 @@ func TestClientSendAndReceive(t *testing.T) {
|
||||
func TestClientErrorNotify(t *testing.T) {
|
||||
server, ct := setUp(t, 0, math.MaxUint32, normal)
|
||||
go server.stop()
|
||||
// ct.reader should detect the error and activate ct.Done().
|
||||
<-ct.Done()
|
||||
// ct.reader should detect the error and activate ct.Error().
|
||||
<-ct.Error()
|
||||
ct.Close()
|
||||
}
|
||||
|
||||
@ -309,7 +309,7 @@ func TestClientMix(t *testing.T) {
|
||||
s.stop()
|
||||
}(s)
|
||||
go func(ct ClientTransport) {
|
||||
<-ct.Done()
|
||||
<-ct.Error()
|
||||
ct.Close()
|
||||
}(ct)
|
||||
for i := 0; i < 1000; i++ {
|
||||
@ -709,7 +709,7 @@ func TestClientWithMisbehavedServer(t *testing.T) {
|
||||
}
|
||||
}
|
||||
// http2Client.errChan is closed due to connection flow control window size violation.
|
||||
<-conn.Done()
|
||||
<-conn.Error()
|
||||
ct.Close()
|
||||
server.stop()
|
||||
}
|
||||
|
Reference in New Issue
Block a user