mirror of
https://github.com/ipfs/kubo.git
synced 2025-09-10 05:52:20 +08:00
move blocking calls out of single threaded loops, cancel contexts ASAP
This commit is contained in:
@ -249,9 +249,6 @@ func (n *IpfsNode) startOnlineServices(ctx context.Context, routingOption Routin
|
||||
// startOnlineServicesWithHost is the set of services which need to be
|
||||
// initialized with the host and _before_ we start listening.
|
||||
func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost.Host, routingOption RoutingOption) error {
|
||||
// Wrap standard peer host with routing system to allow unknown peer lookups
|
||||
n.PeerHost = rhost.Wrap(host, n.Routing)
|
||||
|
||||
// setup diagnostics service
|
||||
n.Diagnostics = diag.NewDiagnostics(n.Identity, host)
|
||||
|
||||
@ -262,6 +259,9 @@ func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost
|
||||
}
|
||||
n.Routing = r
|
||||
|
||||
// Wrap standard peer host with routing system to allow unknown peer lookups
|
||||
n.PeerHost = rhost.Wrap(host, n.Routing)
|
||||
|
||||
// setup exchange service
|
||||
const alwaysSendToPeer = true // use YesManStrategy
|
||||
bitswapNetwork := bsnet.NewFromIpfsHost(n.PeerHost, n.Routing)
|
||||
|
@ -84,7 +84,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
|
||||
engine: decision.NewEngine(ctx, bstore), // TODO close the engine with Close() method
|
||||
network: network,
|
||||
wantlist: wantlist.NewThreadSafe(),
|
||||
batchRequests: make(chan []u.Key, sizeBatchRequestChan),
|
||||
batchRequests: make(chan *blockRequest, sizeBatchRequestChan),
|
||||
process: px,
|
||||
}
|
||||
network.SetDelegate(bs)
|
||||
@ -94,6 +94,9 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
|
||||
px.Go(func(px process.Process) {
|
||||
bs.taskWorker(ctx)
|
||||
})
|
||||
px.Go(func(px process.Process) {
|
||||
bs.rebroadcastWorker(ctx)
|
||||
})
|
||||
|
||||
return bs
|
||||
}
|
||||
@ -116,7 +119,7 @@ type bitswap struct {
|
||||
// Requests for a set of related blocks
|
||||
// the assumption is made that the same peer is likely to
|
||||
// have more than a single block in the set
|
||||
batchRequests chan []u.Key
|
||||
batchRequests chan *blockRequest
|
||||
|
||||
engine *decision.Engine
|
||||
|
||||
@ -125,6 +128,11 @@ type bitswap struct {
|
||||
process process.Process
|
||||
}
|
||||
|
||||
type blockRequest struct {
|
||||
keys []u.Key
|
||||
ctx context.Context
|
||||
}
|
||||
|
||||
// GetBlock attempts to retrieve a particular block from peers within the
|
||||
// deadline enforced by the context.
|
||||
func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) {
|
||||
@ -175,15 +183,19 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
|
||||
// resources, provide a context with a reasonably short deadline (ie. not one
|
||||
// that lasts throughout the lifetime of the server)
|
||||
func (bs *bitswap) GetBlocks(ctx context.Context, keys []u.Key) (<-chan *blocks.Block, error) {
|
||||
|
||||
select {
|
||||
case <-bs.process.Closing():
|
||||
return nil, errors.New("bitswap is closed")
|
||||
default:
|
||||
}
|
||||
promise := bs.notifications.Subscribe(ctx, keys...)
|
||||
|
||||
req := &blockRequest{
|
||||
keys: keys,
|
||||
ctx: ctx,
|
||||
}
|
||||
select {
|
||||
case bs.batchRequests <- keys:
|
||||
case bs.batchRequests <- req:
|
||||
return promise, nil
|
||||
case <-ctx.Done():
|
||||
return nil, ctx.Err()
|
||||
@ -321,8 +333,8 @@ func (bs *bitswap) PeerConnected(p peer.ID) {
|
||||
}
|
||||
|
||||
// Connected/Disconnected warns bitswap about peer connections
|
||||
func (bs *bitswap) PeerDisconnected(peer.ID) {
|
||||
// TODO: release resources.
|
||||
func (bs *bitswap) PeerDisconnected(p peer.ID) {
|
||||
bs.engine.PeerDisconnected(p)
|
||||
}
|
||||
|
||||
func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
|
||||
@ -342,6 +354,24 @@ func (bs *bitswap) cancelBlocks(ctx context.Context, bkeys []u.Key) {
|
||||
}
|
||||
}
|
||||
|
||||
func (bs *bitswap) wantNewBlocks(ctx context.Context, bkeys []u.Key) {
|
||||
if len(bkeys) < 1 {
|
||||
return
|
||||
}
|
||||
|
||||
message := bsmsg.New()
|
||||
message.SetFull(false)
|
||||
for i, k := range bkeys {
|
||||
message.AddEntry(k, kMaxPriority-i)
|
||||
}
|
||||
for _, p := range bs.engine.Peers() {
|
||||
err := bs.send(ctx, p, message)
|
||||
if err != nil {
|
||||
log.Debugf("Error sending message: %s", err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (bs *bitswap) ReceiveError(err error) {
|
||||
log.Debugf("Bitswap ReceiveError: %s", err)
|
||||
// TODO log the network error
|
||||
@ -385,13 +415,42 @@ func (bs *bitswap) taskWorker(ctx context.Context) {
|
||||
|
||||
// TODO ensure only one active request per key
|
||||
func (bs *bitswap) clientWorker(parent context.Context) {
|
||||
|
||||
defer log.Info("bitswap client worker shutting down...")
|
||||
|
||||
for {
|
||||
select {
|
||||
case req := <-bs.batchRequests:
|
||||
keys := req.keys
|
||||
if len(keys) == 0 {
|
||||
log.Warning("Received batch request for zero blocks")
|
||||
continue
|
||||
}
|
||||
for i, k := range keys {
|
||||
bs.wantlist.Add(k, kMaxPriority-i)
|
||||
}
|
||||
|
||||
bs.wantNewBlocks(req.ctx, keys)
|
||||
|
||||
// NB: Optimization. Assumes that providers of key[0] are likely to
|
||||
// be able to provide for all keys. This currently holds true in most
|
||||
// every situation. Later, this assumption may not hold as true.
|
||||
child, _ := context.WithTimeout(req.ctx, providerRequestTimeout)
|
||||
providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
|
||||
err := bs.sendWantlistToPeers(req.ctx, providers)
|
||||
if err != nil {
|
||||
log.Debugf("error sending wantlist: %s", err)
|
||||
}
|
||||
case <-parent.Done():
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (bs *bitswap) rebroadcastWorker(parent context.Context) {
|
||||
ctx, cancel := context.WithCancel(parent)
|
||||
defer cancel()
|
||||
|
||||
broadcastSignal := time.After(rebroadcastDelay.Get())
|
||||
defer cancel()
|
||||
|
||||
for {
|
||||
select {
|
||||
@ -406,23 +465,6 @@ func (bs *bitswap) clientWorker(parent context.Context) {
|
||||
bs.sendWantlistToProviders(ctx, entries)
|
||||
}
|
||||
broadcastSignal = time.After(rebroadcastDelay.Get())
|
||||
case keys := <-bs.batchRequests:
|
||||
if len(keys) == 0 {
|
||||
log.Warning("Received batch request for zero blocks")
|
||||
continue
|
||||
}
|
||||
for i, k := range keys {
|
||||
bs.wantlist.Add(k, kMaxPriority-i)
|
||||
}
|
||||
// NB: Optimization. Assumes that providers of key[0] are likely to
|
||||
// be able to provide for all keys. This currently holds true in most
|
||||
// every situation. Later, this assumption may not hold as true.
|
||||
child, _ := context.WithTimeout(ctx, providerRequestTimeout)
|
||||
providers := bs.network.FindProvidersAsync(child, keys[0], maxProvidersPerRequest)
|
||||
err := bs.sendWantlistToPeers(ctx, providers)
|
||||
if err != nil {
|
||||
log.Debugf("error sending wantlist: %s", err)
|
||||
}
|
||||
case <-parent.Done():
|
||||
return
|
||||
}
|
||||
|
@ -228,6 +228,10 @@ func (e *Engine) MessageSent(p peer.ID, m bsmsg.BitSwapMessage) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func (e *Engine) PeerDisconnected(p peer.ID) {
|
||||
// TODO: release ledger
|
||||
}
|
||||
|
||||
func (e *Engine) numBytesSentTo(p peer.ID) uint64 {
|
||||
// NB not threadsafe
|
||||
return e.findOrCreate(p).Accounting.BytesSent
|
||||
|
@ -187,9 +187,12 @@ func (ds *dagService) GetNodes(ctx context.Context, keys []u.Key) []NodeGetter {
|
||||
}
|
||||
|
||||
go func() {
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
blkchan := ds.Blocks.GetBlocks(ctx, keys)
|
||||
|
||||
for {
|
||||
for count := 0; count < len(keys); {
|
||||
select {
|
||||
case blk, ok := <-blkchan:
|
||||
if !ok {
|
||||
@ -205,6 +208,7 @@ func (ds *dagService) GetNodes(ctx context.Context, keys []u.Key) []NodeGetter {
|
||||
is := FindLinks(keys, blk.Key(), 0)
|
||||
for _, i := range is {
|
||||
sendChans[i] <- nd
|
||||
count++
|
||||
}
|
||||
case <-ctx.Done():
|
||||
return
|
||||
|
Reference in New Issue
Block a user