mirror of
https://github.com/ipfs/kubo.git
synced 2025-08-06 11:31:54 +08:00
467 lines
12 KiB
Go
467 lines
12 KiB
Go
// cmd/ipfs implements the primary CLI binary for ipfs
|
|
package main
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"math/rand"
|
|
"os"
|
|
"os/signal"
|
|
"path/filepath"
|
|
"runtime/pprof"
|
|
"strings"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
oldcmds "github.com/ipfs/go-ipfs/commands"
|
|
core "github.com/ipfs/go-ipfs/core"
|
|
coreCmds "github.com/ipfs/go-ipfs/core/commands"
|
|
corehttp "github.com/ipfs/go-ipfs/core/corehttp"
|
|
loader "github.com/ipfs/go-ipfs/plugin/loader"
|
|
repo "github.com/ipfs/go-ipfs/repo"
|
|
config "github.com/ipfs/go-ipfs/repo/config"
|
|
fsrepo "github.com/ipfs/go-ipfs/repo/fsrepo"
|
|
|
|
u "gx/ipfs/QmNiJuT8Ja3hMVpBHXv3Q6dwmperaQ6JjLtpMQgMCD7xvx/go-ipfs-util"
|
|
manet "gx/ipfs/QmRK2LxanhK2gZq6k6R7vk5ZoYZk8ULSSTB7FzDsMUX6CB/go-multiaddr-net"
|
|
logging "gx/ipfs/QmRb5jh8z2E8hMGN2tkvs1yHynUanqnZ3UeKwgN1i9P1F8/go-log"
|
|
ma "gx/ipfs/QmWWQ2Txc2c6tqjsBpzg5Ar652cHPGNsQQp2SejkNmkUMb/go-multiaddr"
|
|
osh "gx/ipfs/QmXuBJ7DR6k3rmUEKtvVMhwjmXDuJgXXPUt4LQXKBMsU93/go-os-helper"
|
|
loggables "gx/ipfs/Qmf9JgVLz46pxPXwG2eWSJpkqVCcjD4rp7zCRi2KP6GTNB/go-libp2p-loggables"
|
|
"gx/ipfs/QmfAkMSt9Fwzk48QDJecPcwCUjnf2uG7MLnmCGTp4C6ouL/go-ipfs-cmds"
|
|
"gx/ipfs/QmfAkMSt9Fwzk48QDJecPcwCUjnf2uG7MLnmCGTp4C6ouL/go-ipfs-cmds/cli"
|
|
"gx/ipfs/QmfAkMSt9Fwzk48QDJecPcwCUjnf2uG7MLnmCGTp4C6ouL/go-ipfs-cmds/http"
|
|
)
|
|
|
|
// log is the command logger
|
|
var log = logging.Logger("cmd/ipfs")
|
|
|
|
var errRequestCanceled = errors.New("request canceled")
|
|
|
|
const (
|
|
EnvEnableProfiling = "IPFS_PROF"
|
|
cpuProfile = "ipfs.cpuprof"
|
|
heapProfile = "ipfs.memprof"
|
|
)
|
|
|
|
// main roadmap:
|
|
// - parse the commandline to get a cmdInvocation
|
|
// - if user requests help, print it and exit.
|
|
// - run the command invocation
|
|
// - output the response
|
|
// - if anything fails, print error, maybe with help
|
|
func main() {
|
|
os.Exit(mainRet())
|
|
}
|
|
|
|
func mainRet() int {
|
|
rand.Seed(time.Now().UnixNano())
|
|
ctx := logging.ContextWithLoggable(context.Background(), loggables.Uuid("session"))
|
|
var err error
|
|
|
|
// we'll call this local helper to output errors.
|
|
// this is so we control how to print errors in one place.
|
|
printErr := func(err error) {
|
|
fmt.Fprintf(os.Stderr, "Error: %s\n", err.Error())
|
|
}
|
|
|
|
stopFunc, err := profileIfEnabled()
|
|
if err != nil {
|
|
printErr(err)
|
|
return 1
|
|
}
|
|
defer stopFunc() // to be executed as late as possible
|
|
|
|
intrh, ctx := setupInterruptHandler(ctx)
|
|
defer intrh.Close()
|
|
|
|
// Handle `ipfs help'
|
|
if len(os.Args) == 2 {
|
|
if os.Args[1] == "help" {
|
|
os.Args[1] = "-h"
|
|
} else if os.Args[1] == "--version" {
|
|
os.Args[1] = "version"
|
|
}
|
|
}
|
|
|
|
// output depends on excecutable name passed in os.Args
|
|
// so we need to make sure it's stable
|
|
os.Args[0] = "ipfs"
|
|
|
|
buildEnv := func(ctx context.Context, req *cmds.Request) (cmds.Environment, error) {
|
|
repoPath, err := getRepoPath(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
log.Debugf("config path is %s", repoPath)
|
|
|
|
// this sets up the function that will initialize the node
|
|
// this is so that we can construct the node lazily.
|
|
return &oldcmds.Context{
|
|
ConfigRoot: repoPath,
|
|
LoadConfig: loadConfig,
|
|
ReqLog: &oldcmds.ReqLog{},
|
|
ConstructNode: func() (n *core.IpfsNode, err error) {
|
|
if req == nil {
|
|
return nil, errors.New("constructing node without a request")
|
|
}
|
|
|
|
r, err := fsrepo.Open(repoPath)
|
|
if err != nil { // repo is owned by the node
|
|
return nil, err
|
|
}
|
|
|
|
// ok everything is good. set it on the invocation (for ownership)
|
|
// and return it.
|
|
n, err = core.NewNode(ctx, &core.BuildCfg{
|
|
Repo: r,
|
|
})
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
n.SetLocal(true)
|
|
return n, nil
|
|
},
|
|
}, nil
|
|
}
|
|
|
|
err = cli.Run(ctx, Root, os.Args, os.Stdin, os.Stdout, os.Stderr, buildEnv, makeExecutor)
|
|
if err != nil {
|
|
return 1
|
|
}
|
|
|
|
// everything went better than expected :)
|
|
return 0
|
|
}
|
|
|
|
func checkDebug(req *cmds.Request) {
|
|
// check if user wants to debug. option OR env var.
|
|
debug, _ := req.Options["debug"].(bool)
|
|
if debug || os.Getenv("IPFS_LOGGING") == "debug" {
|
|
u.Debug = true
|
|
logging.SetDebugLogging()
|
|
}
|
|
if u.GetenvBool("DEBUG") {
|
|
u.Debug = true
|
|
}
|
|
}
|
|
|
|
func makeExecutor(req *cmds.Request, env interface{}) (cmds.Executor, error) {
|
|
checkDebug(req)
|
|
details, err := commandDetails(req.Path, Root)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
client, err := commandShouldRunOnDaemon(*details, req, env.(*oldcmds.Context))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var exctr cmds.Executor
|
|
if client != nil && !req.Command.External {
|
|
exctr = client.(cmds.Executor)
|
|
} else {
|
|
cctx := env.(*oldcmds.Context)
|
|
pluginpath := filepath.Join(cctx.ConfigRoot, "plugins")
|
|
|
|
// check if repo is accessible before loading plugins
|
|
ok, err := checkPermissions(cctx.ConfigRoot)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if ok {
|
|
if _, err := loader.LoadPlugins(pluginpath); err != nil {
|
|
log.Warning("error loading plugins: ", err)
|
|
}
|
|
}
|
|
|
|
exctr = cmds.NewExecutor(req.Root)
|
|
}
|
|
|
|
return exctr, nil
|
|
}
|
|
|
|
func checkPermissions(path string) (bool, error) {
|
|
_, err := os.Open(path)
|
|
if os.IsNotExist(err) {
|
|
// repo does not exist yet - don't load plugins, but also don't fail
|
|
return false, nil
|
|
}
|
|
if os.IsPermission(err) {
|
|
// repo is not accessible. error out.
|
|
return false, fmt.Errorf("error opening repository at %s: permission denied", path)
|
|
}
|
|
|
|
return true, nil
|
|
}
|
|
|
|
// commandDetails returns a command's details for the command given by |path|
|
|
// within the |root| command tree.
|
|
//
|
|
// Returns an error if the command is not found in the Command tree.
|
|
func commandDetails(path []string, root *cmds.Command) (*cmdDetails, error) {
|
|
var details cmdDetails
|
|
// find the last command in path that has a cmdDetailsMap entry
|
|
cmd := root
|
|
for _, cmp := range path {
|
|
cmd = cmd.Subcommands[cmp]
|
|
if cmd == nil {
|
|
return nil, fmt.Errorf("subcommand %s should be in root", cmp)
|
|
}
|
|
|
|
if cmdDetails, found := cmdDetailsMap[strings.Join(path, "/")]; found {
|
|
details = cmdDetails
|
|
}
|
|
}
|
|
return &details, nil
|
|
}
|
|
|
|
// commandShouldRunOnDaemon determines, from commmand details, whether a
|
|
// command ought to be executed on an ipfs daemon.
|
|
//
|
|
// It returns a client if the command should be executed on a daemon and nil if
|
|
// it should be executed on a client. It returns an error if the command must
|
|
// NOT be executed on either.
|
|
func commandShouldRunOnDaemon(details cmdDetails, req *cmds.Request, cctx *oldcmds.Context) (http.Client, error) {
|
|
path := req.Path
|
|
// root command.
|
|
if len(path) < 1 {
|
|
return nil, nil
|
|
}
|
|
|
|
if details.cannotRunOnClient && details.cannotRunOnDaemon {
|
|
return nil, fmt.Errorf("command disabled: %s", path[0])
|
|
}
|
|
|
|
if details.doesNotUseRepo && details.canRunOnClient() {
|
|
return nil, nil
|
|
}
|
|
|
|
// at this point need to know whether api is running. we defer
|
|
// to this point so that we dont check unnecessarily
|
|
|
|
// did user specify an api to use for this command?
|
|
apiAddrStr, _ := req.Options[coreCmds.ApiOption].(string)
|
|
|
|
client, err := getApiClient(cctx.ConfigRoot, apiAddrStr)
|
|
if err == repo.ErrApiNotRunning {
|
|
if apiAddrStr != "" && req.Command != daemonCmd {
|
|
// if user SPECIFIED an api, and this cmd is not daemon
|
|
// we MUST use it. so error out.
|
|
return nil, err
|
|
}
|
|
|
|
// ok for api not to be running
|
|
} else if err != nil { // some other api error
|
|
return nil, err
|
|
}
|
|
|
|
if client != nil {
|
|
if details.cannotRunOnDaemon {
|
|
// check if daemon locked. legacy error text, for now.
|
|
log.Debugf("Command cannot run on daemon. Checking if daemon is locked")
|
|
if daemonLocked, _ := fsrepo.LockedByOtherProcess(cctx.ConfigRoot); daemonLocked {
|
|
return nil, cmds.ClientError("ipfs daemon is running. please stop it to run this command")
|
|
}
|
|
return nil, nil
|
|
}
|
|
|
|
return client, nil
|
|
}
|
|
|
|
if details.cannotRunOnClient {
|
|
return nil, cmds.ClientError("must run on the ipfs daemon")
|
|
}
|
|
|
|
return nil, nil
|
|
}
|
|
|
|
func getRepoPath(req *cmds.Request) (string, error) {
|
|
repoOpt, found := req.Options["config"].(string)
|
|
if found && repoOpt != "" {
|
|
return repoOpt, nil
|
|
}
|
|
|
|
repoPath, err := fsrepo.BestKnownPath()
|
|
if err != nil {
|
|
return "", err
|
|
}
|
|
return repoPath, nil
|
|
}
|
|
|
|
func loadConfig(path string) (*config.Config, error) {
|
|
return fsrepo.ConfigAt(path)
|
|
}
|
|
|
|
// startProfiling begins CPU profiling and returns a `stop` function to be
|
|
// executed as late as possible. The stop function captures the memprofile.
|
|
func startProfiling() (func(), error) {
|
|
// start CPU profiling as early as possible
|
|
ofi, err := os.Create(cpuProfile)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
pprof.StartCPUProfile(ofi)
|
|
go func() {
|
|
for range time.NewTicker(time.Second * 30).C {
|
|
err := writeHeapProfileToFile()
|
|
if err != nil {
|
|
log.Error(err)
|
|
}
|
|
}
|
|
}()
|
|
|
|
stopProfiling := func() {
|
|
pprof.StopCPUProfile()
|
|
defer ofi.Close() // captured by the closure
|
|
}
|
|
return stopProfiling, nil
|
|
}
|
|
|
|
func writeHeapProfileToFile() error {
|
|
mprof, err := os.Create(heapProfile)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer mprof.Close() // _after_ writing the heap profile
|
|
return pprof.WriteHeapProfile(mprof)
|
|
}
|
|
|
|
// IntrHandler helps set up an interrupt handler that can
|
|
// be cleanly shut down through the io.Closer interface.
|
|
type IntrHandler struct {
|
|
sig chan os.Signal
|
|
wg sync.WaitGroup
|
|
}
|
|
|
|
func NewIntrHandler() *IntrHandler {
|
|
ih := &IntrHandler{}
|
|
ih.sig = make(chan os.Signal, 1)
|
|
return ih
|
|
}
|
|
|
|
func (ih *IntrHandler) Close() error {
|
|
close(ih.sig)
|
|
ih.wg.Wait()
|
|
return nil
|
|
}
|
|
|
|
// Handle starts handling the given signals, and will call the handler
|
|
// callback function each time a signal is catched. The function is passed
|
|
// the number of times the handler has been triggered in total, as
|
|
// well as the handler itself, so that the handling logic can use the
|
|
// handler's wait group to ensure clean shutdown when Close() is called.
|
|
func (ih *IntrHandler) Handle(handler func(count int, ih *IntrHandler), sigs ...os.Signal) {
|
|
signal.Notify(ih.sig, sigs...)
|
|
ih.wg.Add(1)
|
|
go func() {
|
|
defer ih.wg.Done()
|
|
count := 0
|
|
for range ih.sig {
|
|
count++
|
|
handler(count, ih)
|
|
}
|
|
signal.Stop(ih.sig)
|
|
}()
|
|
}
|
|
|
|
func setupInterruptHandler(ctx context.Context) (io.Closer, context.Context) {
|
|
intrh := NewIntrHandler()
|
|
ctx, cancelFunc := context.WithCancel(ctx)
|
|
|
|
handlerFunc := func(count int, ih *IntrHandler) {
|
|
switch count {
|
|
case 1:
|
|
fmt.Println() // Prevent un-terminated ^C character in terminal
|
|
|
|
ih.wg.Add(1)
|
|
go func() {
|
|
defer ih.wg.Done()
|
|
cancelFunc()
|
|
}()
|
|
|
|
default:
|
|
fmt.Println("Received another interrupt before graceful shutdown, terminating...")
|
|
os.Exit(-1)
|
|
}
|
|
}
|
|
|
|
intrh.Handle(handlerFunc, syscall.SIGHUP, syscall.SIGINT, syscall.SIGTERM)
|
|
|
|
return intrh, ctx
|
|
}
|
|
|
|
func profileIfEnabled() (func(), error) {
|
|
// FIXME this is a temporary hack so profiling of asynchronous operations
|
|
// works as intended.
|
|
if os.Getenv(EnvEnableProfiling) != "" {
|
|
stopProfilingFunc, err := startProfiling() // TODO maybe change this to its own option... profiling makes it slower.
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return stopProfilingFunc, nil
|
|
}
|
|
return func() {}, nil
|
|
}
|
|
|
|
var apiFileErrorFmt string = `Failed to parse '%[1]s/api' file.
|
|
error: %[2]s
|
|
If you're sure go-ipfs isn't running, you can just delete it.
|
|
`
|
|
var checkIPFSUnixFmt = "Otherwise check:\n\tps aux | grep ipfs"
|
|
var checkIPFSWinFmt = "Otherwise check:\n\ttasklist | findstr ipfs"
|
|
|
|
// getApiClient checks the repo, and the given options, checking for
|
|
// a running API service. if there is one, it returns a client.
|
|
// otherwise, it returns errApiNotRunning, or another error.
|
|
func getApiClient(repoPath, apiAddrStr string) (http.Client, error) {
|
|
var apiErrorFmt string
|
|
switch {
|
|
case osh.IsUnix():
|
|
apiErrorFmt = apiFileErrorFmt + checkIPFSUnixFmt
|
|
case osh.IsWindows():
|
|
apiErrorFmt = apiFileErrorFmt + checkIPFSWinFmt
|
|
default:
|
|
apiErrorFmt = apiFileErrorFmt
|
|
}
|
|
|
|
var addr ma.Multiaddr
|
|
var err error
|
|
if len(apiAddrStr) != 0 {
|
|
addr, err = ma.NewMultiaddr(apiAddrStr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if len(addr.Protocols()) == 0 {
|
|
return nil, fmt.Errorf("multiaddr doesn't provide any protocols")
|
|
}
|
|
} else {
|
|
addr, err = fsrepo.APIAddr(repoPath)
|
|
if err == repo.ErrApiNotRunning {
|
|
return nil, err
|
|
}
|
|
|
|
if err != nil {
|
|
return nil, fmt.Errorf(apiErrorFmt, repoPath, err.Error())
|
|
}
|
|
}
|
|
if len(addr.Protocols()) == 0 {
|
|
return nil, fmt.Errorf(apiErrorFmt, repoPath, "multiaddr doesn't provide any protocols")
|
|
}
|
|
return apiClientForAddr(addr)
|
|
}
|
|
|
|
func apiClientForAddr(addr ma.Multiaddr) (http.Client, error) {
|
|
_, host, err := manet.DialArgs(addr)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return http.NewClient(host, http.ClientWithAPIPrefix(corehttp.APIPath)), nil
|
|
}
|