diff --git a/core/commands/ping.go b/core/commands/ping.go index 5ee12b867..fa9974428 100644 --- a/core/commands/ping.go +++ b/core/commands/ping.go @@ -138,30 +138,35 @@ func pingPeer(ctx context.Context, n *core.IpfsNode, pid peer.ID, numPings int) outChan <- &PingResult{Text: fmt.Sprintf("PING %s.", pid.Pretty())} + ctx, cancel := context.WithTimeout(ctx, kPingTimeout*time.Duration(numPings)) + defer cancel() + pings, err := n.Ping.Ping(ctx, pid) + if err != nil { + log.Debugf("Ping error: %s", err) + outChan <- &PingResult{Text: fmt.Sprintf("Ping error: %s", err)} + return + } + var done bool var total time.Duration for i := 0; i < numPings && !done; i++ { select { case <-ctx.Done(): done = true - continue - default: - } - - ctx, cancel := context.WithTimeout(ctx, kPingTimeout) - defer cancel() - took, err := n.Routing.Ping(ctx, pid) - if err != nil { - log.Debugf("Ping error: %s", err) - outChan <- &PingResult{Text: fmt.Sprintf("Ping error: %s", err)} break + case t, ok := <-pings: + if !ok { + done = true + break + } + + outChan <- &PingResult{ + Success: true, + Time: t, + } + total += t + time.Sleep(time.Second) } - outChan <- &PingResult{ - Success: true, - Time: took, - } - total += took - time.Sleep(time.Second) } averagems := total.Seconds() * 1000 / float64(numPings) outChan <- &PingResult{ diff --git a/core/core.go b/core/core.go index 6ef28150d..9ceb388dd 100644 --- a/core/core.go +++ b/core/core.go @@ -33,6 +33,7 @@ import ( swarm "github.com/ipfs/go-ipfs/p2p/net/swarm" addrutil "github.com/ipfs/go-ipfs/p2p/net/swarm/addr" peer "github.com/ipfs/go-ipfs/p2p/peer" + ping "github.com/ipfs/go-ipfs/p2p/protocol/ping" eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog" routing "github.com/ipfs/go-ipfs/routing" @@ -102,7 +103,8 @@ type IpfsNode struct { Exchange exchange.Interface // the block exchange + strategy (bitswap) Namesys namesys.NameSystem // the name system, resolves paths to hashes Diagnostics *diag.Diagnostics // the diagnostics service - Reprovider *rp.Reprovider // the value reprovider system + Ping *ping.PingService + Reprovider *rp.Reprovider // the value reprovider system IpnsFs *ipnsfs.Filesystem @@ -324,6 +326,7 @@ func (n *IpfsNode) HandlePeerFound(p peer.PeerInfo) { func (n *IpfsNode) startOnlineServicesWithHost(ctx context.Context, host p2phost.Host, routingOption RoutingOption) error { // setup diagnostics service n.Diagnostics = diag.NewDiagnostics(n.Identity, host) + n.Ping = ping.NewPingService(host) // setup routing service r, err := routingOption(ctx, host, n.Repo.Datastore()) diff --git a/p2p/protocol/ping/ping.go b/p2p/protocol/ping/ping.go new file mode 100644 index 000000000..6a4327d22 --- /dev/null +++ b/p2p/protocol/ping/ping.go @@ -0,0 +1,105 @@ +package ping + +import ( + "bytes" + "errors" + "io" + "time" + + context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + + host "github.com/ipfs/go-ipfs/p2p/host" + inet "github.com/ipfs/go-ipfs/p2p/net" + peer "github.com/ipfs/go-ipfs/p2p/peer" + eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog" + u "github.com/ipfs/go-ipfs/util" +) + +var log = eventlog.Logger("ping") + +const PingSize = 32 + +const ID = "/ipfs/ping" + +type PingService struct { + Host host.Host +} + +func NewPingService(h host.Host) *PingService { + ps := &PingService{h} + h.SetStreamHandler(ID, ps.PingHandler) + return ps +} + +func (p *PingService) PingHandler(s inet.Stream) { + buf := make([]byte, PingSize) + + for { + _, err := io.ReadFull(s, buf) + if err != nil { + log.Debug(err) + return + } + + _, err = s.Write(buf) + if err != nil { + log.Debug(err) + return + } + } +} + +func (ps *PingService) Ping(ctx context.Context, p peer.ID) (<-chan time.Duration, error) { + s, err := ps.Host.NewStream(ID, p) + if err != nil { + return nil, err + } + + out := make(chan time.Duration) + go func() { + defer close(out) + for { + select { + case <-ctx.Done(): + return + default: + t, err := ping(s) + if err != nil { + log.Debugf("ping error: %s", err) + return + } + + select { + case out <- t: + case <-ctx.Done(): + return + } + } + } + }() + + return out, nil +} + +func ping(s inet.Stream) (time.Duration, error) { + buf := make([]byte, PingSize) + u.NewTimeSeededRand().Read(buf) + + before := time.Now() + _, err := s.Write(buf) + if err != nil { + return 0, err + } + + rbuf := make([]byte, PingSize) + _, err = io.ReadFull(s, rbuf) + if err != nil { + return 0, err + } + + if !bytes.Equal(buf, rbuf) { + return 0, errors.New("ping packet was incorrect!") + } + + return time.Now().Sub(before), nil +} diff --git a/p2p/protocol/ping/ping_test.go b/p2p/protocol/ping/ping_test.go new file mode 100644 index 000000000..85bb34219 --- /dev/null +++ b/p2p/protocol/ping/ping_test.go @@ -0,0 +1,51 @@ +package ping + +import ( + "testing" + "time" + + context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" + peer "github.com/ipfs/go-ipfs/p2p/peer" + netutil "github.com/ipfs/go-ipfs/p2p/test/util" +) + +func TestPing(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + h1 := netutil.GenHostSwarm(t, ctx) + h2 := netutil.GenHostSwarm(t, ctx) + + err := h1.Connect(ctx, peer.PeerInfo{ + ID: h2.ID(), + Addrs: h2.Addrs(), + }) + + if err != nil { + t.Fatal(err) + } + + ps1 := NewPingService(h1) + ps2 := NewPingService(h2) + + testPing(t, ps1, h2.ID()) + testPing(t, ps2, h1.ID()) +} + +func testPing(t *testing.T, ps *PingService, p peer.ID) { + pctx, cancel := context.WithCancel(context.Background()) + defer cancel() + ts, err := ps.Ping(pctx, p) + if err != nil { + t.Fatal(err) + } + + for i := 0; i < 5; i++ { + select { + case took := <-ts: + t.Log("ping took: ", took) + case <-time.After(time.Second * 4): + t.Fatal("failed to receive ping") + } + } + +} diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index edfffcebf..1358903a9 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -102,35 +102,6 @@ func bootstrap(t *testing.T, ctx context.Context, dhts []*IpfsDHT) { cancel() } -func TestPing(t *testing.T) { - // t.Skip("skipping test to debug another") - ctx := context.Background() - - dhtA := setupDHT(ctx, t) - dhtB := setupDHT(ctx, t) - - peerA := dhtA.self - peerB := dhtB.self - - defer dhtA.Close() - defer dhtB.Close() - defer dhtA.host.Close() - defer dhtB.host.Close() - - connect(t, ctx, dhtA, dhtB) - - //Test that we can ping the node - ctxT, _ := context.WithTimeout(ctx, 100*time.Millisecond) - if _, err := dhtA.Ping(ctxT, peerB); err != nil { - t.Fatal(err) - } - - ctxT, _ = context.WithTimeout(ctx, 100*time.Millisecond) - if _, err := dhtB.Ping(ctxT, peerA); err != nil { - t.Fatal(err) - } -} - func TestValueGetSet(t *testing.T) { // t.Skip("skipping test to debug another") diff --git a/routing/dht/routing.go b/routing/dht/routing.go index c4dc76ac4..190e50285 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -2,7 +2,6 @@ package dht import ( "sync" - "time" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" key "github.com/ipfs/go-ipfs/blocks/key" @@ -397,16 +396,3 @@ func (dht *IpfsDHT) FindPeersConnectedToPeer(ctx context.Context, id peer.ID) (< return peerchan, nil } - -// Ping a peer, log the time it took -func (dht *IpfsDHT) Ping(ctx context.Context, p peer.ID) (time.Duration, error) { - // Thoughts: maybe this should accept an ID and do a peer lookup? - log.Debugf("ping %s start", p) - before := time.Now() - - pmes := pb.NewMessage(pb.Message_PING, "", 0) - _, err := dht.sendRequest(ctx, p, pmes) - log.Debugf("ping %s end (err = %s)", p, err) - - return time.Now().Sub(before), err -} diff --git a/routing/routing.go b/routing/routing.go index 31be8f3f8..db9b49dcd 100644 --- a/routing/routing.go +++ b/routing/routing.go @@ -3,7 +3,6 @@ package routing import ( "errors" - "time" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" key "github.com/ipfs/go-ipfs/blocks/key" @@ -38,9 +37,6 @@ type IpfsRouting interface { // with relevant addresses. FindPeer(context.Context, peer.ID) (peer.PeerInfo, error) - // Ping a peer, log the time it took - Ping(context.Context, peer.ID) (time.Duration, error) - // Bootstrap allows callers to hint to the routing system to get into a // Boostrapped state Bootstrap(context.Context) error