some additonal fix

This commit is contained in:
iamqizhao
2015-04-12 15:50:48 -07:00
parent 43ba5d8e47
commit b5774fd760
3 changed files with 41 additions and 21 deletions

View File

@ -191,17 +191,13 @@ func (f *inFlow) onData(n uint32) error {
return nil return nil
} }
// onRead is invoked when the application reads the data. // connOnRead updates the connection level states when the application consumes data.
func (f *inFlow) onRead(n uint32) uint32 { func (f *inFlow) connOnRead(n uint32) uint32 {
if n == 0 { if n == 0 || f.conn != nil {
return 0 return 0
} }
f.mu.Lock() f.mu.Lock()
defer f.mu.Unlock() f.mu.Unlock()
if f.pendingData == 0 {
// pendingData has been adjusted by restoreConn.
return 0
}
f.pendingData -= n f.pendingData -= n
f.pendingUpdate += n f.pendingUpdate += n
if f.pendingUpdate >= f.limit/4 { if f.pendingUpdate >= f.limit/4 {
@ -212,6 +208,28 @@ func (f *inFlow) onRead(n uint32) uint32 {
return 0 return 0
} }
// onRead is invoked when the application reads the data. It returns the window updates
// for both stream and connection level.
func (f *inFlow) onRead(n uint32) (swu, cwu uint32) {
if n == 0 {
return
}
f.mu.Lock()
defer f.mu.Unlock()
if f.pendingData == 0 {
// pendingData has been adjusted by restoreConn.
return
}
f.pendingData -= n
f.pendingUpdate += n
if f.pendingUpdate >= f.limit/4 {
swu = f.pendingUpdate
f.pendingUpdate = 0
}
cwu = f.conn.connOnRead(n)
return
}
// restoreConn is invoked when a stream is terminated. It removes its stake in // restoreConn is invoked when a stream is terminated. It removes its stake in
// the connection-level flow and resets its own state. // the connection-level flow and resets its own state.
func (f *inFlow) restoreConn() uint32 { func (f *inFlow) restoreConn() uint32 {
@ -223,5 +241,5 @@ func (f *inFlow) restoreConn() uint32 {
n := f.pendingData n := f.pendingData
f.pendingData = 0 f.pendingData = 0
f.pendingUpdate = 0 f.pendingUpdate = 0
return f.conn.onRead(n) return f.conn.connOnRead(n)
} }

View File

@ -479,12 +479,13 @@ func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
// Window updates will deliver to the controller for sending when // Window updates will deliver to the controller for sending when
// the cumulative quota exceeds the corresponding threshold. // the cumulative quota exceeds the corresponding threshold.
func (t *http2Client) updateWindow(s *Stream, n uint32) { func (t *http2Client) updateWindow(s *Stream, n uint32) {
if q := s.fc.onRead(n); q > 0 { swu, cwu := s.fc.onRead(n)
t.controlBuf.put(&windowUpdate{s.id, q}) if swu > 0 {
} t.controlBuf.put(&windowUpdate{s.id, swu})
if q := t.fc.onRead(n); q > 0 { }
t.controlBuf.put(&windowUpdate{0, q}) if cwu > 0 {
} t.controlBuf.put(&windowUpdate{0, cwu})
}
} }
func (t *http2Client) handleData(f *http2.DataFrame) { func (t *http2Client) handleData(f *http2.DataFrame) {

View File

@ -305,11 +305,12 @@ func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
// Window updates will deliver to the controller for sending when // Window updates will deliver to the controller for sending when
// the cumulative quota exceeds the corresponding threshold. // the cumulative quota exceeds the corresponding threshold.
func (t *http2Server) updateWindow(s *Stream, n uint32) { func (t *http2Server) updateWindow(s *Stream, n uint32) {
if q := s.fc.onRead(n); q > 0 { swu, cwu := s.fc.onRead(n)
t.controlBuf.put(&windowUpdate{s.id, q}) if swu > 0 {
t.controlBuf.put(&windowUpdate{s.id, swu})
} }
if q := t.fc.onRead(n); q > 0 { if cwu > 0 {
t.controlBuf.put(&windowUpdate{0, q}) t.controlBuf.put(&windowUpdate{0, cwu})
} }
} }