diff --git a/core/core.go b/core/core.go index 5f246c833..d52883cd8 100644 --- a/core/core.go +++ b/core/core.go @@ -501,6 +501,7 @@ func startListening(ctx context.Context, host p2phost.Host, cfg *config.Config) func constructDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.ThreadSafeDatastore) (routing.IpfsRouting, error) { dhtRouting := dht.NewDHT(ctx, host, dstore) dhtRouting.Validator[IpnsValidatorTag] = namesys.IpnsRecordValidator + dhtRouting.Selector[IpnsValidatorTag] = namesys.IpnsSelectorFunc return dhtRouting, nil } diff --git a/namesys/pb/namesys.pb.go b/namesys/pb/namesys.pb.go index 97e25a855..4d99a5e0a 100644 --- a/namesys/pb/namesys.pb.go +++ b/namesys/pb/namesys.pb.go @@ -56,6 +56,7 @@ type IpnsEntry struct { Signature []byte `protobuf:"bytes,2,req,name=signature" json:"signature,omitempty"` ValidityType *IpnsEntry_ValidityType `protobuf:"varint,3,opt,name=validityType,enum=namesys.pb.IpnsEntry_ValidityType" json:"validityType,omitempty"` Validity []byte `protobuf:"bytes,4,opt,name=validity" json:"validity,omitempty"` + Sequence *uint64 `protobuf:"varint,5,opt,name=sequence" json:"sequence,omitempty"` XXX_unrecognized []byte `json:"-"` } @@ -91,6 +92,13 @@ func (m *IpnsEntry) GetValidity() []byte { return nil } +func (m *IpnsEntry) GetSequence() uint64 { + if m != nil && m.Sequence != nil { + return *m.Sequence + } + return 0 +} + func init() { proto.RegisterEnum("namesys.pb.IpnsEntry_ValidityType", IpnsEntry_ValidityType_name, IpnsEntry_ValidityType_value) } diff --git a/namesys/pb/namesys.proto b/namesys/pb/namesys.proto index 4219af6bb..242f77bf2 100644 --- a/namesys/pb/namesys.proto +++ b/namesys/pb/namesys.proto @@ -10,4 +10,6 @@ message IpnsEntry { optional ValidityType validityType = 3; optional bytes validity = 4; + + optional uint64 sequence = 5; } diff --git a/namesys/publisher.go b/namesys/publisher.go index e3dd1d81b..a37a39fbe 100644 --- a/namesys/publisher.go +++ b/namesys/publisher.go @@ -7,6 +7,7 @@ import ( "time" proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto" + ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" key "github.com/ipfs/go-ipfs/blocks/key" @@ -45,10 +46,6 @@ func NewRoutingPublisher(route routing.IpfsRouting) Publisher { func (p *ipnsPublisher) Publish(ctx context.Context, k ci.PrivKey, value path.Path) error { log.Debugf("Publish %s", value) - data, err := createRoutingEntryData(k, value) - if err != nil { - return err - } pubkey := k.GetPublic() pkbytes, err := pubkey.Bytes() if err != nil { @@ -57,6 +54,27 @@ func (p *ipnsPublisher) Publish(ctx context.Context, k ci.PrivKey, value path.Pa nameb := u.Hash(pkbytes) namekey := key.Key("/pk/" + string(nameb)) + ipnskey := key.Key("/ipns/" + string(nameb)) + + // get previous records sequence number, and add one to it + var seqnum uint64 + prevrec, err := p.routing.GetValues(ctx, ipnskey, 0) + if err == nil { + e := new(pb.IpnsEntry) + err := proto.Unmarshal(prevrec[0].Val, e) + if err != nil { + return err + } + + seqnum = e.GetSequence() + 1 + } else if err != ds.ErrNotFound { + return err + } + + data, err := createRoutingEntryData(k, value, seqnum) + if err != nil { + return err + } log.Debugf("Storing pubkey at: %s", namekey) // Store associated public key @@ -67,8 +85,6 @@ func (p *ipnsPublisher) Publish(ctx context.Context, k ci.PrivKey, value path.Pa return err } - ipnskey := key.Key("/ipns/" + string(nameb)) - log.Debugf("Storing ipns entry at: %s", ipnskey) // Store ipns entry at "/ipns/"+b58(h(pubkey)) timectx, cancel = context.WithDeadline(ctx, time.Now().Add(time.Second*10)) @@ -80,12 +96,13 @@ func (p *ipnsPublisher) Publish(ctx context.Context, k ci.PrivKey, value path.Pa return nil } -func createRoutingEntryData(pk ci.PrivKey, val path.Path) ([]byte, error) { +func createRoutingEntryData(pk ci.PrivKey, val path.Path, seq uint64) ([]byte, error) { entry := new(pb.IpnsEntry) entry.Value = []byte(val) typ := pb.IpnsEntry_EOL entry.ValidityType = &typ + entry.Sequence = proto.Uint64(seq) entry.Validity = []byte(u.FormatRFC3339(time.Now().Add(time.Hour * 24))) sig, err := pk.Sign(ipnsEntryDataForSig(entry)) @@ -110,6 +127,52 @@ var IpnsRecordValidator = &record.ValidChecker{ Sign: true, } +func IpnsSelectorFunc(k key.Key, vals [][]byte) (int, error) { + var recs []*pb.IpnsEntry + for _, v := range vals { + e := new(pb.IpnsEntry) + err := proto.Unmarshal(v, e) + if err == nil { + recs = append(recs, e) + } else { + recs = append(recs, nil) + } + } + + var best_seq uint64 + best_i := -1 + + for i, r := range recs { + if r == nil { + continue + } + if best_i == -1 || r.GetSequence() > best_seq { + best_seq = r.GetSequence() + best_i = i + } else if r.GetSequence() == best_seq { + rt, err := u.ParseRFC3339(string(r.GetValidity())) + if err != nil { + continue + } + + bestt, err := u.ParseRFC3339(string(recs[best_i].GetValidity())) + if err != nil { + continue + } + + if rt.After(bestt) { + best_seq = r.GetSequence() + best_i = i + } + } + } + if best_i == -1 { + return 0, errors.New("no usable records in given set") + } + + return best_i, nil +} + // ValidateIpnsRecord implements ValidatorFunc and verifies that the // given 'val' is an IpnsEntry and that that entry is valid. func ValidateIpnsRecord(k key.Key, val []byte) error { diff --git a/routing/dht/dht.go b/routing/dht/dht.go index b5d6d1611..64f28c74e 100644 --- a/routing/dht/dht.go +++ b/routing/dht/dht.go @@ -54,6 +54,7 @@ type IpfsDHT struct { diaglock sync.Mutex // lock to make diagnostics work better Validator record.Validator // record validator funcs + Selector record.Selector // record selection funcs ctx context.Context proc goprocess.Process @@ -89,6 +90,9 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip dht.Validator = make(record.Validator) dht.Validator["pk"] = record.PublicKeyValidator + dht.Selector = make(record.Selector) + dht.Selector["pk"] = record.PublicKeySelector + return dht } @@ -152,13 +156,16 @@ func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.ID, skey string) err // NOTE: it will update the dht's peerstore with any new addresses // it finds for the given peer. func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, - key key.Key) ([]byte, []peer.PeerInfo, error) { + key key.Key) (*pb.Record, []peer.PeerInfo, error) { pmes, err := dht.getValueSingle(ctx, p, key) if err != nil { return nil, nil, err } + // Perhaps we were given closer peers + peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers()) + if record := pmes.GetRecord(); record != nil { // Success! We were given the value log.Debug("getValueOrPeers: got value") @@ -169,11 +176,9 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID, log.Info("Received invalid record! (discarded)") return nil, nil, err } - return record.GetValue(), nil, nil + return record, peers, nil } - // Perhaps we were given closer peers - peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers()) if len(peers) > 0 { log.Debug("getValueOrPeers: peers") return nil, peers, nil @@ -193,7 +198,7 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID, } // getLocal attempts to retrieve the value from the datastore -func (dht *IpfsDHT) getLocal(key key.Key) ([]byte, error) { +func (dht *IpfsDHT) getLocal(key key.Key) (*pb.Record, error) { log.Debug("getLocal %s", key) v, err := dht.datastore.Get(key.DsKey()) @@ -221,7 +226,7 @@ func (dht *IpfsDHT) getLocal(key key.Key) ([]byte, error) { } } - return rec.GetValue(), nil + return rec, nil } // getOwnPrivateKey attempts to load the local peers private diff --git a/routing/dht/dht_test.go b/routing/dht/dht_test.go index 2e63e438e..c09871610 100644 --- a/routing/dht/dht_test.go +++ b/routing/dht/dht_test.go @@ -131,8 +131,14 @@ func TestValueGetSet(t *testing.T) { }, Sign: false, } + nulsel := func(_ key.Key, bs [][]byte) (int, error) { + return 0, nil + } + dhtA.Validator["v"] = vf dhtB.Validator["v"] = vf + dhtA.Selector["v"] = nulsel + dhtB.Selector["v"] = nulsel connect(t, ctx, dhtA, dhtB) @@ -193,7 +199,7 @@ func TestProvides(t *testing.T) { if err != nil { t.Fatal(err) } - if !bytes.Equal(bits, v) { + if !bytes.Equal(bits.GetValue(), v) { t.Fatal("didn't store the right bits (%s, %s)", k, v) } } @@ -466,7 +472,7 @@ func TestProvidesMany(t *testing.T) { if err != nil { t.Fatal(err) } - if !bytes.Equal(bits, v) { + if !bytes.Equal(bits.GetValue(), v) { t.Fatal("didn't store the right bits (%s, %s)", k, v) } @@ -558,7 +564,7 @@ func TestProvidesAsync(t *testing.T) { } bits, err := dhts[3].getLocal(k) - if err != nil && bytes.Equal(bits, val) { + if err != nil && bytes.Equal(bits.GetValue(), val) { t.Fatal(err) } diff --git a/routing/dht/handlers.go b/routing/dht/handlers.go index 36a92a251..1c5d2b1c1 100644 --- a/routing/dht/handlers.go +++ b/routing/dht/handlers.go @@ -108,7 +108,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess dskey := key.Key(pmes.GetKey()).DsKey() if err := dht.verifyRecordLocally(pmes.GetRecord()); err != nil { - log.Debugf("Bad dht record in PUT from: %s. %s", key.Key(pmes.GetRecord().GetAuthor()), err) + log.Warningf("Bad dht record in PUT from: %s. %s", key.Key(pmes.GetRecord().GetAuthor()), err) return nil, err } diff --git a/routing/dht/routing.go b/routing/dht/routing.go index 80652f6ad..d5854155f 100644 --- a/routing/dht/routing.go +++ b/routing/dht/routing.go @@ -1,6 +1,7 @@ package dht import ( + "bytes" "sync" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" @@ -76,16 +77,71 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key key.Key, value []byte) err } // GetValue searches for the value corresponding to given Key. -// If the search does not succeed, a multiaddr string of a closer peer is -// returned along with util.ErrSearchIncomplete func (dht *IpfsDHT) GetValue(ctx context.Context, key key.Key) ([]byte, error) { + vals, err := dht.GetValues(ctx, key, 3) + if err != nil { + return nil, err + } + + var recs [][]byte + for _, v := range vals { + recs = append(recs, v.Val) + } + + i, err := dht.Selector.BestRecord(key, recs) + if err != nil { + return nil, err + } + + best := recs[i] + log.Debugf("GetValue %v %v", key, best) + if best == nil { + log.Errorf("GetValue yielded correct record with nil value.") + return nil, routing.ErrNotFound + } + + fixupRec, err := record.MakePutRecord(dht.peerstore.PrivKey(dht.self), key, best, true) + if err != nil { + // probably shouldnt actually 'error' here as we have found a value we like, + // but this call failing probably isnt something we want to ignore + return nil, err + } + + for _, v := range vals { + // if someone sent us a different 'less-valid' record, lets correct them + if !bytes.Equal(v.Val, best) { + go func(v routing.RecvdVal) { + err := dht.putValueToPeer(ctx, v.From, key, fixupRec) + if err != nil { + log.Error("Error correcting DHT entry: ", err) + } + }(v) + } + } + + return best, nil +} + +func (dht *IpfsDHT) GetValues(ctx context.Context, key key.Key, nvals int) ([]routing.RecvdVal, error) { + var vals []routing.RecvdVal + var valslock sync.Mutex + // If we have it local, dont bother doing an RPC! - val, err := dht.getLocal(key) + lrec, err := dht.getLocal(key) if err == nil { + // TODO: this is tricky, we dont always want to trust our own value + // what if the authoritative source updated it? log.Debug("have it locally") - return val, nil - } else { - log.Debug("failed to get value locally: %s", err) + vals = append(vals, routing.RecvdVal{ + Val: lrec.GetValue(), + From: dht.self, + }) + + if nvals <= 1 { + return vals, nil + } + } else if nvals == 0 { + return nil, err } // get closest peers in the routing table @@ -104,14 +160,26 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key key.Key) ([]byte, error) { ID: p, }) - val, peers, err := dht.getValueOrPeers(ctx, p, key) + rec, peers, err := dht.getValueOrPeers(ctx, p, key) if err != nil { return nil, err } - res := &dhtQueryResult{value: val, closerPeers: peers} - if val != nil { - res.success = true + res := &dhtQueryResult{closerPeers: peers} + + if rec.GetValue() != nil { + rv := routing.RecvdVal{ + Val: rec.GetValue(), + From: p, + } + valslock.Lock() + vals = append(vals, rv) + + // If weve collected enough records, we're done + if len(vals) >= nvals { + res.success = true + } + valslock.Unlock() } notif.PublishQueryEvent(parent, ¬if.QueryEvent{ @@ -124,17 +192,15 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key key.Key) ([]byte, error) { }) // run it! - result, err := query.Run(ctx, rtp) - if err != nil { - return nil, err + _, err = query.Run(ctx, rtp) + if len(vals) == 0 { + if err != nil { + return nil, err + } } - log.Debugf("GetValue %v %v", key, result.value) - if result.value == nil { - return nil, routing.ErrNotFound - } + return vals, nil - return result.value, nil } // Value provider layer of indirection. diff --git a/routing/mock/centralized_client.go b/routing/mock/centralized_client.go index 9d577c4a3..09f17e61e 100644 --- a/routing/mock/centralized_client.go +++ b/routing/mock/centralized_client.go @@ -44,6 +44,21 @@ func (c *client) GetValue(ctx context.Context, key key.Key) ([]byte, error) { return data, nil } +func (c *client) GetValues(ctx context.Context, key key.Key, count int) ([]routing.RecvdVal, error) { + log.Debugf("GetValue: %s", key) + v, err := c.datastore.Get(key.DsKey()) + if err != nil { + return nil, err + } + + data, ok := v.([]byte) + if !ok { + return nil, errors.New("could not cast value from datastore") + } + + return []routing.RecvdVal{{Val: data, From: c.peer.ID()}}, nil +} + func (c *client) FindProviders(ctx context.Context, key key.Key) ([]peer.PeerInfo, error) { return c.server.Providers(key), nil } diff --git a/routing/none/none_client.go b/routing/none/none_client.go index 8400e6a3b..49df5870a 100644 --- a/routing/none/none_client.go +++ b/routing/none/none_client.go @@ -25,6 +25,10 @@ func (c *nilclient) GetValue(_ context.Context, _ key.Key) ([]byte, error) { return nil, errors.New("Tried GetValue from nil routing.") } +func (c *nilclient) GetValues(_ context.Context, _ key.Key, _ int) ([]routing.RecvdVal, error) { + return nil, errors.New("Tried GetValues from nil routing.") +} + func (c *nilclient) FindPeer(_ context.Context, _ peer.ID) (peer.PeerInfo, error) { return peer.PeerInfo{}, nil } diff --git a/routing/offline/offline.go b/routing/offline/offline.go index 7ead5d305..22aef75b3 100644 --- a/routing/offline/offline.go +++ b/routing/offline/offline.go @@ -67,6 +67,27 @@ func (c *offlineRouting) GetValue(ctx context.Context, key key.Key) ([]byte, err return rec.GetValue(), nil } +func (c *offlineRouting) GetValues(ctx context.Context, key key.Key, _ int) ([]routing.RecvdVal, error) { + v, err := c.datastore.Get(key.DsKey()) + if err != nil { + return nil, err + } + + byt, ok := v.([]byte) + if !ok { + return nil, errors.New("value stored in datastore not []byte") + } + rec := new(pb.Record) + err = proto.Unmarshal(byt, rec) + if err != nil { + return nil, err + } + + return []routing.RecvdVal{ + {Val: rec.GetValue()}, + }, nil +} + func (c *offlineRouting) FindProviders(ctx context.Context, key key.Key) ([]peer.PeerInfo, error) { return nil, ErrOffline } diff --git a/routing/record/record.go b/routing/record/record.go index 80da08581..61dd995ac 100644 --- a/routing/record/record.go +++ b/routing/record/record.go @@ -6,13 +6,11 @@ import ( proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto" key "github.com/ipfs/go-ipfs/blocks/key" - dag "github.com/ipfs/go-ipfs/merkledag" ci "github.com/ipfs/go-ipfs/p2p/crypto" pb "github.com/ipfs/go-ipfs/routing/dht/pb" logging "github.com/ipfs/go-ipfs/vendor/go-log-v1.0.0" ) -var _ = dag.FetchGraph var log = logging.Logger("routing/record") // MakePutRecord creates and signs a dht record for the given key/value pair diff --git a/routing/record/selection.go b/routing/record/selection.go new file mode 100644 index 000000000..e90ebcd39 --- /dev/null +++ b/routing/record/selection.go @@ -0,0 +1,40 @@ +package record + +import ( + "errors" + "strings" + + key "github.com/ipfs/go-ipfs/blocks/key" +) + +// A SelectorFunc selects the best value for the given key from +// a slice of possible values and returns the index of the chosen one +type SelectorFunc func(key.Key, [][]byte) (int, error) + +type Selector map[string]SelectorFunc + +func (s Selector) BestRecord(k key.Key, recs [][]byte) (int, error) { + if len(recs) == 0 { + return 0, errors.New("no records given!") + } + + parts := strings.Split(string(k), "/") + if len(parts) < 3 { + log.Infof("Record key does not have selectorfunc: %s", k) + return 0, errors.New("record key does not have selectorfunc") + } + + sel, ok := s[parts[1]] + if !ok { + log.Infof("Unrecognized key prefix: %s", parts[1]) + return 0, ErrInvalidRecordType + } + + return sel(k, recs) +} + +// PublicKeySelector just selects the first entry. +// All valid public key records will be equivalent. +func PublicKeySelector(k key.Key, vals [][]byte) (int, error) { + return 0, nil +} diff --git a/routing/routing.go b/routing/routing.go index db9b49dcd..1c799b984 100644 --- a/routing/routing.go +++ b/routing/routing.go @@ -26,6 +26,18 @@ type IpfsRouting interface { // GetValue searches for the value corresponding to given Key. GetValue(context.Context, key.Key) ([]byte, error) + // GetValues searches for values corresponding to given Key. + // + // Passing a value of '0' for the count argument will cause the + // routing interface to return values only from cached or local storage + // and return an error if no cached value is found. + // + // Passing a value of '1' will return a local value if found, and query + // the network for the first value it finds otherwise. + // As a result, a value of '1' is mostly useful for cases where the record + // in question has only one valid value (such as public keys) + GetValues(c context.Context, k key.Key, count int) ([]RecvdVal, error) + // Value provider layer of indirection. // This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT. @@ -44,6 +56,13 @@ type IpfsRouting interface { // TODO expose io.Closer or plain-old Close error } +// RecvdVal represents a dht value record that has been received from a given peer +// it is used to track peers with expired records in order to correct them. +type RecvdVal struct { + From peer.ID + Val []byte +} + type PubKeyFetcher interface { GetPublicKey(context.Context, peer.ID) (ci.PubKey, error) } diff --git a/routing/supernode/client.go b/routing/supernode/client.go index 97d3d70c7..923d530eb 100644 --- a/routing/supernode/client.go +++ b/routing/supernode/client.go @@ -81,6 +81,22 @@ func (c *Client) GetValue(ctx context.Context, k key.Key) ([]byte, error) { return response.Record.GetValue(), nil } +func (c *Client) GetValues(ctx context.Context, k key.Key, _ int) ([]routing.RecvdVal, error) { + defer log.EventBegin(ctx, "getValue", &k).Done() + msg := pb.NewMessage(pb.Message_GET_VALUE, string(k), 0) + response, err := c.proxy.SendRequest(ctx, msg) // TODO wrap to hide the remote + if err != nil { + return nil, err + } + + return []routing.RecvdVal{ + { + Val: response.Record.GetValue(), + From: c.local, + }, + }, nil +} + func (c *Client) Provide(ctx context.Context, k key.Key) error { defer log.EventBegin(ctx, "provide", &k).Done() msg := pb.NewMessage(pb.Message_ADD_PROVIDER, string(k), 0)