mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-28 08:47:42 +08:00
add worker to bitswap for reproviding new blocks
This commit is contained in:
@ -42,3 +42,9 @@ func (b *Block) Key() u.Key {
|
|||||||
func (b *Block) String() string {
|
func (b *Block) String() string {
|
||||||
return fmt.Sprintf("[Block %s]", b.Key())
|
return fmt.Sprintf("[Block %s]", b.Key())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (b *Block) Loggable() map[string]interface{} {
|
||||||
|
return map[string]interface{}{
|
||||||
|
"block": b.Key().String(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -8,7 +8,6 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||||
inflect "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/inflect"
|
|
||||||
process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
|
process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
|
||||||
|
|
||||||
blocks "github.com/jbenet/go-ipfs/blocks"
|
blocks "github.com/jbenet/go-ipfs/blocks"
|
||||||
@ -37,9 +36,13 @@ const (
|
|||||||
maxProvidersPerRequest = 3
|
maxProvidersPerRequest = 3
|
||||||
providerRequestTimeout = time.Second * 10
|
providerRequestTimeout = time.Second * 10
|
||||||
hasBlockTimeout = time.Second * 15
|
hasBlockTimeout = time.Second * 15
|
||||||
|
provideTimeout = time.Second * 15
|
||||||
sizeBatchRequestChan = 32
|
sizeBatchRequestChan = 32
|
||||||
// kMaxPriority is the max priority as defined by the bitswap protocol
|
// kMaxPriority is the max priority as defined by the bitswap protocol
|
||||||
kMaxPriority = math.MaxInt32
|
kMaxPriority = math.MaxInt32
|
||||||
|
|
||||||
|
hasBlockBufferSize = 256
|
||||||
|
provideWorkers = 4
|
||||||
)
|
)
|
||||||
|
|
||||||
var (
|
var (
|
||||||
@ -86,18 +89,12 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
|
|||||||
wantlist: wantlist.NewThreadSafe(),
|
wantlist: wantlist.NewThreadSafe(),
|
||||||
batchRequests: make(chan *blockRequest, sizeBatchRequestChan),
|
batchRequests: make(chan *blockRequest, sizeBatchRequestChan),
|
||||||
process: px,
|
process: px,
|
||||||
|
newBlocks: make(chan *blocks.Block, hasBlockBufferSize),
|
||||||
}
|
}
|
||||||
network.SetDelegate(bs)
|
network.SetDelegate(bs)
|
||||||
px.Go(func(px process.Process) {
|
|
||||||
bs.clientWorker(ctx)
|
|
||||||
})
|
|
||||||
px.Go(func(px process.Process) {
|
|
||||||
bs.taskWorker(ctx)
|
|
||||||
})
|
|
||||||
px.Go(func(px process.Process) {
|
|
||||||
bs.rebroadcastWorker(ctx)
|
|
||||||
})
|
|
||||||
|
|
||||||
|
// Start up bitswaps async worker routines
|
||||||
|
bs.startWorkers(px, ctx)
|
||||||
return bs
|
return bs
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -126,6 +123,8 @@ type bitswap struct {
|
|||||||
wantlist *wantlist.ThreadSafe
|
wantlist *wantlist.ThreadSafe
|
||||||
|
|
||||||
process process.Process
|
process process.Process
|
||||||
|
|
||||||
|
newBlocks chan *blocks.Block
|
||||||
}
|
}
|
||||||
|
|
||||||
type blockRequest struct {
|
type blockRequest struct {
|
||||||
@ -172,7 +171,6 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
|
|||||||
case <-parent.Done():
|
case <-parent.Done():
|
||||||
return nil, parent.Err()
|
return nil, parent.Err()
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// GetBlocks returns a channel where the caller may receive blocks that
|
// GetBlocks returns a channel where the caller may receive blocks that
|
||||||
@ -205,6 +203,7 @@ func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.
|
|||||||
// HasBlock announces the existance of a block to this bitswap service. The
|
// HasBlock announces the existance of a block to this bitswap service. The
|
||||||
// service will potentially notify its peers.
|
// service will potentially notify its peers.
|
||||||
func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
|
func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
|
||||||
|
log.Event(ctx, "hasBlock", blk)
|
||||||
select {
|
select {
|
||||||
case <-bs.process.Closing():
|
case <-bs.process.Closing():
|
||||||
return errors.New("bitswap is closed")
|
return errors.New("bitswap is closed")
|
||||||
@ -215,7 +214,12 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
|
|||||||
}
|
}
|
||||||
bs.wantlist.Remove(blk.Key())
|
bs.wantlist.Remove(blk.Key())
|
||||||
bs.notifications.Publish(blk)
|
bs.notifications.Publish(blk)
|
||||||
return bs.network.Provide(ctx, blk.Key())
|
select {
|
||||||
|
case bs.newBlocks <- blk:
|
||||||
|
case <-ctx.Done():
|
||||||
|
return ctx.Err()
|
||||||
|
}
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error {
|
func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error {
|
||||||
@ -310,6 +314,7 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
|
|||||||
log.Debug(err)
|
log.Debug(err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var keys []u.Key
|
var keys []u.Key
|
||||||
for _, block := range incoming.Blocks() {
|
for _, block := range incoming.Blocks() {
|
||||||
keys = append(keys, block.Key())
|
keys = append(keys, block.Key())
|
||||||
@ -391,82 +396,3 @@ func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage)
|
|||||||
func (bs *bitswap) Close() error {
|
func (bs *bitswap) Close() error {
|
||||||
return bs.process.Close()
|
return bs.process.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bs *bitswap) taskWorker(ctx context.Context) {
|
|
||||||
defer log.Info("bitswap task worker shutting down...")
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case nextEnvelope := <-bs.engine.Outbox():
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case envelope, ok := <-nextEnvelope:
|
|
||||||
if !ok {
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer)
|
|
||||||
bs.send(ctx, envelope.Peer, envelope.Message)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// TODO ensure only one active request per key
|
|
||||||
func (bs *bitswap) clientWorker(parent context.Context) {
|
|
||||||
defer log.Info("bitswap client worker shutting down...")
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case req := <-bs.batchRequests:
|
|
||||||
keys := req.keys
|
|
||||||
if len(keys) == 0 {
|
|
||||||
log.Warning("Received batch request for zero blocks")
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
for i, k := range keys {
|
|
||||||
bs.wantlist.Add(k, kMaxPriority-i)
|
|
||||||
}
|
|
||||||
|
|
||||||
bs.wantNewBlocks(req.ctx, keys)
|
|
||||||
|
|
||||||
// NB: Optimization. Assumes that providers of key[0] are likely to
|
|
||||||
// be able to provide for all keys. This currently holds true in most
|
|
||||||
// every situation. Later, this assumption may not hold as true.
|
|
||||||
child, _ := context.WithTimeout(req.ctx, providerRequestTimeout)
|
|
||||||
providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
|
|
||||||
err := bs.sendWantlistToPeers(req.ctx, providers)
|
|
||||||
if err != nil {
|
|
||||||
log.Debugf("error sending wantlist: %s", err)
|
|
||||||
}
|
|
||||||
case <-parent.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
func (bs *bitswap) rebroadcastWorker(parent context.Context) {
|
|
||||||
ctx, cancel := context.WithCancel(parent)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
broadcastSignal := time.After(rebroadcastDelay.Get())
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-time.Tick(10 * time.Second):
|
|
||||||
n := bs.wantlist.Len()
|
|
||||||
if n > 0 {
|
|
||||||
log.Debug(n, inflect.FromNumber("keys", n), "in bitswap wantlist")
|
|
||||||
}
|
|
||||||
case <-broadcastSignal: // resend unfulfilled wantlist keys
|
|
||||||
entries := bs.wantlist.Entries()
|
|
||||||
if len(entries) > 0 {
|
|
||||||
bs.sendWantlistToProviders(ctx, entries)
|
|
||||||
}
|
|
||||||
broadcastSignal = time.After(rebroadcastDelay.Get())
|
|
||||||
case <-parent.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
133
exchange/bitswap/workers.go
Normal file
133
exchange/bitswap/workers.go
Normal file
@ -0,0 +1,133 @@
|
|||||||
|
package bitswap
|
||||||
|
|
||||||
|
import (
|
||||||
|
"time"
|
||||||
|
|
||||||
|
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||||
|
inflect "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/inflect"
|
||||||
|
process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
|
||||||
|
)
|
||||||
|
|
||||||
|
func (bs *bitswap) startWorkers(px process.Process, ctx context.Context) {
|
||||||
|
// Start up a worker to handle block requests this node is making
|
||||||
|
px.Go(func(px process.Process) {
|
||||||
|
bs.clientWorker(ctx)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Start up a worker to handle requests from other nodes for the data on this node
|
||||||
|
px.Go(func(px process.Process) {
|
||||||
|
bs.taskWorker(ctx)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Start up a worker to manage periodically resending our wantlist out to peers
|
||||||
|
px.Go(func(px process.Process) {
|
||||||
|
bs.rebroadcastWorker(ctx)
|
||||||
|
})
|
||||||
|
|
||||||
|
// Spawn up multiple workers to handle incoming blocks
|
||||||
|
// consider increasing number if providing blocks bottlenecks
|
||||||
|
// file transfers
|
||||||
|
for i := 0; i < provideWorkers; i++ {
|
||||||
|
px.Go(func(px process.Process) {
|
||||||
|
bs.blockReceiveWorker(ctx)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bs *bitswap) taskWorker(ctx context.Context) {
|
||||||
|
defer log.Info("bitswap task worker shutting down...")
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case nextEnvelope := <-bs.engine.Outbox():
|
||||||
|
select {
|
||||||
|
case envelope, ok := <-nextEnvelope:
|
||||||
|
if !ok {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
log.Event(ctx, "deliverBlocks", envelope.Message, envelope.Peer)
|
||||||
|
bs.send(ctx, envelope.Peer, envelope.Message)
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bs *bitswap) blockReceiveWorker(ctx context.Context) {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case blk, ok := <-bs.newBlocks:
|
||||||
|
if !ok {
|
||||||
|
log.Debug("newBlocks channel closed")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
ctx, _ := context.WithTimeout(ctx, provideTimeout)
|
||||||
|
err := bs.network.Provide(ctx, blk.Key())
|
||||||
|
if err != nil {
|
||||||
|
log.Error(err)
|
||||||
|
}
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO ensure only one active request per key
|
||||||
|
func (bs *bitswap) clientWorker(parent context.Context) {
|
||||||
|
defer log.Info("bitswap client worker shutting down...")
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case req := <-bs.batchRequests:
|
||||||
|
keys := req.keys
|
||||||
|
if len(keys) == 0 {
|
||||||
|
log.Warning("Received batch request for zero blocks")
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
for i, k := range keys {
|
||||||
|
bs.wantlist.Add(k, kMaxPriority-i)
|
||||||
|
}
|
||||||
|
|
||||||
|
bs.wantNewBlocks(req.ctx, keys)
|
||||||
|
|
||||||
|
// NB: Optimization. Assumes that providers of key[0] are likely to
|
||||||
|
// be able to provide for all keys. This currently holds true in most
|
||||||
|
// every situation. Later, this assumption may not hold as true.
|
||||||
|
child, _ := context.WithTimeout(req.ctx, providerRequestTimeout)
|
||||||
|
providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
|
||||||
|
err := bs.sendWantlistToPeers(req.ctx, providers)
|
||||||
|
if err != nil {
|
||||||
|
log.Debugf("error sending wantlist: %s", err)
|
||||||
|
}
|
||||||
|
case <-parent.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (bs *bitswap) rebroadcastWorker(parent context.Context) {
|
||||||
|
ctx, cancel := context.WithCancel(parent)
|
||||||
|
defer cancel()
|
||||||
|
|
||||||
|
broadcastSignal := time.After(rebroadcastDelay.Get())
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-time.Tick(10 * time.Second):
|
||||||
|
n := bs.wantlist.Len()
|
||||||
|
if n > 0 {
|
||||||
|
log.Debug(n, inflect.FromNumber("keys", n), "in bitswap wantlist")
|
||||||
|
}
|
||||||
|
case <-broadcastSignal: // resend unfulfilled wantlist keys
|
||||||
|
entries := bs.wantlist.Entries()
|
||||||
|
if len(entries) > 0 {
|
||||||
|
bs.sendWantlistToProviders(ctx, entries)
|
||||||
|
}
|
||||||
|
broadcastSignal = time.After(rebroadcastDelay.Get())
|
||||||
|
case <-parent.Done():
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
Reference in New Issue
Block a user