1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-09-10 14:34:24 +08:00

feat(bitswap): synchronous close

This commit is contained in:
Brian Tiger Chow
2015-01-30 01:29:12 -08:00
parent 951ff4f747
commit c114b04ae1
3 changed files with 48 additions and 17 deletions

View File

@ -263,16 +263,18 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context) error {
func (n *IpfsNode) teardown() error {
// owned objects are closed in this teardown to ensure that they're closed
// regardless of which constructor was used to add them to the node.
var closers []io.Closer
addCloser := func(c io.Closer) {
closers := []io.Closer{
n.Blocks,
n.Exchange,
n.Repo,
}
addCloser := func(c io.Closer) { // use when field may be nil
if c != nil {
closers = append(closers, c)
}
}
addCloser(n.Bootstrapper)
addCloser(n.Repo)
addCloser(n.Blocks)
if dht, ok := n.Routing.(*dht.IpfsDHT); ok {
addCloser(dht)
}

View File

@ -9,6 +9,7 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
inflect "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/briantigerchow/inflect"
process "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
blocks "github.com/jbenet/go-ipfs/blocks"
blockstore "github.com/jbenet/go-ipfs/blocks/blockstore"
@ -52,28 +53,47 @@ var (
func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
bstore blockstore.Blockstore, nice bool) exchange.Interface {
// important to use provided parent context (since it may include important
// loggable data). It's probably not a good idea to allow bitswap to be
// coupled to the concerns of the IPFS daemon in this way.
//
// FIXME(btc) Now that bitswap manages itself using a process, it probably
// shouldn't accept a context anymore. Clients should probably use Close()
// exclusively. We should probably find another way to share logging data
ctx, cancelFunc := context.WithCancel(parent)
notif := notifications.New()
go func() {
<-ctx.Done()
cancelFunc()
px := process.WithTeardown(func() error {
notif.Shutdown()
return nil
})
go func() {
<-px.Closing() // process closes first
cancelFunc()
}()
go func() {
<-ctx.Done() // parent cancelled first
px.Close()
}()
bs := &bitswap{
self: p,
blockstore: bstore,
cancelFunc: cancelFunc,
notifications: notif,
engine: decision.NewEngine(ctx, bstore),
engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
network: network,
wantlist: wantlist.NewThreadSafe(),
batchRequests: make(chan []u.Key, sizeBatchRequestChan),
process: px,
}
network.SetDelegate(bs)
go bs.clientWorker(ctx)
go bs.taskWorker(ctx)
px.Go(func(px process.Process) {
bs.clientWorker(ctx)
})
px.Go(func(px process.Process) {
bs.taskWorker(ctx)
})
return bs
}
@ -102,8 +122,7 @@ type bitswap struct {
wantlist *wantlist.ThreadSafe
// cancelFunc signals cancellation to the bitswap event loop
cancelFunc func()
process process.Process
}
// GetBlock attempts to retrieve a particular block from peers within the
@ -149,6 +168,11 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
// that lasts throughout the lifetime of the server)
func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.Block, error) {
select {
case <-bs.process.Closing():
return nil, errors.New("bitswap is closed")
default:
}
promise := bs.notifications.Subscribe(ctx, keys...)
select {
case bs.batchRequests <- keys:
@ -161,6 +185,11 @@ func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.
// HasBlock announces the existance of a block to this bitswap service. The
// service will potentially notify its peers.
func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
select {
case <-bs.process.Closing():
return errors.New("bitswap is closed")
default:
}
if err := bs.blockstore.Put(blk); err != nil {
return err
}
@ -235,6 +264,7 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context, entries []wantli
}
func (bs *bitswap) taskWorker(ctx context.Context) {
defer log.Info("bitswap task worker shutting down...")
for {
select {
case <-ctx.Done():
@ -256,6 +286,8 @@ func (bs *bitswap) taskWorker(ctx context.Context) {
// TODO ensure only one active request per key
func (bs *bitswap) clientWorker(parent context.Context) {
defer log.Info("bitswap client worker shutting down...")
ctx, cancel := context.WithCancel(parent)
broadcastSignal := time.After(rebroadcastDelay.Get())
@ -384,6 +416,5 @@ func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage)
}
func (bs *bitswap) Close() error {
bs.cancelFunc()
return nil // to conform to Closer interface
return bs.process.Close()
}

View File

@ -22,8 +22,6 @@ import (
const kNetworkDelay = 0 * time.Millisecond
func TestClose(t *testing.T) {
// TODO
t.Skip("TODO Bitswap's Close implementation is a WIP")
vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
sesgen := NewTestSessionGenerator(vnet)
defer sesgen.Close()