stats: add methods to allow setting grpc-trace-bin and grpc-tags-bin headers (#1404)
This is in preparation for preventing any user-supplied metadata starting with "grpc-", which is reserved. * stats: add methods to allow setting grpc-trace-bin and grpc-tags-bin headers Pick these up in grpc's transport when sending and fill them when receiving. * Add tags/trace to metadata and tests for that behavior This is temporary to maintain compatibility and provide a migration strategy.
This commit is contained in:
@ -44,6 +44,9 @@ type MD map[string][]string
|
|||||||
// - lowercase letters: a-z
|
// - lowercase letters: a-z
|
||||||
// - special characters: -_.
|
// - special characters: -_.
|
||||||
// Uppercase letters are automatically converted to lowercase.
|
// Uppercase letters are automatically converted to lowercase.
|
||||||
|
//
|
||||||
|
// Keys beginning with "grpc-" are reserved for grpc-internal use only and may
|
||||||
|
// result in errors if set in metadata.
|
||||||
func New(m map[string]string) MD {
|
func New(m map[string]string) MD {
|
||||||
md := MD{}
|
md := MD{}
|
||||||
for k, val := range m {
|
for k, val := range m {
|
||||||
@ -62,6 +65,9 @@ func New(m map[string]string) MD {
|
|||||||
// - lowercase letters: a-z
|
// - lowercase letters: a-z
|
||||||
// - special characters: -_.
|
// - special characters: -_.
|
||||||
// Uppercase letters are automatically converted to lowercase.
|
// Uppercase letters are automatically converted to lowercase.
|
||||||
|
//
|
||||||
|
// Keys beginning with "grpc-" are reserved for grpc-internal use only and may
|
||||||
|
// result in errors if set in metadata.
|
||||||
func Pairs(kv ...string) MD {
|
func Pairs(kv ...string) MD {
|
||||||
if len(kv)%2 == 1 {
|
if len(kv)%2 == 1 {
|
||||||
panic(fmt.Sprintf("metadata: Pairs got the odd number of input pairs for metadata: %d", len(kv)))
|
panic(fmt.Sprintf("metadata: Pairs got the odd number of input pairs for metadata: %d", len(kv)))
|
||||||
|
@ -26,6 +26,8 @@ package stats // import "google.golang.org/grpc/stats"
|
|||||||
import (
|
import (
|
||||||
"net"
|
"net"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"golang.org/x/net/context"
|
||||||
)
|
)
|
||||||
|
|
||||||
// RPCStats contains stats information about RPCs.
|
// RPCStats contains stats information about RPCs.
|
||||||
@ -210,3 +212,85 @@ type ConnEnd struct {
|
|||||||
func (s *ConnEnd) IsClient() bool { return s.Client }
|
func (s *ConnEnd) IsClient() bool { return s.Client }
|
||||||
|
|
||||||
func (s *ConnEnd) isConnStats() {}
|
func (s *ConnEnd) isConnStats() {}
|
||||||
|
|
||||||
|
type incomingTagsKey struct{}
|
||||||
|
type outgoingTagsKey struct{}
|
||||||
|
|
||||||
|
// SetTags attaches stats tagging data to the context, which will be sent in
|
||||||
|
// the outgoing RPC with the header grpc-tags-bin. Subsequent calls to
|
||||||
|
// SetTags will overwrite the values from earlier calls.
|
||||||
|
//
|
||||||
|
// NOTE: this is provided only for backward compatibilty with existing clients
|
||||||
|
// and will likely be removed in an upcoming release. New uses should transmit
|
||||||
|
// this type of data using metadata with a different, non-reserved (i.e. does
|
||||||
|
// not begin with "grpc-") header name.
|
||||||
|
func SetTags(ctx context.Context, b []byte) context.Context {
|
||||||
|
return context.WithValue(ctx, outgoingTagsKey{}, b)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Tags returns the tags from the context for the inbound RPC.
|
||||||
|
//
|
||||||
|
// NOTE: this is provided only for backward compatibilty with existing clients
|
||||||
|
// and will likely be removed in an upcoming release. New uses should transmit
|
||||||
|
// this type of data using metadata with a different, non-reserved (i.e. does
|
||||||
|
// not begin with "grpc-") header name.
|
||||||
|
func Tags(ctx context.Context) []byte {
|
||||||
|
b, _ := ctx.Value(incomingTagsKey{}).([]byte)
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetIncomingTags attaches stats tagging data to the context, to be read by
|
||||||
|
// the application (not sent in outgoing RPCs).
|
||||||
|
//
|
||||||
|
// This is intended for gRPC-internal use ONLY.
|
||||||
|
func SetIncomingTags(ctx context.Context, b []byte) context.Context {
|
||||||
|
return context.WithValue(ctx, incomingTagsKey{}, b)
|
||||||
|
}
|
||||||
|
|
||||||
|
// OutgoingTags returns the tags from the context for the outbound RPC.
|
||||||
|
//
|
||||||
|
// This is intended for gRPC-internal use ONLY.
|
||||||
|
func OutgoingTags(ctx context.Context) []byte {
|
||||||
|
b, _ := ctx.Value(outgoingTagsKey{}).([]byte)
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
|
||||||
|
type incomingTraceKey struct{}
|
||||||
|
type outgoingTraceKey struct{}
|
||||||
|
|
||||||
|
// SetTrace attaches stats tagging data to the context, which will be sent in
|
||||||
|
// the outgoing RPC with the header grpc-trace-bin. Subsequent calls to
|
||||||
|
// SetTrace will overwrite the values from earlier calls.
|
||||||
|
//
|
||||||
|
// NOTE: this is provided only for backward compatibilty with existing clients
|
||||||
|
// and will likely be removed in an upcoming release. New uses should transmit
|
||||||
|
// this type of data using metadata with a different, non-reserved (i.e. does
|
||||||
|
// not begin with "grpc-") header name.
|
||||||
|
func SetTrace(ctx context.Context, b []byte) context.Context {
|
||||||
|
return context.WithValue(ctx, outgoingTraceKey{}, b)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Trace returns the trace from the context for the inbound RPC.
|
||||||
|
//
|
||||||
|
// NOTE: this is provided only for backward compatibilty with existing clients
|
||||||
|
// and will likely be removed in an upcoming release. New uses should transmit
|
||||||
|
// this type of data using metadata with a different, non-reserved (i.e. does
|
||||||
|
// not begin with "grpc-") header name.
|
||||||
|
func Trace(ctx context.Context) []byte {
|
||||||
|
b, _ := ctx.Value(incomingTraceKey{}).([]byte)
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
|
||||||
|
// SetIncomingTrace attaches stats tagging data to the context, to be read by
|
||||||
|
// the application (not sent in outgoing RPCs). It is intended for
|
||||||
|
// gRPC-internal use.
|
||||||
|
func SetIncomingTrace(ctx context.Context, b []byte) context.Context {
|
||||||
|
return context.WithValue(ctx, incomingTraceKey{}, b)
|
||||||
|
}
|
||||||
|
|
||||||
|
// OutgoingTrace returns the trace from the context for the outbound RPC. It is
|
||||||
|
// intended for gRPC-internal use.
|
||||||
|
func OutgoingTrace(ctx context.Context) []byte {
|
||||||
|
b, _ := ctx.Value(outgoingTraceKey{}).([]byte)
|
||||||
|
return b
|
||||||
|
}
|
||||||
|
@ -1238,3 +1238,41 @@ func TestClientStatsFullDuplexRPCNotCallingLastRecv(t *testing.T) {
|
|||||||
end: {checkEnd, 1},
|
end: {checkEnd, 1},
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestTags(t *testing.T) {
|
||||||
|
b := []byte{5, 2, 4, 3, 1}
|
||||||
|
ctx := stats.SetTags(context.Background(), b)
|
||||||
|
if tg := stats.OutgoingTags(ctx); !reflect.DeepEqual(tg, b) {
|
||||||
|
t.Errorf("OutgoingTags(%v) = %v; want %v", ctx, tg, b)
|
||||||
|
}
|
||||||
|
if tg := stats.Tags(ctx); tg != nil {
|
||||||
|
t.Errorf("Tags(%v) = %v; want nil", ctx, tg)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx = stats.SetIncomingTags(context.Background(), b)
|
||||||
|
if tg := stats.Tags(ctx); !reflect.DeepEqual(tg, b) {
|
||||||
|
t.Errorf("Tags(%v) = %v; want %v", ctx, tg, b)
|
||||||
|
}
|
||||||
|
if tg := stats.OutgoingTags(ctx); tg != nil {
|
||||||
|
t.Errorf("OutgoingTags(%v) = %v; want nil", ctx, tg)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTrace(t *testing.T) {
|
||||||
|
b := []byte{5, 2, 4, 3, 1}
|
||||||
|
ctx := stats.SetTrace(context.Background(), b)
|
||||||
|
if tr := stats.OutgoingTrace(ctx); !reflect.DeepEqual(tr, b) {
|
||||||
|
t.Errorf("OutgoingTrace(%v) = %v; want %v", ctx, tr, b)
|
||||||
|
}
|
||||||
|
if tr := stats.Trace(ctx); tr != nil {
|
||||||
|
t.Errorf("Trace(%v) = %v; want nil", ctx, tr)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx = stats.SetIncomingTrace(context.Background(), b)
|
||||||
|
if tr := stats.Trace(ctx); !reflect.DeepEqual(tr, b) {
|
||||||
|
t.Errorf("Trace(%v) = %v; want %v", ctx, tr, b)
|
||||||
|
}
|
||||||
|
if tr := stats.OutgoingTrace(ctx); tr != nil {
|
||||||
|
t.Errorf("OutgoingTrace(%v) = %v; want nil", ctx, tr)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -55,6 +55,7 @@ import (
|
|||||||
"google.golang.org/grpc/internal"
|
"google.golang.org/grpc/internal"
|
||||||
"google.golang.org/grpc/metadata"
|
"google.golang.org/grpc/metadata"
|
||||||
"google.golang.org/grpc/peer"
|
"google.golang.org/grpc/peer"
|
||||||
|
"google.golang.org/grpc/stats"
|
||||||
"google.golang.org/grpc/status"
|
"google.golang.org/grpc/status"
|
||||||
"google.golang.org/grpc/tap"
|
"google.golang.org/grpc/tap"
|
||||||
testpb "google.golang.org/grpc/test/grpc_testing"
|
testpb "google.golang.org/grpc/test/grpc_testing"
|
||||||
@ -4616,6 +4617,61 @@ func TestStreamingProxyDoesNotForwardMetadata(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestStatsTagsAndTrace(t *testing.T) {
|
||||||
|
// Data added to context by client (typically in a stats handler).
|
||||||
|
tags := []byte{1, 5, 2, 4, 3}
|
||||||
|
trace := []byte{5, 2, 1, 3, 4}
|
||||||
|
|
||||||
|
// endpoint ensures Tags() and Trace() in context match those that were added
|
||||||
|
// by the client and returns an error if not.
|
||||||
|
endpoint := &stubServer{
|
||||||
|
emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
|
||||||
|
md, _ := metadata.FromIncomingContext(ctx)
|
||||||
|
if tg := stats.Tags(ctx); !reflect.DeepEqual(tg, tags) {
|
||||||
|
return nil, status.Errorf(codes.Internal, "stats.Tags(%v)=%v; want %v", ctx, tg, tags)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(md["grpc-tags-bin"], []string{string(tags)}) {
|
||||||
|
return nil, status.Errorf(codes.Internal, "md['grpc-tags-bin']=%v; want %v", md["grpc-tags-bin"], tags)
|
||||||
|
}
|
||||||
|
if tr := stats.Trace(ctx); !reflect.DeepEqual(tr, trace) {
|
||||||
|
return nil, status.Errorf(codes.Internal, "stats.Trace(%v)=%v; want %v", ctx, tr, trace)
|
||||||
|
}
|
||||||
|
if !reflect.DeepEqual(md["grpc-trace-bin"], []string{string(trace)}) {
|
||||||
|
return nil, status.Errorf(codes.Internal, "md['grpc-trace-bin']=%v; want %v", md["grpc-trace-bin"], trace)
|
||||||
|
}
|
||||||
|
return &testpb.Empty{}, nil
|
||||||
|
},
|
||||||
|
}
|
||||||
|
if err := endpoint.Start(); err != nil {
|
||||||
|
t.Fatalf("Error starting endpoint server: %v", err)
|
||||||
|
}
|
||||||
|
defer endpoint.Stop()
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
ctx context.Context
|
||||||
|
want codes.Code
|
||||||
|
}{
|
||||||
|
{ctx: ctx, want: codes.Internal},
|
||||||
|
{ctx: stats.SetTags(ctx, tags), want: codes.Internal},
|
||||||
|
{ctx: stats.SetTrace(ctx, trace), want: codes.Internal},
|
||||||
|
{ctx: stats.SetTags(stats.SetTrace(ctx, tags), tags), want: codes.Internal},
|
||||||
|
{ctx: stats.SetTags(stats.SetTrace(ctx, trace), tags), want: codes.OK},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
_, err := endpoint.client.EmptyCall(tc.ctx, &testpb.Empty{})
|
||||||
|
if tc.want == codes.OK && err != nil {
|
||||||
|
t.Fatalf("endpoint.client.EmptyCall(%v, _) = _, %v; want _, nil", tc.ctx, err)
|
||||||
|
}
|
||||||
|
if s, ok := status.FromError(err); !ok || s.Code() != tc.want {
|
||||||
|
t.Fatalf("endpoint.client.EmptyCall(%v, _) = _, %v; want _, <status with Code()=%v>", tc.ctx, err, tc.want)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type windowSizeConfig struct {
|
type windowSizeConfig struct {
|
||||||
serverStream int32
|
serverStream int32
|
||||||
serverConn int32
|
serverConn int32
|
||||||
|
@ -471,6 +471,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
|
|||||||
var (
|
var (
|
||||||
endHeaders bool
|
endHeaders bool
|
||||||
)
|
)
|
||||||
|
if b := stats.OutgoingTags(ctx); b != nil {
|
||||||
|
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)})
|
||||||
|
}
|
||||||
|
if b := stats.OutgoingTrace(ctx); b != nil {
|
||||||
|
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)})
|
||||||
|
}
|
||||||
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
if md, ok := metadata.FromOutgoingContext(ctx); ok {
|
||||||
for k, vv := range md {
|
for k, vv := range md {
|
||||||
// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
|
// HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set.
|
||||||
|
@ -274,6 +274,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
|||||||
if len(state.mdata) > 0 {
|
if len(state.mdata) > 0 {
|
||||||
s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
|
s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata)
|
||||||
}
|
}
|
||||||
|
if state.statsTags != nil {
|
||||||
|
s.ctx = stats.SetIncomingTags(s.ctx, state.statsTags)
|
||||||
|
}
|
||||||
|
if state.statsTrace != nil {
|
||||||
|
s.ctx = stats.SetIncomingTrace(s.ctx, state.statsTrace)
|
||||||
|
}
|
||||||
s.trReader = &transportReader{
|
s.trReader = &transportReader{
|
||||||
reader: &recvBufferReader{
|
reader: &recvBufferReader{
|
||||||
ctx: s.ctx,
|
ctx: s.ctx,
|
||||||
|
@ -111,7 +111,9 @@ type decodeState struct {
|
|||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
method string
|
method string
|
||||||
// key-value metadata map from the peer.
|
// key-value metadata map from the peer.
|
||||||
mdata map[string][]string
|
mdata map[string][]string
|
||||||
|
statsTags []byte
|
||||||
|
statsTrace []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
// isReservedHeader checks whether hdr belongs to HTTP2 headers
|
// isReservedHeader checks whether hdr belongs to HTTP2 headers
|
||||||
@ -235,6 +237,13 @@ func (d *decodeState) decodeResponseHeader(frame *http2.MetaHeadersFrame) error
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (d *decodeState) addMetadata(k, v string) {
|
||||||
|
if d.mdata == nil {
|
||||||
|
d.mdata = make(map[string][]string)
|
||||||
|
}
|
||||||
|
d.mdata[k] = append(d.mdata[k], v)
|
||||||
|
}
|
||||||
|
|
||||||
func (d *decodeState) processHeaderField(f hpack.HeaderField) error {
|
func (d *decodeState) processHeaderField(f hpack.HeaderField) error {
|
||||||
switch f.Name {
|
switch f.Name {
|
||||||
case "content-type":
|
case "content-type":
|
||||||
@ -275,18 +284,30 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) error {
|
|||||||
return streamErrorf(codes.Internal, "transport: malformed http-status: %v", err)
|
return streamErrorf(codes.Internal, "transport: malformed http-status: %v", err)
|
||||||
}
|
}
|
||||||
d.httpStatus = &code
|
d.httpStatus = &code
|
||||||
default:
|
case "grpc-tags-bin":
|
||||||
if !isReservedHeader(f.Name) || isWhitelistedPseudoHeader(f.Name) {
|
v, err := decodeBinHeader(f.Value)
|
||||||
if d.mdata == nil {
|
if err != nil {
|
||||||
d.mdata = make(map[string][]string)
|
return streamErrorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err)
|
||||||
}
|
|
||||||
v, err := decodeMetadataHeader(f.Name, f.Value)
|
|
||||||
if err != nil {
|
|
||||||
errorf("Failed to decode metadata header (%q, %q): %v", f.Name, f.Value, err)
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
d.mdata[f.Name] = append(d.mdata[f.Name], v)
|
|
||||||
}
|
}
|
||||||
|
d.statsTags = v
|
||||||
|
d.addMetadata(f.Name, string(v))
|
||||||
|
case "grpc-trace-bin":
|
||||||
|
v, err := decodeBinHeader(f.Value)
|
||||||
|
if err != nil {
|
||||||
|
return streamErrorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err)
|
||||||
|
}
|
||||||
|
d.statsTrace = v
|
||||||
|
d.addMetadata(f.Name, string(v))
|
||||||
|
default:
|
||||||
|
if isReservedHeader(f.Name) && !isWhitelistedPseudoHeader(f.Name) {
|
||||||
|
break
|
||||||
|
}
|
||||||
|
v, err := decodeMetadataHeader(f.Name, f.Value)
|
||||||
|
if err != nil {
|
||||||
|
errorf("Failed to decode metadata header (%q, %q): %v", f.Name, f.Value, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
d.addMetadata(f.Name, string(v))
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user