trace refactoring

This commit is contained in:
iamqizhao
2015-10-01 17:24:39 -07:00
parent f3de7c238c
commit d84ff12005
6 changed files with 154 additions and 18 deletions

View File

@ -52,7 +52,7 @@ import (
"google.golang.org/grpc/transport" "google.golang.org/grpc/transport"
) )
type methodHandler func(srv interface{}, ctx context.Context, codec Codec, buf []byte) (interface{}, error) type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error)
// MethodDesc represents an RPC service's method specification. // MethodDesc represents an RPC service's method specification.
type MethodDesc struct { type MethodDesc struct {
@ -246,7 +246,7 @@ func (s *Server) Serve(lis net.Listener) error {
c.Close() c.Close()
return nil return nil
} }
st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo) st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo, EnableTracing)
if err != nil { if err != nil {
s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err) s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err)
s.mu.Unlock() s.mu.Unlock()
@ -284,15 +284,12 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str
} }
func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc) (err error) { func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc) (err error) {
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()
var traceInfo traceInfo var traceInfo traceInfo
if EnableTracing { if EnableTracing {
traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()) traceInfo.tr = stream.Trace()
defer traceInfo.tr.Finish() defer traceInfo.tr.Finish()
traceInfo.firstLine.client = false traceInfo.firstLine.client = false
traceInfo.tr.LazyLog(&traceInfo.firstLine, false) traceInfo.tr.LazyLog(&traceInfo.firstLine, false)
ctx = trace.NewContext(ctx, traceInfo.tr)
defer func() { defer func() {
if err != nil && err != io.EOF { if err != nil && err != io.EOF {
traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true)
@ -329,7 +326,16 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
case compressionNone: case compressionNone:
statusCode := codes.OK statusCode := codes.OK
statusDesc := "" statusDesc := ""
reply, appErr := md.Handler(srv.server, ctx, s.opts.codec, req) df := func(v interface{}) error {
if err := s.opts.codec.Unmarshal(req, v); err != nil {
return err
}
if traceInfo.tr != nil {
traceInfo.tr.LazyLog(&payload{sent: false, msg: v}, true)
}
return nil
}
reply, appErr := md.Handler(srv.server, ctx, df)
if appErr != nil { if appErr != nil {
if err, ok := appErr.(rpcError); ok { if err, ok := appErr.(rpcError); ok {
statusCode = err.code statusCode = err.code
@ -380,18 +386,17 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
} }
func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc) (err error) { func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc) (err error) {
ctx, cancel := context.WithCancel(stream.Context())
defer cancel()
ss := &serverStream{ ss := &serverStream{
t: t, t: t,
s: stream, s: stream,
ctx: ctx, //ctx: ctx,
p: &parser{s: stream}, p: &parser{s: stream},
codec: s.opts.codec, codec: s.opts.codec,
tracing: EnableTracing, tracing: EnableTracing,
} }
if ss.tracing { if ss.tracing {
ss.traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()) //ss.traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method())
ss.traceInfo.tr = stream.Trace()
ss.traceInfo.firstLine.client = false ss.traceInfo.firstLine.client = false
ss.traceInfo.tr.LazyLog(&ss.traceInfo.firstLine, false) ss.traceInfo.tr.LazyLog(&ss.traceInfo.firstLine, false)
ss.ctx = trace.NewContext(ss.ctx, ss.traceInfo.tr) ss.ctx = trace.NewContext(ss.ctx, ss.traceInfo.tr)

View File

@ -294,7 +294,7 @@ type ServerStream interface {
type serverStream struct { type serverStream struct {
t transport.ServerTransport t transport.ServerTransport
s *transport.Stream s *transport.Stream
ctx context.Context // provides trace.FromContext when tracing //ctx context.Context // provides trace.FromContext when tracing
p *parser p *parser
codec Codec codec Codec
statusCode codes.Code statusCode codes.Code
@ -309,7 +309,7 @@ type serverStream struct {
} }
func (ss *serverStream) Context() context.Context { func (ss *serverStream) Context() context.Context {
return ss.ctx return ss.s.Context()
} }
func (ss *serverStream) SendHeader(md metadata.MD) error { func (ss *serverStream) SendHeader(md metadata.MD) error {

View File

@ -44,6 +44,7 @@ import (
"golang.org/x/net/context" "golang.org/x/net/context"
"golang.org/x/net/http2" "golang.org/x/net/http2"
"golang.org/x/net/trace"
"golang.org/x/net/http2/hpack" "golang.org/x/net/http2/hpack"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
@ -80,6 +81,8 @@ type http2Server struct {
fc *inFlow fc *inFlow
// sendQuotaPool provides flow control to outbound message. // sendQuotaPool provides flow control to outbound message.
sendQuotaPool *quotaPool sendQuotaPool *quotaPool
// tracing indicates whether tracing is on for this http2Server transport.
tracing bool
mu sync.Mutex // guard the following mu sync.Mutex // guard the following
state transportState state transportState
@ -90,7 +93,7 @@ type http2Server struct {
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
// returned if something goes wrong. // returned if something goes wrong.
func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (_ ServerTransport, err error) { func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo, tracing bool) (_ ServerTransport, err error) {
framer := newFramer(conn) framer := newFramer(conn)
// Send initial settings as connection preface to client. // Send initial settings as connection preface to client.
var settings []http2.Setting var settings []http2.Setting
@ -124,6 +127,7 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI
controlBuf: newRecvBuffer(), controlBuf: newRecvBuffer(),
fc: &inFlow{limit: initialConnWindowSize}, fc: &inFlow{limit: initialConnWindowSize},
sendQuotaPool: newQuotaPool(defaultWindowSize), sendQuotaPool: newQuotaPool(defaultWindowSize),
tracing: tracing,
state: reachable, state: reachable,
writableChan: make(chan int, 1), writableChan: make(chan int, 1),
shutdownChan: make(chan struct{}), shutdownChan: make(chan struct{}),
@ -202,7 +206,10 @@ func (t *http2Server) operateHeaders(hDec *hpackDecoder, s *Stream, frame header
recv: s.buf, recv: s.buf,
} }
s.method = hDec.state.method s.method = hDec.state.method
if t.tracing {
s.tr = trace.New("grpc.Recv."+methodFamily(s.method), s.method)
s.ctx = trace.NewContext(s.ctx, s.tr)
}
wg.Add(1) wg.Add(1)
go func() { go func() {
handle(s) handle(s)

116
transport/trace.go Normal file
View File

@ -0,0 +1,116 @@
/*
*
* Copyright 2015, Google Inc.
* All rights reserved.
*
* Redistribution and use in source and binary forms, with or without
* modification, are permitted provided that the following conditions are
* met:
*
* * Redistributions of source code must retain the above copyright
* notice, this list of conditions and the following disclaimer.
* * Redistributions in binary form must reproduce the above
* copyright notice, this list of conditions and the following disclaimer
* in the documentation and/or other materials provided with the
* distribution.
* * Neither the name of Google Inc. nor the names of its
* contributors may be used to endorse or promote products derived from
* this software without specific prior written permission.
*
* THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
* "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
* LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
* A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
* OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
* SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
* LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
* DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
* THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
* (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
* OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
*
*/
package transport
import (
"bytes"
"fmt"
"io"
"net"
"strings"
"time"
"golang.org/x/net/trace"
)
// methodFamily returns the trace family for the given method.
// It turns "/pkg.Service/GetFoo" into "pkg.Service".
func methodFamily(m string) string {
m = strings.TrimPrefix(m, "/") // remove leading slash
if i := strings.Index(m, "/"); i >= 0 {
m = m[:i] // remove everything from second slash
}
if i := strings.LastIndex(m, "."); i >= 0 {
m = m[i+1:] // cut down to last dotted component
}
return m
}
// traceInfo contains tracing information for an RPC.
type traceInfo struct {
tr trace.Trace
firstLine firstLine
}
// firstLine is the first line of an RPC trace.
type firstLine struct {
client bool // whether this is a client (outgoing) RPC
remoteAddr net.Addr
deadline time.Duration // may be zero
}
func (f *firstLine) String() string {
var line bytes.Buffer
io.WriteString(&line, "RPC: ")
if f.client {
io.WriteString(&line, "to")
} else {
io.WriteString(&line, "from")
}
fmt.Fprintf(&line, " %v deadline:", f.remoteAddr)
if f.deadline != 0 {
fmt.Fprint(&line, f.deadline)
} else {
io.WriteString(&line, "none")
}
return line.String()
}
// payload represents an RPC request or response payload.
type payload struct {
sent bool // whether this is an outgoing payload
msg interface{} // e.g. a proto.Message
// TODO(dsymonds): add stringifying info to codec, and limit how much we hold here?
}
func (p payload) String() string {
if p.sent {
return fmt.Sprintf("sent: %v", p.msg)
} else {
return fmt.Sprintf("recv: %v", p.msg)
}
}
type fmtStringer struct {
format string
a []interface{}
}
func (f *fmtStringer) String() string {
return fmt.Sprintf(f.format, f.a...)
}
type stringer string
func (s stringer) String() string { return string(s) }

View File

@ -47,6 +47,7 @@ import (
"time" "time"
"golang.org/x/net/context" "golang.org/x/net/context"
"golang.org/x/net/trace"
"google.golang.org/grpc/codes" "google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials" "google.golang.org/grpc/credentials"
"google.golang.org/grpc/metadata" "google.golang.org/grpc/metadata"
@ -198,6 +199,8 @@ type Stream struct {
// the status received from the server. // the status received from the server.
statusCode codes.Code statusCode codes.Code
statusDesc string statusDesc string
// tracing information
tr trace.Trace
} }
// Header acquires the key-value pairs of header metadata once it // Header acquires the key-value pairs of header metadata once it
@ -232,6 +235,11 @@ func (s *Stream) Context() context.Context {
return s.ctx return s.ctx
} }
// Trace returns the trace.Trace of the stream.
func (s *Stream) Trace() trace.Trace {
return s.tr
}
// Method returns the method for the stream. // Method returns the method for the stream.
func (s *Stream) Method() string { func (s *Stream) Method() string {
return s.method return s.method
@ -308,8 +316,8 @@ const (
// NewServerTransport creates a ServerTransport with conn or non-nil error // NewServerTransport creates a ServerTransport with conn or non-nil error
// if it fails. // if it fails.
func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (ServerTransport, error) { func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo, tracing bool) (ServerTransport, error) {
return newHTTP2Server(conn, maxStreams, authInfo) return newHTTP2Server(conn, maxStreams, authInfo, tracing)
} }
// ConnectOptions covers all relevant options for dialing a server. // ConnectOptions covers all relevant options for dialing a server.

View File

@ -150,7 +150,7 @@ func (s *server) start(t *testing.T, port int, maxStreams uint32, ht hType) {
if err != nil { if err != nil {
return return
} }
transport, err := NewServerTransport("http2", conn, maxStreams, nil) transport, err := NewServerTransport("http2", conn, maxStreams, nil, false)
if err != nil { if err != nil {
return return
} }