mirror of
https://github.com/ipfs/kubo.git
synced 2025-09-10 14:34:24 +08:00
try harder to not send duplicate blocks
This commit is contained in:
@ -46,7 +46,7 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) {
|
|||||||
defer tl.lock.Unlock()
|
defer tl.lock.Unlock()
|
||||||
partner, ok := tl.partners[to]
|
partner, ok := tl.partners[to]
|
||||||
if !ok {
|
if !ok {
|
||||||
partner = &activePartner{taskQueue: pq.New(wrapCmp(V1))}
|
partner = newActivePartner()
|
||||||
tl.pQueue.Push(partner)
|
tl.pQueue.Push(partner)
|
||||||
tl.partners[to] = partner
|
tl.partners[to] = partner
|
||||||
}
|
}
|
||||||
@ -57,12 +57,19 @@ func (tl *prq) Push(entry wantlist.Entry, to peer.ID) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
partner.activelk.Lock()
|
||||||
|
defer partner.activelk.Unlock()
|
||||||
|
_, ok = partner.activeBlocks[entry.Key]
|
||||||
|
if ok {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
task := &peerRequestTask{
|
task := &peerRequestTask{
|
||||||
Entry: entry,
|
Entry: entry,
|
||||||
Target: to,
|
Target: to,
|
||||||
created: time.Now(),
|
created: time.Now(),
|
||||||
Done: func() {
|
Done: func() {
|
||||||
partner.TaskDone()
|
partner.TaskDone(entry.Key)
|
||||||
tl.lock.Lock()
|
tl.lock.Lock()
|
||||||
tl.pQueue.Update(partner.Index())
|
tl.pQueue.Update(partner.Index())
|
||||||
tl.lock.Unlock()
|
tl.lock.Unlock()
|
||||||
@ -93,7 +100,7 @@ func (tl *prq) Pop() *peerRequestTask {
|
|||||||
continue // discarding tasks that have been removed
|
continue // discarding tasks that have been removed
|
||||||
}
|
}
|
||||||
|
|
||||||
partner.StartTask()
|
partner.StartTask(out.Entry.Key)
|
||||||
partner.requests--
|
partner.requests--
|
||||||
break // and return |out|
|
break // and return |out|
|
||||||
}
|
}
|
||||||
@ -179,6 +186,8 @@ type activePartner struct {
|
|||||||
activelk sync.Mutex
|
activelk sync.Mutex
|
||||||
active int
|
active int
|
||||||
|
|
||||||
|
activeBlocks map[u.Key]struct{}
|
||||||
|
|
||||||
// requests is the number of blocks this peer is currently requesting
|
// requests is the number of blocks this peer is currently requesting
|
||||||
// request need not be locked around as it will only be modified under
|
// request need not be locked around as it will only be modified under
|
||||||
// the peerRequestQueue's locks
|
// the peerRequestQueue's locks
|
||||||
@ -191,6 +200,13 @@ type activePartner struct {
|
|||||||
taskQueue pq.PQ
|
taskQueue pq.PQ
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func newActivePartner() *activePartner {
|
||||||
|
return &activePartner{
|
||||||
|
taskQueue: pq.New(wrapCmp(V1)),
|
||||||
|
activeBlocks: make(map[u.Key]struct{}),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
// partnerCompare implements pq.ElemComparator
|
// partnerCompare implements pq.ElemComparator
|
||||||
func partnerCompare(a, b pq.Elem) bool {
|
func partnerCompare(a, b pq.Elem) bool {
|
||||||
pa := a.(*activePartner)
|
pa := a.(*activePartner)
|
||||||
@ -208,15 +224,17 @@ func partnerCompare(a, b pq.Elem) bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// StartTask signals that a task was started for this partner
|
// StartTask signals that a task was started for this partner
|
||||||
func (p *activePartner) StartTask() {
|
func (p *activePartner) StartTask(k u.Key) {
|
||||||
p.activelk.Lock()
|
p.activelk.Lock()
|
||||||
|
p.activeBlocks[k] = struct{}{}
|
||||||
p.active++
|
p.active++
|
||||||
p.activelk.Unlock()
|
p.activelk.Unlock()
|
||||||
}
|
}
|
||||||
|
|
||||||
// TaskDone signals that a task was completed for this partner
|
// TaskDone signals that a task was completed for this partner
|
||||||
func (p *activePartner) TaskDone() {
|
func (p *activePartner) TaskDone(k u.Key) {
|
||||||
p.activelk.Lock()
|
p.activelk.Lock()
|
||||||
|
delete(p.activeBlocks, k)
|
||||||
p.active--
|
p.active--
|
||||||
if p.active < 0 {
|
if p.active < 0 {
|
||||||
panic("more tasks finished than started!")
|
panic("more tasks finished than started!")
|
||||||
|
@ -11,7 +11,7 @@ import (
|
|||||||
u "github.com/ipfs/go-ipfs/util"
|
u "github.com/ipfs/go-ipfs/util"
|
||||||
)
|
)
|
||||||
|
|
||||||
var TaskWorkerCount = 16
|
var TaskWorkerCount = 8
|
||||||
|
|
||||||
func init() {
|
func init() {
|
||||||
twc := os.Getenv("IPFS_BITSWAP_TASK_WORKERS")
|
twc := os.Getenv("IPFS_BITSWAP_TASK_WORKERS")
|
||||||
|
Reference in New Issue
Block a user