diff --git a/bitswap/bitswap.go b/bitswap/bitswap.go index eabfed153..e643e3bf4 100644 --- a/bitswap/bitswap.go +++ b/bitswap/bitswap.go @@ -32,14 +32,13 @@ type BitSwap struct { net swarm.Network meschan *swarm.Chan - // datastore is the local database - // Ledgers of known + // datastore is the local database // Ledgers of known datastore ds.Datastore // routing interface for communication routing *dht.IpfsDHT - listener *swarm.MesListener + listener *swarm.MessageListener // partners is a map of currently active bitswap relationships. // The Ledger has the peer.ID, and the peer connection works through net. @@ -67,7 +66,7 @@ func NewBitSwap(p *peer.Peer, net swarm.Network, d ds.Datastore, r routing.IpfsR routing: r.(*dht.IpfsDHT), meschan: net.GetChannel(swarm.PBWrapper_BITSWAP), haltChan: make(chan struct{}), - listener: swarm.NewMesListener(), + listener: swarm.NewMessageListener(), } go bs.handleMessages() @@ -84,11 +83,11 @@ func (bs *BitSwap) GetBlock(k u.Key, timeout time.Duration) ( valchan := make(chan []byte) after := time.After(tleft) - // TODO: when the data is received, shut down this for loop + // TODO: when the data is received, shut down this for loop ASAP go func() { for p := range provs_ch { go func(pr *peer.Peer) { - ledger := bs.GetLedger(pr.Key()) + ledger := bs.GetLedger(pr) blk, err := bs.getBlock(k, pr, tleft) if err != nil { u.PErr("getBlock returned: %v\n", err) @@ -170,6 +169,8 @@ func (bs *BitSwap) handleMessages() { switch pmes.GetType() { case PBMessage_GET_BLOCK: go bs.handleGetBlock(mes.Peer, pmes) + case PBMessage_WANT_BLOCK: + go bs.handleWantBlock(mes.Peer, pmes) default: u.PErr("Invalid message type.\n") } @@ -179,9 +180,18 @@ func (bs *BitSwap) handleMessages() { } } +func (bs *BitSwap) handleWantBlock(p *peer.Peer, pmes *PBMessage) { + wants := pmes.GetWantlist() + ledg := bs.GetLedger(p) + for _, s := range wants { + // TODO: this needs to be different. We need timeouts. + ledg.WantList[u.Key(s)] = struct{}{} + } +} + func (bs *BitSwap) handleGetBlock(p *peer.Peer, pmes *PBMessage) { u.DOut("handleGetBlock.\n") - ledger := bs.GetLedger(p.Key()) + ledger := bs.GetLedger(p) u.DOut("finding [%s] in datastore.\n", u.Key(pmes.GetKey()).Pretty()) idata, err := bs.datastore.Get(ds.NewKey(pmes.GetKey())) @@ -216,19 +226,35 @@ func (bs *BitSwap) handleGetBlock(p *peer.Peer, pmes *PBMessage) { } } -func (bs *BitSwap) GetLedger(k u.Key) *Ledger { - l, ok := bs.partners[k] +func (bs *BitSwap) GetLedger(p *peer.Peer) *Ledger { + l, ok := bs.partners[p.Key()] if ok { return l } l = new(Ledger) l.Strategy = StandardStrategy - l.Partner = peer.ID(k) - bs.partners[k] = l + l.Partner = p + bs.partners[p.Key()] = l return l } +func (bs *BitSwap) SendWantList(wl KeySet) error { + mes := Message{ + ID: swarm.GenerateMessageID(), + Type: PBMessage_WANT_BLOCK, + WantList: bs.wantList, + } + + pbmes := mes.ToProtobuf() + // Lets just ping everybody all at once + for _, ledger := range bs.partners { + bs.meschan.Outgoing <- swarm.NewMessage(ledger.Partner, pbmes) + } + + return nil +} + func (bs *BitSwap) Halt() { bs.haltChan <- struct{}{} } diff --git a/bitswap/ledger.go b/bitswap/ledger.go index f5637aca5..f94001771 100644 --- a/bitswap/ledger.go +++ b/bitswap/ledger.go @@ -10,17 +10,17 @@ import ( // Ledger stores the data exchange relationship between two peers. type Ledger struct { - // Partner is the ID of the remote Peer. - Partner peer.ID + // Partner is the remote Peer. + Partner *peer.Peer // Accounting tracks bytes sent and recieved. Accounting debtRatio // FirstExchnage is the time of the first data exchange. - FirstExchange *time.Time + FirstExchange time.Time // LastExchange is the time of the last data exchange. - LastExchange *time.Time + LastExchange time.Time // WantList is a (bounded, small) set of keys that Partner desires. WantList KeySet @@ -36,9 +36,11 @@ func (l *Ledger) ShouldSend() bool { } func (l *Ledger) SentBytes(n uint64) { + l.LastExchange = time.Now() l.Accounting.BytesSent += n } func (l *Ledger) ReceivedBytes(n uint64) { + l.LastExchange = time.Now() l.Accounting.BytesRecv += n } diff --git a/bitswap/message.go b/bitswap/message.go index 5a124602e..0211ae71e 100644 --- a/bitswap/message.go +++ b/bitswap/message.go @@ -12,6 +12,7 @@ type Message struct { Key u.Key Value []byte Success bool + WantList KeySet } func (m *Message) ToProtobuf() *PBMessage { @@ -26,6 +27,14 @@ func (m *Message) ToProtobuf() *PBMessage { pmes.Success = proto.Bool(true) } + if m.WantList != nil { + var swant []string + for k, _ := range m.WantList { + swant = append(swant, string(k)) + } + pmes.Wantlist = swant + } + pmes.Key = proto.String(string(m.Key)) pmes.Value = m.Value return pmes diff --git a/bitswap/message.pb.go b/bitswap/message.pb.go index ea478517b..e187b9bea 100644 --- a/bitswap/message.pb.go +++ b/bitswap/message.pb.go @@ -23,14 +23,17 @@ var _ = math.Inf type PBMessage_MessageType int32 const ( - PBMessage_GET_BLOCK PBMessage_MessageType = 0 + PBMessage_GET_BLOCK PBMessage_MessageType = 0 + PBMessage_WANT_BLOCK PBMessage_MessageType = 1 ) var PBMessage_MessageType_name = map[int32]string{ 0: "GET_BLOCK", + 1: "WANT_BLOCK", } var PBMessage_MessageType_value = map[string]int32{ - "GET_BLOCK": 0, + "GET_BLOCK": 0, + "WANT_BLOCK": 1, } func (x PBMessage_MessageType) Enum() *PBMessage_MessageType { @@ -57,6 +60,7 @@ type PBMessage struct { Value []byte `protobuf:"bytes,4,opt,name=value" json:"value,omitempty"` Response *bool `protobuf:"varint,5,opt,name=response" json:"response,omitempty"` Success *bool `protobuf:"varint,6,opt,name=success" json:"success,omitempty"` + Wantlist []string `protobuf:"bytes,7,rep,name=wantlist" json:"wantlist,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -106,6 +110,13 @@ func (m *PBMessage) GetSuccess() bool { return false } +func (m *PBMessage) GetWantlist() []string { + if m != nil { + return m.Wantlist + } + return nil +} + func init() { proto.RegisterEnum("bitswap.PBMessage_MessageType", PBMessage_MessageType_name, PBMessage_MessageType_value) } diff --git a/bitswap/message.proto b/bitswap/message.proto index 3906bf709..a99ed7ebc 100644 --- a/bitswap/message.proto +++ b/bitswap/message.proto @@ -3,6 +3,7 @@ package bitswap; message PBMessage { enum MessageType { GET_BLOCK = 0; + WANT_BLOCK = 1; } required MessageType Type = 1; @@ -11,4 +12,5 @@ message PBMessage { optional bytes value = 4; optional bool response = 5; optional bool success = 6; + repeated string wantlist = 7; } diff --git a/blockservice/blocks_test.go b/blockservice/blocks_test.go index d1fe5f080..28034b711 100644 --- a/blockservice/blocks_test.go +++ b/blockservice/blocks_test.go @@ -2,7 +2,6 @@ package blockservice import ( "bytes" - "fmt" "testing" ds "github.com/jbenet/datastore.go" @@ -11,9 +10,8 @@ import ( ) func TestBlocks(t *testing.T) { - d := ds.NewMapDatastore() - bs, err := NewBlockService(d) + bs, err := NewBlockService(d, nil) if err != nil { t.Error("failed to construct block service", err) return @@ -62,7 +60,4 @@ func TestBlocks(t *testing.T) { if !bytes.Equal(b.Data, b2.Data) { t.Error("Block data is not equal.") } - - fmt.Printf("key: %s\n", b.Key()) - fmt.Printf("data: %v\n", b.Data) } diff --git a/blockservice/blockservice.go b/blockservice/blockservice.go index a645c2917..eb59151cd 100644 --- a/blockservice/blockservice.go +++ b/blockservice/blockservice.go @@ -25,7 +25,7 @@ func NewBlockService(d ds.Datastore, rem *bitswap.BitSwap) (*BlockService, error return nil, fmt.Errorf("BlockService requires valid datastore") } if rem == nil { - return nil, fmt.Errorf("BlockService requires a valid bitswap") + u.PErr("Caution: blockservice running in local (offline) mode.\n") } return &BlockService{Datastore: d, Remote: rem}, nil } @@ -39,7 +39,9 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) { if err != nil { return k, err } - err = s.Remote.HaveBlock(b.Key()) + if s.Remote != nil { + err = s.Remote.HaveBlock(b.Key()) + } return k, err } @@ -57,7 +59,7 @@ func (s *BlockService) GetBlock(k u.Key) (*blocks.Block, error) { Multihash: mh.Multihash(k), Data: bdata, }, nil - } else if err == ds.ErrNotFound { + } else if err == ds.ErrNotFound && s.Remote != nil { blk, err := s.Remote.GetBlock(k, time.Second*5) if err != nil { return nil, err diff --git a/routing/dht/dht.go b/routing/dht/dht.go index f340bf805..b1a6e59c9 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -49,7 +49,7 @@ type IpfsDHT struct { diaglock sync.Mutex // listener is a server to register to listen for responses to messages - listener *swarm.MesListener + listener *swarm.MessageListener } // NewDHT creates a new DHT object with the given peer as the 'local' host @@ -66,7 +66,7 @@ func NewDHT(p *peer.Peer, net swarm.Network, dstore ds.Datastore) *IpfsDHT { dht.routingTables[0] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*30) dht.routingTables[1] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Millisecond*100) dht.routingTables[2] = kb.NewRoutingTable(20, kb.ConvertPeerID(p.ID), time.Hour) - dht.listener = swarm.NewMesListener() + dht.listener = swarm.NewMessageListener() dht.birth = time.Now() return dht } diff --git a/routing/kbucket/table_test.go b/routing/kbucket/table_test.go index 13a55d14c..dbb391ff3 100644 --- a/routing/kbucket/table_test.go +++ b/routing/kbucket/table_test.go @@ -78,7 +78,7 @@ func TestTableUpdate(t *testing.T) { for i := 0; i < 10000; i++ { p := rt.Update(peers[rand.Intn(len(peers))]) if p != nil { - t.Log("evicted peer.") + //t.Log("evicted peer.") } } diff --git a/swarm/mes_listener.go b/swarm/mes_listener.go index 3f24195b8..712d1999f 100644 --- a/swarm/mes_listener.go +++ b/swarm/mes_listener.go @@ -8,7 +8,7 @@ import ( u "github.com/jbenet/go-ipfs/util" ) -type MesListener struct { +type MessageListener struct { listeners map[uint64]*listenInfo haltchan chan struct{} unlist chan uint64 @@ -41,8 +41,8 @@ type listenInfo struct { id uint64 } -func NewMesListener() *MesListener { - ml := new(MesListener) +func NewMessageListener() *MessageListener { + ml := new(MessageListener) ml.haltchan = make(chan struct{}) ml.listeners = make(map[uint64]*listenInfo) ml.nlist = make(chan *listenInfo, 16) @@ -52,7 +52,7 @@ func NewMesListener() *MesListener { return ml } -func (ml *MesListener) Listen(id uint64, count int, timeout time.Duration) <-chan *Message { +func (ml *MessageListener) Listen(id uint64, count int, timeout time.Duration) <-chan *Message { li := new(listenInfo) li.count = count li.eol = time.Now().Add(timeout) @@ -62,7 +62,7 @@ func (ml *MesListener) Listen(id uint64, count int, timeout time.Duration) <-cha return li.resp } -func (ml *MesListener) Unlisten(id uint64) { +func (ml *MessageListener) Unlisten(id uint64) { ml.unlist <- id } @@ -71,18 +71,18 @@ type respMes struct { mes *Message } -func (ml *MesListener) Respond(id uint64, mes *Message) { +func (ml *MessageListener) Respond(id uint64, mes *Message) { ml.send <- &respMes{ id: id, mes: mes, } } -func (ml *MesListener) Halt() { +func (ml *MessageListener) Halt() { ml.haltchan <- struct{}{} } -func (ml *MesListener) run() { +func (ml *MessageListener) run() { for { select { case <-ml.haltchan: diff --git a/swarm/mes_listener_test.go b/swarm/mes_listener_test.go index 2d0846eb7..566011aa9 100644 --- a/swarm/mes_listener_test.go +++ b/swarm/mes_listener_test.go @@ -8,8 +8,8 @@ import ( ) // Ensure that the Message Listeners basic functionality works -func TestMesListenerBasic(t *testing.T) { - ml := NewMesListener() +func TestMessageListener(t *testing.T) { + ml := NewMessageListener() a := GenerateMessageID() resp := ml.Listen(a, 1, time.Minute) @@ -20,7 +20,7 @@ func TestMesListenerBasic(t *testing.T) { go ml.Respond(a, mes) - del := time.After(time.Millisecond * 10) + del := time.After(time.Millisecond * 100) select { case get := <-resp: if string(get.Data) != string(mes.Data) {