diff --git a/clientconn.go b/clientconn.go index e4b699de..977987e6 100644 --- a/clientconn.go +++ b/clientconn.go @@ -251,7 +251,7 @@ func WithUserAgent(s string) DialOption { // WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport. func WithKeepaliveParams(k KeepaliveParameters) DialOption { - kp := &transport.KeepaliveParameters{ + kp := transport.KeepaliveParameters{ Time: k.Time, Timeout: k.Timeout, PermitWithoutStream: k.PermitWithoutStream, diff --git a/transport/http2_client.go b/transport/http2_client.go index 5adb1124..42086bc7 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -107,7 +107,7 @@ type http2Client struct { // keepaliveSkipped = 1 means skipped keepaliveSkipped uint32 // accessed atomically // keepalive parameters. - kp *KeepaliveParameters + kp KeepaliveParameters statsHandler stats.Handler mu sync.Mutex // guard the following variables @@ -191,10 +191,13 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( if opts.UserAgent != "" { ua = opts.UserAgent + " " + ua } - kp := defaultKeepaliveParams - if opts.KeepaliveParams != nil { - kp = opts.KeepaliveParams - kp.validate() + kp := opts.KeepaliveParams + // Validate keepalive parameters. + if kp.Time == 0 { + kp.Time = defaultKeepaliveTime + } + if kp.Timeout == 0 { + kp.Timeout = defaultKeepaliveTimeout } var buf bytes.Buffer t := &http2Client{ @@ -1092,11 +1095,13 @@ func (t *http2Client) applySettings(ss []http2.Setting) { // frames (e.g., window update, reset stream, setting, etc.) to the server. func (t *http2Client) controller() { timer := time.NewTimer(t.kp.Time) + timerUsed := true if t.kp.Time == infinity { // Prevent the timer from firing, ever. if !timer.Stop() { <-timer.C } + timerUsed = false } isPingSent := false keepalivePing := &ping{data: [8]byte{}} @@ -1174,28 +1179,26 @@ func (t *http2Client) controller() { t.mu.Lock() ns := len(t.activeStreams) if !t.kp.PermitWithoutStream && ns < 1 { - // set flag that signifyies keepalive was skipped + // Set flag that signifyies keepalive was skipped. atomic.StoreUint32(&t.keepaliveSkipped, 1) t.mu.Unlock() timer.Reset(infinity) continue } t.mu.Unlock() - // reset the keepaliveSkipped flag + // Reset the keepaliveSkipped flag. atomic.StoreUint32(&t.keepaliveSkipped, 0) // Send ping. t.controlBuf.put(keepalivePing) isPingSent = true timer.Reset(t.kp.Timeout) case <-t.shutdownChan: - // stop the keepalive timer + if !timerUsed { + return + } + // Stop the keepalive timer. if !timer.Stop() { - select { - case <-timer.C: - default: - // In case we stopped the timer before the for loop began. - // This happens when keepalive time provided was infinity. - } + <-timer.C } return } diff --git a/transport/keepalive.go b/transport/keepalive.go index 92049feb..035f5b55 100644 --- a/transport/keepalive.go +++ b/transport/keepalive.go @@ -6,29 +6,20 @@ import ( ) // KeepaliveParameters is used to set keepalive parameters. +// These configure how the client will actively probe to notice when a connection broken +// and to cause activity so intermediaries are aware the connection is still in use. type KeepaliveParameters struct { - // After a duration of this time the client pings the server to see if the transport is still alive. - Time time.Duration - // After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport. - Timeout time.Duration + // After a duration of this time if the client doesn't see any activity it pings the server to see if the transport is still alive. + Time time.Duration // The current default value is inifinity. + // After having pinged for keepalive check, the client waits for a duration of Timeout and if no activity is seen even after that + // the connection is closed. + Timeout time.Duration // The current default value is 20 seconds. //If true, client runs keepalive checks even with no active RPCs. PermitWithoutStream bool } -// Validate is used to validate the keepalive parameters. -// Time durations initialized to 0 will be replaced with default Values. -func (p *KeepaliveParameters) validate() { - if p.Time == 0 { - p.Time = infinity - } - if p.Timeout == 0 { - p.Timeout = twentyScnd - } -} - const ( - // Infinity is the default value of keepalive time. - infinity = time.Duration(math.MaxInt64) - // TwentyScnd is the default value of timeout. - twentyScnd = time.Duration(20 * time.Second) + infinity = time.Duration(math.MaxInt64) + defaultKeepaliveTime = infinity + defaultKeepaliveTimeout = time.Duration(20 * time.Second) ) diff --git a/transport/transport.go b/transport/transport.go index 343fbe0c..0f18d05d 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -383,17 +383,11 @@ type ConnectOptions struct { // TransportCredentials stores the Authenticator required to setup a client connection. TransportCredentials credentials.TransportCredentials // KeepaliveParams stores the keepalive parameters. - KeepaliveParams *KeepaliveParameters + KeepaliveParams KeepaliveParameters // StatsHandler stores the handler for stats. StatsHandler stats.Handler } -// default values for keepalive parameters. -var defaultKeepaliveParams = &KeepaliveParameters{ - Time: infinity, // default to infinite. - Timeout: twentyScnd, -} - // TargetInfo contains the information of the target such as network address and metadata. type TargetInfo struct { Addr string diff --git a/transport/transport_test.go b/transport/transport_test.go index 485f0577..37138a7a 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -298,7 +298,7 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con func TestKeepaliveClientClosesIdleTransport(t *testing.T) { done := make(chan net.Conn, 1) - tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: &KeepaliveParameters{ + tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: KeepaliveParameters{ Time: 2 * time.Second, // Keepalive time = 2 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. PermitWithoutStream: true, // Run keepalive even with no RPCs. @@ -322,7 +322,7 @@ func TestKeepaliveClientClosesIdleTransport(t *testing.T) { func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { done := make(chan net.Conn, 1) - tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: &KeepaliveParameters{ + tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: KeepaliveParameters{ Time: 2 * time.Second, // Keepalive time = 2 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. }}, done) @@ -345,7 +345,7 @@ func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { done := make(chan net.Conn, 1) - tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: &KeepaliveParameters{ + tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: KeepaliveParameters{ Time: 2 * time.Second, // Keepalive time = 2 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. }}, done) @@ -372,7 +372,7 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { } func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) { - s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: &KeepaliveParameters{ + s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: KeepaliveParameters{ Time: 2 * time.Second, // Keepalive time = 2 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. PermitWithoutStream: true, // Run keepalive even with no RPCs.