Not returning error in StartServer().

This commit is contained in:
Menghan Li
2016-05-03 11:29:00 -07:00
parent 444ab5553f
commit 020c480810
4 changed files with 14 additions and 32 deletions

View File

@ -93,12 +93,13 @@ func (s *testServer) StreamingCall(stream testpb.BenchmarkService_StreamingCallS
} }
// byteBufServer is a gRPC server that sends and receives byte buffer. // byteBufServer is a gRPC server that sends and receives byte buffer.
// The purpose is to remove the protobuf overhead from benchmark tests. // The purpose is to benchmark the gRPC performance without protobuf serialization/deserialization overhead.
// Now only StreamingCall() is tested.
type byteBufServer struct { type byteBufServer struct {
respSize int32 respSize int32
} }
// UnaryCall is an empty function and is not used for benchmark.
// If bytebuf UnaryCall benchmark is needed later, the function body needs to be updated.
func (s *byteBufServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) { func (s *byteBufServer) UnaryCall(ctx context.Context, in *testpb.SimpleRequest) (*testpb.SimpleResponse, error) {
return &testpb.SimpleResponse{}, nil return &testpb.SimpleResponse{}, nil
} }
@ -120,7 +121,7 @@ func (s *byteBufServer) StreamingCall(stream testpb.BenchmarkService_StreamingCa
} }
} }
// ServerInfo is used to create server. // ServerInfo contains the information to create a gRPC benchmark server.
type ServerInfo struct { type ServerInfo struct {
// Addr is the address of the server. // Addr is the address of the server.
Addr string Addr string
@ -136,9 +137,8 @@ type ServerInfo struct {
} }
// StartServer starts a gRPC server serving a benchmark service according to info. // StartServer starts a gRPC server serving a benchmark service according to info.
// Different types of servers are created according to Type.
// It returns its listen address and a function to stop the server. // It returns its listen address and a function to stop the server.
func StartServer(info ServerInfo, opts ...grpc.ServerOption) (string, func(), error) { func StartServer(info ServerInfo, opts ...grpc.ServerOption) (string, func()) {
lis, err := net.Listen("tcp", info.Addr) lis, err := net.Listen("tcp", info.Addr)
if err != nil { if err != nil {
grpclog.Fatalf("Failed to listen: %v", err) grpclog.Fatalf("Failed to listen: %v", err)
@ -150,16 +150,16 @@ func StartServer(info ServerInfo, opts ...grpc.ServerOption) (string, func(), er
case "bytebuf": case "bytebuf":
respSize, ok := info.Metadata.(int32) respSize, ok := info.Metadata.(int32)
if !ok { if !ok {
return "", nil, fmt.Errorf("invalid metadata: %v, for Type: %v", info.Metadata, info.Type) grpclog.Fatalf("failed to StartServer, invalid metadata: %v, for Type: %v", info.Metadata, info.Type)
} }
testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize}) testpb.RegisterBenchmarkServiceServer(s, &byteBufServer{respSize: respSize})
default: default:
return "", nil, fmt.Errorf("unknown Type: %v", info.Type) grpclog.Fatalf("failed to StartServer, unknown Type: %v", info.Type)
} }
go s.Serve(lis) go s.Serve(lis)
return lis.Addr().String(), func() { return lis.Addr().String(), func() {
s.Stop() s.Stop()
}, nil }
} }
// DoUnaryCall performs an unary RPC with given stub and request and response sizes. // DoUnaryCall performs an unary RPC with given stub and request and response sizes.

View File

@ -16,10 +16,7 @@ import (
func runUnary(b *testing.B, maxConcurrentCalls int) { func runUnary(b *testing.B, maxConcurrentCalls int) {
s := stats.AddStats(b, 38) s := stats.AddStats(b, 38)
b.StopTimer() b.StopTimer()
target, stopper, err := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf"}) target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf"})
if err != nil {
grpclog.Fatalf("failed to start server: %v", err)
}
defer stopper() defer stopper()
conn := NewClientConn(target, grpc.WithInsecure()) conn := NewClientConn(target, grpc.WithInsecure())
tc := testpb.NewBenchmarkServiceClient(conn) tc := testpb.NewBenchmarkServiceClient(conn)
@ -62,10 +59,7 @@ func runUnary(b *testing.B, maxConcurrentCalls int) {
func runStream(b *testing.B, maxConcurrentCalls int) { func runStream(b *testing.B, maxConcurrentCalls int) {
s := stats.AddStats(b, 38) s := stats.AddStats(b, 38)
b.StopTimer() b.StopTimer()
target, stopper, err := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf"}) target, stopper := StartServer(ServerInfo{Addr: "localhost:0", Type: "protobuf"})
if err != nil {
grpclog.Fatalf("failed to start server: %v", err)
}
defer stopper() defer stopper()
conn := NewClientConn(target, grpc.WithInsecure()) conn := NewClientConn(target, grpc.WithInsecure())
tc := testpb.NewBenchmarkServiceClient(conn) tc := testpb.NewBenchmarkServiceClient(conn)

View File

@ -28,10 +28,7 @@ func main() {
grpclog.Fatalf("Failed to serve: %v", err) grpclog.Fatalf("Failed to serve: %v", err)
} }
}() }()
addr, stopper, err := benchmark.StartServer(benchmark.ServerInfo{Addr: ":0", Type: "protobuf"}) // listen on all interfaces addr, stopper := benchmark.StartServer(benchmark.ServerInfo{Addr: ":0", Type: "protobuf"}) // listen on all interfaces
if err != nil {
grpclog.Fatalf("failed to start server: %v", err)
}
grpclog.Println("Server Address: ", addr) grpclog.Println("Server Address: ", addr)
<-time.After(time.Duration(*duration) * time.Second) <-time.After(time.Duration(*duration) * time.Second)
stopper() stopper()

View File

@ -115,22 +115,16 @@ func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchma
switch payload := config.PayloadConfig.Payload.(type) { switch payload := config.PayloadConfig.Payload.(type) {
case *testpb.PayloadConfig_BytebufParams: case *testpb.PayloadConfig_BytebufParams:
opts = append(opts, grpc.CustomCodec(byteBufCodec{})) opts = append(opts, grpc.CustomCodec(byteBufCodec{}))
addr, closeFunc, err = benchmark.StartServer(benchmark.ServerInfo{ addr, closeFunc = benchmark.StartServer(benchmark.ServerInfo{
Addr: ":" + strconv.Itoa(port), Addr: ":" + strconv.Itoa(port),
Type: "bytebuf", Type: "bytebuf",
Metadata: payload.BytebufParams.RespSize, Metadata: payload.BytebufParams.RespSize,
}, opts...) }, opts...)
if err != nil {
grpclog.Fatalf("failed to start server: %v", err)
}
case *testpb.PayloadConfig_SimpleParams: case *testpb.PayloadConfig_SimpleParams:
addr, closeFunc, err = benchmark.StartServer(benchmark.ServerInfo{ addr, closeFunc = benchmark.StartServer(benchmark.ServerInfo{
Addr: ":" + strconv.Itoa(port), Addr: ":" + strconv.Itoa(port),
Type: "protobuf", Type: "protobuf",
}, opts...) }, opts...)
if err != nil {
grpclog.Fatalf("failed to start server: %v", err)
}
case *testpb.PayloadConfig_ComplexParams: case *testpb.PayloadConfig_ComplexParams:
return nil, grpc.Errorf(codes.Unimplemented, "unsupported payload config: %v", config.PayloadConfig) return nil, grpc.Errorf(codes.Unimplemented, "unsupported payload config: %v", config.PayloadConfig)
default: default:
@ -138,13 +132,10 @@ func startBenchmarkServer(config *testpb.ServerConfig, serverPort int) (*benchma
} }
} else { } else {
// Start protobuf server is payload config is nil. // Start protobuf server is payload config is nil.
addr, closeFunc, err = benchmark.StartServer(benchmark.ServerInfo{ addr, closeFunc = benchmark.StartServer(benchmark.ServerInfo{
Addr: ":" + strconv.Itoa(port), Addr: ":" + strconv.Itoa(port),
Type: "protobuf", Type: "protobuf",
}, opts...) }, opts...)
if err != nil {
grpclog.Fatalf("failed to start server: %v", err)
}
} }
grpclog.Printf("benchmark server listening at %v", addr) grpclog.Printf("benchmark server listening at %v", addr)