Calling handleRPC with context derived from the original (#1227)
* Calling handleRPC with different context derived from the original context * change comment for tagRPC and stats fields
This commit is contained in:
2
call.go
2
call.go
@ -182,7 +182,7 @@ func invoke(ctx context.Context, method string, args, reply interface{}, cc *Cli
|
|||||||
ctx = newContextWithRPCInfo(ctx)
|
ctx = newContextWithRPCInfo(ctx)
|
||||||
sh := cc.dopts.copts.StatsHandler
|
sh := cc.dopts.copts.StatsHandler
|
||||||
if sh != nil {
|
if sh != nil {
|
||||||
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
|
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
|
||||||
begin := &stats.Begin{
|
begin := &stats.Begin{
|
||||||
Client: true,
|
Client: true,
|
||||||
BeginTime: time.Now(),
|
BeginTime: time.Now(),
|
||||||
|
@ -45,19 +45,22 @@ type ConnTagInfo struct {
|
|||||||
RemoteAddr net.Addr
|
RemoteAddr net.Addr
|
||||||
// LocalAddr is the local address of the corresponding connection.
|
// LocalAddr is the local address of the corresponding connection.
|
||||||
LocalAddr net.Addr
|
LocalAddr net.Addr
|
||||||
// TODO add QOS related fields.
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// RPCTagInfo defines the relevant information needed by RPC context tagger.
|
// RPCTagInfo defines the relevant information needed by RPC context tagger.
|
||||||
type RPCTagInfo struct {
|
type RPCTagInfo struct {
|
||||||
// FullMethodName is the RPC method in the format of /package.service/method.
|
// FullMethodName is the RPC method in the format of /package.service/method.
|
||||||
FullMethodName string
|
FullMethodName string
|
||||||
|
// FailFast indicates if this RPC is failfast.
|
||||||
|
// This field is only valid on client side, it's always false on server side.
|
||||||
|
FailFast bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// Handler defines the interface for the related stats handling (e.g., RPCs, connections).
|
// Handler defines the interface for the related stats handling (e.g., RPCs, connections).
|
||||||
type Handler interface {
|
type Handler interface {
|
||||||
// TagRPC can attach some information to the given context.
|
// TagRPC can attach some information to the given context.
|
||||||
// The returned context is used in the rest lifetime of the RPC.
|
// The context used for the rest lifetime of the RPC will be derived from
|
||||||
|
// the returned context.
|
||||||
TagRPC(context.Context, *RPCTagInfo) context.Context
|
TagRPC(context.Context, *RPCTagInfo) context.Context
|
||||||
// HandleRPC processes the RPC stats.
|
// HandleRPC processes the RPC stats.
|
||||||
HandleRPC(context.Context, RPCStats)
|
HandleRPC(context.Context, RPCStats)
|
||||||
|
@ -86,13 +86,13 @@ func (s *InPayload) IsClient() bool { return s.Client }
|
|||||||
func (s *InPayload) isRPCStats() {}
|
func (s *InPayload) isRPCStats() {}
|
||||||
|
|
||||||
// InHeader contains stats when a header is received.
|
// InHeader contains stats when a header is received.
|
||||||
// FullMethod, addresses and Compression are only valid if Client is false.
|
|
||||||
type InHeader struct {
|
type InHeader struct {
|
||||||
// Client is true if this InHeader is from client side.
|
// Client is true if this InHeader is from client side.
|
||||||
Client bool
|
Client bool
|
||||||
// WireLength is the wire length of header.
|
// WireLength is the wire length of header.
|
||||||
WireLength int
|
WireLength int
|
||||||
|
|
||||||
|
// The following fields are valid only if Client is false.
|
||||||
// FullMethod is the full RPC method string, i.e., /package.service/method.
|
// FullMethod is the full RPC method string, i.e., /package.service/method.
|
||||||
FullMethod string
|
FullMethod string
|
||||||
// RemoteAddr is the remote address of the corresponding connection.
|
// RemoteAddr is the remote address of the corresponding connection.
|
||||||
@ -143,13 +143,13 @@ func (s *OutPayload) IsClient() bool { return s.Client }
|
|||||||
func (s *OutPayload) isRPCStats() {}
|
func (s *OutPayload) isRPCStats() {}
|
||||||
|
|
||||||
// OutHeader contains stats when a header is sent.
|
// OutHeader contains stats when a header is sent.
|
||||||
// FullMethod, addresses and Compression are only valid if Client is true.
|
|
||||||
type OutHeader struct {
|
type OutHeader struct {
|
||||||
// Client is true if this OutHeader is from client side.
|
// Client is true if this OutHeader is from client side.
|
||||||
Client bool
|
Client bool
|
||||||
// WireLength is the wire length of header.
|
// WireLength is the wire length of header.
|
||||||
WireLength int
|
WireLength int
|
||||||
|
|
||||||
|
// The following fields are valid only if Client is true.
|
||||||
// FullMethod is the full RPC method string, i.e., /package.service/method.
|
// FullMethod is the full RPC method string, i.e., /package.service/method.
|
||||||
FullMethod string
|
FullMethod string
|
||||||
// RemoteAddr is the remote address of the corresponding connection.
|
// RemoteAddr is the remote address of the corresponding connection.
|
||||||
|
@ -800,13 +800,14 @@ func checkClientStats(t *testing.T, got []*gotData, expect *expectedData, checkF
|
|||||||
t.Fatalf("got %v stats, want %v stats", len(got), expectLen)
|
t.Fatalf("got %v stats, want %v stats", len(got), expectLen)
|
||||||
}
|
}
|
||||||
|
|
||||||
var rpcctx context.Context
|
var tagInfoInCtx *stats.RPCTagInfo
|
||||||
for i := 0; i < len(got); i++ {
|
for i := 0; i < len(got); i++ {
|
||||||
if _, ok := got[i].s.(stats.RPCStats); ok {
|
if _, ok := got[i].s.(stats.RPCStats); ok {
|
||||||
if rpcctx != nil && got[i].ctx != rpcctx {
|
tagInfoInCtxNew, _ := got[i].ctx.Value(rpcCtxKey{}).(*stats.RPCTagInfo)
|
||||||
t.Fatalf("got different contexts with stats %T", got[i].s)
|
if tagInfoInCtx != nil && tagInfoInCtx != tagInfoInCtxNew {
|
||||||
|
t.Fatalf("got context containing different tagInfo with stats %T", got[i].s)
|
||||||
}
|
}
|
||||||
rpcctx = got[i].ctx
|
tagInfoInCtx = tagInfoInCtxNew
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -154,7 +154,7 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
|||||||
ctx = newContextWithRPCInfo(ctx)
|
ctx = newContextWithRPCInfo(ctx)
|
||||||
sh := cc.dopts.copts.StatsHandler
|
sh := cc.dopts.copts.StatsHandler
|
||||||
if sh != nil {
|
if sh != nil {
|
||||||
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method})
|
ctx = sh.TagRPC(ctx, &stats.RPCTagInfo{FullMethodName: method, FailFast: c.failFast})
|
||||||
begin := &stats.Begin{
|
begin := &stats.Begin{
|
||||||
Client: true,
|
Client: true,
|
||||||
BeginTime: time.Now(),
|
BeginTime: time.Now(),
|
||||||
|
@ -334,7 +334,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|||||||
if t.authInfo != nil {
|
if t.authInfo != nil {
|
||||||
pr.AuthInfo = t.authInfo
|
pr.AuthInfo = t.authInfo
|
||||||
}
|
}
|
||||||
userCtx := ctx
|
|
||||||
ctx = peer.NewContext(ctx, pr)
|
ctx = peer.NewContext(ctx, pr)
|
||||||
authData := make(map[string]string)
|
authData := make(map[string]string)
|
||||||
for _, c := range t.creds {
|
for _, c := range t.creds {
|
||||||
@ -401,7 +400,6 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|||||||
return nil, ErrConnClosing
|
return nil, ErrConnClosing
|
||||||
}
|
}
|
||||||
s := t.newStream(ctx, callHdr)
|
s := t.newStream(ctx, callHdr)
|
||||||
s.clientStatsCtx = userCtx
|
|
||||||
t.activeStreams[s.id] = s
|
t.activeStreams[s.id] = s
|
||||||
// If the number of active streams change from 0 to 1, then check if keepalive
|
// If the number of active streams change from 0 to 1, then check if keepalive
|
||||||
// has gone dormant. If so, wake it up.
|
// has gone dormant. If so, wake it up.
|
||||||
@ -514,7 +512,7 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|||||||
LocalAddr: t.localAddr,
|
LocalAddr: t.localAddr,
|
||||||
Compression: callHdr.SendCompress,
|
Compression: callHdr.SendCompress,
|
||||||
}
|
}
|
||||||
t.statsHandler.HandleRPC(s.clientStatsCtx, outHeader)
|
t.statsHandler.HandleRPC(s.ctx, outHeader)
|
||||||
}
|
}
|
||||||
t.writableChan <- 0
|
t.writableChan <- 0
|
||||||
return s, nil
|
return s, nil
|
||||||
@ -993,13 +991,13 @@ func (t *http2Client) operateHeaders(frame *http2.MetaHeadersFrame) {
|
|||||||
Client: true,
|
Client: true,
|
||||||
WireLength: int(frame.Header().Length),
|
WireLength: int(frame.Header().Length),
|
||||||
}
|
}
|
||||||
t.statsHandler.HandleRPC(s.clientStatsCtx, inHeader)
|
t.statsHandler.HandleRPC(s.ctx, inHeader)
|
||||||
} else {
|
} else {
|
||||||
inTrailer := &stats.InTrailer{
|
inTrailer := &stats.InTrailer{
|
||||||
Client: true,
|
Client: true,
|
||||||
WireLength: int(frame.Header().Length),
|
WireLength: int(frame.Header().Length),
|
||||||
}
|
}
|
||||||
t.statsHandler.HandleRPC(s.clientStatsCtx, inTrailer)
|
t.statsHandler.HandleRPC(s.ctx, inTrailer)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
@ -171,11 +171,6 @@ type Stream struct {
|
|||||||
id uint32
|
id uint32
|
||||||
// nil for client side Stream.
|
// nil for client side Stream.
|
||||||
st ServerTransport
|
st ServerTransport
|
||||||
// clientStatsCtx keeps the user context for stats handling.
|
|
||||||
// It's only valid on client side. Server side stats context is same as s.ctx.
|
|
||||||
// All client side stats collection should use the clientStatsCtx (instead of the stream context)
|
|
||||||
// so that all the generated stats for a particular RPC can be associated in the processing phase.
|
|
||||||
clientStatsCtx context.Context
|
|
||||||
// ctx is the associated context of the stream.
|
// ctx is the associated context of the stream.
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
// cancel is always nil for client side Stream.
|
// cancel is always nil for client side Stream.
|
||||||
|
Reference in New Issue
Block a user