1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-20 02:21:48 +08:00

coreapi unixfs: progress events

License: MIT
Signed-off-by: Łukasz Magiera <magik6k@gmail.com>
This commit is contained in:
Łukasz Magiera
2018-10-04 01:00:26 +02:00
parent f9cf84965d
commit a81ec29baa
8 changed files with 199 additions and 44 deletions

View File

@ -8,6 +8,7 @@ import (
core "github.com/ipfs/go-ipfs/core" core "github.com/ipfs/go-ipfs/core"
cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv" cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
"github.com/ipfs/go-ipfs/core/coreunix" "github.com/ipfs/go-ipfs/core/coreunix"
filestore "github.com/ipfs/go-ipfs/filestore" filestore "github.com/ipfs/go-ipfs/filestore"
ft "gx/ipfs/QmU4x3742bvgfxJsByEDpBnifJqjJdV6x528co4hwKCn46/go-unixfs" ft "gx/ipfs/QmU4x3742bvgfxJsByEDpBnifJqjJdV6x528co4hwKCn46/go-unixfs"
@ -371,7 +372,7 @@ You can now check what blocks have been created by:
break LOOP break LOOP
} }
output := out.(*coreunix.AddedObject) output := out.(*coreiface.AddEvent)
if len(output.Hash) > 0 { if len(output.Hash) > 0 {
lastHash = output.Hash lastHash = output.Hash
if quieter { if quieter {
@ -451,5 +452,5 @@ You can now check what blocks have been created by:
} }
}, },
}, },
Type: coreunix.AddedObject{}, Type: coreiface.AddEvent{},
} }

View File

@ -7,7 +7,7 @@ import (
cmds "github.com/ipfs/go-ipfs/commands" cmds "github.com/ipfs/go-ipfs/commands"
core "github.com/ipfs/go-ipfs/core" core "github.com/ipfs/go-ipfs/core"
e "github.com/ipfs/go-ipfs/core/commands/e" e "github.com/ipfs/go-ipfs/core/commands/e"
"github.com/ipfs/go-ipfs/core/coreunix" coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
tar "github.com/ipfs/go-ipfs/tar" tar "github.com/ipfs/go-ipfs/tar"
dag "gx/ipfs/QmcBoNcAP6qDjgRBew7yjvCqHq7p5jMstE44jPUBWBxzsV/go-merkledag" dag "gx/ipfs/QmcBoNcAP6qDjgRBew7yjvCqHq7p5jMstE44jPUBWBxzsV/go-merkledag"
path "gx/ipfs/QmcjwUb36Z16NJkvDX6ccXPqsFswo6AsRXynyXcLLCphV2/go-path" path "gx/ipfs/QmcjwUb36Z16NJkvDX6ccXPqsFswo6AsRXynyXcLLCphV2/go-path"
@ -60,12 +60,12 @@ represent it.
c := node.Cid() c := node.Cid()
fi.FileName() fi.FileName()
res.SetOutput(&coreunix.AddedObject{ res.SetOutput(&coreiface.AddEvent{
Name: fi.FileName(), Name: fi.FileName(),
Hash: c.String(), Hash: c.String(),
}) })
}, },
Type: coreunix.AddedObject{}, Type: coreiface.AddEvent{},
Marshalers: cmds.MarshalerMap{ Marshalers: cmds.MarshalerMap{
cmds.Text: func(res cmds.Response) (io.Reader, error) { cmds.Text: func(res cmds.Response) (io.Reader, error) {
v, err := unwrapOutput(res.Output()) v, err := unwrapOutput(res.Output())
@ -73,7 +73,7 @@ represent it.
return nil, err return nil, err
} }
o, ok := v.(*coreunix.AddedObject) o, ok := v.(*coreiface.AddEvent)
if !ok { if !ok {
return nil, e.TypeErr(o, v) return nil, e.TypeErr(o, v)
} }

View File

@ -35,6 +35,10 @@ type UnixfsAddSettings struct {
Wrap bool Wrap bool
Hidden bool Hidden bool
StdinName string StdinName string
Events chan<- interface{}
Silent bool
Progress bool
} }
type UnixfsAddOption func(*UnixfsAddSettings) error type UnixfsAddOption func(*UnixfsAddSettings) error
@ -59,6 +63,10 @@ func UnixfsAddOptions(opts ...UnixfsAddOption) (*UnixfsAddSettings, cid.Prefix,
Wrap: false, Wrap: false,
Hidden: false, Hidden: false,
StdinName: "", StdinName: "",
Events: nil,
Silent: false,
Progress: false,
} }
for _, opt := range opts { for _, opt := range opts {
@ -236,3 +244,30 @@ func (unixfsOpts) StdinName(name string) UnixfsAddOption {
return nil return nil
} }
} }
// Events specifies channel which will be used to report events about ongoing
// Add operation.
//
// Note that if this channel blocks it may slowdown the adder
func (unixfsOpts) Events(sink chan<- interface{}) UnixfsAddOption {
return func(settings *UnixfsAddSettings) error {
settings.Events = sink
return nil
}
}
// Silent reduces event output
func (unixfsOpts) Silent(silent bool) UnixfsAddOption {
return func(settings *UnixfsAddSettings) error {
settings.Silent = silent
return nil
}
}
// Progress tells the adder whether to enable progress events
func (unixfsOpts) Progress(enable bool) UnixfsAddOption {
return func(settings *UnixfsAddSettings) error {
settings.Progress = enable
return nil
}
}

View File

@ -9,6 +9,14 @@ import (
ipld "gx/ipfs/QmdDXJs4axxefSPgK6Y1QhpJWKuDPnGJiqgq4uncb4rFHL/go-ipld-format" ipld "gx/ipfs/QmdDXJs4axxefSPgK6Y1QhpJWKuDPnGJiqgq4uncb4rFHL/go-ipld-format"
) )
// TODO: ideas on making this more coreapi-ish without breaking the http API?
type AddEvent struct {
Name string
Hash string `json:",omitempty"`
Bytes int64 `json:",omitempty"`
Size string `json:",omitempty"`
}
// UnixfsAPI is the basic interface to immutable files in IPFS // UnixfsAPI is the basic interface to immutable files in IPFS
// NOTE: This API is heavily WIP, things are guaranteed to break frequently // NOTE: This API is heavily WIP, things are guaranteed to break frequently
type UnixfsAPI interface { type UnixfsAPI interface {
@ -24,6 +32,7 @@ type UnixfsAPI interface {
Get(context.Context, Path) (files.File, error) Get(context.Context, Path) (files.File, error)
// Cat returns a reader for the file // Cat returns a reader for the file
// TODO: Remove in favour of Get (if we use Get on a file we still have reader directly, so..)
Cat(context.Context, Path) (Reader, error) Cat(context.Context, Path) (Reader, error)
// Ls returns the list of links in a directory // Ls returns the list of links in a directory

View File

@ -64,11 +64,14 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.File, opts ...options
} }
fileAdder.Chunker = settings.Chunker fileAdder.Chunker = settings.Chunker
//fileAdder.Progress = progress if settings.Events != nil {
fileAdder.Out = settings.Events
fileAdder.Progress = settings.Progress
}
fileAdder.Hidden = settings.Hidden fileAdder.Hidden = settings.Hidden
fileAdder.Wrap = settings.Wrap fileAdder.Wrap = settings.Wrap
fileAdder.Pin = settings.Pin && !settings.OnlyHash fileAdder.Pin = settings.Pin && !settings.OnlyHash
fileAdder.Silent = true fileAdder.Silent = settings.Silent
fileAdder.RawLeaves = settings.RawLeaves fileAdder.RawLeaves = settings.RawLeaves
//fileAdder.NoCopy = nocopy //fileAdder.NoCopy = nocopy
fileAdder.Name = settings.StdinName fileAdder.Name = settings.StdinName

View File

@ -9,7 +9,9 @@ import (
"io/ioutil" "io/ioutil"
"math" "math"
"os" "os"
"strconv"
"strings" "strings"
"sync"
"testing" "testing"
"github.com/ipfs/go-ipfs/core" "github.com/ipfs/go-ipfs/core"
@ -147,6 +149,13 @@ func twoLevelDir() func() files.File {
} }
} }
func flatDir() files.File {
return files.NewSliceFile("t", "t", []files.File{
files.NewReaderFile("t/bar", "t/bar", ioutil.NopCloser(strings.NewReader("hello2")), nil),
files.NewReaderFile("t/foo", "t/foo", ioutil.NopCloser(strings.NewReader("hello1")), nil),
})
}
func wrapped(f files.File) files.File { func wrapped(f files.File) files.File {
return files.NewSliceFile("", "", []files.File{ return files.NewSliceFile("", "", []files.File{
f, f,
@ -170,6 +179,8 @@ func TestAdd(t *testing.T) {
recursive bool recursive bool
events []coreiface.AddEvent
opts []options.UnixfsAddOption opts []options.UnixfsAddOption
}{ }{
// Simple cases // Simple cases
@ -263,13 +274,8 @@ func TestAdd(t *testing.T) {
}, },
// multi file // multi file
{ {
name: "simpleDir", name: "simpleDir",
data: func() files.File { data: flatDir,
return files.NewSliceFile("t", "t", []files.File{
files.NewReaderFile("t/bar", "t/bar", ioutil.NopCloser(strings.NewReader("hello2")), nil),
files.NewReaderFile("t/foo", "t/foo", ioutil.NopCloser(strings.NewReader("hello1")), nil),
})
},
recursive: true, recursive: true,
path: "/ipfs/QmRKGpFfR32FVXdvJiHfo4WJ5TDYBsM1P9raAp1p6APWSp", path: "/ipfs/QmRKGpFfR32FVXdvJiHfo4WJ5TDYBsM1P9raAp1p6APWSp",
}, },
@ -300,7 +306,7 @@ func TestAdd(t *testing.T) {
files.NewReaderFile("QmQy2Dw4Wk7rdJKjThjYXzfFJNaRKRHhHP5gHHXroJMYxk", "QmQy2Dw4Wk7rdJKjThjYXzfFJNaRKRHhHP5gHHXroJMYxk", ioutil.NopCloser(strings.NewReader(helloStr)), nil), files.NewReaderFile("QmQy2Dw4Wk7rdJKjThjYXzfFJNaRKRHhHP5gHHXroJMYxk", "QmQy2Dw4Wk7rdJKjThjYXzfFJNaRKRHhHP5gHHXroJMYxk", ioutil.NopCloser(strings.NewReader(helloStr)), nil),
}) })
}, },
opts: []options.UnixfsAddOption{options.Unixfs.Wrap(true)}, opts: []options.UnixfsAddOption{options.Unixfs.Wrap(true)},
}, },
{ {
name: "stdinNamed", name: "stdinNamed",
@ -313,7 +319,7 @@ func TestAdd(t *testing.T) {
files.NewReaderFile("test", "test", ioutil.NopCloser(strings.NewReader(helloStr)), nil), files.NewReaderFile("test", "test", ioutil.NopCloser(strings.NewReader(helloStr)), nil),
}) })
}, },
opts: []options.UnixfsAddOption{options.Unixfs.Wrap(true), options.Unixfs.StdinName("test")}, opts: []options.UnixfsAddOption{options.Unixfs.Wrap(true), options.Unixfs.StdinName("test")},
}, },
{ {
name: "twoLevelDirWrapped", name: "twoLevelDirWrapped",
@ -363,19 +369,71 @@ func TestAdd(t *testing.T) {
}) })
}, },
expect: func(files.File) files.File { expect: func(files.File) files.File {
return files.NewSliceFile("t", "t", []files.File{ return flatDir()
files.NewReaderFile("t/bar", "t/bar", ioutil.NopCloser(strings.NewReader("hello2")), nil),
files.NewReaderFile("t/foo", "t/foo", ioutil.NopCloser(strings.NewReader("hello1")), nil),
})
}, },
recursive: true, recursive: true,
path: "/ipfs/QmRKGpFfR32FVXdvJiHfo4WJ5TDYBsM1P9raAp1p6APWSp", path: "/ipfs/QmRKGpFfR32FVXdvJiHfo4WJ5TDYBsM1P9raAp1p6APWSp",
opts: []options.UnixfsAddOption{options.Unixfs.Hidden(false)}, opts: []options.UnixfsAddOption{options.Unixfs.Hidden(false)},
}, },
// Events / Progress
{
name: "simpleAddEvent",
data: strFile(helloStr),
path: "/ipfs/zb2rhdhmJjJZs9qkhQCpCQ7VREFkqWw3h1r8utjVvQugwHPFd",
events: []coreiface.AddEvent{
{Name: "zb2rhdhmJjJZs9qkhQCpCQ7VREFkqWw3h1r8utjVvQugwHPFd", Hash: "zb2rhdhmJjJZs9qkhQCpCQ7VREFkqWw3h1r8utjVvQugwHPFd", Size: strconv.Itoa(len(helloStr))},
},
opts: []options.UnixfsAddOption{options.Unixfs.RawLeaves(true)},
},
{
name: "silentAddEvent",
data: twoLevelDir(),
path: "/ipfs/QmVG2ZYCkV1S4TK8URA3a4RupBF17A8yAr4FqsRDXVJASr",
events: []coreiface.AddEvent{
{Name: "t/abc", Hash: "QmU7nuGs2djqK99UNsNgEPGh6GV4662p6WtsgccBNGTDxt", Size: "62"},
{Name: "t", Hash: "QmVG2ZYCkV1S4TK8URA3a4RupBF17A8yAr4FqsRDXVJASr", Size: "229"},
},
recursive: true,
opts: []options.UnixfsAddOption{options.Unixfs.Silent(true)},
},
{
name: "dirAddEvents",
data: twoLevelDir(),
path: "/ipfs/QmVG2ZYCkV1S4TK8URA3a4RupBF17A8yAr4FqsRDXVJASr",
events: []coreiface.AddEvent{
{Name: "t/abc/def", Hash: "QmNyJpQkU1cEkBwMDhDNFstr42q55mqG5GE5Mgwug4xyGk", Size: "13"},
{Name: "t/bar", Hash: "QmS21GuXiRMvJKHos4ZkEmQDmRBqRaF5tQS2CQCu2ne9sY", Size: "14"},
{Name: "t/foo", Hash: "QmfAjGiVpTN56TXi6SBQtstit5BEw3sijKj1Qkxn6EXKzJ", Size: "14"},
{Name: "t/abc", Hash: "QmU7nuGs2djqK99UNsNgEPGh6GV4662p6WtsgccBNGTDxt", Size: "62"},
{Name: "t", Hash: "QmVG2ZYCkV1S4TK8URA3a4RupBF17A8yAr4FqsRDXVJASr", Size: "229"},
},
recursive: true,
},
{
name: "progress1M",
data: func() files.File {
r := bytes.NewReader(bytes.Repeat([]byte{0}, 1000000))
return files.NewReaderFile("", "", ioutil.NopCloser(r), nil)
},
path: "/ipfs/QmXXNNbwe4zzpdMg62ZXvnX1oU7MwSrQ3vAEtuwFKCm1oD",
events: []coreiface.AddEvent{
{Name: "", Bytes: 262144},
{Name: "", Bytes: 524288},
{Name: "", Bytes: 786432},
{Name: "", Bytes: 1000000},
{Name: "QmXXNNbwe4zzpdMg62ZXvnX1oU7MwSrQ3vAEtuwFKCm1oD", Hash: "QmXXNNbwe4zzpdMg62ZXvnX1oU7MwSrQ3vAEtuwFKCm1oD", Size: "1000256"},
},
recursive: true,
opts: []options.UnixfsAddOption{options.Unixfs.Progress(true)},
},
} }
for _, testCase := range cases { for _, testCase := range cases {
t.Run(testCase.name, func(t *testing.T) { t.Run(testCase.name, func(t *testing.T) {
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// recursive logic
data := testCase.data() data := testCase.data()
if testCase.recursive { if testCase.recursive {
@ -384,7 +442,58 @@ func TestAdd(t *testing.T) {
}) })
} }
p, err := api.Unixfs().Add(ctx, data, testCase.opts...) // handle events if relevant to test case
opts := testCase.opts
eventOut := make(chan interface{})
var evtWg sync.WaitGroup
if len(testCase.events) > 0 {
opts = append(opts, options.Unixfs.Events(eventOut))
evtWg.Add(1)
go func() {
defer evtWg.Done()
expected := testCase.events
for evt := range eventOut {
event, ok := evt.(*coreiface.AddEvent)
if !ok {
t.Fatal("unexpected event type")
}
if len(expected) < 1 {
t.Fatal("got more events than expected")
}
if expected[0].Size != event.Size {
t.Errorf("Event.Size didn't match, %s != %s", expected[0].Size, event.Size)
}
if expected[0].Name != event.Name {
t.Errorf("Event.Name didn't match, %s != %s", expected[0].Name, event.Name)
}
if expected[0].Hash != event.Hash {
t.Errorf("Event.Hash didn't match, %s != %s", expected[0].Hash, event.Hash)
}
if expected[0].Bytes != event.Bytes {
t.Errorf("Event.Bytes didn't match, %d != %d", expected[0].Bytes, event.Bytes)
}
expected = expected[1:]
}
if len(expected) > 0 {
t.Fatalf("%d event(s) didn't arrive", len(expected))
}
}()
}
// Add!
p, err := api.Unixfs().Add(ctx, data, opts...)
close(eventOut)
evtWg.Wait()
if testCase.err != "" { if testCase.err != "" {
if err == nil { if err == nil {
t.Fatalf("expected an error: %s", testCase.err) t.Fatalf("expected an error: %s", testCase.err)
@ -402,6 +511,8 @@ func TestAdd(t *testing.T) {
t.Errorf("expected path %s, got: %s", testCase.path, p) t.Errorf("expected path %s, got: %s", testCase.path, p)
} }
// compare file structure with Unixfs().Get
var cmpFile func(orig files.File, got files.File) var cmpFile func(orig files.File, got files.File)
cmpFile = func(orig files.File, got files.File) { cmpFile = func(orig files.File, got files.File) {
if orig.IsDirectory() != got.IsDirectory() { if orig.IsDirectory() != got.IsDirectory() {

View File

@ -11,19 +11,20 @@ import (
"strconv" "strconv"
core "github.com/ipfs/go-ipfs/core" core "github.com/ipfs/go-ipfs/core"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
"github.com/ipfs/go-ipfs/pin" "github.com/ipfs/go-ipfs/pin"
unixfs "gx/ipfs/QmU4x3742bvgfxJsByEDpBnifJqjJdV6x528co4hwKCn46/go-unixfs"
balanced "gx/ipfs/QmU4x3742bvgfxJsByEDpBnifJqjJdV6x528co4hwKCn46/go-unixfs/importer/balanced"
ihelper "gx/ipfs/QmU4x3742bvgfxJsByEDpBnifJqjJdV6x528co4hwKCn46/go-unixfs/importer/helpers"
trickle "gx/ipfs/QmU4x3742bvgfxJsByEDpBnifJqjJdV6x528co4hwKCn46/go-unixfs/importer/trickle"
dag "gx/ipfs/QmcBoNcAP6qDjgRBew7yjvCqHq7p5jMstE44jPUBWBxzsV/go-merkledag"
posinfo "gx/ipfs/QmPG32VXR5jmpo9q8R9FNdR4Ae97Ky9CiZE6SctJLUB79H/go-ipfs-posinfo" posinfo "gx/ipfs/QmPG32VXR5jmpo9q8R9FNdR4Ae97Ky9CiZE6SctJLUB79H/go-ipfs-posinfo"
cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid" cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid"
files "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit/files" files "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit/files"
unixfs "gx/ipfs/QmU4x3742bvgfxJsByEDpBnifJqjJdV6x528co4hwKCn46/go-unixfs"
balanced "gx/ipfs/QmU4x3742bvgfxJsByEDpBnifJqjJdV6x528co4hwKCn46/go-unixfs/importer/balanced"
ihelper "gx/ipfs/QmU4x3742bvgfxJsByEDpBnifJqjJdV6x528co4hwKCn46/go-unixfs/importer/helpers"
trickle "gx/ipfs/QmU4x3742bvgfxJsByEDpBnifJqjJdV6x528co4hwKCn46/go-unixfs/importer/trickle"
chunker "gx/ipfs/QmULKgr55cSWR8Kiwy3cVRcAiGVnR6EVSaB7hJcWS4138p/go-ipfs-chunker" chunker "gx/ipfs/QmULKgr55cSWR8Kiwy3cVRcAiGVnR6EVSaB7hJcWS4138p/go-ipfs-chunker"
logging "gx/ipfs/QmZChCsSt8DctjceaL56Eibc29CVQq4dGKRXC5JRZ6Ppae/go-log" logging "gx/ipfs/QmZChCsSt8DctjceaL56Eibc29CVQq4dGKRXC5JRZ6Ppae/go-log"
mfs "gx/ipfs/QmahrY1adY4wvtYEtoGjpZ2GUohTyukrkMkwUR9ytRjTG2/go-mfs" mfs "gx/ipfs/QmahrY1adY4wvtYEtoGjpZ2GUohTyukrkMkwUR9ytRjTG2/go-mfs"
dag "gx/ipfs/QmcBoNcAP6qDjgRBew7yjvCqHq7p5jMstE44jPUBWBxzsV/go-merkledag"
ipld "gx/ipfs/QmdDXJs4axxefSPgK6Y1QhpJWKuDPnGJiqgq4uncb4rFHL/go-ipld-format" ipld "gx/ipfs/QmdDXJs4axxefSPgK6Y1QhpJWKuDPnGJiqgq4uncb4rFHL/go-ipld-format"
bstore "gx/ipfs/QmdriVJgKx4JADRgh3cYPXqXmsa1A45SvFki1nDWHhQNtC/go-ipfs-blockstore" bstore "gx/ipfs/QmdriVJgKx4JADRgh3cYPXqXmsa1A45SvFki1nDWHhQNtC/go-ipfs-blockstore"
) )
@ -46,13 +47,6 @@ type Object struct {
Size string Size string
} }
type AddedObject struct {
Name string
Hash string `json:",omitempty"`
Bytes int64 `json:",omitempty"`
Size string `json:",omitempty"`
}
// NewAdder Returns a new Adder used for a file add operation. // NewAdder Returns a new Adder used for a file add operation.
func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCBlockstore, ds ipld.DAGService) (*Adder, error) { func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCBlockstore, ds ipld.DAGService) (*Adder, error) {
return &Adder{ return &Adder{
@ -75,7 +69,7 @@ type Adder struct {
pinning pin.Pinner pinning pin.Pinner
blockstore bstore.GCBlockstore blockstore bstore.GCBlockstore
dagService ipld.DAGService dagService ipld.DAGService
Out chan interface{} Out chan<- interface{}
Progress bool Progress bool
Hidden bool Hidden bool
Pin bool Pin bool
@ -570,7 +564,7 @@ func (adder *Adder) maybePauseForGC() error {
} }
// outputDagnode sends dagnode info over the output channel // outputDagnode sends dagnode info over the output channel
func outputDagnode(out chan interface{}, name string, dn ipld.Node) error { func outputDagnode(out chan<- interface{}, name string, dn ipld.Node) error {
if out == nil { if out == nil {
return nil return nil
} }
@ -580,7 +574,7 @@ func outputDagnode(out chan interface{}, name string, dn ipld.Node) error {
return err return err
} }
out <- &AddedObject{ out <- &coreiface.AddEvent{
Hash: o.Hash, Hash: o.Hash,
Name: name, Name: name,
Size: o.Size, Size: o.Size,
@ -615,7 +609,7 @@ func getOutput(dagnode ipld.Node) (*Object, error) {
type progressReader struct { type progressReader struct {
file files.File file files.File
out chan interface{} out chan<- interface{}
bytes int64 bytes int64
lastProgress int64 lastProgress int64
} }
@ -626,7 +620,7 @@ func (i *progressReader) Read(p []byte) (int, error) {
i.bytes += int64(n) i.bytes += int64(n)
if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF { if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF {
i.lastProgress = i.bytes i.lastProgress = i.bytes
i.out <- &AddedObject{ i.out <- &coreiface.AddEvent{
Name: i.file.FileName(), Name: i.file.FileName(),
Bytes: i.bytes, Bytes: i.bytes,
} }

View File

@ -11,6 +11,7 @@ import (
"time" "time"
"github.com/ipfs/go-ipfs/core" "github.com/ipfs/go-ipfs/core"
coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface"
"github.com/ipfs/go-ipfs/pin/gc" "github.com/ipfs/go-ipfs/pin/gc"
"github.com/ipfs/go-ipfs/repo" "github.com/ipfs/go-ipfs/repo"
@ -96,7 +97,7 @@ func TestAddGCLive(t *testing.T) {
addedHashes := make(map[string]struct{}) addedHashes := make(map[string]struct{})
select { select {
case o := <-out: case o := <-out:
addedHashes[o.(*AddedObject).Hash] = struct{}{} addedHashes[o.(*coreiface.AddEvent).Hash] = struct{}{}
case <-addDone: case <-addDone:
t.Fatal("add shouldnt complete yet") t.Fatal("add shouldnt complete yet")
} }
@ -124,7 +125,7 @@ func TestAddGCLive(t *testing.T) {
// receive next object from adder // receive next object from adder
o := <-out o := <-out
addedHashes[o.(*AddedObject).Hash] = struct{}{} addedHashes[o.(*coreiface.AddEvent).Hash] = struct{}{}
<-gcstarted <-gcstarted
@ -140,7 +141,7 @@ func TestAddGCLive(t *testing.T) {
var last cid.Cid var last cid.Cid
for a := range out { for a := range out {
// wait for it to finish // wait for it to finish
c, err := cid.Decode(a.(*AddedObject).Hash) c, err := cid.Decode(a.(*coreiface.AddEvent).Hash)
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
@ -178,7 +179,8 @@ func testAddWPosInfo(t *testing.T, rawLeaves bool) {
if err != nil { if err != nil {
t.Fatal(err) t.Fatal(err)
} }
adder.Out = make(chan interface{}) out := make(chan interface{})
adder.Out = out
adder.Progress = true adder.Progress = true
adder.RawLeaves = rawLeaves adder.RawLeaves = rawLeaves
adder.NoCopy = true adder.NoCopy = true
@ -196,7 +198,7 @@ func testAddWPosInfo(t *testing.T, rawLeaves bool) {
t.Fatal(err) t.Fatal(err)
} }
}() }()
for range adder.Out { for range out {
} }
exp := 0 exp := 0