Mirror of https://github.com/ipfs/kubo.git (synced 2025-09-10 05:52:20 +08:00)
fix(blockservice) fully async exchange.HasBlock
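Previously, AddBlock announced each new block to the exchange inline, on rate-limited goroutines (procrl.RateLimiter capped by MaxExchangeAddWorkers = 100). This commit moves that work into the new blockservice/worker package: clients hand blocks to a queue goroutine that never blocks them, a configurable pool of background workers drains the queue into exchange.HasBlock, and BlockService gains a Close method for shutting the pipeline down.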
blockservice/blockservice_test.go

@@ -22,6 +22,7 @@ func TestBlocks(t *testing.T) {
         t.Error("failed to construct block service", err)
         return
     }
+    defer bs.Close()

     b := blocks.NewBlock([]byte("beep boop"))
     h := u.Hash([]byte("beep boop"))
@@ -61,6 +62,9 @@ func TestBlocks(t *testing.T) {

 func TestGetBlocksSequential(t *testing.T) {
     var servs = Mocks(t, 4)
+    for _, s := range servs {
+        defer s.Close()
+    }
     bg := blocksutil.NewBlockGenerator()
     blks := bg.Blocks(50)

@@ -73,7 +77,7 @@ func TestGetBlocksSequential(t *testing.T) {
     t.Log("one instance at a time, get blocks concurrently")

     for i := 1; i < len(servs); i++ {
-        ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
+        ctx, _ := context.WithTimeout(context.TODO(), time.Second*50)
         out := servs[i].GetBlocks(ctx, keys)
         gotten := make(map[u.Key]*blocks.Block)
         for blk := range out {
blockservice/blockservice.go

@@ -8,20 +8,33 @@ import (
     "fmt"

     context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
-    process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
-    procrl "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit"
     blocks "github.com/jbenet/go-ipfs/blocks"
     "github.com/jbenet/go-ipfs/blocks/blockstore"
+    worker "github.com/jbenet/go-ipfs/blockservice/worker"
     exchange "github.com/jbenet/go-ipfs/exchange"
     u "github.com/jbenet/go-ipfs/util"
 )

+var wc = worker.Config{
+    // When running on a single core, NumWorkers has a harsh negative effect on
+    // throughput. (-80% when < 25)
+    // Running a lot more workers appears to have very little effect on both
+    // single and multicore configurations.
+    NumWorkers: 25,
+
+    // These have no effect on when running on multiple cores, but harsh
+    // negative effect on throughput when running on a single core
+    // On multicore configurations these buffers have little effect on
+    // throughput.
+    // On single core configurations, larger buffers have severe adverse
+    // effects on throughput.
+    ClientBufferSize: 0,
+    WorkerBufferSize: 0,
+}
+
 var log = u.Logger("blockservice")
 var ErrNotFound = errors.New("blockservice: key not found")

-// MaxExchangeAddWorkers rate limits the number of exchange workers
-var MaxExchangeAddWorkers = 100
-
 // BlockService is a hybrid block datastore. It stores data in a local
 // datastore and may retrieve data from a remote Exchange.
 // It uses an internal `datastore.Datastore` instance to store values.
@@ -30,8 +43,7 @@ type BlockService struct {
     Blockstore blockstore.Blockstore
     Exchange   exchange.Interface

-    rateLimiter *procrl.RateLimiter
-    exchangeAdd chan blocks.Block
+    worker *worker.Worker
 }

 // NewBlockService creates a BlockService with given datastore instance.
@@ -43,15 +55,10 @@ func New(bs blockstore.Blockstore, rem exchange.Interface) (*BlockService, error
         log.Warning("blockservice running in local (offline) mode.")
     }

-    // exchangeAdd is a channel for async workers to add to the exchange.
-    // 100 blocks buffer. not clear what this number should be
-    exchangeAdd := make(chan blocks.Block, 100)
-
     return &BlockService{
-        Blockstore:  bs,
-        Exchange:    rem,
-        exchangeAdd: exchangeAdd,
-        rateLimiter: procrl.NewRateLimiter(process.Background(), MaxExchangeAddWorkers),
+        Blockstore: bs,
+        Exchange:   rem,
+        worker:     worker.NewWorker(rem, wc),
     }, nil
 }
@@ -63,22 +70,8 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
     if err != nil {
         return k, err
     }
-
-    // this operation rate-limits blockservice operations, so it is
-    // now an async process.
-    if s.Exchange != nil {
-
-        // LimitedGo will spawn a goroutine but provide proper backpressure.
-        // it will not spawn the goroutine until the ratelimiter's work load
-        // is under the threshold.
-        s.rateLimiter.LimitedGo(func(worker process.Process) {
-            ctx := context.TODO()
-            if err := s.Exchange.HasBlock(ctx, b); err != nil {
-                // suppress error, as the client shouldn't care about bitswap.
-                // the client only cares about the blockstore.Put.
-                log.Errorf("Exchange.HasBlock error: %s", err)
-            }
-        })
+    if err := s.worker.HasBlock(b); err != nil {
+        return "", errors.New("blockservice is closed")
     }
     return k, nil
 }
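The caller-visible contract changes here: exchange errors were already swallowed (logged inside LimitedGo), and they stay invisible under the worker; the one failure AddBlock can now report besides a failed local Put is that the service has been closed. Below is a minimal, self-contained sketch of that enqueue-or-closed hand-off; every name in it (service, add, queue) is hypothetical, not from this commit.

package main

import (
    "errors"
    "fmt"
)

type service struct {
    queue  chan string   // hand-off to background workers
    closed chan struct{} // closed when the service shuts down
}

// add shows only the hand-off: enqueue the block for asynchronous
// announcement, or fail fast if the service has already been closed.
func (s *service) add(block string) error {
    select {
    case <-s.closed:
        return errors.New("service is closed")
    case s.queue <- block:
        return nil
    }
}

func main() {
    s := &service{queue: make(chan string), closed: make(chan struct{})}

    // a stand-in worker that accepts exactly one block
    go func() {
        fmt.Println("announcing", <-s.queue, "to the exchange")
    }()

    fmt.Println("err:", s.add("beep boop")) // err: <nil>
    close(s.closed)
    fmt.Println("err:", s.add("boop beep")) // err: service is closed
}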
@@ -148,3 +141,8 @@ func (s *BlockService) GetBlocks(ctx context.Context, ks []u.Key) <-chan *blocks
 func (s *BlockService) DeleteBlock(k u.Key) error {
     return s.Blockstore.DeleteBlock(k)
 }
+
+func (s *BlockService) Close() error {
+    log.Debug("blockservice is shutting down...")
+    return s.worker.Close()
+}
blockservice/worker/bench/main.go (new file, 82 lines)
package main

import (
    "log"
    "math"
    "testing"
    "time"

    ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
    ds_sync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
    blocks "github.com/jbenet/go-ipfs/blocks"
    blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
    worker "github.com/jbenet/go-ipfs/blockservice/worker"
    "github.com/jbenet/go-ipfs/exchange/offline"
    "github.com/jbenet/go-ipfs/thirdparty/delay"
    "github.com/jbenet/go-ipfs/util/datastore2"
)

const kEstRoutingDelay = time.Second

const kBlocksPerOp = 100

func main() {
    var bestConfig worker.Config
    var quickestNsPerOp int64 = math.MaxInt64
    for NumWorkers := 1; NumWorkers < 10; NumWorkers++ {
        for ClientBufferSize := 0; ClientBufferSize < 10; ClientBufferSize++ {
            for WorkerBufferSize := 0; WorkerBufferSize < 10; WorkerBufferSize++ {
                c := worker.Config{
                    NumWorkers:       NumWorkers,
                    ClientBufferSize: ClientBufferSize,
                    WorkerBufferSize: WorkerBufferSize,
                }
                result := testing.Benchmark(BenchmarkWithConfig(c))
                if result.NsPerOp() < quickestNsPerOp {
                    bestConfig = c
                    quickestNsPerOp = result.NsPerOp()
                }
                log.Printf("benched %+v \t result: %+v", c, result)
            }
        }
    }
    log.Println(bestConfig)
}

func BenchmarkWithConfig(c worker.Config) func(b *testing.B) {
    return func(b *testing.B) {

        routingDelay := delay.Fixed(0) // during setup

        dstore := ds_sync.MutexWrap(datastore2.WithDelay(ds.NewMapDatastore(), routingDelay))
        bstore := blockstore.NewBlockstore(dstore)
        var testdata []*blocks.Block
        var i int64
        for i = 0; i < kBlocksPerOp; i++ {
            testdata = append(testdata, blocks.NewBlock([]byte(string(i))))
        }
        b.ResetTimer()
        b.SetBytes(kBlocksPerOp)
        for i := 0; i < b.N; i++ {

            b.StopTimer()
            w := worker.NewWorker(offline.Exchange(bstore), c)
            b.StartTimer()

            prev := routingDelay.Set(kEstRoutingDelay) // during measured section

            for _, block := range testdata {
                if err := w.HasBlock(block); err != nil {
                    b.Fatal(err)
                }
            }

            routingDelay.Set(prev) // to hasten the unmeasured close period

            b.StopTimer()
            w.Close()
            b.StartTimer()

        }
    }
}
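Since this sweep is an ordinary main package rather than a go-test benchmark, it is run directly (for example with go run from the blockservice/worker/bench directory); it logs one line per candidate Config and finally prints the quickest one. Worth noting: the loops only explore NumWorkers from 1 to 9, while blockservice.go above settles on NumWorkers: 25, so the committed numbers evidently rest on measurements beyond this particular grid.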
blockservice/worker/bench_worker_test.go (new file, 42 lines)
package worker

import (
    "testing"

    ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
    dssync "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore/sync"
    blocks "github.com/jbenet/go-ipfs/blocks"
    blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
    "github.com/jbenet/go-ipfs/exchange/offline"
)

func BenchmarkHandle10KBlocks(b *testing.B) {
    bstore := blockstore.NewBlockstore(dssync.MutexWrap(ds.NewMapDatastore()))
    var testdata []*blocks.Block
    for i := 0; i < 10000; i++ {
        testdata = append(testdata, blocks.NewBlock([]byte(string(i))))
    }
    b.ResetTimer()
    b.SetBytes(10000)
    for i := 0; i < b.N; i++ {

        b.StopTimer()
        w := NewWorker(offline.Exchange(bstore), Config{
            NumWorkers:       1,
            ClientBufferSize: 0,
            WorkerBufferSize: 0,
        })
        b.StartTimer()

        for _, block := range testdata {
            if err := w.HasBlock(block); err != nil {
                b.Fatal(err)
            }
        }

        b.StopTimer()
        w.Close()
        b.StartTimer()

    }
}
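This one is a standard in-package benchmark, runnable with go test -bench . in blockservice/worker. One quirk to be aware of: b.SetBytes(10000) counts blocks rather than bytes, so the MB/s column that go test prints is really a blocks-per-op proxy, not true byte throughput.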
blockservice/worker/worker.go (new file, 169 lines)
// TODO FIXME name me
package worker

import (
    "container/list"
    "errors"
    "time"

    context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
    process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
    blocks "github.com/jbenet/go-ipfs/blocks"
    exchange "github.com/jbenet/go-ipfs/exchange"
    util "github.com/jbenet/go-ipfs/util"
)

var log = util.Logger("blockservice")

var DefaultConfig = Config{
    NumWorkers:       1,
    ClientBufferSize: 0,
    WorkerBufferSize: 0,
}

type Config struct {
    // NumWorkers sets the number of background workers that provide blocks to
    // the exchange.
    NumWorkers int

    // ClientBufferSize allows clients of HasBlock to send up to
    // |ClientBufferSize| blocks without blocking.
    ClientBufferSize int

    // WorkerBufferSize can be used in conjunction with NumWorkers to reduce
    // communication-coordination within the worker.
    WorkerBufferSize int
}

// TODO FIXME name me
type Worker struct {
    // added accepts blocks from client
    added    chan *blocks.Block
    exchange exchange.Interface

    // workQueue is owned by the client worker
    // process manages life-cycle
    process process.Process
}

func NewWorker(e exchange.Interface, c Config) *Worker {
    if c.NumWorkers < 1 {
        c.NumWorkers = 1 // provide a sane default
    }
    w := &Worker{
        exchange: e,
        added:    make(chan *blocks.Block, c.ClientBufferSize),
        process:  process.WithParent(process.Background()), // internal management
    }
    w.start(c)
    return w
}

func (w *Worker) HasBlock(b *blocks.Block) error {
    select {
    case <-w.process.Closed():
        return errors.New("blockservice worker is closed")
    case w.added <- b:
        return nil
    }
}

func (w *Worker) Close() error {
    log.Debug("blockservice provide worker is shutting down...")
    return w.process.Close()
}

func (w *Worker) start(c Config) {

    workerChan := make(chan *blocks.Block, c.WorkerBufferSize)

    // clientWorker handles incoming blocks from |w.added| and sends to
    // |workerChan|. This will never block the client.
    w.process.Go(func(proc process.Process) {
        defer close(workerChan)

        var workQueue BlockList
        for {

            // take advantage of the fact that sending on nil channel always
            // blocks so that a message is only sent if a block exists
            sendToWorker := workerChan
            nextBlock := workQueue.Pop()
            if nextBlock == nil {
                sendToWorker = nil
            }

            select {

            // if worker is ready and there's a block to process, send the
            // block
            case sendToWorker <- nextBlock:
            case <-time.Tick(5 * time.Second):
                if workQueue.Len() > 0 {
                    log.Debugf("%d blocks in blockservice provide queue...", workQueue.Len())
                }
            case block := <-w.added:
                if nextBlock != nil {
                    workQueue.Push(nextBlock) // missed the chance to send it
                }
                // if the client sends another block, add it to the queue.
                workQueue.Push(block)
            case <-proc.Closing():
                return
            }
        }
    })

    for i := 0; i < c.NumWorkers; i++ {
        // reads from |workerChan| until process closes
        w.process.Go(func(proc process.Process) {
            ctx, cancel := context.WithCancel(context.Background())

            // shuts down an in-progress HasBlock operation
            proc.Go(func(proc process.Process) {
                <-proc.Closing()
                cancel()
            })

            for {
                select {
                case <-proc.Closing():
                    return
                case block, ok := <-workerChan:
                    if !ok {
                        return
                    }
                    if err := w.exchange.HasBlock(ctx, block); err != nil {
                        // TODO log event?
                    }
                }
            }
        })
    }
}

type BlockList struct {
    list list.List
}

func (s *BlockList) PushFront(b *blocks.Block) {
    // FIXME find figures
    s.list.PushFront(b)
}

func (s *BlockList) Push(b *blocks.Block) {
    s.list.PushBack(b)
}

func (s *BlockList) Pop() *blocks.Block {
    if s.list.Len() == 0 {
        return nil
    }
    e := s.list.Front()
    s.list.Remove(e)
    return e.Value.(*blocks.Block)
}

func (s *BlockList) Len() int {
    return s.list.Len()
}
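Two details of clientWorker above are worth spelling out. First, the nil-channel trick: a send or receive on a nil channel blocks forever, so a select case whose channel is nil is effectively disabled; setting sendToWorker to nil whenever the queue is empty keeps the loop from ever sending a nonexistent block. Here is a minimal, runnable sketch of the same dispatcher shape, assuming illustrative names (in, out, queue) rather than anything from this package:

package main

import "fmt"

func main() {
    in := make(chan int)  // items arriving from a client
    out := make(chan int) // items handed to a worker
    done := make(chan struct{})

    // Dispatcher: accepts from `in` without ever blocking the sender,
    // queues internally, and enables the send case only when work exists.
    go func(in <-chan int) {
        var queue []int
        for in != nil || len(queue) > 0 {
            var sendTo chan<- int // nil by default: send case disabled
            var next int
            if len(queue) > 0 {
                sendTo = out
                next = queue[0]
            }
            select {
            case v, ok := <-in:
                if !ok {
                    in = nil // a nil receive blocks forever: case disabled
                    continue
                }
                queue = append(queue, v)
            case sendTo <- next:
                queue = queue[1:]
            }
        }
        close(out)
    }(in)

    // Worker: drains `out` until the dispatcher closes it.
    go func() {
        for v := range out {
            fmt.Println("processed", v)
        }
        close(done)
    }()

    for i := 0; i < 5; i++ {
        in <- i
    }
    close(in)
    <-done
}

Second, a caveat rather than a feature: case <-time.Tick(5 * time.Second) constructs a brand-new ticker on every pass through the loop, and tickers created by time.Tick were never garbage collected in the Go of this era, so a busy loop steadily leaks them; the debug message also only fires if that particular select stays blocked for a full five seconds. A single time.NewTicker created once, outside the loop, would avoid both issues.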
blockservice/worker/worker_test.go (new file, 14 lines)
package worker

import "testing"

func TestStartClose(t *testing.T) {
    numRuns := 50
    if testing.Short() {
        numRuns = 5
    }
    for i := 0; i < numRuns; i++ {
        w := NewWorker(nil, DefaultConfig)
        w.Close()
    }
}