internal/transport: remove some unused fields from structs (#2213)
- Flush and Authority are never read by the transport. - Authority is used indirectly; move it to dialOptions. - Delay is only set to false.
This commit is contained in:
@ -240,8 +240,8 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn *
|
|||||||
creds := cc.dopts.copts.TransportCredentials
|
creds := cc.dopts.copts.TransportCredentials
|
||||||
if creds != nil && creds.Info().ServerName != "" {
|
if creds != nil && creds.Info().ServerName != "" {
|
||||||
cc.authority = creds.Info().ServerName
|
cc.authority = creds.Info().ServerName
|
||||||
} else if cc.dopts.insecure && cc.dopts.copts.Authority != "" {
|
} else if cc.dopts.insecure && cc.dopts.authority != "" {
|
||||||
cc.authority = cc.dopts.copts.Authority
|
cc.authority = cc.dopts.authority
|
||||||
} else {
|
} else {
|
||||||
// Use endpoint from "scheme://authority/endpoint" as the default
|
// Use endpoint from "scheme://authority/endpoint" as the default
|
||||||
// authority for ClientConn.
|
// authority for ClientConn.
|
||||||
|
@ -47,6 +47,7 @@ type dialOptions struct {
|
|||||||
insecure bool
|
insecure bool
|
||||||
timeout time.Duration
|
timeout time.Duration
|
||||||
scChan <-chan ServiceConfig
|
scChan <-chan ServiceConfig
|
||||||
|
authority string
|
||||||
copts transport.ConnectOptions
|
copts transport.ConnectOptions
|
||||||
callOptions []CallOption
|
callOptions []CallOption
|
||||||
// This is used by v1 balancer dial option WithBalancer to support v1
|
// This is used by v1 balancer dial option WithBalancer to support v1
|
||||||
@ -366,7 +367,7 @@ func WithStreamInterceptor(f StreamClientInterceptor) DialOption {
|
|||||||
// effect if TransportCredentials are present.
|
// effect if TransportCredentials are present.
|
||||||
func WithAuthority(a string) DialOption {
|
func WithAuthority(a string) DialOption {
|
||||||
return func(o *dialOptions) {
|
return func(o *dialOptions) {
|
||||||
o.copts.Authority = a
|
o.authority = a
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -274,9 +274,7 @@ func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts
|
|||||||
ht.writeCommonHeaders(s)
|
ht.writeCommonHeaders(s)
|
||||||
ht.rw.Write(hdr)
|
ht.rw.Write(hdr)
|
||||||
ht.rw.Write(data)
|
ht.rw.Write(data)
|
||||||
if !opts.Delay {
|
|
||||||
ht.rw.(http.Flusher).Flush()
|
ht.rw.(http.Flusher).Flush()
|
||||||
}
|
|
||||||
})
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -467,9 +467,6 @@ func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (S
|
|||||||
type ConnectOptions struct {
|
type ConnectOptions struct {
|
||||||
// UserAgent is the application user agent.
|
// UserAgent is the application user agent.
|
||||||
UserAgent string
|
UserAgent string
|
||||||
// Authority is the :authority pseudo-header to use. This field has no effect if
|
|
||||||
// TransportCredentials is set.
|
|
||||||
Authority string
|
|
||||||
// Dialer specifies how to dial a network address.
|
// Dialer specifies how to dial a network address.
|
||||||
Dialer func(context.Context, string) (net.Conn, error)
|
Dialer func(context.Context, string) (net.Conn, error)
|
||||||
// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
|
// FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors.
|
||||||
@ -515,11 +512,6 @@ type Options struct {
|
|||||||
// Last indicates whether this write is the last piece for
|
// Last indicates whether this write is the last piece for
|
||||||
// this stream.
|
// this stream.
|
||||||
Last bool
|
Last bool
|
||||||
|
|
||||||
// Delay is a hint to the transport implementation for whether
|
|
||||||
// the data could be buffered for a batching write. The
|
|
||||||
// transport implementation may ignore the hint.
|
|
||||||
Delay bool
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// CallHdr carries the information of a particular RPC.
|
// CallHdr carries the information of a particular RPC.
|
||||||
@ -537,14 +529,6 @@ type CallHdr struct {
|
|||||||
// Creds specifies credentials.PerRPCCredentials for a call.
|
// Creds specifies credentials.PerRPCCredentials for a call.
|
||||||
Creds credentials.PerRPCCredentials
|
Creds credentials.PerRPCCredentials
|
||||||
|
|
||||||
// Flush indicates whether a new stream command should be sent
|
|
||||||
// to the peer without waiting for the first data. This is
|
|
||||||
// only a hint.
|
|
||||||
// If it's true, the transport may modify the flush decision
|
|
||||||
// for performance purposes.
|
|
||||||
// If it's false, new stream will never be flushed.
|
|
||||||
Flush bool
|
|
||||||
|
|
||||||
// ContentSubtype specifies the content-subtype for a request. For example, a
|
// ContentSubtype specifies the content-subtype for a request. For example, a
|
||||||
// content-subtype of "proto" will result in a content-type of
|
// content-subtype of "proto" will result in a content-type of
|
||||||
// "application/grpc+proto". The value of ContentSubtype must be all
|
// "application/grpc+proto". The value of ContentSubtype must be all
|
||||||
|
@ -498,7 +498,7 @@ func TestMaxConnectionIdle(t *testing.T) {
|
|||||||
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
|
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
|
||||||
defer server.stop()
|
defer server.stop()
|
||||||
defer client.Close()
|
defer client.Close()
|
||||||
stream, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
|
stream, err := client.NewStream(context.Background(), &CallHdr{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Client failed to create RPC request: %v", err)
|
t.Fatalf("Client failed to create RPC request: %v", err)
|
||||||
}
|
}
|
||||||
@ -525,7 +525,7 @@ func TestMaxConnectionIdleNegative(t *testing.T) {
|
|||||||
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
|
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {})
|
||||||
defer server.stop()
|
defer server.stop()
|
||||||
defer client.Close()
|
defer client.Close()
|
||||||
_, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
|
_, err := client.NewStream(context.Background(), &CallHdr{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Client failed to create RPC request: %v", err)
|
t.Fatalf("Client failed to create RPC request: %v", err)
|
||||||
}
|
}
|
||||||
@ -697,7 +697,7 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
|
|||||||
}
|
}
|
||||||
defer conn.Close()
|
defer conn.Close()
|
||||||
// Create a stream.
|
// Create a stream.
|
||||||
_, err := tr.NewStream(context.Background(), &CallHdr{Flush: true})
|
_, err := tr.NewStream(context.Background(), &CallHdr{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to create a new stream: %v", err)
|
t.Fatalf("Failed to create a new stream: %v", err)
|
||||||
}
|
}
|
||||||
@ -782,7 +782,7 @@ func TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) {
|
|||||||
defer server.stop()
|
defer server.stop()
|
||||||
defer client.Close()
|
defer client.Close()
|
||||||
|
|
||||||
if _, err := client.NewStream(context.Background(), &CallHdr{Flush: true}); err != nil {
|
if _, err := client.NewStream(context.Background(), &CallHdr{}); err != nil {
|
||||||
t.Fatalf("Client failed to create stream.")
|
t.Fatalf("Client failed to create stream.")
|
||||||
}
|
}
|
||||||
timeout := time.NewTimer(10 * time.Second)
|
timeout := time.NewTimer(10 * time.Second)
|
||||||
@ -848,7 +848,7 @@ func TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) {
|
|||||||
defer server.stop()
|
defer server.stop()
|
||||||
defer client.Close()
|
defer client.Close()
|
||||||
|
|
||||||
if _, err := client.NewStream(context.Background(), &CallHdr{Flush: true}); err != nil {
|
if _, err := client.NewStream(context.Background(), &CallHdr{}); err != nil {
|
||||||
t.Fatalf("Client failed to create stream.")
|
t.Fatalf("Client failed to create stream.")
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -883,10 +883,7 @@ func TestClientSendAndReceive(t *testing.T) {
|
|||||||
if s2.id != 3 {
|
if s2.id != 3 {
|
||||||
t.Fatalf("wrong stream id: %d", s2.id)
|
t.Fatalf("wrong stream id: %d", s2.id)
|
||||||
}
|
}
|
||||||
opts := Options{
|
opts := Options{Last: true}
|
||||||
Last: true,
|
|
||||||
Delay: false,
|
|
||||||
}
|
|
||||||
if err := ct.Write(s1, nil, expectedRequest, &opts); err != nil && err != io.EOF {
|
if err := ct.Write(s1, nil, expectedRequest, &opts); err != nil && err != io.EOF {
|
||||||
t.Fatalf("failed to send data: %v", err)
|
t.Fatalf("failed to send data: %v", err)
|
||||||
}
|
}
|
||||||
@ -920,10 +917,7 @@ func performOneRPC(ct ClientTransport) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
opts := Options{
|
opts := Options{Last: true}
|
||||||
Last: true,
|
|
||||||
Delay: false,
|
|
||||||
}
|
|
||||||
if err := ct.Write(s, []byte{}, expectedRequest, &opts); err == nil || err == io.EOF {
|
if err := ct.Write(s, []byte{}, expectedRequest, &opts); err == nil || err == io.EOF {
|
||||||
time.Sleep(5 * time.Millisecond)
|
time.Sleep(5 * time.Millisecond)
|
||||||
// The following s.Recv()'s could error out because the
|
// The following s.Recv()'s could error out because the
|
||||||
@ -968,7 +962,7 @@ func TestLargeMessage(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Errorf("%v.NewStream(_, _) = _, %v, want _, <nil>", ct, err)
|
t.Errorf("%v.NewStream(_, _) = _, %v, want _, <nil>", ct, err)
|
||||||
}
|
}
|
||||||
if err := ct.Write(s, []byte{}, expectedRequestLarge, &Options{Last: true, Delay: false}); err != nil && err != io.EOF {
|
if err := ct.Write(s, []byte{}, expectedRequestLarge, &Options{Last: true}); err != nil && err != io.EOF {
|
||||||
t.Errorf("%v.Write(_, _, _) = %v, want <nil>", ct, err)
|
t.Errorf("%v.Write(_, _, _) = %v, want <nil>", ct, err)
|
||||||
}
|
}
|
||||||
p := make([]byte, len(expectedResponseLarge))
|
p := make([]byte, len(expectedResponseLarge))
|
||||||
@ -1342,7 +1336,7 @@ func TestClientConnDecoupledFromApplicationRead(t *testing.T) {
|
|||||||
notifyChan := make(chan struct{})
|
notifyChan := make(chan struct{})
|
||||||
server.h.notify = notifyChan
|
server.h.notify = notifyChan
|
||||||
server.mu.Unlock()
|
server.mu.Unlock()
|
||||||
cstream1, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
|
cstream1, err := client.NewStream(context.Background(), &CallHdr{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Client failed to create first stream. Err: %v", err)
|
t.Fatalf("Client failed to create first stream. Err: %v", err)
|
||||||
}
|
}
|
||||||
@ -1369,7 +1363,7 @@ func TestClientConnDecoupledFromApplicationRead(t *testing.T) {
|
|||||||
server.h.notify = notifyChan
|
server.h.notify = notifyChan
|
||||||
server.mu.Unlock()
|
server.mu.Unlock()
|
||||||
// Create another stream on client.
|
// Create another stream on client.
|
||||||
cstream2, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
|
cstream2, err := client.NewStream(context.Background(), &CallHdr{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Client failed to create second stream. Err: %v", err)
|
t.Fatalf("Client failed to create second stream. Err: %v", err)
|
||||||
}
|
}
|
||||||
@ -1424,7 +1418,7 @@ func TestServerConnDecoupledFromApplicationRead(t *testing.T) {
|
|||||||
st = k.(*http2Server)
|
st = k.(*http2Server)
|
||||||
}
|
}
|
||||||
server.mu.Unlock()
|
server.mu.Unlock()
|
||||||
cstream1, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
|
cstream1, err := client.NewStream(context.Background(), &CallHdr{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to create 1st stream. Err: %v", err)
|
t.Fatalf("Failed to create 1st stream. Err: %v", err)
|
||||||
}
|
}
|
||||||
@ -1433,7 +1427,7 @@ func TestServerConnDecoupledFromApplicationRead(t *testing.T) {
|
|||||||
t.Fatalf("Client failed to write data. Err: %v", err)
|
t.Fatalf("Client failed to write data. Err: %v", err)
|
||||||
}
|
}
|
||||||
//Client should be able to create another stream and send data on it.
|
//Client should be able to create another stream and send data on it.
|
||||||
cstream2, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
|
cstream2, err := client.NewStream(context.Background(), &CallHdr{})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Failed to create 2nd stream. Err: %v", err)
|
t.Fatalf("Failed to create 2nd stream. Err: %v", err)
|
||||||
}
|
}
|
||||||
@ -1668,10 +1662,7 @@ func TestEncodingRequiredStatus(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
opts := Options{
|
opts := Options{Last: true}
|
||||||
Last: true,
|
|
||||||
Delay: false,
|
|
||||||
}
|
|
||||||
if err := ct.Write(s, nil, expectedRequest, &opts); err != nil && err != errStreamDone {
|
if err := ct.Write(s, nil, expectedRequest, &opts); err != nil && err != errStreamDone {
|
||||||
t.Fatalf("Failed to write the request: %v", err)
|
t.Fatalf("Failed to write the request: %v", err)
|
||||||
}
|
}
|
||||||
@ -2069,7 +2060,7 @@ func setUpHTTPStatusTest(t *testing.T, httpStatus int, wh writeHeaders) (stream
|
|||||||
cancel() // Do not cancel in success path.
|
cancel() // Do not cancel in success path.
|
||||||
t.Fatalf("Error creating client. Err: %v", err)
|
t.Fatalf("Error creating client. Err: %v", err)
|
||||||
}
|
}
|
||||||
stream, err = client.NewStream(context.Background(), &CallHdr{Method: "bogus/method", Flush: true})
|
stream, err = client.NewStream(context.Background(), &CallHdr{Method: "bogus/method"})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Fatalf("Error creating stream at client-side. Err: %v", err)
|
t.Fatalf("Error creating stream at client-side. Err: %v", err)
|
||||||
}
|
}
|
||||||
|
@ -1047,10 +1047,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport.
|
|||||||
if trInfo != nil {
|
if trInfo != nil {
|
||||||
trInfo.tr.LazyLog(stringer("OK"), false)
|
trInfo.tr.LazyLog(stringer("OK"), false)
|
||||||
}
|
}
|
||||||
opts := &transport.Options{
|
opts := &transport.Options{Last: true}
|
||||||
Last: true,
|
|
||||||
Delay: false,
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
|
if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil {
|
||||||
if err == io.EOF {
|
if err == io.EOF {
|
||||||
|
@ -196,11 +196,6 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth
|
|||||||
callHdr := &transport.CallHdr{
|
callHdr := &transport.CallHdr{
|
||||||
Host: cc.authority,
|
Host: cc.authority,
|
||||||
Method: method,
|
Method: method,
|
||||||
// If it's not client streaming, we should already have the request to be sent,
|
|
||||||
// so we don't flush the header.
|
|
||||||
// If it's client streaming, the user may never send a request or send it any
|
|
||||||
// time soon, so we ask the transport to flush the header.
|
|
||||||
Flush: desc.ClientStreams,
|
|
||||||
ContentSubtype: c.contentSubtype,
|
ContentSubtype: c.contentSubtype,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Reference in New Issue
Block a user