From 86fb07aed7d32ccd1a33f8bab0ea47aa3adbb359 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 29 Apr 2015 01:36:47 -0700 Subject: [PATCH 1/3] try harder to not send duplicate blocks --- .../bitswap/decision/peer_request_queue.go | 28 +++++++++++++++---- exchange/bitswap/workers.go | 2 +- 2 files changed, 24 insertions(+), 6 deletions(-) diff --git a/exchange/bitswap/decision/peer_request_queue.go b/exchange/bitswap/decision/peer_request_queue.go index e771ece0b..42928487d 100644 --- a/exchange/bitswap/decision/peer_request_queue.go +++ b/exchange/bitswap/decision/peer_request_queue.go @@ -46,7 +46,7 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) { defer tl.lock.Unlock() partner, ok := tl.partners[to] if !ok { - partner = &activePartner{taskQueue: pq.New(wrapCmp(V1))} + partner = newActivePartner() tl.pQueue.Push(partner) tl.partners[to] = partner } @@ -57,12 +57,19 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) { return } + partner.activelk.Lock() + defer partner.activelk.Unlock() + _, ok = partner.activeBlocks[entry.Key] + if ok { + return + } + task := &peerRequestTask{ Entry: entry, Target: to, created: time.Now(), Done: func() { - partner.TaskDone() + partner.TaskDone(entry.Key) tl.lock.Lock() tl.pQueue.Update(partner.Index()) tl.lock.Unlock() @@ -93,7 +100,7 @@ func (tl *prq) Pop() *peerRequestTask { continue // discarding tasks that have been removed } - partner.StartTask() + partner.StartTask(out.Entry.Key) partner.requests-- break // and return |out| } @@ -179,6 +186,8 @@ type activePartner struct { activelk sync.Mutex active int + activeBlocks map[u.Key]struct{} + // requests is the number of blocks this peer is currently requesting // request need not be locked around as it will only be modified under // the peerRequestQueue's locks @@ -191,6 +200,13 @@ type activePartner struct { taskQueue pq.PQ } +func newActivePartner() *activePartner { + return &activePartner{ + taskQueue: pq.New(wrapCmp(V1)), + activeBlocks: make(map[u.Key]struct{}), + } +} + // partnerCompare implements pq.ElemComparator func partnerCompare(a, b pq.Elem) bool { pa := a.(*activePartner) @@ -208,15 +224,17 @@ func partnerCompare(a, b pq.Elem) bool { } // StartTask signals that a task was started for this partner -func (p *activePartner) StartTask() { +func (p *activePartner) StartTask(k u.Key) { p.activelk.Lock() + p.activeBlocks[k] = struct{}{} p.active++ p.activelk.Unlock() } // TaskDone signals that a task was completed for this partner -func (p *activePartner) TaskDone() { +func (p *activePartner) TaskDone(k u.Key) { p.activelk.Lock() + delete(p.activeBlocks, k) p.active-- if p.active < 0 { panic("more tasks finished than started!") diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index 4e2bf43b8..1fc59a214 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -11,7 +11,7 @@ import ( u "github.com/ipfs/go-ipfs/util" ) -var TaskWorkerCount = 16 +var TaskWorkerCount = 8 func init() { twc := os.Getenv("IPFS_BITSWAP_TASK_WORKERS") From f998339acb8a5a74eac708576d66fcd062804a0d Mon Sep 17 00:00:00 2001 From: Jeromy Date: Wed, 29 Apr 2015 19:59:18 -0700 Subject: [PATCH 2/3] remove some redundant blockputs to avoid false duplicate block receives --- exchange/bitswap/bitswap.go | 9 +++++++++ exchange/bitswap/bitswap_test.go | 6 +----- 2 files changed, 10 insertions(+), 5 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 37826c492..937ee131e 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -219,6 +219,15 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { return errors.New("bitswap is closed") default: } + has, err := bs.blockstore.Has(blk.Key()) + if err != nil { + return err + } + + if has { + log.Error(bs.self, "Dup Block! ", blk.Key()) + } + if err := bs.blockstore.Put(blk); err != nil { return err } diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index 85b3c0ec8..85a8e9d5d 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -69,9 +69,6 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { hasBlock := g.Next() defer hasBlock.Exchange.Close() - if err := hasBlock.Blockstore().Put(block); err != nil { - t.Fatal(err) - } if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil { t.Fatal(err) } @@ -136,7 +133,6 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { var blkeys []u.Key first := instances[0] for _, b := range blocks { - first.Blockstore().Put(b) // TODO remove. don't need to do this. bitswap owns block blkeys = append(blkeys, b.Key()) first.Exchange.HasBlock(context.Background(), b) } @@ -144,7 +140,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { t.Log("Distribute!") wg := sync.WaitGroup{} - for _, inst := range instances { + for _, inst := range instances[1:] { wg.Add(1) go func(inst Instance) { defer wg.Done() From 6f04302a48fe642e96afe7734230743cc59f2fe4 Mon Sep 17 00:00:00 2001 From: Jeromy Date: Mon, 4 May 2015 03:12:17 -0700 Subject: [PATCH 3/3] remove logging of dup blocks, move to counters for bitswap stat --- core/commands/bitswap.go | 2 ++ exchange/bitswap/bitswap.go | 15 +++++++-------- exchange/bitswap/stat.go | 10 +++++++--- 3 files changed, 16 insertions(+), 11 deletions(-) diff --git a/core/commands/bitswap.go b/core/commands/bitswap.go index f89f38d7f..cfc522bae 100644 --- a/core/commands/bitswap.go +++ b/core/commands/bitswap.go @@ -101,6 +101,8 @@ var bitswapStatCmd = &cmds.Command{ buf := new(bytes.Buffer) fmt.Fprintln(buf, "bitswap status") fmt.Fprintf(buf, "\tprovides buffer: %d / %d\n", out.ProvideBufLen, bitswap.HasBlockBufferSize) + fmt.Fprintf(buf, "\tblocks received: %d\n", out.BlocksReceived) + fmt.Fprintf(buf, "\tdup blocks received: %d\n", out.DupBlksReceived) fmt.Fprintf(buf, "\twantlist [%d keys]\n", len(out.Wantlist)) for _, k := range out.Wantlist { fmt.Fprintf(buf, "\t\t%s\n", k.B58String()) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 937ee131e..8b12a4727 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -127,6 +127,9 @@ type Bitswap struct { newBlocks chan *blocks.Block provideKeys chan u.Key + + blocksRecvd int + dupBlocksRecvd int } type blockRequest struct { @@ -219,14 +222,6 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { return errors.New("bitswap is closed") default: } - has, err := bs.blockstore.Has(blk.Key()) - if err != nil { - return err - } - - if has { - log.Error(bs.self, "Dup Block! ", blk.Key()) - } if err := bs.blockstore.Put(blk); err != nil { return err @@ -351,6 +346,10 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg // Should only track *useful* messages in ledger for _, block := range incoming.Blocks() { + bs.blocksRecvd++ + if has, err := bs.blockstore.Has(block.Key()); err == nil && has { + bs.dupBlocksRecvd++ + } hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) if err := bs.HasBlock(hasBlockCtx, block); err != nil { log.Debug(err) diff --git a/exchange/bitswap/stat.go b/exchange/bitswap/stat.go index 1c5fec62b..ceab4b2ee 100644 --- a/exchange/bitswap/stat.go +++ b/exchange/bitswap/stat.go @@ -6,15 +6,19 @@ import ( ) type Stat struct { - ProvideBufLen int - Wantlist []u.Key - Peers []string + ProvideBufLen int + Wantlist []u.Key + Peers []string + BlocksReceived int + DupBlksReceived int } func (bs *Bitswap) Stat() (*Stat, error) { st := new(Stat) st.ProvideBufLen = len(bs.newBlocks) st.Wantlist = bs.GetWantlist() + st.BlocksReceived = bs.blocksRecvd + st.DupBlksReceived = bs.dupBlocksRecvd for _, p := range bs.engine.Peers() { st.Peers = append(st.Peers, p.Pretty())