diff --git a/clientconn.go b/clientconn.go index 17796b2e..d2c4f988 100644 --- a/clientconn.go +++ b/clientconn.go @@ -240,8 +240,8 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * creds := cc.dopts.copts.TransportCredentials if creds != nil && creds.Info().ServerName != "" { cc.authority = creds.Info().ServerName - } else if cc.dopts.insecure && cc.dopts.copts.Authority != "" { - cc.authority = cc.dopts.copts.Authority + } else if cc.dopts.insecure && cc.dopts.authority != "" { + cc.authority = cc.dopts.authority } else { // Use endpoint from "scheme://authority/endpoint" as the default // authority for ClientConn. diff --git a/dialoptions.go b/dialoptions.go index 584ed30d..ea9a3c1b 100644 --- a/dialoptions.go +++ b/dialoptions.go @@ -47,6 +47,7 @@ type dialOptions struct { insecure bool timeout time.Duration scChan <-chan ServiceConfig + authority string copts transport.ConnectOptions callOptions []CallOption // This is used by v1 balancer dial option WithBalancer to support v1 @@ -366,7 +367,7 @@ func WithStreamInterceptor(f StreamClientInterceptor) DialOption { // effect if TransportCredentials are present. func WithAuthority(a string) DialOption { return func(o *dialOptions) { - o.copts.Authority = a + o.authority = a } } diff --git a/internal/transport/handler_server.go b/internal/transport/handler_server.go index f71b7482..0d7cad45 100644 --- a/internal/transport/handler_server.go +++ b/internal/transport/handler_server.go @@ -274,9 +274,7 @@ func (ht *serverHandlerTransport) Write(s *Stream, hdr []byte, data []byte, opts ht.writeCommonHeaders(s) ht.rw.Write(hdr) ht.rw.Write(data) - if !opts.Delay { - ht.rw.(http.Flusher).Flush() - } + ht.rw.(http.Flusher).Flush() }) } diff --git a/internal/transport/transport.go b/internal/transport/transport.go index 4ae3a5cb..192f55a5 100644 --- a/internal/transport/transport.go +++ b/internal/transport/transport.go @@ -467,9 +467,6 @@ func NewServerTransport(protocol string, conn net.Conn, config *ServerConfig) (S type ConnectOptions struct { // UserAgent is the application user agent. UserAgent string - // Authority is the :authority pseudo-header to use. This field has no effect if - // TransportCredentials is set. - Authority string // Dialer specifies how to dial a network address. Dialer func(context.Context, string) (net.Conn, error) // FailOnNonTempDialError specifies if gRPC fails on non-temporary dial errors. @@ -515,11 +512,6 @@ type Options struct { // Last indicates whether this write is the last piece for // this stream. Last bool - - // Delay is a hint to the transport implementation for whether - // the data could be buffered for a batching write. The - // transport implementation may ignore the hint. - Delay bool } // CallHdr carries the information of a particular RPC. @@ -537,14 +529,6 @@ type CallHdr struct { // Creds specifies credentials.PerRPCCredentials for a call. Creds credentials.PerRPCCredentials - // Flush indicates whether a new stream command should be sent - // to the peer without waiting for the first data. This is - // only a hint. - // If it's true, the transport may modify the flush decision - // for performance purposes. - // If it's false, new stream will never be flushed. - Flush bool - // ContentSubtype specifies the content-subtype for a request. For example, a // content-subtype of "proto" will result in a content-type of // "application/grpc+proto". The value of ContentSubtype must be all diff --git a/internal/transport/transport_test.go b/internal/transport/transport_test.go index fb95e7c5..0a3d5adc 100644 --- a/internal/transport/transport_test.go +++ b/internal/transport/transport_test.go @@ -498,7 +498,7 @@ func TestMaxConnectionIdle(t *testing.T) { server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {}) defer server.stop() defer client.Close() - stream, err := client.NewStream(context.Background(), &CallHdr{Flush: true}) + stream, err := client.NewStream(context.Background(), &CallHdr{}) if err != nil { t.Fatalf("Client failed to create RPC request: %v", err) } @@ -525,7 +525,7 @@ func TestMaxConnectionIdleNegative(t *testing.T) { server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{}, func() {}) defer server.stop() defer client.Close() - _, err := client.NewStream(context.Background(), &CallHdr{Flush: true}) + _, err := client.NewStream(context.Background(), &CallHdr{}) if err != nil { t.Fatalf("Client failed to create RPC request: %v", err) } @@ -697,7 +697,7 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) { } defer conn.Close() // Create a stream. - _, err := tr.NewStream(context.Background(), &CallHdr{Flush: true}) + _, err := tr.NewStream(context.Background(), &CallHdr{}) if err != nil { t.Fatalf("Failed to create a new stream: %v", err) } @@ -782,7 +782,7 @@ func TestKeepaliveServerEnforcementWithAbusiveClientWithRPC(t *testing.T) { defer server.stop() defer client.Close() - if _, err := client.NewStream(context.Background(), &CallHdr{Flush: true}); err != nil { + if _, err := client.NewStream(context.Background(), &CallHdr{}); err != nil { t.Fatalf("Client failed to create stream.") } timeout := time.NewTimer(10 * time.Second) @@ -848,7 +848,7 @@ func TestKeepaliveServerEnforcementWithObeyingClientWithRPC(t *testing.T) { defer server.stop() defer client.Close() - if _, err := client.NewStream(context.Background(), &CallHdr{Flush: true}); err != nil { + if _, err := client.NewStream(context.Background(), &CallHdr{}); err != nil { t.Fatalf("Client failed to create stream.") } @@ -883,10 +883,7 @@ func TestClientSendAndReceive(t *testing.T) { if s2.id != 3 { t.Fatalf("wrong stream id: %d", s2.id) } - opts := Options{ - Last: true, - Delay: false, - } + opts := Options{Last: true} if err := ct.Write(s1, nil, expectedRequest, &opts); err != nil && err != io.EOF { t.Fatalf("failed to send data: %v", err) } @@ -920,10 +917,7 @@ func performOneRPC(ct ClientTransport) { if err != nil { return } - opts := Options{ - Last: true, - Delay: false, - } + opts := Options{Last: true} if err := ct.Write(s, []byte{}, expectedRequest, &opts); err == nil || err == io.EOF { time.Sleep(5 * time.Millisecond) // The following s.Recv()'s could error out because the @@ -968,7 +962,7 @@ func TestLargeMessage(t *testing.T) { if err != nil { t.Errorf("%v.NewStream(_, _) = _, %v, want _, ", ct, err) } - if err := ct.Write(s, []byte{}, expectedRequestLarge, &Options{Last: true, Delay: false}); err != nil && err != io.EOF { + if err := ct.Write(s, []byte{}, expectedRequestLarge, &Options{Last: true}); err != nil && err != io.EOF { t.Errorf("%v.Write(_, _, _) = %v, want ", ct, err) } p := make([]byte, len(expectedResponseLarge)) @@ -1342,7 +1336,7 @@ func TestClientConnDecoupledFromApplicationRead(t *testing.T) { notifyChan := make(chan struct{}) server.h.notify = notifyChan server.mu.Unlock() - cstream1, err := client.NewStream(context.Background(), &CallHdr{Flush: true}) + cstream1, err := client.NewStream(context.Background(), &CallHdr{}) if err != nil { t.Fatalf("Client failed to create first stream. Err: %v", err) } @@ -1369,7 +1363,7 @@ func TestClientConnDecoupledFromApplicationRead(t *testing.T) { server.h.notify = notifyChan server.mu.Unlock() // Create another stream on client. - cstream2, err := client.NewStream(context.Background(), &CallHdr{Flush: true}) + cstream2, err := client.NewStream(context.Background(), &CallHdr{}) if err != nil { t.Fatalf("Client failed to create second stream. Err: %v", err) } @@ -1424,7 +1418,7 @@ func TestServerConnDecoupledFromApplicationRead(t *testing.T) { st = k.(*http2Server) } server.mu.Unlock() - cstream1, err := client.NewStream(context.Background(), &CallHdr{Flush: true}) + cstream1, err := client.NewStream(context.Background(), &CallHdr{}) if err != nil { t.Fatalf("Failed to create 1st stream. Err: %v", err) } @@ -1433,7 +1427,7 @@ func TestServerConnDecoupledFromApplicationRead(t *testing.T) { t.Fatalf("Client failed to write data. Err: %v", err) } //Client should be able to create another stream and send data on it. - cstream2, err := client.NewStream(context.Background(), &CallHdr{Flush: true}) + cstream2, err := client.NewStream(context.Background(), &CallHdr{}) if err != nil { t.Fatalf("Failed to create 2nd stream. Err: %v", err) } @@ -1668,10 +1662,7 @@ func TestEncodingRequiredStatus(t *testing.T) { if err != nil { return } - opts := Options{ - Last: true, - Delay: false, - } + opts := Options{Last: true} if err := ct.Write(s, nil, expectedRequest, &opts); err != nil && err != errStreamDone { t.Fatalf("Failed to write the request: %v", err) } @@ -2069,7 +2060,7 @@ func setUpHTTPStatusTest(t *testing.T, httpStatus int, wh writeHeaders) (stream cancel() // Do not cancel in success path. t.Fatalf("Error creating client. Err: %v", err) } - stream, err = client.NewStream(context.Background(), &CallHdr{Method: "bogus/method", Flush: true}) + stream, err = client.NewStream(context.Background(), &CallHdr{Method: "bogus/method"}) if err != nil { t.Fatalf("Error creating stream at client-side. Err: %v", err) } diff --git a/server.go b/server.go index 3f64e219..fe83421f 100644 --- a/server.go +++ b/server.go @@ -1047,10 +1047,7 @@ func (s *Server) processUnaryRPC(t transport.ServerTransport, stream *transport. if trInfo != nil { trInfo.tr.LazyLog(stringer("OK"), false) } - opts := &transport.Options{ - Last: true, - Delay: false, - } + opts := &transport.Options{Last: true} if err := s.sendResponse(t, stream, reply, cp, opts, comp); err != nil { if err == io.EOF { diff --git a/stream.go b/stream.go index 70b19848..65d45a1d 100644 --- a/stream.go +++ b/stream.go @@ -194,13 +194,8 @@ func newClientStream(ctx context.Context, desc *StreamDesc, cc *ClientConn, meth } callHdr := &transport.CallHdr{ - Host: cc.authority, - Method: method, - // If it's not client streaming, we should already have the request to be sent, - // so we don't flush the header. - // If it's client streaming, the user may never send a request or send it any - // time soon, so we ask the transport to flush the header. - Flush: desc.ClientStreams, + Host: cc.authority, + Method: method, ContentSubtype: c.contentSubtype, }