diff --git a/routing/dht/dht.go b/routing/dht/dht.go index 9338339da..89abd093a 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -1,6 +1,7 @@ package dht import ( + "bytes" "crypto/rand" "errors" "fmt" @@ -190,15 +191,20 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p *peer.Peer, pmes *Message return rpmes, nil } -func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p *peer.Peer, key string, value []byte) error { +func (dht *IpfsDHT) putValueToNetwork(ctx context.Context, p *peer.Peer, + key string, value []byte) error { + pmes := newMessage(Message_PUT_VALUE, string(key), 0) pmes.Value = value - - mes, err := msg.FromObject(p, pmes) + rpmes, err := dht.sendRequest(ctx, p, pmes) if err != nil { return err } - return dht.sender.SendMessage(ctx, mes) + + if !bytes.Equal(rpmes.Value, pmes.Value) { + return errors.New("value not put correctly") + } + return nil } func (dht *IpfsDHT) putProvider(ctx context.Context, p *peer.Peer, key string) error { diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index 6cf9c115d..b7e24b1d7 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -117,7 +117,7 @@ func TestPing(t *testing.T) { } func TestValueGetSet(t *testing.T) { - u.Debug = false + u.Debug = true addrA, err := ma.NewMultiaddr("/ip4/127.0.0.1/tcp/1235") if err != nil { t.Fatal(err) diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index 124bd76f5..5320cc10a 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -38,7 +38,7 @@ func (dht *IpfsDHT) handlerForMsgType(t Message_MessageType) dhtHandler { } func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error) { - u.DOut("handleGetValue for key: %s\n", pmes.GetKey()) + u.DOut("[%s] handleGetValue for key: %s\n", dht.self.ID.Pretty(), pmes.GetKey()) // setup response resp := newMessage(pmes.GetType(), pmes.GetKey(), pmes.GetClusterLevel()) @@ -50,11 +50,13 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error } // let's first check if we have the value locally. + u.DOut("[%s] handleGetValue looking into ds\n", dht.self.ID.Pretty()) dskey := ds.NewKey(pmes.GetKey()) iVal, err := dht.datastore.Get(dskey) + u.DOut("[%s] handleGetValue looking into ds GOT %v\n", dht.self.ID.Pretty(), iVal) // if we got an unexpected error, bail. - if err != ds.ErrNotFound { + if err != nil && err != ds.ErrNotFound { return nil, err } @@ -63,7 +65,7 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error // if we have the value, send it back if err == nil { - u.DOut("handleGetValue success!\n") + u.DOut("[%s] handleGetValue success!\n", dht.self.ID.Pretty()) byts, ok := iVal.([]byte) if !ok { @@ -85,7 +87,6 @@ func (dht *IpfsDHT) handleGetValue(p *peer.Peer, pmes *Message) (*Message, error if closer != nil { u.DOut("handleGetValue returning a closer peer: '%s'\n", closer.ID.Pretty()) resp.CloserPeers = peersToPBPeers([]*peer.Peer{closer}) - return resp, nil } return resp, nil @@ -97,8 +98,8 @@ func (dht *IpfsDHT) handlePutValue(p *peer.Peer, pmes *Message) (*Message, error defer dht.dslock.Unlock() dskey := ds.NewKey(pmes.GetKey()) err := dht.datastore.Put(dskey, pmes.GetValue()) - u.DOut("[%s] handlePutValue %v %v", dht.self.ID.Pretty(), dskey, pmes.GetValue()) - return nil, err + u.DOut("[%s] handlePutValue %v %v\n", dht.self.ID.Pretty(), dskey, pmes.GetValue()) + return pmes, err } func (dht *IpfsDHT) handlePing(p *peer.Peer, pmes *Message) (*Message, error) { diff --git a/routing/dht/query.go b/routing/dht/query.go index f4695845f..4db3f70e7 100644 --- a/routing/dht/query.go +++ b/routing/dht/query.go @@ -117,28 +117,29 @@ func (r *dhtQueryRunner) Run(peers []*peer.Peer) (*dhtQueryResult, error) { // so workers are working. // wait until they're done. + err := u.ErrNotFound + select { case <-r.peersRemaining.Done(): r.cancel() // ran all and nothing. cancel all outstanding workers. - r.RLock() defer r.RUnlock() if len(r.errs) > 0 { - return nil, r.errs[0] + err = r.errs[0] } - return nil, u.ErrNotFound case <-r.ctx.Done(): r.RLock() defer r.RUnlock() - - if r.result != nil && r.result.success { - return r.result, nil - } - return nil, r.ctx.Err() + err = r.ctx.Err() } + if r.result != nil && r.result.success { + return r.result, nil + } + + return nil, err } func (r *dhtQueryRunner) addPeerToQuery(next *peer.Peer, benchmark *peer.Peer) { diff --git a/routing/dht/routing.go b/routing/dht/routing.go index b9fdbeef4..4991a06f3 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -89,6 +89,8 @@ func (dht *IpfsDHT) GetValue(key u.Key, timeout time.Duration) ([]byte, error) { return nil, err } + u.DOut("[%s] GetValue %v %v\n", dht.self.ID.Pretty(), key, result.value) + if result.value == nil { return nil, u.ErrNotFound }