From cfcc3d6a1b77a47cdf6512e6be24e8b3bbd9fb05 Mon Sep 17 00:00:00 2001
From: Jeromy
Date: Fri, 18 Sep 2015 10:27:55 -0700
Subject: [PATCH] ipns record selection via sequence numbers

This commit adds a sequence number to the IpnsEntry protobuf that is used
to determine which among a set of entries for the same key is the 'most
correct'.

GetValues has been added to the routing interface to retrieve a set of
records from the dht, for the caller to select from.

GetValue (singular) will call GetValues, select the 'best' record, and then
send that record to any peers we received outdated records from. This will
help keep the dht consistent.

License: MIT
Signed-off-by: Jeromy
---
 core/core.go                       |   1 +
 namesys/pb/namesys.pb.go           |   8 +++
 namesys/pb/namesys.proto           |   2 +
 namesys/publisher.go               |  77 ++++++++++++++++++++--
 routing/dht/dht.go                 |  17 +++--
 routing/dht/dht_test.go            |  12 +++-
 routing/dht/handlers.go            |   2 +-
 routing/dht/routing.go             | 102 ++++++++++++++++++++++-----
 routing/mock/centralized_client.go |  15 +++++
 routing/none/none_client.go        |   4 ++
 routing/offline/offline.go         |  21 ++++++
 routing/record/record.go           |   2 --
 routing/record/selection.go        |  40 +++++++++++
 routing/routing.go                 |  19 ++++++
 routing/supernode/client.go        |  16 +++++
 15 files changed, 301 insertions(+), 37 deletions(-)
 create mode 100644 routing/record/selection.go

diff --git a/core/core.go b/core/core.go
index 5f246c833..d52883cd8 100644
--- a/core/core.go
+++ b/core/core.go
@@ -501,6 +501,7 @@ func startListening(ctx context.Context, host p2phost.Host, cfg *config.Config)
 func constructDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.ThreadSafeDatastore) (routing.IpfsRouting, error) {
     dhtRouting := dht.NewDHT(ctx, host, dstore)
     dhtRouting.Validator[IpnsValidatorTag] = namesys.IpnsRecordValidator
+    dhtRouting.Selector[IpnsValidatorTag] = namesys.IpnsSelectorFunc
     return dhtRouting, nil
 }
diff --git a/namesys/pb/namesys.pb.go b/namesys/pb/namesys.pb.go
index 97e25a855..4d99a5e0a 100644
--- a/namesys/pb/namesys.pb.go
+++ b/namesys/pb/namesys.pb.go
@@ -56,6 +56,7 @@ type IpnsEntry struct {
     Signature        []byte                  `protobuf:"bytes,2,req,name=signature" json:"signature,omitempty"`
     ValidityType     *IpnsEntry_ValidityType `protobuf:"varint,3,opt,name=validityType,enum=namesys.pb.IpnsEntry_ValidityType" json:"validityType,omitempty"`
     Validity         []byte                  `protobuf:"bytes,4,opt,name=validity" json:"validity,omitempty"`
+    Sequence         *uint64                 `protobuf:"varint,5,opt,name=sequence" json:"sequence,omitempty"`
     XXX_unrecognized []byte                  `json:"-"`
 }
@@ -91,6 +92,13 @@ func (m *IpnsEntry) GetValidity() []byte {
     return nil
 }
 
+func (m *IpnsEntry) GetSequence() uint64 {
+    if m != nil && m.Sequence != nil {
+        return *m.Sequence
+    }
+    return 0
+}
+
 func init() {
     proto.RegisterEnum("namesys.pb.IpnsEntry_ValidityType", IpnsEntry_ValidityType_name, IpnsEntry_ValidityType_value)
 }
diff --git a/namesys/pb/namesys.proto b/namesys/pb/namesys.proto
index 4219af6bb..242f77bf2 100644
--- a/namesys/pb/namesys.proto
+++ b/namesys/pb/namesys.proto
@@ -10,4 +10,6 @@ message IpnsEntry {
     optional ValidityType validityType = 3;
 
     optional bytes validity = 4;
+
+    optional uint64 sequence = 5;
 }
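[Annotation] The new field turns each IPNS key into a small versioned register: whichever record carries the highest sequence number is authoritative. A minimal sketch of that rule, not part of this patch (nextSequence is a hypothetical helper, and the import paths are abbreviated from the Godeps paths used in-tree):

    package main

    import (
        "fmt"

        proto "github.com/gogo/protobuf/proto"
        pb "github.com/ipfs/go-ipfs/namesys/pb"
    )

    // nextSequence computes the sequence number a republished entry
    // must carry: one more than the entry it replaces.
    func nextSequence(prev *pb.IpnsEntry) uint64 {
        if prev == nil {
            return 0 // first publish for this key
        }
        return prev.GetSequence() + 1
    }

    func main() {
        e := new(pb.IpnsEntry)
        e.Sequence = proto.Uint64(nextSequence(nil)) // 0
        e.Sequence = proto.Uint64(nextSequence(e))   // 1
        fmt.Println(e.GetSequence())                 // prints 1
    }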
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" key "github.com/ipfs/go-ipfs/blocks/key" @@ -45,10 +46,6 @@ func NewRoutingPublisher(route routing.IpfsRouting) Publisher { func (p *ipnsPublisher) Publish(ctx context.Context, k ci.PrivKey, value path.Path) error { log.Debugf("Publish %s", value) - data, err := createRoutingEntryData(k, value) - if err != nil { - return err - } pubkey := k.GetPublic() pkbytes, err := pubkey.Bytes() if err != nil { @@ -57,6 +54,27 @@ func (p *ipnsPublisher) Publish(ctx context.Context, k ci.PrivKey, value path.Pa nameb := u.Hash(pkbytes) namekey := key.Key("/pk/" + string(nameb)) + ipnskey := key.Key("/ipns/" + string(nameb)) + + // get previous records sequence number, and add one to it + var seqnum uint64 + prevrec, err := p.routing.GetValues(ctx, ipnskey, 0) + if err == nil { + e := new(pb.IpnsEntry) + err := proto.Unmarshal(prevrec[0].Val, e) + if err != nil { + return err + } + + seqnum = e.GetSequence() + 1 + } else if err != ds.ErrNotFound { + return err + } + + data, err := createRoutingEntryData(k, value, seqnum) + if err != nil { + return err + } log.Debugf("Storing pubkey at: %s", namekey) // Store associated public key @@ -67,8 +85,6 @@ func (p *ipnsPublisher) Publish(ctx context.Context, k ci.PrivKey, value path.Pa return err } - ipnskey := key.Key("/ipns/" + string(nameb)) - log.Debugf("Storing ipns entry at: %s", ipnskey) // Store ipns entry at "/ipns/"+b58(h(pubkey)) timectx, cancel = context.WithDeadline(ctx, time.Now().Add(time.Second*10)) @@ -80,12 +96,13 @@ func (p *ipnsPublisher) Publish(ctx context.Context, k ci.PrivKey, value path.Pa return nil } -func createRoutingEntryData(pk ci.PrivKey, val path.Path) ([]byte, error) { +func createRoutingEntryData(pk ci.PrivKey, val path.Path, seq uint64) ([]byte, error) { entry := new(pb.IpnsEntry) entry.Value = []byte(val) typ := pb.IpnsEntry_EOL entry.ValidityType = &typ + entry.Sequence = proto.Uint64(seq) entry.Validity = []byte(u.FormatRFC3339(time.Now().Add(time.Hour * 24))) sig, err := pk.Sign(ipnsEntryDataForSig(entry)) @@ -110,6 +127,52 @@ var IpnsRecordValidator = &record.ValidChecker{ Sign: true, } +func IpnsSelectorFunc(k key.Key, vals [][]byte) (int, error) { + var recs []*pb.IpnsEntry + for _, v := range vals { + e := new(pb.IpnsEntry) + err := proto.Unmarshal(v, e) + if err == nil { + recs = append(recs, e) + } else { + recs = append(recs, nil) + } + } + + var best_seq uint64 + best_i := -1 + + for i, r := range recs { + if r == nil { + continue + } + if best_i == -1 || r.GetSequence() > best_seq { + best_seq = r.GetSequence() + best_i = i + } else if r.GetSequence() == best_seq { + rt, err := u.ParseRFC3339(string(r.GetValidity())) + if err != nil { + continue + } + + bestt, err := u.ParseRFC3339(string(recs[best_i].GetValidity())) + if err != nil { + continue + } + + if rt.After(bestt) { + best_seq = r.GetSequence() + best_i = i + } + } + } + if best_i == -1 { + return 0, errors.New("no usable records in given set") + } + + return best_i, nil +} + // ValidateIpnsRecord implements ValidatorFunc and verifies that the // given 'val' is an IpnsEntry and that that entry is valid. 
diff --git a/routing/dht/dht.go b/routing/dht/dht.go
index b5d6d1611..64f28c74e 100644
--- a/routing/dht/dht.go
+++ b/routing/dht/dht.go
@@ -54,6 +54,7 @@ type IpfsDHT struct {
     diaglock sync.Mutex // lock to make diagnostics work better
 
     Validator record.Validator // record validator funcs
+    Selector  record.Selector  // record selection funcs
 
     ctx  context.Context
     proc goprocess.Process
@@ -89,6 +90,9 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip
     dht.Validator = make(record.Validator)
     dht.Validator["pk"] = record.PublicKeyValidator
 
+    dht.Selector = make(record.Selector)
+    dht.Selector["pk"] = record.PublicKeySelector
+
     return dht
 }
@@ -152,13 +156,16 @@ func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.ID, skey string) err
 // NOTE: it will update the dht's peerstore with any new addresses
 // it finds for the given peer.
 func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID,
-    key key.Key) ([]byte, []peer.PeerInfo, error) {
+    key key.Key) (*pb.Record, []peer.PeerInfo, error) {
 
     pmes, err := dht.getValueSingle(ctx, p, key)
     if err != nil {
         return nil, nil, err
     }
 
+    // Perhaps we were given closer peers
+    peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
+
     if record := pmes.GetRecord(); record != nil {
         // Success! We were given the value
         log.Debug("getValueOrPeers: got value")
@@ -169,11 +176,9 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID,
             log.Info("Received invalid record! (discarded)")
             return nil, nil, err
         }
-        return record.GetValue(), nil, nil
+        return record, peers, nil
     }
 
-    // Perhaps we were given closer peers
-    peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
     if len(peers) > 0 {
         log.Debug("getValueOrPeers: peers")
         return nil, peers, nil
@@ -193,7 +198,7 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID,
 }
 
 // getLocal attempts to retrieve the value from the datastore
-func (dht *IpfsDHT) getLocal(key key.Key) ([]byte, error) {
+func (dht *IpfsDHT) getLocal(key key.Key) (*pb.Record, error) {
     log.Debug("getLocal %s", key)
     v, err := dht.datastore.Get(key.DsKey())
@@ -221,7 +226,7 @@ func (dht *IpfsDHT) getLocal(key key.Key) ([]byte, error) {
         }
     }
 
-    return rec.GetValue(), nil
+    return rec, nil
 }
 
 // getOwnPrivateKey attempts to load the local peers private
diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go
index 2e63e438e..c09871610 100644
--- a/routing/dht/dht_test.go
+++ b/routing/dht/dht_test.go
@@ -131,8 +131,14 @@ func TestValueGetSet(t *testing.T) {
         },
         Sign: false,
     }
+    nulsel := func(_ key.Key, bs [][]byte) (int, error) {
+        return 0, nil
+    }
+
     dhtA.Validator["v"] = vf
     dhtB.Validator["v"] = vf
+    dhtA.Selector["v"] = nulsel
+    dhtB.Selector["v"] = nulsel
 
     connect(t, ctx, dhtA, dhtB)
@@ -193,7 +199,7 @@ func TestProvides(t *testing.T) {
     if err != nil {
         t.Fatal(err)
     }
-    if !bytes.Equal(bits, v) {
+    if !bytes.Equal(bits.GetValue(), v) {
         t.Fatal("didn't store the right bits (%s, %s)", k, v)
     }
 }
@@ -466,7 +472,7 @@ func TestProvidesMany(t *testing.T) {
     if err != nil {
         t.Fatal(err)
     }
-    if !bytes.Equal(bits, v) {
+    if !bytes.Equal(bits.GetValue(), v) {
         t.Fatal("didn't store the right bits (%s, %s)", k, v)
     }
 
@@ -558,7 +564,7 @@ func TestProvidesAsync(t *testing.T) {
     }
 
     bits, err := dhts[3].getLocal(k)
-    if err != nil && bytes.Equal(bits, val) {
+    if err != nil && bytes.Equal(bits.GetValue(), val) {
         t.Fatal(err)
     }
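[Annotation] The test change above shows the new contract: any namespace registered with a Validator now needs a matching Selector, or record selection will fail for its keys. A sketch of wiring up a custom "v" namespace (setup such as ctx, host, and dstore elided; names illustrative):

    d := dht.NewDHT(ctx, host, dstore)
    d.Validator["v"] = &record.ValidChecker{
        Func: func(key.Key, []byte) error { return nil }, // accept any value
        Sign: false,                                      // unsigned records allowed
    }
    d.Selector["v"] = func(_ key.Key, vals [][]byte) (int, error) {
        return 0, nil // all valid values equivalent; first one wins
    }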
diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go
index 36a92a251..1c5d2b1c1 100644
--- a/routing/dht/handlers.go
+++ b/routing/dht/handlers.go
@@ -108,7 +108,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
     dskey := key.Key(pmes.GetKey()).DsKey()
 
     if err := dht.verifyRecordLocally(pmes.GetRecord()); err != nil {
-        log.Debugf("Bad dht record in PUT from: %s. %s", key.Key(pmes.GetRecord().GetAuthor()), err)
+        log.Warningf("Bad dht record in PUT from: %s. %s", key.Key(pmes.GetRecord().GetAuthor()), err)
         return nil, err
     }
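[Annotation] The routing.go rewrite below is the heart of the change: GetValue becomes a gather-select-repair loop. The repair step is classic read-repair; in isolation it looks roughly like this (sketch only, assuming that package's imports; sendCorrection stands in for putValueToPeer):

    func readRepair(best []byte, vals []routing.RecvdVal, sendCorrection func(peer.ID)) {
        for _, v := range vals {
            if !bytes.Equal(v.Val, best) { // this peer served a stale record
                go sendCorrection(v.From) // fire-and-forget; errors only logged
            }
        }
    }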
log.Debug("have it locally") - return val, nil - } else { - log.Debug("failed to get value locally: %s", err) + vals = append(vals, routing.RecvdVal{ + Val: lrec.GetValue(), + From: dht.self, + }) + + if nvals <= 1 { + return vals, nil + } + } else if nvals == 0 { + return nil, err } // get closest peers in the routing table @@ -104,14 +160,26 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key key.Key) ([]byte, error) { ID: p, }) - val, peers, err := dht.getValueOrPeers(ctx, p, key) + rec, peers, err := dht.getValueOrPeers(ctx, p, key) if err != nil { return nil, err } - res := &dhtQueryResult{value: val, closerPeers: peers} - if val != nil { - res.success = true + res := &dhtQueryResult{closerPeers: peers} + + if rec.GetValue() != nil { + rv := routing.RecvdVal{ + Val: rec.GetValue(), + From: p, + } + valslock.Lock() + vals = append(vals, rv) + + // If weve collected enough records, we're done + if len(vals) >= nvals { + res.success = true + } + valslock.Unlock() } notif.PublishQueryEvent(parent, ¬if.QueryEvent{ @@ -124,17 +192,15 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key key.Key) ([]byte, error) { }) // run it! - result, err := query.Run(ctx, rtp) - if err != nil { - return nil, err + _, err = query.Run(ctx, rtp) + if len(vals) == 0 { + if err != nil { + return nil, err + } } - log.Debugf("GetValue %v %v", key, result.value) - if result.value == nil { - return nil, routing.ErrNotFound - } + return vals, nil - return result.value, nil } // Value provider layer of indirection. diff --git a/routing/mock/centralized_client.go b/routing/mock/centralized_client.go index 9d577c4a3..09f17e61e 100644 --- a/routing/mock/centralized_client.go +++ b/routing/mock/centralized_client.go @@ -44,6 +44,21 @@ func (c *client) GetValue(ctx context.Context, key key.Key) ([]byte, error) { return data, nil } +func (c *client) GetValues(ctx context.Context, key key.Key, count int) ([]routing.RecvdVal, error) { + log.Debugf("GetValue: %s", key) + v, err := c.datastore.Get(key.DsKey()) + if err != nil { + return nil, err + } + + data, ok := v.([]byte) + if !ok { + return nil, errors.New("could not cast value from datastore") + } + + return []routing.RecvdVal{{Val: data, From: c.peer.ID()}}, nil +} + func (c *client) FindProviders(ctx context.Context, key key.Key) ([]peer.PeerInfo, error) { return c.server.Providers(key), nil } diff --git a/routing/none/none_client.go b/routing/none/none_client.go index 8400e6a3b..49df5870a 100644 --- a/routing/none/none_client.go +++ b/routing/none/none_client.go @@ -25,6 +25,10 @@ func (c *nilclient) GetValue(_ context.Context, _ key.Key) ([]byte, error) { return nil, errors.New("Tried GetValue from nil routing.") } +func (c *nilclient) GetValues(_ context.Context, _ key.Key, _ int) ([]routing.RecvdVal, error) { + return nil, errors.New("Tried GetValues from nil routing.") +} + func (c *nilclient) FindPeer(_ context.Context, _ peer.ID) (peer.PeerInfo, error) { return peer.PeerInfo{}, nil } diff --git a/routing/offline/offline.go b/routing/offline/offline.go index 7ead5d305..22aef75b3 100644 --- a/routing/offline/offline.go +++ b/routing/offline/offline.go @@ -67,6 +67,27 @@ func (c *offlineRouting) GetValue(ctx context.Context, key key.Key) ([]byte, err return rec.GetValue(), nil } +func (c *offlineRouting) GetValues(ctx context.Context, key key.Key, _ int) ([]routing.RecvdVal, error) { + v, err := c.datastore.Get(key.DsKey()) + if err != nil { + return nil, err + } + + byt, ok := v.([]byte) + if !ok { + return nil, errors.New("value stored in datastore not 
[]byte") + } + rec := new(pb.Record) + err = proto.Unmarshal(byt, rec) + if err != nil { + return nil, err + } + + return []routing.RecvdVal{ + {Val: rec.GetValue()}, + }, nil +} + func (c *offlineRouting) FindProviders(ctx context.Context, key key.Key) ([]peer.PeerInfo, error) { return nil, ErrOffline } diff --git a/routing/record/record.go b/routing/record/record.go index 80da08581..61dd995ac 100644 --- a/routing/record/record.go +++ b/routing/record/record.go @@ -6,13 +6,11 @@ import ( proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto" key "github.com/ipfs/go-ipfs/blocks/key" - dag "github.com/ipfs/go-ipfs/merkledag" ci "github.com/ipfs/go-ipfs/p2p/crypto" pb "github.com/ipfs/go-ipfs/routing/dht/pb" logging "github.com/ipfs/go-ipfs/vendor/go-log-v1.0.0" ) -var _ = dag.FetchGraph var log = logging.Logger("routing/record") // MakePutRecord creates and signs a dht record for the given key/value pair diff --git a/routing/record/selection.go b/routing/record/selection.go new file mode 100644 index 000000000..e90ebcd39 --- /dev/null +++ b/routing/record/selection.go @@ -0,0 +1,40 @@ +package record + +import ( + "errors" + "strings" + + key "github.com/ipfs/go-ipfs/blocks/key" +) + +// A SelectorFunc selects the best value for the given key from +// a slice of possible values and returns the index of the chosen one +type SelectorFunc func(key.Key, [][]byte) (int, error) + +type Selector map[string]SelectorFunc + +func (s Selector) BestRecord(k key.Key, recs [][]byte) (int, error) { + if len(recs) == 0 { + return 0, errors.New("no records given!") + } + + parts := strings.Split(string(k), "/") + if len(parts) < 3 { + log.Infof("Record key does not have selectorfunc: %s", k) + return 0, errors.New("record key does not have selectorfunc") + } + + sel, ok := s[parts[1]] + if !ok { + log.Infof("Unrecognized key prefix: %s", parts[1]) + return 0, ErrInvalidRecordType + } + + return sel(k, recs) +} + +// PublicKeySelector just selects the first entry. +// All valid public key records will be equivalent. +func PublicKeySelector(k key.Key, vals [][]byte) (int, error) { + return 0, nil +} diff --git a/routing/routing.go b/routing/routing.go index db9b49dcd..1c799b984 100644 --- a/routing/routing.go +++ b/routing/routing.go @@ -26,6 +26,18 @@ type IpfsRouting interface { // GetValue searches for the value corresponding to given Key. GetValue(context.Context, key.Key) ([]byte, error) + // GetValues searches for values corresponding to given Key. + // + // Passing a value of '0' for the count argument will cause the + // routing interface to return values only from cached or local storage + // and return an error if no cached value is found. + // + // Passing a value of '1' will return a local value if found, and query + // the network for the first value it finds otherwise. + // As a result, a value of '1' is mostly useful for cases where the record + // in question has only one valid value (such as public keys) + GetValues(c context.Context, k key.Key, count int) ([]RecvdVal, error) + // Value provider layer of indirection. // This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT. @@ -44,6 +56,13 @@ type IpfsRouting interface { // TODO expose io.Closer or plain-old Close error } +// RecvdVal represents a dht value record that has been received from a given peer +// it is used to track peers with expired records in order to correct them. 
diff --git a/routing/routing.go b/routing/routing.go
index db9b49dcd..1c799b984 100644
--- a/routing/routing.go
+++ b/routing/routing.go
@@ -26,6 +26,18 @@ type IpfsRouting interface {
     // GetValue searches for the value corresponding to given Key.
     GetValue(context.Context, key.Key) ([]byte, error)
 
+    // GetValues searches for values corresponding to given Key.
+    //
+    // Passing a value of '0' for the count argument will cause the
+    // routing interface to return values only from cached or local storage
+    // and return an error if no cached value is found.
+    //
+    // Passing a value of '1' will return a local value if found, and query
+    // the network for the first value it finds otherwise.
+    // As a result, a value of '1' is mostly useful for cases where the record
+    // in question has only one valid value (such as public keys).
+    GetValues(c context.Context, k key.Key, count int) ([]RecvdVal, error)
+
     // Value provider layer of indirection.
     // This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.
@@ -44,6 +56,13 @@ type IpfsRouting interface {
     // TODO expose io.Closer or plain-old Close error
 }
 
+// RecvdVal represents a dht value record that has been received from a given peer;
+// it is used to track peers with expired records in order to correct them.
+type RecvdVal struct {
+    From peer.ID
+    Val  []byte
+}
+
 type PubKeyFetcher interface {
     GetPublicKey(context.Context, peer.ID) (ci.PubKey, error)
 }
diff --git a/routing/supernode/client.go b/routing/supernode/client.go
index 97d3d70c7..923d530eb 100644
--- a/routing/supernode/client.go
+++ b/routing/supernode/client.go
@@ -81,6 +81,22 @@ func (c *Client) GetValue(ctx context.Context, k key.Key) ([]byte, error) {
     return response.Record.GetValue(), nil
 }
 
+func (c *Client) GetValues(ctx context.Context, k key.Key, _ int) ([]routing.RecvdVal, error) {
+    defer log.EventBegin(ctx, "getValues", &k).Done()
+    msg := pb.NewMessage(pb.Message_GET_VALUE, string(k), 0)
+    response, err := c.proxy.SendRequest(ctx, msg) // TODO wrap to hide the remote
+    if err != nil {
+        return nil, err
+    }
+
+    return []routing.RecvdVal{
+        {
+            Val:  response.Record.GetValue(),
+            From: c.local,
+        },
+    }, nil
+}
+
 func (c *Client) Provide(ctx context.Context, k key.Key) error {
     defer log.EventBegin(ctx, "provide", &k).Done()
     msg := pb.NewMessage(pb.Message_ADD_PROVIDER, string(k), 0)
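[Annotation] Taken together, the flow this patch enables looks roughly like this (fragment; setup and error handling elided, names hypothetical):

    // Publishing reads the previous record via GetValues(ctx, key, 0),
    // bumps its sequence number, and stores the new entry.
    pub := namesys.NewRoutingPublisher(dhtRouting)
    err := pub.Publish(ctx, privKey, path.Path("/ipfs/<hash>"))

    // Resolving gathers candidates, picks the winner with the registered
    // selector, and read-repairs any peer that served an older sequence.
    val, err := dhtRouting.GetValue(ctx, key.Key("/ipns/"+string(nameHash)))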