From 3104ff998c521fb873ff5e0b8eb12c2bf280f0f8 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 22 Jul 2015 18:49:19 -0700 Subject: [PATCH 1/6] to be deleted --- credentials/credentials.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/credentials/credentials.go b/credentials/credentials.go index 63c42a29..f8e5d354 100644 --- a/credentials/credentials.go +++ b/credentials/credentials.go @@ -201,12 +201,12 @@ func NewServerTLSFromFile(certFile, keyFile string) (TransportAuthenticator, err } // TokenSource supplies credentials from an oauth2.TokenSource. -type TokenSource struct { +type tokenSource struct { oauth2.TokenSource } // GetRequestMetadata gets the request metadata as a map from a TokenSource. -func (ts TokenSource) GetRequestMetadata(ctx context.Context) (map[string]string, error) { +func (ts tokenSource) GetRequestMetadata(ctx context.Context) (map[string]string, error) { token, err := ts.Token() if err != nil { return nil, err @@ -221,7 +221,7 @@ func (ts TokenSource) GetRequestMetadata(ctx context.Context) (map[string]string // if your program is running on a GCE instance. // TODO(dsymonds): Deprecate and remove this. func NewComputeEngine() Credentials { - return TokenSource{google.ComputeTokenSource("")} + return tokenSource{google.ComputeTokenSource("")} } // serviceAccount represents credentials via JWT signing key. @@ -266,5 +266,5 @@ func NewApplicationDefault(ctx context.Context, scope ...string) (Credentials, e if err != nil { return nil, err } - return TokenSource{t}, nil + return tokenSource{t}, nil } From e1fea116b7d06bb2ddcb35ec52bb0a22378dc697 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Wed, 22 Jul 2015 19:24:44 -0700 Subject: [PATCH 2/6] revert --- credentials/credentials.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/credentials/credentials.go b/credentials/credentials.go index f8e5d354..63c42a29 100644 --- a/credentials/credentials.go +++ b/credentials/credentials.go @@ -201,12 +201,12 @@ func NewServerTLSFromFile(certFile, keyFile string) (TransportAuthenticator, err } // TokenSource supplies credentials from an oauth2.TokenSource. -type tokenSource struct { +type TokenSource struct { oauth2.TokenSource } // GetRequestMetadata gets the request metadata as a map from a TokenSource. -func (ts tokenSource) GetRequestMetadata(ctx context.Context) (map[string]string, error) { +func (ts TokenSource) GetRequestMetadata(ctx context.Context) (map[string]string, error) { token, err := ts.Token() if err != nil { return nil, err @@ -221,7 +221,7 @@ func (ts tokenSource) GetRequestMetadata(ctx context.Context) (map[string]string // if your program is running on a GCE instance. // TODO(dsymonds): Deprecate and remove this. func NewComputeEngine() Credentials { - return tokenSource{google.ComputeTokenSource("")} + return TokenSource{google.ComputeTokenSource("")} } // serviceAccount represents credentials via JWT signing key. @@ -266,5 +266,5 @@ func NewApplicationDefault(ctx context.Context, scope ...string) (Credentials, e if err != nil { return nil, err } - return tokenSource{t}, nil + return TokenSource{t}, nil } From 611d9607577a3d4245bc0070698338ede17bc4e6 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 24 Jul 2015 11:19:08 -0700 Subject: [PATCH 3/6] support user-agent and allow users attach their own application user agent string --- clientconn.go | 9 ++++-- test/end2end_test.go | 66 ++++++++++++++++++++++----------------- transport/http2_client.go | 8 +++++ transport/http_util.go | 15 +++++++-- transport/transport.go | 5 +++ 5 files changed, 71 insertions(+), 32 deletions(-) diff --git a/clientconn.go b/clientconn.go index e7bb9453..bc6d6f5b 100644 --- a/clientconn.go +++ b/clientconn.go @@ -114,9 +114,14 @@ func WithDialer(f func(addr string, timeout time.Duration) (net.Conn, error)) Di } } +// WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs. +func WithUserAgent(s string) DialOption { + return func(o *dialOptions) { + o.copts.UserAgent = s + } +} + // Dial creates a client connection the given target. -// TODO(zhaoq): Have an option to make Dial return immediately without waiting -// for connection to complete. func Dial(target string, opts ...DialOption) (*ClientConn, error) { if target == "" { return nil, ErrUnspecTarget diff --git a/test/end2end_test.go b/test/end2end_test.go index 8430ed28..9b541e6e 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -62,15 +62,21 @@ var ( "key1": "value1", "key2": "value2", } + testAppUA = "myApp/1.0" ) type testServer struct { } func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) { - if _, ok := metadata.FromContext(ctx); ok { - // For testing purpose, returns an error if there is attached metadata. - return nil, grpc.Errorf(codes.DataLoss, "got extra metadata") + if md, ok := metadata.FromContext(ctx); ok { + // For testing purpose, returns an error if there is attached metadata other than + // the user agent set by the client application. + if ua, isOK := md["user-agent"]; isOK { + grpc.SendHeader(ctx, metadata.Pairs("ua", ua)) + } else { + return nil, grpc.Errorf(codes.DataLoss, "got extra metadata") + } } return new(testpb.Empty), nil } @@ -285,7 +291,7 @@ func listTestEnv() []env { return []env{env{"tcp", nil, ""}, env{"tcp", nil, "tls"}, env{"unix", unixDialer, ""}, env{"unix", unixDialer, "tls"}} } -func setUp(hs *health.HealthServer, maxStream uint32, e env) (s *grpc.Server, cc *grpc.ClientConn) { +func setUp(hs *health.HealthServer, maxStream uint32, ua string, e env) (s *grpc.Server, cc *grpc.ClientConn) { sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(maxStream)} la := ":0" switch e.network { @@ -325,9 +331,9 @@ func setUp(hs *health.HealthServer, maxStream uint32, e env) (s *grpc.Server, cc if err != nil { grpclog.Fatalf("Failed to create credentials %v", err) } - cc, err = grpc.Dial(addr, grpc.WithTransportCredentials(creds), grpc.WithDialer(e.dialer)) + cc, err = grpc.Dial(addr, grpc.WithTransportCredentials(creds), grpc.WithDialer(e.dialer), grpc.WithUserAgent(ua)) } else { - cc, err = grpc.Dial(addr, grpc.WithDialer(e.dialer)) + cc, err = grpc.Dial(addr, grpc.WithDialer(e.dialer), grpc.WithUserAgent(ua)) } if err != nil { grpclog.Fatalf("Dial(%q) = %v", addr, err) @@ -347,7 +353,7 @@ func TestTimeoutOnDeadServer(t *testing.T) { } func testTimeoutOnDeadServer(t *testing.T, e env) { - s, cc := setUp(nil, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, "", e) tc := testpb.NewTestServiceClient(cc) s.Stop() // Set -1 as the timeout to make sure if transportMonitor gets error @@ -379,7 +385,7 @@ func TestHealthCheckOnSuccess(t *testing.T) { func testHealthCheckOnSuccess(t *testing.T, e env) { hs := health.NewHealthServer() hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 1) - s, cc := setUp(hs, math.MaxUint32, e) + s, cc := setUp(hs, math.MaxUint32, "", e) defer tearDown(s, cc) if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != nil { t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, ", err) @@ -395,7 +401,7 @@ func TestHealthCheckOnFailure(t *testing.T) { func testHealthCheckOnFailure(t *testing.T, e env) { hs := health.NewHealthServer() hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 1) - s, cc := setUp(hs, math.MaxUint32, e) + s, cc := setUp(hs, math.MaxUint32, "", e) defer tearDown(s, cc) if _, err := healthCheck(0*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != grpc.Errorf(codes.DeadlineExceeded, "context deadline exceeded") { t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, error code %d", err, codes.DeadlineExceeded) @@ -409,7 +415,7 @@ func TestHealthCheckOff(t *testing.T) { } func testHealthCheckOff(t *testing.T, e env) { - s, cc := setUp(nil, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, "", e) defer tearDown(s, cc) if _, err := healthCheck(1*time.Second, cc, ""); err != grpc.Errorf(codes.Unimplemented, "unknown service grpc.health.v1alpha.HealthCheck") { t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, error code %d", err, codes.Unimplemented) @@ -424,7 +430,7 @@ func TestHealthCheckServingStatus(t *testing.T) { func testHealthCheckServingStatus(t *testing.T, e env) { hs := health.NewHealthServer() - s, cc := setUp(hs, math.MaxUint32, e) + s, cc := setUp(hs, math.MaxUint32, "", e) defer tearDown(s, cc) if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != grpc.Errorf(codes.NotFound, "unknown service") { t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, error code %d", err, codes.NotFound) @@ -448,20 +454,24 @@ func testHealthCheckServingStatus(t *testing.T, e env) { } -func TestEmptyUnary(t *testing.T) { +func TestEmptyUnaryWithUserAgent(t *testing.T) { for _, e := range listTestEnv() { - testEmptyUnary(t, e) + testEmptyUnaryWithUserAgent(t, e) } } -func testEmptyUnary(t *testing.T, e env) { - s, cc := setUp(nil, math.MaxUint32, e) +func testEmptyUnaryWithUserAgent(t *testing.T, e env) { + s, cc := setUp(nil, math.MaxUint32, testAppUA, e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) - reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}) + var header metadata.MD + reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Header(&header)) if err != nil || !proto.Equal(&testpb.Empty{}, reply) { t.Fatalf("TestService/EmptyCall(_, _) = %v, %v, want %v, ", reply, err, &testpb.Empty{}) } + if v, ok := header["ua"]; !ok || v != testAppUA { + t.Fatalf("header[\"ua\"] = %q, %t, want %q, true", v, ok, testAppUA) + } } func TestFailedEmptyUnary(t *testing.T) { @@ -471,7 +481,7 @@ func TestFailedEmptyUnary(t *testing.T) { } func testFailedEmptyUnary(t *testing.T, e env) { - s, cc := setUp(nil, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, "", e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) ctx := metadata.NewContext(context.Background(), testMetadata) @@ -487,7 +497,7 @@ func TestLargeUnary(t *testing.T) { } func testLargeUnary(t *testing.T, e env) { - s, cc := setUp(nil, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, "", e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) argSize := 271828 @@ -515,7 +525,7 @@ func TestMetadataUnaryRPC(t *testing.T) { } func testMetadataUnaryRPC(t *testing.T, e env) { - s, cc := setUp(nil, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, "", e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) argSize := 2718 @@ -569,7 +579,7 @@ func TestRetry(t *testing.T) { // TODO(zhaoq): Refactor to make this clearer and add more cases to test racy // and error-prone paths. func testRetry(t *testing.T, e env) { - s, cc := setUp(nil, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, "", e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) var wg sync.WaitGroup @@ -599,7 +609,7 @@ func TestRPCTimeout(t *testing.T) { // TODO(zhaoq): Have a better test coverage of timeout and cancellation mechanism. func testRPCTimeout(t *testing.T, e env) { - s, cc := setUp(nil, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, "", e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) argSize := 2718 @@ -625,7 +635,7 @@ func TestCancel(t *testing.T) { } func testCancel(t *testing.T, e env) { - s, cc := setUp(nil, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, "", e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) argSize := 2718 @@ -657,7 +667,7 @@ func TestPingPong(t *testing.T) { } func testPingPong(t *testing.T, e env) { - s, cc := setUp(nil, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, "", e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) stream, err := tc.FullDuplexCall(context.Background()) @@ -708,7 +718,7 @@ func TestMetadataStreamingRPC(t *testing.T) { } func testMetadataStreamingRPC(t *testing.T, e env) { - s, cc := setUp(nil, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, "", e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) ctx := metadata.NewContext(context.Background(), testMetadata) @@ -765,7 +775,7 @@ func TestServerStreaming(t *testing.T) { } func testServerStreaming(t *testing.T, e env) { - s, cc := setUp(nil, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, "", e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) respParam := make([]*testpb.ResponseParameters, len(respSizes)) @@ -817,7 +827,7 @@ func TestFailedServerStreaming(t *testing.T) { } func testFailedServerStreaming(t *testing.T, e env) { - s, cc := setUp(nil, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, "", e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) respParam := make([]*testpb.ResponseParameters, len(respSizes)) @@ -847,7 +857,7 @@ func TestClientStreaming(t *testing.T) { } func testClientStreaming(t *testing.T, e env) { - s, cc := setUp(nil, math.MaxUint32, e) + s, cc := setUp(nil, math.MaxUint32, "", e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) stream, err := tc.StreamingInputCall(context.Background()) @@ -882,7 +892,7 @@ func TestExceedMaxStreamsLimit(t *testing.T) { func testExceedMaxStreamsLimit(t *testing.T, e env) { // Only allows 1 live stream per server transport. - s, cc := setUp(nil, 1, e) + s, cc := setUp(nil, 1, "", e) tc := testpb.NewTestServiceClient(cc) defer tearDown(s, cc) // Perform a unary RPC to make sure the new settings were propagated to the client. diff --git a/transport/http2_client.go b/transport/http2_client.go index 59f4d5c3..b5a97d33 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -54,6 +54,7 @@ import ( // http2Client implements the ClientTransport interface with HTTP2. type http2Client struct { target string // server name/addr + userAgent string conn net.Conn // underlying communication channel nextID uint32 // the next stream ID to be used @@ -158,9 +159,14 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e return nil, ConnectionErrorf("transport: %v", err) } } + ua := primaryUA + if opts.UserAgent != "" { + ua += " " + opts.UserAgent + } var buf bytes.Buffer t := &http2Client{ target: addr, + userAgent: ua, conn: conn, // The client initiated stream id is odd starting from 1. nextID: 1, @@ -273,7 +279,9 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea t.hEnc.WriteField(hpack.HeaderField{Name: ":path", Value: callHdr.Method}) t.hEnc.WriteField(hpack.HeaderField{Name: ":authority", Value: callHdr.Host}) t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"}) + t.hEnc.WriteField(hpack.HeaderField{Name: "user-agent", Value: t.userAgent}) t.hEnc.WriteField(hpack.HeaderField{Name: "te", Value: "trailers"}) + if timeout > 0 { t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: timeoutEncode(timeout)}) } diff --git a/transport/http_util.go b/transport/http_util.go index 2babe729..fa0a4d0c 100644 --- a/transport/http_util.go +++ b/transport/http_util.go @@ -39,6 +39,7 @@ import ( "io" "net" "strconv" + "strings" "sync/atomic" "time" @@ -50,6 +51,8 @@ import ( ) const ( + // The primary user agent + primaryUA = "grpc-go/0.7" // http2MaxFrameLen specifies the max length of a HTTP2 frame. http2MaxFrameLen = 16384 // 16KB frame // http://http2.github.io/http2-spec/#SettingValues @@ -128,8 +131,7 @@ func isReservedHeader(hdr string) bool { "grpc-message", "grpc-status", "grpc-timeout", - "te", - "user-agent": + "te": return true default: return false @@ -161,6 +163,15 @@ func newHPACKDecoder() *hpackDecoder { d.state.method = f.Value default: if !isReservedHeader(f.Name) { + if f.Name == "user-agent" { + str := strings.Split(f.Value, " ") + if len(str) <= 1 { + // There is no application user agent string being set. + return + } + // Extract the application user agent string. + f.Value = str[1] + } if d.state.mdata == nil { d.state.mdata = make(map[string]string) } diff --git a/transport/transport.go b/transport/transport.go index 498cee54..b79cfd89 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -320,8 +320,13 @@ func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32) (Serv // ConnectOptions covers all relevant options for dialing a server. type ConnectOptions struct { + // UserAgent is the application user agent. + UserAgent string + // Dialer specifies how to dial a network address. Dialer func(string, time.Duration) (net.Conn, error) + // AuthOptions stores the credentials required to setup a client connection and issue RPCs. AuthOptions []credentials.Credentials + // Timeout specifies the timeout for dialing a client connection. Timeout time.Duration } From 23c960bc018d1f8e0ed0c8274745245deb0b5e1b Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 24 Jul 2015 11:26:21 -0700 Subject: [PATCH 4/6] gofmt -w --- transport/http2_client.go | 10 +++++----- transport/transport.go | 6 +++--- 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/transport/http2_client.go b/transport/http2_client.go index b5a97d33..8f79ad96 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -53,10 +53,10 @@ import ( // http2Client implements the ClientTransport interface with HTTP2. type http2Client struct { - target string // server name/addr + target string // server name/addr userAgent string - conn net.Conn // underlying communication channel - nextID uint32 // the next stream ID to be used + conn net.Conn // underlying communication channel + nextID uint32 // the next stream ID to be used // writableChan synchronizes write access to the transport. // A writer acquires the write lock by sending a value on writableChan @@ -165,9 +165,9 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e } var buf bytes.Buffer t := &http2Client{ - target: addr, + target: addr, userAgent: ua, - conn: conn, + conn: conn, // The client initiated stream id is odd starting from 1. nextID: 1, writableChan: make(chan int, 1), diff --git a/transport/transport.go b/transport/transport.go index b79cfd89..454511f7 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -321,13 +321,13 @@ func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32) (Serv // ConnectOptions covers all relevant options for dialing a server. type ConnectOptions struct { // UserAgent is the application user agent. - UserAgent string + UserAgent string // Dialer specifies how to dial a network address. - Dialer func(string, time.Duration) (net.Conn, error) + Dialer func(string, time.Duration) (net.Conn, error) // AuthOptions stores the credentials required to setup a client connection and issue RPCs. AuthOptions []credentials.Credentials // Timeout specifies the timeout for dialing a client connection. - Timeout time.Duration + Timeout time.Duration } // NewClientTransport establishes the transport with the required ConnectOptions From 8969f785ebbc14c96aa33a436801464083cb87a3 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 24 Jul 2015 11:29:32 -0700 Subject: [PATCH 5/6] fix a comment --- transport/transport.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/transport/transport.go b/transport/transport.go index 454511f7..c2ac3f88 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -324,7 +324,7 @@ type ConnectOptions struct { UserAgent string // Dialer specifies how to dial a network address. Dialer func(string, time.Duration) (net.Conn, error) - // AuthOptions stores the credentials required to setup a client connection and issue RPCs. + // AuthOptions stores the credentials required to setup a client connection and/or issue RPCs. AuthOptions []credentials.Credentials // Timeout specifies the timeout for dialing a client connection. Timeout time.Duration From c33d1609af3d698e1855a441b7230d08e2715be1 Mon Sep 17 00:00:00 2001 From: iamqizhao Date: Fri, 24 Jul 2015 11:39:27 -0700 Subject: [PATCH 6/6] refactor a bit --- test/end2end_test.go | 5 ++--- 1 file changed, 2 insertions(+), 3 deletions(-) diff --git a/test/end2end_test.go b/test/end2end_test.go index 9b541e6e..8d7eac83 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -72,11 +72,10 @@ func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.E if md, ok := metadata.FromContext(ctx); ok { // For testing purpose, returns an error if there is attached metadata other than // the user agent set by the client application. - if ua, isOK := md["user-agent"]; isOK { - grpc.SendHeader(ctx, metadata.Pairs("ua", ua)) - } else { + if _, ok := md["user-agent"]; !ok { return nil, grpc.Errorf(codes.DataLoss, "got extra metadata") } + grpc.SendHeader(ctx, metadata.Pairs("ua", md["user-agent"])) } return new(testpb.Empty), nil }