diff --git a/core/commands/add.go b/core/commands/add.go index d7d6b33a2..df811af54 100644 --- a/core/commands/add.go +++ b/core/commands/add.go @@ -8,6 +8,7 @@ import ( core "github.com/ipfs/go-ipfs/core" cmdenv "github.com/ipfs/go-ipfs/core/commands/cmdenv" + coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" "github.com/ipfs/go-ipfs/core/coreunix" filestore "github.com/ipfs/go-ipfs/filestore" ft "gx/ipfs/QmU4x3742bvgfxJsByEDpBnifJqjJdV6x528co4hwKCn46/go-unixfs" @@ -371,7 +372,7 @@ You can now check what blocks have been created by: break LOOP } - output := out.(*coreunix.AddedObject) + output := out.(*coreiface.AddEvent) if len(output.Hash) > 0 { lastHash = output.Hash if quieter { @@ -451,5 +452,5 @@ You can now check what blocks have been created by: } }, }, - Type: coreunix.AddedObject{}, + Type: coreiface.AddEvent{}, } diff --git a/core/commands/tar.go b/core/commands/tar.go index a2fa3da20..670679032 100644 --- a/core/commands/tar.go +++ b/core/commands/tar.go @@ -7,7 +7,7 @@ import ( cmds "github.com/ipfs/go-ipfs/commands" core "github.com/ipfs/go-ipfs/core" e "github.com/ipfs/go-ipfs/core/commands/e" - "github.com/ipfs/go-ipfs/core/coreunix" + coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" tar "github.com/ipfs/go-ipfs/tar" dag "gx/ipfs/QmcBoNcAP6qDjgRBew7yjvCqHq7p5jMstE44jPUBWBxzsV/go-merkledag" path "gx/ipfs/QmcjwUb36Z16NJkvDX6ccXPqsFswo6AsRXynyXcLLCphV2/go-path" @@ -60,12 +60,12 @@ represent it. c := node.Cid() fi.FileName() - res.SetOutput(&coreunix.AddedObject{ + res.SetOutput(&coreiface.AddEvent{ Name: fi.FileName(), Hash: c.String(), }) }, - Type: coreunix.AddedObject{}, + Type: coreiface.AddEvent{}, Marshalers: cmds.MarshalerMap{ cmds.Text: func(res cmds.Response) (io.Reader, error) { v, err := unwrapOutput(res.Output()) @@ -73,7 +73,7 @@ represent it. return nil, err } - o, ok := v.(*coreunix.AddedObject) + o, ok := v.(*coreiface.AddEvent) if !ok { return nil, e.TypeErr(o, v) } diff --git a/core/coreapi/interface/options/unixfs.go b/core/coreapi/interface/options/unixfs.go index 90ad53e9e..da99b42f6 100644 --- a/core/coreapi/interface/options/unixfs.go +++ b/core/coreapi/interface/options/unixfs.go @@ -35,6 +35,10 @@ type UnixfsAddSettings struct { Wrap bool Hidden bool StdinName string + + Events chan<- interface{} + Silent bool + Progress bool } type UnixfsAddOption func(*UnixfsAddSettings) error @@ -59,6 +63,10 @@ func UnixfsAddOptions(opts ...UnixfsAddOption) (*UnixfsAddSettings, cid.Prefix, Wrap: false, Hidden: false, StdinName: "", + + Events: nil, + Silent: false, + Progress: false, } for _, opt := range opts { @@ -236,3 +244,30 @@ func (unixfsOpts) StdinName(name string) UnixfsAddOption { return nil } } + +// Events specifies channel which will be used to report events about ongoing +// Add operation. +// +// Note that if this channel blocks it may slowdown the adder +func (unixfsOpts) Events(sink chan<- interface{}) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.Events = sink + return nil + } +} + +// Silent reduces event output +func (unixfsOpts) Silent(silent bool) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.Silent = silent + return nil + } +} + +// Progress tells the adder whether to enable progress events +func (unixfsOpts) Progress(enable bool) UnixfsAddOption { + return func(settings *UnixfsAddSettings) error { + settings.Progress = enable + return nil + } +} diff --git a/core/coreapi/interface/unixfs.go b/core/coreapi/interface/unixfs.go index 69e731822..c622e210e 100644 --- a/core/coreapi/interface/unixfs.go +++ b/core/coreapi/interface/unixfs.go @@ -9,6 +9,14 @@ import ( ipld "gx/ipfs/QmdDXJs4axxefSPgK6Y1QhpJWKuDPnGJiqgq4uncb4rFHL/go-ipld-format" ) +// TODO: ideas on making this more coreapi-ish without breaking the http API? +type AddEvent struct { + Name string + Hash string `json:",omitempty"` + Bytes int64 `json:",omitempty"` + Size string `json:",omitempty"` +} + // UnixfsAPI is the basic interface to immutable files in IPFS // NOTE: This API is heavily WIP, things are guaranteed to break frequently type UnixfsAPI interface { @@ -24,6 +32,7 @@ type UnixfsAPI interface { Get(context.Context, Path) (files.File, error) // Cat returns a reader for the file + // TODO: Remove in favour of Get (if we use Get on a file we still have reader directly, so..) Cat(context.Context, Path) (Reader, error) // Ls returns the list of links in a directory diff --git a/core/coreapi/unixfs.go b/core/coreapi/unixfs.go index fc971bf6e..4bd60d159 100644 --- a/core/coreapi/unixfs.go +++ b/core/coreapi/unixfs.go @@ -64,11 +64,14 @@ func (api *UnixfsAPI) Add(ctx context.Context, files files.File, opts ...options } fileAdder.Chunker = settings.Chunker - //fileAdder.Progress = progress + if settings.Events != nil { + fileAdder.Out = settings.Events + fileAdder.Progress = settings.Progress + } fileAdder.Hidden = settings.Hidden fileAdder.Wrap = settings.Wrap fileAdder.Pin = settings.Pin && !settings.OnlyHash - fileAdder.Silent = true + fileAdder.Silent = settings.Silent fileAdder.RawLeaves = settings.RawLeaves //fileAdder.NoCopy = nocopy fileAdder.Name = settings.StdinName diff --git a/core/coreapi/unixfs_test.go b/core/coreapi/unixfs_test.go index ba92e0484..eb02b3e51 100644 --- a/core/coreapi/unixfs_test.go +++ b/core/coreapi/unixfs_test.go @@ -9,7 +9,9 @@ import ( "io/ioutil" "math" "os" + "strconv" "strings" + "sync" "testing" "github.com/ipfs/go-ipfs/core" @@ -147,6 +149,13 @@ func twoLevelDir() func() files.File { } } +func flatDir() files.File { + return files.NewSliceFile("t", "t", []files.File{ + files.NewReaderFile("t/bar", "t/bar", ioutil.NopCloser(strings.NewReader("hello2")), nil), + files.NewReaderFile("t/foo", "t/foo", ioutil.NopCloser(strings.NewReader("hello1")), nil), + }) +} + func wrapped(f files.File) files.File { return files.NewSliceFile("", "", []files.File{ f, @@ -170,6 +179,8 @@ func TestAdd(t *testing.T) { recursive bool + events []coreiface.AddEvent + opts []options.UnixfsAddOption }{ // Simple cases @@ -263,13 +274,8 @@ func TestAdd(t *testing.T) { }, // multi file { - name: "simpleDir", - data: func() files.File { - return files.NewSliceFile("t", "t", []files.File{ - files.NewReaderFile("t/bar", "t/bar", ioutil.NopCloser(strings.NewReader("hello2")), nil), - files.NewReaderFile("t/foo", "t/foo", ioutil.NopCloser(strings.NewReader("hello1")), nil), - }) - }, + name: "simpleDir", + data: flatDir, recursive: true, path: "/ipfs/QmRKGpFfR32FVXdvJiHfo4WJ5TDYBsM1P9raAp1p6APWSp", }, @@ -300,7 +306,7 @@ func TestAdd(t *testing.T) { files.NewReaderFile("QmQy2Dw4Wk7rdJKjThjYXzfFJNaRKRHhHP5gHHXroJMYxk", "QmQy2Dw4Wk7rdJKjThjYXzfFJNaRKRHhHP5gHHXroJMYxk", ioutil.NopCloser(strings.NewReader(helloStr)), nil), }) }, - opts: []options.UnixfsAddOption{options.Unixfs.Wrap(true)}, + opts: []options.UnixfsAddOption{options.Unixfs.Wrap(true)}, }, { name: "stdinNamed", @@ -313,7 +319,7 @@ func TestAdd(t *testing.T) { files.NewReaderFile("test", "test", ioutil.NopCloser(strings.NewReader(helloStr)), nil), }) }, - opts: []options.UnixfsAddOption{options.Unixfs.Wrap(true), options.Unixfs.StdinName("test")}, + opts: []options.UnixfsAddOption{options.Unixfs.Wrap(true), options.Unixfs.StdinName("test")}, }, { name: "twoLevelDirWrapped", @@ -363,19 +369,71 @@ func TestAdd(t *testing.T) { }) }, expect: func(files.File) files.File { - return files.NewSliceFile("t", "t", []files.File{ - files.NewReaderFile("t/bar", "t/bar", ioutil.NopCloser(strings.NewReader("hello2")), nil), - files.NewReaderFile("t/foo", "t/foo", ioutil.NopCloser(strings.NewReader("hello1")), nil), - }) + return flatDir() }, recursive: true, path: "/ipfs/QmRKGpFfR32FVXdvJiHfo4WJ5TDYBsM1P9raAp1p6APWSp", opts: []options.UnixfsAddOption{options.Unixfs.Hidden(false)}, }, + // Events / Progress + { + name: "simpleAddEvent", + data: strFile(helloStr), + path: "/ipfs/zb2rhdhmJjJZs9qkhQCpCQ7VREFkqWw3h1r8utjVvQugwHPFd", + events: []coreiface.AddEvent{ + {Name: "zb2rhdhmJjJZs9qkhQCpCQ7VREFkqWw3h1r8utjVvQugwHPFd", Hash: "zb2rhdhmJjJZs9qkhQCpCQ7VREFkqWw3h1r8utjVvQugwHPFd", Size: strconv.Itoa(len(helloStr))}, + }, + opts: []options.UnixfsAddOption{options.Unixfs.RawLeaves(true)}, + }, + { + name: "silentAddEvent", + data: twoLevelDir(), + path: "/ipfs/QmVG2ZYCkV1S4TK8URA3a4RupBF17A8yAr4FqsRDXVJASr", + events: []coreiface.AddEvent{ + {Name: "t/abc", Hash: "QmU7nuGs2djqK99UNsNgEPGh6GV4662p6WtsgccBNGTDxt", Size: "62"}, + {Name: "t", Hash: "QmVG2ZYCkV1S4TK8URA3a4RupBF17A8yAr4FqsRDXVJASr", Size: "229"}, + }, + recursive: true, + opts: []options.UnixfsAddOption{options.Unixfs.Silent(true)}, + }, + { + name: "dirAddEvents", + data: twoLevelDir(), + path: "/ipfs/QmVG2ZYCkV1S4TK8URA3a4RupBF17A8yAr4FqsRDXVJASr", + events: []coreiface.AddEvent{ + {Name: "t/abc/def", Hash: "QmNyJpQkU1cEkBwMDhDNFstr42q55mqG5GE5Mgwug4xyGk", Size: "13"}, + {Name: "t/bar", Hash: "QmS21GuXiRMvJKHos4ZkEmQDmRBqRaF5tQS2CQCu2ne9sY", Size: "14"}, + {Name: "t/foo", Hash: "QmfAjGiVpTN56TXi6SBQtstit5BEw3sijKj1Qkxn6EXKzJ", Size: "14"}, + {Name: "t/abc", Hash: "QmU7nuGs2djqK99UNsNgEPGh6GV4662p6WtsgccBNGTDxt", Size: "62"}, + {Name: "t", Hash: "QmVG2ZYCkV1S4TK8URA3a4RupBF17A8yAr4FqsRDXVJASr", Size: "229"}, + }, + recursive: true, + }, + { + name: "progress1M", + data: func() files.File { + r := bytes.NewReader(bytes.Repeat([]byte{0}, 1000000)) + return files.NewReaderFile("", "", ioutil.NopCloser(r), nil) + }, + path: "/ipfs/QmXXNNbwe4zzpdMg62ZXvnX1oU7MwSrQ3vAEtuwFKCm1oD", + events: []coreiface.AddEvent{ + {Name: "", Bytes: 262144}, + {Name: "", Bytes: 524288}, + {Name: "", Bytes: 786432}, + {Name: "", Bytes: 1000000}, + {Name: "QmXXNNbwe4zzpdMg62ZXvnX1oU7MwSrQ3vAEtuwFKCm1oD", Hash: "QmXXNNbwe4zzpdMg62ZXvnX1oU7MwSrQ3vAEtuwFKCm1oD", Size: "1000256"}, + }, + recursive: true, + opts: []options.UnixfsAddOption{options.Unixfs.Progress(true)}, + }, } for _, testCase := range cases { t.Run(testCase.name, func(t *testing.T) { + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + // recursive logic data := testCase.data() if testCase.recursive { @@ -384,7 +442,58 @@ func TestAdd(t *testing.T) { }) } - p, err := api.Unixfs().Add(ctx, data, testCase.opts...) + // handle events if relevant to test case + + opts := testCase.opts + eventOut := make(chan interface{}) + var evtWg sync.WaitGroup + if len(testCase.events) > 0 { + opts = append(opts, options.Unixfs.Events(eventOut)) + evtWg.Add(1) + + go func() { + defer evtWg.Done() + expected := testCase.events + + for evt := range eventOut { + event, ok := evt.(*coreiface.AddEvent) + if !ok { + t.Fatal("unexpected event type") + } + + if len(expected) < 1 { + t.Fatal("got more events than expected") + } + + if expected[0].Size != event.Size { + t.Errorf("Event.Size didn't match, %s != %s", expected[0].Size, event.Size) + } + + if expected[0].Name != event.Name { + t.Errorf("Event.Name didn't match, %s != %s", expected[0].Name, event.Name) + } + + if expected[0].Hash != event.Hash { + t.Errorf("Event.Hash didn't match, %s != %s", expected[0].Hash, event.Hash) + } + if expected[0].Bytes != event.Bytes { + t.Errorf("Event.Bytes didn't match, %d != %d", expected[0].Bytes, event.Bytes) + } + + expected = expected[1:] + } + + if len(expected) > 0 { + t.Fatalf("%d event(s) didn't arrive", len(expected)) + } + }() + } + + // Add! + + p, err := api.Unixfs().Add(ctx, data, opts...) + close(eventOut) + evtWg.Wait() if testCase.err != "" { if err == nil { t.Fatalf("expected an error: %s", testCase.err) @@ -402,6 +511,8 @@ func TestAdd(t *testing.T) { t.Errorf("expected path %s, got: %s", testCase.path, p) } + // compare file structure with Unixfs().Get + var cmpFile func(orig files.File, got files.File) cmpFile = func(orig files.File, got files.File) { if orig.IsDirectory() != got.IsDirectory() { diff --git a/core/coreunix/add.go b/core/coreunix/add.go index d5a152f57..84324c7ec 100644 --- a/core/coreunix/add.go +++ b/core/coreunix/add.go @@ -11,19 +11,20 @@ import ( "strconv" core "github.com/ipfs/go-ipfs/core" + coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" "github.com/ipfs/go-ipfs/pin" - unixfs "gx/ipfs/QmU4x3742bvgfxJsByEDpBnifJqjJdV6x528co4hwKCn46/go-unixfs" - balanced "gx/ipfs/QmU4x3742bvgfxJsByEDpBnifJqjJdV6x528co4hwKCn46/go-unixfs/importer/balanced" - ihelper "gx/ipfs/QmU4x3742bvgfxJsByEDpBnifJqjJdV6x528co4hwKCn46/go-unixfs/importer/helpers" - trickle "gx/ipfs/QmU4x3742bvgfxJsByEDpBnifJqjJdV6x528co4hwKCn46/go-unixfs/importer/trickle" - dag "gx/ipfs/QmcBoNcAP6qDjgRBew7yjvCqHq7p5jMstE44jPUBWBxzsV/go-merkledag" posinfo "gx/ipfs/QmPG32VXR5jmpo9q8R9FNdR4Ae97Ky9CiZE6SctJLUB79H/go-ipfs-posinfo" cid "gx/ipfs/QmPSQnBKM9g7BaUcZCvswUJVscQ1ipjmwxN5PXCjkp9EQ7/go-cid" files "gx/ipfs/QmSP88ryZkHSRn1fnngAaV2Vcn63WUJzAavnRM9CVdU1Ky/go-ipfs-cmdkit/files" + unixfs "gx/ipfs/QmU4x3742bvgfxJsByEDpBnifJqjJdV6x528co4hwKCn46/go-unixfs" + balanced "gx/ipfs/QmU4x3742bvgfxJsByEDpBnifJqjJdV6x528co4hwKCn46/go-unixfs/importer/balanced" + ihelper "gx/ipfs/QmU4x3742bvgfxJsByEDpBnifJqjJdV6x528co4hwKCn46/go-unixfs/importer/helpers" + trickle "gx/ipfs/QmU4x3742bvgfxJsByEDpBnifJqjJdV6x528co4hwKCn46/go-unixfs/importer/trickle" chunker "gx/ipfs/QmULKgr55cSWR8Kiwy3cVRcAiGVnR6EVSaB7hJcWS4138p/go-ipfs-chunker" logging "gx/ipfs/QmZChCsSt8DctjceaL56Eibc29CVQq4dGKRXC5JRZ6Ppae/go-log" mfs "gx/ipfs/QmahrY1adY4wvtYEtoGjpZ2GUohTyukrkMkwUR9ytRjTG2/go-mfs" + dag "gx/ipfs/QmcBoNcAP6qDjgRBew7yjvCqHq7p5jMstE44jPUBWBxzsV/go-merkledag" ipld "gx/ipfs/QmdDXJs4axxefSPgK6Y1QhpJWKuDPnGJiqgq4uncb4rFHL/go-ipld-format" bstore "gx/ipfs/QmdriVJgKx4JADRgh3cYPXqXmsa1A45SvFki1nDWHhQNtC/go-ipfs-blockstore" ) @@ -46,13 +47,6 @@ type Object struct { Size string } -type AddedObject struct { - Name string - Hash string `json:",omitempty"` - Bytes int64 `json:",omitempty"` - Size string `json:",omitempty"` -} - // NewAdder Returns a new Adder used for a file add operation. func NewAdder(ctx context.Context, p pin.Pinner, bs bstore.GCBlockstore, ds ipld.DAGService) (*Adder, error) { return &Adder{ @@ -75,7 +69,7 @@ type Adder struct { pinning pin.Pinner blockstore bstore.GCBlockstore dagService ipld.DAGService - Out chan interface{} + Out chan<- interface{} Progress bool Hidden bool Pin bool @@ -570,7 +564,7 @@ func (adder *Adder) maybePauseForGC() error { } // outputDagnode sends dagnode info over the output channel -func outputDagnode(out chan interface{}, name string, dn ipld.Node) error { +func outputDagnode(out chan<- interface{}, name string, dn ipld.Node) error { if out == nil { return nil } @@ -580,7 +574,7 @@ func outputDagnode(out chan interface{}, name string, dn ipld.Node) error { return err } - out <- &AddedObject{ + out <- &coreiface.AddEvent{ Hash: o.Hash, Name: name, Size: o.Size, @@ -615,7 +609,7 @@ func getOutput(dagnode ipld.Node) (*Object, error) { type progressReader struct { file files.File - out chan interface{} + out chan<- interface{} bytes int64 lastProgress int64 } @@ -626,7 +620,7 @@ func (i *progressReader) Read(p []byte) (int, error) { i.bytes += int64(n) if i.bytes-i.lastProgress >= progressReaderIncrement || err == io.EOF { i.lastProgress = i.bytes - i.out <- &AddedObject{ + i.out <- &coreiface.AddEvent{ Name: i.file.FileName(), Bytes: i.bytes, } diff --git a/core/coreunix/add_test.go b/core/coreunix/add_test.go index 5185c4f4d..4af1803b3 100644 --- a/core/coreunix/add_test.go +++ b/core/coreunix/add_test.go @@ -11,6 +11,7 @@ import ( "time" "github.com/ipfs/go-ipfs/core" + coreiface "github.com/ipfs/go-ipfs/core/coreapi/interface" "github.com/ipfs/go-ipfs/pin/gc" "github.com/ipfs/go-ipfs/repo" @@ -96,7 +97,7 @@ func TestAddGCLive(t *testing.T) { addedHashes := make(map[string]struct{}) select { case o := <-out: - addedHashes[o.(*AddedObject).Hash] = struct{}{} + addedHashes[o.(*coreiface.AddEvent).Hash] = struct{}{} case <-addDone: t.Fatal("add shouldnt complete yet") } @@ -124,7 +125,7 @@ func TestAddGCLive(t *testing.T) { // receive next object from adder o := <-out - addedHashes[o.(*AddedObject).Hash] = struct{}{} + addedHashes[o.(*coreiface.AddEvent).Hash] = struct{}{} <-gcstarted @@ -140,7 +141,7 @@ func TestAddGCLive(t *testing.T) { var last cid.Cid for a := range out { // wait for it to finish - c, err := cid.Decode(a.(*AddedObject).Hash) + c, err := cid.Decode(a.(*coreiface.AddEvent).Hash) if err != nil { t.Fatal(err) } @@ -178,7 +179,8 @@ func testAddWPosInfo(t *testing.T, rawLeaves bool) { if err != nil { t.Fatal(err) } - adder.Out = make(chan interface{}) + out := make(chan interface{}) + adder.Out = out adder.Progress = true adder.RawLeaves = rawLeaves adder.NoCopy = true @@ -196,7 +198,7 @@ func testAddWPosInfo(t *testing.T, rawLeaves bool) { t.Fatal(err) } }() - for range adder.Out { + for range out { } exp := 0