diff --git a/metadata/metadata.go b/metadata/metadata.go index ee08032f..ccfea5d4 100644 --- a/metadata/metadata.go +++ b/metadata/metadata.go @@ -44,6 +44,9 @@ type MD map[string][]string // - lowercase letters: a-z // - special characters: -_. // Uppercase letters are automatically converted to lowercase. +// +// Keys beginning with "grpc-" are reserved for grpc-internal use only and may +// result in errors if set in metadata. func New(m map[string]string) MD { md := MD{} for k, val := range m { @@ -62,6 +65,9 @@ func New(m map[string]string) MD { // - lowercase letters: a-z // - special characters: -_. // Uppercase letters are automatically converted to lowercase. +// +// Keys beginning with "grpc-" are reserved for grpc-internal use only and may +// result in errors if set in metadata. func Pairs(kv ...string) MD { if len(kv)%2 == 1 { panic(fmt.Sprintf("metadata: Pairs got the odd number of input pairs for metadata: %d", len(kv))) diff --git a/stats/stats.go b/stats/stats.go index fb9db854..e7a14d92 100644 --- a/stats/stats.go +++ b/stats/stats.go @@ -26,6 +26,8 @@ package stats // import "google.golang.org/grpc/stats" import ( "net" "time" + + "golang.org/x/net/context" ) // RPCStats contains stats information about RPCs. @@ -210,3 +212,85 @@ type ConnEnd struct { func (s *ConnEnd) IsClient() bool { return s.Client } func (s *ConnEnd) isConnStats() {} + +type incomingTagsKey struct{} +type outgoingTagsKey struct{} + +// SetTags attaches stats tagging data to the context, which will be sent in +// the outgoing RPC with the header grpc-tags-bin. Subsequent calls to +// SetTags will overwrite the values from earlier calls. +// +// NOTE: this is provided only for backward compatibilty with existing clients +// and will likely be removed in an upcoming release. New uses should transmit +// this type of data using metadata with a different, non-reserved (i.e. does +// not begin with "grpc-") header name. +func SetTags(ctx context.Context, b []byte) context.Context { + return context.WithValue(ctx, outgoingTagsKey{}, b) +} + +// Tags returns the tags from the context for the inbound RPC. +// +// NOTE: this is provided only for backward compatibilty with existing clients +// and will likely be removed in an upcoming release. New uses should transmit +// this type of data using metadata with a different, non-reserved (i.e. does +// not begin with "grpc-") header name. +func Tags(ctx context.Context) []byte { + b, _ := ctx.Value(incomingTagsKey{}).([]byte) + return b +} + +// SetIncomingTags attaches stats tagging data to the context, to be read by +// the application (not sent in outgoing RPCs). +// +// This is intended for gRPC-internal use ONLY. +func SetIncomingTags(ctx context.Context, b []byte) context.Context { + return context.WithValue(ctx, incomingTagsKey{}, b) +} + +// OutgoingTags returns the tags from the context for the outbound RPC. +// +// This is intended for gRPC-internal use ONLY. +func OutgoingTags(ctx context.Context) []byte { + b, _ := ctx.Value(outgoingTagsKey{}).([]byte) + return b +} + +type incomingTraceKey struct{} +type outgoingTraceKey struct{} + +// SetTrace attaches stats tagging data to the context, which will be sent in +// the outgoing RPC with the header grpc-trace-bin. Subsequent calls to +// SetTrace will overwrite the values from earlier calls. +// +// NOTE: this is provided only for backward compatibilty with existing clients +// and will likely be removed in an upcoming release. New uses should transmit +// this type of data using metadata with a different, non-reserved (i.e. does +// not begin with "grpc-") header name. +func SetTrace(ctx context.Context, b []byte) context.Context { + return context.WithValue(ctx, outgoingTraceKey{}, b) +} + +// Trace returns the trace from the context for the inbound RPC. +// +// NOTE: this is provided only for backward compatibilty with existing clients +// and will likely be removed in an upcoming release. New uses should transmit +// this type of data using metadata with a different, non-reserved (i.e. does +// not begin with "grpc-") header name. +func Trace(ctx context.Context) []byte { + b, _ := ctx.Value(incomingTraceKey{}).([]byte) + return b +} + +// SetIncomingTrace attaches stats tagging data to the context, to be read by +// the application (not sent in outgoing RPCs). It is intended for +// gRPC-internal use. +func SetIncomingTrace(ctx context.Context, b []byte) context.Context { + return context.WithValue(ctx, incomingTraceKey{}, b) +} + +// OutgoingTrace returns the trace from the context for the outbound RPC. It is +// intended for gRPC-internal use. +func OutgoingTrace(ctx context.Context) []byte { + b, _ := ctx.Value(outgoingTraceKey{}).([]byte) + return b +} diff --git a/stats/stats_test.go b/stats/stats_test.go index b48eff6d..8865d3fb 100644 --- a/stats/stats_test.go +++ b/stats/stats_test.go @@ -1238,3 +1238,41 @@ func TestClientStatsFullDuplexRPCNotCallingLastRecv(t *testing.T) { end: {checkEnd, 1}, }) } + +func TestTags(t *testing.T) { + b := []byte{5, 2, 4, 3, 1} + ctx := stats.SetTags(context.Background(), b) + if tg := stats.OutgoingTags(ctx); !reflect.DeepEqual(tg, b) { + t.Errorf("OutgoingTags(%v) = %v; want %v", ctx, tg, b) + } + if tg := stats.Tags(ctx); tg != nil { + t.Errorf("Tags(%v) = %v; want nil", ctx, tg) + } + + ctx = stats.SetIncomingTags(context.Background(), b) + if tg := stats.Tags(ctx); !reflect.DeepEqual(tg, b) { + t.Errorf("Tags(%v) = %v; want %v", ctx, tg, b) + } + if tg := stats.OutgoingTags(ctx); tg != nil { + t.Errorf("OutgoingTags(%v) = %v; want nil", ctx, tg) + } +} + +func TestTrace(t *testing.T) { + b := []byte{5, 2, 4, 3, 1} + ctx := stats.SetTrace(context.Background(), b) + if tr := stats.OutgoingTrace(ctx); !reflect.DeepEqual(tr, b) { + t.Errorf("OutgoingTrace(%v) = %v; want %v", ctx, tr, b) + } + if tr := stats.Trace(ctx); tr != nil { + t.Errorf("Trace(%v) = %v; want nil", ctx, tr) + } + + ctx = stats.SetIncomingTrace(context.Background(), b) + if tr := stats.Trace(ctx); !reflect.DeepEqual(tr, b) { + t.Errorf("Trace(%v) = %v; want %v", ctx, tr, b) + } + if tr := stats.OutgoingTrace(ctx); tr != nil { + t.Errorf("OutgoingTrace(%v) = %v; want nil", ctx, tr) + } +} diff --git a/test/end2end_test.go b/test/end2end_test.go index 067815f0..1021744f 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -55,6 +55,7 @@ import ( "google.golang.org/grpc/internal" "google.golang.org/grpc/metadata" "google.golang.org/grpc/peer" + "google.golang.org/grpc/stats" "google.golang.org/grpc/status" "google.golang.org/grpc/tap" testpb "google.golang.org/grpc/test/grpc_testing" @@ -4616,6 +4617,61 @@ func TestStreamingProxyDoesNotForwardMetadata(t *testing.T) { } } +func TestStatsTagsAndTrace(t *testing.T) { + // Data added to context by client (typically in a stats handler). + tags := []byte{1, 5, 2, 4, 3} + trace := []byte{5, 2, 1, 3, 4} + + // endpoint ensures Tags() and Trace() in context match those that were added + // by the client and returns an error if not. + endpoint := &stubServer{ + emptyCall: func(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { + md, _ := metadata.FromIncomingContext(ctx) + if tg := stats.Tags(ctx); !reflect.DeepEqual(tg, tags) { + return nil, status.Errorf(codes.Internal, "stats.Tags(%v)=%v; want %v", ctx, tg, tags) + } + if !reflect.DeepEqual(md["grpc-tags-bin"], []string{string(tags)}) { + return nil, status.Errorf(codes.Internal, "md['grpc-tags-bin']=%v; want %v", md["grpc-tags-bin"], tags) + } + if tr := stats.Trace(ctx); !reflect.DeepEqual(tr, trace) { + return nil, status.Errorf(codes.Internal, "stats.Trace(%v)=%v; want %v", ctx, tr, trace) + } + if !reflect.DeepEqual(md["grpc-trace-bin"], []string{string(trace)}) { + return nil, status.Errorf(codes.Internal, "md['grpc-trace-bin']=%v; want %v", md["grpc-trace-bin"], trace) + } + return &testpb.Empty{}, nil + }, + } + if err := endpoint.Start(); err != nil { + t.Fatalf("Error starting endpoint server: %v", err) + } + defer endpoint.Stop() + + ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + testCases := []struct { + ctx context.Context + want codes.Code + }{ + {ctx: ctx, want: codes.Internal}, + {ctx: stats.SetTags(ctx, tags), want: codes.Internal}, + {ctx: stats.SetTrace(ctx, trace), want: codes.Internal}, + {ctx: stats.SetTags(stats.SetTrace(ctx, tags), tags), want: codes.Internal}, + {ctx: stats.SetTags(stats.SetTrace(ctx, trace), tags), want: codes.OK}, + } + + for _, tc := range testCases { + _, err := endpoint.client.EmptyCall(tc.ctx, &testpb.Empty{}) + if tc.want == codes.OK && err != nil { + t.Fatalf("endpoint.client.EmptyCall(%v, _) = _, %v; want _, nil", tc.ctx, err) + } + if s, ok := status.FromError(err); !ok || s.Code() != tc.want { + t.Fatalf("endpoint.client.EmptyCall(%v, _) = _, %v; want _, ", tc.ctx, err, tc.want) + } + } +} + type windowSizeConfig struct { serverStream int32 serverConn int32 diff --git a/transport/http2_client.go b/transport/http2_client.go index 17a15a00..bccc0567 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -471,6 +471,12 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea var ( endHeaders bool ) + if b := stats.OutgoingTags(ctx); b != nil { + t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-tags-bin", Value: encodeBinHeader(b)}) + } + if b := stats.OutgoingTrace(ctx); b != nil { + t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-trace-bin", Value: encodeBinHeader(b)}) + } if md, ok := metadata.FromOutgoingContext(ctx); ok { for k, vv := range md { // HTTP doesn't allow you to set pseudoheaders after non pseudoheaders were set. diff --git a/transport/http2_server.go b/transport/http2_server.go index b6f93e3c..6ee6f40f 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -274,6 +274,12 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func( if len(state.mdata) > 0 { s.ctx = metadata.NewIncomingContext(s.ctx, state.mdata) } + if state.statsTags != nil { + s.ctx = stats.SetIncomingTags(s.ctx, state.statsTags) + } + if state.statsTrace != nil { + s.ctx = stats.SetIncomingTrace(s.ctx, state.statsTrace) + } s.trReader = &transportReader{ reader: &recvBufferReader{ ctx: s.ctx, diff --git a/transport/http_util.go b/transport/http_util.go index 685c6fbf..f7c052aa 100644 --- a/transport/http_util.go +++ b/transport/http_util.go @@ -111,7 +111,9 @@ type decodeState struct { timeout time.Duration method string // key-value metadata map from the peer. - mdata map[string][]string + mdata map[string][]string + statsTags []byte + statsTrace []byte } // isReservedHeader checks whether hdr belongs to HTTP2 headers @@ -235,6 +237,13 @@ func (d *decodeState) decodeResponseHeader(frame *http2.MetaHeadersFrame) error } +func (d *decodeState) addMetadata(k, v string) { + if d.mdata == nil { + d.mdata = make(map[string][]string) + } + d.mdata[k] = append(d.mdata[k], v) +} + func (d *decodeState) processHeaderField(f hpack.HeaderField) error { switch f.Name { case "content-type": @@ -275,18 +284,30 @@ func (d *decodeState) processHeaderField(f hpack.HeaderField) error { return streamErrorf(codes.Internal, "transport: malformed http-status: %v", err) } d.httpStatus = &code - default: - if !isReservedHeader(f.Name) || isWhitelistedPseudoHeader(f.Name) { - if d.mdata == nil { - d.mdata = make(map[string][]string) - } - v, err := decodeMetadataHeader(f.Name, f.Value) - if err != nil { - errorf("Failed to decode metadata header (%q, %q): %v", f.Name, f.Value, err) - return nil - } - d.mdata[f.Name] = append(d.mdata[f.Name], v) + case "grpc-tags-bin": + v, err := decodeBinHeader(f.Value) + if err != nil { + return streamErrorf(codes.Internal, "transport: malformed grpc-tags-bin: %v", err) } + d.statsTags = v + d.addMetadata(f.Name, string(v)) + case "grpc-trace-bin": + v, err := decodeBinHeader(f.Value) + if err != nil { + return streamErrorf(codes.Internal, "transport: malformed grpc-trace-bin: %v", err) + } + d.statsTrace = v + d.addMetadata(f.Name, string(v)) + default: + if isReservedHeader(f.Name) && !isWhitelistedPseudoHeader(f.Name) { + break + } + v, err := decodeMetadataHeader(f.Name, f.Value) + if err != nil { + errorf("Failed to decode metadata header (%q, %q): %v", f.Name, f.Value, err) + return nil + } + d.addMetadata(f.Name, string(v)) } return nil }