1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-26 15:42:21 +08:00

ipns record selection via sequence numbers

This commit adds a sequence number to the IpnsEntry protobuf
that is used to determine which among a set of entries for the same key
is the 'most correct'.

GetValues has been added to the routing interface to retrieve a set of
records from the dht, for the caller to select from.

GetValue (singular) will call GetValues, select the 'best' record, and
then update that record to peers we received outdated records from.
This will help keep the dht consistent.

License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
Jeromy
2015-09-18 10:27:55 -07:00
parent 81c64768c5
commit cfcc3d6a1b
15 changed files with 301 additions and 37 deletions

View File

@ -501,6 +501,7 @@ func startListening(ctx context.Context, host p2phost.Host, cfg *config.Config)
func constructDHTRouting(ctx context.Context, host p2phost.Host, dstore ds.ThreadSafeDatastore) (routing.IpfsRouting, error) {
dhtRouting := dht.NewDHT(ctx, host, dstore)
dhtRouting.Validator[IpnsValidatorTag] = namesys.IpnsRecordValidator
dhtRouting.Selector[IpnsValidatorTag] = namesys.IpnsSelectorFunc
return dhtRouting, nil
}

View File

@ -56,6 +56,7 @@ type IpnsEntry struct {
Signature []byte `protobuf:"bytes,2,req,name=signature" json:"signature,omitempty"`
ValidityType *IpnsEntry_ValidityType `protobuf:"varint,3,opt,name=validityType,enum=namesys.pb.IpnsEntry_ValidityType" json:"validityType,omitempty"`
Validity []byte `protobuf:"bytes,4,opt,name=validity" json:"validity,omitempty"`
Sequence *uint64 `protobuf:"varint,5,opt,name=sequence" json:"sequence,omitempty"`
XXX_unrecognized []byte `json:"-"`
}
@ -91,6 +92,13 @@ func (m *IpnsEntry) GetValidity() []byte {
return nil
}
func (m *IpnsEntry) GetSequence() uint64 {
if m != nil && m.Sequence != nil {
return *m.Sequence
}
return 0
}
func init() {
proto.RegisterEnum("namesys.pb.IpnsEntry_ValidityType", IpnsEntry_ValidityType_name, IpnsEntry_ValidityType_value)
}

View File

@ -10,4 +10,6 @@ message IpnsEntry {
optional ValidityType validityType = 3;
optional bytes validity = 4;
optional uint64 sequence = 5;
}

View File

@ -7,6 +7,7 @@ import (
"time"
proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
ds "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-datastore"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
key "github.com/ipfs/go-ipfs/blocks/key"
@ -45,10 +46,6 @@ func NewRoutingPublisher(route routing.IpfsRouting) Publisher {
func (p *ipnsPublisher) Publish(ctx context.Context, k ci.PrivKey, value path.Path) error {
log.Debugf("Publish %s", value)
data, err := createRoutingEntryData(k, value)
if err != nil {
return err
}
pubkey := k.GetPublic()
pkbytes, err := pubkey.Bytes()
if err != nil {
@ -57,6 +54,27 @@ func (p *ipnsPublisher) Publish(ctx context.Context, k ci.PrivKey, value path.Pa
nameb := u.Hash(pkbytes)
namekey := key.Key("/pk/" + string(nameb))
ipnskey := key.Key("/ipns/" + string(nameb))
// get previous records sequence number, and add one to it
var seqnum uint64
prevrec, err := p.routing.GetValues(ctx, ipnskey, 0)
if err == nil {
e := new(pb.IpnsEntry)
err := proto.Unmarshal(prevrec[0].Val, e)
if err != nil {
return err
}
seqnum = e.GetSequence() + 1
} else if err != ds.ErrNotFound {
return err
}
data, err := createRoutingEntryData(k, value, seqnum)
if err != nil {
return err
}
log.Debugf("Storing pubkey at: %s", namekey)
// Store associated public key
@ -67,8 +85,6 @@ func (p *ipnsPublisher) Publish(ctx context.Context, k ci.PrivKey, value path.Pa
return err
}
ipnskey := key.Key("/ipns/" + string(nameb))
log.Debugf("Storing ipns entry at: %s", ipnskey)
// Store ipns entry at "/ipns/"+b58(h(pubkey))
timectx, cancel = context.WithDeadline(ctx, time.Now().Add(time.Second*10))
@ -80,12 +96,13 @@ func (p *ipnsPublisher) Publish(ctx context.Context, k ci.PrivKey, value path.Pa
return nil
}
func createRoutingEntryData(pk ci.PrivKey, val path.Path) ([]byte, error) {
func createRoutingEntryData(pk ci.PrivKey, val path.Path, seq uint64) ([]byte, error) {
entry := new(pb.IpnsEntry)
entry.Value = []byte(val)
typ := pb.IpnsEntry_EOL
entry.ValidityType = &typ
entry.Sequence = proto.Uint64(seq)
entry.Validity = []byte(u.FormatRFC3339(time.Now().Add(time.Hour * 24)))
sig, err := pk.Sign(ipnsEntryDataForSig(entry))
@ -110,6 +127,52 @@ var IpnsRecordValidator = &record.ValidChecker{
Sign: true,
}
func IpnsSelectorFunc(k key.Key, vals [][]byte) (int, error) {
var recs []*pb.IpnsEntry
for _, v := range vals {
e := new(pb.IpnsEntry)
err := proto.Unmarshal(v, e)
if err == nil {
recs = append(recs, e)
} else {
recs = append(recs, nil)
}
}
var best_seq uint64
best_i := -1
for i, r := range recs {
if r == nil {
continue
}
if best_i == -1 || r.GetSequence() > best_seq {
best_seq = r.GetSequence()
best_i = i
} else if r.GetSequence() == best_seq {
rt, err := u.ParseRFC3339(string(r.GetValidity()))
if err != nil {
continue
}
bestt, err := u.ParseRFC3339(string(recs[best_i].GetValidity()))
if err != nil {
continue
}
if rt.After(bestt) {
best_seq = r.GetSequence()
best_i = i
}
}
}
if best_i == -1 {
return 0, errors.New("no usable records in given set")
}
return best_i, nil
}
// ValidateIpnsRecord implements ValidatorFunc and verifies that the
// given 'val' is an IpnsEntry and that that entry is valid.
func ValidateIpnsRecord(k key.Key, val []byte) error {

View File

@ -54,6 +54,7 @@ type IpfsDHT struct {
diaglock sync.Mutex // lock to make diagnostics work better
Validator record.Validator // record validator funcs
Selector record.Selector // record selection funcs
ctx context.Context
proc goprocess.Process
@ -89,6 +90,9 @@ func NewDHT(ctx context.Context, h host.Host, dstore ds.ThreadSafeDatastore) *Ip
dht.Validator = make(record.Validator)
dht.Validator["pk"] = record.PublicKeyValidator
dht.Selector = make(record.Selector)
dht.Selector["pk"] = record.PublicKeySelector
return dht
}
@ -152,13 +156,16 @@ func (dht *IpfsDHT) putProvider(ctx context.Context, p peer.ID, skey string) err
// NOTE: it will update the dht's peerstore with any new addresses
// it finds for the given peer.
func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID,
key key.Key) ([]byte, []peer.PeerInfo, error) {
key key.Key) (*pb.Record, []peer.PeerInfo, error) {
pmes, err := dht.getValueSingle(ctx, p, key)
if err != nil {
return nil, nil, err
}
// Perhaps we were given closer peers
peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
if record := pmes.GetRecord(); record != nil {
// Success! We were given the value
log.Debug("getValueOrPeers: got value")
@ -169,11 +176,9 @@ func (dht *IpfsDHT) getValueOrPeers(ctx context.Context, p peer.ID,
log.Info("Received invalid record! (discarded)")
return nil, nil, err
}
return record.GetValue(), nil, nil
return record, peers, nil
}
// Perhaps we were given closer peers
peers := pb.PBPeersToPeerInfos(pmes.GetCloserPeers())
if len(peers) > 0 {
log.Debug("getValueOrPeers: peers")
return nil, peers, nil
@ -193,7 +198,7 @@ func (dht *IpfsDHT) getValueSingle(ctx context.Context, p peer.ID,
}
// getLocal attempts to retrieve the value from the datastore
func (dht *IpfsDHT) getLocal(key key.Key) ([]byte, error) {
func (dht *IpfsDHT) getLocal(key key.Key) (*pb.Record, error) {
log.Debug("getLocal %s", key)
v, err := dht.datastore.Get(key.DsKey())
@ -221,7 +226,7 @@ func (dht *IpfsDHT) getLocal(key key.Key) ([]byte, error) {
}
}
return rec.GetValue(), nil
return rec, nil
}
// getOwnPrivateKey attempts to load the local peers private

View File

@ -131,8 +131,14 @@ func TestValueGetSet(t *testing.T) {
},
Sign: false,
}
nulsel := func(_ key.Key, bs [][]byte) (int, error) {
return 0, nil
}
dhtA.Validator["v"] = vf
dhtB.Validator["v"] = vf
dhtA.Selector["v"] = nulsel
dhtB.Selector["v"] = nulsel
connect(t, ctx, dhtA, dhtB)
@ -193,7 +199,7 @@ func TestProvides(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(bits, v) {
if !bytes.Equal(bits.GetValue(), v) {
t.Fatal("didn't store the right bits (%s, %s)", k, v)
}
}
@ -466,7 +472,7 @@ func TestProvidesMany(t *testing.T) {
if err != nil {
t.Fatal(err)
}
if !bytes.Equal(bits, v) {
if !bytes.Equal(bits.GetValue(), v) {
t.Fatal("didn't store the right bits (%s, %s)", k, v)
}
@ -558,7 +564,7 @@ func TestProvidesAsync(t *testing.T) {
}
bits, err := dhts[3].getLocal(k)
if err != nil && bytes.Equal(bits, val) {
if err != nil && bytes.Equal(bits.GetValue(), val) {
t.Fatal(err)
}

View File

@ -108,7 +108,7 @@ func (dht *IpfsDHT) handlePutValue(ctx context.Context, p peer.ID, pmes *pb.Mess
dskey := key.Key(pmes.GetKey()).DsKey()
if err := dht.verifyRecordLocally(pmes.GetRecord()); err != nil {
log.Debugf("Bad dht record in PUT from: %s. %s", key.Key(pmes.GetRecord().GetAuthor()), err)
log.Warningf("Bad dht record in PUT from: %s. %s", key.Key(pmes.GetRecord().GetAuthor()), err)
return nil, err
}

View File

@ -1,6 +1,7 @@
package dht
import (
"bytes"
"sync"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
@ -76,16 +77,71 @@ func (dht *IpfsDHT) PutValue(ctx context.Context, key key.Key, value []byte) err
}
// GetValue searches for the value corresponding to given Key.
// If the search does not succeed, a multiaddr string of a closer peer is
// returned along with util.ErrSearchIncomplete
func (dht *IpfsDHT) GetValue(ctx context.Context, key key.Key) ([]byte, error) {
vals, err := dht.GetValues(ctx, key, 3)
if err != nil {
return nil, err
}
var recs [][]byte
for _, v := range vals {
recs = append(recs, v.Val)
}
i, err := dht.Selector.BestRecord(key, recs)
if err != nil {
return nil, err
}
best := recs[i]
log.Debugf("GetValue %v %v", key, best)
if best == nil {
log.Errorf("GetValue yielded correct record with nil value.")
return nil, routing.ErrNotFound
}
fixupRec, err := record.MakePutRecord(dht.peerstore.PrivKey(dht.self), key, best, true)
if err != nil {
// probably shouldnt actually 'error' here as we have found a value we like,
// but this call failing probably isnt something we want to ignore
return nil, err
}
for _, v := range vals {
// if someone sent us a different 'less-valid' record, lets correct them
if !bytes.Equal(v.Val, best) {
go func(v routing.RecvdVal) {
err := dht.putValueToPeer(ctx, v.From, key, fixupRec)
if err != nil {
log.Error("Error correcting DHT entry: ", err)
}
}(v)
}
}
return best, nil
}
func (dht *IpfsDHT) GetValues(ctx context.Context, key key.Key, nvals int) ([]routing.RecvdVal, error) {
var vals []routing.RecvdVal
var valslock sync.Mutex
// If we have it local, dont bother doing an RPC!
val, err := dht.getLocal(key)
lrec, err := dht.getLocal(key)
if err == nil {
// TODO: this is tricky, we dont always want to trust our own value
// what if the authoritative source updated it?
log.Debug("have it locally")
return val, nil
} else {
log.Debug("failed to get value locally: %s", err)
vals = append(vals, routing.RecvdVal{
Val: lrec.GetValue(),
From: dht.self,
})
if nvals <= 1 {
return vals, nil
}
} else if nvals == 0 {
return nil, err
}
// get closest peers in the routing table
@ -104,14 +160,26 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key key.Key) ([]byte, error) {
ID: p,
})
val, peers, err := dht.getValueOrPeers(ctx, p, key)
rec, peers, err := dht.getValueOrPeers(ctx, p, key)
if err != nil {
return nil, err
}
res := &dhtQueryResult{value: val, closerPeers: peers}
if val != nil {
res.success = true
res := &dhtQueryResult{closerPeers: peers}
if rec.GetValue() != nil {
rv := routing.RecvdVal{
Val: rec.GetValue(),
From: p,
}
valslock.Lock()
vals = append(vals, rv)
// If weve collected enough records, we're done
if len(vals) >= nvals {
res.success = true
}
valslock.Unlock()
}
notif.PublishQueryEvent(parent, &notif.QueryEvent{
@ -124,17 +192,15 @@ func (dht *IpfsDHT) GetValue(ctx context.Context, key key.Key) ([]byte, error) {
})
// run it!
result, err := query.Run(ctx, rtp)
if err != nil {
return nil, err
_, err = query.Run(ctx, rtp)
if len(vals) == 0 {
if err != nil {
return nil, err
}
}
log.Debugf("GetValue %v %v", key, result.value)
if result.value == nil {
return nil, routing.ErrNotFound
}
return vals, nil
return result.value, nil
}
// Value provider layer of indirection.

View File

@ -44,6 +44,21 @@ func (c *client) GetValue(ctx context.Context, key key.Key) ([]byte, error) {
return data, nil
}
func (c *client) GetValues(ctx context.Context, key key.Key, count int) ([]routing.RecvdVal, error) {
log.Debugf("GetValue: %s", key)
v, err := c.datastore.Get(key.DsKey())
if err != nil {
return nil, err
}
data, ok := v.([]byte)
if !ok {
return nil, errors.New("could not cast value from datastore")
}
return []routing.RecvdVal{{Val: data, From: c.peer.ID()}}, nil
}
func (c *client) FindProviders(ctx context.Context, key key.Key) ([]peer.PeerInfo, error) {
return c.server.Providers(key), nil
}

View File

@ -25,6 +25,10 @@ func (c *nilclient) GetValue(_ context.Context, _ key.Key) ([]byte, error) {
return nil, errors.New("Tried GetValue from nil routing.")
}
func (c *nilclient) GetValues(_ context.Context, _ key.Key, _ int) ([]routing.RecvdVal, error) {
return nil, errors.New("Tried GetValues from nil routing.")
}
func (c *nilclient) FindPeer(_ context.Context, _ peer.ID) (peer.PeerInfo, error) {
return peer.PeerInfo{}, nil
}

View File

@ -67,6 +67,27 @@ func (c *offlineRouting) GetValue(ctx context.Context, key key.Key) ([]byte, err
return rec.GetValue(), nil
}
func (c *offlineRouting) GetValues(ctx context.Context, key key.Key, _ int) ([]routing.RecvdVal, error) {
v, err := c.datastore.Get(key.DsKey())
if err != nil {
return nil, err
}
byt, ok := v.([]byte)
if !ok {
return nil, errors.New("value stored in datastore not []byte")
}
rec := new(pb.Record)
err = proto.Unmarshal(byt, rec)
if err != nil {
return nil, err
}
return []routing.RecvdVal{
{Val: rec.GetValue()},
}, nil
}
func (c *offlineRouting) FindProviders(ctx context.Context, key key.Key) ([]peer.PeerInfo, error) {
return nil, ErrOffline
}

View File

@ -6,13 +6,11 @@ import (
proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
key "github.com/ipfs/go-ipfs/blocks/key"
dag "github.com/ipfs/go-ipfs/merkledag"
ci "github.com/ipfs/go-ipfs/p2p/crypto"
pb "github.com/ipfs/go-ipfs/routing/dht/pb"
logging "github.com/ipfs/go-ipfs/vendor/go-log-v1.0.0"
)
var _ = dag.FetchGraph
var log = logging.Logger("routing/record")
// MakePutRecord creates and signs a dht record for the given key/value pair

View File

@ -0,0 +1,40 @@
package record
import (
"errors"
"strings"
key "github.com/ipfs/go-ipfs/blocks/key"
)
// A SelectorFunc selects the best value for the given key from
// a slice of possible values and returns the index of the chosen one
type SelectorFunc func(key.Key, [][]byte) (int, error)
type Selector map[string]SelectorFunc
func (s Selector) BestRecord(k key.Key, recs [][]byte) (int, error) {
if len(recs) == 0 {
return 0, errors.New("no records given!")
}
parts := strings.Split(string(k), "/")
if len(parts) < 3 {
log.Infof("Record key does not have selectorfunc: %s", k)
return 0, errors.New("record key does not have selectorfunc")
}
sel, ok := s[parts[1]]
if !ok {
log.Infof("Unrecognized key prefix: %s", parts[1])
return 0, ErrInvalidRecordType
}
return sel(k, recs)
}
// PublicKeySelector just selects the first entry.
// All valid public key records will be equivalent.
func PublicKeySelector(k key.Key, vals [][]byte) (int, error) {
return 0, nil
}

View File

@ -26,6 +26,18 @@ type IpfsRouting interface {
// GetValue searches for the value corresponding to given Key.
GetValue(context.Context, key.Key) ([]byte, error)
// GetValues searches for values corresponding to given Key.
//
// Passing a value of '0' for the count argument will cause the
// routing interface to return values only from cached or local storage
// and return an error if no cached value is found.
//
// Passing a value of '1' will return a local value if found, and query
// the network for the first value it finds otherwise.
// As a result, a value of '1' is mostly useful for cases where the record
// in question has only one valid value (such as public keys)
GetValues(c context.Context, k key.Key, count int) ([]RecvdVal, error)
// Value provider layer of indirection.
// This is what DSHTs (Coral and MainlineDHT) do to store large values in a DHT.
@ -44,6 +56,13 @@ type IpfsRouting interface {
// TODO expose io.Closer or plain-old Close error
}
// RecvdVal represents a dht value record that has been received from a given peer
// it is used to track peers with expired records in order to correct them.
type RecvdVal struct {
From peer.ID
Val []byte
}
type PubKeyFetcher interface {
GetPublicKey(context.Context, peer.ID) (ci.PubKey, error)
}

View File

@ -81,6 +81,22 @@ func (c *Client) GetValue(ctx context.Context, k key.Key) ([]byte, error) {
return response.Record.GetValue(), nil
}
func (c *Client) GetValues(ctx context.Context, k key.Key, _ int) ([]routing.RecvdVal, error) {
defer log.EventBegin(ctx, "getValue", &k).Done()
msg := pb.NewMessage(pb.Message_GET_VALUE, string(k), 0)
response, err := c.proxy.SendRequest(ctx, msg) // TODO wrap to hide the remote
if err != nil {
return nil, err
}
return []routing.RecvdVal{
{
Val: response.Record.GetValue(),
From: c.local,
},
}, nil
}
func (c *Client) Provide(ctx context.Context, k key.Key) error {
defer log.EventBegin(ctx, "provide", &k).Done()
msg := pb.NewMessage(pb.Message_ADD_PROVIDER, string(k), 0)