1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-07-02 03:28:25 +08:00

Bump goprocess

License: MIT
Signed-off-by: rht <rhtbot@gmail.com>
This commit is contained in:
rht
2015-07-15 09:40:23 +07:00
parent 88ec46ee15
commit 3089ebaba7
6 changed files with 104 additions and 21 deletions

2
Godeps/Godeps.json generated
View File

@ -200,7 +200,7 @@
},
{
"ImportPath": "github.com/jbenet/goprocess",
"Rev": "a6650d0b69f2aa0fe7c9685baf0b4d7ecc8766bf"
"Rev": "788dcf5ca3517f243d276394545ca6b3b4ac32d5"
},
{
"ImportPath": "github.com/kardianos/osext",

View File

@ -1,10 +1,10 @@
sudo: false
language: go
go:
- 1.3
- 1.4
- release
- tip
script:
- go test -race -cpu=5 -v ./...

View File

@ -23,14 +23,8 @@ func WithContext(ctx context.Context) goprocess.Process {
// WithContextAndTeardown is a helper function to set teardown at initiation
// of WithContext
func WithContextAndTeardown(ctx context.Context, tf goprocess.TeardownFunc) goprocess.Process {
if ctx == nil {
panic("nil Context")
}
p := goprocess.WithTeardown(tf)
go func() {
<-ctx.Done()
p.Close()
}()
CloseAfterContext(p, ctx)
return p
}
@ -61,6 +55,12 @@ func CloseAfterContext(p goprocess.Process, ctx context.Context) {
panic("nil Context")
}
// context.Background(). if ctx.Done() is nil, it will never be done.
// we check for this to avoid wasting a goroutine forever.
if ctx.Done() == nil {
return
}
go func() {
<-ctx.Done()
p.Close()

View File

@ -114,7 +114,7 @@ type Process interface {
//
// It is useful to construct simple asynchronous workers, children of p.
Go(f ProcessFunc) Process
// SetTeardown sets the process's teardown to tf.
SetTeardown(tf TeardownFunc)

View File

@ -1,6 +1,8 @@
package goprocess
import (
"fmt"
"runtime"
"syscall"
"testing"
"time"
@ -515,6 +517,71 @@ func TestWithSignals(t *testing.T) {
testClosed(t, p)
}
func TestMemoryLeak(t *testing.T) {
iters := 100
fanout := 10
P := newProcess(nil)
var memories []float32
measure := func(str string) float32 {
s := new(runtime.MemStats)
runtime.ReadMemStats(s)
//fmt.Printf("%d ", s.HeapObjects)
//fmt.Printf("%d ", len(P.children))
//fmt.Printf("%d ", runtime.NumGoroutine())
//fmt.Printf("%s: %dk\n", str, s.HeapAlloc/1000)
return float32(s.HeapAlloc) / 1000
}
spawn := func() []Process {
var ps []Process
// Spawn processes
for i := 0; i < fanout; i++ {
p := WithParent(P)
ps = append(ps, p)
for i := 0; i < fanout; i++ {
p2 := WithParent(p)
ps = append(ps, p2)
for i := 0; i < fanout; i++ {
p3 := WithParent(p2)
ps = append(ps, p3)
}
}
}
return ps
}
// Read initial memory stats
measure("initial")
for i := 0; i < iters; i++ {
ps := spawn()
//measure("alloc") // read after alloc
// Close all processes
for _, p := range ps {
p.Close()
<-p.Closed()
}
ps = nil
//measure("dealloc") // read after dealloc, but before gc
// wait until all/most goroutines finish
<-time.After(time.Millisecond)
// Run GC
runtime.GC()
memories = append(memories, measure("gc")) // read after gc
}
memoryInit := memories[10]
percentGrowth := 100 * (memories[len(memories)-1] - memoryInit) / memoryInit
fmt.Printf("Memory growth after %d iteration with each %d processes: %.2f%% after %dk\n", iters, fanout*fanout*fanout, percentGrowth, int(memoryInit))
}
func testClosing(t *testing.T, p Process) {
select {
case <-p.Closing():

View File

@ -6,9 +6,9 @@ import (
// process implements Process
type process struct {
children []*processLink // process to close with us
waitfors []*processLink // process to only wait for
waiters []*processLink // processes that wait for us. for gc.
children map[*processLink]struct{} // process to close with us
waitfors map[*processLink]struct{} // process to only wait for
waiters []*processLink // processes that wait for us. for gc.
teardown TeardownFunc // called to run the teardown logic.
waiting chan struct{} // closed when CloseAfterChildrenClosed is called.
@ -33,6 +33,8 @@ func newProcess(tf TeardownFunc) *process {
teardown: tf,
closed: make(chan struct{}),
closing: make(chan struct{}),
waitfors: make(map[*processLink]struct{}),
children: make(map[*processLink]struct{}),
}
}
@ -50,7 +52,7 @@ func (p *process) WaitFor(q Process) {
}
pl := newProcessLink(p, q)
p.waitfors = append(p.waitfors, pl)
p.waitfors[pl] = struct{}{}
p.Unlock()
go pl.AddToChild()
}
@ -71,7 +73,7 @@ func (p *process) AddChildNoWait(child Process) {
}
pl := newProcessLink(p, child)
p.children = append(p.children, pl)
p.children[pl] = struct{}{}
p.Unlock()
go pl.AddToChild()
}
@ -92,8 +94,12 @@ func (p *process) AddChild(child Process) {
}
pl := newProcessLink(p, child)
p.waitfors = append(p.waitfors, pl)
p.children = append(p.children, pl)
if p.waitfors != nil { // if p.waitfors hasn't been set nil
p.waitfors[pl] = struct{}{}
}
if p.children != nil { // if p.children hasn't been set nil
p.children[pl] = struct{}{}
}
p.Unlock()
go pl.AddToChild()
}
@ -167,7 +173,7 @@ func (p *process) doClose() {
close(p.closing) // signal that we're shutting down (Closing)
for len(p.children) > 0 || len(p.waitfors) > 0 {
for _, plc := range p.children {
for plc, _ := range p.children {
child := plc.Child()
if child != nil { // check because child may already have been removed.
go child.Close() // force all children to shut down
@ -180,7 +186,7 @@ func (p *process) doClose() {
// change under our feet.
wf := p.waitfors
p.waitfors = nil // clear them. release memory.
for _, w := range wf {
for w, _ := range wf {
// Here, we wait UNLOCKED, so that waitfors who are in the middle of
// adding a child to us can finish. we will immediately close the child.
p.Unlock()
@ -197,8 +203,18 @@ func (p *process) doClose() {
go func(waiters []*processLink) {
for _, pl := range waiters {
pl.ClearChild()
pr, ok := pl.Parent().(*process)
if !ok {
// parent has already been called to close
continue
}
pr.Lock()
delete(pr.waitfors, pl)
delete(pr.children, pl)
pr.Unlock()
}
}(p.waiters) // pass in so
p.waiters = nil // clear them. release memory.
}
// We will only wait on the children we have now.
@ -223,7 +239,7 @@ func (p *process) CloseAfterChildren() error {
nextToWaitFor := func() Process {
p.Lock()
defer p.Unlock()
for _, e := range p.waitfors {
for e, _ := range p.waitfors {
c := e.Child()
if c == nil {
continue