From 25f14b7f8493444bbe78286b23ee2c0245133736 Mon Sep 17 00:00:00 2001 From: Mahak Mukhi Date: Thu, 2 Mar 2017 18:00:55 -0800 Subject: [PATCH] post-review updates --- transport/control.go | 1 - transport/http2_client.go | 26 +++++++++++++------------- 2 files changed, 13 insertions(+), 14 deletions(-) diff --git a/transport/control.go b/transport/control.go index 68b9e650..c28c0403 100644 --- a/transport/control.go +++ b/transport/control.go @@ -56,7 +56,6 @@ const ( // The following defines various control items which could flow through // the control buffer of transport. They represent different aspects of // control tasks, e.g., flow control, settings, streaming resetting, etc. - type windowUpdate struct { streamID uint32 increment uint32 diff --git a/transport/http2_client.go b/transport/http2_client.go index 6813f7b5..95924ae3 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -103,10 +103,10 @@ type http2Client struct { creds []credentials.PerRPCCredentials - // Counter to keep track of reading activity on transport. - activity uint64 // Accessed atomically. - - kp keepalive.ClientParameters + // Boolean to keep track of reading activity on transport. + // 1 is true and 0 is false. + activity uint32 // Accessed atomically. + kp keepalive.ClientParameters statsHandler stats.Handler @@ -278,7 +278,9 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( } } go t.controller() - go t.keepalive() + if t.kp.Time != infinity { + go t.keepalive() + } t.writableChan <- 0 return t, nil } @@ -401,7 +403,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea if len(t.activeStreams) == 1 { select { case t.awakenKeepalive <- struct{}{}: - t.framer.writePing(true, false, [8]byte{}) + t.framer.writePing(false, false, [8]byte{}) default: } } @@ -1014,7 +1016,7 @@ func (t *http2Client) reader() { t.notifyError(err) return } - atomic.AddUint64(&t.activity, 1) + atomic.CompareAndSwapUint32(&t.activity, 0, 1) sf, ok := frame.(*http2.SettingsFrame) if !ok { t.notifyError(err) @@ -1025,7 +1027,7 @@ func (t *http2Client) reader() { // loop to keep reading incoming messages on this transport. for { frame, err := t.framer.readFrame() - atomic.AddUint64(&t.activity, 1) + atomic.CompareAndSwapUint32(&t.activity, 0, 1) if err != nil { // Abort an active stream if the http2.Framer returns a // http2.StreamError. This can happen only if the server's response @@ -1140,15 +1142,12 @@ func (t *http2Client) controller() { // keepalive running in a separate goroutune makes sure the connection is alive by sending pings. func (t *http2Client) keepalive() { - if t.kp.Time == infinity { - return - } p := &ping{data: [8]byte{}} timer := time.NewTimer(t.kp.Time) for { select { case <-timer.C: - if a := atomic.SwapUint64(&t.activity, 0); a > 0 { + if a := atomic.SwapUint32(&t.activity, 0); a == 1 { timer.Reset(t.kp.Time) continue } @@ -1175,11 +1174,12 @@ func (t *http2Client) keepalive() { timer.Reset(t.kp.Timeout) select { case <-timer.C: - if a := atomic.SwapUint64(&t.activity, 0); a > 0 { + if a := atomic.SwapUint32(&t.activity, 0); a == 1 { timer.Reset(t.kp.Time) continue } t.Close() + return case <-t.shutdownChan: if !timer.Stop() { <-timer.C