@ -164,12 +164,10 @@ type inFlow struct {
|
|||||||
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
// pendingData is the overall data which have been received but not been
|
// pendingData is the overall data which have been received but not been
|
||||||
// fully consumed (either pending for application to read or pending for
|
// consumed by applications.
|
||||||
// window update).
|
|
||||||
pendingData uint32
|
pendingData uint32
|
||||||
// The amount of data the application has consumed but grpc has not sent
|
// The amount of data the application has consumed but grpc has not sent
|
||||||
// window update for them. Used to reduce window update frequency. It is
|
// window update for them. Used to reduce window update frequency.
|
||||||
// always part of pendingData.
|
|
||||||
pendingUpdate uint32
|
pendingUpdate uint32
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -181,8 +179,8 @@ func (f *inFlow) onData(n uint32) error {
|
|||||||
}
|
}
|
||||||
f.mu.Lock()
|
f.mu.Lock()
|
||||||
defer f.mu.Unlock()
|
defer f.mu.Unlock()
|
||||||
if f.pendingData+n > f.limit {
|
if f.pendingData+f.pendingUpdate+n > f.limit {
|
||||||
return fmt.Errorf("recieved %d-bytes data exceeding the limit %d bytes", f.pendingData+n, f.limit)
|
return fmt.Errorf("recieved %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate+n, f.limit)
|
||||||
}
|
}
|
||||||
if f.conn != nil {
|
if f.conn != nil {
|
||||||
if err := f.conn.onData(n); err != nil {
|
if err := f.conn.onData(n); err != nil {
|
||||||
@ -193,23 +191,45 @@ func (f *inFlow) onData(n uint32) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// onRead is invoked when the application reads the data.
|
// connOnRead updates the connection level states when the application consumes data.
|
||||||
func (f *inFlow) onRead(n uint32) uint32 {
|
func (f *inFlow) connOnRead(n uint32) uint32 {
|
||||||
if n == 0 {
|
if n == 0 || f.conn != nil {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
f.mu.Lock()
|
f.mu.Lock()
|
||||||
defer f.mu.Unlock()
|
f.mu.Unlock()
|
||||||
|
f.pendingData -= n
|
||||||
f.pendingUpdate += n
|
f.pendingUpdate += n
|
||||||
if f.pendingUpdate >= f.limit/4 {
|
if f.pendingUpdate >= f.limit/4 {
|
||||||
ret := f.pendingUpdate
|
ret := f.pendingUpdate
|
||||||
f.pendingData -= ret
|
|
||||||
f.pendingUpdate = 0
|
f.pendingUpdate = 0
|
||||||
return ret
|
return ret
|
||||||
}
|
}
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// onRead is invoked when the application reads the data. It returns the window updates
|
||||||
|
// for both stream and connection level.
|
||||||
|
func (f *inFlow) onRead(n uint32) (swu, cwu uint32) {
|
||||||
|
if n == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
f.mu.Lock()
|
||||||
|
defer f.mu.Unlock()
|
||||||
|
if f.pendingData == 0 {
|
||||||
|
// pendingData has been adjusted by restoreConn.
|
||||||
|
return
|
||||||
|
}
|
||||||
|
f.pendingData -= n
|
||||||
|
f.pendingUpdate += n
|
||||||
|
if f.pendingUpdate >= f.limit/4 {
|
||||||
|
swu = f.pendingUpdate
|
||||||
|
f.pendingUpdate = 0
|
||||||
|
}
|
||||||
|
cwu = f.conn.connOnRead(n)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// restoreConn is invoked when a stream is terminated. It removes its stake in
|
// restoreConn is invoked when a stream is terminated. It removes its stake in
|
||||||
// the connection-level flow and resets its own state.
|
// the connection-level flow and resets its own state.
|
||||||
func (f *inFlow) restoreConn() uint32 {
|
func (f *inFlow) restoreConn() uint32 {
|
||||||
@ -218,14 +238,8 @@ func (f *inFlow) restoreConn() uint32 {
|
|||||||
}
|
}
|
||||||
f.mu.Lock()
|
f.mu.Lock()
|
||||||
defer f.mu.Unlock()
|
defer f.mu.Unlock()
|
||||||
ret := f.pendingData
|
n := f.pendingData
|
||||||
f.conn.mu.Lock()
|
|
||||||
f.conn.pendingData -= ret
|
|
||||||
if f.conn.pendingUpdate > f.conn.pendingData {
|
|
||||||
f.conn.pendingUpdate = f.conn.pendingData
|
|
||||||
}
|
|
||||||
f.conn.mu.Unlock()
|
|
||||||
f.pendingData = 0
|
f.pendingData = 0
|
||||||
f.pendingUpdate = 0
|
f.pendingUpdate = 0
|
||||||
return ret
|
return f.conn.connOnRead(n)
|
||||||
}
|
}
|
||||||
|
@ -479,12 +479,13 @@ func (t *http2Client) getStream(f http2.Frame) (*Stream, bool) {
|
|||||||
// Window updates will deliver to the controller for sending when
|
// Window updates will deliver to the controller for sending when
|
||||||
// the cumulative quota exceeds the corresponding threshold.
|
// the cumulative quota exceeds the corresponding threshold.
|
||||||
func (t *http2Client) updateWindow(s *Stream, n uint32) {
|
func (t *http2Client) updateWindow(s *Stream, n uint32) {
|
||||||
if q := t.fc.onRead(n); q > 0 {
|
swu, cwu := s.fc.onRead(n)
|
||||||
t.controlBuf.put(&windowUpdate{0, q})
|
if swu > 0 {
|
||||||
}
|
t.controlBuf.put(&windowUpdate{s.id, swu})
|
||||||
if q := s.fc.onRead(n); q > 0 {
|
}
|
||||||
t.controlBuf.put(&windowUpdate{s.id, q})
|
if cwu > 0 {
|
||||||
}
|
t.controlBuf.put(&windowUpdate{0, cwu})
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (t *http2Client) handleData(f *http2.DataFrame) {
|
func (t *http2Client) handleData(f *http2.DataFrame) {
|
||||||
|
@ -305,11 +305,12 @@ func (t *http2Server) getStream(f http2.Frame) (*Stream, bool) {
|
|||||||
// Window updates will deliver to the controller for sending when
|
// Window updates will deliver to the controller for sending when
|
||||||
// the cumulative quota exceeds the corresponding threshold.
|
// the cumulative quota exceeds the corresponding threshold.
|
||||||
func (t *http2Server) updateWindow(s *Stream, n uint32) {
|
func (t *http2Server) updateWindow(s *Stream, n uint32) {
|
||||||
if q := t.fc.onRead(n); q > 0 {
|
swu, cwu := s.fc.onRead(n)
|
||||||
t.controlBuf.put(&windowUpdate{0, q})
|
if swu > 0 {
|
||||||
|
t.controlBuf.put(&windowUpdate{s.id, swu})
|
||||||
}
|
}
|
||||||
if q := s.fc.onRead(n); q > 0 {
|
if cwu > 0 {
|
||||||
t.controlBuf.put(&windowUpdate{s.id, q})
|
t.controlBuf.put(&windowUpdate{0, cwu})
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -197,9 +197,8 @@ type Stream struct {
|
|||||||
// multiple times.
|
// multiple times.
|
||||||
headerDone bool
|
headerDone bool
|
||||||
// the status received from the server.
|
// the status received from the server.
|
||||||
statusCode codes.Code
|
statusCode codes.Code
|
||||||
statusDesc string
|
statusDesc string
|
||||||
pendingData uint32
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Header acquires the key-value pairs of header metadata once it
|
// Header acquires the key-value pairs of header metadata once it
|
||||||
|
@ -489,11 +489,11 @@ func TestServerWithMisbehavedClient(t *testing.T) {
|
|||||||
// Keep sending until the server inbound window is drained for that stream.
|
// Keep sending until the server inbound window is drained for that stream.
|
||||||
for sent <= initialWindowSize {
|
for sent <= initialWindowSize {
|
||||||
<-cc.writableChan
|
<-cc.writableChan
|
||||||
if err = cc.framer.writeData(true, s.id, false, make([]byte, http2MaxFrameLen)); err != nil {
|
if err = cc.framer.writeData(true, s.id, false, make([]byte, 1)); err != nil {
|
||||||
t.Fatalf("Failed to write data: ", err)
|
t.Fatalf("Failed to write data: ", err)
|
||||||
}
|
}
|
||||||
cc.writableChan <- 0
|
cc.writableChan <- 0
|
||||||
sent += http2MaxFrameLen
|
sent += 1
|
||||||
}
|
}
|
||||||
// Server sent a resetStream for s already.
|
// Server sent a resetStream for s already.
|
||||||
code := http2RSTErrConvTab[http2.ErrCodeFlowControl]
|
code := http2RSTErrConvTab[http2.ErrCodeFlowControl]
|
||||||
@ -501,8 +501,8 @@ func TestServerWithMisbehavedClient(t *testing.T) {
|
|||||||
t.Fatalf("%v got err %v with statusCode %d, want err <EOF> with statusCode %d", s, err, s.statusCode, code)
|
t.Fatalf("%v got err %v with statusCode %d, want err <EOF> with statusCode %d", s, err, s.statusCode, code)
|
||||||
}
|
}
|
||||||
|
|
||||||
if ss.fc.pendingData != 0 || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != 0 || sc.fc.pendingUpdate != 0 {
|
if ss.fc.pendingData != 0 || ss.fc.pendingUpdate != 0 || sc.fc.pendingData != 0 || sc.fc.pendingUpdate != initialWindowSize {
|
||||||
t.Fatalf("Server mistakenly resets inbound flow control params: got %d, %d, %d, %d; want 0, 0, 0, 0", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate)
|
t.Fatalf("Server mistakenly resets inbound flow control params: got %d, %d, %d, %d; want 0, 0, 0, %d", ss.fc.pendingData, ss.fc.pendingUpdate, sc.fc.pendingData, sc.fc.pendingUpdate, initialWindowSize)
|
||||||
}
|
}
|
||||||
ct.CloseStream(s, nil)
|
ct.CloseStream(s, nil)
|
||||||
// Test server behavior for violation of connection flow control window size restriction.
|
// Test server behavior for violation of connection flow control window size restriction.
|
||||||
@ -512,14 +512,11 @@ func TestServerWithMisbehavedClient(t *testing.T) {
|
|||||||
for {
|
for {
|
||||||
s, err := ct.NewStream(context.Background(), callHdr)
|
s, err := ct.NewStream(context.Background(), callHdr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to open stream: %v", err)
|
|
||||||
}
|
|
||||||
<-cc.writableChan
|
|
||||||
// Write will fail when connection flow control window runs out.
|
|
||||||
if err := cc.framer.writeData(true, s.id, true, make([]byte, http2MaxFrameLen)); err != nil {
|
|
||||||
// The server tears down the connection.
|
// The server tears down the connection.
|
||||||
break
|
break
|
||||||
}
|
}
|
||||||
|
<-cc.writableChan
|
||||||
|
cc.framer.writeData(true, s.id, true, make([]byte, http2MaxFrameLen))
|
||||||
cc.writableChan <- 0
|
cc.writableChan <- 0
|
||||||
}
|
}
|
||||||
ct.Close()
|
ct.Close()
|
||||||
@ -558,8 +555,8 @@ func TestClientWithMisbehavedServer(t *testing.T) {
|
|||||||
t.Fatalf("Got err %v and the status code %d, want <EOF> and the code %d", err, s.statusCode, codes.Internal)
|
t.Fatalf("Got err %v and the status code %d, want <EOF> and the code %d", err, s.statusCode, codes.Internal)
|
||||||
}
|
}
|
||||||
conn.CloseStream(s, err)
|
conn.CloseStream(s, err)
|
||||||
if s.fc.pendingData != 0 || s.fc.pendingUpdate != 0 || conn.fc.pendingData != 0 || conn.fc.pendingUpdate != 0 {
|
if s.fc.pendingData != 0 || s.fc.pendingUpdate != 0 || conn.fc.pendingData != 0 || conn.fc.pendingUpdate != initialWindowSize {
|
||||||
t.Fatalf("Client mistakenly resets inbound flow control params: got %d, %d, %d, %d; want 0, 0, 0, 0", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate)
|
t.Fatalf("Client mistakenly resets inbound flow control params: got %d, %d, %d, %d; want 0, 0, 0, %d", s.fc.pendingData, s.fc.pendingUpdate, conn.fc.pendingData, conn.fc.pendingUpdate, initialWindowSize)
|
||||||
}
|
}
|
||||||
// Test the logic for the violation of the connection flow control window size restriction.
|
// Test the logic for the violation of the connection flow control window size restriction.
|
||||||
//
|
//
|
||||||
|
Reference in New Issue
Block a user