From 459e0d53737b97a2f3f3ee38982c4adf863791df Mon Sep 17 00:00:00 2001 From: Juan Batiz-Benet Date: Wed, 8 Oct 2014 02:52:16 -0700 Subject: [PATCH] ipns TestFastRepublish --- fuse/ipns/ipns_test.go | 108 +++++++++++++++++++++++++++++++++++++++- fuse/ipns/ipns_unix.go | 7 ++- fuse/ipns/repub_unix.go | 37 ++++++-------- 3 files changed, 127 insertions(+), 25 deletions(-) diff --git a/fuse/ipns/ipns_test.go b/fuse/ipns/ipns_test.go index c311c5225..8daf14769 100644 --- a/fuse/ipns/ipns_test.go +++ b/fuse/ipns/ipns_test.go @@ -9,7 +9,8 @@ import ( "time" fstest "github.com/jbenet/go-ipfs/Godeps/_workspace/src/bazil.org/fuse/fs/fstestutil" - "github.com/jbenet/go-ipfs/core" + core "github.com/jbenet/go-ipfs/core" + u "github.com/jbenet/go-ipfs/util" ) func randBytes(size int) []byte { @@ -19,7 +20,10 @@ func randBytes(size int) []byte { } func writeFile(t *testing.T, size int, path string) []byte { - data := randBytes(size) + return writeFileData(t, randBytes(size), path) +} + +func writeFileData(t *testing.T, data []byte, path string) []byte { fi, err := os.Create(path) if err != nil { t.Fatal(err) @@ -179,6 +183,106 @@ func TestAppendFile(t *testing.T) { } } +func TestFastRepublish(t *testing.T) { + + // make timeout noticeable. + osrt := shortRepublishTimeout + shortRepublishTimeout = time.Millisecond * 100 + + olrt := longRepublishTimeout + longRepublishTimeout = time.Second + + node, mnt := setupIpnsTest(t, nil) + + h, err := node.Identity.PrivKey.GetPublic().Hash() + if err != nil { + t.Fatal(err) + } + pubkeyHash := u.Key(h).Pretty() + + // set them back + defer func() { + shortRepublishTimeout = osrt + longRepublishTimeout = olrt + mnt.Close() + }() + + closed := make(chan struct{}) + dataA := []byte("aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa") + dataB := []byte("bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb") + + fname := mnt.Dir + "/local/file" + + // get first resolved hash + log.Debug("publishing first hash") + writeFileData(t, dataA, fname) // random + <-time.After(shortRepublishTimeout * 11 / 10) + log.Debug("resolving first hash") + resolvedHash, err := node.Namesys.Resolve(pubkeyHash) + if err != nil { + t.Fatal("resolve err:", pubkeyHash, err) + } + + // constantly keep writing to the file + go func() { + for { + select { + case <-closed: + return + + case <-time.After(shortRepublishTimeout * 8 / 10): + writeFileData(t, dataB, fname) + } + } + }() + + hasPublished := func() bool { + res, err := node.Namesys.Resolve(pubkeyHash) + if err != nil { + t.Fatal("resolve err: %v", err) + } + return res != resolvedHash + } + + // test things + + // at this point, should not have written dataA and not have written dataB + rbuf, err := ioutil.ReadFile(fname) + if err != nil || !bytes.Equal(rbuf, dataA) { + t.Fatal("Data inconsistent! %v %v", err, string(rbuf)) + } + + if hasPublished() { + t.Fatal("published (wrote)") + } + + <-time.After(shortRepublishTimeout * 11 / 10) + + // at this point, should have written written dataB, but not published it + rbuf, err = ioutil.ReadFile(fname) + if err != nil || !bytes.Equal(rbuf, dataB) { + t.Fatal("Data inconsistent! %v %v", err, string(rbuf)) + } + + if hasPublished() { + t.Fatal("published (wrote)") + } + + <-time.After(longRepublishTimeout * 11 / 10) + + // at this point, should have written written dataB, and published it + rbuf, err = ioutil.ReadFile(fname) + if err != nil || !bytes.Equal(rbuf, dataB) { + t.Fatal("Data inconsistent! %v %v", err, string(rbuf)) + } + + if !hasPublished() { + t.Fatal("not published") + } + + close(closed) +} + // Test writing a medium sized file one byte at a time func TestMultiWrite(t *testing.T) { _, mnt := setupIpnsTest(t, nil) diff --git a/fuse/ipns/ipns_unix.go b/fuse/ipns/ipns_unix.go index dc03d5113..a17f68649 100644 --- a/fuse/ipns/ipns_unix.go +++ b/fuse/ipns/ipns_unix.go @@ -21,6 +21,11 @@ import ( var log = u.Logger("ipns") +var ( + shortRepublishTimeout = time.Millisecond * 5 + longRepublishTimeout = time.Millisecond * 500 +) + // FileSystem is the readwrite IPNS Fuse Filesystem. type FileSystem struct { Ipfs *core.IpfsNode @@ -71,7 +76,7 @@ func CreateRoot(n *core.IpfsNode, keys []ci.PrivKey, ipfsroot string) (*Root, er nd := new(Node) nd.Ipfs = n nd.key = k - nd.repub = NewRepublisher(nd, time.Millisecond*5, time.Millisecond*500) + nd.repub = NewRepublisher(nd, shortRepublishTimeout, longRepublishTimeout) go nd.repub.Run() diff --git a/fuse/ipns/repub_unix.go b/fuse/ipns/repub_unix.go index f1a898287..4e807578d 100644 --- a/fuse/ipns/repub_unix.go +++ b/fuse/ipns/repub_unix.go @@ -22,28 +22,21 @@ func (np *Republisher) Run() { for _ = range np.Publish { quick := time.After(np.TimeoutShort) longer := time.After(np.TimeoutLong) - for { - select { - case <-quick: - //Do the publish! - log.Info("Publishing Changes!") - err := np.node.republishRoot() - if err != nil { - log.Critical("republishRoot error: %s", err) - } - goto done - case <-longer: - //Do the publish! - log.Info("Publishing Changes!") - err := np.node.republishRoot() - if err != nil { - log.Critical("republishRoot error: %s", err) - } - goto done - case <-np.Publish: - quick = time.After(np.TimeoutShort) - } + + wait: + select { + case <-quick: + case <-longer: + case <-np.Publish: + quick = time.After(np.TimeoutShort) + goto wait } - done: + + log.Info("Publishing Changes!") + err := np.node.republishRoot() + if err != nil { + log.Critical("republishRoot error: %s", err) + } + } }