Post review updates

This commit is contained in:
Mahak Mukhi
2016-11-21 14:18:57 -08:00
parent 97fb58a27f
commit 901cdf6fb5
3 changed files with 61 additions and 48 deletions

View File

@ -9,22 +9,42 @@ import (
// Params is used to set keepalive parameters. // Params is used to set keepalive parameters.
type Params struct { type Params struct {
// After a duration of this time the client pings the server to see if the transport is still alive. // After a duration of this time the client pings the server to see if the transport is still alive.
Ktime time.Duration Time time.Duration
// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport. // After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport.
Ktimeout time.Duration Timeout time.Duration
//If true, client runs keepalive checks even with no active RPCs. //If true, client runs keepalive checks even with no active RPCs.
KNoStream bool PermitNoStream bool
} }
// DefaultKParams contains default values for keepalive parameters // DefaultKParams contains default values for keepalive parameters
var DefaultKParams = Params{ var DefaultKParams = Params{
Ktime: time.Duration(math.MaxInt64), // default to infinite Time: time.Duration(math.MaxInt64), // default to infinite
Ktimeout: time.Duration(20 * 1000 * 1000 * 1000), // default to 20 seconds Timeout: time.Duration(20 * time.Second),
KNoStream: false,
} }
// Mu is a mutex to protect Enabled variable // mu is a mutex to protect Enabled variable
var Mu = sync.Mutex{} var mu = sync.Mutex{}
// Enabled is a knob used to turn keepalive on or off // enable is a knob used to turn keepalive on or off
var Enabled = false var enable = false
// Enabled exposes the value of enable variable
func Enabled() bool {
mu.Lock()
defer mu.Unlock()
return enable
}
// Enable can be called to enable keepalives
func Enable() {
mu.Lock()
defer mu.Unlock()
enable = true
}
// Disable can be called to disable keepalive
func Disable() {
mu.Lock()
defer mu.Unlock()
enable = false
}

View File

@ -101,7 +101,7 @@ type http2Client struct {
creds []credentials.PerRPCCredentials creds []credentials.PerRPCCredentials
// activity counter // activity counter
activity *uint64 activity uint64 // accessed atomically
// keepalive parameters // keepalive parameters
keepaliveParams keepalive.Params keepaliveParams keepalive.Params
@ -218,7 +218,6 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
maxStreams: math.MaxInt32, maxStreams: math.MaxInt32,
streamSendQuota: defaultWindowSize, streamSendQuota: defaultWindowSize,
keepaliveParams: kp, keepaliveParams: kp,
activity: new(uint64),
} }
// Start the reader goroutine for incoming message. Each transport has // Start the reader goroutine for incoming message. Each transport has
// a dedicated goroutine which reads HTTP2 frame from network. Then it // a dedicated goroutine which reads HTTP2 frame from network. Then it
@ -704,7 +703,7 @@ func (t *http2Client) Write(s *Stream, data []byte, opts *Options) error {
} }
} }
// activity++ // activity++
atomic.AddUint64(t.activity, 1) atomic.AddUint64(&t.activity, 1)
if !opts.Last { if !opts.Last {
return nil return nil
} }
@ -846,7 +845,7 @@ func (t *http2Client) handlePing(f *http2.PingFrame) {
copy(pingAck.data[:], f.Data[:]) copy(pingAck.data[:], f.Data[:])
t.controlBuf.put(pingAck) t.controlBuf.put(pingAck)
// activity++ // activity++
atomic.AddUint64(t.activity, 1) atomic.AddUint64(&t.activity, 1)
} }
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
@ -994,7 +993,7 @@ func (t *http2Client) reader() {
for { for {
frame, err := t.framer.readFrame() frame, err := t.framer.readFrame()
// activity++ // activity++
atomic.AddUint64(t.activity, 1) atomic.AddUint64(&t.activity, 1)
if err != nil { if err != nil {
// Abort an active stream if the http2.Framer returns a // Abort an active stream if the http2.Framer returns a
// http2.StreamError. This can happen only if the server's response // http2.StreamError. This can happen only if the server's response
@ -1072,16 +1071,14 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
// frames (e.g., window update, reset stream, setting, etc.) to the server. // frames (e.g., window update, reset stream, setting, etc.) to the server.
func (t *http2Client) controller() { func (t *http2Client) controller() {
// Activity value seen by timer // Activity value seen by timer
ta := atomic.LoadUint64(t.activity) ta := atomic.LoadUint64(&t.activity)
timer := time.NewTimer(t.keepaliveParams.Ktime) timer := time.NewTimer(t.keepaliveParams.Time)
keepalive.Mu.Lock() if !keepalive.Enabled() {
if !keepalive.Enabled {
// Prevent the timer from firing, ever. // Prevent the timer from firing, ever.
if !timer.Stop() { if !timer.Stop() {
<-timer.C <-timer.C
} }
} }
keepalive.Mu.Unlock()
isPingSent := false isPingSent := false
keepalivePing := &ping{data: [8]byte{}} keepalivePing := &ping{data: [8]byte{}}
for { for {
@ -1119,16 +1116,16 @@ func (t *http2Client) controller() {
ns := len(t.activeStreams) ns := len(t.activeStreams)
t.mu.Unlock() t.mu.Unlock()
// Global activity value. // Global activity value.
ga := atomic.LoadUint64(t.activity) ga := atomic.LoadUint64(&t.activity)
if ga > ta || (!t.keepaliveParams.KNoStream && ns < 1) { if ga > ta || (!t.keepaliveParams.PermitNoStream && ns < 1) {
timer.Reset(t.keepaliveParams.Ktime) timer.Reset(t.keepaliveParams.Time)
isPingSent = false isPingSent = false
} else { } else {
if !isPingSent { if !isPingSent {
// send ping // send ping
t.controlBuf.put(keepalivePing) t.controlBuf.put(keepalivePing)
isPingSent = true isPingSent = true
timer.Reset(t.keepaliveParams.Ktimeout) timer.Reset(t.keepaliveParams.Timeout)
} else { } else {
t.Close() t.Close()
continue continue

View File

@ -305,14 +305,13 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con
} }
func TestKeepaliveClientClosesIdleTransport(t *testing.T) { func TestKeepaliveClientClosesIdleTransport(t *testing.T) {
keepalive.Mu.Lock() keepalive.Enable()
keepalive.Enabled = true defer keepalive.Disable()
keepalive.Mu.Unlock()
done := make(chan net.Conn, 1) done := make(chan net.Conn, 1)
cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.Params{ cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.Params{
Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec Time: 2 * time.Second, // keepalive time = 2 sec
Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec Timeout: 1 * time.Second, // keepalive timeout = 1 sec
KNoStream: true, // run keepalive even with no RPCs PermitNoStream: true, // run keepalive even with no RPCs
}}, done) }}, done)
defer cT.Close() defer cT.Close()
conn, ok := <-done conn, ok := <-done
@ -331,14 +330,13 @@ func TestKeepaliveClientClosesIdleTransport(t *testing.T) {
} }
func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) {
keepalive.Mu.Lock() keepalive.Enable()
keepalive.Enabled = true defer keepalive.Disable()
keepalive.Mu.Unlock()
done := make(chan net.Conn, 1) done := make(chan net.Conn, 1)
cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.Params{ cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.Params{
Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec Time: 2 * time.Second, // keepalive time = 2 sec
Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec Timeout: 1 * time.Second, // keepalive timeout = 1 sec
KNoStream: false, // don't run keepalive even with no RPCs PermitNoStream: false, // don't run keepalive even with no RPCs
}}, done) }}, done)
defer cT.Close() defer cT.Close()
conn, ok := <-done conn, ok := <-done
@ -357,14 +355,13 @@ func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) {
} }
func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
keepalive.Mu.Lock() keepalive.Enable()
keepalive.Enabled = true defer keepalive.Disable()
keepalive.Mu.Unlock()
done := make(chan net.Conn, 1) done := make(chan net.Conn, 1)
cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.Params{ cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.Params{
Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec Time: 2 * time.Second, // keepalive time = 2 sec
Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec Timeout: 1 * time.Second, // keepalive timeout = 1 sec
KNoStream: false, // don't run keepalive even with no RPCs PermitNoStream: false, // don't run keepalive even with no RPCs
}}, done) }}, done)
defer cT.Close() defer cT.Close()
conn, ok := <-done conn, ok := <-done
@ -388,13 +385,12 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
} }
func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) { func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
keepalive.Mu.Lock() keepalive.Enable()
keepalive.Enabled = true defer keepalive.Disable()
keepalive.Mu.Unlock()
s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KParams: keepalive.Params{ s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KParams: keepalive.Params{
Ktime: 2 * 1000 * 1000 * 1000, // keepalive time = 2 sec Time: 2 * time.Second, // keepalive time = 2 sec
Ktimeout: 1 * 1000 * 1000 * 1000, // keepalive timeout = 1 sec Timeout: 1 * time.Second, // keepalive timeout = 1 sec
KNoStream: true, // don't run keepalive even with no RPCs PermitNoStream: true, // don't run keepalive even with no RPCs
}}) }})
defer s.stop() defer s.stop()
defer tr.Close() defer tr.Close()