From 804634d59b249a00768390e479bec558035ef5f3 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=C5=81ukasz=20Magiera?= Date: Tue, 28 Aug 2018 15:37:12 +0200 Subject: [PATCH] namesys: switch to async code MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit License: MIT Signed-off-by: Ɓukasz Magiera --- namesys/base.go | 70 +++++--------- namesys/dns.go | 67 ++++--------- namesys/ipns_resolver_validation_test.go | 11 +-- namesys/namesys.go | 42 -------- namesys/namesys_test.go | 14 ++- namesys/proquint.go | 13 +-- namesys/routing.go | 117 ++++------------------- 7 files changed, 76 insertions(+), 258 deletions(-) diff --git a/namesys/base.go b/namesys/base.go index b3610cc0d..c24832799 100644 --- a/namesys/base.go +++ b/namesys/base.go @@ -17,45 +17,33 @@ type onceResult struct { } type resolver interface { - // resolveOnce looks up a name once (without recursion). - resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (value path.Path, ttl time.Duration, err error) - resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult } // resolve is a helper for implementing Resolver.ResolveN using resolveOnce. func resolve(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) (path.Path, error) { - depth := options.Depth - for { - p, _, err := r.resolveOnce(ctx, name, options) + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + err := ErrResolveFailed + var p path.Path + + resCh := resolveAsync(ctx, r, name, options, prefix) + + for res := range resCh { + p, err = res.Path, res.Err if err != nil { - return "", err - } - log.Debugf("resolved %s to %s", name, p.String()) - - if strings.HasPrefix(p.String(), "/ipfs/") { - // we've bottomed out with an IPFS path - return p, nil - } - - if depth == 1 { - return p, ErrResolveRecursion - } - - if !strings.HasPrefix(p.String(), prefix) { - return p, nil - } - name = strings.TrimPrefix(p.String(), prefix) - - if depth > 1 { - depth-- + break } } + + return p, err } //TODO: // - better error handling -func resolveAsyncDo(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) <-chan Result { +// - select on writes +func resolveAsync(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) <-chan Result { resCh := r.resolveOnceAsync(ctx, name, options) depth := options.Depth outCh := make(chan Result) @@ -70,7 +58,7 @@ func resolveAsyncDo(ctx context.Context, r resolver, name string, options opts.R case res, ok := <-resCh: if !ok { resCh = nil - continue + break } if res.err != nil { @@ -79,14 +67,13 @@ func resolveAsyncDo(ctx context.Context, r resolver, name string, options opts.R } log.Debugf("resolved %s to %s", name, res.value.String()) if strings.HasPrefix(res.value.String(), "/ipfs/") { - outCh <- Result{Err: res.err} - continue + outCh <- Result{Path: res.value} + break } - p := strings.TrimPrefix(res.value.String(), prefix) if depth == 1 { - outCh <- Result{Err: ErrResolveRecursion} - continue + outCh <- Result{Path: res.value, Err: ErrResolveRecursion} + break } subopts := options @@ -102,26 +89,21 @@ func resolveAsyncDo(ctx context.Context, r resolver, name string, options opts.R subCtx, cancelSub = context.WithCancel(ctx) defer cancelSub() - subCh = resolveAsyncDo(subCtx, r, p, subopts, prefix) + p := strings.TrimPrefix(res.value.String(), prefix) + subCh = resolveAsync(subCtx, r, p, subopts, prefix) case res, ok := <-subCh: if !ok { subCh = nil - continue - } - - if res.Err != nil { - outCh <- Result{Err: res.Err} - return + break } outCh <- res case <-ctx.Done(): } + if resCh == nil && subCh == nil { + return + } } }() return outCh } - -func resolveAsync(ctx context.Context, r resolver, name string, options opts.ResolveOpts, prefix string) <-chan Result { - return resolveAsyncDo(ctx, r, name, options, prefix) -} diff --git a/namesys/dns.go b/namesys/dns.go index d92ce8fa7..f90880de9 100644 --- a/namesys/dns.go +++ b/namesys/dns.go @@ -5,7 +5,6 @@ import ( "errors" "net" "strings" - "time" opts "github.com/ipfs/go-ipfs/namesys/opts" isd "gx/ipfs/QmZmmuAXgX73UQmX1jRKjTGmjzq24Jinqkq8vzkBtno4uX/go-is-domain" @@ -31,6 +30,7 @@ func (r *DNSResolver) Resolve(ctx context.Context, name string, options ...opts. return resolve(ctx, r, name, opts.ProcessOpts(options), "/ipns/") } +// ResolveAsync implements Resolver. func (r *DNSResolver) ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result { return resolveAsync(ctx, r, name, opts.ProcessOpts(options), "/ipns/") } @@ -43,51 +43,6 @@ type lookupRes struct { // resolveOnce implements resolver. // TXT records for a given domain name should contain a b58 // encoded multihash. -func (r *DNSResolver) resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (path.Path, time.Duration, error) { - segments := strings.SplitN(name, "/", 2) - domain := segments[0] - - if !isd.IsDomain(domain) { - return "", 0, errors.New("not a valid domain name") - } - log.Debugf("DNSResolver resolving %s", domain) - - rootChan := make(chan lookupRes, 1) - go workDomain(r, domain, rootChan) - - subChan := make(chan lookupRes, 1) - go workDomain(r, "_dnslink."+domain, subChan) - - var subRes lookupRes - select { - case subRes = <-subChan: - case <-ctx.Done(): - return "", 0, ctx.Err() - } - - var p path.Path - if subRes.error == nil { - p = subRes.path - } else { - var rootRes lookupRes - select { - case rootRes = <-rootChan: - case <-ctx.Done(): - return "", 0, ctx.Err() - } - if rootRes.error == nil { - p = rootRes.path - } else { - return "", 0, ErrResolveFailed - } - } - var err error - if len(segments) > 1 { - p, err = path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[1]) - } - return p, 0, err -} - func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { out := make(chan onceResult, 1) segments := strings.SplitN(name, "/", 2) @@ -106,6 +61,13 @@ func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options subChan := make(chan lookupRes, 1) go workDomain(r, "_dnslink."+domain, subChan) + appendPath := func(p path.Path) (path.Path, error) { + if len(segments) > 1 { + return path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[1]) + } + return p, nil + } + go func() { defer close(out) for { @@ -113,21 +75,25 @@ func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options case subRes, ok := <-subChan: if !ok { subChan = nil + break } if subRes.error == nil { + p, err := appendPath(subRes.path) select { - case out <- onceResult{value: subRes.path}: + case out <- onceResult{value: p, err: err}: case <-ctx.Done(): } return } case rootRes, ok := <-rootChan: if !ok { - subChan = nil + rootChan = nil + break } if rootRes.error == nil { + p, err := appendPath(rootRes.path) select { - case out <- onceResult{value: rootRes.path}: + case out <- onceResult{value: p, err: err}: case <-ctx.Done(): } } @@ -144,8 +110,9 @@ func (r *DNSResolver) resolveOnceAsync(ctx context.Context, name string, options } func workDomain(r *DNSResolver, name string, res chan lookupRes) { - txt, err := r.lookupTXT(name) + defer close(res) + txt, err := r.lookupTXT(name) if err != nil { // Error is != nil res <- lookupRes{"", err} diff --git a/namesys/ipns_resolver_validation_test.go b/namesys/ipns_resolver_validation_test.go index cc99bf1d7..36e5fdc67 100644 --- a/namesys/ipns_resolver_validation_test.go +++ b/namesys/ipns_resolver_validation_test.go @@ -57,14 +57,13 @@ func TestResolverValidation(t *testing.T) { } // Resolve entry - resp, _, err := resolver.resolveOnce(ctx, id.Pretty(), opts.DefaultResolveOpts()) + resp, err := resolve(ctx, resolver, id.Pretty(), opts.DefaultResolveOpts(), "/ipns/") if err != nil { t.Fatal(err) } if resp != path.Path(p) { t.Fatalf("Mismatch between published path %s and resolved path %s", p, resp) } - // Create expired entry expiredEntry, err := ipns.Create(priv, p, 1, ts.Add(-1*time.Hour)) if err != nil { @@ -78,7 +77,7 @@ func TestResolverValidation(t *testing.T) { } // Record should fail validation because entry is expired - _, _, err = resolver.resolveOnce(ctx, id.Pretty(), opts.DefaultResolveOpts()) + _, err = resolve(ctx, resolver, id.Pretty(), opts.DefaultResolveOpts(), "/ipns/") if err == nil { t.Fatal("ValidateIpnsRecord should have returned error") } @@ -100,7 +99,7 @@ func TestResolverValidation(t *testing.T) { // Record should fail validation because public key defined by // ipns path doesn't match record signature - _, _, err = resolver.resolveOnce(ctx, id2.Pretty(), opts.DefaultResolveOpts()) + _, err = resolve(ctx, resolver, id2.Pretty(), opts.DefaultResolveOpts(), "/ipns/") if err == nil { t.Fatal("ValidateIpnsRecord should have failed signature verification") } @@ -118,7 +117,7 @@ func TestResolverValidation(t *testing.T) { // Record should fail validation because public key is not available // in peer store or on network - _, _, err = resolver.resolveOnce(ctx, id3.Pretty(), opts.DefaultResolveOpts()) + _, err = resolve(ctx, resolver, id3.Pretty(), opts.DefaultResolveOpts(), "/ipns/") if err == nil { t.Fatal("ValidateIpnsRecord should have failed because public key was not found") } @@ -133,7 +132,7 @@ func TestResolverValidation(t *testing.T) { // public key is available in the peer store by looking it up in // the DHT, which causes the DHT to fetch it and cache it in the // peer store - _, _, err = resolver.resolveOnce(ctx, id3.Pretty(), opts.DefaultResolveOpts()) + _, err = resolve(ctx, resolver, id3.Pretty(), opts.DefaultResolveOpts(), "/ipns/") if err != nil { t.Fatal(err) } diff --git a/namesys/namesys.go b/namesys/namesys.go index 0842486ea..1099997b2 100644 --- a/namesys/namesys.go +++ b/namesys/namesys.go @@ -82,48 +82,6 @@ func (ns *mpns) ResolveAsync(ctx context.Context, name string, options ...opts.R } // resolveOnce implements resolver. -func (ns *mpns) resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (path.Path, time.Duration, error) { - if !strings.HasPrefix(name, "/ipns/") { - name = "/ipns/" + name - } - segments := strings.SplitN(name, "/", 4) - if len(segments) < 3 || segments[0] != "" { - log.Debugf("invalid name syntax for %s", name) - return "", 0, ErrResolveFailed - } - - key := segments[2] - - p, ok := ns.cacheGet(key) - var err error - if !ok { - // Resolver selection: - // 1. if it is a multihash resolve through "ipns". - // 2. if it is a domain name, resolve through "dns" - // 3. otherwise resolve through the "proquint" resolver - var res resolver - if _, err := mh.FromB58String(key); err == nil { - res = ns.ipnsResolver - } else if isd.IsDomain(key) { - res = ns.dnsResolver - } else { - res = ns.proquintResolver - } - - var ttl time.Duration - p, ttl, err = res.resolveOnce(ctx, key, options) - if err != nil { - return "", 0, ErrResolveFailed - } - ns.cacheSet(key, p, ttl) - } - - if len(segments) > 3 { - p, err = path.FromSegments("", strings.TrimRight(p.String(), "/"), segments[3]) - } - return p, 0, err -} - func (ns *mpns) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { out := make(chan onceResult, 1) diff --git a/namesys/namesys_test.go b/namesys/namesys_test.go index 23a1852f8..09d5cf81e 100644 --- a/namesys/namesys_test.go +++ b/namesys/namesys_test.go @@ -4,12 +4,11 @@ import ( "context" "fmt" "testing" - "time" opts "github.com/ipfs/go-ipfs/namesys/opts" + "gx/ipfs/QmU4x3742bvgfxJsByEDpBnifJqjJdV6x528co4hwKCn46/go-unixfs" path "gx/ipfs/QmcjwUb36Z16NJkvDX6ccXPqsFswo6AsRXynyXcLLCphV2/go-path" - ci "gx/ipfs/QmPvyPwuCgJ7pDmrKDxRtsScJgBaM5h4EpRL2qQJsmXf4n/go-libp2p-crypto" offroute "gx/ipfs/QmScZySgru9jaoDa12sSfvh21sWbqF5eXkieTmJzAHJXkQ/go-ipfs-routing/offline" ds "gx/ipfs/QmUyz7JTJzgegC6tiJrfby3mPhzcdswVtG4x58TQ6pq8jV/go-datastore" @@ -38,13 +37,12 @@ func testResolution(t *testing.T, resolver Resolver, name string, depth uint, ex } } -func (r *mockResolver) resolveOnce(ctx context.Context, name string, opts opts.ResolveOpts) (path.Path, time.Duration, error) { - p, err := path.ParsePath(r.entries[name]) - return p, 0, err -} - func (r *mockResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { - panic("stub") + p, err := path.ParsePath(r.entries[name]) + out := make(chan onceResult, 1) + out <- onceResult{value: p, err: err} + close(out) + return out } func mockResolverOne() *mockResolver { diff --git a/namesys/proquint.go b/namesys/proquint.go index 279c361fd..ad09fd48c 100644 --- a/namesys/proquint.go +++ b/namesys/proquint.go @@ -1,10 +1,8 @@ package namesys import ( + "context" "errors" - "time" - - context "context" opts "github.com/ipfs/go-ipfs/namesys/opts" proquint "gx/ipfs/QmYnf27kzqR2cxt6LFZdrAFJuQd6785fTkBvMuEj9EeRxM/proquint" @@ -19,15 +17,6 @@ func (r *ProquintResolver) Resolve(ctx context.Context, name string, options ... } // resolveOnce implements resolver. Decodes the proquint string. -func (r *ProquintResolver) resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (path.Path, time.Duration, error) { - ok, err := proquint.IsProquint(name) - if err != nil || !ok { - return "", 0, errors.New("not a valid proquint string") - } - // Return a 0 TTL as caching this result is pointless. - return path.FromString(string(proquint.Decode(name))), 0, nil -} - func (r *ProquintResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { out := make(chan onceResult, 1) defer close(out) diff --git a/namesys/routing.go b/namesys/routing.go index d633c2d8d..c591d5662 100644 --- a/namesys/routing.go +++ b/namesys/routing.go @@ -11,6 +11,7 @@ import ( cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid" mh "gx/ipfs/QmPnFwZ2JXKnXgMw8CdBPxn7FWh6LLdjUjxV1fKHuJnkr8/go-multihash" routing "gx/ipfs/QmVBnJDKhtFXTRVjXKinqpwGu8t1DyNqPKan2iGX8PR8xG/go-libp2p-routing" + ropts "gx/ipfs/QmVBnJDKhtFXTRVjXKinqpwGu8t1DyNqPKan2iGX8PR8xG/go-libp2p-routing/options" logging "gx/ipfs/QmZChCsSt8DctjceaL56Eibc29CVQq4dGKRXC5JRZ6Ppae/go-log" dht "gx/ipfs/QmZVakpN44VAUxs9eXAuUGLFYTCGmSyqSy6hyEKfMv68ME/go-libp2p-kad-dht" ipns "gx/ipfs/QmZrmn2BPZbSviQAWeyY2iXkCukmJHv9n7zrLgWU5KgbTb/go-ipns" @@ -42,120 +43,30 @@ func (r *IpnsResolver) Resolve(ctx context.Context, name string, options ...opts return resolve(ctx, r, name, opts.ProcessOpts(options), "/ipns/") } +// ResolveAsync implements Resolver. func (r *IpnsResolver) ResolveAsync(ctx context.Context, name string, options ...opts.ResolveOpt) <-chan Result { return resolveAsync(ctx, r, name, opts.ProcessOpts(options), "/ipns/") } // resolveOnce implements resolver. Uses the IPFS routing system to // resolve SFS-like names. -func (r *IpnsResolver) resolveOnce(ctx context.Context, name string, options opts.ResolveOpts) (path.Path, time.Duration, error) { +func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { + out := make(chan onceResult, 1) log.Debugf("RoutingResolver resolving %s", name) + cancel := func() {} if options.DhtTimeout != 0 { // Resolution must complete within the timeout - var cancel context.CancelFunc ctx, cancel = context.WithTimeout(ctx, options.DhtTimeout) - defer cancel() } name = strings.TrimPrefix(name, "/ipns/") pid, err := peer.IDB58Decode(name) - if err != nil { - // name should be a multihash. if it isn't, error out here. - log.Debugf("RoutingResolver: IPNS address not a valid peer ID: [%s]\n", name) - return "", 0, err - } - - // Name should be the hash of a public key retrievable from ipfs. - // We retrieve the public key here to make certain that it's in the peer - // store before calling GetValue() on the DHT - the DHT will call the - // ipns validator, which in turn will get the public key from the peer - // store to verify the record signature - _, err = routing.GetPublicKey(r.routing, ctx, pid) - if err != nil { - log.Debugf("RoutingResolver: could not retrieve public key %s: %s\n", name, err) - return "", 0, err - } - - // Use the routing system to get the name. - // Note that the DHT will call the ipns validator when retrieving - // the value, which in turn verifies the ipns record signature - ipnsKey := ipns.RecordKey(pid) - val, err := r.routing.GetValue(ctx, ipnsKey, dht.Quorum(int(options.DhtRecordCount))) - if err != nil { - log.Debugf("RoutingResolver: dht get for name %s failed: %s", name, err) - return "", 0, err - } - - entry := new(pb.IpnsEntry) - err = proto.Unmarshal(val, entry) - if err != nil { - log.Debugf("RoutingResolver: could not unmarshal value for name %s: %s", name, err) - return "", 0, err - } - - var p path.Path - // check for old style record: - if valh, err := mh.Cast(entry.GetValue()); err == nil { - // Its an old style multihash record - log.Debugf("encountered CIDv0 ipns entry: %s", valh) - p = path.FromCid(cid.NewCidV0(valh)) - } else { - // Not a multihash, probably a new record - p, err = path.ParsePath(string(entry.GetValue())) - if err != nil { - return "", 0, err - } - } - - ttl := DefaultResolverCacheTTL - if entry.Ttl != nil { - ttl = time.Duration(*entry.Ttl) - } - switch eol, err := ipns.GetEOL(entry); err { - case ipns.ErrUnrecognizedValidity: - // No EOL. - case nil: - ttEol := eol.Sub(time.Now()) - if ttEol < 0 { - // It *was* valid when we first resolved it. - ttl = 0 - } else if ttEol < ttl { - ttl = ttEol - } - default: - log.Errorf("encountered error when parsing EOL: %s", err) - return "", 0, err - } - - return p, ttl, nil -} - -func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, options opts.ResolveOpts) <-chan onceResult { - out := make(chan onceResult, 1) - log.Debugf("RoutingResolver resolving %s", name) - if options.DhtTimeout != 0 { - // Resolution must complete within the timeout - var cancel context.CancelFunc - ctx, cancel = context.WithTimeout(ctx, options.DhtTimeout) - defer cancel() - } - - name = strings.TrimPrefix(name, "/ipns/") - hash, err := mh.FromB58String(name) - if err != nil { - // name should be a multihash. if it isn't, error out here. - log.Debugf("RoutingResolver: bad input hash: [%s]\n", name) - out <- onceResult{err: err} - close(out) - return out - } - - pid, err := peer.IDFromBytes(hash) if err != nil { log.Debugf("RoutingResolver: could not convert public key hash %s to peer ID: %s\n", name, err) out <- onceResult{err: err} close(out) + cancel() return out } @@ -169,6 +80,7 @@ func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, option log.Debugf("RoutingResolver: could not retrieve public key %s: %s\n", name, err) out <- onceResult{err: err} close(out) + cancel() return out } @@ -177,15 +89,17 @@ func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, option // the value, which in turn verifies the ipns record signature ipnsKey := ipns.RecordKey(pid) - vals, err := r.routing.(*dht.IpfsDHT).SearchValue(ctx, ipnsKey, dht.Quorum(int(options.DhtRecordCount))) + vals, err := r.searchValue(ctx, ipnsKey, dht.Quorum(int(options.DhtRecordCount))) if err != nil { log.Debugf("RoutingResolver: dht get for name %s failed: %s", name, err) out <- onceResult{err: err} close(out) + cancel() return out } go func() { + defer cancel() defer close(out) for { select { @@ -259,3 +173,14 @@ func (r *IpnsResolver) resolveOnceAsync(ctx context.Context, name string, option return out } + +func (r *IpnsResolver) searchValue(ctx context.Context, key string, opts ...ropts.Option) (<-chan []byte, error) { + if ir, ok := r.routing.(*dht.IpfsDHT); ok { + return ir.SearchValue(ctx, key, opts...) + } + out := make(chan []byte, 1) + val, err := r.routing.GetValue(ctx, key, opts...) + out <- val + close(out) + return out, err +}