From b5774fd760d0ecea631fabc49eb3b94b3324df92 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Sun, 12 Apr 2015 15:50:48 -0700 Subject: [PATCH] some additonal fix --- transport/control.go | 40 ++++++++++++++++++++++++++++----------- transport/http2_client.go | 13 +++++++------ transport/http2_server.go | 9 +++++---- 3 files changed, 41 insertions(+), 21 deletions(-) diff --git a/transport/control.go b/transport/control.go index 5860458e..4319ecaf 100644 --- a/transport/control.go +++ b/transport/control.go @@ -191,17 +191,13 @@ func (f *inFlow) onData(n uint32) error { return nil } -// onRead is invoked when the application reads the data. -func (f *inFlow) onRead(n uint32) uint32 { - if n == 0 { - return 0 - } +// connOnRead updates the connection level states when the application consumes data. +func (f *inFlow) connOnRead(n uint32) uint32 { + if n == 0 || f.conn != nil { + return 0 + } f.mu.Lock() - defer f.mu.Unlock() - if f.pendingData == 0 { - // pendingData has been adjusted by restoreConn. - return 0 - } + f.mu.Unlock() f.pendingData -= n f.pendingUpdate += n if f.pendingUpdate >= f.limit/4 { @@ -212,6 +208,28 @@ func (f *inFlow) onRead(n uint32) uint32 { return 0 } +// onRead is invoked when the application reads the data. It returns the window updates +// for both stream and connection level. +func (f *inFlow) onRead(n uint32) (swu, cwu uint32) { + if n == 0 { + return + } + f.mu.Lock() + defer f.mu.Unlock() + if f.pendingData == 0 { + // pendingData has been adjusted by restoreConn. + return + } + f.pendingData -= n + f.pendingUpdate += n + if f.pendingUpdate >= f.limit/4 { + swu = f.pendingUpdate + f.pendingUpdate = 0 + } + cwu = f.conn.connOnRead(n) + return +} + // restoreConn is invoked when a stream is terminated. It removes its stake in // the connection-level flow and resets its own state. func (f *inFlow) restoreConn() uint32 { @@ -223,5 +241,5 @@ func (f *inFlow) restoreConn() uint32 { n := f.pendingData f.pendingData = 0 f.pendingUpdate = 0 - return f.conn.onRead(n) + return f.conn.connOnRead(n) } diff --git a/transport/http2_client.go b/transport/http2_client.go index 4803390d..d16f8dd8 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -479,12 +479,13 @@ func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) { // Window updates will deliver to the controller for sending when // the cumulative quota exceeds the corresponding threshold. func (t *http2Client) updateWindow(s *Stream, n uint32) { - if q := s.fc.onRead(n); q > 0 { - t.controlBuf.put(&windowUpdate{s.id, q}) - } - if q := t.fc.onRead(n); q > 0 { - t.controlBuf.put(&windowUpdate{0, q}) - } + swu, cwu := s.fc.onRead(n) + if swu > 0 { + t.controlBuf.put(&windowUpdate{s.id, swu}) + } + if cwu > 0 { + t.controlBuf.put(&windowUpdate{0, cwu}) + } } func (t *http2Client) handleData(f *http2.DataFrame) { diff --git a/transport/http2_server.go b/transport/http2_server.go index 21e487a0..a1d606f7 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -305,11 +305,12 @@ func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) { // Window updates will deliver to the controller for sending when // the cumulative quota exceeds the corresponding threshold. func (t *http2Server) updateWindow(s *Stream, n uint32) { - if q := s.fc.onRead(n); q > 0 { - t.controlBuf.put(&windowUpdate{s.id, q}) + swu, cwu := s.fc.onRead(n) + if swu > 0 { + t.controlBuf.put(&windowUpdate{s.id, swu}) } - if q := t.fc.onRead(n); q > 0 { - t.controlBuf.put(&windowUpdate{0, q}) + if cwu > 0 { + t.controlBuf.put(&windowUpdate{0, cwu}) } }