diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index c6f3c74a9..57359c0ec 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -288,7 +288,7 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg bs.dupBlocksRecvd++ } bs.counterLk.Unlock() - log.Debugf("got block %s from %s", block, p) + log.Debugf("got block %s from %s (%d,%d)", block, p, bs.blocksRecvd, bs.dupBlocksRecvd) hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) if err := bs.HasBlock(hasBlockCtx, block); err != nil { diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 86eb2d764..6548472c9 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -163,7 +163,6 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { } for _ = range outch { } - log.Error("DONE") }(inst) } wg.Wait() diff --git a/exchange/bitswap/decision/engine_test.go b/exchange/bitswap/decision/engine_test.go index 31e46c776..8337c4800 100644 --- a/exchange/bitswap/decision/engine_test.go +++ b/exchange/bitswap/decision/engine_test.go @@ -41,7 +41,7 @@ func TestConsistentAccounting(t *testing.T) { // Send messages from Ernie to Bert for i := 0; i < 1000; i++ { - m := message.New() + m := message.New(false) content := []string{"this", "is", "message", "i"} m.AddBlock(blocks.NewBlock([]byte(strings.Join(content, " ")))) @@ -73,7 +73,7 @@ func TestPeerIsAddedToPeersWhenMessageReceivedOrSent(t *testing.T) { sanfrancisco := newEngine(ctx, "sf") seattle := newEngine(ctx, "sea") - m := message.New() + m := message.New(true) sanfrancisco.Engine.MessageSent(seattle.Peer, m) seattle.Engine.MessageReceived(sanfrancisco.Peer, m) @@ -164,7 +164,7 @@ func TestPartnerWantsThenCancels(t *testing.T) { } func partnerWants(e *Engine, keys []string, partner peer.ID) { - add := message.New() + add := message.New(false) for i, letter := range keys { block := blocks.NewBlock([]byte(letter)) add.AddEntry(block.Key(), math.MaxInt32-i) @@ -173,7 +173,7 @@ func partnerWants(e *Engine, keys []string, partner peer.ID) { } func partnerCancels(e *Engine, keys []string, partner peer.ID) { - cancels := message.New() + cancels := message.New(false) for _, k := range keys { block := blocks.NewBlock([]byte(k)) cancels.Cancel(block.Key()) diff --git a/exchange/bitswap/message/message.go b/exchange/bitswap/message/message.go index 63f7f28b5..d885bb373 100644 --- a/exchange/bitswap/message/message.go +++ b/exchange/bitswap/message/message.go @@ -31,12 +31,7 @@ type BitSwapMessage interface { Empty() bool - // Sets whether or not the contained wantlist represents the entire wantlist - // true = full wantlist - // false = wantlist 'patch' - // default: true - SetFull(isFull bool) - + // A full wantlist is an authoritative copy, a 'non-full' wantlist is a patch-set Full() bool AddBlock(*blocks.Block) @@ -56,15 +51,15 @@ type impl struct { blocks map[u.Key]*blocks.Block } -func New() BitSwapMessage { - return newMsg() +func New(full bool) BitSwapMessage { + return newMsg(full) } -func newMsg() *impl { +func newMsg(full bool) *impl { return &impl{ blocks: make(map[u.Key]*blocks.Block), wantlist: make(map[u.Key]Entry), - full: true, + full: full, } } @@ -74,8 +69,7 @@ type Entry struct { } func newMessageFromProto(pbm pb.Message) BitSwapMessage { - m := newMsg() - m.SetFull(pbm.GetWantlist().GetFull()) + m := newMsg(pbm.GetWantlist().GetFull()) for _, e := range pbm.GetWantlist().GetEntries() { m.addEntry(u.Key(e.GetBlock()), int(e.GetPriority()), e.GetCancel()) } @@ -86,10 +80,6 @@ func newMessageFromProto(pbm pb.Message) BitSwapMessage { return m } -func (m *impl) SetFull(full bool) { - m.full = full -} - func (m *impl) Full() bool { return m.full } diff --git a/exchange/bitswap/message/message_test.go b/exchange/bitswap/message/message_test.go index cbeed8892..7a6a28a04 100644 --- a/exchange/bitswap/message/message_test.go +++ b/exchange/bitswap/message/message_test.go @@ -13,7 +13,7 @@ import ( func TestAppendWanted(t *testing.T) { const str = "foo" - m := New() + m := New(true) m.AddEntry(u.Key(str), 1) if !wantlistContains(m.ToProto().GetWantlist(), str) { @@ -44,7 +44,7 @@ func TestAppendBlock(t *testing.T) { strs = append(strs, "Celeritas") strs = append(strs, "Incendia") - m := New() + m := New(true) for _, str := range strs { block := blocks.NewBlock([]byte(str)) m.AddBlock(block) @@ -61,7 +61,7 @@ func TestAppendBlock(t *testing.T) { func TestWantlist(t *testing.T) { keystrs := []string{"foo", "bar", "baz", "bat"} - m := New() + m := New(true) for _, s := range keystrs { m.AddEntry(u.Key(s), 1) } @@ -84,7 +84,7 @@ func TestWantlist(t *testing.T) { func TestCopyProtoByValue(t *testing.T) { const str = "foo" - m := New() + m := New(true) protoBeforeAppend := m.ToProto() m.AddEntry(u.Key(str), 1) if wantlistContains(protoBeforeAppend.GetWantlist(), str) { @@ -93,7 +93,7 @@ func TestCopyProtoByValue(t *testing.T) { } func TestToNetFromNetPreservesWantList(t *testing.T) { - original := New() + original := New(true) original.AddEntry(u.Key("M"), 1) original.AddEntry(u.Key("B"), 1) original.AddEntry(u.Key("D"), 1) @@ -124,7 +124,7 @@ func TestToNetFromNetPreservesWantList(t *testing.T) { func TestToAndFromNetMessage(t *testing.T) { - original := New() + original := New(true) original.AddBlock(blocks.NewBlock([]byte("W"))) original.AddBlock(blocks.NewBlock([]byte("E"))) original.AddBlock(blocks.NewBlock([]byte("F"))) @@ -172,7 +172,7 @@ func contains(strs []string, x string) bool { func TestDuplicates(t *testing.T) { b := blocks.NewBlock([]byte("foo")) - msg := New() + msg := New(true) msg.AddEntry(b.Key(), 1) msg.AddEntry(b.Key(), 1) diff --git a/exchange/bitswap/testnet/network_test.go b/exchange/bitswap/testnet/network_test.go index c963ae9ac..9624df5f8 100644 --- a/exchange/bitswap/testnet/network_test.go +++ b/exchange/bitswap/testnet/network_test.go @@ -31,7 +31,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { fromWaiter peer.ID, msgFromWaiter bsmsg.BitSwapMessage) { - msgToWaiter := bsmsg.New() + msgToWaiter := bsmsg.New(true) msgToWaiter.AddBlock(blocks.NewBlock([]byte(expectedStr))) waiter.SendMessage(ctx, fromWaiter, msgToWaiter) })) @@ -55,7 +55,7 @@ func TestSendMessageAsyncButWaitForResponse(t *testing.T) { } })) - messageSentAsync := bsmsg.New() + messageSentAsync := bsmsg.New(true) messageSentAsync.AddBlock(blocks.NewBlock([]byte("data"))) errSending := waiter.SendMessage( context.Background(), responderPeer.ID(), messageSentAsync) diff --git a/exchange/bitswap/wantmanager.go b/exchange/bitswap/wantmanager.go index 3b2067914..eb49201a6 100644 --- a/exchange/bitswap/wantmanager.go +++ b/exchange/bitswap/wantmanager.go @@ -2,6 +2,7 @@ package bitswap import ( "sync" + "time" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" engine "github.com/ipfs/go-ipfs/exchange/bitswap/decision" @@ -94,9 +95,8 @@ func (pm *WantManager) SendBlock(ctx context.Context, env *engine.Envelope) { // throughout the network stack defer env.Sent() - msg := bsmsg.New() + msg := bsmsg.New(false) msg.AddBlock(env.Block) - msg.SetFull(false) err := pm.network.SendMessage(ctx, env.Peer, msg) if err != nil { log.Error(err) @@ -113,11 +113,10 @@ func (pm *WantManager) startPeerHandler(p peer.ID) *msgQueue { mq := newMsgQueue(p) // new peer, we will want to give them our full wantlist - fullwantlist := bsmsg.New() + fullwantlist := bsmsg.New(true) for _, e := range pm.wl.Entries() { fullwantlist.AddEntry(e.Key, e.Priority) } - fullwantlist.SetFull(true) mq.out = fullwantlist mq.work <- struct{}{} @@ -180,6 +179,7 @@ func (pm *WantManager) Disconnected(p peer.ID) { // TODO: use goprocess here once i trust it func (pm *WantManager) Run() { + tock := time.NewTicker(rebroadcastDelay.Get()) for { select { case entries := <-pm.incoming: @@ -198,6 +198,19 @@ func (pm *WantManager) Run() { p.addMessage(entries) } + case <-tock.C: + // resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY) + var es []*bsmsg.Entry + for _, e := range pm.wl.Entries() { + es = append(es, &bsmsg.Entry{Entry: e}) + } + for _, p := range pm.peers { + p.outlk.Lock() + p.out = bsmsg.New(true) + p.outlk.Unlock() + + p.addMessage(es) + } case p := <-pm.connect: pm.startPeerHandler(p) case p := <-pm.disconnect: @@ -230,7 +243,7 @@ func (mq *msgQueue) addMessage(entries []*bsmsg.Entry) { // if we have no message held, or the one we are given is full // overwrite the one we are holding if mq.out == nil { - mq.out = bsmsg.New() + mq.out = bsmsg.New(false) } // TODO: add a msg.Combine(...) method