diff --git a/keepalive/keepalive.go b/keepalive/keepalive.go index 11c8d3b4..738b316b 100644 --- a/keepalive/keepalive.go +++ b/keepalive/keepalive.go @@ -22,9 +22,10 @@ type ServerParameters struct { // MaxConnectionIdle is a duration for the amount of time after which an idle connection would be closed by sending a GoAway. // Idleness duration is defined since the most recent time the number of outstanding RPCs became zero or the connection establishment. MaxConnectionIdle time.Duration - // MaxConnectionAge is a duration for the maximum amount of time a connection may exist before it will be closed by sending a GoAway + // MaxConnectionAge is a duration for the maximum amount of time a connection may exist before it will be closed by sending a GoAway. + // A random jitter of +/-10% will be added to MAX_CONNECTION_AGE to spread out connection storms. MaxConnectionAge time.Duration - //MaxConnectinoAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed. + // MaxConnectinoAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed. MaxConnectionAgeGrace time.Duration // After a duration of this time if the server doesn't see any activity it pings the client to see if the transport is still alive. Time time.Duration diff --git a/transport/http2_server.go b/transport/http2_server.go index 915074be..6b8535a6 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -105,7 +105,11 @@ type http2Server struct { activeStreams map[uint32]*Stream // the per-stream outbound flow control window size set by the peer. streamSendQuota uint32 - idle time.Time + // idle is the time instant when the connection went idle. + // This is either the begining of the connection or when the number of + // RPCs go down to 0. + // When the connection is busy, this value is set to 0. + idle time.Time } // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is @@ -779,7 +783,7 @@ func (t *http2Server) applySettings(ss []http2.Setting) { // 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-resposive connection // after an additional duration of keepalive.Timeout. func (t *http2Server) keepalive() { - p := &ping{data: [8]byte{}} + p := &ping{} var pingSent bool maxIdle := time.NewTimer(t.kp.MaxConnectionIdle) maxAge := time.NewTimer(t.kp.MaxConnectionAge) @@ -798,16 +802,19 @@ func (t *http2Server) keepalive() { <-keepalive.C } }() - t.mu.Lock() - oidle := t.idle - t.mu.Unlock() for { select { case <-maxIdle.C: t.mu.Lock() idle := t.idle - if idle == oidle { - // The connection has been idle for a duration of keepalive.MaxConnectionIdle. + if idle.IsZero() { // The connection is non-idle. + t.mu.Unlock() + maxIdle.Reset(t.kp.MaxConnectionIdle) + continue + } + val := t.kp.MaxConnectionIdle - time.Since(idle) + if val <= 0 { + // The connection has been idle for a duration of keepalive.MaxConnectionIdle or more. // Gracefully close the connection. t.state = draining t.mu.Unlock() @@ -817,12 +824,7 @@ func (t *http2Server) keepalive() { return } t.mu.Unlock() - if idle.IsZero() { - maxIdle.Reset(t.kp.MaxConnectionIdle) - continue - } - oidle = idle - maxIdle.Reset(t.kp.MaxConnectionIdle - time.Since(idle)) + maxIdle.Reset(val) case <-maxAge.C: t.mu.Lock() t.state = draining diff --git a/transport/transport_test.go b/transport/transport_test.go index 8ce92a2b..8e86075d 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -307,10 +307,10 @@ func TestMaxConnectionIdle(t *testing.T) { MaxConnectionIdle: 2 * time.Second, }, } - server, client := setUpWithOptions(t, 0, serverConfig, normal, ConnectOptions{}) + server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer server.stop() defer client.Close() - stream, err := client.NewStream(context.Background(), &CallHdr{Host: "localhost", Method: "foo.small"}) + stream, err := client.NewStream(context.Background(), &CallHdr{Flush: true}) if err != nil { t.Fatalf("Client failed to create RPC request: %v", err) } @@ -327,6 +327,29 @@ func TestMaxConnectionIdle(t *testing.T) { } } +// TestMaxConenctionIdleNegative tests that a server will not send GoAway to a non-idle(busy) client. +func TestMaxConnectionIdleNegative(t *testing.T) { + serverConfig := &ServerConfig{ + KeepaliveParams: keepalive.ServerParameters{ + MaxConnectionIdle: 2 * time.Second, + }, + } + server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) + defer server.stop() + defer client.Close() + _, err := client.NewStream(context.Background(), &CallHdr{Flush: true}) + if err != nil { + t.Fatalf("Client failed to create RPC request: %v", err) + } + timeout := time.NewTimer(time.Second * 4) + select { + case <-client.GoAway(): + t.Fatalf("A non-idle client received a GoAway.") + case <-timeout.C: + } + +} + // TestMaxConnectinoAge tests that a server will send GoAway after a duration of MaxConnectionAge. func TestMaxConnectionAge(t *testing.T) { serverConfig := &ServerConfig{ @@ -334,10 +357,10 @@ func TestMaxConnectionAge(t *testing.T) { MaxConnectionAge: 2 * time.Second, }, } - server, client := setUpWithOptions(t, 0, serverConfig, normal, ConnectOptions{}) + server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer server.stop() defer client.Close() - _, err := client.NewStream(context.Background(), &CallHdr{Host: "localhost", Method: "foo.small"}) + _, err := client.NewStream(context.Background(), &CallHdr{}) if err != nil { t.Fatalf("Client failed to create stream: %v", err) } @@ -350,7 +373,7 @@ func TestMaxConnectionAge(t *testing.T) { } } -// TestKeepaliveServer tests that a server closes a peer that doesn't respont to keepalive pings. +// TestKeepaliveServer tests that a server closes conneciton with a client that doesn't respond to keepalive pings. func TestKeepaliveServer(t *testing.T) { serverConfig := &ServerConfig{ KeepaliveParams: keepalive.ServerParameters{ @@ -358,7 +381,7 @@ func TestKeepaliveServer(t *testing.T) { Timeout: 1 * time.Second, }, } - server, c := setUpWithOptions(t, 0, serverConfig, normal, ConnectOptions{}) + server, c := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) defer server.stop() defer c.Close() client, err := net.Dial("tcp", server.lis.Addr().String()) @@ -366,6 +389,8 @@ func TestKeepaliveServer(t *testing.T) { t.Fatalf("Failed to dial: %v", err) } defer client.Close() + // Set read deadline on client conn so that it doesn't block forever in errorsome cases. + client.SetReadDeadline(time.Now().Add(10 * time.Second)) // Wait for keepalive logic to close the connection. time.Sleep(4 * time.Second) b := make([]byte, 24) @@ -381,6 +406,28 @@ func TestKeepaliveServer(t *testing.T) { } } +// TestKeepaliveServerNegative tests that a server doesn't close connection with a client that responds to keepalive pings. +func TestKeepaliveServerNegative(t *testing.T) { + serverConfig := &ServerConfig{ + KeepaliveParams: keepalive.ServerParameters{ + Time: 2 * time.Second, + Timeout: 1 * time.Second, + }, + } + server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}) + defer server.stop() + defer client.Close() + // Give keepalive logic some time by sleeping. + time.Sleep(4 * time.Second) + // Assert that client is still active. + clientTr := client.(*http2Client) + clientTr.mu.Lock() + defer clientTr.mu.Unlock() + if clientTr.state != reachable { + t.Fatalf("Test failed: Expected server-client connection to be healthy.") + } +} + func TestKeepaliveClientClosesIdleTransport(t *testing.T) { done := make(chan net.Conn, 1) tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{ @@ -441,7 +488,7 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { } defer conn.Close() // Create a stream. - _, err := tr.NewStream(context.Background(), &CallHdr{}) + _, err := tr.NewStream(context.Background(), &CallHdr{Flush: true}) if err != nil { t.Fatalf("Failed to create a new stream: %v", err) }