diff --git a/net/mock/mock.go b/net/mock/mock.go index 08587c594..1661714a9 100644 --- a/net/mock/mock.go +++ b/net/mock/mock.go @@ -102,6 +102,10 @@ func (c *Conn) removeStream(s *Stream) { func (c *Conn) NewStreamWithProtocol(pr inet.ProtocolID, p peer.Peer) (inet.Stream, error) { + if _, connected := c.local.conns[p]; !connected { + return nil, fmt.Errorf("cannot create new stream for %s. not connected.", p) + } + log.Debugf("NewStreamWithProtocol: %s --> %s", c.local, p) ss, _ := newStreamPair(c.local, p) @@ -193,7 +197,11 @@ func (n *Network) DialPeer(ctx context.Context, p peer.Peer) error { n.Lock() defer n.Unlock() - n.conns[p].connected = true + c, ok := n.conns[p] + if !ok { + return fmt.Errorf("cannot connect to %s (mock needs all nets at start)", p) + } + c.connected = true return nil } @@ -237,7 +245,11 @@ func (n *Network) Conns() []inet.Conn { // ClosePeer connection to peer func (n *Network) ClosePeer(p peer.Peer) error { - return n.conns[p].Close() + c, ok := n.conns[p] + if !ok { + return nil + } + return c.Close() } // close is the real teardown function diff --git a/net/mux.go b/net/mux.go index a12badc3c..94f9cd34a 100644 --- a/net/mux.go +++ b/net/mux.go @@ -69,19 +69,21 @@ func (m *Mux) SetHandler(p ProtocolID, h StreamHandler) { // Handle reads the next name off the Stream, and calls a function func (m *Mux) Handle(s Stream) { - ctx := context.Background() + go func() { + ctx := context.Background() - name, handler, err := m.ReadProtocolHeader(s) - if err != nil { - err = fmt.Errorf("protocol mux error: %s", err) - log.Error(err) - log.Event(ctx, "muxError", lgbl.Error(err)) - return - } + name, handler, err := m.ReadProtocolHeader(s) + if err != nil { + err = fmt.Errorf("protocol mux error: %s", err) + log.Error(err) + log.Event(ctx, "muxError", lgbl.Error(err)) + return + } - log.Info("muxer handle protocol: %s", name) - log.Event(ctx, "muxHandle", eventlog.Metadata{"protocol": name}) - handler(s) + log.Info("muxer handle protocol: %s", name) + log.Event(ctx, "muxHandle", eventlog.Metadata{"protocol": name}) + handler(s) + }() } // ReadLengthPrefix reads the name from Reader with a length-byte-prefix. diff --git a/net/net.go b/net/net.go index e27edc94e..1645e1f17 100644 --- a/net/net.go +++ b/net/net.go @@ -101,7 +101,7 @@ func NewNetwork(ctx context.Context, listen []ma.Multiaddr, local peer.Peer, n := &network{ local: local, swarm: s, - mux: Mux{}, + mux: Mux{Handlers: StreamHandlerMap{}}, cg: ctxgroup.WithContext(ctx), } diff --git a/net/swarm2/swarm.go b/net/swarm2/swarm.go index e236c3ea6..81077d7e8 100644 --- a/net/swarm2/swarm.go +++ b/net/swarm2/swarm.go @@ -32,6 +32,12 @@ type Swarm struct { func NewSwarm(ctx context.Context, listenAddrs []ma.Multiaddr, local peer.Peer, peers peer.Peerstore) (*Swarm, error) { + // make sure our own peer is in our peerstore... + local, err := peers.Add(local) + if err != nil { + return nil, err + } + s := &Swarm{ swarm: ps.NewSwarm(), local: local, @@ -75,13 +81,20 @@ func (s *Swarm) SetStreamHandler(handler StreamHandler) { // NewStreamWithPeer creates a new stream on any available connection to p func (s *Swarm) NewStreamWithPeer(p peer.Peer) (*Stream, error) { + // make sure we use OUR peers. (the tests mess with you...) + p, err := s.peers.Add(p) + if err != nil { + return nil, err + } // if we have no connections, try connecting. if len(s.ConnectionsToPeer(p)) == 0 { + log.Debug("Swarm: NewStreamWithPeer no connections. Attempting to connect...") if _, err := s.Dial(p); err != nil { return nil, err } } + log.Debug("Swarm: NewStreamWithPeer...") st, err := s.swarm.NewStreamWithGroup(p) return wrapStream(st), err @@ -89,11 +102,20 @@ func (s *Swarm) NewStreamWithPeer(p peer.Peer) (*Stream, error) { // StreamsWithPeer returns all the live Streams to p func (s *Swarm) StreamsWithPeer(p peer.Peer) []*Stream { + // make sure we use OUR peers. (the tests mess with you...) + if p2, err := s.peers.Add(p); err == nil { + p = p2 + } + return wrapStreams(ps.StreamsWithGroup(p, s.swarm.Streams())) } // ConnectionsToPeer returns all the live connections to p func (s *Swarm) ConnectionsToPeer(p peer.Peer) []*Conn { + // make sure we use OUR peers. (the tests mess with you...) + if p2, err := s.peers.Add(p); err == nil { + p = p2 + } return wrapConns(ps.ConnsWithGroup(p, s.swarm.Conns())) } @@ -104,6 +126,12 @@ func (s *Swarm) Connections() []*Conn { // CloseConnection removes a given peer from swarm + closes the connection func (s *Swarm) CloseConnection(p peer.Peer) error { + // make sure we use OUR peers. (the tests mess with you...) + p, err := s.peers.Add(p) + if err != nil { + return err + } + conns := s.swarm.ConnsWithGroup(p) // boom. for _, c := range conns { c.Close() diff --git a/routing/dht/dht.go b/routing/dht/dht.go index f85889afd..5a68bd759 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -80,21 +80,17 @@ func NewDHT(ctx context.Context, p peer.Peer, ps peer.Peerstore, n inet.Network, // Connect to a new peer at the given address, ping and add to the routing table func (dht *IpfsDHT) Connect(ctx context.Context, npeer peer.Peer) error { - err := dht.network.DialPeer(ctx, npeer) - if err != nil { + if err := dht.network.DialPeer(ctx, npeer); err != nil { return err } // Ping new peer to register in their routing table // NOTE: this should be done better... - err = dht.Ping(ctx, npeer) - if err != nil { + if err := dht.Ping(ctx, npeer); err != nil { return fmt.Errorf("failed to ping newly connected peer: %s\n", err) } log.Event(ctx, "connect", dht.self, npeer) - dht.Update(ctx, npeer) - return nil } diff --git a/routing/dht/dht_net.go b/routing/dht/dht_net.go new file mode 100644 index 000000000..e31a52da7 --- /dev/null +++ b/routing/dht/dht_net.go @@ -0,0 +1,104 @@ +package dht + +import ( + "errors" + "time" + + inet "github.com/jbenet/go-ipfs/net" + peer "github.com/jbenet/go-ipfs/peer" + pb "github.com/jbenet/go-ipfs/routing/dht/pb" + + ggio "code.google.com/p/gogoprotobuf/io" + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" +) + +// handleNewStream implements the inet.StreamHandler +func (dht *IpfsDHT) handleNewStream(s inet.Stream) { + go dht.handleNewMessage(s) +} + +func (dht *IpfsDHT) handleNewMessage(s inet.Stream) { + defer s.Close() + + ctx := dht.Context() + r := ggio.NewDelimitedReader(s, inet.MessageSizeMax) + w := ggio.NewDelimitedWriter(s) + mPeer := s.Conn().RemotePeer() + + // receive msg + pmes := new(pb.Message) + if err := r.ReadMsg(pmes); err != nil { + log.Error("Error unmarshaling data") + return + } + // update the peer (on valid msgs only) + dht.Update(ctx, mPeer) + + log.Event(ctx, "foo", dht.self, mPeer, pmes) + + // get handler for this msg type. + handler := dht.handlerForMsgType(pmes.GetType()) + if handler == nil { + log.Error("got back nil handler from handlerForMsgType") + return + } + + // dispatch handler. + rpmes, err := handler(ctx, mPeer, pmes) + if err != nil { + log.Errorf("handle message error: %s", err) + return + } + + // if nil response, return it before serializing + if rpmes == nil { + log.Warning("Got back nil response from request.") + return + } + + // send out response msg + if err := w.WriteMsg(rpmes); err != nil { + log.Errorf("send response error: %s", err) + return + } + + return +} + +// sendRequest sends out a request, but also makes sure to +// measure the RTT for latency measurements. +func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.Peer, pmes *pb.Message) (*pb.Message, error) { + + log.Debugf("%s dht starting stream", dht.self) + s, err := dht.network.NewStream(inet.ProtocolDHT, p) + if err != nil { + return nil, err + } + defer s.Close() + + r := ggio.NewDelimitedReader(s, inet.MessageSizeMax) + w := ggio.NewDelimitedWriter(s) + + start := time.Now() + + log.Debugf("%s writing", dht.self) + if err := w.WriteMsg(pmes); err != nil { + return nil, err + } + log.Event(ctx, "dhtSentMessage", dht.self, p, pmes) + + log.Debugf("%s reading", dht.self) + defer log.Debugf("%s done", dht.self) + + rpmes := new(pb.Message) + if err := r.ReadMsg(rpmes); err != nil { + return nil, err + } + if rpmes == nil { + return nil, errors.New("no response to request") + } + + p.SetLatency(time.Since(start)) + log.Event(ctx, "dhtReceivedMessage", dht.self, p, rpmes) + return rpmes, nil +} diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index a955290f9..50ec76792 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -2,6 +2,7 @@ package dht import ( "bytes" + "math/rand" "sort" "testing" @@ -20,6 +21,16 @@ import ( "time" ) +func randMultiaddr(t *testing.T) ma.Multiaddr { + + s := fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 10000+rand.Intn(40000)) + a, err := ma.NewMultiaddr(s) + if err != nil { + t.Fatal(err) + } + return a +} + func setupDHT(ctx context.Context, t *testing.T, p peer.Peer) *IpfsDHT { peerstore := peer.NewPeerstore() @@ -29,7 +40,6 @@ func setupDHT(ctx context.Context, t *testing.T, p peer.Peer) *IpfsDHT { } d := NewDHT(ctx, p, peerstore, n, ds.NewMapDatastore()) - d.network.SetHandler(inet.ProtocolDHT, d.handleNewStream) d.Validators["v"] = func(u.Key, []byte) error { return nil @@ -40,7 +50,8 @@ func setupDHT(ctx context.Context, t *testing.T, p peer.Peer) *IpfsDHT { func setupDHTS(ctx context.Context, n int, t *testing.T) ([]ma.Multiaddr, []peer.Peer, []*IpfsDHT) { var addrs []ma.Multiaddr for i := 0; i < n; i++ { - a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 5000+i)) + r := rand.Intn(40000) + a, err := ma.NewMultiaddr(fmt.Sprintf("/ip4/127.0.0.1/tcp/%d", 10000+r)) if err != nil { t.Fatal(err) } @@ -85,15 +96,9 @@ func makePeer(addr ma.Multiaddr) peer.Peer { func TestPing(t *testing.T) { // t.Skip("skipping test to debug another") ctx := context.Background() - u.Debug = false - addrA, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/2222") - if err != nil { - t.Fatal(err) - } - addrB, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/5678") - if err != nil { - t.Fatal(err) - } + + addrA := randMultiaddr(t) + addrB := randMultiaddr(t) peerA := makePeer(addrA) peerB := makePeer(addrB) @@ -106,21 +111,22 @@ func TestPing(t *testing.T) { defer dhtA.network.Close() defer dhtB.network.Close() - err = dhtA.Connect(ctx, peerB) - if err != nil { + if err := dhtA.Connect(ctx, peerB); err != nil { t.Fatal(err) } + // if err := dhtB.Connect(ctx, peerA); err != nil { + // t.Fatal(err) + // } + //Test that we can ping the node ctxT, _ := context.WithTimeout(ctx, 100*time.Millisecond) - err = dhtA.Ping(ctxT, peerB) - if err != nil { + if err := dhtA.Ping(ctxT, peerB); err != nil { t.Fatal(err) } ctxT, _ = context.WithTimeout(ctx, 100*time.Millisecond) - err = dhtB.Ping(ctxT, peerA) - if err != nil { + if err := dhtB.Ping(ctxT, peerA); err != nil { t.Fatal(err) } } @@ -129,15 +135,9 @@ func TestValueGetSet(t *testing.T) { // t.Skip("skipping test to debug another") ctx := context.Background() - u.Debug = false - addrA, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/11235") - if err != nil { - t.Fatal(err) - } - addrB, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/15679") - if err != nil { - t.Fatal(err) - } + + addrA := randMultiaddr(t) + addrB := randMultiaddr(t) peerA := makePeer(addrA) peerB := makePeer(addrB) @@ -156,7 +156,7 @@ func TestValueGetSet(t *testing.T) { defer dhtA.network.Close() defer dhtB.network.Close() - err = dhtA.Connect(ctx, peerB) + err := dhtA.Connect(ctx, peerB) if err != nil { t.Fatal(err) } @@ -189,8 +189,6 @@ func TestProvides(t *testing.T) { // t.Skip("skipping test to debug another") ctx := context.Background() - u.Debug = false - _, peers, dhts := setupDHTS(ctx, 4, t) defer func() { for i := 0; i < 4; i++ { @@ -251,7 +249,6 @@ func TestProvidesAsync(t *testing.T) { } ctx := context.Background() - u.Debug = false _, peers, dhts := setupDHTS(ctx, 4, t) defer func() { @@ -317,7 +314,7 @@ func TestLayeredGet(t *testing.T) { } ctx := context.Background() - u.Debug = false + _, peers, dhts := setupDHTS(ctx, 4, t) defer func() { for i := 0; i < 4; i++ { @@ -371,7 +368,6 @@ func TestFindPeer(t *testing.T) { } ctx := context.Background() - u.Debug = false _, peers, dhts := setupDHTS(ctx, 4, t) defer func() { @@ -412,12 +408,13 @@ func TestFindPeer(t *testing.T) { } func TestFindPeersConnectedToPeer(t *testing.T) { + t.Skip("not quite correct (see note)") + if testing.Short() { t.SkipNow() } ctx := context.Background() - u.Debug = false _, peers, dhts := setupDHTS(ctx, 4, t) defer func() { @@ -516,15 +513,9 @@ func TestConnectCollision(t *testing.T) { log.Notice("Running Time: ", rtime) ctx := context.Background() - u.Debug = false - addrA, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/11235") - if err != nil { - t.Fatal(err) - } - addrB, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/15679") - if err != nil { - t.Fatal(err) - } + + addrA := randMultiaddr(t) + addrB := randMultiaddr(t) peerA := makePeer(addrA) peerB := makePeer(addrB) diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go index f69d4d018..1fb46b521 100644 --- a/routing/dht/ext_test.go +++ b/routing/dht/ext_test.go @@ -1,150 +1,48 @@ package dht import ( + "math/rand" "testing" crand "crypto/rand" - context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" - "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto" - ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" inet "github.com/jbenet/go-ipfs/net" + mocknet "github.com/jbenet/go-ipfs/net/mock" peer "github.com/jbenet/go-ipfs/peer" routing "github.com/jbenet/go-ipfs/routing" pb "github.com/jbenet/go-ipfs/routing/dht/pb" u "github.com/jbenet/go-ipfs/util" testutil "github.com/jbenet/go-ipfs/util/testutil" - "sync" + ggio "code.google.com/p/gogoprotobuf/io" + context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context" + ds "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" + "time" ) -// mesHandleFunc is a function that takes in outgoing messages -// and can respond to them, simulating other peers on the network. -// returning nil will chose not to respond and pass the message onto the -// next registered handler -type mesHandleFunc func(msg.NetMessage) msg.NetMessage - -// fauxNet is a standin for a swarm.Network in order to more easily recreate -// different testing scenarios -type fauxSender struct { - sync.Mutex - handlers []mesHandleFunc -} - -func (f *fauxSender) AddHandler(fn func(msg.NetMessage) msg.NetMessage) { - f.Lock() - defer f.Unlock() - - f.handlers = append(f.handlers, fn) -} - -func (f *fauxSender) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) { - f.Lock() - handlers := make([]mesHandleFunc, len(f.handlers)) - copy(handlers, f.handlers) - f.Unlock() - - for _, h := range handlers { - reply := h(m) - if reply != nil { - return reply, nil - } - } - - // no reply? ok force a timeout - select { - case <-ctx.Done(): - } - - return nil, ctx.Err() -} - -func (f *fauxSender) SendMessage(ctx context.Context, m msg.NetMessage) error { - f.Lock() - handlers := make([]mesHandleFunc, len(f.handlers)) - copy(handlers, f.handlers) - f.Unlock() - - for _, h := range handlers { - reply := h(m) - if reply != nil { - return nil - } - } - return nil -} - -// fauxNet is a standin for a swarm.Network in order to more easily recreate -// different testing scenarios -type fauxNet struct { - local peer.Peer -} - -// DialPeer attempts to establish a connection to a given peer -func (f *fauxNet) DialPeer(context.Context, peer.Peer) error { - return nil -} - -func (f *fauxNet) LocalPeer() peer.Peer { - return f.local -} - -// ClosePeer connection to peer -func (f *fauxNet) ClosePeer(peer.Peer) error { - return nil -} - -// IsConnected returns whether a connection to given peer exists. -func (f *fauxNet) IsConnected(peer.Peer) (bool, error) { - return true, nil -} - -// Connectedness returns whether a connection to given peer exists. -func (f *fauxNet) Connectedness(peer.Peer) inet.Connectedness { - return inet.Connected -} - -// GetProtocols returns the protocols registered in the network. -func (f *fauxNet) GetProtocols() *mux.ProtocolMap { return nil } - -// SendMessage sends given Message out -func (f *fauxNet) SendMessage(msg.NetMessage) error { - return nil -} - -func (f *fauxNet) GetPeerList() []peer.Peer { - return nil -} - -func (f *fauxNet) GetBandwidthTotals() (uint64, uint64) { - return 0, 0 -} - -// Close terminates all network operation -func (f *fauxNet) Close() error { return nil } - func TestGetFailures(t *testing.T) { if testing.Short() { t.SkipNow() } + ctx := context.Background() peerstore := peer.NewPeerstore() local := makePeerString(t, "") + peers := []peer.Peer{local, testutil.RandPeer()} - ctx := context.Background() - fn := &fauxNet{local} - fs := &fauxSender{} + nets, err := mocknet.MakeNetworks(ctx, peers) + if err != nil { + t.Fatal(err) + } - d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore()) - other := makePeerString(t, "") - d.Update(ctx, other) + d := NewDHT(ctx, peers[0], peerstore, nets[0], ds.NewMapDatastore()) + d.Update(ctx, peers[1]) // This one should time out // u.POut("Timout Test\n") ctx1, _ := context.WithTimeout(context.Background(), time.Second) - _, err := d.GetValue(ctx1, u.Key("test")) - if err != nil { + if _, err := d.GetValue(ctx1, u.Key("test")); err != nil { if err != context.DeadlineExceeded { t.Fatal("Got different error than we expected", err) } @@ -152,20 +50,29 @@ func TestGetFailures(t *testing.T) { t.Fatal("Did not get expected error!") } + msgs := make(chan *pb.Message, 100) + // u.POut("NotFound Test\n") // Reply with failures to every message - fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage { + nets[1].SetHandler(inet.ProtocolDHT, func(s inet.Stream) { + defer s.Close() + + pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax) + pbw := ggio.NewDelimitedWriter(s) + pmes := new(pb.Message) - err := proto.Unmarshal(mes.Data(), pmes) - if err != nil { - t.Fatal(err) + if err := pbr.ReadMsg(pmes); err != nil { + panic(err) } resp := &pb.Message{ Type: pmes.Type, } - m, err := msg.FromObject(mes.Peer(), resp) - return m + if err := pbw.WriteMsg(resp); err != nil { + panic(err) + } + + msgs <- resp }) // This one should fail with NotFound @@ -179,40 +86,45 @@ func TestGetFailures(t *testing.T) { t.Fatal("expected error, got none.") } - fs.handlers = nil // Now we test this DHT's handleGetValue failure - typ := pb.Message_GET_VALUE - str := "hello" - rec, err := d.makePutRecord(u.Key(str), []byte("blah")) - if err != nil { - t.Fatal(err) - } - req := pb.Message{ - Type: &typ, - Key: &str, - Record: rec, - } + { + typ := pb.Message_GET_VALUE + str := "hello" + rec, err := d.makePutRecord(u.Key(str), []byte("blah")) + if err != nil { + t.Fatal(err) + } + req := pb.Message{ + Type: &typ, + Key: &str, + Record: rec, + } - // u.POut("handleGetValue Test\n") - mes, err := msg.FromObject(other, &req) - if err != nil { - t.Error(err) - } + // u.POut("handleGetValue Test\n") + s, err := nets[1].NewStream(inet.ProtocolDHT, peers[0]) + if err != nil { + t.Fatal(err) + } + defer s.Close() - mes = d.HandleMessage(ctx, mes) + pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax) + pbw := ggio.NewDelimitedWriter(s) - pmes := new(pb.Message) - err = proto.Unmarshal(mes.Data(), pmes) - if err != nil { - t.Fatal(err) - } - if pmes.GetRecord() != nil { - t.Fatal("shouldnt have value") - } - if pmes.GetProviderPeers() != nil { - t.Fatal("shouldnt have provider peers") - } + if err := pbw.WriteMsg(&req); err != nil { + t.Fatal(err) + } + pmes := new(pb.Message) + if err := pbr.ReadMsg(pmes); err != nil { + t.Fatal(err) + } + if pmes.GetRecord() != nil { + t.Fatal("shouldnt have value") + } + if pmes.GetProviderPeers() != nil { + t.Fatal("shouldnt have provider peers") + } + } } // TODO: Maybe put these in some sort of "ipfs_testutil" package @@ -228,49 +140,57 @@ func TestNotFound(t *testing.T) { t.SkipNow() } - local := makePeerString(t, "") - peerstore := peer.NewPeerstore() - peerstore.Add(local) - ctx := context.Background() - fn := &fauxNet{local} - fs := &fauxSender{} + peerstore := peer.NewPeerstore() - d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore()) + var peers []peer.Peer + for i := 0; i < 16; i++ { + peers = append(peers, testutil.RandPeer()) + } - var ps []peer.Peer - for i := 0; i < 5; i++ { - ps = append(ps, _randPeer()) - d.Update(ctx, ps[i]) + nets, err := mocknet.MakeNetworks(ctx, peers) + if err != nil { + t.Fatal(err) + } + + d := NewDHT(ctx, peers[0], peerstore, nets[0], ds.NewMapDatastore()) + + for _, p := range peers { + d.Update(ctx, p) } // Reply with random peers to every message - fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage { - pmes := new(pb.Message) - err := proto.Unmarshal(mes.Data(), pmes) - if err != nil { - t.Fatal(err) - } + for _, neti := range nets { + neti.SetHandler(inet.ProtocolDHT, func(s inet.Stream) { + defer s.Close() - switch pmes.GetType() { - case pb.Message_GET_VALUE: - resp := &pb.Message{Type: pmes.Type} + pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax) + pbw := ggio.NewDelimitedWriter(s) - peers := []peer.Peer{} - for i := 0; i < 7; i++ { - peers = append(peers, _randPeer()) + pmes := new(pb.Message) + if err := pbr.ReadMsg(pmes); err != nil { + panic(err) } - resp.CloserPeers = pb.PeersToPBPeers(d.dialer, peers) - mes, err := msg.FromObject(mes.Peer(), resp) - if err != nil { - t.Error(err) - } - return mes - default: - panic("Shouldnt recieve this.") - } - }) + switch pmes.GetType() { + case pb.Message_GET_VALUE: + resp := &pb.Message{Type: pmes.Type} + + ps := []peer.Peer{} + for i := 0; i < 7; i++ { + ps = append(ps, peers[rand.Intn(len(peers))]) + } + + resp.CloserPeers = pb.PeersToPBPeers(d.network, peers) + if err := pbw.WriteMsg(resp); err != nil { + panic(err) + } + + default: + panic("Shouldnt recieve this.") + } + }) + } ctx, _ = context.WithTimeout(ctx, time.Second*5) v, err := d.GetValue(ctx, u.Key("hello")) @@ -294,53 +214,57 @@ func TestNotFound(t *testing.T) { func TestLessThanKResponses(t *testing.T) { // t.Skip("skipping test because it makes a lot of output") - local := makePeerString(t, "") - peerstore := peer.NewPeerstore() - peerstore.Add(local) - ctx := context.Background() - u.Debug = false - fn := &fauxNet{local} - fs := &fauxSender{} + peerstore := peer.NewPeerstore() - d := NewDHT(ctx, local, peerstore, fn, fs, ds.NewMapDatastore()) - - var ps []peer.Peer - for i := 0; i < 5; i++ { - ps = append(ps, _randPeer()) - d.Update(ctx, ps[i]) + var peers []peer.Peer + for i := 0; i < 6; i++ { + peers = append(peers, testutil.RandPeer()) + } + + nets, err := mocknet.MakeNetworks(ctx, peers) + if err != nil { + t.Fatal(err) + } + + d := NewDHT(ctx, peers[0], peerstore, nets[0], ds.NewMapDatastore()) + + for i := 1; i < 5; i++ { + d.Update(ctx, peers[i]) } - other := _randPeer() // Reply with random peers to every message - fs.AddHandler(func(mes msg.NetMessage) msg.NetMessage { - pmes := new(pb.Message) - err := proto.Unmarshal(mes.Data(), pmes) - if err != nil { - t.Fatal(err) - } + for _, neti := range nets { + neti.SetHandler(inet.ProtocolDHT, func(s inet.Stream) { + defer s.Close() - switch pmes.GetType() { - case pb.Message_GET_VALUE: - resp := &pb.Message{ - Type: pmes.Type, - CloserPeers: pb.PeersToPBPeers(d.dialer, []peer.Peer{other}), + pbr := ggio.NewDelimitedReader(s, inet.MessageSizeMax) + pbw := ggio.NewDelimitedWriter(s) + + pmes := new(pb.Message) + if err := pbr.ReadMsg(pmes); err != nil { + panic(err) } - mes, err := msg.FromObject(mes.Peer(), resp) - if err != nil { - t.Error(err) - } - return mes - default: - panic("Shouldnt recieve this.") - } + switch pmes.GetType() { + case pb.Message_GET_VALUE: + resp := &pb.Message{ + Type: pmes.Type, + CloserPeers: pb.PeersToPBPeers(d.network, []peer.Peer{peers[1]}), + } - }) + if err := pbw.WriteMsg(resp); err != nil { + panic(err) + } + default: + panic("Shouldnt recieve this.") + } + + }) + } ctx, _ = context.WithTimeout(ctx, time.Second*30) - _, err := d.GetValue(ctx, u.Key("hello")) - if err != nil { + if _, err := d.GetValue(ctx, u.Key("hello")); err != nil { switch err { case routing.ErrNotFound: //Success!