refactored and rectified test failures

This commit is contained in:
Mahak Mukhi
2016-11-18 13:55:04 -08:00
parent 93111e4c27
commit 058907d033
5 changed files with 37 additions and 21 deletions

View File

@ -232,7 +232,7 @@ func WithUserAgent(s string) DialOption {
} }
// WithKeepaliveParams returns a DialOption that specifies a user agent string for all the RPCs. // WithKeepaliveParams returns a DialOption that specifies a user agent string for all the RPCs.
func WithKeepaliveParams(k keepalive.KeepaliveParams) DialOption { func WithKeepaliveParams(k keepalive.Params) DialOption {
return func(o *dialOptions) { return func(o *dialOptions) {
o.copts.KParams = k o.copts.KParams = k
} }
@ -285,9 +285,6 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
} }
// Set defaults. // Set defaults.
if cc.dopts.copts.KParams == (keepalive.KeepaliveParams{}) {
cc.dopts.copts.KParams = keepalive.DefaultKParams
}
if cc.dopts.codec == nil { if cc.dopts.codec == nil {
cc.dopts.codec = protoCodec{} cc.dopts.codec = protoCodec{}
} }

View File

@ -1,10 +1,12 @@
package keepalive package keepalive
import ( import (
"math"
"sync"
"time" "time"
) )
type KeepaliveParams struct { type Params struct {
// After a duration of this time the client pings the server to see if the transport is still alive. // After a duration of this time the client pings the server to see if the transport is still alive.
Ktime time.Duration Ktime time.Duration
// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport. // After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport.
@ -13,10 +15,13 @@ type KeepaliveParams struct {
KNoStream bool KNoStream bool
} }
var DefaultKParams KeepaliveParams = KeepaliveParams{ var DefaultKParams Params = Params{
Ktime: time.Duration(290 * 365 * 24 * 60 * 60 * 1000 * 1000 * 1000), // default to infinite Ktime: time.Duration(math.MaxInt64), // default to infinite
Ktimeout: time.Duration(20 * 1000 * 1000 * 1000), // default to 20 seconds Ktimeout: time.Duration(20 * 1000 * 1000 * 1000), // default to 20 seconds
KNoStream: false, KNoStream: false,
} }
// Mutex to protect Enabled variable
var Mu sync.Mutex = sync.Mutex{}
var Enabled = false var Enabled = false

View File

@ -103,7 +103,7 @@ type http2Client struct {
// activity counter // activity counter
activity *uint64 activity *uint64
// keepalive parameters // keepalive parameters
kParams keepalive.KeepaliveParams keepaliveParams keepalive.Params
mu sync.Mutex // guard the following variables mu sync.Mutex // guard the following variables
state transportState // the state of underlying connection state transportState // the state of underlying connection
@ -186,6 +186,10 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
if opts.UserAgent != "" { if opts.UserAgent != "" {
ua = opts.UserAgent + " " + ua ua = opts.UserAgent + " " + ua
} }
kp := keepalive.DefaultKParams
if opts.KParams != (keepalive.Params{}) {
kp = opts.KParams
}
var buf bytes.Buffer var buf bytes.Buffer
t := &http2Client{ t := &http2Client{
target: addr.Addr, target: addr.Addr,
@ -213,7 +217,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
creds: opts.PerRPCCredentials, creds: opts.PerRPCCredentials,
maxStreams: math.MaxInt32, maxStreams: math.MaxInt32,
streamSendQuota: defaultWindowSize, streamSendQuota: defaultWindowSize,
kParams: opts.KParams, keepaliveParams: kp,
activity: new(uint64), activity: new(uint64),
} }
// Start the reader goroutine for incoming message. Each transport has // Start the reader goroutine for incoming message. Each transport has
@ -1069,15 +1073,17 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
func (t *http2Client) controller() { func (t *http2Client) controller() {
// Activity value seen by timer // Activity value seen by timer
ta := atomic.LoadUint64(t.activity) ta := atomic.LoadUint64(t.activity)
timer := time.NewTimer(t.kParams.Ktime) timer := time.NewTimer(t.keepaliveParams.Ktime)
keepalive.Mu.Lock()
if !keepalive.Enabled { if !keepalive.Enabled {
// Prevent the timer from firing, ever. // Prevent the timer from firing, ever.
if !timer.Stop() { if !timer.Stop() {
<-timer.C <-timer.C
} }
} }
keepalive.Mu.Unlock()
isPingSent := false isPingSent := false
kPing := &ping{data: [8]byte{}} keepalivePing := &ping{data: [8]byte{}}
for { for {
select { select {
case i := <-t.controlBuf.get(): case i := <-t.controlBuf.get():
@ -1114,15 +1120,15 @@ func (t *http2Client) controller() {
t.mu.Unlock() t.mu.Unlock()
// Global activity value. // Global activity value.
ga := atomic.LoadUint64(t.activity) ga := atomic.LoadUint64(t.activity)
if ga > ta || (!t.kParams.KNoStream && ns < 1) { if ga > ta || (!t.keepaliveParams.KNoStream && ns < 1) {
timer.Reset(t.kParams.Ktime) timer.Reset(t.keepaliveParams.Ktime)
isPingSent = false isPingSent = false
} else { } else {
if !isPingSent { if !isPingSent {
// send ping // send ping
t.controlBuf.put(kPing) t.controlBuf.put(keepalivePing)
isPingSent = true isPingSent = true
timer.Reset(t.kParams.Ktimeout) timer.Reset(t.keepaliveParams.Ktimeout)
} else { } else {
t.Close() t.Close()
continue continue

View File

@ -382,7 +382,7 @@ type ConnectOptions struct {
// TransportCredentials stores the Authenticator required to setup a client connection. // TransportCredentials stores the Authenticator required to setup a client connection.
TransportCredentials credentials.TransportCredentials TransportCredentials credentials.TransportCredentials
// Keepalive parameters // Keepalive parameters
KParams keepalive.KeepaliveParams KParams keepalive.Params
} }
// TargetInfo contains the information of the target such as network address and metadata. // TargetInfo contains the information of the target such as network address and metadata.

View File

@ -305,9 +305,11 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con
} }
func TestKeepaliveClientClosesIdleTransport(t *testing.T) { func TestKeepaliveClientClosesIdleTransport(t *testing.T) {
keepalive.Mu.Lock()
keepalive.Enabled = true keepalive.Enabled = true
keepalive.Mu.Unlock()
done := make(chan net.Conn, 1) done := make(chan net.Conn, 1)
cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.KeepaliveParams{ cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.Params{
Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec
Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec
KNoStream: true, // run keepalive even with no RPCs KNoStream: true, // run keepalive even with no RPCs
@ -329,9 +331,11 @@ func TestKeepaliveClientClosesIdleTransport(t *testing.T) {
} }
func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) {
keepalive.Mu.Lock()
keepalive.Enabled = true keepalive.Enabled = true
keepalive.Mu.Unlock()
done := make(chan net.Conn, 1) done := make(chan net.Conn, 1)
cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.KeepaliveParams{ cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.Params{
Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec
Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec
KNoStream: false, // don't run keepalive even with no RPCs KNoStream: false, // don't run keepalive even with no RPCs
@ -353,9 +357,11 @@ func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) {
} }
func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
keepalive.Mu.Lock()
keepalive.Enabled = true keepalive.Enabled = true
keepalive.Mu.Unlock()
done := make(chan net.Conn, 1) done := make(chan net.Conn, 1)
cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.KeepaliveParams{ cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.Params{
Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec
Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec
KNoStream: false, // don't run keepalive even with no RPCs KNoStream: false, // don't run keepalive even with no RPCs
@ -382,8 +388,10 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
} }
func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) { func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
keepalive.Mu.Lock()
keepalive.Enabled = true keepalive.Enabled = true
s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KParams: keepalive.KeepaliveParams{ keepalive.Mu.Unlock()
s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KParams: keepalive.Params{
Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec
Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec
KNoStream: true, // don't run keepalive even with no RPCs KNoStream: true, // don't run keepalive even with no RPCs