diff --git a/keepalive/keepalive.go b/keepalive/keepalive.go index e3a4068c..4ee72804 100644 --- a/keepalive/keepalive.go +++ b/keepalive/keepalive.go @@ -16,3 +16,12 @@ type ClientParameters struct { // If true, client runs keepalive checks even with no active RPCs. PermitWithoutStream bool } + +// TODO(mmukhi) : documentation +type ServerParameters struct { + MaxConnectionIdle time.Duration + MaxConnectionAge time.Duration + MaxConnectionAgeGrace time.Duration + Time time.Duration + Timeout time.Duration +} diff --git a/server.go b/server.go index 157f35ee..4fe5edac 100644 --- a/server.go +++ b/server.go @@ -53,6 +53,7 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/internal" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/stats" "google.golang.org/grpc/tap" @@ -117,6 +118,7 @@ type options struct { maxConcurrentStreams uint32 useHandlerImpl bool // use http.Handler-based server unknownStreamDesc *StreamDesc + keepaliveParams keepalive.ServerParameters } var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit @@ -124,6 +126,13 @@ var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size l // A ServerOption sets options. type ServerOption func(*options) +// TODO(mmukhi) : Documentation +func KeepaliveParams(kp keepalive.ServerParameters) ServerOption { + return func(o *options) { + o.keepaliveParams = kp + } +} + // CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling. func CustomCodec(codec Codec) ServerOption { return func(o *options) { @@ -465,10 +474,11 @@ func (s *Server) handleRawConn(rawConn net.Conn) { // transport.NewServerTransport). func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) { config := &transport.ServerConfig{ - MaxStreams: s.opts.maxConcurrentStreams, - AuthInfo: authInfo, - InTapHandle: s.opts.inTapHandle, - StatsHandler: s.opts.statsHandler, + MaxStreams: s.opts.maxConcurrentStreams, + AuthInfo: authInfo, + InTapHandle: s.opts.inTapHandle, + StatsHandler: s.opts.statsHandler, + keepaliveParams: s.opts.keepaliveParams, } st, err := transport.NewServerTransport("http2", c, config) if err != nil { diff --git a/transport/control.go b/transport/control.go index 0edbe53a..64d22f84 100644 --- a/transport/control.go +++ b/transport/control.go @@ -46,12 +46,17 @@ const ( // The default value of flow control window size in HTTP2 spec. defaultWindowSize = 65535 // The initial window size for flow control. - initialWindowSize = defaultWindowSize // for an RPC - initialConnWindowSize = defaultWindowSize * 16 // for a connection - infinity = time.Duration(math.MaxInt64) - defaultKeepaliveTime = infinity - defaultKeepaliveTimeout = time.Duration(20 * time.Second) - defaultMaxStreamsClient = 100 + initialWindowSize = defaultWindowSize // for an RPC + initialConnWindowSize = defaultWindowSize * 16 // for a connection + infinity = time.Duration(math.MaxInt64) + defaultClientKeepaliveTime = infinity + defaultClientKeepaliveTimeout = time.Duration(20 * time.Second) + defaultMaxStreamsClient = 100 + defaultMaxConnectionIdle = infinity + defaultMaxConnectionAge = infinity + defaultMaxConnectionAgeGrace = infinity + defaultServerKeepaliveTime = time.Duration(2 * time.Hour) + defaultServerKeepaliveTimeout = time.Duration(20 * time.Second) ) // The following defines various control items which could flow through diff --git a/transport/http2_client.go b/transport/http2_client.go index c02ee160..61988e04 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -194,10 +194,10 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( kp := opts.KeepaliveParams // Validate keepalive parameters. if kp.Time == 0 { - kp.Time = defaultKeepaliveTime + kp.Time = defaultClientKeepaliveTime } if kp.Timeout == 0 { - kp.Timeout = defaultKeepaliveTimeout + kp.Timeout = defaultClientKeepaliveTimeout } var buf bytes.Buffer t := &http2Client{ diff --git a/transport/http2_server.go b/transport/http2_server.go index f5c590f4..deb91d2b 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -41,6 +41,7 @@ import ( "net" "strconv" "sync" + "time" "golang.org/x/net/context" "golang.org/x/net/http2" @@ -48,6 +49,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "google.golang.org/grpc/stats" @@ -90,11 +92,15 @@ type http2Server struct { stats stats.Handler + // TODO(mmukhi): Documentation + kp keepalive.ServerParameters + mu sync.Mutex // guard the following state transportState activeStreams map[uint32]*Stream // the per-stream outbound flow control window size set by the peer. streamSendQuota uint32 + idle time.Time } // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is @@ -128,6 +134,22 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err return nil, connectionErrorf(true, err, "transport: %v", err) } } + kp := config.keepaliveParams + if kp.MaxConnectionIdle == 0 { + kp.MaxConnectionIdle = defaultMaxConnectionIdle + } + if kp.MaxConnectionAge == 0 { + kp.MaxConnectionAge = defaultMaxConnectionAge + } + if kp.MaxConnectionAgeGrace == 0 { + kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace + } + if kp.Time == 0 { + kp.Time = defaultServerKeepaliveTime + } + if kp.Timeout == 0 { + kp.Timeout = defaultServerKeepaliveTimeout + } var buf bytes.Buffer t := &http2Server{ ctx: context.Background(), @@ -149,6 +171,8 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err activeStreams: make(map[uint32]*Stream), streamSendQuota: defaultWindowSize, stats: config.StatsHandler, + kp: kp, + idle: time.Now(), } if t.stats != nil { t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{ @@ -248,6 +272,9 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( t.maxStreamID = s.id s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota)) t.activeStreams[s.id] = s + if len(t.activeStreams) == 1 { + t.idle = time.Time{} + } t.mu.Unlock() s.windowHandler = func(n int) { t.updateWindow(s, uint32(n)) @@ -735,6 +762,37 @@ func (t *http2Server) applySettings(ss []http2.Setting) { } } +// TODO(mmukhi): Documentation +func (t *http2Server) keepalive() { + //p := &ping{data: [8]byte{}} + maxIdle := time.NewTimer(t.kp.MaxConnectionIdle) + maxAge := time.NewTimer(t.kp.MaxConnectionAge) + keepalive := time.NewTimer(t.kp.Time) + t.mu.Lock() + idle := t.idle + t.mu.Unlock() + for { + select { + case <-maxIdle.C: + if idle == t.idle { + // send go away + continue + } + if idle.IsZero() { + maxIdle.Reset(t.kp.MaxConnectionIdle) + continue + } + maxIdle.Reset(t.kp.MaxConnectionIdle - time.Since(idle)) + case <-maxAge.C: + case <-keepalive.C: + case <-t.shutdownChan: + // TODO(mmukhi): clean-up + return + } + } + +} + // controller running in a separate goroutine takes charge of sending control // frames (e.g., window update, reset stream, setting, etc.) to the server. func (t *http2Server) controller() { @@ -816,6 +874,9 @@ func (t *http2Server) Close() (err error) { func (t *http2Server) closeStream(s *Stream) { t.mu.Lock() delete(t.activeStreams, s.id) + if len(t.activeStreams) == 0 { + t.idle = time.Now() + } if t.state == draining && len(t.activeStreams) == 0 { defer t.Close() } diff --git a/transport/transport.go b/transport/transport.go index fed69089..9b27bab8 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -362,10 +362,11 @@ const ( // ServerConfig consists of all the configurations to establish a server transport. type ServerConfig struct { - MaxStreams uint32 - AuthInfo credentials.AuthInfo - InTapHandle tap.ServerInHandle - StatsHandler stats.Handler + MaxStreams uint32 + AuthInfo credentials.AuthInfo + InTapHandle tap.ServerInHandle + StatsHandler stats.Handler + keepaliveParams keepalive.ServerParameters } // NewServerTransport creates a ServerTransport with conn or non-nil error diff --git a/transport/transport_test.go b/transport/transport_test.go index 2e9674aa..f58365a5 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -156,7 +156,7 @@ func (h *testStreamHandler) handleStreamInvalidHeaderField(t *testing.T, s *Stre } // start starts server. Other goroutines should block on s.readyChan for further operations. -func (s *server) start(t *testing.T, port int, maxStreams uint32, ht hType) { +func (s *server) start(t *testing.T, port int, serverConfig *ServerConfig, ht hType) { var err error if port == 0 { s.lis, err = net.Listen("tcp", "localhost:0") @@ -180,10 +180,7 @@ func (s *server) start(t *testing.T, port int, maxStreams uint32, ht hType) { if err != nil { return } - config := &ServerConfig{ - MaxStreams: maxStreams, - } - transport, err := NewServerTransport("http2", conn, config) + transport, err := NewServerTransport("http2", conn, serverConfig) if err != nil { return } @@ -252,12 +249,12 @@ func (s *server) stop() { } func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, ClientTransport) { - return setUpWithOptions(t, port, maxStreams, ht, ConnectOptions{}) + return setUpWithOptions(t, port, &ServerConfig{MaxStreams: maxStreams}, ht, ConnectOptions{}) } -func setUpWithOptions(t *testing.T, port int, maxStreams uint32, ht hType, copts ConnectOptions) (*server, ClientTransport) { +func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hType, copts ConnectOptions) (*server, ClientTransport) { server := &server{startedErr: make(chan error, 1)} - go server.start(t, port, maxStreams, ht) + go server.start(t, port, serverConfig, ht) server.wait(t, 2*time.Second) addr := "localhost:" + server.port var ( @@ -301,6 +298,18 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con return tr } +// TestMaxConnectionIdle tests that a server will send GoAway to a idle client. +// An idle client is one who doesn't make any RPC calls for a duration of +// MaxConnectionIdle time. +func TestMaxConnectionIdle(t *testing.T) { + serverConfig := &ServerConfig{ + keepaliveParams: keepalive.ServerParams{ + MaxConnectionIdle: 2 * time.Second, + }, + } + server, client := setUpWithOptions(t, 0, serverConfig, normal, ConnectOptions{}) +} + func TestKeepaliveClientClosesIdleTransport(t *testing.T) { done := make(chan net.Conn, 1) tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{ @@ -377,7 +386,7 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { } func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) { - s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{ + s, tr := setUpWithOptions(t, 0, &ServerConfig{MaxStreams: math.MaxUint32}, normal, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{ Time: 2 * time.Second, // Keepalive time = 2 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. PermitWithoutStream: true, // Run keepalive even with no RPCs.