Always CloseStream when some error happen (especailly when reading header metadata returns an error). That is the only way to remove the stream from ClientTransport.activeStreams.
This commit is contained in:
24
call.go
24
call.go
@ -37,26 +37,34 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
|
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
|
"golang.org/x/net/context"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
"google.golang.org/grpc/transport"
|
"google.golang.org/grpc/transport"
|
||||||
"golang.org/x/net/context"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// recv receives and parses an RPC response.
|
// recv receives and parses an RPC response.
|
||||||
// On error, it returns the error and indicates whether the call should be retried.
|
// On error, it returns the error and indicates whether the call should be retried.
|
||||||
//
|
//
|
||||||
// TODO(zhaoq): Check whether the received message sequence is valid.
|
// TODO(zhaoq): Check whether the received message sequence is valid.
|
||||||
func recv(stream *transport.Stream, reply proto.Message) error {
|
func recv(t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply proto.Message) error {
|
||||||
|
// Try to acquire header metadata from the server if there is any.
|
||||||
|
var err error
|
||||||
|
c.headerMD, err = stream.Header()
|
||||||
|
if err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
p := &parser{s: stream}
|
p := &parser{s: stream}
|
||||||
for {
|
for {
|
||||||
if err := recvProto(p, reply); err != nil {
|
if err = recvProto(p, reply); err != nil {
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
return nil
|
break
|
||||||
}
|
}
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
c.trailerMD = stream.Trailer()
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
// sendRPC writes out various information of an RPC such as Context and Message.
|
// sendRPC writes out various information of an RPC such as Context and Message.
|
||||||
@ -145,17 +153,11 @@ func Invoke(ctx context.Context, method string, args, reply proto.Message, cc *C
|
|||||||
}
|
}
|
||||||
return toRPCErr(err)
|
return toRPCErr(err)
|
||||||
}
|
}
|
||||||
// Try to acquire header metadata from the server if there is any.
|
|
||||||
c.headerMD, err = stream.Header()
|
|
||||||
if err != nil {
|
|
||||||
return toRPCErr(err)
|
|
||||||
}
|
|
||||||
// Receive the response
|
// Receive the response
|
||||||
lastErr = recv(stream, reply)
|
lastErr = recv(t, &c, stream, reply)
|
||||||
if _, ok := lastErr.(transport.ConnectionError); ok {
|
if _, ok := lastErr.(transport.ConnectionError); ok {
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
c.trailerMD = stream.Trailer()
|
|
||||||
t.CloseStream(stream, lastErr)
|
t.CloseStream(stream, lastErr)
|
||||||
if lastErr != nil {
|
if lastErr != nil {
|
||||||
return toRPCErr(lastErr)
|
return toRPCErr(lastErr)
|
||||||
|
18
stream.go
18
stream.go
@ -37,10 +37,10 @@ import (
|
|||||||
"io"
|
"io"
|
||||||
|
|
||||||
"github.com/golang/protobuf/proto"
|
"github.com/golang/protobuf/proto"
|
||||||
|
"golang.org/x/net/context"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
"google.golang.org/grpc/transport"
|
"google.golang.org/grpc/transport"
|
||||||
"golang.org/x/net/context"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Stream defines the common interface a client or server stream has to satisfy.
|
// Stream defines the common interface a client or server stream has to satisfy.
|
||||||
@ -112,8 +112,14 @@ func (cs *clientStream) Context() context.Context {
|
|||||||
return cs.s.Context()
|
return cs.s.Context()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *clientStream) Header() (md metadata.MD, err error) {
|
func (cs *clientStream) Header() (metadata.MD, error) {
|
||||||
return cs.s.Header()
|
m, err := cs.s.Header()
|
||||||
|
if err != nil {
|
||||||
|
if _, ok := err.(transport.ConnectionError); !ok {
|
||||||
|
cs.t.CloseStream(cs.s, err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return m, err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (cs *clientStream) Trailer() metadata.MD {
|
func (cs *clientStream) Trailer() metadata.MD {
|
||||||
@ -142,6 +148,9 @@ func (cs *clientStream) RecvProto(m proto.Message) (err error) {
|
|||||||
if err == nil {
|
if err == nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
if _, ok := err.(transport.ConnectionError); !ok {
|
||||||
|
cs.t.CloseStream(cs.s, err)
|
||||||
|
}
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
if cs.s.StatusCode() == codes.OK {
|
if cs.s.StatusCode() == codes.OK {
|
||||||
// Returns io.EOF to indicate the end of the stream.
|
// Returns io.EOF to indicate the end of the stream.
|
||||||
@ -149,9 +158,6 @@ func (cs *clientStream) RecvProto(m proto.Message) (err error) {
|
|||||||
}
|
}
|
||||||
return Errorf(cs.s.StatusCode(), cs.s.StatusDesc())
|
return Errorf(cs.s.StatusCode(), cs.s.StatusDesc())
|
||||||
}
|
}
|
||||||
if _, ok := err.(transport.ConnectionError); !ok {
|
|
||||||
cs.t.CloseStream(cs.s, err)
|
|
||||||
}
|
|
||||||
return toRPCErr(err)
|
return toRPCErr(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user