
Merge pull request #2062 from ipfs/fix-t-fatal-goroutine

fix t.Fatal in a goroutine
Juan Benet committed on 2015-12-15 14:13:34 -05:00
4 changed files with 70 additions and 16 deletions
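Background for the fix: the testing package documents that t.FailNow (and therefore t.Fatal, which is t.Log followed by t.FailNow) must be called from the goroutine running the test function, not from goroutines the test spawns. Called elsewhere, it stops the wrong goroutine, so the test can hang or finish without reporting the failure. The commit replaces those calls with an error channel: workers send errors, a helper goroutine closes the channel after wg.Wait(), and the test goroutine drains the channel and calls t.Fatal itself. A minimal, self-contained sketch of that pattern follows; the doWork function and its forced failure are invented for illustration and are not part of the commit.

package example

import (
    "errors"
    "sync"
    "testing"
)

// doWork stands in for the real per-goroutine work. It is hypothetical,
// and fails on one worker so the error-reporting path is visible
// (this sketch is a deliberately failing test).
func doWork(i int) error {
    if i == 3 {
        return errors.New("worker 3 failed")
    }
    return nil
}

func TestWorkers(t *testing.T) {
    wg := sync.WaitGroup{}
    errs := make(chan error)

    for i := 0; i < 5; i++ {
        wg.Add(1)
        go func(i int) {
            defer wg.Done()
            if err := doWork(i); err != nil {
                // Send the error instead of calling t.Fatal here:
                // t.FailNow must run on the test goroutine.
                errs <- err
            }
        }(i)
    }

    // Close errs only after every worker has returned, so the
    // range loop below terminates when nothing failed.
    go func() {
        wg.Wait()
        close(errs)
    }()

    // Drain on the test goroutine, where t.Fatal is safe to call.
    for err := range errs {
        if err != nil {
            t.Fatal(err)
        }
    }
}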


@@ -168,19 +168,31 @@ func PerformDistributionTest(t *testing.T, numInstances, numBlocks int) {
 	t.Log("Distribute!")

 	wg := sync.WaitGroup{}
+	errs := make(chan error)
 	for _, inst := range instances[1:] {
 		wg.Add(1)
 		go func(inst Instance) {
 			defer wg.Done()
 			outch, err := inst.Exchange.GetBlocks(ctx, blkeys)
 			if err != nil {
-				t.Fatal(err)
+				errs <- err
 			}
 			for _ = range outch {
 			}
 		}(inst)
 	}
-	wg.Wait()
+
+	go func() {
+		wg.Wait()
+		close(errs)
+	}()
+
+	for err := range errs {
+		if err != nil {
+			t.Fatal(err)
+		}
+	}

 	t.Log("Verify!")


@@ -4,6 +4,7 @@ package readonly

 import (
 	"bytes"
+	"errors"
 	"fmt"
 	"io/ioutil"
 	"math/rand"
@@ -154,6 +155,7 @@ func TestIpfsStressRead(t *testing.T) {

 	// Now read a bunch, concurrently
 	wg := sync.WaitGroup{}
+	errs := make(chan error)

 	for s := 0; s < 4; s++ {
 		wg.Add(1)
@@ -165,26 +167,36 @@ func TestIpfsStressRead(t *testing.T) {
 				fname := path.Join(mnt.Dir, item)
 				rbuf, err := ioutil.ReadFile(fname)
 				if err != nil {
-					t.Fatal(err)
+					errs <- err
 				}

 				read, err := coreunix.Cat(nd.Context(), nd, item)
 				if err != nil {
-					t.Fatal(err)
+					errs <- err
 				}

 				data, err := ioutil.ReadAll(read)
 				if err != nil {
-					t.Fatal(err)
+					errs <- err
 				}

 				if !bytes.Equal(rbuf, data) {
-					t.Fatal("Incorrect Read!")
+					errs <- errors.New("Incorrect Read!")
 				}
 			}
 		}()
 	}
-	wg.Wait()
+
+	go func() {
+		wg.Wait()
+		close(errs)
+	}()
+
+	for err := range errs {
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
 }

 // Test writing a file and reading it back


@@ -2,6 +2,7 @@ package merkledag_test

 import (
 	"bytes"
+	"errors"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -193,32 +194,43 @@ func runBatchFetchTest(t *testing.T, read io.Reader) {
 	}

 	wg := sync.WaitGroup{}
+	errs := make(chan error)
 	for i := 1; i < len(dagservs); i++ {
 		wg.Add(1)
 		go func(i int) {
 			defer wg.Done()
 			first, err := dagservs[i].Get(ctx, k)
 			if err != nil {
-				t.Fatal(err)
+				errs <- err
 			}
 			fmt.Println("Got first node back.")

 			read, err := uio.NewDagReader(ctx, first, dagservs[i])
 			if err != nil {
-				t.Fatal(err)
+				errs <- err
 			}
 			datagot, err := ioutil.ReadAll(read)
 			if err != nil {
-				t.Fatal(err)
+				errs <- err
 			}

 			if !bytes.Equal(datagot, expected) {
-				t.Fatal("Got bad data back!")
+				errs <- errors.New("Got bad data back!")
 			}
 		}(i)
 	}

-	wg.Wait()
+	go func() {
+		wg.Wait()
+		close(errs)
+	}()
+
+	for err := range errs {
+		if err != nil {
+			t.Fatal(err)
+		}
+	}
 }

 func TestRecursiveAdd(t *testing.T) {


@@ -142,6 +142,7 @@ func sizeOfIthFile(i int64) int64 {
 }

 func runFileAddingWorker(n *core.IpfsNode) error {
+	errs := make(chan error)
 	go func() {
 		var i int64
 		for i = 1; i < math.MaxInt32; i++ {
@@ -149,17 +150,26 @@ func runFileAddingWorker(n *core.IpfsNode) error {
 			go func() {
 				defer pipew.Close()
 				if err := random.WritePseudoRandomBytes(sizeOfIthFile(i), pipew, *seed); err != nil {
-					log.Fatal(err)
+					errs <- err
 				}
 			}()

 			k, err := coreunix.Add(n, piper)
 			if err != nil {
-				log.Fatal(err)
+				errs <- err
 			}
 			log.Println("added file", "seed", *seed, "#", i, "key", k, "size", unit.Information(sizeOfIthFile(i)))
 			time.Sleep(1 * time.Second)
 		}
 	}()

+	var i int64
+	for i = 0; i < math.MaxInt32; i++ {
+		err := <-errs
+		if err != nil {
+			log.Fatal(err)
+		}
+	}
+
 	return nil
 }
@@ -180,18 +190,20 @@ func runFileCattingWorker(ctx context.Context, n *core.IpfsNode) error {
 		return err
 	}

+	errs := make(chan error)
 	go func() {
 		defer dummy.Close()
 		var i int64 = 1
 		for {
 			buf := new(bytes.Buffer)
 			if err := random.WritePseudoRandomBytes(sizeOfIthFile(i), buf, *seed); err != nil {
-				log.Fatal(err)
+				errs <- err
 			}
 			// add to a dummy node to discover the key
 			k, err := coreunix.Add(dummy, bytes.NewReader(buf.Bytes()))
 			if err != nil {
-				log.Fatal(err)
+				errs <- err
 			}

 			e := elog.EventBegin(ctx, "cat", logging.LoggableF(func() map[string]interface{} {
 				return map[string]interface{}{
@@ -212,6 +224,12 @@ func runFileCattingWorker(ctx context.Context, n *core.IpfsNode) error {
 			time.Sleep(time.Second)
 		}
 	}()

+	err = <-errs
+	if err != nil {
+		log.Fatal(err)
+	}
+
 	return nil
 }
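The last file differs from the test fixes in one respect: these long-running workers previously called log.Fatal inside their goroutines, which exits the whole process without the spawning function ever observing the error. After the change, errors are funneled over errs to the enclosing function, which blocks receiving on the channel and still terminates via log.Fatal on the first failure; since the workers loop effectively forever, neither function is expected to reach its return nil in practice.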