diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 4511e188e..206b44f1e 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -228,7 +228,9 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { default: } - if err := bs.blockstore.Put(blk); err != nil { + err := bs.tryPutBlock(blk, 4) // attempt to store block up to four times + if err != nil { + log.Errorf("Error writing block to datastore: %s", err) return err } @@ -242,6 +244,18 @@ func (bs *Bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { return nil } +func (bs *Bitswap) tryPutBlock(blk *blocks.Block, attempts int) error { + var err error + for i := 0; i < attempts; i++ { + if err = bs.blockstore.Put(blk); err == nil { + break + } + + time.Sleep(time.Millisecond * time.Duration(400*(i+1))) + } + return err +} + func (bs *Bitswap) connectToProviders(ctx context.Context, entries []wantlist.Entry) { ctx, cancel := context.WithCancel(ctx) @@ -297,38 +311,46 @@ func (bs *Bitswap) ReceiveMessage(ctx context.Context, p peer.ID, incoming bsmsg wg.Add(1) go func(b *blocks.Block) { defer wg.Done() - bs.counterLk.Lock() - bs.blocksRecvd++ - has, err := bs.blockstore.Has(b.Key()) - if err != nil { - bs.counterLk.Unlock() - log.Infof("blockstore.Has error: %s", err) - return - } - if err == nil && has { - bs.dupBlocksRecvd++ - } - brecvd := bs.blocksRecvd - bdup := bs.dupBlocksRecvd - bs.counterLk.Unlock() - if has { - return + + if err := bs.updateReceiveCounters(b.Key()); err != nil { + return // ignore error, is either logged previously, or ErrAlreadyHaveBlock } k := b.Key() log.Event(ctx, "Bitswap.GetBlockRequest.End", &k) - log.Debugf("got block %s from %s (%d,%d)", b, p, brecvd, bdup) + log.Debugf("got block %s from %s", b, p) hasBlockCtx, cancel := context.WithTimeout(ctx, hasBlockTimeout) + defer cancel() if err := bs.HasBlock(hasBlockCtx, b); err != nil { log.Warningf("ReceiveMessage HasBlock error: %s", err) } - cancel() }(block) } wg.Wait() } +var ErrAlreadyHaveBlock = errors.New("already have block") + +func (bs *Bitswap) updateReceiveCounters(k key.Key) error { + bs.counterLk.Lock() + defer bs.counterLk.Unlock() + bs.blocksRecvd++ + has, err := bs.blockstore.Has(k) + if err != nil { + log.Infof("blockstore.Has error: %s", err) + return err + } + if err == nil && has { + bs.dupBlocksRecvd++ + } + + if has { + return ErrAlreadyHaveBlock + } + return nil +} + // Connected/Disconnected warns bitswap about peer connections func (bs *Bitswap) PeerConnected(p peer.ID) { bs.wm.Connected(p)