1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-05-21 08:56:37 +08:00

coreapi: Untangle from core.IpfsNode

License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
This commit is contained in:
Łukasz Magiera
2018-12-06 21:00:40 +01:00
parent 6e90367696
commit 3183b1cb8e
14 changed files with 210 additions and 124 deletions

View File

@ -88,7 +88,7 @@ var addPinCmd = &cmds.Command{
} }
if !showProgress { if !showProgress {
added, err := corerepo.Pin(n, api, req.Context, req.Arguments, recursive) added, err := corerepo.Pin(n.Pinning, api, req.Context, req.Arguments, recursive)
if err != nil { if err != nil {
return err return err
} }
@ -105,7 +105,7 @@ var addPinCmd = &cmds.Command{
ch := make(chan pinResult, 1) ch := make(chan pinResult, 1)
go func() { go func() {
added, err := corerepo.Pin(n, api, ctx, req.Arguments, recursive) added, err := corerepo.Pin(n.Pinning, api, ctx, req.Arguments, recursive)
ch <- pinResult{pins: added, err: err} ch <- pinResult{pins: added, err: err}
}() }()
@ -215,7 +215,7 @@ collected if needed. (By default, recursively. Use -r=false for direct pins.)
return err return err
} }
removed, err := corerepo.Unpin(n, api, req.Context, req.Arguments, recursive) removed, err := corerepo.Unpin(n.Pinning, api, req.Context, req.Arguments, recursive)
if err != nil { if err != nil {
return err return err
} }

View File

@ -43,7 +43,7 @@ func (api *BlockAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.Bloc
return nil, err return nil, err
} }
err = api.node.Blocks.AddBlock(b) err = api.blocks.AddBlock(b)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -57,7 +57,7 @@ func (api *BlockAPI) Get(ctx context.Context, p coreiface.Path) (io.Reader, erro
return nil, err return nil, err
} }
b, err := api.node.Blocks.GetBlock(ctx, rp.Cid()) b, err := api.blocks.GetBlock(ctx, rp.Cid())
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -78,7 +78,7 @@ func (api *BlockAPI) Rm(ctx context.Context, p coreiface.Path, opts ...caopts.Bl
cids := []cid.Cid{rp.Cid()} cids := []cid.Cid{rp.Cid()}
o := util.RmBlocksOpts{Force: settings.Force} o := util.RmBlocksOpts{Force: settings.Force}
out, err := util.RmBlocks(api.node.Blockstore, api.node.Pinning, cids, o) out, err := util.RmBlocks(api.blockstore, api.pinning, cids, o)
if err != nil { if err != nil {
return err return err
} }
@ -109,7 +109,7 @@ func (api *BlockAPI) Stat(ctx context.Context, p coreiface.Path) (coreiface.Bloc
return nil, err return nil, err
} }
b, err := api.node.Blocks.GetBlock(ctx, rp.Cid()) b, err := api.blocks.GetBlock(ctx, rp.Cid())
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -15,25 +15,103 @@ package coreapi
import ( import (
"context" "context"
"errors"
core "github.com/ipfs/go-ipfs/core" core "github.com/ipfs/go-ipfs/core"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
options "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
namesys "github.com/ipfs/go-ipfs/namesys"
pin "github.com/ipfs/go-ipfs/pin"
repo "github.com/ipfs/go-ipfs/repo"
ci "gx/ipfs/QmNiJiXwWE3kRhZrC5ej3kSjWHm337pYfhjLGSCDNKJP2s/go-libp2p-crypto"
exchange "gx/ipfs/QmP2g3VxmC7g7fyRJDj1VJ72KHZbJ9UW24YjSWEj1XTb4H/go-ipfs-exchange-interface"
bserv "gx/ipfs/QmPoh3SrQzFBWtdGK6qmHDV4EanKR6kYPj4DD3J2NLoEmZ/go-blockservice"
routing "gx/ipfs/QmRASJXJUFygM5qU4YrH7k7jD6S4Hg8nJmgqJ4bYJvLatd/go-libp2p-routing"
blockstore "gx/ipfs/QmS2aqUZLJp8kF1ihE5rvDGE5LvmKDPnx32w9Z1BW9xLV5/go-ipfs-blockstore"
peer "gx/ipfs/QmY5Grm8pJdiSSVsYxx4uNRgweY72EmYwuSDbRnbFok3iY/go-libp2p-peer"
pstore "gx/ipfs/QmZ9zH2FnLcxv1xyzFeUpDUeo55xEhZQHgveZijcxr7TLj/go-libp2p-peerstore"
pubsub "gx/ipfs/QmaqGyUhWLsJbVo1QAujSu13mxNjFJ98Kt2VWGSnShGE1Q/go-libp2p-pubsub"
ipld "gx/ipfs/QmcKKBwfz6FyQdHR2jsXrrF6XeSBXYL86anmWNewpFpoF5/go-ipld-format" ipld "gx/ipfs/QmcKKBwfz6FyQdHR2jsXrrF6XeSBXYL86anmWNewpFpoF5/go-ipld-format"
logging "gx/ipfs/QmcuXC5cxs79ro2cUuHs4HQ2bkDLJUYokwL8aivcX6HW3C/go-log" logging "gx/ipfs/QmcuXC5cxs79ro2cUuHs4HQ2bkDLJUYokwL8aivcX6HW3C/go-log"
dag "gx/ipfs/QmdV35UHnL1FM52baPkeUo6u7Fxm2CRUkPTLRPxeF8a4Ap/go-merkledag" dag "gx/ipfs/QmdV35UHnL1FM52baPkeUo6u7Fxm2CRUkPTLRPxeF8a4Ap/go-merkledag"
record "gx/ipfs/QmfARXVCzpwFXQdepAJZuqyNDgV9doEsMnVCo1ssmuSe1U/go-libp2p-record"
p2phost "gx/ipfs/QmfD51tKgJiTMnW9JEiDiPwsCY4mqUoxkhKhBfyW12spTC/go-libp2p-host"
) )
var log = logging.Logger("core/coreapi") var log = logging.Logger("core/coreapi")
type CoreAPI struct { type CoreAPI struct {
node *core.IpfsNode nctx context.Context
dag ipld.DAGService
identity peer.ID //TODO: check mutable structs
privateKey ci.PrivKey
repo repo.Repo
blockstore blockstore.GCBlockstore
baseBlocks blockstore.Blockstore
blocks bserv.BlockService
dag ipld.DAGService
pinning pin.Pinner
peerstore pstore.Peerstore
peerHost p2phost.Host
namesys namesys.NameSystem
recordValidator record.Validator
exchange exchange.Interface
routing routing.IpfsRouting
pubSub *pubsub.PubSub
checkRouting func(bool) error
// TODO: this can be generalized to all functions when we implement some
// api based security mechanism
isPublishAllowed func() error
} }
// NewCoreAPI creates new instance of IPFS CoreAPI backed by go-ipfs Node. // NewCoreAPI creates new instance of IPFS CoreAPI backed by go-ipfs Node.
func NewCoreAPI(n *core.IpfsNode) coreiface.CoreAPI { func NewCoreAPI(n *core.IpfsNode, opts ...options.ApiOption) coreiface.CoreAPI {
api := &CoreAPI{n, n.DAG} api := &CoreAPI{
nctx: n.Context(),
identity: n.Identity,
privateKey: n.PrivateKey,
repo: n.Repo,
blockstore: n.Blockstore,
baseBlocks: n.BaseBlocks,
blocks: n.Blocks,
dag: n.DAG,
pinning: n.Pinning,
peerstore: n.Peerstore,
peerHost: n.PeerHost,
namesys: n.Namesys,
recordValidator: n.RecordValidator,
exchange: n.Exchange,
routing: n.Routing,
pubSub: n.PubSub,
checkRouting: func(allowOffline bool) error {
if !n.OnlineMode() {
if !allowOffline {
return coreiface.ErrOffline
}
return n.SetupOfflineRouting()
}
return nil
},
isPublishAllowed: func() error {
if n.Mounts.Ipns != nil && n.Mounts.Ipns.IsActive() {
return errors.New("cannot manually publish while IPNS is mounted")
}
return nil
},
}
return api return api
} }
@ -89,6 +167,10 @@ func (api *CoreAPI) PubSub() coreiface.PubSubAPI {
// getSession returns new api backed by the same node with a read-only session DAG // getSession returns new api backed by the same node with a read-only session DAG
func (api *CoreAPI) getSession(ctx context.Context) *CoreAPI { func (api *CoreAPI) getSession(ctx context.Context) *CoreAPI {
ng := dag.NewReadOnlyDagService(dag.NewSession(ctx, api.dag)) sesApi := *api
return &CoreAPI{api.node, ng}
//TODO: we may want to apply this to other things too
sesApi.dag = dag.NewReadOnlyDagService(dag.NewSession(ctx, api.dag))
return &sesApi
} }

View File

@ -22,7 +22,7 @@ import (
type DhtAPI CoreAPI type DhtAPI CoreAPI
func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (pstore.PeerInfo, error) { func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (pstore.PeerInfo, error) {
pi, err := api.node.Routing.FindPeer(ctx, peer.ID(p)) pi, err := api.routing.FindPeer(ctx, peer.ID(p))
if err != nil { if err != nil {
return pstore.PeerInfo{}, err return pstore.PeerInfo{}, err
} }
@ -46,7 +46,7 @@ func (api *DhtAPI) FindProviders(ctx context.Context, p coreiface.Path, opts ...
return nil, fmt.Errorf("number of providers must be greater than 0") return nil, fmt.Errorf("number of providers must be greater than 0")
} }
pchan := api.node.Routing.FindProvidersAsync(ctx, rp.Cid(), numProviders) pchan := api.routing.FindProvidersAsync(ctx, rp.Cid(), numProviders)
return pchan, nil return pchan, nil
} }
@ -56,7 +56,7 @@ func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...cao
return err return err
} }
if api.node.Routing == nil { if api.routing == nil {
return errors.New("cannot provide in offline mode") return errors.New("cannot provide in offline mode")
} }
@ -67,7 +67,7 @@ func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...cao
c := rp.Cid() c := rp.Cid()
has, err := api.node.Blockstore.Has(c) has, err := api.blockstore.Has(c)
if err != nil { if err != nil {
return err return err
} }
@ -77,9 +77,9 @@ func (api *DhtAPI) Provide(ctx context.Context, path coreiface.Path, opts ...cao
} }
if settings.Recursive { if settings.Recursive {
err = provideKeysRec(ctx, api.node.Routing, api.node.Blockstore, []cid.Cid{c}) err = provideKeysRec(ctx, api.routing, api.blockstore, []cid.Cid{c})
} else { } else {
err = provideKeys(ctx, api.node.Routing, []cid.Cid{c}) err = provideKeys(ctx, api.routing, []cid.Cid{c})
} }
if err != nil { if err != nil {
return err return err

View File

@ -54,7 +54,7 @@ func (api *KeyAPI) Generate(ctx context.Context, name string, opts ...caopts.Key
return nil, fmt.Errorf("cannot create key with name 'self'") return nil, fmt.Errorf("cannot create key with name 'self'")
} }
_, err = api.node.Repo.Keystore().Get(name) _, err = api.repo.Keystore().Get(name)
if err == nil { if err == nil {
return nil, fmt.Errorf("key with name '%s' already exists", name) return nil, fmt.Errorf("key with name '%s' already exists", name)
} }
@ -87,7 +87,7 @@ func (api *KeyAPI) Generate(ctx context.Context, name string, opts ...caopts.Key
return nil, fmt.Errorf("unrecognized key type: %s", options.Algorithm) return nil, fmt.Errorf("unrecognized key type: %s", options.Algorithm)
} }
err = api.node.Repo.Keystore().Put(name, sk) err = api.repo.Keystore().Put(name, sk)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -102,7 +102,7 @@ func (api *KeyAPI) Generate(ctx context.Context, name string, opts ...caopts.Key
// List returns a list keys stored in keystore. // List returns a list keys stored in keystore.
func (api *KeyAPI) List(ctx context.Context) ([]coreiface.Key, error) { func (api *KeyAPI) List(ctx context.Context) ([]coreiface.Key, error) {
keys, err := api.node.Repo.Keystore().List() keys, err := api.repo.Keystore().List()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -110,10 +110,10 @@ func (api *KeyAPI) List(ctx context.Context) ([]coreiface.Key, error) {
sort.Strings(keys) sort.Strings(keys)
out := make([]coreiface.Key, len(keys)+1) out := make([]coreiface.Key, len(keys)+1)
out[0] = &key{"self", api.node.Identity} out[0] = &key{"self", api.identity}
for n, k := range keys { for n, k := range keys {
privKey, err := api.node.Repo.Keystore().Get(k) privKey, err := api.repo.Keystore().Get(k)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -138,7 +138,7 @@ func (api *KeyAPI) Rename(ctx context.Context, oldName string, newName string, o
return nil, false, err return nil, false, err
} }
ks := api.node.Repo.Keystore() ks := api.repo.Keystore()
if oldName == "self" { if oldName == "self" {
return nil, false, fmt.Errorf("cannot rename key with name 'self'") return nil, false, fmt.Errorf("cannot rename key with name 'self'")
@ -192,7 +192,7 @@ func (api *KeyAPI) Rename(ctx context.Context, oldName string, newName string, o
// Remove removes keys from keystore. Returns ipns path of the removed key. // Remove removes keys from keystore. Returns ipns path of the removed key.
func (api *KeyAPI) Remove(ctx context.Context, name string) (coreiface.Key, error) { func (api *KeyAPI) Remove(ctx context.Context, name string) (coreiface.Key, error) {
ks := api.node.Repo.Keystore() ks := api.repo.Keystore()
if name == "self" { if name == "self" {
return nil, fmt.Errorf("cannot remove key with name 'self'") return nil, fmt.Errorf("cannot remove key with name 'self'")
@ -219,9 +219,9 @@ func (api *KeyAPI) Remove(ctx context.Context, name string) (coreiface.Key, erro
} }
func (api *KeyAPI) Self(ctx context.Context) (coreiface.Key, error) { func (api *KeyAPI) Self(ctx context.Context) (coreiface.Key, error) {
if api.node.Identity == "" { if api.identity == "" {
return nil, errors.New("identity not loaded") return nil, errors.New("identity not loaded")
} }
return &key{"self", api.node.Identity}, nil return &key{"self", api.identity}, nil
} }

View File

@ -7,13 +7,13 @@ import (
"strings" "strings"
"time" "time"
"github.com/ipfs/go-ipfs/core"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options" caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
"github.com/ipfs/go-ipfs/keystore" "github.com/ipfs/go-ipfs/keystore"
"github.com/ipfs/go-ipfs/namesys" "github.com/ipfs/go-ipfs/namesys"
"gx/ipfs/QmNiJiXwWE3kRhZrC5ej3kSjWHm337pYfhjLGSCDNKJP2s/go-libp2p-crypto" "gx/ipfs/QmNiJiXwWE3kRhZrC5ej3kSjWHm337pYfhjLGSCDNKJP2s/go-libp2p-crypto"
ci "gx/ipfs/QmNiJiXwWE3kRhZrC5ej3kSjWHm337pYfhjLGSCDNKJP2s/go-libp2p-crypto"
"gx/ipfs/QmY5Grm8pJdiSSVsYxx4uNRgweY72EmYwuSDbRnbFok3iY/go-libp2p-peer" "gx/ipfs/QmY5Grm8pJdiSSVsYxx4uNRgweY72EmYwuSDbRnbFok3iY/go-libp2p-peer"
ipath "gx/ipfs/QmZErC2Ay6WuGi96CPg316PwitdwgLo6RxZRqVjJjRj2MR/go-path" ipath "gx/ipfs/QmZErC2Ay6WuGi96CPg316PwitdwgLo6RxZRqVjJjRj2MR/go-path"
"gx/ipfs/QmdmWkx54g7VfVyxeG8ic84uf4G6Eq1GohuyKA3XDuJ8oC/go-ipfs-routing/offline" "gx/ipfs/QmdmWkx54g7VfVyxeG8ic84uf4G6Eq1GohuyKA3XDuJ8oC/go-ipfs-routing/offline"
@ -38,20 +38,17 @@ func (e *ipnsEntry) Value() coreiface.Path {
// Publish announces new IPNS name and returns the new IPNS entry. // Publish announces new IPNS name and returns the new IPNS entry.
func (api *NameAPI) Publish(ctx context.Context, p coreiface.Path, opts ...caopts.NamePublishOption) (coreiface.IpnsEntry, error) { func (api *NameAPI) Publish(ctx context.Context, p coreiface.Path, opts ...caopts.NamePublishOption) (coreiface.IpnsEntry, error) {
if err := api.isPublishAllowed(); err != nil {
return nil, err
}
options, err := caopts.NamePublishOptions(opts...) options, err := caopts.NamePublishOptions(opts...)
if err != nil { if err != nil {
return nil, err return nil, err
} }
n := api.node
if !n.OnlineMode() { if err := api.checkRouting(options.AllowOffline); err != nil {
if !options.AllowOffline { return nil, err
return nil, coreiface.ErrOffline
}
}
if n.Mounts.Ipns != nil && n.Mounts.Ipns.IsActive() {
return nil, errors.New("cannot manually publish while IPNS is mounted")
} }
pth, err := ipath.ParsePath(p.String()) pth, err := ipath.ParsePath(p.String())
@ -59,7 +56,7 @@ func (api *NameAPI) Publish(ctx context.Context, p coreiface.Path, opts ...caopt
return nil, err return nil, err
} }
k, err := keylookup(n, options.Key) k, err := keylookup(api.privateKey, api.repo.Keystore(), options.Key)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -69,7 +66,7 @@ func (api *NameAPI) Publish(ctx context.Context, p coreiface.Path, opts ...caopt
} }
eol := time.Now().Add(options.ValidTime) eol := time.Now().Add(options.ValidTime)
err = n.Namesys.PublishWithEOL(ctx, k, pth, eol) err = api.namesys.PublishWithEOL(ctx, k, pth, eol)
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -91,21 +88,23 @@ func (api *NameAPI) Search(ctx context.Context, name string, opts ...caopts.Name
return nil, err return nil, err
} }
n := api.node if err := api.checkRouting(true); err != nil {
return nil, err
}
var resolver namesys.Resolver = n.Namesys var resolver namesys.Resolver = api.namesys
if options.Local && !options.Cache { if options.Local && !options.Cache { //TODO: rm before offline/local global opt merge
return nil, errors.New("cannot specify both local and nocache") return nil, errors.New("cannot specify both local and nocache")
} }
if options.Local { if options.Local {
offroute := offline.NewOfflineRouter(n.Repo.Datastore(), n.RecordValidator) offroute := offline.NewOfflineRouter(api.repo.Datastore(), api.recordValidator)
resolver = namesys.NewIpnsResolver(offroute) resolver = namesys.NewIpnsResolver(offroute)
} }
if !options.Cache { if !options.Cache {
resolver = namesys.NewNameSystem(n.Routing, n.Repo.Datastore(), 0) resolver = namesys.NewNameSystem(api.routing, api.repo.Datastore(), 0)
} }
if !strings.HasPrefix(name, "/ipns/") { if !strings.HasPrefix(name, "/ipns/") {
@ -150,8 +149,12 @@ func (api *NameAPI) Resolve(ctx context.Context, name string, opts ...caopts.Nam
return p, err return p, err
} }
func keylookup(n *core.IpfsNode, k string) (crypto.PrivKey, error) { func keylookup(self ci.PrivKey, kstore keystore.Keystore, k string) (crypto.PrivKey, error) {
res, err := n.GetKey(k) if k == "self" {
return self, nil
}
res, err := kstore.Get(k)
if res != nil { if res != nil {
return res, nil return res, nil
} }
@ -160,13 +163,13 @@ func keylookup(n *core.IpfsNode, k string) (crypto.PrivKey, error) {
return nil, err return nil, err
} }
keys, err := n.Repo.Keystore().List() keys, err := kstore.List()
if err != nil { if err != nil {
return nil, err return nil, err
} }
for _, key := range keys { for _, key := range keys {
privKey, err := n.Repo.Keystore().Get(key) privKey, err := kstore.Get(key)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -118,7 +118,7 @@ func (api *ObjectAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.Obj
} }
if options.Pin { if options.Pin {
defer api.node.Blockstore.PinLock().Unlock() defer api.blockstore.PinLock().Unlock()
} }
err = api.dag.Add(ctx, dagnode) err = api.dag.Add(ctx, dagnode)
@ -127,8 +127,8 @@ func (api *ObjectAPI) Put(ctx context.Context, src io.Reader, opts ...caopts.Obj
} }
if options.Pin { if options.Pin {
api.node.Pinning.PinWithMode(dagnode.Cid(), pin.Recursive) api.pinning.PinWithMode(dagnode.Cid(), pin.Recursive)
err = api.node.Pinning.Flush() err = api.pinning.Flush()
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -38,7 +38,7 @@ func (api *CoreAPI) ResolvePath(ctx context.Context, p coreiface.Path) (coreifac
} }
ipath := ipfspath.Path(p.String()) ipath := ipfspath.Path(p.String())
ipath, err := core.ResolveIPNS(ctx, api.node.Namesys, ipath) ipath, err := core.ResolveIPNS(ctx, api.namesys, ipath)
if err == core.ErrNoNamesys { if err == core.ErrNoNamesys {
return nil, coreiface.ErrOffline return nil, coreiface.ErrOffline
} else if err != nil { } else if err != nil {

View File

@ -27,14 +27,14 @@ func (api *PinAPI) Add(ctx context.Context, p coreiface.Path, opts ...caopts.Pin
return err return err
} }
defer api.node.Blockstore.PinLock().Unlock() defer api.blockstore.PinLock().Unlock()
_, err = corerepo.Pin(api.node, api.core(), ctx, []string{rp.Cid().String()}, settings.Recursive) _, err = corerepo.Pin(api.pinning, api.core(), ctx, []string{rp.Cid().String()}, settings.Recursive)
if err != nil { if err != nil {
return err return err
} }
return api.node.Pinning.Flush() return api.pinning.Flush()
} }
func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) ([]coreiface.Pin, error) { func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) ([]coreiface.Pin, error) {
@ -53,12 +53,12 @@ func (api *PinAPI) Ls(ctx context.Context, opts ...caopts.PinLsOption) ([]coreif
} }
func (api *PinAPI) Rm(ctx context.Context, p coreiface.Path) error { func (api *PinAPI) Rm(ctx context.Context, p coreiface.Path) error {
_, err := corerepo.Unpin(api.node, api.core(), ctx, []string{p.String()}, true) _, err := corerepo.Unpin(api.pinning, api.core(), ctx, []string{p.String()}, true)
if err != nil { if err != nil {
return err return err
} }
return api.node.Pinning.Flush() return api.pinning.Flush()
} }
func (api *PinAPI) Update(ctx context.Context, from coreiface.Path, to coreiface.Path, opts ...caopts.PinUpdateOption) error { func (api *PinAPI) Update(ctx context.Context, from coreiface.Path, to coreiface.Path, opts ...caopts.PinUpdateOption) error {
@ -77,14 +77,14 @@ func (api *PinAPI) Update(ctx context.Context, from coreiface.Path, to coreiface
return err return err
} }
defer api.node.Blockstore.PinLock().Unlock() defer api.blockstore.PinLock().Unlock()
err = api.node.Pinning.Update(ctx, fp.Cid(), tp.Cid(), settings.Unpin) err = api.pinning.Update(ctx, fp.Cid(), tp.Cid(), settings.Unpin)
if err != nil { if err != nil {
return err return err
} }
return api.node.Pinning.Flush() return api.pinning.Flush()
} }
type pinStatus struct { type pinStatus struct {
@ -117,10 +117,10 @@ func (n *badNode) Err() error {
func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, error) { func (api *PinAPI) Verify(ctx context.Context) (<-chan coreiface.PinStatus, error) {
visited := make(map[cid.Cid]*pinStatus) visited := make(map[cid.Cid]*pinStatus)
bs := api.node.Blocks.Blockstore() bs := api.blockstore
DAG := merkledag.NewDAGService(bserv.New(bs, offline.Exchange(bs))) DAG := merkledag.NewDAGService(bserv.New(bs, offline.Exchange(bs)))
getLinks := merkledag.GetLinksWithDAG(DAG) getLinks := merkledag.GetLinksWithDAG(DAG)
recPins := api.node.Pinning.RecursiveKeys() recPins := api.pinning.RecursiveKeys()
var checkPin func(root cid.Cid) *pinStatus var checkPin func(root cid.Cid) *pinStatus
checkPin = func(root cid.Cid) *pinStatus { checkPin = func(root cid.Cid) *pinStatus {
@ -187,11 +187,11 @@ func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) ([]coreiface.Pi
} }
if typeStr == "direct" || typeStr == "all" { if typeStr == "direct" || typeStr == "all" {
AddToResultKeys(api.node.Pinning.DirectKeys(), "direct") AddToResultKeys(api.pinning.DirectKeys(), "direct")
} }
if typeStr == "indirect" || typeStr == "all" { if typeStr == "indirect" || typeStr == "all" {
set := cid.NewSet() set := cid.NewSet()
for _, k := range api.node.Pinning.RecursiveKeys() { for _, k := range api.pinning.RecursiveKeys() {
err := merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(api.dag), k, set.Visit) err := merkledag.EnumerateChildren(ctx, merkledag.GetLinksWithDAG(api.dag), k, set.Visit)
if err != nil { if err != nil {
return nil, err return nil, err
@ -200,7 +200,7 @@ func (api *PinAPI) pinLsAll(typeStr string, ctx context.Context) ([]coreiface.Pi
AddToResultKeys(set.Keys(), "indirect") AddToResultKeys(set.Keys(), "indirect")
} }
if typeStr == "recursive" || typeStr == "all" { if typeStr == "recursive" || typeStr == "all" {
AddToResultKeys(api.node.Pinning.RecursiveKeys(), "recursive") AddToResultKeys(api.pinning.RecursiveKeys(), "recursive")
} }
out := make([]coreiface.Pin, 0, len(keys)) out := make([]coreiface.Pin, 0, len(keys))

View File

@ -7,14 +7,15 @@ import (
"sync" "sync"
"time" "time"
core "github.com/ipfs/go-ipfs/core"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options" caopts "github.com/ipfs/go-ipfs/core/coreapi/interface/options"
cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid" cid "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
routing "gx/ipfs/QmRASJXJUFygM5qU4YrH7k7jD6S4Hg8nJmgqJ4bYJvLatd/go-libp2p-routing"
peer "gx/ipfs/QmY5Grm8pJdiSSVsYxx4uNRgweY72EmYwuSDbRnbFok3iY/go-libp2p-peer" peer "gx/ipfs/QmY5Grm8pJdiSSVsYxx4uNRgweY72EmYwuSDbRnbFok3iY/go-libp2p-peer"
pstore "gx/ipfs/QmZ9zH2FnLcxv1xyzFeUpDUeo55xEhZQHgveZijcxr7TLj/go-libp2p-peerstore" pstore "gx/ipfs/QmZ9zH2FnLcxv1xyzFeUpDUeo55xEhZQHgveZijcxr7TLj/go-libp2p-peerstore"
pubsub "gx/ipfs/QmaqGyUhWLsJbVo1QAujSu13mxNjFJ98Kt2VWGSnShGE1Q/go-libp2p-pubsub" pubsub "gx/ipfs/QmaqGyUhWLsJbVo1QAujSu13mxNjFJ98Kt2VWGSnShGE1Q/go-libp2p-pubsub"
p2phost "gx/ipfs/QmfD51tKgJiTMnW9JEiDiPwsCY4mqUoxkhKhBfyW12spTC/go-libp2p-host"
) )
type PubSubAPI CoreAPI type PubSubAPI CoreAPI
@ -33,7 +34,7 @@ func (api *PubSubAPI) Ls(ctx context.Context) ([]string, error) {
return nil, err return nil, err
} }
return api.node.PubSub.GetTopics(), nil return api.pubSub.GetTopics(), nil
} }
func (api *PubSubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOption) ([]peer.ID, error) { func (api *PubSubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOption) ([]peer.ID, error) {
@ -46,7 +47,7 @@ func (api *PubSubAPI) Peers(ctx context.Context, opts ...caopts.PubSubPeersOptio
return nil, err return nil, err
} }
peers := api.node.PubSub.ListPeers(settings.Topic) peers := api.pubSub.ListPeers(settings.Topic)
out := make([]peer.ID, len(peers)) out := make([]peer.ID, len(peers))
for i, peer := range peers { for i, peer := range peers {
@ -61,7 +62,7 @@ func (api *PubSubAPI) Publish(ctx context.Context, topic string, data []byte) er
return err return err
} }
return api.node.PubSub.Publish(topic, data) return api.pubSub.Publish(topic, data)
} }
func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (coreiface.PubSubSubscription, error) { func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopts.PubSubSubscribeOption) (coreiface.PubSubSubscription, error) {
@ -71,12 +72,12 @@ func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopt
return nil, err return nil, err
} }
sub, err := api.node.PubSub.Subscribe(topic) sub, err := api.pubSub.Subscribe(topic)
if err != nil { if err != nil {
return nil, err return nil, err
} }
pubctx, cancel := context.WithCancel(api.node.Context()) pubctx, cancel := context.WithCancel(api.nctx)
if options.Discover { if options.Discover {
go func() { go func() {
@ -86,18 +87,18 @@ func (api *PubSubAPI) Subscribe(ctx context.Context, topic string, opts ...caopt
return return
} }
connectToPubSubPeers(pubctx, api.node, blk.Path().Cid()) connectToPubSubPeers(pubctx, api.routing, api.peerHost, blk.Path().Cid())
}() }()
} }
return &pubSubSubscription{cancel, sub}, nil return &pubSubSubscription{cancel, sub}, nil
} }
func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid cid.Cid) { func connectToPubSubPeers(ctx context.Context, r routing.IpfsRouting, ph p2phost.Host, cid cid.Cid) {
ctx, cancel := context.WithCancel(ctx) ctx, cancel := context.WithCancel(ctx)
defer cancel() defer cancel()
provs := n.Routing.FindProvidersAsync(ctx, cid, 10) provs := r.FindProvidersAsync(ctx, cid, 10)
var wg sync.WaitGroup var wg sync.WaitGroup
for p := range provs { for p := range provs {
wg.Add(1) wg.Add(1)
@ -105,7 +106,7 @@ func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid cid.Cid) {
defer wg.Done() defer wg.Done()
ctx, cancel := context.WithTimeout(ctx, time.Second*10) ctx, cancel := context.WithTimeout(ctx, time.Second*10)
defer cancel() defer cancel()
err := n.PeerHost.Connect(ctx, pi) err := ph.Connect(ctx, pi)
if err != nil { if err != nil {
log.Info("pubsub discover: ", err) log.Info("pubsub discover: ", err)
return return
@ -118,11 +119,11 @@ func connectToPubSubPeers(ctx context.Context, n *core.IpfsNode, cid cid.Cid) {
} }
func (api *PubSubAPI) checkNode() error { func (api *PubSubAPI) checkNode() error {
if !api.node.OnlineMode() { if err := api.checkRouting(false); err != nil {
return coreiface.ErrOffline return err
} }
if api.node.PubSub == nil { if api.pubSub == nil {
return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.") return errors.New("experimental pubsub feature not enabled. Run daemon with --enable-pubsub-experiment to use.")
} }

View File

@ -5,7 +5,6 @@ import (
"sort" "sort"
"time" "time"
core "github.com/ipfs/go-ipfs/core"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
inet "gx/ipfs/QmPtFaR7BWHLAjSwLh9kXcyrgTzDpuhcWLkx8ioa9RMYnx/go-libp2p-net" inet "gx/ipfs/QmPtFaR7BWHLAjSwLh9kXcyrgTzDpuhcWLkx8ioa9RMYnx/go-libp2p-net"
@ -21,9 +20,9 @@ import (
type SwarmAPI CoreAPI type SwarmAPI CoreAPI
type connInfo struct { type connInfo struct {
node *core.IpfsNode peerstore pstore.Peerstore
conn net.Conn conn net.Conn
dir net.Direction dir net.Direction
addr ma.Multiaddr addr ma.Multiaddr
peer peer.ID peer peer.ID
@ -31,19 +30,19 @@ type connInfo struct {
} }
func (api *SwarmAPI) Connect(ctx context.Context, pi pstore.PeerInfo) error { func (api *SwarmAPI) Connect(ctx context.Context, pi pstore.PeerInfo) error {
if api.node.PeerHost == nil { if api.peerHost == nil {
return coreiface.ErrOffline return coreiface.ErrOffline
} }
if swrm, ok := api.node.PeerHost.Network().(*swarm.Swarm); ok { if swrm, ok := api.peerHost.Network().(*swarm.Swarm); ok {
swrm.Backoff().Clear(pi.ID) swrm.Backoff().Clear(pi.ID)
} }
return api.node.PeerHost.Connect(ctx, pi) return api.peerHost.Connect(ctx, pi)
} }
func (api *SwarmAPI) Disconnect(ctx context.Context, addr ma.Multiaddr) error { func (api *SwarmAPI) Disconnect(ctx context.Context, addr ma.Multiaddr) error {
if api.node.PeerHost == nil { if api.peerHost == nil {
return coreiface.ErrOffline return coreiface.ErrOffline
} }
@ -54,7 +53,7 @@ func (api *SwarmAPI) Disconnect(ctx context.Context, addr ma.Multiaddr) error {
taddr := ia.Transport() taddr := ia.Transport()
id := ia.ID() id := ia.ID()
net := api.node.PeerHost.Network() net := api.peerHost.Network()
if taddr == nil { if taddr == nil {
if net.Connectedness(id) != inet.Connected { if net.Connectedness(id) != inet.Connected {
@ -78,12 +77,12 @@ func (api *SwarmAPI) Disconnect(ctx context.Context, addr ma.Multiaddr) error {
} }
func (api *SwarmAPI) KnownAddrs(context.Context) (map[peer.ID][]ma.Multiaddr, error) { func (api *SwarmAPI) KnownAddrs(context.Context) (map[peer.ID][]ma.Multiaddr, error) {
if api.node.PeerHost == nil { if api.peerHost == nil {
return nil, coreiface.ErrOffline return nil, coreiface.ErrOffline
} }
addrs := make(map[peer.ID][]ma.Multiaddr) addrs := make(map[peer.ID][]ma.Multiaddr)
ps := api.node.PeerHost.Network().Peerstore() ps := api.peerHost.Network().Peerstore()
for _, p := range ps.Peers() { for _, p := range ps.Peers() {
for _, a := range ps.Addrs(p) { for _, a := range ps.Addrs(p) {
addrs[p] = append(addrs[p], a) addrs[p] = append(addrs[p], a)
@ -97,27 +96,27 @@ func (api *SwarmAPI) KnownAddrs(context.Context) (map[peer.ID][]ma.Multiaddr, er
} }
func (api *SwarmAPI) LocalAddrs(context.Context) ([]ma.Multiaddr, error) { func (api *SwarmAPI) LocalAddrs(context.Context) ([]ma.Multiaddr, error) {
if api.node.PeerHost == nil { if api.peerHost == nil {
return nil, coreiface.ErrOffline return nil, coreiface.ErrOffline
} }
return api.node.PeerHost.Addrs(), nil return api.peerHost.Addrs(), nil
} }
func (api *SwarmAPI) ListenAddrs(context.Context) ([]ma.Multiaddr, error) { func (api *SwarmAPI) ListenAddrs(context.Context) ([]ma.Multiaddr, error) {
if api.node.PeerHost == nil { if api.peerHost == nil {
return nil, coreiface.ErrOffline return nil, coreiface.ErrOffline
} }
return api.node.PeerHost.Network().InterfaceListenAddresses() return api.peerHost.Network().InterfaceListenAddresses()
} }
func (api *SwarmAPI) Peers(context.Context) ([]coreiface.ConnectionInfo, error) { func (api *SwarmAPI) Peers(context.Context) ([]coreiface.ConnectionInfo, error) {
if api.node.PeerHost == nil { if api.peerHost == nil {
return nil, coreiface.ErrOffline return nil, coreiface.ErrOffline
} }
conns := api.node.PeerHost.Network().Conns() conns := api.peerHost.Network().Conns()
var out []coreiface.ConnectionInfo var out []coreiface.ConnectionInfo
for _, c := range conns { for _, c := range conns {
@ -125,9 +124,9 @@ func (api *SwarmAPI) Peers(context.Context) ([]coreiface.ConnectionInfo, error)
addr := c.RemoteMultiaddr() addr := c.RemoteMultiaddr()
ci := &connInfo{ ci := &connInfo{
node: api.node, peerstore: api.peerstore,
conn: c, conn: c,
dir: c.Stat().Direction, dir: c.Stat().Direction,
addr: addr, addr: addr,
peer: pid, peer: pid,
@ -160,7 +159,7 @@ func (ci *connInfo) Direction() net.Direction {
} }
func (ci *connInfo) Latency() (time.Duration, error) { func (ci *connInfo) Latency() (time.Duration, error) {
return ci.node.Peerstore.LatencyEWMA(peer.ID(ci.ID())), nil return ci.peerstore.LatencyEWMA(peer.ID(ci.ID())), nil
} }
func (ci *connInfo) Streams() ([]protocol.ID, error) { func (ci *connInfo) Streams() ([]protocol.ID, error) {

View File

@ -34,9 +34,7 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
return nil, err return nil, err
} }
n := api.node cfg, err := api.repo.Config()
cfg, err := n.Repo.Config()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -53,6 +51,13 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
return nil, filestore.ErrFilestoreNotEnabled return nil, filestore.ErrFilestoreNotEnabled
} }
addblockstore := api.blockstore
if !(settings.FsCache || settings.NoCopy) {
addblockstore = bstore.NewGCBlockstore(api.baseBlocks, api.blockstore)
}
exch := api.exchange
pinning := api.pinning
if settings.OnlyHash { if settings.OnlyHash {
nilnode, err := core.NewNode(ctx, &core.BuildCfg{ nilnode, err := core.NewNode(ctx, &core.BuildCfg{
//TODO: need this to be true or all files //TODO: need this to be true or all files
@ -62,15 +67,11 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
if err != nil { if err != nil {
return nil, err return nil, err
} }
n = nilnode addblockstore = nilnode.Blockstore
exch = nilnode.Exchange
pinning = nilnode.Pinning
} }
addblockstore := n.Blockstore
if !(settings.FsCache || settings.NoCopy) {
addblockstore = bstore.NewGCBlockstore(n.BaseBlocks, n.GCLocker)
}
exch := n.Exchange
if settings.Local { if settings.Local {
exch = offline.Exchange(addblockstore) exch = offline.Exchange(addblockstore)
} }
@ -78,7 +79,7 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.Node, opts ...options
bserv := blockservice.New(addblockstore, exch) // hash security 001 bserv := blockservice.New(addblockstore, exch) // hash security 001
dserv := dag.NewDAGService(bserv) dserv := dag.NewDAGService(bserv)
fileAdder, err := coreunix.NewAdder(ctx, n.Pinning, n.Blockstore, dserv) fileAdder, err := coreunix.NewAdder(ctx, pinning, addblockstore, dserv)
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -16,14 +16,14 @@ package corerepo
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/ipfs/go-ipfs/pin"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/core/coreapi/interface" "github.com/ipfs/go-ipfs/core/coreapi/interface"
"gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid" "gx/ipfs/QmR8BauakNcBa3RbE4nbQu76PDiJgoQgz8AJdhJuiU4TAw/go-cid"
) )
func Pin(n *core.IpfsNode, api iface.CoreAPI, ctx context.Context, paths []string, recursive bool) ([]cid.Cid, error) { func Pin(pinning pin.Pinner, api iface.CoreAPI, ctx context.Context, paths []string, recursive bool) ([]cid.Cid, error) {
out := make([]cid.Cid, len(paths)) out := make([]cid.Cid, len(paths))
for i, fpath := range paths { for i, fpath := range paths {
@ -36,14 +36,14 @@ func Pin(n *core.IpfsNode, api iface.CoreAPI, ctx context.Context, paths []strin
if err != nil { if err != nil {
return nil, fmt.Errorf("pin: %s", err) return nil, fmt.Errorf("pin: %s", err)
} }
err = n.Pinning.Pin(ctx, dagnode, recursive) err = pinning.Pin(ctx, dagnode, recursive)
if err != nil { if err != nil {
return nil, fmt.Errorf("pin: %s", err) return nil, fmt.Errorf("pin: %s", err)
} }
out[i] = dagnode.Cid() out[i] = dagnode.Cid()
} }
err := n.Pinning.Flush() err := pinning.Flush()
if err != nil { if err != nil {
return nil, err return nil, err
} }
@ -51,7 +51,7 @@ func Pin(n *core.IpfsNode, api iface.CoreAPI, ctx context.Context, paths []strin
return out, nil return out, nil
} }
func Unpin(n *core.IpfsNode, api iface.CoreAPI, ctx context.Context, paths []string, recursive bool) ([]cid.Cid, error) { func Unpin(pinning pin.Pinner, api iface.CoreAPI, ctx context.Context, paths []string, recursive bool) ([]cid.Cid, error) {
unpinned := make([]cid.Cid, len(paths)) unpinned := make([]cid.Cid, len(paths))
for i, p := range paths { for i, p := range paths {
@ -65,14 +65,14 @@ func Unpin(n *core.IpfsNode, api iface.CoreAPI, ctx context.Context, paths []str
return nil, err return nil, err
} }
err = n.Pinning.Unpin(ctx, k.Cid(), recursive) err = pinning.Unpin(ctx, k.Cid(), recursive)
if err != nil { if err != nil {
return nil, err return nil, err
} }
unpinned[i] = k.Cid() unpinned[i] = k.Cid()
} }
err := n.Pinning.Flush() err := pinning.Flush()
if err != nil { if err != nil {
return nil, err return nil, err
} }

View File

@ -48,13 +48,13 @@ type Object struct {
} }
// NewAdder Returns a new Adder used for a file add operation. // NewAdder Returns a new Adder used for a file add operation.
func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCBlockstore, ds ipld.DAGService) (*Adder, error) { func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCLocker, ds ipld.DAGService) (*Adder, error) {
bufferedDS := ipld.NewBufferedDAG(ctx, ds) bufferedDS := ipld.NewBufferedDAG(ctx, ds)
return &Adder{ return &Adder{
ctx: ctx, ctx: ctx,
pinning: p, pinning: p,
blockstore: bs, gcLocker: bs,
dagService: ds, dagService: ds,
bufferedDS: bufferedDS, bufferedDS: bufferedDS,
Progress: false, Progress: false,
@ -70,7 +70,7 @@ func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCBlockstore, ds ipld
type Adder struct { type Adder struct {
ctx context.Context ctx context.Context
pinning pin.Pinner pinning pin.Pinner
blockstore bstore.GCBlockstore gcLocker bstore.GCLocker
dagService ipld.DAGService dagService ipld.DAGService
bufferedDS *ipld.BufferedDAG bufferedDS *ipld.BufferedDAG
Out chan<- interface{} Out chan<- interface{}
@ -401,7 +401,7 @@ func (adder *Adder) addNode(node ipld.Node, path string) error {
// AddAllAndPin adds the given request's files and pin them. // AddAllAndPin adds the given request's files and pin them.
func (adder *Adder) AddAllAndPin(file files.Node) (ipld.Node, error) { func (adder *Adder) AddAllAndPin(file files.Node) (ipld.Node, error) {
if adder.Pin { if adder.Pin {
adder.unlocker = adder.blockstore.PinLock() adder.unlocker = adder.gcLocker.PinLock()
} }
defer func() { defer func() {
if adder.unlocker != nil { if adder.unlocker != nil {
@ -556,14 +556,14 @@ func (adder *Adder) addDir(path string, dir files.Directory) error {
} }
func (adder *Adder) maybePauseForGC() error { func (adder *Adder) maybePauseForGC() error {
if adder.unlocker != nil && adder.blockstore.GCRequested() { if adder.unlocker != nil && adder.gcLocker.GCRequested() {
err := adder.PinRoot() err := adder.PinRoot()
if err != nil { if err != nil {
return err return err
} }
adder.unlocker.Unlock() adder.unlocker.Unlock()
adder.unlocker = adder.blockstore.PinLock() adder.unlocker = adder.gcLocker.PinLock()
} }
return nil return nil
} }