Add documentation for loopy. (#2169)
This commit is contained in:
@ -229,6 +229,12 @@ func (l *outStreamList) dequeue() *outStream {
|
|||||||
return b
|
return b
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// controlBuffer is a way to pass information to loopy.
|
||||||
|
// Information is passed as specific struct types called control frames.
|
||||||
|
// A control frame not only represents data, messages or headers to be sent out
|
||||||
|
// but can also be used to instruct loopy to update its internal state.
|
||||||
|
// It shouldn't be confused with an HTTP2 frame, although some of the control frames
|
||||||
|
// like dataFrame and headerFrame do go out on wire as HTTP2 frames.
|
||||||
type controlBuffer struct {
|
type controlBuffer struct {
|
||||||
ch chan struct{}
|
ch chan struct{}
|
||||||
done <-chan struct{}
|
done <-chan struct{}
|
||||||
@ -335,13 +341,29 @@ const (
|
|||||||
serverSide
|
serverSide
|
||||||
)
|
)
|
||||||
|
|
||||||
|
// Loopy receives frames from the control buffer.
|
||||||
|
// Each frame is handled individually; most of the work done by loopy goes
|
||||||
|
// into handling data frames. Loopy maintains a queue of active streams, and each
|
||||||
|
// stream maintains a queue of data frames; as loopy receives data frames
|
||||||
|
// it gets added to the queue of the relevant stream.
|
||||||
|
// Loopy goes over this list of active streams by processing one node every iteration,
|
||||||
|
// thereby closely resemebling to a round-robin scheduling over all streams. While
|
||||||
|
// processing a stream, loopy writes out data bytes from this stream capped by the min
|
||||||
|
// of http2MaxFrameLen, connection-level flow control and stream-level flow control.
|
||||||
type loopyWriter struct {
|
type loopyWriter struct {
|
||||||
side side
|
side side
|
||||||
cbuf *controlBuffer
|
cbuf *controlBuffer
|
||||||
sendQuota uint32
|
sendQuota uint32
|
||||||
oiws uint32 // outbound initial window size.
|
oiws uint32 // outbound initial window size.
|
||||||
estdStreams map[uint32]*outStream // Established streams.
|
// estdStreams is map of all established streams that are not cleaned-up yet.
|
||||||
activeStreams *outStreamList // Streams that are sending data.
|
// On client-side, this is all streams whose headers were sent out.
|
||||||
|
// On server-side, this is all streams whose headers were received.
|
||||||
|
estdStreams map[uint32]*outStream // Established streams.
|
||||||
|
// activeStreams is a linked-list of all streams that have data to send and some
|
||||||
|
// stream-level flow control quota.
|
||||||
|
// Each of these streams internally have a list of data items(and perhaps trailers
|
||||||
|
// on the server-side) to be sent out.
|
||||||
|
activeStreams *outStreamList
|
||||||
framer *framer
|
framer *framer
|
||||||
hBuf *bytes.Buffer // The buffer for HPACK encoding.
|
hBuf *bytes.Buffer // The buffer for HPACK encoding.
|
||||||
hEnc *hpack.Encoder // HPACK encoder.
|
hEnc *hpack.Encoder // HPACK encoder.
|
||||||
@ -372,6 +394,21 @@ func newLoopyWriter(s side, fr *framer, cbuf *controlBuffer, bdpEst *bdpEstimato
|
|||||||
const minBatchSize = 1000
|
const minBatchSize = 1000
|
||||||
|
|
||||||
// run should be run in a separate goroutine.
|
// run should be run in a separate goroutine.
|
||||||
|
// It reads control frames from controlBuf and processes them by:
|
||||||
|
// 1. Updating loopy's internal state, or/and
|
||||||
|
// 2. Writing out HTTP2 frames on the wire.
|
||||||
|
//
|
||||||
|
// Loopy keeps all active streams with data to send in a linked-list.
|
||||||
|
// All streams in the activeStreams linked-list must have both:
|
||||||
|
// 1. Data to send, and
|
||||||
|
// 2. Stream level flow control quota available.
|
||||||
|
//
|
||||||
|
// In each iteration of run loop, other than processing the incoming control
|
||||||
|
// frame, loopy calls processData, which processes one node from the activeStreams linked-list.
|
||||||
|
// This results in writing of HTTP2 frames into an underlying write buffer.
|
||||||
|
// When there's no more control frames to read from controlBuf, loopy flushes the write buffer.
|
||||||
|
// As an optimization, to increase the batch size for each flush, loopy yields the processor, once
|
||||||
|
// if the batch size is too low to give stream goroutines a chance to fill it up.
|
||||||
func (l *loopyWriter) run() (err error) {
|
func (l *loopyWriter) run() (err error) {
|
||||||
defer func() {
|
defer func() {
|
||||||
if err == ErrConnClosing {
|
if err == ErrConnClosing {
|
||||||
@ -696,21 +733,30 @@ func (l *loopyWriter) applySettings(ss []http2.Setting) error {
|
|||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// processData removes the first stream from active streams, writes out at most 16KB
|
||||||
|
// of its data and then puts it at the end of activeStreams if there's still more data
|
||||||
|
// to be sent and stream has some stream-level flow control.
|
||||||
func (l *loopyWriter) processData() (bool, error) {
|
func (l *loopyWriter) processData() (bool, error) {
|
||||||
if l.sendQuota == 0 {
|
if l.sendQuota == 0 {
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
str := l.activeStreams.dequeue()
|
str := l.activeStreams.dequeue() // Remove the first stream.
|
||||||
if str == nil {
|
if str == nil {
|
||||||
return true, nil
|
return true, nil
|
||||||
}
|
}
|
||||||
dataItem := str.itl.peek().(*dataFrame)
|
dataItem := str.itl.peek().(*dataFrame) // Peek at the first data item this stream.
|
||||||
if len(dataItem.h) == 0 && len(dataItem.d) == 0 {
|
// A data item is represented by a dataFrame, since it later translates into
|
||||||
|
// multiple HTTP2 data frames.
|
||||||
|
// Every dataFrame has two buffers; h that keeps grpc-message header and d that is acutal data.
|
||||||
|
// As an optimization to keep wire traffic low, data from d is copied to h to make as big as the
|
||||||
|
// maximum possilbe HTTP2 frame size.
|
||||||
|
|
||||||
|
if len(dataItem.h) == 0 && len(dataItem.d) == 0 { // Empty data frame
|
||||||
// Client sends out empty data frame with endStream = true
|
// Client sends out empty data frame with endStream = true
|
||||||
if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
|
if err := l.framer.fr.WriteData(dataItem.streamID, dataItem.endStream, nil); err != nil {
|
||||||
return false, err
|
return false, err
|
||||||
}
|
}
|
||||||
str.itl.dequeue()
|
str.itl.dequeue() // remove the empty data item from stream
|
||||||
if str.itl.isEmpty() {
|
if str.itl.isEmpty() {
|
||||||
str.state = empty
|
str.state = empty
|
||||||
} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
|
} else if trailer, ok := str.itl.peek().(*headerFrame); ok { // the next item is trailers.
|
||||||
@ -739,21 +785,20 @@ func (l *loopyWriter) processData() (bool, error) {
|
|||||||
if len(buf) < size {
|
if len(buf) < size {
|
||||||
size = len(buf)
|
size = len(buf)
|
||||||
}
|
}
|
||||||
if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 {
|
if strQuota := int(l.oiws) - str.bytesOutStanding; strQuota <= 0 { // stream-level flow control.
|
||||||
str.state = waitingOnStreamQuota
|
str.state = waitingOnStreamQuota
|
||||||
return false, nil
|
return false, nil
|
||||||
} else if strQuota < size {
|
} else if strQuota < size {
|
||||||
size = strQuota
|
size = strQuota
|
||||||
}
|
}
|
||||||
|
|
||||||
if l.sendQuota < uint32(size) {
|
if l.sendQuota < uint32(size) { // connection-level flow control.
|
||||||
size = int(l.sendQuota)
|
size = int(l.sendQuota)
|
||||||
}
|
}
|
||||||
// Now that outgoing flow controls are checked we can replenish str's write quota
|
// Now that outgoing flow controls are checked we can replenish str's write quota
|
||||||
str.wq.replenish(size)
|
str.wq.replenish(size)
|
||||||
var endStream bool
|
var endStream bool
|
||||||
// This last data message on this stream and all
|
// If this is the last data message on this stream and all of it can be written in this iteration.
|
||||||
// of it can be written in this go.
|
|
||||||
if dataItem.endStream && size == len(buf) {
|
if dataItem.endStream && size == len(buf) {
|
||||||
// buf contains either data or it contains header but data is empty.
|
// buf contains either data or it contains header but data is empty.
|
||||||
if idx == 1 || len(dataItem.d) == 0 {
|
if idx == 1 || len(dataItem.d) == 0 {
|
||||||
|
Reference in New Issue
Block a user