mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-30 18:13:54 +08:00
Merge pull request #1166 from ipfs/fix/bitswap-multisend
try harder to not send duplicate blocks
This commit is contained in:
@ -101,6 +101,8 @@ var bitswapStatCmd = &cmds.Command{
|
||||
buf := new(bytes.Buffer)
|
||||
fmt.Fprintln(buf, "bitswap status")
|
||||
fmt.Fprintf(buf, "\tprovides buffer: %d / %d\n", out.ProvideBufLen, bitswap.HasBlockBufferSize)
|
||||
fmt.Fprintf(buf, "\tblocks received: %d\n", out.BlocksReceived)
|
||||
fmt.Fprintf(buf, "\tdup blocks received: %d\n", out.DupBlksReceived)
|
||||
fmt.Fprintf(buf, "\twantlist [%d keys]\n", len(out.Wantlist))
|
||||
for _, k := range out.Wantlist {
|
||||
fmt.Fprintf(buf, "\t\t%s\n", k.B58String())
|
||||
|
@ -127,6 +127,9 @@ type Bitswap struct {
|
||||
newBlocks chan *blocks.Block
|
||||
|
||||
provideKeys chan u.Key
|
||||
|
||||
blocksRecvd int
|
||||
dupBlocksRecvd int
|
||||
}
|
||||
|
||||
type blockRequest struct {
|
||||
@ -219,6 +222,7 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
|
||||
return errors.New("bitswap is closed")
|
||||
default:
|
||||
}
|
||||
|
||||
if err := bs.blockstore.Put(blk); err != nil {
|
||||
return err
|
||||
}
|
||||
@ -342,6 +346,10 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
|
||||
// Should only track *useful* messages in ledger
|
||||
|
||||
for _, block := range incoming.Blocks() {
|
||||
bs.blocksRecvd++
|
||||
if has, err := bs.blockstore.Has(block.Key()); err == nil && has {
|
||||
bs.dupBlocksRecvd++
|
||||
}
|
||||
hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout)
|
||||
if err := bs.HasBlock(hasBlockCtx, block); err != nil {
|
||||
log.Debug(err)
|
||||
|
@ -69,9 +69,6 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
|
||||
hasBlock := g.Next()
|
||||
defer hasBlock.Exchange.Close()
|
||||
|
||||
if err := hasBlock.Blockstore().Put(block); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if err := hasBlock.Exchange.HasBlock(context.Background(), block); err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
@ -136,7 +133,6 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
|
||||
var blkeys []u.Key
|
||||
first := instances[0]
|
||||
for _, b := range blocks {
|
||||
first.Blockstore().Put(b) // TODO remove. don't need to do this. bitswap owns block
|
||||
blkeys = append(blkeys, b.Key())
|
||||
first.Exchange.HasBlock(context.Background(), b)
|
||||
}
|
||||
@ -144,7 +140,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
|
||||
t.Log("Distribute!")
|
||||
|
||||
wg := sync.WaitGroup{}
|
||||
for _, inst := range instances {
|
||||
for _, inst := range instances[1:] {
|
||||
wg.Add(1)
|
||||
go func(inst Instance) {
|
||||
defer wg.Done()
|
||||
|
@ -46,7 +46,7 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) {
|
||||
defer tl.lock.Unlock()
|
||||
partner, ok := tl.partners[to]
|
||||
if !ok {
|
||||
partner = &activePartner{taskQueue: pq.New(wrapCmp(V1))}
|
||||
partner = newActivePartner()
|
||||
tl.pQueue.Push(partner)
|
||||
tl.partners[to] = partner
|
||||
}
|
||||
@ -57,12 +57,19 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) {
|
||||
return
|
||||
}
|
||||
|
||||
partner.activelk.Lock()
|
||||
defer partner.activelk.Unlock()
|
||||
_, ok = partner.activeBlocks[entry.Key]
|
||||
if ok {
|
||||
return
|
||||
}
|
||||
|
||||
task := &peerRequestTask{
|
||||
Entry: entry,
|
||||
Target: to,
|
||||
created: time.Now(),
|
||||
Done: func() {
|
||||
partner.TaskDone()
|
||||
partner.TaskDone(entry.Key)
|
||||
tl.lock.Lock()
|
||||
tl.pQueue.Update(partner.Index())
|
||||
tl.lock.Unlock()
|
||||
@ -93,7 +100,7 @@ func (tl *prq) Pop() *peerRequestTask {
|
||||
continue // discarding tasks that have been removed
|
||||
}
|
||||
|
||||
partner.StartTask()
|
||||
partner.StartTask(out.Entry.Key)
|
||||
partner.requests--
|
||||
break // and return |out|
|
||||
}
|
||||
@ -179,6 +186,8 @@ type activePartner struct {
|
||||
activelk sync.Mutex
|
||||
active int
|
||||
|
||||
activeBlocks map[u.Key]struct{}
|
||||
|
||||
// requests is the number of blocks this peer is currently requesting
|
||||
// request need not be locked around as it will only be modified under
|
||||
// the peerRequestQueue's locks
|
||||
@ -191,6 +200,13 @@ type activePartner struct {
|
||||
taskQueue pq.PQ
|
||||
}
|
||||
|
||||
func newActivePartner() *activePartner {
|
||||
return &activePartner{
|
||||
taskQueue: pq.New(wrapCmp(V1)),
|
||||
activeBlocks: make(map[u.Key]struct{}),
|
||||
}
|
||||
}
|
||||
|
||||
// partnerCompare implements pq.ElemComparator
|
||||
func partnerCompare(a, b pq.Elem) bool {
|
||||
pa := a.(*activePartner)
|
||||
@ -208,15 +224,17 @@ func partnerCompare(a, b pq.Elem) bool {
|
||||
}
|
||||
|
||||
// StartTask signals that a task was started for this partner
|
||||
func (p *activePartner) StartTask() {
|
||||
func (p *activePartner) StartTask(k u.Key) {
|
||||
p.activelk.Lock()
|
||||
p.activeBlocks[k] = struct{}{}
|
||||
p.active++
|
||||
p.activelk.Unlock()
|
||||
}
|
||||
|
||||
// TaskDone signals that a task was completed for this partner
|
||||
func (p *activePartner) TaskDone() {
|
||||
func (p *activePartner) TaskDone(k u.Key) {
|
||||
p.activelk.Lock()
|
||||
delete(p.activeBlocks, k)
|
||||
p.active--
|
||||
if p.active < 0 {
|
||||
panic("more tasks finished than started!")
|
||||
|
@ -6,15 +6,19 @@ import (
|
||||
)
|
||||
|
||||
type Stat struct {
|
||||
ProvideBufLen int
|
||||
Wantlist []u.Key
|
||||
Peers []string
|
||||
ProvideBufLen int
|
||||
Wantlist []u.Key
|
||||
Peers []string
|
||||
BlocksReceived int
|
||||
DupBlksReceived int
|
||||
}
|
||||
|
||||
func (bs *Bitswap) Stat() (*Stat, error) {
|
||||
st := new(Stat)
|
||||
st.ProvideBufLen = len(bs.newBlocks)
|
||||
st.Wantlist = bs.GetWantlist()
|
||||
st.BlocksReceived = bs.blocksRecvd
|
||||
st.DupBlksReceived = bs.dupBlocksRecvd
|
||||
|
||||
for _, p := range bs.engine.Peers() {
|
||||
st.Peers = append(st.Peers, p.Pretty())
|
||||
|
@ -11,7 +11,7 @@ import (
|
||||
u "github.com/ipfs/go-ipfs/util"
|
||||
)
|
||||
|
||||
var TaskWorkerCount = 16
|
||||
var TaskWorkerCount = 8
|
||||
|
||||
func init() {
|
||||
twc := os.Getenv("IPFS_BITSWAP_TASK_WORKERS")
|
||||
|
Reference in New Issue
Block a user