From 40cd6b15e2880b88deb091b0fcb091694285fcb0 Mon Sep 17 00:00:00 2001 From: dfawley Date: Tue, 3 Jul 2018 14:07:07 -0700 Subject: [PATCH] stream: in withRetry, block until Status is valid and check on io.EOF (#2199) Also, return an error if SendMsg is called after CloseSend. --- stream.go | 9 +++++++-- transport/transport.go | 2 +- 2 files changed, 8 insertions(+), 3 deletions(-) diff --git a/stream.go b/stream.go index b23acaf3..da478938 100644 --- a/stream.go +++ b/stream.go @@ -542,7 +542,10 @@ func (cs *clientStream) withRetry(op func(a *csAttempt) error, onSuccess func()) // We started another attempt already. continue } - if err == nil || err == io.EOF { + if err == io.EOF { + <-a.s.Done() + } + if err == nil || (err == io.EOF && a.s.Status().Code() == codes.OK) { onSuccess() cs.mu.Unlock() return err @@ -616,7 +619,9 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) { cs.finish(err) } }() - // TODO: Check cs.sentLast and error if we already ended the stream. + if cs.sentLast { + return status.Errorf(codes.Internal, "SendMsg called after CloseSend") + } if !cs.desc.ClientStreams { cs.sentLast = true } diff --git a/transport/transport.go b/transport/transport.go index d409356f..62d6e6bb 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -334,7 +334,7 @@ func (s *Stream) Method() string { // Status returns the status received from the server. // Status can be read safely only after the stream has ended, -// that is, read or write has returned io.EOF. +// that is, after Done() is closed. func (s *Stream) Status() *status.Status { return s.status }