mirror of
https://github.com/ipfs/kubo.git
synced 2025-09-10 05:52:20 +08:00
blockservice: async HasBlock with ratelimit
This commit is contained in:
@ -8,6 +8,8 @@ import (
|
||||
"fmt"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
|
||||
procrl "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
|
||||
blocks "github.com/jbenet/go-ipfs/blocks"
|
||||
"github.com/jbenet/go-ipfs/blocks/blockstore"
|
||||
exchange "github.com/jbenet/go-ipfs/exchange"
|
||||
@ -17,6 +19,9 @@ import (
|
||||
var log = u.Logger("blockservice")
|
||||
var ErrNotFound = errors.New("blockservice: key not found")
|
||||
|
||||
// MaxExchangeAddWorkers rate limits the number of exchange workers
|
||||
var MaxExchangeAddWorkers = 100
|
||||
|
||||
// BlockService is a hybrid block datastore. It stores data in a local
|
||||
// datastore and may retrieve data from a remote Exchange.
|
||||
// It uses an internal `datastore.Datastore` instance to store values.
|
||||
@ -24,6 +29,9 @@ type BlockService struct {
|
||||
// TODO don't expose underlying impl details
|
||||
Blockstore blockstore.Blockstore
|
||||
Exchange exchange.Interface
|
||||
|
||||
rateLimiter *procrl.RateLimiter
|
||||
exchangeAdd chan blocks.Block
|
||||
}
|
||||
|
||||
// NewBlockService creates a BlockService with given datastore instance.
|
||||
@ -34,7 +42,17 @@ func New(bs blockstore.Blockstore, rem exchange.Interface) (*BlockService, error
|
||||
if rem == nil {
|
||||
log.Warning("blockservice running in local (offline) mode.")
|
||||
}
|
||||
return &BlockService{Blockstore: bs, Exchange: rem}, nil
|
||||
|
||||
// exchangeAdd is a channel for async workers to add to the exchange.
|
||||
// 100 blocks buffer. not clear what this number should be
|
||||
exchangeAdd := make(chan blocks.Block, 100)
|
||||
|
||||
return &BlockService{
|
||||
Blockstore: bs,
|
||||
Exchange: rem,
|
||||
exchangeAdd: exchangeAdd,
|
||||
rateLimiter: procrl.NewRateLimiter(process.Background(), MaxExchangeAddWorkers),
|
||||
}, nil
|
||||
}
|
||||
|
||||
// AddBlock adds a particular block to the service, Putting it into the datastore.
|
||||
@ -46,15 +64,21 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
|
||||
return k, err
|
||||
}
|
||||
|
||||
// TODO this operation rate-limits blockservice operations, we should
|
||||
// consider moving this to an sync process.
|
||||
// this operation rate-limits blockservice operations, so it is
|
||||
// now an async process.
|
||||
if s.Exchange != nil {
|
||||
ctx := context.TODO()
|
||||
if err := s.Exchange.HasBlock(ctx, b); err != nil {
|
||||
// suppress error, as the client shouldn't care about bitswap.
|
||||
// the client only cares about the blockstore.Put.
|
||||
log.Errorf("Exchange.HasBlock error: %s", err)
|
||||
}
|
||||
|
||||
// LimitedGo will spawn a goroutine but provide proper backpressure.
|
||||
// it will not spawn the goroutine until the ratelimiter's work load
|
||||
// is under the threshold.
|
||||
s.rateLimiter.LimitedGo(func(worker process.Process) {
|
||||
ctx := context.TODO()
|
||||
if err := s.Exchange.HasBlock(ctx, b); err != nil {
|
||||
// suppress error, as the client shouldn't care about bitswap.
|
||||
// the client only cares about the blockstore.Put.
|
||||
log.Errorf("Exchange.HasBlock error: %s", err)
|
||||
}
|
||||
})
|
||||
}
|
||||
return k, nil
|
||||
}
|
||||
|
Reference in New Issue
Block a user