internal/transport_test.go: prevent leaking context (#2227)

This commit is contained in:
lyuxuan
2018-07-16 18:48:37 -07:00
committed by GitHub
parent 7273294e1a
commit a7d6032db4

View File

@ -396,11 +396,11 @@ func setUpServerOnly(t *testing.T, port int, serverConfig *ServerConfig, ht hTyp
return server
}
func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, ClientTransport) {
func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, ClientTransport, func()) {
return setUpWithOptions(t, port, &ServerConfig{MaxStreams: maxStreams}, ht, ConnectOptions{}, func() {})
}
func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hType, copts ConnectOptions, onHandshake func()) (*server, ClientTransport) {
func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hType, copts ConnectOptions, onHandshake func()) (*server, ClientTransport, func()) {
server := setUpServerOnly(t, port, serverConfig, ht)
addr := "localhost:" + server.port
var (
@ -416,10 +416,10 @@ func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hTy
cancel() // Do not cancel in success path.
t.Fatalf("failed to create transport: %v", connErr)
}
return server, ct
return server, ct, cancel
}
func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Conn) ClientTransport {
func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Conn) (ClientTransport, func()) {
lis, err := net.Listen("tcp", "localhost:0")
if err != nil {
t.Fatalf("Failed to listen: %v", err)
@ -446,14 +446,15 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con
}
t.Fatalf("Failed to dial: %v", err)
}
return tr
return tr, cancel
}
// TestInflightStreamClosing ensures that closing in-flight stream
// sends StreamError to concurrent stream reader.
func TestInflightStreamClosing(t *testing.T) {
serverConfig := &ServerConfig{}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
defer cancel()
defer server.stop()
defer client.Close()
@ -495,7 +496,8 @@ func TestMaxConnectionIdle(t *testing.T) {
MaxConnectionIdle: 2 * time.Second,
},
}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
defer cancel()
defer server.stop()
defer client.Close()
stream, err := client.NewStream(context.Background(), &CallHdr{})
@ -522,7 +524,8 @@ func TestMaxConnectionIdleNegative(t *testing.T) {
MaxConnectionIdle: 2 * time.Second,
},
}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
defer cancel()
defer server.stop()
defer client.Close()
_, err := client.NewStream(context.Background(), &CallHdr{})
@ -548,7 +551,8 @@ func TestMaxConnectionAge(t *testing.T) {
MaxConnectionAge: 2 * time.Second,
},
}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
defer cancel()
defer server.stop()
defer client.Close()
_, err := client.NewStream(context.Background(), &CallHdr{})
@ -580,7 +584,8 @@ func TestKeepaliveServer(t *testing.T) {
Timeout: 1 * time.Second,
},
}
server, c := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
server, c, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
defer cancel()
defer server.stop()
defer c.Close()
client, err := net.Dial("tcp", server.lis.Addr().String())
@ -623,7 +628,8 @@ func TestKeepaliveServerNegative(t *testing.T) {
Timeout: 1 * time.Second,
},
}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
defer cancel()
defer server.stop()
defer client.Close()
// Give keepalive logic some time by sleeping.
@ -639,11 +645,12 @@ func TestKeepaliveServerNegative(t *testing.T) {
func TestKeepaliveClientClosesIdleTransport(t *testing.T) {
done := make(chan net.Conn, 1)
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
tr, cancel := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
PermitWithoutStream: true, // Run keepalive even with no RPCs.
}}, done)
defer cancel()
defer tr.Close()
conn, ok := <-done
if !ok {
@ -663,10 +670,11 @@ func TestKeepaliveClientClosesIdleTransport(t *testing.T) {
func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) {
done := make(chan net.Conn, 1)
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
tr, cancel := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
}}, done)
defer cancel()
defer tr.Close()
conn, ok := <-done
if !ok {
@ -686,10 +694,11 @@ func TestKeepaliveClientStaysHealthyOnIdleTransport(t *testing.T) {
func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
done := make(chan net.Conn, 1)
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
tr, cancel := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
}}, done)
defer cancel()
defer tr.Close()
conn, ok := <-done
if !ok {
@ -713,11 +722,12 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
}
func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
s, tr := setUpWithOptions(t, 0, &ServerConfig{MaxStreams: math.MaxUint32}, normal, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
s, tr, cancel := setUpWithOptions(t, 0, &ServerConfig{MaxStreams: math.MaxUint32}, normal, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
Time: 2 * time.Second, // Keepalive time = 2 sec.
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
PermitWithoutStream: true, // Run keepalive even with no RPCs.
}}, func() {})
defer cancel()
defer s.stop()
defer tr.Close()
// Give keep alive some time.
@ -744,7 +754,8 @@ func TestKeepaliveServerEnforcementWithAbusiveClientNoRPC(t *testing.T) {
PermitWithoutStream: true,
},
}
server, client := setUpWithOptions(t, 0, serverConfig, normal, clientOptions, func() {})
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions, func() {})
defer cancel()
defer server.stop()
defer client.Close()
@ -778,7 +789,8 @@ func TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) {
Timeout: 1 * time.Second,
},
}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions, func() {})
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions, func() {})
defer cancel()
defer server.stop()
defer client.Close()
@ -817,7 +829,8 @@ func TestKeepaliveServerEnforcementWithObeyingClientNoRPC(t *testing.T) {
PermitWithoutStream: true,
},
}
server, client := setUpWithOptions(t, 0, serverConfig, normal, clientOptions, func() {})
server, client, cancel := setUpWithOptions(t, 0, serverConfig, normal, clientOptions, func() {})
defer cancel()
defer server.stop()
defer client.Close()
@ -844,7 +857,8 @@ func TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) {
Timeout: 1 * time.Second,
},
}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions, func() {})
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, clientOptions, func() {})
defer cancel()
defer server.stop()
defer client.Close()
@ -864,7 +878,8 @@ func TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) {
}
func TestClientSendAndReceive(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, normal)
server, ct, cancel := setUp(t, 0, math.MaxUint32, normal)
defer cancel()
callHdr := &CallHdr{
Host: "localhost",
Method: "foo.Small",
@ -901,7 +916,8 @@ func TestClientSendAndReceive(t *testing.T) {
}
func TestClientErrorNotify(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, normal)
server, ct, cancel := setUp(t, 0, math.MaxUint32, normal)
defer cancel()
go server.stop()
// ct.reader should detect the error and activate ct.Error().
<-ct.Error()
@ -932,7 +948,8 @@ func performOneRPC(ct ClientTransport) {
}
func TestClientMix(t *testing.T) {
s, ct := setUp(t, 0, math.MaxUint32, normal)
s, ct, cancel := setUp(t, 0, math.MaxUint32, normal)
defer cancel()
go func(s *server) {
time.Sleep(5 * time.Second)
s.stop()
@ -948,7 +965,8 @@ func TestClientMix(t *testing.T) {
}
func TestLargeMessage(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, normal)
server, ct, cancel := setUp(t, 0, math.MaxUint32, normal)
defer cancel()
callHdr := &CallHdr{
Host: "localhost",
Method: "foo.Large",
@ -989,7 +1007,8 @@ func TestLargeMessageWithDelayRead(t *testing.T) {
InitialWindowSize: defaultWindowSize,
InitialConnWindowSize: defaultWindowSize,
}
server, ct := setUpWithOptions(t, 0, sc, delayRead, co, func() {})
server, ct, cancel := setUpWithOptions(t, 0, sc, delayRead, co, func() {})
defer cancel()
defer server.stop()
defer ct.Close()
server.mu.Lock()
@ -1076,7 +1095,7 @@ func TestLargeMessageWithDelayRead(t *testing.T) {
}
func TestGracefulClose(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, pingpong)
server, ct, cancel := setUp(t, 0, math.MaxUint32, pingpong)
defer func() {
// Stop the server's listener to make the server's goroutines terminate
// (after the last active stream is done).
@ -1086,6 +1105,7 @@ func TestGracefulClose(t *testing.T) {
leakcheck.Check(t)
// Correctly clean up the server
server.stop()
cancel()
}()
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second*10))
defer cancel()
@ -1140,7 +1160,8 @@ func TestGracefulClose(t *testing.T) {
}
func TestLargeMessageSuspension(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, suspended)
server, ct, cancelsvr := setUp(t, 0, math.MaxUint32, suspended)
defer cancelsvr()
callHdr := &CallHdr{
Host: "localhost",
Method: "foo.Large",
@ -1177,7 +1198,8 @@ func TestMaxStreams(t *testing.T) {
serverConfig := &ServerConfig{
MaxStreams: 1,
}
server, ct := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
server, ct, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
defer cancel()
defer ct.Close()
defer server.stop()
callHdr := &CallHdr{
@ -1245,7 +1267,8 @@ func TestMaxStreams(t *testing.T) {
}
func TestServerContextCanceledOnClosedConnection(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, suspended)
server, ct, cancel := setUp(t, 0, math.MaxUint32, suspended)
defer cancel()
callHdr := &CallHdr{
Host: "localhost",
Method: "foo",
@ -1314,7 +1337,8 @@ func TestClientConnDecoupledFromApplicationRead(t *testing.T) {
InitialWindowSize: defaultWindowSize,
InitialConnWindowSize: defaultWindowSize,
}
server, client := setUpWithOptions(t, 0, &ServerConfig{}, notifyCall, connectOptions, func() {})
server, client, cancel := setUpWithOptions(t, 0, &ServerConfig{}, notifyCall, connectOptions, func() {})
defer cancel()
defer server.stop()
defer client.Close()
@ -1400,7 +1424,8 @@ func TestServerConnDecoupledFromApplicationRead(t *testing.T) {
InitialWindowSize: defaultWindowSize,
InitialConnWindowSize: defaultWindowSize,
}
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
server, client, cancel := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
defer cancel()
defer server.stop()
defer client.Close()
waitWhileTrue(t, func() (bool, error) {
@ -1653,7 +1678,8 @@ func TestClientWithMisbehavedServer(t *testing.T) {
var encodingTestStatus = status.New(codes.Internal, "\n")
func TestEncodingRequiredStatus(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, encodingRequiredStatus)
server, ct, cancel := setUp(t, 0, math.MaxUint32, encodingRequiredStatus)
defer cancel()
callHdr := &CallHdr{
Host: "localhost",
Method: "foo",
@ -1678,7 +1704,8 @@ func TestEncodingRequiredStatus(t *testing.T) {
}
func TestInvalidHeaderField(t *testing.T) {
server, ct := setUp(t, 0, math.MaxUint32, invalidHeaderField)
server, ct, cancel := setUp(t, 0, math.MaxUint32, invalidHeaderField)
defer cancel()
callHdr := &CallHdr{
Host: "localhost",
Method: "foo",
@ -1790,7 +1817,8 @@ func testFlowControlAccountCheck(t *testing.T, msgSize int, wc windowSizeConfig)
InitialWindowSize: wc.clientStream,
InitialConnWindowSize: wc.clientConn,
}
server, client := setUpWithOptions(t, 0, sc, pingpong, co, func() {})
server, client, cancel := setUpWithOptions(t, 0, sc, pingpong, co, func() {})
defer cancel()
defer server.stop()
defer client.Close()
waitWhileTrue(t, func() (bool, error) {
@ -2022,7 +2050,7 @@ func (s *httpServer) cleanUp() {
}
}
func setUpHTTPStatusTest(t *testing.T, httpStatus int, wh writeHeaders) (stream *Stream, cleanUp func()) {
func setUpHTTPStatusTest(t *testing.T, httpStatus int, wh writeHeaders) (stream *Stream, cleanUp func(), cancel func()) {
var (
err error
lis net.Listener
@ -2054,7 +2082,8 @@ func setUpHTTPStatusTest(t *testing.T, httpStatus int, wh writeHeaders) (stream
wh: wh,
}
server.start(t, lis)
connectCtx, cancel := context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
var connectCtx context.Context
connectCtx, cancel = context.WithDeadline(context.Background(), time.Now().Add(2*time.Second))
client, err = newHTTP2Client(connectCtx, context.Background(), TargetInfo{Addr: lis.Addr().String()}, ConnectOptions{}, func() {})
if err != nil {
cancel() // Do not cancel in success path.
@ -2074,7 +2103,8 @@ func TestHTTPToGRPCStatusMapping(t *testing.T) {
}
func testHTTPToGRPCStatusMapping(t *testing.T, httpStatus int, wh writeHeaders) {
stream, cleanUp := setUpHTTPStatusTest(t, httpStatus, wh)
stream, cleanUp, cancel := setUpHTTPStatusTest(t, httpStatus, wh)
defer cancel()
defer cleanUp()
want := httpStatusConvTab[httpStatus]
buf := make([]byte, 8)
@ -2092,7 +2122,8 @@ func testHTTPToGRPCStatusMapping(t *testing.T, httpStatus int, wh writeHeaders)
}
func TestHTTPStatusOKAndMissingGRPCStatus(t *testing.T) {
stream, cleanUp := setUpHTTPStatusTest(t, http.StatusOK, writeOneHeader)
stream, cleanUp, cancel := setUpHTTPStatusTest(t, http.StatusOK, writeOneHeader)
defer cancel()
defer cleanUp()
buf := make([]byte, 8)
_, err := stream.Read(buf)
@ -2173,7 +2204,8 @@ func TestPingPong1MB(t *testing.T) {
//This is a stress-test of flow control logic.
func runPingPongTest(t *testing.T, msgSize int) {
server, client := setUp(t, 0, 0, pingpong)
server, client, cancel := setUp(t, 0, 0, pingpong)
defer cancel()
defer server.stop()
defer client.Close()
waitWhileTrue(t, func() (bool, error) {
@ -2260,7 +2292,8 @@ func TestHeaderTblSize(t *testing.T) {
}
}()
server, ct := setUp(t, 0, math.MaxUint32, normal)
server, ct, cancel := setUp(t, 0, math.MaxUint32, normal)
defer cancel()
defer ct.Close()
defer server.stop()
_, err := ct.NewStream(context.Background(), &CallHdr{})