code cleaning

This commit is contained in:
Mahak Mukhi
2017-01-06 16:52:37 -08:00
parent 901cdf6fb5
commit eeb6f5bade
5 changed files with 55 additions and 65 deletions

View File

@ -234,7 +234,7 @@ func WithUserAgent(s string) DialOption {
// WithKeepaliveParams returns a DialOption that specifies a user agent string for all the RPCs. // WithKeepaliveParams returns a DialOption that specifies a user agent string for all the RPCs.
func WithKeepaliveParams(k keepalive.Params) DialOption { func WithKeepaliveParams(k keepalive.Params) DialOption {
return func(o *dialOptions) { return func(o *dialOptions) {
o.copts.KParams = k o.copts.KeepaliveParams = k
} }
} }

View File

@ -13,11 +13,11 @@ type Params struct {
// After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport. // After having pinged fot keepalive check, the client waits for a duration of keepalive_timeout before closing the transport.
Timeout time.Duration Timeout time.Duration
//If true, client runs keepalive checks even with no active RPCs. //If true, client runs keepalive checks even with no active RPCs.
PermitNoStream bool PermitWithoutStream bool
} }
// DefaultKParams contains default values for keepalive parameters // DefaultParams contains default values for keepalive parameters
var DefaultKParams = Params{ var DefaultParams = Params{
Time: time.Duration(math.MaxInt64), // default to infinite Time: time.Duration(math.MaxInt64), // default to infinite
Timeout: time.Duration(20 * time.Second), Timeout: time.Duration(20 * time.Second),
} }

View File

@ -103,7 +103,7 @@ type http2Client struct {
// activity counter // activity counter
activity uint64 // accessed atomically activity uint64 // accessed atomically
// keepalive parameters // keepalive parameters
keepaliveParams keepalive.Params kp keepalive.Params
mu sync.Mutex // guard the following variables mu sync.Mutex // guard the following variables
state transportState // the state of underlying connection state transportState // the state of underlying connection
@ -186,9 +186,9 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
if opts.UserAgent != "" { if opts.UserAgent != "" {
ua = opts.UserAgent + " " + ua ua = opts.UserAgent + " " + ua
} }
kp := keepalive.DefaultKParams kp := keepalive.DefaultParams
if opts.KParams != (keepalive.Params{}) { if opts.KeepaliveParams != (keepalive.Params{}) {
kp = opts.KParams kp = opts.KeepaliveParams
} }
var buf bytes.Buffer var buf bytes.Buffer
t := &http2Client{ t := &http2Client{
@ -217,7 +217,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
creds: opts.PerRPCCredentials, creds: opts.PerRPCCredentials,
maxStreams: math.MaxInt32, maxStreams: math.MaxInt32,
streamSendQuota: defaultWindowSize, streamSendQuota: defaultWindowSize,
keepaliveParams: kp, kp: kp,
} }
// Start the reader goroutine for incoming message. Each transport has // Start the reader goroutine for incoming message. Each transport has
// a dedicated goroutine which reads HTTP2 frame from network. Then it // a dedicated goroutine which reads HTTP2 frame from network. Then it
@ -844,8 +844,6 @@ func (t *http2Client) handlePing(f *http2.PingFrame) {
pingAck := &ping{ack: true} pingAck := &ping{ack: true}
copy(pingAck.data[:], f.Data[:]) copy(pingAck.data[:], f.Data[:])
t.controlBuf.put(pingAck) t.controlBuf.put(pingAck)
// activity++
atomic.AddUint64(&t.activity, 1)
} }
func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) { func (t *http2Client) handleGoAway(f *http2.GoAwayFrame) {
@ -1070,9 +1068,7 @@ func (t *http2Client) applySettings(ss []http2.Setting) {
// controller running in a separate goroutine takes charge of sending control // controller running in a separate goroutine takes charge of sending control
// frames (e.g., window update, reset stream, setting, etc.) to the server. // frames (e.g., window update, reset stream, setting, etc.) to the server.
func (t *http2Client) controller() { func (t *http2Client) controller() {
// Activity value seen by timer timer := time.NewTimer(t.kp.Time)
ta := atomic.LoadUint64(&t.activity)
timer := time.NewTimer(t.keepaliveParams.Time)
if !keepalive.Enabled() { if !keepalive.Enabled() {
// Prevent the timer from firing, ever. // Prevent the timer from firing, ever.
if !timer.Stop() { if !timer.Stop() {
@ -1115,24 +1111,22 @@ func (t *http2Client) controller() {
t.mu.Lock() t.mu.Lock()
ns := len(t.activeStreams) ns := len(t.activeStreams)
t.mu.Unlock() t.mu.Unlock()
// Global activity value. // Get the activity counter value and reset it.
ga := atomic.LoadUint64(&t.activity) a := atomic.SwapUint64(&t.activity, 0)
if ga > ta || (!t.keepaliveParams.PermitNoStream && ns < 1) { if a > 0 || (!t.kp.PermitWithoutStream && ns < 1) {
timer.Reset(t.keepaliveParams.Time) timer.Reset(t.kp.Time)
isPingSent = false isPingSent = false
} else { } else {
if !isPingSent { if !isPingSent {
// send ping // send ping
t.controlBuf.put(keepalivePing) t.controlBuf.put(keepalivePing)
isPingSent = true isPingSent = true
timer.Reset(t.keepaliveParams.Timeout) timer.Reset(t.kp.Timeout)
} else { } else {
t.Close() t.Close()
continue continue
} }
} }
// Update timer activity counter.
ta = ga
case <-t.shutdownChan: case <-t.shutdownChan:
return return
} }

View File

@ -382,7 +382,7 @@ type ConnectOptions struct {
// TransportCredentials stores the Authenticator required to setup a client connection. // TransportCredentials stores the Authenticator required to setup a client connection.
TransportCredentials credentials.TransportCredentials TransportCredentials credentials.TransportCredentials
// Keepalive parameters // Keepalive parameters
KParams keepalive.Params KeepaliveParams keepalive.Params
} }
// TargetInfo contains the information of the target such as network address and metadata. // TargetInfo contains the information of the target such as network address and metadata.

View File

@ -274,7 +274,7 @@ func setUpWithOptions(t *testing.T, port int, maxStreams uint32, ht hType, copts
return server, ct return server, ct
} }
func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Conn) *http2Client { func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Conn) ClientTransport {
lis, err := net.Listen("tcp", "localhost:0") lis, err := net.Listen("tcp", "localhost:0")
if err != nil { if err != nil {
t.Fatalf("Failed to listen: %v", err) t.Fatalf("Failed to listen: %v", err)
@ -290,30 +290,23 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con
} }
done <- conn done <- conn
}() }()
tr, err := newHTTP2Client(context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts) tr, err := NewClientTransport(context.Background(), TargetInfo{Addr: lis.Addr().String()}, copts)
if err != nil { if err != nil {
t.Fatalf("Failed to dial: %v", err) t.Fatalf("Failed to dial: %v", err)
} }
cT := tr.(*http2Client) return tr
// Assert client transport is healthy
cT.mu.Lock()
defer cT.mu.Unlock()
if cT.state != reachable {
t.Fatalf("Client transport not healthy")
}
return cT
} }
func TestKeepaliveClientClosesIdleTransport(t *testing.T) { func TestKeepaliveClientClosesIdleTransport(t *testing.T) {
keepalive.Enable() keepalive.Enable()
defer keepalive.Disable() defer keepalive.Disable()
done := make(chan net.Conn, 1) done := make(chan net.Conn, 1)
cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.Params{ tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.Params{
Time: 2 * time.Second, // keepalive time = 2 sec Time: 2 * time.Second, // keepalive time = 2 sec
Timeout: 1 * time.Second, // keepalive timeout = 1 sec Timeout: 1 * time.Second, // keepalive timeout = 1 sec
PermitNoStream: true, // run keepalive even with no RPCs PermitWithoutStream: true, // run keepalive even with no RPCs
}}, done) }}, done)
defer cT.Close() defer tr.Close()
conn, ok := <-done conn, ok := <-done
if !ok { if !ok {
t.Fatalf("Server didn't return connection object") t.Fatalf("Server didn't return connection object")
@ -322,9 +315,10 @@ func TestKeepaliveClientClosesIdleTransport(t *testing.T) {
// Sleep for keepalive to close the connection // Sleep for keepalive to close the connection
time.Sleep(4 * time.Second) time.Sleep(4 * time.Second)
// Assert that the connection was closed // Assert that the connection was closed
cT.mu.Lock() ct := tr.(*http2Client)
defer cT.mu.Unlock() ct.mu.Lock()
if cT.state == reachable { defer ct.mu.Unlock()
if ct.state == reachable {
t.Fatalf("Test Failed: Expected client transport to have closed.") t.Fatalf("Test Failed: Expected client transport to have closed.")
} }
} }
@ -333,12 +327,12 @@ func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) {
keepalive.Enable() keepalive.Enable()
defer keepalive.Disable() defer keepalive.Disable()
done := make(chan net.Conn, 1) done := make(chan net.Conn, 1)
cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.Params{ tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.Params{
Time: 2 * time.Second, // keepalive time = 2 sec Time: 2 * time.Second, // keepalive time = 2 sec
Timeout: 1 * time.Second, // keepalive timeout = 1 sec Timeout: 1 * time.Second, // keepalive timeout = 1 sec
PermitNoStream: false, // don't run keepalive even with no RPCs PermitWithoutStream: false, // don't run keepalive even with no RPCs
}}, done) }}, done)
defer cT.Close() defer tr.Close()
conn, ok := <-done conn, ok := <-done
if !ok { if !ok {
t.Fatalf("server didn't reutrn connection object") t.Fatalf("server didn't reutrn connection object")
@ -347,9 +341,10 @@ func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) {
// Give keepalive some time // Give keepalive some time
time.Sleep(4 * time.Second) time.Sleep(4 * time.Second)
// Assert that connections is still healthy // Assert that connections is still healthy
cT.mu.Lock() ct := tr.(*http2Client)
defer cT.mu.Unlock() ct.mu.Lock()
if cT.state != reachable { defer ct.mu.Unlock()
if ct.state != reachable {
t.Fatalf("Test failed: Expected client transport to be healthy.") t.Fatalf("Test failed: Expected client transport to be healthy.")
} }
} }
@ -358,28 +353,29 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
keepalive.Enable() keepalive.Enable()
defer keepalive.Disable() defer keepalive.Disable()
done := make(chan net.Conn, 1) done := make(chan net.Conn, 1)
cT := setUpWithNoPingServer(t, ConnectOptions{KParams: keepalive.Params{ tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.Params{
Time: 2 * time.Second, // keepalive time = 2 sec Time: 2 * time.Second, // keepalive time = 2 sec
Timeout: 1 * time.Second, // keepalive timeout = 1 sec Timeout: 1 * time.Second, // keepalive timeout = 1 sec
PermitNoStream: false, // don't run keepalive even with no RPCs PermitWithoutStream: false, // don't run keepalive even with no RPCs
}}, done) }}, done)
defer cT.Close() defer tr.Close()
conn, ok := <-done conn, ok := <-done
if !ok { if !ok {
t.Fatalf("Server didn't return connection object") t.Fatalf("Server didn't return connection object")
} }
defer conn.Close() defer conn.Close()
// create a stream // create a stream
_, err := cT.NewStream(context.Background(), &CallHdr{}) _, err := tr.NewStream(context.Background(), &CallHdr{})
if err != nil { if err != nil {
t.Fatalf("Failed to create a new stream: %v", err) t.Fatalf("Failed to create a new stream: %v", err)
} }
// Give keepalive some time // Give keepalive some time
time.Sleep(4 * time.Second) time.Sleep(4 * time.Second)
// Asser that transport was closed // Asser that transport was closed
cT.mu.Lock() ct := tr.(*http2Client)
defer cT.mu.Unlock() ct.mu.Lock()
if cT.state == reachable { defer ct.mu.Unlock()
if ct.state == reachable {
t.Fatalf("Test failed: Expected client transport to have closed.") t.Fatalf("Test failed: Expected client transport to have closed.")
} }
} }
@ -387,20 +383,20 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) { func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
keepalive.Enable() keepalive.Enable()
defer keepalive.Disable() defer keepalive.Disable()
s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KParams: keepalive.Params{ s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: keepalive.Params{
Time: 2 * time.Second, // keepalive time = 2 sec Time: 2 * time.Second, // keepalive time = 2 sec
Timeout: 1 * time.Second, // keepalive timeout = 1 sec Timeout: 1 * time.Second, // keepalive timeout = 1 sec
PermitNoStream: true, // don't run keepalive even with no RPCs PermitWithoutStream: true, // don't run keepalive even with no RPCs
}}) }})
defer s.stop() defer s.stop()
defer tr.Close() defer tr.Close()
// Give keep alive some time // Give keep alive some time
time.Sleep(4 * time.Second) time.Sleep(4 * time.Second)
// Assert that transport is healthy // Assert that transport is healthy
cT := tr.(*http2Client) ct := tr.(*http2Client)
cT.mu.Lock() ct.mu.Lock()
defer cT.mu.Unlock() defer ct.mu.Unlock()
if cT.state != reachable { if ct.state != reachable {
t.Fatalf("Test failed: Expected client transport to be healthy.") t.Fatalf("Test failed: Expected client transport to be healthy.")
} }
} }