Post review updates

This commit is contained in:
Mahak Mukhi
2017-03-20 15:20:36 -07:00
parent 82fb96a360
commit 3c9df8ef7b
3 changed files with 72 additions and 22 deletions

View File

@ -22,9 +22,10 @@ type ServerParameters struct {
// MaxConnectionIdle is a duration for the amount of time after which an idle connection would be closed by sending a GoAway. // MaxConnectionIdle is a duration for the amount of time after which an idle connection would be closed by sending a GoAway.
// Idleness duration is defined since the most recent time the number of outstanding RPCs became zero or the connection establishment. // Idleness duration is defined since the most recent time the number of outstanding RPCs became zero or the connection establishment.
MaxConnectionIdle time.Duration MaxConnectionIdle time.Duration
// MaxConnectionAge is a duration for the maximum amount of time a connection may exist before it will be closed by sending a GoAway // MaxConnectionAge is a duration for the maximum amount of time a connection may exist before it will be closed by sending a GoAway.
// A random jitter of +/-10% will be added to MAX_CONNECTION_AGE to spread out connection storms.
MaxConnectionAge time.Duration MaxConnectionAge time.Duration
//MaxConnectinoAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed. // MaxConnectinoAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed.
MaxConnectionAgeGrace time.Duration MaxConnectionAgeGrace time.Duration
// After a duration of this time if the server doesn't see any activity it pings the client to see if the transport is still alive. // After a duration of this time if the server doesn't see any activity it pings the client to see if the transport is still alive.
Time time.Duration Time time.Duration

View File

@ -105,7 +105,11 @@ type http2Server struct {
activeStreams map[uint32]*Stream activeStreams map[uint32]*Stream
// the per-stream outbound flow control window size set by the peer. // the per-stream outbound flow control window size set by the peer.
streamSendQuota uint32 streamSendQuota uint32
idle time.Time // idle is the time instant when the connection went idle.
// This is either the begining of the connection or when the number of
// RPCs go down to 0.
// When the connection is busy, this value is set to 0.
idle time.Time
} }
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
@ -779,7 +783,7 @@ func (t *http2Server) applySettings(ss []http2.Setting) {
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-resposive connection // 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-resposive connection
// after an additional duration of keepalive.Timeout. // after an additional duration of keepalive.Timeout.
func (t *http2Server) keepalive() { func (t *http2Server) keepalive() {
p := &ping{data: [8]byte{}} p := &ping{}
var pingSent bool var pingSent bool
maxIdle := time.NewTimer(t.kp.MaxConnectionIdle) maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
maxAge := time.NewTimer(t.kp.MaxConnectionAge) maxAge := time.NewTimer(t.kp.MaxConnectionAge)
@ -798,16 +802,19 @@ func (t *http2Server) keepalive() {
<-keepalive.C <-keepalive.C
} }
}() }()
t.mu.Lock()
oidle := t.idle
t.mu.Unlock()
for { for {
select { select {
case <-maxIdle.C: case <-maxIdle.C:
t.mu.Lock() t.mu.Lock()
idle := t.idle idle := t.idle
if idle == oidle { if idle.IsZero() { // The connection is non-idle.
// The connection has been idle for a duration of keepalive.MaxConnectionIdle. t.mu.Unlock()
maxIdle.Reset(t.kp.MaxConnectionIdle)
continue
}
val := t.kp.MaxConnectionIdle - time.Since(idle)
if val <= 0 {
// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
// Gracefully close the connection. // Gracefully close the connection.
t.state = draining t.state = draining
t.mu.Unlock() t.mu.Unlock()
@ -817,12 +824,7 @@ func (t *http2Server) keepalive() {
return return
} }
t.mu.Unlock() t.mu.Unlock()
if idle.IsZero() { maxIdle.Reset(val)
maxIdle.Reset(t.kp.MaxConnectionIdle)
continue
}
oidle = idle
maxIdle.Reset(t.kp.MaxConnectionIdle - time.Since(idle))
case <-maxAge.C: case <-maxAge.C:
t.mu.Lock() t.mu.Lock()
t.state = draining t.state = draining

View File

@ -307,10 +307,10 @@ func TestMaxConnectionIdle(t *testing.T) {
MaxConnectionIdle: 2 * time.Second, MaxConnectionIdle: 2 * time.Second,
}, },
} }
server, client := setUpWithOptions(t, 0, serverConfig, normal, ConnectOptions{}) server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer server.stop() defer server.stop()
defer client.Close() defer client.Close()
stream, err := client.NewStream(context.Background(), &CallHdr{Host: "localhost", Method: "foo.small"}) stream, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
if err != nil { if err != nil {
t.Fatalf("Client failed to create RPC request: %v", err) t.Fatalf("Client failed to create RPC request: %v", err)
} }
@ -327,6 +327,29 @@ func TestMaxConnectionIdle(t *testing.T) {
} }
} }
// TestMaxConenctionIdleNegative tests that a server will not send GoAway to a non-idle(busy) client.
func TestMaxConnectionIdleNegative(t *testing.T) {
serverConfig := &ServerConfig{
KeepaliveParams: keepalive.ServerParameters{
MaxConnectionIdle: 2 * time.Second,
},
}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer server.stop()
defer client.Close()
_, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
if err != nil {
t.Fatalf("Client failed to create RPC request: %v", err)
}
timeout := time.NewTimer(time.Second * 4)
select {
case <-client.GoAway():
t.Fatalf("A non-idle client received a GoAway.")
case <-timeout.C:
}
}
// TestMaxConnectinoAge tests that a server will send GoAway after a duration of MaxConnectionAge. // TestMaxConnectinoAge tests that a server will send GoAway after a duration of MaxConnectionAge.
func TestMaxConnectionAge(t *testing.T) { func TestMaxConnectionAge(t *testing.T) {
serverConfig := &ServerConfig{ serverConfig := &ServerConfig{
@ -334,10 +357,10 @@ func TestMaxConnectionAge(t *testing.T) {
MaxConnectionAge: 2 * time.Second, MaxConnectionAge: 2 * time.Second,
}, },
} }
server, client := setUpWithOptions(t, 0, serverConfig, normal, ConnectOptions{}) server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer server.stop() defer server.stop()
defer client.Close() defer client.Close()
_, err := client.NewStream(context.Background(), &CallHdr{Host: "localhost", Method: "foo.small"}) _, err := client.NewStream(context.Background(), &CallHdr{})
if err != nil { if err != nil {
t.Fatalf("Client failed to create stream: %v", err) t.Fatalf("Client failed to create stream: %v", err)
} }
@ -350,7 +373,7 @@ func TestMaxConnectionAge(t *testing.T) {
} }
} }
// TestKeepaliveServer tests that a server closes a peer that doesn't respont to keepalive pings. // TestKeepaliveServer tests that a server closes conneciton with a client that doesn't respond to keepalive pings.
func TestKeepaliveServer(t *testing.T) { func TestKeepaliveServer(t *testing.T) {
serverConfig := &ServerConfig{ serverConfig := &ServerConfig{
KeepaliveParams: keepalive.ServerParameters{ KeepaliveParams: keepalive.ServerParameters{
@ -358,7 +381,7 @@ func TestKeepaliveServer(t *testing.T) {
Timeout: 1 * time.Second, Timeout: 1 * time.Second,
}, },
} }
server, c := setUpWithOptions(t, 0, serverConfig, normal, ConnectOptions{}) server, c := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer server.stop() defer server.stop()
defer c.Close() defer c.Close()
client, err := net.Dial("tcp", server.lis.Addr().String()) client, err := net.Dial("tcp", server.lis.Addr().String())
@ -366,6 +389,8 @@ func TestKeepaliveServer(t *testing.T) {
t.Fatalf("Failed to dial: %v", err) t.Fatalf("Failed to dial: %v", err)
} }
defer client.Close() defer client.Close()
// Set read deadline on client conn so that it doesn't block forever in errorsome cases.
client.SetReadDeadline(time.Now().Add(10 * time.Second))
// Wait for keepalive logic to close the connection. // Wait for keepalive logic to close the connection.
time.Sleep(4 * time.Second) time.Sleep(4 * time.Second)
b := make([]byte, 24) b := make([]byte, 24)
@ -381,6 +406,28 @@ func TestKeepaliveServer(t *testing.T) {
} }
} }
// TestKeepaliveServerNegative tests that a server doesn't close connection with a client that responds to keepalive pings.
func TestKeepaliveServerNegative(t *testing.T) {
serverConfig := &ServerConfig{
KeepaliveParams: keepalive.ServerParameters{
Time: 2 * time.Second,
Timeout: 1 * time.Second,
},
}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
defer server.stop()
defer client.Close()
// Give keepalive logic some time by sleeping.
time.Sleep(4 * time.Second)
// Assert that client is still active.
clientTr := client.(*http2Client)
clientTr.mu.Lock()
defer clientTr.mu.Unlock()
if clientTr.state != reachable {
t.Fatalf("Test failed: Expected server-client connection to be healthy.")
}
}
func TestKeepaliveClientClosesIdleTransport(t *testing.T) { func TestKeepaliveClientClosesIdleTransport(t *testing.T) {
done := make(chan net.Conn, 1) done := make(chan net.Conn, 1)
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{ tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
@ -441,7 +488,7 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
} }
defer conn.Close() defer conn.Close()
// Create a stream. // Create a stream.
_, err := tr.NewStream(context.Background(), &CallHdr{}) _, err := tr.NewStream(context.Background(), &CallHdr{Flush: true})
if err != nil { if err != nil {
t.Fatalf("Failed to create a new stream: %v", err) t.Fatalf("Failed to create a new stream: %v", err)
} }