transport: remove some defers (#1538)

This commit is contained in:
dfawley
2017-09-25 15:21:26 -07:00
committed by GitHub
parent 956d689a24
commit d555c82380

View File

@ -67,20 +67,20 @@ func newRecvBuffer() *recvBuffer {
func (b *recvBuffer) put(r recvMsg) { func (b *recvBuffer) put(r recvMsg) {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock()
if len(b.backlog) == 0 { if len(b.backlog) == 0 {
select { select {
case b.c <- r: case b.c <- r:
b.mu.Unlock()
return return
default: default:
} }
} }
b.backlog = append(b.backlog, r) b.backlog = append(b.backlog, r)
b.mu.Unlock()
} }
func (b *recvBuffer) load() { func (b *recvBuffer) load() {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock()
if len(b.backlog) > 0 { if len(b.backlog) > 0 {
select { select {
case b.c <- b.backlog[0]: case b.c <- b.backlog[0]:
@ -89,6 +89,7 @@ func (b *recvBuffer) load() {
default: default:
} }
} }
b.mu.Unlock()
} }
// get returns the channel that receives a recvMsg in the buffer. // get returns the channel that receives a recvMsg in the buffer.
@ -164,20 +165,20 @@ func newControlBuffer() *controlBuffer {
func (b *controlBuffer) put(r item) { func (b *controlBuffer) put(r item) {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock()
if len(b.backlog) == 0 { if len(b.backlog) == 0 {
select { select {
case b.c <- r: case b.c <- r:
b.mu.Unlock()
return return
default: default:
} }
} }
b.backlog = append(b.backlog, r) b.backlog = append(b.backlog, r)
b.mu.Unlock()
} }
func (b *controlBuffer) load() { func (b *controlBuffer) load() {
b.mu.Lock() b.mu.Lock()
defer b.mu.Unlock()
if len(b.backlog) > 0 { if len(b.backlog) > 0 {
select { select {
case b.c <- b.backlog[0]: case b.c <- b.backlog[0]:
@ -186,6 +187,7 @@ func (b *controlBuffer) load() {
default: default:
} }
} }
b.mu.Unlock()
} }
// get returns the channel that receives an item in the buffer. // get returns the channel that receives an item in the buffer.
@ -314,8 +316,9 @@ func (s *Stream) Header() (metadata.MD, error) {
// side only. // side only.
func (s *Stream) Trailer() metadata.MD { func (s *Stream) Trailer() metadata.MD {
s.mu.RLock() s.mu.RLock()
defer s.mu.RUnlock() c := s.trailer.Copy()
return s.trailer.Copy() s.mu.RUnlock()
return c
} }
// ServerTransport returns the underlying ServerTransport for the stream. // ServerTransport returns the underlying ServerTransport for the stream.
@ -343,14 +346,16 @@ func (s *Stream) Status() *status.Status {
// Server side only. // Server side only.
func (s *Stream) SetHeader(md metadata.MD) error { func (s *Stream) SetHeader(md metadata.MD) error {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock()
if s.headerOk || s.state == streamDone { if s.headerOk || s.state == streamDone {
s.mu.Unlock()
return ErrIllegalHeaderWrite return ErrIllegalHeaderWrite
} }
if md.Len() == 0 { if md.Len() == 0 {
s.mu.Unlock()
return nil return nil
} }
s.header = metadata.Join(s.header, md) s.header = metadata.Join(s.header, md)
s.mu.Unlock()
return nil return nil
} }
@ -361,8 +366,8 @@ func (s *Stream) SetTrailer(md metadata.MD) error {
return nil return nil
} }
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock()
s.trailer = metadata.Join(s.trailer, md) s.trailer = metadata.Join(s.trailer, md)
s.mu.Unlock()
return nil return nil
} }
@ -413,15 +418,17 @@ func (s *Stream) finish(st *status.Status) {
// BytesSent indicates whether any bytes have been sent on this stream. // BytesSent indicates whether any bytes have been sent on this stream.
func (s *Stream) BytesSent() bool { func (s *Stream) BytesSent() bool {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() bs := s.bytesSent
return s.bytesSent s.mu.Unlock()
return bs
} }
// BytesReceived indicates whether any bytes have been received on this stream. // BytesReceived indicates whether any bytes have been received on this stream.
func (s *Stream) BytesReceived() bool { func (s *Stream) BytesReceived() bool {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() br := s.bytesReceived
return s.bytesReceived s.mu.Unlock()
return br
} }
// GoString is implemented by Stream so context.String() won't // GoString is implemented by Stream so context.String() won't