diff --git a/Makefile b/Makefile index a51da1a15..1376710df 100644 --- a/Makefile +++ b/Makefile @@ -13,20 +13,25 @@ vendor: godep install: cd cmd/ipfs && go install -test: test_go test_sharness +############################################################## +# tests targets + +test: test_expensive + +test_short: test_go_short test_sharness_short test_expensive: test_go_expensive test_sharness_expensive test_docker: cd dockertest/ && make -test_go: +test_go_short: go test -test.short ./... test_go_expensive: go test ./... -test_sharness: +test_sharness_short: cd test/ && make test_sharness_expensive: diff --git a/blockservice/mock.go b/blockservice/mock.go index 57432178e..73fcdf2fc 100644 --- a/blockservice/mock.go +++ b/blockservice/mock.go @@ -12,7 +12,7 @@ import ( // Mocks returns |n| connected mock Blockservices func Mocks(t *testing.T, n int) []*BlockService { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) - sg := bitswap.NewSessionGenerator(net) + sg := bitswap.NewTestSessionGenerator(net) instances := sg.Instances(n) diff --git a/cmd/ipfs/init.go b/cmd/ipfs/init.go index 5d2c3e773..fc0654d7b 100644 --- a/cmd/ipfs/init.go +++ b/cmd/ipfs/init.go @@ -186,7 +186,7 @@ func initConfig(configFilename string, dspathOverride string, nBitsForKeypair in Addresses: config.Addresses{ Swarm: []string{ "/ip4/0.0.0.0/tcp/4001", - "/ip4/0.0.0.0/udp/4002/utp", + // "/ip4/0.0.0.0/udp/4002/utp", // disabled for now. }, API: "/ip4/127.0.0.1/tcp/5001", }, diff --git a/core/mock.go b/core/mock.go index b448b2517..e4c15540b 100644 --- a/core/mock.go +++ b/core/mock.go @@ -10,11 +10,10 @@ import ( "github.com/jbenet/go-ipfs/exchange/offline" mdag "github.com/jbenet/go-ipfs/merkledag" nsys "github.com/jbenet/go-ipfs/namesys" - ci "github.com/jbenet/go-ipfs/p2p/crypto" mocknet "github.com/jbenet/go-ipfs/p2p/net/mock" peer "github.com/jbenet/go-ipfs/p2p/peer" path "github.com/jbenet/go-ipfs/path" - dht "github.com/jbenet/go-ipfs/routing/dht" + mockrouting "github.com/jbenet/go-ipfs/routing/mock" ds2 "github.com/jbenet/go-ipfs/util/datastore2" testutil "github.com/jbenet/go-ipfs/util/testutil" ) @@ -29,23 +28,19 @@ func NewMockNode() (*IpfsNode, error) { nd := new(IpfsNode) // Generate Identity - sk, pk, err := ci.GenerateKeyPair(ci.RSA, 1024) - if err != nil { - return nil, err - } - - p, err := peer.IDFromPublicKey(pk) + ident, err := testutil.RandIdentity() if err != nil { return nil, err } + p := ident.ID() nd.Identity = p - nd.PrivateKey = sk + nd.PrivateKey = ident.PrivateKey() nd.Peerstore = peer.NewPeerstore() - nd.Peerstore.AddPrivKey(p, sk) - nd.Peerstore.AddPubKey(p, pk) + nd.Peerstore.AddPrivKey(p, ident.PrivateKey()) + nd.Peerstore.AddPubKey(p, ident.PublicKey()) - nd.PeerHost, err = mocknet.New(ctx).AddPeer(sk, testutil.RandLocalTCPAddress()) // effectively offline + nd.PeerHost, err = mocknet.New(ctx).AddPeer(ident.PrivateKey(), ident.Address()) // effectively offline if err != nil { return nil, err } @@ -55,8 +50,7 @@ func NewMockNode() (*IpfsNode, error) { nd.Datastore = ds2.CloserWrap(syncds.MutexWrap(dstore)) // Routing - dht := dht.NewDHT(ctx, nd.PeerHost, nd.Datastore) - nd.Routing = dht + nd.Routing = mockrouting.NewServer().Client(ident) // Bitswap bstore := blockstore.NewBlockstore(nd.Datastore) @@ -68,7 +62,7 @@ func NewMockNode() (*IpfsNode, error) { nd.DAG = mdag.NewDAGService(bserv) // Namespace resolver - nd.Namesys = nsys.NewNameSystem(dht) + nd.Namesys = nsys.NewNameSystem(nd.Routing) // Path resolver nd.Resolver = &path.Resolver{DAG: nd.DAG} diff --git a/dockertest/bootstrap/config b/dockertest/bootstrap/config index b9fac5503..58ba1abfa 100644 --- a/dockertest/bootstrap/config +++ b/dockertest/bootstrap/config @@ -9,8 +9,7 @@ }, "Addresses": { "Swarm": [ - "/ip4/0.0.0.0/tcp/4011", - "/ip4/0.0.0.0/udp/4012/utp" + "/ip4/0.0.0.0/tcp/4011" ], "API": "/ip4/127.0.0.1/tcp/5001" }, diff --git a/dockertest/client/config b/dockertest/client/config index afb9f8d77..f3d5e9f65 100644 --- a/dockertest/client/config +++ b/dockertest/client/config @@ -2,8 +2,7 @@ "Addresses": { "API": "/ip4/127.0.0.1/tcp/5001", "Swarm": [ - "/ip4/0.0.0.0/tcp/4031", - "/ip4/0.0.0.0/udp/4032/utp" + "/ip4/0.0.0.0/tcp/4031" ] }, "Bootstrap": [ diff --git a/dockertest/client/run.sh b/dockertest/client/run.sh index 4de79278c..edbf86431 100644 --- a/dockertest/client/run.sh +++ b/dockertest/client/run.sh @@ -1,14 +1,16 @@ ipfs bootstrap add /ip4/$BOOTSTRAP_PORT_4011_TCP_ADDR/tcp/$BOOTSTRAP_PORT_4011_TCP_PORT/QmNXuBh8HFsWq68Fid8dMbGNQTh7eG6hV9rr1fQyfmfomE + +echo "dockertest> starting client daemon" ipfs daemon & sleep 3 while [ ! -f /data/idtiny ] do - echo waiting for server to add the file... + echo "dockertest> waiting for server to add the file..." sleep 1 done -echo client found file with hash: $(cat /data/idtiny) +echo "dockertest> client found file with hash:" $(cat /data/idtiny) ipfs cat $(cat /data/idtiny) > filetiny @@ -23,10 +25,10 @@ fi while [ ! -f /data/idrand ] do - echo waiting for server to add the file... + echo "dockertest> waiting for server to add the file..." sleep 1 done -echo client found file with hash: $(cat /data/idrand) +echo "dockertest> client found file with hash:" $(cat /data/idrand) cat /data/idrand @@ -44,4 +46,4 @@ if (($? > 0)); then exit 1 fi -echo "success" +echo "dockertest> success" diff --git a/dockertest/server/config b/dockertest/server/config index 9d04b820b..59c7fecdf 100644 --- a/dockertest/server/config +++ b/dockertest/server/config @@ -2,8 +2,7 @@ "Addresses": { "API": "/ip4/127.0.0.1/tcp/5001", "Swarm": [ - "/ip4/0.0.0.0/tcp/4021", - "/ip4/0.0.0.0/udp/4022/utp" + "/ip4/0.0.0.0/tcp/4021" ] }, "Bootstrap": [ diff --git a/dockertest/server/run.sh b/dockertest/server/run.sh index f71b7ac22..6dab347d2 100644 --- a/dockertest/server/run.sh +++ b/dockertest/server/run.sh @@ -3,6 +3,7 @@ ipfs bootstrap add /ip4/$BOOTSTRAP_PORT_4011_TCP_ADDR/tcp/$BOOTSTRAP_PORT_4011_T # wait for daemon to start/bootstrap # alternatively use ipfs swarm connect +echo "dockertest> starting server daemon" ipfs daemon & sleep 3 # TODO instead of bootrapping: ipfs swarm connect /ip4/$BOOTSTRAP_PORT_4011_TCP_ADDR/tcp/$BOOTSTRAP_PORT_4011_TCP_PORT/QmNXuBh8HFsWq68Fid8dMbGNQTh7eG6hV9rr1fQyfmfomE @@ -10,11 +11,11 @@ sleep 3 # must mount this volume from data container ipfs add -q /data/filetiny > tmptiny mv tmptiny /data/idtiny -echo added tiny file. hash is $(cat /data/idtiny) +echo "dockertest> added tiny file. hash is" $(cat /data/idtiny) ipfs add -q /data/filerand > tmprand mv tmprand /data/idrand -echo added rand file. hash is $(cat /data/idrand) +echo "dockertest> added rand file. hash is" $(cat /data/idrand) # allow ample time for the client to pull the data sleep 10000000 diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index fe20a406a..a883e4b03 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -61,6 +61,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, }() bs := &bitswap{ + self: p, blockstore: bstore, cancelFunc: cancelFunc, notifications: notif, @@ -79,6 +80,9 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, // bitswap instances implement the bitswap protocol. type bitswap struct { + // the ID of the peer to act on behalf of + self peer.ID + // network delivers messages on behalf of the session network bsnet.BitSwapNetwork @@ -104,6 +108,7 @@ type bitswap struct { // GetBlock attempts to retrieve a particular block from peers within the // deadline enforced by the context. func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) { + log := log.Prefix("bitswap(%s).GetBlock(%s)", bs.self, k) // Any async work initiated by this function must end when this function // returns. To ensure this, derive a new context. Note that it is okay to @@ -116,10 +121,12 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err ctx = eventlog.ContextWithLoggable(ctx, eventlog.Uuid("GetBlockRequest")) log.Event(ctx, "GetBlockRequestBegin", &k) + log.Debugf("GetBlockRequestBegin") defer func() { cancelFunc() log.Event(ctx, "GetBlockRequestEnd", &k) + log.Debugf("GetBlockRequestEnd") }() promise, err := bs.GetBlocks(ctx, []u.Key{k}) @@ -166,67 +173,109 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { return bs.network.Provide(ctx, blk.Key()) } -func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.ID) error { +func (bs *bitswap) sendWantlistMsgToPeer(ctx context.Context, m bsmsg.BitSwapMessage, p peer.ID) error { + log := log.Prefix("bitswap(%s).bitswap.sendWantlistMsgToPeer(%d, %s)", bs.self, len(m.Wantlist()), p) + + log.Debug("sending wantlist") + if err := bs.send(ctx, p, m); err != nil { + log.Errorf("send wantlist error: %s", err) + return err + } + log.Debugf("send wantlist success") + return nil +} + +func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error { if peers == nil { panic("Cant send wantlist to nil peerchan") } - message := bsmsg.New() - for _, wanted := range bs.wantlist.Entries() { - message.AddEntry(wanted.Key, wanted.Priority) - } + + log := log.Prefix("bitswap(%s).sendWantlistMsgToPeers(%d)", bs.self, len(m.Wantlist())) + log.Debugf("begin") + defer log.Debugf("end") + + set := pset.New() wg := sync.WaitGroup{} for peerToQuery := range peers { log.Event(ctx, "PeerToQuery", peerToQuery) + + if !set.TryAdd(peerToQuery) { //Do once per peer + log.Debugf("%s skipped (already sent)", peerToQuery) + continue + } + log.Debugf("%s sending", peerToQuery) + wg.Add(1) go func(p peer.ID) { defer wg.Done() - if err := bs.send(ctx, p, message); err != nil { - log.Error(err) - return - } + bs.sendWantlistMsgToPeer(ctx, m, p) }(peerToQuery) } wg.Wait() return nil } -func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wantlist.ThreadSafe) { +func (bs *bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID) error { + message := bsmsg.New() + message.SetFull(true) + for _, wanted := range bs.wantlist.Entries() { + message.AddEntry(wanted.Key, wanted.Priority) + } + return bs.sendWantlistMsgToPeers(ctx, message, peers) +} + +func (bs *bitswap) sendWantlistToProviders(ctx context.Context) { + log := log.Prefix("bitswap(%s).sendWantlistToProviders ", bs.self) + log.Debugf("begin") + defer log.Debugf("end") + ctx, cancel := context.WithCancel(ctx) defer cancel() - message := bsmsg.New() - message.SetFull(true) - for _, e := range bs.wantlist.Entries() { - message.AddEntry(e.Key, e.Priority) - } - - set := pset.New() + // prepare a channel to hand off to sendWantlistToPeers + sendToPeers := make(chan peer.ID) // Get providers for all entries in wantlist (could take a while) wg := sync.WaitGroup{} - for _, e := range wantlist.Entries() { + for _, e := range bs.wantlist.Entries() { wg.Add(1) go func(k u.Key) { defer wg.Done() + + log := log.Prefix("(entry: %s) ", k) + log.Debug("asking dht for providers") + child, _ := context.WithTimeout(ctx, providerRequestTimeout) providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest) for prov := range providers { - if set.TryAdd(prov) { //Do once per peer - bs.send(ctx, prov, message) - } + log.Debugf("dht returned provider %s. send wantlist", prov) + sendToPeers <- prov } }(e.Key) } - wg.Wait() + + go func() { + wg.Wait() // make sure all our children do finish. + close(sendToPeers) + }() + + err := bs.sendWantlistToPeers(ctx, sendToPeers) + if err != nil { + log.Errorf("sendWantlistToPeers error: %s", err) + } } func (bs *bitswap) taskWorker(ctx context.Context) { + log := log.Prefix("bitswap(%s).taskWorker", bs.self) for { select { case <-ctx.Done(): + log.Debugf("exiting") return case envelope := <-bs.engine.Outbox(): + log.Debugf("message to %s sending...", envelope.Peer) bs.send(ctx, envelope.Peer, envelope.Message) + log.Debugf("message to %s sent", envelope.Peer) } } } @@ -243,7 +292,7 @@ func (bs *bitswap) clientWorker(parent context.Context) { select { case <-broadcastSignal: // Resend unfulfilled wantlist keys - bs.sendWantlistToProviders(ctx, bs.wantlist) + bs.sendWantlistToProviders(ctx) broadcastSignal = time.After(rebroadcastDelay.Get()) case ks := <-bs.batchRequests: if len(ks) == 0 { @@ -262,7 +311,7 @@ func (bs *bitswap) clientWorker(parent context.Context) { // newer bitswap strategies. child, _ := context.WithTimeout(ctx, providerRequestTimeout) providers := bs.network.FindProvidersAsync(child, ks[0], maxProvidersPerRequest) - err := bs.sendWantListTo(ctx, providers) + err := bs.sendWantlistToPeers(ctx, providers) if err != nil { log.Errorf("error sending wantlist: %s", err) } @@ -336,11 +385,6 @@ func (bs *bitswap) ReceiveError(err error) { // send strives to ensure that accounting is always performed when a message is // sent func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) error { - log.Event(ctx, "DialPeer", p) - err := bs.network.DialPeer(ctx, p) - if err != nil { - return errors.Wrap(err) - } if err := bs.network.SendMessage(ctx, p, m); err != nil { return errors.Wrap(err) } diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index af6cb138c..64d5ead52 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -11,10 +11,10 @@ import ( blocks "github.com/jbenet/go-ipfs/blocks" blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil" tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" + p2ptestutil "github.com/jbenet/go-ipfs/p2p/test/util" mockrouting "github.com/jbenet/go-ipfs/routing/mock" u "github.com/jbenet/go-ipfs/util" delay "github.com/jbenet/go-ipfs/util/delay" - "github.com/jbenet/go-ipfs/util/testutil" ) // FIXME the tests are really sensitive to the network delay. fix them to work @@ -25,7 +25,7 @@ func TestClose(t *testing.T) { // TODO t.Skip("TODO Bitswap's Close implementation is a WIP") vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - sesgen := NewSessionGenerator(vnet) + sesgen := NewTestSessionGenerator(vnet) defer sesgen.Close() bgen := blocksutil.NewBlockGenerator() @@ -39,7 +39,7 @@ func TestClose(t *testing.T) { func TestGetBlockTimeout(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - g := NewSessionGenerator(net) + g := NewTestSessionGenerator(net) defer g.Close() self := g.Next() @@ -57,11 +57,11 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this rs := mockrouting.NewServer() net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay)) - g := NewSessionGenerator(net) + g := NewTestSessionGenerator(net) defer g.Close() block := blocks.NewBlock([]byte("block")) - pinfo := testutil.RandIdentityOrFatal(t) + pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t) rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network solo := g.Next() @@ -81,7 +81,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) block := blocks.NewBlock([]byte("block")) - g := NewSessionGenerator(net) + g := NewTestSessionGenerator(net) defer g.Close() hasBlock := g.Next() @@ -134,7 +134,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { t.SkipNow() } net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - sg := NewSessionGenerator(net) + sg := NewTestSessionGenerator(net) defer sg.Close() bg := blocksutil.NewBlockGenerator() @@ -198,7 +198,7 @@ func TestSendToWantingPeer(t *testing.T) { } net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - sg := NewSessionGenerator(net) + sg := NewTestSessionGenerator(net) defer sg.Close() bg := blocksutil.NewBlockGenerator() @@ -243,7 +243,7 @@ func TestSendToWantingPeer(t *testing.T) { func TestBasicBitswap(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - sg := NewSessionGenerator(net) + sg := NewTestSessionGenerator(net) bg := blocksutil.NewBlockGenerator() t.Log("Test a few nodes trying to get one file with a lot of blocks") diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index 582d96e08..e4b2ab832 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -8,7 +8,7 @@ import ( bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist" peer "github.com/jbenet/go-ipfs/p2p/peer" - u "github.com/jbenet/go-ipfs/util" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" ) // TODO consider taking responsibility for other types of requests. For @@ -41,7 +41,7 @@ import ( // whatever it sees fit to produce desired outcomes (get wanted keys // quickly, maintain good relationships with peers, etc). -var log = u.Logger("engine") +var log = eventlog.Logger("engine") const ( sizeOutboxChan = 4 @@ -91,6 +91,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { } func (e *Engine) taskWorker(ctx context.Context) { + log := log.Prefix("bitswap.Engine.taskWorker") for { nextTask := e.peerRequestQueue.Pop() if nextTask == nil { @@ -98,11 +99,16 @@ func (e *Engine) taskWorker(ctx context.Context) { // Wait until there are! select { case <-ctx.Done(): + log.Debugf("exiting: %s", ctx.Err()) return case <-e.workSignal: + log.Debugf("woken up") } continue } + log := log.Prefix("%s", nextTask) + log.Debugf("processing") + block, err := e.bs.Get(nextTask.Entry.Key) if err != nil { log.Warning("engine: task exists to send block, but block is not in blockstore") @@ -113,10 +119,12 @@ func (e *Engine) taskWorker(ctx context.Context) { m := bsmsg.New() m.AddBlock(block) // TODO: maybe add keys from our wantlist? + log.Debugf("sending...") select { case <-ctx.Done(): return case e.outbox <- Envelope{Peer: nextTask.Target, Message: m}: + log.Debugf("sent") } } } @@ -140,16 +148,21 @@ func (e *Engine) Peers() []peer.ID { // MessageReceived performs book-keeping. Returns error if passed invalid // arguments. func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { + log := log.Prefix("bitswap.Engine.MessageReceived(%s)", p) + log.Debugf("enter. %d entries %d blocks", len(m.Wantlist()), len(m.Blocks())) + defer log.Debugf("exit") + + if len(m.Wantlist()) == 0 && len(m.Blocks()) == 0 { + log.Info("superfluous message") + } + newWorkExists := false defer func() { if newWorkExists { - // Signal task generation to restart (if stopped!) - select { - case e.workSignal <- struct{}{}: - default: - } + e.signalNewWork() } }() + e.lock.Lock() defer e.lock.Unlock() @@ -157,11 +170,14 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { if m.Full() { l.wantList = wl.New() } + for _, entry := range m.Wantlist() { if entry.Cancel { + log.Debug("cancel", entry.Key) l.CancelWant(entry.Key) e.peerRequestQueue.Remove(entry.Key, p) } else { + log.Debug("wants", entry.Key, entry.Priority) l.Wants(entry.Key, entry.Priority) if exists, err := e.bs.Has(entry.Key); err == nil && exists { newWorkExists = true @@ -172,6 +188,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { for _, block := range m.Blocks() { // FIXME extract blocks.NumBytes(block) or block.NumBytes() method + log.Debug("got block %s %d bytes", block.Key(), len(block.Data)) l.ReceivedBytes(len(block.Data)) for _, l := range e.ledgerMap { if l.WantListContains(block.Key()) { @@ -222,3 +239,11 @@ func (e *Engine) findOrCreate(p peer.ID) *ledger { } return l } + +func (e *Engine) signalNewWork() { + // Signal task generation to restart (if stopped!) + select { + case e.workSignal <- struct{}{}: + default: + } +} diff --git a/exchange/bitswap/decision/taskqueue.go b/exchange/bitswap/decision/taskqueue.go index 11af3db35..659e287d0 100644 --- a/exchange/bitswap/decision/taskqueue.go +++ b/exchange/bitswap/decision/taskqueue.go @@ -1,6 +1,7 @@ package decision import ( + "fmt" "sync" wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist" @@ -30,6 +31,10 @@ type task struct { Trash bool } +func (t *task) String() string { + return fmt.Sprintf("", t.Target, t.Entry.Key, t.Trash) +} + // Push currently adds a new task to the end of the list func (tl *taskQueue) Push(entry wantlist.Entry, to peer.ID) { tl.lock.Lock() diff --git a/exchange/bitswap/network/interface.go b/exchange/bitswap/network/interface.go index 7c34a352b..18bb1df83 100644 --- a/exchange/bitswap/network/interface.go +++ b/exchange/bitswap/network/interface.go @@ -14,9 +14,6 @@ var ProtocolBitswap protocol.ID = "/ipfs/bitswap" // BitSwapNetwork provides network connectivity for BitSwap sessions type BitSwapNetwork interface { - // DialPeer ensures there is a connection to peer. - DialPeer(context.Context, peer.ID) error - // SendMessage sends a BitSwap message to a peer. SendMessage( context.Context, diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index 4e349dbed..ea98cc87f 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -2,15 +2,17 @@ package network import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" host "github.com/jbenet/go-ipfs/p2p/host" inet "github.com/jbenet/go-ipfs/p2p/net" peer "github.com/jbenet/go-ipfs/p2p/peer" routing "github.com/jbenet/go-ipfs/routing" util "github.com/jbenet/go-ipfs/util" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" ) -var log = util.Logger("bitswap_network") +var log = eventlog.Logger("bitswap_network") // NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host func NewFromIpfsHost(host host.Host, r routing.IpfsRouting) BitSwapNetwork { @@ -32,22 +34,34 @@ type impl struct { receiver Receiver } -func (bsnet *impl) DialPeer(ctx context.Context, p peer.ID) error { - return bsnet.host.Connect(ctx, peer.PeerInfo{ID: p}) -} - func (bsnet *impl) SendMessage( ctx context.Context, p peer.ID, outgoing bsmsg.BitSwapMessage) error { + log := log.Prefix("bitswap net SendMessage to %s", p) + + // ensure we're connected + //TODO(jbenet) move this into host.NewStream? + if err := bsnet.host.Connect(ctx, peer.PeerInfo{ID: p}); err != nil { + return err + } + + log.Debug("opening stream") s, err := bsnet.host.NewStream(ProtocolBitswap, p) if err != nil { return err } defer s.Close() - return outgoing.ToNet(s) + log.Debug("sending") + if err := outgoing.ToNet(s); err != nil { + log.Errorf("error: %s", err) + return err + } + + log.Debug("sent") + return err } func (bsnet *impl) SendRequest( @@ -55,17 +69,36 @@ func (bsnet *impl) SendRequest( p peer.ID, outgoing bsmsg.BitSwapMessage) (bsmsg.BitSwapMessage, error) { + log := log.Prefix("bitswap net SendRequest to %s", p) + + // ensure we're connected + //TODO(jbenet) move this into host.NewStream? + if err := bsnet.host.Connect(ctx, peer.PeerInfo{ID: p}); err != nil { + return nil, err + } + + log.Debug("opening stream") s, err := bsnet.host.NewStream(ProtocolBitswap, p) if err != nil { return nil, err } defer s.Close() + log.Debug("sending") if err := outgoing.ToNet(s); err != nil { + log.Errorf("error: %s", err) return nil, err } - return bsmsg.FromNet(s) + log.Debug("sent, now receiveing") + incoming, err := bsmsg.FromNet(s) + if err != nil { + log.Errorf("error: %s", err) + return incoming, err + } + + log.Debug("received") + return incoming, nil } func (bsnet *impl) SetDelegate(r Receiver) { @@ -82,6 +115,7 @@ func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) bsnet.host.Peerstore().AddAddresses(info.ID, info.Addrs) select { case <-ctx.Done(): + return case out <- info.ID: } } @@ -96,23 +130,21 @@ func (bsnet *impl) Provide(ctx context.Context, k util.Key) error { // handleNewStream receives a new stream from the network. func (bsnet *impl) handleNewStream(s inet.Stream) { + defer s.Close() if bsnet.receiver == nil { return } - go func() { - defer s.Close() - - received, err := bsmsg.FromNet(s) - if err != nil { - go bsnet.receiver.ReceiveError(err) - return - } - - p := s.Conn().RemotePeer() - ctx := context.Background() - bsnet.receiver.ReceiveMessage(ctx, p, received) - }() + received, err := bsmsg.FromNet(s) + if err != nil { + go bsnet.receiver.ReceiveError(err) + log.Errorf("bitswap net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err) + return + } + p := s.Conn().RemotePeer() + ctx := context.Background() + log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer()) + bsnet.receiver.ReceiveMessage(ctx, p, received) } diff --git a/exchange/bitswap/testnet/virtual.go b/exchange/bitswap/testnet/virtual.go index 9426176a2..639bb00d3 100644 --- a/exchange/bitswap/testnet/virtual.go +++ b/exchange/bitswap/testnet/virtual.go @@ -2,7 +2,6 @@ package bitswap import ( "errors" - "fmt" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" @@ -178,14 +177,6 @@ func (nc *networkClient) Provide(ctx context.Context, k util.Key) error { return nc.routing.Provide(ctx, k) } -func (nc *networkClient) DialPeer(ctx context.Context, p peer.ID) error { - // no need to do anything because dialing isn't a thing in this test net. - if !nc.network.HasPeer(p) { - return fmt.Errorf("Peer not in network: %s", p) - } - return nil -} - func (nc *networkClient) SetDelegate(r bsnet.Receiver) { nc.Receiver = r } diff --git a/exchange/bitswap/testutils.go b/exchange/bitswap/testutils.go index dd96e5f46..95019f297 100644 --- a/exchange/bitswap/testutils.go +++ b/exchange/bitswap/testutils.go @@ -10,12 +10,14 @@ import ( exchange "github.com/jbenet/go-ipfs/exchange" tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" peer "github.com/jbenet/go-ipfs/p2p/peer" + p2ptestutil "github.com/jbenet/go-ipfs/p2p/test/util" datastore2 "github.com/jbenet/go-ipfs/util/datastore2" delay "github.com/jbenet/go-ipfs/util/delay" testutil "github.com/jbenet/go-ipfs/util/testutil" ) -func NewSessionGenerator( +// WARNING: this uses RandTestBogusIdentity DO NOT USE for NON TESTS! +func NewTestSessionGenerator( net tn.Network) SessionGenerator { ctx, cancel := context.WithCancel(context.TODO()) return SessionGenerator{ @@ -41,7 +43,7 @@ func (g *SessionGenerator) Close() error { func (g *SessionGenerator) Next() Instance { g.seq++ - p, err := testutil.RandIdentity() + p, err := p2ptestutil.RandTestBogusIdentity() if err != nil { panic("FIXME") // TODO change signature } diff --git a/fuse/ipns/ipns_test.go b/fuse/ipns/ipns_test.go index 5811a0533..dab2a784b 100644 --- a/fuse/ipns/ipns_test.go +++ b/fuse/ipns/ipns_test.go @@ -248,7 +248,7 @@ func TestFastRepublish(t *testing.T) { // get first resolved hash log.Debug("publishing first hash") writeFileData(t, dataA, fname) // random - <-time.After(shortRepublishTimeout * 11 / 10) + <-time.After(shortRepublishTimeout * 2) log.Debug("resolving first hash") resolvedHash, err := node.Namesys.Resolve(pubkeyHash) if err != nil { diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index 9a638ca2a..c9ea00ad2 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -229,7 +229,11 @@ func (n *dagService) Get(k u.Key) (*Node, error) { return nil, fmt.Errorf("dagService is nil") } - ctx, _ := context.WithTimeout(context.TODO(), time.Second*5) + ctx, _ := context.WithTimeout(context.TODO(), time.Minute) + // we shouldn't use an arbitrary timeout here. + // since Get doesnt take in a context yet, we give a large upper bound. + // think of an http request. we want it to go on as long as the client requests it. + b, err := n.Blocks.GetBlock(ctx, k) if err != nil { return nil, err diff --git a/namesys/resolve_test.go b/namesys/resolve_test.go index 35851fc32..8e3214dfe 100644 --- a/namesys/resolve_test.go +++ b/namesys/resolve_test.go @@ -14,7 +14,7 @@ func TestRoutingResolve(t *testing.T) { resolver := NewRoutingResolver(d) publisher := NewRoutingPublisher(d) - privk, pubk, err := testutil.RandKeyPair(512) + privk, pubk, err := testutil.RandTestKeyPair(512) if err != nil { t.Fatal(err) } diff --git a/p2p/crypto/key_test.go b/p2p/crypto/key_test.go index fa2ad7799..f4a9599f8 100644 --- a/p2p/crypto/key_test.go +++ b/p2p/crypto/key_test.go @@ -9,7 +9,7 @@ import ( ) func TestRsaKeys(t *testing.T) { - sk, pk, err := tu.RandKeyPair(512) + sk, pk, err := tu.RandTestKeyPair(512) if err != nil { t.Fatal(err) } @@ -93,7 +93,7 @@ func testKeyEquals(t *testing.T, k Key) { t.Fatal("Key not equal to key with same bytes.") } - sk, pk, err := tu.RandKeyPair(512) + sk, pk, err := tu.RandTestKeyPair(512) if err != nil { t.Fatal(err) } diff --git a/p2p/net/mock/mock_net.go b/p2p/net/mock/mock_net.go index d380e5173..6146c4e52 100644 --- a/p2p/net/mock/mock_net.go +++ b/p2p/net/mock/mock_net.go @@ -2,14 +2,15 @@ package mocknet import ( "fmt" + "sort" "sync" - "time" ic "github.com/jbenet/go-ipfs/p2p/crypto" host "github.com/jbenet/go-ipfs/p2p/host" bhost "github.com/jbenet/go-ipfs/p2p/host/basic" inet "github.com/jbenet/go-ipfs/p2p/net" peer "github.com/jbenet/go-ipfs/p2p/peer" + p2putil "github.com/jbenet/go-ipfs/p2p/test/util" testutil "github.com/jbenet/go-ipfs/util/testutil" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" @@ -44,7 +45,7 @@ func New(ctx context.Context) Mocknet { } func (mn *mocknet) GenPeer() (host.Host, error) { - sk, _, err := testutil.SeededKeyPair(time.Now().UnixNano()) + sk, err := p2putil.RandTestBogusPrivateKey() if err != nil { return nil, err } @@ -90,6 +91,7 @@ func (mn *mocknet) Peers() []peer.ID { for _, n := range mn.nets { cp = append(cp, n.peer) } + sort.Sort(peer.IDSlice(cp)) return cp } @@ -115,6 +117,8 @@ func (mn *mocknet) Hosts() []host.Host { for _, h := range mn.hosts { cp = append(cp, h) } + + sort.Sort(hostSlice(cp)) return cp } @@ -126,6 +130,7 @@ func (mn *mocknet) Nets() []inet.Network { for _, n := range mn.nets { cp = append(cp, n) } + sort.Sort(netSlice(cp)) return cp } @@ -339,3 +344,17 @@ func (mn *mocknet) LinkDefaults() LinkOptions { defer mn.RUnlock() return mn.linkDefaults } + +// netSlice for sorting by peer +type netSlice []inet.Network + +func (es netSlice) Len() int { return len(es) } +func (es netSlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] } +func (es netSlice) Less(i, j int) bool { return string(es[i].LocalPeer()) < string(es[j].LocalPeer()) } + +// hostSlice for sorting by peer +type hostSlice []host.Host + +func (es hostSlice) Len() int { return len(es) } +func (es hostSlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] } +func (es hostSlice) Less(i, j int) bool { return string(es[i].ID()) < string(es[j].ID()) } diff --git a/p2p/net/mock/mock_test.go b/p2p/net/mock/mock_test.go index 268b35f4d..1b1ca7ddb 100644 --- a/p2p/net/mock/mock_test.go +++ b/p2p/net/mock/mock_test.go @@ -26,15 +26,15 @@ func randPeer(t *testing.T) peer.ID { func TestNetworkSetup(t *testing.T) { ctx := context.Background() - sk1, _, err := testutil.RandKeyPair(512) + sk1, _, err := testutil.RandTestKeyPair(512) if err != nil { t.Fatal(t) } - sk2, _, err := testutil.RandKeyPair(512) + sk2, _, err := testutil.RandTestKeyPair(512) if err != nil { t.Fatal(t) } - sk3, _, err := testutil.RandKeyPair(512) + sk3, _, err := testutil.RandTestKeyPair(512) if err != nil { t.Fatal(t) } @@ -398,7 +398,7 @@ func TestAdding(t *testing.T) { peers := []peer.ID{} for i := 0; i < 3; i++ { - sk, _, err := testutil.RandKeyPair(512) + sk, _, err := testutil.RandTestKeyPair(512) if err != nil { t.Fatal(err) } diff --git a/p2p/peer/peer.go b/p2p/peer/peer.go index fa4e448d6..c29e23283 100644 --- a/p2p/peer/peer.go +++ b/p2p/peer/peer.go @@ -130,3 +130,10 @@ type PeerInfo struct { ID ID Addrs []ma.Multiaddr } + +// IDSlice for sorting peers +type IDSlice []ID + +func (es IDSlice) Len() int { return len(es) } +func (es IDSlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] } +func (es IDSlice) Less(i, j int) bool { return string(es[i]) < string(es[j]) } diff --git a/p2p/peer/peer_test.go b/p2p/peer/peer_test.go index b19d0faef..d5bd793fd 100644 --- a/p2p/peer/peer_test.go +++ b/p2p/peer/peer_test.go @@ -41,7 +41,7 @@ type keyset struct { func (ks *keyset) generate() error { var err error - ks.sk, ks.pk, err = tu.RandKeyPair(512) + ks.sk, ks.pk, err = tu.RandTestKeyPair(512) if err != nil { return err } diff --git a/p2p/peer/queue/sync.go b/p2p/peer/queue/sync.go index 3f75cd0cf..3d7aa68ad 100644 --- a/p2p/peer/queue/sync.go +++ b/p2p/peer/queue/sync.go @@ -4,8 +4,11 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" peer "github.com/jbenet/go-ipfs/p2p/peer" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" ) +var log = eventlog.Logger("peerqueue") + // ChanQueue makes any PeerQueue synchronizable through channels. type ChanQueue struct { Queue PeerQueue @@ -21,6 +24,7 @@ func NewChanQueue(ctx context.Context, pq PeerQueue) *ChanQueue { } func (cq *ChanQueue) process(ctx context.Context) { + log := log.Prefix("", cq) // construct the channels here to be able to use them bidirectionally enqChan := make(chan peer.ID) @@ -30,6 +34,8 @@ func (cq *ChanQueue) process(ctx context.Context) { cq.DeqChan = deqChan go func() { + log.Debug("processing") + defer log.Debug("closed") defer close(deqChan) var next peer.ID @@ -38,11 +44,13 @@ func (cq *ChanQueue) process(ctx context.Context) { for { if cq.Queue.Len() == 0 { + // log.Debug("wait for enqueue") select { case next, more = <-enqChan: if !more { return } + // log.Debug("got", next) case <-ctx.Done(): return @@ -50,19 +58,24 @@ func (cq *ChanQueue) process(ctx context.Context) { } else { next = cq.Queue.Dequeue() + // log.Debug("peek", next) } select { case item, more = <-enqChan: if !more { - return + if cq.Queue.Len() > 0 { + return // we're done done. + } + enqChan = nil // closed, so no use. } - + // log.Debug("got", item) cq.Queue.Enqueue(item) - cq.Queue.Enqueue(next) + cq.Queue.Enqueue(next) // order may have changed. next = "" case deqChan <- next: + // log.Debug("dequeued", next) next = "" case <-ctx.Done(): diff --git a/p2p/test/util/key.go b/p2p/test/util/key.go new file mode 100644 index 000000000..fc4f3af4e --- /dev/null +++ b/p2p/test/util/key.go @@ -0,0 +1,165 @@ +package testutil + +import ( + "bytes" + "io" + "testing" + + u "github.com/jbenet/go-ipfs/util" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" + testutil "github.com/jbenet/go-ipfs/util/testutil" + + ic "github.com/jbenet/go-ipfs/p2p/crypto" + peer "github.com/jbenet/go-ipfs/p2p/peer" + + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" +) + +var log = eventlog.Logger("boguskey") + +// TestBogusPrivateKey is a key used for testing (to avoid expensive keygen) +type TestBogusPrivateKey []byte + +// TestBogusPublicKey is a key used for testing (to avoid expensive keygen) +type TestBogusPublicKey []byte + +func (pk TestBogusPublicKey) Verify(data, sig []byte) (bool, error) { + log.Criticalf("TestBogusPublicKey.Verify -- this better be a test!") + return bytes.Equal(data, reverse(sig)), nil +} + +func (pk TestBogusPublicKey) Bytes() ([]byte, error) { + return []byte(pk), nil +} + +func (pk TestBogusPublicKey) Encrypt(b []byte) ([]byte, error) { + log.Criticalf("TestBogusPublicKey.Encrypt -- this better be a test!") + return reverse(b), nil +} + +// Equals checks whether this key is equal to another +func (pk TestBogusPublicKey) Equals(k ic.Key) bool { + return ic.KeyEqual(pk, k) +} + +func (pk TestBogusPublicKey) Hash() ([]byte, error) { + return ic.KeyHash(pk) +} + +func (sk TestBogusPrivateKey) GenSecret() []byte { + return []byte(sk) +} + +func (sk TestBogusPrivateKey) Sign(message []byte) ([]byte, error) { + log.Criticalf("TestBogusPrivateKey.Sign -- this better be a test!") + return reverse(message), nil +} + +func (sk TestBogusPrivateKey) GetPublic() ic.PubKey { + return TestBogusPublicKey(sk) +} + +func (sk TestBogusPrivateKey) Decrypt(b []byte) ([]byte, error) { + log.Criticalf("TestBogusPrivateKey.Decrypt -- this better be a test!") + return reverse(b), nil +} + +func (sk TestBogusPrivateKey) Bytes() ([]byte, error) { + return []byte(sk), nil +} + +// Equals checks whether this key is equal to another +func (sk TestBogusPrivateKey) Equals(k ic.Key) bool { + return ic.KeyEqual(sk, k) +} + +func (sk TestBogusPrivateKey) Hash() ([]byte, error) { + return ic.KeyHash(sk) +} + +func RandTestBogusPrivateKey() (TestBogusPrivateKey, error) { + r := u.NewTimeSeededRand() + k := make([]byte, 5) + if _, err := io.ReadFull(r, k); err != nil { + return nil, err + } + return TestBogusPrivateKey(k), nil +} + +func RandTestBogusPublicKey() (TestBogusPublicKey, error) { + k, err := RandTestBogusPrivateKey() + return TestBogusPublicKey(k), err +} + +func RandTestBogusPrivateKeyOrFatal(t *testing.T) TestBogusPrivateKey { + k, err := RandTestBogusPrivateKey() + if err != nil { + t.Fatal(err) + } + return k +} + +func RandTestBogusPublicKeyOrFatal(t *testing.T) TestBogusPublicKey { + k, err := RandTestBogusPublicKey() + if err != nil { + t.Fatal(err) + } + return k +} + +func RandTestBogusIdentity() (testutil.Identity, error) { + k, err := RandTestBogusPrivateKey() + if err != nil { + return nil, err + } + + id, err := peer.IDFromPrivateKey(k) + if err != nil { + return nil, err + } + + return &identity{ + k: k, + id: id, + a: testutil.RandLocalTCPAddress(), + }, nil +} + +func RandTestBogusIdentityOrFatal(t *testing.T) testutil.Identity { + k, err := RandTestBogusIdentity() + if err != nil { + t.Fatal(err) + } + return k +} + +// identity is a temporary shim to delay binding of PeerNetParams. +type identity struct { + k TestBogusPrivateKey + id peer.ID + a ma.Multiaddr +} + +func (p *identity) ID() peer.ID { + return p.id +} + +func (p *identity) Address() ma.Multiaddr { + return p.a +} + +func (p *identity) PrivateKey() ic.PrivKey { + return p.k +} + +func (p *identity) PublicKey() ic.PubKey { + return p.k.GetPublic() +} + +func reverse(a []byte) []byte { + b := make([]byte, len(a)) + for i := 0; i < len(a); i++ { + b[i] = a[len(a)-1-i] + } + return b +} diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 2a576629a..17d300d87 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -89,6 +89,11 @@ func (dht *IpfsDHT) LocalPeer() peer.ID { return dht.self } +// log returns the dht's logger +func (dht *IpfsDHT) log() eventlog.EventLogger { + return log.Prefix("dht(%s)", dht.self) +} + // Connect to a new peer at the given address, ping and add to the routing table func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error { // TODO: change interface to accept a PeerInfo as well. diff --git a/routing/dht/dht_net.go b/routing/dht/dht_net.go index fd088e02c..2b857ce2b 100644 --- a/routing/dht/dht_net.go +++ b/routing/dht/dht_net.go @@ -87,15 +87,11 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message start := time.Now() - log.Debugf("%s writing", dht.self) if err := w.WriteMsg(pmes); err != nil { return nil, err } log.Event(ctx, "dhtSentMessage", dht.self, p, pmes) - log.Debugf("%s reading", dht.self) - defer log.Debugf("%s done", dht.self) - rpmes := new(pb.Message) if err := r.ReadMsg(rpmes); err != nil { return nil, err @@ -125,12 +121,10 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func w := ggio.NewDelimitedWriter(cw) - log.Debugf("%s writing", dht.self) if err := w.WriteMsg(pmes); err != nil { return err } log.Event(ctx, "dhtSentMessage", dht.self, p, pmes) - log.Debugf("%s done", dht.self) return nil } diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index 133f7a27c..147970695 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -231,6 +231,7 @@ func TestProvides(t *testing.T) { } func TestBootstrap(t *testing.T) { + // t.Skip("skipping test to debug another") if testing.Short() { t.SkipNow() } @@ -388,6 +389,7 @@ func TestProvidesMany(t *testing.T) { } func TestProvidesAsync(t *testing.T) { + // t.Skip("skipping test to debug another") if testing.Short() { t.SkipNow() } @@ -442,6 +444,7 @@ func TestProvidesAsync(t *testing.T) { } func TestLayeredGet(t *testing.T) { + // t.Skip("skipping test to debug another") if testing.Short() { t.SkipNow() } @@ -482,6 +485,7 @@ func TestLayeredGet(t *testing.T) { } func TestFindPeer(t *testing.T) { + // t.Skip("skipping test to debug another") if testing.Short() { t.SkipNow() } @@ -596,6 +600,7 @@ func testPeerListsMatch(t *testing.T, p1, p2 []peer.ID) { } func TestConnectCollision(t *testing.T) { + // t.Skip("skipping test to debug another") if testing.Short() { t.SkipNow() } diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go index 2be8127c7..6f12c3113 100644 --- a/routing/dht/ext_test.go +++ b/routing/dht/ext_test.go @@ -32,11 +32,10 @@ func TestGetFailures(t *testing.T) { t.Fatal(err) } hosts := mn.Hosts() - peers := mn.Peers() tsds := dssync.MutexWrap(ds.NewMapDatastore()) d := NewDHT(ctx, hosts[0], tsds) - d.Update(ctx, peers[1]) + d.Update(ctx, hosts[1].ID()) // u.POut("NotFound Test\n") // Reply with failures to every message @@ -47,7 +46,7 @@ func TestGetFailures(t *testing.T) { // This one should time out // u.POut("Timout Test\n") - ctx1, _ := context.WithTimeout(context.Background(), time.Second) + ctx1, _ := context.WithTimeout(context.Background(), 200*time.Millisecond) if _, err := d.GetValue(ctx1, u.Key("test")); err != nil { if err != context.DeadlineExceeded { t.Fatal("Got different error than we expected", err) @@ -78,8 +77,12 @@ func TestGetFailures(t *testing.T) { } }) - // This one should fail with NotFound - ctx2, _ := context.WithTimeout(context.Background(), 3*time.Second) + // This one should fail with NotFound. + // long context timeout to ensure we dont end too early. + // the dht should be exhausting its query and returning not found. + // (was 3 seconds before which should be _plenty_ of time, but maybe + // travis machines really have a hard time...) + ctx2, _ := context.WithTimeout(context.Background(), 20*time.Second) _, err = d.GetValue(ctx2, u.Key("test")) if err != nil { if err != routing.ErrNotFound { @@ -133,6 +136,7 @@ func TestGetFailures(t *testing.T) { } func TestNotFound(t *testing.T) { + // t.Skip("skipping test to debug another") if testing.Short() { t.SkipNow() } @@ -143,12 +147,11 @@ func TestNotFound(t *testing.T) { t.Fatal(err) } hosts := mn.Hosts() - peers := mn.Peers() tsds := dssync.MutexWrap(ds.NewMapDatastore()) d := NewDHT(ctx, hosts[0], tsds) - for _, p := range peers { - d.Update(ctx, p) + for _, p := range hosts { + d.Update(ctx, p.ID()) } // Reply with random peers to every message @@ -171,7 +174,7 @@ func TestNotFound(t *testing.T) { ps := []peer.PeerInfo{} for i := 0; i < 7; i++ { - p := peers[rand.Intn(len(peers))] + p := hosts[rand.Intn(len(hosts))].ID() pi := host.Peerstore().PeerInfo(p) ps = append(ps, pi) } @@ -187,7 +190,8 @@ func TestNotFound(t *testing.T) { }) } - ctx, _ = context.WithTimeout(ctx, time.Second*5) + // long timeout to ensure timing is not at play. + ctx, _ = context.WithTimeout(ctx, time.Second*20) v, err := d.GetValue(ctx, u.Key("hello")) log.Debugf("get value got %v", v) if err != nil { @@ -207,6 +211,7 @@ func TestNotFound(t *testing.T) { // If less than K nodes are in the entire network, it should fail when we make // a GET rpc and nobody has the value func TestLessThanKResponses(t *testing.T) { + // t.Skip("skipping test to debug another") // t.Skip("skipping test because it makes a lot of output") ctx := context.Background() @@ -215,13 +220,12 @@ func TestLessThanKResponses(t *testing.T) { t.Fatal(err) } hosts := mn.Hosts() - peers := mn.Peers() tsds := dssync.MutexWrap(ds.NewMapDatastore()) d := NewDHT(ctx, hosts[0], tsds) for i := 1; i < 5; i++ { - d.Update(ctx, peers[i]) + d.Update(ctx, hosts[i].ID()) } // Reply with random peers to every message @@ -240,7 +244,7 @@ func TestLessThanKResponses(t *testing.T) { switch pmes.GetType() { case pb.Message_GET_VALUE: - pi := host.Peerstore().PeerInfo(peers[1]) + pi := host.Peerstore().PeerInfo(hosts[1].ID()) resp := &pb.Message{ Type: pmes.Type, CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []peer.PeerInfo{pi}), diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index 3670c570d..8f66afbf6 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -148,7 +148,7 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Mess } if closest == nil { - log.Warningf("handleFindPeer: could not find anything.") + log.Warningf("%s handleFindPeer %s: could not find anything.", dht.self, p) return resp, nil } @@ -167,25 +167,31 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Mess func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) + key := u.Key(pmes.GetKey()) + + // debug logging niceness. + reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, key) + log.Debugf("%s begin", reqDesc) + defer log.Debugf("%s end", reqDesc) // check if we have this value, to add ourselves as provider. - log.Debugf("handling GetProviders: '%s'", u.Key(pmes.GetKey())) - dsk := u.Key(pmes.GetKey()).DsKey() - has, err := dht.datastore.Has(dsk) + has, err := dht.datastore.Has(key.DsKey()) if err != nil && err != ds.ErrNotFound { log.Errorf("unexpected datastore error: %v\n", err) has = false } // setup providers - providers := dht.providers.GetProviders(ctx, u.Key(pmes.GetKey())) + providers := dht.providers.GetProviders(ctx, key) if has { providers = append(providers, dht.self) + log.Debugf("%s have the value. added self as provider", reqDesc) } if providers != nil && len(providers) > 0 { infos := peer.PeerInfos(dht.peerstore, providers) resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos) + log.Debugf("%s have %d providers: %s", reqDesc, len(providers), infos) } // Also send closer peers. @@ -193,6 +199,7 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb. if closer != nil { infos := peer.PeerInfos(dht.peerstore, providers) resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos) + log.Debugf("%s have %d closer peers: %s", reqDesc, len(closer), infos) } return resp, nil @@ -206,7 +213,7 @@ type providerInfo struct { func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { key := u.Key(pmes.GetKey()) - log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, peer.ID(key)) + log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, key) // add provider should use the address given in the message pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers()) diff --git a/routing/dht/query.go b/routing/dht/query.go index 0056bee1d..44dc49926 100644 --- a/routing/dht/query.go +++ b/routing/dht/query.go @@ -7,6 +7,7 @@ import ( queue "github.com/jbenet/go-ipfs/p2p/peer/queue" "github.com/jbenet/go-ipfs/routing" u "github.com/jbenet/go-ipfs/util" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" pset "github.com/jbenet/go-ipfs/util/peerset" todoctr "github.com/jbenet/go-ipfs/util/todocounter" @@ -55,32 +56,18 @@ func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, e } type dhtQueryRunner struct { + query *dhtQuery // query to run + peersSeen *pset.PeerSet // all peers queried. prevent querying same peer 2x + peersToQuery *queue.ChanQueue // peers remaining to be queried + peersRemaining todoctr.Counter // peersToQuery + currently processing - // the query to run - query *dhtQuery + result *dhtQueryResult // query result + errs []error // result errors. maybe should be a map[peer.ID]error - // peersToQuery is a list of peers remaining to query - peersToQuery *queue.ChanQueue + rateLimit chan struct{} // processing semaphore + log eventlog.EventLogger - // peersSeen are all the peers queried. used to prevent querying same peer 2x - peersSeen *pset.PeerSet - - // rateLimit is a channel used to rate limit our processing (semaphore) - rateLimit chan struct{} - - // peersRemaining is a counter of peers remaining (toQuery + processing) - peersRemaining todoctr.Counter - - // context group cg ctxgroup.ContextGroup - - // result - result *dhtQueryResult - - // result errors - errs []error - - // lock for concurrent access to fields sync.RWMutex } @@ -96,6 +83,11 @@ func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner { } func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) { + log := log.Prefix("dht(%s).Query(%s).Run(%d)", r.query.dht.self, r.query.key, len(peers)) + r.log = log + log.Debug("enter") + defer log.Debug("end") + log.Debugf("Run query with %d peers.", len(peers)) if len(peers) == 0 { log.Warning("Running query with no peers!") @@ -115,6 +107,7 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) { // go do this thing. // do it as a child func to make sure Run exits // ONLY AFTER spawn workers has exited. + log.Debugf("go spawn workers") r.cg.AddChildFunc(r.spawnWorkers) // so workers are working. @@ -124,41 +117,45 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) { select { case <-r.peersRemaining.Done(): + log.Debug("all peers ended") r.cg.Close() r.RLock() defer r.RUnlock() if len(r.errs) > 0 { - err = r.errs[0] + err = r.errs[0] // take the first? } case <-r.cg.Closed(): + log.Debug("r.cg.Closed()") + r.RLock() defer r.RUnlock() err = r.cg.Context().Err() // collect the error. } if r.result != nil && r.result.success { + log.Debug("success: %s", r.result) return r.result, nil } + log.Debug("failure: %s", err) return nil, err } func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID) { // if new peer is ourselves... if next == r.query.dht.self { + r.log.Debug("addPeerToQuery skip self") return } if !r.peersSeen.TryAdd(next) { - log.Debug("query peer was already seen") + r.log.Debugf("addPeerToQuery skip seen %s", next) return } - log.Debugf("adding peer to query: %v", next) - - // do this after unlocking to prevent possible deadlocks. + r.log.Debugf("addPeerToQuery adding %s", next) r.peersRemaining.Increment(1) select { case r.peersToQuery.EnqChan <- next: @@ -167,6 +164,10 @@ func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID) { } func (r *dhtQueryRunner) spawnWorkers(parent ctxgroup.ContextGroup) { + log := r.log.Prefix("spawnWorkers") + log.Debugf("begin") + defer log.Debugf("end") + for { select { @@ -192,7 +193,9 @@ func (r *dhtQueryRunner) spawnWorkers(parent ctxgroup.ContextGroup) { } func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) { - log.Debugf("spawned worker for: %v", p) + log := r.log.Prefix("queryPeer(%s)", p) + log.Debugf("spawned") + defer log.Debugf("finished") // make sure we rate limit concurrency. select { @@ -203,34 +206,36 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) { } // ok let's do this! - log.Debugf("running worker for: %v", p) + log.Debugf("running") // make sure we do this when we exit defer func() { // signal we're done proccessing peer p - log.Debugf("completing worker for: %v", p) + log.Debugf("completed") r.peersRemaining.Decrement(1) r.rateLimit <- struct{}{} }() // make sure we're connected to the peer. if conns := r.query.dht.host.Network().ConnsToPeer(p); len(conns) == 0 { - log.Infof("worker for: %v -- not connected. dial start", p) + log.Infof("not connected. dialing.") pi := peer.PeerInfo{ID: p} if err := r.query.dht.host.Connect(cg.Context(), pi); err != nil { - log.Debugf("ERROR worker for: %v -- err connecting: %v", p, err) + log.Debugf("Error connecting: %s", err) r.Lock() r.errs = append(r.errs, err) r.Unlock() return } - log.Infof("worker for: %v -- not connected. dial success!", p) + log.Debugf("connected. dial success.") } // finally, run the query against this peer + log.Debugf("query running") res, err := r.query.qfunc(cg.Context(), p) + log.Debugf("query finished") if err != nil { log.Debugf("ERROR worker for: %v %v", p, err) @@ -239,7 +244,7 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) { r.Unlock() } else if res.success { - log.Debugf("SUCCESS worker for: %v", p, res) + log.Debugf("SUCCESS worker for: %v %s", p, res) r.Lock() r.result = res r.Unlock() diff --git a/routing/dht/records.go b/routing/dht/records.go index 0791f80a3..083eeb26e 100644 --- a/routing/dht/records.go +++ b/routing/dht/records.go @@ -191,8 +191,8 @@ func (dht *IpfsDHT) verifyRecord(r *pb.Record, pk ci.PubKey) error { // Now, check validity func parts := strings.Split(r.GetKey(), "/") if len(parts) < 3 { - log.Errorf("Record had bad key: %s", u.Key(r.GetKey())) - return ErrBadRecord + log.Infof("Record key does not have validator: %s", u.Key(r.GetKey())) + return nil } fnc, ok := dht.Validators[parts[1]] diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 2f00929b6..5978a9a80 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -65,25 +65,29 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error // If the search does not succeed, a multiaddr string of a closer peer is // returned along with util.ErrSearchIncomplete func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { - log.Debugf("Get Value [%s]", key) + log := dht.log().Prefix("GetValue(%s)", key) + log.Debugf("start") + defer log.Debugf("end") // If we have it local, dont bother doing an RPC! val, err := dht.getLocal(key) if err == nil { - log.Debug("Got value locally!") + log.Debug("have it locally") return val, nil } // get closest peers in the routing table + rtp := dht.routingTable.ListPeers() + log.Debugf("peers in rt: %s", len(rtp), rtp) + closest := dht.routingTable.NearestPeers(kb.ConvertKey(key), PoolSize) if closest == nil || len(closest) == 0 { - log.Warning("Got no peers back from routing table!") + log.Warning("No peers from routing table!") return nil, errors.Wrap(kb.ErrLookupFailure) } // setup the Query query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { - val, peers, err := dht.getValueOrPeers(ctx, p, key) if err != nil { return nil, err @@ -116,9 +120,13 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { // Provide makes this node announce that it can provide a value for the given key func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { - + log := dht.log().Prefix("Provide(%s)", key) + log.Debugf("start", key) log.Event(ctx, "provideBegin", &key) + defer log.Debugf("end", key) defer log.Event(ctx, "provideEnd", &key) + + // add self locally dht.providers.AddProvider(key, dht.self) peers, err := dht.getClosestPeers(ctx, key) @@ -131,6 +139,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { wg.Add(1) go func(p peer.ID) { defer wg.Done() + log.Debugf("putProvider(%s, %s)", key, p) err := dht.putProvider(ctx, p, string(key)) if err != nil { log.Error(err) @@ -230,9 +239,12 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int } func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, count int, peerOut chan peer.PeerInfo) { + log := dht.log().Prefix("FindProviders(%s)", key) + defer close(peerOut) defer log.Event(ctx, "findProviders end", &key) - log.Debugf("%s FindProviders %s", dht.self, key) + log.Debug("begin") + defer log.Debug("begin") ps := pset.NewLimited(count) provs := dht.providers.GetProviders(ctx, key) @@ -254,17 +266,24 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co // setup the Query query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { + log := log.Prefix("Query(%s)", p) + log.Debugf("begin") + defer log.Debugf("end") pmes, err := dht.findProvidersSingle(ctx, p, key) if err != nil { return nil, err } + log.Debugf("%d provider entries", len(pmes.GetProviderPeers())) provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers()) + log.Debugf("%d provider entries decoded", len(provs)) // Add unique providers from request, up to 'count' for _, prov := range provs { + log.Debugf("got provider: %s", prov) if ps.TryAdd(prov.ID) { + log.Debugf("using provider: %s", prov) select { case peerOut <- prov: case <-ctx.Done(): @@ -273,6 +292,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co } } if ps.Size() >= count { + log.Debugf("got enough providers (%d/%d)", ps.Size(), count) return &dhtQueryResult{success: true}, nil } } @@ -280,13 +300,14 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co // Give closer peers back to the query to be queried closer := pmes.GetCloserPeers() clpeers := pb.PBPeersToPeerInfos(closer) + log.Debugf("got closer peers: %d %s", len(clpeers), clpeers) return &dhtQueryResult{closerPeers: clpeers}, nil }) peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) _, err := query.Run(ctx, peers) if err != nil { - log.Errorf("FindProviders Query error: %s", err) + log.Errorf("Query error: %s", err) } } diff --git a/test/Makefile b/test/Makefile index 4a62d6e1a..8cd48719d 100644 --- a/test/Makefile +++ b/test/Makefile @@ -14,7 +14,7 @@ all: clean deps $(T) aggregate clean: @echo "*** $@ ***" - -rm -r test-results + -rm -rf test-results $(T): @echo "*** $@ ***" diff --git a/util/eventlog/log.go b/util/eventlog/log.go index 4f9757bd1..c0a4a8fe3 100644 --- a/util/eventlog/log.go +++ b/util/eventlog/log.go @@ -2,14 +2,19 @@ package eventlog import ( "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - logging "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-logging" - "github.com/jbenet/go-ipfs/util" + + prelog "github.com/jbenet/go-ipfs/util/prefixlog" ) // EventLogger extends the StandardLogger interface to allow for log items // containing structured metadata type EventLogger interface { - StandardLogger + prelog.StandardLogger + + // Prefix is like PrefixLogger.Prefix. We override it here + // because the type changes (we return EventLogger). + // It's what happens when you wrap interfaces. + Prefix(fmt string, args ...interface{}) EventLogger // Event merges structured data from the provided inputs into a single // machine-readable log event. @@ -27,43 +32,27 @@ type EventLogger interface { Event(ctx context.Context, event string, m ...Loggable) } -// StandardLogger provides API compatibility with standard printf loggers -// eg. go-logging -type StandardLogger interface { - Critical(args ...interface{}) - Criticalf(format string, args ...interface{}) - Debug(args ...interface{}) - Debugf(format string, args ...interface{}) - Error(args ...interface{}) - Errorf(format string, args ...interface{}) - Fatal(args ...interface{}) - Fatalf(format string, args ...interface{}) - Info(args ...interface{}) - Infof(format string, args ...interface{}) - Notice(args ...interface{}) - Noticef(format string, args ...interface{}) - Panic(args ...interface{}) - Panicf(format string, args ...interface{}) - Warning(args ...interface{}) - Warningf(format string, args ...interface{}) -} - // Logger retrieves an event logger by name func Logger(system string) EventLogger { // TODO if we would like to adjust log levels at run-time. Store this event // logger in a map (just like the util.Logger impl) - - return &eventLogger{system: system, Logger: util.Logger(system)} + return &eventLogger{system: system, PrefixLogger: prelog.Logger(system)} } // eventLogger implements the EventLogger and wraps a go-logging Logger type eventLogger struct { - *logging.Logger + prelog.PrefixLogger + system string // TODO add log-level } +func (el *eventLogger) Prefix(fmt string, args ...interface{}) EventLogger { + l := el.PrefixLogger.Prefix(fmt, args...) + return &eventLogger{system: el.system, PrefixLogger: l} +} + func (el *eventLogger) Event(ctx context.Context, event string, metadata ...Loggable) { // Collect loggables for later logging diff --git a/util/prefixlog/prefixlog.go b/util/prefixlog/prefixlog.go new file mode 100644 index 000000000..1ab90bad2 --- /dev/null +++ b/util/prefixlog/prefixlog.go @@ -0,0 +1,151 @@ +package eventlog + +import ( + "strings" + + "github.com/jbenet/go-ipfs/util" +) + +// StandardLogger provides API compatibility with standard printf loggers +// eg. go-logging +type StandardLogger interface { + Critical(args ...interface{}) + Criticalf(format string, args ...interface{}) + Debug(args ...interface{}) + Debugf(format string, args ...interface{}) + Error(args ...interface{}) + Errorf(format string, args ...interface{}) + Fatal(args ...interface{}) + Fatalf(format string, args ...interface{}) + Info(args ...interface{}) + Infof(format string, args ...interface{}) + Notice(args ...interface{}) + Noticef(format string, args ...interface{}) + Panic(args ...interface{}) + Panicf(format string, args ...interface{}) + Warning(args ...interface{}) + Warningf(format string, args ...interface{}) +} + +// StandardLogger provides API compatibility with standard printf loggers +// eg. go-logging +type PrefixLogger interface { + StandardLogger + + Format() string + Args() []interface{} + + Prefix(fmt string, args ...interface{}) PrefixLogger +} + +// Logger retrieves an event logger by name +func Logger(system string) PrefixLogger { + + // TODO if we would like to adjust log levels at run-time. Store this event + // logger in a map (just like the util.Logger impl) + + logger := util.Logger(system) + return Prefix(logger, "") +} + +func Prefix(l StandardLogger, format string, args ...interface{}) PrefixLogger { + return &prefixLogger{logger: l, format: format, args: args} +} + +type prefixLogger struct { + logger StandardLogger + format string + args []interface{} +} + +func (pl *prefixLogger) Format() string { + return pl.format +} + +func (pl *prefixLogger) Args() []interface{} { + return pl.args +} + +func (pl *prefixLogger) Prefix(fmt string, args ...interface{}) PrefixLogger { + return Prefix(pl, fmt, args...) +} + +func (pl *prefixLogger) prepend(fmt string, args []interface{}) (string, []interface{}) { + together := make([]interface{}, 0, len(pl.args)+len(args)) + together = append(together, pl.args...) + together = append(together, args...) + if len(pl.format) > 0 { + fmt = pl.format + " " + fmt + } + return fmt, together +} + +func valfmtn(count int) string { + s := strings.Repeat("%v ", count) + s = s[:len(s)-1] // remove last space + return s +} + +type logFunc func(args ...interface{}) +type logFuncf func(fmt string, args ...interface{}) + +func (pl *prefixLogger) logFunc(f logFuncf, args ...interface{}) { + // need to actually use the format version, with extra fmt strings appended + fmt := valfmtn(len(args)) + pl.logFuncf(f, fmt, args...) +} + +func (pl *prefixLogger) logFuncf(f logFuncf, format string, args ...interface{}) { + format, args = pl.prepend(format, args) + f(format, args...) +} + +func (pl *prefixLogger) Critical(args ...interface{}) { + pl.logFunc(pl.logger.Criticalf, args...) +} +func (pl *prefixLogger) Debug(args ...interface{}) { + pl.logFunc(pl.logger.Debugf, args...) +} +func (pl *prefixLogger) Error(args ...interface{}) { + pl.logFunc(pl.logger.Errorf, args...) +} +func (pl *prefixLogger) Fatal(args ...interface{}) { + pl.logFunc(pl.logger.Fatalf, args...) +} +func (pl *prefixLogger) Info(args ...interface{}) { + pl.logFunc(pl.logger.Infof, args...) +} +func (pl *prefixLogger) Notice(args ...interface{}) { + pl.logFunc(pl.logger.Noticef, args...) +} +func (pl *prefixLogger) Panic(args ...interface{}) { + pl.logFunc(pl.logger.Panicf, args...) +} +func (pl *prefixLogger) Warning(args ...interface{}) { + pl.logFunc(pl.logger.Warningf, args...) +} + +func (pl *prefixLogger) Criticalf(format string, args ...interface{}) { + pl.logFuncf(pl.logger.Criticalf, format, args...) +} +func (pl *prefixLogger) Debugf(format string, args ...interface{}) { + pl.logFuncf(pl.logger.Debugf, format, args...) +} +func (pl *prefixLogger) Errorf(format string, args ...interface{}) { + pl.logFuncf(pl.logger.Errorf, format, args...) +} +func (pl *prefixLogger) Fatalf(format string, args ...interface{}) { + pl.logFuncf(pl.logger.Fatalf, format, args...) +} +func (pl *prefixLogger) Infof(format string, args ...interface{}) { + pl.logFuncf(pl.logger.Infof, format, args...) +} +func (pl *prefixLogger) Noticef(format string, args ...interface{}) { + pl.logFuncf(pl.logger.Noticef, format, args...) +} +func (pl *prefixLogger) Panicf(format string, args ...interface{}) { + pl.logFuncf(pl.logger.Panicf, format, args...) +} +func (pl *prefixLogger) Warningf(format string, args ...interface{}) { + pl.logFuncf(pl.logger.Warningf, format, args...) +} diff --git a/util/testutil/gen.go b/util/testutil/gen.go index 93bca120d..2097c6117 100644 --- a/util/testutil/gen.go +++ b/util/testutil/gen.go @@ -29,11 +29,11 @@ func init() { ZeroLocalTCPAddress = maddr } -func RandKeyPair(bits int) (ci.PrivKey, ci.PubKey, error) { +func RandTestKeyPair(bits int) (ci.PrivKey, ci.PubKey, error) { return ci.GenerateKeyPairWithReader(ci.RSA, bits, u.NewTimeSeededRand()) } -func SeededKeyPair(seed int64) (ci.PrivKey, ci.PubKey, error) { +func SeededTestKeyPair(seed int64) (ci.PrivKey, ci.PubKey, error) { return ci.GenerateKeyPairWithReader(ci.RSA, 512, u.NewSeededRand(seed)) } @@ -142,7 +142,7 @@ func RandPeerNetParams() (*PeerNetParams, error) { var p PeerNetParams var err error p.Addr = ZeroLocalTCPAddress - p.PrivKey, p.PubKey, err = RandKeyPair(512) + p.PrivKey, p.PubKey, err = RandTestKeyPair(512) if err != nil { return nil, err }