mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-26 15:42:21 +08:00
namesys: switch to async code
License: MIT Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
This commit is contained in:
@ -17,45 +17,33 @@ type onceResult struct {
|
||||
}
|
||||
|
||||
type resolver interface {
|
||||
// resolveOnce looks up a name once (without recursion).
|
||||
resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (value path.Path, ttl time.Duration, err error)
|
||||
|
||||
resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult
|
||||
}
|
||||
|
||||
// resolve is a helper for implementing Resolver.ResolveN using resolveOnce.
|
||||
func resolve(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) (path.Path, error) {
|
||||
depth := options.Depth
|
||||
for {
|
||||
p, _, err := r.resolveOnce(ctx, name, options)
|
||||
ctx, cancel := context.WithCancel(ctx)
|
||||
defer cancel()
|
||||
|
||||
err := ErrResolveFailed
|
||||
var p path.Path
|
||||
|
||||
resCh := resolveAsync(ctx, r, name, options, prefix)
|
||||
|
||||
for res := range resCh {
|
||||
p, err = res.Path, res.Err
|
||||
if err != nil {
|
||||
return "", err
|
||||
}
|
||||
log.Debugf("resolved %s to %s", name, p.String())
|
||||
|
||||
if strings.HasPrefix(p.String(), "/ipfs/") {
|
||||
// we've bottomed out with an IPFS path
|
||||
return p, nil
|
||||
}
|
||||
|
||||
if depth == 1 {
|
||||
return p, ErrResolveRecursion
|
||||
}
|
||||
|
||||
if !strings.HasPrefix(p.String(), prefix) {
|
||||
return p, nil
|
||||
}
|
||||
name = strings.TrimPrefix(p.String(), prefix)
|
||||
|
||||
if depth > 1 {
|
||||
depth--
|
||||
break
|
||||
}
|
||||
}
|
||||
|
||||
return p, err
|
||||
}
|
||||
|
||||
//TODO:
|
||||
// - better error handling
|
||||
func resolveAsyncDo(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) <-chan Result {
|
||||
// - select on writes
|
||||
func resolveAsync(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) <-chan Result {
|
||||
resCh := r.resolveOnceAsync(ctx, name, options)
|
||||
depth := options.Depth
|
||||
outCh := make(chan Result)
|
||||
@ -70,7 +58,7 @@ func resolveAsyncDo(ctx context.Context, r resolver, name string, options opts.R
|
||||
case res, ok := <-resCh:
|
||||
if !ok {
|
||||
resCh = nil
|
||||
continue
|
||||
break
|
||||
}
|
||||
|
||||
if res.err != nil {
|
||||
@ -79,14 +67,13 @@ func resolveAsyncDo(ctx context.Context, r resolver, name string, options opts.R
|
||||
}
|
||||
log.Debugf("resolved %s to %s", name, res.value.String())
|
||||
if strings.HasPrefix(res.value.String(), "/ipfs/") {
|
||||
outCh <- Result{Err: res.err}
|
||||
continue
|
||||
outCh <- Result{Path: res.value}
|
||||
break
|
||||
}
|
||||
p := strings.TrimPrefix(res.value.String(), prefix)
|
||||
|
||||
if depth == 1 {
|
||||
outCh <- Result{Err: ErrResolveRecursion}
|
||||
continue
|
||||
outCh <- Result{Path: res.value, Err: ErrResolveRecursion}
|
||||
break
|
||||
}
|
||||
|
||||
subopts := options
|
||||
@ -102,26 +89,21 @@ func resolveAsyncDo(ctx context.Context, r resolver, name string, options opts.R
|
||||
subCtx, cancelSub = context.WithCancel(ctx)
|
||||
defer cancelSub()
|
||||
|
||||
subCh = resolveAsyncDo(subCtx, r, p, subopts, prefix)
|
||||
p := strings.TrimPrefix(res.value.String(), prefix)
|
||||
subCh = resolveAsync(subCtx, r, p, subopts, prefix)
|
||||
case res, ok := <-subCh:
|
||||
if !ok {
|
||||
subCh = nil
|
||||
continue
|
||||
}
|
||||
|
||||
if res.Err != nil {
|
||||
outCh <- Result{Err: res.Err}
|
||||
return
|
||||
break
|
||||
}
|
||||
|
||||
outCh <- res
|
||||
case <-ctx.Done():
|
||||
}
|
||||
if resCh == nil && subCh == nil {
|
||||
return
|
||||
}
|
||||
}
|
||||
}()
|
||||
return outCh
|
||||
}
|
||||
|
||||
func resolveAsync(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) <-chan Result {
|
||||
return resolveAsyncDo(ctx, r, name, options, prefix)
|
||||
}
|
||||
|
@ -5,7 +5,6 @@ import (
|
||||
"errors"
|
||||
"net"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
opts "github.com/ipfs/go-ipfs/namesys/opts"
|
||||
isd "gx/ipfs/QmZmmuAXgX73UQmX1jRKjTGmjzq24Jinqkq8vzkBtno4uX/go-is-domain"
|
||||
@ -31,6 +30,7 @@ func (r *DNSResolver) Resolve(ctx context.Context, name string, options ...opts.
|
||||
return resolve(ctx, r, name, opts.ProcessOpts(options), "/ipns/")
|
||||
}
|
||||
|
||||
// ResolveAsync implements Resolver.
|
||||
func (r *DNSResolver) ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result {
|
||||
return resolveAsync(ctx, r, name, opts.ProcessOpts(options), "/ipns/")
|
||||
}
|
||||
@ -43,51 +43,6 @@ type lookupRes struct {
|
||||
// resolveOnce implements resolver.
|
||||
// TXT records for a given domain name should contain a b58
|
||||
// encoded multihash.
|
||||
func (r *DNSResolver) resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (path.Path, time.Duration, error) {
|
||||
segments := strings.SplitN(name, "/", 2)
|
||||
domain := segments[0]
|
||||
|
||||
if !isd.IsDomain(domain) {
|
||||
return "", 0, errors.New("not a valid domain name")
|
||||
}
|
||||
log.Debugf("DNSResolver resolving %s", domain)
|
||||
|
||||
rootChan := make(chan lookupRes, 1)
|
||||
go workDomain(r, domain, rootChan)
|
||||
|
||||
subChan := make(chan lookupRes, 1)
|
||||
go workDomain(r, "_dnslink."+domain, subChan)
|
||||
|
||||
var subRes lookupRes
|
||||
select {
|
||||
case subRes = <-subChan:
|
||||
case <-ctx.Done():
|
||||
return "", 0, ctx.Err()
|
||||
}
|
||||
|
||||
var p path.Path
|
||||
if subRes.error == nil {
|
||||
p = subRes.path
|
||||
} else {
|
||||
var rootRes lookupRes
|
||||
select {
|
||||
case rootRes = <-rootChan:
|
||||
case <-ctx.Done():
|
||||
return "", 0, ctx.Err()
|
||||
}
|
||||
if rootRes.error == nil {
|
||||
p = rootRes.path
|
||||
} else {
|
||||
return "", 0, ErrResolveFailed
|
||||
}
|
||||
}
|
||||
var err error
|
||||
if len(segments) > 1 {
|
||||
p, err = path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[1])
|
||||
}
|
||||
return p, 0, err
|
||||
}
|
||||
|
||||
func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult {
|
||||
out := make(chan onceResult, 1)
|
||||
segments := strings.SplitN(name, "/", 2)
|
||||
@ -106,6 +61,13 @@ func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options
|
||||
subChan := make(chan lookupRes, 1)
|
||||
go workDomain(r, "_dnslink."+domain, subChan)
|
||||
|
||||
appendPath := func(p path.Path) (path.Path, error) {
|
||||
if len(segments) > 1 {
|
||||
return path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[1])
|
||||
}
|
||||
return p, nil
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer close(out)
|
||||
for {
|
||||
@ -113,21 +75,25 @@ func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options
|
||||
case subRes, ok := <-subChan:
|
||||
if !ok {
|
||||
subChan = nil
|
||||
break
|
||||
}
|
||||
if subRes.error == nil {
|
||||
p, err := appendPath(subRes.path)
|
||||
select {
|
||||
case out <- onceResult{value: subRes.path}:
|
||||
case out <- onceResult{value: p, err: err}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
return
|
||||
}
|
||||
case rootRes, ok := <-rootChan:
|
||||
if !ok {
|
||||
subChan = nil
|
||||
rootChan = nil
|
||||
break
|
||||
}
|
||||
if rootRes.error == nil {
|
||||
p, err := appendPath(rootRes.path)
|
||||
select {
|
||||
case out <- onceResult{value: rootRes.path}:
|
||||
case out <- onceResult{value: p, err: err}:
|
||||
case <-ctx.Done():
|
||||
}
|
||||
}
|
||||
@ -144,8 +110,9 @@ func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options
|
||||
}
|
||||
|
||||
func workDomain(r *DNSResolver, name string, res chan lookupRes) {
|
||||
txt, err := r.lookupTXT(name)
|
||||
defer close(res)
|
||||
|
||||
txt, err := r.lookupTXT(name)
|
||||
if err != nil {
|
||||
// Error is != nil
|
||||
res <- lookupRes{"", err}
|
||||
|
@ -57,14 +57,13 @@ func TestResolverValidation(t *testing.T) {
|
||||
}
|
||||
|
||||
// Resolve entry
|
||||
resp, _, err := resolver.resolveOnce(ctx, id.Pretty(), opts.DefaultResolveOpts())
|
||||
resp, err := resolve(ctx, resolver, id.Pretty(), opts.DefaultResolveOpts(), "/ipns/")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
if resp != path.Path(p) {
|
||||
t.Fatalf("Mismatch between published path %s and resolved path %s", p, resp)
|
||||
}
|
||||
|
||||
// Create expired entry
|
||||
expiredEntry, err := ipns.Create(priv, p, 1, ts.Add(-1*time.Hour))
|
||||
if err != nil {
|
||||
@ -78,7 +77,7 @@ func TestResolverValidation(t *testing.T) {
|
||||
}
|
||||
|
||||
// Record should fail validation because entry is expired
|
||||
_, _, err = resolver.resolveOnce(ctx, id.Pretty(), opts.DefaultResolveOpts())
|
||||
_, err = resolve(ctx, resolver, id.Pretty(), opts.DefaultResolveOpts(), "/ipns/")
|
||||
if err == nil {
|
||||
t.Fatal("ValidateIpnsRecord should have returned error")
|
||||
}
|
||||
@ -100,7 +99,7 @@ func TestResolverValidation(t *testing.T) {
|
||||
|
||||
// Record should fail validation because public key defined by
|
||||
// ipns path doesn't match record signature
|
||||
_, _, err = resolver.resolveOnce(ctx, id2.Pretty(), opts.DefaultResolveOpts())
|
||||
_, err = resolve(ctx, resolver, id2.Pretty(), opts.DefaultResolveOpts(), "/ipns/")
|
||||
if err == nil {
|
||||
t.Fatal("ValidateIpnsRecord should have failed signature verification")
|
||||
}
|
||||
@ -118,7 +117,7 @@ func TestResolverValidation(t *testing.T) {
|
||||
|
||||
// Record should fail validation because public key is not available
|
||||
// in peer store or on network
|
||||
_, _, err = resolver.resolveOnce(ctx, id3.Pretty(), opts.DefaultResolveOpts())
|
||||
_, err = resolve(ctx, resolver, id3.Pretty(), opts.DefaultResolveOpts(), "/ipns/")
|
||||
if err == nil {
|
||||
t.Fatal("ValidateIpnsRecord should have failed because public key was not found")
|
||||
}
|
||||
@ -133,7 +132,7 @@ func TestResolverValidation(t *testing.T) {
|
||||
// public key is available in the peer store by looking it up in
|
||||
// the DHT, which causes the DHT to fetch it and cache it in the
|
||||
// peer store
|
||||
_, _, err = resolver.resolveOnce(ctx, id3.Pretty(), opts.DefaultResolveOpts())
|
||||
_, err = resolve(ctx, resolver, id3.Pretty(), opts.DefaultResolveOpts(), "/ipns/")
|
||||
if err != nil {
|
||||
t.Fatal(err)
|
||||
}
|
||||
|
@ -82,48 +82,6 @@ func (ns *mpns) ResolveAsync(ctx context.Context, name string, options ...opts.R
|
||||
}
|
||||
|
||||
// resolveOnce implements resolver.
|
||||
func (ns *mpns) resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (path.Path, time.Duration, error) {
|
||||
if !strings.HasPrefix(name, "/ipns/") {
|
||||
name = "/ipns/" + name
|
||||
}
|
||||
segments := strings.SplitN(name, "/", 4)
|
||||
if len(segments) < 3 || segments[0] != "" {
|
||||
log.Debugf("invalid name syntax for %s", name)
|
||||
return "", 0, ErrResolveFailed
|
||||
}
|
||||
|
||||
key := segments[2]
|
||||
|
||||
p, ok := ns.cacheGet(key)
|
||||
var err error
|
||||
if !ok {
|
||||
// Resolver selection:
|
||||
// 1. if it is a multihash resolve through "ipns".
|
||||
// 2. if it is a domain name, resolve through "dns"
|
||||
// 3. otherwise resolve through the "proquint" resolver
|
||||
var res resolver
|
||||
if _, err := mh.FromB58String(key); err == nil {
|
||||
res = ns.ipnsResolver
|
||||
} else if isd.IsDomain(key) {
|
||||
res = ns.dnsResolver
|
||||
} else {
|
||||
res = ns.proquintResolver
|
||||
}
|
||||
|
||||
var ttl time.Duration
|
||||
p, ttl, err = res.resolveOnce(ctx, key, options)
|
||||
if err != nil {
|
||||
return "", 0, ErrResolveFailed
|
||||
}
|
||||
ns.cacheSet(key, p, ttl)
|
||||
}
|
||||
|
||||
if len(segments) > 3 {
|
||||
p, err = path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[3])
|
||||
}
|
||||
return p, 0, err
|
||||
}
|
||||
|
||||
func (ns *mpns) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult {
|
||||
out := make(chan onceResult, 1)
|
||||
|
||||
|
@ -4,12 +4,11 @@ import (
|
||||
"context"
|
||||
"fmt"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
opts "github.com/ipfs/go-ipfs/namesys/opts"
|
||||
|
||||
"gx/ipfs/QmU4x3742bvgfxJsByEDpBnifJqjJdV6x528co4hwKCn46/go-unixfs"
|
||||
path "gx/ipfs/QmcjwUb36Z16NJkvDX6ccXPqsFswo6AsRXynyXcLLCphV2/go-path"
|
||||
|
||||
ci "gx/ipfs/QmPvyPwuCgJ7pDmrKDxRtsScJgBaM5h4EpRL2qQJsmXf4n/go-libp2p-crypto"
|
||||
offroute "gx/ipfs/QmScZySgru9jaoDa12sSfvh21sWbqF5eXkieTmJzAHJXkQ/go-ipfs-routing/offline"
|
||||
ds "gx/ipfs/QmUyz7JTJzgegC6tiJrfby3mPhzcdswVtG4x58TQ6pq8jV/go-datastore"
|
||||
@ -38,13 +37,12 @@ func testResolution(t *testing.T, resolver Resolver, name string, depth uint, ex
|
||||
}
|
||||
}
|
||||
|
||||
func (r *mockResolver) resolveOnce(ctx context.Context, name string, opts opts.ResolveOpts) (path.Path, time.Duration, error) {
|
||||
p, err := path.ParsePath(r.entries[name])
|
||||
return p, 0, err
|
||||
}
|
||||
|
||||
func (r *mockResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult {
|
||||
panic("stub")
|
||||
p, err := path.ParsePath(r.entries[name])
|
||||
out := make(chan onceResult, 1)
|
||||
out <- onceResult{value: p, err: err}
|
||||
close(out)
|
||||
return out
|
||||
}
|
||||
|
||||
func mockResolverOne() *mockResolver {
|
||||
|
@ -1,10 +1,8 @@
|
||||
package namesys
|
||||
|
||||
import (
|
||||
"context"
|
||||
"errors"
|
||||
"time"
|
||||
|
||||
context "context"
|
||||
|
||||
opts "github.com/ipfs/go-ipfs/namesys/opts"
|
||||
proquint "gx/ipfs/QmYnf27kzqR2cxt6LFZdrAFJuQd6785fTkBvMuEj9EeRxM/proquint"
|
||||
@ -19,15 +17,6 @@ func (r *ProquintResolver) Resolve(ctx context.Context, name string, options ...
|
||||
}
|
||||
|
||||
// resolveOnce implements resolver. Decodes the proquint string.
|
||||
func (r *ProquintResolver) resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (path.Path, time.Duration, error) {
|
||||
ok, err := proquint.IsProquint(name)
|
||||
if err != nil || !ok {
|
||||
return "", 0, errors.New("not a valid proquint string")
|
||||
}
|
||||
// Return a 0 TTL as caching this result is pointless.
|
||||
return path.FromString(string(proquint.Decode(name))), 0, nil
|
||||
}
|
||||
|
||||
func (r *ProquintResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult {
|
||||
out := make(chan onceResult, 1)
|
||||
defer close(out)
|
||||
|
@ -11,6 +11,7 @@ import (
|
||||
cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
|
||||
mh "gx/ipfs/QmPnFwZ2JXKnXgMw8CdBPxn7FWh6LLdjUjxV1fKHuJnkr8/go-multihash"
|
||||
routing "gx/ipfs/QmVBnJDKhtFXTRVjXKinqpwGu8t1DyNqPKan2iGX8PR8xG/go-libp2p-routing"
|
||||
ropts "gx/ipfs/QmVBnJDKhtFXTRVjXKinqpwGu8t1DyNqPKan2iGX8PR8xG/go-libp2p-routing/options"
|
||||
logging "gx/ipfs/QmZChCsSt8DctjceaL56Eibc29CVQq4dGKRXC5JRZ6Ppae/go-log"
|
||||
dht "gx/ipfs/QmZVakpN44VAUxs9eXAuUGLFYTCGmSyqSy6hyEKfMv68ME/go-libp2p-kad-dht"
|
||||
ipns "gx/ipfs/QmZrmn2BPZbSviQAWeyY2iXkCukmJHv9n7zrLgWU5KgbTb/go-ipns"
|
||||
@ -42,120 +43,30 @@ func (r *IpnsResolver) Resolve(ctx context.Context, name string, options ...opts
|
||||
return resolve(ctx, r, name, opts.ProcessOpts(options), "/ipns/")
|
||||
}
|
||||
|
||||
// ResolveAsync implements Resolver.
|
||||
func (r *IpnsResolver) ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result {
|
||||
return resolveAsync(ctx, r, name, opts.ProcessOpts(options), "/ipns/")
|
||||
}
|
||||
|
||||
// resolveOnce implements resolver. Uses the IPFS routing system to
|
||||
// resolve SFS-like names.
|
||||
func (r *IpnsResolver) resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (path.Path, time.Duration, error) {
|
||||
func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult {
|
||||
out := make(chan onceResult, 1)
|
||||
log.Debugf("RoutingResolver resolving %s", name)
|
||||
cancel := func() {}
|
||||
|
||||
if options.DhtTimeout != 0 {
|
||||
// Resolution must complete within the timeout
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithTimeout(ctx, options.DhtTimeout)
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
name = strings.TrimPrefix(name, "/ipns/")
|
||||
pid, err := peer.IDB58Decode(name)
|
||||
if err != nil {
|
||||
// name should be a multihash. if it isn't, error out here.
|
||||
log.Debugf("RoutingResolver: IPNS address not a valid peer ID: [%s]\n", name)
|
||||
return "", 0, err
|
||||
}
|
||||
|
||||
// Name should be the hash of a public key retrievable from ipfs.
|
||||
// We retrieve the public key here to make certain that it's in the peer
|
||||
// store before calling GetValue() on the DHT - the DHT will call the
|
||||
// ipns validator, which in turn will get the public key from the peer
|
||||
// store to verify the record signature
|
||||
_, err = routing.GetPublicKey(r.routing, ctx, pid)
|
||||
if err != nil {
|
||||
log.Debugf("RoutingResolver: could not retrieve public key %s: %s\n", name, err)
|
||||
return "", 0, err
|
||||
}
|
||||
|
||||
// Use the routing system to get the name.
|
||||
// Note that the DHT will call the ipns validator when retrieving
|
||||
// the value, which in turn verifies the ipns record signature
|
||||
ipnsKey := ipns.RecordKey(pid)
|
||||
val, err := r.routing.GetValue(ctx, ipnsKey, dht.Quorum(int(options.DhtRecordCount)))
|
||||
if err != nil {
|
||||
log.Debugf("RoutingResolver: dht get for name %s failed: %s", name, err)
|
||||
return "", 0, err
|
||||
}
|
||||
|
||||
entry := new(pb.IpnsEntry)
|
||||
err = proto.Unmarshal(val, entry)
|
||||
if err != nil {
|
||||
log.Debugf("RoutingResolver: could not unmarshal value for name %s: %s", name, err)
|
||||
return "", 0, err
|
||||
}
|
||||
|
||||
var p path.Path
|
||||
// check for old style record:
|
||||
if valh, err := mh.Cast(entry.GetValue()); err == nil {
|
||||
// Its an old style multihash record
|
||||
log.Debugf("encountered CIDv0 ipns entry: %s", valh)
|
||||
p = path.FromCid(cid.NewCidV0(valh))
|
||||
} else {
|
||||
// Not a multihash, probably a new record
|
||||
p, err = path.ParsePath(string(entry.GetValue()))
|
||||
if err != nil {
|
||||
return "", 0, err
|
||||
}
|
||||
}
|
||||
|
||||
ttl := DefaultResolverCacheTTL
|
||||
if entry.Ttl != nil {
|
||||
ttl = time.Duration(*entry.Ttl)
|
||||
}
|
||||
switch eol, err := ipns.GetEOL(entry); err {
|
||||
case ipns.ErrUnrecognizedValidity:
|
||||
// No EOL.
|
||||
case nil:
|
||||
ttEol := eol.Sub(time.Now())
|
||||
if ttEol < 0 {
|
||||
// It *was* valid when we first resolved it.
|
||||
ttl = 0
|
||||
} else if ttEol < ttl {
|
||||
ttl = ttEol
|
||||
}
|
||||
default:
|
||||
log.Errorf("encountered error when parsing EOL: %s", err)
|
||||
return "", 0, err
|
||||
}
|
||||
|
||||
return p, ttl, nil
|
||||
}
|
||||
|
||||
func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult {
|
||||
out := make(chan onceResult, 1)
|
||||
log.Debugf("RoutingResolver resolving %s", name)
|
||||
if options.DhtTimeout != 0 {
|
||||
// Resolution must complete within the timeout
|
||||
var cancel context.CancelFunc
|
||||
ctx, cancel = context.WithTimeout(ctx, options.DhtTimeout)
|
||||
defer cancel()
|
||||
}
|
||||
|
||||
name = strings.TrimPrefix(name, "/ipns/")
|
||||
hash, err := mh.FromB58String(name)
|
||||
if err != nil {
|
||||
// name should be a multihash. if it isn't, error out here.
|
||||
log.Debugf("RoutingResolver: bad input hash: [%s]\n", name)
|
||||
out <- onceResult{err: err}
|
||||
close(out)
|
||||
return out
|
||||
}
|
||||
|
||||
pid, err := peer.IDFromBytes(hash)
|
||||
if err != nil {
|
||||
log.Debugf("RoutingResolver: could not convert public key hash %s to peer ID: %s\n", name, err)
|
||||
out <- onceResult{err: err}
|
||||
close(out)
|
||||
cancel()
|
||||
return out
|
||||
}
|
||||
|
||||
@ -169,6 +80,7 @@ func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, option
|
||||
log.Debugf("RoutingResolver: could not retrieve public key %s: %s\n", name, err)
|
||||
out <- onceResult{err: err}
|
||||
close(out)
|
||||
cancel()
|
||||
return out
|
||||
}
|
||||
|
||||
@ -177,15 +89,17 @@ func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, option
|
||||
// the value, which in turn verifies the ipns record signature
|
||||
ipnsKey := ipns.RecordKey(pid)
|
||||
|
||||
vals, err := r.routing.(*dht.IpfsDHT).SearchValue(ctx, ipnsKey, dht.Quorum(int(options.DhtRecordCount)))
|
||||
vals, err := r.searchValue(ctx, ipnsKey, dht.Quorum(int(options.DhtRecordCount)))
|
||||
if err != nil {
|
||||
log.Debugf("RoutingResolver: dht get for name %s failed: %s", name, err)
|
||||
out <- onceResult{err: err}
|
||||
close(out)
|
||||
cancel()
|
||||
return out
|
||||
}
|
||||
|
||||
go func() {
|
||||
defer cancel()
|
||||
defer close(out)
|
||||
for {
|
||||
select {
|
||||
@ -259,3 +173,14 @@ func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, option
|
||||
|
||||
return out
|
||||
}
|
||||
|
||||
func (r *IpnsResolver) searchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) {
|
||||
if ir, ok := r.routing.(*dht.IpfsDHT); ok {
|
||||
return ir.SearchValue(ctx, key, opts...)
|
||||
}
|
||||
out := make(chan []byte, 1)
|
||||
val, err := r.routing.GetValue(ctx, key, opts...)
|
||||
out <- val
|
||||
close(out)
|
||||
return out, err
|
||||
}
|
||||
|
Reference in New Issue
Block a user