From 9fd1d5bee86a10d55ecdd45c9d2dff95d250cd72 Mon Sep 17 00:00:00 2001 From: Menghan Li Date: Mon, 18 Apr 2016 14:48:19 -0700 Subject: [PATCH] Implement QPS workerServer and Add RunServer() for benchmarkServer --- benchmark/benchmark.go | 7 +- benchmark/server/testdata/server1.key | 16 ++++ benchmark/server/testdata/server1.pem | 16 ++++ benchmark/worker/benchmark_server.go | 109 ++++++++++++++++++++++++++ benchmark/worker/main.go | 108 +++++++++++++++++++++++++ 5 files changed, 252 insertions(+), 4 deletions(-) create mode 100644 benchmark/server/testdata/server1.key create mode 100644 benchmark/server/testdata/server1.pem create mode 100644 benchmark/worker/benchmark_server.go create mode 100644 benchmark/worker/main.go diff --git a/benchmark/benchmark.go b/benchmark/benchmark.go index e2cd51b3..c86d96c2 100644 --- a/benchmark/benchmark.go +++ b/benchmark/benchmark.go @@ -38,7 +38,6 @@ package benchmark import ( "io" - "math" "net" "golang.org/x/net/context" @@ -95,15 +94,15 @@ func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallS // StartServer starts a gRPC server serving a benchmark service on the given // address, which may be something like "localhost:0". It returns its listen // address and a function to stop the server. -func StartServer(addr string) (string, func()) { +func StartServer(addr string, opts ...grpc.ServerOption) (int, func()) { lis, err := net.Listen("tcp", addr) if err != nil { grpclog.Fatalf("Failed to listen: %v", err) } - s := grpc.NewServer(grpc.MaxConcurrentStreams(math.MaxUint32)) + s := grpc.NewServer(opts...) testpb.RegisterBenchmarkServiceServer(s, &testServer{}) go s.Serve(lis) - return lis.Addr().String(), func() { + return lis.Addr().(*net.TCPAddr).Port, func() { s.Stop() } } diff --git a/benchmark/server/testdata/server1.key b/benchmark/server/testdata/server1.key new file mode 100644 index 00000000..143a5b87 --- /dev/null +++ b/benchmark/server/testdata/server1.key @@ -0,0 +1,16 @@ +-----BEGIN PRIVATE KEY----- +MIICdQIBADANBgkqhkiG9w0BAQEFAASCAl8wggJbAgEAAoGBAOHDFScoLCVJpYDD +M4HYtIdV6Ake/sMNaaKdODjDMsux/4tDydlumN+fm+AjPEK5GHhGn1BgzkWF+slf +3BxhrA/8dNsnunstVA7ZBgA/5qQxMfGAq4wHNVX77fBZOgp9VlSMVfyd9N8YwbBY +AckOeUQadTi2X1S6OgJXgQ0m3MWhAgMBAAECgYAn7qGnM2vbjJNBm0VZCkOkTIWm +V10okw7EPJrdL2mkre9NasghNXbE1y5zDshx5Nt3KsazKOxTT8d0Jwh/3KbaN+YY +tTCbKGW0pXDRBhwUHRcuRzScjli8Rih5UOCiZkhefUTcRb6xIhZJuQy71tjaSy0p +dHZRmYyBYO2YEQ8xoQJBAPrJPhMBkzmEYFtyIEqAxQ/o/A6E+E4w8i+KM7nQCK7q +K4JXzyXVAjLfyBZWHGM2uro/fjqPggGD6QH1qXCkI4MCQQDmdKeb2TrKRh5BY1LR +81aJGKcJ2XbcDu6wMZK4oqWbTX2KiYn9GB0woM6nSr/Y6iy1u145YzYxEV/iMwff +DJULAkB8B2MnyzOg0pNFJqBJuH29bKCcHa8gHJzqXhNO5lAlEbMK95p/P2Wi+4Hd +aiEIAF1BF326QJcvYKmwSmrORp85AkAlSNxRJ50OWrfMZnBgzVjDx3xG6KsFQVk2 +ol6VhqL6dFgKUORFUWBvnKSyhjJxurlPEahV6oo6+A+mPhFY8eUvAkAZQyTdupP3 +XEFQKctGz+9+gKkemDp7LBBMEMBXrGTLPhpEfcjv/7KPdnFHYmhYeBTBnuVmTVWe +F98XJ7tIFfJq +-----END PRIVATE KEY----- diff --git a/benchmark/server/testdata/server1.pem b/benchmark/server/testdata/server1.pem new file mode 100644 index 00000000..f3d43fcc --- /dev/null +++ b/benchmark/server/testdata/server1.pem @@ -0,0 +1,16 @@ +-----BEGIN CERTIFICATE----- +MIICnDCCAgWgAwIBAgIBBzANBgkqhkiG9w0BAQsFADBWMQswCQYDVQQGEwJBVTET +MBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50ZXJuZXQgV2lkZ2l0cyBQ +dHkgTHRkMQ8wDQYDVQQDEwZ0ZXN0Y2EwHhcNMTUxMTA0MDIyMDI0WhcNMjUxMTAx +MDIyMDI0WjBlMQswCQYDVQQGEwJVUzERMA8GA1UECBMISWxsaW5vaXMxEDAOBgNV +BAcTB0NoaWNhZ28xFTATBgNVBAoTDEV4YW1wbGUsIENvLjEaMBgGA1UEAxQRKi50 +ZXN0Lmdvb2dsZS5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAOHDFSco +LCVJpYDDM4HYtIdV6Ake/sMNaaKdODjDMsux/4tDydlumN+fm+AjPEK5GHhGn1Bg +zkWF+slf3BxhrA/8dNsnunstVA7ZBgA/5qQxMfGAq4wHNVX77fBZOgp9VlSMVfyd +9N8YwbBYAckOeUQadTi2X1S6OgJXgQ0m3MWhAgMBAAGjazBpMAkGA1UdEwQCMAAw +CwYDVR0PBAQDAgXgME8GA1UdEQRIMEaCECoudGVzdC5nb29nbGUuZnKCGHdhdGVy +em9vaS50ZXN0Lmdvb2dsZS5iZYISKi50ZXN0LnlvdXR1YmUuY29thwTAqAEDMA0G +CSqGSIb3DQEBCwUAA4GBAJFXVifQNub1LUP4JlnX5lXNlo8FxZ2a12AFQs+bzoJ6 +hM044EDjqyxUqSbVePK0ni3w1fHQB5rY9yYC5f8G7aqqTY1QOhoUk8ZTSTRpnkTh +y4jjdvTZeLDVBlueZUTDRmy2feY5aZIU18vFDK08dTG0A87pppuv1LNIR3loveU8 +-----END CERTIFICATE----- diff --git a/benchmark/worker/benchmark_server.go b/benchmark/worker/benchmark_server.go new file mode 100644 index 00000000..76d4be93 --- /dev/null +++ b/benchmark/worker/benchmark_server.go @@ -0,0 +1,109 @@ +package main + +import ( + "runtime" + "strconv" + "sync" + "time" + + "google.golang.org/grpc" + "google.golang.org/grpc/benchmark" + testpb "google.golang.org/grpc/benchmark/grpc_testing" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" + "google.golang.org/grpc/grpclog" +) + +var ( + // TODO change filepath + certFile = "/usr/local/google/home/menghanl/go/src/google.golang.org/grpc/benchmark/server/testdata/server1.pem" + keyFile = "/usr/local/google/home/menghanl/go/src/google.golang.org/grpc/benchmark/server/testdata/server1.key" +) + +type benchmarkServer struct { + port int + close func() + mu sync.RWMutex + lastResetTime time.Time +} + +func startBenchmarkServerWithSetup(setup *testpb.ServerConfig) (*benchmarkServer, error) { + var opts []grpc.ServerOption + + grpclog.Printf(" - server type: %v", setup.ServerType) + switch setup.ServerType { + // Ignore server type. + case testpb.ServerType_SYNC_SERVER: + case testpb.ServerType_ASYNC_SERVER: + case testpb.ServerType_ASYNC_GENERIC_SERVER: + default: + return nil, grpc.Errorf(codes.InvalidArgument, "unknow server type: %v", setup.ServerType) + } + + grpclog.Printf(" - security params: %v", setup.SecurityParams) + if setup.SecurityParams != nil { + creds, err := credentials.NewServerTLSFromFile(certFile, keyFile) + if err != nil { + grpclog.Fatalf("failed to generate credentials %v", err) + } + opts = append(opts, grpc.Creds(creds)) + } + + // Ignore async server threads. + + grpclog.Printf(" - core limit: %v", setup.CoreLimit) + + grpclog.Printf(" - payload config: %v", setup.PayloadConfig) + if setup.PayloadConfig != nil { + // TODO payload config + grpclog.Printf("payload config: %v", setup.PayloadConfig) + switch setup.PayloadConfig.Payload.(type) { + case *testpb.PayloadConfig_BytebufParams: + case *testpb.PayloadConfig_SimpleParams: + case *testpb.PayloadConfig_ComplexParams: + default: + return nil, grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", setup.PayloadConfig) + } + } + + grpclog.Printf(" - core list: %v", setup.CoreList) + if len(setup.CoreList) > 0 { + // TODO core list + grpclog.Printf("specifying cores to run server on: %v", setup.CoreList) + } + + if setup.CoreLimit > 0 { + runtime.GOMAXPROCS(int(setup.CoreLimit)) + } else { + // runtime.GOMAXPROCS(runtime.NumCPU()) + runtime.GOMAXPROCS(1) + } + + grpclog.Printf(" - port: %v", setup.Port) + p, close := benchmark.StartServer(":"+strconv.Itoa(int(setup.Port)), opts...) + grpclog.Printf("benchmark server listening at port %v", p) + + // temp := strings.Split(addr, ":") + // if len(temp) <= 0 { + // return nil, grpc.Errorf(codes.Internal, "benchmark test address not valid: %v", addr) + // } + // p, err := strconv.Atoi(temp[len(temp)-1]) + // if err != nil { + // return nil, grpc.Errorf(codes.Internal, "%v", err) + // } + + bs := &benchmarkServer{port: p, close: close, lastResetTime: time.Now()} + return bs, nil +} + +func (bs *benchmarkServer) getStats() *testpb.ServerStats { + bs.mu.RLock() + defer bs.mu.RUnlock() + return &testpb.ServerStats{TimeElapsed: time.Since(bs.lastResetTime).Seconds(), TimeUser: 0, TimeSystem: 0} +} + +func (bs *benchmarkServer) reset() { + bs.mu.Lock() + defer bs.mu.Unlock() + bs.lastResetTime = time.Now() +} diff --git a/benchmark/worker/main.go b/benchmark/worker/main.go new file mode 100644 index 00000000..e31b3c6c --- /dev/null +++ b/benchmark/worker/main.go @@ -0,0 +1,108 @@ +package main + +import ( + "io" + "net" + "runtime" + "sync" + + "golang.org/x/net/context" + "google.golang.org/grpc" + testpb "google.golang.org/grpc/benchmark/grpc_testing" + "google.golang.org/grpc/codes" + "google.golang.org/grpc/grpclog" +) + +var ( + ports = []string{":10000"} + // ports = []string{":10010"} +) + +type workerServer struct { + bs *benchmarkServer +} + +func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) error { + for { + in, err := stream.Recv() + if err == io.EOF { + grpclog.Printf("closing benchmark server") + if s.bs != nil { + s.bs.close() + s.bs = nil + } + return nil + } + if err != nil { + return err + } + + switch t := in.Argtype.(type) { + case *testpb.ServerArgs_Setup: + grpclog.Printf("server setup received:") + + bs, err := startBenchmarkServerWithSetup(t.Setup) + if err != nil { + return err + } + s.bs = bs + case *testpb.ServerArgs_Mark: + grpclog.Printf("server mark received:") + grpclog.Printf(" - %v", t) + if s.bs == nil { + return grpc.Errorf(codes.InvalidArgument, "server does not exist when mark received") + } + if t.Mark.Reset_ { + s.bs.reset() + } + } + + out := &testpb.ServerStatus{ + Stats: s.bs.getStats(), + Port: int32(s.bs.port), + Cores: 1, + } + if err := stream.Send(out); err != nil { + return err + } + } + + return nil +} + +func (s *workerServer) RunClient(stream testpb.WorkerService_RunClientServer) error { + return nil +} + +func (s *workerServer) CoreCount(ctx context.Context, in *testpb.CoreRequest) (*testpb.CoreResponse, error) { + grpclog.Printf("core count: %v", runtime.NumCPU()) + return &testpb.CoreResponse{int32(runtime.NumCPU())}, nil +} + +func (s *workerServer) QuitWorker(ctx context.Context, in *testpb.Void) (*testpb.Void, error) { + grpclog.Printf("quiting worker") + if s.bs != nil { + s.bs.close() + } + return &testpb.Void{}, nil +} + +func main() { + var wg sync.WaitGroup + wg.Add(len(ports)) + for i := 0; i < len(ports); i++ { + lis, err := net.Listen("tcp", ports[i]) + if err != nil { + grpclog.Fatalf("failed to listen: %v", err) + } + grpclog.Printf("worker %d listening at port %v", i, ports[i]) + + s := grpc.NewServer() + testpb.RegisterWorkerServiceServer(s, &workerServer{}) + go func() { + defer wg.Done() + s.Serve(lis) + }() + } + wg.Wait() +}