post review update

This commit is contained in:
Mahak Mukhi
2017-02-10 16:47:43 -08:00
parent 0bdf059601
commit 336b4ea3cf
5 changed files with 33 additions and 45 deletions

View File

@ -251,7 +251,7 @@ func WithUserAgent(s string) DialOption {
// WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport. // WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport.
func WithKeepaliveParams(k KeepaliveParameters) DialOption { func WithKeepaliveParams(k KeepaliveParameters) DialOption {
kp := &transport.KeepaliveParameters{ kp := transport.KeepaliveParameters{
Time: k.Time, Time: k.Time,
Timeout: k.Timeout, Timeout: k.Timeout,
PermitWithoutStream: k.PermitWithoutStream, PermitWithoutStream: k.PermitWithoutStream,

View File

@ -107,7 +107,7 @@ type http2Client struct {
// keepaliveSkipped = 1 means skipped // keepaliveSkipped = 1 means skipped
keepaliveSkipped uint32 // accessed atomically keepaliveSkipped uint32 // accessed atomically
// keepalive parameters. // keepalive parameters.
kp *KeepaliveParameters kp KeepaliveParameters
statsHandler stats.Handler statsHandler stats.Handler
mu sync.Mutex // guard the following variables mu sync.Mutex // guard the following variables
@ -191,10 +191,13 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
if opts.UserAgent != "" { if opts.UserAgent != "" {
ua = opts.UserAgent + " " + ua ua = opts.UserAgent + " " + ua
} }
kp := defaultKeepaliveParams kp := opts.KeepaliveParams
if opts.KeepaliveParams != nil { // Validate keepalive parameters.
kp = opts.KeepaliveParams if kp.Time == 0 {
kp.validate() kp.Time = defaultKeepaliveTime
}
if kp.Timeout == 0 {
kp.Timeout = defaultKeepaliveTimeout
} }
var buf bytes.Buffer var buf bytes.Buffer
t := &http2Client{ t := &http2Client{
@ -1092,11 +1095,13 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
// frames (e.g., window update, reset stream, setting, etc.) to the server. // frames (e.g., window update, reset stream, setting, etc.) to the server.
func (t *http2Client) controller() { func (t *http2Client) controller() {
timer := time.NewTimer(t.kp.Time) timer := time.NewTimer(t.kp.Time)
timerUsed := true
if t.kp.Time == infinity { if t.kp.Time == infinity {
// Prevent the timer from firing, ever. // Prevent the timer from firing, ever.
if !timer.Stop() { if !timer.Stop() {
<-timer.C <-timer.C
} }
timerUsed = false
} }
isPingSent := false isPingSent := false
keepalivePing := &ping{data: [8]byte{}} keepalivePing := &ping{data: [8]byte{}}
@ -1174,28 +1179,26 @@ func (t *http2Client) controller() {
t.mu.Lock() t.mu.Lock()
ns := len(t.activeStreams) ns := len(t.activeStreams)
if !t.kp.PermitWithoutStream && ns < 1 { if !t.kp.PermitWithoutStream && ns < 1 {
// set flag that signifyies keepalive was skipped // Set flag that signifyies keepalive was skipped.
atomic.StoreUint32(&t.keepaliveSkipped, 1) atomic.StoreUint32(&t.keepaliveSkipped, 1)
t.mu.Unlock() t.mu.Unlock()
timer.Reset(infinity) timer.Reset(infinity)
continue continue
} }
t.mu.Unlock() t.mu.Unlock()
// reset the keepaliveSkipped flag // Reset the keepaliveSkipped flag.
atomic.StoreUint32(&t.keepaliveSkipped, 0) atomic.StoreUint32(&t.keepaliveSkipped, 0)
// Send ping. // Send ping.
t.controlBuf.put(keepalivePing) t.controlBuf.put(keepalivePing)
isPingSent = true isPingSent = true
timer.Reset(t.kp.Timeout) timer.Reset(t.kp.Timeout)
case <-t.shutdownChan: case <-t.shutdownChan:
// stop the keepalive timer if !timerUsed {
return
}
// Stop the keepalive timer.
if !timer.Stop() { if !timer.Stop() {
select { <-timer.C
case <-timer.C:
default:
// In case we stopped the timer before the for loop began.
// This happens when keepalive time provided was infinity.
}
} }
return return
} }

View File

@ -6,29 +6,20 @@ import (
) )
// KeepaliveParameters is used to set keepalive parameters. // KeepaliveParameters is used to set keepalive parameters.
// These configure how the client will actively probe to notice when a connection broken
// and to cause activity so intermediaries are aware the connection is still in use.
type KeepaliveParameters struct { type KeepaliveParameters struct {
// After a duration of this time the client pings the server to see if the transport is still alive. // After a duration of this time if the client doesn't see any activity it pings the server to see if the transport is still alive.
Time time.Duration Time time.Duration // The current default value is inifinity.
// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport. // After having pinged for keepalive check, the client waits for a duration of Timeout and if no activity is seen even after that
Timeout time.Duration // the connection is closed.
Timeout time.Duration // The current default value is 20 seconds.
//If true, client runs keepalive checks even with no active RPCs. //If true, client runs keepalive checks even with no active RPCs.
PermitWithoutStream bool PermitWithoutStream bool
} }
// Validate is used to validate the keepalive parameters.
// Time durations initialized to 0 will be replaced with default Values.
func (p *KeepaliveParameters) validate() {
if p.Time == 0 {
p.Time = infinity
}
if p.Timeout == 0 {
p.Timeout = twentyScnd
}
}
const ( const (
// Infinity is the default value of keepalive time. infinity = time.Duration(math.MaxInt64)
infinity = time.Duration(math.MaxInt64) defaultKeepaliveTime = infinity
// TwentyScnd is the default value of timeout. defaultKeepaliveTimeout = time.Duration(20 * time.Second)
twentyScnd = time.Duration(20 * time.Second)
) )

View File

@ -383,17 +383,11 @@ type ConnectOptions struct {
// TransportCredentials stores the Authenticator required to setup a client connection. // TransportCredentials stores the Authenticator required to setup a client connection.
TransportCredentials credentials.TransportCredentials TransportCredentials credentials.TransportCredentials
// KeepaliveParams stores the keepalive parameters. // KeepaliveParams stores the keepalive parameters.
KeepaliveParams *KeepaliveParameters KeepaliveParams KeepaliveParameters
// StatsHandler stores the handler for stats. // StatsHandler stores the handler for stats.
StatsHandler stats.Handler StatsHandler stats.Handler
} }
// default values for keepalive parameters.
var defaultKeepaliveParams = &KeepaliveParameters{
Time: infinity, // default to infinite.
Timeout: twentyScnd,
}
// TargetInfo contains the information of the target such as network address and metadata. // TargetInfo contains the information of the target such as network address and metadata.
type TargetInfo struct { type TargetInfo struct {
Addr string Addr string

View File

@ -298,7 +298,7 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con
func TestKeepaliveClientClosesIdleTransport(t *testing.T) { func TestKeepaliveClientClosesIdleTransport(t *testing.T) {
done := make(chan net.Conn, 1) done := make(chan net.Conn, 1)
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: &KeepaliveParameters{ tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: KeepaliveParameters{
Time: 2 * time.Second, // Keepalive time = 2 sec. Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
PermitWithoutStream: true, // Run keepalive even with no RPCs. PermitWithoutStream: true, // Run keepalive even with no RPCs.
@ -322,7 +322,7 @@ func TestKeepaliveClientClosesIdleTransport(t *testing.T) {
func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) { func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) {
done := make(chan net.Conn, 1) done := make(chan net.Conn, 1)
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: &KeepaliveParameters{ tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: KeepaliveParameters{
Time: 2 * time.Second, // Keepalive time = 2 sec. Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
}}, done) }}, done)
@ -345,7 +345,7 @@ func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) {
func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
done := make(chan net.Conn, 1) done := make(chan net.Conn, 1)
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: &KeepaliveParameters{ tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: KeepaliveParameters{
Time: 2 * time.Second, // Keepalive time = 2 sec. Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
}}, done) }}, done)
@ -372,7 +372,7 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
} }
func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) { func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: &KeepaliveParameters{ s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: KeepaliveParameters{
Time: 2 * time.Second, // Keepalive time = 2 sec. Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec. Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
PermitWithoutStream: true, // Run keepalive even with no RPCs. PermitWithoutStream: true, // Run keepalive even with no RPCs.