mirror of
https://github.com/ipfs/kubo.git
synced 2025-08-06 11:31:54 +08:00
refactor: wantlist splits into WL and ThreadSafe WL
bitswap keeps the threadsafe version. observing the ledger shows that it doesn't need it anymore (ledgermanager is protected and safe). License: MIT Signed-off-by: Brian Tiger Chow <brian@perfmode.com>
This commit is contained in:

committed by
Juan Batiz-Benet

parent
d069ae11f4
commit
bef622222d
@ -15,7 +15,7 @@ import (
|
||||
bsnet "github.com/jbenet/go-ipfs/exchange/bitswap/network"
|
||||
notifications "github.com/jbenet/go-ipfs/exchange/bitswap/notifications"
|
||||
strategy "github.com/jbenet/go-ipfs/exchange/bitswap/strategy"
|
||||
wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
|
||||
wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
|
||||
@ -59,7 +59,7 @@ func New(parent context.Context, p peer.Peer, network bsnet.BitSwapNetwork, rout
|
||||
ledgermanager: strategy.NewLedgerManager(ctx, bstore),
|
||||
routing: routing,
|
||||
sender: network,
|
||||
wantlist: wl.New(),
|
||||
wantlist: wantlist.NewThreadSafe(),
|
||||
batchRequests: make(chan []u.Key, 32),
|
||||
}
|
||||
network.SetDelegate(bs)
|
||||
@ -95,7 +95,7 @@ type bitswap struct {
|
||||
|
||||
ledgermanager *strategy.LedgerManager
|
||||
|
||||
wantlist *wl.Wantlist
|
||||
wantlist *wantlist.ThreadSafe
|
||||
|
||||
// cancelFunc signals cancellation to the bitswap event loop
|
||||
cancelFunc func()
|
||||
@ -203,7 +203,7 @@ func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.Peer) e
|
||||
return nil
|
||||
}
|
||||
|
||||
func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wl.Wantlist) {
|
||||
func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wantlist.ThreadSafe) {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
|
@ -6,25 +6,86 @@ import (
|
||||
"sync"
|
||||
)
|
||||
|
||||
type ThreadSafe struct {
|
||||
lk sync.RWMutex
|
||||
Wantlist
|
||||
}
|
||||
|
||||
// not threadsafe
|
||||
type Wantlist struct {
|
||||
lk sync.RWMutex
|
||||
set map[u.Key]*Entry
|
||||
}
|
||||
|
||||
type Entry struct {
|
||||
Key u.Key
|
||||
Priority int
|
||||
}
|
||||
|
||||
type entrySlice []*Entry
|
||||
|
||||
func (es entrySlice) Len() int { return len(es) }
|
||||
func (es entrySlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] }
|
||||
func (es entrySlice) Less(i, j int) bool { return es[i].Priority > es[j].Priority }
|
||||
|
||||
func NewThreadSafe() *ThreadSafe {
|
||||
return &ThreadSafe{
|
||||
Wantlist: *New(),
|
||||
}
|
||||
}
|
||||
|
||||
func New() *Wantlist {
|
||||
return &Wantlist{
|
||||
set: make(map[u.Key]*Entry),
|
||||
}
|
||||
}
|
||||
|
||||
type Entry struct {
|
||||
Key u.Key
|
||||
Priority int
|
||||
func (w *ThreadSafe) Add(k u.Key, priority int) {
|
||||
// TODO rm defer for perf
|
||||
w.lk.Lock()
|
||||
defer w.lk.Unlock()
|
||||
w.Wantlist.Add(k, priority)
|
||||
}
|
||||
|
||||
func (w *ThreadSafe) Remove(k u.Key) {
|
||||
// TODO rm defer for perf
|
||||
w.lk.Lock()
|
||||
defer w.lk.Unlock()
|
||||
w.Wantlist.Remove(k)
|
||||
}
|
||||
|
||||
func (w *ThreadSafe) Contains(k u.Key) bool {
|
||||
// TODO rm defer for perf
|
||||
w.lk.RLock()
|
||||
defer w.lk.RUnlock()
|
||||
return w.Wantlist.Contains(k)
|
||||
}
|
||||
|
||||
func (w *ThreadSafe) Entries() []*Entry {
|
||||
w.lk.RLock()
|
||||
defer w.lk.RUnlock()
|
||||
var es entrySlice
|
||||
for _, e := range w.set {
|
||||
es = append(es, e)
|
||||
}
|
||||
// TODO rename SortedEntries (state that they're sorted so callers know
|
||||
// they're paying an expense)
|
||||
sort.Sort(es)
|
||||
return es
|
||||
}
|
||||
|
||||
func (w *ThreadSafe) SortedEntries() []*Entry {
|
||||
w.lk.RLock()
|
||||
defer w.lk.RUnlock()
|
||||
var es entrySlice
|
||||
|
||||
for _, e := range w.set {
|
||||
es = append(es, e)
|
||||
}
|
||||
sort.Sort(es)
|
||||
return es
|
||||
}
|
||||
|
||||
func (w *Wantlist) Add(k u.Key, priority int) {
|
||||
w.lk.Lock()
|
||||
defer w.lk.Unlock()
|
||||
if _, ok := w.set[k]; ok {
|
||||
return
|
||||
}
|
||||
@ -35,28 +96,15 @@ func (w *Wantlist) Add(k u.Key, priority int) {
|
||||
}
|
||||
|
||||
func (w *Wantlist) Remove(k u.Key) {
|
||||
w.lk.Lock()
|
||||
defer w.lk.Unlock()
|
||||
delete(w.set, k)
|
||||
}
|
||||
|
||||
func (w *Wantlist) Contains(k u.Key) bool {
|
||||
w.lk.RLock()
|
||||
defer w.lk.RUnlock()
|
||||
_, ok := w.set[k]
|
||||
return ok
|
||||
}
|
||||
|
||||
type entrySlice []*Entry
|
||||
|
||||
func (es entrySlice) Len() int { return len(es) }
|
||||
func (es entrySlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] }
|
||||
func (es entrySlice) Less(i, j int) bool { return es[i].Priority > es[j].Priority }
|
||||
|
||||
func (w *Wantlist) Entries() []*Entry {
|
||||
w.lk.RLock()
|
||||
defer w.lk.RUnlock()
|
||||
|
||||
var es entrySlice
|
||||
|
||||
for _, e := range w.set {
|
||||
@ -67,8 +115,6 @@ func (w *Wantlist) Entries() []*Entry {
|
||||
}
|
||||
|
||||
func (w *Wantlist) SortedEntries() []*Entry {
|
||||
w.lk.RLock()
|
||||
defer w.lk.RUnlock()
|
||||
var es entrySlice
|
||||
|
||||
for _, e := range w.set {
|
||||
|
Reference in New Issue
Block a user