mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-04 13:27:14 +08:00
document bitswap more
This commit is contained in:
@ -22,7 +22,8 @@ import (
|
|||||||
var log = eventlog.Logger("bitswap")
|
var log = eventlog.Logger("bitswap")
|
||||||
|
|
||||||
// Number of providers to request for sending a wantlist to
|
// Number of providers to request for sending a wantlist to
|
||||||
const maxProvidersPerRequest = 6
|
// TODO: if a 'non-nice' strategy is implemented, consider increasing this value
|
||||||
|
const maxProvidersPerRequest = 3
|
||||||
|
|
||||||
// New initializes a BitSwap instance that communicates over the
|
// New initializes a BitSwap instance that communicates over the
|
||||||
// provided BitSwapNetwork. This function registers the returned instance as
|
// provided BitSwapNetwork. This function registers the returned instance as
|
||||||
@ -211,6 +212,7 @@ func (bs *bitswap) loop(parent context.Context) {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-broadcastSignal.C:
|
case <-broadcastSignal.C:
|
||||||
|
// Resend unfulfilled wantlist keys
|
||||||
bs.sendWantlistToProviders(ctx, bs.wantlist.Keys())
|
bs.sendWantlistToProviders(ctx, bs.wantlist.Keys())
|
||||||
case ks := <-bs.batchRequests:
|
case ks := <-bs.batchRequests:
|
||||||
// TODO: implement batching on len(ks) > X for some X
|
// TODO: implement batching on len(ks) > X for some X
|
||||||
@ -224,6 +226,13 @@ func (bs *bitswap) loop(parent context.Context) {
|
|||||||
for _, k := range ks {
|
for _, k := range ks {
|
||||||
bs.wantlist.Add(k)
|
bs.wantlist.Add(k)
|
||||||
}
|
}
|
||||||
|
// NB: send want list to providers for the first peer in this list.
|
||||||
|
// the assumption is made that the providers of the first key in
|
||||||
|
// the set are likely to have others as well.
|
||||||
|
// This currently holds true in most every situation, since when
|
||||||
|
// pinning a file, you store and provide all blocks associated with
|
||||||
|
// it. Later, this assumption may not hold as true if we implement
|
||||||
|
// newer bitswap strategies.
|
||||||
providers := bs.routing.FindProvidersAsync(ctx, ks[0], maxProvidersPerRequest)
|
providers := bs.routing.FindProvidersAsync(ctx, ks[0], maxProvidersPerRequest)
|
||||||
|
|
||||||
err := bs.sendWantListTo(ctx, providers)
|
err := bs.sendWantListTo(ctx, providers)
|
||||||
@ -263,7 +272,6 @@ func (bs *bitswap) receiveBlock(ctx context.Context, block *blocks.Block) {
|
|||||||
func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) (
|
func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) (
|
||||||
peer.Peer, bsmsg.BitSwapMessage) {
|
peer.Peer, bsmsg.BitSwapMessage) {
|
||||||
log.Debugf("ReceiveMessage from %s", p)
|
log.Debugf("ReceiveMessage from %s", p)
|
||||||
log.Debugf("Message wantlist: %v", incoming.Wantlist())
|
|
||||||
|
|
||||||
if p == nil {
|
if p == nil {
|
||||||
log.Error("Received message from nil peer!")
|
log.Error("Received message from nil peer!")
|
||||||
@ -279,15 +287,17 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
|
|||||||
// Record message bytes in ledger
|
// Record message bytes in ledger
|
||||||
// TODO: this is bad, and could be easily abused.
|
// TODO: this is bad, and could be easily abused.
|
||||||
// Should only track *useful* messages in ledger
|
// Should only track *useful* messages in ledger
|
||||||
bs.strategy.MessageReceived(p, incoming) // FIRST
|
// This call records changes to wantlists, blocks received,
|
||||||
|
// and number of bytes transfered.
|
||||||
|
bs.strategy.MessageReceived(p, incoming)
|
||||||
|
|
||||||
for _, block := range incoming.Blocks() {
|
go func() {
|
||||||
go bs.receiveBlock(ctx, block)
|
for _, block := range incoming.Blocks() {
|
||||||
}
|
bs.receiveBlock(ctx, block)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
for _, key := range incoming.Wantlist() {
|
for _, key := range incoming.Wantlist() {
|
||||||
// TODO: might be better to check if we have the block before checking
|
|
||||||
// if we should send it to someone
|
|
||||||
if bs.strategy.ShouldSendBlockToPeer(key, p) {
|
if bs.strategy.ShouldSendBlockToPeer(key, p) {
|
||||||
if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
|
if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
|
||||||
continue
|
continue
|
||||||
@ -303,12 +313,12 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
|
|||||||
}
|
}
|
||||||
|
|
||||||
blkmsg.AddBlock(block)
|
blkmsg.AddBlock(block)
|
||||||
bs.strategy.MessageSent(p, blkmsg)
|
|
||||||
bs.send(ctx, p, blkmsg)
|
bs.send(ctx, p, blkmsg)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: consider changing this function to not return anything
|
||||||
return nil, nil
|
return nil, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -326,7 +336,7 @@ func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block *blocks.Block) {
|
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block *blocks.Block) {
|
||||||
log.Debugf("Sending %v to peers that want it", block.Key())
|
log.Debugf("Sending %s to peers that want it", block)
|
||||||
|
|
||||||
for _, p := range bs.strategy.Peers() {
|
for _, p := range bs.strategy.Peers() {
|
||||||
if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
|
if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
|
||||||
|
Reference in New Issue
Block a user