post-review updates

This commit is contained in:
Mahak Mukhi
2017-03-02 18:00:55 -08:00
parent f22061907e
commit 25f14b7f84
2 changed files with 13 additions and 14 deletions

View File

@ -56,7 +56,6 @@ const (
// The following defines various control items which could flow through // The following defines various control items which could flow through
// the control buffer of transport. They represent different aspects of // the control buffer of transport. They represent different aspects of
// control tasks, e.g., flow control, settings, streaming resetting, etc. // control tasks, e.g., flow control, settings, streaming resetting, etc.
type windowUpdate struct { type windowUpdate struct {
streamID uint32 streamID uint32
increment uint32 increment uint32

View File

@ -103,10 +103,10 @@ type http2Client struct {
creds []credentials.PerRPCCredentials creds []credentials.PerRPCCredentials
// Counter to keep track of reading activity on transport. // Boolean to keep track of reading activity on transport.
activity uint64 // Accessed atomically. // 1 is true and 0 is false.
activity uint32 // Accessed atomically.
kp keepalive.ClientParameters kp keepalive.ClientParameters
statsHandler stats.Handler statsHandler stats.Handler
@ -278,7 +278,9 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
} }
} }
go t.controller() go t.controller()
go t.keepalive() if t.kp.Time != infinity {
go t.keepalive()
}
t.writableChan <- 0 t.writableChan <- 0
return t, nil return t, nil
} }
@ -401,7 +403,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
if len(t.activeStreams) == 1 { if len(t.activeStreams) == 1 {
select { select {
case t.awakenKeepalive <- struct{}{}: case t.awakenKeepalive <- struct{}{}:
t.framer.writePing(true, false, [8]byte{}) t.framer.writePing(false, false, [8]byte{})
default: default:
} }
} }
@ -1014,7 +1016,7 @@ func (t *http2Client) reader() {
t.notifyError(err) t.notifyError(err)
return return
} }
atomic.AddUint64(&t.activity, 1) atomic.CompareAndSwapUint32(&t.activity, 0, 1)
sf, ok := frame.(*http2.SettingsFrame) sf, ok := frame.(*http2.SettingsFrame)
if !ok { if !ok {
t.notifyError(err) t.notifyError(err)
@ -1025,7 +1027,7 @@ func (t *http2Client) reader() {
// loop to keep reading incoming messages on this transport. // loop to keep reading incoming messages on this transport.
for { for {
frame, err := t.framer.readFrame() frame, err := t.framer.readFrame()
atomic.AddUint64(&t.activity, 1) atomic.CompareAndSwapUint32(&t.activity, 0, 1)
if err != nil { if err != nil {
// Abort an active stream if the http2.Framer returns a // Abort an active stream if the http2.Framer returns a
// http2.StreamError. This can happen only if the server's response // http2.StreamError. This can happen only if the server's response
@ -1140,15 +1142,12 @@ func (t *http2Client) controller() {
// keepalive running in a separate goroutune makes sure the connection is alive by sending pings. // keepalive running in a separate goroutune makes sure the connection is alive by sending pings.
func (t *http2Client) keepalive() { func (t *http2Client) keepalive() {
if t.kp.Time == infinity {
return
}
p := &ping{data: [8]byte{}} p := &ping{data: [8]byte{}}
timer := time.NewTimer(t.kp.Time) timer := time.NewTimer(t.kp.Time)
for { for {
select { select {
case <-timer.C: case <-timer.C:
if a := atomic.SwapUint64(&t.activity, 0); a > 0 { if a := atomic.SwapUint32(&t.activity, 0); a == 1 {
timer.Reset(t.kp.Time) timer.Reset(t.kp.Time)
continue continue
} }
@ -1175,11 +1174,12 @@ func (t *http2Client) keepalive() {
timer.Reset(t.kp.Timeout) timer.Reset(t.kp.Timeout)
select { select {
case <-timer.C: case <-timer.C:
if a := atomic.SwapUint64(&t.activity, 0); a > 0 { if a := atomic.SwapUint32(&t.activity, 0); a == 1 {
timer.Reset(t.kp.Time) timer.Reset(t.kp.Time)
continue continue
} }
t.Close() t.Close()
return
case <-t.shutdownChan: case <-t.shutdownChan:
if !timer.Stop() { if !timer.Stop() {
<-timer.C <-timer.C