mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-28 00:39:31 +08:00
Merge pull request #2257 from ipfs/feat/promise-fail
allow promises to fail
This commit is contained in:
@ -176,9 +176,8 @@ func GetNodes(ctx context.Context, ds DAGService, keys []key.Key) []NodeGetter {
|
|||||||
}
|
}
|
||||||
|
|
||||||
promises := make([]NodeGetter, len(keys))
|
promises := make([]NodeGetter, len(keys))
|
||||||
sendChans := make([]chan<- *Node, len(keys))
|
|
||||||
for i := range keys {
|
for i := range keys {
|
||||||
promises[i], sendChans[i] = newNodePromise(ctx)
|
promises[i] = newNodePromise(ctx)
|
||||||
}
|
}
|
||||||
|
|
||||||
dedupedKeys := dedupeKeys(keys)
|
dedupedKeys := dedupeKeys(keys)
|
||||||
@ -192,11 +191,16 @@ func GetNodes(ctx context.Context, ds DAGService, keys []key.Key) []NodeGetter {
|
|||||||
select {
|
select {
|
||||||
case opt, ok := <-nodechan:
|
case opt, ok := <-nodechan:
|
||||||
if !ok {
|
if !ok {
|
||||||
|
for _, p := range promises {
|
||||||
|
p.Fail(ErrNotFound)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
if opt.Err != nil {
|
if opt.Err != nil {
|
||||||
log.Error("error fetching: ", opt.Err)
|
for _, p := range promises {
|
||||||
|
p.Fail(opt.Err)
|
||||||
|
}
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -211,7 +215,7 @@ func GetNodes(ctx context.Context, ds DAGService, keys []key.Key) []NodeGetter {
|
|||||||
is := FindLinks(keys, k, 0)
|
is := FindLinks(keys, k, 0)
|
||||||
for _, i := range is {
|
for _, i := range is {
|
||||||
count++
|
count++
|
||||||
sendChans[i] <- nd
|
promises[i].Send(nd)
|
||||||
}
|
}
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
@ -234,18 +238,20 @@ func dedupeKeys(ks []key.Key) []key.Key {
|
|||||||
return out
|
return out
|
||||||
}
|
}
|
||||||
|
|
||||||
func newNodePromise(ctx context.Context) (NodeGetter, chan<- *Node) {
|
func newNodePromise(ctx context.Context) NodeGetter {
|
||||||
ch := make(chan *Node, 1)
|
|
||||||
return &nodePromise{
|
return &nodePromise{
|
||||||
recv: ch,
|
recv: make(chan *Node, 1),
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
}, ch
|
err: make(chan error, 1),
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type nodePromise struct {
|
type nodePromise struct {
|
||||||
cache *Node
|
cache *Node
|
||||||
recv <-chan *Node
|
clk sync.Mutex
|
||||||
|
recv chan *Node
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
|
err chan error
|
||||||
}
|
}
|
||||||
|
|
||||||
// NodeGetter provides a promise like interface for a dag Node
|
// NodeGetter provides a promise like interface for a dag Node
|
||||||
@ -254,22 +260,57 @@ type nodePromise struct {
|
|||||||
// cached node.
|
// cached node.
|
||||||
type NodeGetter interface {
|
type NodeGetter interface {
|
||||||
Get(context.Context) (*Node, error)
|
Get(context.Context) (*Node, error)
|
||||||
|
Fail(err error)
|
||||||
|
Send(*Node)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (np *nodePromise) Fail(err error) {
|
||||||
|
np.clk.Lock()
|
||||||
|
v := np.cache
|
||||||
|
np.clk.Unlock()
|
||||||
|
|
||||||
|
// if promise has a value, don't fail it
|
||||||
|
if v != nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
np.err <- err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (np *nodePromise) Send(nd *Node) {
|
||||||
|
var already bool
|
||||||
|
np.clk.Lock()
|
||||||
|
if np.cache != nil {
|
||||||
|
already = true
|
||||||
|
}
|
||||||
|
np.cache = nd
|
||||||
|
np.clk.Unlock()
|
||||||
|
|
||||||
|
if already {
|
||||||
|
panic("sending twice to the same promise is an error!")
|
||||||
|
}
|
||||||
|
|
||||||
|
np.recv <- nd
|
||||||
}
|
}
|
||||||
|
|
||||||
func (np *nodePromise) Get(ctx context.Context) (*Node, error) {
|
func (np *nodePromise) Get(ctx context.Context) (*Node, error) {
|
||||||
if np.cache != nil {
|
np.clk.Lock()
|
||||||
return np.cache, nil
|
c := np.cache
|
||||||
|
np.clk.Unlock()
|
||||||
|
if c != nil {
|
||||||
|
return c, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case blk := <-np.recv:
|
case nd := <-np.recv:
|
||||||
np.cache = blk
|
return nd, nil
|
||||||
case <-np.ctx.Done():
|
case <-np.ctx.Done():
|
||||||
return nil, np.ctx.Err()
|
return nil, np.ctx.Err()
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return nil, ctx.Err()
|
return nil, ctx.Err()
|
||||||
|
case err := <-np.err:
|
||||||
|
return nil, err
|
||||||
}
|
}
|
||||||
return np.cache, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type Batch struct {
|
type Batch struct {
|
||||||
|
@ -20,6 +20,7 @@ import (
|
|||||||
imp "github.com/ipfs/go-ipfs/importer"
|
imp "github.com/ipfs/go-ipfs/importer"
|
||||||
chunk "github.com/ipfs/go-ipfs/importer/chunk"
|
chunk "github.com/ipfs/go-ipfs/importer/chunk"
|
||||||
. "github.com/ipfs/go-ipfs/merkledag"
|
. "github.com/ipfs/go-ipfs/merkledag"
|
||||||
|
dstest "github.com/ipfs/go-ipfs/merkledag/test"
|
||||||
"github.com/ipfs/go-ipfs/pin"
|
"github.com/ipfs/go-ipfs/pin"
|
||||||
uio "github.com/ipfs/go-ipfs/unixfs/io"
|
uio "github.com/ipfs/go-ipfs/unixfs/io"
|
||||||
u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
|
u "gx/ipfs/QmZNVWh8LLjAavuQ2JXuFmuYH3C11xo988vSgp7UQrTRj1/go-ipfs-util"
|
||||||
@ -323,3 +324,46 @@ func TestEnumerateChildren(t *testing.T) {
|
|||||||
|
|
||||||
traverse(root)
|
traverse(root)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestFetchFailure(t *testing.T) {
|
||||||
|
ds := dstest.Mock()
|
||||||
|
ds_bad := dstest.Mock()
|
||||||
|
|
||||||
|
top := new(Node)
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
nd := &Node{Data: []byte{byte('a' + i)}}
|
||||||
|
_, err := ds.Add(nd)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = top.AddNodeLinkClean(fmt.Sprintf("AA%d", i), nd)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
for i := 0; i < 10; i++ {
|
||||||
|
nd := &Node{Data: []byte{'f', 'a' + byte(i)}}
|
||||||
|
_, err := ds_bad.Add(nd)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
|
||||||
|
err = top.AddNodeLinkClean(fmt.Sprintf("BB%d", i), nd)
|
||||||
|
if err != nil {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
getters := GetDAG(context.Background(), ds, top)
|
||||||
|
for i, getter := range getters {
|
||||||
|
_, err := getter.Get(context.Background())
|
||||||
|
if err != nil && i < 10 {
|
||||||
|
t.Fatal(err)
|
||||||
|
}
|
||||||
|
if err == nil && i >= 10 {
|
||||||
|
t.Fatal("should have failed request")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
@ -113,9 +113,30 @@ test_get_cmd() {
|
|||||||
'
|
'
|
||||||
}
|
}
|
||||||
|
|
||||||
|
test_get_fail() {
|
||||||
|
test_expect_success "create an object that has unresolveable links" '
|
||||||
|
cat <<-\EOF >bad_object &&
|
||||||
|
{ "Links": [ { "Name": "foo", "Hash": "QmZzaC6ydNXiR65W8VjGA73ET9MZ6VFAqUT1ngYMXcpihn", "Size": 1897 }, { "Name": "bar", "Hash": "Qmd4mG6pDFDmDTn6p3hX1srP8qTbkyXKj5yjpEsiHDX3u8", "Size": 56 }, { "Name": "baz", "Hash": "QmUTjwRnG28dSrFFVTYgbr6LiDLsBmRr2SaUSTGheK2YqG", "Size": 24266 } ], "Data": "\b\u0001" }
|
||||||
|
EOF
|
||||||
|
cat bad_object | ipfs object put > put_out
|
||||||
|
'
|
||||||
|
|
||||||
|
test_expect_success "output looks good" '
|
||||||
|
echo "added QmaGidyrnX8FMbWJoxp8HVwZ1uRKwCyxBJzABnR1S2FVUr" > put_exp &&
|
||||||
|
test_cmp put_exp put_out
|
||||||
|
'
|
||||||
|
|
||||||
|
test_expect_success "ipfs get fails" '
|
||||||
|
test_expect_code 1 ipfs get QmaGidyrnX8FMbWJoxp8HVwZ1uRKwCyxBJzABnR1S2FVUr
|
||||||
|
'
|
||||||
|
}
|
||||||
|
|
||||||
# should work offline
|
# should work offline
|
||||||
test_get_cmd
|
test_get_cmd
|
||||||
|
|
||||||
|
# only really works offline, will try and search network when online
|
||||||
|
test_get_fail
|
||||||
|
|
||||||
# should work online
|
# should work online
|
||||||
test_launch_ipfs_daemon
|
test_launch_ipfs_daemon
|
||||||
test_get_cmd
|
test_get_cmd
|
||||||
|
Reference in New Issue
Block a user