diff --git a/benchmark/benchmark.go b/benchmark/benchmark.go index 7c8578e8..f09fa454 100644 --- a/benchmark/benchmark.go +++ b/benchmark/benchmark.go @@ -163,6 +163,8 @@ func StartServer(info ServerInfo, opts ...grpc.ServerOption) (string, func()) { if nw != nil { lis = nw.Listener(lis) } + opts = append(opts, grpc.WriteBufferSize(128*1024)) + opts = append(opts, grpc.ReadBufferSize(128*1024)) s := grpc.NewServer(opts...) switch info.Type { case "protobuf": @@ -236,6 +238,8 @@ func DoByteBufStreamingRoundTrip(stream testpb.BenchmarkService_StreamingCallCli // NewClientConn creates a gRPC client connection to addr. func NewClientConn(addr string, opts ...grpc.DialOption) *grpc.ClientConn { + opts = append(opts, grpc.WithWriteBufferSize(128*1024)) + opts = append(opts, grpc.WithReadBufferSize(128*1024)) conn, err := grpc.Dial(addr, opts...) if err != nil { grpclog.Fatalf("NewClientConn(%q) failed to create a ClientConn %v", addr, err) diff --git a/clientconn.go b/clientconn.go index 6691d8af..d0d479b4 100644 --- a/clientconn.go +++ b/clientconn.go @@ -100,6 +100,22 @@ const ( // DialOption configures how we set up the connection. type DialOption func(*dialOptions) +// WithWriteBufferSize lets you set the size of write buffer, this determines how much data can be batched +// before doing a write on the wire. +func WithWriteBufferSize(s int) DialOption { + return func(o *dialOptions) { + o.copts.WriteBufferSize = s + } +} + +// WithReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most +// for each read syscall. +func WithReadBufferSize(s int) DialOption { + return func(o *dialOptions) { + o.copts.ReadBufferSize = s + } +} + // WithInitialWindowSize returns a DialOption which sets the value for initial window size on a stream. // The lower bound for window size is 64K and any value smaller than that will be ignored. func WithInitialWindowSize(s int32) DialOption { diff --git a/server.go b/server.go index 83fd168c..bcaaa7df 100644 --- a/server.go +++ b/server.go @@ -116,6 +116,8 @@ type options struct { keepalivePolicy keepalive.EnforcementPolicy initialWindowSize int32 initialConnWindowSize int32 + writeBufferSize int + readBufferSize int } var defaultServerOptions = options{ @@ -126,6 +128,22 @@ var defaultServerOptions = options{ // A ServerOption sets options such as credentials, codec and keepalive parameters, etc. type ServerOption func(*options) +// WriteBufferSize lets you set the size of write buffer, this determines how much data can be batched +// before doing a write on the wire. +func WriteBufferSize(s int) ServerOption { + return func(o *options) { + o.writeBufferSize = s + } +} + +// ReadBufferSize lets you set the size of read buffer, this determines how much data can be read at most +// for one read syscall. +func ReadBufferSize(s int) ServerOption { + return func(o *options) { + o.readBufferSize = s + } +} + // InitialWindowSize returns a ServerOption that sets window size for stream. // The lower bound for window size is 64K and any value smaller than that will be ignored. func InitialWindowSize(s int32) ServerOption { @@ -524,6 +542,8 @@ func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) KeepalivePolicy: s.opts.keepalivePolicy, InitialWindowSize: s.opts.initialWindowSize, InitialConnWindowSize: s.opts.initialConnWindowSize, + WriteBufferSize: s.opts.writeBufferSize, + ReadBufferSize: s.opts.readBufferSize, } st, err := transport.NewServerTransport("http2", c, config) if err != nil { diff --git a/transport/http2_client.go b/transport/http2_client.go index 92ad868f..3d0b5424 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -194,6 +194,14 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( dynamicWindow = false } var buf bytes.Buffer + writeBufSize := defaultWriteBufSize + if opts.WriteBufferSize > 0 { + writeBufSize = opts.WriteBufferSize + } + readBufSize := defaultReadBufSize + if opts.ReadBufferSize > 0 { + readBufSize = opts.ReadBufferSize + } t := &http2Client{ ctx: ctx, target: addr.Addr, @@ -209,9 +217,9 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) ( errorChan: make(chan struct{}), goAway: make(chan struct{}), awakenKeepalive: make(chan struct{}, 1), - framer: newFramer(conn), hBuf: &buf, hEnc: hpack.NewEncoder(&buf), + framer: newFramer(conn, writeBufSize, readBufSize), controlBuf: newControlBuffer(), fc: &inFlow{limit: uint32(icwz)}, sendQuotaPool: newQuotaPool(defaultWindowSize), diff --git a/transport/http2_server.go b/transport/http2_server.go index 0f0e7599..8763b4c5 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -116,7 +116,15 @@ type http2Server struct { // newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is // returned if something goes wrong. func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err error) { - framer := newFramer(conn) + writeBufSize := defaultWriteBufSize + if config.WriteBufferSize > 0 { + writeBufSize = config.WriteBufferSize + } + readBufSize := defaultReadBufSize + if config.ReadBufferSize > 0 { + readBufSize = config.ReadBufferSize + } + framer := newFramer(conn, writeBufSize, readBufSize) // Send initial settings as connection preface to client. var isettings []http2.Setting // TODO(zhaoq): Have a better way to signal "no limit" because 0 is diff --git a/transport/http_util.go b/transport/http_util.go index 831813fd..39f878cf 100644 --- a/transport/http_util.go +++ b/transport/http_util.go @@ -44,7 +44,8 @@ const ( // http://http2.github.io/http2-spec/#SettingValues http2InitHeaderTableSize = 4096 // http2IOBufSize specifies the buffer size for sending frames. - http2IOBufSize = 32 * 1024 + defaultWriteBufSize = 32 * 1024 + defaultReadBufSize = 32 * 1024 ) var ( @@ -474,10 +475,10 @@ type framer struct { fr *http2.Framer } -func newFramer(conn net.Conn) *framer { +func newFramer(conn net.Conn, writeBufferSize, readBufferSize int) *framer { f := &framer{ - reader: bufio.NewReaderSize(conn, http2IOBufSize), - writer: bufio.NewWriterSize(conn, http2IOBufSize), + reader: bufio.NewReaderSize(conn, readBufferSize), + writer: bufio.NewWriterSize(conn, writeBufferSize), } f.fr = http2.NewFramer(f.writer, f.reader) // Opt-in to Frame reuse API on framer to reduce garbage. diff --git a/transport/transport.go b/transport/transport.go index 594ccd6a..f8d8faed 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -472,6 +472,8 @@ type ServerConfig struct { KeepalivePolicy keepalive.EnforcementPolicy InitialWindowSize int32 InitialConnWindowSize int32 + WriteBufferSize int + ReadBufferSize int } // NewServerTransport creates a ServerTransport with conn or non-nil error @@ -503,6 +505,10 @@ type ConnectOptions struct { InitialWindowSize int32 // InitialConnWindowSize sets the initial window size for a connection. InitialConnWindowSize int32 + // WriteBufferSize sets the size of write buffer which in turn determines how much data can be batched before it's written on the wire. + WriteBufferSize int + // ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall. + ReadBufferSize int } // TargetInfo contains the information of the target such as network address and metadata. @@ -526,7 +532,7 @@ type Options struct { // Delay is a hint to the transport implementation for whether // the data could be buffered for a batching write. The - // Transport implementation may ignore the hint. + // transport implementation may ignore the hint. Delay bool } diff --git a/transport/transport_test.go b/transport/transport_test.go index f30ebc6d..2bb2b711 100644 --- a/transport/transport_test.go +++ b/transport/transport_test.go @@ -1989,8 +1989,8 @@ func (s *httpServer) start(t *testing.T, lis net.Listener) { t.Errorf("Error at server-side while reading preface from cleint. Err: %v", err) return } - reader := bufio.NewReaderSize(s.conn, http2IOBufSize) - writer := bufio.NewWriterSize(s.conn, http2IOBufSize) + reader := bufio.NewReaderSize(s.conn, defaultWriteBufSize) + writer := bufio.NewWriterSize(s.conn, defaultReadBufSize) framer := http2.NewFramer(writer, reader) if err = framer.WriteSettingsAck(); err != nil { t.Errorf("Error at server-side while sending Settings ack. Err: %v", err)