interop: update --fail_on_failed_rpc to wait for initial success (#3763)
This commit is contained in:
@ -27,6 +27,7 @@ import (
|
|||||||
"net"
|
"net"
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
|
"sync/atomic"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
@ -73,7 +74,7 @@ func (watcher *statsWatcher) buildResp() *testpb.LoadBalancerStatsResponse {
|
|||||||
}
|
}
|
||||||
|
|
||||||
var (
|
var (
|
||||||
failOnFailedRPC = flag.Bool("fail_on_failed_rpc", false, "Fail client if any RPCs fail")
|
failOnFailedRPC = flag.Bool("fail_on_failed_rpc", false, "Fail client if any RPCs fail after first success")
|
||||||
numChannels = flag.Int("num_channels", 1, "Num of channels")
|
numChannels = flag.Int("num_channels", 1, "Num of channels")
|
||||||
printResponse = flag.Bool("print_response", false, "Write RPC response to stdout")
|
printResponse = flag.Bool("print_response", false, "Write RPC response to stdout")
|
||||||
qps = flag.Int("qps", 1, "QPS per channel, for each type of RPC")
|
qps = flag.Int("qps", 1, "QPS per channel, for each type of RPC")
|
||||||
@ -86,12 +87,24 @@ var (
|
|||||||
mu sync.Mutex
|
mu sync.Mutex
|
||||||
currentRequestID int32
|
currentRequestID int32
|
||||||
watchers = make(map[statsWatcherKey]*statsWatcher)
|
watchers = make(map[statsWatcherKey]*statsWatcher)
|
||||||
|
|
||||||
|
// 0 or 1 representing an RPC has succeeded. Use hasRPCSucceeded and
|
||||||
|
// setRPCSucceeded to access in a safe manner.
|
||||||
|
rpcSucceeded uint32
|
||||||
)
|
)
|
||||||
|
|
||||||
type statsService struct {
|
type statsService struct {
|
||||||
testpb.UnimplementedLoadBalancerStatsServiceServer
|
testpb.UnimplementedLoadBalancerStatsServiceServer
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func hasRPCSucceeded() bool {
|
||||||
|
return atomic.LoadUint32(&rpcSucceeded) > 0
|
||||||
|
}
|
||||||
|
|
||||||
|
func setRPCSucceeded() {
|
||||||
|
atomic.StoreUint32(&rpcSucceeded, 1)
|
||||||
|
}
|
||||||
|
|
||||||
// Wait for the next LoadBalancerStatsRequest.GetNumRpcs to start and complete,
|
// Wait for the next LoadBalancerStatsRequest.GetNumRpcs to start and complete,
|
||||||
// and return the distribution of remote peers. This is essentially a clientside
|
// and return the distribution of remote peers. This is essentially a clientside
|
||||||
// LB reporting mechanism that is designed to be queried by an external test
|
// LB reporting mechanism that is designed to be queried by an external test
|
||||||
@ -291,9 +304,12 @@ func sendRPCs(clients []testpb.TestServiceClient, cfgs []*rpcConfig, ticker *tim
|
|||||||
// This sends an empty string if the RPC failed.
|
// This sends an empty string if the RPC failed.
|
||||||
watcher.chanHosts <- info
|
watcher.chanHosts <- info
|
||||||
}
|
}
|
||||||
if err != nil && *failOnFailedRPC {
|
if err != nil && *failOnFailedRPC && hasRPCSucceeded() {
|
||||||
grpclog.Fatalf("RPC failed: %v", err)
|
grpclog.Fatalf("RPC failed: %v", err)
|
||||||
}
|
}
|
||||||
|
if err == nil {
|
||||||
|
setRPCSucceeded()
|
||||||
|
}
|
||||||
if *printResponse {
|
if *printResponse {
|
||||||
if err == nil {
|
if err == nil {
|
||||||
if cfg.typ == unaryCall {
|
if cfg.typ == unaryCall {
|
||||||
|
Reference in New Issue
Block a user