fix server exit due to a temporary network error
This commit is contained in:
39
server.go
39
server.go
@ -89,10 +89,12 @@ type service struct {
|
|||||||
type Server struct {
|
type Server struct {
|
||||||
opts options
|
opts options
|
||||||
|
|
||||||
mu sync.Mutex // guards following
|
mu sync.Mutex // guards following
|
||||||
lis map[net.Listener]bool
|
lis map[net.Listener]bool
|
||||||
conns map[io.Closer]bool
|
conns map[io.Closer]bool
|
||||||
drain bool
|
drain bool
|
||||||
|
ctx context.Context
|
||||||
|
cancel context.CancelFunc
|
||||||
// A CondVar that lets GracefulStop() block until all the pending RPCs are finished
|
// A CondVar that lets GracefulStop() block until all the pending RPCs are finished
|
||||||
// and all the transports go away.
|
// and all the transports go away.
|
||||||
cv *sync.Cond
|
cv *sync.Cond
|
||||||
@ -203,6 +205,7 @@ func NewServer(opt ...ServerOption) *Server {
|
|||||||
m: make(map[string]*service),
|
m: make(map[string]*service),
|
||||||
}
|
}
|
||||||
s.cv = sync.NewCond(&s.mu)
|
s.cv = sync.NewCond(&s.mu)
|
||||||
|
s.ctx, s.cancel = context.WithCancel(context.Background())
|
||||||
if EnableTracing {
|
if EnableTracing {
|
||||||
_, file, line, _ := runtime.Caller(1)
|
_, file, line, _ := runtime.Caller(1)
|
||||||
s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
|
s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line))
|
||||||
@ -324,7 +327,7 @@ func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credenti
|
|||||||
// Serve accepts incoming connections on the listener lis, creating a new
|
// Serve accepts incoming connections on the listener lis, creating a new
|
||||||
// ServerTransport and service goroutine for each. The service goroutines
|
// ServerTransport and service goroutine for each. The service goroutines
|
||||||
// read gRPC requests and then call the registered handlers to reply to them.
|
// read gRPC requests and then call the registered handlers to reply to them.
|
||||||
// Serve returns when lis.Accept fails. lis will be closed when
|
// Serve returns when lis.Accept fails with fatal errors. lis will be closed when
|
||||||
// this method returns.
|
// this method returns.
|
||||||
func (s *Server) Serve(lis net.Listener) error {
|
func (s *Server) Serve(lis net.Listener) error {
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
@ -344,14 +347,38 @@ func (s *Server) Serve(lis net.Listener) error {
|
|||||||
}
|
}
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
var tempDelay time.Duration // how long to sleep on accept failure
|
||||||
|
|
||||||
for {
|
for {
|
||||||
rawConn, err := lis.Accept()
|
rawConn, err := lis.Accept()
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
if ne, ok := err.(interface {
|
||||||
|
Temporary() bool
|
||||||
|
}); ok && ne.Temporary() {
|
||||||
|
if tempDelay == 0 {
|
||||||
|
tempDelay = 5 * time.Millisecond
|
||||||
|
} else {
|
||||||
|
tempDelay *= 2
|
||||||
|
}
|
||||||
|
if max := 1 * time.Second; tempDelay > max {
|
||||||
|
tempDelay = max
|
||||||
|
}
|
||||||
|
s.mu.Lock()
|
||||||
|
s.printf("Accept error: %v; retrying in %v", err, tempDelay)
|
||||||
|
s.mu.Unlock()
|
||||||
|
select {
|
||||||
|
case <-time.After(tempDelay):
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
}
|
||||||
|
continue
|
||||||
|
}
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
s.printf("done serving; Accept = %v", err)
|
s.printf("done serving; Accept = %v", err)
|
||||||
s.mu.Unlock()
|
s.mu.Unlock()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
tempDelay = 0
|
||||||
// Start a new goroutine to deal with rawConn
|
// Start a new goroutine to deal with rawConn
|
||||||
// so we don't stall this Accept loop goroutine.
|
// so we don't stall this Accept loop goroutine.
|
||||||
go s.handleRawConn(rawConn)
|
go s.handleRawConn(rawConn)
|
||||||
@ -812,6 +839,7 @@ func (s *Server) Stop() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
s.mu.Lock()
|
s.mu.Lock()
|
||||||
|
s.cancel()
|
||||||
if s.events != nil {
|
if s.events != nil {
|
||||||
s.events.Finish()
|
s.events.Finish()
|
||||||
s.events = nil
|
s.events = nil
|
||||||
@ -832,6 +860,7 @@ func (s *Server) GracefulStop() {
|
|||||||
lis.Close()
|
lis.Close()
|
||||||
}
|
}
|
||||||
s.lis = nil
|
s.lis = nil
|
||||||
|
s.cancel()
|
||||||
for c := range s.conns {
|
for c := range s.conns {
|
||||||
c.(transport.ServerTransport).Drain()
|
c.(transport.ServerTransport).Drain()
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user