change objects in recvBuffer queue from interface to concrete type to reduce allocs (#1029)

This commit is contained in:
apolcyn
2017-06-26 15:04:42 -07:00
committed by dfawley
parent 57ff3285cf
commit eb7b130505
4 changed files with 75 additions and 26 deletions

View File

@ -325,11 +325,11 @@ func (ht *serverHandlerTransport) HandleStreams(startStream func(*Stream), trace
for buf := make([]byte, readSize); ; {
n, err := req.Body.Read(buf)
if n > 0 {
s.buf.put(&recvMsg{data: buf[:n:n]})
s.buf.put(recvMsg{data: buf[:n:n]})
buf = buf[n:]
}
if err != nil {
s.buf.put(&recvMsg{err: mapRecvMsgError(err)})
s.buf.put(recvMsg{err: mapRecvMsgError(err)})
return
}
if len(buf) == 0 {

View File

@ -76,7 +76,7 @@ type http2Client struct {
// controlBuf delivers all the control related tasks (e.g., window
// updates, reset streams, and various settings) to the controller.
controlBuf *recvBuffer
controlBuf *controlBuffer
fc *inFlow
// sendQuotaPool provides flow control to outbound message.
sendQuotaPool *quotaPool
@ -215,7 +215,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
framer: newFramer(conn),
hBuf: &buf,
hEnc: hpack.NewEncoder(&buf),
controlBuf: newRecvBuffer(),
controlBuf: newControlBuffer(),
fc: &inFlow{limit: uint32(icwz)},
sendQuotaPool: newQuotaPool(defaultWindowSize),
scheme: scheme,

View File

@ -74,7 +74,7 @@ type http2Server struct {
maxStreams uint32
// controlBuf delivers all the control related tasks (e.g., window
// updates, reset streams, and various settings) to the controller.
controlBuf *recvBuffer
controlBuf *controlBuffer
fc *inFlow
// sendQuotaPool provides flow control to outbound message.
sendQuotaPool *quotaPool
@ -185,7 +185,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
hEnc: hpack.NewEncoder(&buf),
maxStreams: maxStreams,
inTapHandle: config.InTapHandle,
controlBuf: newRecvBuffer(),
controlBuf: newControlBuffer(),
fc: &inFlow{limit: uint32(icwz)},
sendQuotaPool: newQuotaPool(defaultWindowSize),
state: reachable,

View File

@ -47,28 +47,25 @@ type recvMsg struct {
err error
}
func (*recvMsg) item() {}
// All items in an out of a recvBuffer should be the same type.
type item interface {
item()
}
// recvBuffer is an unbounded channel of item.
// recvBuffer is an unbounded channel of recvMsg structs.
// Note recvBuffer differs from controlBuffer only in that recvBuffer
// holds a channel of only recvMsg structs instead of objects implementing "item" interface.
// recvBuffer is written to much more often than
// controlBuffer and using strict recvMsg structs helps avoid allocation in "recvBuffer.put"
type recvBuffer struct {
c chan item
c chan recvMsg
mu sync.Mutex
backlog []item
backlog []recvMsg
}
func newRecvBuffer() *recvBuffer {
b := &recvBuffer{
c: make(chan item, 1),
c: make(chan recvMsg, 1),
}
return b
}
func (b *recvBuffer) put(r item) {
func (b *recvBuffer) put(r recvMsg) {
b.mu.Lock()
defer b.mu.Unlock()
if len(b.backlog) == 0 {
@ -87,18 +84,18 @@ func (b *recvBuffer) load() {
if len(b.backlog) > 0 {
select {
case b.c <- b.backlog[0]:
b.backlog[0] = nil
b.backlog[0] = recvMsg{}
b.backlog = b.backlog[1:]
default:
}
}
}
// get returns the channel that receives an item in the buffer.
// get returns the channel that receives a recvMsg in the buffer.
//
// Upon receipt of an item, the caller should call load to send another
// item onto the channel if there is any.
func (b *recvBuffer) get() <-chan item {
// Upon receipt of a recvMsg, the caller should call load to send another
// recvMsg onto the channel if there is any.
func (b *recvBuffer) get() <-chan recvMsg {
return b.c
}
@ -131,9 +128,8 @@ func (r *recvBufferReader) Read(p []byte) (n int, err error) {
return 0, ContextErr(r.ctx.Err())
case <-r.goAway:
return 0, ErrStreamDrain
case i := <-r.recv.get():
case m := <-r.recv.get():
r.recv.load()
m := i.(*recvMsg)
if m.err != nil {
return 0, m.err
}
@ -143,6 +139,59 @@ func (r *recvBufferReader) Read(p []byte) (n int, err error) {
}
}
// All items in an out of a controlBuffer should be the same type.
type item interface {
item()
}
// controlBuffer is an unbounded channel of item.
type controlBuffer struct {
c chan item
mu sync.Mutex
backlog []item
}
func newControlBuffer() *controlBuffer {
b := &controlBuffer{
c: make(chan item, 1),
}
return b
}
func (b *controlBuffer) put(r item) {
b.mu.Lock()
defer b.mu.Unlock()
if len(b.backlog) == 0 {
select {
case b.c <- r:
return
default:
}
}
b.backlog = append(b.backlog, r)
}
func (b *controlBuffer) load() {
b.mu.Lock()
defer b.mu.Unlock()
if len(b.backlog) > 0 {
select {
case b.c <- b.backlog[0]:
b.backlog[0] = nil
b.backlog = b.backlog[1:]
default:
}
}
}
// get returns the channel that receives an item in the buffer.
//
// Upon receipt of an item, the caller should call load to send another
// item onto the channel if there is any.
func (b *controlBuffer) get() <-chan item {
return b.c
}
type streamState uint8
const (
@ -313,7 +362,7 @@ func (s *Stream) SetTrailer(md metadata.MD) error {
}
func (s *Stream) write(m recvMsg) {
s.buf.put(&m)
s.buf.put(m)
}
// Read reads all p bytes from the wire for this stream.