diff --git a/cmd/ipfs/daemon.go b/cmd/ipfs/daemon.go index 02e218d3e..68f869298 100644 --- a/cmd/ipfs/daemon.go +++ b/cmd/ipfs/daemon.go @@ -3,13 +3,16 @@ package main import ( _ "expvar" "fmt" - _ "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/codahale/metrics/runtime" "net/http" _ "net/http/pprof" "os" "strings" + "sync" + _ "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/codahale/metrics/runtime" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net" + cmds "github.com/ipfs/go-ipfs/commands" "github.com/ipfs/go-ipfs/core" commands "github.com/ipfs/go-ipfs/core/commands" @@ -192,98 +195,76 @@ func daemonFunc(req cmds.Request, res cmds.Response) { return node, nil } - // verify api address is valid multiaddr - apiMaddr, err := ma.NewMultiaddr(cfg.Addresses.API) + // construct api endpoint - every time + err, apiErrc := mountHTTPapi(req) if err != nil { res.SetError(err, cmds.ErrNormal) return } - var gatewayMaddr ma.Multiaddr + // construct http gateway - if it is set in the config + var gwErrc <-chan error if len(cfg.Addresses.Gateway) > 0 { - // ignore error for gateway address - // if there is an error (invalid address), then don't run the gateway - gatewayMaddr, _ = ma.NewMultiaddr(cfg.Addresses.Gateway) - if gatewayMaddr == nil { - log.Errorf("Invalid gateway address: %s", cfg.Addresses.Gateway) + var err error + err, gwErrc = mountHTTPgw(req) + if err != nil { + res.SetError(err, cmds.ErrNormal) + return } } - // mount if the user provided the --mount flag + // construct fuse mountpoints - if the user provided the --mount flag mount, _, err := req.Option(mountKwd).Bool() if err != nil { res.SetError(err, cmds.ErrNormal) return } if mount { - fsdir, found, err := req.Option(ipfsMountKwd).String() - if err != nil { + if err := mountFuse(req); err != nil { res.SetError(err, cmds.ErrNormal) return } - if !found { - fsdir = cfg.Mounts.IPFS - } - - nsdir, found, err := req.Option(ipnsMountKwd).String() - if err != nil { - res.SetError(err, cmds.ErrNormal) - return - } - if !found { - nsdir = cfg.Mounts.IPNS - } - - err = commands.Mount(node, fsdir, nsdir) - if err != nil { - res.SetError(err, cmds.ErrNormal) - return - } - fmt.Printf("IPFS mounted at: %s\n", fsdir) - fmt.Printf("IPNS mounted at: %s\n", nsdir) } - var rootRedirect corehttp.ServeOption - if len(cfg.Gateway.RootRedirect) > 0 { - rootRedirect = corehttp.RedirectOption("", cfg.Gateway.RootRedirect) + // collect long-running errors and block for shutdown + // TODO(cryptix): our fuse currently doesnt follow this pattern for graceful shutdown + for err := range merge(apiErrc, gwErrc) { + if err != nil { + res.SetError(err, cmds.ErrNormal) + return + } } +} - writable, writableOptionFound, err := req.Option(writableKwd).Bool() +// mountHTTPapi collects options, creates listener, prints status message and starts serving requests +func mountHTTPapi(req cmds.Request) (error, <-chan error) { + cfg, err := req.Context().GetConfig() if err != nil { - res.SetError(err, cmds.ErrNormal) - return - } - if !writableOptionFound { - writable = cfg.Gateway.Writable + return fmt.Errorf("mountHTTPapi: GetConfig() failed: %s", err), nil } - if gatewayMaddr != nil { - go func() { - var opts = []corehttp.ServeOption{ - corehttp.VersionOption(), - corehttp.IPNSHostnameOption(), - corehttp.GatewayOption(writable), - } - if rootRedirect != nil { - opts = append(opts, rootRedirect) - } - if writable { - fmt.Printf("Gateway (writable) server listening on %s\n", gatewayMaddr) - } else { - fmt.Printf("Gateway (readonly) server listening on %s\n", gatewayMaddr) - } - err := corehttp.ListenAndServe(node, gatewayMaddr.String(), opts...) - if err != nil { - log.Error(err) - } - }() + apiMaddr, err := ma.NewMultiaddr(cfg.Addresses.API) + if err != nil { + return fmt.Errorf("mountHTTPapi: invalid API address: %q (err: %s)", cfg.Addresses.API, err), nil } - gateway := corehttp.NewGateway(corehttp.GatewayConfig{ + apiLis, err := manet.Listen(apiMaddr) + if err != nil { + return fmt.Errorf("mountHTTPapi: manet.Listen(%s) failed: %s", apiMaddr, err), nil + } + // we might have listened to /tcp/0 - lets see what we are listing on + apiMaddr = apiLis.Multiaddr() + fmt.Printf("API server listening on %s\n", apiMaddr) + + unrestricted, _, err := req.Option(unrestrictedApiAccess).Bool() + if err != nil { + return fmt.Errorf("mountHTTPapi: Option(%s) failed: %s", unrestrictedApiAccess, err), nil + } + + apiGw := corehttp.NewGateway(corehttp.GatewayConfig{ Writable: true, BlockList: &corehttp.BlockList{ Decider: func(s string) bool { - unrestricted, _, _ := req.Option(unrestrictedApiAccess).Bool() if unrestricted { return true } @@ -300,18 +281,146 @@ func daemonFunc(req cmds.Request, res cmds.Response) { var opts = []corehttp.ServeOption{ corehttp.CommandsOption(*req.Context()), corehttp.WebUIOption, - gateway.ServeOption(), + apiGw.ServeOption(), corehttp.VersionOption(), defaultMux("/debug/vars"), defaultMux("/debug/pprof/"), } - if rootRedirect != nil { - opts = append(opts, rootRedirect) + if len(cfg.Gateway.RootRedirect) > 0 { + opts = append(opts, corehttp.RedirectOption("", cfg.Gateway.RootRedirect)) } - fmt.Printf("API server listening on %s\n", apiMaddr) - if err := corehttp.ListenAndServe(node, apiMaddr.String(), opts...); err != nil { - res.SetError(err, cmds.ErrNormal) - return + + node, err := req.Context().ConstructNode() + if err != nil { + return fmt.Errorf("mountHTTPgw: ConstructNode() failed: %s", err), nil } + + errc := make(chan error) + go func() { + errc <- corehttp.Serve(node, apiLis.NetListener(), opts...) + }() + return nil, errc +} + +// mountHTTPgw collects options, creates listener, prints status message and starts serving requests +func mountHTTPgw(req cmds.Request) (error, <-chan error) { + cfg, err := req.Context().GetConfig() + if err != nil { + return fmt.Errorf("mountHTTPgw: GetConfig() failed: %s", err), nil + } + + gatewayMaddr, err := ma.NewMultiaddr(cfg.Addresses.Gateway) + if err != nil { + return fmt.Errorf("mountHTTPgw: invalid gateway address: %q (err: %s)", cfg.Addresses.Gateway, err), nil + } + + writable, writableOptionFound, err := req.Option(writableKwd).Bool() + if err != nil { + return fmt.Errorf("mountHTTPgw: req.Option(%s) failed: %s", writableKwd, err), nil + } + if !writableOptionFound { + writable = cfg.Gateway.Writable + } + + gwLis, err := manet.Listen(gatewayMaddr) + if err != nil { + return fmt.Errorf("mountHTTPgw: manet.Listen(%s) failed: %s", gatewayMaddr, err), nil + } + // we might have listened to /tcp/0 - lets see what we are listing on + gatewayMaddr = gwLis.Multiaddr() + + if writable { + fmt.Printf("Gateway (writable) server listening on %s\n", gatewayMaddr) + } else { + fmt.Printf("Gateway (readonly) server listening on %s\n", gatewayMaddr) + } + + var opts = []corehttp.ServeOption{ + corehttp.VersionOption(), + corehttp.IPNSHostnameOption(), + corehttp.GatewayOption(writable), + } + + if len(cfg.Gateway.RootRedirect) > 0 { + opts = append(opts, corehttp.RedirectOption("", cfg.Gateway.RootRedirect)) + } + + node, err := req.Context().ConstructNode() + if err != nil { + return fmt.Errorf("mountHTTPgw: ConstructNode() failed: %s", err), nil + } + + errc := make(chan error) + go func() { + errc <- corehttp.Serve(node, gwLis.NetListener(), opts...) + }() + return nil, errc +} + +//collects options and opens the fuse mountpoint +func mountFuse(req cmds.Request) error { + cfg, err := req.Context().GetConfig() + if err != nil { + return fmt.Errorf("mountFuse: GetConfig() failed: %s", err) + } + + fsdir, found, err := req.Option(ipfsMountKwd).String() + if err != nil { + return fmt.Errorf("mountFuse: req.Option(%s) failed: %s", ipfsMountKwd, err) + } + if !found { + fsdir = cfg.Mounts.IPFS + } + + nsdir, found, err := req.Option(ipnsMountKwd).String() + if err != nil { + return fmt.Errorf("mountFuse: req.Option(%s) failed: %s", ipnsMountKwd, err) + } + if !found { + nsdir = cfg.Mounts.IPNS + } + + node, err := req.Context().ConstructNode() + if err != nil { + return fmt.Errorf("mountFuse: ConstructNode() failed: %s", err) + } + + err = commands.Mount(node, fsdir, nsdir) + if err != nil { + return err + } + fmt.Printf("IPFS mounted at: %s\n", fsdir) + fmt.Printf("IPNS mounted at: %s\n", nsdir) + return nil +} + +// merge does fan-in of multiple read-only error channels +// taken from http://blog.golang.org/pipelines +func merge(cs ...<-chan error) <-chan error { + var wg sync.WaitGroup + out := make(chan error) + + // Start an output goroutine for each input channel in cs. output + // copies values from c to out until c is closed, then calls wg.Done. + output := func(c <-chan error) { + for n := range c { + out <- n + } + wg.Done() + } + wg.Add(len(cs)) + for _, c := range cs { + if c != nil { + go output(c) + } + } + + // Start a goroutine to close out once all the output goroutines are + // done. This must start after the wg.Add call. + go func() { + wg.Wait() + close(out) + }() + return out } diff --git a/core/corehttp/corehttp.go b/core/corehttp/corehttp.go index 18afa30e5..95a159fa2 100644 --- a/core/corehttp/corehttp.go +++ b/core/corehttp/corehttp.go @@ -5,6 +5,7 @@ high-level HTTP interfaces to IPFS. package corehttp import ( + "fmt" "net" "net/http" "time" @@ -49,20 +50,26 @@ func ListenAndServe(n *core.IpfsNode, listeningMultiAddr string, options ...Serv if err != nil { return err } - handler, err := makeHandler(n, options...) + + list, err := manet.Listen(addr) if err != nil { return err } - return listenAndServe(n, addr, handler) + + // we might have listened to /tcp/0 - lets see what we are listing on + addr = list.Multiaddr() + fmt.Printf("API server listening on %s\n", addr) + + return Serve(n, list.NetListener(), options...) } -func listenAndServe(node *core.IpfsNode, addr ma.Multiaddr, handler http.Handler) error { - netarg, host, err := manet.DialArgs(addr) +func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error { + handler, err := makeHandler(node, options...) if err != nil { return err } - list, err := net.Listen(netarg, host) + addr, err := manet.FromNetAddr(lis.Addr()) if err != nil { return err } @@ -75,7 +82,7 @@ func listenAndServe(node *core.IpfsNode, addr ma.Multiaddr, handler http.Handler defer node.Children().Done() go func() { - serverError = http.Serve(list, handler) + serverError = http.Serve(lis, handler) close(serverExited) }() @@ -87,7 +94,7 @@ func listenAndServe(node *core.IpfsNode, addr ma.Multiaddr, handler http.Handler case <-node.Closing(): log.Infof("server at %s terminating...", addr) - list.Close() + lis.Close() outer: for {