Piggyback window updates for connection with those of a stream. (#1273)

This commit is contained in:
MakMukhi
2017-06-05 10:25:41 -07:00
committed by GitHub
parent 6fecf2831a
commit 8de2dff78c
3 changed files with 32 additions and 10 deletions

View File

@ -68,6 +68,7 @@ const (
type windowUpdate struct { type windowUpdate struct {
streamID uint32 streamID uint32
increment uint32 increment uint32
flush bool
} }
func (*windowUpdate) item() {} func (*windowUpdate) item() {}
@ -240,3 +241,11 @@ func (f *inFlow) onRead(n uint32) uint32 {
} }
return 0 return 0
} }
func (f *inFlow) resetPendingUpdate() uint32 {
f.mu.Lock()
defer f.mu.Unlock()
n := f.pendingUpdate
f.pendingUpdate = 0
return n
}

View File

@ -813,7 +813,11 @@ func (t *http2Client) adjustWindow(s *Stream, n uint32) {
return return
} }
if w := s.fc.maybeAdjust(n); w > 0 { if w := s.fc.maybeAdjust(n); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w}) // Piggyback conneciton's window update along.
if cw := t.fc.resetPendingUpdate(); cw > 0 {
t.controlBuf.put(&windowUpdate{0, cw, false})
}
t.controlBuf.put(&windowUpdate{s.id, w, true})
} }
} }
@ -827,7 +831,10 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
return return
} }
if w := s.fc.onRead(n); w > 0 { if w := s.fc.onRead(n); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w}) if cw := t.fc.resetPendingUpdate(); cw > 0 {
t.controlBuf.put(&windowUpdate{0, cw, false})
}
t.controlBuf.put(&windowUpdate{s.id, w, true})
} }
} }
@ -846,7 +853,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
// active(fast) streams from starving in presence of slow or // active(fast) streams from starving in presence of slow or
// inactive streams. // inactive streams.
if w := t.fc.onRead(uint32(size)); w > 0 { if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w}) t.controlBuf.put(&windowUpdate{0, w, true})
} }
// Select the right stream to dispatch. // Select the right stream to dispatch.
s, ok := t.getStream(f) s, ok := t.getStream(f)
@ -869,7 +876,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
} }
if f.Header().Flags.Has(http2.FlagDataPadded) { if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 { if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w}) t.controlBuf.put(&windowUpdate{s.id, w, true})
} }
} }
s.mu.Unlock() s.mu.Unlock()
@ -1185,7 +1192,7 @@ func (t *http2Client) controller() {
case <-t.writableChan: case <-t.writableChan:
switch i := i.(type) { switch i := i.(type) {
case *windowUpdate: case *windowUpdate:
t.framer.writeWindowUpdate(true, i.streamID, i.increment) t.framer.writeWindowUpdate(i.flush, i.streamID, i.increment)
case *settings: case *settings:
if i.ack { if i.ack {
t.framer.writeSettingsAck(true) t.framer.writeSettingsAck(true)

View File

@ -449,7 +449,10 @@ func (t *http2Server) adjustWindow(s *Stream, n uint32) {
return return
} }
if w := s.fc.maybeAdjust(n); w > 0 { if w := s.fc.maybeAdjust(n); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w}) if cw := t.fc.resetPendingUpdate(); cw > 0 {
t.controlBuf.put(&windowUpdate{0, cw, false})
}
t.controlBuf.put(&windowUpdate{s.id, w, true})
} }
} }
@ -463,7 +466,10 @@ func (t *http2Server) updateWindow(s *Stream, n uint32) {
return return
} }
if w := s.fc.onRead(n); w > 0 { if w := s.fc.onRead(n); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w}) if cw := t.fc.resetPendingUpdate(); cw > 0 {
t.controlBuf.put(&windowUpdate{0, cw, false})
}
t.controlBuf.put(&windowUpdate{s.id, w, true})
} }
} }
@ -483,7 +489,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
// active(fast) streams from starving in presence of slow or // active(fast) streams from starving in presence of slow or
// inactive streams. // inactive streams.
if w := t.fc.onRead(uint32(size)); w > 0 { if w := t.fc.onRead(uint32(size)); w > 0 {
t.controlBuf.put(&windowUpdate{0, w}) t.controlBuf.put(&windowUpdate{0, w, true})
} }
// Select the right stream to dispatch. // Select the right stream to dispatch.
s, ok := t.getStream(f) s, ok := t.getStream(f)
@ -504,7 +510,7 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
} }
if f.Header().Flags.Has(http2.FlagDataPadded) { if f.Header().Flags.Has(http2.FlagDataPadded) {
if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 { if w := s.fc.onRead(uint32(size) - uint32(len(f.Data()))); w > 0 {
t.controlBuf.put(&windowUpdate{s.id, w}) t.controlBuf.put(&windowUpdate{s.id, w, true})
} }
} }
s.mu.Unlock() s.mu.Unlock()
@ -979,7 +985,7 @@ func (t *http2Server) controller() {
case <-t.writableChan: case <-t.writableChan:
switch i := i.(type) { switch i := i.(type) {
case *windowUpdate: case *windowUpdate:
t.framer.writeWindowUpdate(true, i.streamID, i.increment) t.framer.writeWindowUpdate(i.flush, i.streamID, i.increment)
case *settings: case *settings:
if i.ack { if i.ack {
t.framer.writeSettingsAck(true) t.framer.writeSettingsAck(true)