Fix testStreamingRPCTimeoutServerError

This commit is contained in:
Menghan Li
2016-09-26 15:07:31 -07:00
parent 364424271a
commit 033c91440b

View File

@ -1698,20 +1698,18 @@ func testFailedServerStreaming(t *testing.T, e env) {
// checkTimeoutErrorServer is a gRPC server checks context timeout error in FullDuplexCall(). // checkTimeoutErrorServer is a gRPC server checks context timeout error in FullDuplexCall().
// It is only used in TestStreamingRPCTimeoutServerError. // It is only used in TestStreamingRPCTimeoutServerError.
type checkTimeoutErrorServer struct { type checkTimeoutErrorServer struct {
t *testing.T t *testing.T
done chan struct{}
testpb.TestServiceServer testpb.TestServiceServer
} }
func (s checkTimeoutErrorServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error { func (s *checkTimeoutErrorServer) FullDuplexCall(stream testpb.TestService_FullDuplexCallServer) error {
defer close(s.done)
for { for {
_, err := stream.Recv() _, err := stream.Recv()
if err == io.EOF {
// read done.
return nil
}
if err != nil { if err != nil {
if grpc.Code(err) != codes.DeadlineExceeded { if grpc.Code(err) != codes.DeadlineExceeded {
s.t.Fatalf("stream.Recv(_) = _, %v, want error code %s", err, codes.DeadlineExceeded) s.t.Errorf("stream.Recv() = _, %v, want error code %s", err, codes.DeadlineExceeded)
} }
return err return err
} }
@ -1721,7 +1719,7 @@ func (s checkTimeoutErrorServer) FullDuplexCall(stream testpb.TestService_FullDu
}, },
}); err != nil { }); err != nil {
if grpc.Code(err) != codes.DeadlineExceeded { if grpc.Code(err) != codes.DeadlineExceeded {
s.t.Fatalf("stream.Send(_) = %v, want error code %s", err, codes.DeadlineExceeded) s.t.Errorf("stream.Send(_) = %v, want error code %s", err, codes.DeadlineExceeded)
} }
return err return err
} }
@ -1739,33 +1737,41 @@ func TestStreamingRPCTimeoutServerError(t *testing.T) {
// When context timeout happens on client side, server should get deadline exceeded error. // When context timeout happens on client side, server should get deadline exceeded error.
func testStreamingRPCTimeoutServerError(t *testing.T, e env) { func testStreamingRPCTimeoutServerError(t *testing.T, e env) {
te := newTest(t, e) te := newTest(t, e)
te.startServer(checkTimeoutErrorServer{t: t}) serverDone := make(chan struct{})
te.startServer(&checkTimeoutErrorServer{t: t, done: serverDone})
defer te.tearDown()
cc := te.clientConn() cc := te.clientConn()
tc := testpb.NewTestServiceClient(cc) tc := testpb.NewTestServiceClient(cc)
req := &testpb.StreamingOutputCallRequest{} req := &testpb.StreamingOutputCallRequest{}
duration := 100 * time.Millisecond for duration := 50 * time.Millisecond; ; duration *= 2 {
ctx, _ := context.WithTimeout(context.Background(), duration) ctx, _ := context.WithTimeout(context.Background(), duration)
stream, err := tc.FullDuplexCall(ctx) stream, err := tc.FullDuplexCall(ctx)
if err != nil { if grpc.Code(err) == codes.DeadlineExceeded {
t.Errorf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err) // Redo test with double timeout.
return continue
}
for {
err := stream.Send(req)
if err != nil {
break
} }
_, err = stream.Recv()
if err != nil { if err != nil {
break t.Errorf("%v.FullDuplexCall(_) = _, %v, want <nil>", tc, err)
return
}
for {
err := stream.Send(req)
if err != nil {
break
}
_, err = stream.Recv()
if err != nil {
break
}
} }
}
// Wait for context timeout on server before closing connection. // Wait for context timeout on server before closing connection
time.Sleep(duration) // to make sure the server will get timeout error.
te.tearDown() <-serverDone
break
}
} }
// concurrentSendServer is a TestServiceServer whose // concurrentSendServer is a TestServiceServer whose