Fix flaky test: TestClientStreamingError (#2192)
This commit is contained in:
14
stream.go
14
stream.go
@ -413,6 +413,10 @@ func (cs *clientStream) shouldRetry(err error) error {
|
|||||||
// RPC is finished or committed; cannot retry.
|
// RPC is finished or committed; cannot retry.
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
// Wait for the trailers.
|
||||||
|
if cs.attempt.s != nil {
|
||||||
|
<-cs.attempt.s.Done()
|
||||||
|
}
|
||||||
if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
|
if cs.firstAttempt && !cs.callInfo.failFast && (cs.attempt.s == nil || cs.attempt.s.Unprocessed()) {
|
||||||
// First attempt, wait-for-ready, stream unprocessed: transparently retry.
|
// First attempt, wait-for-ready, stream unprocessed: transparently retry.
|
||||||
cs.firstAttempt = false
|
cs.firstAttempt = false
|
||||||
@ -502,11 +506,11 @@ func (cs *clientStream) shouldRetry(err error) error {
|
|||||||
// Returns nil if a retry was performed and succeeded; error otherwise.
|
// Returns nil if a retry was performed and succeeded; error otherwise.
|
||||||
func (cs *clientStream) retryLocked(lastErr error) error {
|
func (cs *clientStream) retryLocked(lastErr error) error {
|
||||||
for {
|
for {
|
||||||
|
cs.attempt.finish(lastErr)
|
||||||
if err := cs.shouldRetry(lastErr); err != nil {
|
if err := cs.shouldRetry(lastErr); err != nil {
|
||||||
cs.commitAttemptLocked()
|
cs.commitAttemptLocked()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
cs.attempt.finish(lastErr)
|
|
||||||
if err := cs.newAttemptLocked(nil, traceInfo{}); err != nil {
|
if err := cs.newAttemptLocked(nil, traceInfo{}); err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
@ -641,12 +645,7 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
|
|||||||
|
|
||||||
func (cs *clientStream) RecvMsg(m interface{}) error {
|
func (cs *clientStream) RecvMsg(m interface{}) error {
|
||||||
err := cs.withRetry(func(a *csAttempt) error {
|
err := cs.withRetry(func(a *csAttempt) error {
|
||||||
err := a.recvMsg(m)
|
return a.recvMsg(m)
|
||||||
if err != nil || !cs.desc.ServerStreams {
|
|
||||||
// err != nil or non-server-streaming indicates end of stream.
|
|
||||||
a.finish(err)
|
|
||||||
}
|
|
||||||
return err
|
|
||||||
}, cs.commitAttemptLocked)
|
}, cs.commitAttemptLocked)
|
||||||
if err != nil || !cs.desc.ServerStreams {
|
if err != nil || !cs.desc.ServerStreams {
|
||||||
// err != nil or non-server-streaming indicates end of stream.
|
// err != nil or non-server-streaming indicates end of stream.
|
||||||
@ -796,6 +795,7 @@ func (a *csAttempt) recvMsg(m interface{}) (err error) {
|
|||||||
func (a *csAttempt) finish(err error) {
|
func (a *csAttempt) finish(err error) {
|
||||||
a.mu.Lock()
|
a.mu.Lock()
|
||||||
if a.finished {
|
if a.finished {
|
||||||
|
a.mu.Unlock()
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
a.finished = true
|
a.finished = true
|
||||||
|
Reference in New Issue
Block a user