Merge pull request #273 from iamqizhao/master

handle empty data frame
This commit is contained in:
Qi Zhao
2015-07-30 13:04:36 -07:00
2 changed files with 37 additions and 34 deletions

View File

@ -529,31 +529,32 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
return return
} }
size := len(f.Data()) size := len(f.Data())
if err := s.fc.onData(uint32(size)); err != nil { if size > 0 {
if _, ok := err.(ConnectionError); ok { if err := s.fc.onData(uint32(size)); err != nil {
t.notifyError(err) if _, ok := err.(ConnectionError); ok {
return t.notifyError(err)
} return
s.mu.Lock() }
if s.state == streamDone { s.mu.Lock()
if s.state == streamDone {
s.mu.Unlock()
return
}
s.state = streamDone
s.statusCode = codes.Internal
s.statusDesc = err.Error()
s.mu.Unlock() s.mu.Unlock()
s.write(recvMsg{err: io.EOF})
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
return return
} }
s.state = streamDone // TODO(bradfitz, zhaoq): A copy is required here because there is no
s.statusCode = codes.Internal // guarantee f.Data() is consumed before the arrival of next frame.
s.statusDesc = err.Error() // Can this copy be eliminated?
s.mu.Unlock() data := make([]byte, size)
s.write(recvMsg{err: io.EOF}) copy(data, f.Data())
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl}) s.write(recvMsg{data: data})
return
} }
// TODO(bradfitz, zhaoq): A copy is required here because there is no
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
data := make([]byte, size)
copy(data, f.Data())
s.write(recvMsg{data: data})
// The server has closed the stream without sending trailers. Record that // The server has closed the stream without sending trailers. Record that
// the read direction is closed, and set the status appropriately. // the read direction is closed, and set the status appropriately.
if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) { if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {

View File

@ -322,22 +322,24 @@ func (t *http2Server) handleData(f *http2.DataFrame) {
return return
} }
size := len(f.Data()) size := len(f.Data())
if err := s.fc.onData(uint32(size)); err != nil { if size > 0 {
if _, ok := err.(ConnectionError); ok { if err := s.fc.onData(uint32(size)); err != nil {
grpclog.Printf("transport: http2Server %v", err) if _, ok := err.(ConnectionError); ok {
t.Close() grpclog.Printf("transport: http2Server %v", err)
t.Close()
return
}
t.closeStream(s)
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl})
return return
} }
t.closeStream(s) // TODO(bradfitz, zhaoq): A copy is required here because there is no
t.controlBuf.put(&resetStream{s.id, http2.ErrCodeFlowControl}) // guarantee f.Data() is consumed before the arrival of next frame.
return // Can this copy be eliminated?
data := make([]byte, size)
copy(data, f.Data())
s.write(recvMsg{data: data})
} }
// TODO(bradfitz, zhaoq): A copy is required here because there is no
// guarantee f.Data() is consumed before the arrival of next frame.
// Can this copy be eliminated?
data := make([]byte, size)
copy(data, f.Data())
s.write(recvMsg{data: data})
if f.Header().Flags.Has(http2.FlagDataEndStream) { if f.Header().Flags.Has(http2.FlagDataEndStream) {
// Received the end of stream from the client. // Received the end of stream from the client.
s.mu.Lock() s.mu.Lock()