From e3223a073e1674301786c9f3adc6a6de07739d78 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 3 Jan 2015 00:29:31 -0800 Subject: [PATCH 01/28] dockertest: logging niceness --- dockertest/client/run.sh | 12 +++++++----- dockertest/server/run.sh | 5 +++-- 2 files changed, 10 insertions(+), 7 deletions(-) diff --git a/dockertest/client/run.sh b/dockertest/client/run.sh index 4de79278c..edbf86431 100644 --- a/dockertest/client/run.sh +++ b/dockertest/client/run.sh @@ -1,14 +1,16 @@ ipfs bootstrap add /ip4/$BOOTSTRAP_PORT_4011_TCP_ADDR/tcp/$BOOTSTRAP_PORT_4011_TCP_PORT/QmNXuBh8HFsWq68Fid8dMbGNQTh7eG6hV9rr1fQyfmfomE + +echo "dockertest> starting client daemon" ipfs daemon & sleep 3 while [ ! -f /data/idtiny ] do - echo waiting for server to add the file... + echo "dockertest> waiting for server to add the file..." sleep 1 done -echo client found file with hash: $(cat /data/idtiny) +echo "dockertest> client found file with hash:" $(cat /data/idtiny) ipfs cat $(cat /data/idtiny) > filetiny @@ -23,10 +25,10 @@ fi while [ ! -f /data/idrand ] do - echo waiting for server to add the file... + echo "dockertest> waiting for server to add the file..." sleep 1 done -echo client found file with hash: $(cat /data/idrand) +echo "dockertest> client found file with hash:" $(cat /data/idrand) cat /data/idrand @@ -44,4 +46,4 @@ if (($? > 0)); then exit 1 fi -echo "success" +echo "dockertest> success" diff --git a/dockertest/server/run.sh b/dockertest/server/run.sh index f71b7ac22..6dab347d2 100644 --- a/dockertest/server/run.sh +++ b/dockertest/server/run.sh @@ -3,6 +3,7 @@ ipfs bootstrap add /ip4/$BOOTSTRAP_PORT_4011_TCP_ADDR/tcp/$BOOTSTRAP_PORT_4011_T # wait for daemon to start/bootstrap # alternatively use ipfs swarm connect +echo "dockertest> starting server daemon" ipfs daemon & sleep 3 # TODO instead of bootrapping: ipfs swarm connect /ip4/$BOOTSTRAP_PORT_4011_TCP_ADDR/tcp/$BOOTSTRAP_PORT_4011_TCP_PORT/QmNXuBh8HFsWq68Fid8dMbGNQTh7eG6hV9rr1fQyfmfomE @@ -10,11 +11,11 @@ sleep 3 # must mount this volume from data container ipfs add -q /data/filetiny > tmptiny mv tmptiny /data/idtiny -echo added tiny file. hash is $(cat /data/idtiny) +echo "dockertest> added tiny file. hash is" $(cat /data/idtiny) ipfs add -q /data/filerand > tmprand mv tmprand /data/idrand -echo added rand file. hash is $(cat /data/idrand) +echo "dockertest> added rand file. hash is" $(cat /data/idrand) # allow ample time for the client to pull the data sleep 10000000 From 3b6350331f7de79af09be22df9fd0905bb6ed137 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 3 Jan 2015 00:29:49 -0800 Subject: [PATCH 02/28] dht: debug dont cast Key as peer.ID --- routing/dht/handlers.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index 3670c570d..546939ca0 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -206,7 +206,7 @@ type providerInfo struct { func (dht *IpfsDHT) handleAddProvider(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { key := u.Key(pmes.GetKey()) - log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, peer.ID(key)) + log.Debugf("%s adding %s as a provider for '%s'\n", dht.self, p, key) // add provider should use the address given in the message pinfos := pb.PBPeersToPeerInfos(pmes.GetProviderPeers()) From 17ce192af532ccdd1e9be9c6b74159d957a1795c Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 3 Jan 2015 00:56:27 -0800 Subject: [PATCH 03/28] dht: some provider debug logging --- routing/dht/handlers.go | 15 +++++++++++---- routing/dht/routing.go | 11 +++++++++++ 2 files changed, 22 insertions(+), 4 deletions(-) diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index 546939ca0..491176550 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -167,25 +167,31 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Mess func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb.Message) (*pb.Message, error) { resp := pb.NewMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) + key := u.Key(pmes.GetKey()) + + // debug logging niceness. + reqDesc := fmt.Sprintf("%s handleGetProviders(%s, %s): ", dht.self, p, key) + log.Debugf("%s begin", reqDesc) + defer log.Debugf("%s end", reqDesc) // check if we have this value, to add ourselves as provider. - log.Debugf("handling GetProviders: '%s'", u.Key(pmes.GetKey())) - dsk := u.Key(pmes.GetKey()).DsKey() - has, err := dht.datastore.Has(dsk) + has, err := dht.datastore.Has(key.DsKey()) if err != nil && err != ds.ErrNotFound { log.Errorf("unexpected datastore error: %v\n", err) has = false } // setup providers - providers := dht.providers.GetProviders(ctx, u.Key(pmes.GetKey())) + providers := dht.providers.GetProviders(ctx, key) if has { providers = append(providers, dht.self) + log.Debugf("%s have the value. added self as provider", reqDesc) } if providers != nil && len(providers) > 0 { infos := peer.PeerInfos(dht.peerstore, providers) resp.ProviderPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos) + log.Debugf("%s have %d providers: %s", reqDesc, len(providers), infos) } // Also send closer peers. @@ -193,6 +199,7 @@ func (dht *IpfsDHT) handleGetProviders(ctx context.Context, p peer.ID, pmes *pb. if closer != nil { infos := peer.PeerInfos(dht.peerstore, providers) resp.CloserPeers = pb.PeerInfosToPBPeers(dht.host.Network(), infos) + log.Debugf("%s have %d closer peers: %s", reqDesc, len(closer), infos) } return resp, nil diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 2f00929b6..ec414de13 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -1,6 +1,7 @@ package dht import ( + "fmt" "math" "sync" @@ -255,16 +256,24 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co // setup the Query query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { + reqDesc := fmt.Sprintf("%s findProviders(%s).Query(%s): ", dht.self, key, p) + log.Debugf("%s begin", reqDesc) + defer log.Debugf("%s end", reqDesc) + pmes, err := dht.findProvidersSingle(ctx, p, key) if err != nil { return nil, err } + log.Debugf("%s got %d provider entries", reqDesc, len(pmes.GetProviderPeers())) provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers()) + log.Debugf("%s got %d provider entries decoded", reqDesc, len(provs)) // Add unique providers from request, up to 'count' for _, prov := range provs { + log.Debugf("%s got provider: %s", reqDesc, prov) if ps.TryAdd(prov.ID) { + log.Debugf("%s using provider: %s", reqDesc, prov) select { case peerOut <- prov: case <-ctx.Done(): @@ -273,6 +282,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co } } if ps.Size() >= count { + log.Debugf("%s got enough providers (%d/%d)", reqDesc, ps.Size(), count) return &dhtQueryResult{success: true}, nil } } @@ -280,6 +290,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co // Give closer peers back to the query to be queried closer := pmes.GetCloserPeers() clpeers := pb.PBPeersToPeerInfos(closer) + log.Debugf("%s got closer peers: %s", reqDesc, clpeers) return &dhtQueryResult{closerPeers: clpeers}, nil }) From 7629ad7ec7f40a21ee3b134f6504122a8052fbc2 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 3 Jan 2015 03:01:17 -0800 Subject: [PATCH 04/28] bitswap: add self peer.ID --- exchange/bitswap/bitswap.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index fe20a406a..cea618970 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -61,6 +61,7 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, }() bs := &bitswap{ + self: p, blockstore: bstore, cancelFunc: cancelFunc, notifications: notif, @@ -79,6 +80,9 @@ func New(parent context.Context, p peer.ID, network bsnet.BitSwapNetwork, // bitswap instances implement the bitswap protocol. type bitswap struct { + // the ID of the peer to act on behalf of + self peer.ID + // network delivers messages on behalf of the session network bsnet.BitSwapNetwork From 5639042df52d1b6a9a4ffb5e493a89f90c4c8329 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 3 Jan 2015 03:01:45 -0800 Subject: [PATCH 05/28] bitswap: send wantlist code reuse + debug logs --- exchange/bitswap/bitswap.go | 87 +++++++++++++++++++++++++++---------- 1 file changed, 63 insertions(+), 24 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index cea618970..79e5a576c 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -3,6 +3,7 @@ package bitswap import ( + "fmt" "math" "sync" "time" @@ -170,58 +171,96 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { return bs.network.Provide(ctx, blk.Key()) } -func (bs *bitswap) sendWantListTo(ctx context.Context, peers <-chan peer.ID) error { +func (bs *bitswap) sendWantlistMsgToPeer(ctx context.Context, m bsmsg.BitSwapMessage, p peer.ID) error { + logd := fmt.Sprintf("%s bitswap.sendWantlistMsgToPeer(%d, %s)", bs.self, len(m.Wantlist()), p) + + log.Debugf("%s sending wantlist", logd) + if err := bs.send(ctx, p, m); err != nil { + log.Errorf("%s send wantlist error: %s", logd, err) + return err + } + log.Debugf("%s send wantlist success", logd) + return nil +} + +func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMessage, peers <-chan peer.ID) error { if peers == nil { panic("Cant send wantlist to nil peerchan") } - message := bsmsg.New() - for _, wanted := range bs.wantlist.Entries() { - message.AddEntry(wanted.Key, wanted.Priority) - } + + logd := fmt.Sprintf("%s bitswap.sendWantlistMsgTo(%d)", bs.self, len(m.Wantlist())) + log.Debugf("%s begin", logd) + defer log.Debugf("%s end", logd) + + set := pset.New() wg := sync.WaitGroup{} for peerToQuery := range peers { log.Event(ctx, "PeerToQuery", peerToQuery) + logd := fmt.Sprintf("%sto(%s)", logd, peerToQuery) + + if !set.TryAdd(peerToQuery) { //Do once per peer + log.Debugf("%s skipped (already sent)", logd) + continue + } + wg.Add(1) go func(p peer.ID) { defer wg.Done() - if err := bs.send(ctx, p, message); err != nil { - log.Error(err) - return - } + bs.sendWantlistMsgToPeer(ctx, m, p) }(peerToQuery) } wg.Wait() return nil } -func (bs *bitswap) sendWantlistToProviders(ctx context.Context, wantlist *wantlist.ThreadSafe) { +func (bs *bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID) error { + message := bsmsg.New() + message.SetFull(true) + for _, wanted := range bs.wantlist.Entries() { + message.AddEntry(wanted.Key, wanted.Priority) + } + return bs.sendWantlistMsgToPeers(ctx, message, peers) +} + +func (bs *bitswap) sendWantlistToProviders(ctx context.Context) { + logd := fmt.Sprintf("%s bitswap.sendWantlistToProviders", bs.self) + log.Debugf("%s begin", logd) + defer log.Debugf("%s end", logd) + ctx, cancel := context.WithCancel(ctx) defer cancel() - message := bsmsg.New() - message.SetFull(true) - for _, e := range bs.wantlist.Entries() { - message.AddEntry(e.Key, e.Priority) - } - - set := pset.New() + // prepare a channel to hand off to sendWantlistToPeers + sendToPeers := make(chan peer.ID) // Get providers for all entries in wantlist (could take a while) wg := sync.WaitGroup{} - for _, e := range wantlist.Entries() { + for _, e := range bs.wantlist.Entries() { wg.Add(1) go func(k u.Key) { defer wg.Done() + + logd := fmt.Sprintf("%s(entry: %s)", logd, k) + log.Debugf("%s asking dht for providers", logd) + child, _ := context.WithTimeout(ctx, providerRequestTimeout) providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest) for prov := range providers { - if set.TryAdd(prov) { //Do once per peer - bs.send(ctx, prov, message) - } + log.Debugf("%s dht returned provider %s. send wantlist", logd, prov) + sendToPeers <- prov } }(e.Key) } - wg.Wait() + + go func() { + wg.Wait() // make sure all our children do finish. + close(sendToPeers) + }() + + err := bs.sendWantlistToPeers(ctx, sendToPeers) + if err != nil { + log.Errorf("%s sendWantlistToPeers error: %s", logd, err) + } } func (bs *bitswap) taskWorker(ctx context.Context) { @@ -247,7 +286,7 @@ func (bs *bitswap) clientWorker(parent context.Context) { select { case <-broadcastSignal: // Resend unfulfilled wantlist keys - bs.sendWantlistToProviders(ctx, bs.wantlist) + bs.sendWantlistToProviders(ctx) broadcastSignal = time.After(rebroadcastDelay.Get()) case ks := <-bs.batchRequests: if len(ks) == 0 { @@ -266,7 +305,7 @@ func (bs *bitswap) clientWorker(parent context.Context) { // newer bitswap strategies. child, _ := context.WithTimeout(ctx, providerRequestTimeout) providers := bs.network.FindProvidersAsync(child, ks[0], maxProvidersPerRequest) - err := bs.sendWantListTo(ctx, providers) + err := bs.sendWantlistToPeers(ctx, providers) if err != nil { log.Errorf("error sending wantlist: %s", err) } From 623625845dd7ead2c9ad6bafcc7fb93d9bf772b8 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 3 Jan 2015 04:06:26 -0800 Subject: [PATCH 06/28] prefix logger this commit adds a logger with prefixes --- util/eventlog/log.go | 43 ++++------ util/prefixlog/prefixlog.go | 151 ++++++++++++++++++++++++++++++++++++ 2 files changed, 167 insertions(+), 27 deletions(-) create mode 100644 util/prefixlog/prefixlog.go diff --git a/util/eventlog/log.go b/util/eventlog/log.go index 4f9757bd1..c0a4a8fe3 100644 --- a/util/eventlog/log.go +++ b/util/eventlog/log.go @@ -2,14 +2,19 @@ package eventlog import ( "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - logging "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-logging" - "github.com/jbenet/go-ipfs/util" + + prelog "github.com/jbenet/go-ipfs/util/prefixlog" ) // EventLogger extends the StandardLogger interface to allow for log items // containing structured metadata type EventLogger interface { - StandardLogger + prelog.StandardLogger + + // Prefix is like PrefixLogger.Prefix. We override it here + // because the type changes (we return EventLogger). + // It's what happens when you wrap interfaces. + Prefix(fmt string, args ...interface{}) EventLogger // Event merges structured data from the provided inputs into a single // machine-readable log event. @@ -27,43 +32,27 @@ type EventLogger interface { Event(ctx context.Context, event string, m ...Loggable) } -// StandardLogger provides API compatibility with standard printf loggers -// eg. go-logging -type StandardLogger interface { - Critical(args ...interface{}) - Criticalf(format string, args ...interface{}) - Debug(args ...interface{}) - Debugf(format string, args ...interface{}) - Error(args ...interface{}) - Errorf(format string, args ...interface{}) - Fatal(args ...interface{}) - Fatalf(format string, args ...interface{}) - Info(args ...interface{}) - Infof(format string, args ...interface{}) - Notice(args ...interface{}) - Noticef(format string, args ...interface{}) - Panic(args ...interface{}) - Panicf(format string, args ...interface{}) - Warning(args ...interface{}) - Warningf(format string, args ...interface{}) -} - // Logger retrieves an event logger by name func Logger(system string) EventLogger { // TODO if we would like to adjust log levels at run-time. Store this event // logger in a map (just like the util.Logger impl) - - return &eventLogger{system: system, Logger: util.Logger(system)} + return &eventLogger{system: system, PrefixLogger: prelog.Logger(system)} } // eventLogger implements the EventLogger and wraps a go-logging Logger type eventLogger struct { - *logging.Logger + prelog.PrefixLogger + system string // TODO add log-level } +func (el *eventLogger) Prefix(fmt string, args ...interface{}) EventLogger { + l := el.PrefixLogger.Prefix(fmt, args...) + return &eventLogger{system: el.system, PrefixLogger: l} +} + func (el *eventLogger) Event(ctx context.Context, event string, metadata ...Loggable) { // Collect loggables for later logging diff --git a/util/prefixlog/prefixlog.go b/util/prefixlog/prefixlog.go new file mode 100644 index 000000000..1ab90bad2 --- /dev/null +++ b/util/prefixlog/prefixlog.go @@ -0,0 +1,151 @@ +package eventlog + +import ( + "strings" + + "github.com/jbenet/go-ipfs/util" +) + +// StandardLogger provides API compatibility with standard printf loggers +// eg. go-logging +type StandardLogger interface { + Critical(args ...interface{}) + Criticalf(format string, args ...interface{}) + Debug(args ...interface{}) + Debugf(format string, args ...interface{}) + Error(args ...interface{}) + Errorf(format string, args ...interface{}) + Fatal(args ...interface{}) + Fatalf(format string, args ...interface{}) + Info(args ...interface{}) + Infof(format string, args ...interface{}) + Notice(args ...interface{}) + Noticef(format string, args ...interface{}) + Panic(args ...interface{}) + Panicf(format string, args ...interface{}) + Warning(args ...interface{}) + Warningf(format string, args ...interface{}) +} + +// StandardLogger provides API compatibility with standard printf loggers +// eg. go-logging +type PrefixLogger interface { + StandardLogger + + Format() string + Args() []interface{} + + Prefix(fmt string, args ...interface{}) PrefixLogger +} + +// Logger retrieves an event logger by name +func Logger(system string) PrefixLogger { + + // TODO if we would like to adjust log levels at run-time. Store this event + // logger in a map (just like the util.Logger impl) + + logger := util.Logger(system) + return Prefix(logger, "") +} + +func Prefix(l StandardLogger, format string, args ...interface{}) PrefixLogger { + return &prefixLogger{logger: l, format: format, args: args} +} + +type prefixLogger struct { + logger StandardLogger + format string + args []interface{} +} + +func (pl *prefixLogger) Format() string { + return pl.format +} + +func (pl *prefixLogger) Args() []interface{} { + return pl.args +} + +func (pl *prefixLogger) Prefix(fmt string, args ...interface{}) PrefixLogger { + return Prefix(pl, fmt, args...) +} + +func (pl *prefixLogger) prepend(fmt string, args []interface{}) (string, []interface{}) { + together := make([]interface{}, 0, len(pl.args)+len(args)) + together = append(together, pl.args...) + together = append(together, args...) + if len(pl.format) > 0 { + fmt = pl.format + " " + fmt + } + return fmt, together +} + +func valfmtn(count int) string { + s := strings.Repeat("%v ", count) + s = s[:len(s)-1] // remove last space + return s +} + +type logFunc func(args ...interface{}) +type logFuncf func(fmt string, args ...interface{}) + +func (pl *prefixLogger) logFunc(f logFuncf, args ...interface{}) { + // need to actually use the format version, with extra fmt strings appended + fmt := valfmtn(len(args)) + pl.logFuncf(f, fmt, args...) +} + +func (pl *prefixLogger) logFuncf(f logFuncf, format string, args ...interface{}) { + format, args = pl.prepend(format, args) + f(format, args...) +} + +func (pl *prefixLogger) Critical(args ...interface{}) { + pl.logFunc(pl.logger.Criticalf, args...) +} +func (pl *prefixLogger) Debug(args ...interface{}) { + pl.logFunc(pl.logger.Debugf, args...) +} +func (pl *prefixLogger) Error(args ...interface{}) { + pl.logFunc(pl.logger.Errorf, args...) +} +func (pl *prefixLogger) Fatal(args ...interface{}) { + pl.logFunc(pl.logger.Fatalf, args...) +} +func (pl *prefixLogger) Info(args ...interface{}) { + pl.logFunc(pl.logger.Infof, args...) +} +func (pl *prefixLogger) Notice(args ...interface{}) { + pl.logFunc(pl.logger.Noticef, args...) +} +func (pl *prefixLogger) Panic(args ...interface{}) { + pl.logFunc(pl.logger.Panicf, args...) +} +func (pl *prefixLogger) Warning(args ...interface{}) { + pl.logFunc(pl.logger.Warningf, args...) +} + +func (pl *prefixLogger) Criticalf(format string, args ...interface{}) { + pl.logFuncf(pl.logger.Criticalf, format, args...) +} +func (pl *prefixLogger) Debugf(format string, args ...interface{}) { + pl.logFuncf(pl.logger.Debugf, format, args...) +} +func (pl *prefixLogger) Errorf(format string, args ...interface{}) { + pl.logFuncf(pl.logger.Errorf, format, args...) +} +func (pl *prefixLogger) Fatalf(format string, args ...interface{}) { + pl.logFuncf(pl.logger.Fatalf, format, args...) +} +func (pl *prefixLogger) Infof(format string, args ...interface{}) { + pl.logFuncf(pl.logger.Infof, format, args...) +} +func (pl *prefixLogger) Noticef(format string, args ...interface{}) { + pl.logFuncf(pl.logger.Noticef, format, args...) +} +func (pl *prefixLogger) Panicf(format string, args ...interface{}) { + pl.logFuncf(pl.logger.Panicf, format, args...) +} +func (pl *prefixLogger) Warningf(format string, args ...interface{}) { + pl.logFuncf(pl.logger.Warningf, format, args...) +} From c100390a49e6508ba7783e3015ced7ede162b858 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 3 Jan 2015 06:14:16 -0800 Subject: [PATCH 07/28] bitswap engine: signal in own func --- exchange/bitswap/decision/engine.go | 15 ++++++++++----- 1 file changed, 10 insertions(+), 5 deletions(-) diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index 582d96e08..80a6e2fab 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -143,13 +143,10 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { newWorkExists := false defer func() { if newWorkExists { - // Signal task generation to restart (if stopped!) - select { - case e.workSignal <- struct{}{}: - default: - } + e.signalNewWork() } }() + e.lock.Lock() defer e.lock.Unlock() @@ -222,3 +219,11 @@ func (e *Engine) findOrCreate(p peer.ID) *ledger { } return l } + +func (e *Engine) signalNewWork() { + // Signal task generation to restart (if stopped!) + select { + case e.workSignal <- struct{}{}: + default: + } +} From d357b0ac00d447ba80f720ff6e4f4ce501c2f085 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 3 Jan 2015 06:15:50 -0800 Subject: [PATCH 08/28] bitswap debug logging --- exchange/bitswap/bitswap.go | 33 +++++++++++++-------------- exchange/bitswap/decision/engine.go | 11 +++++++-- exchange/bitswap/network/ipfs_impl.go | 1 + routing/dht/dht_net.go | 6 ----- 4 files changed, 26 insertions(+), 25 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 79e5a576c..4ba099860 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -3,7 +3,6 @@ package bitswap import ( - "fmt" "math" "sync" "time" @@ -172,14 +171,14 @@ func (bs *bitswap) HasBlock(ctx context.Context, blk *blocks.Block) error { } func (bs *bitswap) sendWantlistMsgToPeer(ctx context.Context, m bsmsg.BitSwapMessage, p peer.ID) error { - logd := fmt.Sprintf("%s bitswap.sendWantlistMsgToPeer(%d, %s)", bs.self, len(m.Wantlist()), p) + log := log.Prefix("bitswap(%s).bitswap.sendWantlistMsgToPeer(%d, %s)", bs.self, len(m.Wantlist()), p) - log.Debugf("%s sending wantlist", logd) + log.Debug("sending wantlist") if err := bs.send(ctx, p, m); err != nil { - log.Errorf("%s send wantlist error: %s", logd, err) + log.Errorf("send wantlist error: %s", err) return err } - log.Debugf("%s send wantlist success", logd) + log.Debugf("send wantlist success") return nil } @@ -188,20 +187,20 @@ func (bs *bitswap) sendWantlistMsgToPeers(ctx context.Context, m bsmsg.BitSwapMe panic("Cant send wantlist to nil peerchan") } - logd := fmt.Sprintf("%s bitswap.sendWantlistMsgTo(%d)", bs.self, len(m.Wantlist())) - log.Debugf("%s begin", logd) - defer log.Debugf("%s end", logd) + log := log.Prefix("bitswap(%s).sendWantlistMsgToPeers(%d)", bs.self, len(m.Wantlist())) + log.Debugf("begin") + defer log.Debugf("end") set := pset.New() wg := sync.WaitGroup{} for peerToQuery := range peers { log.Event(ctx, "PeerToQuery", peerToQuery) - logd := fmt.Sprintf("%sto(%s)", logd, peerToQuery) if !set.TryAdd(peerToQuery) { //Do once per peer - log.Debugf("%s skipped (already sent)", logd) + log.Debugf("%s skipped (already sent)", peerToQuery) continue } + log.Debugf("%s sending", peerToQuery) wg.Add(1) go func(p peer.ID) { @@ -223,9 +222,9 @@ func (bs *bitswap) sendWantlistToPeers(ctx context.Context, peers <-chan peer.ID } func (bs *bitswap) sendWantlistToProviders(ctx context.Context) { - logd := fmt.Sprintf("%s bitswap.sendWantlistToProviders", bs.self) - log.Debugf("%s begin", logd) - defer log.Debugf("%s end", logd) + log := log.Prefix("bitswap(%s).sendWantlistToProviders ", bs.self) + log.Debugf("begin") + defer log.Debugf("end") ctx, cancel := context.WithCancel(ctx) defer cancel() @@ -240,13 +239,13 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context) { go func(k u.Key) { defer wg.Done() - logd := fmt.Sprintf("%s(entry: %s)", logd, k) - log.Debugf("%s asking dht for providers", logd) + log := log.Prefix("(entry: %s) ", k) + log.Debug("asking dht for providers") child, _ := context.WithTimeout(ctx, providerRequestTimeout) providers := bs.network.FindProvidersAsync(child, k, maxProvidersPerRequest) for prov := range providers { - log.Debugf("%s dht returned provider %s. send wantlist", logd, prov) + log.Debugf("dht returned provider %s. send wantlist", prov) sendToPeers <- prov } }(e.Key) @@ -259,7 +258,7 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context) { err := bs.sendWantlistToPeers(ctx, sendToPeers) if err != nil { - log.Errorf("%s sendWantlistToPeers error: %s", logd, err) + log.Errorf("sendWantlistToPeers error: %s", err) } } diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index 80a6e2fab..cd3ebac31 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -8,7 +8,7 @@ import ( bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" wl "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist" peer "github.com/jbenet/go-ipfs/p2p/peer" - u "github.com/jbenet/go-ipfs/util" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" ) // TODO consider taking responsibility for other types of requests. For @@ -41,7 +41,7 @@ import ( // whatever it sees fit to produce desired outcomes (get wanted keys // quickly, maintain good relationships with peers, etc). -var log = u.Logger("engine") +var log = eventlog.Logger("engine") const ( sizeOutboxChan = 4 @@ -140,6 +140,10 @@ func (e *Engine) Peers() []peer.ID { // MessageReceived performs book-keeping. Returns error if passed invalid // arguments. func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { + log := log.Prefix("Engine.MessageReceived(%s)", p) + log.Debugf("enter") + defer log.Debugf("exit") + newWorkExists := false defer func() { if newWorkExists { @@ -156,9 +160,11 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { } for _, entry := range m.Wantlist() { if entry.Cancel { + log.Debug("cancel", entry.Key) l.CancelWant(entry.Key) e.peerRequestQueue.Remove(entry.Key, p) } else { + log.Debug("wants", entry.Key, entry.Priority) l.Wants(entry.Key, entry.Priority) if exists, err := e.bs.Has(entry.Key); err == nil && exists { newWorkExists = true @@ -169,6 +175,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { for _, block := range m.Blocks() { // FIXME extract blocks.NumBytes(block) or block.NumBytes() method + log.Debug("got block %s %d bytes", block.Key(), len(block.Data)) l.ReceivedBytes(len(block.Data)) for _, l := range e.ledgerMap { if l.WantListContains(block.Key()) { diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index 4e349dbed..c2a87ce0a 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -55,6 +55,7 @@ func (bsnet *impl) SendRequest( p peer.ID, outgoing bsmsg.BitSwapMessage) (bsmsg.BitSwapMessage, error) { + log.Debugf("bsnet SendRequest to %s", p) s, err := bsnet.host.NewStream(ProtocolBitswap, p) if err != nil { return nil, err diff --git a/routing/dht/dht_net.go b/routing/dht/dht_net.go index fd088e02c..2b857ce2b 100644 --- a/routing/dht/dht_net.go +++ b/routing/dht/dht_net.go @@ -87,15 +87,11 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message start := time.Now() - log.Debugf("%s writing", dht.self) if err := w.WriteMsg(pmes); err != nil { return nil, err } log.Event(ctx, "dhtSentMessage", dht.self, p, pmes) - log.Debugf("%s reading", dht.self) - defer log.Debugf("%s done", dht.self) - rpmes := new(pb.Message) if err := r.ReadMsg(rpmes); err != nil { return nil, err @@ -125,12 +121,10 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func w := ggio.NewDelimitedWriter(cw) - log.Debugf("%s writing", dht.self) if err := w.WriteMsg(pmes); err != nil { return err } log.Event(ctx, "dhtSentMessage", dht.self, p, pmes) - log.Debugf("%s done", dht.self) return nil } From 71ada4696fe16c3923fb7dae7bf31e6eda6bcf13 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 3 Jan 2015 06:16:09 -0800 Subject: [PATCH 09/28] bitswap net: always close --- exchange/bitswap/network/ipfs_impl.go | 23 ++++++++++------------- 1 file changed, 10 insertions(+), 13 deletions(-) diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index c2a87ce0a..2f3fe950b 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -97,23 +97,20 @@ func (bsnet *impl) Provide(ctx context.Context, k util.Key) error { // handleNewStream receives a new stream from the network. func (bsnet *impl) handleNewStream(s inet.Stream) { + defer s.Close() if bsnet.receiver == nil { return } - go func() { - defer s.Close() - - received, err := bsmsg.FromNet(s) - if err != nil { - go bsnet.receiver.ReceiveError(err) - return - } - - p := s.Conn().RemotePeer() - ctx := context.Background() - bsnet.receiver.ReceiveMessage(ctx, p, received) - }() + received, err := bsmsg.FromNet(s) + if err != nil { + go bsnet.receiver.ReceiveError(err) + return + } + p := s.Conn().RemotePeer() + ctx := context.Background() + log.Debugf("bsnet handleNewStream from %s", s.Conn().RemotePeer()) + bsnet.receiver.ReceiveMessage(ctx, p, received) } From bb8886f3afdd5afc7a96f9420ef09b9afed7468c Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 3 Jan 2015 06:16:19 -0800 Subject: [PATCH 10/28] merkledag: LONG timeout on Get we shouldn't use an arbitrary timeout here. since Get doesnt take in a context yet, we give a large upper bound. think of an http request. we want it to go on as long as the client requests it. --- merkledag/merkledag.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/merkledag/merkledag.go b/merkledag/merkledag.go index 9a638ca2a..c9ea00ad2 100644 --- a/merkledag/merkledag.go +++ b/merkledag/merkledag.go @@ -229,7 +229,11 @@ func (n *dagService) Get(k u.Key) (*Node, error) { return nil, fmt.Errorf("dagService is nil") } - ctx, _ := context.WithTimeout(context.TODO(), time.Second*5) + ctx, _ := context.WithTimeout(context.TODO(), time.Minute) + // we shouldn't use an arbitrary timeout here. + // since Get doesnt take in a context yet, we give a large upper bound. + // think of an http request. we want it to go on as long as the client requests it. + b, err := n.Blocks.GetBlock(ctx, k) if err != nil { return nil, err From 9c6228d18f4c0a0a34872097206f80f5753c34a3 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 3 Jan 2015 08:54:36 -0800 Subject: [PATCH 11/28] bitswap and dht: lots of debugging logs --- exchange/bitswap/bitswap.go | 7 ++++++ exchange/bitswap/decision/engine.go | 10 +++++++- exchange/bitswap/decision/taskqueue.go | 5 ++++ exchange/bitswap/network/ipfs_impl.go | 35 ++++++++++++++++++++++---- routing/dht/handlers.go | 2 +- 5 files changed, 52 insertions(+), 7 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index 4ba099860..bdc17ff96 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -108,6 +108,7 @@ type bitswap struct { // GetBlock attempts to retrieve a particular block from peers within the // deadline enforced by the context. func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, error) { + log := log.Prefix("bitswap(%s).GetBlock(%s)", bs.self, k) // Any async work initiated by this function must end when this function // returns. To ensure this, derive a new context. Note that it is okay to @@ -120,10 +121,12 @@ func (bs *bitswap) GetBlock(parent context.Context, k u.Key) (*blocks.Block, err ctx = eventlog.ContextWithLoggable(ctx, eventlog.Uuid("GetBlockRequest")) log.Event(ctx, "GetBlockRequestBegin", &k) + log.Debugf("GetBlockRequestBegin") defer func() { cancelFunc() log.Event(ctx, "GetBlockRequestEnd", &k) + log.Debugf("GetBlockRequestEnd") }() promise, err := bs.GetBlocks(ctx, []u.Key{k}) @@ -263,12 +266,16 @@ func (bs *bitswap) sendWantlistToProviders(ctx context.Context) { } func (bs *bitswap) taskWorker(ctx context.Context) { + log := log.Prefix("bitswap(%s).taskWorker", bs.self) for { select { case <-ctx.Done(): + log.Debugf("exiting") return case envelope := <-bs.engine.Outbox(): + log.Debugf("message to %s sending...", envelope.Peer) bs.send(ctx, envelope.Peer, envelope.Message) + log.Debugf("message to %s sent", envelope.Peer) } } } diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index cd3ebac31..b2e20bf8e 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -91,6 +91,7 @@ func NewEngine(ctx context.Context, bs bstore.Blockstore) *Engine { } func (e *Engine) taskWorker(ctx context.Context) { + log := log.Prefix("bitswap.Engine.taskWorker") for { nextTask := e.peerRequestQueue.Pop() if nextTask == nil { @@ -98,11 +99,16 @@ func (e *Engine) taskWorker(ctx context.Context) { // Wait until there are! select { case <-ctx.Done(): + log.Debugf("exiting: %s", ctx.Err()) return case <-e.workSignal: + log.Debugf("woken up") } continue } + log := log.Prefix("%s", nextTask) + log.Debugf("processing") + block, err := e.bs.Get(nextTask.Entry.Key) if err != nil { log.Warning("engine: task exists to send block, but block is not in blockstore") @@ -113,10 +119,12 @@ func (e *Engine) taskWorker(ctx context.Context) { m := bsmsg.New() m.AddBlock(block) // TODO: maybe add keys from our wantlist? + log.Debugf("sending...") select { case <-ctx.Done(): return case e.outbox <- Envelope{Peer: nextTask.Target, Message: m}: + log.Debugf("sent") } } } @@ -140,7 +148,7 @@ func (e *Engine) Peers() []peer.ID { // MessageReceived performs book-keeping. Returns error if passed invalid // arguments. func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { - log := log.Prefix("Engine.MessageReceived(%s)", p) + log := log.Prefix("bitswap.Engine.MessageReceived(%s)", p) log.Debugf("enter") defer log.Debugf("exit") diff --git a/exchange/bitswap/decision/taskqueue.go b/exchange/bitswap/decision/taskqueue.go index 11af3db35..659e287d0 100644 --- a/exchange/bitswap/decision/taskqueue.go +++ b/exchange/bitswap/decision/taskqueue.go @@ -1,6 +1,7 @@ package decision import ( + "fmt" "sync" wantlist "github.com/jbenet/go-ipfs/exchange/bitswap/wantlist" @@ -30,6 +31,10 @@ type task struct { Trash bool } +func (t *task) String() string { + return fmt.Sprintf("", t.Target, t.Entry.Key, t.Trash) +} + // Push currently adds a new task to the end of the list func (tl *taskQueue) Push(entry wantlist.Entry, to peer.ID) { tl.lock.Lock() diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index 2f3fe950b..0950ed0b8 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -2,15 +2,17 @@ package network import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" host "github.com/jbenet/go-ipfs/p2p/host" inet "github.com/jbenet/go-ipfs/p2p/net" peer "github.com/jbenet/go-ipfs/p2p/peer" routing "github.com/jbenet/go-ipfs/routing" util "github.com/jbenet/go-ipfs/util" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" ) -var log = util.Logger("bitswap_network") +var log = eventlog.Logger("bitswap_network") // NewFromIpfsHost returns a BitSwapNetwork supported by underlying IPFS host func NewFromIpfsHost(host host.Host, r routing.IpfsRouting) BitSwapNetwork { @@ -41,13 +43,23 @@ func (bsnet *impl) SendMessage( p peer.ID, outgoing bsmsg.BitSwapMessage) error { + log := log.Prefix("bitswap net SendMessage to %s", p) + + log.Debug("opening stream") s, err := bsnet.host.NewStream(ProtocolBitswap, p) if err != nil { return err } defer s.Close() - return outgoing.ToNet(s) + log.Debug("sending") + if err := outgoing.ToNet(s); err != nil { + log.Errorf("error: %s", err) + return err + } + + log.Debug("sent") + return err } func (bsnet *impl) SendRequest( @@ -55,18 +67,30 @@ func (bsnet *impl) SendRequest( p peer.ID, outgoing bsmsg.BitSwapMessage) (bsmsg.BitSwapMessage, error) { - log.Debugf("bsnet SendRequest to %s", p) + log := log.Prefix("bitswap net SendRequest to %s", p) + + log.Debug("opening stream") s, err := bsnet.host.NewStream(ProtocolBitswap, p) if err != nil { return nil, err } defer s.Close() + log.Debug("sending") if err := outgoing.ToNet(s); err != nil { + log.Errorf("error: %s", err) return nil, err } - return bsmsg.FromNet(s) + log.Debug("sent, now receiveing") + incoming, err := bsmsg.FromNet(s) + if err != nil { + log.Errorf("error: %s", err) + return incoming, err + } + + log.Debug("received") + return incoming, nil } func (bsnet *impl) SetDelegate(r Receiver) { @@ -106,11 +130,12 @@ func (bsnet *impl) handleNewStream(s inet.Stream) { received, err := bsmsg.FromNet(s) if err != nil { go bsnet.receiver.ReceiveError(err) + log.Errorf("bitswap net handleNewStream from %s error: %s", s.Conn().RemotePeer(), err) return } p := s.Conn().RemotePeer() ctx := context.Background() - log.Debugf("bsnet handleNewStream from %s", s.Conn().RemotePeer()) + log.Debugf("bitswap net handleNewStream from %s", s.Conn().RemotePeer()) bsnet.receiver.ReceiveMessage(ctx, p, received) } diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index 491176550..8f66afbf6 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -148,7 +148,7 @@ func (dht *IpfsDHT) handleFindPeer(ctx context.Context, p peer.ID, pmes *pb.Mess } if closest == nil { - log.Warningf("handleFindPeer: could not find anything.") + log.Warningf("%s handleFindPeer %s: could not find anything.", dht.self, p) return resp, nil } From 64cb32dadd65c620f4d34b314d00c91f7a7529c8 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sat, 3 Jan 2015 10:22:50 -0800 Subject: [PATCH 12/28] disable utp utp is BROKEN!! it causes tests to fail. --- cmd/ipfs/init.go | 2 +- dockertest/bootstrap/config | 3 +-- dockertest/client/config | 3 +-- dockertest/server/config | 3 +-- 4 files changed, 4 insertions(+), 7 deletions(-) diff --git a/cmd/ipfs/init.go b/cmd/ipfs/init.go index 5d2c3e773..fc0654d7b 100644 --- a/cmd/ipfs/init.go +++ b/cmd/ipfs/init.go @@ -186,7 +186,7 @@ func initConfig(configFilename string, dspathOverride string, nBitsForKeypair in Addresses: config.Addresses{ Swarm: []string{ "/ip4/0.0.0.0/tcp/4001", - "/ip4/0.0.0.0/udp/4002/utp", + // "/ip4/0.0.0.0/udp/4002/utp", // disabled for now. }, API: "/ip4/127.0.0.1/tcp/5001", }, diff --git a/dockertest/bootstrap/config b/dockertest/bootstrap/config index b9fac5503..58ba1abfa 100644 --- a/dockertest/bootstrap/config +++ b/dockertest/bootstrap/config @@ -9,8 +9,7 @@ }, "Addresses": { "Swarm": [ - "/ip4/0.0.0.0/tcp/4011", - "/ip4/0.0.0.0/udp/4012/utp" + "/ip4/0.0.0.0/tcp/4011" ], "API": "/ip4/127.0.0.1/tcp/5001" }, diff --git a/dockertest/client/config b/dockertest/client/config index afb9f8d77..f3d5e9f65 100644 --- a/dockertest/client/config +++ b/dockertest/client/config @@ -2,8 +2,7 @@ "Addresses": { "API": "/ip4/127.0.0.1/tcp/5001", "Swarm": [ - "/ip4/0.0.0.0/tcp/4031", - "/ip4/0.0.0.0/udp/4032/utp" + "/ip4/0.0.0.0/tcp/4031" ] }, "Bootstrap": [ diff --git a/dockertest/server/config b/dockertest/server/config index 9d04b820b..59c7fecdf 100644 --- a/dockertest/server/config +++ b/dockertest/server/config @@ -2,8 +2,7 @@ "Addresses": { "API": "/ip4/127.0.0.1/tcp/5001", "Swarm": [ - "/ip4/0.0.0.0/tcp/4021", - "/ip4/0.0.0.0/udp/4022/utp" + "/ip4/0.0.0.0/tcp/4021" ] }, "Bootstrap": [ From 05d8c80c394353a29f4caadcfa11b01fd43d3c9d Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Sat, 3 Jan 2015 17:15:05 -0500 Subject: [PATCH 13/28] fix(bitswap/network): return when context is done @jbenet @whyrusleeping This bug (missing return) could tie up the client worker and cause operations to come to a halt. --- exchange/bitswap/network/ipfs_impl.go | 1 + 1 file changed, 1 insertion(+) diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index 0950ed0b8..841688162 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -107,6 +107,7 @@ func (bsnet *impl) FindProvidersAsync(ctx context.Context, k util.Key, max int) bsnet.host.Peerstore().AddAddresses(info.ID, info.Addrs) select { case <-ctx.Done(): + return case out <- info.ID: } } From a8127a28e12e459571cfc4725cddfd2e96a85535 Mon Sep 17 00:00:00 2001 From: Brian Tiger Chow Date: Sun, 4 Jan 2015 11:40:43 -0500 Subject: [PATCH 14/28] fix: force clean test results directory Without `-f`, `make clean` fails on machines that don't have the dir. cc @jbenet --- test/Makefile | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/test/Makefile b/test/Makefile index 4a62d6e1a..8cd48719d 100644 --- a/test/Makefile +++ b/test/Makefile @@ -14,7 +14,7 @@ all: clean deps $(T) aggregate clean: @echo "*** $@ ***" - -rm -r test-results + -rm -rf test-results $(T): @echo "*** $@ ***" From b599e28b88f84645644ea06ef7e154ce69287c64 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sun, 4 Jan 2015 16:05:15 -0800 Subject: [PATCH 15/28] core/mock: use mock routing this fixes a failing ipns test which didnt have a "working" routing system --- core/mock.go | 24 +++++++++--------------- 1 file changed, 9 insertions(+), 15 deletions(-) diff --git a/core/mock.go b/core/mock.go index b448b2517..e4c15540b 100644 --- a/core/mock.go +++ b/core/mock.go @@ -10,11 +10,10 @@ import ( "github.com/jbenet/go-ipfs/exchange/offline" mdag "github.com/jbenet/go-ipfs/merkledag" nsys "github.com/jbenet/go-ipfs/namesys" - ci "github.com/jbenet/go-ipfs/p2p/crypto" mocknet "github.com/jbenet/go-ipfs/p2p/net/mock" peer "github.com/jbenet/go-ipfs/p2p/peer" path "github.com/jbenet/go-ipfs/path" - dht "github.com/jbenet/go-ipfs/routing/dht" + mockrouting "github.com/jbenet/go-ipfs/routing/mock" ds2 "github.com/jbenet/go-ipfs/util/datastore2" testutil "github.com/jbenet/go-ipfs/util/testutil" ) @@ -29,23 +28,19 @@ func NewMockNode() (*IpfsNode, error) { nd := new(IpfsNode) // Generate Identity - sk, pk, err := ci.GenerateKeyPair(ci.RSA, 1024) - if err != nil { - return nil, err - } - - p, err := peer.IDFromPublicKey(pk) + ident, err := testutil.RandIdentity() if err != nil { return nil, err } + p := ident.ID() nd.Identity = p - nd.PrivateKey = sk + nd.PrivateKey = ident.PrivateKey() nd.Peerstore = peer.NewPeerstore() - nd.Peerstore.AddPrivKey(p, sk) - nd.Peerstore.AddPubKey(p, pk) + nd.Peerstore.AddPrivKey(p, ident.PrivateKey()) + nd.Peerstore.AddPubKey(p, ident.PublicKey()) - nd.PeerHost, err = mocknet.New(ctx).AddPeer(sk, testutil.RandLocalTCPAddress()) // effectively offline + nd.PeerHost, err = mocknet.New(ctx).AddPeer(ident.PrivateKey(), ident.Address()) // effectively offline if err != nil { return nil, err } @@ -55,8 +50,7 @@ func NewMockNode() (*IpfsNode, error) { nd.Datastore = ds2.CloserWrap(syncds.MutexWrap(dstore)) // Routing - dht := dht.NewDHT(ctx, nd.PeerHost, nd.Datastore) - nd.Routing = dht + nd.Routing = mockrouting.NewServer().Client(ident) // Bitswap bstore := blockstore.NewBlockstore(nd.Datastore) @@ -68,7 +62,7 @@ func NewMockNode() (*IpfsNode, error) { nd.DAG = mdag.NewDAGService(bserv) // Namespace resolver - nd.Namesys = nsys.NewNameSystem(dht) + nd.Namesys = nsys.NewNameSystem(nd.Routing) // Path resolver nd.Resolver = &path.Resolver{DAG: nd.DAG} From b4be7c5986864b6cfdc3f8bb9585285701700252 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sun, 4 Jan 2015 18:18:16 -0800 Subject: [PATCH 16/28] dht: extend duration of TestGetFailures TestGetFailures may just be operating very slowly, instead of completely failing. Right now it gets caught on travis often. not sure if its actually wrong. --- routing/dht/ext_test.go | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go index 2be8127c7..3da0dfa3a 100644 --- a/routing/dht/ext_test.go +++ b/routing/dht/ext_test.go @@ -47,7 +47,7 @@ func TestGetFailures(t *testing.T) { // This one should time out // u.POut("Timout Test\n") - ctx1, _ := context.WithTimeout(context.Background(), time.Second) + ctx1, _ := context.WithTimeout(context.Background(), 200*time.Millisecond) if _, err := d.GetValue(ctx1, u.Key("test")); err != nil { if err != context.DeadlineExceeded { t.Fatal("Got different error than we expected", err) @@ -78,8 +78,12 @@ func TestGetFailures(t *testing.T) { } }) - // This one should fail with NotFound - ctx2, _ := context.WithTimeout(context.Background(), 3*time.Second) + // This one should fail with NotFound. + // long context timeout to ensure we dont end too early. + // the dht should be exhausting its query and returning not found. + // (was 3 seconds before which should be _plenty_ of time, but maybe + // travis machines really have a hard time...) + ctx2, _ := context.WithTimeout(context.Background(), 20*time.Second) _, err = d.GetValue(ctx2, u.Key("test")) if err != nil { if err != routing.ErrNotFound { @@ -187,7 +191,8 @@ func TestNotFound(t *testing.T) { }) } - ctx, _ = context.WithTimeout(ctx, time.Second*5) + // long timeout to ensure timing is not at play. + ctx, _ = context.WithTimeout(ctx, time.Second*20) v, err := d.GetValue(ctx, u.Key("hello")) log.Debugf("get value got %v", v) if err != nil { From ce367ee76ed585cc216791848c8aeab9fa82f503 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Mon, 5 Jan 2015 04:36:27 -0800 Subject: [PATCH 17/28] ext_test: bitten by mocknet ordering mocknet indeterminism screwed this test up. that's twice it's bitten us. let's not let it do it a third time. cc @briantigerchow omg. --- routing/dht/ext_test.go | 15 ++++++--------- 1 file changed, 6 insertions(+), 9 deletions(-) diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go index 3da0dfa3a..77ea54c96 100644 --- a/routing/dht/ext_test.go +++ b/routing/dht/ext_test.go @@ -32,11 +32,10 @@ func TestGetFailures(t *testing.T) { t.Fatal(err) } hosts := mn.Hosts() - peers := mn.Peers() tsds := dssync.MutexWrap(ds.NewMapDatastore()) d := NewDHT(ctx, hosts[0], tsds) - d.Update(ctx, peers[1]) + d.Update(ctx, hosts[1].ID()) // u.POut("NotFound Test\n") // Reply with failures to every message @@ -147,12 +146,11 @@ func TestNotFound(t *testing.T) { t.Fatal(err) } hosts := mn.Hosts() - peers := mn.Peers() tsds := dssync.MutexWrap(ds.NewMapDatastore()) d := NewDHT(ctx, hosts[0], tsds) - for _, p := range peers { - d.Update(ctx, p) + for _, p := range hosts { + d.Update(ctx, p.ID()) } // Reply with random peers to every message @@ -175,7 +173,7 @@ func TestNotFound(t *testing.T) { ps := []peer.PeerInfo{} for i := 0; i < 7; i++ { - p := peers[rand.Intn(len(peers))] + p := hosts[rand.Intn(len(hosts))].ID() pi := host.Peerstore().PeerInfo(p) ps = append(ps, pi) } @@ -220,13 +218,12 @@ func TestLessThanKResponses(t *testing.T) { t.Fatal(err) } hosts := mn.Hosts() - peers := mn.Peers() tsds := dssync.MutexWrap(ds.NewMapDatastore()) d := NewDHT(ctx, hosts[0], tsds) for i := 1; i < 5; i++ { - d.Update(ctx, peers[i]) + d.Update(ctx, hosts[i].ID()) } // Reply with random peers to every message @@ -245,7 +242,7 @@ func TestLessThanKResponses(t *testing.T) { switch pmes.GetType() { case pb.Message_GET_VALUE: - pi := host.Peerstore().PeerInfo(peers[1]) + pi := host.Peerstore().PeerInfo(hosts[1].ID()) resp := &pb.Message{ Type: pmes.Type, CloserPeers: pb.PeerInfosToPBPeers(d.host.Network(), []peer.PeerInfo{pi}), From 9d0736bc3b546f2fcf63ada12bb0ced14c288f8e Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Mon, 5 Jan 2015 04:30:22 -0800 Subject: [PATCH 18/28] peer+mocknet: sorting for determinism. --- p2p/net/mock/mock_net.go | 19 +++++++++++++++++++ p2p/peer/peer.go | 7 +++++++ 2 files changed, 26 insertions(+) diff --git a/p2p/net/mock/mock_net.go b/p2p/net/mock/mock_net.go index d380e5173..1bb9ddf5a 100644 --- a/p2p/net/mock/mock_net.go +++ b/p2p/net/mock/mock_net.go @@ -2,6 +2,7 @@ package mocknet import ( "fmt" + "sort" "sync" "time" @@ -90,6 +91,7 @@ func (mn *mocknet) Peers() []peer.ID { for _, n := range mn.nets { cp = append(cp, n.peer) } + sort.Sort(peer.IDSlice(cp)) return cp } @@ -115,6 +117,8 @@ func (mn *mocknet) Hosts() []host.Host { for _, h := range mn.hosts { cp = append(cp, h) } + + sort.Sort(hostSlice(cp)) return cp } @@ -126,6 +130,7 @@ func (mn *mocknet) Nets() []inet.Network { for _, n := range mn.nets { cp = append(cp, n) } + sort.Sort(netSlice(cp)) return cp } @@ -339,3 +344,17 @@ func (mn *mocknet) LinkDefaults() LinkOptions { defer mn.RUnlock() return mn.linkDefaults } + +// netSlice for sorting by peer +type netSlice []inet.Network + +func (es netSlice) Len() int { return len(es) } +func (es netSlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] } +func (es netSlice) Less(i, j int) bool { return string(es[i].LocalPeer()) < string(es[j].LocalPeer()) } + +// hostSlice for sorting by peer +type hostSlice []host.Host + +func (es hostSlice) Len() int { return len(es) } +func (es hostSlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] } +func (es hostSlice) Less(i, j int) bool { return string(es[i].ID()) < string(es[j].ID()) } diff --git a/p2p/peer/peer.go b/p2p/peer/peer.go index fa4e448d6..c29e23283 100644 --- a/p2p/peer/peer.go +++ b/p2p/peer/peer.go @@ -130,3 +130,10 @@ type PeerInfo struct { ID ID Addrs []ma.Multiaddr } + +// IDSlice for sorting peers +type IDSlice []ID + +func (es IDSlice) Len() int { return len(es) } +func (es IDSlice) Swap(i, j int) { es[i], es[j] = es[j], es[i] } +func (es IDSlice) Less(i, j int) bool { return string(es[i]) < string(es[j]) } From 032b35b2ac070663736de52e5c1f0fe1ecba6efc Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Mon, 5 Jan 2015 04:48:50 -0800 Subject: [PATCH 19/28] dht: key without record validator func This is causing test failures because tests don't usually have "/-/-" format. we can decide whether or not to allow keys without validators, but for now removing. cc @whyrusleeping --- routing/dht/records.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/routing/dht/records.go b/routing/dht/records.go index 0791f80a3..083eeb26e 100644 --- a/routing/dht/records.go +++ b/routing/dht/records.go @@ -191,8 +191,8 @@ func (dht *IpfsDHT) verifyRecord(r *pb.Record, pk ci.PubKey) error { // Now, check validity func parts := strings.Split(r.GetKey(), "/") if len(parts) < 3 { - log.Errorf("Record had bad key: %s", u.Key(r.GetKey())) - return ErrBadRecord + log.Infof("Record key does not have validator: %s", u.Key(r.GetKey())) + return nil } fnc, ok := dht.Validators[parts[1]] From f6def11eb3f5c6388fe5a030645f0c64d20dffb2 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sun, 4 Jan 2015 13:24:14 -0800 Subject: [PATCH 20/28] `make test` now runs expensive tests. We use make test as the measure of correctness. This laxity has let bugs creep into several systems. This commit changes our target to always run expensive tests, unless one specifically runs `make test_short` (we would do well to remove most if not all timing-- that's usually what makes tests take a long time.) --- Makefile | 11 ++++++++--- 1 file changed, 8 insertions(+), 3 deletions(-) diff --git a/Makefile b/Makefile index a51da1a15..1376710df 100644 --- a/Makefile +++ b/Makefile @@ -13,20 +13,25 @@ vendor: godep install: cd cmd/ipfs && go install -test: test_go test_sharness +############################################################## +# tests targets + +test: test_expensive + +test_short: test_go_short test_sharness_short test_expensive: test_go_expensive test_sharness_expensive test_docker: cd dockertest/ && make -test_go: +test_go_short: go test -test.short ./... test_go_expensive: go test ./... -test_sharness: +test_sharness_short: cd test/ && make test_sharness_expensive: From aeb2e07198674b56ae892d8873702a55bee7bc77 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sun, 4 Jan 2015 13:56:38 -0800 Subject: [PATCH 21/28] bitswap: remove DialPeer from interface Bitswap doesn't usually care about dialing. the underlying network adapter can make sure of that. --- exchange/bitswap/bitswap.go | 5 ----- exchange/bitswap/network/interface.go | 3 --- exchange/bitswap/network/ipfs_impl.go | 16 ++++++++++++---- exchange/bitswap/testnet/virtual.go | 9 --------- 4 files changed, 12 insertions(+), 21 deletions(-) diff --git a/exchange/bitswap/bitswap.go b/exchange/bitswap/bitswap.go index bdc17ff96..a883e4b03 100644 --- a/exchange/bitswap/bitswap.go +++ b/exchange/bitswap/bitswap.go @@ -385,11 +385,6 @@ func (bs *bitswap) ReceiveError(err error) { // send strives to ensure that accounting is always performed when a message is // sent func (bs *bitswap) send(ctx context.Context, p peer.ID, m bsmsg.BitSwapMessage) error { - log.Event(ctx, "DialPeer", p) - err := bs.network.DialPeer(ctx, p) - if err != nil { - return errors.Wrap(err) - } if err := bs.network.SendMessage(ctx, p, m); err != nil { return errors.Wrap(err) } diff --git a/exchange/bitswap/network/interface.go b/exchange/bitswap/network/interface.go index 7c34a352b..18bb1df83 100644 --- a/exchange/bitswap/network/interface.go +++ b/exchange/bitswap/network/interface.go @@ -14,9 +14,6 @@ var ProtocolBitswap protocol.ID = "/ipfs/bitswap" // BitSwapNetwork provides network connectivity for BitSwap sessions type BitSwapNetwork interface { - // DialPeer ensures there is a connection to peer. - DialPeer(context.Context, peer.ID) error - // SendMessage sends a BitSwap message to a peer. SendMessage( context.Context, diff --git a/exchange/bitswap/network/ipfs_impl.go b/exchange/bitswap/network/ipfs_impl.go index 841688162..ea98cc87f 100644 --- a/exchange/bitswap/network/ipfs_impl.go +++ b/exchange/bitswap/network/ipfs_impl.go @@ -34,10 +34,6 @@ type impl struct { receiver Receiver } -func (bsnet *impl) DialPeer(ctx context.Context, p peer.ID) error { - return bsnet.host.Connect(ctx, peer.PeerInfo{ID: p}) -} - func (bsnet *impl) SendMessage( ctx context.Context, p peer.ID, @@ -45,6 +41,12 @@ func (bsnet *impl) SendMessage( log := log.Prefix("bitswap net SendMessage to %s", p) + // ensure we're connected + //TODO(jbenet) move this into host.NewStream? + if err := bsnet.host.Connect(ctx, peer.PeerInfo{ID: p}); err != nil { + return err + } + log.Debug("opening stream") s, err := bsnet.host.NewStream(ProtocolBitswap, p) if err != nil { @@ -69,6 +71,12 @@ func (bsnet *impl) SendRequest( log := log.Prefix("bitswap net SendRequest to %s", p) + // ensure we're connected + //TODO(jbenet) move this into host.NewStream? + if err := bsnet.host.Connect(ctx, peer.PeerInfo{ID: p}); err != nil { + return nil, err + } + log.Debug("opening stream") s, err := bsnet.host.NewStream(ProtocolBitswap, p) if err != nil { diff --git a/exchange/bitswap/testnet/virtual.go b/exchange/bitswap/testnet/virtual.go index 9426176a2..639bb00d3 100644 --- a/exchange/bitswap/testnet/virtual.go +++ b/exchange/bitswap/testnet/virtual.go @@ -2,7 +2,6 @@ package bitswap import ( "errors" - "fmt" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" bsmsg "github.com/jbenet/go-ipfs/exchange/bitswap/message" @@ -178,14 +177,6 @@ func (nc *networkClient) Provide(ctx context.Context, k util.Key) error { return nc.routing.Provide(ctx, k) } -func (nc *networkClient) DialPeer(ctx context.Context, p peer.ID) error { - // no need to do anything because dialing isn't a thing in this test net. - if !nc.network.HasPeer(p) { - return fmt.Errorf("Peer not in network: %s", p) - } - return nil -} - func (nc *networkClient) SetDelegate(r bsnet.Receiver) { nc.Receiver = r } From 10072c149dbdf6aafc345b9232e67da457074aab Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sun, 4 Jan 2015 14:06:33 -0800 Subject: [PATCH 22/28] bitswap: log superfluous messages --- exchange/bitswap/decision/engine.go | 7 ++++++- 1 file changed, 6 insertions(+), 1 deletion(-) diff --git a/exchange/bitswap/decision/engine.go b/exchange/bitswap/decision/engine.go index b2e20bf8e..e4b2ab832 100644 --- a/exchange/bitswap/decision/engine.go +++ b/exchange/bitswap/decision/engine.go @@ -149,9 +149,13 @@ func (e *Engine) Peers() []peer.ID { // arguments. func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { log := log.Prefix("bitswap.Engine.MessageReceived(%s)", p) - log.Debugf("enter") + log.Debugf("enter. %d entries %d blocks", len(m.Wantlist()), len(m.Blocks())) defer log.Debugf("exit") + if len(m.Wantlist()) == 0 && len(m.Blocks()) == 0 { + log.Info("superfluous message") + } + newWorkExists := false defer func() { if newWorkExists { @@ -166,6 +170,7 @@ func (e *Engine) MessageReceived(p peer.ID, m bsmsg.BitSwapMessage) error { if m.Full() { l.wantList = wl.New() } + for _, entry := range m.Wantlist() { if entry.Cancel { log.Debug("cancel", entry.Key) From 09a2e1f64cc8a70f8affe00e3703047194f96776 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Sun, 4 Jan 2015 14:06:53 -0800 Subject: [PATCH 23/28] testutil: obvious names for seeded key pairs --- namesys/resolve_test.go | 2 +- p2p/crypto/key_test.go | 4 ++-- p2p/net/mock/mock_net.go | 2 +- p2p/net/mock/mock_test.go | 8 ++++---- p2p/peer/peer_test.go | 2 +- util/testutil/gen.go | 6 +++--- 6 files changed, 12 insertions(+), 12 deletions(-) diff --git a/namesys/resolve_test.go b/namesys/resolve_test.go index 35851fc32..8e3214dfe 100644 --- a/namesys/resolve_test.go +++ b/namesys/resolve_test.go @@ -14,7 +14,7 @@ func TestRoutingResolve(t *testing.T) { resolver := NewRoutingResolver(d) publisher := NewRoutingPublisher(d) - privk, pubk, err := testutil.RandKeyPair(512) + privk, pubk, err := testutil.RandTestKeyPair(512) if err != nil { t.Fatal(err) } diff --git a/p2p/crypto/key_test.go b/p2p/crypto/key_test.go index fa2ad7799..f4a9599f8 100644 --- a/p2p/crypto/key_test.go +++ b/p2p/crypto/key_test.go @@ -9,7 +9,7 @@ import ( ) func TestRsaKeys(t *testing.T) { - sk, pk, err := tu.RandKeyPair(512) + sk, pk, err := tu.RandTestKeyPair(512) if err != nil { t.Fatal(err) } @@ -93,7 +93,7 @@ func testKeyEquals(t *testing.T, k Key) { t.Fatal("Key not equal to key with same bytes.") } - sk, pk, err := tu.RandKeyPair(512) + sk, pk, err := tu.RandTestKeyPair(512) if err != nil { t.Fatal(err) } diff --git a/p2p/net/mock/mock_net.go b/p2p/net/mock/mock_net.go index 1bb9ddf5a..664ef1a2b 100644 --- a/p2p/net/mock/mock_net.go +++ b/p2p/net/mock/mock_net.go @@ -45,7 +45,7 @@ func New(ctx context.Context) Mocknet { } func (mn *mocknet) GenPeer() (host.Host, error) { - sk, _, err := testutil.SeededKeyPair(time.Now().UnixNano()) + sk, _, err := testutil.SeededTestKeyPair(time.Now().UnixNano()) if err != nil { return nil, err } diff --git a/p2p/net/mock/mock_test.go b/p2p/net/mock/mock_test.go index 268b35f4d..1b1ca7ddb 100644 --- a/p2p/net/mock/mock_test.go +++ b/p2p/net/mock/mock_test.go @@ -26,15 +26,15 @@ func randPeer(t *testing.T) peer.ID { func TestNetworkSetup(t *testing.T) { ctx := context.Background() - sk1, _, err := testutil.RandKeyPair(512) + sk1, _, err := testutil.RandTestKeyPair(512) if err != nil { t.Fatal(t) } - sk2, _, err := testutil.RandKeyPair(512) + sk2, _, err := testutil.RandTestKeyPair(512) if err != nil { t.Fatal(t) } - sk3, _, err := testutil.RandKeyPair(512) + sk3, _, err := testutil.RandTestKeyPair(512) if err != nil { t.Fatal(t) } @@ -398,7 +398,7 @@ func TestAdding(t *testing.T) { peers := []peer.ID{} for i := 0; i < 3; i++ { - sk, _, err := testutil.RandKeyPair(512) + sk, _, err := testutil.RandTestKeyPair(512) if err != nil { t.Fatal(err) } diff --git a/p2p/peer/peer_test.go b/p2p/peer/peer_test.go index b19d0faef..d5bd793fd 100644 --- a/p2p/peer/peer_test.go +++ b/p2p/peer/peer_test.go @@ -41,7 +41,7 @@ type keyset struct { func (ks *keyset) generate() error { var err error - ks.sk, ks.pk, err = tu.RandKeyPair(512) + ks.sk, ks.pk, err = tu.RandTestKeyPair(512) if err != nil { return err } diff --git a/util/testutil/gen.go b/util/testutil/gen.go index 93bca120d..2097c6117 100644 --- a/util/testutil/gen.go +++ b/util/testutil/gen.go @@ -29,11 +29,11 @@ func init() { ZeroLocalTCPAddress = maddr } -func RandKeyPair(bits int) (ci.PrivKey, ci.PubKey, error) { +func RandTestKeyPair(bits int) (ci.PrivKey, ci.PubKey, error) { return ci.GenerateKeyPairWithReader(ci.RSA, bits, u.NewTimeSeededRand()) } -func SeededKeyPair(seed int64) (ci.PrivKey, ci.PubKey, error) { +func SeededTestKeyPair(seed int64) (ci.PrivKey, ci.PubKey, error) { return ci.GenerateKeyPairWithReader(ci.RSA, 512, u.NewSeededRand(seed)) } @@ -142,7 +142,7 @@ func RandPeerNetParams() (*PeerNetParams, error) { var p PeerNetParams var err error p.Addr = ZeroLocalTCPAddress - p.PrivKey, p.PubKey, err = RandKeyPair(512) + p.PrivKey, p.PubKey, err = RandTestKeyPair(512) if err != nil { return nil, err } From f25dfb68b98a3fe78d9184b30414daa07ab0ed12 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Mon, 5 Jan 2015 04:33:25 -0800 Subject: [PATCH 24/28] peer/queue: close fix, and logging --- p2p/peer/queue/sync.go | 19 ++++++++++++++++--- 1 file changed, 16 insertions(+), 3 deletions(-) diff --git a/p2p/peer/queue/sync.go b/p2p/peer/queue/sync.go index 3f75cd0cf..3d7aa68ad 100644 --- a/p2p/peer/queue/sync.go +++ b/p2p/peer/queue/sync.go @@ -4,8 +4,11 @@ import ( context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" peer "github.com/jbenet/go-ipfs/p2p/peer" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" ) +var log = eventlog.Logger("peerqueue") + // ChanQueue makes any PeerQueue synchronizable through channels. type ChanQueue struct { Queue PeerQueue @@ -21,6 +24,7 @@ func NewChanQueue(ctx context.Context, pq PeerQueue) *ChanQueue { } func (cq *ChanQueue) process(ctx context.Context) { + log := log.Prefix("", cq) // construct the channels here to be able to use them bidirectionally enqChan := make(chan peer.ID) @@ -30,6 +34,8 @@ func (cq *ChanQueue) process(ctx context.Context) { cq.DeqChan = deqChan go func() { + log.Debug("processing") + defer log.Debug("closed") defer close(deqChan) var next peer.ID @@ -38,11 +44,13 @@ func (cq *ChanQueue) process(ctx context.Context) { for { if cq.Queue.Len() == 0 { + // log.Debug("wait for enqueue") select { case next, more = <-enqChan: if !more { return } + // log.Debug("got", next) case <-ctx.Done(): return @@ -50,19 +58,24 @@ func (cq *ChanQueue) process(ctx context.Context) { } else { next = cq.Queue.Dequeue() + // log.Debug("peek", next) } select { case item, more = <-enqChan: if !more { - return + if cq.Queue.Len() > 0 { + return // we're done done. + } + enqChan = nil // closed, so no use. } - + // log.Debug("got", item) cq.Queue.Enqueue(item) - cq.Queue.Enqueue(next) + cq.Queue.Enqueue(next) // order may have changed. next = "" case deqChan <- next: + // log.Debug("dequeued", next) next = "" case <-ctx.Done(): From 172801712e0eae07828797b3fd061dd5bbe11326 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Mon, 5 Jan 2015 04:35:54 -0800 Subject: [PATCH 25/28] dht: even more logging. --- routing/dht/dht.go | 5 +++ routing/dht/query.go | 73 ++++++++++++++++++++++-------------------- routing/dht/routing.go | 46 +++++++++++++++----------- 3 files changed, 72 insertions(+), 52 deletions(-) diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 2a576629a..17d300d87 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -89,6 +89,11 @@ func (dht *IpfsDHT) LocalPeer() peer.ID { return dht.self } +// log returns the dht's logger +func (dht *IpfsDHT) log() eventlog.EventLogger { + return log.Prefix("dht(%s)", dht.self) +} + // Connect to a new peer at the given address, ping and add to the routing table func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.ID) error { // TODO: change interface to accept a PeerInfo as well. diff --git a/routing/dht/query.go b/routing/dht/query.go index 0056bee1d..44dc49926 100644 --- a/routing/dht/query.go +++ b/routing/dht/query.go @@ -7,6 +7,7 @@ import ( queue "github.com/jbenet/go-ipfs/p2p/peer/queue" "github.com/jbenet/go-ipfs/routing" u "github.com/jbenet/go-ipfs/util" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" pset "github.com/jbenet/go-ipfs/util/peerset" todoctr "github.com/jbenet/go-ipfs/util/todocounter" @@ -55,32 +56,18 @@ func (q *dhtQuery) Run(ctx context.Context, peers []peer.ID) (*dhtQueryResult, e } type dhtQueryRunner struct { + query *dhtQuery // query to run + peersSeen *pset.PeerSet // all peers queried. prevent querying same peer 2x + peersToQuery *queue.ChanQueue // peers remaining to be queried + peersRemaining todoctr.Counter // peersToQuery + currently processing - // the query to run - query *dhtQuery + result *dhtQueryResult // query result + errs []error // result errors. maybe should be a map[peer.ID]error - // peersToQuery is a list of peers remaining to query - peersToQuery *queue.ChanQueue + rateLimit chan struct{} // processing semaphore + log eventlog.EventLogger - // peersSeen are all the peers queried. used to prevent querying same peer 2x - peersSeen *pset.PeerSet - - // rateLimit is a channel used to rate limit our processing (semaphore) - rateLimit chan struct{} - - // peersRemaining is a counter of peers remaining (toQuery + processing) - peersRemaining todoctr.Counter - - // context group cg ctxgroup.ContextGroup - - // result - result *dhtQueryResult - - // result errors - errs []error - - // lock for concurrent access to fields sync.RWMutex } @@ -96,6 +83,11 @@ func newQueryRunner(ctx context.Context, q *dhtQuery) *dhtQueryRunner { } func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) { + log := log.Prefix("dht(%s).Query(%s).Run(%d)", r.query.dht.self, r.query.key, len(peers)) + r.log = log + log.Debug("enter") + defer log.Debug("end") + log.Debugf("Run query with %d peers.", len(peers)) if len(peers) == 0 { log.Warning("Running query with no peers!") @@ -115,6 +107,7 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) { // go do this thing. // do it as a child func to make sure Run exits // ONLY AFTER spawn workers has exited. + log.Debugf("go spawn workers") r.cg.AddChildFunc(r.spawnWorkers) // so workers are working. @@ -124,41 +117,45 @@ func (r *dhtQueryRunner) Run(peers []peer.ID) (*dhtQueryResult, error) { select { case <-r.peersRemaining.Done(): + log.Debug("all peers ended") r.cg.Close() r.RLock() defer r.RUnlock() if len(r.errs) > 0 { - err = r.errs[0] + err = r.errs[0] // take the first? } case <-r.cg.Closed(): + log.Debug("r.cg.Closed()") + r.RLock() defer r.RUnlock() err = r.cg.Context().Err() // collect the error. } if r.result != nil && r.result.success { + log.Debug("success: %s", r.result) return r.result, nil } + log.Debug("failure: %s", err) return nil, err } func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID) { // if new peer is ourselves... if next == r.query.dht.self { + r.log.Debug("addPeerToQuery skip self") return } if !r.peersSeen.TryAdd(next) { - log.Debug("query peer was already seen") + r.log.Debugf("addPeerToQuery skip seen %s", next) return } - log.Debugf("adding peer to query: %v", next) - - // do this after unlocking to prevent possible deadlocks. + r.log.Debugf("addPeerToQuery adding %s", next) r.peersRemaining.Increment(1) select { case r.peersToQuery.EnqChan <- next: @@ -167,6 +164,10 @@ func (r *dhtQueryRunner) addPeerToQuery(ctx context.Context, next peer.ID) { } func (r *dhtQueryRunner) spawnWorkers(parent ctxgroup.ContextGroup) { + log := r.log.Prefix("spawnWorkers") + log.Debugf("begin") + defer log.Debugf("end") + for { select { @@ -192,7 +193,9 @@ func (r *dhtQueryRunner) spawnWorkers(parent ctxgroup.ContextGroup) { } func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) { - log.Debugf("spawned worker for: %v", p) + log := r.log.Prefix("queryPeer(%s)", p) + log.Debugf("spawned") + defer log.Debugf("finished") // make sure we rate limit concurrency. select { @@ -203,34 +206,36 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) { } // ok let's do this! - log.Debugf("running worker for: %v", p) + log.Debugf("running") // make sure we do this when we exit defer func() { // signal we're done proccessing peer p - log.Debugf("completing worker for: %v", p) + log.Debugf("completed") r.peersRemaining.Decrement(1) r.rateLimit <- struct{}{} }() // make sure we're connected to the peer. if conns := r.query.dht.host.Network().ConnsToPeer(p); len(conns) == 0 { - log.Infof("worker for: %v -- not connected. dial start", p) + log.Infof("not connected. dialing.") pi := peer.PeerInfo{ID: p} if err := r.query.dht.host.Connect(cg.Context(), pi); err != nil { - log.Debugf("ERROR worker for: %v -- err connecting: %v", p, err) + log.Debugf("Error connecting: %s", err) r.Lock() r.errs = append(r.errs, err) r.Unlock() return } - log.Infof("worker for: %v -- not connected. dial success!", p) + log.Debugf("connected. dial success.") } // finally, run the query against this peer + log.Debugf("query running") res, err := r.query.qfunc(cg.Context(), p) + log.Debugf("query finished") if err != nil { log.Debugf("ERROR worker for: %v %v", p, err) @@ -239,7 +244,7 @@ func (r *dhtQueryRunner) queryPeer(cg ctxgroup.ContextGroup, p peer.ID) { r.Unlock() } else if res.success { - log.Debugf("SUCCESS worker for: %v", p, res) + log.Debugf("SUCCESS worker for: %v %s", p, res) r.Lock() r.result = res r.Unlock() diff --git a/routing/dht/routing.go b/routing/dht/routing.go index ec414de13..5978a9a80 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -1,7 +1,6 @@ package dht import ( - "fmt" "math" "sync" @@ -66,25 +65,29 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key u.Key, value []byte) error // If the search does not succeed, a multiaddr string of a closer peer is // returned along with util.ErrSearchIncomplete func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { - log.Debugf("Get Value [%s]", key) + log := dht.log().Prefix("GetValue(%s)", key) + log.Debugf("start") + defer log.Debugf("end") // If we have it local, dont bother doing an RPC! val, err := dht.getLocal(key) if err == nil { - log.Debug("Got value locally!") + log.Debug("have it locally") return val, nil } // get closest peers in the routing table + rtp := dht.routingTable.ListPeers() + log.Debugf("peers in rt: %s", len(rtp), rtp) + closest := dht.routingTable.NearestPeers(kb.ConvertKey(key), PoolSize) if closest == nil || len(closest) == 0 { - log.Warning("Got no peers back from routing table!") + log.Warning("No peers from routing table!") return nil, errors.Wrap(kb.ErrLookupFailure) } // setup the Query query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { - val, peers, err := dht.getValueOrPeers(ctx, p, key) if err != nil { return nil, err @@ -117,9 +120,13 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key u.Key) ([]byte, error) { // Provide makes this node announce that it can provide a value for the given key func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { - + log := dht.log().Prefix("Provide(%s)", key) + log.Debugf("start", key) log.Event(ctx, "provideBegin", &key) + defer log.Debugf("end", key) defer log.Event(ctx, "provideEnd", &key) + + // add self locally dht.providers.AddProvider(key, dht.self) peers, err := dht.getClosestPeers(ctx, key) @@ -132,6 +139,7 @@ func (dht *IpfsDHT) Provide(ctx context.Context, key u.Key) error { wg.Add(1) go func(p peer.ID) { defer wg.Done() + log.Debugf("putProvider(%s, %s)", key, p) err := dht.putProvider(ctx, p, string(key)) if err != nil { log.Error(err) @@ -231,9 +239,12 @@ func (dht *IpfsDHT) FindProvidersAsync(ctx context.Context, key u.Key, count int } func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, count int, peerOut chan peer.PeerInfo) { + log := dht.log().Prefix("FindProviders(%s)", key) + defer close(peerOut) defer log.Event(ctx, "findProviders end", &key) - log.Debugf("%s FindProviders %s", dht.self, key) + log.Debug("begin") + defer log.Debug("begin") ps := pset.NewLimited(count) provs := dht.providers.GetProviders(ctx, key) @@ -255,25 +266,24 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co // setup the Query query := dht.newQuery(key, func(ctx context.Context, p peer.ID) (*dhtQueryResult, error) { - - reqDesc := fmt.Sprintf("%s findProviders(%s).Query(%s): ", dht.self, key, p) - log.Debugf("%s begin", reqDesc) - defer log.Debugf("%s end", reqDesc) + log := log.Prefix("Query(%s)", p) + log.Debugf("begin") + defer log.Debugf("end") pmes, err := dht.findProvidersSingle(ctx, p, key) if err != nil { return nil, err } - log.Debugf("%s got %d provider entries", reqDesc, len(pmes.GetProviderPeers())) + log.Debugf("%d provider entries", len(pmes.GetProviderPeers())) provs := pb.PBPeersToPeerInfos(pmes.GetProviderPeers()) - log.Debugf("%s got %d provider entries decoded", reqDesc, len(provs)) + log.Debugf("%d provider entries decoded", len(provs)) // Add unique providers from request, up to 'count' for _, prov := range provs { - log.Debugf("%s got provider: %s", reqDesc, prov) + log.Debugf("got provider: %s", prov) if ps.TryAdd(prov.ID) { - log.Debugf("%s using provider: %s", reqDesc, prov) + log.Debugf("using provider: %s", prov) select { case peerOut <- prov: case <-ctx.Done(): @@ -282,7 +292,7 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co } } if ps.Size() >= count { - log.Debugf("%s got enough providers (%d/%d)", reqDesc, ps.Size(), count) + log.Debugf("got enough providers (%d/%d)", ps.Size(), count) return &dhtQueryResult{success: true}, nil } } @@ -290,14 +300,14 @@ func (dht *IpfsDHT) findProvidersAsyncRoutine(ctx context.Context, key u.Key, co // Give closer peers back to the query to be queried closer := pmes.GetCloserPeers() clpeers := pb.PBPeersToPeerInfos(closer) - log.Debugf("%s got closer peers: %s", reqDesc, clpeers) + log.Debugf("got closer peers: %d %s", len(clpeers), clpeers) return &dhtQueryResult{closerPeers: clpeers}, nil }) peers := dht.routingTable.NearestPeers(kb.ConvertKey(key), AlphaValue) _, err := query.Run(ctx, peers) if err != nil { - log.Errorf("FindProviders Query error: %s", err) + log.Errorf("Query error: %s", err) } } From 41af4f4ea1039a1b17ade71029425ccf9045f8b3 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Mon, 5 Jan 2015 04:48:37 -0800 Subject: [PATCH 26/28] dht test skips --- routing/dht/dht_test.go | 5 +++++ routing/dht/ext_test.go | 2 ++ 2 files changed, 7 insertions(+) diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index 133f7a27c..147970695 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -231,6 +231,7 @@ func TestProvides(t *testing.T) { } func TestBootstrap(t *testing.T) { + // t.Skip("skipping test to debug another") if testing.Short() { t.SkipNow() } @@ -388,6 +389,7 @@ func TestProvidesMany(t *testing.T) { } func TestProvidesAsync(t *testing.T) { + // t.Skip("skipping test to debug another") if testing.Short() { t.SkipNow() } @@ -442,6 +444,7 @@ func TestProvidesAsync(t *testing.T) { } func TestLayeredGet(t *testing.T) { + // t.Skip("skipping test to debug another") if testing.Short() { t.SkipNow() } @@ -482,6 +485,7 @@ func TestLayeredGet(t *testing.T) { } func TestFindPeer(t *testing.T) { + // t.Skip("skipping test to debug another") if testing.Short() { t.SkipNow() } @@ -596,6 +600,7 @@ func testPeerListsMatch(t *testing.T, p1, p2 []peer.ID) { } func TestConnectCollision(t *testing.T) { + // t.Skip("skipping test to debug another") if testing.Short() { t.SkipNow() } diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go index 77ea54c96..6f12c3113 100644 --- a/routing/dht/ext_test.go +++ b/routing/dht/ext_test.go @@ -136,6 +136,7 @@ func TestGetFailures(t *testing.T) { } func TestNotFound(t *testing.T) { + // t.Skip("skipping test to debug another") if testing.Short() { t.SkipNow() } @@ -210,6 +211,7 @@ func TestNotFound(t *testing.T) { // If less than K nodes are in the entire network, it should fail when we make // a GET rpc and nobody has the value func TestLessThanKResponses(t *testing.T) { + // t.Skip("skipping test to debug another") // t.Skip("skipping test because it makes a lot of output") ctx := context.Background() From 1ab9588cba4c87ba6ff238c59cbcabdb2101ff80 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Mon, 5 Jan 2015 05:21:05 -0800 Subject: [PATCH 27/28] p2p/test: bogus key pair for faster tests --- blockservice/mock.go | 2 +- exchange/bitswap/bitswap_test.go | 18 ++-- exchange/bitswap/testutils.go | 6 +- p2p/net/mock/mock_net.go | 4 +- p2p/test/util/key.go | 165 +++++++++++++++++++++++++++++++ 5 files changed, 181 insertions(+), 14 deletions(-) create mode 100644 p2p/test/util/key.go diff --git a/blockservice/mock.go b/blockservice/mock.go index 57432178e..73fcdf2fc 100644 --- a/blockservice/mock.go +++ b/blockservice/mock.go @@ -12,7 +12,7 @@ import ( // Mocks returns |n| connected mock Blockservices func Mocks(t *testing.T, n int) []*BlockService { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(0)) - sg := bitswap.NewSessionGenerator(net) + sg := bitswap.NewTestSessionGenerator(net) instances := sg.Instances(n) diff --git a/exchange/bitswap/bitswap_test.go b/exchange/bitswap/bitswap_test.go index af6cb138c..64d5ead52 100644 --- a/exchange/bitswap/bitswap_test.go +++ b/exchange/bitswap/bitswap_test.go @@ -11,10 +11,10 @@ import ( blocks "github.com/jbenet/go-ipfs/blocks" blocksutil "github.com/jbenet/go-ipfs/blocks/blocksutil" tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" + p2ptestutil "github.com/jbenet/go-ipfs/p2p/test/util" mockrouting "github.com/jbenet/go-ipfs/routing/mock" u "github.com/jbenet/go-ipfs/util" delay "github.com/jbenet/go-ipfs/util/delay" - "github.com/jbenet/go-ipfs/util/testutil" ) // FIXME the tests are really sensitive to the network delay. fix them to work @@ -25,7 +25,7 @@ func TestClose(t *testing.T) { // TODO t.Skip("TODO Bitswap's Close implementation is a WIP") vnet := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - sesgen := NewSessionGenerator(vnet) + sesgen := NewTestSessionGenerator(vnet) defer sesgen.Close() bgen := blocksutil.NewBlockGenerator() @@ -39,7 +39,7 @@ func TestClose(t *testing.T) { func TestGetBlockTimeout(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - g := NewSessionGenerator(net) + g := NewTestSessionGenerator(net) defer g.Close() self := g.Next() @@ -57,11 +57,11 @@ func TestProviderForKeyButNetworkCannotFind(t *testing.T) { // TODO revisit this rs := mockrouting.NewServer() net := tn.VirtualNetwork(rs, delay.Fixed(kNetworkDelay)) - g := NewSessionGenerator(net) + g := NewTestSessionGenerator(net) defer g.Close() block := blocks.NewBlock([]byte("block")) - pinfo := testutil.RandIdentityOrFatal(t) + pinfo := p2ptestutil.RandTestBogusIdentityOrFatal(t) rs.Client(pinfo).Provide(context.Background(), block.Key()) // but not on network solo := g.Next() @@ -81,7 +81,7 @@ func TestGetBlockFromPeerAfterPeerAnnounces(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) block := blocks.NewBlock([]byte("block")) - g := NewSessionGenerator(net) + g := NewTestSessionGenerator(net) defer g.Close() hasBlock := g.Next() @@ -134,7 +134,7 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) { t.SkipNow() } net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - sg := NewSessionGenerator(net) + sg := NewTestSessionGenerator(net) defer sg.Close() bg := blocksutil.NewBlockGenerator() @@ -198,7 +198,7 @@ func TestSendToWantingPeer(t *testing.T) { } net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - sg := NewSessionGenerator(net) + sg := NewTestSessionGenerator(net) defer sg.Close() bg := blocksutil.NewBlockGenerator() @@ -243,7 +243,7 @@ func TestSendToWantingPeer(t *testing.T) { func TestBasicBitswap(t *testing.T) { net := tn.VirtualNetwork(mockrouting.NewServer(), delay.Fixed(kNetworkDelay)) - sg := NewSessionGenerator(net) + sg := NewTestSessionGenerator(net) bg := blocksutil.NewBlockGenerator() t.Log("Test a few nodes trying to get one file with a lot of blocks") diff --git a/exchange/bitswap/testutils.go b/exchange/bitswap/testutils.go index dd96e5f46..95019f297 100644 --- a/exchange/bitswap/testutils.go +++ b/exchange/bitswap/testutils.go @@ -10,12 +10,14 @@ import ( exchange "github.com/jbenet/go-ipfs/exchange" tn "github.com/jbenet/go-ipfs/exchange/bitswap/testnet" peer "github.com/jbenet/go-ipfs/p2p/peer" + p2ptestutil "github.com/jbenet/go-ipfs/p2p/test/util" datastore2 "github.com/jbenet/go-ipfs/util/datastore2" delay "github.com/jbenet/go-ipfs/util/delay" testutil "github.com/jbenet/go-ipfs/util/testutil" ) -func NewSessionGenerator( +// WARNING: this uses RandTestBogusIdentity DO NOT USE for NON TESTS! +func NewTestSessionGenerator( net tn.Network) SessionGenerator { ctx, cancel := context.WithCancel(context.TODO()) return SessionGenerator{ @@ -41,7 +43,7 @@ func (g *SessionGenerator) Close() error { func (g *SessionGenerator) Next() Instance { g.seq++ - p, err := testutil.RandIdentity() + p, err := p2ptestutil.RandTestBogusIdentity() if err != nil { panic("FIXME") // TODO change signature } diff --git a/p2p/net/mock/mock_net.go b/p2p/net/mock/mock_net.go index 664ef1a2b..6146c4e52 100644 --- a/p2p/net/mock/mock_net.go +++ b/p2p/net/mock/mock_net.go @@ -4,13 +4,13 @@ import ( "fmt" "sort" "sync" - "time" ic "github.com/jbenet/go-ipfs/p2p/crypto" host "github.com/jbenet/go-ipfs/p2p/host" bhost "github.com/jbenet/go-ipfs/p2p/host/basic" inet "github.com/jbenet/go-ipfs/p2p/net" peer "github.com/jbenet/go-ipfs/p2p/peer" + p2putil "github.com/jbenet/go-ipfs/p2p/test/util" testutil "github.com/jbenet/go-ipfs/util/testutil" context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" @@ -45,7 +45,7 @@ func New(ctx context.Context) Mocknet { } func (mn *mocknet) GenPeer() (host.Host, error) { - sk, _, err := testutil.SeededTestKeyPair(time.Now().UnixNano()) + sk, err := p2putil.RandTestBogusPrivateKey() if err != nil { return nil, err } diff --git a/p2p/test/util/key.go b/p2p/test/util/key.go new file mode 100644 index 000000000..fc4f3af4e --- /dev/null +++ b/p2p/test/util/key.go @@ -0,0 +1,165 @@ +package testutil + +import ( + "bytes" + "io" + "testing" + + u "github.com/jbenet/go-ipfs/util" + eventlog "github.com/jbenet/go-ipfs/util/eventlog" + testutil "github.com/jbenet/go-ipfs/util/testutil" + + ic "github.com/jbenet/go-ipfs/p2p/crypto" + peer "github.com/jbenet/go-ipfs/p2p/peer" + + ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" +) + +var log = eventlog.Logger("boguskey") + +// TestBogusPrivateKey is a key used for testing (to avoid expensive keygen) +type TestBogusPrivateKey []byte + +// TestBogusPublicKey is a key used for testing (to avoid expensive keygen) +type TestBogusPublicKey []byte + +func (pk TestBogusPublicKey) Verify(data, sig []byte) (bool, error) { + log.Criticalf("TestBogusPublicKey.Verify -- this better be a test!") + return bytes.Equal(data, reverse(sig)), nil +} + +func (pk TestBogusPublicKey) Bytes() ([]byte, error) { + return []byte(pk), nil +} + +func (pk TestBogusPublicKey) Encrypt(b []byte) ([]byte, error) { + log.Criticalf("TestBogusPublicKey.Encrypt -- this better be a test!") + return reverse(b), nil +} + +// Equals checks whether this key is equal to another +func (pk TestBogusPublicKey) Equals(k ic.Key) bool { + return ic.KeyEqual(pk, k) +} + +func (pk TestBogusPublicKey) Hash() ([]byte, error) { + return ic.KeyHash(pk) +} + +func (sk TestBogusPrivateKey) GenSecret() []byte { + return []byte(sk) +} + +func (sk TestBogusPrivateKey) Sign(message []byte) ([]byte, error) { + log.Criticalf("TestBogusPrivateKey.Sign -- this better be a test!") + return reverse(message), nil +} + +func (sk TestBogusPrivateKey) GetPublic() ic.PubKey { + return TestBogusPublicKey(sk) +} + +func (sk TestBogusPrivateKey) Decrypt(b []byte) ([]byte, error) { + log.Criticalf("TestBogusPrivateKey.Decrypt -- this better be a test!") + return reverse(b), nil +} + +func (sk TestBogusPrivateKey) Bytes() ([]byte, error) { + return []byte(sk), nil +} + +// Equals checks whether this key is equal to another +func (sk TestBogusPrivateKey) Equals(k ic.Key) bool { + return ic.KeyEqual(sk, k) +} + +func (sk TestBogusPrivateKey) Hash() ([]byte, error) { + return ic.KeyHash(sk) +} + +func RandTestBogusPrivateKey() (TestBogusPrivateKey, error) { + r := u.NewTimeSeededRand() + k := make([]byte, 5) + if _, err := io.ReadFull(r, k); err != nil { + return nil, err + } + return TestBogusPrivateKey(k), nil +} + +func RandTestBogusPublicKey() (TestBogusPublicKey, error) { + k, err := RandTestBogusPrivateKey() + return TestBogusPublicKey(k), err +} + +func RandTestBogusPrivateKeyOrFatal(t *testing.T) TestBogusPrivateKey { + k, err := RandTestBogusPrivateKey() + if err != nil { + t.Fatal(err) + } + return k +} + +func RandTestBogusPublicKeyOrFatal(t *testing.T) TestBogusPublicKey { + k, err := RandTestBogusPublicKey() + if err != nil { + t.Fatal(err) + } + return k +} + +func RandTestBogusIdentity() (testutil.Identity, error) { + k, err := RandTestBogusPrivateKey() + if err != nil { + return nil, err + } + + id, err := peer.IDFromPrivateKey(k) + if err != nil { + return nil, err + } + + return &identity{ + k: k, + id: id, + a: testutil.RandLocalTCPAddress(), + }, nil +} + +func RandTestBogusIdentityOrFatal(t *testing.T) testutil.Identity { + k, err := RandTestBogusIdentity() + if err != nil { + t.Fatal(err) + } + return k +} + +// identity is a temporary shim to delay binding of PeerNetParams. +type identity struct { + k TestBogusPrivateKey + id peer.ID + a ma.Multiaddr +} + +func (p *identity) ID() peer.ID { + return p.id +} + +func (p *identity) Address() ma.Multiaddr { + return p.a +} + +func (p *identity) PrivateKey() ic.PrivKey { + return p.k +} + +func (p *identity) PublicKey() ic.PubKey { + return p.k.GetPublic() +} + +func reverse(a []byte) []byte { + b := make([]byte, len(a)) + for i := 0; i < len(a); i++ { + b[i] = a[len(a)-1-i] + } + return b +} From 919e9bebee14f69cd5be20ea6b5dc76acafc2665 Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Mon, 5 Jan 2015 05:48:35 -0800 Subject: [PATCH 28/28] ipns test: is fast republish a timing prob? --- fuse/ipns/ipns_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/fuse/ipns/ipns_test.go b/fuse/ipns/ipns_test.go index 5811a0533..dab2a784b 100644 --- a/fuse/ipns/ipns_test.go +++ b/fuse/ipns/ipns_test.go @@ -248,7 +248,7 @@ func TestFastRepublish(t *testing.T) { // get first resolved hash log.Debug("publishing first hash") writeFileData(t, dataA, fname) // random - <-time.After(shortRepublishTimeout * 11 / 10) + <-time.After(shortRepublishTimeout * 2) log.Debug("resolving first hash") resolvedHash, err := node.Namesys.Resolve(pubkeyHash) if err != nil {