From 0514504d093464d4ba6f3df32bb2c28889b5c31e Mon Sep 17 00:00:00 2001 From: dgrisham Date: Mon, 24 Apr 2017 15:50:25 -0600 Subject: [PATCH 1/3] bug fix: `BytesSent` in peers' ledgers now updates When sending data to another user, the number of bytes sent to that user (saved by the corresponding Bitswap ledger) was not updated (it was always 0). This also meant that the debt ratio was also always 0. The function that updates the `BytesSent` value in the ledger, `MessageSent()`, was already implemented, however it was not called when the peer was sent data. To fix this, a call to `MessageSent()` was made in the `taskWorker()` function, which is where both the message in question and the Bitswap engine were available to make the call. `MessageSent()` requires the peer's ID and `BitSwapMessage` as its arguments, the latter of which had to be created by making a new `BitSwapMessage`, then the block being sent was added to the new message. Note that, similar to the analagous call to `MessageReceived()`, records *all* of the bytes sent to a particular user. At some point, both of these should be updated to only record the numbers of *useful* bytes sent and received between peers. License: MIT Signed-off-by: David Grisham --- exchange/bitswap/workers.go | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index 6c6fe0e8b..028b9735d 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -6,6 +6,8 @@ import ( "sync" "time" + bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message" + process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess" procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context" logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log" @@ -63,6 +65,12 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) { "Block": envelope.Block.Cid().String(), }) + // update the BS ledger to reflect sent message + // TODO: Should only track *useful* messages in ledger + outgoing := bsmsg.New(false) + outgoing.AddBlock(envelope.Block) + bs.engine.MessageSent(envelope.Peer, outgoing) + bs.wm.SendBlock(ctx, envelope) bs.counterLk.Lock() bs.blocksSent++ From 74e19eae62b13411b1456f3534ccdbb7f739edfc Mon Sep 17 00:00:00 2001 From: dgrisham Date: Mon, 24 Apr 2017 20:33:52 -0600 Subject: [PATCH 2/3] tests + data dependency fix: `BytesSent` bug now completely fixed Tests were added to ensure that the bug fix in commit 000fbd25 was correct. The tests caught an error where a peer's ledger was not properly locked when updating it in the `MessageSent()` function. The appropriate calls to lock the ledger were made, and the tests successfully passed. License: MIT Signed-off-by: David Grisham --- exchange/bitswap/bitswap_test.go | 107 ++++++++++++++++++++++++++++ exchange/bitswap/decision/engine.go | 3 + 2 files changed, 110 insertions(+) diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 78467ce94..e13ff4c8e 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -11,6 +11,7 @@ import ( blocks "github.com/ipfs/go-ipfs/blocks" blockstore "github.com/ipfs/go-ipfs/blocks/blockstore" blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil" + decision "github.com/ipfs/go-ipfs/exchange/bitswap/decision" tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet" mockrouting "github.com/ipfs/go-ipfs/routing/mock" delay "github.com/ipfs/go-ipfs/thirdparty/delay" @@ -489,3 +490,109 @@ func TestWantlistCleanup(t *testing.T) { t.Fatal("should only have keys[0] in wantlist") } } + +func assertLedgerMatch(ra, rb *decision.Receipt) error { + if ra.Sent != rb.Recv { + return fmt.Errorf("mismatch in ledgers (exchanged bytes): %d sent vs %d recvd", ra.Sent, rb.Recv) + } + + if ra.Recv != rb.Sent { + return fmt.Errorf("mismatch in ledgers (exchanged bytes): %d recvd vs %d sent", ra.Recv, rb.Sent) + } + + if ra.Exchanged != rb.Exchanged { + return fmt.Errorf("mismatch in ledgers (exchanged blocks): %d vs %d ", ra.Exchanged, rb.Exchanged) + } + + return nil +} + +func TestBitswapBytesSentOneWay(t *testing.T) { + net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + sg := NewTestSessionGenerator(net) + defer sg.Close() + bg := blocksutil.NewBlockGenerator() + + t.Log("Test ledgers match when one peer sends block to another") + + instances := sg.Instances(2) + blocks := bg.Blocks(1) + err := instances[0].Exchange.HasBlock(blocks[0]) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid()) + if err != nil { + t.Fatal(err) + } + + ra := instances[0].Exchange.LedgerForPeer(instances[1].Peer) + rb := instances[1].Exchange.LedgerForPeer(instances[0].Peer) + + err = assertLedgerMatch(ra, rb) + if err != nil { + t.Fatal(err) + } + + t.Log(blk) + for _, inst := range instances { + err := inst.Exchange.Close() + if err != nil { + t.Fatal(err) + } + } +} + +func TestBitswapBytesSentTwoWay(t *testing.T) { + net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) + sg := NewTestSessionGenerator(net) + defer sg.Close() + bg := blocksutil.NewBlockGenerator() + + t.Log("Test ledgers match when two peers send one block to each other") + + instances := sg.Instances(2) + blocks := bg.Blocks(2) + err := instances[0].Exchange.HasBlock(blocks[0]) + if err != nil { + t.Fatal(err) + } + + err = instances[1].Exchange.HasBlock(blocks[1]) + if err != nil { + t.Fatal(err) + } + + ctx, cancel := context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid()) + if err != nil { + t.Fatal(err) + } + + ctx, cancel = context.WithTimeout(context.Background(), time.Second*5) + defer cancel() + blk, err = instances[0].Exchange.GetBlock(ctx, blocks[1].Cid()) + if err != nil { + t.Fatal(err) + } + + ra := instances[0].Exchange.LedgerForPeer(instances[1].Peer) + rb := instances[1].Exchange.LedgerForPeer(instances[0].Peer) + + err = assertLedgerMatch(ra, rb) + if err != nil { + t.Fatal(err) + } + + t.Log(blk) + for _, inst := range instances { + err := inst.Exchange.Close() + if err != nil { + t.Fatal(err) + } + } +} diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index f4b170800..6c1a9e936 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -286,6 +286,9 @@ func (e *Engine) AddBlock(block blocks.Block) { func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error { l := e.findOrCreate(p) + l.lk.Lock() + defer l.lk.Unlock() + for _, block := range m.Blocks() { l.SentBytes(len(block.RawData())) l.wantList.Remove(block.Cid()) From 4789a622b2e2675d42a8ff017468109d44b8026b Mon Sep 17 00:00:00 2001 From: dgrisham Date: Tue, 25 Apr 2017 13:54:49 -0600 Subject: [PATCH 3/3] tests: bitswap ledger tests modified Updated the `TestBitswapLedger*` tests and added assertions to check concrete values for ledgers (rather than just checking that two peers' ledgers match). The names for these tests were also changed from the previous commit, according to 's/BytesSent/Ledger/'. License: MIT Signed-off-by: David Grisham --- exchange/bitswap/bitswap_test.go | 60 ++++++++++++++++++++++++++++++-- 1 file changed, 58 insertions(+), 2 deletions(-) diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index e13ff4c8e..548c4a62d 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -507,7 +507,37 @@ func assertLedgerMatch(ra, rb *decision.Receipt) error { return nil } -func TestBitswapBytesSentOneWay(t *testing.T) { +func assertLedgerEqual(ra, rb *decision.Receipt) error { + if ra.Value != rb.Value { + return fmt.Errorf("mismatch in ledgers (value/debt ratio): %f vs %f ", ra.Value, rb.Value) + } + + if ra.Sent != rb.Sent { + return fmt.Errorf("mismatch in ledgers (sent bytes): %d vs %d", ra.Sent, rb.Sent) + } + + if ra.Recv != rb.Recv { + return fmt.Errorf("mismatch in ledgers (recvd bytes): %d vs %d", ra.Recv, rb.Recv) + } + + if ra.Exchanged != rb.Exchanged { + return fmt.Errorf("mismatch in ledgers (exchanged blocks): %d vs %d ", ra.Exchanged, rb.Exchanged) + } + + return nil +} + +func newReceipt(sent, recv, exchanged uint64) *decision.Receipt { + return &decision.Receipt{ + Peer: "test", + Value: float64(sent) / (1 + float64(recv)), + Sent: sent, + Recv: recv, + Exchanged: exchanged, + } +} + +func TestBitswapLedgerOneWay(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) sg := NewTestSessionGenerator(net) defer sg.Close() @@ -532,11 +562,24 @@ func TestBitswapBytesSentOneWay(t *testing.T) { ra := instances[0].Exchange.LedgerForPeer(instances[1].Peer) rb := instances[1].Exchange.LedgerForPeer(instances[0].Peer) + // compare peer ledger receipts err = assertLedgerMatch(ra, rb) if err != nil { t.Fatal(err) } + // check that receipts have intended values + ratest := newReceipt(1, 0, 1) + err = assertLedgerEqual(ratest, ra) + if err != nil { + t.Fatal(err) + } + rbtest := newReceipt(0, 1, 1) + err = assertLedgerEqual(rbtest, rb) + if err != nil { + t.Fatal(err) + } + t.Log(blk) for _, inst := range instances { err := inst.Exchange.Close() @@ -546,7 +589,7 @@ func TestBitswapBytesSentOneWay(t *testing.T) { } } -func TestBitswapBytesSentTwoWay(t *testing.T) { +func TestBitswapLedgerTwoWay(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) sg := NewTestSessionGenerator(net) defer sg.Close() @@ -583,11 +626,24 @@ func TestBitswapBytesSentTwoWay(t *testing.T) { ra := instances[0].Exchange.LedgerForPeer(instances[1].Peer) rb := instances[1].Exchange.LedgerForPeer(instances[0].Peer) + // compare peer ledger receipts err = assertLedgerMatch(ra, rb) if err != nil { t.Fatal(err) } + // check that receipts have intended values + rtest := newReceipt(1, 1, 2) + err = assertLedgerEqual(rtest, ra) + if err != nil { + t.Fatal(err) + } + + err = assertLedgerEqual(rtest, rb) + if err != nil { + t.Fatal(err) + } + t.Log(blk) for _, inst := range instances { err := inst.Exchange.Close()