transport: eliminate StreamError; use status errors instead (#2239)

This commit is contained in:
dfawley
2018-07-26 10:30:58 -07:00
committed by GitHub
parent c03697249e
commit 339b6cb107
12 changed files with 36 additions and 82 deletions

View File

@ -57,8 +57,6 @@ func toRPCErr(err error) error {
return err return err
} }
switch e := err.(type) { switch e := err.(type) {
case transport.StreamError:
return status.Error(e.Code, e.Desc)
case transport.ConnectionError: case transport.ConnectionError:
return status.Error(codes.Unavailable, e.Desc) return status.Error(codes.Unavailable, e.Desc)
default: default:

View File

@ -58,8 +58,6 @@ func toRPCErr(err error) error {
return err return err
} }
switch e := err.(type) { switch e := err.(type) {
case transport.StreamError:
return status.Error(e.Code, e.Desc)
case transport.ConnectionError: case transport.ConnectionError:
return status.Error(codes.Unavailable, e.Desc) return status.Error(codes.Unavailable, e.Desc)
default: default:

View File

@ -80,7 +80,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats sta
if v := r.Header.Get("grpc-timeout"); v != "" { if v := r.Header.Get("grpc-timeout"); v != "" {
to, err := decodeTimeout(v) to, err := decodeTimeout(v)
if err != nil { if err != nil {
return nil, streamErrorf(codes.Internal, "malformed time-out: %v", err) return nil, status.Errorf(codes.Internal, "malformed time-out: %v", err)
} }
st.timeoutSet = true st.timeoutSet = true
st.timeout = to st.timeout = to
@ -98,7 +98,7 @@ func NewServerHandlerTransport(w http.ResponseWriter, r *http.Request, stats sta
for _, v := range vv { for _, v := range vv {
v, err := decodeMetadataHeader(k, v) v, err := decodeMetadataHeader(k, v)
if err != nil { if err != nil {
return nil, streamErrorf(codes.Internal, "malformed binary metadata: %v", err) return nil, status.Errorf(codes.Internal, "malformed binary metadata: %v", err)
} }
metakv = append(metakv, k, v) metakv = append(metakv, k, v)
} }
@ -432,17 +432,14 @@ func (ht *serverHandlerTransport) Drain() {
// * io.EOF // * io.EOF
// * io.ErrUnexpectedEOF // * io.ErrUnexpectedEOF
// * of type transport.ConnectionError // * of type transport.ConnectionError
// * of type transport.StreamError // * an error from the status package
func mapRecvMsgError(err error) error { func mapRecvMsgError(err error) error {
if err == io.EOF || err == io.ErrUnexpectedEOF { if err == io.EOF || err == io.ErrUnexpectedEOF {
return err return err
} }
if se, ok := err.(http2.StreamError); ok { if se, ok := err.(http2.StreamError); ok {
if code, ok := http2ErrConvTab[se.Code]; ok { if code, ok := http2ErrConvTab[se.Code]; ok {
return StreamError{ return status.Error(code, se.Error())
Code: code,
Desc: se.Error(),
}
} }
} }
return connectionErrorf(true, err, err.Error()) return connectionErrorf(true, err, err.Error())

View File

@ -179,7 +179,7 @@ func TestHandlerTransport_NewServerHandlerTransport(t *testing.T) {
}, },
RequestURI: "/service/foo.bar", RequestURI: "/service/foo.bar",
}, },
wantErr: `stream error: code = Internal desc = "malformed time-out: transport: timeout unit is not recognized: \"tomorrow\""`, wantErr: `rpc error: code = Internal desc = malformed time-out: transport: timeout unit is not recognized: "tomorrow"`,
}, },
{ {
name: "with metadata", name: "with metadata",
@ -220,7 +220,7 @@ func TestHandlerTransport_NewServerHandlerTransport(t *testing.T) {
} }
got, gotErr := NewServerHandlerTransport(rw, tt.req, nil) got, gotErr := NewServerHandlerTransport(rw, tt.req, nil)
if (gotErr != nil) != (tt.wantErr != "") || (gotErr != nil && gotErr.Error() != tt.wantErr) { if (gotErr != nil) != (tt.wantErr != "") || (gotErr != nil && gotErr.Error() != tt.wantErr) {
t.Errorf("%s: error = %v; want %q", tt.name, gotErr, tt.wantErr) t.Errorf("%s: error = %q; want %q", tt.name, gotErr.Error(), tt.wantErr)
continue continue
} }
if gotErr != nil { if gotErr != nil {

View File

@ -476,7 +476,7 @@ func (t *http2Client) getTrAuthData(ctx context.Context, audience string) (map[s
return nil, err return nil, err
} }
return nil, streamErrorf(codes.Unauthenticated, "transport: %v", err) return nil, status.Errorf(codes.Unauthenticated, "transport: %v", err)
} }
for k, v := range data { for k, v := range data {
// Capital header names are illegal in HTTP/2. // Capital header names are illegal in HTTP/2.
@ -494,11 +494,11 @@ func (t *http2Client) getCallAuthData(ctx context.Context, audience string, call
// options, then both sets of credentials will be applied. // options, then both sets of credentials will be applied.
if callCreds := callHdr.Creds; callCreds != nil { if callCreds := callHdr.Creds; callCreds != nil {
if !t.isSecure && callCreds.RequireTransportSecurity() { if !t.isSecure && callCreds.RequireTransportSecurity() {
return nil, streamErrorf(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection") return nil, status.Error(codes.Unauthenticated, "transport: cannot send secure credentials on an insecure connection")
} }
data, err := callCreds.GetRequestMetadata(ctx, audience) data, err := callCreds.GetRequestMetadata(ctx, audience)
if err != nil { if err != nil {
return nil, streamErrorf(codes.Internal, "transport: %v", err) return nil, status.Errorf(codes.Internal, "transport: %v", err)
} }
for k, v := range data { for k, v := range data {
// Capital header names are illegal in HTTP/2 // Capital header names are illegal in HTTP/2
@ -611,7 +611,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
var sz int64 var sz int64
for _, f := range hdrFrame.hf { for _, f := range hdrFrame.hf {
if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) { if sz += int64(f.Size()); sz > int64(*t.maxSendHeaderListSize) {
hdrListSizeErr = streamErrorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize) hdrListSizeErr = status.Errorf(codes.Internal, "header list size to send violates the maximum size (%d bytes) set by server", *t.maxSendHeaderListSize)
return false return false
} }
} }
@ -1206,7 +1206,7 @@ func (t *http2Client) reader() {
// use error detail to provide better err message // use error detail to provide better err message
code := http2ErrConvTab[se.Code] code := http2ErrConvTab[se.Code]
msg := t.framer.fr.ErrorDetail().Error() msg := t.framer.fr.ErrorDetail().Error()
t.closeStream(s, streamError(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false) t.closeStream(s, status.Error(code, msg), true, http2.ErrCodeProtocol, status.New(code, msg), nil, false)
} }
continue continue
} else { } else {

View File

@ -299,11 +299,11 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
streamID := frame.Header().StreamID streamID := frame.Header().StreamID
state := decodeState{serverSide: true} state := decodeState{serverSide: true}
if err := state.decodeHeader(frame); err != nil { if err := state.decodeHeader(frame); err != nil {
if se, ok := err.(StreamError); ok { if se, ok := status.FromError(err); ok {
t.controlBuf.put(&cleanupStream{ t.controlBuf.put(&cleanupStream{
streamID: streamID, streamID: streamID,
rst: true, rst: true,
rstCode: statusCodeConvTab[se.Code], rstCode: statusCodeConvTab[se.Code()],
onWrite: func() {}, onWrite: func() {},
}) })
} }
@ -863,7 +863,7 @@ func (t *http2Server) Write(s *Stream, hdr []byte, data []byte, opts *Options) e
if !s.isHeaderSent() { // Headers haven't been written yet. if !s.isHeaderSent() { // Headers haven't been written yet.
if err := t.WriteHeader(s, nil); err != nil { if err := t.WriteHeader(s, nil); err != nil {
// TODO(mmukhi, dfawley): Make sure this is the right code to return. // TODO(mmukhi, dfawley): Make sure this is the right code to return.
return streamErrorf(codes.Internal, "transport: %v", err) return status.Errorf(codes.Internal, "transport: %v", err)
} }
} else { } else {
// Writing headers checks for this condition. // Writing headers checks for this condition.

View File

@ -241,7 +241,7 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error {
// frame.Truncated is set to true when framer detects that the current header // frame.Truncated is set to true when framer detects that the current header
// list size hits MaxHeaderListSize limit. // list size hits MaxHeaderListSize limit.
if frame.Truncated { if frame.Truncated {
return streamErrorf(codes.Internal, "peer header list size exceeded limit") return status.Error(codes.Internal, "peer header list size exceeded limit")
} }
for _, hf := range frame.Fields { for _, hf := range frame.Fields {
if err := d.processHeaderField(hf); err != nil { if err := d.processHeaderField(hf); err != nil {
@ -261,7 +261,7 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error {
// If grpc status doesn't exist and http status doesn't exist, // If grpc status doesn't exist and http status doesn't exist,
// then it's a malformed header. // then it's a malformed header.
if d.httpStatus == nil { if d.httpStatus == nil {
return streamErrorf(codes.Internal, "malformed header: doesn't contain status(gRPC or HTTP)") return status.Error(codes.Internal, "malformed header: doesn't contain status(gRPC or HTTP)")
} }
if *(d.httpStatus) != http.StatusOK { if *(d.httpStatus) != http.StatusOK {
@ -269,7 +269,7 @@ func (d *decodeState) decodeHeader(frame *http2.MetaHeadersFrame) error {
if !ok { if !ok {
code = codes.Unknown code = codes.Unknown
} }
return streamErrorf(code, http.StatusText(*(d.httpStatus))) return status.Error(code, http.StatusText(*(d.httpStatus)))
} }
// gRPC status doesn't exist and http status is OK. // gRPC status doesn't exist and http status is OK.
@ -295,7 +295,7 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) error {
case "content-type": case "content-type":
contentSubtype, validContentType := contentSubtype(f.Value) contentSubtype, validContentType := contentSubtype(f.Value)
if !validContentType { if !validContentType {
return streamErrorf(codes.Internal, "transport: received the unexpected content-type %q", f.Value) return status.Errorf(codes.Internal, "transport: received the unexpected content-type %q", f.Value)
} }
d.contentSubtype = contentSubtype d.contentSubtype = contentSubtype
// TODO: do we want to propagate the whole content-type in the metadata, // TODO: do we want to propagate the whole content-type in the metadata,
@ -308,7 +308,7 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) error {
case "grpc-status": case "grpc-status":
code, err := strconv.Atoi(f.Value) code, err := strconv.Atoi(f.Value)
if err != nil { if err != nil {
return streamErrorf(codes.Internal, "transport: malformed grpc-status: %v", err) return status.Errorf(codes.Internal, "transport: malformed grpc-status: %v", err)
} }
d.rawStatusCode = &code d.rawStatusCode = &code
case "grpc-message": case "grpc-message":
@ -316,38 +316,38 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) error {
case "grpc-status-details-bin": case "grpc-status-details-bin":
v, err := decodeBinHeader(f.Value) v, err := decodeBinHeader(f.Value)
if err != nil { if err != nil {
return streamErrorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) return status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
} }
s := &spb.Status{} s := &spb.Status{}
if err := proto.Unmarshal(v, s); err != nil { if err := proto.Unmarshal(v, s); err != nil {
return streamErrorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err) return status.Errorf(codes.Internal, "transport: malformed grpc-status-details-bin: %v", err)
} }
d.statusGen = status.FromProto(s) d.statusGen = status.FromProto(s)
case "grpc-timeout": case "grpc-timeout":
d.timeoutSet = true d.timeoutSet = true
var err error var err error
if d.timeout, err = decodeTimeout(f.Value); err != nil { if d.timeout, err = decodeTimeout(f.Value); err != nil {
return streamErrorf(codes.Internal, "transport: malformed time-out: %v", err) return status.Errorf(codes.Internal, "transport: malformed time-out: %v", err)
} }
case ":path": case ":path":
d.method = f.Value d.method = f.Value
case ":status": case ":status":
code, err := strconv.Atoi(f.Value) code, err := strconv.Atoi(f.Value)
if err != nil { if err != nil {
return streamErrorf(codes.Internal, "transport: malformed http-status: %v", err) return status.Errorf(codes.Internal, "transport: malformed http-status: %v", err)
} }
d.httpStatus = &code d.httpStatus = &code
case "grpc-tags-bin": case "grpc-tags-bin":
v, err := decodeBinHeader(f.Value) v, err := decodeBinHeader(f.Value)
if err != nil { if err != nil {
return streamErrorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err) return status.Errorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err)
} }
d.statsTags = v d.statsTags = v
d.addMetadata(f.Name, string(v)) d.addMetadata(f.Name, string(v))
case "grpc-trace-bin": case "grpc-trace-bin":
v, err := decodeBinHeader(f.Value) v, err := decodeBinHeader(f.Value)
if err != nil { if err != nil {
return streamErrorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err) return status.Errorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err)
} }
d.statsTrace = v d.statsTrace = v
d.addMetadata(f.Name, string(v)) d.addMetadata(f.Name, string(v))

View File

@ -619,19 +619,6 @@ type ServerTransport interface {
IncrMsgRecv() IncrMsgRecv()
} }
// streamErrorf creates an StreamError with the specified error code and description.
func streamErrorf(c codes.Code, format string, a ...interface{}) StreamError {
return StreamError{
Code: c,
Desc: fmt.Sprintf(format, a...),
}
}
// streamError creates an StreamError with the specified error code and description.
func streamError(c codes.Code, desc string) StreamError {
return StreamError{Code: c, Desc: desc}
}
// connectionErrorf creates an ConnectionError with the specified error description. // connectionErrorf creates an ConnectionError with the specified error description.
func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError { func connectionErrorf(temp bool, e error, format string, a ...interface{}) ConnectionError {
return ConnectionError{ return ConnectionError{
@ -674,7 +661,7 @@ var (
// errStreamDrain indicates that the stream is rejected because the // errStreamDrain indicates that the stream is rejected because the
// connection is draining. This could be caused by goaway or balancer // connection is draining. This could be caused by goaway or balancer
// removing the address. // removing the address.
errStreamDrain = streamErrorf(codes.Unavailable, "the connection is draining") errStreamDrain = status.Error(codes.Unavailable, "the connection is draining")
// errStreamDone is returned from write at the client side to indiacte application // errStreamDone is returned from write at the client side to indiacte application
// layer of an error. // layer of an error.
errStreamDone = errors.New("the stream is done") errStreamDone = errors.New("the stream is done")
@ -683,18 +670,6 @@ var (
statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection") statusGoAway = status.New(codes.Unavailable, "the stream is rejected because server is draining the connection")
) )
// TODO: See if we can replace StreamError with status package errors.
// StreamError is an error that only affects one stream within a connection.
type StreamError struct {
Code codes.Code
Desc string
}
func (e StreamError) Error() string {
return fmt.Sprintf("stream error: code = %s desc = %q", e.Code, e.Desc)
}
// GoAwayReason contains the reason for the GoAway frame received. // GoAwayReason contains the reason for the GoAway frame received.
type GoAwayReason uint8 type GoAwayReason uint8

View File

@ -446,7 +446,7 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con
} }
// TestInflightStreamClosing ensures that closing in-flight stream // TestInflightStreamClosing ensures that closing in-flight stream
// sends StreamError to concurrent stream reader. // sends status error to concurrent stream reader.
func TestInflightStreamClosing(t *testing.T) { func TestInflightStreamClosing(t *testing.T) {
serverConfig := &ServerConfig{} serverConfig := &ServerConfig{}
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {}) server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
@ -460,7 +460,7 @@ func TestInflightStreamClosing(t *testing.T) {
} }
donec := make(chan struct{}) donec := make(chan struct{})
serr := StreamError{Desc: "client connection is closing"} serr := status.Error(codes.Internal, "client connection is closing")
go func() { go func() {
defer close(donec) defer close(donec)
if _, err := stream.Read(make([]byte, defaultWindowSize)); err != serr { if _, err := stream.Read(make([]byte, defaultWindowSize)); err != serr {
@ -479,7 +479,7 @@ func TestInflightStreamClosing(t *testing.T) {
<-timeout.C <-timeout.C
} }
case <-timeout.C: case <-timeout.C:
t.Fatalf("Test timed out, expected a StreamError.") t.Fatalf("Test timed out, expected a status error.")
} }
} }
@ -1698,7 +1698,7 @@ func TestInvalidHeaderField(t *testing.T) {
} }
p := make([]byte, http2MaxFrameLen) p := make([]byte, http2MaxFrameLen)
_, err = s.trReader.(*transportReader).Read(p) _, err = s.trReader.(*transportReader).Read(p)
if se, ok := err.(StreamError); !ok || se.Code != codes.Internal || !strings.Contains(err.Error(), expectedInvalidHeaderField) { if se, ok := status.FromError(err); !ok || se.Code() != codes.Internal || !strings.Contains(err.Error(), expectedInvalidHeaderField) {
t.Fatalf("Read got error %v, want error with code %s and contains %q", err, codes.Internal, expectedInvalidHeaderField) t.Fatalf("Read got error %v, want error with code %s and contains %q", err, codes.Internal, expectedInvalidHeaderField)
} }
ct.Close() ct.Close()
@ -2093,12 +2093,12 @@ func testHTTPToGRPCStatusMapping(t *testing.T, httpStatus int, wh writeHeaders)
if err == nil { if err == nil {
t.Fatalf("Stream.Read(_) unexpectedly returned no error. Expected stream error with code %v", want) t.Fatalf("Stream.Read(_) unexpectedly returned no error. Expected stream error with code %v", want)
} }
serr, ok := err.(StreamError) serr, ok := status.FromError(err)
if !ok { if !ok {
t.Fatalf("err.(Type) = %T, want StreamError", err) t.Fatalf("err.(Type) = %T, want status error", err)
} }
if want != serr.Code { if want != serr.Code() {
t.Fatalf("Want error code: %v, got: %v", want, serr.Code) t.Fatalf("Want error code: %v, got: %v", want, serr.Code())
} }
} }

View File

@ -470,7 +470,7 @@ type parser struct {
// * io.EOF, when no messages remain // * io.EOF, when no messages remain
// * io.ErrUnexpectedEOF // * io.ErrUnexpectedEOF
// * of type transport.ConnectionError // * of type transport.ConnectionError
// * of type transport.StreamError // * an error from the status package
// No other error values or types must be returned, which also means // No other error values or types must be returned, which also means
// that the underlying io.Reader must not return an incompatible // that the underlying io.Reader must not return an incompatible
// error. // error.

View File

@ -177,7 +177,6 @@ func TestToRPCErr(t *testing.T) {
// outputs // outputs
errOut error errOut error
}{ }{
{transport.StreamError{Code: codes.Unknown, Desc: ""}, status.Error(codes.Unknown, "")},
{transport.ErrConnClosing, status.Error(codes.Unavailable, transport.ErrConnClosing.Desc)}, {transport.ErrConnClosing, status.Error(codes.Unavailable, transport.ErrConnClosing.Desc)},
{io.ErrUnexpectedEOF, status.Error(codes.Internal, io.ErrUnexpectedEOF.Error())}, {io.ErrUnexpectedEOF, status.Error(codes.Internal, io.ErrUnexpectedEOF.Error())},
} { } {

View File

@ -963,10 +963,6 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
switch st := err.(type) { switch st := err.(type) {
case transport.ConnectionError: case transport.ConnectionError:
// Nothing to do here. // Nothing to do here.
case transport.StreamError:
if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
default: default:
panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st)) panic(fmt.Sprintf("grpc: Unexpected error (%T) from recvMsg: %v", st, st))
} }
@ -1062,10 +1058,6 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
switch st := err.(type) { switch st := err.(type) {
case transport.ConnectionError: case transport.ConnectionError:
// Nothing to do here. // Nothing to do here.
case transport.StreamError:
if e := t.WriteStatus(stream, status.New(st.Code, st.Desc)); e != nil {
grpclog.Warningf("grpc: Server.processUnaryRPC failed to write status %v", e)
}
default: default:
panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st)) panic(fmt.Sprintf("grpc: Unexpected error (%T) from sendResponse: %v", st, st))
} }
@ -1185,12 +1177,7 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp
if appErr != nil { if appErr != nil {
appStatus, ok := status.FromError(appErr) appStatus, ok := status.FromError(appErr)
if !ok { if !ok {
switch err := appErr.(type) {
case transport.StreamError:
appStatus = status.New(err.Code, err.Desc)
default:
appStatus = status.New(codes.Unknown, appErr.Error()) appStatus = status.New(codes.Unknown, appErr.Error())
}
appErr = appStatus.Err() appErr = appStatus.Err()
} }
if trInfo != nil { if trInfo != nil {