1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-28 17:03:58 +08:00

Merge pull request #490 from jbenet/more-dht-fixes

various fixes
This commit is contained in:
Juan Batiz-Benet
2015-01-05 05:59:17 -08:00
39 changed files with 710 additions and 226 deletions

View File

@ -13,20 +13,25 @@ vendor: godep
install:
cd cmd/ipfs && go install
test: test_go test_sharness
##############################################################
# tests targets
test: test_expensive
test_short: test_go_short test_sharness_short
test_expensive: test_go_expensive test_sharness_expensive
test_docker:
cd dockertest/ && make
test_go:
test_go_short:
go test -test.short ./...
test_go_expensive:
go test ./...
test_sharness:
test_sharness_short:
cd test/ && make
test_sharness_expensive:

View File

@ -12,7 +12,7 @@ import (
// Mocks returns |n| connected mock Blockservices
func Mocks(t *testing.T, n int) []*BlockService {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0))
sg := bitswap.NewSessionGenerator(net)
sg := bitswap.NewTestSessionGenerator(net)
instances := sg.Instances(n)

View File

@ -186,7 +186,7 @@ func initConfig(configFilename string, dspathOverride string, nBitsForKeypair in
Addresses: config.Addresses{
Swarm: []string{
"/ip4/0.0.0.0/tcp/4001",
"/ip4/0.0.0.0/udp/4002/utp",
// "/ip4/0.0.0.0/udp/4002/utp", // disabled for now.
},
API: "/ip4/127.0.0.1/tcp/5001",
},

View File

@ -10,11 +10,10 @@ import (
"github.com/jbenet/go-ipfs/exchange/offline"
mdag "github.com/jbenet/go-ipfs/merkledag"
nsys "github.com/jbenet/go-ipfs/namesys"
ci "github.com/jbenet/go-ipfs/p2p/crypto"
mocknet "github.com/jbenet/go-ipfs/p2p/net/mock"
peer "github.com/jbenet/go-ipfs/p2p/peer"
path "github.com/jbenet/go-ipfs/path"
dht "github.com/jbenet/go-ipfs/routing/dht"
mockrouting "github.com/jbenet/go-ipfs/routing/mock"
ds2 "github.com/jbenet/go-ipfs/util/datastore2"
testutil "github.com/jbenet/go-ipfs/util/testutil"
)
@ -29,23 +28,19 @@ func NewMockNode() (*IpfsNode, error) {
nd := new(IpfsNode)
// Generate Identity
sk, pk, err := ci.GenerateKeyPair(ci.RSA, 1024)
if err != nil {
return nil, err
}
p, err := peer.IDFromPublicKey(pk)
ident, err := testutil.RandIdentity()
if err != nil {
return nil, err
}
p := ident.ID()
nd.Identity = p
nd.PrivateKey = sk
nd.PrivateKey = ident.PrivateKey()
nd.Peerstore = peer.NewPeerstore()
nd.Peerstore.AddPrivKey(p, sk)
nd.Peerstore.AddPubKey(p, pk)
nd.Peerstore.AddPrivKey(p, ident.PrivateKey())
nd.Peerstore.AddPubKey(p, ident.PublicKey())
nd.PeerHost, err = mocknet.New(ctx).AddPeer(sk, testutil.RandLocalTCPAddress()) // effectively offline
nd.PeerHost, err = mocknet.New(ctx).AddPeer(ident.PrivateKey(), ident.Address()) // effectively offline
if err != nil {
return nil, err
}
@ -55,8 +50,7 @@ func NewMockNode() (*IpfsNode, error) {
nd.Datastore = ds2.CloserWrap(syncds.MutexWrap(dstore))
// Routing
dht := dht.NewDHT(ctx, nd.PeerHost, nd.Datastore)
nd.Routing = dht
nd.Routing = mockrouting.NewServer().Client(ident)
// Bitswap
bstore := blockstore.NewBlockstore(nd.Datastore)
@ -68,7 +62,7 @@ func NewMockNode() (*IpfsNode, error) {
nd.DAG = mdag.NewDAGService(bserv)
// Namespace resolver
nd.Namesys = nsys.NewNameSystem(dht)
nd.Namesys = nsys.NewNameSystem(nd.Routing)
// Path resolver
nd.Resolver = &path.Resolver{DAG: nd.DAG}

View File

@ -9,8 +9,7 @@
},
"Addresses": {
"Swarm": [
"/ip4/0.0.0.0/tcp/4011",
"/ip4/0.0.0.0/udp/4012/utp"
"/ip4/0.0.0.0/tcp/4011"
],
"API": "/ip4/127.0.0.1/tcp/5001"
},

View File

@ -2,8 +2,7 @@
"Addresses": {
"API": "/ip4/127.0.0.1/tcp/5001",
"Swarm": [
"/ip4/0.0.0.0/tcp/4031",
"/ip4/0.0.0.0/udp/4032/utp"
"/ip4/0.0.0.0/tcp/4031"
]
},
"Bootstrap": [

View File

@ -1,14 +1,16 @@
ipfs bootstrap add /ip4/$BOOTSTRAP_PORT_4011_TCP_ADDR/tcp/$BOOTSTRAP_PORT_4011_TCP_PORT/QmNXuBh8HFsWq68Fid8dMbGNQTh7eG6hV9rr1fQyfmfomE
echo "dockertest> starting client daemon"
ipfs daemon &
sleep 3
while [ ! -f /data/idtiny ]
do
echo waiting for server to add the file...
echo "dockertest> waiting for server to add the file..."
sleep 1
done
echo client found file with hash: $(cat /data/idtiny)
echo "dockertest> client found file with hash:" $(cat /data/idtiny)
ipfs cat $(cat /data/idtiny) > filetiny
@ -23,10 +25,10 @@ fi
while [ ! -f /data/idrand ]
do
echo waiting for server to add the file...
echo "dockertest> waiting for server to add the file..."
sleep 1
done
echo client found file with hash: $(cat /data/idrand)
echo "dockertest> client found file with hash:" $(cat /data/idrand)
cat /data/idrand
@ -44,4 +46,4 @@ if (($? > 0)); then
exit 1
fi
echo "success"
echo "dockertest> success"

View File

@ -2,8 +2,7 @@
"Addresses": {
"API": "/ip4/127.0.0.1/tcp/5001",
"Swarm": [
"/ip4/0.0.0.0/tcp/4021",
"/ip4/0.0.0.0/udp/4022/utp"
"/ip4/0.0.0.0/tcp/4021"
]
},
"Bootstrap": [

View File

@ -3,6 +3,7 @@ ipfs bootstrap add /ip4/$BOOTSTRAP_PORT_4011_TCP_ADDR/tcp/$BOOTSTRAP_PORT_4011_T
# wait for daemon to start/bootstrap
# alternatively use ipfs swarm connect
echo "dockertest> starting server daemon"
ipfs daemon &
sleep 3
# TODO instead of bootrapping: ipfs swarm connect /ip4/$BOOTSTRAP_PORT_4011_TCP_ADDR/tcp/$BOOTSTRAP_PORT_4011_TCP_PORT/QmNXuBh8HFsWq68Fid8dMbGNQTh7eG6hV9rr1fQyfmfomE
@ -10,11 +11,11 @@ sleep 3
# must mount this volume from data container
ipfs add -q /data/filetiny > tmptiny
mv tmptiny /data/idtiny
echo added tiny file. hash is $(cat /data/idtiny)
echo "dockertest> added tiny file. hash is" $(cat /data/idtiny)
ipfs add -q /data/filerand > tmprand
mv tmprand /data/idrand
echo added rand file. hash is $(cat /data/idrand)
echo "dockertest> added rand file. hash is" $(cat /data/idrand)
# allow ample time for the client to pull the data
sleep 10000000

View File

@ -61,6 +61,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
}()
bs := &bitswap{
self: p,
blockstore: bstore,
cancelFunc: cancelFunc,
notifications: notif,
@ -79,6 +80,9 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork,
// bitswap instances implement the bitswap protocol.
type bitswap struct {
// the ID of the peer to act on behalf of
self peer.ID
// network delivers messages on behalf of the session
network bsnet.BitSwapNetwork
@ -104,6 +108,7 @@ type bitswap struct {
// GetBlock attempts to retrieve a particular block from peers within the
// deadline enforced by the context.
func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) {
log := log.Prefix("bitswap(%s).GetBlock(%s)", bs.self, k)
// Any async work initiated by this function must end when this function
// returns. To ensure this, derive a new context. Note that it is okay to
@ -116,10 +121,12 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err
ctx = eventlog.ContextWithLoggable(ctx, eventlog.Uuid("GetBlockRequest"))
log.Event(ctx, "GetBlockRequestBegin", &k)
log.Debugf("GetBlockRequestBegin")
defer func() {
cancelFunc()
log.Event(ctx, "GetBlockRequestEnd", &k)
log.Debugf("GetBlockRequestEnd")
}()
promise, err := bs.GetBlocks(ctx, []u.Key{k})
@ -166,67 +173,109 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error {
return bs.network.Provide(ctx, blk.Key())
}
func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.ID) error {
func (bs *bitswap) sendWantlistMsgToPeer(ctx context.Context, m bsmsg.BitSwapMessage, p peer.ID) error {
log := log.Prefix("bitswap(%s).bitswap.sendWantlistMsgToPeer(%d, %s)", bs.self, len(m.Wantlist()), p)
log.Debug("sending wantlist")
if err := bs.send(ctx, p, m); err != nil {
log.Errorf("send wantlist error: %s", err)
return err
}
log.Debugf("send wantlist success")
return nil
}
func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error {
if peers == nil {
panic("Cant send wantlist to nil peerchan")
}
message := bsmsg.New()
for _, wanted := range bs.wantlist.Entries() {
message.AddEntry(wanted.Key, wanted.Priority)
}
log := log.Prefix("bitswap(%s).sendWantlistMsgToPeers(%d)", bs.self, len(m.Wantlist()))
log.Debugf("begin")
defer log.Debugf("end")
set := pset.New()
wg := sync.WaitGroup{}
for peerToQuery := range peers {
log.Event(ctx, "PeerToQuery", peerToQuery)
if !set.TryAdd(peerToQuery) { //Do once per peer
log.Debugf("%s skipped (already sent)", peerToQuery)
continue
}
log.Debugf("%s sending", peerToQuery)
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
if err := bs.send(ctx, p, message); err != nil {
log.Error(err)
return
}
bs.sendWantlistMsgToPeer(ctx, m, p)
}(peerToQuery)
}
wg.Wait()
return nil
}
func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wantlist.ThreadSafe) {
func (bs *bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID) error {
message := bsmsg.New()
message.SetFull(true)
for _, wanted := range bs.wantlist.Entries() {
message.AddEntry(wanted.Key, wanted.Priority)
}
return bs.sendWantlistMsgToPeers(ctx, message, peers)
}
func (bs *bitswap) sendWantlistToProviders(ctx context.Context) {
log := log.Prefix("bitswap(%s).sendWantlistToProviders ", bs.self)
log.Debugf("begin")
defer log.Debugf("end")
ctx, cancel := context.WithCancel(ctx)
defer cancel()
message := bsmsg.New()
message.SetFull(true)
for _, e := range bs.wantlist.Entries() {
message.AddEntry(e.Key, e.Priority)
}
set := pset.New()
// prepare a channel to hand off to sendWantlistToPeers
sendToPeers := make(chan peer.ID)
// Get providers for all entries in wantlist (could take a while)
wg := sync.WaitGroup{}
for _, e := range wantlist.Entries() {
for _, e := range bs.wantlist.Entries() {
wg.Add(1)
go func(k u.Key) {
defer wg.Done()
log := log.Prefix("(entry: %s) ", k)
log.Debug("asking dht for providers")
child, _ := context.WithTimeout(ctx, providerRequestTimeout)
providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest)
for prov := range providers {
if set.TryAdd(prov) { //Do once per peer
bs.send(ctx, prov, message)
}
log.Debugf("dht returned provider %s. send wantlist", prov)
sendToPeers <- prov
}
}(e.Key)
}
wg.Wait()
go func() {
wg.Wait() // make sure all our children do finish.
close(sendToPeers)
}()
err := bs.sendWantlistToPeers(ctx, sendToPeers)
if err != nil {
log.Errorf("sendWantlistToPeers error: %s", err)
}
}
func (bs *bitswap) taskWorker(ctx context.Context) {
log := log.Prefix("bitswap(%s).taskWorker", bs.self)
for {
select {
case <-ctx.Done():
log.Debugf("exiting")
return
case envelope := <-bs.engine.Outbox():
log.Debugf("message to %s sending...", envelope.Peer)
bs.send(ctx, envelope.Peer, envelope.Message)
log.Debugf("message to %s sent", envelope.Peer)
}
}
}
@ -243,7 +292,7 @@ func (bs *bitswap) clientWorker(parent context.Context) {
select {
case <-broadcastSignal:
// Resend unfulfilled wantlist keys
bs.sendWantlistToProviders(ctx, bs.wantlist)
bs.sendWantlistToProviders(ctx)
broadcastSignal = time.After(rebroadcastDelay.Get())
case ks := <-bs.batchRequests:
if len(ks) == 0 {
@ -262,7 +311,7 @@ func (bs *bitswap) clientWorker(parent context.Context) {
// newer bitswap strategies.
child, _ := context.WithTimeout(ctx, providerRequestTimeout)
providers := bs.network.FindProvidersAsync(child, ks[0], maxProvidersPerRequest)
err := bs.sendWantListTo(ctx, providers)
err := bs.sendWantlistToPeers(ctx, providers)
if err != nil {
log.Errorf("error sending wantlist: %s", err)
}
@ -336,11 +385,6 @@ func (bs *bitswap) ReceiveError(err error) {
// send strives to ensure that accounting is always performed when a message is
// sent
func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) error {
log.Event(ctx, "DialPeer", p)
err := bs.network.DialPeer(ctx, p)
if err != nil {
return errors.Wrap(err)
}
if err := bs.network.SendMessage(ctx, p, m); err != nil {
return errors.Wrap(err)
}

View File

@ -11,10 +11,10 @@ import (
blocks "github.com/jbenet/go-ipfs/blocks"
blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil"
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
p2ptestutil "github.com/jbenet/go-ipfs/p2p/test/util"
mockrouting "github.com/jbenet/go-ipfs/routing/mock"
u "github.com/jbenet/go-ipfs/util"
delay "github.com/jbenet/go-ipfs/util/delay"
"github.com/jbenet/go-ipfs/util/testutil"
)
// FIXME the tests are really sensitive to the network delay. fix them to work
@ -25,7 +25,7 @@ func TestClose(t *testing.T) {
// TODO
t.Skip("TODO Bitswap's Close implementation is a WIP")
vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
sesgen := NewSessionGenerator(vnet)
sesgen := NewTestSessionGenerator(vnet)
defer sesgen.Close()
bgen := blocksutil.NewBlockGenerator()
@ -39,7 +39,7 @@ func TestClose(t *testing.T) {
func TestGetBlockTimeout(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
g := NewSessionGenerator(net)
g := NewTestSessionGenerator(net)
defer g.Close()
self := g.Next()
@ -57,11 +57,11 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this
rs := mockrouting.NewServer()
net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay))
g := NewSessionGenerator(net)
g := NewTestSessionGenerator(net)
defer g.Close()
block := blocks.NewBlock([]byte("block"))
pinfo := testutil.RandIdentityOrFatal(t)
pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t)
rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network
solo := g.Next()
@ -81,7 +81,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
block := blocks.NewBlock([]byte("block"))
g := NewSessionGenerator(net)
g := NewTestSessionGenerator(net)
defer g.Close()
hasBlock := g.Next()
@ -134,7 +134,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
t.SkipNow()
}
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
sg := NewSessionGenerator(net)
sg := NewTestSessionGenerator(net)
defer sg.Close()
bg := blocksutil.NewBlockGenerator()
@ -198,7 +198,7 @@ func TestSendToWantingPeer(t *testing.T) {
}
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
sg := NewSessionGenerator(net)
sg := NewTestSessionGenerator(net)
defer sg.Close()
bg := blocksutil.NewBlockGenerator()
@ -243,7 +243,7 @@ func TestSendToWantingPeer(t *testing.T) {
func TestBasicBitswap(t *testing.T) {
net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay))
sg := NewSessionGenerator(net)
sg := NewTestSessionGenerator(net)
bg := blocksutil.NewBlockGenerator()
t.Log("Test a few nodes trying to get one file with a lot of blocks")

View File

@ -8,7 +8,7 @@ import (
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
peer "github.com/jbenet/go-ipfs/p2p/peer"
u "github.com/jbenet/go-ipfs/util"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
)
// TODO consider taking responsibility for other types of requests. For
@ -41,7 +41,7 @@ import (
// whatever it sees fit to produce desired outcomes (get wanted keys
// quickly, maintain good relationships with peers, etc).
var log = u.Logger("engine")
var log = eventlog.Logger("engine")
const (
sizeOutboxChan = 4
@ -91,6 +91,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine {
}
func (e *Engine) taskWorker(ctx context.Context) {
log := log.Prefix("bitswap.Engine.taskWorker")
for {
nextTask := e.peerRequestQueue.Pop()
if nextTask == nil {
@ -98,11 +99,16 @@ func (e *Engine) taskWorker(ctx context.Context) {
// Wait until there are!
select {
case <-ctx.Done():
log.Debugf("exiting: %s", ctx.Err())
return
case <-e.workSignal:
log.Debugf("woken up")
}
continue
}
log := log.Prefix("%s", nextTask)
log.Debugf("processing")
block, err := e.bs.Get(nextTask.Entry.Key)
if err != nil {
log.Warning("engine: task exists to send block, but block is not in blockstore")
@ -113,10 +119,12 @@ func (e *Engine) taskWorker(ctx context.Context) {
m := bsmsg.New()
m.AddBlock(block)
// TODO: maybe add keys from our wantlist?
log.Debugf("sending...")
select {
case <-ctx.Done():
return
case e.outbox <- Envelope{Peer: nextTask.Target, Message: m}:
log.Debugf("sent")
}
}
}
@ -140,16 +148,21 @@ func (e *Engine) Peers() []peer.ID {
// MessageReceived performs book-keeping. Returns error if passed invalid
// arguments.
func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
log := log.Prefix("bitswap.Engine.MessageReceived(%s)", p)
log.Debugf("enter. %d entries %d blocks", len(m.Wantlist()), len(m.Blocks()))
defer log.Debugf("exit")
if len(m.Wantlist()) == 0 && len(m.Blocks()) == 0 {
log.Info("superfluous message")
}
newWorkExists := false
defer func() {
if newWorkExists {
// Signal task generation to restart (if stopped!)
select {
case e.workSignal <- struct{}{}:
default:
}
e.signalNewWork()
}
}()
e.lock.Lock()
defer e.lock.Unlock()
@ -157,11 +170,14 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
if m.Full() {
l.wantList = wl.New()
}
for _, entry := range m.Wantlist() {
if entry.Cancel {
log.Debug("cancel", entry.Key)
l.CancelWant(entry.Key)
e.peerRequestQueue.Remove(entry.Key, p)
} else {
log.Debug("wants", entry.Key, entry.Priority)
l.Wants(entry.Key, entry.Priority)
if exists, err := e.bs.Has(entry.Key); err == nil && exists {
newWorkExists = true
@ -172,6 +188,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error {
for _, block := range m.Blocks() {
// FIXME extract blocks.NumBytes(block) or block.NumBytes() method
log.Debug("got block %s %d bytes", block.Key(), len(block.Data))
l.ReceivedBytes(len(block.Data))
for _, l := range e.ledgerMap {
if l.WantListContains(block.Key()) {
@ -222,3 +239,11 @@ func (e *Engine) findOrCreate(p peer.ID) *ledger {
}
return l
}
func (e *Engine) signalNewWork() {
// Signal task generation to restart (if stopped!)
select {
case e.workSignal <- struct{}{}:
default:
}
}

View File

@ -1,6 +1,7 @@
package decision
import (
"fmt"
"sync"
wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist"
@ -30,6 +31,10 @@ type task struct {
Trash bool
}
func (t *task) String() string {
return fmt.Sprintf("<Task %s, %s, %v>", t.Target, t.Entry.Key, t.Trash)
}
// Push currently adds a new task to the end of the list
func (tl *taskQueue) Push(entry wantlist.Entry, to peer.ID) {
tl.lock.Lock()

View File

@ -14,9 +14,6 @@ var ProtocolBitswap protocol.ID = "/ipfs/bitswap"
// BitSwapNetwork provides network connectivity for BitSwap sessions
type BitSwapNetwork interface {
// DialPeer ensures there is a connection to peer.
DialPeer(context.Context, peer.ID) error
// SendMessage sends a BitSwap message to a peer.
SendMessage(
context.Context,

View File

@ -2,15 +2,17 @@ package network
import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
host "github.com/jbenet/go-ipfs/p2p/host"
inet "github.com/jbenet/go-ipfs/p2p/net"
peer "github.com/jbenet/go-ipfs/p2p/peer"
routing "github.com/jbenet/go-ipfs/routing"
util "github.com/jbenet/go-ipfs/util"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
)
var log = util.Logger("bitswap_network")
var log = eventlog.Logger("bitswap_network")
// NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host
func NewFromIpfsHost(host host.Host, r routing.IpfsRouting) BitSwapNetwork {
@ -32,22 +34,34 @@ type impl struct {
receiver Receiver
}
func (bsnet *impl) DialPeer(ctx context.Context, p peer.ID) error {
return bsnet.host.Connect(ctx, peer.PeerInfo{ID: p})
}
func (bsnet *impl) SendMessage(
ctx context.Context,
p peer.ID,
outgoing bsmsg.BitSwapMessage) error {
log := log.Prefix("bitswap net SendMessage to %s", p)
// ensure we're connected
//TODO(jbenet) move this into host.NewStream?
if err := bsnet.host.Connect(ctx, peer.PeerInfo{ID: p}); err != nil {
return err
}
log.Debug("opening stream")
s, err := bsnet.host.NewStream(ProtocolBitswap, p)
if err != nil {
return err
}
defer s.Close()
return outgoing.ToNet(s)
log.Debug("sending")
if err := outgoing.ToNet(s); err != nil {
log.Errorf("error: %s", err)
return err
}
log.Debug("sent")
return err
}
func (bsnet *impl) SendRequest(
@ -55,17 +69,36 @@ func (bsnet *impl) SendRequest(
p peer.ID,
outgoing bsmsg.BitSwapMessage) (bsmsg.BitSwapMessage, error) {
log := log.Prefix("bitswap net SendRequest to %s", p)
// ensure we're connected
//TODO(jbenet) move this into host.NewStream?
if err := bsnet.host.Connect(ctx, peer.PeerInfo{ID: p}); err != nil {
return nil, err
}
log.Debug("opening stream")
s, err := bsnet.host.NewStream(ProtocolBitswap, p)
if err != nil {
return nil, err
}
defer s.Close()
log.Debug("sending")
if err := outgoing.ToNet(s); err != nil {
log.Errorf("error: %s", err)
return nil, err
}
return bsmsg.FromNet(s)
log.Debug("sent, now receiveing")
incoming, err := bsmsg.FromNet(s)
if err != nil {
log.Errorf("error: %s", err)
return incoming, err
}
log.Debug("received")
return incoming, nil
}
func (bsnet *impl) SetDelegate(r Receiver) {
@ -82,6 +115,7 @@ func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int)
bsnet.host.Peerstore().AddAddresses(info.ID, info.Addrs)
select {
case <-ctx.Done():
return
case out <- info.ID:
}
}
@ -96,23 +130,21 @@ func (bsnet *impl) Provide(ctx context.Context, k util.Key) error {
// handleNewStream receives a new stream from the network.
func (bsnet *impl) handleNewStream(s inet.Stream) {
defer s.Close()
if bsnet.receiver == nil {
return
}
go func() {
defer s.Close()
received, err := bsmsg.FromNet(s)
if err != nil {
go bsnet.receiver.ReceiveError(err)
return
}
p := s.Conn().RemotePeer()
ctx := context.Background()
bsnet.receiver.ReceiveMessage(ctx, p, received)
}()
received, err := bsmsg.FromNet(s)
if err != nil {
go bsnet.receiver.ReceiveError(err)
log.Errorf("bitswap net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err)
return
}
p := s.Conn().RemotePeer()
ctx := context.Background()
log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer())
bsnet.receiver.ReceiveMessage(ctx, p, received)
}

View File

@ -2,7 +2,6 @@ package bitswap
import (
"errors"
"fmt"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message"
@ -178,14 +177,6 @@ func (nc *networkClient) Provide(ctx context.Context, k util.Key) error {
return nc.routing.Provide(ctx, k)
}
func (nc *networkClient) DialPeer(ctx context.Context, p peer.ID) error {
// no need to do anything because dialing isn't a thing in this test net.
if !nc.network.HasPeer(p) {
return fmt.Errorf("Peer not in network: %s", p)
}
return nil
}
func (nc *networkClient) SetDelegate(r bsnet.Receiver) {
nc.Receiver = r
}

View File

@ -10,12 +10,14 @@ import (
exchange "github.com/jbenet/go-ipfs/exchange"
tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet"
peer "github.com/jbenet/go-ipfs/p2p/peer"
p2ptestutil "github.com/jbenet/go-ipfs/p2p/test/util"
datastore2 "github.com/jbenet/go-ipfs/util/datastore2"
delay "github.com/jbenet/go-ipfs/util/delay"
testutil "github.com/jbenet/go-ipfs/util/testutil"
)
func NewSessionGenerator(
// WARNING: this uses RandTestBogusIdentity DO NOT USE for NON TESTS!
func NewTestSessionGenerator(
net tn.Network) SessionGenerator {
ctx, cancel := context.WithCancel(context.TODO())
return SessionGenerator{
@ -41,7 +43,7 @@ func (g *SessionGenerator) Close() error {
func (g *SessionGenerator) Next() Instance {
g.seq++
p, err := testutil.RandIdentity()
p, err := p2ptestutil.RandTestBogusIdentity()
if err != nil {
panic("FIXME") // TODO change signature
}

View File

@ -248,7 +248,7 @@ func TestFastRepublish(t *testing.T) {
// get first resolved hash
log.Debug("publishing first hash")
writeFileData(t, dataA, fname) // random
<-time.After(shortRepublishTimeout * 11 / 10)
<-time.After(shortRepublishTimeout * 2)
log.Debug("resolving first hash")
resolvedHash, err := node.Namesys.Resolve(pubkeyHash)
if err != nil {

View File

@ -229,7 +229,11 @@ func (n *dagService) Get(k u.Key) (*Node, error) {
return nil, fmt.Errorf("dagService is nil")
}
ctx, _ := context.WithTimeout(context.TODO(), time.Second*5)
ctx, _ := context.WithTimeout(context.TODO(), time.Minute)
// we shouldn't use an arbitrary timeout here.
// since Get doesnt take in a context yet, we give a large upper bound.
// think of an http request. we want it to go on as long as the client requests it.
b, err := n.Blocks.GetBlock(ctx, k)
if err != nil {
return nil, err

View File

@ -14,7 +14,7 @@ func TestRoutingResolve(t *testing.T) {
resolver := NewRoutingResolver(d)
publisher := NewRoutingPublisher(d)
privk, pubk, err := testutil.RandKeyPair(512)
privk, pubk, err := testutil.RandTestKeyPair(512)
if err != nil {
t.Fatal(err)
}

View File

@ -9,7 +9,7 @@ import (
)
func TestRsaKeys(t *testing.T) {
sk, pk, err := tu.RandKeyPair(512)
sk, pk, err := tu.RandTestKeyPair(512)
if err != nil {
t.Fatal(err)
}
@ -93,7 +93,7 @@ func testKeyEquals(t *testing.T, k Key) {
t.Fatal("Key not equal to key with same bytes.")
}
sk, pk, err := tu.RandKeyPair(512)
sk, pk, err := tu.RandTestKeyPair(512)
if err != nil {
t.Fatal(err)
}

View File

@ -2,14 +2,15 @@ package mocknet
import (
"fmt"
"sort"
"sync"
"time"
ic "github.com/jbenet/go-ipfs/p2p/crypto"
host "github.com/jbenet/go-ipfs/p2p/host"
bhost "github.com/jbenet/go-ipfs/p2p/host/basic"
inet "github.com/jbenet/go-ipfs/p2p/net"
peer "github.com/jbenet/go-ipfs/p2p/peer"
p2putil "github.com/jbenet/go-ipfs/p2p/test/util"
testutil "github.com/jbenet/go-ipfs/util/testutil"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
@ -44,7 +45,7 @@ func New(ctx context.Context) Mocknet {
}
func (mn *mocknet) GenPeer() (host.Host, error) {
sk, _, err := testutil.SeededKeyPair(time.Now().UnixNano())
sk, err := p2putil.RandTestBogusPrivateKey()
if err != nil {
return nil, err
}
@ -90,6 +91,7 @@ func (mn *mocknet) Peers() []peer.ID {
for _, n := range mn.nets {
cp = append(cp, n.peer)
}
sort.Sort(peer.IDSlice(cp))
return cp
}
@ -115,6 +117,8 @@ func (mn *mocknet) Hosts() []host.Host {
for _, h := range mn.hosts {
cp = append(cp, h)
}
sort.Sort(hostSlice(cp))
return cp
}
@ -126,6 +130,7 @@ func (mn *mocknet) Nets() []inet.Network {
for _, n := range mn.nets {
cp = append(cp, n)
}
sort.Sort(netSlice(cp))
return cp
}
@ -339,3 +344,17 @@ func (mn *mocknet) LinkDefaults() LinkOptions {
defer mn.RUnlock()
return mn.linkDefaults
}
// netSlice for sorting by peer
type netSlice []inet.Network
func (es netSlice) Len() int { return len(es) }
func (es netSlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] }
func (es netSlice) Less(i, j int) bool { return string(es[i].LocalPeer()) < string(es[j].LocalPeer()) }
// hostSlice for sorting by peer
type hostSlice []host.Host
func (es hostSlice) Len() int { return len(es) }
func (es hostSlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] }
func (es hostSlice) Less(i, j int) bool { return string(es[i].ID()) < string(es[j].ID()) }

View File

@ -26,15 +26,15 @@ func randPeer(t *testing.T) peer.ID {
func TestNetworkSetup(t *testing.T) {
ctx := context.Background()
sk1, _, err := testutil.RandKeyPair(512)
sk1, _, err := testutil.RandTestKeyPair(512)
if err != nil {
t.Fatal(t)
}
sk2, _, err := testutil.RandKeyPair(512)
sk2, _, err := testutil.RandTestKeyPair(512)
if err != nil {
t.Fatal(t)
}
sk3, _, err := testutil.RandKeyPair(512)
sk3, _, err := testutil.RandTestKeyPair(512)
if err != nil {
t.Fatal(t)
}
@ -398,7 +398,7 @@ func TestAdding(t *testing.T) {
peers := []peer.ID{}
for i := 0; i < 3; i++ {
sk, _, err := testutil.RandKeyPair(512)
sk, _, err := testutil.RandTestKeyPair(512)
if err != nil {
t.Fatal(err)
}

View File

@ -130,3 +130,10 @@ type PeerInfo struct {
ID ID
Addrs []ma.Multiaddr
}
// IDSlice for sorting peers
type IDSlice []ID
func (es IDSlice) Len() int { return len(es) }
func (es IDSlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] }
func (es IDSlice) Less(i, j int) bool { return string(es[i]) < string(es[j]) }

View File

@ -41,7 +41,7 @@ type keyset struct {
func (ks *keyset) generate() error {
var err error
ks.sk, ks.pk, err = tu.RandKeyPair(512)
ks.sk, ks.pk, err = tu.RandTestKeyPair(512)
if err != nil {
return err
}

View File

@ -4,8 +4,11 @@ import (
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
peer "github.com/jbenet/go-ipfs/p2p/peer"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
)
var log = eventlog.Logger("peerqueue")
// ChanQueue makes any PeerQueue synchronizable through channels.
type ChanQueue struct {
Queue PeerQueue
@ -21,6 +24,7 @@ func NewChanQueue(ctx context.Context, pq PeerQueue) *ChanQueue {
}
func (cq *ChanQueue) process(ctx context.Context) {
log := log.Prefix("<ChanQueue %p>", cq)
// construct the channels here to be able to use them bidirectionally
enqChan := make(chan peer.ID)
@ -30,6 +34,8 @@ func (cq *ChanQueue) process(ctx context.Context) {
cq.DeqChan = deqChan
go func() {
log.Debug("processing")
defer log.Debug("closed")
defer close(deqChan)
var next peer.ID
@ -38,11 +44,13 @@ func (cq *ChanQueue) process(ctx context.Context) {
for {
if cq.Queue.Len() == 0 {
// log.Debug("wait for enqueue")
select {
case next, more = <-enqChan:
if !more {
return
}
// log.Debug("got", next)
case <-ctx.Done():
return
@ -50,19 +58,24 @@ func (cq *ChanQueue) process(ctx context.Context) {
} else {
next = cq.Queue.Dequeue()
// log.Debug("peek", next)
}
select {
case item, more = <-enqChan:
if !more {
return
if cq.Queue.Len() > 0 {
return // we're done done.
}
enqChan = nil // closed, so no use.
}
// log.Debug("got", item)
cq.Queue.Enqueue(item)
cq.Queue.Enqueue(next)
cq.Queue.Enqueue(next) // order may have changed.
next = ""
case deqChan <- next:
// log.Debug("dequeued", next)
next = ""
case <-ctx.Done():

165
p2p/test/util/key.go Normal file
View File

@ -0,0 +1,165 @@
package testutil
import (
"bytes"
"io"
"testing"
u "github.com/jbenet/go-ipfs/util"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
testutil "github.com/jbenet/go-ipfs/util/testutil"
ic "github.com/jbenet/go-ipfs/p2p/crypto"
peer "github.com/jbenet/go-ipfs/p2p/peer"
ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr"
)
var log = eventlog.Logger("boguskey")
// TestBogusPrivateKey is a key used for testing (to avoid expensive keygen)
type TestBogusPrivateKey []byte
// TestBogusPublicKey is a key used for testing (to avoid expensive keygen)
type TestBogusPublicKey []byte
func (pk TestBogusPublicKey) Verify(data, sig []byte) (bool, error) {
log.Criticalf("TestBogusPublicKey.Verify -- this better be a test!")
return bytes.Equal(data, reverse(sig)), nil
}
func (pk TestBogusPublicKey) Bytes() ([]byte, error) {
return []byte(pk), nil
}
func (pk TestBogusPublicKey) Encrypt(b []byte) ([]byte, error) {
log.Criticalf("TestBogusPublicKey.Encrypt -- this better be a test!")
return reverse(b), nil
}
// Equals checks whether this key is equal to another
func (pk TestBogusPublicKey) Equals(k ic.Key) bool {
return ic.KeyEqual(pk, k)
}
func (pk TestBogusPublicKey) Hash() ([]byte, error) {
return ic.KeyHash(pk)
}
func (sk TestBogusPrivateKey) GenSecret() []byte {
return []byte(sk)
}
func (sk TestBogusPrivateKey) Sign(message []byte) ([]byte, error) {
log.Criticalf("TestBogusPrivateKey.Sign -- this better be a test!")
return reverse(message), nil
}
func (sk TestBogusPrivateKey) GetPublic() ic.PubKey {
return TestBogusPublicKey(sk)
}
func (sk TestBogusPrivateKey) Decrypt(b []byte) ([]byte, error) {
log.Criticalf("TestBogusPrivateKey.Decrypt -- this better be a test!")
return reverse(b), nil
}
func (sk TestBogusPrivateKey) Bytes() ([]byte, error) {
return []byte(sk), nil
}
// Equals checks whether this key is equal to another
func (sk TestBogusPrivateKey) Equals(k ic.Key) bool {
return ic.KeyEqual(sk, k)
}
func (sk TestBogusPrivateKey) Hash() ([]byte, error) {
return ic.KeyHash(sk)
}
func RandTestBogusPrivateKey() (TestBogusPrivateKey, error) {
r := u.NewTimeSeededRand()
k := make([]byte, 5)
if _, err := io.ReadFull(r, k); err != nil {
return nil, err
}
return TestBogusPrivateKey(k), nil
}
func RandTestBogusPublicKey() (TestBogusPublicKey, error) {
k, err := RandTestBogusPrivateKey()
return TestBogusPublicKey(k), err
}
func RandTestBogusPrivateKeyOrFatal(t *testing.T) TestBogusPrivateKey {
k, err := RandTestBogusPrivateKey()
if err != nil {
t.Fatal(err)
}
return k
}
func RandTestBogusPublicKeyOrFatal(t *testing.T) TestBogusPublicKey {
k, err := RandTestBogusPublicKey()
if err != nil {
t.Fatal(err)
}
return k
}
func RandTestBogusIdentity() (testutil.Identity, error) {
k, err := RandTestBogusPrivateKey()
if err != nil {
return nil, err
}
id, err := peer.IDFromPrivateKey(k)
if err != nil {
return nil, err
}
return &identity{
k: k,
id: id,
a: testutil.RandLocalTCPAddress(),
}, nil
}
func RandTestBogusIdentityOrFatal(t *testing.T) testutil.Identity {
k, err := RandTestBogusIdentity()
if err != nil {
t.Fatal(err)
}
return k
}
// identity is a temporary shim to delay binding of PeerNetParams.
type identity struct {
k TestBogusPrivateKey
id peer.ID
a ma.Multiaddr
}
func (p *identity) ID() peer.ID {
return p.id
}
func (p *identity) Address() ma.Multiaddr {
return p.a
}
func (p *identity) PrivateKey() ic.PrivKey {
return p.k
}
func (p *identity) PublicKey() ic.PubKey {
return p.k.GetPublic()
}
func reverse(a []byte) []byte {
b := make([]byte, len(a))
for i := 0; i < len(a); i++ {
b[i] = a[len(a)-1-i]
}
return b
}

View File

@ -89,6 +89,11 @@ func (dht *IpfsDHT) LocalPeer() peer.ID {
return dht.self
}
// log returns the dht's logger
func (dht *IpfsDHT) log() eventlog.EventLogger {
return log.Prefix("dht(%s)", dht.self)
}
// Connect to a new peer at the given address, ping and add to the routing table
func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error {
// TODO: change interface to accept a PeerInfo as well.

View File

@ -87,15 +87,11 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message
start := time.Now()
log.Debugf("%s writing", dht.self)
if err := w.WriteMsg(pmes); err != nil {
return nil, err
}
log.Event(ctx, "dhtSentMessage", dht.self, p, pmes)
log.Debugf("%s reading", dht.self)
defer log.Debugf("%s done", dht.self)
rpmes := new(pb.Message)
if err := r.ReadMsg(rpmes); err != nil {
return nil, err
@ -125,12 +121,10 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func
w := ggio.NewDelimitedWriter(cw)
log.Debugf("%s writing", dht.self)
if err := w.WriteMsg(pmes); err != nil {
return err
}
log.Event(ctx, "dhtSentMessage", dht.self, p, pmes)
log.Debugf("%s done", dht.self)
return nil
}

View File

@ -231,6 +231,7 @@ func TestProvides(t *testing.T) {
}
func TestBootstrap(t *testing.T) {
// t.Skip("skipping test to debug another")
if testing.Short() {
t.SkipNow()
}
@ -388,6 +389,7 @@ func TestProvidesMany(t *testing.T) {
}
func TestProvidesAsync(t *testing.T) {
// t.Skip("skipping test to debug another")
if testing.Short() {
t.SkipNow()
}
@ -442,6 +444,7 @@ func TestProvidesAsync(t *testing.T) {
}
func TestLayeredGet(t *testing.T) {
// t.Skip("skipping test to debug another")
if testing.Short() {
t.SkipNow()
}
@ -482,6 +485,7 @@ func TestLayeredGet(t *testing.T) {
}
func TestFindPeer(t *testing.T) {
// t.Skip("skipping test to debug another")
if testing.Short() {
t.SkipNow()
}
@ -596,6 +600,7 @@ func testPeerListsMatch(t *testing.T, p1, p2 []peer.ID) {
}
func TestConnectCollision(t *testing.T) {
// t.Skip("skipping test to debug another")
if testing.Short() {
t.SkipNow()
}

View File

@ -32,11 +32,10 @@ func TestGetFailures(t *testing.T) {
t.Fatal(err)
}
hosts := mn.Hosts()
peers := mn.Peers()
tsds := dssync.MutexWrap(ds.NewMapDatastore())
d := NewDHT(ctx, hosts[0], tsds)
d.Update(ctx, peers[1])
d.Update(ctx, hosts[1].ID())
// u.POut("NotFound Test\n")
// Reply with failures to every message
@ -47,7 +46,7 @@ func TestGetFailures(t *testing.T) {
// This one should time out
// u.POut("Timout Test\n")
ctx1, _ := context.WithTimeout(context.Background(), time.Second)
ctx1, _ := context.WithTimeout(context.Background(), 200*time.Millisecond)
if _, err := d.GetValue(ctx1, u.Key("test")); err != nil {
if err != context.DeadlineExceeded {
t.Fatal("Got different error than we expected", err)
@ -78,8 +77,12 @@ func TestGetFailures(t *testing.T) {
}
})
// This one should fail with NotFound
ctx2, _ := context.WithTimeout(context.Background(), 3*time.Second)
// This one should fail with NotFound.
// long context timeout to ensure we dont end too early.
// the dht should be exhausting its query and returning not found.
// (was 3 seconds before which should be _plenty_ of time, but maybe
// travis machines really have a hard time...)
ctx2, _ := context.WithTimeout(context.Background(), 20*time.Second)
_, err = d.GetValue(ctx2, u.Key("test"))
if err != nil {
if err != routing.ErrNotFound {
@ -133,6 +136,7 @@ func TestGetFailures(t *testing.T) {
}
func TestNotFound(t *testing.T) {
// t.Skip("skipping test to debug another")
if testing.Short() {
t.SkipNow()
}
@ -143,12 +147,11 @@ func TestNotFound(t *testing.T) {
t.Fatal(err)
}
hosts := mn.Hosts()
peers := mn.Peers()
tsds := dssync.MutexWrap(ds.NewMapDatastore())
d := NewDHT(ctx, hosts[0], tsds)
for _, p := range peers {
d.Update(ctx, p)
for _, p := range hosts {
d.Update(ctx, p.ID())
}
// Reply with random peers to every message
@ -171,7 +174,7 @@ func TestNotFound(t *testing.T) {
ps := []peer.PeerInfo{}
for i := 0; i < 7; i++ {
p := peers[rand.Intn(len(peers))]
p := hosts[rand.Intn(len(hosts))].ID()
pi := host.Peerstore().PeerInfo(p)
ps = append(ps, pi)
}
@ -187,7 +190,8 @@ func TestNotFound(t *testing.T) {
})
}
ctx, _ = context.WithTimeout(ctx, time.Second*5)
// long timeout to ensure timing is not at play.
ctx, _ = context.WithTimeout(ctx, time.Second*20)
v, err := d.GetValue(ctx, u.Key("hello"))
log.Debugf("get value got %v", v)
if err != nil {
@ -207,6 +211,7 @@ func TestNotFound(t *testing.T) {
// If less than K nodes are in the entire network, it should fail when we make
// a GET rpc and nobody has the value
func TestLessThanKResponses(t *testing.T) {
// t.Skip("skipping test to debug another")
// t.Skip("skipping test because it makes a lot of output")
ctx := context.Background()
@ -215,13 +220,12 @@ func TestLessThanKResponses(t *testing.T) {
t.Fatal(err)
}
hosts := mn.Hosts()
peers := mn.Peers()
tsds := dssync.MutexWrap(ds.NewMapDatastore())
d := NewDHT(ctx, hosts[0], tsds)
for i := 1; i < 5; i++ {
d.Update(ctx, peers[i])
d.Update(ctx, hosts[i].ID())
}
// Reply with random peers to every message
@ -240,7 +244,7 @@ func TestLessThanKResponses(t *testing.T) {
switch pmes.GetType() {
case pb.Message_GET_VALUE:
pi := host.Peerstore().PeerInfo(peers[1])
pi := host.Peerstore().PeerInfo(hosts[1].ID())
resp := &pb.Message{
Type: pmes.Type,
CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []peer.PeerInfo{pi}),

View File

@ -148,7 +148,7 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Mess
}
if closest == nil {
log.Warningf("handleFindPeer: could not find anything.")
log.Warningf("%s handleFindPeer %s: could not find anything.", dht.self, p)
return resp, nil
}
@ -167,25 +167,31 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Mess
func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel())
key := u.Key(pmes.GetKey())
// debug logging niceness.
reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, key)
log.Debugf("%s begin", reqDesc)
defer log.Debugf("%s end", reqDesc)
// check if we have this value, to add ourselves as provider.
log.Debugf("handling GetProviders: '%s'", u.Key(pmes.GetKey()))
dsk := u.Key(pmes.GetKey()).DsKey()
has, err := dht.datastore.Has(dsk)
has, err := dht.datastore.Has(key.DsKey())
if err != nil && err != ds.ErrNotFound {
log.Errorf("unexpected datastore error: %v\n", err)
has = false
}
// setup providers
providers := dht.providers.GetProviders(ctx, u.Key(pmes.GetKey()))
providers := dht.providers.GetProviders(ctx, key)
if has {
providers = append(providers, dht.self)
log.Debugf("%s have the value. added self as provider", reqDesc)
}
if providers != nil && len(providers) > 0 {
infos := peer.PeerInfos(dht.peerstore, providers)
resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
log.Debugf("%s have %d providers: %s", reqDesc, len(providers), infos)
}
// Also send closer peers.
@ -193,6 +199,7 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.
if closer != nil {
infos := peer.PeerInfos(dht.peerstore, providers)
resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos)
log.Debugf("%s have %d closer peers: %s", reqDesc, len(closer), infos)
}
return resp, nil
@ -206,7 +213,7 @@ type providerInfo struct {
func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) {
key := u.Key(pmes.GetKey())
log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, peer.ID(key))
log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, key)
// add provider should use the address given in the message
pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())

View File

@ -7,6 +7,7 @@ import (
queue "github.com/jbenet/go-ipfs/p2p/peer/queue"
"github.com/jbenet/go-ipfs/routing"
u "github.com/jbenet/go-ipfs/util"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
pset "github.com/jbenet/go-ipfs/util/peerset"
todoctr "github.com/jbenet/go-ipfs/util/todocounter"
@ -55,32 +56,18 @@ func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, e
}
type dhtQueryRunner struct {
query *dhtQuery // query to run
peersSeen *pset.PeerSet // all peers queried. prevent querying same peer 2x
peersToQuery *queue.ChanQueue // peers remaining to be queried
peersRemaining todoctr.Counter // peersToQuery + currently processing
// the query to run
query *dhtQuery
result *dhtQueryResult // query result
errs []error // result errors. maybe should be a map[peer.ID]error
// peersToQuery is a list of peers remaining to query
peersToQuery *queue.ChanQueue
rateLimit chan struct{} // processing semaphore
log eventlog.EventLogger
// peersSeen are all the peers queried. used to prevent querying same peer 2x
peersSeen *pset.PeerSet
// rateLimit is a channel used to rate limit our processing (semaphore)
rateLimit chan struct{}
// peersRemaining is a counter of peers remaining (toQuery + processing)
peersRemaining todoctr.Counter
// context group
cg ctxgroup.ContextGroup
// result
result *dhtQueryResult
// result errors
errs []error
// lock for concurrent access to fields
sync.RWMutex
}
@ -96,6 +83,11 @@ func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner {
}
func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
log := log.Prefix("dht(%s).Query(%s).Run(%d)", r.query.dht.self, r.query.key, len(peers))
r.log = log
log.Debug("enter")
defer log.Debug("end")
log.Debugf("Run query with %d peers.", len(peers))
if len(peers) == 0 {
log.Warning("Running query with no peers!")
@ -115,6 +107,7 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
// go do this thing.
// do it as a child func to make sure Run exits
// ONLY AFTER spawn workers has exited.
log.Debugf("go spawn workers")
r.cg.AddChildFunc(r.spawnWorkers)
// so workers are working.
@ -124,41 +117,45 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) {
select {
case <-r.peersRemaining.Done():
log.Debug("all peers ended")
r.cg.Close()
r.RLock()
defer r.RUnlock()
if len(r.errs) > 0 {
err = r.errs[0]
err = r.errs[0] // take the first?
}
case <-r.cg.Closed():
log.Debug("r.cg.Closed()")
r.RLock()
defer r.RUnlock()
err = r.cg.Context().Err() // collect the error.
}
if r.result != nil && r.result.success {
log.Debug("success: %s", r.result)
return r.result, nil
}
log.Debug("failure: %s", err)
return nil, err
}
func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID) {
// if new peer is ourselves...
if next == r.query.dht.self {
r.log.Debug("addPeerToQuery skip self")
return
}
if !r.peersSeen.TryAdd(next) {
log.Debug("query peer was already seen")
r.log.Debugf("addPeerToQuery skip seen %s", next)
return
}
log.Debugf("adding peer to query: %v", next)
// do this after unlocking to prevent possible deadlocks.
r.log.Debugf("addPeerToQuery adding %s", next)
r.peersRemaining.Increment(1)
select {
case r.peersToQuery.EnqChan <- next:
@ -167,6 +164,10 @@ func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID) {
}
func (r *dhtQueryRunner) spawnWorkers(parent ctxgroup.ContextGroup) {
log := r.log.Prefix("spawnWorkers")
log.Debugf("begin")
defer log.Debugf("end")
for {
select {
@ -192,7 +193,9 @@ func (r *dhtQueryRunner) spawnWorkers(parent ctxgroup.ContextGroup) {
}
func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
log.Debugf("spawned worker for: %v", p)
log := r.log.Prefix("queryPeer(%s)", p)
log.Debugf("spawned")
defer log.Debugf("finished")
// make sure we rate limit concurrency.
select {
@ -203,34 +206,36 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
}
// ok let's do this!
log.Debugf("running worker for: %v", p)
log.Debugf("running")
// make sure we do this when we exit
defer func() {
// signal we're done proccessing peer p
log.Debugf("completing worker for: %v", p)
log.Debugf("completed")
r.peersRemaining.Decrement(1)
r.rateLimit <- struct{}{}
}()
// make sure we're connected to the peer.
if conns := r.query.dht.host.Network().ConnsToPeer(p); len(conns) == 0 {
log.Infof("worker for: %v -- not connected. dial start", p)
log.Infof("not connected. dialing.")
pi := peer.PeerInfo{ID: p}
if err := r.query.dht.host.Connect(cg.Context(), pi); err != nil {
log.Debugf("ERROR worker for: %v -- err connecting: %v", p, err)
log.Debugf("Error connecting: %s", err)
r.Lock()
r.errs = append(r.errs, err)
r.Unlock()
return
}
log.Infof("worker for: %v -- not connected. dial success!", p)
log.Debugf("connected. dial success.")
}
// finally, run the query against this peer
log.Debugf("query running")
res, err := r.query.qfunc(cg.Context(), p)
log.Debugf("query finished")
if err != nil {
log.Debugf("ERROR worker for: %v %v", p, err)
@ -239,7 +244,7 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) {
r.Unlock()
} else if res.success {
log.Debugf("SUCCESS worker for: %v", p, res)
log.Debugf("SUCCESS worker for: %v %s", p, res)
r.Lock()
r.result = res
r.Unlock()

View File

@ -191,8 +191,8 @@ func (dht *IpfsDHT) verifyRecord(r *pb.Record, pk ci.PubKey) error {
// Now, check validity func
parts := strings.Split(r.GetKey(), "/")
if len(parts) < 3 {
log.Errorf("Record had bad key: %s", u.Key(r.GetKey()))
return ErrBadRecord
log.Infof("Record key does not have validator: %s", u.Key(r.GetKey()))
return nil
}
fnc, ok := dht.Validators[parts[1]]

View File

@ -65,25 +65,29 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error
// If the search does not succeed, a multiaddr string of a closer peer is
// returned along with util.ErrSearchIncomplete
func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
log.Debugf("Get Value [%s]", key)
log := dht.log().Prefix("GetValue(%s)", key)
log.Debugf("start")
defer log.Debugf("end")
// If we have it local, dont bother doing an RPC!
val, err := dht.getLocal(key)
if err == nil {
log.Debug("Got value locally!")
log.Debug("have it locally")
return val, nil
}
// get closest peers in the routing table
rtp := dht.routingTable.ListPeers()
log.Debugf("peers in rt: %s", len(rtp), rtp)
closest := dht.routingTable.NearestPeers(kb.ConvertKey(key), PoolSize)
if closest == nil || len(closest) == 0 {
log.Warning("Got no peers back from routing table!")
log.Warning("No peers from routing table!")
return nil, errors.Wrap(kb.ErrLookupFailure)
}
// setup the Query
query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
val, peers, err := dht.getValueOrPeers(ctx, p, key)
if err != nil {
return nil, err
@ -116,9 +120,13 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) {
// Provide makes this node announce that it can provide a value for the given key
func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {
log := dht.log().Prefix("Provide(%s)", key)
log.Debugf("start", key)
log.Event(ctx, "provideBegin", &key)
defer log.Debugf("end", key)
defer log.Event(ctx, "provideEnd", &key)
// add self locally
dht.providers.AddProvider(key, dht.self)
peers, err := dht.getClosestPeers(ctx, key)
@ -131,6 +139,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error {
wg.Add(1)
go func(p peer.ID) {
defer wg.Done()
log.Debugf("putProvider(%s, %s)", key, p)
err := dht.putProvider(ctx, p, string(key))
if err != nil {
log.Error(err)
@ -230,9 +239,12 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int
}
func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, count int, peerOut chan peer.PeerInfo) {
log := dht.log().Prefix("FindProviders(%s)", key)
defer close(peerOut)
defer log.Event(ctx, "findProviders end", &key)
log.Debugf("%s FindProviders %s", dht.self, key)
log.Debug("begin")
defer log.Debug("begin")
ps := pset.NewLimited(count)
provs := dht.providers.GetProviders(ctx, key)
@ -254,17 +266,24 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co
// setup the Query
query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) {
log := log.Prefix("Query(%s)", p)
log.Debugf("begin")
defer log.Debugf("end")
pmes, err := dht.findProvidersSingle(ctx, p, key)
if err != nil {
return nil, err
}
log.Debugf("%d provider entries", len(pmes.GetProviderPeers()))
provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers())
log.Debugf("%d provider entries decoded", len(provs))
// Add unique providers from request, up to 'count'
for _, prov := range provs {
log.Debugf("got provider: %s", prov)
if ps.TryAdd(prov.ID) {
log.Debugf("using provider: %s", prov)
select {
case peerOut <- prov:
case <-ctx.Done():
@ -273,6 +292,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co
}
}
if ps.Size() >= count {
log.Debugf("got enough providers (%d/%d)", ps.Size(), count)
return &dhtQueryResult{success: true}, nil
}
}
@ -280,13 +300,14 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co
// Give closer peers back to the query to be queried
closer := pmes.GetCloserPeers()
clpeers := pb.PBPeersToPeerInfos(closer)
log.Debugf("got closer peers: %d %s", len(clpeers), clpeers)
return &dhtQueryResult{closerPeers: clpeers}, nil
})
peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue)
_, err := query.Run(ctx, peers)
if err != nil {
log.Errorf("FindProviders Query error: %s", err)
log.Errorf("Query error: %s", err)
}
}

View File

@ -14,7 +14,7 @@ all: clean deps $(T) aggregate
clean:
@echo "*** $@ ***"
-rm -r test-results
-rm -rf test-results
$(T):
@echo "*** $@ ***"

View File

@ -2,14 +2,19 @@ package eventlog
import (
"github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
logging "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-logging"
"github.com/jbenet/go-ipfs/util"
prelog "github.com/jbenet/go-ipfs/util/prefixlog"
)
// EventLogger extends the StandardLogger interface to allow for log items
// containing structured metadata
type EventLogger interface {
StandardLogger
prelog.StandardLogger
// Prefix is like PrefixLogger.Prefix. We override it here
// because the type changes (we return EventLogger).
// It's what happens when you wrap interfaces.
Prefix(fmt string, args ...interface{}) EventLogger
// Event merges structured data from the provided inputs into a single
// machine-readable log event.
@ -27,43 +32,27 @@ type EventLogger interface {
Event(ctx context.Context, event string, m ...Loggable)
}
// StandardLogger provides API compatibility with standard printf loggers
// eg. go-logging
type StandardLogger interface {
Critical(args ...interface{})
Criticalf(format string, args ...interface{})
Debug(args ...interface{})
Debugf(format string, args ...interface{})
Error(args ...interface{})
Errorf(format string, args ...interface{})
Fatal(args ...interface{})
Fatalf(format string, args ...interface{})
Info(args ...interface{})
Infof(format string, args ...interface{})
Notice(args ...interface{})
Noticef(format string, args ...interface{})
Panic(args ...interface{})
Panicf(format string, args ...interface{})
Warning(args ...interface{})
Warningf(format string, args ...interface{})
}
// Logger retrieves an event logger by name
func Logger(system string) EventLogger {
// TODO if we would like to adjust log levels at run-time. Store this event
// logger in a map (just like the util.Logger impl)
return &eventLogger{system: system, Logger: util.Logger(system)}
return &eventLogger{system: system, PrefixLogger: prelog.Logger(system)}
}
// eventLogger implements the EventLogger and wraps a go-logging Logger
type eventLogger struct {
*logging.Logger
prelog.PrefixLogger
system string
// TODO add log-level
}
func (el *eventLogger) Prefix(fmt string, args ...interface{}) EventLogger {
l := el.PrefixLogger.Prefix(fmt, args...)
return &eventLogger{system: el.system, PrefixLogger: l}
}
func (el *eventLogger) Event(ctx context.Context, event string, metadata ...Loggable) {
// Collect loggables for later logging

151
util/prefixlog/prefixlog.go Normal file
View File

@ -0,0 +1,151 @@
package eventlog
import (
"strings"
"github.com/jbenet/go-ipfs/util"
)
// StandardLogger provides API compatibility with standard printf loggers
// eg. go-logging
type StandardLogger interface {
Critical(args ...interface{})
Criticalf(format string, args ...interface{})
Debug(args ...interface{})
Debugf(format string, args ...interface{})
Error(args ...interface{})
Errorf(format string, args ...interface{})
Fatal(args ...interface{})
Fatalf(format string, args ...interface{})
Info(args ...interface{})
Infof(format string, args ...interface{})
Notice(args ...interface{})
Noticef(format string, args ...interface{})
Panic(args ...interface{})
Panicf(format string, args ...interface{})
Warning(args ...interface{})
Warningf(format string, args ...interface{})
}
// StandardLogger provides API compatibility with standard printf loggers
// eg. go-logging
type PrefixLogger interface {
StandardLogger
Format() string
Args() []interface{}
Prefix(fmt string, args ...interface{}) PrefixLogger
}
// Logger retrieves an event logger by name
func Logger(system string) PrefixLogger {
// TODO if we would like to adjust log levels at run-time. Store this event
// logger in a map (just like the util.Logger impl)
logger := util.Logger(system)
return Prefix(logger, "")
}
func Prefix(l StandardLogger, format string, args ...interface{}) PrefixLogger {
return &prefixLogger{logger: l, format: format, args: args}
}
type prefixLogger struct {
logger StandardLogger
format string
args []interface{}
}
func (pl *prefixLogger) Format() string {
return pl.format
}
func (pl *prefixLogger) Args() []interface{} {
return pl.args
}
func (pl *prefixLogger) Prefix(fmt string, args ...interface{}) PrefixLogger {
return Prefix(pl, fmt, args...)
}
func (pl *prefixLogger) prepend(fmt string, args []interface{}) (string, []interface{}) {
together := make([]interface{}, 0, len(pl.args)+len(args))
together = append(together, pl.args...)
together = append(together, args...)
if len(pl.format) > 0 {
fmt = pl.format + " " + fmt
}
return fmt, together
}
func valfmtn(count int) string {
s := strings.Repeat("%v ", count)
s = s[:len(s)-1] // remove last space
return s
}
type logFunc func(args ...interface{})
type logFuncf func(fmt string, args ...interface{})
func (pl *prefixLogger) logFunc(f logFuncf, args ...interface{}) {
// need to actually use the format version, with extra fmt strings appended
fmt := valfmtn(len(args))
pl.logFuncf(f, fmt, args...)
}
func (pl *prefixLogger) logFuncf(f logFuncf, format string, args ...interface{}) {
format, args = pl.prepend(format, args)
f(format, args...)
}
func (pl *prefixLogger) Critical(args ...interface{}) {
pl.logFunc(pl.logger.Criticalf, args...)
}
func (pl *prefixLogger) Debug(args ...interface{}) {
pl.logFunc(pl.logger.Debugf, args...)
}
func (pl *prefixLogger) Error(args ...interface{}) {
pl.logFunc(pl.logger.Errorf, args...)
}
func (pl *prefixLogger) Fatal(args ...interface{}) {
pl.logFunc(pl.logger.Fatalf, args...)
}
func (pl *prefixLogger) Info(args ...interface{}) {
pl.logFunc(pl.logger.Infof, args...)
}
func (pl *prefixLogger) Notice(args ...interface{}) {
pl.logFunc(pl.logger.Noticef, args...)
}
func (pl *prefixLogger) Panic(args ...interface{}) {
pl.logFunc(pl.logger.Panicf, args...)
}
func (pl *prefixLogger) Warning(args ...interface{}) {
pl.logFunc(pl.logger.Warningf, args...)
}
func (pl *prefixLogger) Criticalf(format string, args ...interface{}) {
pl.logFuncf(pl.logger.Criticalf, format, args...)
}
func (pl *prefixLogger) Debugf(format string, args ...interface{}) {
pl.logFuncf(pl.logger.Debugf, format, args...)
}
func (pl *prefixLogger) Errorf(format string, args ...interface{}) {
pl.logFuncf(pl.logger.Errorf, format, args...)
}
func (pl *prefixLogger) Fatalf(format string, args ...interface{}) {
pl.logFuncf(pl.logger.Fatalf, format, args...)
}
func (pl *prefixLogger) Infof(format string, args ...interface{}) {
pl.logFuncf(pl.logger.Infof, format, args...)
}
func (pl *prefixLogger) Noticef(format string, args ...interface{}) {
pl.logFuncf(pl.logger.Noticef, format, args...)
}
func (pl *prefixLogger) Panicf(format string, args ...interface{}) {
pl.logFuncf(pl.logger.Panicf, format, args...)
}
func (pl *prefixLogger) Warningf(format string, args ...interface{}) {
pl.logFuncf(pl.logger.Warningf, format, args...)
}

View File

@ -29,11 +29,11 @@ func init() {
ZeroLocalTCPAddress = maddr
}
func RandKeyPair(bits int) (ci.PrivKey, ci.PubKey, error) {
func RandTestKeyPair(bits int) (ci.PrivKey, ci.PubKey, error) {
return ci.GenerateKeyPairWithReader(ci.RSA, bits, u.NewTimeSeededRand())
}
func SeededKeyPair(seed int64) (ci.PrivKey, ci.PubKey, error) {
func SeededTestKeyPair(seed int64) (ci.PrivKey, ci.PubKey, error) {
return ci.GenerateKeyPairWithReader(ci.RSA, 512, u.NewSeededRand(seed))
}
@ -142,7 +142,7 @@ func RandPeerNetParams() (*PeerNetParams, error) {
var p PeerNetParams
var err error
p.Addr = ZeroLocalTCPAddress
p.PrivKey, p.PubKey, err = RandKeyPair(512)
p.PrivKey, p.PubKey, err = RandTestKeyPair(512)
if err != nil {
return nil, err
}