diff --git a/transport/transport.go b/transport/transport.go index dad69be6..594ccd6a 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -67,20 +67,20 @@ func newRecvBuffer() *recvBuffer { func (b *recvBuffer) put(r recvMsg) { b.mu.Lock() - defer b.mu.Unlock() if len(b.backlog) == 0 { select { case b.c <- r: + b.mu.Unlock() return default: } } b.backlog = append(b.backlog, r) + b.mu.Unlock() } func (b *recvBuffer) load() { b.mu.Lock() - defer b.mu.Unlock() if len(b.backlog) > 0 { select { case b.c <- b.backlog[0]: @@ -89,6 +89,7 @@ func (b *recvBuffer) load() { default: } } + b.mu.Unlock() } // get returns the channel that receives a recvMsg in the buffer. @@ -164,20 +165,20 @@ func newControlBuffer() *controlBuffer { func (b *controlBuffer) put(r item) { b.mu.Lock() - defer b.mu.Unlock() if len(b.backlog) == 0 { select { case b.c <- r: + b.mu.Unlock() return default: } } b.backlog = append(b.backlog, r) + b.mu.Unlock() } func (b *controlBuffer) load() { b.mu.Lock() - defer b.mu.Unlock() if len(b.backlog) > 0 { select { case b.c <- b.backlog[0]: @@ -186,6 +187,7 @@ func (b *controlBuffer) load() { default: } } + b.mu.Unlock() } // get returns the channel that receives an item in the buffer. @@ -314,8 +316,9 @@ func (s *Stream) Header() (metadata.MD, error) { // side only. func (s *Stream) Trailer() metadata.MD { s.mu.RLock() - defer s.mu.RUnlock() - return s.trailer.Copy() + c := s.trailer.Copy() + s.mu.RUnlock() + return c } // ServerTransport returns the underlying ServerTransport for the stream. @@ -343,14 +346,16 @@ func (s *Stream) Status() *status.Status { // Server side only. func (s *Stream) SetHeader(md metadata.MD) error { s.mu.Lock() - defer s.mu.Unlock() if s.headerOk || s.state == streamDone { + s.mu.Unlock() return ErrIllegalHeaderWrite } if md.Len() == 0 { + s.mu.Unlock() return nil } s.header = metadata.Join(s.header, md) + s.mu.Unlock() return nil } @@ -361,8 +366,8 @@ func (s *Stream) SetTrailer(md metadata.MD) error { return nil } s.mu.Lock() - defer s.mu.Unlock() s.trailer = metadata.Join(s.trailer, md) + s.mu.Unlock() return nil } @@ -413,15 +418,17 @@ func (s *Stream) finish(st *status.Status) { // BytesSent indicates whether any bytes have been sent on this stream. func (s *Stream) BytesSent() bool { s.mu.Lock() - defer s.mu.Unlock() - return s.bytesSent + bs := s.bytesSent + s.mu.Unlock() + return bs } // BytesReceived indicates whether any bytes have been received on this stream. func (s *Stream) BytesReceived() bool { s.mu.Lock() - defer s.mu.Unlock() - return s.bytesReceived + br := s.bytesReceived + s.mu.Unlock() + return br } // GoString is implemented by Stream so context.String() won't