diff --git a/keepalive/keepalive.go b/keepalive/keepalive.go index 4ee72804..11c8d3b4 100644 --- a/keepalive/keepalive.go +++ b/keepalive/keepalive.go @@ -17,11 +17,18 @@ type ClientParameters struct { PermitWithoutStream bool } -// TODO(mmukhi) : documentation +// ServerParameters is used to set keepalive and max-age parameters on the server-side. type ServerParameters struct { - MaxConnectionIdle time.Duration - MaxConnectionAge time.Duration + // MaxConnectionIdle is a duration for the amount of time after which an idle connection would be closed by sending a GoAway. + // Idleness duration is defined since the most recent time the number of outstanding RPCs became zero or the connection establishment. + MaxConnectionIdle time.Duration + // MaxConnectionAge is a duration for the maximum amount of time a connection may exist before it will be closed by sending a GoAway + MaxConnectionAge time.Duration + //MaxConnectinoAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed. MaxConnectionAgeGrace time.Duration - Time time.Duration - Timeout time.Duration + // After a duration of this time if the server doesn't see any activity it pings the client to see if the transport is still alive. + Time time.Duration + // After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that + // the connection is closed. + Timeout time.Duration } diff --git a/server.go b/server.go index 4fe5edac..b19a3c4a 100644 --- a/server.go +++ b/server.go @@ -126,7 +126,7 @@ var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size l // A ServerOption sets options. type ServerOption func(*options) -// TODO(mmukhi) : Documentation +// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server. func KeepaliveParams(kp keepalive.ServerParameters) ServerOption { return func(o *options) { o.keepaliveParams = kp @@ -478,7 +478,7 @@ func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) AuthInfo: authInfo, InTapHandle: s.opts.inTapHandle, StatsHandler: s.opts.statsHandler, - keepaliveParams: s.opts.keepaliveParams, + KeepaliveParams: s.opts.keepaliveParams, } st, err := transport.NewServerTransport("http2", c, config) if err != nil { diff --git a/transport/http2_server.go b/transport/http2_server.go index a575b167..915074be 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -38,9 +38,11 @@ import ( "errors" "io" "math" + "math/rand" "net" "strconv" "sync" + "sync/atomic" "time" "golang.org/x/net/context" @@ -92,7 +94,10 @@ type http2Server struct { stats stats.Handler - // TODO(mmukhi): Documentation + // Flag to keep track of reading activity on transport. + // 1 is true and 0 is false. + activity uint32 // Accessed atomically. + // Keepalive and max-age parameters for the server. kp keepalive.ServerParameters mu sync.Mutex // guard the following @@ -134,13 +139,15 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err return nil, connectionErrorf(true, err, "transport: %v", err) } } - kp := config.keepaliveParams + kp := config.KeepaliveParams if kp.MaxConnectionIdle == 0 { kp.MaxConnectionIdle = defaultMaxConnectionIdle } if kp.MaxConnectionAge == 0 { kp.MaxConnectionAge = defaultMaxConnectionAge } + // Add a jitter to MaxConnectionAge. + kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge) if kp.MaxConnectionAgeGrace == 0 { kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace } @@ -323,6 +330,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context. t.Close() return } + atomic.CompareAndSwapUint32(&t.activity, 0, 1) sf, ok := frame.(*http2.SettingsFrame) if !ok { grpclog.Printf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame) @@ -333,6 +341,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context. for { frame, err := t.framer.readFrame() + atomic.CompareAndSwapUint32(&t.activity, 0, 1) if err != nil { if se, ok := err.(http2.StreamError); ok { t.mu.Lock() @@ -763,9 +772,15 @@ func (t *http2Server) applySettings(ss []http2.Setting) { } } -// TODO(mmukhi): Documentation +// keepalive running in a separate goroutine does the following: +// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle. +// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge. +// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge. +// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-resposive connection +// after an additional duration of keepalive.Timeout. func (t *http2Server) keepalive() { - //p := &ping{data: [8]byte{}} + p := &ping{data: [8]byte{}} + var pingSent bool maxIdle := time.NewTimer(t.kp.MaxConnectionIdle) maxAge := time.NewTimer(t.kp.MaxConnectionAge) keepalive := time.NewTimer(t.kp.Time) @@ -809,16 +824,39 @@ func (t *http2Server) keepalive() { oidle = idle maxIdle.Reset(t.kp.MaxConnectionIdle - time.Since(idle)) case <-maxAge.C: - // Reseting the timer so that the clean-up doesn't deadlock. - maxAge.Reset(infinity) + t.mu.Lock() + t.state = draining + t.mu.Unlock() + t.Drain() + maxAge.Reset(t.kp.MaxConnectionAgeGrace) + select { + case <-maxAge.C: + // Close the connection after grace period. + t.Close() + // Reseting the timer so that the clean-up doesn't deadlock. + maxAge.Reset(infinity) + case <-t.shutdownChan: + } + return case <-keepalive.C: - // Reseting the timer so that the clean-up doesn't deadlock. - keepalive.Reset(infinity) + if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { + pingSent = false + keepalive.Reset(t.kp.Time) + continue + } + if pingSent { + t.Close() + // Reseting the timer so that the clean-up doesn't deadlock. + keepalive.Reset(infinity) + return + } + pingSent = true + t.controlBuf.put(p) + keepalive.Reset(t.kp.Timeout) case <-t.shutdownChan: return } } - } // controller running in a separate goroutine takes charge of sending control @@ -934,3 +972,14 @@ func (t *http2Server) RemoteAddr() net.Addr { func (t *http2Server) Drain() { t.controlBuf.put(&goAway{}) } + +func getJitter(v time.Duration) time.Duration { + if v == infinity { + return 0 + } + rand.Seed(time.Now().UnixNano()) + // Generate a jitter between +/- 10% of the value. + r := int64(v / 10) + j := rand.Int63n(2*r) - r + return time.Duration(j) +} diff --git a/transport/transport.go b/transport/transport.go index 9b27bab8..fe743dbe 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -366,7 +366,7 @@ type ServerConfig struct { AuthInfo credentials.AuthInfo InTapHandle tap.ServerInHandle StatsHandler stats.Handler - keepaliveParams keepalive.ServerParameters + KeepaliveParams keepalive.ServerParameters } // NewServerTransport creates a ServerTransport with conn or non-nil error diff --git a/transport/transport_test.go b/transport/transport_test.go index 0d3c6226..8ce92a2b 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -303,7 +303,7 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con // MaxConnectionIdle time. func TestMaxConnectionIdle(t *testing.T) { serverConfig := &ServerConfig{ - keepaliveParams: keepalive.ServerParameters{ + KeepaliveParams: keepalive.ServerParameters{ MaxConnectionIdle: 2 * time.Second, }, } @@ -318,12 +318,66 @@ func TestMaxConnectionIdle(t *testing.T) { stream.rstStream = true stream.mu.Unlock() client.CloseStream(stream, nil) - // wait for server to see that closed stream and max age to send goaway after no new RPCs are mode + // wait for server to see that closed stream and max-age logic to send goaway after no new RPCs are mode timeout := time.NewTimer(time.Second * 4) select { case <-client.GoAway(): case <-timeout.C: - t.Fatalf("Test timed out, expected a GoAway from server") + t.Fatalf("Test timed out, expected a GoAway from the server.") + } +} + +// TestMaxConnectinoAge tests that a server will send GoAway after a duration of MaxConnectionAge. +func TestMaxConnectionAge(t *testing.T) { + serverConfig := &ServerConfig{ + KeepaliveParams: keepalive.ServerParameters{ + MaxConnectionAge: 2 * time.Second, + }, + } + server, client := setUpWithOptions(t, 0, serverConfig, normal, ConnectOptions{}) + defer server.stop() + defer client.Close() + _, err := client.NewStream(context.Background(), &CallHdr{Host: "localhost", Method: "foo.small"}) + if err != nil { + t.Fatalf("Client failed to create stream: %v", err) + } + // Wait for max-age logic to send GoAway. + timeout := time.NewTimer(4 * time.Second) + select { + case <-client.GoAway(): + case <-timeout.C: + t.Fatalf("Test timer out, expected a GoAway from the server.") + } +} + +// TestKeepaliveServer tests that a server closes a peer that doesn't respont to keepalive pings. +func TestKeepaliveServer(t *testing.T) { + serverConfig := &ServerConfig{ + KeepaliveParams: keepalive.ServerParameters{ + Time: 2 * time.Second, + Timeout: 1 * time.Second, + }, + } + server, c := setUpWithOptions(t, 0, serverConfig, normal, ConnectOptions{}) + defer server.stop() + defer c.Close() + client, err := net.Dial("tcp", server.lis.Addr().String()) + if err != nil { + t.Fatalf("Failed to dial: %v", err) + } + defer client.Close() + // Wait for keepalive logic to close the connection. + time.Sleep(4 * time.Second) + b := make([]byte, 24) + for { + _, err = client.Read(b) + if err == nil { + continue + } + if err != io.EOF { + t.Fatalf("client.Read(_) = _,%v, want io.EOF", err) + } + break } }