mirror of
https://github.com/ipfs/kubo.git
synced 2025-10-15 18:54:08 +08:00
basic implementation of bitswap, needs testing/verification that it works
This commit is contained in:
@ -10,7 +10,6 @@ import (
|
|||||||
|
|
||||||
ds "github.com/jbenet/datastore.go"
|
ds "github.com/jbenet/datastore.go"
|
||||||
|
|
||||||
"errors"
|
|
||||||
"time"
|
"time"
|
||||||
)
|
)
|
||||||
|
|
||||||
@ -74,24 +73,52 @@ func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsR
|
|||||||
}
|
}
|
||||||
|
|
||||||
// GetBlock attempts to retrieve a particular block from peers, within timeout.
|
// GetBlock attempts to retrieve a particular block from peers, within timeout.
|
||||||
func (bs *BitSwap) GetBlock(k u.Key, timeout time.Time) (
|
func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) (
|
||||||
*blocks.Block, error) {
|
*blocks.Block, error) {
|
||||||
begin := time.Now()
|
begin := time.Now()
|
||||||
_, err := bs.routing.FindProviders(k, timeout)
|
provs, err := bs.routing.FindProviders(k, timeout)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
u.PErr("GetBlock error: %s\n", err)
|
u.PErr("GetBlock error: %s\n", err)
|
||||||
return
|
return nil, err
|
||||||
|
}
|
||||||
|
tleft := timeout - time.Now().Sub(begin)
|
||||||
|
|
||||||
|
valchan := make(chan []byte)
|
||||||
|
after := time.After(tleft)
|
||||||
|
for _, p := range provs {
|
||||||
|
go func(pr *peer.Peer) {
|
||||||
|
ledger := bs.GetLedger(pr.Key())
|
||||||
|
blk, err := bs.getBlock(k, pr, tleft)
|
||||||
|
if err != nil {
|
||||||
|
u.PErr("%v\n", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
// NOTE: this credits everyone who sends us a block,
|
||||||
|
// even if we dont use it
|
||||||
|
ledger.ReceivedBytes(uint64(len(blk)))
|
||||||
|
select {
|
||||||
|
case valchan <- blk:
|
||||||
|
default:
|
||||||
|
}
|
||||||
|
}(p)
|
||||||
|
}
|
||||||
|
|
||||||
|
select {
|
||||||
|
case blkdata := <-valchan:
|
||||||
|
return blocks.NewBlock(blkdata)
|
||||||
|
case <-after:
|
||||||
|
return nil, u.ErrTimeout
|
||||||
}
|
}
|
||||||
tleft := timeout.Sub(time.Now().Sub(begin))
|
|
||||||
return nil, errors.New("not implemented")
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byte, error) {
|
func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byte, error) {
|
||||||
|
//
|
||||||
mes := new(PBMessage)
|
mes := new(PBMessage)
|
||||||
mes.Id = proto.Uint64(swarm.GenerateID())
|
mes.Id = proto.Uint64(swarm.GenerateMessageID())
|
||||||
mes.Key = proto.String(k)
|
mes.Key = proto.String(string(k))
|
||||||
typ := PBMessage_GET_BLOCK
|
typ := PBMessage_GET_BLOCK
|
||||||
mes.Type = &typ
|
mes.Type = &typ
|
||||||
|
//
|
||||||
|
|
||||||
after := time.After(timeout)
|
after := time.After(timeout)
|
||||||
resp := bs.listener.Listen(mes.GetId(), 1, timeout)
|
resp := bs.listener.Listen(mes.GetId(), 1, timeout)
|
||||||
@ -100,23 +127,89 @@ func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byt
|
|||||||
|
|
||||||
select {
|
select {
|
||||||
case resp_mes := <-resp:
|
case resp_mes := <-resp:
|
||||||
|
pmes := new(PBMessage)
|
||||||
|
err := proto.Unmarshal(resp_mes.Data, pmes)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
if pmes.GetSuccess() {
|
||||||
|
return pmes.GetValue(), nil
|
||||||
|
}
|
||||||
|
return nil, u.ErrNotFound
|
||||||
case <-after:
|
case <-after:
|
||||||
u.PErr("getBlock for '%s' timed out.", k)
|
u.PErr("getBlock for '%s' timed out.\n", k)
|
||||||
return nil, u.ErrTimeout
|
return nil, u.ErrTimeout
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// HaveBlock announces the existance of a block to BitSwap, potentially sending
|
// HaveBlock announces the existance of a block to BitSwap, potentially sending
|
||||||
// it to peers (Partners) whose WantLists include it.
|
// it to peers (Partners) whose WantLists include it.
|
||||||
func (bs *BitSwap) HaveBlock(k u.Key) (*blocks.Block, error) {
|
func (bs *BitSwap) HaveBlock(k u.Key) error {
|
||||||
return nil, errors.New("not implemented")
|
return bs.routing.Provide(k)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *BitSwap) handleMessages() {
|
func (bs *BitSwap) handleMessages() {
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case mes := bs.meschan.Incoming:
|
case mes := <-bs.meschan.Incoming:
|
||||||
|
pmes := new(PBMessage)
|
||||||
|
err := proto.Unmarshal(mes.Data, pmes)
|
||||||
|
if err != nil {
|
||||||
|
u.PErr("%v\n", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
if pmes.GetResponse() {
|
||||||
|
bs.listener.Respond(pmes.GetId(), mes)
|
||||||
|
}
|
||||||
|
|
||||||
|
switch pmes.GetType() {
|
||||||
|
case PBMessage_GET_BLOCK:
|
||||||
|
go bs.handleGetBlock(mes.Peer, pmes)
|
||||||
|
default:
|
||||||
|
u.PErr("Invalid message type.\n")
|
||||||
|
}
|
||||||
case <-bs.haltChan:
|
case <-bs.haltChan:
|
||||||
|
return
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (bs *BitSwap) handleGetBlock(p *peer.Peer, pmes *PBMessage) {
|
||||||
|
ledger := bs.GetLedger(p.Key())
|
||||||
|
|
||||||
|
idata, err := bs.datastore.Get(ds.NewKey(pmes.GetKey()))
|
||||||
|
if err != nil {
|
||||||
|
if err == ds.ErrNotFound {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
u.PErr("%v\n", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
data, ok := idata.([]byte)
|
||||||
|
if !ok {
|
||||||
|
u.PErr("Failed casting data from datastore.")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if ledger.ShouldSend() {
|
||||||
|
resp := &Message{
|
||||||
|
Value: data,
|
||||||
|
Response: true,
|
||||||
|
ID: pmes.GetId(),
|
||||||
|
}
|
||||||
|
bs.meschan.Outgoing <- swarm.NewMessage(p, resp.ToProtobuf())
|
||||||
|
ledger.SentBytes(uint64(len(data)))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bs *BitSwap) GetLedger(k u.Key) *Ledger {
|
||||||
|
l, ok := bs.partners[k]
|
||||||
|
if ok {
|
||||||
|
return l
|
||||||
|
}
|
||||||
|
|
||||||
|
l = new(Ledger)
|
||||||
|
l.Partner = peer.ID(k)
|
||||||
|
bs.partners[k] = l
|
||||||
|
return l
|
||||||
|
}
|
||||||
|
@ -1,6 +1,8 @@
|
|||||||
package bitswap
|
package bitswap
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"math/rand"
|
||||||
|
|
||||||
peer "github.com/jbenet/go-ipfs/peer"
|
peer "github.com/jbenet/go-ipfs/peer"
|
||||||
u "github.com/jbenet/go-ipfs/util"
|
u "github.com/jbenet/go-ipfs/util"
|
||||||
|
|
||||||
@ -13,11 +15,8 @@ type Ledger struct {
|
|||||||
// Partner is the ID of the remote Peer.
|
// Partner is the ID of the remote Peer.
|
||||||
Partner peer.ID
|
Partner peer.ID
|
||||||
|
|
||||||
// BytesSent counts the number of bytes the local peer sent to Partner
|
// Accounting tracks bytes sent and recieved.
|
||||||
BytesSent uint64
|
Accounting debtRatio
|
||||||
|
|
||||||
// BytesReceived counts the number of bytes local peer received from Partner
|
|
||||||
BytesReceived uint64
|
|
||||||
|
|
||||||
// FirstExchnage is the time of the first data exchange.
|
// FirstExchnage is the time of the first data exchange.
|
||||||
FirstExchange *time.Time
|
FirstExchange *time.Time
|
||||||
@ -31,3 +30,15 @@ type Ledger struct {
|
|||||||
|
|
||||||
// LedgerMap lists Ledgers by their Partner key.
|
// LedgerMap lists Ledgers by their Partner key.
|
||||||
type LedgerMap map[u.Key]*Ledger
|
type LedgerMap map[u.Key]*Ledger
|
||||||
|
|
||||||
|
func (l *Ledger) ShouldSend() bool {
|
||||||
|
return rand.Float64() <= probabilitySend(l.Accounting.Value())
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Ledger) SentBytes(n uint64) {
|
||||||
|
l.Accounting.BytesSent += n
|
||||||
|
}
|
||||||
|
|
||||||
|
func (l *Ledger) ReceivedBytes(n uint64) {
|
||||||
|
l.Accounting.BytesRecv += n
|
||||||
|
}
|
||||||
|
30
bitswap/message.go
Normal file
30
bitswap/message.go
Normal file
@ -0,0 +1,30 @@
|
|||||||
|
package bitswap
|
||||||
|
|
||||||
|
import (
|
||||||
|
"code.google.com/p/goprotobuf/proto"
|
||||||
|
u "github.com/jbenet/go-ipfs/util"
|
||||||
|
)
|
||||||
|
|
||||||
|
type Message struct {
|
||||||
|
ID uint64
|
||||||
|
Response bool
|
||||||
|
Key u.Key
|
||||||
|
Value []byte
|
||||||
|
Success bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Message) ToProtobuf() *PBMessage {
|
||||||
|
pmes := new(PBMessage)
|
||||||
|
pmes.Id = &m.ID
|
||||||
|
if m.Response {
|
||||||
|
pmes.Response = proto.Bool(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.Success {
|
||||||
|
pmes.Success = proto.Bool(true)
|
||||||
|
}
|
||||||
|
|
||||||
|
pmes.Key = proto.String(string(m.Key))
|
||||||
|
pmes.Value = m.Value
|
||||||
|
return pmes
|
||||||
|
}
|
@ -56,6 +56,7 @@ type PBMessage struct {
|
|||||||
Key *string `protobuf:"bytes,3,req,name=key" json:"key,omitempty"`
|
Key *string `protobuf:"bytes,3,req,name=key" json:"key,omitempty"`
|
||||||
Value []byte `protobuf:"bytes,4,opt,name=value" json:"value,omitempty"`
|
Value []byte `protobuf:"bytes,4,opt,name=value" json:"value,omitempty"`
|
||||||
Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"`
|
Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"`
|
||||||
|
Success *bool `protobuf:"varint,6,opt,name=success" json:"success,omitempty"`
|
||||||
XXX_unrecognized []byte `json:"-"`
|
XXX_unrecognized []byte `json:"-"`
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -98,6 +99,13 @@ func (m *PBMessage) GetResponse() bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *PBMessage) GetSuccess() bool {
|
||||||
|
if m != nil && m.Success != nil {
|
||||||
|
return *m.Success
|
||||||
|
}
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
proto.RegisterEnum("bitswap.PBMessage_MessageType", PBMessage_MessageType_name, PBMessage_MessageType_value)
|
proto.RegisterEnum("bitswap.PBMessage_MessageType", PBMessage_MessageType_name, PBMessage_MessageType_value)
|
||||||
}
|
}
|
||||||
|
@ -10,4 +10,5 @@ message PBMessage {
|
|||||||
required string key = 3;
|
required string key = 3;
|
||||||
optional bytes value = 4;
|
optional bytes value = 4;
|
||||||
optional bool response = 5;
|
optional bool response = 5;
|
||||||
|
optional bool success = 6;
|
||||||
}
|
}
|
||||||
|
@ -1,8 +1,6 @@
|
|||||||
package blocks
|
package blocks
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
|
||||||
ds "github.com/jbenet/datastore.go"
|
|
||||||
u "github.com/jbenet/go-ipfs/util"
|
u "github.com/jbenet/go-ipfs/util"
|
||||||
mh "github.com/jbenet/go-multihash"
|
mh "github.com/jbenet/go-multihash"
|
||||||
)
|
)
|
||||||
@ -27,45 +25,3 @@ func NewBlock(data []byte) (*Block, error) {
|
|||||||
func (b *Block) Key() u.Key {
|
func (b *Block) Key() u.Key {
|
||||||
return u.Key(b.Multihash)
|
return u.Key(b.Multihash)
|
||||||
}
|
}
|
||||||
|
|
||||||
// BlockService is a block datastore.
|
|
||||||
// It uses an internal `datastore.Datastore` instance to store values.
|
|
||||||
type BlockService struct {
|
|
||||||
Datastore ds.Datastore
|
|
||||||
// Remote *bitswap.BitSwap // eventually.
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewBlockService creates a BlockService with given datastore instance.
|
|
||||||
func NewBlockService(d ds.Datastore) (*BlockService, error) {
|
|
||||||
if d == nil {
|
|
||||||
return nil, fmt.Errorf("BlockService requires valid datastore")
|
|
||||||
}
|
|
||||||
return &BlockService{Datastore: d}, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// AddBlock adds a particular block to the service, Putting it into the datastore.
|
|
||||||
func (s *BlockService) AddBlock(b *Block) (u.Key, error) {
|
|
||||||
k := b.Key()
|
|
||||||
dsk := ds.NewKey(string(k))
|
|
||||||
return k, s.Datastore.Put(dsk, b.Data)
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetBlock retrieves a particular block from the service,
|
|
||||||
// Getting it from the datastore using the key (hash).
|
|
||||||
func (s *BlockService) GetBlock(k u.Key) (*Block, error) {
|
|
||||||
dsk := ds.NewKey(string(k))
|
|
||||||
datai, err := s.Datastore.Get(dsk)
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
data, ok := datai.([]byte)
|
|
||||||
if !ok {
|
|
||||||
return nil, fmt.Errorf("data associated with %s is not a []byte", k)
|
|
||||||
}
|
|
||||||
|
|
||||||
return &Block{
|
|
||||||
Multihash: mh.Multihash(k),
|
|
||||||
Data: data,
|
|
||||||
}, nil
|
|
||||||
}
|
|
||||||
|
54
blockservice/blockservice.go
Normal file
54
blockservice/blockservice.go
Normal file
@ -0,0 +1,54 @@
|
|||||||
|
package blockservice
|
||||||
|
|
||||||
|
import (
|
||||||
|
"fmt"
|
||||||
|
|
||||||
|
ds "github.com/jbenet/datastore.go"
|
||||||
|
bitswap "github.com/jbenet/go-ipfs/bitswap"
|
||||||
|
blocks "github.com/jbenet/go-ipfs/blocks"
|
||||||
|
u "github.com/jbenet/go-ipfs/util"
|
||||||
|
|
||||||
|
mh "github.com/jbenet/go-multihash"
|
||||||
|
)
|
||||||
|
|
||||||
|
// BlockService is a block datastore.
|
||||||
|
// It uses an internal `datastore.Datastore` instance to store values.
|
||||||
|
type BlockService struct {
|
||||||
|
Datastore ds.Datastore
|
||||||
|
Remote *bitswap.BitSwap
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewBlockService creates a BlockService with given datastore instance.
|
||||||
|
func NewBlockService(d ds.Datastore) (*BlockService, error) {
|
||||||
|
if d == nil {
|
||||||
|
return nil, fmt.Errorf("BlockService requires valid datastore")
|
||||||
|
}
|
||||||
|
return &BlockService{Datastore: d}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// AddBlock adds a particular block to the service, Putting it into the datastore.
|
||||||
|
func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
|
||||||
|
k := b.Key()
|
||||||
|
dsk := ds.NewKey(string(k))
|
||||||
|
return k, s.Datastore.Put(dsk, b.Data)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetBlock retrieves a particular block from the service,
|
||||||
|
// Getting it from the datastore using the key (hash).
|
||||||
|
func (s *BlockService) GetBlock(k u.Key) (*blocks.Block, error) {
|
||||||
|
dsk := ds.NewKey(string(k))
|
||||||
|
datai, err := s.Datastore.Get(dsk)
|
||||||
|
if err != nil {
|
||||||
|
return nil, err
|
||||||
|
}
|
||||||
|
|
||||||
|
data, ok := datai.([]byte)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("data associated with %s is not a []byte", k)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &blocks.Block{
|
||||||
|
Multihash: mh.Multihash(k),
|
||||||
|
Data: data,
|
||||||
|
}, nil
|
||||||
|
}
|
@ -2,8 +2,9 @@ package core
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
ds "github.com/jbenet/datastore.go"
|
ds "github.com/jbenet/datastore.go"
|
||||||
blocks "github.com/jbenet/go-ipfs/blocks"
|
bserv "github.com/jbenet/go-ipfs/blockservice"
|
||||||
config "github.com/jbenet/go-ipfs/config"
|
config "github.com/jbenet/go-ipfs/config"
|
||||||
merkledag "github.com/jbenet/go-ipfs/merkledag"
|
merkledag "github.com/jbenet/go-ipfs/merkledag"
|
||||||
path "github.com/jbenet/go-ipfs/path"
|
path "github.com/jbenet/go-ipfs/path"
|
||||||
@ -35,7 +36,7 @@ type IpfsNode struct {
|
|||||||
// BitSwap *bitswap.BitSwap
|
// BitSwap *bitswap.BitSwap
|
||||||
|
|
||||||
// the block service, get/add blocks.
|
// the block service, get/add blocks.
|
||||||
Blocks *blocks.BlockService
|
Blocks *bserv.BlockService
|
||||||
|
|
||||||
// the merkle dag service, get/add objects.
|
// the merkle dag service, get/add objects.
|
||||||
DAG *merkledag.DAGService
|
DAG *merkledag.DAGService
|
||||||
@ -58,7 +59,7 @@ func NewIpfsNode(cfg *config.Config) (*IpfsNode, error) {
|
|||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
|
||||||
bs, err := blocks.NewBlockService(d)
|
bs, err := bserv.NewBlockService(d)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
|
@ -4,6 +4,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
|
|
||||||
blocks "github.com/jbenet/go-ipfs/blocks"
|
blocks "github.com/jbenet/go-ipfs/blocks"
|
||||||
|
bserv "github.com/jbenet/go-ipfs/blockservice"
|
||||||
u "github.com/jbenet/go-ipfs/util"
|
u "github.com/jbenet/go-ipfs/util"
|
||||||
mh "github.com/jbenet/go-multihash"
|
mh "github.com/jbenet/go-multihash"
|
||||||
)
|
)
|
||||||
@ -93,7 +94,7 @@ func (n *Node) Key() (u.Key, error) {
|
|||||||
// - the root is virtual (like a forest)
|
// - the root is virtual (like a forest)
|
||||||
// - stores nodes' data in a BlockService
|
// - stores nodes' data in a BlockService
|
||||||
type DAGService struct {
|
type DAGService struct {
|
||||||
Blocks *blocks.BlockService
|
Blocks *bserv.BlockService
|
||||||
}
|
}
|
||||||
|
|
||||||
// Put adds a node to the DAGService, storing the block in the BlockService
|
// Put adds a node to the DAGService, storing the block in the BlockService
|
||||||
|
@ -603,6 +603,7 @@ func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, time
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: Could be done async
|
||||||
func (dht *IpfsDHT) addPeerList(key u.Key, peers []*PBDHTMessage_PBPeer) []*peer.Peer {
|
func (dht *IpfsDHT) addPeerList(key u.Key, peers []*PBDHTMessage_PBPeer) []*peer.Peer {
|
||||||
var provArr []*peer.Peer
|
var provArr []*peer.Peer
|
||||||
for _, prov := range peers {
|
for _, prov := range peers {
|
||||||
|
Reference in New Issue
Block a user