
fix context respect through fuse reading

Author: Jeromy
Date:   2015-03-23 14:01:42 -07:00
Parent: d8bc95f43e
Commit: 13c489eca1

8 changed files with 65 additions and 36 deletions
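Every file below follows the same pattern: a caller-supplied context.Context is threaded into the blocking parts of the read path (NodeGetter.Get, the DagReader, the DagModifier, and the fuse Read handler), so that cancelling a request actually stops the underlying DAG fetch instead of just returning early while the work keeps running. A minimal, self-contained sketch of that pattern, using hypothetical names rather than anything from this commit:

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// blockingFetch stands in for a DAG/block fetch that may stall on the network.
// It gives up as soon as the context is cancelled.
func blockingFetch(ctx context.Context, out chan<- []byte) {
	select {
	case out <- []byte("block data"):
	case <-ctx.Done():
	}
}

// readBlock waits for either the fetched data or cancellation, which is the
// shape nodePromise.Get takes once it accepts the caller's ctx.
func readBlock(ctx context.Context) ([]byte, error) {
	out := make(chan []byte, 1)
	go blockingFetch(ctx, out)
	select {
	case b := <-out:
		return b, nil
	case <-ctx.Done():
		return nil, ctx.Err()
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
	defer cancel()

	b, err := readBlock(ctx)
	if errors.Is(err, context.DeadlineExceeded) {
		fmt.Println("read aborted:", err)
		return
	}
	fmt.Printf("read %d bytes\n", len(b))
}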

View File

@@ -254,7 +254,7 @@ func (rw *RefWriter) writeRefsRecursive(n *dag.Node) (int, error) {
 			return count, err
 		}

-		nd, err := ng.Get()
+		nd, err := ng.Get(rw.Ctx)
 		if err != nil {
 			return count, err
 		}

View File

@@ -6,7 +6,6 @@ package ipns
 import (
 	"errors"
-	"io"
 	"os"

 	fuse "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse"
@@ -274,32 +273,26 @@ func (dir *Directory) ReadDirAll(ctx context.Context) ([]fuse.Dirent, error) {
 }

 func (fi *File) Read(ctx context.Context, req *fuse.ReadRequest, resp *fuse.ReadResponse) error {
-	errs := make(chan error, 1)
-	go func() {
-		_, err := fi.fi.Seek(req.Offset, os.SEEK_SET)
-		if err != nil {
-			errs <- err
-			return
-		}
-
-		fisize, err := fi.fi.Size()
-		if err != nil {
-			errs <- err
-			return
-		}
-
-		readsize := min(req.Size, int(fisize-req.Offset))
-		n, err := io.ReadFull(fi.fi, resp.Data[:readsize])
-		resp.Data = resp.Data[:n]
-		errs <- err
-	}()
+	_, err := fi.fi.Seek(req.Offset, os.SEEK_SET)
+	if err != nil {
+		return err
+	}
+
+	fisize, err := fi.fi.Size()
+	if err != nil {
+		return err
+	}

 	select {
 	case <-ctx.Done():
 		return ctx.Err()
-	case err := <-errs:
-		return err
+	default:
 	}
+
+	readsize := min(req.Size, int(fisize-req.Offset))
+	n, err := fi.fi.CtxReadFull(ctx, resp.Data[:readsize])
+	resp.Data = resp.Data[:n]
+	return err
 }

 func (fi *File) Write(ctx context.Context, req *fuse.WriteRequest, resp *fuse.WriteResponse) error {
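The old handler ran Seek, Size, and io.ReadFull inside a goroutine and raced the goroutine's error channel against ctx.Done(); a cancelled request returned early, but the read itself kept running in the background. The rewrite first performs a non-blocking cancellation check and then hands ctx to CtxReadFull, so cancellation can interrupt the read itself and no goroutine is left behind. The check is the standard default-case idiom, shown here in isolation as a hypothetical helper, not code from this commit:

package sketch

import "context"

// ctxAlive returns the context's error if it is already cancelled or expired.
// The default case keeps the select from blocking when the context is still live.
func ctxAlive(ctx context.Context) error {
	select {
	case <-ctx.Done():
		return ctx.Err()
	default:
		return nil
	}
}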

View File

@@ -51,6 +51,13 @@ func (fi *File) Read(b []byte) (int, error) {
 	return fi.mod.Read(b)
 }

+// Read reads into the given buffer from the current offset
+func (fi *File) CtxReadFull(ctx context.Context, b []byte) (int, error) {
+	fi.Lock()
+	defer fi.Unlock()
+	return fi.mod.CtxReadFull(ctx, b)
+}
+
 // Close flushes, then propogates the modified dag node up the directory structure
 // and signals a republish to occur
 func (fi *File) Close() error {

View File

@@ -257,10 +257,10 @@ type nodePromise struct {
 // from its internal channels, subsequent calls will return the
 // cached node.
 type NodeGetter interface {
-	Get() (*Node, error)
+	Get(context.Context) (*Node, error)
 }

-func (np *nodePromise) Get() (*Node, error) {
+func (np *nodePromise) Get(ctx context.Context) (*Node, error) {
 	if np.cache != nil {
 		return np.cache, nil
 	}
@@ -270,6 +270,8 @@ func (np *nodePromise) Get() (*Node, error) {
 		np.cache = blk
 	case <-np.ctx.Done():
 		return nil, np.ctx.Err()
+	case <-ctx.Done():
+		return nil, ctx.Err()
 	}
 	return np.cache, nil
 }
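Because Get now receives the caller's context, the promise aborts its wait when either context ends: its own internal one (np.ctx, created when the batch of promises was set up by GetDAG) or the per-call one. The select reduces to roughly this shape, sketched here with placeholder names and a stub node type:

package sketch

import "context"

type node struct{}

// awaitNode waits for a fetched node while honoring two contexts: the
// promise's own (ownCtx) and the one supplied by the current caller (callCtx).
// Whichever is cancelled first unblocks the wait.
func awaitNode(ownCtx, callCtx context.Context, nodes <-chan *node) (*node, error) {
	select {
	case nd := <-nodes:
		return nd, nil
	case <-ownCtx.Done():
		return nil, ownCtx.Err()
	case <-callCtx.Done():
		return nil, callCtx.Err()
	}
}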

View File

@@ -177,7 +177,7 @@ func (p *pinner) pinLinks(node *mdag.Node) error {
 	defer cancel()

 	for _, ng := range p.dserv.GetDAG(ctx, node) {
-		subnode, err := ng.Get()
+		subnode, err := ng.Get(ctx)
 		if err != nil {
 			// TODO: Maybe just log and continue?
 			return err

View File

@@ -100,12 +100,13 @@ func newDataFileReader(ctx context.Context, n *mdag.Node, pb *ftpb.Data, serv md
 // precalcNextBuf follows the next link in line and loads it from the DAGService,
 // setting the next buffer to read from
-func (dr *DagReader) precalcNextBuf() error {
+func (dr *DagReader) precalcNextBuf(ctx context.Context) error {
 	dr.buf.Close() // Just to make sure
 	if dr.linkPosition >= len(dr.promises) {
 		return io.EOF
 	}
-	nxt, err := dr.promises[dr.linkPosition].Get()
+
+	nxt, err := dr.promises[dr.linkPosition].Get(ctx)
 	if err != nil {
 		return err
 	}
@@ -141,6 +142,11 @@ func (dr *DagReader) Size() int64 {
 // Read reads data from the DAG structured file
 func (dr *DagReader) Read(b []byte) (int, error) {
+	return dr.CtxReadFull(dr.ctx, b)
+}
+
+// CtxReadFull reads data from the DAG structured file
+func (dr *DagReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
 	// If no cached buffer, load one
 	total := 0
 	for {
@@ -161,7 +167,7 @@ func (dr *DagReader) Read(b []byte) (int, error) {
 		}

 		// Otherwise, load up the next block
-		err = dr.precalcNextBuf()
+		err = dr.precalcNextBuf(ctx)
 		if err != nil {
 			return total, err
 		}
@@ -183,7 +189,7 @@ func (dr *DagReader) WriteTo(w io.Writer) (int64, error) {
 		}

 		// Otherwise, load up the next block
-		err = dr.precalcNextBuf()
+		err = dr.precalcNextBuf(dr.ctx)
 		if err != nil {
 			if err == io.EOF {
 				return total, nil
@@ -239,7 +245,7 @@ func (dr *DagReader) Seek(offset int64, whence int) (int64, error) {
 		}

 		// start sub-block request
-		err := dr.precalcNextBuf()
+		err := dr.precalcNextBuf(dr.ctx)
 		if err != nil {
 			return 0, err
 		}
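Read keeps the plain io.Reader signature by delegating to CtxReadFull with the context captured at construction (dr.ctx), while callers that hold a livelier per-request context, such as the fuse handler above, pass it in directly; precalcNextBuf then uses whichever context it was handed. A compile-only sketch of that delegation, with hypothetical names:

package sketch

import "context"

// ctxReader is a stand-in for DagReader: it keeps a fallback context, set at
// construction, for callers that only speak io.Reader.
type ctxReader struct {
	ctx context.Context
}

// Read satisfies io.Reader by forwarding to the context-aware variant.
func (r *ctxReader) Read(b []byte) (int, error) {
	return r.CtxReadFull(r.ctx, b)
}

// CtxReadFull is where the cancellable work would live: each block fetch
// checks the context before and during the read.
func (r *ctxReader) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	if err := ctx.Err(); err != nil {
		return 0, err
	}
	// ... fetch the next block with ctx and copy it into b ...
	return 0, nil
}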

View File

@@ -315,32 +315,53 @@ func (dm *DagModifier) appendData(node *mdag.Node, blks <-chan []byte) (*mdag.No
 // Read data from this dag starting at the current offset
 func (dm *DagModifier) Read(b []byte) (int, error) {
-	err := dm.Sync()
+	err := dm.readPrep()
 	if err != nil {
 		return 0, err
 	}

+	n, err := dm.read.Read(b)
+	dm.curWrOff += uint64(n)
+	return n, err
+}
+
+func (dm *DagModifier) readPrep() error {
+	err := dm.Sync()
+	if err != nil {
+		return err
+	}
+
 	if dm.read == nil {
 		ctx, cancel := context.WithCancel(dm.ctx)
 		dr, err := uio.NewDagReader(ctx, dm.curNode, dm.dagserv)
 		if err != nil {
-			return 0, err
+			return err
 		}

 		i, err := dr.Seek(int64(dm.curWrOff), os.SEEK_SET)
 		if err != nil {
-			return 0, err
+			return err
 		}

 		if i != int64(dm.curWrOff) {
-			return 0, ErrSeekFail
+			return ErrSeekFail
 		}

 		dm.readCancel = cancel
 		dm.read = dr
 	}

-	n, err := dm.read.Read(b)
+	return nil
+}
+
+// Read data from this dag starting at the current offset
+func (dm *DagModifier) CtxReadFull(ctx context.Context, b []byte) (int, error) {
+	err := dm.readPrep()
+	if err != nil {
+		return 0, err
+	}
+
+	n, err := dm.read.CtxReadFull(ctx, b)
 	dm.curWrOff += uint64(n)
 	return n, err
 }
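readPrep factors the shared setup out of Read: sync pending writes, lazily build a DagReader over the current node, and seek it to the current write offset. Read and the new CtxReadFull then differ only in the final call into the underlying reader, one plain and one context-aware. A compressed sketch of that split under assumed names (the real DagModifier setup is more involved):

package sketch

import (
	"context"
	"io"
	"strings"
)

// modLike is a hypothetical stand-in for DagModifier with a lazily created
// underlying reader shared by both read paths.
type modLike struct {
	read *strings.Reader
}

// readPrep mirrors the factored-out setup: build the reader once, on first use.
func (m *modLike) readPrep() error {
	if m.read == nil {
		m.read = strings.NewReader("dag contents")
	}
	return nil
}

func (m *modLike) Read(b []byte) (int, error) {
	if err := m.readPrep(); err != nil {
		return 0, err
	}
	return m.read.Read(b)
}

func (m *modLike) CtxReadFull(ctx context.Context, b []byte) (int, error) {
	if err := m.readPrep(); err != nil {
		return 0, err
	}
	if err := ctx.Err(); err != nil { // honor cancellation before reading
		return 0, err
	}
	return io.ReadFull(m.read, b)
}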

View File

@@ -90,7 +90,7 @@ func (r *Reader) writeToBuf(dagnode *mdag.Node, path string, depth int) {
 	defer cancel()

 	for i, ng := range r.dag.GetDAG(ctx, dagnode) {
-		childNode, err := ng.Get()
+		childNode, err := ng.Get(ctx)
 		if err != nil {
 			r.emitError(err)
 			return