298
stress/client/main.go
Normal file
298
stress/client/main.go
Normal file
@ -0,0 +1,298 @@
|
||||
/*
|
||||
*
|
||||
* Copyright 2016, Google Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are
|
||||
* met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following disclaimer
|
||||
* in the documentation and/or other materials provided with the
|
||||
* distribution.
|
||||
* * Neither the name of Google Inc. nor the names of its
|
||||
* contributors may be used to endorse or promote products derived from
|
||||
* this software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*
|
||||
*/
|
||||
|
||||
// client starts an interop client to do stress test and a metrics server to report qps.
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"math/rand"
|
||||
"net"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/interop"
|
||||
testpb "google.golang.org/grpc/interop/grpc_testing"
|
||||
metricspb "google.golang.org/grpc/stress/grpc_testing"
|
||||
)
|
||||
|
||||
var (
|
||||
serverAddresses = flag.String("server_addresses", "localhost:8080", "a list of server addresses")
|
||||
testCases = flag.String("test_cases", "", "a list of test cases along with the relative weights")
|
||||
testDurationSecs = flag.Int("test_duration_secs", -1, "test duration in seconds")
|
||||
numChannelsPerServer = flag.Int("num_channels_per_server", 1, "Number of channels (i.e connections) to each server")
|
||||
numStubsPerChannel = flag.Int("num_stubs_per_channel", 1, "Number of client stubs per each connection to server")
|
||||
metricsPort = flag.Int("metrics_port", 8081, "The port at which the stress client exposes QPS metrics")
|
||||
)
|
||||
|
||||
// testCaseWithWeight contains the test case type and its weight.
|
||||
type testCaseWithWeight struct {
|
||||
name string
|
||||
weight int
|
||||
}
|
||||
|
||||
// parseTestCases converts test case string to a list of struct testCaseWithWeight.
|
||||
func parseTestCases(testCaseString string) []testCaseWithWeight {
|
||||
testCaseStrings := strings.Split(testCaseString, ",")
|
||||
testCases := make([]testCaseWithWeight, len(testCaseStrings))
|
||||
for i, str := range testCaseStrings {
|
||||
testCase := strings.Split(str, ":")
|
||||
if len(testCase) != 2 {
|
||||
panic(fmt.Sprintf("invalid test case with weight: %s", str))
|
||||
}
|
||||
// Check if test case is supported.
|
||||
switch testCase[0] {
|
||||
case
|
||||
"empty_unary",
|
||||
"large_unary",
|
||||
"client_streaming",
|
||||
"server_streaming",
|
||||
"empty_stream":
|
||||
default:
|
||||
panic(fmt.Sprintf("unknown test type: %s", testCase[0]))
|
||||
}
|
||||
testCases[i].name = testCase[0]
|
||||
w, err := strconv.Atoi(testCase[1])
|
||||
if err != nil {
|
||||
panic(fmt.Sprintf("%v", err))
|
||||
}
|
||||
testCases[i].weight = w
|
||||
}
|
||||
return testCases
|
||||
}
|
||||
|
||||
// weightedRandomTestSelector defines a weighted random selector for test case types.
|
||||
type weightedRandomTestSelector struct {
|
||||
tests []testCaseWithWeight
|
||||
totalWeight int
|
||||
}
|
||||
|
||||
// newWeightedRandomTestSelector constructs a weightedRandomTestSelector with the given list of testCaseWithWeight.
|
||||
func newWeightedRandomTestSelector(tests []testCaseWithWeight) *weightedRandomTestSelector {
|
||||
var totalWeight int
|
||||
for _, t := range tests {
|
||||
totalWeight += t.weight
|
||||
}
|
||||
rand.Seed(time.Now().UnixNano())
|
||||
return &weightedRandomTestSelector{tests, totalWeight}
|
||||
}
|
||||
|
||||
func (selector weightedRandomTestSelector) getNextTest() string {
|
||||
random := rand.Intn(selector.totalWeight)
|
||||
var weightSofar int
|
||||
for _, test := range selector.tests {
|
||||
weightSofar += test.weight
|
||||
if random < weightSofar {
|
||||
return test.name
|
||||
}
|
||||
}
|
||||
panic("no test case selected by weightedRandomTestSelector")
|
||||
}
|
||||
|
||||
// gauge stores the qps of one interop client (one stub).
|
||||
type gauge struct {
|
||||
mutex sync.RWMutex
|
||||
val int64
|
||||
}
|
||||
|
||||
func (g *gauge) set(v int64) {
|
||||
g.mutex.Lock()
|
||||
defer g.mutex.Unlock()
|
||||
g.val = v
|
||||
}
|
||||
|
||||
func (g *gauge) get() int64 {
|
||||
g.mutex.RLock()
|
||||
defer g.mutex.RUnlock()
|
||||
return g.val
|
||||
}
|
||||
|
||||
// server implements metrics server functions.
|
||||
type server struct {
|
||||
mutex sync.RWMutex
|
||||
// gauges is a map from /stress_test/server_<n>/channel_<n>/stub_<n>/qps to its qps gauge.
|
||||
gauges map[string]*gauge
|
||||
}
|
||||
|
||||
// newMetricsServer returns a new metrics server.
|
||||
func newMetricsServer() *server {
|
||||
return &server{gauges: make(map[string]*gauge)}
|
||||
}
|
||||
|
||||
// GetAllGauges returns all gauges.
|
||||
func (s *server) GetAllGauges(in *metricspb.EmptyMessage, stream metricspb.MetricsService_GetAllGaugesServer) error {
|
||||
s.mutex.RLock()
|
||||
defer s.mutex.RUnlock()
|
||||
|
||||
for name, gauge := range s.gauges {
|
||||
if err := stream.Send(&metricspb.GaugeResponse{Name: name, Value: &metricspb.GaugeResponse_LongValue{gauge.get()}}); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetGauge returns the gauge for the given name.
|
||||
func (s *server) GetGauge(ctx context.Context, in *metricspb.GaugeRequest) (*metricspb.GaugeResponse, error) {
|
||||
s.mutex.RLock()
|
||||
defer s.mutex.RUnlock()
|
||||
|
||||
if g, ok := s.gauges[in.Name]; ok {
|
||||
return &metricspb.GaugeResponse{Name: in.Name, Value: &metricspb.GaugeResponse_LongValue{g.get()}}, nil
|
||||
}
|
||||
return nil, grpc.Errorf(codes.InvalidArgument, "gauge with name %s not found", in.Name)
|
||||
}
|
||||
|
||||
// createGauge creates a guage using the given name in metrics server.
|
||||
func (s *server) createGauge(name string) *gauge {
|
||||
s.mutex.Lock()
|
||||
defer s.mutex.Unlock()
|
||||
|
||||
if _, ok := s.gauges[name]; ok {
|
||||
// gauge already exists.
|
||||
panic(fmt.Sprintf("gauge %s already exists", name))
|
||||
}
|
||||
var g gauge
|
||||
s.gauges[name] = &g
|
||||
return &g
|
||||
}
|
||||
|
||||
func startServer(server *server, port int) {
|
||||
lis, err := net.Listen("tcp", ":"+strconv.Itoa(port))
|
||||
if err != nil {
|
||||
grpclog.Fatalf("failed to listen: %v", err)
|
||||
}
|
||||
|
||||
s := grpc.NewServer()
|
||||
metricspb.RegisterMetricsServiceServer(s, server)
|
||||
s.Serve(lis)
|
||||
|
||||
}
|
||||
|
||||
// performRPCs uses weightedRandomTestSelector to select test case and runs the tests.
|
||||
func performRPCs(gauge *gauge, conn *grpc.ClientConn, selector *weightedRandomTestSelector, stop <-chan bool) {
|
||||
client := testpb.NewTestServiceClient(conn)
|
||||
var numCalls int64
|
||||
startTime := time.Now()
|
||||
for {
|
||||
done := make(chan bool, 1)
|
||||
go func() {
|
||||
test := selector.getNextTest()
|
||||
switch test {
|
||||
case "empty_unary":
|
||||
interop.DoEmptyUnaryCall(client)
|
||||
case "large_unary":
|
||||
interop.DoLargeUnaryCall(client)
|
||||
case "client_streaming":
|
||||
interop.DoClientStreaming(client)
|
||||
case "server_streaming":
|
||||
interop.DoServerStreaming(client)
|
||||
case "empty_stream":
|
||||
interop.DoEmptyStream(client)
|
||||
}
|
||||
done <- true
|
||||
}()
|
||||
select {
|
||||
case <-stop:
|
||||
return
|
||||
case <-done:
|
||||
numCalls++
|
||||
gauge.set(int64(float64(numCalls) / time.Since(startTime).Seconds()))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func logParameterInfo(addresses []string, tests []testCaseWithWeight) {
|
||||
grpclog.Printf("server_addresses: %s", *serverAddresses)
|
||||
grpclog.Printf("test_cases: %s", *testCases)
|
||||
grpclog.Printf("test_duration-secs: %d", *testDurationSecs)
|
||||
grpclog.Printf("num_channels_per_server: %d", *numChannelsPerServer)
|
||||
grpclog.Printf("num_stubs_per_channel: %d", *numStubsPerChannel)
|
||||
grpclog.Printf("metrics_port: %d", *metricsPort)
|
||||
|
||||
grpclog.Println("addresses:")
|
||||
for i, addr := range addresses {
|
||||
grpclog.Printf("%d. %s\n", i+1, addr)
|
||||
}
|
||||
grpclog.Println("tests:")
|
||||
for i, test := range tests {
|
||||
grpclog.Printf("%d. %v\n", i+1, test)
|
||||
}
|
||||
}
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
addresses := strings.Split(*serverAddresses, ",")
|
||||
tests := parseTestCases(*testCases)
|
||||
logParameterInfo(addresses, tests)
|
||||
testSelector := newWeightedRandomTestSelector(tests)
|
||||
metricsServer := newMetricsServer()
|
||||
|
||||
var wg sync.WaitGroup
|
||||
wg.Add(len(addresses) * *numChannelsPerServer * *numStubsPerChannel)
|
||||
stop := make(chan bool)
|
||||
|
||||
for serverIndex, address := range addresses {
|
||||
for connIndex := 0; connIndex < *numChannelsPerServer; connIndex++ {
|
||||
conn, err := grpc.Dial(address, grpc.WithInsecure())
|
||||
if err != nil {
|
||||
grpclog.Fatalf("Fail to dial: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
for clientIndex := 0; clientIndex < *numStubsPerChannel; clientIndex++ {
|
||||
name := fmt.Sprintf("/stress_test/server_%d/channel_%d/stub_%d/qps", serverIndex+1, connIndex+1, clientIndex+1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
g := metricsServer.createGauge(name)
|
||||
performRPCs(g, conn, testSelector, stop)
|
||||
}()
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
go startServer(metricsServer, *metricsPort)
|
||||
if *testDurationSecs > 0 {
|
||||
time.Sleep(time.Duration(*testDurationSecs) * time.Second)
|
||||
close(stop)
|
||||
}
|
||||
wg.Wait()
|
||||
grpclog.Printf(" ===== ALL DONE ===== ")
|
||||
|
||||
}
|
350
stress/grpc_testing/metrics.pb.go
Normal file
350
stress/grpc_testing/metrics.pb.go
Normal file
@ -0,0 +1,350 @@
|
||||
// Code generated by protoc-gen-go.
|
||||
// source: metrics.proto
|
||||
// DO NOT EDIT!
|
||||
|
||||
/*
|
||||
Package grpc_testing is a generated protocol buffer package.
|
||||
|
||||
It is generated from these files:
|
||||
metrics.proto
|
||||
|
||||
It has these top-level messages:
|
||||
GaugeResponse
|
||||
GaugeRequest
|
||||
EmptyMessage
|
||||
*/
|
||||
package grpc_testing
|
||||
|
||||
import proto "github.com/golang/protobuf/proto"
|
||||
import fmt "fmt"
|
||||
import math "math"
|
||||
|
||||
import (
|
||||
context "golang.org/x/net/context"
|
||||
grpc "google.golang.org/grpc"
|
||||
)
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ = proto.Marshal
|
||||
var _ = fmt.Errorf
|
||||
var _ = math.Inf
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the proto package it is being compiled against.
|
||||
const _ = proto.ProtoPackageIsVersion1
|
||||
|
||||
// Reponse message containing the gauge name and value
|
||||
type GaugeResponse struct {
|
||||
Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
|
||||
// Types that are valid to be assigned to Value:
|
||||
// *GaugeResponse_LongValue
|
||||
// *GaugeResponse_DoubleValue
|
||||
// *GaugeResponse_StringValue
|
||||
Value isGaugeResponse_Value `protobuf_oneof:"value"`
|
||||
}
|
||||
|
||||
func (m *GaugeResponse) Reset() { *m = GaugeResponse{} }
|
||||
func (m *GaugeResponse) String() string { return proto.CompactTextString(m) }
|
||||
func (*GaugeResponse) ProtoMessage() {}
|
||||
func (*GaugeResponse) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{0} }
|
||||
|
||||
type isGaugeResponse_Value interface {
|
||||
isGaugeResponse_Value()
|
||||
}
|
||||
|
||||
type GaugeResponse_LongValue struct {
|
||||
LongValue int64 `protobuf:"varint,2,opt,name=long_value,json=longValue,oneof"`
|
||||
}
|
||||
type GaugeResponse_DoubleValue struct {
|
||||
DoubleValue float64 `protobuf:"fixed64,3,opt,name=double_value,json=doubleValue,oneof"`
|
||||
}
|
||||
type GaugeResponse_StringValue struct {
|
||||
StringValue string `protobuf:"bytes,4,opt,name=string_value,json=stringValue,oneof"`
|
||||
}
|
||||
|
||||
func (*GaugeResponse_LongValue) isGaugeResponse_Value() {}
|
||||
func (*GaugeResponse_DoubleValue) isGaugeResponse_Value() {}
|
||||
func (*GaugeResponse_StringValue) isGaugeResponse_Value() {}
|
||||
|
||||
func (m *GaugeResponse) GetValue() isGaugeResponse_Value {
|
||||
if m != nil {
|
||||
return m.Value
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func (m *GaugeResponse) GetLongValue() int64 {
|
||||
if x, ok := m.GetValue().(*GaugeResponse_LongValue); ok {
|
||||
return x.LongValue
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *GaugeResponse) GetDoubleValue() float64 {
|
||||
if x, ok := m.GetValue().(*GaugeResponse_DoubleValue); ok {
|
||||
return x.DoubleValue
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
func (m *GaugeResponse) GetStringValue() string {
|
||||
if x, ok := m.GetValue().(*GaugeResponse_StringValue); ok {
|
||||
return x.StringValue
|
||||
}
|
||||
return ""
|
||||
}
|
||||
|
||||
// XXX_OneofFuncs is for the internal use of the proto package.
|
||||
func (*GaugeResponse) XXX_OneofFuncs() (func(msg proto.Message, b *proto.Buffer) error, func(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error), func(msg proto.Message) (n int), []interface{}) {
|
||||
return _GaugeResponse_OneofMarshaler, _GaugeResponse_OneofUnmarshaler, _GaugeResponse_OneofSizer, []interface{}{
|
||||
(*GaugeResponse_LongValue)(nil),
|
||||
(*GaugeResponse_DoubleValue)(nil),
|
||||
(*GaugeResponse_StringValue)(nil),
|
||||
}
|
||||
}
|
||||
|
||||
func _GaugeResponse_OneofMarshaler(msg proto.Message, b *proto.Buffer) error {
|
||||
m := msg.(*GaugeResponse)
|
||||
// value
|
||||
switch x := m.Value.(type) {
|
||||
case *GaugeResponse_LongValue:
|
||||
b.EncodeVarint(2<<3 | proto.WireVarint)
|
||||
b.EncodeVarint(uint64(x.LongValue))
|
||||
case *GaugeResponse_DoubleValue:
|
||||
b.EncodeVarint(3<<3 | proto.WireFixed64)
|
||||
b.EncodeFixed64(math.Float64bits(x.DoubleValue))
|
||||
case *GaugeResponse_StringValue:
|
||||
b.EncodeVarint(4<<3 | proto.WireBytes)
|
||||
b.EncodeStringBytes(x.StringValue)
|
||||
case nil:
|
||||
default:
|
||||
return fmt.Errorf("GaugeResponse.Value has unexpected type %T", x)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func _GaugeResponse_OneofUnmarshaler(msg proto.Message, tag, wire int, b *proto.Buffer) (bool, error) {
|
||||
m := msg.(*GaugeResponse)
|
||||
switch tag {
|
||||
case 2: // value.long_value
|
||||
if wire != proto.WireVarint {
|
||||
return true, proto.ErrInternalBadWireType
|
||||
}
|
||||
x, err := b.DecodeVarint()
|
||||
m.Value = &GaugeResponse_LongValue{int64(x)}
|
||||
return true, err
|
||||
case 3: // value.double_value
|
||||
if wire != proto.WireFixed64 {
|
||||
return true, proto.ErrInternalBadWireType
|
||||
}
|
||||
x, err := b.DecodeFixed64()
|
||||
m.Value = &GaugeResponse_DoubleValue{math.Float64frombits(x)}
|
||||
return true, err
|
||||
case 4: // value.string_value
|
||||
if wire != proto.WireBytes {
|
||||
return true, proto.ErrInternalBadWireType
|
||||
}
|
||||
x, err := b.DecodeStringBytes()
|
||||
m.Value = &GaugeResponse_StringValue{x}
|
||||
return true, err
|
||||
default:
|
||||
return false, nil
|
||||
}
|
||||
}
|
||||
|
||||
func _GaugeResponse_OneofSizer(msg proto.Message) (n int) {
|
||||
m := msg.(*GaugeResponse)
|
||||
// value
|
||||
switch x := m.Value.(type) {
|
||||
case *GaugeResponse_LongValue:
|
||||
n += proto.SizeVarint(2<<3 | proto.WireVarint)
|
||||
n += proto.SizeVarint(uint64(x.LongValue))
|
||||
case *GaugeResponse_DoubleValue:
|
||||
n += proto.SizeVarint(3<<3 | proto.WireFixed64)
|
||||
n += 8
|
||||
case *GaugeResponse_StringValue:
|
||||
n += proto.SizeVarint(4<<3 | proto.WireBytes)
|
||||
n += proto.SizeVarint(uint64(len(x.StringValue)))
|
||||
n += len(x.StringValue)
|
||||
case nil:
|
||||
default:
|
||||
panic(fmt.Sprintf("proto: unexpected type %T in oneof", x))
|
||||
}
|
||||
return n
|
||||
}
|
||||
|
||||
// Request message containing the gauge name
|
||||
type GaugeRequest struct {
|
||||
Name string `protobuf:"bytes,1,opt,name=name" json:"name,omitempty"`
|
||||
}
|
||||
|
||||
func (m *GaugeRequest) Reset() { *m = GaugeRequest{} }
|
||||
func (m *GaugeRequest) String() string { return proto.CompactTextString(m) }
|
||||
func (*GaugeRequest) ProtoMessage() {}
|
||||
func (*GaugeRequest) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{1} }
|
||||
|
||||
type EmptyMessage struct {
|
||||
}
|
||||
|
||||
func (m *EmptyMessage) Reset() { *m = EmptyMessage{} }
|
||||
func (m *EmptyMessage) String() string { return proto.CompactTextString(m) }
|
||||
func (*EmptyMessage) ProtoMessage() {}
|
||||
func (*EmptyMessage) Descriptor() ([]byte, []int) { return fileDescriptor0, []int{2} }
|
||||
|
||||
func init() {
|
||||
proto.RegisterType((*GaugeResponse)(nil), "grpc.testing.GaugeResponse")
|
||||
proto.RegisterType((*GaugeRequest)(nil), "grpc.testing.GaugeRequest")
|
||||
proto.RegisterType((*EmptyMessage)(nil), "grpc.testing.EmptyMessage")
|
||||
}
|
||||
|
||||
// Reference imports to suppress errors if they are not otherwise used.
|
||||
var _ context.Context
|
||||
var _ grpc.ClientConn
|
||||
|
||||
// This is a compile-time assertion to ensure that this generated file
|
||||
// is compatible with the grpc package it is being compiled against.
|
||||
const _ = grpc.SupportPackageIsVersion1
|
||||
|
||||
// Client API for MetricsService service
|
||||
|
||||
type MetricsServiceClient interface {
|
||||
// Returns the values of all the gauges that are currently being maintained by
|
||||
// the service
|
||||
GetAllGauges(ctx context.Context, in *EmptyMessage, opts ...grpc.CallOption) (MetricsService_GetAllGaugesClient, error)
|
||||
// Returns the value of one gauge
|
||||
GetGauge(ctx context.Context, in *GaugeRequest, opts ...grpc.CallOption) (*GaugeResponse, error)
|
||||
}
|
||||
|
||||
type metricsServiceClient struct {
|
||||
cc *grpc.ClientConn
|
||||
}
|
||||
|
||||
func NewMetricsServiceClient(cc *grpc.ClientConn) MetricsServiceClient {
|
||||
return &metricsServiceClient{cc}
|
||||
}
|
||||
|
||||
func (c *metricsServiceClient) GetAllGauges(ctx context.Context, in *EmptyMessage, opts ...grpc.CallOption) (MetricsService_GetAllGaugesClient, error) {
|
||||
stream, err := grpc.NewClientStream(ctx, &_MetricsService_serviceDesc.Streams[0], c.cc, "/grpc.testing.MetricsService/GetAllGauges", opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
x := &metricsServiceGetAllGaugesClient{stream}
|
||||
if err := x.ClientStream.SendMsg(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if err := x.ClientStream.CloseSend(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return x, nil
|
||||
}
|
||||
|
||||
type MetricsService_GetAllGaugesClient interface {
|
||||
Recv() (*GaugeResponse, error)
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
type metricsServiceGetAllGaugesClient struct {
|
||||
grpc.ClientStream
|
||||
}
|
||||
|
||||
func (x *metricsServiceGetAllGaugesClient) Recv() (*GaugeResponse, error) {
|
||||
m := new(GaugeResponse)
|
||||
if err := x.ClientStream.RecvMsg(m); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return m, nil
|
||||
}
|
||||
|
||||
func (c *metricsServiceClient) GetGauge(ctx context.Context, in *GaugeRequest, opts ...grpc.CallOption) (*GaugeResponse, error) {
|
||||
out := new(GaugeResponse)
|
||||
err := grpc.Invoke(ctx, "/grpc.testing.MetricsService/GetGauge", in, out, c.cc, opts...)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
// Server API for MetricsService service
|
||||
|
||||
type MetricsServiceServer interface {
|
||||
// Returns the values of all the gauges that are currently being maintained by
|
||||
// the service
|
||||
GetAllGauges(*EmptyMessage, MetricsService_GetAllGaugesServer) error
|
||||
// Returns the value of one gauge
|
||||
GetGauge(context.Context, *GaugeRequest) (*GaugeResponse, error)
|
||||
}
|
||||
|
||||
func RegisterMetricsServiceServer(s *grpc.Server, srv MetricsServiceServer) {
|
||||
s.RegisterService(&_MetricsService_serviceDesc, srv)
|
||||
}
|
||||
|
||||
func _MetricsService_GetAllGauges_Handler(srv interface{}, stream grpc.ServerStream) error {
|
||||
m := new(EmptyMessage)
|
||||
if err := stream.RecvMsg(m); err != nil {
|
||||
return err
|
||||
}
|
||||
return srv.(MetricsServiceServer).GetAllGauges(m, &metricsServiceGetAllGaugesServer{stream})
|
||||
}
|
||||
|
||||
type MetricsService_GetAllGaugesServer interface {
|
||||
Send(*GaugeResponse) error
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
type metricsServiceGetAllGaugesServer struct {
|
||||
grpc.ServerStream
|
||||
}
|
||||
|
||||
func (x *metricsServiceGetAllGaugesServer) Send(m *GaugeResponse) error {
|
||||
return x.ServerStream.SendMsg(m)
|
||||
}
|
||||
|
||||
func _MetricsService_GetGauge_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) {
|
||||
in := new(GaugeRequest)
|
||||
if err := dec(in); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
out, err := srv.(MetricsServiceServer).GetGauge(ctx, in)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return out, nil
|
||||
}
|
||||
|
||||
var _MetricsService_serviceDesc = grpc.ServiceDesc{
|
||||
ServiceName: "grpc.testing.MetricsService",
|
||||
HandlerType: (*MetricsServiceServer)(nil),
|
||||
Methods: []grpc.MethodDesc{
|
||||
{
|
||||
MethodName: "GetGauge",
|
||||
Handler: _MetricsService_GetGauge_Handler,
|
||||
},
|
||||
},
|
||||
Streams: []grpc.StreamDesc{
|
||||
{
|
||||
StreamName: "GetAllGauges",
|
||||
Handler: _MetricsService_GetAllGauges_Handler,
|
||||
ServerStreams: true,
|
||||
},
|
||||
},
|
||||
}
|
||||
|
||||
var fileDescriptor0 = []byte{
|
||||
// 253 bytes of a gzipped FileDescriptorProto
|
||||
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x09, 0x6e, 0x88, 0x02, 0xff, 0xe2, 0xe2, 0xcd, 0x4d, 0x2d, 0x29,
|
||||
0xca, 0x4c, 0x2e, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x49, 0x2f, 0x2a, 0x48, 0xd6,
|
||||
0x2b, 0x49, 0x2d, 0x2e, 0xc9, 0xcc, 0x4b, 0x57, 0x9a, 0xce, 0xc8, 0xc5, 0xeb, 0x9e, 0x58, 0x9a,
|
||||
0x9e, 0x1a, 0x94, 0x5a, 0x5c, 0x90, 0x9f, 0x57, 0x9c, 0x2a, 0x24, 0xc4, 0xc5, 0x92, 0x97, 0x98,
|
||||
0x9b, 0x2a, 0xc1, 0xa8, 0xc0, 0xa8, 0xc1, 0x19, 0x04, 0x66, 0x0b, 0xc9, 0x73, 0x71, 0xe5, 0xe4,
|
||||
0xe7, 0xa5, 0xc7, 0x97, 0x25, 0xe6, 0x94, 0xa6, 0x4a, 0x30, 0x01, 0x65, 0x98, 0x3d, 0x18, 0x82,
|
||||
0x38, 0x41, 0x62, 0x61, 0x20, 0x21, 0x21, 0x65, 0x2e, 0x9e, 0x94, 0xfc, 0xd2, 0xa4, 0x9c, 0x54,
|
||||
0xa8, 0x12, 0x66, 0xa0, 0x12, 0x46, 0xa0, 0x12, 0x6e, 0x88, 0x28, 0x5c, 0x51, 0x31, 0xd0, 0x25,
|
||||
0x70, 0x73, 0x58, 0x40, 0x36, 0x80, 0x14, 0x41, 0x44, 0xc1, 0x8a, 0x9c, 0xd8, 0xb9, 0x58, 0xc1,
|
||||
0xb2, 0x4a, 0x4a, 0x5c, 0x3c, 0x50, 0x87, 0x15, 0x96, 0x02, 0x1d, 0x8b, 0xcd, 0x5d, 0x4a, 0x7c,
|
||||
0x5c, 0x3c, 0xae, 0xb9, 0x05, 0x25, 0x95, 0xbe, 0xa9, 0xc5, 0xc5, 0x89, 0xe9, 0xa9, 0x46, 0x0b,
|
||||
0x18, 0xb9, 0xf8, 0x7c, 0x21, 0xbe, 0x0d, 0x4e, 0x2d, 0x2a, 0xcb, 0x4c, 0x4e, 0x15, 0xf2, 0x04,
|
||||
0x1a, 0x93, 0x5a, 0xe2, 0x98, 0x93, 0x03, 0x36, 0xac, 0x58, 0x48, 0x4a, 0x0f, 0xd9, 0xff, 0x7a,
|
||||
0xc8, 0xda, 0xa5, 0xa4, 0x51, 0xe5, 0x50, 0xc2, 0xc5, 0x80, 0x51, 0xc8, 0x99, 0x8b, 0x03, 0x68,
|
||||
0x14, 0x58, 0x14, 0xdd, 0x18, 0x64, 0x97, 0xe2, 0x35, 0x26, 0x89, 0x0d, 0x1c, 0x0b, 0xc6, 0x80,
|
||||
0x00, 0x00, 0x00, 0xff, 0xff, 0x5e, 0x7d, 0xb2, 0xc9, 0x96, 0x01, 0x00, 0x00,
|
||||
}
|
64
stress/grpc_testing/metrics.proto
Normal file
64
stress/grpc_testing/metrics.proto
Normal file
@ -0,0 +1,64 @@
|
||||
// Copyright 2015-2016, Google Inc.
|
||||
// All rights reserved.
|
||||
//
|
||||
// Redistribution and use in source and binary forms, with or without
|
||||
// modification, are permitted provided that the following conditions are
|
||||
// met:
|
||||
//
|
||||
// * Redistributions of source code must retain the above copyright
|
||||
// notice, this list of conditions and the following disclaimer.
|
||||
// * Redistributions in binary form must reproduce the above
|
||||
// copyright notice, this list of conditions and the following disclaimer
|
||||
// in the documentation and/or other materials provided with the
|
||||
// distribution.
|
||||
// * Neither the name of Google Inc. nor the names of its
|
||||
// contributors may be used to endorse or promote products derived from
|
||||
// this software without specific prior written permission.
|
||||
//
|
||||
// THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
// "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
// LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
// A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
// OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
// SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
// LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
// DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
// THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
// (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
// OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
|
||||
// Contains the definitions for a metrics service and the type of metrics
|
||||
// exposed by the service.
|
||||
//
|
||||
// Currently, 'Gauge' (i.e a metric that represents the measured value of
|
||||
// something at an instant of time) is the only metric type supported by the
|
||||
// service.
|
||||
syntax = "proto3";
|
||||
|
||||
package grpc.testing;
|
||||
|
||||
// Reponse message containing the gauge name and value
|
||||
message GaugeResponse {
|
||||
string name = 1;
|
||||
oneof value {
|
||||
int64 long_value = 2;
|
||||
double double_value = 3;
|
||||
string string_value = 4;
|
||||
}
|
||||
}
|
||||
|
||||
// Request message containing the gauge name
|
||||
message GaugeRequest {
|
||||
string name = 1;
|
||||
}
|
||||
|
||||
message EmptyMessage {}
|
||||
|
||||
service MetricsService {
|
||||
// Returns the values of all the gauges that are currently being maintained by
|
||||
// the service
|
||||
rpc GetAllGauges(EmptyMessage) returns (stream GaugeResponse);
|
||||
|
||||
// Returns the value of one gauge
|
||||
rpc GetGauge(GaugeRequest) returns (GaugeResponse);
|
||||
}
|
97
stress/metrics_client/main.go
Normal file
97
stress/metrics_client/main.go
Normal file
@ -0,0 +1,97 @@
|
||||
/*
|
||||
*
|
||||
* Copyright 2016, Google Inc.
|
||||
* All rights reserved.
|
||||
*
|
||||
* Redistribution and use in source and binary forms, with or without
|
||||
* modification, are permitted provided that the following conditions are
|
||||
* met:
|
||||
*
|
||||
* * Redistributions of source code must retain the above copyright
|
||||
* notice, this list of conditions and the following disclaimer.
|
||||
* * Redistributions in binary form must reproduce the above
|
||||
* copyright notice, this list of conditions and the following disclaimer
|
||||
* in the documentation and/or other materials provided with the
|
||||
* distribution.
|
||||
* * Neither the name of Google Inc. nor the names of its
|
||||
* contributors may be used to endorse or promote products derived from
|
||||
* this software without specific prior written permission.
|
||||
*
|
||||
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
|
||||
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
|
||||
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
|
||||
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
|
||||
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
|
||||
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
|
||||
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
|
||||
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
|
||||
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
|
||||
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
|
||||
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
|
||||
*
|
||||
*/
|
||||
|
||||
package main
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"google.golang.org/grpc"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
metricspb "google.golang.org/grpc/stress/grpc_testing"
|
||||
)
|
||||
|
||||
var (
|
||||
metricsServerAddress = flag.String("metrics_server_address", "", "The metrics server addresses in the fomrat <hostname>:<port>")
|
||||
totalOnly = flag.Bool("total_only", false, "If true, this prints only the total value of all gauges")
|
||||
)
|
||||
|
||||
func printMetrics(client metricspb.MetricsServiceClient, totalOnly bool) {
|
||||
stream, err := client.GetAllGauges(context.Background(), &metricspb.EmptyMessage{})
|
||||
if err != nil {
|
||||
grpclog.Fatalf("failed to call GetAllGuages: %v", err)
|
||||
}
|
||||
|
||||
var (
|
||||
overallQPS int64
|
||||
rpcStatus error
|
||||
)
|
||||
for {
|
||||
gaugeResponse, err := stream.Recv()
|
||||
if err != nil {
|
||||
rpcStatus = err
|
||||
break
|
||||
}
|
||||
if _, ok := gaugeResponse.GetValue().(*metricspb.GaugeResponse_LongValue); !ok {
|
||||
panic(fmt.Sprintf("gauge %s is not a long value", gaugeResponse.Name))
|
||||
}
|
||||
v := gaugeResponse.GetLongValue()
|
||||
if !totalOnly {
|
||||
grpclog.Printf("%s: %d", gaugeResponse.Name, v)
|
||||
}
|
||||
overallQPS += v
|
||||
}
|
||||
if rpcStatus != io.EOF {
|
||||
grpclog.Fatalf("failed to finish server streaming: %v", rpcStatus)
|
||||
}
|
||||
grpclog.Printf("overall qps: %d", overallQPS)
|
||||
}
|
||||
|
||||
func main() {
|
||||
flag.Parse()
|
||||
if *metricsServerAddress == "" {
|
||||
grpclog.Fatalf("Metrics server address is empty.")
|
||||
}
|
||||
|
||||
conn, err := grpc.Dial(*metricsServerAddress, grpc.WithInsecure())
|
||||
if err != nil {
|
||||
grpclog.Fatalf("cannot connect to metrics server: %v", err)
|
||||
}
|
||||
defer conn.Close()
|
||||
|
||||
c := metricspb.NewMetricsServiceClient(conn)
|
||||
printMetrics(c, *totalOnly)
|
||||
}
|
Reference in New Issue
Block a user