From e58450b5d3a66f6bcd8ac4571bb3c8589b8ed254 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Thu, 17 Nov 2016 17:50:52 -0800 Subject: [PATCH 01/21] Keepalive client-side implementation --- clientconn.go | 11 +++ keepalive/keepalive.go | 22 ++++++ transport/http2_client.go | 44 ++++++++++++ transport/transport.go | 3 + transport/transport_test.go | 134 +++++++++++++++++++++++++++++++++++- 5 files changed, 213 insertions(+), 1 deletion(-) create mode 100644 keepalive/keepalive.go diff --git a/clientconn.go b/clientconn.go index f6dab4b7..99f2c37e 100644 --- a/clientconn.go +++ b/clientconn.go @@ -45,6 +45,7 @@ import ( "golang.org/x/net/trace" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/transport" ) @@ -230,6 +231,13 @@ func WithUserAgent(s string) DialOption { } } +// WithKeepaliveParams returns a DialOption that specifies a user agent string for all the RPCs. +func WithKeepaliveParams(k keepalive.KeepaliveParams) DialOption { + return func(o *dialOptions) { + o.copts.KParams = k + } +} + // WithUnaryInterceptor returns a DialOption that specifies the interceptor for unary RPCs. func WithUnaryInterceptor(f UnaryClientInterceptor) DialOption { return func(o *dialOptions) { @@ -277,6 +285,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } // Set defaults. + if cc.dopts.copts.KParams == (keepalive.KeepaliveParams{}) { + cc.dopts.copts.KParams = keepalive.DefaultKParams + } if cc.dopts.codec == nil { cc.dopts.codec = protoCodec{} } diff --git a/keepalive/keepalive.go b/keepalive/keepalive.go new file mode 100644 index 00000000..284b6eac --- /dev/null +++ b/keepalive/keepalive.go @@ -0,0 +1,22 @@ +package keepalive + +import ( + "time" +) + +type KeepaliveParams struct { + // After a duration of this time the client pings the server to see if the transport is still alive. + Ktime time.Duration + // After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport. + Ktimeout time.Duration + //If true, client runs keepalive checks even with no active RPCs. + KNoStream bool +} + +var DefaultKParams KeepaliveParams = KeepaliveParams{ + Ktime: time.Duration(290 * 365 * 24 * 60 * 60 * 1000 * 1000 * 1000), // default to infinite + Ktimeout: time.Duration(20 * 1000 * 1000 * 1000), // default to 20 seconds + KNoStream: false, +} + +var Enabled = false diff --git a/transport/http2_client.go b/transport/http2_client.go index cbd9f326..0e81050d 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -49,6 +49,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "google.golang.org/grpc/stats" @@ -109,6 +110,15 @@ type http2Client struct { goAwayID uint32 // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame. prevGoAwayID uint32 + + // lastRecv counts whenever a frame is recieved + lastRecv int64 + + // lastSent counts whenever a frame is sent + lastSent int64 + + // keepalive parameters + kParams keepalive.KeepaliveParams } func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) { @@ -206,6 +216,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( creds: opts.PerRPCCredentials, maxStreams: math.MaxInt32, streamSendQuota: defaultWindowSize, + kParams: opts.KParams, } // Start the reader goroutine for incoming message. Each transport has // a dedicated goroutine which reads HTTP2 frame from network. Then it @@ -690,6 +701,8 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { break } } + // update last send + t.lastSent++ if !opts.Last { return nil } @@ -830,6 +843,8 @@ func (t *http2Client) handlePing(f *http2.PingFrame) { pingAck := &ping{ack: true} copy(pingAck.data[:], f.Data[:]) t.controlBuf.put(pingAck) + // Update last sent + t.lastSent++ } func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { @@ -976,6 +991,8 @@ func (t *http2Client) reader() { // loop to keep reading incoming messages on this transport. for { frame, err := t.framer.readFrame() + // update lastRecv counter + t.lastRecv++ if err != nil { // Abort an active stream if the http2.Framer returns a // http2.StreamError. This can happen only if the server's response @@ -1052,6 +1069,16 @@ func (t *http2Client) applySettings(ss []http2.Setting) { // controller running in a separate goroutine takes charge of sending control // frames (e.g., window update, reset stream, setting, etc.) to the server. func (t *http2Client) controller() { + tRCounter := t.lastRecv + tSCounter := t.lastSent + timer := time.NewTimer(t.kParams.Ktime) + if !keepalive.Enabled { + // prevent the timer from firing, ever + if !timer.Stop() { + <-timer.C + } + } + isPingSent := false for { select { case i := <-t.controlBuf.get(): @@ -1082,6 +1109,23 @@ func (t *http2Client) controller() { case <-t.shutdownChan: return } + case <-timer.C: + if t.lastRecv > tRCounter || t.lastSent > tSCounter || (!t.kParams.KNoStream && len(t.activeStreams) < 1) { + timer.Reset(t.kParams.Ktime) + isPingSent = false + } else { + if !isPingSent { + // send ping + t.framer.writePing(true, false, [8]byte{}) + isPingSent = true + timer.Reset(t.kParams.Ktimeout) + } else { + t.Close() + continue + } + } + tRCounter = t.lastRecv + tSCounter = t.lastSent case <-t.shutdownChan: return } diff --git a/transport/transport.go b/transport/transport.go index 4726bb2c..3648ca14 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -47,6 +47,7 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/tap" ) @@ -380,6 +381,8 @@ type ConnectOptions struct { PerRPCCredentials []credentials.PerRPCCredentials // TransportCredentials stores the Authenticator required to setup a client connection. TransportCredentials credentials.TransportCredentials + // Keepalive parameters + KParams keepalive.KeepaliveParams } // TargetInfo contains the information of the target such as network address and metadata. diff --git a/transport/transport_test.go b/transport/transport_test.go index 1ca6eb1a..dd464581 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -49,6 +49,7 @@ import ( "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" "google.golang.org/grpc/codes" + "google.golang.org/grpc/keepalive" ) type server struct { @@ -251,6 +252,10 @@ func (s *server) stop() { } func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, ClientTransport) { + return setUpWithOptions(t, port, maxStreams, ht, ConnectOptions{}) +} + +func setUpWithOptions(t *testing.T, port int, maxStreams uint32, ht hType, copts ConnectOptions) (*server, ClientTransport) { server := &server{startedErr: make(chan error, 1)} go server.start(t, port, maxStreams, ht) server.wait(t, 2*time.Second) @@ -262,13 +267,140 @@ func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, Client target := TargetInfo{ Addr: addr, } - ct, connErr = NewClientTransport(context.Background(), target, ConnectOptions{}) + ct, connErr = NewClientTransport(context.Background(), target, copts) if connErr != nil { t.Fatalf("failed to create transport: %v", connErr) } return server, ct } +func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Conn) *http2Client { + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("Failed to listen: %v", err) + } + // launch a non responsive server + go func() { + defer lis.Close() + conn, err := lis.Accept() + if err != nil { + t.Errorf("Error at server-side while accepting: %v", err) + close(done) + return + } + done <- conn + }() + tr, err := newHTTP2Client(context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts) + if err != nil { + t.Fatalf("Failed to dial: %v", err) + } + cT := tr.(*http2Client) + // Assert client transport is healthy + cT.mu.Lock() + defer cT.mu.Unlock() + if cT.state != reachable { + t.Fatalf("Client transport not healthy") + } + return cT +} + +func TestKeepaliveClientClosesIdleTransport(t *testing.T) { + keepalive.Enabled = true + done := make(chan net.Conn, 1) + cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.KeepaliveParams{ + Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec + Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec + KNoStream: true, // run keepalive even with no RPCs + }}, done) + defer cT.Close() + conn, ok := <-done + if !ok { + t.Fatalf("Server didn't return connection object") + } + defer conn.Close() + // Sleep for keepalive to close the connection + time.Sleep(4 * time.Second) + // Assert that the connection was closed + cT.mu.Lock() + defer cT.mu.Unlock() + if cT.state == reachable { + t.Fatalf("Test Failed: Expected client transport to have closed.") + } +} + +func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { + keepalive.Enabled = true + done := make(chan net.Conn, 1) + cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.KeepaliveParams{ + Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec + Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec + KNoStream: false, // don't run keepalive even with no RPCs + }}, done) + defer cT.Close() + conn, ok := <-done + if !ok { + t.Fatalf("server didn't reutrn connection object") + } + defer conn.Close() + // Give keepalive some time + time.Sleep(4 * time.Second) + // Assert that connections is still healthy + cT.mu.Lock() + defer cT.mu.Unlock() + if cT.state != reachable { + t.Fatalf("Test failed: Expected client transport to be healthy.") + } +} + +func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { + keepalive.Enabled = true + done := make(chan net.Conn, 1) + cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.KeepaliveParams{ + Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec + Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec + KNoStream: false, // don't run keepalive even with no RPCs + }}, done) + defer cT.Close() + conn, ok := <-done + if !ok { + t.Fatalf("Server didn't return connection object") + } + defer conn.Close() + // create a stream + _, err := cT.NewStream(context.Background(), &CallHdr{}) + if err != nil { + t.Fatalf("Failed to create a new stream: %v", err) + } + // Give keepalive some time + time.Sleep(4 * time.Second) + // Asser that transport was closed + cT.mu.Lock() + defer cT.mu.Unlock() + if cT.state == reachable { + t.Fatalf("Test failed: Expected client transport to have closed.") + } +} + +func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) { + keepalive.Enabled = true + s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KParams: keepalive.KeepaliveParams{ + Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec + Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec + KNoStream: true, // don't run keepalive even with no RPCs + }}) + defer s.stop() + defer tr.Close() + // Give keep alive some time + time.Sleep(4 * time.Second) + // Assert that transport is healthy + cT := tr.(*http2Client) + cT.mu.Lock() + defer cT.mu.Unlock() + if cT.state != reachable { + t.Fatalf("Test failed: Expected client transport to be healthy.") + } +} + func TestClientSendAndReceive(t *testing.T) { server, ct := setUp(t, 0, math.MaxUint32, normal) callHdr := &CallHdr{ From a417b824f1dd8714e0bb05c29ca2d624e51f6b61 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Fri, 18 Nov 2016 11:09:07 -0800 Subject: [PATCH 02/21] Added synchronization --- transport/http2_client.go | 45 +++++++++++++++++++++------------------ 1 file changed, 24 insertions(+), 21 deletions(-) diff --git a/transport/http2_client.go b/transport/http2_client.go index 0e81050d..dff2a1e3 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -41,6 +41,7 @@ import ( "net" "strings" "sync" + "sync/atomic" "time" "golang.org/x/net/context" @@ -99,6 +100,11 @@ type http2Client struct { creds []credentials.PerRPCCredentials + // activity counter + activity *uint64 + // keepalive parameters + kParams keepalive.KeepaliveParams + mu sync.Mutex // guard the following variables state transportState // the state of underlying connection activeStreams map[uint32]*Stream @@ -110,15 +116,6 @@ type http2Client struct { goAwayID uint32 // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame. prevGoAwayID uint32 - - // lastRecv counts whenever a frame is recieved - lastRecv int64 - - // lastSent counts whenever a frame is sent - lastSent int64 - - // keepalive parameters - kParams keepalive.KeepaliveParams } func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) { @@ -217,6 +214,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( maxStreams: math.MaxInt32, streamSendQuota: defaultWindowSize, kParams: opts.KParams, + activity: new(uint64), } // Start the reader goroutine for incoming message. Each transport has // a dedicated goroutine which reads HTTP2 frame from network. Then it @@ -701,8 +699,8 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { break } } - // update last send - t.lastSent++ + // activity++ + atomic.AddUint64(t.activity, 1) if !opts.Last { return nil } @@ -843,8 +841,8 @@ func (t *http2Client) handlePing(f *http2.PingFrame) { pingAck := &ping{ack: true} copy(pingAck.data[:], f.Data[:]) t.controlBuf.put(pingAck) - // Update last sent - t.lastSent++ + // activity++ + atomic.AddUint64(t.activity, 1) } func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { @@ -991,8 +989,8 @@ func (t *http2Client) reader() { // loop to keep reading incoming messages on this transport. for { frame, err := t.framer.readFrame() - // update lastRecv counter - t.lastRecv++ + // activity++ + atomic.AddUint64(t.activity, 1) if err != nil { // Abort an active stream if the http2.Framer returns a // http2.StreamError. This can happen only if the server's response @@ -1069,11 +1067,11 @@ func (t *http2Client) applySettings(ss []http2.Setting) { // controller running in a separate goroutine takes charge of sending control // frames (e.g., window update, reset stream, setting, etc.) to the server. func (t *http2Client) controller() { - tRCounter := t.lastRecv - tSCounter := t.lastSent + // Activity value seen by timer + ta := atomic.LoadUint64(t.activity) timer := time.NewTimer(t.kParams.Ktime) if !keepalive.Enabled { - // prevent the timer from firing, ever + // Prevent the timer from firing, ever. if !timer.Stop() { <-timer.C } @@ -1110,7 +1108,12 @@ func (t *http2Client) controller() { return } case <-timer.C: - if t.lastRecv > tRCounter || t.lastSent > tSCounter || (!t.kParams.KNoStream && len(t.activeStreams) < 1) { + t.mu.Lock() + ns := len(t.activeStreams) + t.mu.Unlock() + // Global activity value. + ga := atomic.LoadUint64(t.activity) + if ga > ta || (!t.kParams.KNoStream && ns < 1) { timer.Reset(t.kParams.Ktime) isPingSent = false } else { @@ -1124,8 +1127,8 @@ func (t *http2Client) controller() { continue } } - tRCounter = t.lastRecv - tSCounter = t.lastSent + // Update timer activity counter. + ta = ga case <-t.shutdownChan: return } From 93111e4c2724e9352c2b69ad5121150d9f0f2f25 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Fri, 18 Nov 2016 11:54:05 -0800 Subject: [PATCH 03/21] sending ping the right way --- transport/http2_client.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/transport/http2_client.go b/transport/http2_client.go index dff2a1e3..23bb91bd 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -1077,6 +1077,7 @@ func (t *http2Client) controller() { } } isPingSent := false + kPing := &ping{data: [8]byte{}} for { select { case i := <-t.controlBuf.get(): @@ -1119,7 +1120,7 @@ func (t *http2Client) controller() { } else { if !isPingSent { // send ping - t.framer.writePing(true, false, [8]byte{}) + t.controlBuf.put(kPing) isPingSent = true timer.Reset(t.kParams.Ktimeout) } else { From 058907d0334a5295248ebd54560821dd7ee6bc08 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Fri, 18 Nov 2016 13:55:04 -0800 Subject: [PATCH 04/21] refactored and rectified test failures --- clientconn.go | 5 +---- keepalive/keepalive.go | 13 +++++++++---- transport/http2_client.go | 22 ++++++++++++++-------- transport/transport.go | 2 +- transport/transport_test.go | 16 ++++++++++++---- 5 files changed, 37 insertions(+), 21 deletions(-) diff --git a/clientconn.go b/clientconn.go index 99f2c37e..b45b699f 100644 --- a/clientconn.go +++ b/clientconn.go @@ -232,7 +232,7 @@ func WithUserAgent(s string) DialOption { } // WithKeepaliveParams returns a DialOption that specifies a user agent string for all the RPCs. -func WithKeepaliveParams(k keepalive.KeepaliveParams) DialOption { +func WithKeepaliveParams(k keepalive.Params) DialOption { return func(o *dialOptions) { o.copts.KParams = k } @@ -285,9 +285,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * } // Set defaults. - if cc.dopts.copts.KParams == (keepalive.KeepaliveParams{}) { - cc.dopts.copts.KParams = keepalive.DefaultKParams - } if cc.dopts.codec == nil { cc.dopts.codec = protoCodec{} } diff --git a/keepalive/keepalive.go b/keepalive/keepalive.go index 284b6eac..686315ee 100644 --- a/keepalive/keepalive.go +++ b/keepalive/keepalive.go @@ -1,10 +1,12 @@ package keepalive import ( + "math" + "sync" "time" ) -type KeepaliveParams struct { +type Params struct { // After a duration of this time the client pings the server to see if the transport is still alive. Ktime time.Duration // After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport. @@ -13,10 +15,13 @@ type KeepaliveParams struct { KNoStream bool } -var DefaultKParams KeepaliveParams = KeepaliveParams{ - Ktime: time.Duration(290 * 365 * 24 * 60 * 60 * 1000 * 1000 * 1000), // default to infinite - Ktimeout: time.Duration(20 * 1000 * 1000 * 1000), // default to 20 seconds +var DefaultKParams Params = Params{ + Ktime: time.Duration(math.MaxInt64), // default to infinite + Ktimeout: time.Duration(20 * 1000 * 1000 * 1000), // default to 20 seconds KNoStream: false, } +// Mutex to protect Enabled variable +var Mu sync.Mutex = sync.Mutex{} + var Enabled = false diff --git a/transport/http2_client.go b/transport/http2_client.go index 23bb91bd..e691e4be 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -103,7 +103,7 @@ type http2Client struct { // activity counter activity *uint64 // keepalive parameters - kParams keepalive.KeepaliveParams + keepaliveParams keepalive.Params mu sync.Mutex // guard the following variables state transportState // the state of underlying connection @@ -186,6 +186,10 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( if opts.UserAgent != "" { ua = opts.UserAgent + " " + ua } + kp := keepalive.DefaultKParams + if opts.KParams != (keepalive.Params{}) { + kp = opts.KParams + } var buf bytes.Buffer t := &http2Client{ target: addr.Addr, @@ -213,7 +217,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( creds: opts.PerRPCCredentials, maxStreams: math.MaxInt32, streamSendQuota: defaultWindowSize, - kParams: opts.KParams, + keepaliveParams: kp, activity: new(uint64), } // Start the reader goroutine for incoming message. Each transport has @@ -1069,15 +1073,17 @@ func (t *http2Client) applySettings(ss []http2.Setting) { func (t *http2Client) controller() { // Activity value seen by timer ta := atomic.LoadUint64(t.activity) - timer := time.NewTimer(t.kParams.Ktime) + timer := time.NewTimer(t.keepaliveParams.Ktime) + keepalive.Mu.Lock() if !keepalive.Enabled { // Prevent the timer from firing, ever. if !timer.Stop() { <-timer.C } } + keepalive.Mu.Unlock() isPingSent := false - kPing := &ping{data: [8]byte{}} + keepalivePing := &ping{data: [8]byte{}} for { select { case i := <-t.controlBuf.get(): @@ -1114,15 +1120,15 @@ func (t *http2Client) controller() { t.mu.Unlock() // Global activity value. ga := atomic.LoadUint64(t.activity) - if ga > ta || (!t.kParams.KNoStream && ns < 1) { - timer.Reset(t.kParams.Ktime) + if ga > ta || (!t.keepaliveParams.KNoStream && ns < 1) { + timer.Reset(t.keepaliveParams.Ktime) isPingSent = false } else { if !isPingSent { // send ping - t.controlBuf.put(kPing) + t.controlBuf.put(keepalivePing) isPingSent = true - timer.Reset(t.kParams.Ktimeout) + timer.Reset(t.keepaliveParams.Ktimeout) } else { t.Close() continue diff --git a/transport/transport.go b/transport/transport.go index 3648ca14..20dc914e 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -382,7 +382,7 @@ type ConnectOptions struct { // TransportCredentials stores the Authenticator required to setup a client connection. TransportCredentials credentials.TransportCredentials // Keepalive parameters - KParams keepalive.KeepaliveParams + KParams keepalive.Params } // TargetInfo contains the information of the target such as network address and metadata. diff --git a/transport/transport_test.go b/transport/transport_test.go index dd464581..6e8be33b 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -305,9 +305,11 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con } func TestKeepaliveClientClosesIdleTransport(t *testing.T) { + keepalive.Mu.Lock() keepalive.Enabled = true + keepalive.Mu.Unlock() done := make(chan net.Conn, 1) - cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.KeepaliveParams{ + cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.Params{ Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec KNoStream: true, // run keepalive even with no RPCs @@ -329,9 +331,11 @@ func TestKeepaliveClientClosesIdleTransport(t *testing.T) { } func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { + keepalive.Mu.Lock() keepalive.Enabled = true + keepalive.Mu.Unlock() done := make(chan net.Conn, 1) - cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.KeepaliveParams{ + cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.Params{ Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec KNoStream: false, // don't run keepalive even with no RPCs @@ -353,9 +357,11 @@ func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { } func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { + keepalive.Mu.Lock() keepalive.Enabled = true + keepalive.Mu.Unlock() done := make(chan net.Conn, 1) - cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.KeepaliveParams{ + cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.Params{ Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec KNoStream: false, // don't run keepalive even with no RPCs @@ -382,8 +388,10 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { } func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) { + keepalive.Mu.Lock() keepalive.Enabled = true - s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KParams: keepalive.KeepaliveParams{ + keepalive.Mu.Unlock() + s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KParams: keepalive.Params{ Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec KNoStream: true, // don't run keepalive even with no RPCs From 45907c27fa8cd85d95cedac78e05d4ae2ab07ea2 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Fri, 18 Nov 2016 14:23:08 -0800 Subject: [PATCH 05/21] go tool vet rectification --- keepalive/keepalive.go | 9 ++++++--- 1 file changed, 6 insertions(+), 3 deletions(-) diff --git a/keepalive/keepalive.go b/keepalive/keepalive.go index 686315ee..4f588ecc 100644 --- a/keepalive/keepalive.go +++ b/keepalive/keepalive.go @@ -6,6 +6,7 @@ import ( "time" ) +// Params is used to set keepalive parameters. type Params struct { // After a duration of this time the client pings the server to see if the transport is still alive. Ktime time.Duration @@ -15,13 +16,15 @@ type Params struct { KNoStream bool } -var DefaultKParams Params = Params{ +// DefaultKParams contains default values for keepalive parameters +var DefaultKParams = Params{ Ktime: time.Duration(math.MaxInt64), // default to infinite Ktimeout: time.Duration(20 * 1000 * 1000 * 1000), // default to 20 seconds KNoStream: false, } -// Mutex to protect Enabled variable -var Mu sync.Mutex = sync.Mutex{} +// MU is a mutex to protect Enabled variable +var Mu = sync.Mutex{} +// Enabled is a knob used to turn keepalive on or off var Enabled = false From 97fb58a27f82b3f52c2104335bb3ba2dfef0d708 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Fri, 18 Nov 2016 14:29:23 -0800 Subject: [PATCH 06/21] go tool vet again --- keepalive/keepalive.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/keepalive/keepalive.go b/keepalive/keepalive.go index 4f588ecc..741b9b1a 100644 --- a/keepalive/keepalive.go +++ b/keepalive/keepalive.go @@ -23,7 +23,7 @@ var DefaultKParams = Params{ KNoStream: false, } -// MU is a mutex to protect Enabled variable +// Mu is a mutex to protect Enabled variable var Mu = sync.Mutex{} // Enabled is a knob used to turn keepalive on or off From 901cdf6fb57045870a4c074c17b6c3948d67c5b3 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Mon, 21 Nov 2016 14:18:57 -0800 Subject: [PATCH 07/21] Post review updates --- keepalive/keepalive.go | 40 ++++++++++++++++++++++++--------- transport/http2_client.go | 25 ++++++++++----------- transport/transport_test.go | 44 +++++++++++++++++-------------------- 3 files changed, 61 insertions(+), 48 deletions(-) diff --git a/keepalive/keepalive.go b/keepalive/keepalive.go index 741b9b1a..39aa1c78 100644 --- a/keepalive/keepalive.go +++ b/keepalive/keepalive.go @@ -9,22 +9,42 @@ import ( // Params is used to set keepalive parameters. type Params struct { // After a duration of this time the client pings the server to see if the transport is still alive. - Ktime time.Duration + Time time.Duration // After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport. - Ktimeout time.Duration + Timeout time.Duration //If true, client runs keepalive checks even with no active RPCs. - KNoStream bool + PermitNoStream bool } // DefaultKParams contains default values for keepalive parameters var DefaultKParams = Params{ - Ktime: time.Duration(math.MaxInt64), // default to infinite - Ktimeout: time.Duration(20 * 1000 * 1000 * 1000), // default to 20 seconds - KNoStream: false, + Time: time.Duration(math.MaxInt64), // default to infinite + Timeout: time.Duration(20 * time.Second), } -// Mu is a mutex to protect Enabled variable -var Mu = sync.Mutex{} +// mu is a mutex to protect Enabled variable +var mu = sync.Mutex{} -// Enabled is a knob used to turn keepalive on or off -var Enabled = false +// enable is a knob used to turn keepalive on or off +var enable = false + +// Enabled exposes the value of enable variable +func Enabled() bool { + mu.Lock() + defer mu.Unlock() + return enable +} + +// Enable can be called to enable keepalives +func Enable() { + mu.Lock() + defer mu.Unlock() + enable = true +} + +// Disable can be called to disable keepalive +func Disable() { + mu.Lock() + defer mu.Unlock() + enable = false +} diff --git a/transport/http2_client.go b/transport/http2_client.go index e691e4be..cc516b0e 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -101,7 +101,7 @@ type http2Client struct { creds []credentials.PerRPCCredentials // activity counter - activity *uint64 + activity uint64 // accessed atomically // keepalive parameters keepaliveParams keepalive.Params @@ -218,7 +218,6 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( maxStreams: math.MaxInt32, streamSendQuota: defaultWindowSize, keepaliveParams: kp, - activity: new(uint64), } // Start the reader goroutine for incoming message. Each transport has // a dedicated goroutine which reads HTTP2 frame from network. Then it @@ -704,7 +703,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { } } // activity++ - atomic.AddUint64(t.activity, 1) + atomic.AddUint64(&t.activity, 1) if !opts.Last { return nil } @@ -846,7 +845,7 @@ func (t *http2Client) handlePing(f *http2.PingFrame) { copy(pingAck.data[:], f.Data[:]) t.controlBuf.put(pingAck) // activity++ - atomic.AddUint64(t.activity, 1) + atomic.AddUint64(&t.activity, 1) } func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { @@ -994,7 +993,7 @@ func (t *http2Client) reader() { for { frame, err := t.framer.readFrame() // activity++ - atomic.AddUint64(t.activity, 1) + atomic.AddUint64(&t.activity, 1) if err != nil { // Abort an active stream if the http2.Framer returns a // http2.StreamError. This can happen only if the server's response @@ -1072,16 +1071,14 @@ func (t *http2Client) applySettings(ss []http2.Setting) { // frames (e.g., window update, reset stream, setting, etc.) to the server. func (t *http2Client) controller() { // Activity value seen by timer - ta := atomic.LoadUint64(t.activity) - timer := time.NewTimer(t.keepaliveParams.Ktime) - keepalive.Mu.Lock() - if !keepalive.Enabled { + ta := atomic.LoadUint64(&t.activity) + timer := time.NewTimer(t.keepaliveParams.Time) + if !keepalive.Enabled() { // Prevent the timer from firing, ever. if !timer.Stop() { <-timer.C } } - keepalive.Mu.Unlock() isPingSent := false keepalivePing := &ping{data: [8]byte{}} for { @@ -1119,16 +1116,16 @@ func (t *http2Client) controller() { ns := len(t.activeStreams) t.mu.Unlock() // Global activity value. - ga := atomic.LoadUint64(t.activity) - if ga > ta || (!t.keepaliveParams.KNoStream && ns < 1) { - timer.Reset(t.keepaliveParams.Ktime) + ga := atomic.LoadUint64(&t.activity) + if ga > ta || (!t.keepaliveParams.PermitNoStream && ns < 1) { + timer.Reset(t.keepaliveParams.Time) isPingSent = false } else { if !isPingSent { // send ping t.controlBuf.put(keepalivePing) isPingSent = true - timer.Reset(t.keepaliveParams.Ktimeout) + timer.Reset(t.keepaliveParams.Timeout) } else { t.Close() continue diff --git a/transport/transport_test.go b/transport/transport_test.go index 6e8be33b..2f677964 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -305,14 +305,13 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con } func TestKeepaliveClientClosesIdleTransport(t *testing.T) { - keepalive.Mu.Lock() - keepalive.Enabled = true - keepalive.Mu.Unlock() + keepalive.Enable() + defer keepalive.Disable() done := make(chan net.Conn, 1) cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.Params{ - Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec - Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec - KNoStream: true, // run keepalive even with no RPCs + Time: 2 * time.Second, // keepalive time = 2 sec + Timeout: 1 * time.Second, // keepalive timeout = 1 sec + PermitNoStream: true, // run keepalive even with no RPCs }}, done) defer cT.Close() conn, ok := <-done @@ -331,14 +330,13 @@ func TestKeepaliveClientClosesIdleTransport(t *testing.T) { } func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { - keepalive.Mu.Lock() - keepalive.Enabled = true - keepalive.Mu.Unlock() + keepalive.Enable() + defer keepalive.Disable() done := make(chan net.Conn, 1) cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.Params{ - Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec - Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec - KNoStream: false, // don't run keepalive even with no RPCs + Time: 2 * time.Second, // keepalive time = 2 sec + Timeout: 1 * time.Second, // keepalive timeout = 1 sec + PermitNoStream: false, // don't run keepalive even with no RPCs }}, done) defer cT.Close() conn, ok := <-done @@ -357,14 +355,13 @@ func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { } func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { - keepalive.Mu.Lock() - keepalive.Enabled = true - keepalive.Mu.Unlock() + keepalive.Enable() + defer keepalive.Disable() done := make(chan net.Conn, 1) cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.Params{ - Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec - Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec - KNoStream: false, // don't run keepalive even with no RPCs + Time: 2 * time.Second, // keepalive time = 2 sec + Timeout: 1 * time.Second, // keepalive timeout = 1 sec + PermitNoStream: false, // don't run keepalive even with no RPCs }}, done) defer cT.Close() conn, ok := <-done @@ -388,13 +385,12 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { } func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) { - keepalive.Mu.Lock() - keepalive.Enabled = true - keepalive.Mu.Unlock() + keepalive.Enable() + defer keepalive.Disable() s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KParams: keepalive.Params{ - Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec - Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec - KNoStream: true, // don't run keepalive even with no RPCs + Time: 2 * time.Second, // keepalive time = 2 sec + Timeout: 1 * time.Second, // keepalive timeout = 1 sec + PermitNoStream: true, // don't run keepalive even with no RPCs }}) defer s.stop() defer tr.Close() From eeb6f5bade002e3b8195315f2d915e7bd7faada3 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Fri, 6 Jan 2017 16:52:37 -0800 Subject: [PATCH 08/21] code cleaning --- clientconn.go | 2 +- keepalive/keepalive.go | 6 +-- transport/http2_client.go | 28 +++++-------- transport/transport.go | 2 +- transport/transport_test.go | 82 ++++++++++++++++++------------------- 5 files changed, 55 insertions(+), 65 deletions(-) diff --git a/clientconn.go b/clientconn.go index b45b699f..8b028053 100644 --- a/clientconn.go +++ b/clientconn.go @@ -234,7 +234,7 @@ func WithUserAgent(s string) DialOption { // WithKeepaliveParams returns a DialOption that specifies a user agent string for all the RPCs. func WithKeepaliveParams(k keepalive.Params) DialOption { return func(o *dialOptions) { - o.copts.KParams = k + o.copts.KeepaliveParams = k } } diff --git a/keepalive/keepalive.go b/keepalive/keepalive.go index 39aa1c78..a1005a94 100644 --- a/keepalive/keepalive.go +++ b/keepalive/keepalive.go @@ -13,11 +13,11 @@ type Params struct { // After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport. Timeout time.Duration //If true, client runs keepalive checks even with no active RPCs. - PermitNoStream bool + PermitWithoutStream bool } -// DefaultKParams contains default values for keepalive parameters -var DefaultKParams = Params{ +// DefaultParams contains default values for keepalive parameters +var DefaultParams = Params{ Time: time.Duration(math.MaxInt64), // default to infinite Timeout: time.Duration(20 * time.Second), } diff --git a/transport/http2_client.go b/transport/http2_client.go index cc516b0e..04d474ac 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -103,7 +103,7 @@ type http2Client struct { // activity counter activity uint64 // accessed atomically // keepalive parameters - keepaliveParams keepalive.Params + kp keepalive.Params mu sync.Mutex // guard the following variables state transportState // the state of underlying connection @@ -186,9 +186,9 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( if opts.UserAgent != "" { ua = opts.UserAgent + " " + ua } - kp := keepalive.DefaultKParams - if opts.KParams != (keepalive.Params{}) { - kp = opts.KParams + kp := keepalive.DefaultParams + if opts.KeepaliveParams != (keepalive.Params{}) { + kp = opts.KeepaliveParams } var buf bytes.Buffer t := &http2Client{ @@ -217,7 +217,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( creds: opts.PerRPCCredentials, maxStreams: math.MaxInt32, streamSendQuota: defaultWindowSize, - keepaliveParams: kp, + kp: kp, } // Start the reader goroutine for incoming message. Each transport has // a dedicated goroutine which reads HTTP2 frame from network. Then it @@ -844,8 +844,6 @@ func (t *http2Client) handlePing(f *http2.PingFrame) { pingAck := &ping{ack: true} copy(pingAck.data[:], f.Data[:]) t.controlBuf.put(pingAck) - // activity++ - atomic.AddUint64(&t.activity, 1) } func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { @@ -1070,9 +1068,7 @@ func (t *http2Client) applySettings(ss []http2.Setting) { // controller running in a separate goroutine takes charge of sending control // frames (e.g., window update, reset stream, setting, etc.) to the server. func (t *http2Client) controller() { - // Activity value seen by timer - ta := atomic.LoadUint64(&t.activity) - timer := time.NewTimer(t.keepaliveParams.Time) + timer := time.NewTimer(t.kp.Time) if !keepalive.Enabled() { // Prevent the timer from firing, ever. if !timer.Stop() { @@ -1115,24 +1111,22 @@ func (t *http2Client) controller() { t.mu.Lock() ns := len(t.activeStreams) t.mu.Unlock() - // Global activity value. - ga := atomic.LoadUint64(&t.activity) - if ga > ta || (!t.keepaliveParams.PermitNoStream && ns < 1) { - timer.Reset(t.keepaliveParams.Time) + // Get the activity counter value and reset it. + a := atomic.SwapUint64(&t.activity, 0) + if a > 0 || (!t.kp.PermitWithoutStream && ns < 1) { + timer.Reset(t.kp.Time) isPingSent = false } else { if !isPingSent { // send ping t.controlBuf.put(keepalivePing) isPingSent = true - timer.Reset(t.keepaliveParams.Timeout) + timer.Reset(t.kp.Timeout) } else { t.Close() continue } } - // Update timer activity counter. - ta = ga case <-t.shutdownChan: return } diff --git a/transport/transport.go b/transport/transport.go index 20dc914e..446b6aef 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -382,7 +382,7 @@ type ConnectOptions struct { // TransportCredentials stores the Authenticator required to setup a client connection. TransportCredentials credentials.TransportCredentials // Keepalive parameters - KParams keepalive.Params + KeepaliveParams keepalive.Params } // TargetInfo contains the information of the target such as network address and metadata. diff --git a/transport/transport_test.go b/transport/transport_test.go index 2f677964..1b80625a 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -274,7 +274,7 @@ func setUpWithOptions(t *testing.T, port int, maxStreams uint32, ht hType, copts return server, ct } -func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Conn) *http2Client { +func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Conn) ClientTransport { lis, err := net.Listen("tcp", "localhost:0") if err != nil { t.Fatalf("Failed to listen: %v", err) @@ -290,30 +290,23 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con } done <- conn }() - tr, err := newHTTP2Client(context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts) + tr, err := NewClientTransport(context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts) if err != nil { t.Fatalf("Failed to dial: %v", err) } - cT := tr.(*http2Client) - // Assert client transport is healthy - cT.mu.Lock() - defer cT.mu.Unlock() - if cT.state != reachable { - t.Fatalf("Client transport not healthy") - } - return cT + return tr } func TestKeepaliveClientClosesIdleTransport(t *testing.T) { keepalive.Enable() defer keepalive.Disable() done := make(chan net.Conn, 1) - cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.Params{ - Time: 2 * time.Second, // keepalive time = 2 sec - Timeout: 1 * time.Second, // keepalive timeout = 1 sec - PermitNoStream: true, // run keepalive even with no RPCs + tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.Params{ + Time: 2 * time.Second, // keepalive time = 2 sec + Timeout: 1 * time.Second, // keepalive timeout = 1 sec + PermitWithoutStream: true, // run keepalive even with no RPCs }}, done) - defer cT.Close() + defer tr.Close() conn, ok := <-done if !ok { t.Fatalf("Server didn't return connection object") @@ -322,9 +315,10 @@ func TestKeepaliveClientClosesIdleTransport(t *testing.T) { // Sleep for keepalive to close the connection time.Sleep(4 * time.Second) // Assert that the connection was closed - cT.mu.Lock() - defer cT.mu.Unlock() - if cT.state == reachable { + ct := tr.(*http2Client) + ct.mu.Lock() + defer ct.mu.Unlock() + if ct.state == reachable { t.Fatalf("Test Failed: Expected client transport to have closed.") } } @@ -333,12 +327,12 @@ func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { keepalive.Enable() defer keepalive.Disable() done := make(chan net.Conn, 1) - cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.Params{ - Time: 2 * time.Second, // keepalive time = 2 sec - Timeout: 1 * time.Second, // keepalive timeout = 1 sec - PermitNoStream: false, // don't run keepalive even with no RPCs + tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.Params{ + Time: 2 * time.Second, // keepalive time = 2 sec + Timeout: 1 * time.Second, // keepalive timeout = 1 sec + PermitWithoutStream: false, // don't run keepalive even with no RPCs }}, done) - defer cT.Close() + defer tr.Close() conn, ok := <-done if !ok { t.Fatalf("server didn't reutrn connection object") @@ -347,9 +341,10 @@ func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { // Give keepalive some time time.Sleep(4 * time.Second) // Assert that connections is still healthy - cT.mu.Lock() - defer cT.mu.Unlock() - if cT.state != reachable { + ct := tr.(*http2Client) + ct.mu.Lock() + defer ct.mu.Unlock() + if ct.state != reachable { t.Fatalf("Test failed: Expected client transport to be healthy.") } } @@ -358,28 +353,29 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { keepalive.Enable() defer keepalive.Disable() done := make(chan net.Conn, 1) - cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.Params{ - Time: 2 * time.Second, // keepalive time = 2 sec - Timeout: 1 * time.Second, // keepalive timeout = 1 sec - PermitNoStream: false, // don't run keepalive even with no RPCs + tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.Params{ + Time: 2 * time.Second, // keepalive time = 2 sec + Timeout: 1 * time.Second, // keepalive timeout = 1 sec + PermitWithoutStream: false, // don't run keepalive even with no RPCs }}, done) - defer cT.Close() + defer tr.Close() conn, ok := <-done if !ok { t.Fatalf("Server didn't return connection object") } defer conn.Close() // create a stream - _, err := cT.NewStream(context.Background(), &CallHdr{}) + _, err := tr.NewStream(context.Background(), &CallHdr{}) if err != nil { t.Fatalf("Failed to create a new stream: %v", err) } // Give keepalive some time time.Sleep(4 * time.Second) // Asser that transport was closed - cT.mu.Lock() - defer cT.mu.Unlock() - if cT.state == reachable { + ct := tr.(*http2Client) + ct.mu.Lock() + defer ct.mu.Unlock() + if ct.state == reachable { t.Fatalf("Test failed: Expected client transport to have closed.") } } @@ -387,20 +383,20 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) { keepalive.Enable() defer keepalive.Disable() - s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KParams: keepalive.Params{ - Time: 2 * time.Second, // keepalive time = 2 sec - Timeout: 1 * time.Second, // keepalive timeout = 1 sec - PermitNoStream: true, // don't run keepalive even with no RPCs + s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: keepalive.Params{ + Time: 2 * time.Second, // keepalive time = 2 sec + Timeout: 1 * time.Second, // keepalive timeout = 1 sec + PermitWithoutStream: true, // don't run keepalive even with no RPCs }}) defer s.stop() defer tr.Close() // Give keep alive some time time.Sleep(4 * time.Second) // Assert that transport is healthy - cT := tr.(*http2Client) - cT.mu.Lock() - defer cT.mu.Unlock() - if cT.state != reachable { + ct := tr.(*http2Client) + ct.mu.Lock() + defer ct.mu.Unlock() + if ct.state != reachable { t.Fatalf("Test failed: Expected client transport to be healthy.") } } From 49d2a88c2778012cbb3ccd4dc3411adebe6ef5f8 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Fri, 6 Jan 2017 17:18:22 -0800 Subject: [PATCH 09/21] Comment formatting --- clientconn.go | 2 +- keepalive/keepalive.go | 14 ++++++------ transport/http2_client.go | 10 ++++----- transport/transport.go | 2 +- transport/transport_test.go | 44 ++++++++++++++++++------------------- 5 files changed, 35 insertions(+), 37 deletions(-) diff --git a/clientconn.go b/clientconn.go index 8b028053..828dcf83 100644 --- a/clientconn.go +++ b/clientconn.go @@ -231,7 +231,7 @@ func WithUserAgent(s string) DialOption { } } -// WithKeepaliveParams returns a DialOption that specifies a user agent string for all the RPCs. +// WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport. func WithKeepaliveParams(k keepalive.Params) DialOption { return func(o *dialOptions) { o.copts.KeepaliveParams = k diff --git a/keepalive/keepalive.go b/keepalive/keepalive.go index a1005a94..1546c6b1 100644 --- a/keepalive/keepalive.go +++ b/keepalive/keepalive.go @@ -16,33 +16,33 @@ type Params struct { PermitWithoutStream bool } -// DefaultParams contains default values for keepalive parameters +// DefaultParams contains default values for keepalive parameters. var DefaultParams = Params{ - Time: time.Duration(math.MaxInt64), // default to infinite + Time: time.Duration(math.MaxInt64), // default to infinite. Timeout: time.Duration(20 * time.Second), } -// mu is a mutex to protect Enabled variable +// mu is a mutex to protect Enabled variable. var mu = sync.Mutex{} -// enable is a knob used to turn keepalive on or off +// enable is a knob used to turn keepalive on or off. var enable = false -// Enabled exposes the value of enable variable +// Enabled exposes the value of enable variable. func Enabled() bool { mu.Lock() defer mu.Unlock() return enable } -// Enable can be called to enable keepalives +// Enable can be called to enable keepalives. func Enable() { mu.Lock() defer mu.Unlock() enable = true } -// Disable can be called to disable keepalive +// Disable can be called to disable keepalive. func Disable() { mu.Lock() defer mu.Unlock() diff --git a/transport/http2_client.go b/transport/http2_client.go index 04d474ac..8a8d7d99 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -100,9 +100,9 @@ type http2Client struct { creds []credentials.PerRPCCredentials - // activity counter - activity uint64 // accessed atomically - // keepalive parameters + // Counter to keep track of activity(reading and writing on transport). + activity uint64 // accessed atomically. + // keepalive parameters. kp keepalive.Params mu sync.Mutex // guard the following variables @@ -702,7 +702,6 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { break } } - // activity++ atomic.AddUint64(&t.activity, 1) if !opts.Last { return nil @@ -990,7 +989,6 @@ func (t *http2Client) reader() { // loop to keep reading incoming messages on this transport. for { frame, err := t.framer.readFrame() - // activity++ atomic.AddUint64(&t.activity, 1) if err != nil { // Abort an active stream if the http2.Framer returns a @@ -1118,7 +1116,7 @@ func (t *http2Client) controller() { isPingSent = false } else { if !isPingSent { - // send ping + // Send ping. t.controlBuf.put(keepalivePing) isPingSent = true timer.Reset(t.kp.Timeout) diff --git a/transport/transport.go b/transport/transport.go index 446b6aef..fd3a5818 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -381,7 +381,7 @@ type ConnectOptions struct { PerRPCCredentials []credentials.PerRPCCredentials // TransportCredentials stores the Authenticator required to setup a client connection. TransportCredentials credentials.TransportCredentials - // Keepalive parameters + // KeepaliveParams stores the keepalive parameters. KeepaliveParams keepalive.Params } diff --git a/transport/transport_test.go b/transport/transport_test.go index 1b80625a..fe3907ed 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -279,7 +279,7 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con if err != nil { t.Fatalf("Failed to listen: %v", err) } - // launch a non responsive server + // Launch a non responsive server. go func() { defer lis.Close() conn, err := lis.Accept() @@ -302,9 +302,9 @@ func TestKeepaliveClientClosesIdleTransport(t *testing.T) { defer keepalive.Disable() done := make(chan net.Conn, 1) tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.Params{ - Time: 2 * time.Second, // keepalive time = 2 sec - Timeout: 1 * time.Second, // keepalive timeout = 1 sec - PermitWithoutStream: true, // run keepalive even with no RPCs + Time: 2 * time.Second, // Keepalive time = 2 sec. + Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. + PermitWithoutStream: true, // Run keepalive even with no RPCs. }}, done) defer tr.Close() conn, ok := <-done @@ -312,9 +312,9 @@ func TestKeepaliveClientClosesIdleTransport(t *testing.T) { t.Fatalf("Server didn't return connection object") } defer conn.Close() - // Sleep for keepalive to close the connection + // Sleep for keepalive to close the connection. time.Sleep(4 * time.Second) - // Assert that the connection was closed + // Assert that the connection was closed. ct := tr.(*http2Client) ct.mu.Lock() defer ct.mu.Unlock() @@ -328,9 +328,9 @@ func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { defer keepalive.Disable() done := make(chan net.Conn, 1) tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.Params{ - Time: 2 * time.Second, // keepalive time = 2 sec - Timeout: 1 * time.Second, // keepalive timeout = 1 sec - PermitWithoutStream: false, // don't run keepalive even with no RPCs + Time: 2 * time.Second, // Keepalive time = 2 sec. + Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. + PermitWithoutStream: false, // Don't run keepalive even with no RPCs. }}, done) defer tr.Close() conn, ok := <-done @@ -338,9 +338,9 @@ func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { t.Fatalf("server didn't reutrn connection object") } defer conn.Close() - // Give keepalive some time + // Give keepalive some time. time.Sleep(4 * time.Second) - // Assert that connections is still healthy + // Assert that connections is still healthy. ct := tr.(*http2Client) ct.mu.Lock() defer ct.mu.Unlock() @@ -354,9 +354,9 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { defer keepalive.Disable() done := make(chan net.Conn, 1) tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.Params{ - Time: 2 * time.Second, // keepalive time = 2 sec - Timeout: 1 * time.Second, // keepalive timeout = 1 sec - PermitWithoutStream: false, // don't run keepalive even with no RPCs + Time: 2 * time.Second, // Keepalive time = 2 sec. + Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. + PermitWithoutStream: false, // Don't run keepalive even with no RPCs. }}, done) defer tr.Close() conn, ok := <-done @@ -364,14 +364,14 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { t.Fatalf("Server didn't return connection object") } defer conn.Close() - // create a stream + // Create a stream. _, err := tr.NewStream(context.Background(), &CallHdr{}) if err != nil { t.Fatalf("Failed to create a new stream: %v", err) } - // Give keepalive some time + // Give keepalive some time. time.Sleep(4 * time.Second) - // Asser that transport was closed + // Assert that transport was closed. ct := tr.(*http2Client) ct.mu.Lock() defer ct.mu.Unlock() @@ -384,15 +384,15 @@ func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) { keepalive.Enable() defer keepalive.Disable() s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: keepalive.Params{ - Time: 2 * time.Second, // keepalive time = 2 sec - Timeout: 1 * time.Second, // keepalive timeout = 1 sec - PermitWithoutStream: true, // don't run keepalive even with no RPCs + Time: 2 * time.Second, // Keepalive time = 2 sec. + Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. + PermitWithoutStream: true, // Don't run keepalive even with no RPCs. }}) defer s.stop() defer tr.Close() - // Give keep alive some time + // Give keep alive some time. time.Sleep(4 * time.Second) - // Assert that transport is healthy + // Assert that transport is healthy. ct := tr.(*http2Client) ct.mu.Lock() defer ct.mu.Unlock() From b2448f6c9bcc4050829d427ad449f68756ccf3e3 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Fri, 27 Jan 2017 11:15:56 -0800 Subject: [PATCH 10/21] intermediate change --- transport/http2_client.go | 53 +++++++++++++++++++++------------------ 1 file changed, 28 insertions(+), 25 deletions(-) diff --git a/transport/http2_client.go b/transport/http2_client.go index 8a8d7d99..6a40dff0 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -1075,36 +1075,39 @@ func (t *http2Client) controller() { } isPingSent := false keepalivePing := &ping{data: [8]byte{}} + cchan := t.controlBuf.get() + wchan := nil for { select { - case i := <-t.controlBuf.get(): + case i := <-cchan: t.controlBuf.load() - select { - case <-t.writableChan: - switch i := i.(type) { - case *windowUpdate: - t.framer.writeWindowUpdate(true, i.streamID, i.increment) - case *settings: - if i.ack { - t.framer.writeSettingsAck(true) - t.applySettings(i.ss) - } else { - t.framer.writeSettings(true, i.ss...) - } - case *resetStream: - t.framer.writeRSTStream(true, i.streamID, i.code) - case *flushIO: - t.framer.flushWrite() - case *ping: - t.framer.writePing(true, i.ack, i.data) - default: - grpclog.Printf("transport: http2Client.controller got unexpected item type %v\n", i) + wchan = t.writableChan + cchan = nil + continue + case <-wchan: + switch i := i.(type) { + case *windowUpdate: + t.framer.writeWindowUpdate(true, i.streamID, i.increment) + case *settings: + if i.ack { + t.framer.writeSettingsAck(true) + t.applySettings(i.ss) + } else { + t.framer.writeSettings(true, i.ss...) } - t.writableChan <- 0 - continue - case <-t.shutdownChan: - return + case *resetStream: + t.framer.writeRSTStream(true, i.streamID, i.code) + case *flushIO: + t.framer.flushWrite() + case *ping: + t.framer.writePing(true, i.ack, i.data) + default: + grpclog.Printf("transport: http2Client.controller got unexpected item type %v\n", i) } + wchan <- 0 + wchan = nil + cchan = t.controlBuf.get() + continue case <-timer.C: t.mu.Lock() ns := len(t.activeStreams) From 5fcb58f02dd79f34be50624f33324f8f5dc57b93 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Mon, 30 Jan 2017 11:32:54 -0800 Subject: [PATCH 11/21] post review updates --- clientconn.go | 2 +- keepalive/keepalive.go | 46 +++++++-------------- transport/control.go | 5 +++ transport/http2_client.go | 80 +++++++++++++++++++++++++++---------- transport/transport.go | 8 +++- transport/transport_test.go | 18 +++------ 6 files changed, 91 insertions(+), 68 deletions(-) diff --git a/clientconn.go b/clientconn.go index 05a1abc9..8e3b1bb8 100644 --- a/clientconn.go +++ b/clientconn.go @@ -251,7 +251,7 @@ func WithUserAgent(s string) DialOption { } // WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport. -func WithKeepaliveParams(k keepalive.Params) DialOption { +func WithKeepaliveParams(k *keepalive.Params) DialOption { return func(o *dialOptions) { o.copts.KeepaliveParams = k } diff --git a/keepalive/keepalive.go b/keepalive/keepalive.go index 1546c6b1..4a18d52c 100644 --- a/keepalive/keepalive.go +++ b/keepalive/keepalive.go @@ -2,7 +2,6 @@ package keepalive import ( "math" - "sync" "time" ) @@ -16,35 +15,20 @@ type Params struct { PermitWithoutStream bool } -// DefaultParams contains default values for keepalive parameters. -var DefaultParams = Params{ - Time: time.Duration(math.MaxInt64), // default to infinite. - Timeout: time.Duration(20 * time.Second), +// Validate is used to validate the keepalive parameters. +// Time durations initialized to 0 will be replaced with default Values. +func (p *Params) Validate() { + if p.Time == 0 { + p.Time = Infinity + } + if p.Timeout == 0 { + p.Time = TwentySec + } } -// mu is a mutex to protect Enabled variable. -var mu = sync.Mutex{} - -// enable is a knob used to turn keepalive on or off. -var enable = false - -// Enabled exposes the value of enable variable. -func Enabled() bool { - mu.Lock() - defer mu.Unlock() - return enable -} - -// Enable can be called to enable keepalives. -func Enable() { - mu.Lock() - defer mu.Unlock() - enable = true -} - -// Disable can be called to disable keepalive. -func Disable() { - mu.Lock() - defer mu.Unlock() - enable = false -} +const ( + // Infinity is the default value of keepalive time. + Infinity = time.Duration(math.MaxInt64) + // TwentySec is the default value of timeout. + TwentySec = time.Duration(20 * time.Second) +) diff --git a/transport/control.go b/transport/control.go index 2586cba4..ca93de29 100644 --- a/transport/control.go +++ b/transport/control.go @@ -51,6 +51,11 @@ const ( // The following defines various control items which could flow through // the control buffer of transport. They represent different aspects of // control tasks, e.g., flow control, settings, streaming resetting, etc. + +type fireKeepaliveTimer struct{} + +func (fireKeepaliveTimer) item() {} + type windowUpdate struct { streamID uint32 increment uint32 diff --git a/transport/http2_client.go b/transport/http2_client.go index 8655e740..ee4fda8a 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -101,10 +101,14 @@ type http2Client struct { creds []credentials.PerRPCCredentials - // Counter to keep track of activity(reading and writing on transport). + // Counter to keep track of reading activity on transport. activity uint64 // accessed atomically. + // Flag to keep track if the keepalive check was skipped because there + // were no active streams and keepalive.PermitWithoutStream was false + // keepaliveSkipped = 1 means skipped + keepaliveSkipped uint32 // accessed atomically // keepalive parameters. - kp keepalive.Params + kp *keepalive.Params statsHandler stats.Handler mu sync.Mutex // guard the following variables @@ -188,9 +192,10 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( if opts.UserAgent != "" { ua = opts.UserAgent + " " + ua } - kp := keepalive.DefaultParams - if opts.KeepaliveParams != (keepalive.Params{}) { + kp := defaultKeepaliveParams + if opts.KeepaliveParams != nil { kp = opts.KeepaliveParams + kp.Validate() } var buf bytes.Buffer t := &http2Client{ @@ -384,6 +389,11 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea s := t.newStream(ctx, callHdr) s.clientStatsCtx = userCtx t.activeStreams[s.id] = s + // if the number of active streams are now equal to 1, then check if keepalive + // was being skipped. If so, fire the keepalive timer + if len(t.activeStreams) == 1 && atomic.LoadUint32(&t.keepaliveSkipped) == 1 { + t.controlBuf.put(fireKeepaliveTimer{}) + } // This stream is not counted when applySetings(...) initialize t.streamsQuota. // Reset t.streamsQuota to the right value. @@ -717,7 +727,6 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error { break } } - atomic.AddUint64(&t.activity, 1) if !opts.Last { return nil } @@ -1082,7 +1091,7 @@ func (t *http2Client) applySettings(ss []http2.Setting) { // frames (e.g., window update, reset stream, setting, etc.) to the server. func (t *http2Client) controller() { timer := time.NewTimer(t.kp.Time) - if !keepalive.Enabled() { + if t.kp.Timeout == keepalive.Infinity { // Prevent the timer from firing, ever. if !timer.Stop() { <-timer.C @@ -1090,17 +1099,36 @@ func (t *http2Client) controller() { } isPingSent := false keepalivePing := &ping{data: [8]byte{}} + // select toggles between control channel and writable chanel. + // We need to wait on writable channel only after having recieved + // a control message that requires controller to take an action. + // However, while waiting on either of these channels, the keepalive + // timer channel or shutdown channel might trigger. Such toggling + // take care of this case. cchan := t.controlBuf.get() - wchan := nil + var wchan chan int + var controlMsg item for { select { - case i := <-cchan: + case controlMsg = <-cchan: t.controlBuf.load() + // If controlMsg is of type fireKeepaliveTimer, + // then check if the keepaliveSkipped flag is still set. + if _, ok := controlMsg.(fireKeepaliveTimer); ok { + if atomic.LoadUint32(&t.keepaliveSkipped) == 1 { + // Reset the timer to 0 so that it fires. + if !timer.Stop() { + <-timer.C + } + timer.Reset(0) + } + continue + } wchan = t.writableChan cchan = nil continue case <-wchan: - switch i := i.(type) { + switch i := controlMsg.(type) { case *windowUpdate: t.framer.writeWindowUpdate(true, i.streamID, i.increment) case *settings: @@ -1127,22 +1155,30 @@ func (t *http2Client) controller() { t.mu.Lock() ns := len(t.activeStreams) t.mu.Unlock() - // Get the activity counter value and reset it. - a := atomic.SwapUint64(&t.activity, 0) - if a > 0 || (!t.kp.PermitWithoutStream && ns < 1) { + if !t.kp.PermitWithoutStream && ns < 1 { timer.Reset(t.kp.Time) isPingSent = false - } else { - if !isPingSent { - // Send ping. - t.controlBuf.put(keepalivePing) - isPingSent = true - timer.Reset(t.kp.Timeout) - } else { - t.Close() - continue - } + // set flag that signifyies keepalive was skipped + atomic.StoreUint32(&t.keepaliveSkipped, 1) + continue } + // reset the keepaliveSkipped flag + atomic.StoreUint32(&t.keepaliveSkipped, 0) + // Get the activity counter value and reset it. + a := atomic.SwapUint64(&t.activity, 0) + if a > 0 { + timer.Reset(t.kp.Time) + isPingSent = false + continue + } + if !isPingSent { + // Send ping. + t.controlBuf.put(keepalivePing) + isPingSent = true + timer.Reset(t.kp.Timeout) + continue + } + t.Close() case <-t.shutdownChan: return } diff --git a/transport/transport.go b/transport/transport.go index e1331bbf..f6b1754b 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -384,11 +384,17 @@ type ConnectOptions struct { // TransportCredentials stores the Authenticator required to setup a client connection. TransportCredentials credentials.TransportCredentials // KeepaliveParams stores the keepalive parameters. - KeepaliveParams keepalive.Params + KeepaliveParams *keepalive.Params // StatsHandler stores the handler for stats. StatsHandler stats.Handler } +// default values for keepalive parameters. +var defaultKeepaliveParams = &keepalive.Params{ + Time: keepalive.Infinity, // default to infinite. + Timeout: keepalive.TwentySec, +} + // TargetInfo contains the information of the target such as network address and metadata. type TargetInfo struct { Addr string diff --git a/transport/transport_test.go b/transport/transport_test.go index fe3907ed..3ab5a9f7 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -298,10 +298,8 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con } func TestKeepaliveClientClosesIdleTransport(t *testing.T) { - keepalive.Enable() - defer keepalive.Disable() done := make(chan net.Conn, 1) - tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.Params{ + tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: &keepalive.Params{ Time: 2 * time.Second, // Keepalive time = 2 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. PermitWithoutStream: true, // Run keepalive even with no RPCs. @@ -324,10 +322,8 @@ func TestKeepaliveClientClosesIdleTransport(t *testing.T) { } func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { - keepalive.Enable() - defer keepalive.Disable() done := make(chan net.Conn, 1) - tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.Params{ + tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: &keepalive.Params{ Time: 2 * time.Second, // Keepalive time = 2 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. PermitWithoutStream: false, // Don't run keepalive even with no RPCs. @@ -350,10 +346,8 @@ func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { } func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { - keepalive.Enable() - defer keepalive.Disable() done := make(chan net.Conn, 1) - tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.Params{ + tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: &keepalive.Params{ Time: 2 * time.Second, // Keepalive time = 2 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. PermitWithoutStream: false, // Don't run keepalive even with no RPCs. @@ -381,12 +375,10 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { } func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) { - keepalive.Enable() - defer keepalive.Disable() - s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: keepalive.Params{ + s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: &keepalive.Params{ Time: 2 * time.Second, // Keepalive time = 2 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. - PermitWithoutStream: true, // Don't run keepalive even with no RPCs. + PermitWithoutStream: true, // Run keepalive even with no RPCs. }}) defer s.stop() defer tr.Close() From 0ba0eafe63d84c91f3049541ffd36b695749dcee Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Mon, 30 Jan 2017 11:41:27 -0800 Subject: [PATCH 12/21] update activity on recieveing settings --- transport/http2_client.go | 1 + 1 file changed, 1 insertion(+) diff --git a/transport/http2_client.go b/transport/http2_client.go index ee4fda8a..64f98d9e 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -1003,6 +1003,7 @@ func (t *http2Client) reader() { t.notifyError(err) return } + atomic.AddUint64(&t.activity, 1) sf, ok := frame.(*http2.SettingsFrame) if !ok { t.notifyError(err) From c8983baf3a993e1603cde6b5b6ee73a906b226b8 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Mon, 30 Jan 2017 12:09:08 -0800 Subject: [PATCH 13/21] const variable name change for goimports --- keepalive/keepalive.go | 6 +++--- transport/transport.go | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/keepalive/keepalive.go b/keepalive/keepalive.go index 4a18d52c..aeca2a6b 100644 --- a/keepalive/keepalive.go +++ b/keepalive/keepalive.go @@ -22,13 +22,13 @@ func (p *Params) Validate() { p.Time = Infinity } if p.Timeout == 0 { - p.Time = TwentySec + p.Time = TwentyScnd } } const ( // Infinity is the default value of keepalive time. Infinity = time.Duration(math.MaxInt64) - // TwentySec is the default value of timeout. - TwentySec = time.Duration(20 * time.Second) + // TwentyScnd is the default value of timeout. + TwentyScnd = time.Duration(20 * time.Second) ) diff --git a/transport/transport.go b/transport/transport.go index f6b1754b..7b34e99f 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -392,7 +392,7 @@ type ConnectOptions struct { // default values for keepalive parameters. var defaultKeepaliveParams = &keepalive.Params{ Time: keepalive.Infinity, // default to infinite. - Timeout: keepalive.TwentySec, + Timeout: keepalive.TwentyScnd, } // TargetInfo contains the information of the target such as network address and metadata. From 0bdf0596013938f890af2b2b56bcde882981e37d Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Tue, 31 Jan 2017 16:09:40 -0800 Subject: [PATCH 14/21] post review updates --- clientconn.go | 10 +++- keepalive.go | 15 +++++ transport/control.go | 4 +- transport/http2_client.go | 86 ++++++++++++++++----------- {keepalive => transport}/keepalive.go | 16 ++--- transport/transport.go | 9 ++- transport/transport_test.go | 19 +++--- 7 files changed, 95 insertions(+), 64 deletions(-) create mode 100644 keepalive.go rename {keepalive => transport}/keepalive.go (70%) diff --git a/clientconn.go b/clientconn.go index 8e3b1bb8..e4b699de 100644 --- a/clientconn.go +++ b/clientconn.go @@ -45,7 +45,6 @@ import ( "golang.org/x/net/trace" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" - "google.golang.org/grpc/keepalive" "google.golang.org/grpc/stats" "google.golang.org/grpc/transport" ) @@ -251,9 +250,14 @@ func WithUserAgent(s string) DialOption { } // WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport. -func WithKeepaliveParams(k *keepalive.Params) DialOption { +func WithKeepaliveParams(k KeepaliveParameters) DialOption { + kp := &transport.KeepaliveParameters{ + Time: k.Time, + Timeout: k.Timeout, + PermitWithoutStream: k.PermitWithoutStream, + } return func(o *dialOptions) { - o.copts.KeepaliveParams = k + o.copts.KeepaliveParams = kp } } diff --git a/keepalive.go b/keepalive.go new file mode 100644 index 00000000..6bca05d9 --- /dev/null +++ b/keepalive.go @@ -0,0 +1,15 @@ +package grpc + +import ( + "time" +) + +// KeepaliveParameters is used to set keepalive parameters. +type KeepaliveParameters struct { + // After a duration of this time the client pings the server to see if the transport is still alive. + Time time.Duration + // After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport. + Timeout time.Duration + //If true, client runs keepalive checks even with no active RPCs. + PermitWithoutStream bool +} diff --git a/transport/control.go b/transport/control.go index ca93de29..0ffccdf4 100644 --- a/transport/control.go +++ b/transport/control.go @@ -52,9 +52,9 @@ const ( // the control buffer of transport. They represent different aspects of // control tasks, e.g., flow control, settings, streaming resetting, etc. -type fireKeepaliveTimer struct{} +type resetKeepaliveTimer struct{} -func (fireKeepaliveTimer) item() {} +func (resetKeepaliveTimer) item() {} type windowUpdate struct { streamID uint32 diff --git a/transport/http2_client.go b/transport/http2_client.go index 64f98d9e..5adb1124 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -50,7 +50,6 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" - "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "google.golang.org/grpc/stats" @@ -104,11 +103,11 @@ type http2Client struct { // Counter to keep track of reading activity on transport. activity uint64 // accessed atomically. // Flag to keep track if the keepalive check was skipped because there - // were no active streams and keepalive.PermitWithoutStream was false + // were no active streams and PermitWithoutStream was false // keepaliveSkipped = 1 means skipped keepaliveSkipped uint32 // accessed atomically // keepalive parameters. - kp *keepalive.Params + kp *KeepaliveParameters statsHandler stats.Handler mu sync.Mutex // guard the following variables @@ -195,7 +194,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( kp := defaultKeepaliveParams if opts.KeepaliveParams != nil { kp = opts.KeepaliveParams - kp.Validate() + kp.validate() } var buf bytes.Buffer t := &http2Client{ @@ -392,7 +391,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea // if the number of active streams are now equal to 1, then check if keepalive // was being skipped. If so, fire the keepalive timer if len(t.activeStreams) == 1 && atomic.LoadUint32(&t.keepaliveSkipped) == 1 { - t.controlBuf.put(fireKeepaliveTimer{}) + t.framer.writePing(true, false, [8]byte{}) + t.controlBuf.put(resetKeepaliveTimer{}) } // This stream is not counted when applySetings(...) initialize t.streamsQuota. @@ -1092,7 +1092,7 @@ func (t *http2Client) applySettings(ss []http2.Setting) { // frames (e.g., window update, reset stream, setting, etc.) to the server. func (t *http2Client) controller() { timer := time.NewTimer(t.kp.Time) - if t.kp.Timeout == keepalive.Infinity { + if t.kp.Time == infinity { // Prevent the timer from firing, ever. if !timer.Stop() { <-timer.C @@ -1113,21 +1113,25 @@ func (t *http2Client) controller() { select { case controlMsg = <-cchan: t.controlBuf.load() - // If controlMsg is of type fireKeepaliveTimer, + // If controlMsg is of type resetKeepaliveTimer, // then check if the keepaliveSkipped flag is still set. - if _, ok := controlMsg.(fireKeepaliveTimer); ok { - if atomic.LoadUint32(&t.keepaliveSkipped) == 1 { - // Reset the timer to 0 so that it fires. - if !timer.Stop() { - <-timer.C - } - timer.Reset(0) + if _, ok := controlMsg.(resetKeepaliveTimer); ok { + atomic.StoreUint32(&t.keepaliveSkipped, 0) + // Reset the timer to timeout. + // Note : This is safe to read, since the + // only codepath that sets the keepaliveSkipped + // flag also resets the timer to infinity. + // Thus, there'll never be a case where we are + // trying to read from an empty timer channel. + isPingSent = true + if !timer.Stop() { + <-timer.C } + timer.Reset(t.kp.Timeout) continue } wchan = t.writableChan cchan = nil - continue case <-wchan: switch i := controlMsg.(type) { case *windowUpdate: @@ -1151,36 +1155,48 @@ func (t *http2Client) controller() { wchan <- 0 wchan = nil cchan = t.controlBuf.get() - continue case <-timer.C: - t.mu.Lock() - ns := len(t.activeStreams) - t.mu.Unlock() - if !t.kp.PermitWithoutStream && ns < 1 { - timer.Reset(t.kp.Time) - isPingSent = false - // set flag that signifyies keepalive was skipped - atomic.StoreUint32(&t.keepaliveSkipped, 1) - continue - } - // reset the keepaliveSkipped flag - atomic.StoreUint32(&t.keepaliveSkipped, 0) + // All code paths in this case must reset the timer. + // Get the activity counter value and reset it. a := atomic.SwapUint64(&t.activity, 0) if a > 0 { - timer.Reset(t.kp.Time) + atomic.StoreUint32(&t.keepaliveSkipped, 0) isPingSent = false + timer.Reset(t.kp.Time) continue } - if !isPingSent { - // Send ping. - t.controlBuf.put(keepalivePing) - isPingSent = true - timer.Reset(t.kp.Timeout) + if isPingSent { + t.Close() + timer.Reset(infinity) continue } - t.Close() + t.mu.Lock() + ns := len(t.activeStreams) + if !t.kp.PermitWithoutStream && ns < 1 { + // set flag that signifyies keepalive was skipped + atomic.StoreUint32(&t.keepaliveSkipped, 1) + t.mu.Unlock() + timer.Reset(infinity) + continue + } + t.mu.Unlock() + // reset the keepaliveSkipped flag + atomic.StoreUint32(&t.keepaliveSkipped, 0) + // Send ping. + t.controlBuf.put(keepalivePing) + isPingSent = true + timer.Reset(t.kp.Timeout) case <-t.shutdownChan: + // stop the keepalive timer + if !timer.Stop() { + select { + case <-timer.C: + default: + // In case we stopped the timer before the for loop began. + // This happens when keepalive time provided was infinity. + } + } return } } diff --git a/keepalive/keepalive.go b/transport/keepalive.go similarity index 70% rename from keepalive/keepalive.go rename to transport/keepalive.go index aeca2a6b..92049feb 100644 --- a/keepalive/keepalive.go +++ b/transport/keepalive.go @@ -1,12 +1,12 @@ -package keepalive +package transport import ( "math" "time" ) -// Params is used to set keepalive parameters. -type Params struct { +// KeepaliveParameters is used to set keepalive parameters. +type KeepaliveParameters struct { // After a duration of this time the client pings the server to see if the transport is still alive. Time time.Duration // After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport. @@ -17,18 +17,18 @@ type Params struct { // Validate is used to validate the keepalive parameters. // Time durations initialized to 0 will be replaced with default Values. -func (p *Params) Validate() { +func (p *KeepaliveParameters) validate() { if p.Time == 0 { - p.Time = Infinity + p.Time = infinity } if p.Timeout == 0 { - p.Time = TwentyScnd + p.Timeout = twentyScnd } } const ( // Infinity is the default value of keepalive time. - Infinity = time.Duration(math.MaxInt64) + infinity = time.Duration(math.MaxInt64) // TwentyScnd is the default value of timeout. - TwentyScnd = time.Duration(20 * time.Second) + twentyScnd = time.Duration(20 * time.Second) ) diff --git a/transport/transport.go b/transport/transport.go index 7b34e99f..343fbe0c 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -47,7 +47,6 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" - "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/stats" "google.golang.org/grpc/tap" @@ -384,15 +383,15 @@ type ConnectOptions struct { // TransportCredentials stores the Authenticator required to setup a client connection. TransportCredentials credentials.TransportCredentials // KeepaliveParams stores the keepalive parameters. - KeepaliveParams *keepalive.Params + KeepaliveParams *KeepaliveParameters // StatsHandler stores the handler for stats. StatsHandler stats.Handler } // default values for keepalive parameters. -var defaultKeepaliveParams = &keepalive.Params{ - Time: keepalive.Infinity, // default to infinite. - Timeout: keepalive.TwentyScnd, +var defaultKeepaliveParams = &KeepaliveParameters{ + Time: infinity, // default to infinite. + Timeout: twentyScnd, } // TargetInfo contains the information of the target such as network address and metadata. diff --git a/transport/transport_test.go b/transport/transport_test.go index 3ab5a9f7..485f0577 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -49,7 +49,6 @@ import ( "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" "google.golang.org/grpc/codes" - "google.golang.org/grpc/keepalive" ) type server struct { @@ -299,7 +298,7 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con func TestKeepaliveClientClosesIdleTransport(t *testing.T) { done := make(chan net.Conn, 1) - tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: &keepalive.Params{ + tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: &KeepaliveParameters{ Time: 2 * time.Second, // Keepalive time = 2 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. PermitWithoutStream: true, // Run keepalive even with no RPCs. @@ -323,10 +322,9 @@ func TestKeepaliveClientClosesIdleTransport(t *testing.T) { func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { done := make(chan net.Conn, 1) - tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: &keepalive.Params{ - Time: 2 * time.Second, // Keepalive time = 2 sec. - Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. - PermitWithoutStream: false, // Don't run keepalive even with no RPCs. + tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: &KeepaliveParameters{ + Time: 2 * time.Second, // Keepalive time = 2 sec. + Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. }}, done) defer tr.Close() conn, ok := <-done @@ -347,10 +345,9 @@ func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { done := make(chan net.Conn, 1) - tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: &keepalive.Params{ - Time: 2 * time.Second, // Keepalive time = 2 sec. - Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. - PermitWithoutStream: false, // Don't run keepalive even with no RPCs. + tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: &KeepaliveParameters{ + Time: 2 * time.Second, // Keepalive time = 2 sec. + Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. }}, done) defer tr.Close() conn, ok := <-done @@ -375,7 +372,7 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { } func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) { - s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: &keepalive.Params{ + s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: &KeepaliveParameters{ Time: 2 * time.Second, // Keepalive time = 2 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. PermitWithoutStream: true, // Run keepalive even with no RPCs. From 336b4ea3cf04ad43ac7220800e7d26b7566ba3ea Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Fri, 10 Feb 2017 16:47:43 -0800 Subject: [PATCH 15/21] post review update --- clientconn.go | 2 +- transport/http2_client.go | 31 +++++++++++++++++-------------- transport/keepalive.go | 29 ++++++++++------------------- transport/transport.go | 8 +------- transport/transport_test.go | 8 ++++---- 5 files changed, 33 insertions(+), 45 deletions(-) diff --git a/clientconn.go b/clientconn.go index e4b699de..977987e6 100644 --- a/clientconn.go +++ b/clientconn.go @@ -251,7 +251,7 @@ func WithUserAgent(s string) DialOption { // WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport. func WithKeepaliveParams(k KeepaliveParameters) DialOption { - kp := &transport.KeepaliveParameters{ + kp := transport.KeepaliveParameters{ Time: k.Time, Timeout: k.Timeout, PermitWithoutStream: k.PermitWithoutStream, diff --git a/transport/http2_client.go b/transport/http2_client.go index 5adb1124..42086bc7 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -107,7 +107,7 @@ type http2Client struct { // keepaliveSkipped = 1 means skipped keepaliveSkipped uint32 // accessed atomically // keepalive parameters. - kp *KeepaliveParameters + kp KeepaliveParameters statsHandler stats.Handler mu sync.Mutex // guard the following variables @@ -191,10 +191,13 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( if opts.UserAgent != "" { ua = opts.UserAgent + " " + ua } - kp := defaultKeepaliveParams - if opts.KeepaliveParams != nil { - kp = opts.KeepaliveParams - kp.validate() + kp := opts.KeepaliveParams + // Validate keepalive parameters. + if kp.Time == 0 { + kp.Time = defaultKeepaliveTime + } + if kp.Timeout == 0 { + kp.Timeout = defaultKeepaliveTimeout } var buf bytes.Buffer t := &http2Client{ @@ -1092,11 +1095,13 @@ func (t *http2Client) applySettings(ss []http2.Setting) { // frames (e.g., window update, reset stream, setting, etc.) to the server. func (t *http2Client) controller() { timer := time.NewTimer(t.kp.Time) + timerUsed := true if t.kp.Time == infinity { // Prevent the timer from firing, ever. if !timer.Stop() { <-timer.C } + timerUsed = false } isPingSent := false keepalivePing := &ping{data: [8]byte{}} @@ -1174,28 +1179,26 @@ func (t *http2Client) controller() { t.mu.Lock() ns := len(t.activeStreams) if !t.kp.PermitWithoutStream && ns < 1 { - // set flag that signifyies keepalive was skipped + // Set flag that signifyies keepalive was skipped. atomic.StoreUint32(&t.keepaliveSkipped, 1) t.mu.Unlock() timer.Reset(infinity) continue } t.mu.Unlock() - // reset the keepaliveSkipped flag + // Reset the keepaliveSkipped flag. atomic.StoreUint32(&t.keepaliveSkipped, 0) // Send ping. t.controlBuf.put(keepalivePing) isPingSent = true timer.Reset(t.kp.Timeout) case <-t.shutdownChan: - // stop the keepalive timer + if !timerUsed { + return + } + // Stop the keepalive timer. if !timer.Stop() { - select { - case <-timer.C: - default: - // In case we stopped the timer before the for loop began. - // This happens when keepalive time provided was infinity. - } + <-timer.C } return } diff --git a/transport/keepalive.go b/transport/keepalive.go index 92049feb..035f5b55 100644 --- a/transport/keepalive.go +++ b/transport/keepalive.go @@ -6,29 +6,20 @@ import ( ) // KeepaliveParameters is used to set keepalive parameters. +// These configure how the client will actively probe to notice when a connection broken +// and to cause activity so intermediaries are aware the connection is still in use. type KeepaliveParameters struct { - // After a duration of this time the client pings the server to see if the transport is still alive. - Time time.Duration - // After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport. - Timeout time.Duration + // After a duration of this time if the client doesn't see any activity it pings the server to see if the transport is still alive. + Time time.Duration // The current default value is inifinity. + // After having pinged for keepalive check, the client waits for a duration of Timeout and if no activity is seen even after that + // the connection is closed. + Timeout time.Duration // The current default value is 20 seconds. //If true, client runs keepalive checks even with no active RPCs. PermitWithoutStream bool } -// Validate is used to validate the keepalive parameters. -// Time durations initialized to 0 will be replaced with default Values. -func (p *KeepaliveParameters) validate() { - if p.Time == 0 { - p.Time = infinity - } - if p.Timeout == 0 { - p.Timeout = twentyScnd - } -} - const ( - // Infinity is the default value of keepalive time. - infinity = time.Duration(math.MaxInt64) - // TwentyScnd is the default value of timeout. - twentyScnd = time.Duration(20 * time.Second) + infinity = time.Duration(math.MaxInt64) + defaultKeepaliveTime = infinity + defaultKeepaliveTimeout = time.Duration(20 * time.Second) ) diff --git a/transport/transport.go b/transport/transport.go index 343fbe0c..0f18d05d 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -383,17 +383,11 @@ type ConnectOptions struct { // TransportCredentials stores the Authenticator required to setup a client connection. TransportCredentials credentials.TransportCredentials // KeepaliveParams stores the keepalive parameters. - KeepaliveParams *KeepaliveParameters + KeepaliveParams KeepaliveParameters // StatsHandler stores the handler for stats. StatsHandler stats.Handler } -// default values for keepalive parameters. -var defaultKeepaliveParams = &KeepaliveParameters{ - Time: infinity, // default to infinite. - Timeout: twentyScnd, -} - // TargetInfo contains the information of the target such as network address and metadata. type TargetInfo struct { Addr string diff --git a/transport/transport_test.go b/transport/transport_test.go index 485f0577..37138a7a 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -298,7 +298,7 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con func TestKeepaliveClientClosesIdleTransport(t *testing.T) { done := make(chan net.Conn, 1) - tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: &KeepaliveParameters{ + tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: KeepaliveParameters{ Time: 2 * time.Second, // Keepalive time = 2 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. PermitWithoutStream: true, // Run keepalive even with no RPCs. @@ -322,7 +322,7 @@ func TestKeepaliveClientClosesIdleTransport(t *testing.T) { func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { done := make(chan net.Conn, 1) - tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: &KeepaliveParameters{ + tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: KeepaliveParameters{ Time: 2 * time.Second, // Keepalive time = 2 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. }}, done) @@ -345,7 +345,7 @@ func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { done := make(chan net.Conn, 1) - tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: &KeepaliveParameters{ + tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: KeepaliveParameters{ Time: 2 * time.Second, // Keepalive time = 2 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. }}, done) @@ -372,7 +372,7 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { } func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) { - s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: &KeepaliveParameters{ + s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: KeepaliveParameters{ Time: 2 * time.Second, // Keepalive time = 2 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. PermitWithoutStream: true, // Run keepalive even with no RPCs. From bc3cca9945440518bdc093a7bce7f09bc9344428 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Tue, 28 Feb 2017 11:49:51 -0800 Subject: [PATCH 16/21] Implementation changes;Using an independent goroutine for keepalive checks. --- clientconn.go | 8 +- keepalive.go => keepalive/keepalive.go | 6 +- transport/http2_client.go | 193 ++++++++++++------------- transport/keepalive.go | 25 ---- transport/transport.go | 12 +- transport/transport_test.go | 9 +- 6 files changed, 115 insertions(+), 138 deletions(-) rename keepalive.go => keepalive/keepalive.go (75%) delete mode 100644 transport/keepalive.go diff --git a/clientconn.go b/clientconn.go index 977987e6..4e4e11d5 100644 --- a/clientconn.go +++ b/clientconn.go @@ -45,6 +45,7 @@ import ( "golang.org/x/net/trace" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/stats" "google.golang.org/grpc/transport" ) @@ -250,12 +251,7 @@ func WithUserAgent(s string) DialOption { } // WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport. -func WithKeepaliveParams(k KeepaliveParameters) DialOption { - kp := transport.KeepaliveParameters{ - Time: k.Time, - Timeout: k.Timeout, - PermitWithoutStream: k.PermitWithoutStream, - } +func WithKeepaliveParams(kp keepalive.ClientParameters) DialOption { return func(o *dialOptions) { o.copts.KeepaliveParams = kp } diff --git a/keepalive.go b/keepalive/keepalive.go similarity index 75% rename from keepalive.go rename to keepalive/keepalive.go index 6bca05d9..b448cbd9 100644 --- a/keepalive.go +++ b/keepalive/keepalive.go @@ -1,11 +1,11 @@ -package grpc +package keepalive import ( "time" ) -// KeepaliveParameters is used to set keepalive parameters. -type KeepaliveParameters struct { +// ClientParameters is used to set keepalive parameters for the client side. +type ClientParameters struct { // After a duration of this time the client pings the server to see if the transport is still alive. Time time.Duration // After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport. diff --git a/transport/http2_client.go b/transport/http2_client.go index 42086bc7..29f6f00c 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -50,6 +50,7 @@ import ( "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" "google.golang.org/grpc/stats" @@ -81,6 +82,8 @@ type http2Client struct { // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor) // that the server sent GoAway on this transport. goAway chan struct{} + // awakenKeepalive is used to tell keepalive goroutine to reset keepalive timer. + awakenKeepalive chan int framer *framer hBuf *bytes.Buffer // the buffer for HPACK encoding @@ -101,13 +104,10 @@ type http2Client struct { creds []credentials.PerRPCCredentials // Counter to keep track of reading activity on transport. - activity uint64 // accessed atomically. - // Flag to keep track if the keepalive check was skipped because there - // were no active streams and PermitWithoutStream was false - // keepaliveSkipped = 1 means skipped - keepaliveSkipped uint32 // accessed atomically - // keepalive parameters. - kp KeepaliveParameters + activity uint64 // Accessed atomically. + + kp keepalive.ClientParameters + statsHandler stats.Handler mu sync.Mutex // guard the following variables @@ -121,6 +121,11 @@ type http2Client struct { goAwayID uint32 // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame. prevGoAwayID uint32 + + // Flag to keep track if the keepalive check was skipped because there + // were no active streams and PermitWithoutStream was false + // keepaliveSkipped = 1 means skipped + keepaliveSkipped uint32 } func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) { @@ -215,6 +220,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( shutdownChan: make(chan struct{}), errorChan: make(chan struct{}), goAway: make(chan struct{}), + awakenKeepalive: make(chan int, 1), framer: newFramer(conn), hBuf: &buf, hEnc: hpack.NewEncoder(&buf), @@ -230,6 +236,9 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( kp: kp, statsHandler: opts.StatsHandler, } + // make sure awakenKeepalive can't be written upon. + // keepalive routine will make it writable, if need be. + t.awakenKeepalive <- 0 if t.statsHandler != nil { t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{ RemoteAddr: t.remoteAddr, @@ -274,6 +283,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( } } go t.controller() + go t.keepalive() t.writableChan <- 0 return t, nil } @@ -391,11 +401,14 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea s := t.newStream(ctx, callHdr) s.clientStatsCtx = userCtx t.activeStreams[s.id] = s - // if the number of active streams are now equal to 1, then check if keepalive - // was being skipped. If so, fire the keepalive timer - if len(t.activeStreams) == 1 && atomic.LoadUint32(&t.keepaliveSkipped) == 1 { - t.framer.writePing(true, false, [8]byte{}) - t.controlBuf.put(resetKeepaliveTimer{}) + // If the number of active streams change from 0 to 1, then check if keepalive + // has gone dormant. If so, wake it up. + if len(t.activeStreams) == 1 { + select { + case t.awakenKeepalive <- 0: + t.framer.writePing(true, false, [8]byte{}) + default: + } } // This stream is not counted when applySetings(...) initialize t.streamsQuota. @@ -1094,109 +1107,91 @@ func (t *http2Client) applySettings(ss []http2.Setting) { // controller running in a separate goroutine takes charge of sending control // frames (e.g., window update, reset stream, setting, etc.) to the server. func (t *http2Client) controller() { - timer := time.NewTimer(t.kp.Time) - timerUsed := true - if t.kp.Time == infinity { - // Prevent the timer from firing, ever. - if !timer.Stop() { - <-timer.C - } - timerUsed = false - } - isPingSent := false - keepalivePing := &ping{data: [8]byte{}} - // select toggles between control channel and writable chanel. - // We need to wait on writable channel only after having recieved - // a control message that requires controller to take an action. - // However, while waiting on either of these channels, the keepalive - // timer channel or shutdown channel might trigger. Such toggling - // take care of this case. - cchan := t.controlBuf.get() - var wchan chan int - var controlMsg item for { select { - case controlMsg = <-cchan: + case i := <-t.controlBuf.get(): t.controlBuf.load() - // If controlMsg is of type resetKeepaliveTimer, - // then check if the keepaliveSkipped flag is still set. - if _, ok := controlMsg.(resetKeepaliveTimer); ok { - atomic.StoreUint32(&t.keepaliveSkipped, 0) - // Reset the timer to timeout. - // Note : This is safe to read, since the - // only codepath that sets the keepaliveSkipped - // flag also resets the timer to infinity. - // Thus, there'll never be a case where we are - // trying to read from an empty timer channel. - isPingSent = true - if !timer.Stop() { - <-timer.C + select { + case <-t.writableChan: + switch i := i.(type) { + case *windowUpdate: + t.framer.writeWindowUpdate(true, i.streamID, i.increment) + case *settings: + if i.ack { + t.framer.writeSettingsAck(true) + t.applySettings(i.ss) + } else { + t.framer.writeSettings(true, i.ss...) + } + case *resetStream: + t.framer.writeRSTStream(true, i.streamID, i.code) + case *flushIO: + t.framer.flushWrite() + case *ping: + t.framer.writePing(true, i.ack, i.data) + default: + grpclog.Printf("transport: http2Client.controller got unexpected item type %v\n", i) } - timer.Reset(t.kp.Timeout) + t.writableChan <- 0 continue + case <-t.shutdownChan: + return } - wchan = t.writableChan - cchan = nil - case <-wchan: - switch i := controlMsg.(type) { - case *windowUpdate: - t.framer.writeWindowUpdate(true, i.streamID, i.increment) - case *settings: - if i.ack { - t.framer.writeSettingsAck(true) - t.applySettings(i.ss) - } else { - t.framer.writeSettings(true, i.ss...) - } - case *resetStream: - t.framer.writeRSTStream(true, i.streamID, i.code) - case *flushIO: - t.framer.flushWrite() - case *ping: - t.framer.writePing(true, i.ack, i.data) - default: - grpclog.Printf("transport: http2Client.controller got unexpected item type %v\n", i) - } - wchan <- 0 - wchan = nil - cchan = t.controlBuf.get() - case <-timer.C: - // All code paths in this case must reset the timer. + case <-t.shutdownChan: + return + } + } +} - // Get the activity counter value and reset it. - a := atomic.SwapUint64(&t.activity, 0) - if a > 0 { - atomic.StoreUint32(&t.keepaliveSkipped, 0) - isPingSent = false +// keepalive running in a separate goroutune makes sure the connection is alive by sending pings. +func (t *http2Client) keepalive() { + if t.kp.Time == time.Duration(math.MaxInt64) { + return + } + p := &ping{data: [8]byte{}} + timer := time.NewTimer(t.kp.Time) + for { + select { + case <-timer.C: + if a := atomic.SwapUint64(&t.activity, 0); a > 0 { timer.Reset(t.kp.Time) continue } - if isPingSent { - t.Close() - timer.Reset(infinity) - continue - } + // Check if keepalive should go dormant. t.mu.Lock() - ns := len(t.activeStreams) - if !t.kp.PermitWithoutStream && ns < 1 { - // Set flag that signifyies keepalive was skipped. - atomic.StoreUint32(&t.keepaliveSkipped, 1) + if len(t.activeStreams) < 1 && !t.kp.PermitWithoutStream { + // Make awakenKeepalive writable. + <-t.awakenKeepalive t.mu.Unlock() - timer.Reset(infinity) - continue + select { + case <-t.awakenKeepalive: + // If the control gets here a ping has been sent + // need to reset the timer with keepalive.Timeout. + case <-t.shutdownChan: + return + } + } else { + t.mu.Unlock() + // Send ping. + t.controlBuf.put(p) } - t.mu.Unlock() - // Reset the keepaliveSkipped flag. - atomic.StoreUint32(&t.keepaliveSkipped, 0) - // Send ping. - t.controlBuf.put(keepalivePing) - isPingSent = true + + // By the time control gets here a ping has been sent one way or the other. timer.Reset(t.kp.Timeout) - case <-t.shutdownChan: - if !timerUsed { + select { + case <-timer.C: + if a := atomic.SwapUint64(&t.activity, 0); a > 0 { + timer.Reset(t.kp.Time) + continue + } + t.Close() + case <-t.shutdownChan: + if !timer.Stop() { + <-timer.C + } return } - // Stop the keepalive timer. + case <-t.shutdownChan: if !timer.Stop() { <-timer.C } diff --git a/transport/keepalive.go b/transport/keepalive.go deleted file mode 100644 index 035f5b55..00000000 --- a/transport/keepalive.go +++ /dev/null @@ -1,25 +0,0 @@ -package transport - -import ( - "math" - "time" -) - -// KeepaliveParameters is used to set keepalive parameters. -// These configure how the client will actively probe to notice when a connection broken -// and to cause activity so intermediaries are aware the connection is still in use. -type KeepaliveParameters struct { - // After a duration of this time if the client doesn't see any activity it pings the server to see if the transport is still alive. - Time time.Duration // The current default value is inifinity. - // After having pinged for keepalive check, the client waits for a duration of Timeout and if no activity is seen even after that - // the connection is closed. - Timeout time.Duration // The current default value is 20 seconds. - //If true, client runs keepalive checks even with no active RPCs. - PermitWithoutStream bool -} - -const ( - infinity = time.Duration(math.MaxInt64) - defaultKeepaliveTime = infinity - defaultKeepaliveTimeout = time.Duration(20 * time.Second) -) diff --git a/transport/transport.go b/transport/transport.go index 0f18d05d..fb0df6d4 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -41,12 +41,15 @@ import ( "bytes" "fmt" "io" + "math" "net" "sync" + "time" "golang.org/x/net/context" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" + "google.golang.org/grpc/keepalive" "google.golang.org/grpc/metadata" "google.golang.org/grpc/stats" "google.golang.org/grpc/tap" @@ -383,7 +386,7 @@ type ConnectOptions struct { // TransportCredentials stores the Authenticator required to setup a client connection. TransportCredentials credentials.TransportCredentials // KeepaliveParams stores the keepalive parameters. - KeepaliveParams KeepaliveParameters + KeepaliveParams keepalive.ClientParameters // StatsHandler stores the handler for stats. StatsHandler stats.Handler } @@ -608,3 +611,10 @@ func wait(ctx context.Context, done, goAway, closing <-chan struct{}, proceed <- return i, nil } } + +// keepalive related constants. +const ( + infinity = time.Duration(math.MaxInt64) + defaultKeepaliveTime = infinity + defaultKeepaliveTimeout = time.Duration(20 * time.Second) +) diff --git a/transport/transport_test.go b/transport/transport_test.go index 37138a7a..e28a556c 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -49,6 +49,7 @@ import ( "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" "google.golang.org/grpc/codes" + "google.golang.org/grpc/keepalive" ) type server struct { @@ -298,7 +299,7 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con func TestKeepaliveClientClosesIdleTransport(t *testing.T) { done := make(chan net.Conn, 1) - tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: KeepaliveParameters{ + tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{ Time: 2 * time.Second, // Keepalive time = 2 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. PermitWithoutStream: true, // Run keepalive even with no RPCs. @@ -322,7 +323,7 @@ func TestKeepaliveClientClosesIdleTransport(t *testing.T) { func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { done := make(chan net.Conn, 1) - tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: KeepaliveParameters{ + tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{ Time: 2 * time.Second, // Keepalive time = 2 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. }}, done) @@ -345,7 +346,7 @@ func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { done := make(chan net.Conn, 1) - tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: KeepaliveParameters{ + tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{ Time: 2 * time.Second, // Keepalive time = 2 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. }}, done) @@ -372,7 +373,7 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { } func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) { - s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: KeepaliveParameters{ + s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{ Time: 2 * time.Second, // Keepalive time = 2 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. PermitWithoutStream: true, // Run keepalive even with no RPCs. From 3ad78109b12711ede666726b3ab85b0b66f279c3 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Tue, 28 Feb 2017 11:53:13 -0800 Subject: [PATCH 17/21] removing dead code --- transport/http2_client.go | 5 ----- 1 file changed, 5 deletions(-) diff --git a/transport/http2_client.go b/transport/http2_client.go index 29f6f00c..6c0d586b 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -121,11 +121,6 @@ type http2Client struct { goAwayID uint32 // prevGoAway ID records the Last-Stream-ID in the previous GOAway frame. prevGoAwayID uint32 - - // Flag to keep track if the keepalive check was skipped because there - // were no active streams and PermitWithoutStream was false - // keepaliveSkipped = 1 means skipped - keepaliveSkipped uint32 } func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) { From e328ededa4148255d4cdb3bdd636e5d2fa191f5d Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Tue, 28 Feb 2017 12:41:34 -0800 Subject: [PATCH 18/21] more refactoring --- transport/control.go | 12 ++++++------ transport/transport.go | 9 --------- 2 files changed, 6 insertions(+), 15 deletions(-) diff --git a/transport/control.go b/transport/control.go index 0ffccdf4..63810953 100644 --- a/transport/control.go +++ b/transport/control.go @@ -35,7 +35,9 @@ package transport import ( "fmt" + "math" "sync" + "time" "golang.org/x/net/http2" ) @@ -44,18 +46,16 @@ const ( // The default value of flow control window size in HTTP2 spec. defaultWindowSize = 65535 // The initial window size for flow control. - initialWindowSize = defaultWindowSize // for an RPC - initialConnWindowSize = defaultWindowSize * 16 // for a connection + initialWindowSize = defaultWindowSize // for an RPC + initialConnWindowSize = defaultWindowSize * 16 // for a connection + defaultKeepaliveTime = time.Duration(math.MaxInt64) + defaultKeepaliveTimeout = time.Duration(20 * time.Second) ) // The following defines various control items which could flow through // the control buffer of transport. They represent different aspects of // control tasks, e.g., flow control, settings, streaming resetting, etc. -type resetKeepaliveTimer struct{} - -func (resetKeepaliveTimer) item() {} - type windowUpdate struct { streamID uint32 increment uint32 diff --git a/transport/transport.go b/transport/transport.go index fb0df6d4..81a4f6de 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -41,10 +41,8 @@ import ( "bytes" "fmt" "io" - "math" "net" "sync" - "time" "golang.org/x/net/context" "google.golang.org/grpc/codes" @@ -611,10 +609,3 @@ func wait(ctx context.Context, done, goAway, closing <-chan struct{}, proceed <- return i, nil } } - -// keepalive related constants. -const ( - infinity = time.Duration(math.MaxInt64) - defaultKeepaliveTime = infinity - defaultKeepaliveTimeout = time.Duration(20 * time.Second) -) From f22061907e9b821452fd8d342da875372787c572 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Wed, 1 Mar 2017 13:08:05 -0800 Subject: [PATCH 19/21] post-review updates --- keepalive/keepalive.go | 4 ++-- transport/control.go | 3 ++- transport/http2_client.go | 10 +++++----- 3 files changed, 9 insertions(+), 8 deletions(-) diff --git a/keepalive/keepalive.go b/keepalive/keepalive.go index b448cbd9..19520f41 100644 --- a/keepalive/keepalive.go +++ b/keepalive/keepalive.go @@ -8,8 +8,8 @@ import ( type ClientParameters struct { // After a duration of this time the client pings the server to see if the transport is still alive. Time time.Duration - // After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport. + // After having pinged for keepalive check, the client waits for a duration of keepalive_timeout before closing the transport. Timeout time.Duration - //If true, client runs keepalive checks even with no active RPCs. + // If true, client runs keepalive checks even with no active RPCs. PermitWithoutStream bool } diff --git a/transport/control.go b/transport/control.go index 63810953..68b9e650 100644 --- a/transport/control.go +++ b/transport/control.go @@ -48,7 +48,8 @@ const ( // The initial window size for flow control. initialWindowSize = defaultWindowSize // for an RPC initialConnWindowSize = defaultWindowSize * 16 // for a connection - defaultKeepaliveTime = time.Duration(math.MaxInt64) + infinity = time.Duration(math.MaxInt64) + defaultKeepaliveTime = infinity defaultKeepaliveTimeout = time.Duration(20 * time.Second) ) diff --git a/transport/http2_client.go b/transport/http2_client.go index 6c0d586b..6813f7b5 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -83,7 +83,7 @@ type http2Client struct { // that the server sent GoAway on this transport. goAway chan struct{} // awakenKeepalive is used to tell keepalive goroutine to reset keepalive timer. - awakenKeepalive chan int + awakenKeepalive chan struct{} framer *framer hBuf *bytes.Buffer // the buffer for HPACK encoding @@ -215,7 +215,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( shutdownChan: make(chan struct{}), errorChan: make(chan struct{}), goAway: make(chan struct{}), - awakenKeepalive: make(chan int, 1), + awakenKeepalive: make(chan struct{}, 1), framer: newFramer(conn), hBuf: &buf, hEnc: hpack.NewEncoder(&buf), @@ -233,7 +233,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( } // make sure awakenKeepalive can't be written upon. // keepalive routine will make it writable, if need be. - t.awakenKeepalive <- 0 + t.awakenKeepalive <- struct{}{} if t.statsHandler != nil { t.ctx = t.statsHandler.TagConn(t.ctx, &stats.ConnTagInfo{ RemoteAddr: t.remoteAddr, @@ -400,7 +400,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea // has gone dormant. If so, wake it up. if len(t.activeStreams) == 1 { select { - case t.awakenKeepalive <- 0: + case t.awakenKeepalive <- struct{}{}: t.framer.writePing(true, false, [8]byte{}) default: } @@ -1140,7 +1140,7 @@ func (t *http2Client) controller() { // keepalive running in a separate goroutune makes sure the connection is alive by sending pings. func (t *http2Client) keepalive() { - if t.kp.Time == time.Duration(math.MaxInt64) { + if t.kp.Time == infinity { return } p := &ping{data: [8]byte{}} From 25f14b7f8493444bbe78286b23ee2c0245133736 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Thu, 2 Mar 2017 18:00:55 -0800 Subject: [PATCH 20/21] post-review updates --- transport/control.go | 1 - transport/http2_client.go | 26 +++++++++++++------------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/transport/control.go b/transport/control.go index 68b9e650..c28c0403 100644 --- a/transport/control.go +++ b/transport/control.go @@ -56,7 +56,6 @@ const ( // The following defines various control items which could flow through // the control buffer of transport. They represent different aspects of // control tasks, e.g., flow control, settings, streaming resetting, etc. - type windowUpdate struct { streamID uint32 increment uint32 diff --git a/transport/http2_client.go b/transport/http2_client.go index 6813f7b5..95924ae3 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -103,10 +103,10 @@ type http2Client struct { creds []credentials.PerRPCCredentials - // Counter to keep track of reading activity on transport. - activity uint64 // Accessed atomically. - - kp keepalive.ClientParameters + // Boolean to keep track of reading activity on transport. + // 1 is true and 0 is false. + activity uint32 // Accessed atomically. + kp keepalive.ClientParameters statsHandler stats.Handler @@ -278,7 +278,9 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( } } go t.controller() - go t.keepalive() + if t.kp.Time != infinity { + go t.keepalive() + } t.writableChan <- 0 return t, nil } @@ -401,7 +403,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea if len(t.activeStreams) == 1 { select { case t.awakenKeepalive <- struct{}{}: - t.framer.writePing(true, false, [8]byte{}) + t.framer.writePing(false, false, [8]byte{}) default: } } @@ -1014,7 +1016,7 @@ func (t *http2Client) reader() { t.notifyError(err) return } - atomic.AddUint64(&t.activity, 1) + atomic.CompareAndSwapUint32(&t.activity, 0, 1) sf, ok := frame.(*http2.SettingsFrame) if !ok { t.notifyError(err) @@ -1025,7 +1027,7 @@ func (t *http2Client) reader() { // loop to keep reading incoming messages on this transport. for { frame, err := t.framer.readFrame() - atomic.AddUint64(&t.activity, 1) + atomic.CompareAndSwapUint32(&t.activity, 0, 1) if err != nil { // Abort an active stream if the http2.Framer returns a // http2.StreamError. This can happen only if the server's response @@ -1140,15 +1142,12 @@ func (t *http2Client) controller() { // keepalive running in a separate goroutune makes sure the connection is alive by sending pings. func (t *http2Client) keepalive() { - if t.kp.Time == infinity { - return - } p := &ping{data: [8]byte{}} timer := time.NewTimer(t.kp.Time) for { select { case <-timer.C: - if a := atomic.SwapUint64(&t.activity, 0); a > 0 { + if a := atomic.SwapUint32(&t.activity, 0); a == 1 { timer.Reset(t.kp.Time) continue } @@ -1175,11 +1174,12 @@ func (t *http2Client) keepalive() { timer.Reset(t.kp.Timeout) select { case <-timer.C: - if a := atomic.SwapUint64(&t.activity, 0); a > 0 { + if a := atomic.SwapUint32(&t.activity, 0); a == 1 { timer.Reset(t.kp.Time) continue } t.Close() + return case <-t.shutdownChan: if !timer.Stop() { <-timer.C From 9d2357661814c2605430d2eb1567970ad5fff86e Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Mon, 6 Mar 2017 13:08:23 -0800 Subject: [PATCH 21/21] post-review update --- keepalive/keepalive.go | 13 ++++++++----- transport/http2_client.go | 8 ++++---- transport/transport_test.go | 4 ++++ 3 files changed, 16 insertions(+), 9 deletions(-) diff --git a/keepalive/keepalive.go b/keepalive/keepalive.go index 19520f41..e3a4068c 100644 --- a/keepalive/keepalive.go +++ b/keepalive/keepalive.go @@ -4,12 +4,15 @@ import ( "time" ) -// ClientParameters is used to set keepalive parameters for the client side. +// ClientParameters is used to set keepalive parameters on the client-side. +// These configure how the client will actively probe to notice when a connection broken +// and to cause activity so intermediaries are aware the connection is still in use. type ClientParameters struct { - // After a duration of this time the client pings the server to see if the transport is still alive. - Time time.Duration - // After having pinged for keepalive check, the client waits for a duration of keepalive_timeout before closing the transport. - Timeout time.Duration + // After a duration of this time if the client doesn't see any activity it pings the server to see if the transport is still alive. + Time time.Duration // The current default value is infinity. + // After having pinged for keepalive check, the client waits for a duration of Timeout and if no activity is seen even after that + // the connection is closed. + Timeout time.Duration // The current default value is 20 seconds. // If true, client runs keepalive checks even with no active RPCs. PermitWithoutStream bool } diff --git a/transport/http2_client.go b/transport/http2_client.go index 95924ae3..ec94e39d 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -82,7 +82,7 @@ type http2Client struct { // goAway is closed to notify the upper layer (i.e., addrConn.transportMonitor) // that the server sent GoAway on this transport. goAway chan struct{} - // awakenKeepalive is used to tell keepalive goroutine to reset keepalive timer. + // awakenKeepalive is used to wake up keepalive when after it has gone dormant. awakenKeepalive chan struct{} framer *framer @@ -231,7 +231,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( kp: kp, statsHandler: opts.StatsHandler, } - // make sure awakenKeepalive can't be written upon. + // Make sure awakenKeepalive can't be written upon. // keepalive routine will make it writable, if need be. t.awakenKeepalive <- struct{}{} if t.statsHandler != nil { @@ -1147,7 +1147,7 @@ func (t *http2Client) keepalive() { for { select { case <-timer.C: - if a := atomic.SwapUint32(&t.activity, 0); a == 1 { + if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { timer.Reset(t.kp.Time) continue } @@ -1174,7 +1174,7 @@ func (t *http2Client) keepalive() { timer.Reset(t.kp.Timeout) select { case <-timer.C: - if a := atomic.SwapUint32(&t.activity, 0); a == 1 { + if atomic.CompareAndSwapUint32(&t.activity, 1, 0) { timer.Reset(t.kp.Time) continue } diff --git a/transport/transport_test.go b/transport/transport_test.go index e28a556c..2c8d3aa7 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -292,6 +292,10 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con }() tr, err := NewClientTransport(context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts) if err != nil { + // Server clean-up. + if conn, ok := <-done; ok { + conn.Close() + } t.Fatalf("Failed to dial: %v", err) } return tr