transport: Fix the inconsistency between headerChan and headerDone (#2818)
transport: Fix the inconsistency between headerChan and headerDone
This commit is contained in:
@ -549,7 +549,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|||||||
s.write(recvMsg{err: err})
|
s.write(recvMsg{err: err})
|
||||||
close(s.done)
|
close(s.done)
|
||||||
// If headerChan isn't closed, then close it.
|
// If headerChan isn't closed, then close it.
|
||||||
if atomic.SwapUint32(&s.headerDone, 1) == 0 {
|
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
|
||||||
close(s.headerChan)
|
close(s.headerChan)
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -713,7 +713,7 @@ func (t *http2Client) closeStream(s *Stream, err error, rst bool, rstCode http2.
|
|||||||
s.write(recvMsg{err: err})
|
s.write(recvMsg{err: err})
|
||||||
}
|
}
|
||||||
// If headerChan isn't closed, then close it.
|
// If headerChan isn't closed, then close it.
|
||||||
if atomic.SwapUint32(&s.headerDone, 1) == 0 {
|
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
|
||||||
s.noHeaders = true
|
s.noHeaders = true
|
||||||
close(s.headerChan)
|
close(s.headerChan)
|
||||||
}
|
}
|
||||||
@ -1142,26 +1142,24 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
|
|||||||
}
|
}
|
||||||
endStream := frame.StreamEnded()
|
endStream := frame.StreamEnded()
|
||||||
atomic.StoreUint32(&s.bytesReceived, 1)
|
atomic.StoreUint32(&s.bytesReceived, 1)
|
||||||
initialHeader := atomic.SwapUint32(&s.headerDone, 1) == 0
|
initialHeader := atomic.LoadUint32(&s.headerChanClosed) == 0
|
||||||
|
|
||||||
if !initialHeader && !endStream {
|
if !initialHeader && !endStream {
|
||||||
// As specified by RFC 7540, a HEADERS frame (and associated CONTINUATION frames) can only appear
|
// As specified by gRPC over HTTP2, a HEADERS frame (and associated CONTINUATION frames) can only appear at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
|
||||||
// at the start or end of a stream. Therefore, second HEADERS frame must have EOS bit set.
|
|
||||||
st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
|
st := status.New(codes.Internal, "a HEADERS frame cannot appear in the middle of a stream")
|
||||||
t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
|
t.closeStream(s, st.Err(), true, http2.ErrCodeProtocol, st, nil, false)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
state := &decodeState{}
|
state := &decodeState{}
|
||||||
// Initialize isGRPC value to be !initialHeader, since if a gRPC ResponseHeader has been received
|
// Initialize isGRPC value to be !initialHeader, since if a gRPC Response-Headers has already been received, then it means that the peer is speaking gRPC and we are in gRPC mode.
|
||||||
// which indicates peer speaking gRPC, we are in gRPC mode.
|
|
||||||
state.data.isGRPC = !initialHeader
|
state.data.isGRPC = !initialHeader
|
||||||
if err := state.decodeHeader(frame); err != nil {
|
if err := state.decodeHeader(frame); err != nil {
|
||||||
t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream)
|
t.closeStream(s, err, true, http2.ErrCodeProtocol, status.Convert(err), nil, endStream)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
var isHeader bool
|
isHeader := false
|
||||||
defer func() {
|
defer func() {
|
||||||
if t.statsHandler != nil {
|
if t.statsHandler != nil {
|
||||||
if isHeader {
|
if isHeader {
|
||||||
@ -1180,10 +1178,10 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
// If headers haven't been received yet.
|
// If headerChan hasn't been closed yet
|
||||||
if initialHeader {
|
if atomic.CompareAndSwapUint32(&s.headerChanClosed, 0, 1) {
|
||||||
if !endStream {
|
if !endStream {
|
||||||
// Headers frame is ResponseHeader.
|
// HEADERS frame block carries a Response-Headers.
|
||||||
isHeader = true
|
isHeader = true
|
||||||
// These values can be set without any synchronization because
|
// These values can be set without any synchronization because
|
||||||
// stream goroutine will read it only after seeing a closed
|
// stream goroutine will read it only after seeing a closed
|
||||||
@ -1192,14 +1190,17 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
|
|||||||
if len(state.data.mdata) > 0 {
|
if len(state.data.mdata) > 0 {
|
||||||
s.header = state.data.mdata
|
s.header = state.data.mdata
|
||||||
}
|
}
|
||||||
close(s.headerChan)
|
} else {
|
||||||
return
|
// HEADERS frame block carries a Trailers-Only.
|
||||||
|
s.noHeaders = true
|
||||||
}
|
}
|
||||||
// Headers frame is Trailers-only.
|
|
||||||
s.noHeaders = true
|
|
||||||
close(s.headerChan)
|
close(s.headerChan)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
if !endStream {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
// if client received END_STREAM from server while stream was still active, send RST_STREAM
|
// if client received END_STREAM from server while stream was still active, send RST_STREAM
|
||||||
rst := s.getState() == streamActive
|
rst := s.getState() == streamActive
|
||||||
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true)
|
t.closeStream(s, io.EOF, rst, http2.ErrCodeNo, state.status(), state.data.mdata, true)
|
||||||
|
@ -204,8 +204,8 @@ type Stream struct {
|
|||||||
// is used to adjust flow control, if needed.
|
// is used to adjust flow control, if needed.
|
||||||
requestRead func(int)
|
requestRead func(int)
|
||||||
|
|
||||||
headerChan chan struct{} // closed to indicate the end of header metadata.
|
headerChan chan struct{} // closed to indicate the end of header metadata.
|
||||||
headerDone uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
|
headerChanClosed uint32 // set when headerChan is closed. Used to avoid closing headerChan multiple times.
|
||||||
|
|
||||||
// hdrMu protects header and trailer metadata on the server-side.
|
// hdrMu protects header and trailer metadata on the server-side.
|
||||||
hdrMu sync.Mutex
|
hdrMu sync.Mutex
|
||||||
|
@ -1717,6 +1717,24 @@ func TestInvalidHeaderField(t *testing.T) {
|
|||||||
server.stop()
|
server.stop()
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestHeaderChanClosedAfterReceivingAnInvalidHeader(t *testing.T) {
|
||||||
|
server, ct, cancel := setUp(t, 0, math.MaxUint32, invalidHeaderField)
|
||||||
|
defer cancel()
|
||||||
|
defer server.stop()
|
||||||
|
defer ct.Close()
|
||||||
|
s, err := ct.NewStream(context.Background(), &CallHdr{Host: "localhost", Method: "foo"})
|
||||||
|
if err != nil {
|
||||||
|
t.Fatalf("failed to create the stream")
|
||||||
|
}
|
||||||
|
timer := time.NewTimer(time.Second)
|
||||||
|
defer timer.Stop()
|
||||||
|
select {
|
||||||
|
case <-s.headerChan:
|
||||||
|
case <-timer.C:
|
||||||
|
t.Errorf("s.headerChan: got open, want closed")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
func TestIsReservedHeader(t *testing.T) {
|
func TestIsReservedHeader(t *testing.T) {
|
||||||
tests := []struct {
|
tests := []struct {
|
||||||
h string
|
h string
|
||||||
|
Reference in New Issue
Block a user