diff --git a/namesys/ipns_select_test.go b/namesys/ipns_select_test.go new file mode 100644 index 000000000..ebd81e86d --- /dev/null +++ b/namesys/ipns_select_test.go @@ -0,0 +1,127 @@ +package namesys + +import ( + "fmt" + "math/rand" + "testing" + "time" + + proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto" + + pb "github.com/ipfs/go-ipfs/namesys/pb" + ci "github.com/ipfs/go-ipfs/p2p/crypto" + path "github.com/ipfs/go-ipfs/path" + u "github.com/ipfs/go-ipfs/util" +) + +func shuffle(a []*pb.IpnsEntry) { + for n := 0; n < 5; n++ { + for i, _ := range a { + j := rand.Intn(len(a)) + a[i], a[j] = a[j], a[i] + } + } +} + +func AssertSelected(r *pb.IpnsEntry, from ...*pb.IpnsEntry) error { + shuffle(from) + var vals [][]byte + for _, r := range from { + data, err := proto.Marshal(r) + if err != nil { + return err + } + vals = append(vals, data) + } + + i, err := selectRecord(from, vals) + if err != nil { + return err + } + + if from[i] != r { + return fmt.Errorf("selected incorrect record %d", i) + } + + return nil +} + +func TestOrdering(t *testing.T) { + // select timestamp so selection is deterministic + ts := time.Unix(1000000, 0) + + // generate a key for signing the records + r := u.NewSeededRand(15) // generate deterministic keypair + priv, _, err := ci.GenerateKeyPairWithReader(ci.RSA, 1024, r) + if err != nil { + t.Fatal(err) + } + + e1, err := CreateRoutingEntryData(priv, path.Path("foo"), 1, ts.Add(time.Hour)) + if err != nil { + t.Fatal(err) + } + + e2, err := CreateRoutingEntryData(priv, path.Path("bar"), 2, ts.Add(time.Hour)) + if err != nil { + t.Fatal(err) + } + + e3, err := CreateRoutingEntryData(priv, path.Path("baz"), 3, ts.Add(time.Hour)) + if err != nil { + t.Fatal(err) + } + + e4, err := CreateRoutingEntryData(priv, path.Path("cat"), 3, ts.Add(time.Hour*2)) + if err != nil { + t.Fatal(err) + } + + e5, err := CreateRoutingEntryData(priv, path.Path("dog"), 4, ts.Add(time.Hour*3)) + if err != nil { + t.Fatal(err) + } + + e6, err := CreateRoutingEntryData(priv, path.Path("fish"), 4, ts.Add(time.Hour*3)) + if err != nil { + t.Fatal(err) + } + + // e1 is the only record, i hope it gets this right + err = AssertSelected(e1, e1) + if err != nil { + t.Fatal(err) + } + + // e2 has the highest sequence number + err = AssertSelected(e2, e1, e2) + if err != nil { + t.Fatal(err) + } + + // e3 has the highest sequence number + err = AssertSelected(e3, e1, e2, e3) + if err != nil { + t.Fatal(err) + } + + // e4 has a higher timeout + err = AssertSelected(e4, e1, e2, e3, e4) + if err != nil { + t.Fatal(err) + } + + // e5 has the highest sequence number + err = AssertSelected(e5, e1, e2, e3, e4, e5) + if err != nil { + t.Fatal(err) + } + + // e6 should be selected as its signauture will win in the comparison + err = AssertSelected(e6, e1, e2, e3, e4, e5, e6) + if err != nil { + t.Fatal(err) + } + + _ = []interface{}{e1, e2, e3, e4, e5, e6} +} diff --git a/namesys/publisher.go b/namesys/publisher.go index 33c7a49cc..521ce4cc8 100644 --- a/namesys/publisher.go +++ b/namesys/publisher.go @@ -60,7 +60,7 @@ func (p *ipnsPublisher) PublishWithEOL(ctx context.Context, k ci.PrivKey, value return err } - namekey, ipnskey := IpnsKeysForID(id) + _, ipnskey := IpnsKeysForID(id) // get previous records sequence number, and add one to it var seqnum uint64 @@ -77,17 +77,22 @@ func (p *ipnsPublisher) PublishWithEOL(ctx context.Context, k ci.PrivKey, value return err } + return PutRecordToRouting(ctx, k, value, seqnum, eol, p.routing, id) +} + +func PutRecordToRouting(ctx context.Context, k ci.PrivKey, value path.Path, seqnum uint64, eol time.Time, r routing.IpfsRouting, id peer.ID) error { + namekey, ipnskey := IpnsKeysForID(id) entry, err := CreateRoutingEntryData(k, value, seqnum, eol) if err != nil { return err } - err = PublishEntry(ctx, p.routing, ipnskey, entry) + err = PublishEntry(ctx, r, ipnskey, entry) if err != nil { return err } - err = PublishPublicKey(ctx, p.routing, namekey, k.GetPublic()) + err = PublishPublicKey(ctx, r, namekey, k.GetPublic()) if err != nil { return err } @@ -174,13 +179,18 @@ func IpnsSelectorFunc(k key.Key, vals [][]byte) (int, error) { } } + return selectRecord(recs, vals) +} + +func selectRecord(recs []*pb.IpnsEntry, vals [][]byte) (int, error) { var best_seq uint64 best_i := -1 for i, r := range recs { - if r == nil { + if r == nil || r.GetSequence() < best_seq { continue } + if best_i == -1 || r.GetSequence() > best_seq { best_seq = r.GetSequence() best_i = i @@ -196,8 +206,11 @@ func IpnsSelectorFunc(k key.Key, vals [][]byte) (int, error) { } if rt.After(bestt) { - best_seq = r.GetSequence() best_i = i + } else if rt == bestt { + if bytes.Compare(vals[i], vals[best_i]) > 0 { + best_i = i + } } } } diff --git a/namesys/republisher/repub.go b/namesys/republisher/repub.go index 9dd95f563..4fece5721 100644 --- a/namesys/republisher/repub.go +++ b/namesys/republisher/repub.go @@ -69,7 +69,7 @@ func (rp *Republisher) Run(proc goprocess.Process) { case <-tick.C: err := rp.republishEntries(proc) if err != nil { - log.Error(err) + log.Error("Republisher failed to republish: ", err) } case <-proc.Closing(): return @@ -86,7 +86,7 @@ func (rp *Republisher) republishEntries(p goprocess.Process) error { priv := rp.ps.PrivKey(id) // Look for it locally only - namekey, ipnskey := namesys.IpnsKeysForID(id) + _, ipnskey := namesys.IpnsKeysForID(id) p, seq, err := rp.getLastVal(ipnskey) if err != nil { if err == errNoEntry { @@ -97,19 +97,7 @@ func (rp *Republisher) republishEntries(p goprocess.Process) error { // update record with same sequence number eol := time.Now().Add(rp.RecordLifetime) - entry, err := namesys.CreateRoutingEntryData(priv, p, seq, eol) - if err != nil { - return err - } - - // republish public key - err = namesys.PublishPublicKey(ctx, rp.r, namekey, priv.GetPublic()) - if err != nil { - return err - } - - // republish ipns entry - err = namesys.PublishEntry(ctx, rp.r, ipnskey, entry) + err = namesys.PutRecordToRouting(ctx, priv, p, seq, eol, rp.r, id) if err != nil { return err } diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 57341e69c..df93396ce 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -3,6 +3,7 @@ package dht import ( "bytes" "sync" + "time" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" key "github.com/ipfs/go-ipfs/blocks/key" @@ -60,6 +61,8 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key key.Key, value []byte) err for p := range pchan { wg.Add(1) go func(p peer.ID) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() defer wg.Done() notif.PublishQueryEvent(ctx, ¬if.QueryEvent{ Type: notif.Value, @@ -78,7 +81,10 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key key.Key, value []byte) err // GetValue searches for the value corresponding to given Key. func (dht *IpfsDHT) GetValue(ctx context.Context, key key.Key) ([]byte, error) { - vals, err := dht.GetValues(ctx, key, 3) + ctx, cancel := context.WithTimeout(ctx, time.Minute) + defer cancel() + + vals, err := dht.GetValues(ctx, key, 16) if err != nil { return nil, err } @@ -111,6 +117,8 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key key.Key) ([]byte, error) { // if someone sent us a different 'less-valid' record, lets correct them if !bytes.Equal(v.Val, best) { go func(v routing.RecvdVal) { + ctx, cancel := context.WithTimeout(dht.Context(), time.Second*30) + defer cancel() err := dht.putValueToPeer(ctx, v.From, key, fixupRec) if err != nil { log.Error("Error correcting DHT entry: ", err)