From f7992fa659cdb1645bf8eb596d3d80a615947f56 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Mon, 4 Apr 2016 15:52:17 -0700 Subject: [PATCH] Add metrics server to stress client and add metrics client --- stress/client/main.go | 130 +++++++++++++++++++++++++++++----- stress/metrics_client/main.go | 65 +++++++++++++++++ 2 files changed, 179 insertions(+), 16 deletions(-) create mode 100644 stress/metrics_client/main.go diff --git a/stress/client/main.go b/stress/client/main.go index cd611047..11ccbc0e 100644 --- a/stress/client/main.go +++ b/stress/client/main.go @@ -4,15 +4,19 @@ import ( "flag" "fmt" "math/rand" + "net" "strconv" "strings" "sync" "time" + "golang.org/x/net/context" "google.golang.org/grpc" + "google.golang.org/grpc/codes" "google.golang.org/grpc/grpclog" "google.golang.org/grpc/interop" testpb "google.golang.org/grpc/interop/grpc_testing" + metricspb "google.golang.org/grpc/stress/grpc_testing" ) var ( @@ -105,23 +109,23 @@ func parseTestCases(testCaseString string) ([]testCaseWithWeight, error) { return testCases, nil } -// WeightedRandomTestSelector defines a weighted random selector for test case types. -type WeightedRandomTestSelector struct { +// weightedRandomTestSelector defines a weighted random selector for test case types. +type weightedRandomTestSelector struct { tests []testCaseWithWeight totalWeight int } -// newWeightedRandomTestSelector constructs a WeightedRandomTestSelector with the given list of testCaseWithWeight. -func newWeightedRandomTestSelector(tests []testCaseWithWeight) *WeightedRandomTestSelector { +// newWeightedRandomTestSelector constructs a weightedRandomTestSelector with the given list of testCaseWithWeight. +func newWeightedRandomTestSelector(tests []testCaseWithWeight) *weightedRandomTestSelector { var totalWeight int for _, t := range tests { totalWeight += t.weight } rand.Seed(time.Now().UnixNano()) - return &WeightedRandomTestSelector{tests, totalWeight} + return &weightedRandomTestSelector{tests, totalWeight} } -func (selector WeightedRandomTestSelector) getNextTest() (testCaseType, error) { +func (selector weightedRandomTestSelector) getNextTest() (testCaseType, error) { random := rand.Intn(selector.totalWeight) var weightSofar int for _, test := range selector.tests { @@ -130,26 +134,110 @@ func (selector WeightedRandomTestSelector) getNextTest() (testCaseType, error) { return test.testCase, nil } } - return unknownTest, fmt.Errorf("no test case selected by WeightedRandomTestSelector") + return unknownTest, fmt.Errorf("no test case selected by weightedRandomTestSelector") } -// StressClient defines client for stress test. -type StressClient struct { +// gauge defines type for gauge. +type gauge struct { + mutex sync.RWMutex + val int64 +} + +// Set updates the gauge value +func (g *gauge) set(v int64) { + g.mutex.Lock() + defer g.mutex.Unlock() + g.val = v +} + +func (g *gauge) get() int64 { + g.mutex.RLock() + defer g.mutex.RUnlock() + return g.val +} + +// Server implements metrics server functions. +type server struct { + mutex sync.RWMutex + gauges map[string]*gauge +} + +// newMetricsServer returns a new metrics server. +func newMetricsServer() *server { + return &server{gauges: make(map[string]*gauge)} +} + +// GetAllGauges returns all gauges. +func (s *server) GetAllGauges(in *metricspb.EmptyMessage, stream metricspb.MetricsService_GetAllGaugesServer) error { + s.mutex.RLock() + defer s.mutex.RUnlock() + + for name, gauge := range s.gauges { + err := stream.Send(&metricspb.GaugeResponse{Name: name, Value: &metricspb.GaugeResponse_LongValue{gauge.get()}}) + if err != nil { + return err + } + } + return nil +} + +// GetGauge returns the gauge for the given name. +func (s *server) GetGauge(ctx context.Context, in *metricspb.GaugeRequest) (*metricspb.GaugeResponse, error) { + s.mutex.RLock() + defer s.mutex.RUnlock() + + if g, ok := s.gauges[in.Name]; ok { + return &metricspb.GaugeResponse{Name: in.Name, Value: &metricspb.GaugeResponse_LongValue{g.get()}}, nil + } + return nil, grpc.Errorf(codes.InvalidArgument, "gauge with name %s not found", in.Name) +} + +// CreateGauge creates a guage using the given name in metrics server +func (s *server) createGauge(name string) (*gauge, error) { + s.mutex.Lock() + defer s.mutex.Unlock() + + grpclog.Printf("create gauge: %s", name) + if _, ok := s.gauges[name]; ok { + // gauge already exists. + return nil, fmt.Errorf("gauge %s already exists", name) + } + var g gauge + s.gauges[name] = &g + return &g, nil +} + +func startServer(server *server, port int) { + lis, err := net.Listen("tcp", ":"+strconv.Itoa(port)) + if err != nil { + grpclog.Fatalf("failed to listen: %v", err) + } + + s := grpc.NewServer() + metricspb.RegisterMetricsServiceServer(s, server) + s.Serve(lis) + +} + +// stressClient defines client for stress test. +type stressClient struct { testID int address string testDurationSecs int - selector *WeightedRandomTestSelector + selector *weightedRandomTestSelector interopClient testpb.TestServiceClient } -// newStressClient construct a new StressClient. -func newStressClient(id int, addr string, conn *grpc.ClientConn, selector *WeightedRandomTestSelector, testDurSecs int) *StressClient { +// newStressClient construct a new stressClient. +func newStressClient(id int, addr string, conn *grpc.ClientConn, selector *weightedRandomTestSelector, testDurSecs int) *stressClient { client := testpb.NewTestServiceClient(conn) - return &StressClient{testID: id, address: addr, selector: selector, testDurationSecs: testDurSecs, interopClient: client} + return &stressClient{testID: id, address: addr, selector: selector, testDurationSecs: testDurSecs, interopClient: client} } -// MainLoop uses WeightedRandomTestSelector to select test case and runs the tests. -func (c *StressClient) MainLoop(buf string) { +// mainLoop uses weightedRandomTestSelector to select test case and runs the tests. +func (c *stressClient) mainLoop(gauge *gauge) { + var numCalls int64 + timeStarted := time.Now() for testEndTime := time.Now().Add(time.Duration(c.testDurationSecs) * time.Second); c.testDurationSecs < 0 || time.Now().Before(testEndTime); { test, err := c.selector.getNextTest() if err != nil { @@ -170,6 +258,8 @@ func (c *StressClient) MainLoop(buf string) { default: grpclog.Fatal("Unsupported test case: %d", test) } + numCalls++ + gauge.set(int64(float64(numCalls) / time.Since(timeStarted).Seconds())) } } @@ -198,6 +288,8 @@ func main() { } logParameterInfo(serverAddresses, testCases) testSelector := newWeightedRandomTestSelector(testCases) + metricsServer := newMetricsServer() + var wg sync.WaitGroup wg.Add(len(serverAddresses) * *numChannelsPerServerPtr * *numStubsPerChannelPtr) var clientIndex int @@ -214,12 +306,18 @@ func main() { buf := fmt.Sprintf("/stress_test/server_%d/channel_%d/stub_%d/qps", serverIndex+1, connIndex+1, stubIndex+1) go func() { defer wg.Done() - client.MainLoop(buf) + if g, err := metricsServer.createGauge(buf); err != nil { + grpclog.Fatalf("%v", err) + } else { + client.mainLoop(g) + } }() } } } + go startServer(metricsServer, *metricsPortPtr) wg.Wait() grpclog.Printf(" ===== ALL DONE ===== ") + } diff --git a/stress/metrics_client/main.go b/stress/metrics_client/main.go new file mode 100644 index 00000000..2d22819a --- /dev/null +++ b/stress/metrics_client/main.go @@ -0,0 +1,65 @@ +package main + +import ( + "flag" + "io" + "time" + + "golang.org/x/net/context" + "google.golang.org/grpc" + "google.golang.org/grpc/grpclog" + metricspb "google.golang.org/grpc/stress/grpc_testing" +) + +var ( + metricsServerAddressPtr = flag.String("metrics_server_address", "", "The metrics server addresses in the fomrat :") + totalOnlyPtr = flag.Bool("total_only", false, "If true, this prints only the total value of all gauges") +) + +const timeoutSeconds = 10 + +func printMetrics(client metricspb.MetricsServiceClient, totalOnly bool) { + ctx, _ := context.WithTimeout(context.Background(), timeoutSeconds*time.Second) + stream, err := client.GetAllGauges(ctx, &metricspb.EmptyMessage{}) + if err != nil { + grpclog.Fatalf("failed to call GetAllGuages: %v", err) + } + + var overallQPS int64 + var rpcStatus error + for { + gaugeResponse, err := stream.Recv() + if err != nil { + rpcStatus = err + break + } + if _, ok := gaugeResponse.GetValue().(*metricspb.GaugeResponse_LongValue); ok { + if !totalOnly { + grpclog.Printf("%s: %d", gaugeResponse.Name, gaugeResponse.GetLongValue()) + } + overallQPS += gaugeResponse.GetLongValue() + } else { + grpclog.Printf("gauge %s is not a long value", gaugeResponse.Name) + } + } + grpclog.Printf("overall qps: %d", overallQPS) + if rpcStatus != io.EOF { + grpclog.Fatalf("failed to finish server streaming: %v", rpcStatus) + } +} + +func main() { + flag.Parse() + if len(*metricsServerAddressPtr) == 0 { + grpclog.Fatalf("Cannot connect to the Metrics server. Please pass the address of the metrics server to connect to via the 'metrics_server_address' flag") + } + + conn, err := grpc.Dial(*metricsServerAddressPtr, grpc.WithInsecure()) + if err != nil { + grpclog.Fatalf("cannot connect to metrics server: %v", err) + } + defer conn.Close() + + c := metricspb.NewMetricsServiceClient(conn) + printMetrics(c, *totalOnlyPtr) +}