Add a sanity check on the data size
This commit is contained in:
@ -199,19 +199,22 @@ func (f *inFlow) onData(n uint32) error {
|
|||||||
// adjustConnPendingUpdate increments the connection level pending updates by n.
|
// adjustConnPendingUpdate increments the connection level pending updates by n.
|
||||||
// This is called to make the proper connection level window updates when
|
// This is called to make the proper connection level window updates when
|
||||||
// receiving data frame targeting the canceled RPCs.
|
// receiving data frame targeting the canceled RPCs.
|
||||||
func (f *inFlow) adjustConnPendingUpdate(n uint32) uint32 {
|
func (f *inFlow) adjustConnPendingUpdate(n uint32) (uint32, error) {
|
||||||
if n == 0 || f.conn != nil {
|
if n == 0 || f.conn != nil {
|
||||||
return 0
|
return 0, nil
|
||||||
}
|
}
|
||||||
f.mu.Lock()
|
f.mu.Lock()
|
||||||
defer f.mu.Unlock()
|
defer f.mu.Unlock()
|
||||||
|
if f.pendingData+f.pendingUpdate+n > f.limit {
|
||||||
|
return 0, ConnectionErrorf("received %d-bytes data exceeding the limit %d bytes", f.pendingData+f.pendingUpdate+n, f.limit)
|
||||||
|
}
|
||||||
f.pendingUpdate += n
|
f.pendingUpdate += n
|
||||||
if f.pendingUpdate >= f.limit/4 {
|
if f.pendingUpdate >= f.limit/4 {
|
||||||
ret := f.pendingUpdate
|
ret := f.pendingUpdate
|
||||||
f.pendingUpdate = 0
|
f.pendingUpdate = 0
|
||||||
return ret
|
return ret, nil
|
||||||
}
|
}
|
||||||
return 0
|
return 0, nil
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -571,14 +571,19 @@ func (t *http2Client) updateWindow(s *Stream, n uint32) {
|
|||||||
|
|
||||||
func (t *http2Client) handleData(f *http2.DataFrame) {
|
func (t *http2Client) handleData(f *http2.DataFrame) {
|
||||||
// Select the right stream to dispatch.
|
// Select the right stream to dispatch.
|
||||||
|
size := len(f.Data())
|
||||||
s, ok := t.getStream(f)
|
s, ok := t.getStream(f)
|
||||||
if !ok {
|
if !ok {
|
||||||
if cwu := t.fc.adjustConnPendingUpdate(uint32(len(f.Data()))); cwu > 0 {
|
cwu, err := t.fc.adjustConnPendingUpdate(uint32(size))
|
||||||
|
if err != nil {
|
||||||
|
t.notifyError(err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if cwu > 0 {
|
||||||
t.controlBuf.put(&windowUpdate{0, cwu})
|
t.controlBuf.put(&windowUpdate{0, cwu})
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
size := len(f.Data())
|
|
||||||
if size > 0 {
|
if size > 0 {
|
||||||
if err := s.fc.onData(uint32(size)); err != nil {
|
if err := s.fc.onData(uint32(size)); err != nil {
|
||||||
if _, ok := err.(ConnectionError); ok {
|
if _, ok := err.(ConnectionError); ok {
|
||||||
|
@ -318,14 +318,20 @@ func (t *http2Server) updateWindow(s *Stream, n uint32) {
|
|||||||
|
|
||||||
func (t *http2Server) handleData(f *http2.DataFrame) {
|
func (t *http2Server) handleData(f *http2.DataFrame) {
|
||||||
// Select the right stream to dispatch.
|
// Select the right stream to dispatch.
|
||||||
|
size := len(f.Data())
|
||||||
s, ok := t.getStream(f)
|
s, ok := t.getStream(f)
|
||||||
if !ok {
|
if !ok {
|
||||||
if cwu := t.fc.adjustConnPendingUpdate(uint32(len(f.Data()))); cwu > 0 {
|
cwu, err := t.fc.adjustConnPendingUpdate(uint32(size))
|
||||||
|
if err != nil {
|
||||||
|
grpclog.Printf("transport: http2Server %v", err)
|
||||||
|
t.Close()
|
||||||
|
return
|
||||||
|
}
|
||||||
|
if cwu > 0 {
|
||||||
t.controlBuf.put(&windowUpdate{0, cwu})
|
t.controlBuf.put(&windowUpdate{0, cwu})
|
||||||
}
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
size := len(f.Data())
|
|
||||||
if size > 0 {
|
if size > 0 {
|
||||||
if err := s.fc.onData(uint32(size)); err != nil {
|
if err := s.fc.onData(uint32(size)); err != nil {
|
||||||
if _, ok := err.(ConnectionError); ok {
|
if _, ok := err.(ConnectionError); ok {
|
||||||
|
Reference in New Issue
Block a user