Rest of the implementation

This commit is contained in:
Mahak Mukhi
2017-03-10 17:30:46 -08:00
parent e5a289ced7
commit 82fb96a360
5 changed files with 130 additions and 20 deletions

View File

@ -17,11 +17,18 @@ type ClientParameters struct {
PermitWithoutStream bool
}
// TODO(mmukhi) : documentation
// ServerParameters is used to set keepalive and max-age parameters on the server-side.
type ServerParameters struct {
MaxConnectionIdle time.Duration
MaxConnectionAge time.Duration
// MaxConnectionIdle is a duration for the amount of time after which an idle connection would be closed by sending a GoAway.
// Idleness duration is defined since the most recent time the number of outstanding RPCs became zero or the connection establishment.
MaxConnectionIdle time.Duration
// MaxConnectionAge is a duration for the maximum amount of time a connection may exist before it will be closed by sending a GoAway
MaxConnectionAge time.Duration
//MaxConnectinoAgeGrace is an additive period after MaxConnectionAge after which the connection will be forcibly closed.
MaxConnectionAgeGrace time.Duration
Time time.Duration
Timeout time.Duration
// After a duration of this time if the server doesn't see any activity it pings the client to see if the transport is still alive.
Time time.Duration
// After having pinged for keepalive check, the server waits for a duration of Timeout and if no activity is seen even after that
// the connection is closed.
Timeout time.Duration
}

View File

@ -126,7 +126,7 @@ var defaultMaxMsgSize = 1024 * 1024 * 4 // use 4MB as the default message size l
// A ServerOption sets options.
type ServerOption func(*options)
// TODO(mmukhi) : Documentation
// KeepaliveParams returns a ServerOption that sets keepalive and max-age parameters for the server.
func KeepaliveParams(kp keepalive.ServerParameters) ServerOption {
return func(o *options) {
o.keepaliveParams = kp
@ -478,7 +478,7 @@ func (s *Server) serveHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo)
AuthInfo: authInfo,
InTapHandle: s.opts.inTapHandle,
StatsHandler: s.opts.statsHandler,
keepaliveParams: s.opts.keepaliveParams,
KeepaliveParams: s.opts.keepaliveParams,
}
st, err := transport.NewServerTransport("http2", c, config)
if err != nil {

View File

@ -38,9 +38,11 @@ import (
"errors"
"io"
"math"
"math/rand"
"net"
"strconv"
"sync"
"sync/atomic"
"time"
"golang.org/x/net/context"
@ -92,7 +94,10 @@ type http2Server struct {
stats stats.Handler
// TODO(mmukhi): Documentation
// Flag to keep track of reading activity on transport.
// 1 is true and 0 is false.
activity uint32 // Accessed atomically.
// Keepalive and max-age parameters for the server.
kp keepalive.ServerParameters
mu sync.Mutex // guard the following
@ -134,13 +139,15 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err
return nil, connectionErrorf(true, err, "transport: %v", err)
}
}
kp := config.keepaliveParams
kp := config.KeepaliveParams
if kp.MaxConnectionIdle == 0 {
kp.MaxConnectionIdle = defaultMaxConnectionIdle
}
if kp.MaxConnectionAge == 0 {
kp.MaxConnectionAge = defaultMaxConnectionAge
}
// Add a jitter to MaxConnectionAge.
kp.MaxConnectionAge += getJitter(kp.MaxConnectionAge)
if kp.MaxConnectionAgeGrace == 0 {
kp.MaxConnectionAgeGrace = defaultMaxConnectionAgeGrace
}
@ -323,6 +330,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
t.Close()
return
}
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
sf, ok := frame.(*http2.SettingsFrame)
if !ok {
grpclog.Printf("transport: http2Server.HandleStreams saw invalid preface type %T from client", frame)
@ -333,6 +341,7 @@ func (t *http2Server) HandleStreams(handle func(*Stream), traceCtx func(context.
for {
frame, err := t.framer.readFrame()
atomic.CompareAndSwapUint32(&t.activity, 0, 1)
if err != nil {
if se, ok := err.(http2.StreamError); ok {
t.mu.Lock()
@ -763,9 +772,15 @@ func (t *http2Server) applySettings(ss []http2.Setting) {
}
}
// TODO(mmukhi): Documentation
// keepalive running in a separate goroutine does the following:
// 1. Gracefully closes an idle connection after a duration of keepalive.MaxConnectionIdle.
// 2. Gracefully closes any connection after a duration of keepalive.MaxConnectionAge.
// 3. Forcibly closes a connection after an additive period of keepalive.MaxConnectionAgeGrace over keepalive.MaxConnectionAge.
// 4. Makes sure a connection is alive by sending pings with a frequency of keepalive.Time and closes a non-resposive connection
// after an additional duration of keepalive.Timeout.
func (t *http2Server) keepalive() {
//p := &ping{data: [8]byte{}}
p := &ping{data: [8]byte{}}
var pingSent bool
maxIdle := time.NewTimer(t.kp.MaxConnectionIdle)
maxAge := time.NewTimer(t.kp.MaxConnectionAge)
keepalive := time.NewTimer(t.kp.Time)
@ -809,16 +824,39 @@ func (t *http2Server) keepalive() {
oidle = idle
maxIdle.Reset(t.kp.MaxConnectionIdle - time.Since(idle))
case <-maxAge.C:
// Reseting the timer so that the clean-up doesn't deadlock.
maxAge.Reset(infinity)
t.mu.Lock()
t.state = draining
t.mu.Unlock()
t.Drain()
maxAge.Reset(t.kp.MaxConnectionAgeGrace)
select {
case <-maxAge.C:
// Close the connection after grace period.
t.Close()
// Reseting the timer so that the clean-up doesn't deadlock.
maxAge.Reset(infinity)
case <-t.shutdownChan:
}
return
case <-keepalive.C:
// Reseting the timer so that the clean-up doesn't deadlock.
keepalive.Reset(infinity)
if atomic.CompareAndSwapUint32(&t.activity, 1, 0) {
pingSent = false
keepalive.Reset(t.kp.Time)
continue
}
if pingSent {
t.Close()
// Reseting the timer so that the clean-up doesn't deadlock.
keepalive.Reset(infinity)
return
}
pingSent = true
t.controlBuf.put(p)
keepalive.Reset(t.kp.Timeout)
case <-t.shutdownChan:
return
}
}
}
// controller running in a separate goroutine takes charge of sending control
@ -934,3 +972,14 @@ func (t *http2Server) RemoteAddr() net.Addr {
func (t *http2Server) Drain() {
t.controlBuf.put(&goAway{})
}
func getJitter(v time.Duration) time.Duration {
if v == infinity {
return 0
}
rand.Seed(time.Now().UnixNano())
// Generate a jitter between +/- 10% of the value.
r := int64(v / 10)
j := rand.Int63n(2*r) - r
return time.Duration(j)
}

View File

@ -366,7 +366,7 @@ type ServerConfig struct {
AuthInfo credentials.AuthInfo
InTapHandle tap.ServerInHandle
StatsHandler stats.Handler
keepaliveParams keepalive.ServerParameters
KeepaliveParams keepalive.ServerParameters
}
// NewServerTransport creates a ServerTransport with conn or non-nil error

View File

@ -303,7 +303,7 @@ func setUpWithNoPingServer(t *testing.T, copts ConnectOptions, done chan net.Con
// MaxConnectionIdle time.
func TestMaxConnectionIdle(t *testing.T) {
serverConfig := &ServerConfig{
keepaliveParams: keepalive.ServerParameters{
KeepaliveParams: keepalive.ServerParameters{
MaxConnectionIdle: 2 * time.Second,
},
}
@ -318,12 +318,66 @@ func TestMaxConnectionIdle(t *testing.T) {
stream.rstStream = true
stream.mu.Unlock()
client.CloseStream(stream, nil)
// wait for server to see that closed stream and max age to send goaway after no new RPCs are mode
// wait for server to see that closed stream and max-age logic to send goaway after no new RPCs are mode
timeout := time.NewTimer(time.Second * 4)
select {
case <-client.GoAway():
case <-timeout.C:
t.Fatalf("Test timed out, expected a GoAway from server")
t.Fatalf("Test timed out, expected a GoAway from the server.")
}
}
// TestMaxConnectinoAge tests that a server will send GoAway after a duration of MaxConnectionAge.
func TestMaxConnectionAge(t *testing.T) {
serverConfig := &ServerConfig{
KeepaliveParams: keepalive.ServerParameters{
MaxConnectionAge: 2 * time.Second,
},
}
server, client := setUpWithOptions(t, 0, serverConfig, normal, ConnectOptions{})
defer server.stop()
defer client.Close()
_, err := client.NewStream(context.Background(), &CallHdr{Host: "localhost", Method: "foo.small"})
if err != nil {
t.Fatalf("Client failed to create stream: %v", err)
}
// Wait for max-age logic to send GoAway.
timeout := time.NewTimer(4 * time.Second)
select {
case <-client.GoAway():
case <-timeout.C:
t.Fatalf("Test timer out, expected a GoAway from the server.")
}
}
// TestKeepaliveServer tests that a server closes a peer that doesn't respont to keepalive pings.
func TestKeepaliveServer(t *testing.T) {
serverConfig := &ServerConfig{
KeepaliveParams: keepalive.ServerParameters{
Time: 2 * time.Second,
Timeout: 1 * time.Second,
},
}
server, c := setUpWithOptions(t, 0, serverConfig, normal, ConnectOptions{})
defer server.stop()
defer c.Close()
client, err := net.Dial("tcp", server.lis.Addr().String())
if err != nil {
t.Fatalf("Failed to dial: %v", err)
}
defer client.Close()
// Wait for keepalive logic to close the connection.
time.Sleep(4 * time.Second)
b := make([]byte, 24)
for {
_, err = client.Read(b)
if err == nil {
continue
}
if err != io.EOF {
t.Fatalf("client.Read(_) = _,%v, want io.EOF", err)
}
break
}
}