stream: in withRetry, block until Status is valid and check on io.EOF (#2199)
Also, return an error if SendMsg is called after CloseSend.
This commit is contained in:
@ -542,7 +542,10 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func())
|
|||||||
// We started another attempt already.
|
// We started another attempt already.
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
if err == nil || err == io.EOF {
|
if err == io.EOF {
|
||||||
|
<-a.s.Done()
|
||||||
|
}
|
||||||
|
if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) {
|
||||||
onSuccess()
|
onSuccess()
|
||||||
cs.mu.Unlock()
|
cs.mu.Unlock()
|
||||||
return err
|
return err
|
||||||
@ -616,7 +619,9 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
|
|||||||
cs.finish(err)
|
cs.finish(err)
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
// TODO: Check cs.sentLast and error if we already ended the stream.
|
if cs.sentLast {
|
||||||
|
return status.Errorf(codes.Internal, "SendMsg called after CloseSend")
|
||||||
|
}
|
||||||
if !cs.desc.ClientStreams {
|
if !cs.desc.ClientStreams {
|
||||||
cs.sentLast = true
|
cs.sentLast = true
|
||||||
}
|
}
|
||||||
|
@ -334,7 +334,7 @@ func (s *Stream) Method() string {
|
|||||||
|
|
||||||
// Status returns the status received from the server.
|
// Status returns the status received from the server.
|
||||||
// Status can be read safely only after the stream has ended,
|
// Status can be read safely only after the stream has ended,
|
||||||
// that is, read or write has returned io.EOF.
|
// that is, after Done() is closed.
|
||||||
func (s *Stream) Status() *status.Status {
|
func (s *Stream) Status() *status.Status {
|
||||||
return s.status
|
return s.status
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user