Support max age(#1119)
The new logic added to the server does the following: - Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle. - Gracefully closes any connection after a duration of keepalive.MaxConnectionAge. - Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge. - Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-resposive connection after an additional duration of keepalive.Timeout.
This commit is contained in:
@ -50,3 +50,20 @@ type ClientParameters struct {
|
||||
// If true, client runs keepalive checks even with no active RPCs.
|
||||
PermitWithoutStream bool
|
||||
}
|
||||
|
||||
// ServerParameters is used to set keepalive and max-age parameters on the server-side.
|
||||
type ServerParameters struct {
|
||||
// MaxConnectionIdle is a duration for the amount of time after which an idle connection would be closed by sending a GoAway.
|
||||
// Idleness duration is defined since the most recent time the number of outstanding RPCs became zero or the connection establishment.
|
||||
MaxConnectionIdle time.Duration
|
||||
// MaxConnectionAge is a duration for the maximum amount of time a connection may exist before it will be closed by sending a GoAway.
|
||||
// A random jitter of +/-10% will be added to MaxConnectionAge to spread out connection storms.
|
||||
MaxConnectionAge time.Duration
|
||||
// MaxConnectinoAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed.
|
||||
MaxConnectionAgeGrace time.Duration
|
||||
// After a duration of this time if the server doesn't see any activity it pings the client to see if the transport is still alive.
|
||||
Time time.Duration
|
||||
// After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that
|
||||
// the connection is closed.
|
||||
Timeout time.Duration
|
||||
}
|
||||
|
18
server.go
18
server.go
@ -53,6 +53,7 @@ import (
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/internal"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/stats"
|
||||
"google.golang.org/grpc/tap"
|
||||
@ -117,6 +118,7 @@ type options struct {
|
||||
maxConcurrentStreams uint32
|
||||
useHandlerImpl bool // use http.Handler-based server
|
||||
unknownStreamDesc *StreamDesc
|
||||
keepaliveParams keepalive.ServerParameters
|
||||
}
|
||||
|
||||
var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size limit
|
||||
@ -124,6 +126,13 @@ var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size l
|
||||
// A ServerOption sets options.
|
||||
type ServerOption func(*options)
|
||||
|
||||
// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
|
||||
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
|
||||
return func(o *options) {
|
||||
o.keepaliveParams = kp
|
||||
}
|
||||
}
|
||||
|
||||
// CustomCodec returns a ServerOption that sets a codec for message marshaling and unmarshaling.
|
||||
func CustomCodec(codec Codec) ServerOption {
|
||||
return func(o *options) {
|
||||
@ -465,10 +474,11 @@ func (s *Server) handleRawConn(rawConn net.Conn) {
|
||||
// transport.NewServerTransport).
|
||||
func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) {
|
||||
config := &transport.ServerConfig{
|
||||
MaxStreams: s.opts.maxConcurrentStreams,
|
||||
AuthInfo: authInfo,
|
||||
InTapHandle: s.opts.inTapHandle,
|
||||
StatsHandler: s.opts.statsHandler,
|
||||
MaxStreams: s.opts.maxConcurrentStreams,
|
||||
AuthInfo: authInfo,
|
||||
InTapHandle: s.opts.inTapHandle,
|
||||
StatsHandler: s.opts.statsHandler,
|
||||
KeepaliveParams: s.opts.keepaliveParams,
|
||||
}
|
||||
st, err := transport.NewServerTransport("http2", c, config)
|
||||
if err != nil {
|
||||
|
@ -46,12 +46,17 @@ const (
|
||||
// The default value of flow control window size in HTTP2 spec.
|
||||
defaultWindowSize = 65535
|
||||
// The initial window size for flow control.
|
||||
initialWindowSize = defaultWindowSize // for an RPC
|
||||
initialConnWindowSize = defaultWindowSize * 16 // for a connection
|
||||
infinity = time.Duration(math.MaxInt64)
|
||||
defaultKeepaliveTime = infinity
|
||||
defaultKeepaliveTimeout = time.Duration(20 * time.Second)
|
||||
defaultMaxStreamsClient = 100
|
||||
initialWindowSize = defaultWindowSize // for an RPC
|
||||
initialConnWindowSize = defaultWindowSize * 16 // for a connection
|
||||
infinity = time.Duration(math.MaxInt64)
|
||||
defaultClientKeepaliveTime = infinity
|
||||
defaultClientKeepaliveTimeout = time.Duration(20 * time.Second)
|
||||
defaultMaxStreamsClient = 100
|
||||
defaultMaxConnectionIdle = infinity
|
||||
defaultMaxConnectionAge = infinity
|
||||
defaultMaxConnectionAgeGrace = infinity
|
||||
defaultServerKeepaliveTime = time.Duration(2 * time.Hour)
|
||||
defaultServerKeepaliveTimeout = time.Duration(20 * time.Second)
|
||||
)
|
||||
|
||||
// The following defines various control items which could flow through
|
||||
|
@ -194,10 +194,10 @@ func newHTTP2Client(ctx context.Context, addr TargetInfo, opts ConnectOptions) (
|
||||
kp := opts.KeepaliveParams
|
||||
// Validate keepalive parameters.
|
||||
if kp.Time == 0 {
|
||||
kp.Time = defaultKeepaliveTime
|
||||
kp.Time = defaultClientKeepaliveTime
|
||||
}
|
||||
if kp.Timeout == 0 {
|
||||
kp.Timeout = defaultKeepaliveTimeout
|
||||
kp.Timeout = defaultClientKeepaliveTimeout
|
||||
}
|
||||
var buf bytes.Buffer
|
||||
t := &http2Client{
|
||||
|
@ -38,9 +38,12 @@ import (
|
||||
"errors"
|
||||
"io"
|
||||
"math"
|
||||
"math/rand"
|
||||
"net"
|
||||
"strconv"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"golang.org/x/net/context"
|
||||
"golang.org/x/net/http2"
|
||||
@ -48,6 +51,7 @@ import (
|
||||
"google.golang.org/grpc/codes"
|
||||
"google.golang.org/grpc/credentials"
|
||||
"google.golang.org/grpc/grpclog"
|
||||
"google.golang.org/grpc/keepalive"
|
||||
"google.golang.org/grpc/metadata"
|
||||
"google.golang.org/grpc/peer"
|
||||
"google.golang.org/grpc/stats"
|
||||
@ -90,11 +94,22 @@ type http2Server struct {
|
||||
|
||||
stats stats.Handler
|
||||
|
||||
// Flag to keep track of reading activity on transport.
|
||||
// 1 is true and 0 is false.
|
||||
activity uint32 // Accessed atomically.
|
||||
// Keepalive and max-age parameters for the server.
|
||||
kp keepalive.ServerParameters
|
||||
|
||||
mu sync.Mutex // guard the following
|
||||
state transportState
|
||||
activeStreams map[uint32]*Stream
|
||||
// the per-stream outbound flow control window size set by the peer.
|
||||
streamSendQuota uint32
|
||||
// idle is the time instant when the connection went idle.
|
||||
// This is either the begining of the connection or when the number of
|
||||
// RPCs go down to 0.
|
||||
// When the connection is busy, this value is set to 0.
|
||||
idle time.Time
|
||||
}
|
||||
|
||||
// newHTTP2Server constructs a ServerTransport based on HTTP2. ConnectionError is
|
||||
@ -128,6 +143,24 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
|
||||
return nil, connectionErrorf(true, err, "transport: %v", err)
|
||||
}
|
||||
}
|
||||
kp := config.KeepaliveParams
|
||||
if kp.MaxConnectionIdle == 0 {
|
||||
kp.MaxConnectionIdle = defaultMaxConnectionIdle
|
||||
}
|
||||
if kp.MaxConnectionAge == 0 {
|
||||
kp.MaxConnectionAge = defaultMaxConnectionAge
|
||||
}
|
||||
// Add a jitter to MaxConnectionAge.
|
||||
kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
|
||||
if kp.MaxConnectionAgeGrace == 0 {
|
||||
kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
|
||||
}
|
||||
if kp.Time == 0 {
|
||||
kp.Time = defaultServerKeepaliveTime
|
||||
}
|
||||
if kp.Timeout == 0 {
|
||||
kp.Timeout = defaultServerKeepaliveTimeout
|
||||
}
|
||||
var buf bytes.Buffer
|
||||
t := &http2Server{
|
||||
ctx: context.Background(),
|
||||
@ -149,6 +182,8 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
|
||||
activeStreams: make(map[uint32]*Stream),
|
||||
streamSendQuota: defaultWindowSize,
|
||||
stats: config.StatsHandler,
|
||||
kp: kp,
|
||||
idle: time.Now(),
|
||||
}
|
||||
if t.stats != nil {
|
||||
t.ctx = t.stats.TagConn(t.ctx, &stats.ConnTagInfo{
|
||||
@ -159,6 +194,7 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
|
||||
t.stats.HandleConn(t.ctx, connBegin)
|
||||
}
|
||||
go t.controller()
|
||||
go t.keepalive()
|
||||
t.writableChan <- 0
|
||||
return t, nil
|
||||
}
|
||||
@ -248,6 +284,9 @@ func (t *http2Server) operateHeaders(frame *http2.MetaHeadersFrame, handle func(
|
||||
t.maxStreamID = s.id
|
||||
s.sendQuotaPool = newQuotaPool(int(t.streamSendQuota))
|
||||
t.activeStreams[s.id] = s
|
||||
if len(t.activeStreams) == 1 {
|
||||
t.idle = time.Time{}
|
||||
}
|
||||
t.mu.Unlock()
|
||||
s.windowHandler = func(n int) {
|
||||
t.updateWindow(s, uint32(n))
|
||||
@ -295,6 +334,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
|
||||
t.Close()
|
||||
return
|
||||
}
|
||||
atomic.StoreUint32(&t.activity, 1)
|
||||
sf, ok := frame.(*http2.SettingsFrame)
|
||||
if !ok {
|
||||
grpclog.Printf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
|
||||
@ -305,6 +345,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
|
||||
|
||||
for {
|
||||
frame, err := t.framer.readFrame()
|
||||
atomic.StoreUint32(&t.activity, 1)
|
||||
if err != nil {
|
||||
if se, ok := err.(http2.StreamError); ok {
|
||||
t.mu.Lock()
|
||||
@ -735,6 +776,91 @@ func (t *http2Server) applySettings(ss []http2.Setting) {
|
||||
}
|
||||
}
|
||||
|
||||
// keepalive running in a separate goroutine does the following:
|
||||
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
|
||||
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
|
||||
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
|
||||
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-resposive connection
|
||||
// after an additional duration of keepalive.Timeout.
|
||||
func (t *http2Server) keepalive() {
|
||||
p := &ping{}
|
||||
var pingSent bool
|
||||
maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
|
||||
maxAge := time.NewTimer(t.kp.MaxConnectionAge)
|
||||
keepalive := time.NewTimer(t.kp.Time)
|
||||
// NOTE: All exit paths of this function should reset their
|
||||
// respecitve timers. A failure to do so will cause the
|
||||
// following clean-up to deadlock and eventually leak.
|
||||
defer func() {
|
||||
if !maxIdle.Stop() {
|
||||
<-maxIdle.C
|
||||
}
|
||||
if !maxAge.Stop() {
|
||||
<-maxAge.C
|
||||
}
|
||||
if !keepalive.Stop() {
|
||||
<-keepalive.C
|
||||
}
|
||||
}()
|
||||
for {
|
||||
select {
|
||||
case <-maxIdle.C:
|
||||
t.mu.Lock()
|
||||
idle := t.idle
|
||||
if idle.IsZero() { // The connection is non-idle.
|
||||
t.mu.Unlock()
|
||||
maxIdle.Reset(t.kp.MaxConnectionIdle)
|
||||
continue
|
||||
}
|
||||
val := t.kp.MaxConnectionIdle - time.Since(idle)
|
||||
if val <= 0 {
|
||||
// The connection has been idle for a duration of keepalive.MaxConnectionIdle or more.
|
||||
// Gracefully close the connection.
|
||||
t.state = draining
|
||||
t.mu.Unlock()
|
||||
t.Drain()
|
||||
// Reseting the timer so that the clean-up doesn't deadlock.
|
||||
maxIdle.Reset(infinity)
|
||||
return
|
||||
}
|
||||
t.mu.Unlock()
|
||||
maxIdle.Reset(val)
|
||||
case <-maxAge.C:
|
||||
t.mu.Lock()
|
||||
t.state = draining
|
||||
t.mu.Unlock()
|
||||
t.Drain()
|
||||
maxAge.Reset(t.kp.MaxConnectionAgeGrace)
|
||||
select {
|
||||
case <-maxAge.C:
|
||||
// Close the connection after grace period.
|
||||
t.Close()
|
||||
// Reseting the timer so that the clean-up doesn't deadlock.
|
||||
maxAge.Reset(infinity)
|
||||
case <-t.shutdownChan:
|
||||
}
|
||||
return
|
||||
case <-keepalive.C:
|
||||
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
|
||||
pingSent = false
|
||||
keepalive.Reset(t.kp.Time)
|
||||
continue
|
||||
}
|
||||
if pingSent {
|
||||
t.Close()
|
||||
// Reseting the timer so that the clean-up doesn't deadlock.
|
||||
keepalive.Reset(infinity)
|
||||
return
|
||||
}
|
||||
pingSent = true
|
||||
t.controlBuf.put(p)
|
||||
keepalive.Reset(t.kp.Timeout)
|
||||
case <-t.shutdownChan:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// controller running in a separate goroutine takes charge of sending control
|
||||
// frames (e.g., window update, reset stream, setting, etc.) to the server.
|
||||
func (t *http2Server) controller() {
|
||||
@ -816,6 +942,9 @@ func (t *http2Server) Close() (err error) {
|
||||
func (t *http2Server) closeStream(s *Stream) {
|
||||
t.mu.Lock()
|
||||
delete(t.activeStreams, s.id)
|
||||
if len(t.activeStreams) == 0 {
|
||||
t.idle = time.Now()
|
||||
}
|
||||
if t.state == draining && len(t.activeStreams) == 0 {
|
||||
defer t.Close()
|
||||
}
|
||||
@ -845,3 +974,15 @@ func (t *http2Server) RemoteAddr() net.Addr {
|
||||
func (t *http2Server) Drain() {
|
||||
t.controlBuf.put(&goAway{})
|
||||
}
|
||||
|
||||
var rgen = rand.New(rand.NewSource(time.Now().UnixNano()))
|
||||
|
||||
func getJitter(v time.Duration) time.Duration {
|
||||
if v == infinity {
|
||||
return 0
|
||||
}
|
||||
// Generate a jitter between +/- 10% of the value.
|
||||
r := int64(v / 10)
|
||||
j := rgen.Int63n(2*r) - r
|
||||
return time.Duration(j)
|
||||
}
|
||||
|
@ -365,10 +365,11 @@ const (
|
||||
|
||||
// ServerConfig consists of all the configurations to establish a server transport.
|
||||
type ServerConfig struct {
|
||||
MaxStreams uint32
|
||||
AuthInfo credentials.AuthInfo
|
||||
InTapHandle tap.ServerInHandle
|
||||
StatsHandler stats.Handler
|
||||
MaxStreams uint32
|
||||
AuthInfo credentials.AuthInfo
|
||||
InTapHandle tap.ServerInHandle
|
||||
StatsHandler stats.Handler
|
||||
KeepaliveParams keepalive.ServerParameters
|
||||
}
|
||||
|
||||
// NewServerTransport creates a ServerTransport with conn or non-nil error
|
||||
|
@ -156,7 +156,7 @@ func (h *testStreamHandler) handleStreamInvalidHeaderField(t *testing.T, s *Stre
|
||||
}
|
||||
|
||||
// start starts server. Other goroutines should block on s.readyChan for further operations.
|
||||
func (s *server) start(t *testing.T, port int, maxStreams uint32, ht hType) {
|
||||
func (s *server) start(t *testing.T, port int, serverConfig *ServerConfig, ht hType) {
|
||||
var err error
|
||||
if port == 0 {
|
||||
s.lis, err = net.Listen("tcp", "localhost:0")
|
||||
@ -180,10 +180,7 @@ func (s *server) start(t *testing.T, port int, maxStreams uint32, ht hType) {
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
config := &ServerConfig{
|
||||
MaxStreams: maxStreams,
|
||||
}
|
||||
transport, err := NewServerTransport("http2", conn, config)
|
||||
transport, err := NewServerTransport("http2", conn, serverConfig)
|
||||
if err != nil {
|
||||
return
|
||||
}
|
||||
@ -252,12 +249,12 @@ func (s *server) stop() {
|
||||
}
|
||||
|
||||
func setUp(t *testing.T, port int, maxStreams uint32, ht hType) (*server, ClientTransport) {
|
||||
return setUpWithOptions(t, port, maxStreams, ht, ConnectOptions{})
|
||||
return setUpWithOptions(t, port, &ServerConfig{MaxStreams: maxStreams}, ht, ConnectOptions{})
|
||||
}
|
||||
|
||||
func setUpWithOptions(t *testing.T, port int, maxStreams uint32, ht hType, copts ConnectOptions) (*server, ClientTransport) {
|
||||
func setUpWithOptions(t *testing.T, port int, serverConfig *ServerConfig, ht hType, copts ConnectOptions) (*server, ClientTransport) {
|
||||
server := &server{startedErr: make(chan error, 1)}
|
||||
go server.start(t, port, maxStreams, ht)
|
||||
go server.start(t, port, serverConfig, ht)
|
||||
server.wait(t, 2*time.Second)
|
||||
addr := "localhost:" + server.port
|
||||
var (
|
||||
@ -302,6 +299,136 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con
|
||||
return tr
|
||||
}
|
||||
|
||||
// TestMaxConnectionIdle tests that a server will send GoAway to a idle client.
|
||||
// An idle client is one who doesn't make any RPC calls for a duration of
|
||||
// MaxConnectionIdle time.
|
||||
func TestMaxConnectionIdle(t *testing.T) {
|
||||
serverConfig := &ServerConfig{
|
||||
KeepaliveParams: keepalive.ServerParameters{
|
||||
MaxConnectionIdle: 2 * time.Second,
|
||||
},
|
||||
}
|
||||
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
|
||||
defer server.stop()
|
||||
defer client.Close()
|
||||
stream, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
|
||||
if err != nil {
|
||||
t.Fatalf("Client failed to create RPC request: %v", err)
|
||||
}
|
||||
stream.mu.Lock()
|
||||
stream.rstStream = true
|
||||
stream.mu.Unlock()
|
||||
client.CloseStream(stream, nil)
|
||||
// wait for server to see that closed stream and max-age logic to send goaway after no new RPCs are mode
|
||||
timeout := time.NewTimer(time.Second * 4)
|
||||
select {
|
||||
case <-client.GoAway():
|
||||
case <-timeout.C:
|
||||
t.Fatalf("Test timed out, expected a GoAway from the server.")
|
||||
}
|
||||
}
|
||||
|
||||
// TestMaxConenctionIdleNegative tests that a server will not send GoAway to a non-idle(busy) client.
|
||||
func TestMaxConnectionIdleNegative(t *testing.T) {
|
||||
serverConfig := &ServerConfig{
|
||||
KeepaliveParams: keepalive.ServerParameters{
|
||||
MaxConnectionIdle: 2 * time.Second,
|
||||
},
|
||||
}
|
||||
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
|
||||
defer server.stop()
|
||||
defer client.Close()
|
||||
_, err := client.NewStream(context.Background(), &CallHdr{Flush: true})
|
||||
if err != nil {
|
||||
t.Fatalf("Client failed to create RPC request: %v", err)
|
||||
}
|
||||
timeout := time.NewTimer(time.Second * 4)
|
||||
select {
|
||||
case <-client.GoAway():
|
||||
t.Fatalf("A non-idle client received a GoAway.")
|
||||
case <-timeout.C:
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// TestMaxConnectionAge tests that a server will send GoAway after a duration of MaxConnectionAge.
|
||||
func TestMaxConnectionAge(t *testing.T) {
|
||||
serverConfig := &ServerConfig{
|
||||
KeepaliveParams: keepalive.ServerParameters{
|
||||
MaxConnectionAge: 2 * time.Second,
|
||||
},
|
||||
}
|
||||
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
|
||||
defer server.stop()
|
||||
defer client.Close()
|
||||
_, err := client.NewStream(context.Background(), &CallHdr{})
|
||||
if err != nil {
|
||||
t.Fatalf("Client failed to create stream: %v", err)
|
||||
}
|
||||
// Wait for max-age logic to send GoAway.
|
||||
timeout := time.NewTimer(4 * time.Second)
|
||||
select {
|
||||
case <-client.GoAway():
|
||||
case <-timeout.C:
|
||||
t.Fatalf("Test timer out, expected a GoAway from the server.")
|
||||
}
|
||||
}
|
||||
|
||||
// TestKeepaliveServer tests that a server closes conneciton with a client that doesn't respond to keepalive pings.
|
||||
func TestKeepaliveServer(t *testing.T) {
|
||||
serverConfig := &ServerConfig{
|
||||
KeepaliveParams: keepalive.ServerParameters{
|
||||
Time: 2 * time.Second,
|
||||
Timeout: 1 * time.Second,
|
||||
},
|
||||
}
|
||||
server, c := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
|
||||
defer server.stop()
|
||||
defer c.Close()
|
||||
client, err := net.Dial("tcp", server.lis.Addr().String())
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to dial: %v", err)
|
||||
}
|
||||
defer client.Close()
|
||||
// Set read deadline on client conn so that it doesn't block forever in errorsome cases.
|
||||
client.SetReadDeadline(time.Now().Add(10 * time.Second))
|
||||
// Wait for keepalive logic to close the connection.
|
||||
time.Sleep(4 * time.Second)
|
||||
b := make([]byte, 24)
|
||||
for {
|
||||
_, err = client.Read(b)
|
||||
if err == nil {
|
||||
continue
|
||||
}
|
||||
if err != io.EOF {
|
||||
t.Fatalf("client.Read(_) = _,%v, want io.EOF", err)
|
||||
}
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
// TestKeepaliveServerNegative tests that a server doesn't close connection with a client that responds to keepalive pings.
|
||||
func TestKeepaliveServerNegative(t *testing.T) {
|
||||
serverConfig := &ServerConfig{
|
||||
KeepaliveParams: keepalive.ServerParameters{
|
||||
Time: 2 * time.Second,
|
||||
Timeout: 1 * time.Second,
|
||||
},
|
||||
}
|
||||
server, client := setUpWithOptions(t, 0, serverConfig, suspended, ConnectOptions{})
|
||||
defer server.stop()
|
||||
defer client.Close()
|
||||
// Give keepalive logic some time by sleeping.
|
||||
time.Sleep(4 * time.Second)
|
||||
// Assert that client is still active.
|
||||
clientTr := client.(*http2Client)
|
||||
clientTr.mu.Lock()
|
||||
defer clientTr.mu.Unlock()
|
||||
if clientTr.state != reachable {
|
||||
t.Fatalf("Test failed: Expected server-client connection to be healthy.")
|
||||
}
|
||||
}
|
||||
|
||||
func TestKeepaliveClientClosesIdleTransport(t *testing.T) {
|
||||
done := make(chan net.Conn, 1)
|
||||
tr := setUpWithNoPingServer(t, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
|
||||
@ -362,7 +489,7 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
|
||||
}
|
||||
defer conn.Close()
|
||||
// Create a stream.
|
||||
_, err := tr.NewStream(context.Background(), &CallHdr{})
|
||||
_, err := tr.NewStream(context.Background(), &CallHdr{Flush: true})
|
||||
if err != nil {
|
||||
t.Fatalf("Failed to create a new stream: %v", err)
|
||||
}
|
||||
@ -378,7 +505,7 @@ func TestKeepaliveClientClosesWithActiveStreams(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestKeepaliveClientStaysHealthyWithResponsiveServer(t *testing.T) {
|
||||
s, tr := setUpWithOptions(t, 0, math.MaxUint32, normal, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
|
||||
s, tr := setUpWithOptions(t, 0, &ServerConfig{MaxStreams: math.MaxUint32}, normal, ConnectOptions{KeepaliveParams: keepalive.ClientParameters{
|
||||
Time: 2 * time.Second, // Keepalive time = 2 sec.
|
||||
Timeout: 1 * time.Second, // Keepalive timeout = 1 sec.
|
||||
PermitWithoutStream: true, // Run keepalive even with no RPCs.
|
||||
|
Reference in New Issue
Block a user