1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-06-30 18:13:54 +08:00

flush pinning improvements

License: MIT
Signed-off-by: Jeromy <jeromyj@gmail.com>
This commit is contained in:
Jeromy
2016-01-11 05:37:34 -08:00
parent 1feea8bc5f
commit d2e0d73bd5
4 changed files with 48 additions and 10 deletions

View File

@ -483,8 +483,11 @@ func (n *IpfsNode) loadFilesRoot() error {
return err return err
} }
err = n.Pinning.Pin(n.Context(), nnd, true) if err := n.Pinning.Pin(n.Context(), nnd, true); err != nil {
if err != nil { return err
}
if err := n.Pinning.Flush(); err != nil {
return err return err
} }

View File

@ -197,6 +197,9 @@ func DirLookup(d *Directory, pth string) (FSNode, error) {
func FlushPath(r *Root, pth string) error { func FlushPath(r *Root, pth string) error {
parts := path.SplitList(strings.Trim(pth, "/")) parts := path.SplitList(strings.Trim(pth, "/"))
if len(parts) == 1 && parts[0] == "" {
parts = nil
}
d, ok := r.GetValue().(*Directory) d, ok := r.GetValue().(*Directory)
if !ok { if !ok {
@ -214,12 +217,24 @@ func FlushPath(r *Root, pth string) error {
} }
r.repub.Update(k) r.repub.Update(k)
r.repub.WaitPub()
return nil return nil
} }
func flushPathRec(d *Directory, parts []string) (*dag.Node, error) { func flushPathRec(d *Directory, parts []string) (*dag.Node, error) {
if len(parts) == 0 { if len(parts) == 0 {
return d.GetNode() nd, err := d.GetNode()
if err != nil {
return nil, err
}
_, err = d.dserv.Add(nd)
if err != nil {
return nil, err
}
return nd, nil
} }
d.Lock() d.Lock()
@ -243,6 +258,11 @@ func flushPathRec(d *Directory, parts []string) (*dag.Node, error) {
return nil, err return nil, err
} }
_, err = d.dserv.Add(newnode)
if err != nil {
return nil, err
}
d.node = newnode d.node = newnode
return newnode, nil return newnode, nil
case *File: case *File:

View File

@ -165,7 +165,7 @@ type Republisher struct {
TimeoutShort time.Duration TimeoutShort time.Duration
Publish chan struct{} Publish chan struct{}
pubfunc PubFunc pubfunc PubFunc
pubnowch chan struct{} pubnowch chan chan struct{}
ctx context.Context ctx context.Context
cancel func() cancel func()
@ -190,7 +190,7 @@ func NewRepublisher(ctx context.Context, pf PubFunc, tshort, tlong time.Duration
TimeoutLong: tlong, TimeoutLong: tlong,
Publish: make(chan struct{}, 1), Publish: make(chan struct{}, 1),
pubfunc: pf, pubfunc: pf,
pubnowch: make(chan struct{}), pubnowch: make(chan chan struct{}),
ctx: ctx, ctx: ctx,
cancel: cancel, cancel: cancel,
} }
@ -204,11 +204,17 @@ func (p *Republisher) setVal(k key.Key) {
func (p *Republisher) pubNow() { func (p *Republisher) pubNow() {
select { select {
case p.pubnowch <- struct{}{}: case p.pubnowch <- nil:
default: default:
} }
} }
func (p *Republisher) WaitPub() {
wait := make(chan struct{})
p.pubnowch <- wait
<-wait
}
func (p *Republisher) Close() error { func (p *Republisher) Close() error {
err := p.publish(p.ctx) err := p.publish(p.ctx)
p.cancel() p.cancel()
@ -235,6 +241,8 @@ func (np *Republisher) Run() {
longer := time.After(np.TimeoutLong) longer := time.After(np.TimeoutLong)
wait: wait:
var pubnowresp chan struct{}
select { select {
case <-np.ctx.Done(): case <-np.ctx.Done():
return return
@ -243,10 +251,13 @@ func (np *Republisher) Run() {
goto wait goto wait
case <-quick: case <-quick:
case <-longer: case <-longer:
case <-np.pubnowch: case pubnowresp = <-np.pubnowch:
} }
err := np.publish(np.ctx) err := np.publish(np.ctx)
if pubnowresp != nil {
pubnowresp <- struct{}{}
}
if err != nil { if err != nil {
log.Error("republishRoot error: %s", err) log.Error("republishRoot error: %s", err)
} }

View File

@ -336,9 +336,13 @@ test_files_api() {
test_cmp root_hash_exp root_hash test_cmp root_hash_exp root_hash
' '
test_expect_success "root hash is pinned" ' test_expect_success "flush root succeeds" '
ipfs pin ls ipfs files flush /
return 1 '
test_expect_success "root hash is pinned after flush" '
ipfs pin ls > pins &&
grep $EXP_ROOT_HASH pins || (cat pins && exit 1)
' '
# test mv # test mv