transport: block reading frames when too many transport control frames are queued (#2970)

Addresses CVE-2019-9512 (Ping Flood), CVE-2019-9514 (Reset Flood), and CVE-2019-9515 (Settings Flood).
This commit is contained in:
Doug Fawley
2019-08-13 10:52:26 -07:00
committed by GitHub
parent ee87494b1f
commit ee21c923a2
3 changed files with 83 additions and 3 deletions

View File

@ -23,6 +23,7 @@ import (
"fmt" "fmt"
"runtime" "runtime"
"sync" "sync"
"sync/atomic"
"golang.org/x/net/http2" "golang.org/x/net/http2"
"golang.org/x/net/http2/hpack" "golang.org/x/net/http2/hpack"
@ -84,12 +85,24 @@ func (il *itemList) isEmpty() bool {
// the control buffer of transport. They represent different aspects of // the control buffer of transport. They represent different aspects of
// control tasks, e.g., flow control, settings, streaming resetting, etc. // control tasks, e.g., flow control, settings, streaming resetting, etc.
// maxQueuedTransportResponseFrames is the most queued "transport response"
// frames we will buffer before preventing new reads from occurring on the
// transport. These are control frames sent in response to client requests,
// such as RST_STREAM due to bad headers or settings acks.
const maxQueuedTransportResponseFrames = 50
type cbItem interface {
isTransportResponseFrame() bool
}
// registerStream is used to register an incoming stream with loopy writer. // registerStream is used to register an incoming stream with loopy writer.
type registerStream struct { type registerStream struct {
streamID uint32 streamID uint32
wq *writeQuota wq *writeQuota
} }
func (*registerStream) isTransportResponseFrame() bool { return false }
// headerFrame is also used to register stream on the client-side. // headerFrame is also used to register stream on the client-side.
type headerFrame struct { type headerFrame struct {
streamID uint32 streamID uint32
@ -102,6 +115,10 @@ type headerFrame struct {
onOrphaned func(error) // Valid on client-side onOrphaned func(error) // Valid on client-side
} }
func (h *headerFrame) isTransportResponseFrame() bool {
return h.cleanup != nil && h.cleanup.rst // Results in a RST_STREAM
}
type cleanupStream struct { type cleanupStream struct {
streamID uint32 streamID uint32
rst bool rst bool
@ -109,6 +126,8 @@ type cleanupStream struct {
onWrite func() onWrite func()
} }
func (c *cleanupStream) isTransportResponseFrame() bool { return c.rst } // Results in a RST_STREAM
type dataFrame struct { type dataFrame struct {
streamID uint32 streamID uint32
endStream bool endStream bool
@ -119,27 +138,41 @@ type dataFrame struct {
onEachWrite func() onEachWrite func()
} }
func (*dataFrame) isTransportResponseFrame() bool { return false }
type incomingWindowUpdate struct { type incomingWindowUpdate struct {
streamID uint32 streamID uint32
increment uint32 increment uint32
} }
func (*incomingWindowUpdate) isTransportResponseFrame() bool { return false }
type outgoingWindowUpdate struct { type outgoingWindowUpdate struct {
streamID uint32 streamID uint32
increment uint32 increment uint32
} }
func (*outgoingWindowUpdate) isTransportResponseFrame() bool {
return false // window updates are throttled by thresholds
}
type incomingSettings struct { type incomingSettings struct {
ss []http2.Setting ss []http2.Setting
} }
func (*incomingSettings) isTransportResponseFrame() bool { return true } // Results in a settings ACK
type outgoingSettings struct { type outgoingSettings struct {
ss []http2.Setting ss []http2.Setting
} }
func (*outgoingSettings) isTransportResponseFrame() bool { return false }
type incomingGoAway struct { type incomingGoAway struct {
} }
func (*incomingGoAway) isTransportResponseFrame() bool { return false }
type goAway struct { type goAway struct {
code http2.ErrCode code http2.ErrCode
debugData []byte debugData []byte
@ -147,15 +180,21 @@ type goAway struct {
closeConn bool closeConn bool
} }
func (*goAway) isTransportResponseFrame() bool { return false }
type ping struct { type ping struct {
ack bool ack bool
data [8]byte data [8]byte
} }
func (*ping) isTransportResponseFrame() bool { return true }
type outFlowControlSizeRequest struct { type outFlowControlSizeRequest struct {
resp chan uint32 resp chan uint32
} }
func (*outFlowControlSizeRequest) isTransportResponseFrame() bool { return false }
type outStreamState int type outStreamState int
const ( const (
@ -238,6 +277,14 @@ type controlBuffer struct {
consumerWaiting bool consumerWaiting bool
list *itemList list *itemList
err error err error
// transportResponseFrames counts the number of queued items that represent
// the response of an action initiated by the peer. trfChan is created
// when transportResponseFrames >= maxQueuedTransportResponseFrames and is
// closed and nilled when transportResponseFrames drops below the
// threshold. Both fields are protected by mu.
transportResponseFrames int
trfChan atomic.Value // *chan struct{}
} }
func newControlBuffer(done <-chan struct{}) *controlBuffer { func newControlBuffer(done <-chan struct{}) *controlBuffer {
@ -248,12 +295,24 @@ func newControlBuffer(done <-chan struct{}) *controlBuffer {
} }
} }
func (c *controlBuffer) put(it interface{}) error { // throttle blocks if there are too many incomingSettings/cleanupStreams in the
// controlbuf.
func (c *controlBuffer) throttle() {
ch, _ := c.trfChan.Load().(*chan struct{})
if ch != nil {
select {
case <-*ch:
case <-c.done:
}
}
}
func (c *controlBuffer) put(it cbItem) error {
_, err := c.executeAndPut(nil, it) _, err := c.executeAndPut(nil, it)
return err return err
} }
func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{}) (bool, error) { func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it cbItem) (bool, error) {
var wakeUp bool var wakeUp bool
c.mu.Lock() c.mu.Lock()
if c.err != nil { if c.err != nil {
@ -271,6 +330,15 @@ func (c *controlBuffer) executeAndPut(f func(it interface{}) bool, it interface{
c.consumerWaiting = false c.consumerWaiting = false
} }
c.list.enqueue(it) c.list.enqueue(it)
if it.isTransportResponseFrame() {
c.transportResponseFrames++
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
// We are adding the frame that puts us over the threshold; create
// a throttling channel.
ch := make(chan struct{})
c.trfChan.Store(&ch)
}
}
c.mu.Unlock() c.mu.Unlock()
if wakeUp { if wakeUp {
select { select {
@ -304,7 +372,17 @@ func (c *controlBuffer) get(block bool) (interface{}, error) {
return nil, c.err return nil, c.err
} }
if !c.list.isEmpty() { if !c.list.isEmpty() {
h := c.list.dequeue() h := c.list.dequeue().(cbItem)
if h.isTransportResponseFrame() {
if c.transportResponseFrames == maxQueuedTransportResponseFrames {
// We are removing the frame that put us over the
// threshold; close and clear the throttling channel.
ch := c.trfChan.Load().(*chan struct{})
close(*ch)
c.trfChan.Store((*chan struct{})(nil))
}
c.transportResponseFrames--
}
c.mu.Unlock() c.mu.Unlock()
return h, nil return h, nil
} }

View File

@ -1245,6 +1245,7 @@ func (t *http2Client) reader() {
// loop to keep reading incoming messages on this transport. // loop to keep reading incoming messages on this transport.
for { for {
t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame() frame, err := t.framer.fr.ReadFrame()
if t.keepaliveEnabled { if t.keepaliveEnabled {
atomic.CompareAndSwapUint32(&t.activity, 0, 1) atomic.CompareAndSwapUint32(&t.activity, 0, 1)

View File

@ -436,6 +436,7 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) { func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.Context, string) context.Context) {
defer close(t.readerDone) defer close(t.readerDone)
for { for {
t.controlBuf.throttle()
frame, err := t.framer.fr.ReadFrame() frame, err := t.framer.fr.ReadFrame()
atomic.StoreUint32(&t.activity, 1) atomic.StoreUint32(&t.activity, 1)
if err != nil { if err != nil {