mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-25 23:21:54 +08:00
Merge pull request #1572 from ipfs/node-construct-v2
replace nodebuilder with a nicer interface
This commit is contained in:
@ -192,9 +192,11 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
|
||||
return
|
||||
}
|
||||
|
||||
// Start assembling corebuilder
|
||||
nb := core.NewNodeBuilder().Online()
|
||||
nb.SetRepo(repo)
|
||||
// Start assembling node config
|
||||
ncfg := &core.BuildCfg{
|
||||
Online: true,
|
||||
Repo: repo,
|
||||
}
|
||||
|
||||
routingOption, _, err := req.Option(routingOptionKwd).String()
|
||||
if err != nil {
|
||||
@ -215,10 +217,11 @@ func daemonFunc(req cmds.Request, res cmds.Response) {
|
||||
Addrs: []ma.Multiaddr{addr.Transport()},
|
||||
})
|
||||
}
|
||||
nb.SetRouting(corerouting.SupernodeClient(infos...))
|
||||
|
||||
ncfg.Routing = corerouting.SupernodeClient(infos...)
|
||||
}
|
||||
|
||||
node, err := nb.Build(req.Context())
|
||||
node, err := core.NewNode(req.Context(), ncfg)
|
||||
if err != nil {
|
||||
log.Error("error from node construction: ", err)
|
||||
res.SetError(err, cmds.ErrNormal)
|
||||
|
@ -157,7 +157,7 @@ func addDefaultAssets(out io.Writer, repoRoot string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
nd, err := core.NewIPFSNode(ctx, core.Offline(r))
|
||||
nd, err := core.NewNode(ctx, &core.BuildCfg{Repo: r})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -191,7 +191,7 @@ func initializeIpnsKeyspace(repoRoot string) error {
|
||||
return err
|
||||
}
|
||||
|
||||
nd, err := core.NewIPFSNode(ctx, core.Offline(r))
|
||||
nd, err := core.NewNode(ctx, &core.BuildCfg{Repo: r})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
@ -204,7 +204,10 @@ func (i *cmdInvocation) constructNodeFunc(ctx context.Context) func() (*core.Ipf
|
||||
|
||||
// ok everything is good. set it on the invocation (for ownership)
|
||||
// and return it.
|
||||
n, err := core.NewIPFSNode(ctx, core.Standard(r, cmdctx.Online))
|
||||
n, err := core.NewNode(ctx, &core.BuildCfg{
|
||||
Online: cmdctx.Online,
|
||||
Repo: r,
|
||||
})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -71,7 +71,11 @@ func run(ipfsPath, watchPath string) error {
|
||||
// TODO handle case: repo doesn't exist or isn't initialized
|
||||
return err
|
||||
}
|
||||
node, err := core.NewIPFSNode(context.Background(), core.Online(r))
|
||||
|
||||
node, err := core.NewNode(context.Background(), &core.BuildCfg{
|
||||
Online: true,
|
||||
Repo: r,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
167
core/builder.go
167
core/builder.go
@ -7,31 +7,60 @@ import (
|
||||
|
||||
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||
dsync "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
|
||||
goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
|
||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
bstore "github.com/ipfs/go-ipfs/blocks/blockstore"
|
||||
key "github.com/ipfs/go-ipfs/blocks/key"
|
||||
bserv "github.com/ipfs/go-ipfs/blockservice"
|
||||
offline "github.com/ipfs/go-ipfs/exchange/offline"
|
||||
dag "github.com/ipfs/go-ipfs/merkledag"
|
||||
ci "github.com/ipfs/go-ipfs/p2p/crypto"
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
path "github.com/ipfs/go-ipfs/path"
|
||||
pin "github.com/ipfs/go-ipfs/pin"
|
||||
repo "github.com/ipfs/go-ipfs/repo"
|
||||
cfg "github.com/ipfs/go-ipfs/repo/config"
|
||||
)
|
||||
|
||||
var ErrAlreadyBuilt = errors.New("this builder has already been used")
|
||||
type BuildCfg struct {
|
||||
// If online is set, the node will have networking enabled
|
||||
Online bool
|
||||
|
||||
// NodeBuilder is an object used to generate an IpfsNode
|
||||
type NodeBuilder struct {
|
||||
online bool
|
||||
routing RoutingOption
|
||||
peerhost HostOption
|
||||
repo repo.Repo
|
||||
built bool
|
||||
nilrepo bool
|
||||
// If NilRepo is set, a repo backed by a nil datastore will be constructed
|
||||
NilRepo bool
|
||||
|
||||
Routing RoutingOption
|
||||
Host HostOption
|
||||
Repo repo.Repo
|
||||
}
|
||||
|
||||
func NewNodeBuilder() *NodeBuilder {
|
||||
return &NodeBuilder{
|
||||
online: false,
|
||||
routing: DHTOption,
|
||||
peerhost: DefaultHostOption,
|
||||
func (cfg *BuildCfg) fillDefaults() error {
|
||||
if cfg.Repo != nil && cfg.NilRepo {
|
||||
return errors.New("cannot set a repo and specify nilrepo at the same time")
|
||||
}
|
||||
|
||||
if cfg.Repo == nil {
|
||||
var d ds.Datastore
|
||||
d = ds.NewMapDatastore()
|
||||
if cfg.NilRepo {
|
||||
d = ds.NewNullDatastore()
|
||||
}
|
||||
r, err := defaultRepo(dsync.MutexWrap(d))
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
cfg.Repo = r
|
||||
}
|
||||
|
||||
if cfg.Routing == nil {
|
||||
cfg.Routing = DHTOption
|
||||
}
|
||||
|
||||
if cfg.Host == nil {
|
||||
cfg.Host = DefaultHostOption
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func defaultRepo(dstore ds.ThreadSafeDatastore) (repo.Repo, error) {
|
||||
@ -62,53 +91,69 @@ func defaultRepo(dstore ds.ThreadSafeDatastore) (repo.Repo, error) {
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (nb *NodeBuilder) Online() *NodeBuilder {
|
||||
nb.online = true
|
||||
return nb
|
||||
}
|
||||
|
||||
func (nb *NodeBuilder) Offline() *NodeBuilder {
|
||||
nb.online = false
|
||||
return nb
|
||||
}
|
||||
|
||||
func (nb *NodeBuilder) SetRouting(ro RoutingOption) *NodeBuilder {
|
||||
nb.routing = ro
|
||||
return nb
|
||||
}
|
||||
|
||||
func (nb *NodeBuilder) SetHost(ho HostOption) *NodeBuilder {
|
||||
nb.peerhost = ho
|
||||
return nb
|
||||
}
|
||||
|
||||
func (nb *NodeBuilder) SetRepo(r repo.Repo) *NodeBuilder {
|
||||
nb.repo = r
|
||||
return nb
|
||||
}
|
||||
|
||||
func (nb *NodeBuilder) NilRepo() *NodeBuilder {
|
||||
nb.nilrepo = true
|
||||
return nb
|
||||
}
|
||||
|
||||
func (nb *NodeBuilder) Build(ctx context.Context) (*IpfsNode, error) {
|
||||
if nb.built {
|
||||
return nil, ErrAlreadyBuilt
|
||||
func NewNode(ctx context.Context, cfg *BuildCfg) (*IpfsNode, error) {
|
||||
if cfg == nil {
|
||||
cfg = new(BuildCfg)
|
||||
}
|
||||
nb.built = true
|
||||
if nb.repo == nil {
|
||||
var d ds.Datastore
|
||||
d = ds.NewMapDatastore()
|
||||
if nb.nilrepo {
|
||||
d = ds.NewNullDatastore()
|
||||
}
|
||||
r, err := defaultRepo(dsync.MutexWrap(d))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
nb.repo = r
|
||||
|
||||
err := cfg.fillDefaults()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
conf := standardWithRouting(nb.repo, nb.online, nb.routing, nb.peerhost)
|
||||
return NewIPFSNode(ctx, conf)
|
||||
|
||||
n := &IpfsNode{
|
||||
mode: offlineMode,
|
||||
Repo: cfg.Repo,
|
||||
ctx: ctx,
|
||||
Peerstore: peer.NewPeerstore(),
|
||||
}
|
||||
if cfg.Online {
|
||||
n.mode = onlineMode
|
||||
}
|
||||
|
||||
// TODO: this is a weird circular-ish dependency, rework it
|
||||
n.proc = goprocessctx.WithContextAndTeardown(ctx, n.teardown)
|
||||
|
||||
if err := setupNode(ctx, n, cfg); err != nil {
|
||||
n.Close()
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return n, nil
|
||||
}
|
||||
|
||||
func setupNode(ctx context.Context, n *IpfsNode, cfg *BuildCfg) error {
|
||||
// setup local peer ID (private key is loaded in online setup)
|
||||
if err := n.loadID(); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
var err error
|
||||
n.Blockstore, err = bstore.WriteCached(bstore.NewBlockstore(n.Repo.Datastore()), kSizeBlockstoreWriteCache)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if cfg.Online {
|
||||
do := setupDiscoveryOption(n.Repo.Config().Discovery)
|
||||
if err := n.startOnlineServices(ctx, cfg.Routing, cfg.Host, do); err != nil {
|
||||
return err
|
||||
}
|
||||
} else {
|
||||
n.Exchange = offline.Exchange(n.Blockstore)
|
||||
}
|
||||
|
||||
n.Blocks = bserv.New(n.Blockstore, n.Exchange)
|
||||
n.DAG = dag.NewDAGService(n.Blocks)
|
||||
n.Pinning, err = pin.LoadPinner(n.Repo.Datastore(), n.DAG)
|
||||
if err != nil {
|
||||
// TODO: we should move towards only running 'NewPinner' explicity on
|
||||
// node init instead of implicitly here as a result of the pinner keys
|
||||
// not being found in the datastore.
|
||||
// this is kinda sketchy and could cause data loss
|
||||
n.Pinning = pin.NewPinner(n.Repo.Datastore(), n.DAG)
|
||||
}
|
||||
n.Resolver = &path.Resolver{DAG: n.DAG}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
@ -104,7 +104,11 @@ remains to be implemented.
|
||||
chunker, _, _ := req.Option(chunkerOptionName).String()
|
||||
|
||||
if hash {
|
||||
nilnode, err := core.NewNodeBuilder().Build(n.Context())
|
||||
nilnode, err := core.NewNode(n.Context(), &core.BuildCfg{
|
||||
//TODO: need this to be true or all files
|
||||
// hashed will be stored in memory!
|
||||
NilRepo: false,
|
||||
})
|
||||
if err != nil {
|
||||
res.SetError(err, cmds.ErrNormal)
|
||||
return
|
||||
|
125
core/core.go
125
core/core.go
@ -20,7 +20,6 @@ import (
|
||||
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||
ma "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
|
||||
goprocess "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
|
||||
goprocessctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context"
|
||||
mamask "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/whyrusleeping/multiaddr-filter"
|
||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
diag "github.com/ipfs/go-ipfs/diagnostics"
|
||||
@ -46,7 +45,6 @@ import (
|
||||
exchange "github.com/ipfs/go-ipfs/exchange"
|
||||
bitswap "github.com/ipfs/go-ipfs/exchange/bitswap"
|
||||
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
|
||||
offline "github.com/ipfs/go-ipfs/exchange/offline"
|
||||
rp "github.com/ipfs/go-ipfs/exchange/reprovide"
|
||||
|
||||
mount "github.com/ipfs/go-ipfs/fuse/mount"
|
||||
@ -123,124 +121,6 @@ type Mounts struct {
|
||||
Ipns mount.Mount
|
||||
}
|
||||
|
||||
type ConfigOption func(ctx context.Context) (*IpfsNode, error)
|
||||
|
||||
func NewIPFSNode(ctx context.Context, option ConfigOption) (*IpfsNode, error) {
|
||||
node, err := option(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if node.ctx == nil {
|
||||
node.ctx = ctx
|
||||
}
|
||||
if node.proc == nil {
|
||||
node.proc = goprocessctx.WithContextAndTeardown(node.ctx, node.teardown)
|
||||
}
|
||||
|
||||
success := false // flip to true after all sub-system inits succeed
|
||||
defer func() {
|
||||
if !success {
|
||||
node.proc.Close()
|
||||
}
|
||||
}()
|
||||
|
||||
// Need to make sure it's perfectly clear 1) which variables are expected
|
||||
// to be initialized at this point, and 2) which variables will be
|
||||
// initialized after this point.
|
||||
|
||||
node.Blocks = bserv.New(node.Blockstore, node.Exchange)
|
||||
|
||||
if node.Peerstore == nil {
|
||||
node.Peerstore = peer.NewPeerstore()
|
||||
}
|
||||
node.DAG = merkledag.NewDAGService(node.Blocks)
|
||||
node.Pinning, err = pin.LoadPinner(node.Repo.Datastore(), node.DAG)
|
||||
if err != nil {
|
||||
node.Pinning = pin.NewPinner(node.Repo.Datastore(), node.DAG)
|
||||
}
|
||||
node.Resolver = &path.Resolver{DAG: node.DAG}
|
||||
|
||||
success = true
|
||||
return node, nil
|
||||
}
|
||||
|
||||
func Offline(r repo.Repo) ConfigOption {
|
||||
return Standard(r, false)
|
||||
}
|
||||
|
||||
func OnlineWithOptions(r repo.Repo, router RoutingOption, ho HostOption) ConfigOption {
|
||||
return standardWithRouting(r, true, router, ho)
|
||||
}
|
||||
|
||||
func Online(r repo.Repo) ConfigOption {
|
||||
return Standard(r, true)
|
||||
}
|
||||
|
||||
// DEPRECATED: use Online, Offline functions
|
||||
func Standard(r repo.Repo, online bool) ConfigOption {
|
||||
return standardWithRouting(r, online, DHTOption, DefaultHostOption)
|
||||
}
|
||||
|
||||
// TODO refactor so maybeRouter isn't special-cased in this way
|
||||
func standardWithRouting(r repo.Repo, online bool, routingOption RoutingOption, hostOption HostOption) ConfigOption {
|
||||
return func(ctx context.Context) (n *IpfsNode, err error) {
|
||||
// FIXME perform node construction in the main constructor so it isn't
|
||||
// necessary to perform this teardown in this scope.
|
||||
success := false
|
||||
defer func() {
|
||||
if !success && n != nil {
|
||||
n.teardown()
|
||||
}
|
||||
}()
|
||||
|
||||
// TODO move as much of node initialization as possible into
|
||||
// NewIPFSNode. The larger these config options are, the harder it is
|
||||
// to test all node construction code paths.
|
||||
|
||||
if r == nil {
|
||||
return nil, fmt.Errorf("repo required")
|
||||
}
|
||||
n = &IpfsNode{
|
||||
mode: func() mode {
|
||||
if online {
|
||||
return onlineMode
|
||||
}
|
||||
return offlineMode
|
||||
}(),
|
||||
Repo: r,
|
||||
}
|
||||
|
||||
n.ctx = ctx
|
||||
n.proc = goprocessctx.WithContextAndTeardown(ctx, n.teardown)
|
||||
|
||||
// setup Peerstore
|
||||
n.Peerstore = peer.NewPeerstore()
|
||||
|
||||
// setup local peer ID (private key is loaded in online setup)
|
||||
if err := n.loadID(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
n.Blockstore, err = bstore.WriteCached(bstore.NewBlockstore(n.Repo.Datastore()), kSizeBlockstoreWriteCache)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if online {
|
||||
do := setupDiscoveryOption(n.Repo.Config().Discovery)
|
||||
if err := n.startOnlineServices(ctx, routingOption, hostOption, do); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
} else {
|
||||
n.Exchange = offline.Exchange(n.Blockstore)
|
||||
}
|
||||
|
||||
success = true
|
||||
return n, nil
|
||||
}
|
||||
}
|
||||
|
||||
func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption RoutingOption, hostOption HostOption, do DiscoveryOption) error {
|
||||
|
||||
if n.PeerHost != nil { // already online.
|
||||
@ -371,10 +251,13 @@ func (n *IpfsNode) teardown() error {
|
||||
// owned objects are closed in this teardown to ensure that they're closed
|
||||
// regardless of which constructor was used to add them to the node.
|
||||
closers := []io.Closer{
|
||||
n.Exchange,
|
||||
n.Repo,
|
||||
}
|
||||
|
||||
if n.Exchange != nil {
|
||||
closers = append(closers, n.Exchange)
|
||||
}
|
||||
|
||||
if n.Mounts.Ipfs != nil {
|
||||
closers = append(closers, mount.Closer(n.Mounts.Ipfs))
|
||||
}
|
||||
|
@ -48,7 +48,7 @@ func TestInitialization(t *testing.T) {
|
||||
C: *c,
|
||||
D: testutil.ThreadSafeCloserMapDatastore(),
|
||||
}
|
||||
n, err := NewIPFSNode(ctx, Standard(r, false))
|
||||
n, err := NewNode(ctx, &BuildCfg{Repo: r})
|
||||
if n == nil || err != nil {
|
||||
t.Error("Should have constructed.", i, err)
|
||||
}
|
||||
@ -59,7 +59,7 @@ func TestInitialization(t *testing.T) {
|
||||
C: *c,
|
||||
D: testutil.ThreadSafeCloserMapDatastore(),
|
||||
}
|
||||
n, err := NewIPFSNode(ctx, Standard(r, false))
|
||||
n, err := NewNode(ctx, &BuildCfg{Repo: r})
|
||||
if n != nil || err == nil {
|
||||
t.Error("Should have failed to construct.", i)
|
||||
}
|
||||
|
@ -47,7 +47,7 @@ func newNodeWithMockNamesys(ns mockNamesys) (*core.IpfsNode, error) {
|
||||
C: c,
|
||||
D: testutil.ThreadSafeCloserMapDatastore(),
|
||||
}
|
||||
n, err := core.NewIPFSNode(context.Background(), core.Offline(r))
|
||||
n, err := core.NewNode(context.Background(), &core.BuildCfg{Repo: r})
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
@ -25,7 +25,7 @@ func TestAddRecursive(t *testing.T) {
|
||||
},
|
||||
D: testutil.ThreadSafeCloserMapDatastore(),
|
||||
}
|
||||
node, err := core.NewIPFSNode(context.Background(), core.Offline(r))
|
||||
node, err := core.NewNode(context.Background(), &core.BuildCfg{Repo: r})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -1,86 +1,39 @@
|
||||
package coremock
|
||||
|
||||
import (
|
||||
"net"
|
||||
|
||||
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||
syncds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
|
||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
|
||||
"github.com/ipfs/go-ipfs/blocks/blockstore"
|
||||
blockservice "github.com/ipfs/go-ipfs/blockservice"
|
||||
commands "github.com/ipfs/go-ipfs/commands"
|
||||
core "github.com/ipfs/go-ipfs/core"
|
||||
"github.com/ipfs/go-ipfs/exchange/offline"
|
||||
mdag "github.com/ipfs/go-ipfs/merkledag"
|
||||
nsys "github.com/ipfs/go-ipfs/namesys"
|
||||
metrics "github.com/ipfs/go-ipfs/metrics"
|
||||
host "github.com/ipfs/go-ipfs/p2p/host"
|
||||
mocknet "github.com/ipfs/go-ipfs/p2p/net/mock"
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
path "github.com/ipfs/go-ipfs/path"
|
||||
pin "github.com/ipfs/go-ipfs/pin"
|
||||
"github.com/ipfs/go-ipfs/repo"
|
||||
config "github.com/ipfs/go-ipfs/repo/config"
|
||||
offrt "github.com/ipfs/go-ipfs/routing/offline"
|
||||
ds2 "github.com/ipfs/go-ipfs/util/datastore2"
|
||||
testutil "github.com/ipfs/go-ipfs/util/testutil"
|
||||
)
|
||||
|
||||
// TODO this is super sketch. Deprecate and initialize one that shares code
|
||||
// with the actual core constructor. Lots of fields aren't initialized.
|
||||
// "This is as good as broken." --- is it?
|
||||
|
||||
// NewMockNode constructs an IpfsNode for use in tests.
|
||||
func NewMockNode() (*core.IpfsNode, error) {
|
||||
ctx := context.Background()
|
||||
|
||||
// Generate Identity
|
||||
ident, err := testutil.RandIdentity()
|
||||
if err != nil {
|
||||
return nil, err
|
||||
// effectively offline, only peer in its network
|
||||
return core.NewNode(ctx, &core.BuildCfg{
|
||||
Online: true,
|
||||
Host: MockHostOption(mocknet.New(ctx)),
|
||||
})
|
||||
}
|
||||
|
||||
func MockHostOption(mn mocknet.Mocknet) core.HostOption {
|
||||
return func(ctx context.Context, id peer.ID, ps peer.Peerstore, bwr metrics.Reporter, fs []*net.IPNet) (host.Host, error) {
|
||||
return mn.AddPeerWithPeerstore(id, ps)
|
||||
}
|
||||
p := ident.ID()
|
||||
|
||||
c := config.Config{
|
||||
Identity: config.Identity{
|
||||
PeerID: p.String(),
|
||||
},
|
||||
}
|
||||
|
||||
nd, err := core.Offline(&repo.Mock{
|
||||
C: c,
|
||||
D: ds2.CloserWrap(syncds.MutexWrap(datastore.NewMapDatastore())),
|
||||
})(ctx)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
nd.PrivateKey = ident.PrivateKey()
|
||||
nd.Peerstore = peer.NewPeerstore()
|
||||
nd.Peerstore.AddPrivKey(p, ident.PrivateKey())
|
||||
nd.Peerstore.AddPubKey(p, ident.PublicKey())
|
||||
nd.Identity = p
|
||||
|
||||
nd.PeerHost, err = mocknet.New(nd.Context()).AddPeer(ident.PrivateKey(), ident.Address()) // effectively offline
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Routing
|
||||
nd.Routing = offrt.NewOfflineRouter(nd.Repo.Datastore(), nd.PrivateKey)
|
||||
|
||||
// Bitswap
|
||||
bstore := blockstore.NewBlockstore(nd.Repo.Datastore())
|
||||
bserv := blockservice.New(bstore, offline.Exchange(bstore))
|
||||
|
||||
nd.DAG = mdag.NewDAGService(bserv)
|
||||
|
||||
nd.Pinning = pin.NewPinner(nd.Repo.Datastore(), nd.DAG)
|
||||
|
||||
// Namespace resolver
|
||||
nd.Namesys = nsys.NewNameSystem(nd.Routing)
|
||||
|
||||
// Path resolver
|
||||
nd.Resolver = &path.Resolver{DAG: nd.DAG}
|
||||
|
||||
return nd, nil
|
||||
}
|
||||
|
||||
func MockCmdsCtx() (commands.Context, error) {
|
||||
@ -97,10 +50,14 @@ func MockCmdsCtx() (commands.Context, error) {
|
||||
},
|
||||
}
|
||||
|
||||
node, err := core.NewIPFSNode(context.Background(), core.Offline(&repo.Mock{
|
||||
r := &repo.Mock{
|
||||
D: ds2.CloserWrap(syncds.MutexWrap(datastore.NewMapDatastore())),
|
||||
C: conf,
|
||||
}))
|
||||
}
|
||||
|
||||
node, err := core.NewNode(context.Background(), &core.BuildCfg{
|
||||
Repo: r,
|
||||
})
|
||||
|
||||
return commands.Context{
|
||||
Online: true,
|
||||
|
@ -25,6 +25,7 @@ type Mocknet interface {
|
||||
// AddPeer adds an existing peer. we need both a privkey and addr.
|
||||
// ID is derived from PrivKey
|
||||
AddPeer(ic.PrivKey, ma.Multiaddr) (host.Host, error)
|
||||
AddPeerWithPeerstore(peer.ID, peer.Peerstore) (host.Host, error)
|
||||
|
||||
// retrieve things (with randomized iteration order)
|
||||
Peers() []peer.ID
|
||||
|
@ -64,13 +64,26 @@ func (mn *mocknet) GenPeer() (host.Host, error) {
|
||||
}
|
||||
|
||||
func (mn *mocknet) AddPeer(k ic.PrivKey, a ma.Multiaddr) (host.Host, error) {
|
||||
n, err := newPeernet(mn.ctx, mn, k, a)
|
||||
p, err := peer.IDFromPublicKey(k.GetPublic())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ps := peer.NewPeerstore()
|
||||
ps.AddAddr(p, a, peer.PermanentAddrTTL)
|
||||
ps.AddPrivKey(p, k)
|
||||
ps.AddPubKey(p, k.GetPublic())
|
||||
|
||||
return mn.AddPeerWithPeerstore(p, ps)
|
||||
}
|
||||
|
||||
func (mn *mocknet) AddPeerWithPeerstore(p peer.ID, ps peer.Peerstore) (host.Host, error) {
|
||||
n, err := newPeernet(mn.ctx, mn, p, ps)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
h := bhost.New(n)
|
||||
log.Debugf("mocknet added listen addr for peer: %s -- %s", n.LocalPeer(), a)
|
||||
|
||||
mn.proc.AddChild(n.proc)
|
||||
|
||||
|
@ -5,7 +5,6 @@ import (
|
||||
"math/rand"
|
||||
"sync"
|
||||
|
||||
ic "github.com/ipfs/go-ipfs/p2p/crypto"
|
||||
inet "github.com/ipfs/go-ipfs/p2p/net"
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
|
||||
@ -40,19 +39,7 @@ type peernet struct {
|
||||
}
|
||||
|
||||
// newPeernet constructs a new peernet
|
||||
func newPeernet(ctx context.Context, m *mocknet, k ic.PrivKey,
|
||||
a ma.Multiaddr) (*peernet, error) {
|
||||
|
||||
p, err := peer.IDFromPublicKey(k.GetPublic())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// create our own entirely, so that peers knowledge doesn't get shared
|
||||
ps := peer.NewPeerstore()
|
||||
ps.AddAddr(p, a, peer.PermanentAddrTTL)
|
||||
ps.AddPrivKey(p, k)
|
||||
ps.AddPubKey(p, k.GetPublic())
|
||||
func newPeernet(ctx context.Context, m *mocknet, p peer.ID, ps peer.Peerstore) (*peernet, error) {
|
||||
|
||||
n := &peernet{
|
||||
mocknet: m,
|
||||
|
@ -15,12 +15,16 @@ import (
|
||||
|
||||
"github.com/ipfs/go-ipfs/core"
|
||||
coreunix "github.com/ipfs/go-ipfs/core/coreunix"
|
||||
mock "github.com/ipfs/go-ipfs/core/mock"
|
||||
mocknet "github.com/ipfs/go-ipfs/p2p/net/mock"
|
||||
"github.com/ipfs/go-ipfs/p2p/peer"
|
||||
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
|
||||
"github.com/ipfs/go-ipfs/thirdparty/unit"
|
||||
testutil "github.com/ipfs/go-ipfs/util/testutil"
|
||||
)
|
||||
|
||||
var log = eventlog.Logger("epictest")
|
||||
|
||||
const kSeed = 1
|
||||
|
||||
func Test1KBInstantaneous(t *testing.T) {
|
||||
@ -87,35 +91,38 @@ func RandomBytes(n int64) []byte {
|
||||
func DirectAddCat(data []byte, conf testutil.LatencyConfig) error {
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
const numPeers = 2
|
||||
|
||||
// create network
|
||||
mn, err := mocknet.FullMeshLinked(ctx, numPeers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mn := mocknet.New(ctx)
|
||||
mn.SetLinkDefaults(mocknet.LinkOptions{
|
||||
Latency: conf.NetworkLatency,
|
||||
// TODO add to conf. This is tricky because we want 0 values to be functional.
|
||||
Bandwidth: math.MaxInt32,
|
||||
})
|
||||
|
||||
peers := mn.Peers()
|
||||
if len(peers) < numPeers {
|
||||
return errors.New("test initialization error")
|
||||
}
|
||||
|
||||
adder, err := core.NewIPFSNode(ctx, core.ConfigOption(MocknetTestRepo(peers[0], mn.Host(peers[0]), conf, core.DHTOption)))
|
||||
adder, err := core.NewNode(ctx, &core.BuildCfg{
|
||||
Online: true,
|
||||
Host: mock.MockHostOption(mn),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer adder.Close()
|
||||
catter, err := core.NewIPFSNode(ctx, core.ConfigOption(MocknetTestRepo(peers[1], mn.Host(peers[1]), conf, core.DHTOption)))
|
||||
|
||||
catter, err := core.NewNode(ctx, &core.BuildCfg{
|
||||
Online: true,
|
||||
Host: mock.MockHostOption(mn),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer catter.Close()
|
||||
|
||||
err = mn.LinkAll()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
bs1 := []peer.PeerInfo{adder.Peerstore.PeerInfo(adder.Identity)}
|
||||
bs2 := []peer.PeerInfo{catter.Peerstore.PeerInfo(catter.Identity)}
|
||||
|
||||
|
@ -10,6 +10,7 @@ import (
|
||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
"github.com/ipfs/go-ipfs/core"
|
||||
coreunix "github.com/ipfs/go-ipfs/core/coreunix"
|
||||
mock "github.com/ipfs/go-ipfs/core/mock"
|
||||
mocknet "github.com/ipfs/go-ipfs/p2p/net/mock"
|
||||
"github.com/ipfs/go-ipfs/p2p/peer"
|
||||
"github.com/ipfs/go-ipfs/thirdparty/unit"
|
||||
@ -35,35 +36,38 @@ func benchCat(b *testing.B, data []byte, conf testutil.LatencyConfig) error {
|
||||
b.StopTimer()
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
const numPeers = 2
|
||||
|
||||
// create network
|
||||
mn, err := mocknet.FullMeshLinked(ctx, numPeers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mn := mocknet.New(ctx)
|
||||
mn.SetLinkDefaults(mocknet.LinkOptions{
|
||||
Latency: conf.NetworkLatency,
|
||||
// TODO add to conf. This is tricky because we want 0 values to be functional.
|
||||
Bandwidth: math.MaxInt32,
|
||||
})
|
||||
|
||||
peers := mn.Peers()
|
||||
if len(peers) < numPeers {
|
||||
return errors.New("test initialization error")
|
||||
}
|
||||
|
||||
adder, err := core.NewIPFSNode(ctx, core.ConfigOption(MocknetTestRepo(peers[0], mn.Host(peers[0]), conf, core.DHTOption)))
|
||||
adder, err := core.NewNode(ctx, &core.BuildCfg{
|
||||
Online: true,
|
||||
Host: mock.MockHostOption(mn),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer adder.Close()
|
||||
catter, err := core.NewIPFSNode(ctx, core.ConfigOption(MocknetTestRepo(peers[1], mn.Host(peers[1]), conf, core.DHTOption)))
|
||||
|
||||
catter, err := core.NewNode(ctx, &core.BuildCfg{
|
||||
Online: true,
|
||||
Host: mock.MockHostOption(mn),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer catter.Close()
|
||||
|
||||
err = mn.LinkAll()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
bs1 := []peer.PeerInfo{adder.Peerstore.PeerInfo(adder.Identity)}
|
||||
bs2 := []peer.PeerInfo{catter.Peerstore.PeerInfo(catter.Identity)}
|
||||
|
||||
|
@ -2,15 +2,13 @@ package integrationtest
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"errors"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
"github.com/ipfs/go-ipfs/blocks"
|
||||
"github.com/ipfs/go-ipfs/core"
|
||||
"github.com/ipfs/go-ipfs/core/mock"
|
||||
mocknet "github.com/ipfs/go-ipfs/p2p/net/mock"
|
||||
testutil "github.com/ipfs/go-ipfs/util/testutil"
|
||||
)
|
||||
|
||||
func TestBitswapWithoutRouting(t *testing.T) {
|
||||
@ -19,22 +17,15 @@ func TestBitswapWithoutRouting(t *testing.T) {
|
||||
const numPeers = 4
|
||||
|
||||
// create network
|
||||
mn, err := mocknet.FullMeshLinked(ctx, numPeers)
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
||||
peers := mn.Peers()
|
||||
if len(peers) < numPeers {
|
||||
t.Fatal(errors.New("test initialization error"))
|
||||
}
|
||||
|
||||
// set the routing latency to infinity.
|
||||
conf := testutil.LatencyConfig{RoutingLatency: (525600 * time.Minute)}
|
||||
mn := mocknet.New(ctx)
|
||||
|
||||
var nodes []*core.IpfsNode
|
||||
for _, p := range peers {
|
||||
n, err := core.NewIPFSNode(ctx, core.ConfigOption(MocknetTestRepo(p, mn.Host(p), conf, core.NilRouterOption)))
|
||||
for i := 0; i < numPeers; i++ {
|
||||
n, err := core.NewNode(ctx, &core.BuildCfg{
|
||||
Online: true,
|
||||
Host: coremock.MockHostOption(mn),
|
||||
Routing: core.NilRouterOption, // no routing
|
||||
})
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -42,6 +33,8 @@ func TestBitswapWithoutRouting(t *testing.T) {
|
||||
nodes = append(nodes, n)
|
||||
}
|
||||
|
||||
mn.LinkAll()
|
||||
|
||||
// connect them
|
||||
for _, n1 := range nodes {
|
||||
for _, n2 := range nodes {
|
||||
|
@ -1,54 +0,0 @@
|
||||
package integrationtest
|
||||
|
||||
import (
|
||||
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||
syncds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
|
||||
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
|
||||
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
|
||||
core "github.com/ipfs/go-ipfs/core"
|
||||
bitswap "github.com/ipfs/go-ipfs/exchange/bitswap"
|
||||
bsnet "github.com/ipfs/go-ipfs/exchange/bitswap/network"
|
||||
host "github.com/ipfs/go-ipfs/p2p/host"
|
||||
peer "github.com/ipfs/go-ipfs/p2p/peer"
|
||||
"github.com/ipfs/go-ipfs/repo"
|
||||
delay "github.com/ipfs/go-ipfs/thirdparty/delay"
|
||||
eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog"
|
||||
ds2 "github.com/ipfs/go-ipfs/util/datastore2"
|
||||
testutil "github.com/ipfs/go-ipfs/util/testutil"
|
||||
)
|
||||
|
||||
var log = eventlog.Logger("epictest")
|
||||
|
||||
func MocknetTestRepo(p peer.ID, h host.Host, conf testutil.LatencyConfig, routing core.RoutingOption) core.ConfigOption {
|
||||
return func(ctx context.Context) (*core.IpfsNode, error) {
|
||||
const kWriteCacheElems = 100
|
||||
const alwaysSendToPeer = true
|
||||
dsDelay := delay.Fixed(conf.BlockstoreLatency)
|
||||
r := &repo.Mock{
|
||||
D: ds2.CloserWrap(syncds.MutexWrap(ds2.WithDelay(datastore.NewMapDatastore(), dsDelay))),
|
||||
}
|
||||
ds := r.Datastore()
|
||||
|
||||
n := &core.IpfsNode{
|
||||
Peerstore: h.Peerstore(),
|
||||
Repo: r,
|
||||
PeerHost: h,
|
||||
Identity: p,
|
||||
}
|
||||
dhtt, err := routing(ctx, n.PeerHost, n.Repo.Datastore())
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
bsn := bsnet.NewFromIpfsHost(h, dhtt)
|
||||
bstore, err := blockstore.WriteCached(blockstore.NewBlockstore(ds), kWriteCacheElems)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
exch := bitswap.New(ctx, p, bsn, bstore, alwaysSendToPeer)
|
||||
n.Blockstore = bstore
|
||||
n.Exchange = exch
|
||||
n.Routing = dhtt
|
||||
return n, nil
|
||||
}
|
||||
}
|
@ -16,9 +16,9 @@ import (
|
||||
core "github.com/ipfs/go-ipfs/core"
|
||||
"github.com/ipfs/go-ipfs/core/corerouting"
|
||||
"github.com/ipfs/go-ipfs/core/coreunix"
|
||||
mock "github.com/ipfs/go-ipfs/core/mock"
|
||||
mocknet "github.com/ipfs/go-ipfs/p2p/net/mock"
|
||||
"github.com/ipfs/go-ipfs/p2p/peer"
|
||||
"github.com/ipfs/go-ipfs/thirdparty/iter"
|
||||
"github.com/ipfs/go-ipfs/thirdparty/unit"
|
||||
ds2 "github.com/ipfs/go-ipfs/util/datastore2"
|
||||
testutil "github.com/ipfs/go-ipfs/util/testutil"
|
||||
@ -82,28 +82,21 @@ func InitializeSupernodeNetwork(
|
||||
conf testutil.LatencyConfig) ([]*core.IpfsNode, []*core.IpfsNode, error) {
|
||||
|
||||
// create network
|
||||
mn, err := mocknet.FullMeshLinked(ctx, numServers+numClients)
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
mn := mocknet.New(ctx)
|
||||
|
||||
mn.SetLinkDefaults(mocknet.LinkOptions{
|
||||
Latency: conf.NetworkLatency,
|
||||
Bandwidth: math.MaxInt32,
|
||||
})
|
||||
|
||||
peers := mn.Peers()
|
||||
if len(peers) < numServers+numClients {
|
||||
return nil, nil, errors.New("test initialization error")
|
||||
}
|
||||
clientPeers, serverPeers := peers[0:numClients], peers[numClients:]
|
||||
|
||||
routingDatastore := ds2.CloserWrap(syncds.MutexWrap(datastore.NewMapDatastore()))
|
||||
var servers []*core.IpfsNode
|
||||
for i := range iter.N(numServers) {
|
||||
p := serverPeers[i]
|
||||
bootstrap, err := core.NewIPFSNode(ctx, MocknetTestRepo(p, mn.Host(p), conf,
|
||||
corerouting.SupernodeServer(routingDatastore)))
|
||||
for i := 0; i < numServers; i++ {
|
||||
bootstrap, err := core.NewNode(ctx, &core.BuildCfg{
|
||||
Online: true,
|
||||
Host: mock.MockHostOption(mn),
|
||||
Routing: corerouting.SupernodeServer(routingDatastore),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
@ -117,15 +110,18 @@ func InitializeSupernodeNetwork(
|
||||
}
|
||||
|
||||
var clients []*core.IpfsNode
|
||||
for i := range iter.N(numClients) {
|
||||
p := clientPeers[i]
|
||||
n, err := core.NewIPFSNode(ctx, MocknetTestRepo(p, mn.Host(p), conf,
|
||||
corerouting.SupernodeClient(bootstrapInfos...)))
|
||||
for i := 0; i < numClients; i++ {
|
||||
n, err := core.NewNode(ctx, &core.BuildCfg{
|
||||
Online: true,
|
||||
Host: mock.MockHostOption(mn),
|
||||
Routing: corerouting.SupernodeClient(bootstrapInfos...),
|
||||
})
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
clients = append(clients, n)
|
||||
}
|
||||
mn.LinkAll()
|
||||
|
||||
bcfg := core.BootstrapConfigWithPeers(bootstrapInfos)
|
||||
for _, n := range clients {
|
||||
|
@ -12,6 +12,7 @@ import (
|
||||
|
||||
core "github.com/ipfs/go-ipfs/core"
|
||||
coreunix "github.com/ipfs/go-ipfs/core/coreunix"
|
||||
mock "github.com/ipfs/go-ipfs/core/mock"
|
||||
mocknet "github.com/ipfs/go-ipfs/p2p/net/mock"
|
||||
"github.com/ipfs/go-ipfs/p2p/peer"
|
||||
"github.com/ipfs/go-ipfs/thirdparty/unit"
|
||||
@ -67,35 +68,40 @@ func RunThreeLeggedCat(data []byte, conf testutil.LatencyConfig) error {
|
||||
const numPeers = 3
|
||||
|
||||
// create network
|
||||
mn, err := mocknet.FullMeshLinked(ctx, numPeers)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
mn := mocknet.New(ctx)
|
||||
mn.SetLinkDefaults(mocknet.LinkOptions{
|
||||
Latency: conf.NetworkLatency,
|
||||
// TODO add to conf. This is tricky because we want 0 values to be functional.
|
||||
Bandwidth: math.MaxInt32,
|
||||
})
|
||||
|
||||
peers := mn.Peers()
|
||||
if len(peers) < numPeers {
|
||||
return errors.New("test initialization error")
|
||||
}
|
||||
bootstrap, err := core.NewIPFSNode(ctx, MocknetTestRepo(peers[2], mn.Host(peers[2]), conf, core.DHTOption))
|
||||
bootstrap, err := core.NewNode(ctx, &core.BuildCfg{
|
||||
Online: true,
|
||||
Host: mock.MockHostOption(mn),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer bootstrap.Close()
|
||||
adder, err := core.NewIPFSNode(ctx, MocknetTestRepo(peers[0], mn.Host(peers[0]), conf, core.DHTOption))
|
||||
|
||||
adder, err := core.NewNode(ctx, &core.BuildCfg{
|
||||
Online: true,
|
||||
Host: mock.MockHostOption(mn),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer adder.Close()
|
||||
catter, err := core.NewIPFSNode(ctx, MocknetTestRepo(peers[1], mn.Host(peers[1]), conf, core.DHTOption))
|
||||
|
||||
catter, err := core.NewNode(ctx, &core.BuildCfg{
|
||||
Online: true,
|
||||
Host: mock.MockHostOption(mn),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer catter.Close()
|
||||
mn.LinkAll()
|
||||
|
||||
bis := bootstrap.Peerstore.PeerInfo(bootstrap.PeerHost.ID())
|
||||
bcfg := core.BootstrapConfigWithPeers([]peer.PeerInfo{bis})
|
||||
|
@ -93,14 +93,11 @@ func run() error {
|
||||
})
|
||||
}
|
||||
|
||||
node, err := core.NewIPFSNode(
|
||||
ctx,
|
||||
core.OnlineWithOptions(
|
||||
repo,
|
||||
corerouting.SupernodeClient(infos...),
|
||||
core.DefaultHostOption,
|
||||
),
|
||||
)
|
||||
node, err := core.NewNode(ctx, &core.BuildCfg{
|
||||
Online: true,
|
||||
Repo: repo,
|
||||
Routing: corerouting.SupernodeClient(infos...),
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@ -168,10 +165,13 @@ func runFileCattingWorker(ctx context.Context, n *core.IpfsNode) error {
|
||||
return err
|
||||
}
|
||||
|
||||
dummy, err := core.NewIPFSNode(ctx, core.Offline(&repo.Mock{
|
||||
r := &repo.Mock{
|
||||
D: ds2.CloserWrap(syncds.MutexWrap(datastore.NewMapDatastore())),
|
||||
C: *conf,
|
||||
}))
|
||||
}
|
||||
dummy, err := core.NewNode(ctx, &core.BuildCfg{
|
||||
Repo: r,
|
||||
})
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
Reference in New Issue
Block a user