Revert "add tracing for streaming rpc"

This reverts commit 8071e421ab7170765af71d381294185bd9d303a9.
This commit is contained in:
yangzhouhan
2015-06-17 19:21:04 -07:00
parent 8071e421ab
commit 4782e693c2
4 changed files with 21 additions and 224 deletions

View File

@ -1,14 +1,11 @@
package main package main
import ( import (
// "fmt"
"flag" "flag"
"math" "math"
"math/rand"
"net" "net"
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
"runtime"
"sync" "sync"
"time" "time"
@ -24,14 +21,11 @@ var (
server = flag.String("server", "", "The server address") server = flag.String("server", "", "The server address")
maxConcurrentRPCs = flag.Int("max_concurrent_rpcs", 1, "The max number of concurrent RPCs") maxConcurrentRPCs = flag.Int("max_concurrent_rpcs", 1, "The max number of concurrent RPCs")
duration = flag.Int("duration", math.MaxInt32, "The duration in seconds to run the benchmark client") duration = flag.Int("duration", math.MaxInt32, "The duration in seconds to run the benchmark client")
trace = flag.Bool("trace", true, "Whether tracing is on")
rpcType = flag.Int("rpc_type", 0, rpcType = flag.Int("rpc_type", 0,
`Configure different client rpc type. Valid options are: `Configure different client rpc type. Valid options are:
0 : unary call close loop; 0 : unary call;
1 : streaming call close loop; 1 : streaming call.`)
2 : unary call open loop`)
targetQps = flag.Int("target_qps", 1000, "The target number of rpcs per second")
workerNum = flag.Int("worker_number", 1, "The number of workers")
maxProcs = flag.Int("max_procs", 2, "The number of operating system threads that can execute user-level Go code simultaneously")
) )
func unaryCaller(client testpb.TestServiceClient) { func unaryCaller(client testpb.TestServiceClient) {
@ -49,10 +43,10 @@ func buildConnection() (s *stats.Stats, conn *grpc.ClientConn, tc testpb.TestSer
return s, conn, tc return s, conn, tc
} }
// Close loop test for unary call.
func closeLoopUnary() { func closeLoopUnary() {
s, conn, tc := buildConnection() s, conn, tc := buildConnection()
for i := 0; i < 5000; i++ {
for i := 0; i < 100; i++ {
unaryCaller(tc) unaryCaller(tc)
} }
ch := make(chan int, *maxConcurrentRPCs*4) ch := make(chan int, *maxConcurrentRPCs*4)
@ -96,7 +90,6 @@ func closeLoopUnary() {
} }
// Close loop test for streaming call.
func closeLoopStream() { func closeLoopStream() {
s, conn, tc := buildConnection() s, conn, tc := buildConnection()
stream, err := tc.StreamingCall(context.Background()) stream, err := tc.StreamingCall(context.Background())
@ -146,169 +139,9 @@ func closeLoopStream() {
grpclog.Println(s.String()) grpclog.Println(s.String())
} }
// Open loop test for unary call.
func openLoop() {
s, conn, tc := buildConnection()
// Warm up connection.
for i := 0; i < 10000; i++ {
unaryCaller(tc)
}
var (
mu sync.Mutex
wg sync.WaitGroup
// delay time.Duration
count int
)
wg.Add(*workerNum)
ch := make(chan int, 5**workerNum)
for j := 0; j < *workerNum; j++ {
go func() {
for _ = range ch {
start := time.Now()
unaryCaller(tc)
elapse := time.Since(start)
mu.Lock()
count++
s.Add(elapse)
mu.Unlock()
}
wg.Done()
}()
}
startTime := time.Now()
done := make(chan struct{})
go func() {
<-time.After(time.Duration(*duration) * time.Second)
close(done)
}()
ok := true
//delay = nextRpcDelay(*targetQps)
tick := time.Tick(time.Duration(1e9 / *targetQps))
for ok {
select {
case <-tick:
ch <- 0
//delay = nextRpcDelay(*targetQps)
//tick = time.Tick(delay)
//grpclog.Println("debug: the next delay interval is: ", delay)
case <-done:
ok = false
}
}
d := time.Since(startTime)
actualQps := float64(count) / float64(d/time.Second)
grpclog.Println("actual qps = ", actualQps)
close(ch)
wg.Wait()
conn.Close()
grpclog.Println(s.String())
}
func openLoopV3() {
// rpcNum := *targetQps * *duration
s, conn, tc := buildConnection()
// Warm up connection.
var (
mu sync.Mutex
wg sync.WaitGroup
)
//wg.Add(rpcNum)
for i := 0; i < 5000; i++ {
unaryCaller(tc)
}
startTime := time.Now()
i := 0
now := time.Now()
nextRpc := now
end := now.Add(time.Duration(*duration) * time.Second)
for now.Before(time.Time(end)) {
now = time.Now()
if now.After(time.Time(nextRpc)) {
//runtime.LockOSThread()
wg.Add(1)
nextRpc = nextRpc.Add(nextRpcDelay(*targetQps))
go func() {
start := time.Now()
unaryCaller(tc)
elapse := time.Since(start)
mu.Lock()
s.Add(elapse)
mu.Unlock()
wg.Done()
}()
i++
}
runtime.Gosched()
}
d := time.Since(startTime)
actualQps := float64(i) / float64(d/time.Second)
grpclog.Println("actual qps = ", actualQps)
wg.Wait()
conn.Close()
grpclog.Println(s.String())
}
func openLoopV4() {
rpcNum := *targetQps * *duration
s, conn, tc := buildConnection()
// Warm up connection.
var (
mu sync.Mutex
wg sync.WaitGroup
delay time.Duration
)
wg.Add(rpcNum)
for i := 0; i < 5000; i++ {
unaryCaller(tc)
}
startTime := time.Now()
i := 0
// now := time.Now()
//nextRpc := now
timer := time.NewTimer(delay)
for i < rpcNum {
//now = time.Now()
select {
//case i == rpcNum :
// break loop
case <-timer.C:
//now.After(time.Time(nextRpc)):
//runtime.LockOSThread()
// nextRpc = nextRpc.Add(nextRpcDelay(*targetQps))
delay = nextRpcDelay(*targetQps)
timer.Reset(delay)
go func() {
start := time.Now()
unaryCaller(tc)
elapse := time.Since(start)
mu.Lock()
s.Add(elapse)
mu.Unlock()
wg.Done()
}()
i++
}
// runtime.Gosched()
}
d := time.Since(startTime)
actualQps := float64(rpcNum) / float64(d/time.Second)
grpclog.Println("actual qps = ", actualQps)
wg.Wait()
conn.Close()
grpclog.Println(s.String())
}
//workerNum := 3
// Generate the next rpc interval.
func nextRpcDelay(targetQps int) time.Duration {
return time.Duration(rand.ExpFloat64() * float64(time.Second) / float64(targetQps))
}
func main() { func main() {
flag.Parse() flag.Parse()
runtime.GOMAXPROCS(*maxProcs) grpc.EnableTracing = *trace
go func() { go func() {
lis, err := net.Listen("tcp", ":0") lis, err := net.Listen("tcp", ":0")
if err != nil { if err != nil {
@ -324,11 +157,5 @@ func main() {
closeLoopUnary() closeLoopUnary()
case 1: case 1:
closeLoopStream() closeLoopStream()
case 2:
openLoop()
case 3:
openLoopV3()
case 4:
openLoopV4()
} }
} }

View File

@ -48,8 +48,6 @@ import (
// On error, it returns the error and indicates whether the call should be retried. // On error, it returns the error and indicates whether the call should be retried.
// //
// TODO(zhaoq): Check whether the received message sequence is valid. // TODO(zhaoq): Check whether the received message sequence is valid.
//var EnableTracing = true
func recvResponse(codec Codec, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) error { func recvResponse(codec Codec, t transport.ClientTransport, c *callInfo, stream *transport.Stream, reply interface{}) error {
// Try to acquire header metadata from the server if there is any. // Try to acquire header metadata from the server if there is any.
var err error var err error
@ -104,6 +102,10 @@ type callInfo struct {
traceInfo traceInfo // in trace.go traceInfo traceInfo // in trace.go
} }
// EnableTracing controls whether to trace RPCs using the golang.org/x/net/trace package.
// This should only be set before any RPCs are sent or received by this program.
var EnableTracing = true
// Invoke is called by the generated code. It sends the RPC request on the // Invoke is called by the generated code. It sends the RPC request on the
// wire and returns after response is received. // wire and returns after response is received.
func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (err error) { func Invoke(ctx context.Context, method string, args, reply interface{}, cc *ClientConn, opts ...CallOption) (err error) {
@ -135,6 +137,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
} }
}() }()
} }
callHdr := &transport.CallHdr{ callHdr := &transport.CallHdr{
Host: cc.authority, Host: cc.authority,
Method: method, Method: method,

View File

@ -36,10 +36,8 @@ package grpc
import ( import (
"errors" "errors"
"io" "io"
"time"
"golang.org/x/net/context" "golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
"google.golang.org/grpc/transport" "google.golang.org/grpc/transport"
@ -96,7 +94,6 @@ type ClientStream interface {
// by generated code. // by generated code.
func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) { func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, opts ...CallOption) (ClientStream, error) {
// TODO(zhaoq): CallOption is omitted. Add support when it is needed. // TODO(zhaoq): CallOption is omitted. Add support when it is needed.
var trInfo traceInfo
callHdr := &transport.CallHdr{ callHdr := &transport.CallHdr{
Host: cc.authority, Host: cc.authority,
Method: method, Method: method,
@ -105,14 +102,6 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
if err != nil { if err != nil {
return nil, toRPCErr(err) return nil, toRPCErr(err)
} }
if EnableTracing {
trInfo.tr = trace.New("Sent."+methodFamily(desc.StreamName), desc.StreamName)
trInfo.firstLine.client = true
if deadline, ok := ctx.Deadline(); ok {
trInfo.firstLine.deadline = deadline.Sub(time.Now())
}
trInfo.tr.LazyLog(&trInfo.firstLine, false)
}
s, err := t.NewStream(ctx, callHdr) s, err := t.NewStream(ctx, callHdr)
if err != nil { if err != nil {
return nil, toRPCErr(err) return nil, toRPCErr(err)
@ -123,7 +112,6 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
p: &parser{s: s}, p: &parser{s: s},
desc: desc, desc: desc,
codec: cc.dopts.codec, codec: cc.dopts.codec,
traceInfo: trInfo,
}, nil }, nil
} }
@ -134,7 +122,6 @@ type clientStream struct {
p *parser p *parser
desc *StreamDesc desc *StreamDesc
codec Codec codec Codec
traceInfo traceInfo
} }
func (cs *clientStream) Context() context.Context { func (cs *clientStream) Context() context.Context {
@ -165,7 +152,6 @@ func (cs *clientStream) SendMsg(m interface{}) (err error) {
} }
err = toRPCErr(err) err = toRPCErr(err)
}() }()
out, err := encode(cs.codec, m, compressionNone) out, err := encode(cs.codec, m, compressionNone)
if err != nil { if err != nil {
return transport.StreamErrorf(codes.Internal, "grpc: %v", err) return transport.StreamErrorf(codes.Internal, "grpc: %v", err)
@ -186,38 +172,23 @@ func (cs *clientStream) RecvMsg(m interface{}) (err error) {
return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>")) return toRPCErr(errors.New("grpc: client streaming protocol violation: get <nil>, want <EOF>"))
} }
if err == io.EOF { if err == io.EOF {
if EnableTracing {
defer cs.traceInfo.tr.Finish()
}
if cs.s.StatusCode() == codes.OK { if cs.s.StatusCode() == codes.OK {
return nil return nil
} }
return Errorf(cs.s.StatusCode(), cs.s.StatusDesc()) return Errorf(cs.s.StatusCode(), cs.s.StatusDesc())
} }
if EnableTracing {
cs.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
cs.traceInfo.tr.SetError()
}
return toRPCErr(err) return toRPCErr(err)
} }
if _, ok := err.(transport.ConnectionError); !ok { if _, ok := err.(transport.ConnectionError); !ok {
cs.t.CloseStream(cs.s, err) cs.t.CloseStream(cs.s, err)
} }
if err == io.EOF { if err == io.EOF {
if EnableTracing {
cs.traceInfo.tr.Finish()
}
if cs.s.StatusCode() == codes.OK { if cs.s.StatusCode() == codes.OK {
// Returns io.EOF to indicate the end of the stream. // Returns io.EOF to indicate the end of the stream.
return return
} }
return Errorf(cs.s.StatusCode(), cs.s.StatusDesc()) return Errorf(cs.s.StatusCode(), cs.s.StatusDesc())
} }
if EnableTracing {
cs.traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
cs.traceInfo.tr.SetError()
}
return toRPCErr(err) return toRPCErr(err)
} }

View File

@ -44,10 +44,6 @@ import (
"golang.org/x/net/trace" "golang.org/x/net/trace"
) )
// EnableTracing controls whether to trace RPCs using the golang.org/x/net/trace package.
// This should only be set before any RPCs are sent or received by this program.
var EnableTracing = true
// methodFamily returns the trace family for the given method. // methodFamily returns the trace family for the given method.
// It turns "/pkg.Service/GetFoo" into "pkg.Service". // It turns "/pkg.Service/GetFoo" into "pkg.Service".
func methodFamily(m string) string { func methodFamily(m string) string {