mirror of
https://github.com/ipfs/kubo.git
synced 2025-09-10 03:42:21 +08:00
integrate bitswap and blockservice into the core package
This commit is contained in:
@ -78,6 +78,7 @@ func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsR
|
|||||||
// GetBlock attempts to retrieve a particular block from peers, within timeout.
|
// GetBlock attempts to retrieve a particular block from peers, within timeout.
|
||||||
func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) (
|
func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) (
|
||||||
*blocks.Block, error) {
|
*blocks.Block, error) {
|
||||||
|
u.DOut("Bitswap GetBlock: '%s'\n", k.Pretty())
|
||||||
begin := time.Now()
|
begin := time.Now()
|
||||||
tleft := timeout - time.Now().Sub(begin)
|
tleft := timeout - time.Now().Sub(begin)
|
||||||
provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout)
|
provs_ch := bs.routing.FindProvidersAsync(k, 20, timeout)
|
||||||
@ -126,7 +127,7 @@ func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byt
|
|||||||
case resp_mes := <-resp:
|
case resp_mes := <-resp:
|
||||||
return resp_mes.Data, nil
|
return resp_mes.Data, nil
|
||||||
case <-after:
|
case <-after:
|
||||||
u.PErr("getBlock for '%s' timed out.\n", k)
|
u.PErr("getBlock for '%s' timed out.\n", k.Pretty())
|
||||||
return nil, u.ErrTimeout
|
return nil, u.ErrTimeout
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -48,24 +48,29 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
|
|||||||
// GetBlock retrieves a particular block from the service,
|
// GetBlock retrieves a particular block from the service,
|
||||||
// Getting it from the datastore using the key (hash).
|
// Getting it from the datastore using the key (hash).
|
||||||
func (s *BlockService) GetBlock(k u.Key) (*blocks.Block, error) {
|
func (s *BlockService) GetBlock(k u.Key) (*blocks.Block, error) {
|
||||||
|
u.DOut("BlockService GetBlock: '%s'\n", k.Pretty())
|
||||||
dsk := ds.NewKey(string(k))
|
dsk := ds.NewKey(string(k))
|
||||||
datai, err := s.Datastore.Get(dsk)
|
datai, err := s.Datastore.Get(dsk)
|
||||||
if err == nil {
|
if err == nil {
|
||||||
|
u.DOut("Blockservice: Got data in datastore.\n")
|
||||||
bdata, ok := datai.([]byte)
|
bdata, ok := datai.([]byte)
|
||||||
if !ok {
|
if !ok {
|
||||||
return nil, fmt.Errorf("data associated with %s is not a []byte", k)
|
return nil, fmt.Errorf("data associated with %s is not a []byte", k)
|
||||||
}
|
}
|
||||||
|
u.DOut("Got data: %v\n", bdata)
|
||||||
return &blocks.Block{
|
return &blocks.Block{
|
||||||
Multihash: mh.Multihash(k),
|
Multihash: mh.Multihash(k),
|
||||||
Data: bdata,
|
Data: bdata,
|
||||||
}, nil
|
}, nil
|
||||||
} else if err == ds.ErrNotFound && s.Remote != nil {
|
} else if err == ds.ErrNotFound && s.Remote != nil {
|
||||||
|
u.DOut("Blockservice: Searching bitswap.\n")
|
||||||
blk, err := s.Remote.GetBlock(k, time.Second*5)
|
blk, err := s.Remote.GetBlock(k, time.Second*5)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return blk, nil
|
return blk, nil
|
||||||
} else {
|
} else {
|
||||||
|
u.DOut("Blockservice GetBlock: Not found.\n")
|
||||||
return nil, u.ErrNotFound
|
return nil, u.ErrNotFound
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -2,12 +2,13 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"os"
|
||||||
|
|
||||||
"github.com/gonuts/flag"
|
"github.com/gonuts/flag"
|
||||||
"github.com/jbenet/commander"
|
"github.com/jbenet/commander"
|
||||||
config "github.com/jbenet/go-ipfs/config"
|
config "github.com/jbenet/go-ipfs/config"
|
||||||
core "github.com/jbenet/go-ipfs/core"
|
core "github.com/jbenet/go-ipfs/core"
|
||||||
u "github.com/jbenet/go-ipfs/util"
|
u "github.com/jbenet/go-ipfs/util"
|
||||||
"os"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// The IPFS command tree. It is an instance of `commander.Command`.
|
// The IPFS command tree. It is an instance of `commander.Command`.
|
||||||
@ -55,6 +56,7 @@ func ipfsCmd(c *commander.Command, args []string) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func main() {
|
func main() {
|
||||||
|
u.Debug = true
|
||||||
err := CmdIpfs.Dispatch(os.Args[1:])
|
err := CmdIpfs.Dispatch(os.Args[1:])
|
||||||
if err != nil {
|
if err != nil {
|
||||||
if len(err.Error()) > 0 {
|
if len(err.Error()) > 0 {
|
||||||
|
@ -4,6 +4,7 @@ package main
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
"github.com/gonuts/flag"
|
"github.com/gonuts/flag"
|
||||||
"github.com/jbenet/commander"
|
"github.com/jbenet/commander"
|
||||||
rofs "github.com/jbenet/go-ipfs/fuse/readonly"
|
rofs "github.com/jbenet/go-ipfs/fuse/readonly"
|
||||||
|
43
core/core.go
43
core/core.go
@ -4,11 +4,17 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
ds "github.com/jbenet/datastore.go"
|
ds "github.com/jbenet/datastore.go"
|
||||||
|
"github.com/jbenet/go-ipfs/bitswap"
|
||||||
bserv "github.com/jbenet/go-ipfs/blockservice"
|
bserv "github.com/jbenet/go-ipfs/blockservice"
|
||||||
config "github.com/jbenet/go-ipfs/config"
|
config "github.com/jbenet/go-ipfs/config"
|
||||||
merkledag "github.com/jbenet/go-ipfs/merkledag"
|
merkledag "github.com/jbenet/go-ipfs/merkledag"
|
||||||
path "github.com/jbenet/go-ipfs/path"
|
path "github.com/jbenet/go-ipfs/path"
|
||||||
peer "github.com/jbenet/go-ipfs/peer"
|
peer "github.com/jbenet/go-ipfs/peer"
|
||||||
|
routing "github.com/jbenet/go-ipfs/routing"
|
||||||
|
dht "github.com/jbenet/go-ipfs/routing/dht"
|
||||||
|
swarm "github.com/jbenet/go-ipfs/swarm"
|
||||||
|
u "github.com/jbenet/go-ipfs/util"
|
||||||
|
ma "github.com/jbenet/go-multiaddr"
|
||||||
)
|
)
|
||||||
|
|
||||||
// IpfsNode is IPFS Core module. It represents an IPFS instance.
|
// IpfsNode is IPFS Core module. It represents an IPFS instance.
|
||||||
@ -27,13 +33,13 @@ type IpfsNode struct {
|
|||||||
Datastore ds.Datastore
|
Datastore ds.Datastore
|
||||||
|
|
||||||
// the network message stream
|
// the network message stream
|
||||||
// Network *netmux.Netux
|
Swarm *swarm.Swarm
|
||||||
|
|
||||||
// the routing system. recommend ipfs-dht
|
// the routing system. recommend ipfs-dht
|
||||||
// Routing *routing.Routing
|
Routing routing.IpfsRouting
|
||||||
|
|
||||||
// the block exchange + strategy (bitswap)
|
// the block exchange + strategy (bitswap)
|
||||||
// BitSwap *bitswap.BitSwap
|
BitSwap *bitswap.BitSwap
|
||||||
|
|
||||||
// the block service, get/add blocks.
|
// the block service, get/add blocks.
|
||||||
Blocks *bserv.BlockService
|
Blocks *bserv.BlockService
|
||||||
@ -59,7 +65,36 @@ func NewIpfsNode(cfg *config.Config) (*IpfsNode, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
bs, err := bserv.NewBlockService(d, nil)
|
maddr, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/4001")
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
local := &peer.Peer{
|
||||||
|
ID: peer.ID(cfg.Identity.PeerID),
|
||||||
|
Addresses: []*ma.Multiaddr{maddr},
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(local.ID) == 0 {
|
||||||
|
mh, err := u.Hash([]byte("blah blah blah ID"))
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
local.ID = peer.ID(mh)
|
||||||
|
}
|
||||||
|
|
||||||
|
net := swarm.NewSwarm(local)
|
||||||
|
err = net.Listen()
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
route := dht.NewDHT(local, net, d)
|
||||||
|
route.Start()
|
||||||
|
|
||||||
|
swap := bitswap.NewBitSwap(local, net, d, route)
|
||||||
|
|
||||||
|
bs, err := bserv.NewBlockService(d, swap)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -5,17 +5,19 @@
|
|||||||
package readonly
|
package readonly
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bazil.org/fuse"
|
|
||||||
"bazil.org/fuse/fs"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
core "github.com/jbenet/go-ipfs/core"
|
|
||||||
mdag "github.com/jbenet/go-ipfs/merkledag"
|
|
||||||
"os"
|
"os"
|
||||||
"os/exec"
|
"os/exec"
|
||||||
"os/signal"
|
"os/signal"
|
||||||
"runtime"
|
"runtime"
|
||||||
"syscall"
|
"syscall"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"bazil.org/fuse"
|
||||||
|
"bazil.org/fuse/fs"
|
||||||
|
core "github.com/jbenet/go-ipfs/core"
|
||||||
|
mdag "github.com/jbenet/go-ipfs/merkledag"
|
||||||
|
u "github.com/jbenet/go-ipfs/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
// FileSystem is the readonly Ipfs Fuse Filesystem.
|
// FileSystem is the readonly Ipfs Fuse Filesystem.
|
||||||
@ -45,6 +47,7 @@ func (*Root) Attr() fuse.Attr {
|
|||||||
|
|
||||||
// Lookup performs a lookup under this node.
|
// Lookup performs a lookup under this node.
|
||||||
func (s *Root) Lookup(name string, intr fs.Intr) (fs.Node, fuse.Error) {
|
func (s *Root) Lookup(name string, intr fs.Intr) (fs.Node, fuse.Error) {
|
||||||
|
u.DOut("Root Lookup: '%s'\n", name)
|
||||||
switch name {
|
switch name {
|
||||||
case "mach_kernel", ".hidden", "._.":
|
case "mach_kernel", ".hidden", "._.":
|
||||||
// Just quiet some log noise on OS X.
|
// Just quiet some log noise on OS X.
|
||||||
@ -62,6 +65,7 @@ func (s *Root) Lookup(name string, intr fs.Intr) (fs.Node, fuse.Error) {
|
|||||||
|
|
||||||
// ReadDir reads a particular directory. Disallowed for root.
|
// ReadDir reads a particular directory. Disallowed for root.
|
||||||
func (*Root) ReadDir(intr fs.Intr) ([]fuse.Dirent, fuse.Error) {
|
func (*Root) ReadDir(intr fs.Intr) ([]fuse.Dirent, fuse.Error) {
|
||||||
|
u.DOut("Read Root.\n")
|
||||||
return nil, fuse.EPERM
|
return nil, fuse.EPERM
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -73,6 +77,7 @@ type Node struct {
|
|||||||
|
|
||||||
// Attr returns the attributes of a given node.
|
// Attr returns the attributes of a given node.
|
||||||
func (s *Node) Attr() fuse.Attr {
|
func (s *Node) Attr() fuse.Attr {
|
||||||
|
u.DOut("Node attr.\n")
|
||||||
if len(s.Nd.Links) > 0 {
|
if len(s.Nd.Links) > 0 {
|
||||||
return fuse.Attr{Mode: os.ModeDir | 0555}
|
return fuse.Attr{Mode: os.ModeDir | 0555}
|
||||||
}
|
}
|
||||||
@ -83,6 +88,7 @@ func (s *Node) Attr() fuse.Attr {
|
|||||||
|
|
||||||
// Lookup performs a lookup under this node.
|
// Lookup performs a lookup under this node.
|
||||||
func (s *Node) Lookup(name string, intr fs.Intr) (fs.Node, fuse.Error) {
|
func (s *Node) Lookup(name string, intr fs.Intr) (fs.Node, fuse.Error) {
|
||||||
|
u.DOut("Lookup '%s'\n", name)
|
||||||
nd, err := s.Ipfs.Resolver.ResolveLinks(s.Nd, []string{name})
|
nd, err := s.Ipfs.Resolver.ResolveLinks(s.Nd, []string{name})
|
||||||
if err != nil {
|
if err != nil {
|
||||||
// todo: make this error more versatile.
|
// todo: make this error more versatile.
|
||||||
@ -94,6 +100,7 @@ func (s *Node) Lookup(name string, intr fs.Intr) (fs.Node, fuse.Error) {
|
|||||||
|
|
||||||
// ReadDir reads the link structure as directory entries
|
// ReadDir reads the link structure as directory entries
|
||||||
func (s *Node) ReadDir(intr fs.Intr) ([]fuse.Dirent, fuse.Error) {
|
func (s *Node) ReadDir(intr fs.Intr) ([]fuse.Dirent, fuse.Error) {
|
||||||
|
u.DOut("Node ReadDir\n")
|
||||||
entries := make([]fuse.Dirent, len(s.Nd.Links))
|
entries := make([]fuse.Dirent, len(s.Nd.Links))
|
||||||
for i, link := range s.Nd.Links {
|
for i, link := range s.Nd.Links {
|
||||||
n := link.Name
|
n := link.Name
|
||||||
@ -111,6 +118,7 @@ func (s *Node) ReadDir(intr fs.Intr) ([]fuse.Dirent, fuse.Error) {
|
|||||||
|
|
||||||
// ReadAll reads the object data as file data
|
// ReadAll reads the object data as file data
|
||||||
func (s *Node) ReadAll(intr fs.Intr) ([]byte, fuse.Error) {
|
func (s *Node) ReadAll(intr fs.Intr) ([]byte, fuse.Error) {
|
||||||
|
u.DOut("Read node.\n")
|
||||||
return []byte(s.Nd.Data), nil
|
return []byte(s.Nd.Data), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -2,6 +2,7 @@ package merkledag
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
mh "github.com/jbenet/go-multihash"
|
mh "github.com/jbenet/go-multihash"
|
||||||
)
|
)
|
||||||
|
|
||||||
|
@ -2,11 +2,12 @@ package path
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"path"
|
||||||
|
"strings"
|
||||||
|
|
||||||
merkledag "github.com/jbenet/go-ipfs/merkledag"
|
merkledag "github.com/jbenet/go-ipfs/merkledag"
|
||||||
u "github.com/jbenet/go-ipfs/util"
|
u "github.com/jbenet/go-ipfs/util"
|
||||||
mh "github.com/jbenet/go-multihash"
|
mh "github.com/jbenet/go-multihash"
|
||||||
"path"
|
|
||||||
"strings"
|
|
||||||
)
|
)
|
||||||
|
|
||||||
// Resolver provides path resolution to IPFS
|
// Resolver provides path resolution to IPFS
|
||||||
@ -19,6 +20,7 @@ type Resolver struct {
|
|||||||
// path component as a hash (key) of the first node, then resolves
|
// path component as a hash (key) of the first node, then resolves
|
||||||
// all other components walking the links, with ResolveLinks.
|
// all other components walking the links, with ResolveLinks.
|
||||||
func (s *Resolver) ResolvePath(fpath string) (*merkledag.Node, error) {
|
func (s *Resolver) ResolvePath(fpath string) (*merkledag.Node, error) {
|
||||||
|
u.DOut("Resolve: '%s'\n", fpath)
|
||||||
fpath = path.Clean(fpath)
|
fpath = path.Clean(fpath)
|
||||||
|
|
||||||
parts := strings.Split(fpath, "/")
|
parts := strings.Split(fpath, "/")
|
||||||
@ -39,6 +41,7 @@ func (s *Resolver) ResolvePath(fpath string) (*merkledag.Node, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
u.DOut("Resolve dag get.\n")
|
||||||
nd, err := s.DAG.Get(u.Key(h))
|
nd, err := s.DAG.Get(u.Key(h))
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
|
Reference in New Issue
Block a user