2 round modification
This commit is contained in:
@ -37,15 +37,14 @@ Package benchmark implements the building blocks to setup end-to-end gRPC benchm
|
|||||||
package benchmark
|
package benchmark
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"golang.org/x/net/context"
|
|
||||||
"io"
|
"io"
|
||||||
"math"
|
"math"
|
||||||
"net"
|
"net"
|
||||||
|
|
||||||
|
"golang.org/x/net/context"
|
||||||
"google.golang.org/grpc"
|
"google.golang.org/grpc"
|
||||||
testpb "google.golang.org/grpc/benchmark/grpc_testing"
|
testpb "google.golang.org/grpc/benchmark/grpc_testing"
|
||||||
"google.golang.org/grpc/grpclog"
|
"google.golang.org/grpc/grpclog"
|
||||||
|
|
||||||
)
|
)
|
||||||
|
|
||||||
func newPayload(t testpb.PayloadType, size int) *testpb.Payload {
|
func newPayload(t testpb.PayloadType, size int) *testpb.Payload {
|
||||||
@ -121,8 +120,8 @@ func DoUnaryCall(tc testpb.TestServiceClient, reqSize, respSize int) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// DoStreamingcall performs a streaming RPC with given stub and request and response size.client side
|
// DoStreamingRoundTrip performs a round trip for a single streaming rpc.
|
||||||
func DoStreamingCall(stream testpb.TestService_StreamingCallClient, tc testpb.TestServiceClient, reqSize, respSize int) {
|
func DoStreamingRoundTrip(tc testpb.TestServiceClient, stream testpb.TestService_StreamingCallClient, reqSize, respSize int) {
|
||||||
pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
|
pl := newPayload(testpb.PayloadType_COMPRESSABLE, reqSize)
|
||||||
req := &testpb.SimpleRequest{
|
req := &testpb.SimpleRequest{
|
||||||
ResponseType: pl.Type,
|
ResponseType: pl.Type,
|
||||||
@ -130,10 +129,10 @@ func DoStreamingCall(stream testpb.TestService_StreamingCallClient, tc testpb.Te
|
|||||||
Payload: pl,
|
Payload: pl,
|
||||||
}
|
}
|
||||||
if err := stream.Send(req); err != nil {
|
if err := stream.Send(req); err != nil {
|
||||||
grpclog.Fatalf("%v.StreamingCall()= %v ", tc, err)
|
grpclog.Fatalf("%v.StreamingCall(_)=_, %v: ", tc, err)
|
||||||
}
|
}
|
||||||
if _, err := stream.Recv(); err != nil {
|
if _, err := stream.Recv(); err != nil {
|
||||||
grpclog.Fatal("%v.StreamingCall()= %v", tc, err)
|
grpclog.Fatal("%v.StreamingCall(_)=_, %v: ", tc, err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -64,11 +64,11 @@ func runStream(b *testing.B, maxConcurrentCalls int) {
|
|||||||
tc := testpb.NewTestServiceClient(conn)
|
tc := testpb.NewTestServiceClient(conn)
|
||||||
stream, err := tc.StreamingCall(context.Background())
|
stream, err := tc.StreamingCall(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
grpclog.Fatalf("%v.StreamingCall()=%v", tc, err)
|
grpclog.Fatalf("%v.StreamingCall(_)=_,%v: ", tc, err)
|
||||||
}
|
}
|
||||||
// Warm up connection.
|
// Warm up connection.
|
||||||
for i := 0; i < 10; i++ {
|
for i := 0; i < 10; i++ {
|
||||||
streamCaller(stream, tc)
|
streamCaller(tc, stream)
|
||||||
}
|
}
|
||||||
|
|
||||||
ch := make(chan int, maxConcurrentCalls*4)
|
ch := make(chan int, maxConcurrentCalls*4)
|
||||||
@ -83,7 +83,7 @@ func runStream(b *testing.B, maxConcurrentCalls int) {
|
|||||||
go func() {
|
go func() {
|
||||||
for _ = range ch {
|
for _ = range ch {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
streamCaller(stream, tc)
|
streamCaller(tc, stream)
|
||||||
elapse := time.Since(start)
|
elapse := time.Since(start)
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
s.Add(elapse)
|
s.Add(elapse)
|
||||||
@ -105,8 +105,8 @@ func unaryCaller(client testpb.TestServiceClient) {
|
|||||||
DoUnaryCall(client, 1, 1)
|
DoUnaryCall(client, 1, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func streamCaller(stream testpb.TestService_StreamingCallClient, client testpb.TestServiceClient) {
|
func streamCaller(client testpb.TestServiceClient, stream testpb.TestService_StreamingCallClient) {
|
||||||
DoStreamingCall(stream, client, 1, 1)
|
DoStreamingRoundTrip(client, stream, 1, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func BenchmarkClientStreamc1(b *testing.B) {
|
func BenchmarkClientStreamc1(b *testing.B) {
|
||||||
|
@ -28,8 +28,8 @@ func unaryCaller(client testpb.TestServiceClient) {
|
|||||||
benchmark.DoUnaryCall(client, 1, 1)
|
benchmark.DoUnaryCall(client, 1, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func streamCaller(stream testpb.TestService_StreamingCallClient, client testpb.TestServiceClient) {
|
func streamCaller(client testpb.TestServiceClient, stream testpb.TestService_StreamingCallClient) {
|
||||||
benchmark.DoStreamingCall(stream, client, 1, 1)
|
benchmark.DoStreamingRoundTrip(client, stream, 1, 1)
|
||||||
}
|
}
|
||||||
|
|
||||||
func buildConnection() (s *stats.Stats, conn *grpc.ClientConn, tc testpb.TestServiceClient) {
|
func buildConnection() (s *stats.Stats, conn *grpc.ClientConn, tc testpb.TestServiceClient) {
|
||||||
@ -91,10 +91,10 @@ func closeLoopStream() {
|
|||||||
s, conn, tc := buildConnection()
|
s, conn, tc := buildConnection()
|
||||||
stream, err := tc.StreamingCall(context.Background())
|
stream, err := tc.StreamingCall(context.Background())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
grpclog.Fatalf("%v.StreamingCall()=%v", tc, err)
|
grpclog.Fatalf("%v.StreamingCall(_)=_,%v: ", tc, err)
|
||||||
}
|
}
|
||||||
for i := 0; i < 100; i++ {
|
for i := 0; i < 100; i++ {
|
||||||
streamCaller(stream, tc)
|
streamCaller(tc, stream)
|
||||||
}
|
}
|
||||||
ch := make(chan int, *maxConcurrentRPCs*4)
|
ch := make(chan int, *maxConcurrentRPCs*4)
|
||||||
var (
|
var (
|
||||||
@ -107,7 +107,7 @@ func closeLoopStream() {
|
|||||||
go func() {
|
go func() {
|
||||||
for _ = range ch {
|
for _ = range ch {
|
||||||
start := time.Now()
|
start := time.Now()
|
||||||
streamCaller(stream, tc)
|
streamCaller(tc, stream)
|
||||||
elapse := time.Since(start)
|
elapse := time.Since(start)
|
||||||
mu.Lock()
|
mu.Lock()
|
||||||
s.Add(elapse)
|
s.Add(elapse)
|
||||||
|
Reference in New Issue
Block a user