mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-29 17:36:38 +08:00
Merge pull request #3876 from dgrisham/master
bug fix: Bitswap now records 'BytesSent' in ledger
This commit is contained in:
@ -11,6 +11,7 @@ import (
|
|||||||
blocks "github.com/ipfs/go-ipfs/blocks"
|
blocks "github.com/ipfs/go-ipfs/blocks"
|
||||||
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
|
blockstore "github.com/ipfs/go-ipfs/blocks/blockstore"
|
||||||
blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
|
blocksutil "github.com/ipfs/go-ipfs/blocks/blocksutil"
|
||||||
|
decision "github.com/ipfs/go-ipfs/exchange/bitswap/decision"
|
||||||
tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
|
tn "github.com/ipfs/go-ipfs/exchange/bitswap/testnet"
|
||||||
mockrouting "github.com/ipfs/go-ipfs/routing/mock"
|
mockrouting "github.com/ipfs/go-ipfs/routing/mock"
|
||||||
delay "github.com/ipfs/go-ipfs/thirdparty/delay"
|
delay "github.com/ipfs/go-ipfs/thirdparty/delay"
|
||||||
@ -489,3 +490,165 @@ func TestWantlistCleanup(t *testing.T) {
|
|||||||
t.Fatal("should only have keys[0] in wantlist")
|
t.Fatal("should only have keys[0] in wantlist")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func assertLedgerMatch(ra, rb *decision.Receipt) error {
|
||||||
|
if ra.Sent != rb.Recv {
|
||||||
|
return fmt.Errorf("mismatch in ledgers (exchanged bytes): %d sent vs %d recvd", ra.Sent, rb.Recv)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ra.Recv != rb.Sent {
|
||||||
|
return fmt.Errorf("mismatch in ledgers (exchanged bytes): %d recvd vs %d sent", ra.Recv, rb.Sent)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ra.Exchanged != rb.Exchanged {
|
||||||
|
return fmt.Errorf("mismatch in ledgers (exchanged blocks): %d vs %d ", ra.Exchanged, rb.Exchanged)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func assertLedgerEqual(ra, rb *decision.Receipt) error {
|
||||||
|
if ra.Value != rb.Value {
|
||||||
|
return fmt.Errorf("mismatch in ledgers (value/debt ratio): %f vs %f ", ra.Value, rb.Value)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ra.Sent != rb.Sent {
|
||||||
|
return fmt.Errorf("mismatch in ledgers (sent bytes): %d vs %d", ra.Sent, rb.Sent)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ra.Recv != rb.Recv {
|
||||||
|
return fmt.Errorf("mismatch in ledgers (recvd bytes): %d vs %d", ra.Recv, rb.Recv)
|
||||||
|
}
|
||||||
|
|
||||||
|
if ra.Exchanged != rb.Exchanged {
|
||||||
|
return fmt.Errorf("mismatch in ledgers (exchanged blocks): %d vs %d ", ra.Exchanged, rb.Exchanged)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func newReceipt(sent, recv, exchanged uint64) *decision.Receipt {
|
||||||
|
return &decision.Receipt{
|
||||||
|
Peer: "test",
|
||||||
|
Value: float64(sent) / (1 + float64(recv)),
|
||||||
|
Sent: sent,
|
||||||
|
Recv: recv,
|
||||||
|
Exchanged: exchanged,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBitswapLedgerOneWay(t *testing.T) {
|
||||||
|
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
|
||||||
|
sg := NewTestSessionGenerator(net)
|
||||||
|
defer sg.Close()
|
||||||
|
bg := blocksutil.NewBlockGenerator()
|
||||||
|
|
||||||
|
t.Log("Test ledgers match when one peer sends block to another")
|
||||||
|
|
||||||
|
instances := sg.Instances(2)
|
||||||
|
blocks := bg.Blocks(1)
|
||||||
|
err := instances[0].Exchange.HasBlock(blocks[0])
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
||||||
|
defer cancel()
|
||||||
|
blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ra := instances[0].Exchange.LedgerForPeer(instances[1].Peer)
|
||||||
|
rb := instances[1].Exchange.LedgerForPeer(instances[0].Peer)
|
||||||
|
|
||||||
|
// compare peer ledger receipts
|
||||||
|
err = assertLedgerMatch(ra, rb)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// check that receipts have intended values
|
||||||
|
ratest := newReceipt(1, 0, 1)
|
||||||
|
err = assertLedgerEqual(ratest, ra)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
rbtest := newReceipt(0, 1, 1)
|
||||||
|
err = assertLedgerEqual(rbtest, rb)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Log(blk)
|
||||||
|
for _, inst := range instances {
|
||||||
|
err := inst.Exchange.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestBitswapLedgerTwoWay(t *testing.T) {
|
||||||
|
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
|
||||||
|
sg := NewTestSessionGenerator(net)
|
||||||
|
defer sg.Close()
|
||||||
|
bg := blocksutil.NewBlockGenerator()
|
||||||
|
|
||||||
|
t.Log("Test ledgers match when two peers send one block to each other")
|
||||||
|
|
||||||
|
instances := sg.Instances(2)
|
||||||
|
blocks := bg.Blocks(2)
|
||||||
|
err := instances[0].Exchange.HasBlock(blocks[0])
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = instances[1].Exchange.HasBlock(blocks[1])
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
|
||||||
|
defer cancel()
|
||||||
|
blk, err := instances[1].Exchange.GetBlock(ctx, blocks[0].Cid())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ctx, cancel = context.WithTimeout(context.Background(), time.Second*5)
|
||||||
|
defer cancel()
|
||||||
|
blk, err = instances[0].Exchange.GetBlock(ctx, blocks[1].Cid())
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
ra := instances[0].Exchange.LedgerForPeer(instances[1].Peer)
|
||||||
|
rb := instances[1].Exchange.LedgerForPeer(instances[0].Peer)
|
||||||
|
|
||||||
|
// compare peer ledger receipts
|
||||||
|
err = assertLedgerMatch(ra, rb)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// check that receipts have intended values
|
||||||
|
rtest := newReceipt(1, 1, 2)
|
||||||
|
err = assertLedgerEqual(rtest, ra)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = assertLedgerEqual(rtest, rb)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Log(blk)
|
||||||
|
for _, inst := range instances {
|
||||||
|
err := inst.Exchange.Close()
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -286,6 +286,9 @@ func (e *Engine) AddBlock(block blocks.Block) {
|
|||||||
|
|
||||||
func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error {
|
func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error {
|
||||||
l := e.findOrCreate(p)
|
l := e.findOrCreate(p)
|
||||||
|
l.lk.Lock()
|
||||||
|
defer l.lk.Unlock()
|
||||||
|
|
||||||
for _, block := range m.Blocks() {
|
for _, block := range m.Blocks() {
|
||||||
l.SentBytes(len(block.RawData()))
|
l.SentBytes(len(block.RawData()))
|
||||||
l.wantList.Remove(block.Cid())
|
l.wantList.Remove(block.Cid())
|
||||||
|
@ -6,6 +6,8 @@ import (
|
|||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
bsmsg "github.com/ipfs/go-ipfs/exchange/bitswap/message"
|
||||||
|
|
||||||
process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
|
process "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess"
|
||||||
procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
|
procctx "gx/ipfs/QmSF8fPo3jgVBAy8fpdjjYqgG87dkJgUprRBHRd2tmfgpP/goprocess/context"
|
||||||
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
|
logging "gx/ipfs/QmSpJByNKFX1sCsHBEp3R73FL4NF6FnQTEGyNAXHm2GS52/go-log"
|
||||||
@ -63,6 +65,12 @@ func (bs *Bitswap) taskWorker(ctx context.Context, id int) {
|
|||||||
"Block": envelope.Block.Cid().String(),
|
"Block": envelope.Block.Cid().String(),
|
||||||
})
|
})
|
||||||
|
|
||||||
|
// update the BS ledger to reflect sent message
|
||||||
|
// TODO: Should only track *useful* messages in ledger
|
||||||
|
outgoing := bsmsg.New(false)
|
||||||
|
outgoing.AddBlock(envelope.Block)
|
||||||
|
bs.engine.MessageSent(envelope.Peer, outgoing)
|
||||||
|
|
||||||
bs.wm.SendBlock(ctx, envelope)
|
bs.wm.SendBlock(ctx, envelope)
|
||||||
bs.counterLk.Lock()
|
bs.counterLk.Lock()
|
||||||
bs.blocksSent++
|
bs.blocksSent++
|
||||||
|
Reference in New Issue
Block a user