Mirror of https://github.com/ipfs/kubo.git (synced 2025-06-25 15:08:45 +08:00)
Merge pull request #5322 from ipfs/fix/shutdown-error
wait for all connections to close before exiting on shutdown.
@@ -138,7 +138,7 @@ var rootSubcommands = map[string]*cmds.Command{
 	"update": lgc.NewCommand(ExternalBinary()),
 	"urlstore": urlStoreCmd,
 	"version": lgc.NewCommand(VersionCmd),
-	"shutdown": lgc.NewCommand(daemonShutdownCmd),
+	"shutdown": daemonShutdownCmd,
 }
 
 // RootRO is the readonly version of Root
@@ -3,31 +3,28 @@ package commands
 import (
 	"fmt"
 
+	cmds "gx/ipfs/QmNueRyPRQiV7PUEpnP4GgGLuK1rKQLaRW7sfPvUetYig1/go-ipfs-cmds"
 	"gx/ipfs/QmdE4gMduCKCGAcczM2F5ioYDfdeKuPix138wrES1YSr7f/go-ipfs-cmdkit"
-
-	cmds "github.com/ipfs/go-ipfs/commands"
 )
 
 var daemonShutdownCmd = &cmds.Command{
 	Helptext: cmdkit.HelpText{
 		Tagline: "Shut down the ipfs daemon",
 	},
-	Run: func(req cmds.Request, res cmds.Response) {
-		nd, err := req.InvocContext().GetNode()
+	Run: func(req *cmds.Request, re cmds.ResponseEmitter, env cmds.Environment) {
+		nd, err := GetNode(env)
 		if err != nil {
-			res.SetError(err, cmdkit.ErrNormal)
+			re.SetError(err, cmdkit.ErrNormal)
 			return
 		}
 
 		if nd.LocalMode() {
-			res.SetError(fmt.Errorf("daemon not running"), cmdkit.ErrClient)
+			re.SetError(fmt.Errorf("daemon not running"), cmdkit.ErrClient)
 			return
 		}
 
 		if err := nd.Process().Close(); err != nil {
 			log.Error("error while shutting down ipfs daemon:", err)
 		}
-
-		res.SetOutput(nil)
 	},
 }
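Note that the ported command only signals shutdown: nd.Process().Close() closes the node's goprocess, and everything selecting on its Closing() channel reacts. A rough stdlib-only sketch of that broadcast pattern (all names here are hypothetical; the real daemon uses goprocess, not this type):

package main

import (
	"fmt"
	"sync"
)

// proc mimics the goprocess idea used above: Close() fires a
// Closing() channel exactly once, and workers select on it.
type proc struct {
	once    sync.Once
	closing chan struct{}
}

func newProc() *proc { return &proc{closing: make(chan struct{})} }

// Closing returns a channel that is closed when shutdown begins.
func (p *proc) Closing() <-chan struct{} { return p.closing }

// Close signals shutdown; safe to call more than once.
func (p *proc) Close() error {
	p.once.Do(func() { close(p.closing) })
	return nil
}

func main() {
	p := newProc()

	done := make(chan struct{})
	go func() {
		<-p.Closing() // worker blocks until shutdown is signalled
		fmt.Println("worker: shutting down")
		close(done)
	}()

	p.Close() // roughly what the command's nd.Process().Close() amounts to
	<-done
}

Closing a channel is the idiomatic Go broadcast: every receiver unblocks at once, which is why the corehttp server in the next hunks can select on the same signal.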
@@ -5,6 +5,7 @@ high-level HTTP interfaces to IPFS.
 package corehttp
 
 import (
+	"context"
 	"fmt"
 	"net"
 	"net/http"
@@ -12,6 +13,7 @@ import (
 
 	core "github.com/ipfs/go-ipfs/core"
 	"gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
+	periodicproc "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/periodic"
 	manet "gx/ipfs/QmV6FjemM1K8oXjrvuq3wuVWWoU2TLDPmNnKrxHzY3v6Ai/go-multiaddr-net"
 	ma "gx/ipfs/QmYmsdtJ3HsodkePE3eU3TsCaP2YvPZJ4LoXnNkDE5Tpt7/go-multiaddr"
 	logging "gx/ipfs/QmcVVHfdyv15GVPk7NrxdWjh2hLVccXnoD8j2tyQShiXJb/go-log"
@@ -19,6 +21,10 @@ import (
 
 var log = logging.Logger("core/server")
 
+// shutdownTimeout is the timeout after which we'll stop waiting for hung
+// commands to return on shutdown.
+const shutdownTimeout = 30 * time.Second
+
 // ServeOption registers any HTTP handlers it provides on the given mux.
 // It returns the mux to expose to future options, which may be a new mux if it
 // is interested in mediating requests to future options, or the same mux
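Worth spelling out what the new shutdownTimeout bounds: server.Shutdown(ctx) stops accepting new connections and waits for active ones, but gives up and returns the context's error if the deadline fires first. A minimal sketch of that behavior, assuming a deliberately hung handler and a shortened timeout for demonstration:

package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"time"
)

func main() {
	// A handler that outlives the shutdown timeout on purpose.
	handler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		time.Sleep(5 * time.Second)
	})

	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	server := &http.Server{Handler: handler}
	go server.Serve(lis) // returns http.ErrServerClosed once Shutdown is called

	// Put one request in flight so Shutdown has something to wait for.
	go http.Get("http://" + lis.Addr().String())
	time.Sleep(100 * time.Millisecond)

	// Shutdown waits for the active request, giving up when the
	// context expires.
	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
	defer cancel()
	err = server.Shutdown(ctx)
	fmt.Println(errors.Is(err, context.DeadlineExceeded)) // true: the hung request won
}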
@@ -65,6 +71,9 @@ func ListenAndServe(n *core.IpfsNode, listeningMultiAddr string, options ...ServeOption) error {
 }
 
 func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error {
+	// make sure we close this no matter what.
+	defer lis.Close()
+
 	handler, err := makeHandler(node, lis, options...)
 	if err != nil {
 		return err
@@ -75,43 +84,44 @@ func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error
 		return err
 	}
 
-	// if the server exits beforehand
-	var serverError error
-	serverExited := make(chan struct{})
-
 	select {
 	case <-node.Process().Closing():
 		return fmt.Errorf("failed to start server, process closing")
 	default:
 	}
 
-	node.Process().Go(func(p goprocess.Process) {
-		serverError = http.Serve(lis, handler)
-		close(serverExited)
+	server := &http.Server{
+		Handler: handler,
+	}
+
+	var serverError error
+	serverProc := node.Process().Go(func(p goprocess.Process) {
+		serverError = server.Serve(lis)
 	})
 
 	// wait for server to exit.
 	select {
-	case <-serverExited:
-
+	case <-serverProc.Closed():
 	// if node being closed before server exits, close server
 	case <-node.Process().Closing():
 		log.Infof("server at %s terminating...", addr)
 
-		lis.Close()
-
-	outer:
-		for {
-			// wait until server exits
-			select {
-			case <-serverExited:
-				// if the server exited as we are closing, we really dont care about errors
-				serverError = nil
-				break outer
-			case <-time.After(5 * time.Second):
-				log.Infof("waiting for server at %s to terminate...", addr)
-			}
-		}
+		warnProc := periodicproc.Tick(5*time.Second, func(_ goprocess.Process) {
+			log.Infof("waiting for server at %s to terminate...", addr)
+		})
+
+		// This timeout shouldn't be necessary if all of our commands
+		// are obeying their contexts but we should have *some* timeout.
+		ctx, cancel := context.WithTimeout(context.Background(), shutdownTimeout)
+		defer cancel()
+		err := server.Shutdown(ctx)
+
+		// Should have already closed but we still need to wait for it
+		// to set the error.
+		<-serverProc.Closed()
+		serverError = err
+
+		warnProc.Close()
 	}
 
 	log.Infof("server at %s terminated", addr)
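The core of the fix is visible in this hunk: the old code called lis.Close(), which stops the listener but abandons in-flight requests, while the new code keeps a handle on an http.Server and calls Shutdown, which drains open connections before returning. A condensed, stdlib-only sketch of the same lifecycle (names here are hypothetical; the real code wires this into goprocess and periodicproc):

package main

import (
	"context"
	"fmt"
	"net"
	"net/http"
	"time"
)

// serve runs an http.Server until closing is signalled, then shuts it
// down gracefully. It mirrors the structure of the patched Serve above,
// with plain channels standing in for goprocess.
func serve(lis net.Listener, handler http.Handler, closing <-chan struct{}) error {
	server := &http.Server{Handler: handler}

	serverError := make(chan error, 1)
	go func() { serverError <- server.Serve(lis) }()

	select {
	case err := <-serverError:
		return err // server exited on its own

	case <-closing:
		// Log periodically while hung requests hold shutdown up,
		// like the periodicproc.Tick call in the diff.
		stopWarn := make(chan struct{})
		defer close(stopWarn)
		go func() {
			ticker := time.NewTicker(5 * time.Second)
			defer ticker.Stop()
			for {
				select {
				case <-ticker.C:
					fmt.Println("waiting for server to terminate...")
				case <-stopWarn:
					return
				}
			}
		}()

		// Bound the wait, like shutdownTimeout above.
		ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
		defer cancel()
		err := server.Shutdown(ctx)

		// Serve returns http.ErrServerClosed once Shutdown begins;
		// wait for it before returning, like <-serverProc.Closed().
		<-serverError
		return err
	}
}

func main() {
	lis, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		panic(err)
	}
	closing := make(chan struct{})
	go func() {
		time.Sleep(200 * time.Millisecond)
		close(closing) // plays the role of node.Process().Closing()
	}()
	fmt.Println("serve returned:", serve(lis, http.NotFoundHandler(), closing))
}

The final receive from serverError matters: without it, the Serve goroutine could still be writing the error variable after serve returns, which is the race the diff avoids by waiting on serverProc.Closed() before reading serverError.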
@@ -13,7 +13,7 @@ test_init_ipfs
 test_launch_ipfs_daemon
 
 test_expect_success "shutdown succeeds" '
-	ipfs shutdown || true # bug: https://github.com/ipfs/go-ipfs/issues/4055
+	ipfs shutdown
 '
 
 test_expect_success "daemon no longer running" '
@@ -27,7 +27,7 @@ test_expect_success "daemon no longer running" '
 test_launch_ipfs_daemon --offline
 
 test_expect_success "shutdown succeeds" '
-	ipfs shutdown || true # bug: https://github.com/ipfs/go-ipfs/issues/4055
+	ipfs shutdown
 '
 
 test_expect_success "daemon no longer running" '