mirror of
https://github.com/containers/podman.git
synced 2025-06-20 00:51:16 +08:00
Merge pull request #5777 from jwhonce/issues/5599
Refactor service idle support
This commit is contained in:
@ -155,9 +155,7 @@ func runREST(r *libpod.Runtime, uri string, timeout time.Duration) error {
|
||||
}
|
||||
}()
|
||||
|
||||
err = server.Serve()
|
||||
logrus.Debugf("%d/%d Active connections/Total connections\n", server.ActiveConnections, server.TotalConnections)
|
||||
return err
|
||||
return server.Serve()
|
||||
}
|
||||
|
||||
func runVarlink(r *libpod.Runtime, uri string, timeout time.Duration, c *cliconfig.ServiceValues) error {
|
||||
|
@ -1,7 +1,6 @@
|
||||
package compat
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
|
||||
@ -10,6 +9,7 @@ import (
|
||||
"github.com/containers/libpod/pkg/api/handlers"
|
||||
"github.com/containers/libpod/pkg/api/handlers/utils"
|
||||
"github.com/gorilla/schema"
|
||||
jsoniter "github.com/json-iterator/go"
|
||||
"github.com/pkg/errors"
|
||||
"github.com/sirupsen/logrus"
|
||||
)
|
||||
@ -48,14 +48,27 @@ func GetEvents(w http.ResponseWriter, r *http.Request) {
|
||||
}()
|
||||
if eventsError != nil {
|
||||
utils.InternalServerError(w, eventsError)
|
||||
close(eventChannel)
|
||||
return
|
||||
}
|
||||
|
||||
// If client disappears we need to stop listening for events
|
||||
go func(done <-chan struct{}) {
|
||||
<-done
|
||||
close(eventChannel)
|
||||
}(r.Context().Done())
|
||||
|
||||
// Headers need to be written out before turning Writer() over to json encoder
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
if flusher, ok := w.(http.Flusher); ok {
|
||||
flusher.Flush()
|
||||
}
|
||||
|
||||
json := jsoniter.ConfigCompatibleWithStandardLibrary
|
||||
coder := json.NewEncoder(w)
|
||||
coder.SetEscapeHTML(true)
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
for event := range eventChannel {
|
||||
e := handlers.EventToApiEvent(event)
|
||||
if err := coder.Encode(e); err != nil {
|
||||
|
@ -19,7 +19,7 @@ func (s *APIServer) APIHandler(h http.HandlerFunc) http.HandlerFunc {
|
||||
if err != nil {
|
||||
buf := make([]byte, 1<<20)
|
||||
n := runtime.Stack(buf, true)
|
||||
log.Warnf("Recovering from podman handler panic: %v, %s", err, buf[:n])
|
||||
log.Warnf("Recovering from API handler panic: %v, %s", err, buf[:n])
|
||||
// Try to inform client things went south... won't work if handler already started writing response body
|
||||
utils.InternalServerError(w, fmt.Errorf("%v", err))
|
||||
}
|
||||
@ -27,12 +27,7 @@ func (s *APIServer) APIHandler(h http.HandlerFunc) http.HandlerFunc {
|
||||
|
||||
// Wrapper to hide some boiler plate
|
||||
fn := func(w http.ResponseWriter, r *http.Request) {
|
||||
// Connection counting, ugh. Needed to support the sliding window for idle checking.
|
||||
s.ConnectionCh <- EnterHandler
|
||||
defer func() { s.ConnectionCh <- ExitHandler }()
|
||||
|
||||
log.Debugf("APIHandler -- Method: %s URL: %s (conn %d/%d)",
|
||||
r.Method, r.URL.String(), s.ActiveConnections, s.TotalConnections)
|
||||
log.Debugf("APIHandler -- Method: %s URL: %s", r.Method, r.URL.String())
|
||||
|
||||
if err := r.ParseForm(); err != nil {
|
||||
log.Infof("Failed Request: unable to parse form: %q", err)
|
||||
|
@ -2,11 +2,14 @@ package server
|
||||
|
||||
import (
|
||||
"context"
|
||||
"log"
|
||||
"net"
|
||||
"net/http"
|
||||
"os"
|
||||
"os/signal"
|
||||
"runtime"
|
||||
"strings"
|
||||
"sync"
|
||||
"syscall"
|
||||
"time"
|
||||
|
||||
@ -26,20 +29,13 @@ type APIServer struct {
|
||||
*libpod.Runtime // Where the real work happens
|
||||
net.Listener // mux for routing HTTP API calls to libpod routines
|
||||
context.CancelFunc // Stop APIServer
|
||||
*time.Timer // Hold timer for sliding window
|
||||
time.Duration // Duration of client access sliding window
|
||||
ActiveConnections uint64 // Number of handlers holding a connection
|
||||
TotalConnections uint64 // Number of connections handled
|
||||
ConnectionCh chan int // Channel for signalling handler enter/exit
|
||||
idleTracker *IdleTracker // Track connections to support idle shutdown
|
||||
}
|
||||
|
||||
// Number of seconds to wait for next request, if exceeded shutdown server
|
||||
const (
|
||||
DefaultServiceDuration = 300 * time.Second
|
||||
UnlimitedServiceDuration = 0 * time.Second
|
||||
EnterHandler = 1
|
||||
ExitHandler = -1
|
||||
NOOPHandler = 0
|
||||
)
|
||||
|
||||
// NewServer will create and configure a new API server with all defaults
|
||||
@ -70,17 +66,20 @@ func newServer(runtime *libpod.Runtime, duration time.Duration, listener *net.Li
|
||||
}
|
||||
|
||||
router := mux.NewRouter().UseEncodedPath()
|
||||
idle := NewIdleTracker(duration)
|
||||
|
||||
server := APIServer{
|
||||
Server: http.Server{
|
||||
Handler: router,
|
||||
ReadHeaderTimeout: 20 * time.Second,
|
||||
IdleTimeout: duration,
|
||||
ConnState: idle.ConnState,
|
||||
ErrorLog: log.New(logrus.StandardLogger().Out, "", 0),
|
||||
},
|
||||
Decoder: handlers.NewAPIDecoder(),
|
||||
Runtime: runtime,
|
||||
idleTracker: idle,
|
||||
Listener: *listener,
|
||||
Duration: duration,
|
||||
ConnectionCh: make(chan int),
|
||||
Runtime: runtime,
|
||||
}
|
||||
|
||||
router.NotFoundHandler = http.HandlerFunc(
|
||||
@ -120,11 +119,11 @@ func newServer(runtime *libpod.Runtime, duration time.Duration, listener *net.Li
|
||||
router.Walk(func(route *mux.Route, r *mux.Router, ancestors []*mux.Route) error { // nolint
|
||||
path, err := route.GetPathTemplate()
|
||||
if err != nil {
|
||||
path = ""
|
||||
path = "<N/A>"
|
||||
}
|
||||
methods, err := route.GetMethods()
|
||||
if err != nil {
|
||||
methods = []string{}
|
||||
methods = []string{"<N/A>"}
|
||||
}
|
||||
logrus.Debugf("Methods: %s Path: %s", strings.Join(methods, ", "), path)
|
||||
return nil
|
||||
@ -136,24 +135,20 @@ func newServer(runtime *libpod.Runtime, duration time.Duration, listener *net.Li
|
||||
|
||||
// Serve starts responding to HTTP requests
|
||||
func (s *APIServer) Serve() error {
|
||||
// This is initialized here as Timer is not needed until Serve'ing
|
||||
if s.Duration > 0 {
|
||||
s.Timer = time.AfterFunc(s.Duration, func() {
|
||||
s.ConnectionCh <- NOOPHandler
|
||||
})
|
||||
go s.ReadChannelWithTimeout()
|
||||
} else {
|
||||
go s.ReadChannelNoTimeout()
|
||||
}
|
||||
|
||||
sigChan := make(chan os.Signal, 1)
|
||||
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
||||
errChan := make(chan error, 1)
|
||||
|
||||
go func() {
|
||||
<-s.idleTracker.Done()
|
||||
logrus.Debugf("API Server idle for %v", s.idleTracker.Duration)
|
||||
_ = s.Shutdown()
|
||||
}()
|
||||
|
||||
go func() {
|
||||
err := s.Server.Serve(s.Listener)
|
||||
if err != nil && err != http.ErrServerClosed {
|
||||
errChan <- errors.Wrap(err, "Failed to start APIServer")
|
||||
errChan <- errors.Wrap(err, "failed to start API server")
|
||||
return
|
||||
}
|
||||
errChan <- nil
|
||||
@ -169,72 +164,30 @@ func (s *APIServer) Serve() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *APIServer) ReadChannelWithTimeout() {
|
||||
// stalker to count the connections. Should the timer expire it will shutdown the service.
|
||||
for delta := range s.ConnectionCh {
|
||||
switch delta {
|
||||
case EnterHandler:
|
||||
s.Timer.Stop()
|
||||
s.ActiveConnections += 1
|
||||
s.TotalConnections += 1
|
||||
case ExitHandler:
|
||||
s.Timer.Stop()
|
||||
s.ActiveConnections -= 1
|
||||
if s.ActiveConnections == 0 {
|
||||
// Server will be shutdown iff the timer expires before being reset or stopped
|
||||
s.Timer = time.AfterFunc(s.Duration, func() {
|
||||
if err := s.Shutdown(); err != nil {
|
||||
logrus.Errorf("Failed to shutdown APIServer: %v", err)
|
||||
os.Exit(1)
|
||||
}
|
||||
})
|
||||
} else {
|
||||
s.Timer.Reset(s.Duration)
|
||||
}
|
||||
case NOOPHandler:
|
||||
// push the check out another duration...
|
||||
s.Timer.Reset(s.Duration)
|
||||
default:
|
||||
logrus.Warnf("ConnectionCh received unsupported input %d", delta)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *APIServer) ReadChannelNoTimeout() {
|
||||
// stalker to count the connections.
|
||||
for delta := range s.ConnectionCh {
|
||||
switch delta {
|
||||
case EnterHandler:
|
||||
s.ActiveConnections += 1
|
||||
s.TotalConnections += 1
|
||||
case ExitHandler:
|
||||
s.ActiveConnections -= 1
|
||||
case NOOPHandler:
|
||||
default:
|
||||
logrus.Warnf("ConnectionCh received unsupported input %d", delta)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Shutdown is a clean shutdown waiting on existing clients
|
||||
func (s *APIServer) Shutdown() error {
|
||||
if logrus.IsLevelEnabled(logrus.DebugLevel) {
|
||||
_, file, line, _ := runtime.Caller(1)
|
||||
logrus.Debugf("APIServer.Shutdown by %s:%d, %d/%d connection(s)",
|
||||
file, line, s.idleTracker.ActiveConnections(), s.idleTracker.TotalConnections())
|
||||
}
|
||||
|
||||
// Duration == 0 flags no auto-shutdown of the server
|
||||
if s.Duration == 0 {
|
||||
if s.idleTracker.Duration == 0 {
|
||||
logrus.Debug("APIServer.Shutdown ignored as Duration == 0")
|
||||
return nil
|
||||
}
|
||||
logrus.Debugf("APIServer.Shutdown called %v, conn %d/%d", time.Now(), s.ActiveConnections, s.TotalConnections)
|
||||
|
||||
// Gracefully shutdown server
|
||||
ctx, cancel := context.WithTimeout(context.Background(), s.Duration)
|
||||
// Gracefully shutdown server, duration of wait same as idle window
|
||||
ctx, cancel := context.WithTimeout(context.Background(), s.idleTracker.Duration)
|
||||
defer cancel()
|
||||
|
||||
go func() {
|
||||
err := s.Server.Shutdown(ctx)
|
||||
if err != nil && err != context.Canceled && err != http.ErrServerClosed {
|
||||
logrus.Errorf("Failed to cleanly shutdown APIServer: %s", err.Error())
|
||||
}
|
||||
}()
|
||||
<-ctx.Done()
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -242,3 +195,55 @@ func (s *APIServer) Shutdown() error {
|
||||
func (s *APIServer) Close() error {
|
||||
return s.Server.Close()
|
||||
}
|
||||
|
||||
type IdleTracker struct {
|
||||
active map[net.Conn]struct{}
|
||||
total int
|
||||
mux sync.Mutex
|
||||
timer *time.Timer
|
||||
Duration time.Duration
|
||||
}
|
||||
|
||||
func NewIdleTracker(idle time.Duration) *IdleTracker {
|
||||
return &IdleTracker{
|
||||
active: make(map[net.Conn]struct{}),
|
||||
Duration: idle,
|
||||
timer: time.NewTimer(idle),
|
||||
}
|
||||
}
|
||||
|
||||
func (t *IdleTracker) ConnState(conn net.Conn, state http.ConnState) {
|
||||
t.mux.Lock()
|
||||
defer t.mux.Unlock()
|
||||
|
||||
oldActive := len(t.active)
|
||||
logrus.Debugf("IdleTracker %p:%v %d/%d connection(s)", conn, state, t.ActiveConnections(), t.TotalConnections())
|
||||
switch state {
|
||||
case http.StateNew, http.StateActive, http.StateHijacked:
|
||||
t.active[conn] = struct{}{}
|
||||
// stop the timer if we transitioned from idle
|
||||
if oldActive == 0 {
|
||||
t.timer.Stop()
|
||||
}
|
||||
t.total += 1
|
||||
case http.StateIdle, http.StateClosed:
|
||||
delete(t.active, conn)
|
||||
// Restart the timer if we've become idle
|
||||
if oldActive > 0 && len(t.active) == 0 {
|
||||
t.timer.Stop()
|
||||
t.timer.Reset(t.Duration)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (t *IdleTracker) ActiveConnections() int {
|
||||
return len(t.active)
|
||||
}
|
||||
|
||||
func (t *IdleTracker) TotalConnections() int {
|
||||
return t.total
|
||||
}
|
||||
|
||||
func (t *IdleTracker) Done() <-chan time.Time {
|
||||
return t.timer.C
|
||||
}
|
||||
|
@ -51,7 +51,6 @@ func (ic *ContainerEngine) RestService(_ context.Context, opts entities.ServiceO
|
||||
}()
|
||||
|
||||
err = server.Serve()
|
||||
logrus.Debugf("%d/%d Active connections/Total connections\n", server.ActiveConnections, server.TotalConnections)
|
||||
_ = listener.Close()
|
||||
return err
|
||||
}
|
||||
|
Reference in New Issue
Block a user