diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index a613a529d..a51a179f6 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -200,7 +200,7 @@ }, { "ImportPath": "github.com/jbenet/goprocess", - "Rev": "a6650d0b69f2aa0fe7c9685baf0b4d7ecc8766bf" + "Rev": "788dcf5ca3517f243d276394545ca6b3b4ac32d5" }, { "ImportPath": "github.com/kardianos/osext", diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/.travis.yml b/Godeps/_workspace/src/github.com/jbenet/goprocess/.travis.yml index d16d679a6..15de84170 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/.travis.yml +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/.travis.yml @@ -1,10 +1,10 @@ +sudo: false + language: go go: - 1.3 - 1.4 - - release - - tip script: - go test -race -cpu=5 -v ./... diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/context/context.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/context/context.go index cb4aa3ef8..2ac8535a4 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/context/context.go +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/context/context.go @@ -23,14 +23,8 @@ func WithContext(ctx context.Context) goprocess.Process { // WithContextAndTeardown is a helper function to set teardown at initiation // of WithContext func WithContextAndTeardown(ctx context.Context, tf goprocess.TeardownFunc) goprocess.Process { - if ctx == nil { - panic("nil Context") - } p := goprocess.WithTeardown(tf) - go func() { - <-ctx.Done() - p.Close() - }() + CloseAfterContext(p, ctx) return p } @@ -61,6 +55,12 @@ func CloseAfterContext(p goprocess.Process, ctx context.Context) { panic("nil Context") } + // context.Background(). if ctx.Done() is nil, it will never be done. + // we check for this to avoid wasting a goroutine forever. + if ctx.Done() == nil { + return + } + go func() { <-ctx.Done() p.Close() diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go index f51ad512d..17cb37799 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go @@ -114,7 +114,7 @@ type Process interface { // // It is useful to construct simple asynchronous workers, children of p. Go(f ProcessFunc) Process - + // SetTeardown sets the process's teardown to tf. SetTeardown(tf TeardownFunc) diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess_test.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess_test.go index 8516e03d0..81c195843 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess_test.go @@ -1,6 +1,8 @@ package goprocess import ( + "fmt" + "runtime" "syscall" "testing" "time" @@ -515,6 +517,71 @@ func TestWithSignals(t *testing.T) { testClosed(t, p) } +func TestMemoryLeak(t *testing.T) { + iters := 100 + fanout := 10 + P := newProcess(nil) + var memories []float32 + + measure := func(str string) float32 { + s := new(runtime.MemStats) + runtime.ReadMemStats(s) + //fmt.Printf("%d ", s.HeapObjects) + //fmt.Printf("%d ", len(P.children)) + //fmt.Printf("%d ", runtime.NumGoroutine()) + //fmt.Printf("%s: %dk\n", str, s.HeapAlloc/1000) + return float32(s.HeapAlloc) / 1000 + } + + spawn := func() []Process { + var ps []Process + // Spawn processes + for i := 0; i < fanout; i++ { + p := WithParent(P) + ps = append(ps, p) + + for i := 0; i < fanout; i++ { + p2 := WithParent(p) + ps = append(ps, p2) + + for i := 0; i < fanout; i++ { + p3 := WithParent(p2) + ps = append(ps, p3) + } + } + } + return ps + } + + // Read initial memory stats + measure("initial") + for i := 0; i < iters; i++ { + ps := spawn() + //measure("alloc") // read after alloc + + // Close all processes + for _, p := range ps { + p.Close() + <-p.Closed() + } + ps = nil + + //measure("dealloc") // read after dealloc, but before gc + + // wait until all/most goroutines finish + <-time.After(time.Millisecond) + + // Run GC + runtime.GC() + memories = append(memories, measure("gc")) // read after gc + } + + memoryInit := memories[10] + percentGrowth := 100 * (memories[len(memories)-1] - memoryInit) / memoryInit + fmt.Printf("Memory growth after %d iteration with each %d processes: %.2f%% after %dk\n", iters, fanout*fanout*fanout, percentGrowth, int(memoryInit)) + +} + func testClosing(t *testing.T, p Process) { select { case <-p.Closing(): diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go index e2a780183..c6acc4f41 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go @@ -6,9 +6,9 @@ import ( // process implements Process type process struct { - children []*processLink // process to close with us - waitfors []*processLink // process to only wait for - waiters []*processLink // processes that wait for us. for gc. + children map[*processLink]struct{} // process to close with us + waitfors map[*processLink]struct{} // process to only wait for + waiters []*processLink // processes that wait for us. for gc. teardown TeardownFunc // called to run the teardown logic. waiting chan struct{} // closed when CloseAfterChildrenClosed is called. @@ -33,6 +33,8 @@ func newProcess(tf TeardownFunc) *process { teardown: tf, closed: make(chan struct{}), closing: make(chan struct{}), + waitfors: make(map[*processLink]struct{}), + children: make(map[*processLink]struct{}), } } @@ -50,7 +52,7 @@ func (p *process) WaitFor(q Process) { } pl := newProcessLink(p, q) - p.waitfors = append(p.waitfors, pl) + p.waitfors[pl] = struct{}{} p.Unlock() go pl.AddToChild() } @@ -71,7 +73,7 @@ func (p *process) AddChildNoWait(child Process) { } pl := newProcessLink(p, child) - p.children = append(p.children, pl) + p.children[pl] = struct{}{} p.Unlock() go pl.AddToChild() } @@ -92,8 +94,12 @@ func (p *process) AddChild(child Process) { } pl := newProcessLink(p, child) - p.waitfors = append(p.waitfors, pl) - p.children = append(p.children, pl) + if p.waitfors != nil { // if p.waitfors hasn't been set nil + p.waitfors[pl] = struct{}{} + } + if p.children != nil { // if p.children hasn't been set nil + p.children[pl] = struct{}{} + } p.Unlock() go pl.AddToChild() } @@ -167,7 +173,7 @@ func (p *process) doClose() { close(p.closing) // signal that we're shutting down (Closing) for len(p.children) > 0 || len(p.waitfors) > 0 { - for _, plc := range p.children { + for plc, _ := range p.children { child := plc.Child() if child != nil { // check because child may already have been removed. go child.Close() // force all children to shut down @@ -180,7 +186,7 @@ func (p *process) doClose() { // change under our feet. wf := p.waitfors p.waitfors = nil // clear them. release memory. - for _, w := range wf { + for w, _ := range wf { // Here, we wait UNLOCKED, so that waitfors who are in the middle of // adding a child to us can finish. we will immediately close the child. p.Unlock() @@ -197,8 +203,18 @@ func (p *process) doClose() { go func(waiters []*processLink) { for _, pl := range waiters { pl.ClearChild() + pr, ok := pl.Parent().(*process) + if !ok { + // parent has already been called to close + continue + } + pr.Lock() + delete(pr.waitfors, pl) + delete(pr.children, pl) + pr.Unlock() } }(p.waiters) // pass in so + p.waiters = nil // clear them. release memory. } // We will only wait on the children we have now. @@ -223,7 +239,7 @@ func (p *process) CloseAfterChildren() error { nextToWaitFor := func() Process { p.Lock() defer p.Unlock() - for _, e := range p.waitfors { + for e, _ := range p.waitfors { c := e.Child() if c == nil { continue