Merge pull request #947 from iamqizhao/err-fix

Use the correct error in defer call
This commit is contained in:
Menghan Li
2016-10-25 16:06:07 -07:00
committed by GitHub
2 changed files with 18 additions and 11 deletions

View File

@ -49,9 +49,8 @@ import (
// On error, it returns the error and indicates whether the call should be retried. // On error, it returns the error and indicates whether the call should be retried.
// //
// TODO(zhaoq): Check whether the received message sequence is valid. // TODO(zhaoq): Check whether the received message sequence is valid.
func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) error { func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) (err error) {
// Try to acquire header metadata from the server if there is any. // Try to acquire header metadata from the server if there is any.
var err error
defer func() { defer func() {
if err != nil { if err != nil {
if _, ok := err.(transport.ConnectionError); !ok { if _, ok := err.(transport.ConnectionError); !ok {
@ -61,7 +60,7 @@ func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, s
}() }()
c.headerMD, err = stream.Header() c.headerMD, err = stream.Header()
if err != nil { if err != nil {
return err return
} }
p := &parser{r: stream} p := &parser{r: stream}
for { for {
@ -69,7 +68,7 @@ func recvResponse(dopts dialOptions, t transport.ClientTransport, c *callInfo, s
if err == io.EOF { if err == io.EOF {
break break
} }
return err return
} }
} }
c.trailerMD = stream.Trailer() c.trailerMD = stream.Trailer()

View File

@ -2504,7 +2504,7 @@ func testStreamsQuotaRecovery(t *testing.T, e env) {
cc := te.clientConn() cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
ctx, cancel := context.WithCancel(context.Background()) ctx, _ := context.WithCancel(context.Background())
if _, err := tc.StreamingInputCall(ctx); err != nil { if _, err := tc.StreamingInputCall(ctx); err != nil {
t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err) t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err)
} }
@ -2522,18 +2522,26 @@ func testStreamsQuotaRecovery(t *testing.T, e env) {
} }
t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded) t.Fatalf("%v.StreamingInputCall(_) = _, %v, want _, %s", tc, err, codes.DeadlineExceeded)
} }
cancel()
var wg sync.WaitGroup var wg sync.WaitGroup
for i := 0; i < 100; i++ { for i := 0; i < 10; i++ {
wg.Add(1) wg.Add(1)
go func() { go func() {
defer wg.Done() defer wg.Done()
ctx, cancel := context.WithCancel(context.Background()) payload, err := newPayload(testpb.PayloadType_COMPRESSABLE, 314)
if _, err := tc.StreamingInputCall(ctx); err != nil { if err != nil {
t.Errorf("%v.StreamingInputCall(_) = _, %v, want _, <nil>", tc, err) t.Fatal(err)
}
req := &testpb.SimpleRequest{
ResponseType: testpb.PayloadType_COMPRESSABLE.Enum(),
ResponseSize: proto.Int32(1592),
Payload: payload,
}
// No rpc should go through due to the max streams limit.
ctx, _ := context.WithTimeout(context.Background(), 10*time.Millisecond)
if _, err := tc.UnaryCall(ctx, req, grpc.FailFast(false)); grpc.Code(err) != codes.DeadlineExceeded {
t.Errorf("TestService/UnaryCall(_, _) = _, %v, want _, %s", err, codes.DeadlineExceeded)
} }
cancel()
}() }()
} }
wg.Wait() wg.Wait()