This commit is contained in:
yangzhouhan
2015-07-28 12:50:19 -07:00
4 changed files with 29 additions and 35 deletions

View File

@ -55,6 +55,8 @@ var (
// ErrClientConnTimeout indicates that the connection could not be // ErrClientConnTimeout indicates that the connection could not be
// established or re-established within the specified timeout. // established or re-established within the specified timeout.
ErrClientConnTimeout = errors.New("grpc: timed out trying to connect") ErrClientConnTimeout = errors.New("grpc: timed out trying to connect")
// minimum time to give a connection to complete
minConnectTimeout = 20 * time.Second
) )
// dialOptions configure a Dial call. dialOptions are set by the DialOption // dialOptions configure a Dial call. dialOptions are set by the DialOption
@ -211,9 +213,21 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
return ErrClientConnTimeout return ErrClientConnTimeout
} }
} }
sleepTime := backoff(retries)
timeout := sleepTime
if timeout < minConnectTimeout {
timeout = minConnectTimeout
}
if copts.Timeout == 0 || copts.Timeout > timeout {
copts.Timeout = timeout
}
connectTime := time.Now()
newTransport, err := transport.NewClientTransport(cc.target, &copts) newTransport, err := transport.NewClientTransport(cc.target, &copts)
if err != nil { if err != nil {
sleepTime := backoff(retries) sleepTime -= time.Since(connectTime)
if sleepTime < 0 {
sleepTime = 0
}
// Fail early before falling into sleep. // Fail early before falling into sleep.
if cc.dopts.copts.Timeout > 0 && cc.dopts.copts.Timeout < sleepTime+time.Since(start) { if cc.dopts.copts.Timeout > 0 && cc.dopts.copts.Timeout < sleepTime+time.Since(start) {
cc.Close() cc.Close()

View File

@ -277,28 +277,29 @@ func convertCode(err error) codes.Code {
const ( const (
// how long to wait after the first failure before retrying // how long to wait after the first failure before retrying
baseDelay = 1.0 * time.Second baseDelay = 1.0 * time.Second
// upper bound on backoff delay // upper bound of backoff delay
maxDelay = 120 * time.Second maxDelay = 120 * time.Second
backoffFactor = 2.0 // backoff increases by this factor on each retry // backoff increases by this factor on each retry
backoffRange = 0.4 // backoff is randomized downwards by this factor backoffFactor = 1.6
// backoff is randomized downwards by this factor
backoffJitter = 0.2
) )
// backoff returns a value in [0, maxDelay] that increases exponentially with func backoff(retries int) (t time.Duration) {
// retries, starting from baseDelay. if retries == 0 {
func backoff(retries int) time.Duration { return baseDelay
}
backoff, max := float64(baseDelay), float64(maxDelay) backoff, max := float64(baseDelay), float64(maxDelay)
for backoff < max && retries > 0 { for backoff < max && retries > 0 {
backoff = backoff * backoffFactor backoff *= backoffFactor
retries-- retries--
} }
if backoff > max { if backoff > max {
backoff = max backoff = max
} }
// Randomize backoff delays so that if a cluster of requests start at // Randomize backoff delays so that if a cluster of requests start at
// the same time, they won't operate in lockstep. We just subtract up // the same time, they won't operate in lockstep.
// to 40% so that we obey maxDelay. backoff *= 1 + backoffJitter*(rand.Float64()*2-1)
backoff -= backoff * backoffRange * rand.Float64()
if backoff < 0 { if backoff < 0 {
return 0 return 0
} }

View File

@ -36,10 +36,8 @@ package grpc
import ( import (
"bytes" "bytes"
"io" "io"
"math"
"reflect" "reflect"
"testing" "testing"
"time"
"github.com/golang/protobuf/proto" "github.com/golang/protobuf/proto"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -153,25 +151,6 @@ func TestContextErr(t *testing.T) {
} }
} }
func TestBackoff(t *testing.T) {
for _, test := range []struct {
retries int
maxResult time.Duration
}{
{0, time.Second},
{1, time.Duration(1e9 * math.Pow(backoffFactor, 1))},
{2, time.Duration(1e9 * math.Pow(backoffFactor, 2))},
{3, time.Duration(1e9 * math.Pow(backoffFactor, 3))},
{4, time.Duration(1e9 * math.Pow(backoffFactor, 4))},
{int(math.Log2(float64(maxDelay)/float64(baseDelay))) + 1, maxDelay},
} {
delay := backoff(test.retries)
if delay < 0 || delay > test.maxResult {
t.Errorf("backoff(%d) = %v outside [0, %v]", test.retries, delay, test.maxResult)
}
}
}
// bmEncode benchmarks encoding a Protocol Buffer message containing mSize // bmEncode benchmarks encoding a Protocol Buffer message containing mSize
// bytes. // bytes.
func bmEncode(b *testing.B, mSize int) { func bmEncode(b *testing.B, mSize int) {

View File

@ -555,7 +555,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
// the read direction is closed, and set the status appropriately. // the read direction is closed, and set the status appropriately.
if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) { if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
s.mu.Lock() s.mu.Lock()
if (s.state == streamWriteDone) { if s.state == streamWriteDone {
s.state = streamDone s.state = streamDone
} else { } else {
s.state = streamReadDone s.state = streamReadDone