Fix more cases of Fatalf being used from goroutines started by tests.
Follow-up to #515 based on comments there from @maniksurtani.
This commit is contained in:
33
call_test.go
33
call_test.go
@ -34,6 +34,7 @@
|
|||||||
package grpc
|
package grpc
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"math"
|
"math"
|
||||||
"net"
|
"net"
|
||||||
@ -90,7 +91,8 @@ func (h *testStreamHandler) handleStream(t *testing.T, s *transport.Stream) {
|
|||||||
var v string
|
var v string
|
||||||
codec := testCodec{}
|
codec := testCodec{}
|
||||||
if err := codec.Unmarshal(req, &v); err != nil {
|
if err := codec.Unmarshal(req, &v); err != nil {
|
||||||
t.Fatalf("Failed to unmarshal the received message %v", err)
|
t.Errorf("Failed to unmarshal the received message: %v", err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
if v != expectedRequest {
|
if v != expectedRequest {
|
||||||
h.t.WriteStatus(s, codes.Internal, string(make([]byte, sizeLargeErr)))
|
h.t.WriteStatus(s, codes.Internal, string(make([]byte, sizeLargeErr)))
|
||||||
@ -100,7 +102,8 @@ func (h *testStreamHandler) handleStream(t *testing.T, s *transport.Stream) {
|
|||||||
// send a response back to end the stream.
|
// send a response back to end the stream.
|
||||||
reply, err := encode(testCodec{}, &expectedResponse, nil, nil)
|
reply, err := encode(testCodec{}, &expectedResponse, nil, nil)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to encode the response: %v", err)
|
t.Errorf("Failed to encode the response: %v", err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
h.t.Write(s, reply, &transport.Options{})
|
h.t.Write(s, reply, &transport.Options{})
|
||||||
h.t.WriteStatus(s, codes.OK, "")
|
h.t.WriteStatus(s, codes.OK, "")
|
||||||
@ -109,13 +112,16 @@ func (h *testStreamHandler) handleStream(t *testing.T, s *transport.Stream) {
|
|||||||
type server struct {
|
type server struct {
|
||||||
lis net.Listener
|
lis net.Listener
|
||||||
port string
|
port string
|
||||||
// channel to signal server is ready to serve.
|
startedErr chan error // sent nil or an error after server starts
|
||||||
readyChan chan bool
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
conns map[transport.ServerTransport]bool
|
conns map[transport.ServerTransport]bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// start starts server. Other goroutines should block on s.readyChan for futher operations.
|
func newTestServer() *server {
|
||||||
|
return &server{startedErr: make(chan error, 1)}
|
||||||
|
}
|
||||||
|
|
||||||
|
// start starts server. Other goroutines should block on s.startedErr for futher operations.
|
||||||
func (s *server) start(t *testing.T, port int, maxStreams uint32) {
|
func (s *server) start(t *testing.T, port int, maxStreams uint32) {
|
||||||
var err error
|
var err error
|
||||||
if port == 0 {
|
if port == 0 {
|
||||||
@ -124,17 +130,17 @@ func (s *server) start(t *testing.T, port int, maxStreams uint32) {
|
|||||||
s.lis, err = net.Listen("tcp", ":"+strconv.Itoa(port))
|
s.lis, err = net.Listen("tcp", ":"+strconv.Itoa(port))
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to listen: %v", err)
|
s.startedErr <- fmt.Errorf("failed to listen: %v", err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
_, p, err := net.SplitHostPort(s.lis.Addr().String())
|
_, p, err := net.SplitHostPort(s.lis.Addr().String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to parse listener address: %v", err)
|
s.startedErr <- fmt.Errorf("failed to parse listener address: %v", err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
s.port = p
|
s.port = p
|
||||||
s.conns = make(map[transport.ServerTransport]bool)
|
s.conns = make(map[transport.ServerTransport]bool)
|
||||||
if s.readyChan != nil {
|
s.startedErr <- nil
|
||||||
close(s.readyChan)
|
|
||||||
}
|
|
||||||
for {
|
for {
|
||||||
conn, err := s.lis.Accept()
|
conn, err := s.lis.Accept()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -161,7 +167,10 @@ func (s *server) start(t *testing.T, port int, maxStreams uint32) {
|
|||||||
|
|
||||||
func (s *server) wait(t *testing.T, timeout time.Duration) {
|
func (s *server) wait(t *testing.T, timeout time.Duration) {
|
||||||
select {
|
select {
|
||||||
case <-s.readyChan:
|
case err := <-s.startedErr:
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
case <-time.After(timeout):
|
case <-time.After(timeout):
|
||||||
t.Fatalf("Timed out after %v waiting for server to be ready", timeout)
|
t.Fatalf("Timed out after %v waiting for server to be ready", timeout)
|
||||||
}
|
}
|
||||||
@ -178,7 +187,7 @@ func (s *server) stop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func setUp(t *testing.T, port int, maxStreams uint32) (*server, *ClientConn) {
|
func setUp(t *testing.T, port int, maxStreams uint32) (*server, *ClientConn) {
|
||||||
server := &server{readyChan: make(chan bool)}
|
server := newTestServer()
|
||||||
go server.start(t, port, maxStreams)
|
go server.start(t, port, maxStreams)
|
||||||
server.wait(t, 2*time.Second)
|
server.wait(t, 2*time.Second)
|
||||||
addr := "localhost:" + server.port
|
addr := "localhost:" + server.port
|
||||||
|
@ -103,7 +103,7 @@ func (r *testNameResolver) Resolve(target string) (naming.Watcher, error) {
|
|||||||
func startServers(t *testing.T, numServers, port int, maxStreams uint32) ([]*server, *testNameResolver) {
|
func startServers(t *testing.T, numServers, port int, maxStreams uint32) ([]*server, *testNameResolver) {
|
||||||
var servers []*server
|
var servers []*server
|
||||||
for i := 0; i < numServers; i++ {
|
for i := 0; i < numServers; i++ {
|
||||||
s := &server{readyChan: make(chan bool)}
|
s := newTestServer()
|
||||||
servers = append(servers, s)
|
servers = append(servers, s)
|
||||||
go s.start(t, port, maxStreams)
|
go s.start(t, port, maxStreams)
|
||||||
s.wait(t, 2*time.Second)
|
s.wait(t, 2*time.Second)
|
||||||
|
@ -35,6 +35,7 @@ package transport
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
"math"
|
"math"
|
||||||
"net"
|
"net"
|
||||||
@ -52,8 +53,7 @@ import (
|
|||||||
type server struct {
|
type server struct {
|
||||||
lis net.Listener
|
lis net.Listener
|
||||||
port string
|
port string
|
||||||
// channel to signal server is ready to serve.
|
startedErr chan error // error (or nil) with server start value
|
||||||
readyChan chan bool
|
|
||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
conns map[ServerTransport]bool
|
conns map[ServerTransport]bool
|
||||||
}
|
}
|
||||||
@ -136,17 +136,17 @@ func (s *server) start(t *testing.T, port int, maxStreams uint32, ht hType) {
|
|||||||
s.lis, err = net.Listen("tcp", ":"+strconv.Itoa(port))
|
s.lis, err = net.Listen("tcp", ":"+strconv.Itoa(port))
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to listen: %v", err)
|
s.startedErr <- fmt.Errorf("failed to listen: %v", err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
_, p, err := net.SplitHostPort(s.lis.Addr().String())
|
_, p, err := net.SplitHostPort(s.lis.Addr().String())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("failed to parse listener address: %v", err)
|
s.startedErr <- fmt.Errorf("failed to parse listener address: %v", err)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
s.port = p
|
s.port = p
|
||||||
s.conns = make(map[ServerTransport]bool)
|
s.conns = make(map[ServerTransport]bool)
|
||||||
if s.readyChan != nil {
|
s.startedErr <- nil
|
||||||
close(s.readyChan)
|
|
||||||
}
|
|
||||||
for {
|
for {
|
||||||
conn, err := s.lis.Accept()
|
conn, err := s.lis.Accept()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@ -182,7 +182,10 @@ func (s *server) start(t *testing.T, port int, maxStreams uint32, ht hType) {
|
|||||||
|
|
||||||
func (s *server) wait(t *testing.T, timeout time.Duration) {
|
func (s *server) wait(t *testing.T, timeout time.Duration) {
|
||||||
select {
|
select {
|
||||||
case <-s.readyChan:
|
case err := <-s.startedErr:
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
case <-time.After(timeout):
|
case <-time.After(timeout):
|
||||||
t.Fatalf("Timed out after %v waiting for server to be ready", timeout)
|
t.Fatalf("Timed out after %v waiting for server to be ready", timeout)
|
||||||
}
|
}
|
||||||
@ -199,7 +202,7 @@ func (s *server) stop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, ClientTransport) {
|
func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, ClientTransport) {
|
||||||
server := &server{readyChan: make(chan bool)}
|
server := &server{startedErr: make(chan error, 1)}
|
||||||
go server.start(t, port, maxStreams, ht)
|
go server.start(t, port, maxStreams, ht)
|
||||||
server.wait(t, 2*time.Second)
|
server.wait(t, 2*time.Second)
|
||||||
addr := "localhost:" + server.port
|
addr := "localhost:" + server.port
|
||||||
|
Reference in New Issue
Block a user