Fix comments and remove useless WaitGroup
@@ -90,6 +90,8 @@ func setupClientEnv(config *testpb.ClientConfig) {
 	}
 }

+// createConns creates connections according to given config.
+// It returns a slice of connections created, the function to close all connections, and errors if any.
 func createConns(config *testpb.ClientConfig) ([]*grpc.ClientConn, func(), error) {
 	var opts []grpc.DialOption

@@ -112,7 +114,7 @@ func createConns(config *testpb.ClientConfig) ([]*grpc.ClientConn, func(), error
 		opts = append(opts, grpc.WithInsecure())
 	}

-	// Use byteBufCodec is required.
+	// Use byteBufCodec if it is required.
 	if config.PayloadConfig != nil {
 		switch config.PayloadConfig.Payload.(type) {
 		case *testpb.PayloadConfig_BytebufParams:
@@ -123,7 +125,7 @@ func createConns(config *testpb.ClientConfig) ([]*grpc.ClientConn, func(), error
 		}
 	}

-	// Create connestions.
+	// Create connections.
 	connCount := int(config.ClientChannels)
 	conns := make([]*grpc.ClientConn, connCount)
 	for connIndex := 0; connIndex < connCount; connIndex++ {
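For illustration, a minimal standalone sketch of the contract the new createConns comment documents: build a set of connections and return them together with a single function that closes them all. The fakeConn type, newFakeConn, and createFakeConns below are hypothetical stand-ins, not grpc-go APIs.

// Sketch of the "slice of conns plus close-all func plus error" shape.
package main

import "fmt"

type fakeConn struct{ id int }

func (c *fakeConn) Close() error {
	fmt.Printf("closing conn %d\n", c.id)
	return nil
}

func newFakeConn(id int) (*fakeConn, error) { return &fakeConn{id: id}, nil }

// createFakeConns mirrors the documented createConns contract.
func createFakeConns(count int) ([]*fakeConn, func(), error) {
	conns := make([]*fakeConn, count)
	for i := 0; i < count; i++ {
		c, err := newFakeConn(i)
		if err != nil {
			return nil, nil, err
		}
		conns[i] = c
	}
	closeAll := func() {
		for _, c := range conns {
			c.Close()
		}
	}
	return conns, closeAll, nil
}

func main() {
	conns, closeConns, err := createFakeConns(3)
	if err != nil {
		panic(err)
	}
	defer closeConns()
	fmt.Println("created", len(conns), "connections")
}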
@@ -208,7 +210,7 @@ func startBenchmarkClient(config *testpb.ClientConfig) (*benchmarkClient, error)

 	err = performRPCs(config, conns, &bc)
 	if err != nil {
-		// Close all conns if failed to performRPCs.
+		// Close all connections if performRPCs failed.
 		closeConns()
 		return nil, err
 	}
@@ -221,15 +223,12 @@ func (bc *benchmarkClient) doCloseLoopUnary(conns []*grpc.ClientConn, rpcCountPe
 		client := testpb.NewBenchmarkServiceClient(conn)
 		// For each connection, create rpcCountPerConn goroutines to do rpc.
 		// Close this connection after all goroutines finish.
-		var wg sync.WaitGroup
-		wg.Add(rpcCountPerConn)
 		for j := 0; j < rpcCountPerConn; j++ {
 			go func() {
 				// TODO: do warm up if necessary.
 				// Now relying on worker client to reserve time to do warm up.
 				// The worker client needs to wait for some time after client is created,
 				// before starting benchmark.
-				defer wg.Done()
 				done := make(chan bool)
 				for {
 					go func() {
@@ -271,8 +270,6 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou
 	for _, conn := range conns {
 		// For each connection, create rpcCountPerConn goroutines to do rpc.
 		// Close this connection after all goroutines finish.
-		var wg sync.WaitGroup
-		wg.Add(rpcCountPerConn)
 		for j := 0; j < rpcCountPerConn; j++ {
 			c := testpb.NewBenchmarkServiceClient(conn)
 			stream, err := c.StreamingCall(context.Background())
@@ -285,7 +282,6 @@ func (bc *benchmarkClient) doCloseLoopStreaming(conns []*grpc.ClientConn, rpcCou
 				// Now relying on worker client to reserve time to do warm up.
 				// The worker client needs to wait for some time after client is created,
 				// before starting benchmark.
-				defer wg.Done()
 				done := make(chan bool)
 				for {
 					go func() {
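A minimal sketch of why the WaitGroup could go: each per-connection goroutine runs a closed loop until the benchmark is stopped, and no Wait() call appears alongside the removed Add/Done, so the group never gated anything. The simulateRPC helper and the stop channel below are illustrative, assuming a stop signal similar to bc.stop in the real client.

// Close-loop worker goroutines with no WaitGroup; they simply exit when stop is closed.
package main

import (
	"fmt"
	"time"
)

func simulateRPC() {
	time.Sleep(10 * time.Millisecond) // stand-in for a unary RPC
}

func main() {
	stop := make(chan struct{})
	rpcCountPerConn := 4

	// For each "connection", start rpcCountPerConn goroutines that issue RPCs in a
	// closed loop until stop is closed. Nothing waits on them, which is why a
	// sync.WaitGroup with no matching Wait() added nothing.
	for j := 0; j < rpcCountPerConn; j++ {
		go func() {
			for {
				select {
				case <-stop:
					return
				default:
					simulateRPC()
				}
			}
		}()
	}

	time.Sleep(200 * time.Millisecond) // let the benchmark "run"
	close(stop)
	fmt.Println("benchmark stopped")
}

If the goroutines ever did need to be joined before closing a connection, a WaitGroup with a matching Wait() before the Close call would be the idiomatic way to reintroduce it. The hunks that follow touch the worker's import block, the QuitWorker handler, and main().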
@@ -40,6 +40,7 @@ import (
 	"net"
 	"runtime"
 	"strconv"
+	"time"

 	"golang.org/x/net/context"
 	"google.golang.org/grpc"
@@ -207,7 +208,7 @@ func (s *workerServer) CoreCount(ctx context.Context, in *testpb.CoreRequest) (*

 func (s *workerServer) QuitWorker(ctx context.Context, in *testpb.Void) (*testpb.Void, error) {
 	grpclog.Printf("quiting worker")
-	defer func() { s.stop <- true }()
+	s.stop <- true
 	return &testpb.Void{}, nil
 }

@@ -228,6 +229,7 @@ func main() {

 	go func() {
 		<-stop
+		time.Sleep(time.Second)
 		s.Stop()
 	}()

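A standalone sketch of the shutdown sequencing after this change, assuming the intent is that the quit reply has time to go out before the server stops: the handler just signals the stop channel, and the receiving goroutine sleeps one second before stopping the server. The stopServer and handleQuit names below are illustrative stand-ins, not the worker's real API.

// Delayed-stop pattern: signal, then let the receiver wait before shutting down.
package main

import (
	"fmt"
	"time"
)

func stopServer() { fmt.Println("server stopped") }

// handleQuit plays the role of QuitWorker: signal the stop channel and return a
// reply. The send no longer needs to be deferred because the receiver waits a
// second before stopping, leaving time for the reply to be delivered.
func handleQuit(stop chan<- bool) string {
	stop <- true
	return "quit acknowledged"
}

func main() {
	stop := make(chan bool, 1) // buffered so the send never blocks in this sketch

	go func() {
		<-stop
		time.Sleep(time.Second) // grace period before stopping the server
		stopServer()
	}()

	fmt.Println(handleQuit(stop))
	time.Sleep(2 * time.Second) // keep main alive long enough to observe the stop
}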
|