Speed-up quota pools. (#1636)
* First commit.
* Second commit.
* Post-review update.
@@ -255,7 +255,7 @@ func (stats *Stats) maybeUpdate() {
 	stats.dirty = false

 	if stats.durations.Len() != 0 {
-		var percentToObserve = []int{50, 90}
+		var percentToObserve = []int{50, 90, 99}
 		// First data record min unit from the latency result.
 		stats.result.Latency = append(stats.result.Latency, percentLatency{Percent: -1, Value: stats.unit})
 		for _, position := range percentToObserve {

@@ -20,9 +20,9 @@ package transport

 import (
 	"fmt"
+	"io"
 	"math"
 	"sync"
-	"sync/atomic"
 	"time"

 	"golang.org/x/net/http2"

@@ -49,7 +49,7 @@ const (
	// defaultLocalSendQuota sets is default value for number of data
	// bytes that each stream can schedule before some of it being
	// flushed out.
-	defaultLocalSendQuota = 64 * 1024
+	defaultLocalSendQuota = 128 * 1024
 )

 // The following defines various control items which could flow through

@@ -130,9 +130,8 @@ func (*ping) item() {}
 // quotaPool is a pool which accumulates the quota and sends it to acquire()
 // when it is available.
 type quotaPool struct {
-	c chan int
-
 	mu      sync.Mutex
+	c       chan struct{}
 	version uint32
 	quota   int
 }

@@ -140,12 +139,8 @@ type quotaPool struct {
 // newQuotaPool creates a quotaPool which has quota q available to consume.
 func newQuotaPool(q int) *quotaPool {
 	qb := &quotaPool{
-		c: make(chan int, 1),
-	}
-	if q > 0 {
-		qb.c <- q
-	} else {
-		qb.quota = q
+		quota: q,
+		c:     make(chan struct{}, 1),
 	}
 	return qb
 }

@@ -159,60 +154,83 @@ func (qb *quotaPool) add(v int) {
 }

 func (qb *quotaPool) lockedAdd(v int) {
-	select {
-	case n := <-qb.c:
-		qb.quota += n
-	default:
+	var wakeUp bool
+	if qb.quota <= 0 {
+		wakeUp = true // Wake up potential waiters.
 	}
 	qb.quota += v
-	if qb.quota <= 0 {
-		return
-	}
-	// After the pool has been created, this is the only place that sends on
-	// the channel. Since mu is held at this point and any quota that was sent
-	// on the channel has been retrieved, we know that this code will always
-	// place any positive quota value on the channel.
-	select {
-	case qb.c <- qb.quota:
-		qb.quota = 0
-	default:
+	if wakeUp && qb.quota > 0 {
+		select {
+		case qb.c <- struct{}{}:
+		default:
+		}
 	}
 }

 func (qb *quotaPool) addAndUpdate(v int) {
 	qb.mu.Lock()
-	defer qb.mu.Unlock()
 	qb.lockedAdd(v)
-	// Update the version only after having added to the quota
-	// so that if acquireWithVersion sees the new version it is
-	// guaranteed to have seen the updated quota.
-	// Also, still keep this inside of the lock, so that when
-	// compareAndExecute is processing, this function doesn't
-	// get executed partially (quota gets updated but the version
-	// doesn't).
-	atomic.AddUint32(&(qb.version), 1)
+	qb.version++
+	qb.mu.Unlock()
 }

-func (qb *quotaPool) acquireWithVersion() (<-chan int, uint32) {
-	return qb.c, atomic.LoadUint32(&(qb.version))
+func (qb *quotaPool) get(v int, wc waiters) (int, uint32, error) {
+	qb.mu.Lock()
+	if qb.quota > 0 {
+		if v > qb.quota {
+			v = qb.quota
+		}
+		qb.quota -= v
+		ver := qb.version
+		qb.mu.Unlock()
+		return v, ver, nil
+	}
+	qb.mu.Unlock()
+	for {
+		select {
+		case <-wc.ctx.Done():
+			return 0, 0, ContextErr(wc.ctx.Err())
+		case <-wc.tctx.Done():
+			return 0, 0, ErrConnClosing
+		case <-wc.done:
+			return 0, 0, io.EOF
+		case <-wc.goAway:
+			return 0, 0, ErrStreamDrain
+		case <-qb.c:
+			qb.mu.Lock()
+			if qb.quota > 0 {
+				if v > qb.quota {
+					v = qb.quota
+				}
+				qb.quota -= v
+				ver := qb.version
+				if qb.quota > 0 {
+					select {
+					case qb.c <- struct{}{}:
+					default:
+					}
+				}
+				qb.mu.Unlock()
+				return v, ver, nil
+			}
+			qb.mu.Unlock()
+		}
+	}
 }

 func (qb *quotaPool) compareAndExecute(version uint32, success, failure func()) bool {
 	qb.mu.Lock()
-	defer qb.mu.Unlock()
-	if version == atomic.LoadUint32(&(qb.version)) {
+	if version == qb.version {
 		success()
+		qb.mu.Unlock()
 		return true
 	}
 	failure()
+	qb.mu.Unlock()
 	return false
 }

-// acquire returns the channel on which available quota amounts are sent.
-func (qb *quotaPool) acquire() <-chan int {
-	return qb.c
-}
-
 // inFlow deals with inbound flow control
 type inFlow struct {
 	mu sync.Mutex

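This hunk is the heart of the speed-up: the old pool parked the quota value itself inside `qb.c`, so every consumer had to drain the channel and hand the unused balance back, while the new pool keeps `quota` under `mu` and uses `qb.c` purely as a one-slot wake-up signal. The following is a condensed, self-contained sketch of that pattern; the `pool` type, `newPool`, and the `main` driver are invented for illustration and are not the transport code itself.

package main

import (
	"fmt"
	"sync"
)

// pool mirrors the new quotaPool shape: quota lives under mu, and c is a
// one-slot channel used only as a wake-up signal, never as a quota carrier.
type pool struct {
	mu    sync.Mutex
	c     chan struct{}
	quota int
}

func newPool(q int) *pool {
	return &pool{quota: q, c: make(chan struct{}, 1)}
}

func (p *pool) add(v int) {
	p.mu.Lock()
	wakeUp := p.quota <= 0 // only sleeping getters need a signal
	p.quota += v
	if wakeUp && p.quota > 0 {
		select {
		case p.c <- struct{}{}:
		default: // a signal is already pending
		}
	}
	p.mu.Unlock()
}

// get takes up to v quota, blocking while the pool is empty. A woken getter
// re-checks under the lock, so a stale signal just loops back to sleep.
func (p *pool) get(v int) int {
	for {
		p.mu.Lock()
		if p.quota > 0 {
			if v > p.quota {
				v = p.quota
			}
			p.quota -= v
			if p.quota > 0 {
				select {
				case p.c <- struct{}{}: // re-arm for other waiters
				default:
				}
			}
			p.mu.Unlock()
			return v
		}
		p.mu.Unlock()
		<-p.c
	}
}

func main() {
	p := newPool(10)
	fmt.Println(p.get(4)) // 4
	fmt.Println(p.get(8)) // 6: capped at what is available
	go p.add(5)
	fmt.Println(p.get(3)) // blocks until add delivers quota, then 3
}

Note how the stale-signal case (a waiter woken after another goroutine already drained the quota) is handled by looping back and re-checking under the lock, exactly as the real `get` above does.
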
@@ -68,6 +68,9 @@ type http2Client struct {
 	fc *inFlow
 	// sendQuotaPool provides flow control to outbound message.
 	sendQuotaPool *quotaPool
+	// localSendQuota limits the amount of data that can be scheduled
+	// for writing before it is actually written out.
+	localSendQuota *quotaPool
 	// streamsQuota limits the max number of concurrent streams.
 	streamsQuota *quotaPool

@@ -225,6 +228,7 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions, t
 		controlBuf:     newControlBuffer(),
 		fc:             &inFlow{limit: uint32(icwz)},
 		sendQuotaPool:  newQuotaPool(defaultWindowSize),
+		localSendQuota: newQuotaPool(defaultLocalSendQuota),
 		scheme:         scheme,
 		state:          reachable,
 		activeStreams:  make(map[uint32]*Stream),

@@ -315,7 +319,6 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
 		buf:            newRecvBuffer(),
 		fc:             &inFlow{limit: uint32(t.initialWindowSize)},
 		sendQuotaPool:  newQuotaPool(int(t.streamSendQuota)),
-		localSendQuota: newQuotaPool(defaultLocalSendQuota),
 		headerChan:     make(chan struct{}),
 	}
 	t.nextID += 2

@@ -336,7 +339,12 @@ func (t *http2Client) newStream(ctx context.Context, callHdr *CallHdr) *Stream {
 			t.updateWindow(s, uint32(n))
 		},
 	}
+	s.waiters = waiters{
+		ctx:    s.ctx,
+		tctx:   t.ctx,
+		done:   s.done,
+		goAway: s.goAway,
+	}
 	return s
 }

@@ -409,14 +417,10 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
 		return nil, ErrConnClosing
 	}
 	t.mu.Unlock()
-	sq, err := wait(ctx, t.ctx, nil, nil, t.streamsQuota.acquire())
-	if err != nil {
+	// Get a quota of 1 from streamsQuota.
+	if _, _, err := t.streamsQuota.get(1, waiters{ctx: ctx, tctx: t.ctx}); err != nil {
 		return nil, err
 	}
-	// Returns the quota balance back.
-	if sq > 1 {
-		t.streamsQuota.add(sq - 1)
-	}
 	// TODO(mmukhi): Benchmark if the performance gets better if count the metadata and other header fields
 	// first and create a slice of that exact size.
 	// Make the slice of certain predictable size to reduce allocations made by append.

@@ -662,9 +666,7 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
 	var (
 		streamQuota    int
 		streamQuotaVer uint32
-		localSendQuota int
 		err            error
-		sqChan         <-chan int
 	)
 	for idx, r := range [][]byte{hdr, data} {
 		for len(r) > 0 {

@@ -673,36 +675,31 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
 				size = len(r)
 			}
 			if streamQuota == 0 { // Used up all the locally cached stream quota.
-				sqChan, streamQuotaVer = s.sendQuotaPool.acquireWithVersion()
-				// Wait until the stream has some quota to send the data.
-				streamQuota, err = wait(s.ctx, t.ctx, s.done, s.goAway, sqChan)
-				if err != nil {
-					return err
-				}
-			}
-			if localSendQuota <= 0 { // Being a soft limit, it can go negative.
-				// Acquire local send quota to be able to write to the controlBuf.
-				localSendQuota, err = wait(s.ctx, t.ctx, s.done, s.goAway, s.localSendQuota.acquire())
+				// Get all the stream quota there is.
+				streamQuota, streamQuotaVer, err = s.sendQuotaPool.get(math.MaxInt32, s.waiters)
 				if err != nil {
 					return err
 				}
 			}
 			if size > streamQuota {
 				size = streamQuota
-			} // No need to do that for localSendQuota since that's only a soft limit.
-			// Wait until the transport has some quota to send the data.
-			tq, err := wait(s.ctx, t.ctx, s.done, s.goAway, t.sendQuotaPool.acquire())
+			}
+			// Get size worth quota from transport.
+			tq, _, err := t.sendQuotaPool.get(size, s.waiters)
 			if err != nil {
 				return err
 			}
 			if tq < size {
 				size = tq
 			}
-			if tq > size { // Overbooked transport quota. Return it back.
-				t.sendQuotaPool.add(tq - size)
+			ltq, _, err := t.localSendQuota.get(size, s.waiters)
+			if err != nil {
+				return err
 			}
+			// even if ltq is smaller than size we don't adjust size since
+			// ltq is only a soft limit.
 			streamQuota -= size
-			localSendQuota -= size
 			p := r[:size]
 			var endStream bool
 			// See if this is the last frame to be written.

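The acquisition order in this loop is easy to lose in the diff: each chunk is capped first by the stream's flow-control window and then by the connection's, while `localSendQuota` is consumed but never shrinks the chunk. A hedged sketch of just the capping arithmetic follows; `nextChunk` is an illustrative helper invented for the example, not part of the gRPC API.

package main

import "fmt"

// nextChunk caps a desired frame size by the stream and connection
// flow-control windows (hard limits). The local send quota is deliberately
// absent: being a soft limit, it is taken but never shrinks the chunk.
func nextChunk(want, streamQuota, transportQuota int) int {
	size := want
	if size > streamQuota {
		size = streamQuota
	}
	if size > transportQuota {
		size = transportQuota
	}
	return size
}

func main() {
	fmt.Println(nextChunk(16384, 4096, 65536)) // 4096: stream window is the bottleneck
	fmt.Println(nextChunk(16384, 65536, 1024)) // 1024: connection window is the bottleneck
}
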
@@ -718,8 +715,8 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
 				}
 			}
 			success := func() {
-				sz := size
-				t.controlBuf.put(&dataFrame{streamID: s.id, endStream: endStream, d: p, f: func() { s.localSendQuota.add(sz) }})
+				ltq := ltq
+				t.controlBuf.put(&dataFrame{streamID: s.id, endStream: endStream, d: p, f: func() { t.localSendQuota.add(ltq) }})
 				r = r[size:]
 			}
 			failure := func() { // The stream quota version must have changed.

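The `ltq := ltq` rebinding looks odd but is the standard Go defensive copy for closures: the `f` callback fires later, when the frame is actually flushed, so it must not share a variable that surrounding code may rebind. A minimal demonstration of the general idiom, unrelated to the transport types:

package main

import "fmt"

func main() {
	var fns []func()
	for _, v := range []int{1, 2, 3} {
		v := v // shadow: give each closure its own copy
		fns = append(fns, func() { fmt.Println(v) })
	}
	for _, f := range fns {
		f() // prints 1, 2, 3; before Go 1.22, omitting the shadow printed 3, 3, 3
	}
}

(Go 1.22 later made range variables per-iteration, but code of this era needed the explicit copy.)
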
@@ -729,7 +726,7 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
 			if !s.sendQuotaPool.compareAndExecute(streamQuotaVer, success, failure) {
 				// Couldn't send this chunk out.
 				t.sendQuotaPool.add(size)
-				localSendQuota += size
+				t.localSendQuota.add(ltq)
 				streamQuota = 0
 			}
 		}

@@ -737,9 +734,6 @@ func (t *http2Client) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
 	if streamQuota > 0 { // Add the left over quota back to stream.
 		s.sendQuotaPool.add(streamQuota)
 	}
-	if localSendQuota > 0 {
-		s.localSendQuota.add(localSendQuota)
-	}
 	if !opts.Last {
 		return nil
 	}

@@ -70,6 +70,9 @@ type http2Server struct {
 	fc *inFlow
 	// sendQuotaPool provides flow control to outbound message.
 	sendQuotaPool *quotaPool
+	// localSendQuota limits the amount of data that can be scheduled
+	// for writing before it is actually written out.
+	localSendQuota *quotaPool
 	stats stats.Handler
 	// Flag to keep track of reading activity on transport.
 	// 1 is true and 0 is false.

@@ -199,6 +202,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
 		controlBuf:      newControlBuffer(),
 		fc:              &inFlow{limit: uint32(icwz)},
 		sendQuotaPool:   newQuotaPool(defaultWindowSize),
+		localSendQuota:  newQuotaPool(defaultLocalSendQuota),
 		state:           reachable,
 		activeStreams:   make(map[uint32]*Stream),
 		streamSendQuota: defaultWindowSize,

@@ -316,7 +320,6 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
 	}
 	t.maxStreamID = streamID
 	s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
-	s.localSendQuota = newQuotaPool(defaultLocalSendQuota)
 	t.activeStreams[streamID] = s
 	if len(t.activeStreams) == 1 {
 		t.idle = time.Time{}

@@ -346,6 +349,10 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
 			t.updateWindow(s, uint32(n))
 		},
 	}
+	s.waiters = waiters{
+		ctx:  s.ctx,
+		tctx: t.ctx,
+	}
 	handle(s)
 	return
 }

@@ -861,9 +868,7 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
 	var (
 		streamQuota    int
 		streamQuotaVer uint32
-		localSendQuota int
 		err            error
-		sqChan         <-chan int
 	)
 	for _, r := range [][]byte{hdr, data} {
 		for len(r) > 0 {

@@ -872,43 +877,38 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
 				size = len(r)
 			}
 			if streamQuota == 0 { // Used up all the locally cached stream quota.
-				sqChan, streamQuotaVer = s.sendQuotaPool.acquireWithVersion()
-				// Wait until the stream has some quota to send the data.
-				streamQuota, err = wait(s.ctx, t.ctx, nil, nil, sqChan)
-				if err != nil {
-					return err
-				}
-			}
-			if localSendQuota <= 0 {
-				localSendQuota, err = wait(s.ctx, t.ctx, nil, nil, s.localSendQuota.acquire())
+				// Get all the stream quota there is.
+				streamQuota, streamQuotaVer, err = s.sendQuotaPool.get(math.MaxInt32, s.waiters)
 				if err != nil {
 					return err
 				}
 			}
 			if size > streamQuota {
 				size = streamQuota
-			} // No need to do that for localSendQuota since that's only a soft limit.
-			// Wait until the transport has some quota to send the data.
-			tq, err := wait(s.ctx, t.ctx, nil, nil, t.sendQuotaPool.acquire())
+			}
+			// Get size worth quota from transport.
+			tq, _, err := t.sendQuotaPool.get(size, s.waiters)
 			if err != nil {
 				return err
 			}
 			if tq < size {
 				size = tq
 			}
-			if tq > size {
-				t.sendQuotaPool.add(tq - size)
+			ltq, _, err := t.localSendQuota.get(size, s.waiters)
+			if err != nil {
+				return err
 			}
+			// even if ltq is smaller than size we don't adjust size since
+			// ltq is only a soft limit.
 			streamQuota -= size
-			localSendQuota -= size
 			p := r[:size]
 			// Reset ping strikes when sending data since this might cause
 			// the peer to send ping.
 			atomic.StoreUint32(&t.resetPingStrikes, 1)
 			success := func() {
-				sz := size
+				ltq := ltq
 				t.controlBuf.put(&dataFrame{streamID: s.id, endStream: false, d: p, f: func() {
-					s.localSendQuota.add(sz)
+					t.localSendQuota.add(ltq)
 				}})
 				r = r[size:]
 			}

@@ -919,7 +919,7 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
 			if !s.sendQuotaPool.compareAndExecute(streamQuotaVer, success, failure) {
 				// Couldn't send this chunk out.
 				t.sendQuotaPool.add(size)
-				localSendQuota += size
+				t.localSendQuota.add(ltq)
 				streamQuota = 0
 			}
 		}

@@ -928,9 +928,6 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
 		// Add the left over quota back to stream.
 		s.sendQuotaPool.add(streamQuota)
 	}
-	if localSendQuota > 0 {
-		s.localSendQuota.add(localSendQuota)
-	}
 	return nil
 }

@@ -229,6 +229,7 @@ type Stream struct {
 	trReader  io.Reader
 	fc        *inFlow
 	recvQuota uint32
+	waiters   waiters

 	// TODO: Remove this unused variable.
 	// The accumulated inbound quota pending for window update.

@@ -239,7 +240,6 @@ type Stream struct {
 	requestRead func(int)

 	sendQuotaPool  *quotaPool
-	localSendQuota *quotaPool
 	// Close headerChan to indicate the end of reception of header metadata.
 	headerChan chan struct{}
 	// header caches the received header metadata.

@@ -703,25 +703,13 @@ func (e StreamError) Error() string {
 	return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc)
 }

-// wait blocks until it can receive from one of the provided contexts or
-// channels. ctx is the context of the RPC, tctx is the context of the
-// transport, done is a channel closed to indicate the end of the RPC, goAway
-// is a channel closed to indicate a GOAWAY was received, and proceed is a
-// quota channel, whose received value is returned from this function if none
-// of the other signals occur first.
-func wait(ctx, tctx context.Context, done, goAway <-chan struct{}, proceed <-chan int) (int, error) {
-	select {
-	case <-ctx.Done():
-		return 0, ContextErr(ctx.Err())
-	case <-done:
-		return 0, io.EOF
-	case <-goAway:
-		return 0, ErrStreamDrain
-	case <-tctx.Done():
-		return 0, ErrConnClosing
-	case i := <-proceed:
-		return i, nil
-	}
-}
+// waiters are passed to quotaPool get methods to
+// wait on in addition to waiting on quota.
+type waiters struct {
+	ctx    context.Context
+	tctx   context.Context
+	done   chan struct{}
+	goAway chan struct{}
+}

 // GoAwayReason contains the reason for the GoAway frame received.

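The `waiters` struct replaces the old free-standing `wait` helper: instead of threading four cancellation signals through every call site, they are bundled once per stream and every quota `get` selects over them. Below is a self-contained sketch of the same shape; `waitQuota` and its error strings are invented for the example, and the real `get` also manages the quota itself, as shown earlier.

package main

import (
	"context"
	"errors"
	"fmt"
)

// waiters bundles the per-stream cancellation signals once, instead of
// passing them through every call.
type waiters struct {
	ctx    context.Context // RPC context
	tctx   context.Context // transport context
	done   chan struct{}   // closed when the RPC finishes
	goAway chan struct{}   // closed when a GOAWAY arrives
}

// waitQuota blocks on a quota signal or on whichever waiter fires first.
func waitQuota(quotaCh <-chan struct{}, wc waiters) error {
	select {
	case <-wc.ctx.Done():
		return wc.ctx.Err()
	case <-wc.tctx.Done():
		return errors.New("transport is closing")
	case <-wc.done:
		return errors.New("stream finished")
	case <-wc.goAway:
		return errors.New("stream drain")
	case <-quotaCh:
		return nil
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	wc := waiters{ctx: ctx, tctx: context.Background(), done: make(chan struct{}), goAway: make(chan struct{})}
	quota := make(chan struct{}, 1)
	quota <- struct{}{}
	fmt.Println(waitQuota(quota, wc)) // <nil>: quota was available
	cancel()
	fmt.Println(waitQuota(quota, wc)) // context canceled
}

A nil channel blocks forever in a select, which is why the tests further down can safely pass `done: nil, goAway: nil`.
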
@@ -1093,44 +1093,29 @@ func TestMaxStreams(t *testing.T) {
 			}
 		}
 	}()
-	var failureReason string
 	// Test these conditions until they pass or
 	// we reach the deadline (failure case).
 	for {
 		select {
 		case <-ch:
 		case <-done:
-			t.Fatalf(failureReason)
+			t.Fatalf("streamsQuota.quota shouldn't be non-zero.")
 		}
-		select {
-		case q := <-cc.streamsQuota.acquire():
-			failureReason = "streamsQuota.acquire() becomes readable mistakenly."
-			cc.streamsQuota.add(q)
-		default:
-			cc.streamsQuota.mu.Lock()
-			quota := cc.streamsQuota.quota
-			cc.streamsQuota.mu.Unlock()
-			if quota != 0 {
-				failureReason = "streamsQuota.quota got non-zero quota mistakenly."
-			} else {
-				failureReason = ""
-			}
-		}
-		if failureReason == "" {
+		cc.streamsQuota.mu.Lock()
+		sq := cc.streamsQuota.quota
+		cc.streamsQuota.mu.Unlock()
+		if sq == 0 {
 			break
 		}
 	}
 	close(ready)
 	// Close the pending stream so that the streams quota becomes available for the next new stream.
 	ct.CloseStream(s, nil)
-	select {
-	case i := <-cc.streamsQuota.acquire():
-		if i != 1 {
-			t.Fatalf("streamsQuota.acquire() got %d quota, want 1.", i)
-		}
-		cc.streamsQuota.add(i)
-	default:
-		t.Fatalf("streamsQuota.acquire() is not readable.")
+	cc.streamsQuota.mu.Lock()
+	i := cc.streamsQuota.quota
+	cc.streamsQuota.mu.Unlock()
+	if i != 1 {
+		t.Fatalf("streamsQuota is %d, want 1.", i)
 	}
 	if _, err := ct.NewStream(context.Background(), callHdr); err != nil {
 		t.Fatalf("Failed to open stream: %v", err)

@@ -1685,7 +1670,12 @@ func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) {
 	})

 	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
-	serverSendQuota, err := wait(ctx, context.Background(), nil, nil, st.sendQuotaPool.acquire())
+	serverSendQuota, _, err := st.sendQuotaPool.get(math.MaxInt32, waiters{
+		ctx:    ctx,
+		tctx:   st.ctx,
+		done:   nil,
+		goAway: nil,
+	})
 	if err != nil {
 		t.Fatalf("Error while acquiring sendQuota on server. Err: %v", err)
 	}

@@ -1707,7 +1697,12 @@ func testAccountCheckWindowSize(t *testing.T, wc windowSizeConfig) {
 		t.Fatalf("Client transport flow control window size is %v, want %v", limit, connectOptions.InitialConnWindowSize)
 	}
 	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
-	clientSendQuota, err := wait(ctx, context.Background(), nil, nil, ct.sendQuotaPool.acquire())
+	clientSendQuota, _, err := ct.sendQuotaPool.get(math.MaxInt32, waiters{
+		ctx:    ctx,
+		tctx:   ct.ctx,
+		done:   nil,
+		goAway: nil,
+	})
 	if err != nil {
 		t.Fatalf("Error while acquiring sendQuota on client. Err: %v", err)
 	}

@@ -1849,7 +1844,12 @@ func TestAccountCheckExpandingWindow(t *testing.T) {

 	// Check flow control window on client stream is equal to out flow on server stream.
 	ctx, cancel := context.WithTimeout(context.Background(), time.Second)
-	serverStreamSendQuota, err := wait(ctx, context.Background(), nil, nil, sstream.sendQuotaPool.acquire())
+	serverStreamSendQuota, _, err := sstream.sendQuotaPool.get(math.MaxInt32, waiters{
+		ctx:    ctx,
+		tctx:   context.Background(),
+		done:   nil,
+		goAway: nil,
+	})
 	cancel()
 	if err != nil {
 		return true, fmt.Errorf("error while acquiring server stream send quota. Err: %v", err)

@@ -1864,7 +1864,12 @@ func TestAccountCheckExpandingWindow(t *testing.T) {

 	// Check flow control window on server stream is equal to out flow on client stream.
 	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
-	clientStreamSendQuota, err := wait(ctx, context.Background(), nil, nil, cstream.sendQuotaPool.acquire())
+	clientStreamSendQuota, _, err := cstream.sendQuotaPool.get(math.MaxInt32, waiters{
+		ctx:    ctx,
+		tctx:   context.Background(),
+		done:   nil,
+		goAway: nil,
+	})
 	cancel()
 	if err != nil {
 		return true, fmt.Errorf("error while acquiring client stream send quota. Err: %v", err)

@@ -1879,7 +1884,12 @@ func TestAccountCheckExpandingWindow(t *testing.T) {

 	// Check flow control window on client transport is equal to out flow of server transport.
 	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
-	serverTrSendQuota, err := wait(ctx, context.Background(), nil, nil, st.sendQuotaPool.acquire())
+	serverTrSendQuota, _, err := st.sendQuotaPool.get(math.MaxInt32, waiters{
+		ctx:    ctx,
+		tctx:   st.ctx,
+		done:   nil,
+		goAway: nil,
+	})
 	cancel()
 	if err != nil {
 		return true, fmt.Errorf("error while acquiring server transport send quota. Err: %v", err)

@@ -1894,7 +1904,12 @@ func TestAccountCheckExpandingWindow(t *testing.T) {

 	// Check flow control window on server transport is equal to out flow of client transport.
 	ctx, cancel = context.WithTimeout(context.Background(), time.Second)
-	clientTrSendQuota, err := wait(ctx, context.Background(), nil, nil, ct.sendQuotaPool.acquire())
+	clientTrSendQuota, _, err := ct.sendQuotaPool.get(math.MaxInt32, waiters{
+		ctx:    ctx,
+		tctx:   ct.ctx,
+		done:   nil,
+		goAway: nil,
+	})
 	cancel()
 	if err != nil {
 		return true, fmt.Errorf("error while acquiring client transport send quota. Err: %v", err)