mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-01 02:30:39 +08:00
refac(bitswap) use blockstore
This commit is contained in:
@ -11,6 +11,7 @@ import (
|
|||||||
bsnet "github.com/jbenet/go-ipfs/bitswap/network"
|
bsnet "github.com/jbenet/go-ipfs/bitswap/network"
|
||||||
notifications "github.com/jbenet/go-ipfs/bitswap/notifications"
|
notifications "github.com/jbenet/go-ipfs/bitswap/notifications"
|
||||||
blocks "github.com/jbenet/go-ipfs/blocks"
|
blocks "github.com/jbenet/go-ipfs/blocks"
|
||||||
|
blockstore "github.com/jbenet/go-ipfs/blockstore"
|
||||||
peer "github.com/jbenet/go-ipfs/peer"
|
peer "github.com/jbenet/go-ipfs/peer"
|
||||||
routing "github.com/jbenet/go-ipfs/routing"
|
routing "github.com/jbenet/go-ipfs/routing"
|
||||||
u "github.com/jbenet/go-ipfs/util"
|
u "github.com/jbenet/go-ipfs/util"
|
||||||
@ -35,8 +36,8 @@ type BitSwap struct {
|
|||||||
// sender delivers messages on behalf of the session
|
// sender delivers messages on behalf of the session
|
||||||
sender bsnet.NetworkAdapter
|
sender bsnet.NetworkAdapter
|
||||||
|
|
||||||
// datastore is the local database // Ledgers of known
|
// blockstore is the local database
|
||||||
datastore ds.Datastore
|
blockstore blockstore.Blockstore
|
||||||
|
|
||||||
// routing interface for communication
|
// routing interface for communication
|
||||||
routing routing.IpfsRouting
|
routing routing.IpfsRouting
|
||||||
@ -66,7 +67,7 @@ func NewSession(parent context.Context, s bsnet.NetworkService, p *peer.Peer, d
|
|||||||
receiver := bsnet.Forwarder{}
|
receiver := bsnet.Forwarder{}
|
||||||
bs := &BitSwap{
|
bs := &BitSwap{
|
||||||
peer: p,
|
peer: p,
|
||||||
datastore: d,
|
blockstore: blockstore.NewBlockstore(d),
|
||||||
partners: LedgerMap{},
|
partners: LedgerMap{},
|
||||||
wantList: KeySet{},
|
wantList: KeySet{},
|
||||||
routing: r,
|
routing: r,
|
||||||
@ -151,7 +152,9 @@ func (bs *BitSwap) HasBlock(blk blocks.Block) error {
|
|||||||
return bs.routing.Provide(blk.Key())
|
return bs.routing.Provide(blk.Key())
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(brian): get a return value
|
||||||
func (bs *BitSwap) SendBlock(p *peer.Peer, b blocks.Block) {
|
func (bs *BitSwap) SendBlock(p *peer.Peer, b blocks.Block) {
|
||||||
|
u.DOut("Sending block to peer.\n")
|
||||||
message := bsmsg.New()
|
message := bsmsg.New()
|
||||||
// TODO(brian): change interface to accept value instead of pointer
|
// TODO(brian): change interface to accept value instead of pointer
|
||||||
message.AppendBlock(b)
|
message.AppendBlock(b)
|
||||||
@ -162,40 +165,26 @@ func (bs *BitSwap) SendBlock(p *peer.Peer, b blocks.Block) {
|
|||||||
// and then if we do, check the ledger for whether or not we should send it.
|
// and then if we do, check the ledger for whether or not we should send it.
|
||||||
func (bs *BitSwap) peerWantsBlock(p *peer.Peer, wanted u.Key) {
|
func (bs *BitSwap) peerWantsBlock(p *peer.Peer, wanted u.Key) {
|
||||||
u.DOut("peer [%s] wants block [%s]\n", p.ID.Pretty(), wanted.Pretty())
|
u.DOut("peer [%s] wants block [%s]\n", p.ID.Pretty(), wanted.Pretty())
|
||||||
|
|
||||||
ledger := bs.getLedger(p)
|
ledger := bs.getLedger(p)
|
||||||
|
|
||||||
blk_i, err := bs.datastore.Get(wanted.DatastoreKey())
|
if !ledger.ShouldSend() {
|
||||||
if err != nil {
|
|
||||||
if err == ds.ErrNotFound {
|
|
||||||
ledger.Wants(wanted)
|
|
||||||
}
|
|
||||||
u.PErr("datastore get error: %v\n", err)
|
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
blk, ok := blk_i.([]byte)
|
block, err := bs.blockstore.Get(wanted)
|
||||||
if !ok {
|
if err != nil { // TODO(brian): log/return the error
|
||||||
u.PErr("data conversion error.\n")
|
ledger.Wants(wanted)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
bs.SendBlock(p, *block)
|
||||||
if ledger.ShouldSend() {
|
ledger.SentBytes(numBytes(*block))
|
||||||
u.DOut("Sending block to peer.\n")
|
|
||||||
bblk, err := blocks.NewBlock(blk)
|
|
||||||
if err != nil {
|
|
||||||
u.PErr("newBlock error: %v\n", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
bs.SendBlock(p, *bblk)
|
|
||||||
ledger.SentBytes(len(blk))
|
|
||||||
} else {
|
|
||||||
u.DOut("Decided not to send block.")
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO(brian): return error
|
||||||
func (bs *BitSwap) blockReceive(p *peer.Peer, blk blocks.Block) {
|
func (bs *BitSwap) blockReceive(p *peer.Peer, blk blocks.Block) {
|
||||||
u.DOut("blockReceive: %s\n", blk.Key().Pretty())
|
u.DOut("blockReceive: %s\n", blk.Key().Pretty())
|
||||||
err := bs.datastore.Put(ds.NewKey(string(blk.Key())), blk.Data)
|
err := bs.blockstore.Put(blk)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
u.PErr("blockReceive error: %v\n", err)
|
u.PErr("blockReceive error: %v\n", err)
|
||||||
return
|
return
|
||||||
@ -255,3 +244,7 @@ func (bs *BitSwap) ReceiveMessage(
|
|||||||
}
|
}
|
||||||
return nil, nil, errors.New("TODO implement")
|
return nil, nil, errors.New("TODO implement")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func numBytes(b blocks.Block) int {
|
||||||
|
return len(b.Data)
|
||||||
|
}
|
||||||
|
Reference in New Issue
Block a user