mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-01 02:30:39 +08:00
allow bitswap to attempt to write blocks to disk multiple times
License: MIT Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
@ -228,7 +228,9 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
|
||||
default:
|
||||
}
|
||||
|
||||
if err := bs.blockstore.Put(blk); err != nil {
|
||||
err := bs.tryPutBlock(blk, 4) // attempt to store block up to four times
|
||||
if err != nil {
|
||||
log.Errorf("Error writing block to datastore: %s", err)
|
||||
return err
|
||||
}
|
||||
|
||||
@ -242,6 +244,18 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (bs *Bitswap) tryPutBlock(blk *blocks.Block, attempts int) error {
|
||||
var err error
|
||||
for i := 0; i < attempts; i++ {
|
||||
if err = bs.blockstore.Put(blk); err == nil {
|
||||
break
|
||||
}
|
||||
|
||||
time.Sleep(time.Millisecond * time.Duration(400*(i+1)))
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func (bs *Bitswap) connectToProviders(ctx context.Context, entries []wantlist.Entry) {
|
||||
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
@ -297,38 +311,46 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg
|
||||
wg.Add(1)
|
||||
go func(b *blocks.Block) {
|
||||
defer wg.Done()
|
||||
bs.counterLk.Lock()
|
||||
bs.blocksRecvd++
|
||||
has, err := bs.blockstore.Has(b.Key())
|
||||
if err != nil {
|
||||
bs.counterLk.Unlock()
|
||||
log.Infof("blockstore.Has error: %s", err)
|
||||
return
|
||||
}
|
||||
if err == nil && has {
|
||||
bs.dupBlocksRecvd++
|
||||
}
|
||||
brecvd := bs.blocksRecvd
|
||||
bdup := bs.dupBlocksRecvd
|
||||
bs.counterLk.Unlock()
|
||||
if has {
|
||||
return
|
||||
|
||||
if err := bs.updateReceiveCounters(b.Key()); err != nil {
|
||||
return // ignore error, is either logged previously, or ErrAlreadyHaveBlock
|
||||
}
|
||||
|
||||
k := b.Key()
|
||||
log.Event(ctx, "Bitswap.GetBlockRequest.End", &k)
|
||||
|
||||
log.Debugf("got block %s from %s (%d,%d)", b, p, brecvd, bdup)
|
||||
log.Debugf("got block %s from %s", b, p)
|
||||
hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout)
|
||||
defer cancel()
|
||||
if err := bs.HasBlock(hasBlockCtx, b); err != nil {
|
||||
log.Warningf("ReceiveMessage HasBlock error: %s", err)
|
||||
}
|
||||
cancel()
|
||||
}(block)
|
||||
}
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
var ErrAlreadyHaveBlock = errors.New("already have block")
|
||||
|
||||
func (bs *Bitswap) updateReceiveCounters(k key.Key) error {
|
||||
bs.counterLk.Lock()
|
||||
defer bs.counterLk.Unlock()
|
||||
bs.blocksRecvd++
|
||||
has, err := bs.blockstore.Has(k)
|
||||
if err != nil {
|
||||
log.Infof("blockstore.Has error: %s", err)
|
||||
return err
|
||||
}
|
||||
if err == nil && has {
|
||||
bs.dupBlocksRecvd++
|
||||
}
|
||||
|
||||
if has {
|
||||
return ErrAlreadyHaveBlock
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Connected/Disconnected warns bitswap about peer connections
|
||||
func (bs *Bitswap) PeerConnected(p peer.ID) {
|
||||
bs.wm.Connected(p)
|
||||
|
Reference in New Issue
Block a user