From d84ff12005281b3411e2aafc893cc9cf28701a16 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Thu, 1 Oct 2015 17:24:39 -0700 Subject: [PATCH 1/6] trace refactoring --- server.go | 27 +++++---- stream.go | 4 +- transport/http2_server.go | 11 +++- transport/trace.go | 116 ++++++++++++++++++++++++++++++++++++ transport/transport.go | 12 +++- transport/transport_test.go | 2 +- 6 files changed, 154 insertions(+), 18 deletions(-) create mode 100644 transport/trace.go diff --git a/server.go b/server.go index ba68a215..b15b5cf4 100644 --- a/server.go +++ b/server.go @@ -52,7 +52,7 @@ import ( "google.golang.org/grpc/transport" ) -type methodHandler func(srv interface{}, ctx context.Context, codec Codec, buf []byte) (interface{}, error) +type methodHandler func(srv interface{}, ctx context.Context, dec func(interface{}) error) (interface{}, error) // MethodDesc represents an RPC service's method specification. type MethodDesc struct { @@ -246,7 +246,7 @@ func (s *Server) Serve(lis net.Listener) error { c.Close() return nil } - st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo) + st, err := transport.NewServerTransport("http2", c, s.opts.maxConcurrentStreams, authInfo, EnableTracing) if err != nil { s.errorf("NewServerTransport(%q) failed: %v", c.RemoteAddr(), err) s.mu.Unlock() @@ -284,15 +284,12 @@ func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Str } func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, md *MethodDesc) (err error) { - ctx, cancel := context.WithCancel(stream.Context()) - defer cancel() var traceInfo traceInfo if EnableTracing { - traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()) + traceInfo.tr = stream.Trace() defer traceInfo.tr.Finish() traceInfo.firstLine.client = false traceInfo.tr.LazyLog(&traceInfo.firstLine, false) - ctx = trace.NewContext(ctx, traceInfo.tr) defer func() { if err != nil && err != io.EOF { traceInfo.tr.LazyLog(&fmtStringer{"%v", []interface{}{err}}, true) @@ -329,7 +326,16 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. case compressionNone: statusCode := codes.OK statusDesc := "" - reply, appErr := md.Handler(srv.server, ctx, s.opts.codec, req) + df := func(v interface{}) error { + if err := s.opts.codec.Unmarshal(req, v); err != nil { + return err + } + if traceInfo.tr != nil { + traceInfo.tr.LazyLog(&payload{sent: false, msg: v}, true) + } + return nil + } + reply, appErr := md.Handler(srv.server, ctx, df) if appErr != nil { if err, ok := appErr.(rpcError); ok { statusCode = err.code @@ -380,18 +386,17 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transport.Stream, srv *service, sd *StreamDesc) (err error) { - ctx, cancel := context.WithCancel(stream.Context()) - defer cancel() ss := &serverStream{ t: t, s: stream, - ctx: ctx, + //ctx: ctx, p: &parser{s: stream}, codec: s.opts.codec, tracing: EnableTracing, } if ss.tracing { - ss.traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()) + //ss.traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()) + ss.traceInfo.tr = stream.Trace() ss.traceInfo.firstLine.client = false ss.traceInfo.tr.LazyLog(&ss.traceInfo.firstLine, false) ss.ctx = trace.NewContext(ss.ctx, ss.traceInfo.tr) diff --git a/stream.go b/stream.go index 34774f03..6fbed078 100644 --- a/stream.go +++ b/stream.go @@ -294,7 +294,7 @@ type ServerStream interface { type serverStream struct { t transport.ServerTransport s *transport.Stream - ctx context.Context // provides trace.FromContext when tracing + //ctx context.Context // provides trace.FromContext when tracing p *parser codec Codec statusCode codes.Code @@ -309,7 +309,7 @@ type serverStream struct { } func (ss *serverStream) Context() context.Context { - return ss.ctx + return ss.s.Context() } func (ss *serverStream) SendHeader(md metadata.MD) error { diff --git a/transport/http2_server.go b/transport/http2_server.go index 057d9368..172b4752 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -44,6 +44,7 @@ import ( "golang.org/x/net/context" "golang.org/x/net/http2" + "golang.org/x/net/trace" "golang.org/x/net/http2/hpack" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" @@ -80,6 +81,8 @@ type http2Server struct { fc *inFlow // sendQuotaPool provides flow control to outbound message. sendQuotaPool *quotaPool + // tracing indicates whether tracing is on for this http2Server transport. + tracing bool mu sync.Mutex // guard the following state transportState @@ -90,7 +93,7 @@ type http2Server struct { // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is // returned if something goes wrong. -func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (_ ServerTransport, err error) { +func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo, tracing bool) (_ ServerTransport, err error) { framer := newFramer(conn) // Send initial settings as connection preface to client. var settings []http2.Setting @@ -124,6 +127,7 @@ func newHTTP2Server(conn net.Conn, maxStreams uint32, authInfo credentials.AuthI controlBuf: newRecvBuffer(), fc: &inFlow{limit: initialConnWindowSize}, sendQuotaPool: newQuotaPool(defaultWindowSize), + tracing: tracing, state: reachable, writableChan: make(chan int, 1), shutdownChan: make(chan struct{}), @@ -202,7 +206,10 @@ func (t *http2Server) operateHeaders(hDec *hpackDecoder, s *Stream, frame header recv: s.buf, } s.method = hDec.state.method - + if t.tracing { + s.tr = trace.New("grpc.Recv."+methodFamily(s.method), s.method) + s.ctx = trace.NewContext(s.ctx, s.tr) + } wg.Add(1) go func() { handle(s) diff --git a/transport/trace.go b/transport/trace.go new file mode 100644 index 00000000..dc116303 --- /dev/null +++ b/transport/trace.go @@ -0,0 +1,116 @@ +/* + * + * Copyright 2015, Google Inc. + * All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + * + */ + +package transport + +import ( + "bytes" + "fmt" + "io" + "net" + "strings" + "time" + + "golang.org/x/net/trace" +) + +// methodFamily returns the trace family for the given method. +// It turns "/pkg.Service/GetFoo" into "pkg.Service". +func methodFamily(m string) string { + m = strings.TrimPrefix(m, "/") // remove leading slash + if i := strings.Index(m, "/"); i >= 0 { + m = m[:i] // remove everything from second slash + } + if i := strings.LastIndex(m, "."); i >= 0 { + m = m[i+1:] // cut down to last dotted component + } + return m +} + +// traceInfo contains tracing information for an RPC. +type traceInfo struct { + tr trace.Trace + firstLine firstLine +} + +// firstLine is the first line of an RPC trace. +type firstLine struct { + client bool // whether this is a client (outgoing) RPC + remoteAddr net.Addr + deadline time.Duration // may be zero +} + +func (f *firstLine) String() string { + var line bytes.Buffer + io.WriteString(&line, "RPC: ") + if f.client { + io.WriteString(&line, "to") + } else { + io.WriteString(&line, "from") + } + fmt.Fprintf(&line, " %v deadline:", f.remoteAddr) + if f.deadline != 0 { + fmt.Fprint(&line, f.deadline) + } else { + io.WriteString(&line, "none") + } + return line.String() +} + +// payload represents an RPC request or response payload. +type payload struct { + sent bool // whether this is an outgoing payload + msg interface{} // e.g. a proto.Message + // TODO(dsymonds): add stringifying info to codec, and limit how much we hold here? +} + +func (p payload) String() string { + if p.sent { + return fmt.Sprintf("sent: %v", p.msg) + } else { + return fmt.Sprintf("recv: %v", p.msg) + } +} + +type fmtStringer struct { + format string + a []interface{} +} + +func (f *fmtStringer) String() string { + return fmt.Sprintf(f.format, f.a...) +} + +type stringer string + +func (s stringer) String() string { return string(s) } diff --git a/transport/transport.go b/transport/transport.go index 2dd38a83..b0287977 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -47,6 +47,7 @@ import ( "time" "golang.org/x/net/context" + "golang.org/x/net/trace" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/metadata" @@ -198,6 +199,8 @@ type Stream struct { // the status received from the server. statusCode codes.Code statusDesc string + // tracing information + tr trace.Trace } // Header acquires the key-value pairs of header metadata once it @@ -232,6 +235,11 @@ func (s *Stream) Context() context.Context { return s.ctx } +// Trace returns the trace.Trace of the stream. +func (s *Stream) Trace() trace.Trace { + return s.tr +} + // Method returns the method for the stream. func (s *Stream) Method() string { return s.method @@ -308,8 +316,8 @@ const ( // NewServerTransport creates a ServerTransport with conn or non-nil error // if it fails. -func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo) (ServerTransport, error) { - return newHTTP2Server(conn, maxStreams, authInfo) +func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32, authInfo credentials.AuthInfo, tracing bool) (ServerTransport, error) { + return newHTTP2Server(conn, maxStreams, authInfo, tracing) } // ConnectOptions covers all relevant options for dialing a server. diff --git a/transport/transport_test.go b/transport/transport_test.go index 07ba0051..70d345ac 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -150,7 +150,7 @@ func (s *server) start(t *testing.T, port int, maxStreams uint32, ht hType) { if err != nil { return } - transport, err := NewServerTransport("http2", conn, maxStreams, nil) + transport, err := NewServerTransport("http2", conn, maxStreams, nil, false) if err != nil { return } From 2aad9ba05f8a29a335de47efa79b11d3dec34dcc Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 2 Oct 2015 15:06:41 -0700 Subject: [PATCH 2/6] server side tracing refactoring --- server.go | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/server.go b/server.go index a55578a0..a5c8dc35 100644 --- a/server.go +++ b/server.go @@ -291,7 +291,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. defer traceInfo.tr.Finish() traceInfo.firstLine.client = false traceInfo.firstLine.remoteAddr = t.RemoteAddr() - if dl, ok := ctx.Deadline(); ok { + if dl, ok := stream.Context().Deadline(); ok { traceInfo.firstLine.deadline = dl.Sub(time.Now()) } traceInfo.tr.LazyLog(&traceInfo.firstLine, false) @@ -335,7 +335,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. } return nil } - reply, appErr := md.Handler(srv.server, ctx, df) + reply, appErr := md.Handler(srv.server, stream.Context(), df) if appErr != nil { if err, ok := appErr.(rpcError); ok { statusCode = err.code @@ -389,21 +389,18 @@ func (s *Server) processStreamingRPC(t transport.ServerTransport, stream *transp ss := &serverStream{ t: t, s: stream, - //ctx: ctx, p: &parser{s: stream}, codec: s.opts.codec, tracing: EnableTracing, } if ss.tracing { - //ss.traceInfo.tr = trace.New("grpc.Recv."+methodFamily(stream.Method()), stream.Method()) ss.traceInfo.tr = stream.Trace() ss.traceInfo.firstLine.client = false ss.traceInfo.firstLine.remoteAddr = t.RemoteAddr() - if dl, ok := ctx.Deadline(); ok { + if dl, ok := stream.Context().Deadline(); ok { ss.traceInfo.firstLine.deadline = dl.Sub(time.Now()) } ss.traceInfo.tr.LazyLog(&ss.traceInfo.firstLine, false) - ss.ctx = trace.NewContext(ss.ctx, ss.traceInfo.tr) defer func() { ss.mu.Lock() if err != nil && err != io.EOF { From 2e43c33e715ad48c88d435d3057f64914fa57e2c Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 2 Oct 2015 15:12:44 -0700 Subject: [PATCH 3/6] revert trace.go change --- transport/trace.go | 116 --------------------------------------------- 1 file changed, 116 deletions(-) delete mode 100644 transport/trace.go diff --git a/transport/trace.go b/transport/trace.go deleted file mode 100644 index dc116303..00000000 --- a/transport/trace.go +++ /dev/null @@ -1,116 +0,0 @@ -/* - * - * Copyright 2015, Google Inc. - * All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - * - */ - -package transport - -import ( - "bytes" - "fmt" - "io" - "net" - "strings" - "time" - - "golang.org/x/net/trace" -) - -// methodFamily returns the trace family for the given method. -// It turns "/pkg.Service/GetFoo" into "pkg.Service". -func methodFamily(m string) string { - m = strings.TrimPrefix(m, "/") // remove leading slash - if i := strings.Index(m, "/"); i >= 0 { - m = m[:i] // remove everything from second slash - } - if i := strings.LastIndex(m, "."); i >= 0 { - m = m[i+1:] // cut down to last dotted component - } - return m -} - -// traceInfo contains tracing information for an RPC. -type traceInfo struct { - tr trace.Trace - firstLine firstLine -} - -// firstLine is the first line of an RPC trace. -type firstLine struct { - client bool // whether this is a client (outgoing) RPC - remoteAddr net.Addr - deadline time.Duration // may be zero -} - -func (f *firstLine) String() string { - var line bytes.Buffer - io.WriteString(&line, "RPC: ") - if f.client { - io.WriteString(&line, "to") - } else { - io.WriteString(&line, "from") - } - fmt.Fprintf(&line, " %v deadline:", f.remoteAddr) - if f.deadline != 0 { - fmt.Fprint(&line, f.deadline) - } else { - io.WriteString(&line, "none") - } - return line.String() -} - -// payload represents an RPC request or response payload. -type payload struct { - sent bool // whether this is an outgoing payload - msg interface{} // e.g. a proto.Message - // TODO(dsymonds): add stringifying info to codec, and limit how much we hold here? -} - -func (p payload) String() string { - if p.sent { - return fmt.Sprintf("sent: %v", p.msg) - } else { - return fmt.Sprintf("recv: %v", p.msg) - } -} - -type fmtStringer struct { - format string - a []interface{} -} - -func (f *fmtStringer) String() string { - return fmt.Sprintf(f.format, f.a...) -} - -type stringer string - -func (s stringer) String() string { return string(s) } From 59258581efb03aed471d3f7283441d714ba48cd5 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 2 Oct 2015 15:14:27 -0700 Subject: [PATCH 4/6] remove temp changes --- stream.go | 1 - 1 file changed, 1 deletion(-) diff --git a/stream.go b/stream.go index 6fbed078..6dc47441 100644 --- a/stream.go +++ b/stream.go @@ -294,7 +294,6 @@ type ServerStream interface { type serverStream struct { t transport.ServerTransport s *transport.Stream - //ctx context.Context // provides trace.FromContext when tracing p *parser codec Codec statusCode codes.Code From b4aa9eae102d0034c54af60d5afa2da5a8c6fb07 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 2 Oct 2015 15:38:37 -0700 Subject: [PATCH 5/6] fix methodFamily breakage --- call.go | 2 +- stream.go | 2 +- trace.go | 14 -------------- transport/http2_server.go | 2 +- transport/transport.go | 14 ++++++++++++++ 5 files changed, 17 insertions(+), 17 deletions(-) diff --git a/call.go b/call.go index 8b688091..b5f92929 100644 --- a/call.go +++ b/call.go @@ -117,7 +117,7 @@ func Invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli } }() if EnableTracing { - c.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method) + c.traceInfo.tr = trace.New("grpc.Sent."+transport.MethodFamily(method), method) defer c.traceInfo.tr.Finish() c.traceInfo.firstLine.client = true if deadline, ok := ctx.Deadline(); ok { diff --git a/stream.go b/stream.go index 6dc47441..28938a3c 100644 --- a/stream.go +++ b/stream.go @@ -126,7 +126,7 @@ func NewClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth tracing: EnableTracing, } if cs.tracing { - cs.traceInfo.tr = trace.New("grpc.Sent."+methodFamily(method), method) + cs.traceInfo.tr = trace.New("grpc.Sent."+transport.MethodFamily(method), method) cs.traceInfo.firstLine.client = true if deadline, ok := ctx.Deadline(); ok { cs.traceInfo.firstLine.deadline = deadline.Sub(time.Now()) diff --git a/trace.go b/trace.go index cde04fbf..9b88444f 100644 --- a/trace.go +++ b/trace.go @@ -38,7 +38,6 @@ import ( "fmt" "io" "net" - "strings" "time" "golang.org/x/net/trace" @@ -48,19 +47,6 @@ import ( // This should only be set before any RPCs are sent or received by this program. var EnableTracing = true -// methodFamily returns the trace family for the given method. -// It turns "/pkg.Service/GetFoo" into "pkg.Service". -func methodFamily(m string) string { - m = strings.TrimPrefix(m, "/") // remove leading slash - if i := strings.Index(m, "/"); i >= 0 { - m = m[:i] // remove everything from second slash - } - if i := strings.LastIndex(m, "."); i >= 0 { - m = m[i+1:] // cut down to last dotted component - } - return m -} - // traceInfo contains tracing information for an RPC. type traceInfo struct { tr trace.Trace diff --git a/transport/http2_server.go b/transport/http2_server.go index b7cc5360..0fcf6c47 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -44,8 +44,8 @@ import ( "golang.org/x/net/context" "golang.org/x/net/http2" - "golang.org/x/net/trace" "golang.org/x/net/http2/hpack" + "golang.org/x/net/trace" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/grpclog" diff --git a/transport/transport.go b/transport/transport.go index 7ea2a118..93efeae8 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -43,6 +43,7 @@ import ( "fmt" "io" "net" + "strings" "sync" "time" @@ -53,6 +54,19 @@ import ( "google.golang.org/grpc/metadata" ) +// MethodFamily returns the trace family for the given method. +// It turns "/pkg.Service/GetFoo" into "pkg.Service". +func MethodFamily(m string) string { + m = strings.TrimPrefix(m, "/") // remove leading slash + if i := strings.Index(m, "/"); i >= 0 { + m = m[:i] // remove everything from second slash + } + if i := strings.LastIndex(m, "."); i >= 0 { + m = m[i+1:] // cut down to last dotted component + } + return m +} + // recvMsg represents the received msg from the transport. All transport // protocol specific info has been removed. type recvMsg struct { From 4648452efaf2740671d37ae955f59197ad9523f5 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 2 Oct 2015 15:40:06 -0700 Subject: [PATCH 6/6] fix another methodFamily --- transport/http2_server.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transport/http2_server.go b/transport/http2_server.go index 0fcf6c47..89371c23 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -207,7 +207,7 @@ func (t *http2Server) operateHeaders(hDec *hpackDecoder, s *Stream, frame header } s.method = hDec.state.method if t.tracing { - s.tr = trace.New("grpc.Recv."+methodFamily(s.method), s.method) + s.tr = trace.New("grpc.Recv."+MethodFamily(s.method), s.method) s.ctx = trace.NewContext(s.ctx, s.tr) } wg.Add(1)