1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-07-01 10:49:24 +08:00

wire contexts into bitswap requests more deeply

License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
Jeromy
2015-11-20 15:24:14 -08:00
committed by Jeromy
parent 5d25fc1f16
commit 1f4d3300b0
7 changed files with 106 additions and 85 deletions

View File

@ -122,6 +122,10 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []key.Key) <-chan *bloc
} }
} }
if len(misses) == 0 {
return
}
rblocks, err := s.Exchange.GetBlocks(ctx, misses) rblocks, err := s.Exchange.GetBlocks(ctx, misses)
if err != nil { if err != nil {
log.Debugf("Error with GetBlocks: %s", err) log.Debugf("Error with GetBlocks: %s", err)

View File

@ -86,7 +86,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
notifications: notif, notifications: notif,
engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
network: network, network: network,
findKeys: make(chan *blockRequest, sizeBatchRequestChan), findKeys: make(chan *wantlist.Entry, sizeBatchRequestChan),
process: px, process: px,
newBlocks: make(chan *blocks.Block, HasBlockBufferSize), newBlocks: make(chan *blocks.Block, HasBlockBufferSize),
provideKeys: make(chan key.Key, provideKeysBufferSize), provideKeys: make(chan key.Key, provideKeysBufferSize),
@ -129,7 +129,7 @@ type Bitswap struct {
notifications notifications.PubSub notifications notifications.PubSub
// send keys to a worker to find and connect to providers for them // send keys to a worker to find and connect to providers for them
findKeys chan *blockRequest findKeys chan *wantlist.Entry
engine *decision.Engine engine *decision.Engine
@ -146,8 +146,8 @@ type Bitswap struct {
} }
type blockRequest struct { type blockRequest struct {
keys []key.Key key key.Key
ctx context.Context ctx context.Context
} }
// GetBlock attempts to retrieve a particular block from peers within the // GetBlock attempts to retrieve a particular block from peers within the
@ -208,6 +208,12 @@ func (bs *Bitswap) WantlistForPeer(p peer.ID) []key.Key {
// resources, provide a context with a reasonably short deadline (ie. not one // resources, provide a context with a reasonably short deadline (ie. not one
// that lasts throughout the lifetime of the server) // that lasts throughout the lifetime of the server)
func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan *blocks.Block, error) { func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan *blocks.Block, error) {
if len(keys) == 0 {
out := make(chan *blocks.Block)
close(out)
return out, nil
}
select { select {
case <-bs.process.Closing(): case <-bs.process.Closing():
return nil, errors.New("bitswap is closed") return nil, errors.New("bitswap is closed")
@ -219,11 +225,14 @@ func (bs *Bitswap) GetBlocks(ctx context.Context, keys []key.Key) (<-chan *block
log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k) log.Event(ctx, "Bitswap.GetBlockRequest.Start", &k)
} }
bs.wm.WantBlocks(keys) bs.wm.WantBlocks(ctx, keys)
req := &blockRequest{ // NB: Optimization. Assumes that providers of key[0] are likely to
keys: keys, // be able to provide for all keys. This currently holds true in most
ctx: ctx, // every situation. Later, this assumption may not hold as true.
req := &wantlist.Entry{
Key: keys[0],
Ctx: ctx,
} }
select { select {
case bs.findKeys <- req: case bs.findKeys <- req:
@ -276,32 +285,6 @@ func (bs *Bitswap) tryPutBlock(blk *blocks.Block, attempts int) error {
return err return err
} }
func (bs *Bitswap) connectToProviders(ctx context.Context, entries []wantlist.Entry) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// Get providers for all entries in wantlist (could take a while)
wg := sync.WaitGroup{}
for _, e := range entries {
wg.Add(1)
go func(k key.Key) {
defer wg.Done()
child, cancel := context.WithTimeout(ctx, providerRequestTimeout)
defer cancel()
providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest)
for prov := range providers {
go func(p peer.ID) {
bs.network.ConnectTo(ctx, p)
}(prov)
}
}(e.Key)
}
wg.Wait() // make sure all our children do finish.
}
func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) { func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg.BitSwapMessage) {
// This call records changes to wantlists, blocks received, // This call records changes to wantlists, blocks received,
// and number of bytes transfered. // and number of bytes transfered.

View File

@ -217,7 +217,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
e.peerRequestQueue.Remove(entry.Key, p) e.peerRequestQueue.Remove(entry.Key, p)
} else { } else {
log.Debugf("wants %s - %d", entry.Key, entry.Priority) log.Debugf("wants %s - %d", entry.Key, entry.Priority)
l.Wants(entry.Key, entry.Priority) l.Wants(entry.Ctx, entry.Key, entry.Priority)
if exists, err := e.bs.Has(entry.Key); err == nil && exists { if exists, err := e.bs.Has(entry.Key); err == nil && exists {
e.peerRequestQueue.Push(entry.Entry, p) e.peerRequestQueue.Push(entry.Entry, p)
newWorkExists = true newWorkExists = true

View File

@ -6,6 +6,8 @@ import (
key "github.com/ipfs/go-ipfs/blocks/key" key "github.com/ipfs/go-ipfs/blocks/key"
wl "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist" wl "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
peer "gx/ipfs/QmZwZjMVGss5rqYsJVGy18gNbkTJffFyq2x1uJ4e4p3ZAt/go-libp2p-peer" peer "gx/ipfs/QmZwZjMVGss5rqYsJVGy18gNbkTJffFyq2x1uJ4e4p3ZAt/go-libp2p-peer"
"gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
) )
// keySet is just a convenient alias for maps of keys, where we only care // keySet is just a convenient alias for maps of keys, where we only care
@ -68,9 +70,9 @@ func (l *ledger) ReceivedBytes(n int) {
} }
// TODO: this needs to be different. We need timeouts. // TODO: this needs to be different. We need timeouts.
func (l *ledger) Wants(k key.Key, priority int) { func (l *ledger) Wants(ctx context.Context, k key.Key, priority int) {
log.Debugf("peer %s wants %s", l.Partner, k) log.Debugf("peer %s wants %s", l.Partner, k)
l.wantList.Add(k, priority) l.wantList.Add(ctx, k, priority)
} }
func (l *ledger) CancelWant(k key.Key) { func (l *ledger) CancelWant(k key.Key) {

View File

@ -3,9 +3,12 @@
package wantlist package wantlist
import ( import (
key "github.com/ipfs/go-ipfs/blocks/key"
"sort" "sort"
"sync" "sync"
key "github.com/ipfs/go-ipfs/blocks/key"
"gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
) )
type ThreadSafe struct { type ThreadSafe struct {
@ -16,7 +19,6 @@ type ThreadSafe struct {
// not threadsafe // not threadsafe
type Wantlist struct { type Wantlist struct {
set map[key.Key]Entry set map[key.Key]Entry
// TODO provide O(1) len accessor if cost becomes an issue
} }
type Entry struct { type Entry struct {
@ -24,6 +26,7 @@ type Entry struct {
// slices can be copied efficiently. // slices can be copied efficiently.
Key key.Key Key key.Key
Priority int Priority int
Ctx context.Context
} }
type entrySlice []Entry type entrySlice []Entry
@ -44,22 +47,25 @@ func New() *Wantlist {
} }
} }
func (w *ThreadSafe) Add(k key.Key, priority int) { func (w *ThreadSafe) Add(ctx context.Context, k key.Key, priority int) {
// TODO rm defer for perf
w.lk.Lock() w.lk.Lock()
defer w.lk.Unlock() defer w.lk.Unlock()
w.Wantlist.Add(k, priority) w.Wantlist.Add(ctx, k, priority)
}
func (w *ThreadSafe) AddEntry(e Entry) {
w.lk.Lock()
defer w.lk.Unlock()
w.Wantlist.AddEntry(e)
} }
func (w *ThreadSafe) Remove(k key.Key) { func (w *ThreadSafe) Remove(k key.Key) {
// TODO rm defer for perf
w.lk.Lock() w.lk.Lock()
defer w.lk.Unlock() defer w.lk.Unlock()
w.Wantlist.Remove(k) w.Wantlist.Remove(k)
} }
func (w *ThreadSafe) Contains(k key.Key) (Entry, bool) { func (w *ThreadSafe) Contains(k key.Key) (Entry, bool) {
// TODO rm defer for perf
w.lk.RLock() w.lk.RLock()
defer w.lk.RUnlock() defer w.lk.RUnlock()
return w.Wantlist.Contains(k) return w.Wantlist.Contains(k)
@ -87,16 +93,24 @@ func (w *Wantlist) Len() int {
return len(w.set) return len(w.set)
} }
func (w *Wantlist) Add(k key.Key, priority int) { func (w *Wantlist) Add(ctx context.Context, k key.Key, priority int) {
if _, ok := w.set[k]; ok { if _, ok := w.set[k]; ok {
return return
} }
w.set[k] = Entry{ w.set[k] = Entry{
Key: k, Key: k,
Priority: priority, Priority: priority,
Ctx: ctx,
} }
} }
func (w *Wantlist) AddEntry(e Entry) {
if _, ok := w.set[e.Key]; ok {
return
}
w.set[e.Key] = e
}
func (w *Wantlist) Remove(k key.Key) { func (w *Wantlist) Remove(k key.Key) {
delete(w.set, k) delete(w.set, k)
} }

View File

@ -64,16 +64,16 @@ type msgQueue struct {
done chan struct{} done chan struct{}
} }
func (pm *WantManager) WantBlocks(ks []key.Key) { func (pm *WantManager) WantBlocks(ctx context.Context, ks []key.Key) {
log.Infof("want blocks: %s", ks) log.Infof("want blocks: %s", ks)
pm.addEntries(ks, false) pm.addEntries(ctx, ks, false)
} }
func (pm *WantManager) CancelWants(ks []key.Key) { func (pm *WantManager) CancelWants(ks []key.Key) {
pm.addEntries(ks, true) pm.addEntries(context.TODO(), ks, true)
} }
func (pm *WantManager) addEntries(ks []key.Key, cancel bool) { func (pm *WantManager) addEntries(ctx context.Context, ks []key.Key, cancel bool) {
var entries []*bsmsg.Entry var entries []*bsmsg.Entry
for i, k := range ks { for i, k := range ks {
entries = append(entries, &bsmsg.Entry{ entries = append(entries, &bsmsg.Entry{
@ -81,6 +81,7 @@ func (pm *WantManager) addEntries(ks []key.Key, cancel bool) {
Entry: wantlist.Entry{ Entry: wantlist.Entry{
Key: k, Key: k,
Priority: kMaxPriority - i, Priority: kMaxPriority - i,
Ctx: ctx,
}, },
}) })
} }
@ -224,7 +225,7 @@ func (pm *WantManager) Run() {
if e.Cancel { if e.Cancel {
pm.wl.Remove(e.Key) pm.wl.Remove(e.Key)
} else { } else {
pm.wl.Add(e.Key, e.Priority) pm.wl.AddEntry(e.Entry)
} }
} }
@ -237,6 +238,14 @@ func (pm *WantManager) Run() {
// resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY) // resend entire wantlist every so often (REALLY SHOULDNT BE NECESSARY)
var es []*bsmsg.Entry var es []*bsmsg.Entry
for _, e := range pm.wl.Entries() { for _, e := range pm.wl.Entries() {
select {
case <-e.Ctx.Done():
// entry has been cancelled
// simply continue, the entry will be removed from the
// wantlist soon enough
continue
default:
}
es = append(es, &bsmsg.Entry{Entry: e}) es = append(es, &bsmsg.Entry{Entry: e})
} }
for _, p := range pm.peers { for _, p := range pm.peers {

View File

@ -1,6 +1,7 @@
package bitswap package bitswap
import ( import (
"sync"
"time" "time"
process "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess" process "gx/ipfs/QmQopLATEYMNg7dVqZRNDfeE2S1yKy8zrRh5xnYiuqeZBn/goprocess"
@ -8,6 +9,8 @@ import (
context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context" context "gx/ipfs/QmZy2y8t9zQH2a1b8q2ZSLKp17ATuJoCNxxyMFG5qFExpt/go-net/context"
key "github.com/ipfs/go-ipfs/blocks/key" key "github.com/ipfs/go-ipfs/blocks/key"
wantlist "github.com/ipfs/go-ipfs/exchange/bitswap/wantlist"
peer "gx/ipfs/QmZwZjMVGss5rqYsJVGy18gNbkTJffFyq2x1uJ4e4p3ZAt/go-libp2p-peer"
logging "gx/ipfs/Qmazh5oNUVsDZTs2g59rq8aYQqwpss8tcUWQzor5sCCEuH/go-log" logging "gx/ipfs/Qmazh5oNUVsDZTs2g59rq8aYQqwpss8tcUWQzor5sCCEuH/go-log"
) )
@ -16,7 +19,7 @@ var TaskWorkerCount = 8
func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) { func (bs *Bitswap) startWorkers(px process.Process, ctx context.Context) {
// Start up a worker to handle block requests this node is making // Start up a worker to handle block requests this node is making
px.Go(func(px process.Process) { px.Go(func(px process.Process) {
bs.providerConnector(ctx) bs.providerQueryManager(ctx)
}) })
// Start up workers to handle requests from other nodes for the data on this node // Start up workers to handle requests from other nodes for the data on this node
@ -149,37 +152,6 @@ func (bs *Bitswap) provideCollector(ctx context.Context) {
} }
} }
// connects to providers for the given keys
func (bs *Bitswap) providerConnector(parent context.Context) {
defer log.Info("bitswap client worker shutting down...")
for {
log.Event(parent, "Bitswap.ProviderConnector.Loop")
select {
case req := <-bs.findKeys:
keys := req.keys
if len(keys) == 0 {
log.Warning("Received batch request for zero blocks")
continue
}
log.Event(parent, "Bitswap.ProviderConnector.Work", logging.LoggableMap{"Keys": keys})
// NB: Optimization. Assumes that providers of key[0] are likely to
// be able to provide for all keys. This currently holds true in most
// every situation. Later, this assumption may not hold as true.
child, cancel := context.WithTimeout(req.ctx, providerRequestTimeout)
providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
for p := range providers {
go bs.network.ConnectTo(req.ctx, p)
}
cancel()
case <-parent.Done():
return
}
}
}
func (bs *Bitswap) rebroadcastWorker(parent context.Context) { func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
ctx, cancel := context.WithCancel(parent) ctx, cancel := context.WithCancel(parent)
defer cancel() defer cancel()
@ -200,12 +172,49 @@ func (bs *Bitswap) rebroadcastWorker(parent context.Context) {
} }
case <-broadcastSignal.C: // resend unfulfilled wantlist keys case <-broadcastSignal.C: // resend unfulfilled wantlist keys
log.Event(ctx, "Bitswap.Rebroadcast.active") log.Event(ctx, "Bitswap.Rebroadcast.active")
entries := bs.wm.wl.Entries() for _, e := range bs.wm.wl.Entries() {
if len(entries) > 0 { bs.findKeys <- &e
bs.connectToProviders(ctx, entries)
} }
case <-parent.Done(): case <-parent.Done():
return return
} }
} }
} }
func (bs *Bitswap) providerQueryManager(ctx context.Context) {
var activeLk sync.Mutex
active := make(map[key.Key]*wantlist.Entry)
for {
select {
case e := <-bs.findKeys:
activeLk.Lock()
if _, ok := active[e.Key]; ok {
activeLk.Unlock()
continue
}
active[e.Key] = e
activeLk.Unlock()
go func(e *wantlist.Entry) {
child, cancel := context.WithTimeout(e.Ctx, providerRequestTimeout)
defer cancel()
providers := bs.network.FindProvidersAsync(child, e.Key, maxProvidersPerRequest)
for p := range providers {
go func(p peer.ID) {
err := bs.network.ConnectTo(child, p)
if err != nil {
log.Debug("failed to connect to provider %s: %s", p, err)
}
}(p)
}
activeLk.Lock()
delete(active, e.Key)
activeLk.Unlock()
}(e)
case <-ctx.Done():
return
}
}
}