Implement QPS workerServer and Add RunServer() for benchmarkServer

This commit is contained in:
Menghan Li
2016-04-18 14:48:19 -07:00
parent 142fdefa83
commit 9fd1d5bee8
5 changed files with 252 additions and 4 deletions

View File

@ -38,7 +38,6 @@ package benchmark
import ( import (
"io" "io"
"math"
"net" "net"
"golang.org/x/net/context" "golang.org/x/net/context"
@ -95,15 +94,15 @@ func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallS
// StartServer starts a gRPC server serving a benchmark service on the given // StartServer starts a gRPC server serving a benchmark service on the given
// address, which may be something like "localhost:0". It returns its listen // address, which may be something like "localhost:0". It returns its listen
// address and a function to stop the server. // address and a function to stop the server.
func StartServer(addr string) (string, func()) { func StartServer(addr string, opts ...grpc.ServerOption) (int, func()) {
lis, err := net.Listen("tcp", addr) lis, err := net.Listen("tcp", addr)
if err != nil { if err != nil {
grpclog.Fatalf("Failed to listen: %v", err) grpclog.Fatalf("Failed to listen: %v", err)
} }
s := grpc.NewServer(grpc.MaxConcurrentStreams(math.MaxUint32)) s := grpc.NewServer(opts...)
testpb.RegisterBenchmarkServiceServer(s, &testServer{}) testpb.RegisterBenchmarkServiceServer(s, &testServer{})
go s.Serve(lis) go s.Serve(lis)
return lis.Addr().String(), func() { return lis.Addr().(*net.TCPAddr).Port, func() {
s.Stop() s.Stop()
} }
} }

16
benchmark/server/testdata/server1.key vendored Normal file
View File

@ -0,0 +1,16 @@
-----BEGIN PRIVATE KEY-----
MIICdQIBADANBgkqhkiG9w0BAQEFAASCAl8wggJbAgEAAoGBAOHDFScoLCVJpYDD
M4HYtIdV6Ake/sMNaaKdODjDMsux/4tDydlumN+fm+AjPEK5GHhGn1BgzkWF+slf
3BxhrA/8dNsnunstVA7ZBgA/5qQxMfGAq4wHNVX77fBZOgp9VlSMVfyd9N8YwbBY
AckOeUQadTi2X1S6OgJXgQ0m3MWhAgMBAAECgYAn7qGnM2vbjJNBm0VZCkOkTIWm
V10okw7EPJrdL2mkre9NasghNXbE1y5zDshx5Nt3KsazKOxTT8d0Jwh/3KbaN+YY
tTCbKGW0pXDRBhwUHRcuRzScjli8Rih5UOCiZkhefUTcRb6xIhZJuQy71tjaSy0p
dHZRmYyBYO2YEQ8xoQJBAPrJPhMBkzmEYFtyIEqAxQ/o/A6E+E4w8i+KM7nQCK7q
K4JXzyXVAjLfyBZWHGM2uro/fjqPggGD6QH1qXCkI4MCQQDmdKeb2TrKRh5BY1LR
81aJGKcJ2XbcDu6wMZK4oqWbTX2KiYn9GB0woM6nSr/Y6iy1u145YzYxEV/iMwff
DJULAkB8B2MnyzOg0pNFJqBJuH29bKCcHa8gHJzqXhNO5lAlEbMK95p/P2Wi+4Hd
aiEIAF1BF326QJcvYKmwSmrORp85AkAlSNxRJ50OWrfMZnBgzVjDx3xG6KsFQVk2
ol6VhqL6dFgKUORFUWBvnKSyhjJxurlPEahV6oo6+A+mPhFY8eUvAkAZQyTdupP3
XEFQKctGz+9+gKkemDp7LBBMEMBXrGTLPhpEfcjv/7KPdnFHYmhYeBTBnuVmTVWe
F98XJ7tIFfJq
-----END PRIVATE KEY-----

16
benchmark/server/testdata/server1.pem vendored Normal file
View File

@ -0,0 +1,16 @@
-----BEGIN CERTIFICATE-----
MIICnDCCAgWgAwIBAgIBBzANBgkqhkiG9w0BAQsFADBWMQswCQYDVQQGEwJBVTET
MBEGA1UECBMKU29tZS1TdGF0ZTEhMB8GA1UEChMYSW50ZXJuZXQgV2lkZ2l0cyBQ
dHkgTHRkMQ8wDQYDVQQDEwZ0ZXN0Y2EwHhcNMTUxMTA0MDIyMDI0WhcNMjUxMTAx
MDIyMDI0WjBlMQswCQYDVQQGEwJVUzERMA8GA1UECBMISWxsaW5vaXMxEDAOBgNV
BAcTB0NoaWNhZ28xFTATBgNVBAoTDEV4YW1wbGUsIENvLjEaMBgGA1UEAxQRKi50
ZXN0Lmdvb2dsZS5jb20wgZ8wDQYJKoZIhvcNAQEBBQADgY0AMIGJAoGBAOHDFSco
LCVJpYDDM4HYtIdV6Ake/sMNaaKdODjDMsux/4tDydlumN+fm+AjPEK5GHhGn1Bg
zkWF+slf3BxhrA/8dNsnunstVA7ZBgA/5qQxMfGAq4wHNVX77fBZOgp9VlSMVfyd
9N8YwbBYAckOeUQadTi2X1S6OgJXgQ0m3MWhAgMBAAGjazBpMAkGA1UdEwQCMAAw
CwYDVR0PBAQDAgXgME8GA1UdEQRIMEaCECoudGVzdC5nb29nbGUuZnKCGHdhdGVy
em9vaS50ZXN0Lmdvb2dsZS5iZYISKi50ZXN0LnlvdXR1YmUuY29thwTAqAEDMA0G
CSqGSIb3DQEBCwUAA4GBAJFXVifQNub1LUP4JlnX5lXNlo8FxZ2a12AFQs+bzoJ6
hM044EDjqyxUqSbVePK0ni3w1fHQB5rY9yYC5f8G7aqqTY1QOhoUk8ZTSTRpnkTh
y4jjdvTZeLDVBlueZUTDRmy2feY5aZIU18vFDK08dTG0A87pppuv1LNIR3loveU8
-----END CERTIFICATE-----

View File

@ -0,0 +1,109 @@
package main
import (
"runtime"
"strconv"
"sync"
"time"
"google.golang.org/grpc"
"google.golang.org/grpc/benchmark"
testpb "google.golang.org/grpc/benchmark/grpc_testing"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/grpclog"
)
var (
// TODO change filepath
certFile = "/usr/local/google/home/menghanl/go/src/google.golang.org/grpc/benchmark/server/testdata/server1.pem"
keyFile = "/usr/local/google/home/menghanl/go/src/google.golang.org/grpc/benchmark/server/testdata/server1.key"
)
type benchmarkServer struct {
port int
close func()
mu sync.RWMutex
lastResetTime time.Time
}
func startBenchmarkServerWithSetup(setup *testpb.ServerConfig) (*benchmarkServer, error) {
var opts []grpc.ServerOption
grpclog.Printf(" - server type: %v", setup.ServerType)
switch setup.ServerType {
// Ignore server type.
case testpb.ServerType_SYNC_SERVER:
case testpb.ServerType_ASYNC_SERVER:
case testpb.ServerType_ASYNC_GENERIC_SERVER:
default:
return nil, grpc.Errorf(codes.InvalidArgument, "unknow server type: %v", setup.ServerType)
}
grpclog.Printf(" - security params: %v", setup.SecurityParams)
if setup.SecurityParams != nil {
creds, err := credentials.NewServerTLSFromFile(certFile, keyFile)
if err != nil {
grpclog.Fatalf("failed to generate credentials %v", err)
}
opts = append(opts, grpc.Creds(creds))
}
// Ignore async server threads.
grpclog.Printf(" - core limit: %v", setup.CoreLimit)
grpclog.Printf(" - payload config: %v", setup.PayloadConfig)
if setup.PayloadConfig != nil {
// TODO payload config
grpclog.Printf("payload config: %v", setup.PayloadConfig)
switch setup.PayloadConfig.Payload.(type) {
case *testpb.PayloadConfig_BytebufParams:
case *testpb.PayloadConfig_SimpleParams:
case *testpb.PayloadConfig_ComplexParams:
default:
return nil, grpc.Errorf(codes.InvalidArgument, "unknow payload config: %v", setup.PayloadConfig)
}
}
grpclog.Printf(" - core list: %v", setup.CoreList)
if len(setup.CoreList) > 0 {
// TODO core list
grpclog.Printf("specifying cores to run server on: %v", setup.CoreList)
}
if setup.CoreLimit > 0 {
runtime.GOMAXPROCS(int(setup.CoreLimit))
} else {
// runtime.GOMAXPROCS(runtime.NumCPU())
runtime.GOMAXPROCS(1)
}
grpclog.Printf(" - port: %v", setup.Port)
p, close := benchmark.StartServer(":"+strconv.Itoa(int(setup.Port)), opts...)
grpclog.Printf("benchmark server listening at port %v", p)
// temp := strings.Split(addr, ":")
// if len(temp) <= 0 {
// return nil, grpc.Errorf(codes.Internal, "benchmark test address not valid: %v", addr)
// }
// p, err := strconv.Atoi(temp[len(temp)-1])
// if err != nil {
// return nil, grpc.Errorf(codes.Internal, "%v", err)
// }
bs := &benchmarkServer{port: p, close: close, lastResetTime: time.Now()}
return bs, nil
}
func (bs *benchmarkServer) getStats() *testpb.ServerStats {
bs.mu.RLock()
defer bs.mu.RUnlock()
return &testpb.ServerStats{TimeElapsed: time.Since(bs.lastResetTime).Seconds(), TimeUser: 0, TimeSystem: 0}
}
func (bs *benchmarkServer) reset() {
bs.mu.Lock()
defer bs.mu.Unlock()
bs.lastResetTime = time.Now()
}

108
benchmark/worker/main.go Normal file
View File

@ -0,0 +1,108 @@
package main
import (
"io"
"net"
"runtime"
"sync"
"golang.org/x/net/context"
"google.golang.org/grpc"
testpb "google.golang.org/grpc/benchmark/grpc_testing"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/grpclog"
)
var (
ports = []string{":10000"}
// ports = []string{":10010"}
)
type workerServer struct {
bs *benchmarkServer
}
func (s *workerServer) RunServer(stream testpb.WorkerService_RunServerServer) error {
for {
in, err := stream.Recv()
if err == io.EOF {
grpclog.Printf("closing benchmark server")
if s.bs != nil {
s.bs.close()
s.bs = nil
}
return nil
}
if err != nil {
return err
}
switch t := in.Argtype.(type) {
case *testpb.ServerArgs_Setup:
grpclog.Printf("server setup received:")
bs, err := startBenchmarkServerWithSetup(t.Setup)
if err != nil {
return err
}
s.bs = bs
case *testpb.ServerArgs_Mark:
grpclog.Printf("server mark received:")
grpclog.Printf(" - %v", t)
if s.bs == nil {
return grpc.Errorf(codes.InvalidArgument, "server does not exist when mark received")
}
if t.Mark.Reset_ {
s.bs.reset()
}
}
out := &testpb.ServerStatus{
Stats: s.bs.getStats(),
Port: int32(s.bs.port),
Cores: 1,
}
if err := stream.Send(out); err != nil {
return err
}
}
return nil
}
func (s *workerServer) RunClient(stream testpb.WorkerService_RunClientServer) error {
return nil
}
func (s *workerServer) CoreCount(ctx context.Context, in *testpb.CoreRequest) (*testpb.CoreResponse, error) {
grpclog.Printf("core count: %v", runtime.NumCPU())
return &testpb.CoreResponse{int32(runtime.NumCPU())}, nil
}
func (s *workerServer) QuitWorker(ctx context.Context, in *testpb.Void) (*testpb.Void, error) {
grpclog.Printf("quiting worker")
if s.bs != nil {
s.bs.close()
}
return &testpb.Void{}, nil
}
func main() {
var wg sync.WaitGroup
wg.Add(len(ports))
for i := 0; i < len(ports); i++ {
lis, err := net.Listen("tcp", ports[i])
if err != nil {
grpclog.Fatalf("failed to listen: %v", err)
}
grpclog.Printf("worker %d listening at port %v", i, ports[i])
s := grpc.NewServer()
testpb.RegisterWorkerServiceServer(s, &workerServer{})
go func() {
defer wg.Done()
s.Serve(lis)
}()
}
wg.Wait()
}