From 91e4675cbfd89e5df1b50168c7aac26cc57c656f Mon Sep 17 00:00:00 2001 From: Jeromy Date: Sun, 24 Aug 2014 18:13:05 -0700 Subject: [PATCH] basic implementation of bitswap, needs testing/verification that it works --- bitswap/bitswap.go | 117 +++++++++++++++++++++++++++++++---- bitswap/ledger.go | 21 +++++-- bitswap/message.go | 30 +++++++++ bitswap/message.pb.go | 8 +++ bitswap/message.proto | 1 + blocks/blocks.go | 44 ------------- blockservice/blockservice.go | 54 ++++++++++++++++ core/core.go | 7 ++- merkledag/merkledag.go | 3 +- routing/dht/dht.go | 1 + 10 files changed, 221 insertions(+), 65 deletions(-) create mode 100644 bitswap/message.go create mode 100644 blockservice/blockservice.go diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index b3ac44c5a..f3a9ffb1b 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -10,7 +10,6 @@ import ( ds "github.com/jbenet/datastore.go" - "errors" "time" ) @@ -74,24 +73,52 @@ func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsR } // GetBlock attempts to retrieve a particular block from peers, within timeout. -func (bs *BitSwap) GetBlock(k u.Key, timeout time.Time) ( +func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) ( *blocks.Block, error) { begin := time.Now() - _, err := bs.routing.FindProviders(k, timeout) + provs, err := bs.routing.FindProviders(k, timeout) if err != nil { u.PErr("GetBlock error: %s\n", err) - return + return nil, err + } + tleft := timeout - time.Now().Sub(begin) + + valchan := make(chan []byte) + after := time.After(tleft) + for _, p := range provs { + go func(pr *peer.Peer) { + ledger := bs.GetLedger(pr.Key()) + blk, err := bs.getBlock(k, pr, tleft) + if err != nil { + u.PErr("%v\n", err) + return + } + // NOTE: this credits everyone who sends us a block, + // even if we dont use it + ledger.ReceivedBytes(uint64(len(blk))) + select { + case valchan <- blk: + default: + } + }(p) + } + + select { + case blkdata := <-valchan: + return blocks.NewBlock(blkdata) + case <-after: + return nil, u.ErrTimeout } - tleft := timeout.Sub(time.Now().Sub(begin)) - return nil, errors.New("not implemented") } func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byte, error) { + // mes := new(PBMessage) - mes.Id = proto.Uint64(swarm.GenerateID()) - mes.Key = proto.String(k) + mes.Id = proto.Uint64(swarm.GenerateMessageID()) + mes.Key = proto.String(string(k)) typ := PBMessage_GET_BLOCK mes.Type = &typ + // after := time.After(timeout) resp := bs.listener.Listen(mes.GetId(), 1, timeout) @@ -100,23 +127,89 @@ func (bs *BitSwap) getBlock(k u.Key, p *peer.Peer, timeout time.Duration) ([]byt select { case resp_mes := <-resp: + pmes := new(PBMessage) + err := proto.Unmarshal(resp_mes.Data, pmes) + if err != nil { + return nil, err + } + if pmes.GetSuccess() { + return pmes.GetValue(), nil + } + return nil, u.ErrNotFound case <-after: - u.PErr("getBlock for '%s' timed out.", k) + u.PErr("getBlock for '%s' timed out.\n", k) return nil, u.ErrTimeout } } // HaveBlock announces the existance of a block to BitSwap, potentially sending // it to peers (Partners) whose WantLists include it. -func (bs *BitSwap) HaveBlock(k u.Key) (*blocks.Block, error) { - return nil, errors.New("not implemented") +func (bs *BitSwap) HaveBlock(k u.Key) error { + return bs.routing.Provide(k) } func (bs *BitSwap) handleMessages() { for { select { - case mes := bs.meschan.Incoming: + case mes := <-bs.meschan.Incoming: + pmes := new(PBMessage) + err := proto.Unmarshal(mes.Data, pmes) + if err != nil { + u.PErr("%v\n", err) + continue + } + if pmes.GetResponse() { + bs.listener.Respond(pmes.GetId(), mes) + } + + switch pmes.GetType() { + case PBMessage_GET_BLOCK: + go bs.handleGetBlock(mes.Peer, pmes) + default: + u.PErr("Invalid message type.\n") + } case <-bs.haltChan: + return } } } + +func (bs *BitSwap) handleGetBlock(p *peer.Peer, pmes *PBMessage) { + ledger := bs.GetLedger(p.Key()) + + idata, err := bs.datastore.Get(ds.NewKey(pmes.GetKey())) + if err != nil { + if err == ds.ErrNotFound { + return + } + u.PErr("%v\n", err) + return + } + data, ok := idata.([]byte) + if !ok { + u.PErr("Failed casting data from datastore.") + return + } + + if ledger.ShouldSend() { + resp := &Message{ + Value: data, + Response: true, + ID: pmes.GetId(), + } + bs.meschan.Outgoing <- swarm.NewMessage(p, resp.ToProtobuf()) + ledger.SentBytes(uint64(len(data))) + } +} + +func (bs *BitSwap) GetLedger(k u.Key) *Ledger { + l, ok := bs.partners[k] + if ok { + return l + } + + l = new(Ledger) + l.Partner = peer.ID(k) + bs.partners[k] = l + return l +} diff --git a/bitswap/ledger.go b/bitswap/ledger.go index ee6bd0f3e..adf5b51bb 100644 --- a/bitswap/ledger.go +++ b/bitswap/ledger.go @@ -1,6 +1,8 @@ package bitswap import ( + "math/rand" + peer "github.com/jbenet/go-ipfs/peer" u "github.com/jbenet/go-ipfs/util" @@ -13,11 +15,8 @@ type Ledger struct { // Partner is the ID of the remote Peer. Partner peer.ID - // BytesSent counts the number of bytes the local peer sent to Partner - BytesSent uint64 - - // BytesReceived counts the number of bytes local peer received from Partner - BytesReceived uint64 + // Accounting tracks bytes sent and recieved. + Accounting debtRatio // FirstExchnage is the time of the first data exchange. FirstExchange *time.Time @@ -31,3 +30,15 @@ type Ledger struct { // LedgerMap lists Ledgers by their Partner key. type LedgerMap map[u.Key]*Ledger + +func (l *Ledger) ShouldSend() bool { + return rand.Float64() <= probabilitySend(l.Accounting.Value()) +} + +func (l *Ledger) SentBytes(n uint64) { + l.Accounting.BytesSent += n +} + +func (l *Ledger) ReceivedBytes(n uint64) { + l.Accounting.BytesRecv += n +} diff --git a/bitswap/message.go b/bitswap/message.go new file mode 100644 index 000000000..968adb583 --- /dev/null +++ b/bitswap/message.go @@ -0,0 +1,30 @@ +package bitswap + +import ( + "code.google.com/p/goprotobuf/proto" + u "github.com/jbenet/go-ipfs/util" +) + +type Message struct { + ID uint64 + Response bool + Key u.Key + Value []byte + Success bool +} + +func (m *Message) ToProtobuf() *PBMessage { + pmes := new(PBMessage) + pmes.Id = &m.ID + if m.Response { + pmes.Response = proto.Bool(true) + } + + if m.Success { + pmes.Success = proto.Bool(true) + } + + pmes.Key = proto.String(string(m.Key)) + pmes.Value = m.Value + return pmes +} diff --git a/bitswap/message.pb.go b/bitswap/message.pb.go index 3d6170862..ea478517b 100644 --- a/bitswap/message.pb.go +++ b/bitswap/message.pb.go @@ -56,6 +56,7 @@ type PBMessage struct { Key *string `protobuf:"bytes,3,req,name=key" json:"key,omitempty"` Value []byte `protobuf:"bytes,4,opt,name=value" json:"value,omitempty"` Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"` + Success *bool `protobuf:"varint,6,opt,name=success" json:"success,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -98,6 +99,13 @@ func (m *PBMessage) GetResponse() bool { return false } +func (m *PBMessage) GetSuccess() bool { + if m != nil && m.Success != nil { + return *m.Success + } + return false +} + func init() { proto.RegisterEnum("bitswap.PBMessage_MessageType", PBMessage_MessageType_name, PBMessage_MessageType_value) } diff --git a/bitswap/message.proto b/bitswap/message.proto index 4ed2809a7..3906bf709 100644 --- a/bitswap/message.proto +++ b/bitswap/message.proto @@ -10,4 +10,5 @@ message PBMessage { required string key = 3; optional bytes value = 4; optional bool response = 5; + optional bool success = 6; } diff --git a/blocks/blocks.go b/blocks/blocks.go index 4d5e9e952..45aee6ab2 100644 --- a/blocks/blocks.go +++ b/blocks/blocks.go @@ -1,8 +1,6 @@ package blocks import ( - "fmt" - ds "github.com/jbenet/datastore.go" u "github.com/jbenet/go-ipfs/util" mh "github.com/jbenet/go-multihash" ) @@ -27,45 +25,3 @@ func NewBlock(data []byte) (*Block, error) { func (b *Block) Key() u.Key { return u.Key(b.Multihash) } - -// BlockService is a block datastore. -// It uses an internal `datastore.Datastore` instance to store values. -type BlockService struct { - Datastore ds.Datastore - // Remote *bitswap.BitSwap // eventually. -} - -// NewBlockService creates a BlockService with given datastore instance. -func NewBlockService(d ds.Datastore) (*BlockService, error) { - if d == nil { - return nil, fmt.Errorf("BlockService requires valid datastore") - } - return &BlockService{Datastore: d}, nil -} - -// AddBlock adds a particular block to the service, Putting it into the datastore. -func (s *BlockService) AddBlock(b *Block) (u.Key, error) { - k := b.Key() - dsk := ds.NewKey(string(k)) - return k, s.Datastore.Put(dsk, b.Data) -} - -// GetBlock retrieves a particular block from the service, -// Getting it from the datastore using the key (hash). -func (s *BlockService) GetBlock(k u.Key) (*Block, error) { - dsk := ds.NewKey(string(k)) - datai, err := s.Datastore.Get(dsk) - if err != nil { - return nil, err - } - - data, ok := datai.([]byte) - if !ok { - return nil, fmt.Errorf("data associated with %s is not a []byte", k) - } - - return &Block{ - Multihash: mh.Multihash(k), - Data: data, - }, nil -} diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go new file mode 100644 index 000000000..0f5dbd888 --- /dev/null +++ b/blockservice/blockservice.go @@ -0,0 +1,54 @@ +package blockservice + +import ( + "fmt" + + ds "github.com/jbenet/datastore.go" + bitswap "github.com/jbenet/go-ipfs/bitswap" + blocks "github.com/jbenet/go-ipfs/blocks" + u "github.com/jbenet/go-ipfs/util" + + mh "github.com/jbenet/go-multihash" +) + +// BlockService is a block datastore. +// It uses an internal `datastore.Datastore` instance to store values. +type BlockService struct { + Datastore ds.Datastore + Remote *bitswap.BitSwap +} + +// NewBlockService creates a BlockService with given datastore instance. +func NewBlockService(d ds.Datastore) (*BlockService, error) { + if d == nil { + return nil, fmt.Errorf("BlockService requires valid datastore") + } + return &BlockService{Datastore: d}, nil +} + +// AddBlock adds a particular block to the service, Putting it into the datastore. +func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) { + k := b.Key() + dsk := ds.NewKey(string(k)) + return k, s.Datastore.Put(dsk, b.Data) +} + +// GetBlock retrieves a particular block from the service, +// Getting it from the datastore using the key (hash). +func (s *BlockService) GetBlock(k u.Key) (*blocks.Block, error) { + dsk := ds.NewKey(string(k)) + datai, err := s.Datastore.Get(dsk) + if err != nil { + return nil, err + } + + data, ok := datai.([]byte) + if !ok { + return nil, fmt.Errorf("data associated with %s is not a []byte", k) + } + + return &blocks.Block{ + Multihash: mh.Multihash(k), + Data: data, + }, nil +} diff --git a/core/core.go b/core/core.go index 66058cf61..f2d6d1c3e 100644 --- a/core/core.go +++ b/core/core.go @@ -2,8 +2,9 @@ package core import ( "fmt" + ds "github.com/jbenet/datastore.go" - blocks "github.com/jbenet/go-ipfs/blocks" + bserv "github.com/jbenet/go-ipfs/blockservice" config "github.com/jbenet/go-ipfs/config" merkledag "github.com/jbenet/go-ipfs/merkledag" path "github.com/jbenet/go-ipfs/path" @@ -35,7 +36,7 @@ type IpfsNode struct { // BitSwap *bitswap.BitSwap // the block service, get/add blocks. - Blocks *blocks.BlockService + Blocks *bserv.BlockService // the merkle dag service, get/add objects. DAG *merkledag.DAGService @@ -58,7 +59,7 @@ func NewIpfsNode(cfg *config.Config) (*IpfsNode, error) { return nil, err } - bs, err := blocks.NewBlockService(d) + bs, err := bserv.NewBlockService(d) if err != nil { return nil, err } diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index b906bbf89..4d1f5d4e4 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -4,6 +4,7 @@ import ( "fmt" blocks "github.com/jbenet/go-ipfs/blocks" + bserv "github.com/jbenet/go-ipfs/blockservice" u "github.com/jbenet/go-ipfs/util" mh "github.com/jbenet/go-multihash" ) @@ -93,7 +94,7 @@ func (n *Node) Key() (u.Key, error) { // - the root is virtual (like a forest) // - stores nodes' data in a BlockService type DAGService struct { - Blocks *blocks.BlockService + Blocks *bserv.BlockService } // Put adds a node to the DAGService, storing the block in the BlockService diff --git a/routing/dht/dht.go b/routing/dht/dht.go index aa3a8da8b..593310d64 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -603,6 +603,7 @@ func (dht *IpfsDHT) findProvidersSingle(p *peer.Peer, key u.Key, level int, time } } +// TODO: Could be done async func (dht *IpfsDHT) addPeerList(key u.Key, peers []*PBDHTMessage_PBPeer) []*peer.Peer { var provArr []*peer.Peer for _, prov := range peers {