diff --git a/server.go b/server.go index 55295fda..714f7542 100644 --- a/server.go +++ b/server.go @@ -96,7 +96,7 @@ type options struct { maxConcurrentStreams uint32 } -// ServerOption sets options. +// A ServerOption sets options. type ServerOption func(*options) // MaxConcurrentStreams returns an Option that will apply a limit on the number @@ -312,7 +312,7 @@ func (s *Server) handleStream(t transport.ServerTransport, stream *transport.Str t.WriteStatus(stream, codes.Unimplemented, fmt.Sprintf("unknown method %v", method)) } -// Stop stops the gRPC server. Once it returns, the server stops accepting +// Stop stops the gRPC server. Once Stop returns, the server stops accepting // connection requests and closes all the connected connections. func (s *Server) Stop() { s.mu.Lock() @@ -329,9 +329,9 @@ func (s *Server) Stop() { } } -// CloseConns closes all exiting transports but keeps s.lis accepting new +// TestingCloseConns closes all exiting transports but keeps s.lis accepting new // connections. This is for test only now. -func (s *Server) CloseConns() { +func (s *Server) TestingCloseConns() { s.mu.Lock() for c := range s.conns { c.Close() diff --git a/stream.go b/stream.go index 73681e9e..53baacac 100644 --- a/stream.go +++ b/stream.go @@ -47,12 +47,17 @@ import ( type Stream interface { // Context returns the context for this stream. Context() context.Context - // SendProto blocks until it sends a proto message out or some - // error happens. - SendProto(proto.Message) error - // RecvProto blocks until either a proto message is received or some - // error happens. - RecvProto(proto.Message) error + // SendProto blocks until it sends m, the stream is done or the stream + // breaks. + // On error, it aborts the stream and returns an RPC status on client + // side. On server side, it simply returns the error to the caller. + // SendProto is called by generated code. + SendProto(m proto.Message) error + // RecvProto blocks until it receives a proto message or the stream is + // done. On client side, it returns io.EOF when the stream is done. On + // any other error, it aborts the streama nd returns an RPC status. On + // server side, it simply returns the error to the caller. + RecvProto(m proto.Message) error } // ClientStream defines the interface a client stream has to satify. @@ -64,9 +69,11 @@ type ClientStream interface { // after stream.Recv() returns non-nil error (including io.EOF) for // bi-directional streaming and server streaming or stream.CloseAndRecv() // returns for client streaming in order to receive trailer metadata if - // present. + // present. Otherwise, it could returns an empty MD even though trailer + // is present. Trailer() metadata.MD - // CloseSend closes the send direction of the stream. + // CloseSend closes the send direction of the stream. It closes the stream + // when non-nil error is met. CloseSend() error Stream } @@ -101,29 +108,18 @@ type clientStream struct { p *parser } -// Context returns the clientStream's associated context. func (cs *clientStream) Context() context.Context { return cs.s.Context() } -// Header returns the header metedata received from the server if there -// is any. Empty metadata.MD is returned if there is no header metadata. -// It blocks if the metadata is not ready to read. func (cs *clientStream) Header() (md metadata.MD, err error) { return cs.s.Header() } -// Trailer returns the trailer metadata from the server. It must be called -// after stream.Recv() returns non-nil error (including io.EOF) for -// bi-directional streaming and server streaming or stream.CloseAndRecv() -// returns for client streaming in order to receive trailer metadata if -// present. func (cs *clientStream) Trailer() metadata.MD { return cs.s.Trailer() } -// SendProto blocks until m is sent out or an error happens. It closes the -// stream when a non-nil error is met. This is called by generated code. func (cs *clientStream) SendProto(m proto.Message) (err error) { defer func() { if err == nil || err == io.EOF { @@ -141,10 +137,6 @@ func (cs *clientStream) SendProto(m proto.Message) (err error) { return cs.t.Write(cs.s, out, &transport.Options{Last: false}) } -// RecvProto blocks until it receives a proto message or an error happens. -// When an non-nil error (including EOF which indicates the success of an -// RPC) is met, it also closes the stream and returns the RPC status to -// the caller. This is called by generated code. func (cs *clientStream) RecvProto(m proto.Message) (err error) { err = recvProto(cs.p, m) if err == nil { @@ -163,8 +155,6 @@ func (cs *clientStream) RecvProto(m proto.Message) (err error) { return toRPCErr(err) } -// CloseSend closes the send direction of the stream. It closes the stream -// when non-nil error is met. func (cs *clientStream) CloseSend() (err error) { err = cs.t.Write(cs.s, nil, &transport.Options{Last: true}) if err == nil || err == io.EOF { @@ -180,7 +170,8 @@ func (cs *clientStream) CloseSend() (err error) { // ServerStream defines the interface a server stream has to satisfy. type ServerStream interface { // SendHeader sends the header metadata. It should not be called - // after SendProto. + // after SendProto. It fails if called multiple times or if + // called after SendProto. SendHeader(metadata.MD) error // SetTrailer sets the trailer metadata which will be sent with the // RPC status. @@ -197,20 +188,14 @@ type serverStream struct { statusDesc string } -// Context returns the associated context so that server applications can -// manipulate it. func (ss *serverStream) Context() context.Context { return ss.s.Context() } -// SendHeader sends header metadata. It fails if called multiple times or if -// called after SendProto. func (ss *serverStream) SendHeader(md metadata.MD) error { return ss.t.WriteHeader(ss.s, md) } -// SetTrailer sends trailer metadata. The metadata will be sent with the final -// RPC status. func (ss *serverStream) SetTrailer(md metadata.MD) { if md.Len() == 0 { return @@ -219,8 +204,6 @@ func (ss *serverStream) SetTrailer(md metadata.MD) { return } -// SendProto blocks until m is sent out or an error is met. This is called by -// generated code. func (ss *serverStream) SendProto(m proto.Message) error { out, err := encode(m, compressionNone) if err != nil { @@ -230,8 +213,6 @@ func (ss *serverStream) SendProto(m proto.Message) error { return ss.t.Write(ss.s, out, &transport.Options{Last: false}) } -// RecvProto blocks until it receives a message or an error is met. This is -// called by generated code. func (ss *serverStream) RecvProto(m proto.Message) error { return recvProto(ss.p, m) } diff --git a/test/end2end_test.go b/test/end2end_test.go index 75eb8622..b0951454 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -318,7 +318,7 @@ func TestRetry(t *testing.T) { // The server shuts down the network connection to make a // transport error which will be detected by the client side // code. - s.CloseConns() + s.TestingCloseConns() wg.Done() }() // All these RPCs should succeed eventually.