mirror of
https://github.com/ipfs/kubo.git
synced 2025-05-20 00:18:12 +08:00

EnumerateChildrenAsync has been renamed to WalkParallel to reflect the fact that: 1. It visits the root. 2. It's parallel, not async. To mirror this change, EnumerateChildren has also been renamed to Walk and now behaves the same (except that it's not parallel).
142 lines
3.0 KiB
Go
142 lines
3.0 KiB
Go
package coreapi
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
blockservice "github.com/ipfs/go-blockservice"
|
|
cid "github.com/ipfs/go-cid"
|
|
cidutil "github.com/ipfs/go-cidutil"
|
|
blockstore "github.com/ipfs/go-ipfs-blockstore"
|
|
offline "github.com/ipfs/go-ipfs-exchange-offline"
|
|
dag "github.com/ipfs/go-merkledag"
|
|
coreiface "github.com/ipfs/interface-go-ipfs-core"
|
|
caopts "github.com/ipfs/interface-go-ipfs-core/options"
|
|
path "github.com/ipfs/interface-go-ipfs-core/path"
|
|
peer "github.com/libp2p/go-libp2p-core/peer"
|
|
routing "github.com/libp2p/go-libp2p-core/routing"
|
|
)
|
|
|
|
// DhtAPI is a defined type over CoreAPI that carries the DHT-related methods
// declared in this file (FindPeer, FindProviders and Provide). The core method
// converts a *DhtAPI back to the underlying *CoreAPI.
type DhtAPI CoreAPI
|
|
|
|
func (api *DhtAPI) FindPeer(ctx context.Context, p peer.ID) (peer.AddrInfo, error) {
|
|
err := api.checkOnline(false)
|
|
if err != nil {
|
|
return peer.AddrInfo{}, err
|
|
}
|
|
|
|
pi, err := api.routing.FindPeer(ctx, peer.ID(p))
|
|
if err != nil {
|
|
return peer.AddrInfo{}, err
|
|
}
|
|
|
|
return pi, nil
|
|
}
|
|
|
|
func (api *DhtAPI) FindProviders(ctx context.Context, p path.Path, opts ...caopts.DhtFindProvidersOption) (<-chan peer.AddrInfo, error) {
|
|
settings, err := caopts.DhtFindProvidersOptions(opts...)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
err = api.checkOnline(false)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
rp, err := api.core().ResolvePath(ctx, p)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
numProviders := settings.NumProviders
|
|
if numProviders < 1 {
|
|
return nil, fmt.Errorf("number of providers must be greater than 0")
|
|
}
|
|
|
|
pchan := api.routing.FindProvidersAsync(ctx, rp.Cid(), numProviders)
|
|
return pchan, nil
|
|
}
|
|
|
|
func (api *DhtAPI) Provide(ctx context.Context, path path.Path, opts ...caopts.DhtProvideOption) error {
|
|
settings, err := caopts.DhtProvideOptions(opts...)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
err = api.checkOnline(false)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
rp, err := api.core().ResolvePath(ctx, path)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
c := rp.Cid()
|
|
|
|
has, err := api.blockstore.Has(c)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if !has {
|
|
return fmt.Errorf("block %s not found locally, cannot provide", c)
|
|
}
|
|
|
|
if settings.Recursive {
|
|
err = provideKeysRec(ctx, api.routing, api.blockstore, []cid.Cid{c})
|
|
} else {
|
|
err = provideKeys(ctx, api.routing, []cid.Cid{c})
|
|
}
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func provideKeys(ctx context.Context, r routing.Routing, cids []cid.Cid) error {
|
|
for _, c := range cids {
|
|
err := r.Provide(ctx, c, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func provideKeysRec(ctx context.Context, r routing.Routing, bs blockstore.Blockstore, cids []cid.Cid) error {
|
|
provided := cidutil.NewStreamingSet()
|
|
|
|
errCh := make(chan error)
|
|
go func() {
|
|
dserv := dag.NewDAGService(blockservice.New(bs, offline.Exchange(bs)))
|
|
for _, c := range cids {
|
|
err := dag.WalkParallel(ctx, dag.GetLinksDirect(dserv), c, provided.Visitor(ctx))
|
|
if err != nil {
|
|
errCh <- err
|
|
}
|
|
}
|
|
}()
|
|
|
|
for {
|
|
select {
|
|
case k := <-provided.New:
|
|
err := r.Provide(ctx, k, true)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
case err := <-errCh:
|
|
return err
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
}
|
|
|
|
func (api *DhtAPI) core() coreiface.CoreAPI {
|
|
return (*CoreAPI)(api)
|
|
}
|