post review updates

This commit is contained in:
Mahak Mukhi
2017-01-31 16:09:40 -08:00
parent c8983baf3a
commit 0bdf059601
7 changed files with 95 additions and 64 deletions

View File

@ -45,7 +45,6 @@ import (
"golang.org/x/net/trace"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/transport"
)
@ -251,9 +250,14 @@ func WithUserAgent(s string) DialOption {
}
// WithKeepaliveParams returns a DialOption that specifies keepalive paramaters for the client transport.
func WithKeepaliveParams(k *keepalive.Params) DialOption {
func WithKeepaliveParams(k KeepaliveParameters) DialOption {
kp := &transport.KeepaliveParameters{
Time: k.Time,
Timeout: k.Timeout,
PermitWithoutStream: k.PermitWithoutStream,
}
return func(o *dialOptions) {
o.copts.KeepaliveParams = k
o.copts.KeepaliveParams = kp
}
}

15
keepalive.go Normal file
View File

@ -0,0 +1,15 @@
package grpc
import (
"time"
)
// KeepaliveParameters is used to set keepalive parameters.
type KeepaliveParameters struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
Time time.Duration
// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport.
Timeout time.Duration
//If true, client runs keepalive checks even with no active RPCs.
PermitWithoutStream bool
}

View File

@ -52,9 +52,9 @@ const (
// the control buffer of transport. They represent different aspects of
// control tasks, e.g., flow control, settings, streaming resetting, etc.
type fireKeepaliveTimer struct{}
type resetKeepaliveTimer struct{}
func (fireKeepaliveTimer) item() {}
func (resetKeepaliveTimer) item() {}
type windowUpdate struct {
streamID uint32

View File

@ -50,7 +50,6 @@ import (
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/peer"
"google.golang.org/grpc/stats"
@ -104,11 +103,11 @@ type http2Client struct {
// Counter to keep track of reading activity on transport.
activity uint64 // accessed atomically.
// Flag to keep track if the keepalive check was skipped because there
// were no active streams and keepalive.PermitWithoutStream was false
// were no active streams and PermitWithoutStream was false
// keepaliveSkipped = 1 means skipped
keepaliveSkipped uint32 // accessed atomically
// keepalive parameters.
kp *keepalive.Params
kp *KeepaliveParameters
statsHandler stats.Handler
mu sync.Mutex // guard the following variables
@ -195,7 +194,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
kp := defaultKeepaliveParams
if opts.KeepaliveParams != nil {
kp = opts.KeepaliveParams
kp.Validate()
kp.validate()
}
var buf bytes.Buffer
t := &http2Client{
@ -392,7 +391,8 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
// if the number of active streams are now equal to 1, then check if keepalive
// was being skipped. If so, fire the keepalive timer
if len(t.activeStreams) == 1 && atomic.LoadUint32(&t.keepaliveSkipped) == 1 {
t.controlBuf.put(fireKeepaliveTimer{})
t.framer.writePing(true, false, [8]byte{})
t.controlBuf.put(resetKeepaliveTimer{})
}
// This stream is not counted when applySetings(...) initialize t.streamsQuota.
@ -1092,7 +1092,7 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
// frames (e.g., window update, reset stream, setting, etc.) to the server.
func (t *http2Client) controller() {
timer := time.NewTimer(t.kp.Time)
if t.kp.Timeout == keepalive.Infinity {
if t.kp.Time == infinity {
// Prevent the timer from firing, ever.
if !timer.Stop() {
<-timer.C
@ -1113,21 +1113,25 @@ func (t *http2Client) controller() {
select {
case controlMsg = <-cchan:
t.controlBuf.load()
// If controlMsg is of type fireKeepaliveTimer,
// If controlMsg is of type resetKeepaliveTimer,
// then check if the keepaliveSkipped flag is still set.
if _, ok := controlMsg.(fireKeepaliveTimer); ok {
if atomic.LoadUint32(&t.keepaliveSkipped) == 1 {
// Reset the timer to 0 so that it fires.
if !timer.Stop() {
<-timer.C
}
timer.Reset(0)
if _, ok := controlMsg.(resetKeepaliveTimer); ok {
atomic.StoreUint32(&t.keepaliveSkipped, 0)
// Reset the timer to timeout.
// Note : This is safe to read, since the
// only codepath that sets the keepaliveSkipped
// flag also resets the timer to infinity.
// Thus, there'll never be a case where we are
// trying to read from an empty timer channel.
isPingSent = true
if !timer.Stop() {
<-timer.C
}
timer.Reset(t.kp.Timeout)
continue
}
wchan = t.writableChan
cchan = nil
continue
case <-wchan:
switch i := controlMsg.(type) {
case *windowUpdate:
@ -1151,36 +1155,48 @@ func (t *http2Client) controller() {
wchan <- 0
wchan = nil
cchan = t.controlBuf.get()
continue
case <-timer.C:
t.mu.Lock()
ns := len(t.activeStreams)
t.mu.Unlock()
if !t.kp.PermitWithoutStream && ns < 1 {
timer.Reset(t.kp.Time)
isPingSent = false
// set flag that signifyies keepalive was skipped
atomic.StoreUint32(&t.keepaliveSkipped, 1)
continue
}
// reset the keepaliveSkipped flag
atomic.StoreUint32(&t.keepaliveSkipped, 0)
// All code paths in this case must reset the timer.
// Get the activity counter value and reset it.
a := atomic.SwapUint64(&t.activity, 0)
if a > 0 {
timer.Reset(t.kp.Time)
atomic.StoreUint32(&t.keepaliveSkipped, 0)
isPingSent = false
timer.Reset(t.kp.Time)
continue
}
if !isPingSent {
// Send ping.
t.controlBuf.put(keepalivePing)
isPingSent = true
timer.Reset(t.kp.Timeout)
if isPingSent {
t.Close()
timer.Reset(infinity)
continue
}
t.Close()
t.mu.Lock()
ns := len(t.activeStreams)
if !t.kp.PermitWithoutStream && ns < 1 {
// set flag that signifyies keepalive was skipped
atomic.StoreUint32(&t.keepaliveSkipped, 1)
t.mu.Unlock()
timer.Reset(infinity)
continue
}
t.mu.Unlock()
// reset the keepaliveSkipped flag
atomic.StoreUint32(&t.keepaliveSkipped, 0)
// Send ping.
t.controlBuf.put(keepalivePing)
isPingSent = true
timer.Reset(t.kp.Timeout)
case <-t.shutdownChan:
// stop the keepalive timer
if !timer.Stop() {
select {
case <-timer.C:
default:
// In case we stopped the timer before the for loop began.
// This happens when keepalive time provided was infinity.
}
}
return
}
}

View File

@ -1,12 +1,12 @@
package keepalive
package transport
import (
"math"
"time"
)
// Params is used to set keepalive parameters.
type Params struct {
// KeepaliveParameters is used to set keepalive parameters.
type KeepaliveParameters struct {
// After a duration of this time the client pings the server to see if the transport is still alive.
Time time.Duration
// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport.
@ -17,18 +17,18 @@ type Params struct {
// Validate is used to validate the keepalive parameters.
// Time durations initialized to 0 will be replaced with default Values.
func (p *Params) Validate() {
func (p *KeepaliveParameters) validate() {
if p.Time == 0 {
p.Time = Infinity
p.Time = infinity
}
if p.Timeout == 0 {
p.Time = TwentyScnd
p.Timeout = twentyScnd
}
}
const (
// Infinity is the default value of keepalive time.
Infinity = time.Duration(math.MaxInt64)
infinity = time.Duration(math.MaxInt64)
// TwentyScnd is the default value of timeout.
TwentyScnd = time.Duration(20 * time.Second)
twentyScnd = time.Duration(20 * time.Second)
)

View File

@ -47,7 +47,6 @@ import (
"golang.org/x/net/context"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/metadata"
"google.golang.org/grpc/stats"
"google.golang.org/grpc/tap"
@ -384,15 +383,15 @@ type ConnectOptions struct {
// TransportCredentials stores the Authenticator required to setup a client connection.
TransportCredentials credentials.TransportCredentials
// KeepaliveParams stores the keepalive parameters.
KeepaliveParams *keepalive.Params
KeepaliveParams *KeepaliveParameters
// StatsHandler stores the handler for stats.
StatsHandler stats.Handler
}
// default values for keepalive parameters.
var defaultKeepaliveParams = &keepalive.Params{
Time: keepalive.Infinity, // default to infinite.
Timeout: keepalive.TwentyScnd,
var defaultKeepaliveParams = &KeepaliveParameters{
Time: infinity, // default to infinite.
Timeout: twentyScnd,
}
// TargetInfo contains the information of the target such as network address and metadata.

View File

@ -49,7 +49,6 @@ import (
"golang.org/x/net/http2"
"golang.org/x/net/http2/hpack"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/keepalive"
)
type server struct {
@ -299,7 +298,7 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con
func TestKeepaliveClientClosesIdleTransport(t *testing.T) {
done := make(chan net.Conn, 1)
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: &keepalive.Params{
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: &KeepaliveParameters{
Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
PermitWithoutStream: true, // Run keepalive even with no RPCs.
@ -323,10 +322,9 @@ func TestKeepaliveClientClosesIdleTransport(t *testing.T) {
func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) {
done := make(chan net.Conn, 1)
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: &keepalive.Params{
Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
PermitWithoutStream: false, // Don't run keepalive even with no RPCs.
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: &KeepaliveParameters{
Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
}}, done)
defer tr.Close()
conn, ok := <-done
@ -347,10 +345,9 @@ func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) {
func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
done := make(chan net.Conn, 1)
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: &keepalive.Params{
Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
PermitWithoutStream: false, // Don't run keepalive even with no RPCs.
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: &KeepaliveParameters{
Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
}}, done)
defer tr.Close()
conn, ok := <-done
@ -375,7 +372,7 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
}
func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: &keepalive.Params{
s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: &KeepaliveParameters{
Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
PermitWithoutStream: true, // Run keepalive even with no RPCs.