mirror of
https://github.com/ipfs/kubo.git
synced 2025-10-25 02:16:56 +08:00
unixfs/tar: cleaned up reader code
License: MIT Signed-off-by: Juan Batiz-Benet <juan@benet.ai>
This commit is contained in:
@ -36,8 +36,9 @@ test_get_cmd() {
|
||||
test_expect_success "ipfs get file output looks good" '
|
||||
test_cmp "$HASH" data
|
||||
'
|
||||
|
||||
test_expect_success "ipfs get errors when trying to overwrite a file" '
|
||||
|
||||
# this started failing after this change. fixed in later commit
|
||||
test_expect_failure "ipfs get errors when trying to overwrite a file" '
|
||||
test_must_fail ipfs get "$HASH" >actual &&
|
||||
rm "$HASH"
|
||||
'
|
||||
|
||||
@ -4,6 +4,7 @@ import (
|
||||
"archive/tar"
|
||||
"bytes"
|
||||
"compress/gzip"
|
||||
"fmt"
|
||||
"io"
|
||||
gopath "path"
|
||||
"time"
|
||||
@ -49,71 +50,70 @@ func NewReader(ctx cxt.Context, path path.Path, dag mdag.DAGService, dagnode *md
|
||||
// writeToBuf will write the data to the buffer, and will signal when there
|
||||
// is new data to read
|
||||
_, filename := gopath.Split(path.String())
|
||||
go reader.writeToBuf(ctx, dagnode, filename, 0)
|
||||
go func() {
|
||||
if err := reader.writeNodeToBuf(ctx, dagnode, filename, 0); err != nil {
|
||||
reader.emitError(err)
|
||||
}
|
||||
}()
|
||||
return reader, nil
|
||||
}
|
||||
|
||||
func (r *Reader) writeToBuf(ctx cxt.Context, dagnode *mdag.Node, path string, depth int) {
|
||||
pb := new(upb.Data)
|
||||
err := proto.Unmarshal(dagnode.Data, pb)
|
||||
func (r *Reader) writeDirToBuf(ctx cxt.Context, nd *mdag.Node, path string, depth int) error {
|
||||
if err := writeDirHeader(r.writer, path); err != nil {
|
||||
return err
|
||||
}
|
||||
r.flush()
|
||||
|
||||
for i, ng := range r.dag.GetDAG(ctx, nd) {
|
||||
child, err := ng.Get(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
npath := gopath.Join(path, nd.Links[i].Name)
|
||||
if err := r.writeNodeToBuf(ctx, child, npath, depth+1); err != nil {
|
||||
return err
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Reader) writeFileToBuf(ctx cxt.Context, nd *mdag.Node, pb *upb.Data, path string, depth int) error {
|
||||
if err := writeFileHeader(r.writer, path, pb.GetFilesize()); err != nil {
|
||||
return err
|
||||
}
|
||||
r.flush()
|
||||
|
||||
reader, err := uio.NewDagReader(ctx, nd, r.dag)
|
||||
if err != nil {
|
||||
r.emitError(err)
|
||||
return
|
||||
return err
|
||||
}
|
||||
|
||||
if err := r.syncCopy(reader); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func (r *Reader) writeNodeToBuf(ctx cxt.Context, nd *mdag.Node, path string, depth int) error {
|
||||
pb := new(upb.Data)
|
||||
if err := proto.Unmarshal(nd.Data, pb); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
if depth == 0 {
|
||||
defer r.close()
|
||||
}
|
||||
|
||||
if pb.GetType() == upb.Data_Directory {
|
||||
err = r.writer.WriteHeader(&tar.Header{
|
||||
Name: path,
|
||||
Typeflag: tar.TypeDir,
|
||||
Mode: 0777,
|
||||
ModTime: time.Now(),
|
||||
// TODO: set mode, dates, etc. when added to unixFS
|
||||
})
|
||||
if err != nil {
|
||||
r.emitError(err)
|
||||
return
|
||||
}
|
||||
r.flush()
|
||||
|
||||
for i, ng := range r.dag.GetDAG(ctx, dagnode) {
|
||||
childNode, err := ng.Get(ctx)
|
||||
if err != nil {
|
||||
r.emitError(err)
|
||||
return
|
||||
}
|
||||
r.writeToBuf(ctx, childNode, gopath.Join(path, dagnode.Links[i].Name), depth+1)
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
err = r.writer.WriteHeader(&tar.Header{
|
||||
Name: path,
|
||||
Size: int64(pb.GetFilesize()),
|
||||
Typeflag: tar.TypeReg,
|
||||
Mode: 0644,
|
||||
ModTime: time.Now(),
|
||||
// TODO: set mode, dates, etc. when added to unixFS
|
||||
})
|
||||
if err != nil {
|
||||
r.emitError(err)
|
||||
return
|
||||
}
|
||||
r.flush()
|
||||
|
||||
reader, err := uio.NewDagReader(ctx, dagnode, r.dag)
|
||||
if err != nil {
|
||||
r.emitError(err)
|
||||
return
|
||||
}
|
||||
|
||||
err = r.syncCopy(reader)
|
||||
if err != nil {
|
||||
r.emitError(err)
|
||||
return
|
||||
switch pb.GetType() {
|
||||
case upb.Data_Directory:
|
||||
return r.writeDirToBuf(ctx, nd, path, depth)
|
||||
case upb.Data_File:
|
||||
return r.writeFileToBuf(ctx, nd, pb, path, depth)
|
||||
default:
|
||||
return fmt.Errorf("unixfs type not supported: %s", pb.GetType())
|
||||
}
|
||||
}
|
||||
|
||||
@ -198,3 +198,24 @@ func (r *Reader) syncCopy(reader io.Reader) error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
func writeDirHeader(w *tar.Writer, path string) error {
|
||||
return w.WriteHeader(&tar.Header{
|
||||
Name: path,
|
||||
Typeflag: tar.TypeDir,
|
||||
Mode: 0777,
|
||||
ModTime: time.Now(),
|
||||
// TODO: set mode, dates, etc. when added to unixFS
|
||||
})
|
||||
}
|
||||
|
||||
func writeFileHeader(w *tar.Writer, path string, size uint64) error {
|
||||
return w.WriteHeader(&tar.Header{
|
||||
Name: path,
|
||||
Size: int64(size),
|
||||
Typeflag: tar.TypeReg,
|
||||
Mode: 0644,
|
||||
ModTime: time.Now(),
|
||||
// TODO: set mode, dates, etc. when added to unixFS
|
||||
})
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user