Merge pull request #265 from iamqizhao/master
revise reconnect backoff strategy
This commit is contained in:
@ -55,6 +55,8 @@ var (
|
|||||||
// ErrClientConnTimeout indicates that the connection could not be
|
// ErrClientConnTimeout indicates that the connection could not be
|
||||||
// established or re-established within the specified timeout.
|
// established or re-established within the specified timeout.
|
||||||
ErrClientConnTimeout = errors.New("grpc: timed out trying to connect")
|
ErrClientConnTimeout = errors.New("grpc: timed out trying to connect")
|
||||||
|
// minimum time to give a connection to complete
|
||||||
|
minConnectTimeout = 20 * time.Second
|
||||||
)
|
)
|
||||||
|
|
||||||
// dialOptions configure a Dial call. dialOptions are set by the DialOption
|
// dialOptions configure a Dial call. dialOptions are set by the DialOption
|
||||||
@ -211,9 +213,21 @@ func (cc *ClientConn) resetTransport(closeTransport bool) error {
|
|||||||
return ErrClientConnTimeout
|
return ErrClientConnTimeout
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
sleepTime := backoff(retries)
|
||||||
|
timeout := sleepTime
|
||||||
|
if timeout < minConnectTimeout {
|
||||||
|
timeout = minConnectTimeout
|
||||||
|
}
|
||||||
|
if copts.Timeout == 0 || copts.Timeout > timeout {
|
||||||
|
copts.Timeout = timeout
|
||||||
|
}
|
||||||
|
connectTime := time.Now()
|
||||||
newTransport, err := transport.NewClientTransport(cc.target, &copts)
|
newTransport, err := transport.NewClientTransport(cc.target, &copts)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
sleepTime := backoff(retries)
|
sleepTime -= time.Since(connectTime)
|
||||||
|
if sleepTime < 0 {
|
||||||
|
sleepTime = 0
|
||||||
|
}
|
||||||
// Fail early before falling into sleep.
|
// Fail early before falling into sleep.
|
||||||
if cc.dopts.copts.Timeout > 0 && cc.dopts.copts.Timeout < sleepTime+time.Since(start) {
|
if cc.dopts.copts.Timeout > 0 && cc.dopts.copts.Timeout < sleepTime+time.Since(start) {
|
||||||
cc.Close()
|
cc.Close()
|
||||||
|
23
rpc_util.go
23
rpc_util.go
@ -277,28 +277,29 @@ func convertCode(err error) codes.Code {
|
|||||||
const (
|
const (
|
||||||
// how long to wait after the first failure before retrying
|
// how long to wait after the first failure before retrying
|
||||||
baseDelay = 1.0 * time.Second
|
baseDelay = 1.0 * time.Second
|
||||||
// upper bound on backoff delay
|
// upper bound of backoff delay
|
||||||
maxDelay = 120 * time.Second
|
maxDelay = 120 * time.Second
|
||||||
backoffFactor = 2.0 // backoff increases by this factor on each retry
|
// backoff increases by this factor on each retry
|
||||||
backoffRange = 0.4 // backoff is randomized downwards by this factor
|
backoffFactor = 1.6
|
||||||
|
// backoff is randomized downwards by this factor
|
||||||
|
backoffJitter = 0.2
|
||||||
)
|
)
|
||||||
|
|
||||||
// backoff returns a value in [0, maxDelay] that increases exponentially with
|
func backoff(retries int) (t time.Duration) {
|
||||||
// retries, starting from baseDelay.
|
if retries == 0 {
|
||||||
func backoff(retries int) time.Duration {
|
return baseDelay
|
||||||
|
}
|
||||||
backoff, max := float64(baseDelay), float64(maxDelay)
|
backoff, max := float64(baseDelay), float64(maxDelay)
|
||||||
for backoff < max && retries > 0 {
|
for backoff < max && retries > 0 {
|
||||||
backoff = backoff * backoffFactor
|
backoff *= backoffFactor
|
||||||
retries--
|
retries--
|
||||||
}
|
}
|
||||||
if backoff > max {
|
if backoff > max {
|
||||||
backoff = max
|
backoff = max
|
||||||
}
|
}
|
||||||
|
|
||||||
// Randomize backoff delays so that if a cluster of requests start at
|
// Randomize backoff delays so that if a cluster of requests start at
|
||||||
// the same time, they won't operate in lockstep. We just subtract up
|
// the same time, they won't operate in lockstep.
|
||||||
// to 40% so that we obey maxDelay.
|
backoff *= 1 + backoffJitter*(rand.Float64()*2-1)
|
||||||
backoff -= backoff * backoffRange * rand.Float64()
|
|
||||||
if backoff < 0 {
|
if backoff < 0 {
|
||||||
return 0
|
return 0
|
||||||
}
|
}
|
||||||
|
@ -555,7 +555,7 @@ func (t *http2Client) handleData(f *http2.DataFrame) {
|
|||||||
// the read direction is closed, and set the status appropriately.
|
// the read direction is closed, and set the status appropriately.
|
||||||
if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
|
if f.FrameHeader.Flags.Has(http2.FlagDataEndStream) {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
if (s.state == streamWriteDone) {
|
if s.state == streamWriteDone {
|
||||||
s.state = streamDone
|
s.state = streamDone
|
||||||
} else {
|
} else {
|
||||||
s.state = streamReadDone
|
s.state = streamReadDone
|
||||||
|
Reference in New Issue
Block a user