Rewrite timeout stop with channel

This commit is contained in:
Menghan Li
2016-04-11 13:48:26 -07:00
parent 70f7fa1c19
commit 0bf5c5c18f

View File

@ -179,26 +179,28 @@ func startServer(server *server, port int) {
type stressClient struct { type stressClient struct {
testID int testID int
address string address string
testDurationSecs int
selector *weightedRandomTestSelector selector *weightedRandomTestSelector
interopClient testpb.TestServiceClient interopClient testpb.TestServiceClient
stop <-chan bool
} }
// newStressClient construct a new stressClient. // newStressClient construct a new stressClient.
func newStressClient(id int, addr string, conn *grpc.ClientConn, selector *weightedRandomTestSelector, testDurSecs int) *stressClient { func newStressClient(id int, addr string, conn *grpc.ClientConn, selector *weightedRandomTestSelector, stop <-chan bool) *stressClient {
client := testpb.NewTestServiceClient(conn) client := testpb.NewTestServiceClient(conn)
return &stressClient{testID: id, address: addr, selector: selector, testDurationSecs: testDurSecs, interopClient: client} return &stressClient{testID: id, address: addr, selector: selector, interopClient: client, stop: stop}
} }
// mainLoop uses weightedRandomTestSelector to select test case and runs the tests. // mainLoop uses weightedRandomTestSelector to select test case and runs the tests.
func (c *stressClient) mainLoop(gauge *gauge) { func (c *stressClient) mainLoop(gauge *gauge) {
var numCalls int64 var numCalls int64
timeStarted := time.Now() timeStarted := time.Now()
for testEndTime := time.Now().Add(time.Duration(c.testDurationSecs) * time.Second); c.testDurationSecs < 0 || time.Now().Before(testEndTime); { for {
done := make(chan bool)
go func() {
test, err := c.selector.getNextTest() test, err := c.selector.getNextTest()
if err != nil { if err != nil {
grpclog.Printf("%v", err) grpclog.Printf("%v", err)
continue done <- false
} }
switch test { switch test {
case "empty_unary": case "empty_unary":
@ -214,9 +216,18 @@ func (c *stressClient) mainLoop(gauge *gauge) {
default: default:
grpclog.Fatalf("Unsupported test case: %d", test) grpclog.Fatalf("Unsupported test case: %d", test)
} }
done <- true
}()
select {
case <-c.stop:
return
case r := <-done:
if r {
numCalls++ numCalls++
gauge.set(int64(float64(numCalls) / time.Since(timeStarted).Seconds())) gauge.set(int64(float64(numCalls) / time.Since(timeStarted).Seconds()))
} }
}
}
} }
func logParameterInfo(addresses []string, tests []testCaseWithWeight) { func logParameterInfo(addresses []string, tests []testCaseWithWeight) {
@ -250,6 +261,8 @@ func main() {
var wg sync.WaitGroup var wg sync.WaitGroup
wg.Add(len(addresses) * *numChannelsPerServer * *numStubsPerChannel) wg.Add(len(addresses) * *numChannelsPerServer * *numStubsPerChannel)
stop := make(chan bool)
var clientIndex int var clientIndex int
for serverIndex, address := range addresses { for serverIndex, address := range addresses {
for connIndex := 0; connIndex < *numChannelsPerServer; connIndex++ { for connIndex := 0; connIndex < *numChannelsPerServer; connIndex++ {
@ -260,7 +273,7 @@ func main() {
defer conn.Close() defer conn.Close()
for stubIndex := 0; stubIndex < *numStubsPerChannel; stubIndex++ { for stubIndex := 0; stubIndex < *numStubsPerChannel; stubIndex++ {
clientIndex++ clientIndex++
client := newStressClient(clientIndex, address, conn, testSelector, *testDurationSecs) client := newStressClient(clientIndex, address, conn, testSelector, stop)
buf := fmt.Sprintf("/stress_test/server_%d/channel_%d/stub_%d/qps", serverIndex+1, connIndex+1, stubIndex+1) buf := fmt.Sprintf("/stress_test/server_%d/channel_%d/stub_%d/qps", serverIndex+1, connIndex+1, stubIndex+1)
go func() { go func() {
defer wg.Done() defer wg.Done()
@ -275,6 +288,10 @@ func main() {
} }
} }
go startServer(metricsServer, *metricsPort) go startServer(metricsServer, *metricsPort)
if *testDurationSecs > 0 {
time.Sleep(time.Duration(*testDurationSecs) * time.Second)
close(stop)
}
wg.Wait() wg.Wait()
grpclog.Printf(" ===== ALL DONE ===== ") grpclog.Printf(" ===== ALL DONE ===== ")