1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-28 00:39:31 +08:00

Merge pull request #1287 from ipfs/feat/httpApiTcpPort0

httpGw: make /tcp/0 work
This commit is contained in:
Juan Batiz-Benet
2015-06-08 03:46:35 -07:00
2 changed files with 193 additions and 77 deletions

View File

@ -3,13 +3,16 @@ package main
import ( import (
_ "expvar" _ "expvar"
"fmt" "fmt"
_ "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/codahale/metrics/runtime"
"net/http" "net/http"
_ "net/http/pprof" _ "net/http/pprof"
"os" "os"
"strings" "strings"
"sync"
_ "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/codahale/metrics/runtime"
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net"
cmds "github.com/ipfs/go-ipfs/commands" cmds "github.com/ipfs/go-ipfs/commands"
"github.com/ipfs/go-ipfs/core" "github.com/ipfs/go-ipfs/core"
commands "github.com/ipfs/go-ipfs/core/commands" commands "github.com/ipfs/go-ipfs/core/commands"
@ -192,98 +195,76 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
return node, nil return node, nil
} }
// verify api address is valid multiaddr // construct api endpoint - every time
apiMaddr, err := ma.NewMultiaddr(cfg.Addresses.API) err, apiErrc := serveHTTPApi(req)
if err != nil { if err != nil {
res.SetError(err, cmds.ErrNormal) res.SetError(err, cmds.ErrNormal)
return return
} }
var gatewayMaddr ma.Multiaddr // construct http gateway - if it is set in the config
var gwErrc <-chan error
if len(cfg.Addresses.Gateway) > 0 { if len(cfg.Addresses.Gateway) > 0 {
// ignore error for gateway address var err error
// if there is an error (invalid address), then don't run the gateway err, gwErrc = serveHTTPGateway(req)
gatewayMaddr, _ = ma.NewMultiaddr(cfg.Addresses.Gateway) if err != nil {
if gatewayMaddr == nil { res.SetError(err, cmds.ErrNormal)
log.Errorf("Invalid gateway address: %s", cfg.Addresses.Gateway) return
} }
} }
// mount if the user provided the --mount flag // construct fuse mountpoints - if the user provided the --mount flag
mount, _, err := req.Option(mountKwd).Bool() mount, _, err := req.Option(mountKwd).Bool()
if err != nil { if err != nil {
res.SetError(err, cmds.ErrNormal) res.SetError(err, cmds.ErrNormal)
return return
} }
if mount { if mount {
fsdir, found, err := req.Option(ipfsMountKwd).String() if err := mountFuse(req); err != nil {
if err != nil {
res.SetError(err, cmds.ErrNormal) res.SetError(err, cmds.ErrNormal)
return return
} }
if !found {
fsdir = cfg.Mounts.IPFS
}
nsdir, found, err := req.Option(ipnsMountKwd).String()
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
if !found {
nsdir = cfg.Mounts.IPNS
}
err = commands.Mount(node, fsdir, nsdir)
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
fmt.Printf("IPFS mounted at: %s\n", fsdir)
fmt.Printf("IPNS mounted at: %s\n", nsdir)
} }
var rootRedirect corehttp.ServeOption // collect long-running errors and block for shutdown
if len(cfg.Gateway.RootRedirect) > 0 { // TODO(cryptix): our fuse currently doesnt follow this pattern for graceful shutdown
rootRedirect = corehttp.RedirectOption("", cfg.Gateway.RootRedirect) for err := range merge(apiErrc, gwErrc) {
if err != nil {
res.SetError(err, cmds.ErrNormal)
return
}
} }
}
writable, writableOptionFound, err := req.Option(writableKwd).Bool() // serveHTTPApi collects options, creates listener, prints status message and starts serving requests
func serveHTTPApi(req cmds.Request) (error, <-chan error) {
cfg, err := req.Context().GetConfig()
if err != nil { if err != nil {
res.SetError(err, cmds.ErrNormal) return fmt.Errorf("serveHTTPApi: GetConfig() failed: %s", err), nil
return
}
if !writableOptionFound {
writable = cfg.Gateway.Writable
} }
if gatewayMaddr != nil { apiMaddr, err := ma.NewMultiaddr(cfg.Addresses.API)
go func() { if err != nil {
var opts = []corehttp.ServeOption{ return fmt.Errorf("serveHTTPApi: invalid API address: %q (err: %s)", cfg.Addresses.API, err), nil
corehttp.VersionOption(),
corehttp.IPNSHostnameOption(),
corehttp.GatewayOption(writable),
}
if rootRedirect != nil {
opts = append(opts, rootRedirect)
}
if writable {
fmt.Printf("Gateway (writable) server listening on %s\n", gatewayMaddr)
} else {
fmt.Printf("Gateway (readonly) server listening on %s\n", gatewayMaddr)
}
err := corehttp.ListenAndServe(node, gatewayMaddr.String(), opts...)
if err != nil {
log.Error(err)
}
}()
} }
gateway := corehttp.NewGateway(corehttp.GatewayConfig{ apiLis, err := manet.Listen(apiMaddr)
if err != nil {
return fmt.Errorf("serveHTTPApi: manet.Listen(%s) failed: %s", apiMaddr, err), nil
}
// we might have listened to /tcp/0 - lets see what we are listing on
apiMaddr = apiLis.Multiaddr()
fmt.Printf("API server listening on %s\n", apiMaddr)
unrestricted, _, err := req.Option(unrestrictedApiAccess).Bool()
if err != nil {
return fmt.Errorf("serveHTTPApi: Option(%s) failed: %s", unrestrictedApiAccess, err), nil
}
apiGw := corehttp.NewGateway(corehttp.GatewayConfig{
Writable: true, Writable: true,
BlockList: &corehttp.BlockList{ BlockList: &corehttp.BlockList{
Decider: func(s string) bool { Decider: func(s string) bool {
unrestricted, _, _ := req.Option(unrestrictedApiAccess).Bool()
if unrestricted { if unrestricted {
return true return true
} }
@ -300,18 +281,146 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
var opts = []corehttp.ServeOption{ var opts = []corehttp.ServeOption{
corehttp.CommandsOption(*req.Context()), corehttp.CommandsOption(*req.Context()),
corehttp.WebUIOption, corehttp.WebUIOption,
gateway.ServeOption(), apiGw.ServeOption(),
corehttp.VersionOption(), corehttp.VersionOption(),
defaultMux("/debug/vars"), defaultMux("/debug/vars"),
defaultMux("/debug/pprof/"), defaultMux("/debug/pprof/"),
} }
if rootRedirect != nil { if len(cfg.Gateway.RootRedirect) > 0 {
opts = append(opts, rootRedirect) opts = append(opts, corehttp.RedirectOption("", cfg.Gateway.RootRedirect))
} }
fmt.Printf("API server listening on %s\n", apiMaddr)
if err := corehttp.ListenAndServe(node, apiMaddr.String(), opts...); err != nil { node, err := req.Context().ConstructNode()
res.SetError(err, cmds.ErrNormal) if err != nil {
return return fmt.Errorf("serveHTTPGateway: ConstructNode() failed: %s", err), nil
} }
errc := make(chan error)
go func() {
errc <- corehttp.Serve(node, apiLis.NetListener(), opts...)
}()
return nil, errc
}
// serveHTTPGateway collects options, creates listener, prints status message and starts serving requests
func serveHTTPGateway(req cmds.Request) (error, <-chan error) {
cfg, err := req.Context().GetConfig()
if err != nil {
return fmt.Errorf("serveHTTPGateway: GetConfig() failed: %s", err), nil
}
gatewayMaddr, err := ma.NewMultiaddr(cfg.Addresses.Gateway)
if err != nil {
return fmt.Errorf("serveHTTPGateway: invalid gateway address: %q (err: %s)", cfg.Addresses.Gateway, err), nil
}
writable, writableOptionFound, err := req.Option(writableKwd).Bool()
if err != nil {
return fmt.Errorf("serveHTTPGateway: req.Option(%s) failed: %s", writableKwd, err), nil
}
if !writableOptionFound {
writable = cfg.Gateway.Writable
}
gwLis, err := manet.Listen(gatewayMaddr)
if err != nil {
return fmt.Errorf("serveHTTPGateway: manet.Listen(%s) failed: %s", gatewayMaddr, err), nil
}
// we might have listened to /tcp/0 - lets see what we are listing on
gatewayMaddr = gwLis.Multiaddr()
if writable {
fmt.Printf("Gateway (writable) server listening on %s\n", gatewayMaddr)
} else {
fmt.Printf("Gateway (readonly) server listening on %s\n", gatewayMaddr)
}
var opts = []corehttp.ServeOption{
corehttp.VersionOption(),
corehttp.IPNSHostnameOption(),
corehttp.GatewayOption(writable),
}
if len(cfg.Gateway.RootRedirect) > 0 {
opts = append(opts, corehttp.RedirectOption("", cfg.Gateway.RootRedirect))
}
node, err := req.Context().ConstructNode()
if err != nil {
return fmt.Errorf("serveHTTPGateway: ConstructNode() failed: %s", err), nil
}
errc := make(chan error)
go func() {
errc <- corehttp.Serve(node, gwLis.NetListener(), opts...)
}()
return nil, errc
}
//collects options and opens the fuse mountpoint
func mountFuse(req cmds.Request) error {
cfg, err := req.Context().GetConfig()
if err != nil {
return fmt.Errorf("mountFuse: GetConfig() failed: %s", err)
}
fsdir, found, err := req.Option(ipfsMountKwd).String()
if err != nil {
return fmt.Errorf("mountFuse: req.Option(%s) failed: %s", ipfsMountKwd, err)
}
if !found {
fsdir = cfg.Mounts.IPFS
}
nsdir, found, err := req.Option(ipnsMountKwd).String()
if err != nil {
return fmt.Errorf("mountFuse: req.Option(%s) failed: %s", ipnsMountKwd, err)
}
if !found {
nsdir = cfg.Mounts.IPNS
}
node, err := req.Context().ConstructNode()
if err != nil {
return fmt.Errorf("mountFuse: ConstructNode() failed: %s", err)
}
err = commands.Mount(node, fsdir, nsdir)
if err != nil {
return err
}
fmt.Printf("IPFS mounted at: %s\n", fsdir)
fmt.Printf("IPNS mounted at: %s\n", nsdir)
return nil
}
// merge does fan-in of multiple read-only error channels
// taken from http://blog.golang.org/pipelines
func merge(cs ...<-chan error) <-chan error {
var wg sync.WaitGroup
out := make(chan error)
// Start an output goroutine for each input channel in cs. output
// copies values from c to out until c is closed, then calls wg.Done.
output := func(c <-chan error) {
for n := range c {
out <- n
}
wg.Done()
}
for _, c := range cs {
if c != nil {
wg.Add(1)
go output(c)
}
}
// Start a goroutine to close out once all the output goroutines are
// done. This must start after the wg.Add call.
go func() {
wg.Wait()
close(out)
}()
return out
} }

View File

@ -5,6 +5,7 @@ high-level HTTP interfaces to IPFS.
package corehttp package corehttp
import ( import (
"fmt"
"net" "net"
"net/http" "net/http"
"time" "time"
@ -49,20 +50,26 @@ func ListenAndServe(n *core.IpfsNode, listeningMultiAddr string, options ...Serv
if err != nil { if err != nil {
return err return err
} }
handler, err := makeHandler(n, options...)
list, err := manet.Listen(addr)
if err != nil { if err != nil {
return err return err
} }
return listenAndServe(n, addr, handler)
// we might have listened to /tcp/0 - lets see what we are listing on
addr = list.Multiaddr()
fmt.Printf("API server listening on %s\n", addr)
return Serve(n, list.NetListener(), options...)
} }
func listenAndServe(node *core.IpfsNode, addr ma.Multiaddr, handler http.Handler) error { func Serve(node *core.IpfsNode, lis net.Listener, options ...ServeOption) error {
netarg, host, err := manet.DialArgs(addr) handler, err := makeHandler(node, options...)
if err != nil { if err != nil {
return err return err
} }
list, err := net.Listen(netarg, host) addr, err := manet.FromNetAddr(lis.Addr())
if err != nil { if err != nil {
return err return err
} }
@ -75,7 +82,7 @@ func listenAndServe(node *core.IpfsNode, addr ma.Multiaddr, handler http.Handler
defer node.Children().Done() defer node.Children().Done()
go func() { go func() {
serverError = http.Serve(list, handler) serverError = http.Serve(lis, handler)
close(serverExited) close(serverExited)
}() }()
@ -87,7 +94,7 @@ func listenAndServe(node *core.IpfsNode, addr ma.Multiaddr, handler http.Handler
case <-node.Closing(): case <-node.Closing():
log.Infof("server at %s terminating...", addr) log.Infof("server at %s terminating...", addr)
list.Close() lis.Close()
outer: outer:
for { for {