support user-agent and allow users to attach their own application user agent string

iamqizhao
2015-07-24 11:19:08 -07:00
parent d0141d6577
commit 611d960757
5 changed files with 71 additions and 32 deletions


@ -114,9 +114,14 @@ func WithDialer(f func(addr string, timeout time.Duration) (net.Conn, error)) Di
}
}
// WithUserAgent returns a DialOption that specifies a user agent string for all the RPCs.
func WithUserAgent(s string) DialOption {
return func(o *dialOptions) {
o.copts.UserAgent = s
}
}
// Dial creates a client connection to the given target.
// TODO(zhaoq): Have an option to make Dial return immediately without waiting
// for connection to complete.
func Dial(target string, opts ...DialOption) (*ClientConn, error) {
if target == "" {
return nil, ErrUnspecTarget
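
A minimal usage sketch of the new option (not part of this commit): the target address and user agent string below are placeholders, and the import path is assumed to be the canonical google.golang.org/grpc.

package main

import (
	"log"

	"google.golang.org/grpc"
)

func main() {
	// WithUserAgent appends "myApp/1.0" after the library's primary
	// "grpc-go/<version>" user agent on every RPC made over this connection.
	conn, err := grpc.Dial("localhost:50051", grpc.WithUserAgent("myApp/1.0"))
	if err != nil {
		log.Fatalf("grpc.Dial(_) = _, %v", err)
	}
	defer conn.Close()
}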


@ -62,15 +62,21 @@ var (
"key1": "value1",
"key2": "value2",
}
testAppUA = "myApp/1.0"
)
type testServer struct {
}
func (s *testServer) EmptyCall(ctx context.Context, in *testpb.Empty) (*testpb.Empty, error) {
if _, ok := metadata.FromContext(ctx); ok {
// For testing purpose, returns an error if there is attached metadata.
return nil, grpc.Errorf(codes.DataLoss, "got extra metadata")
if md, ok := metadata.FromContext(ctx); ok {
// For testing purposes, return an error if there is attached metadata other than
// the user agent set by the client application.
if ua, isOK := md["user-agent"]; isOK {
grpc.SendHeader(ctx, metadata.Pairs("ua", ua))
} else {
return nil, grpc.Errorf(codes.DataLoss, "got extra metadata")
}
}
return new(testpb.Empty), nil
}
@ -285,7 +291,7 @@ func listTestEnv() []env {
return []env{env{"tcp", nil, ""}, env{"tcp", nil, "tls"}, env{"unix", unixDialer, ""}, env{"unix", unixDialer, "tls"}}
}
func setUp(hs *health.HealthServer, maxStream uint32, e env) (s *grpc.Server, cc *grpc.ClientConn) {
func setUp(hs *health.HealthServer, maxStream uint32, ua string, e env) (s *grpc.Server, cc *grpc.ClientConn) {
sopts := []grpc.ServerOption{grpc.MaxConcurrentStreams(maxStream)}
la := ":0"
switch e.network {
@ -325,9 +331,9 @@ func setUp(hs *health.HealthServer, maxStream uint32, e env) (s *grpc.Server, cc
if err != nil {
grpclog.Fatalf("Failed to create credentials %v", err)
}
cc, err = grpc.Dial(addr, grpc.WithTransportCredentials(creds), grpc.WithDialer(e.dialer))
cc, err = grpc.Dial(addr, grpc.WithTransportCredentials(creds), grpc.WithDialer(e.dialer), grpc.WithUserAgent(ua))
} else {
cc, err = grpc.Dial(addr, grpc.WithDialer(e.dialer))
cc, err = grpc.Dial(addr, grpc.WithDialer(e.dialer), grpc.WithUserAgent(ua))
}
if err != nil {
grpclog.Fatalf("Dial(%q) = %v", addr, err)
@ -347,7 +353,7 @@ func TestTimeoutOnDeadServer(t *testing.T) {
}
func testTimeoutOnDeadServer(t *testing.T, e env) {
s, cc := setUp(nil, math.MaxUint32, e)
s, cc := setUp(nil, math.MaxUint32, "", e)
tc := testpb.NewTestServiceClient(cc)
s.Stop()
// Set -1 as the timeout to make sure if transportMonitor gets error
@ -379,7 +385,7 @@ func TestHealthCheckOnSuccess(t *testing.T) {
func testHealthCheckOnSuccess(t *testing.T, e env) {
hs := health.NewHealthServer()
hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 1)
s, cc := setUp(hs, math.MaxUint32, e)
s, cc := setUp(hs, math.MaxUint32, "", e)
defer tearDown(s, cc)
if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != nil {
t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, <nil>", err)
@ -395,7 +401,7 @@ func TestHealthCheckOnFailure(t *testing.T) {
func testHealthCheckOnFailure(t *testing.T, e env) {
hs := health.NewHealthServer()
hs.SetServingStatus("", "grpc.health.v1alpha.HealthCheck", 1)
s, cc := setUp(hs, math.MaxUint32, e)
s, cc := setUp(hs, math.MaxUint32, "", e)
defer tearDown(s, cc)
if _, err := healthCheck(0*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != grpc.Errorf(codes.DeadlineExceeded, "context deadline exceeded") {
t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, error code %d", err, codes.DeadlineExceeded)
@ -409,7 +415,7 @@ func TestHealthCheckOff(t *testing.T) {
}
func testHealthCheckOff(t *testing.T, e env) {
s, cc := setUp(nil, math.MaxUint32, e)
s, cc := setUp(nil, math.MaxUint32, "", e)
defer tearDown(s, cc)
if _, err := healthCheck(1*time.Second, cc, ""); err != grpc.Errorf(codes.Unimplemented, "unknown service grpc.health.v1alpha.HealthCheck") {
t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, error code %d", err, codes.Unimplemented)
@ -424,7 +430,7 @@ func TestHealthCheckServingStatus(t *testing.T) {
func testHealthCheckServingStatus(t *testing.T, e env) {
hs := health.NewHealthServer()
s, cc := setUp(hs, math.MaxUint32, e)
s, cc := setUp(hs, math.MaxUint32, "", e)
defer tearDown(s, cc)
if _, err := healthCheck(1*time.Second, cc, "grpc.health.v1alpha.HealthCheck"); err != grpc.Errorf(codes.NotFound, "unknown service") {
t.Fatalf("HealthCheck/Check(_, _) = _, %v, want _, error code %d", err, codes.NotFound)
@ -448,20 +454,24 @@ func testHealthCheckServingStatus(t *testing.T, e env) {
}
func TestEmptyUnary(t *testing.T) {
func TestEmptyUnaryWithUserAgent(t *testing.T) {
for _, e := range listTestEnv() {
testEmptyUnary(t, e)
testEmptyUnaryWithUserAgent(t, e)
}
}
func testEmptyUnary(t *testing.T, e env) {
s, cc := setUp(nil, math.MaxUint32, e)
func testEmptyUnaryWithUserAgent(t *testing.T, e env) {
s, cc := setUp(nil, math.MaxUint32, testAppUA, e)
tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc)
reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{})
var header metadata.MD
reply, err := tc.EmptyCall(context.Background(), &testpb.Empty{}, grpc.Header(&header))
if err != nil || !proto.Equal(&testpb.Empty{}, reply) {
t.Fatalf("TestService/EmptyCall(_, _) = %v, %v, want %v, <nil>", reply, err, &testpb.Empty{})
}
if v, ok := header["ua"]; !ok || v != testAppUA {
t.Fatalf("header[\"ua\"] = %q, %t, want %q, true", v, ok, testAppUA)
}
}
func TestFailedEmptyUnary(t *testing.T) {
@ -471,7 +481,7 @@ func TestFailedEmptyUnary(t *testing.T) {
}
func testFailedEmptyUnary(t *testing.T, e env) {
s, cc := setUp(nil, math.MaxUint32, e)
s, cc := setUp(nil, math.MaxUint32, "", e)
tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc)
ctx := metadata.NewContext(context.Background(), testMetadata)
@ -487,7 +497,7 @@ func TestLargeUnary(t *testing.T) {
}
func testLargeUnary(t *testing.T, e env) {
s, cc := setUp(nil, math.MaxUint32, e)
s, cc := setUp(nil, math.MaxUint32, "", e)
tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc)
argSize := 271828
@ -515,7 +525,7 @@ func TestMetadataUnaryRPC(t *testing.T) {
}
func testMetadataUnaryRPC(t *testing.T, e env) {
s, cc := setUp(nil, math.MaxUint32, e)
s, cc := setUp(nil, math.MaxUint32, "", e)
tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc)
argSize := 2718
@ -569,7 +579,7 @@ func TestRetry(t *testing.T) {
// TODO(zhaoq): Refactor to make this clearer and add more cases to test racy
// and error-prone paths.
func testRetry(t *testing.T, e env) {
s, cc := setUp(nil, math.MaxUint32, e)
s, cc := setUp(nil, math.MaxUint32, "", e)
tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc)
var wg sync.WaitGroup
@ -599,7 +609,7 @@ func TestRPCTimeout(t *testing.T) {
// TODO(zhaoq): Have a better test coverage of timeout and cancellation mechanism.
func testRPCTimeout(t *testing.T, e env) {
s, cc := setUp(nil, math.MaxUint32, e)
s, cc := setUp(nil, math.MaxUint32, "", e)
tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc)
argSize := 2718
@ -625,7 +635,7 @@ func TestCancel(t *testing.T) {
}
func testCancel(t *testing.T, e env) {
s, cc := setUp(nil, math.MaxUint32, e)
s, cc := setUp(nil, math.MaxUint32, "", e)
tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc)
argSize := 2718
@ -657,7 +667,7 @@ func TestPingPong(t *testing.T) {
}
func testPingPong(t *testing.T, e env) {
s, cc := setUp(nil, math.MaxUint32, e)
s, cc := setUp(nil, math.MaxUint32, "", e)
tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc)
stream, err := tc.FullDuplexCall(context.Background())
@ -708,7 +718,7 @@ func TestMetadataStreamingRPC(t *testing.T) {
}
func testMetadataStreamingRPC(t *testing.T, e env) {
s, cc := setUp(nil, math.MaxUint32, e)
s, cc := setUp(nil, math.MaxUint32, "", e)
tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc)
ctx := metadata.NewContext(context.Background(), testMetadata)
@ -765,7 +775,7 @@ func TestServerStreaming(t *testing.T) {
}
func testServerStreaming(t *testing.T, e env) {
s, cc := setUp(nil, math.MaxUint32, e)
s, cc := setUp(nil, math.MaxUint32, "", e)
tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc)
respParam := make([]*testpb.ResponseParameters, len(respSizes))
@ -817,7 +827,7 @@ func TestFailedServerStreaming(t *testing.T) {
}
func testFailedServerStreaming(t *testing.T, e env) {
s, cc := setUp(nil, math.MaxUint32, e)
s, cc := setUp(nil, math.MaxUint32, "", e)
tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc)
respParam := make([]*testpb.ResponseParameters, len(respSizes))
@ -847,7 +857,7 @@ func TestClientStreaming(t *testing.T) {
}
func testClientStreaming(t *testing.T, e env) {
s, cc := setUp(nil, math.MaxUint32, e)
s, cc := setUp(nil, math.MaxUint32, "", e)
tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc)
stream, err := tc.StreamingInputCall(context.Background())
@ -882,7 +892,7 @@ func TestExceedMaxStreamsLimit(t *testing.T) {
func testExceedMaxStreamsLimit(t *testing.T, e env) {
// Only allows 1 live stream per server transport.
s, cc := setUp(nil, 1, e)
s, cc := setUp(nil, 1, "", e)
tc := testpb.NewTestServiceClient(cc)
defer tearDown(s, cc)
// Perform a unary RPC to make sure the new settings were propagated to the client.


@ -54,6 +54,7 @@ import (
// http2Client implements the ClientTransport interface with HTTP2.
type http2Client struct {
target string // server name/addr
userAgent string
conn net.Conn // underlying communication channel
nextID uint32 // the next stream ID to be used
@ -158,9 +159,14 @@ func newHTTP2Client(addr string, opts *ConnectOptions) (_ ClientTransport, err e
return nil, ConnectionErrorf("transport: %v", err)
}
}
ua := primaryUA
if opts.UserAgent != "" {
ua += " " + opts.UserAgent
}
var buf bytes.Buffer
t := &http2Client{
target: addr,
userAgent: ua,
conn: conn,
// The client initiated stream id is odd starting from 1.
nextID: 1,
@ -273,7 +279,9 @@ func (t *http2Client) NewStream(ctx context.Context, callHdr *CallHdr) (_ *Strea
t.hEnc.WriteField(hpack.HeaderField{Name: ":path", Value: callHdr.Method})
t.hEnc.WriteField(hpack.HeaderField{Name: ":authority", Value: callHdr.Host})
t.hEnc.WriteField(hpack.HeaderField{Name: "content-type", Value: "application/grpc"})
t.hEnc.WriteField(hpack.HeaderField{Name: "user-agent", Value: t.userAgent})
t.hEnc.WriteField(hpack.HeaderField{Name: "te", Value: "trailers"})
if timeout > 0 {
t.hEnc.WriteField(hpack.HeaderField{Name: "grpc-timeout", Value: timeoutEncode(timeout)})
}
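
To make the composed header value concrete, here is a small standalone sketch of the concatenation performed in newHTTP2Client above, assuming the application passed WithUserAgent("myApp/1.0"):

package main

import "fmt"

func main() {
	const primaryUA = "grpc-go/0.7" // the library's own user agent
	appUA := "myApp/1.0"            // ConnectOptions.UserAgent set by the application

	ua := primaryUA
	if appUA != "" {
		ua += " " + appUA
	}
	// The value written into the "user-agent" header field of each stream:
	fmt.Println(ua) // grpc-go/0.7 myApp/1.0
}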


@ -39,6 +39,7 @@ import (
"io"
"net"
"strconv"
"strings"
"sync/atomic"
"time"
@ -50,6 +51,8 @@ import (
)
const (
// The primary user agent
primaryUA = "grpc-go/0.7"
// http2MaxFrameLen specifies the max length of a HTTP2 frame.
http2MaxFrameLen = 16384 // 16KB frame
// http://http2.github.io/http2-spec/#SettingValues
@ -128,8 +131,7 @@ func isReservedHeader(hdr string) bool {
"grpc-message",
"grpc-status",
"grpc-timeout",
"te",
"user-agent":
"te":
return true
default:
return false
@ -161,6 +163,15 @@ func newHPACKDecoder() *hpackDecoder {
d.state.method = f.Value
default:
if !isReservedHeader(f.Name) {
if f.Name == "user-agent" {
str := strings.Split(f.Value, " ")
if len(str) <= 1 {
// There is no application user agent string being set.
return
}
// Extract the application user agent string.
f.Value = str[1]
}
if d.state.mdata == nil {
d.state.mdata = make(map[string]string)
}
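
The inverse operation as a standalone sketch: the server-side decoder above splits the received value on a space, treats the first token as the library's primary user agent, and surfaces only the second token (the application's string) in the request metadata.

package main

import (
	"fmt"
	"strings"
)

func main() {
	value := "grpc-go/0.7 myApp/1.0" // combined "user-agent" header received by the server
	str := strings.Split(value, " ")
	if len(str) <= 1 {
		// There is no application user agent string set.
		fmt.Println("no application user agent")
		return
	}
	fmt.Println(str[1]) // myApp/1.0
}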


@ -320,8 +320,13 @@ func NewServerTransport(protocol string, conn net.Conn, maxStreams uint32) (Serv
// ConnectOptions covers all relevant options for dialing a server.
type ConnectOptions struct {
// UserAgent is the application user agent.
UserAgent string
// Dialer specifies how to dial a network address.
Dialer func(string, time.Duration) (net.Conn, error)
// AuthOptions stores the credentials required to set up a client connection and issue RPCs.
AuthOptions []credentials.Credentials
// Timeout specifies the timeout for dialing a client connection.
Timeout time.Duration
}
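
For completeness, a hypothetical sketch of filling in these transport-level options directly; applications would normally not build ConnectOptions themselves but go through grpc.Dial with WithUserAgent and WithDialer, and the transport import path is assumed to be google.golang.org/grpc/transport.

package main

import (
	"net"
	"time"

	"google.golang.org/grpc/transport"
)

func main() {
	copts := &transport.ConnectOptions{
		// UserAgent is appended after the primary "grpc-go/..." user agent.
		UserAgent: "myApp/1.0",
		// Timeout bounds how long dialing may take (placeholder value).
		Timeout: 20 * time.Second,
		// Dialer mirrors a plain TCP dial with the given timeout.
		Dialer: func(addr string, timeout time.Duration) (net.Conn, error) {
			return net.DialTimeout("tcp", addr, timeout)
		},
	}
	_ = copts
}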