mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-02 03:28:25 +08:00
@ -1,6 +1,8 @@
|
||||
package blocks
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
|
||||
mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
|
||||
u "github.com/jbenet/go-ipfs/util"
|
||||
)
|
||||
@ -20,3 +22,7 @@ func NewBlock(data []byte) *Block {
|
||||
func (b *Block) Key() u.Key {
|
||||
return u.Key(b.Multihash)
|
||||
}
|
||||
|
||||
func (b *Block) String() string {
|
||||
return fmt.Sprintf("[Block %s]", b.Key())
|
||||
}
|
||||
|
@ -3,6 +3,9 @@ package blockservice
|
||||
import (
|
||||
"bytes"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"code.google.com/p/go.net/context"
|
||||
|
||||
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||
blocks "github.com/jbenet/go-ipfs/blocks"
|
||||
@ -37,7 +40,8 @@ func TestBlocks(t *testing.T) {
|
||||
t.Error("returned key is not equal to block key", err)
|
||||
}
|
||||
|
||||
b2, err := bs.GetBlock(b.Key())
|
||||
ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
|
||||
b2, err := bs.GetBlock(ctx, b.Key())
|
||||
if err != nil {
|
||||
t.Error("failed to retrieve block from BlockService", err)
|
||||
return
|
||||
|
@ -2,7 +2,6 @@ package blockservice
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
|
||||
@ -52,8 +51,8 @@ func (s *BlockService) AddBlock(b *blocks.Block) (u.Key, error) {
|
||||
|
||||
// GetBlock retrieves a particular block from the service,
|
||||
// Getting it from the datastore using the key (hash).
|
||||
func (s *BlockService) GetBlock(k u.Key) (*blocks.Block, error) {
|
||||
log.Debug("BlockService GetBlock: '%s'", k)
|
||||
func (s *BlockService) GetBlock(ctx context.Context, k u.Key) (*blocks.Block, error) {
|
||||
log.Debugf("BlockService GetBlock: '%s'", k)
|
||||
datai, err := s.Datastore.Get(k.DsKey())
|
||||
if err == nil {
|
||||
log.Debug("Blockservice: Got data in datastore.")
|
||||
@ -67,7 +66,6 @@ func (s *BlockService) GetBlock(k u.Key) (*blocks.Block, error) {
|
||||
}, nil
|
||||
} else if err == ds.ErrNotFound && s.Remote != nil {
|
||||
log.Debug("Blockservice: Searching bitswap.")
|
||||
ctx, _ := context.WithTimeout(context.TODO(), 5*time.Second)
|
||||
blk, err := s.Remote.Block(ctx, k)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@ -78,3 +76,7 @@ func (s *BlockService) GetBlock(k u.Key) (*blocks.Block, error) {
|
||||
return nil, u.ErrNotFound
|
||||
}
|
||||
}
|
||||
|
||||
func (s *BlockService) DeleteBlock(k u.Key) error {
|
||||
return s.Datastore.Delete(k.DsKey())
|
||||
}
|
||||
|
@ -5,6 +5,9 @@ import (
|
||||
"io"
|
||||
"io/ioutil"
|
||||
"os"
|
||||
"time"
|
||||
|
||||
"code.google.com/p/go.net/context"
|
||||
|
||||
mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
|
||||
"github.com/jbenet/go-ipfs/blocks"
|
||||
@ -26,7 +29,8 @@ func BlockGet(n *core.IpfsNode, args []string, opts map[string]interface{}, out
|
||||
|
||||
k := u.Key(h)
|
||||
log.Debug("BlockGet key: '%q'", k)
|
||||
b, err := n.Blocks.GetBlock(k)
|
||||
ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
|
||||
b, err := n.Blocks.GetBlock(ctx, k)
|
||||
if err != nil {
|
||||
return fmt.Errorf("block get: %v", err)
|
||||
}
|
||||
|
@ -53,7 +53,7 @@ func (s *SecurePipe) handshake() error {
|
||||
return err
|
||||
}
|
||||
|
||||
log.Debug("handshake: %s <--> %s", s.local, s.remote)
|
||||
log.Debugf("handshake: %s <--> %s", s.local, s.remote)
|
||||
myPubKey, err := s.local.PubKey().Bytes()
|
||||
if err != nil {
|
||||
return err
|
||||
@ -105,7 +105,7 @@ func (s *SecurePipe) handshake() error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
log.Debug("%s Remote Peer Identified as %s", s.local, s.remote)
|
||||
log.Debugf("%s Remote Peer Identified as %s", s.local, s.remote)
|
||||
|
||||
exchange, err := selectBest(SupportedExchanges, proposeResp.GetExchanges())
|
||||
if err != nil {
|
||||
@ -209,7 +209,7 @@ func (s *SecurePipe) handshake() error {
|
||||
return fmt.Errorf("Negotiation failed, got: %s", resp2)
|
||||
}
|
||||
|
||||
log.Debug("%s handshake: Got node id: %s", s.local, s.remote)
|
||||
log.Debugf("%s handshake: Got node id: %s", s.local, s.remote)
|
||||
return nil
|
||||
}
|
||||
|
||||
|
@ -66,7 +66,7 @@ type bitswap struct {
|
||||
//
|
||||
// TODO ensure only one active request per key
|
||||
func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error) {
|
||||
log.Debug("Get Block %v", k)
|
||||
log.Debugf("Get Block %v", k)
|
||||
|
||||
ctx, cancelFunc := context.WithCancel(parent)
|
||||
bs.wantlist.Add(k)
|
||||
@ -82,10 +82,10 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
|
||||
}
|
||||
message.AppendWanted(k)
|
||||
for peerToQuery := range peersToQuery {
|
||||
log.Debug("bitswap got peersToQuery: %s", peerToQuery)
|
||||
log.Debugf("bitswap got peersToQuery: %s", peerToQuery)
|
||||
go func(p peer.Peer) {
|
||||
|
||||
log.Debug("bitswap dialing peer: %s", p)
|
||||
log.Debugf("bitswap dialing peer: %s", p)
|
||||
err := bs.sender.DialPeer(p)
|
||||
if err != nil {
|
||||
log.Errorf("Error sender.DialPeer(%s)", p)
|
||||
@ -94,7 +94,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
|
||||
|
||||
response, err := bs.sender.SendRequest(ctx, p, message)
|
||||
if err != nil {
|
||||
log.Errorf("Error sender.SendRequest(%s)", p)
|
||||
log.Error("Error sender.SendRequest(%s) = %s", p, err)
|
||||
return
|
||||
}
|
||||
// FIXME ensure accounting is handled correctly when
|
||||
@ -124,7 +124,7 @@ func (bs *bitswap) Block(parent context.Context, k u.Key) (*blocks.Block, error)
|
||||
// HasBlock announces the existance of a block to bitswap, potentially sending
|
||||
// it to peers (Partners) whose WantLists include it.
|
||||
func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
|
||||
log.Debug("Has Block %v", blk.Key())
|
||||
log.Debugf("Has Block %v", blk.Key())
|
||||
bs.wantlist.Remove(blk.Key())
|
||||
bs.sendToPeersThatWant(ctx, blk)
|
||||
return bs.routing.Provide(ctx, blk.Key())
|
||||
@ -133,17 +133,23 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk blocks.Block) error {
|
||||
// TODO(brian): handle errors
|
||||
func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsmsg.BitSwapMessage) (
|
||||
peer.Peer, bsmsg.BitSwapMessage) {
|
||||
log.Debug("ReceiveMessage from %v", p.Key())
|
||||
log.Debugf("ReceiveMessage from %v", p.Key())
|
||||
log.Debugf("Message wantlist: %v", incoming.Wantlist())
|
||||
|
||||
if p == nil {
|
||||
log.Error("Received message from nil peer!")
|
||||
// TODO propagate the error upward
|
||||
return nil, nil
|
||||
}
|
||||
if incoming == nil {
|
||||
log.Error("Got nil bitswap message!")
|
||||
// TODO propagate the error upward
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// Record message bytes in ledger
|
||||
// TODO: this is bad, and could be easily abused.
|
||||
// Should only track *useful* messages in ledger
|
||||
bs.strategy.MessageReceived(p, incoming) // FIRST
|
||||
|
||||
for _, block := range incoming.Blocks() {
|
||||
@ -153,7 +159,10 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
|
||||
}
|
||||
go bs.notifications.Publish(block)
|
||||
go func(block blocks.Block) {
|
||||
_ = bs.HasBlock(ctx, block) // FIXME err ignored
|
||||
err := bs.HasBlock(ctx, block) // FIXME err ignored
|
||||
if err != nil {
|
||||
log.Errorf("HasBlock errored: %s", err)
|
||||
}
|
||||
}(block)
|
||||
}
|
||||
|
||||
@ -162,6 +171,8 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
|
||||
message.AppendWanted(wanted)
|
||||
}
|
||||
for _, key := range incoming.Wantlist() {
|
||||
// TODO: might be better to check if we have the block before checking
|
||||
// if we should send it to someone
|
||||
if bs.strategy.ShouldSendBlockToPeer(key, p) {
|
||||
if block, errBlockNotFound := bs.blockstore.Get(key); errBlockNotFound != nil {
|
||||
continue
|
||||
@ -171,10 +182,13 @@ func (bs *bitswap) ReceiveMessage(ctx context.Context, p peer.Peer, incoming bsm
|
||||
}
|
||||
}
|
||||
defer bs.strategy.MessageSent(p, message)
|
||||
|
||||
log.Debug("Returning message.")
|
||||
return p, message
|
||||
}
|
||||
|
||||
func (bs *bitswap) ReceiveError(err error) {
|
||||
log.Errorf("Bitswap ReceiveError: %s", err)
|
||||
// TODO log the network error
|
||||
// TODO bubble the network error up to the parent context/error logger
|
||||
}
|
||||
@ -187,10 +201,10 @@ func (bs *bitswap) send(ctx context.Context, p peer.Peer, m bsmsg.BitSwapMessage
|
||||
}
|
||||
|
||||
func (bs *bitswap) sendToPeersThatWant(ctx context.Context, block blocks.Block) {
|
||||
log.Debug("Sending %v to peers that want it", block.Key())
|
||||
log.Debugf("Sending %v to peers that want it", block.Key())
|
||||
for _, p := range bs.strategy.Peers() {
|
||||
if bs.strategy.BlockIsWantedByPeer(block.Key(), p) {
|
||||
log.Debug("%v wants %v", p, block.Key())
|
||||
log.Debugf("%v wants %v", p, block.Key())
|
||||
if bs.strategy.ShouldSendBlockToPeer(block.Key(), p) {
|
||||
message := bsmsg.New()
|
||||
message.AppendBlock(block)
|
||||
|
@ -1,7 +1,10 @@
|
||||
package network
|
||||
|
||||
import (
|
||||
"errors"
|
||||
|
||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
||||
"github.com/jbenet/go-ipfs/util"
|
||||
|
||||
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
|
||||
inet "github.com/jbenet/go-ipfs/net"
|
||||
@ -9,6 +12,8 @@ import (
|
||||
peer "github.com/jbenet/go-ipfs/peer"
|
||||
)
|
||||
|
||||
var log = util.Logger("net_message_adapter")
|
||||
|
||||
// NetMessageAdapter wraps a NetMessage network service
|
||||
func NetMessageAdapter(s inet.Service, n inet.Network, r Receiver) Adapter {
|
||||
adapter := impl{
|
||||
@ -48,6 +53,7 @@ func (adapter *impl) HandleMessage(
|
||||
|
||||
// TODO(brian): put this in a helper function
|
||||
if bsmsg == nil || p == nil {
|
||||
adapter.receiver.ReceiveError(errors.New("ReceiveMessage returned nil peer or message"))
|
||||
return nil
|
||||
}
|
||||
|
||||
@ -57,6 +63,7 @@ func (adapter *impl) HandleMessage(
|
||||
return nil
|
||||
}
|
||||
|
||||
log.Debugf("Message size: %d", len(outgoing.Data()))
|
||||
return outgoing
|
||||
}
|
||||
|
||||
|
@ -2,6 +2,9 @@ package merkledag
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"time"
|
||||
|
||||
"code.google.com/p/go.net/context"
|
||||
|
||||
mh "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multihash"
|
||||
blocks "github.com/jbenet/go-ipfs/blocks"
|
||||
@ -204,10 +207,24 @@ func (n *DAGService) Get(k u.Key) (*Node, error) {
|
||||
return nil, fmt.Errorf("DAGService is nil")
|
||||
}
|
||||
|
||||
b, err := n.Blocks.GetBlock(k)
|
||||
ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
|
||||
b, err := n.Blocks.GetBlock(ctx, k)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
return Decoded(b.Data)
|
||||
}
|
||||
|
||||
func (n *DAGService) Remove(nd *Node) error {
|
||||
for _, l := range nd.Links {
|
||||
if l.Node != nil {
|
||||
n.Remove(l.Node)
|
||||
}
|
||||
}
|
||||
k, err := nd.Key()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
return n.Blocks.DeleteBlock(k)
|
||||
}
|
||||
|
@ -21,7 +21,7 @@ const (
|
||||
ChanBuffer = 10
|
||||
|
||||
// MaxMessageSize is the size of the largest single message
|
||||
MaxMessageSize = 1 << 20 // 1 MB
|
||||
MaxMessageSize = 1 << 22 // 4 MB
|
||||
|
||||
// HandshakeTimeout for when nodes first connect
|
||||
HandshakeTimeout = time.Second * 5
|
||||
@ -97,6 +97,17 @@ func (c *singleConn) close() error {
|
||||
return err
|
||||
}
|
||||
|
||||
func (c *singleConn) GetError() error {
|
||||
select {
|
||||
case err := <-c.msgio.incoming.ErrChan:
|
||||
return err
|
||||
case err := <-c.msgio.outgoing.ErrChan:
|
||||
return err
|
||||
default:
|
||||
return nil
|
||||
}
|
||||
}
|
||||
|
||||
// ID is an identifier unique to this connection.
|
||||
func (c *singleConn) ID() string {
|
||||
return ID(c)
|
||||
|
@ -46,7 +46,7 @@ func Handshake1(ctx context.Context, c Conn) error {
|
||||
return fmt.Errorf("could not decode remote version: %q", err)
|
||||
}
|
||||
|
||||
log.Debug("Received remote version (%s) from %s", remoteH, rpeer)
|
||||
log.Debugf("Received remote version (%s) from %s", remoteH, rpeer)
|
||||
}
|
||||
|
||||
if err := handshake.Handshake1Compatible(localH, remoteH); err != nil {
|
||||
|
@ -37,6 +37,8 @@ type Conn interface {
|
||||
// Out returns a writable message channel
|
||||
Out() chan<- []byte
|
||||
|
||||
GetError() error
|
||||
|
||||
// Close ends the connection
|
||||
// Close() error -- already in ContextCloser
|
||||
}
|
||||
|
@ -67,7 +67,7 @@ func (c *MultiConn) Add(conns ...Conn) {
|
||||
defer c.Unlock()
|
||||
|
||||
for _, c2 := range conns {
|
||||
log.Info("MultiConn: adding %s", c2)
|
||||
log.Infof("MultiConn: adding %s", c2)
|
||||
if c.LocalPeer() != c2.LocalPeer() || c.RemotePeer() != c2.RemotePeer() {
|
||||
log.Error(c2)
|
||||
c.Unlock() // ok to unlock (to log). panicing.
|
||||
@ -82,7 +82,7 @@ func (c *MultiConn) Add(conns ...Conn) {
|
||||
|
||||
c.conns[c2.ID()] = c2
|
||||
go c.fanInSingle(c2)
|
||||
log.Info("MultiConn: added %s", c2)
|
||||
log.Infof("MultiConn: added %s", c2)
|
||||
}
|
||||
}
|
||||
|
||||
@ -146,7 +146,7 @@ func (c *MultiConn) fanOut() {
|
||||
// send data out through our "best connection"
|
||||
case m, more := <-c.duplex.Out:
|
||||
if !more {
|
||||
log.Info("%s out channel closed", c)
|
||||
log.Infof("%s out channel closed", c)
|
||||
return
|
||||
}
|
||||
sc := c.BestConn()
|
||||
@ -156,7 +156,7 @@ func (c *MultiConn) fanOut() {
|
||||
}
|
||||
|
||||
i++
|
||||
log.Info("%s sending (%d)", sc, i)
|
||||
log.Infof("%s sending (%d)", sc, i)
|
||||
sc.Out() <- m
|
||||
}
|
||||
}
|
||||
@ -170,7 +170,7 @@ func (c *MultiConn) fanInSingle(child Conn) {
|
||||
|
||||
// cleanup all data associated with this child Connection.
|
||||
defer func() {
|
||||
log.Info("closing: %s", child)
|
||||
log.Infof("closing: %s", child)
|
||||
|
||||
// in case it still is in the map, remove it.
|
||||
c.Lock()
|
||||
@ -197,11 +197,15 @@ func (c *MultiConn) fanInSingle(child Conn) {
|
||||
|
||||
case m, more := <-child.In(): // receiving data
|
||||
if !more {
|
||||
log.Info("%s in channel closed", child)
|
||||
log.Infof("%s in channel closed", child)
|
||||
err := c.GetError()
|
||||
if err != nil {
|
||||
log.Errorf("Found error on connection: %s", err)
|
||||
}
|
||||
return // closed
|
||||
}
|
||||
i++
|
||||
log.Info("%s received (%d)", child, i)
|
||||
log.Infof("%s received (%d)", child, i)
|
||||
c.duplex.In <- m
|
||||
}
|
||||
}
|
||||
@ -209,7 +213,7 @@ func (c *MultiConn) fanInSingle(child Conn) {
|
||||
|
||||
// close is the internal close function, called by ContextCloser.Close
|
||||
func (c *MultiConn) close() error {
|
||||
log.Debug("%s closing Conn with %s", c.local, c.remote)
|
||||
log.Debugf("%s closing Conn with %s", c.local, c.remote)
|
||||
|
||||
// get connections
|
||||
c.RLock()
|
||||
@ -291,3 +295,13 @@ func (c *MultiConn) In() <-chan []byte {
|
||||
func (c *MultiConn) Out() chan<- []byte {
|
||||
return c.duplex.Out
|
||||
}
|
||||
|
||||
func (c *MultiConn) GetError() error {
|
||||
for _, sub := range c.conns {
|
||||
err := sub.GetError()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
@ -134,3 +134,7 @@ func (c *secureConn) In() <-chan []byte {
|
||||
func (c *secureConn) Out() chan<- []byte {
|
||||
return c.secure.Out
|
||||
}
|
||||
|
||||
func (c *secureConn) GetError() error {
|
||||
return c.insecure.GetError()
|
||||
}
|
||||
|
@ -92,7 +92,7 @@ func (s *Swarm) connSetup(c conn.Conn) (conn.Conn, error) {
|
||||
return nil, errors.New("Tried to start nil connection.")
|
||||
}
|
||||
|
||||
log.Debug("%s Started connection: %s", c.LocalPeer(), c.RemotePeer())
|
||||
log.Debugf("%s Started connection: %s", c.LocalPeer(), c.RemotePeer())
|
||||
|
||||
// add address of connection to Peer. Maybe it should happen in connSecure.
|
||||
// NOT adding this address here, because the incoming address in TCP
|
||||
@ -154,6 +154,9 @@ func (s *Swarm) fanOut() {
|
||||
log.Infof("%s outgoing channel closed", s)
|
||||
return
|
||||
}
|
||||
if len(msg.Data()) >= conn.MaxMessageSize {
|
||||
log.Critical("Attempted to send message bigger than max size.")
|
||||
}
|
||||
|
||||
s.connsLock.RLock()
|
||||
c, found := s.conns[msg.Peer().Key()]
|
||||
|
@ -23,7 +23,7 @@ import (
|
||||
|
||||
var log = u.Logger("dht")
|
||||
|
||||
const doPinging = true
|
||||
const doPinging = false
|
||||
|
||||
// TODO. SEE https://github.com/jbenet/node-ipfs/blob/master/submodules/ipfs-dht/index.js
|
||||
|
||||
@ -540,7 +540,11 @@ func (dht *IpfsDHT) PingRoutine(t time.Duration) {
|
||||
func (dht *IpfsDHT) Bootstrap(ctx context.Context) {
|
||||
id := make([]byte, 16)
|
||||
rand.Read(id)
|
||||
_, err := dht.FindPeer(ctx, peer.ID(id))
|
||||
p, err := dht.FindPeer(ctx, peer.ID(id))
|
||||
if err != nil {
|
||||
log.Error("Bootstrap peer error: %s", err)
|
||||
}
|
||||
err = dht.dialer.DialPeer(p)
|
||||
if err != nil {
|
||||
log.Errorf("Bootstrap peer error: %s", err)
|
||||
}
|
||||
|
@ -145,7 +145,7 @@ func (dht *IpfsDHT) handleGetProviders(p peer.Peer, pmes *pb.Message) (*pb.Messa
|
||||
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
|
||||
|
||||
// check if we have this value, to add ourselves as provider.
|
||||
log.Debugf("handling GetProviders: '%s'", pmes.GetKey())
|
||||
log.Debugf("handling GetProviders: '%s'", u.Key(pmes.GetKey()))
|
||||
dsk := u.Key(pmes.GetKey()).DsKey()
|
||||
has, err := dht.datastore.Has(dsk)
|
||||
if err != nil && err != ds.ErrNotFound {
|
||||
|
Reference in New Issue
Block a user