mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-02 12:20:03 +08:00
refac(bitswap) less concurrency while testing and iterating
This commit is contained in:
@ -79,7 +79,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
|
|||||||
message := bsmsg.New()
|
message := bsmsg.New()
|
||||||
message.AppendWanted(k)
|
message.AppendWanted(k)
|
||||||
for i := range peersToQuery {
|
for i := range peersToQuery {
|
||||||
go func(p *peer.Peer) {
|
func(p *peer.Peer) {
|
||||||
response, err := bs.sender.SendRequest(ctx, p, message)
|
response, err := bs.sender.SendRequest(ctx, p, message)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return
|
return
|
||||||
@ -109,7 +109,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
|
|||||||
// HasBlock announces the existance of a block to bitswap, potentially sending
|
// HasBlock announces the existance of a block to bitswap, potentially sending
|
||||||
// it to peers (Partners) whose WantLists include it.
|
// it to peers (Partners) whose WantLists include it.
|
||||||
func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
|
func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
|
||||||
go bs.sendToPeersThatWant(ctx, blk)
|
bs.sendToPeersThatWant(ctx, blk)
|
||||||
return bs.routing.Provide(blk.Key())
|
return bs.routing.Provide(blk.Key())
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -128,9 +128,9 @@ func (bs *bitswap) ReceiveMessage(
|
|||||||
|
|
||||||
if incoming.Blocks() != nil {
|
if incoming.Blocks() != nil {
|
||||||
for _, block := range incoming.Blocks() {
|
for _, block := range incoming.Blocks() {
|
||||||
go bs.blockstore.Put(block) // FIXME(brian): err ignored
|
bs.blockstore.Put(block) // FIXME(brian): err ignored
|
||||||
go bs.notifications.Publish(block)
|
bs.notifications.Publish(block)
|
||||||
go bs.HasBlock(ctx, block) // FIXME err ignored
|
bs.HasBlock(ctx, block) // FIXME err ignored
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -139,12 +139,11 @@ func (bs *bitswap) ReceiveMessage(
|
|||||||
if bs.strategy.ShouldSendBlockToPeer(key, p) {
|
if bs.strategy.ShouldSendBlockToPeer(key, p) {
|
||||||
block, errBlockNotFound := bs.blockstore.Get(key)
|
block, errBlockNotFound := bs.blockstore.Get(key)
|
||||||
if errBlockNotFound != nil {
|
if errBlockNotFound != nil {
|
||||||
// TODO(brian): log/return the error
|
return nil, nil, errBlockNotFound
|
||||||
continue
|
|
||||||
}
|
}
|
||||||
message := bsmsg.New()
|
message := bsmsg.New()
|
||||||
message.AppendBlock(*block)
|
message.AppendBlock(*block)
|
||||||
go bs.send(ctx, p, message)
|
bs.send(ctx, p, message)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -168,7 +167,7 @@ func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block)
|
|||||||
if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
|
if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
|
||||||
message := bsmsg.New()
|
message := bsmsg.New()
|
||||||
message.AppendBlock(block)
|
message.AppendBlock(block)
|
||||||
go bs.send(ctx, p, message)
|
bs.send(ctx, p, message)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user