Merge pull request #382 from iamqizhao/master
Refactor server side tracing
This commit is contained in:
2
call.go
2
call.go
@ -117,7 +117,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
|
||||
}
|
||||
}()
|
||||
if EnableTracing {
|
||||
c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
|
||||
c.traceInfo.tr = trace.New("grpc.Sent."+transport.MethodFamily(method), method)
|
||||
defer c.traceInfo.tr.Finish()
|
||||
c.traceInfo.firstLine.client = true
|
||||
if deadline, ok := ctx.Deadline(); ok {
|
||||
|
19
server.go
19
server.go
@ -247,7 +247,7 @@ func (s *Server) Serve(lis net.Listener) error {
|
||||
c.Close()
|
||||
return nil
|
||||
}
|
||||
st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo)
|
||||
st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo, EnableTracing)
|
||||
if err != nil {
|
||||
s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
|
||||
s.mu.Unlock()
|
||||
@ -285,19 +285,16 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
|
||||
}
|
||||
|
||||
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc) (err error) {
|
||||
ctx, cancel := context.WithCancel(stream.Context())
|
||||
defer cancel()
|
||||
var traceInfo traceInfo
|
||||
if EnableTracing {
|
||||
traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
|
||||
traceInfo.tr = stream.Trace()
|
||||
defer traceInfo.tr.Finish()
|
||||
traceInfo.firstLine.client = false
|
||||
traceInfo.firstLine.remoteAddr = t.RemoteAddr()
|
||||
if dl, ok := ctx.Deadline(); ok {
|
||||
if dl, ok := stream.Context().Deadline(); ok {
|
||||
traceInfo.firstLine.deadline = dl.Sub(time.Now())
|
||||
}
|
||||
traceInfo.tr.LazyLog(&traceInfo.firstLine, false)
|
||||
ctx = trace.NewContext(ctx, traceInfo.tr)
|
||||
defer func() {
|
||||
if err != nil && err != io.EOF {
|
||||
traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
|
||||
@ -338,7 +335,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
}
|
||||
return nil
|
||||
}
|
||||
reply, appErr := md.Handler(srv.server, ctx, df)
|
||||
reply, appErr := md.Handler(srv.server, stream.Context(), df)
|
||||
if appErr != nil {
|
||||
if err, ok := appErr.(rpcError); ok {
|
||||
statusCode = err.code
|
||||
@ -389,25 +386,21 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
||||
}
|
||||
|
||||
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc) (err error) {
|
||||
ctx, cancel := context.WithCancel(stream.Context())
|
||||
defer cancel()
|
||||
ss := &serverStream{
|
||||
t: t,
|
||||
s: stream,
|
||||
ctx: ctx,
|
||||
p: &parser{s: stream},
|
||||
codec: s.opts.codec,
|
||||
tracing: EnableTracing,
|
||||
}
|
||||
if ss.tracing {
|
||||
ss.traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
|
||||
ss.traceInfo.tr = stream.Trace()
|
||||
ss.traceInfo.firstLine.client = false
|
||||
ss.traceInfo.firstLine.remoteAddr = t.RemoteAddr()
|
||||
if dl, ok := ctx.Deadline(); ok {
|
||||
if dl, ok := stream.Context().Deadline(); ok {
|
||||
ss.traceInfo.firstLine.deadline = dl.Sub(time.Now())
|
||||
}
|
||||
ss.traceInfo.tr.LazyLog(&ss.traceInfo.firstLine, false)
|
||||
ss.ctx = trace.NewContext(ss.ctx, ss.traceInfo.tr)
|
||||
defer func() {
|
||||
ss.mu.Lock()
|
||||
if err != nil && err != io.EOF {
|
||||
|
@ -126,7 +126,7 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
||||
tracing: EnableTracing,
|
||||
}
|
||||
if cs.tracing {
|
||||
cs.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method)
|
||||
cs.traceInfo.tr = trace.New("grpc.Sent."+transport.MethodFamily(method), method)
|
||||
cs.traceInfo.firstLine.client = true
|
||||
if deadline, ok := ctx.Deadline(); ok {
|
||||
cs.traceInfo.firstLine.deadline = deadline.Sub(time.Now())
|
||||
@ -294,7 +294,6 @@ type ServerStream interface {
|
||||
type serverStream struct {
|
||||
t transport.ServerTransport
|
||||
s *transport.Stream
|
||||
ctx context.Context // provides trace.FromContext when tracing
|
||||
p *parser
|
||||
codec Codec
|
||||
statusCode codes.Code
|
||||
@ -309,7 +308,7 @@ type serverStream struct {
|
||||
}
|
||||
|
||||
func (ss *serverStream) Context() context.Context {
|
||||
return ss.ctx
|
||||
return ss.s.Context()
|
||||
}
|
||||
|
||||
func (ss *serverStream) SendHeader(md metadata.MD) error {
|
||||
|
14
trace.go
14
trace.go
@ -38,7 +38,6 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/trace"
|
||||
@ -48,19 +47,6 @@ import (
|
||||
// This should only be set before any RPCs are sent or received by this program.
|
||||
var EnableTracing = true
|
||||
|
||||
// methodFamily returns the trace family for the given method.
|
||||
// It turns "/pkg.Service/GetFoo" into "pkg.Service".
|
||||
func methodFamily(m string) string {
|
||||
m = strings.TrimPrefix(m, "/") // remove leading slash
|
||||
if i := strings.Index(m, "/"); i >= 0 {
|
||||
m = m[:i] // remove everything from second slash
|
||||
}
|
||||
if i := strings.LastIndex(m, "."); i >= 0 {
|
||||
m = m[i+1:] // cut down to last dotted component
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// traceInfo contains tracing information for an RPC.
|
||||
type traceInfo struct {
|
||||
tr trace.Trace
|
||||
|
@ -45,6 +45,7 @@ import (
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/net/http2"
|
||||
"golang.org/x/net/http2/hpack"
|
||||
"golang.org/x/net/trace"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
@ -80,6 +81,8 @@ type http2Server struct {
|
||||
fc *inFlow
|
||||
// sendQuotaPool provides flow control to outbound message.
|
||||
sendQuotaPool *quotaPool
|
||||
// tracing indicates whether tracing is on for this http2Server transport.
|
||||
tracing bool
|
||||
|
||||
mu sync.Mutex // guard the following
|
||||
state transportState
|
||||
@ -90,7 +93,7 @@ type http2Server struct {
|
||||
|
||||
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
|
||||
// returned if something goes wrong.
|
||||
func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (_ ServerTransport, err error) {
|
||||
func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo, tracing bool) (_ ServerTransport, err error) {
|
||||
framer := newFramer(conn)
|
||||
// Send initial settings as connection preface to client.
|
||||
var settings []http2.Setting
|
||||
@ -124,6 +127,7 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
|
||||
controlBuf: newRecvBuffer(),
|
||||
fc: &inFlow{limit: initialConnWindowSize},
|
||||
sendQuotaPool: newQuotaPool(defaultWindowSize),
|
||||
tracing: tracing,
|
||||
state: reachable,
|
||||
writableChan: make(chan int, 1),
|
||||
shutdownChan: make(chan struct{}),
|
||||
@ -202,7 +206,10 @@ func (t *http2Server) operateHeaders(hDec *hpackDecoder, s *Stream, frame header
|
||||
recv: s.buf,
|
||||
}
|
||||
s.method = hDec.state.method
|
||||
|
||||
if t.tracing {
|
||||
s.tr = trace.New("grpc.Recv."+MethodFamily(s.method), s.method)
|
||||
s.ctx = trace.NewContext(s.ctx, s.tr)
|
||||
}
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
handle(s)
|
||||
|
@ -43,15 +43,30 @@ import (
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/net/trace"
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/metadata"
|
||||
)
|
||||
|
||||
// MethodFamily returns the trace family for the given method.
|
||||
// It turns "/pkg.Service/GetFoo" into "pkg.Service".
|
||||
func MethodFamily(m string) string {
|
||||
m = strings.TrimPrefix(m, "/") // remove leading slash
|
||||
if i := strings.Index(m, "/"); i >= 0 {
|
||||
m = m[:i] // remove everything from second slash
|
||||
}
|
||||
if i := strings.LastIndex(m, "."); i >= 0 {
|
||||
m = m[i+1:] // cut down to last dotted component
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
// recvMsg represents the received msg from the transport. All transport
|
||||
// protocol specific info has been removed.
|
||||
type recvMsg struct {
|
||||
@ -198,6 +213,8 @@ type Stream struct {
|
||||
// the status received from the server.
|
||||
statusCode codes.Code
|
||||
statusDesc string
|
||||
// tracing information
|
||||
tr trace.Trace
|
||||
}
|
||||
|
||||
// Header acquires the key-value pairs of header metadata once it
|
||||
@ -232,6 +249,11 @@ func (s *Stream) Context() context.Context {
|
||||
return s.ctx
|
||||
}
|
||||
|
||||
// Trace returns the trace.Trace of the stream.
|
||||
func (s *Stream) Trace() trace.Trace {
|
||||
return s.tr
|
||||
}
|
||||
|
||||
// Method returns the method for the stream.
|
||||
func (s *Stream) Method() string {
|
||||
return s.method
|
||||
@ -308,8 +330,8 @@ const (
|
||||
|
||||
// NewServerTransport creates a ServerTransport with conn or non-nil error
|
||||
// if it fails.
|
||||
func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (ServerTransport, error) {
|
||||
return newHTTP2Server(conn, maxStreams, authInfo)
|
||||
func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo, tracing bool) (ServerTransport, error) {
|
||||
return newHTTP2Server(conn, maxStreams, authInfo, tracing)
|
||||
}
|
||||
|
||||
// ConnectOptions covers all relevant options for dialing a server.
|
||||
|
@ -150,7 +150,7 @@ func (s *server) start(t *testing.T, port int, maxStreams uint32, ht hType) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
transport, err := NewServerTransport("http2", conn, maxStreams, nil)
|
||||
transport, err := NewServerTransport("http2", conn, maxStreams, nil, false)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
|
Reference in New Issue
Block a user