Merge pull request #546 from bradfitz/quiet
test: reduce end2end log spam
This commit is contained in:
@ -38,8 +38,10 @@ import (
|
|||||||
"flag"
|
"flag"
|
||||||
"fmt"
|
"fmt"
|
||||||
"io"
|
"io"
|
||||||
|
"log"
|
||||||
"math"
|
"math"
|
||||||
"net"
|
"net"
|
||||||
|
"os"
|
||||||
"reflect"
|
"reflect"
|
||||||
"runtime"
|
"runtime"
|
||||||
"sort"
|
"sort"
|
||||||
@ -54,6 +56,7 @@ import (
|
|||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
"google.golang.org/grpc/codes"
|
"google.golang.org/grpc/codes"
|
||||||
"google.golang.org/grpc/credentials"
|
"google.golang.org/grpc/credentials"
|
||||||
|
"google.golang.org/grpc/grpclog"
|
||||||
"google.golang.org/grpc/health"
|
"google.golang.org/grpc/health"
|
||||||
healthpb "google.golang.org/grpc/health/grpc_health_v1alpha"
|
healthpb "google.golang.org/grpc/health/grpc_health_v1alpha"
|
||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
@ -280,6 +283,13 @@ const tlsDir = "testdata/"
|
|||||||
|
|
||||||
func TestReconnectTimeout(t *testing.T) {
|
func TestReconnectTimeout(t *testing.T) {
|
||||||
defer leakCheck(t)()
|
defer leakCheck(t)()
|
||||||
|
restore := declareLogNoise(t,
|
||||||
|
"transport: http2Client.notifyError got notified that the client transport was broken",
|
||||||
|
"grpc: Conn.resetTransport failed to create client transport: connection error: desc = \"transport",
|
||||||
|
"grpc: Conn.transportMonitor exits due to: grpc: timed out trying to connect",
|
||||||
|
)
|
||||||
|
defer restore()
|
||||||
|
|
||||||
lis, err := net.Listen("tcp", ":0")
|
lis, err := net.Listen("tcp", ":0")
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to listen: %v", err)
|
t.Fatalf("Failed to listen: %v", err)
|
||||||
@ -520,6 +530,14 @@ func TestTimeoutOnDeadServer(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func testTimeoutOnDeadServer(t *testing.T, e env) {
|
func testTimeoutOnDeadServer(t *testing.T, e env) {
|
||||||
|
restore := declareLogNoise(t,
|
||||||
|
"transport: http2Client.notifyError got notified that the client transport was broken EOF",
|
||||||
|
"grpc: Conn.transportMonitor exits due to: grpc: the client connection is closing",
|
||||||
|
"grpc: Conn.resetTransport failed to create client transport: connection error",
|
||||||
|
"grpc: Conn.resetTransport failed to create client transport: connection error: desc = \"transport: dial unix",
|
||||||
|
)
|
||||||
|
defer restore()
|
||||||
|
|
||||||
s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e)
|
s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e)
|
||||||
cc := clientSetUp(t, addr, nil, nil, "", e)
|
cc := clientSetUp(t, addr, nil, nil, "", e)
|
||||||
tc := testpb.NewTestServiceClient(cc)
|
tc := testpb.NewTestServiceClient(cc)
|
||||||
@ -554,6 +572,7 @@ func testTimeoutOnDeadServer(t *testing.T, e env) {
|
|||||||
t.Fatalf("cc.State() = %s, %v, want %s or %s, <nil>", state, err, grpc.Connecting, grpc.TransientFailure)
|
t.Fatalf("cc.State() = %s, %v, want %s or %s, <nil>", state, err, grpc.Connecting, grpc.TransientFailure)
|
||||||
}
|
}
|
||||||
cc.Close()
|
cc.Close()
|
||||||
|
awaitNewConnLogOutput()
|
||||||
}
|
}
|
||||||
|
|
||||||
func healthCheck(d time.Duration, cc *grpc.ClientConn, serviceName string) (*healthpb.HealthCheckResponse, error) {
|
func healthCheck(d time.Duration, cc *grpc.ClientConn, serviceName string) (*healthpb.HealthCheckResponse, error) {
|
||||||
@ -591,6 +610,12 @@ func TestHealthCheckOnFailure(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func testHealthCheckOnFailure(t *testing.T, e env) {
|
func testHealthCheckOnFailure(t *testing.T, e env) {
|
||||||
|
defer leakCheck(t)()
|
||||||
|
restore := declareLogNoise(t,
|
||||||
|
"Failed to dial ",
|
||||||
|
"grpc: the client connection is closing; please retry",
|
||||||
|
)
|
||||||
|
defer restore()
|
||||||
hs := health.NewHealthServer()
|
hs := health.NewHealthServer()
|
||||||
hs.SetServingStatus("grpc.health.v1alpha.HealthCheck", 1)
|
hs.SetServingStatus("grpc.health.v1alpha.HealthCheck", 1)
|
||||||
s, addr := serverSetUp(t, true, hs, math.MaxUint32, nil, nil, e)
|
s, addr := serverSetUp(t, true, hs, math.MaxUint32, nil, nil, e)
|
||||||
@ -599,6 +624,7 @@ func testHealthCheckOnFailure(t *testing.T, e env) {
|
|||||||
if _, err := healthCheck(0*time.Second, cc, "grpc.health.v1alpha.Health"); err != grpc.Errorf(codes.DeadlineExceeded, "context deadline exceeded") {
|
if _, err := healthCheck(0*time.Second, cc, "grpc.health.v1alpha.Health"); err != grpc.Errorf(codes.DeadlineExceeded, "context deadline exceeded") {
|
||||||
t.Fatalf("Health/Check(_, _) = _, %v, want _, error code %d", err, codes.DeadlineExceeded)
|
t.Fatalf("Health/Check(_, _) = _, %v, want _, error code %d", err, codes.DeadlineExceeded)
|
||||||
}
|
}
|
||||||
|
awaitNewConnLogOutput()
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestHealthCheckOff(t *testing.T) {
|
func TestHealthCheckOff(t *testing.T) {
|
||||||
@ -841,6 +867,10 @@ func TestRetry(t *testing.T) {
|
|||||||
// TODO(zhaoq): Refactor to make this clearer and add more cases to test racy
|
// TODO(zhaoq): Refactor to make this clearer and add more cases to test racy
|
||||||
// and error-prone paths.
|
// and error-prone paths.
|
||||||
func testRetry(t *testing.T, e env) {
|
func testRetry(t *testing.T, e env) {
|
||||||
|
restore := declareLogNoise(t,
|
||||||
|
"transport: http2Client.notifyError got notified that the client transport was broken",
|
||||||
|
)
|
||||||
|
defer restore()
|
||||||
s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e)
|
s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e)
|
||||||
cc := clientSetUp(t, addr, nil, nil, "", e)
|
cc := clientSetUp(t, addr, nil, nil, "", e)
|
||||||
tc := testpb.NewTestServiceClient(cc)
|
tc := testpb.NewTestServiceClient(cc)
|
||||||
@ -922,6 +952,10 @@ func TestCancel(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func testCancel(t *testing.T, e env) {
|
func testCancel(t *testing.T, e env) {
|
||||||
|
restore := declareLogNoise(t,
|
||||||
|
"grpc: the client connection is closing; please retry",
|
||||||
|
)
|
||||||
|
defer restore()
|
||||||
s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e)
|
s, addr := serverSetUp(t, true, nil, math.MaxUint32, nil, nil, e)
|
||||||
cc := clientSetUp(t, addr, nil, nil, "", e)
|
cc := clientSetUp(t, addr, nil, nil, "", e)
|
||||||
tc := testpb.NewTestServiceClient(cc)
|
tc := testpb.NewTestServiceClient(cc)
|
||||||
@ -945,6 +979,9 @@ func testCancel(t *testing.T, e env) {
|
|||||||
if grpc.Code(err) != codes.Canceled {
|
if grpc.Code(err) != codes.Canceled {
|
||||||
t.Fatalf(`TestService/UnaryCall(_, _) = %v, %v; want <nil>, error code: %d`, reply, err, codes.Canceled)
|
t.Fatalf(`TestService/UnaryCall(_, _) = %v, %v; want <nil>, error code: %d`, reply, err, codes.Canceled)
|
||||||
}
|
}
|
||||||
|
cc.Close()
|
||||||
|
|
||||||
|
awaitNewConnLogOutput()
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestCancelNoIO(t *testing.T) {
|
func TestCancelNoIO(t *testing.T) {
|
||||||
@ -1585,3 +1622,126 @@ func leakCheck(t testing.TB) func() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// lockingWriter is an io.Writer whose underlying destination can be
// swapped at runtime. A mutex serializes writes against swaps so a
// concurrent logger never races with a writer replacement.
type lockingWriter struct {
	mu sync.Mutex
	w  io.Writer
}

// Write forwards p to the currently installed writer while holding mu.
func (lw *lockingWriter) Write(p []byte) (n int, err error) {
	lw.mu.Lock()
	defer lw.mu.Unlock()
	return lw.w.Write(p)
}

// setWriter installs w as the new destination for subsequent writes.
func (lw *lockingWriter) setWriter(w io.Writer) {
	lw.mu.Lock()
	defer lw.mu.Unlock()
	lw.w = w
}
||||||
|
|
||||||
|
// testLogOutput is the process-wide sink for grpclog output during
// tests. It defaults to stderr; declareLogNoise swaps in a
// filterWriter to suppress expected noise.
var testLogOutput = &lockingWriter{w: os.Stderr}
|
||||||
|
|
||||||
|
// awaitNewConnLogOutput waits for any of grpc.NewConn's goroutines to
|
||||||
|
// terminate, if they're still running. It spams logs with this
|
||||||
|
// message. We wait for it so our log filter is still
|
||||||
|
// active. Otherwise the "defer restore()" at the top of various test
|
||||||
|
// functions restores our log filter and then the goroutine spams.
|
||||||
|
func awaitNewConnLogOutput() {
|
||||||
|
awaitLogOutput(50*time.Millisecond, "grpc: the client connection is closing; please retry")
|
||||||
|
}
|
||||||
|
|
||||||
|
func awaitLogOutput(maxWait time.Duration, phrase string) {
|
||||||
|
pb := []byte(phrase)
|
||||||
|
|
||||||
|
timer := time.NewTimer(maxWait)
|
||||||
|
defer timer.Stop()
|
||||||
|
wakeup := make(chan bool, 1)
|
||||||
|
for {
|
||||||
|
if logOutputHasContents(pb, wakeup) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
select {
|
||||||
|
case <-timer.C:
|
||||||
|
// Too slow. Oh well.
|
||||||
|
return
|
||||||
|
case <-wakeup:
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func logOutputHasContents(v []byte, wakeup chan<- bool) bool {
|
||||||
|
testLogOutput.mu.Lock()
|
||||||
|
defer testLogOutput.mu.Unlock()
|
||||||
|
fw, ok := testLogOutput.w.(*filterWriter)
|
||||||
|
if !ok {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
fw.mu.Lock()
|
||||||
|
defer fw.mu.Unlock()
|
||||||
|
if bytes.Contains(fw.buf.Bytes(), v) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
fw.wakeup = wakeup
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
func init() {
|
||||||
|
grpclog.SetLogger(log.New(testLogOutput, "", log.LstdFlags))
|
||||||
|
}
|
||||||
|
|
||||||
|
// verboseLogs disables log filtering entirely when set, so every
// grpclog line reaches stderr regardless of declareLogNoise.
var verboseLogs = flag.Bool("verbose_logs", false, "show all grpclog output, without filtering")
|
||||||
|
|
||||||
|
// noop is the do-nothing restore function returned by declareLogNoise
// when filtering is disabled via -verbose_logs.
func noop() {}
|
||||||
|
|
||||||
|
// declareLogNoise declares that t is expected to emit the following noisy phrases,
|
||||||
|
// even on success. Those phrases will be filtered from grpclog output
|
||||||
|
// and only be shown if *verbose_logs or t ends up failing.
|
||||||
|
// The returned restore function should be called with defer to be run
|
||||||
|
// before the test ends.
|
||||||
|
func declareLogNoise(t *testing.T, phrases ...string) (restore func()) {
|
||||||
|
if *verboseLogs {
|
||||||
|
return noop
|
||||||
|
}
|
||||||
|
fw := &filterWriter{dst: os.Stderr, filter: phrases}
|
||||||
|
testLogOutput.setWriter(fw)
|
||||||
|
return func() {
|
||||||
|
if t.Failed() {
|
||||||
|
fw.mu.Lock()
|
||||||
|
defer fw.mu.Unlock()
|
||||||
|
if fw.buf.Len() > 0 {
|
||||||
|
t.Logf("Complete log output:\n%s", fw.buf.Bytes())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
testLogOutput.setWriter(os.Stderr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// filterWriter is an io.Writer that records everything written to it
// and forwards to dst only those writes that contain none of the
// filter phrases. It is used to keep expected log noise out of test
// output while still retaining it for inspection.
type filterWriter struct {
	dst    io.Writer
	filter []string

	mu     sync.Mutex
	buf    bytes.Buffer
	wakeup chan<- bool // if non-nil, gets true on write
}

// Write records p in buf, signals any registered waiter, then either
// swallows p (if it matches a filter phrase) or forwards it to dst.
func (fw *filterWriter) Write(p []byte) (n int, err error) {
	fw.mu.Lock()
	fw.buf.Write(p)
	if fw.wakeup != nil {
		// Non-blocking: a full channel means the waiter is already
		// scheduled to re-check the buffer.
		select {
		case fw.wakeup <- true:
		default:
		}
	}
	fw.mu.Unlock()

	text := string(p)
	for _, phrase := range fw.filter {
		if strings.Contains(text, phrase) {
			// Declared noise: pretend the write succeeded in full.
			return len(p), nil
		}
	}
	return fw.dst.Write(p)
}
||||||
|
Reference in New Issue
Block a user