1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-05-17 23:16:11 +08:00
Files
kubo/core/coreunix/add_test.go
Jakub Sztandera 42e191c017 gx: unrewrite
License: MIT
Signed-off-by: Jakub Sztandera <kubuxu@protonmail.ch>
2019-03-05 18:33:56 +01:00

258 lines
6.2 KiB
Go

package coreunix
import (
"bytes"
"context"
"io"
"io/ioutil"
"math/rand"
"os"
"path/filepath"
"testing"
"time"
"github.com/ipfs/go-ipfs/core"
"github.com/ipfs/go-ipfs/pin/gc"
"github.com/ipfs/go-ipfs/repo"
blocks "github.com/ipfs/go-block-format"
"github.com/ipfs/go-blockservice"
cid "github.com/ipfs/go-cid"
datastore "github.com/ipfs/go-datastore"
syncds "github.com/ipfs/go-datastore/sync"
blockstore "github.com/ipfs/go-ipfs-blockstore"
config "github.com/ipfs/go-ipfs-config"
files "github.com/ipfs/go-ipfs-files"
pi "github.com/ipfs/go-ipfs-posinfo"
dag "github.com/ipfs/go-merkledag"
coreiface "github.com/ipfs/interface-go-ipfs-core"
)
// testPeerID is the peer ID placed in the mock repo's Identity config by the
// tests in this file; the offline node requires one to be set.
const testPeerID = "QmTFauExutTsy4XP6JbMFcw2Wa9645HJt2bTqL6qYDCKfe"
// TestAddGCLive checks that a garbage collection requested while an add is in
// progress does not remove any block the add has already reported.
//
// The directory being added contains a file backed by an io.Pipe, which lets
// the test pause the add part-way through. GC is requested during that pause,
// the add is then released, and every key GC removes is compared against the
// hashes emitted by the adder.
func TestAddGCLive(t *testing.T) {
	r := &repo.Mock{
		C: config.Config{
			Identity: config.Identity{
				PeerID: testPeerID, // required by offline node
			},
		},
		D: syncds.MutexWrap(datastore.NewMapDatastore()),
	}
	node, err := core.NewNode(context.Background(), &core.BuildCfg{Repo: r})
	if err != nil {
		t.Fatal(err)
	}

	out := make(chan interface{})
	adder, err := NewAdder(context.Background(), node.Pinning, node.Blockstore, node.DAG)
	if err != nil {
		t.Fatal(err)
	}
	adder.Out = out

	rfa := files.NewBytesFile([]byte("testfileA"))

	// make a file backed by a pipe so we can 'pause' the add for timing of the test
	piper, pipew := io.Pipe()
	hangfile := files.NewReaderFile(piper)

	rfd := files.NewBytesFile([]byte("testfileD"))

	slf := files.NewMapDirectory(map[string]files.Node{
		"a": rfa,
		"b": hangfile,
		"d": rfd,
	})

	addDone := make(chan struct{})
	go func() {
		defer close(addDone)
		defer close(out)
		// FailNow (and therefore Fatal) may only be called from the goroutine
		// running the test, so report the failure with Error here. The
		// deferred closes still unblock the test goroutine.
		if _, err := adder.AddAllAndPin(slf); err != nil {
			t.Error(err)
		}
	}()

	addedHashes := make(map[string]struct{})
	select {
	case o := <-out:
		addedHashes[o.(*coreiface.AddEvent).Path.Cid().String()] = struct{}{}
	case <-addDone:
		t.Fatal("add shouldnt complete yet")
	}

	// gcout is written by the goroutine below and only read after gcstarted
	// has been closed, which orders the write before the read.
	var gcout <-chan gc.Result
	gcstarted := make(chan struct{})
	go func() {
		defer close(gcstarted)
		gcout = gc.GC(context.Background(), node.Blockstore, node.Repo.Datastore(), node.Pinning, nil)
	}()

	// gc shouldn't start until we let the add finish its current file.
	if _, err := pipew.Write([]byte("some data for file b")); err != nil {
		t.Fatal(err)
	}

	select {
	case <-gcstarted:
		t.Fatal("gc shouldnt have started yet")
	default:
	}

	time.Sleep(time.Millisecond * 100) // make sure gc gets to requesting lock

	// finish write and unblock gc
	pipew.Close()

	// receive next object from adder
	o := <-out
	addedHashes[o.(*coreiface.AddEvent).Path.Cid().String()] = struct{}{}

	<-gcstarted

	for res := range gcout {
		if res.Error != nil {
			// Report the GC result's own error; the outer err is nil here.
			t.Fatal(res.Error)
		}
		if _, ok := addedHashes[res.KeyRemoved.String()]; ok {
			t.Fatal("gc'ed a hash we just added")
		}
	}

	// Drain the remaining add events; the last one is used as the root for
	// the DAG walk below.
	var last cid.Cid
	for a := range out {
		c, err := cid.Decode(a.(*coreiface.AddEvent).Path.Cid().String())
		if err != nil {
			t.Fatal(err)
		}
		last = c
	}

	ctx, cancel := context.WithTimeout(context.Background(), time.Second*5)
	defer cancel()

	// Walk everything reachable from the last added CID; this fails if any
	// linked node cannot be fetched within the timeout.
	set := cid.NewSet()
	err = dag.EnumerateChildren(ctx, dag.GetLinksWithDAG(node.DAG), last, set.Visit)
	if err != nil {
		t.Fatal(err)
	}
}
// testAddWPosInfo adds 5 MiB of seeded pseudo-random data with NoCopy enabled
// through a testBlockstore wrapper, then checks how many of the stored blocks
// carried position info at offset zero versus a non-zero offset.
//
// rawLeaves is forwarded to the adder's RawLeaves option and selects the
// expected counts: 0 and 0 when false, 1 and 19 when true.
func testAddWPosInfo(t *testing.T, rawLeaves bool) {
	r := &repo.Mock{
		C: config.Config{
			Identity: config.Identity{
				PeerID: testPeerID, // required by offline node
			},
		},
		D: syncds.MutexWrap(datastore.NewMapDatastore()),
	}
	node, err := core.NewNode(context.Background(), &core.BuildCfg{Repo: r})
	if err != nil {
		t.Fatal(err)
	}

	// The same path is given to the blockstore wrapper (as the expectation)
	// and to the file being added.
	fooPath := filepath.Join(os.TempDir(), "foo.txt")

	bs := &testBlockstore{GCBlockstore: node.Blockstore, expectedPath: fooPath, t: t}
	bserv := blockservice.New(bs, node.Exchange)
	dserv := dag.NewDAGService(bserv)
	adder, err := NewAdder(context.Background(), node.Pinning, bs, dserv)
	if err != nil {
		t.Fatal(err)
	}
	out := make(chan interface{})
	adder.Out = out
	adder.Progress = true
	adder.RawLeaves = rawLeaves
	adder.NoCopy = true

	data := make([]byte, 5*1024*1024)
	rand.New(rand.NewSource(2)).Read(data) // Rand.Read never returns an error
	fileData := ioutil.NopCloser(bytes.NewBuffer(data))
	fileInfo := dummyFileInfo{"foo.txt", int64(len(data)), time.Now()}
	file, err := files.NewReaderPathFile(fooPath, fileData, &fileInfo)
	if err != nil {
		t.Fatal(err)
	}

	go func() {
		defer close(adder.Out)
		// FailNow (and therefore Fatal) may only be called from the goroutine
		// running the test, so report with Error. A goroutine-local err also
		// avoids writing the test goroutine's variable from here.
		if _, err := adder.AddAllAndPin(file); err != nil {
			t.Error(err)
		}
	}()
	// Drain events until the adder goroutine closes the channel; after that
	// the counters on bs are safe to read.
	for range out {
	}

	exp := 0
	nonOffZero := 0
	if rawLeaves {
		exp = 1
		nonOffZero = 19
	}
	if bs.countAtOffsetZero != exp {
		t.Fatalf("expected %d blocks with an offset at zero (one root and one leaf), got %d", exp, bs.countAtOffsetZero)
	}
	if bs.countAtOffsetNonZero != nonOffZero {
		// note: the exact number will depend on the size and the sharding algo. used
		t.Fatalf("expected %d blocks with an offset > 0, got %d", nonOffZero, bs.countAtOffsetNonZero)
	}
}
// TestAddWPosInfo runs the position-info add check with raw leaves disabled.
func TestAddWPosInfo(t *testing.T) {
	const rawLeaves = false
	testAddWPosInfo(t, rawLeaves)
}
// TestAddWPosInfoAndRawLeafs runs the position-info add check with raw leaves
// enabled.
func TestAddWPosInfoAndRawLeafs(t *testing.T) {
	const rawLeaves = true
	testAddWPosInfo(t, rawLeaves)
}
// testBlockstore wraps a GCBlockstore and inspects the blocks handed to Put
// and PutMany, checking the path of any FilestoreNode and counting them by
// offset.
type testBlockstore struct {
	blockstore.GCBlockstore

	// expectedPath is the FullPath every inspected FilestoreNode must carry.
	expectedPath string
	// t is the test that is failed when a path does not match.
	t *testing.T

	// Number of FilestoreNode blocks seen whose PosInfo offset was zero and
	// non-zero, respectively.
	countAtOffsetZero, countAtOffsetNonZero int
}
// Put runs the position-info check on block and then stores it in the wrapped
// GCBlockstore. An error from the check is returned without storing the
// block.
func (bs *testBlockstore) Put(block blocks.Block) error {
	if err := bs.CheckForPosInfo(block); err != nil {
		return err
	}
	return bs.GCBlockstore.Put(block)
}
// PutMany runs the position-info check on every block and then stores the
// whole batch in the wrapped GCBlockstore. An error from any check is
// returned before anything is stored.
func (bs *testBlockstore) PutMany(blocks []blocks.Block) error {
	for _, blk := range blocks {
		if err := bs.CheckForPosInfo(blk); err != nil {
			return err
		}
	}
	return bs.GCBlockstore.PutMany(blocks)
}
// CheckForPosInfo inspects block and, when it is a *pi.FilestoreNode, checks
// that its PosInfo.FullPath equals bs.expectedPath and counts it as either an
// offset-zero or a non-zero-offset block. Blocks of any other type are
// ignored. A path mismatch is reported through bs.t and the block is not
// counted. The returned error is always nil.
func (bs *testBlockstore) CheckForPosInfo(block blocks.Block) error {
	fsn, ok := block.(*pi.FilestoreNode)
	if !ok {
		return nil
	}

	posInfo := fsn.PosInfo
	if posInfo.FullPath != bs.expectedPath {
		// This can run on a goroutine other than the test's (testAddWPosInfo
		// performs the add in its own goroutine), and FailNow/Fatal may only
		// be called from the goroutine running the test, so use Error.
		bs.t.Error("PosInfo does not have the expected path")
		return nil
	}

	if posInfo.Offset == 0 {
		bs.countAtOffsetZero++
	} else {
		bs.countAtOffsetNonZero++
	}
	return nil
}
// dummyFileInfo is a minimal in-memory file description exposing the method
// set of os.FileInfo. Only the name, size and modification time are
// configurable; the remaining methods return fixed values.
type dummyFileInfo struct {
	name    string
	size    int64
	modTime time.Time
}

// Name returns the configured file name.
func (d *dummyFileInfo) Name() string {
	return d.name
}

// Size returns the configured size.
func (d *dummyFileInfo) Size() int64 {
	return d.size
}

// Mode always returns a zero FileMode.
func (d *dummyFileInfo) Mode() os.FileMode {
	return os.FileMode(0)
}

// ModTime returns the configured modification time.
func (d *dummyFileInfo) ModTime() time.Time {
	return d.modTime
}

// IsDir always reports false.
func (d *dummyFileInfo) IsDir() bool {
	return false
}

// Sys always returns nil.
func (d *dummyFileInfo) Sys() interface{} {
	return nil
}