1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-09-10 11:52:21 +08:00

updated goprocess, for periodic

This commit is contained in:
Juan Batiz-Benet
2015-01-20 05:53:56 -08:00
parent 010cedf0a9
commit c43f97d64e
11 changed files with 638 additions and 124 deletions

2
Godeps/Godeps.json generated
View File

@ -172,7 +172,7 @@
},
{
"ImportPath": "github.com/jbenet/goprocess",
"Rev": "7f96033e206c3cd4e79d1c61cbdfff57869feaf8"
"Rev": "c37725a4a97d6ad772818b071ceef82789562142"
},
{
"ImportPath": "github.com/kr/binarydist",

View File

@ -0,0 +1,11 @@
language: go
go:
- 1.2
- 1.3
- 1.4
- release
- tip
script:
- go test -v ./...

View File

@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright (c) 2014 Juan Batiz-Benet
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

View File

@ -1,5 +1,7 @@
# goprocess - lifecycles in go
[![travisbadge](https://travis-ci.org/jbenet/goprocess.svg)](https://travis-ci.org/jbenet/goprocess)
(Based on https://github.com/jbenet/go-ctxgroup)
- Godoc: https://godoc.org/github.com/jbenet/goprocess

View File

@ -18,7 +18,7 @@ import (
// More specifically, it fits this:
//
// p := WithTeardown(tf) // new process is created, it is now running.
// p.AddChild(q) // can register children **before** Closing.
// p.AddChild(q) // can register children **before** Closed().
// go p.Close() // blocks until done running teardown func.
// <-p.Closing() // would now return true.
// <-p.childrenDone() // wait on all children to be done

View File

@ -1,114 +0,0 @@
// +build ignore
// WARNING: this implementation is not correct.
// here only for historical purposes.
package goprocess
import (
"sync"
)
// process implements Process
type process struct {
children sync.WaitGroup // wait group for child goroutines
teardown TeardownFunc // called to run the teardown logic.
closing chan struct{} // closed once close starts.
closed chan struct{} // closed once close is done.
closeOnce sync.Once // ensure close is only called once.
closeErr error // error to return to clients of Close()
}
// newProcess constructs and returns a Process.
// It will call tf TeardownFunc exactly once:
// **after** all children have fully Closed,
// **after** entering <-Closing(), and
// **before** <-Closed().
func newProcess(tf TeardownFunc) *process {
if tf == nil {
tf = nilTeardownFunc
}
return &process{
teardown: tf,
closed: make(chan struct{}),
closing: make(chan struct{}),
}
}
func (p *process) WaitFor(q Process) {
p.children.Add(1) // p waits on q to be done
go func(p *process, q Process) {
<-q.Closed() // wait until q is closed
p.children.Done() // p done waiting on q
}(p, q)
}
func (p *process) AddChildNoWait(child Process) {
go func(p, child Process) {
<-p.Closing() // wait until p is closing
child.Close() // close child
}(p, child)
}
func (p *process) AddChild(child Process) {
select {
case <-p.Closing():
panic("attempt to add child to closing or closed process")
default:
}
p.children.Add(1) // p waits on child to be done
go func(p *process, child Process) {
<-p.Closing() // wait until p is closing
child.Close() // close child and wait
p.children.Done() // p done waiting on child
}(p, child)
}
func (p *process) Go(f ProcessFunc) Process {
select {
case <-p.Closing():
panic("attempt to add child to closing or closed process")
default:
}
// this is very similar to AddChild, but also runs the func
// in the child. we replicate it here to save one goroutine.
child := newProcessGoroutines(nil)
child.children.Add(1) // child waits on func to be done
p.AddChild(child)
go func() {
f(child)
child.children.Done() // wait on child's children to be done.
child.Close() // close to tear down.
}()
return child
}
// Close is the external close function.
// it's a wrapper around internalClose that waits on Closed()
func (p *process) Close() error {
p.closeOnce.Do(p.doClose)
<-p.Closed() // sync.Once should block, but this checks chan is closed too
return p.closeErr
}
func (p *process) Closing() <-chan struct{} {
return p.closing
}
func (p *process) Closed() <-chan struct{} {
return p.closed
}
// the _actual_ close process.
func (p *process) doClose() {
// this function should only be called once (hence the sync.Once).
// and it will panic (on closing channels) otherwise.
close(p.closing) // signal that we're shutting down (Closing)
p.children.Wait() // wait till all children are done (before teardown)
p.closeErr = p.teardown() // actually run the close logic (ok safe to teardown)
close(p.closed) // signal that we're shut down (Closed)
}

View File

@ -92,16 +92,18 @@ func (p *process) Go(f ProcessFunc) Process {
// it's a wrapper around internalClose that waits on Closed()
func (p *process) Close() error {
p.Lock()
defer p.Unlock()
// if already closed, get out.
// if already closing, or closed, get out. (but wait!)
select {
case <-p.Closed():
case <-p.Closing():
p.Unlock()
<-p.Closed()
return p.closeErr
default:
}
p.doClose()
p.Unlock()
return p.closeErr
}
@ -120,12 +122,23 @@ func (p *process) doClose() {
close(p.closing) // signal that we're shutting down (Closing)
for _, c := range p.children {
go c.Close() // force all children to shut down
}
for len(p.children) > 0 || len(p.waitfors) > 0 {
for _, c := range p.children {
go c.Close() // force all children to shut down
}
p.children = nil // clear them
for _, w := range p.waitfors {
<-w.Closed() // wait till all waitfors are fully closed (before teardown)
// we must be careful not to iterate over waitfors directly, as it may
// change under our feet.
wf := p.waitfors
p.waitfors = nil // clear them
for _, w := range wf {
// Here, we wait UNLOCKED, so that waitfors who are in the middle of
// adding a child to us can finish. we will immediately close the child.
p.Unlock()
<-w.Closed() // wait till all waitfors are fully closed (before teardown)
p.Lock()
}
}
p.closeErr = p.teardown() // actually run the close logic (ok safe to teardown)

View File

@ -0,0 +1,4 @@
# goprocess/periodic - periodic process creation
- goprocess: https://github.com/jbenet/goprocess
- Godoc: https://godoc.org/github.com/jbenet/goprocess/periodic

View File

@ -0,0 +1,85 @@
package periodicproc_test
import (
"fmt"
"time"
goprocess "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
periodicproc "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/periodic"
)
func ExampleEvery() {
tock := make(chan struct{})
i := 0
p := periodicproc.Every(time.Second, func(proc goprocess.Process) {
tock <- struct{}{}
fmt.Printf("hello %d\n", i)
i++
})
<-tock
<-tock
<-tock
p.Close()
// Output:
// hello 0
// hello 1
// hello 2
}
func ExampleTick() {
p := periodicproc.Tick(time.Second, func(proc goprocess.Process) {
fmt.Println("tick")
})
<-time.After(3*time.Second + 500*time.Millisecond)
p.Close()
// Output:
// tick
// tick
// tick
}
func ExampleTickGo() {
// with TickGo, execution is not rate limited,
// there can be many in-flight simultaneously
wait := make(chan struct{})
p := periodicproc.TickGo(time.Second, func(proc goprocess.Process) {
fmt.Println("tick")
<-wait
})
<-time.After(3*time.Second + 500*time.Millisecond)
wait <- struct{}{}
wait <- struct{}{}
wait <- struct{}{}
p.Close() // blocks us until all children are closed.
// Output:
// tick
// tick
// tick
}
func ExampleOnSignal() {
sig := make(chan struct{})
p := periodicproc.OnSignal(sig, func(proc goprocess.Process) {
fmt.Println("fire!")
})
sig <- struct{}{}
sig <- struct{}{}
sig <- struct{}{}
p.Close()
// Output:
// fire!
// fire!
// fire!
}

View File

@ -0,0 +1,232 @@
// Package periodic is part of github.com/jbenet/goprocess.
// It provides a simple periodic processor that calls a function
// periodically based on some options.
//
// For example:
//
// // use a time.Duration
// p := periodicproc.Every(time.Second, func(proc goprocess.Process) {
// fmt.Printf("the time is %s and all is well", time.Now())
// })
//
// <-time.After(5*time.Second)
// p.Close()
//
// // use a time.Time channel (like time.Ticker)
// p := periodicproc.Tick(time.Tick(time.Second), func(proc goprocess.Process) {
// fmt.Printf("the time is %s and all is well", time.Now())
// })
//
// <-time.After(5*time.Second)
// p.Close()
//
// // or arbitrary signals
// signal := make(chan struct{})
// p := periodicproc.OnSignal(signal, func(proc goprocess.Process) {
// fmt.Printf("the time is %s and all is well", time.Now())
// })
//
// signal<- struct{}{}
// signal<- struct{}{}
// <-time.After(5 * time.Second)
// signal<- struct{}{}
// p.Close()
//
package periodicproc
import (
"time"
gp "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
)
// Every calls the given ProcessFunc at periodic intervals. Internally, it uses
// <-time.After(interval), so it will have the behavior of waiting _at least_
// interval in between calls. If you'd prefer the time.Ticker behavior, use
// periodicproc.Tick instead.
// This is sequentially rate limited, only one call will be in-flight at a time.
func Every(interval time.Duration, procfunc gp.ProcessFunc) gp.Process {
return gp.Go(func(proc gp.Process) {
for {
select {
case <-time.After(interval):
select {
case <-proc.Go(procfunc).Closed(): // spin it out as a child, and wait till it's done.
case <-proc.Closing(): // we're told to close
return
}
case <-proc.Closing(): // we're told to close
return
}
}
})
}
// EveryGo calls the given ProcessFunc at periodic intervals. Internally, it uses
// <-time.After(interval)
// This is not rate limited, multiple calls could be in-flight at the same time.
func EveryGo(interval time.Duration, procfunc gp.ProcessFunc) gp.Process {
return gp.Go(func(proc gp.Process) {
for {
select {
case <-time.After(interval):
proc.Go(procfunc)
case <-proc.Closing(): // we're told to close
return
}
}
})
}
// Tick constructs a ticker with interval, and calls the given ProcessFunc every
// time the ticker fires.
// This is sequentially rate limited, only one call will be in-flight at a time.
//
// p := periodicproc.Tick(time.Second, func(proc goprocess.Process) {
// fmt.Println("fire!")
// })
//
// <-time.After(3 * time.Second)
// p.Close()
//
// // Output:
// // fire!
// // fire!
// // fire!
func Tick(interval time.Duration, procfunc gp.ProcessFunc) gp.Process {
return gp.Go(func(proc gp.Process) {
ticker := time.NewTicker(interval)
callOnTicker(ticker.C, procfunc)(proc)
ticker.Stop()
})
}
// TickGo constructs a ticker with interval, and calls the given ProcessFunc every
// time the ticker fires.
// This is not rate limited, multiple calls could be in-flight at the same time.
//
// p := periodicproc.TickGo(time.Second, func(proc goprocess.Process) {
// fmt.Println("fire!")
// <-time.After(10 * time.Second) // will not block sequential execution
// })
//
// <-time.After(3 * time.Second)
// p.Close()
//
// // Output:
// // fire!
// // fire!
// // fire!
func TickGo(interval time.Duration, procfunc gp.ProcessFunc) gp.Process {
return gp.Go(func(proc gp.Process) {
ticker := time.NewTicker(interval)
goCallOnTicker(ticker.C, procfunc)(proc)
ticker.Stop()
})
}
// Ticker calls the given ProcessFunc every time the ticker fires.
// This is sequentially rate limited, only one call will be in-flight at a time.
func Ticker(ticker <-chan time.Time, procfunc gp.ProcessFunc) gp.Process {
return gp.Go(callOnTicker(ticker, procfunc))
}
// TickerGo calls the given ProcessFunc every time the ticker fires.
// This is not rate limited, multiple calls could be in-flight at the same time.
func TickerGo(ticker <-chan time.Time, procfunc gp.ProcessFunc) gp.Process {
return gp.Go(goCallOnTicker(ticker, procfunc))
}
func callOnTicker(ticker <-chan time.Time, pf gp.ProcessFunc) gp.ProcessFunc {
return func(proc gp.Process) {
for {
select {
case <-ticker:
select {
case <-proc.Go(pf).Closed(): // spin it out as a child, and wait till it's done.
case <-proc.Closing(): // we're told to close
return
}
case <-proc.Closing(): // we're told to close
return
}
}
}
}
func goCallOnTicker(ticker <-chan time.Time, pf gp.ProcessFunc) gp.ProcessFunc {
return func(proc gp.Process) {
for {
select {
case <-ticker:
proc.Go(pf)
case <-proc.Closing(): // we're told to close
return
}
}
}
}
// OnSignal calls the given ProcessFunc every time the signal fires, and waits for it to exit.
// This is sequentially rate limited, only one call will be in-flight at a time.
//
// sig := make(chan struct{})
// p := periodicproc.OnSignal(sig, func(proc goprocess.Process) {
// fmt.Println("fire!")
// <-time.After(time.Second) // delays sequential execution by 1 second
// })
//
// sig<- struct{}
// sig<- struct{}
// sig<- struct{}
//
// // Output:
// // fire!
// // fire!
// // fire!
func OnSignal(sig <-chan struct{}, procfunc gp.ProcessFunc) gp.Process {
return gp.Go(func(proc gp.Process) {
for {
select {
case <-sig:
select {
case <-proc.Go(procfunc).Closed(): // spin it out as a child, and wait till it's done.
case <-proc.Closing(): // we're told to close
return
}
case <-proc.Closing(): // we're told to close
return
}
}
})
}
// OnSignalGo calls the given ProcessFunc every time the signal fires.
// This is not rate limited, multiple calls could be in-flight at the same time.
//
// sig := make(chan struct{})
// p := periodicproc.OnSignalGo(sig, func(proc goprocess.Process) {
// fmt.Println("fire!")
// <-time.After(time.Second) // wont block execution
// })
//
// sig<- struct{}
// sig<- struct{}
// sig<- struct{}
//
// // Output:
// // fire!
// // fire!
// // fire!
func OnSignalGo(sig <-chan struct{}, procfunc gp.ProcessFunc) gp.Process {
return gp.Go(func(proc gp.Process) {
for {
select {
case <-sig:
proc.Go(procfunc)
case <-proc.Closing(): // we're told to close
return
}
}
})
}

View File

@ -0,0 +1,260 @@
package periodicproc
import (
"testing"
"time"
ci "github.com/jbenet/go-cienv"
gp "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess"
)
var (
grace = time.Millisecond * 5
interval = time.Millisecond * 10
timeout = time.Second * 5
)
func init() {
if ci.IsRunning() {
grace = time.Millisecond * 500
interval = time.Millisecond * 1000
timeout = time.Second * 15
}
}
func between(min, diff, max time.Duration) bool {
return min <= diff && diff <= max
}
func testBetween(t *testing.T, min, diff, max time.Duration) {
if !between(min, diff, max) {
t.Error("time diff incorrect:", min, diff, max)
}
}
type intervalFunc func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process)
func testSeq(t *testing.T, toTest intervalFunc) {
t.Parallel()
last := time.Now()
times := make(chan time.Time, 10)
p := toTest(times, nil)
for i := 0; i < 5; i++ {
next := <-times
testBetween(t, interval-grace, next.Sub(last), interval+grace)
last = next
}
go p.Close()
select {
case <-p.Closed():
case <-time.After(timeout):
t.Error("proc failed to close")
}
}
func testSeqWait(t *testing.T, toTest intervalFunc) {
t.Parallel()
last := time.Now()
times := make(chan time.Time, 10)
wait := make(chan struct{})
p := toTest(times, wait)
for i := 0; i < 5; i++ {
next := <-times
testBetween(t, interval-grace, next.Sub(last), interval+grace)
<-time.After(interval * 2) // make it wait.
last = time.Now() // make it now (sequential)
wait <- struct{}{} // release it.
}
go p.Close()
select {
case <-p.Closed():
case <-time.After(timeout):
t.Error("proc failed to close")
}
}
func testSeqNoWait(t *testing.T, toTest intervalFunc) {
t.Parallel()
last := time.Now()
times := make(chan time.Time, 10)
wait := make(chan struct{})
p := toTest(times, wait)
for i := 0; i < 5; i++ {
next := <-times
testBetween(t, 0, next.Sub(last), interval+grace) // min of 0
<-time.After(interval * 2) // make it wait.
last = time.Now() // make it now (sequential)
wait <- struct{}{} // release it.
}
go p.Close()
end:
select {
case wait <- struct{}{}: // drain any extras.
goto end
case <-p.Closed():
case <-time.After(timeout):
t.Error("proc failed to close")
}
}
func testParallel(t *testing.T, toTest intervalFunc) {
t.Parallel()
last := time.Now()
times := make(chan time.Time, 10)
wait := make(chan struct{})
p := toTest(times, wait)
for i := 0; i < 5; i++ {
next := <-times
testBetween(t, interval-grace, next.Sub(last), interval+grace)
last = next
<-time.After(interval * 2) // make it wait.
wait <- struct{}{} // release it.
}
go p.Close()
end:
select {
case wait <- struct{}{}: // drain any extras.
goto end
case <-p.Closed():
case <-time.After(timeout):
t.Error("proc failed to close")
}
}
func TestEverySeq(t *testing.T) {
testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return Every(interval, func(proc gp.Process) {
times <- time.Now()
})
})
}
func TestEverySeqWait(t *testing.T) {
testSeqWait(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return Every(interval, func(proc gp.Process) {
times <- time.Now()
select {
case <-wait:
case <-proc.Closing():
}
})
})
}
func TestEveryGoSeq(t *testing.T) {
testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return EveryGo(interval, func(proc gp.Process) {
times <- time.Now()
})
})
}
func TestEveryGoSeqParallel(t *testing.T) {
testParallel(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return EveryGo(interval, func(proc gp.Process) {
times <- time.Now()
select {
case <-wait:
case <-proc.Closing():
}
})
})
}
func TestTickSeq(t *testing.T) {
testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return Tick(interval, func(proc gp.Process) {
times <- time.Now()
})
})
}
func TestTickSeqNoWait(t *testing.T) {
testSeqNoWait(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return Tick(interval, func(proc gp.Process) {
times <- time.Now()
select {
case <-wait:
case <-proc.Closing():
}
})
})
}
func TestTickGoSeq(t *testing.T) {
testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return TickGo(interval, func(proc gp.Process) {
times <- time.Now()
})
})
}
func TestTickGoSeqParallel(t *testing.T) {
testParallel(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return TickGo(interval, func(proc gp.Process) {
times <- time.Now()
select {
case <-wait:
case <-proc.Closing():
}
})
})
}
func TestTickerSeq(t *testing.T) {
testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return Ticker(time.Tick(interval), func(proc gp.Process) {
times <- time.Now()
})
})
}
func TestTickerSeqNoWait(t *testing.T) {
testSeqNoWait(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return Ticker(time.Tick(interval), func(proc gp.Process) {
times <- time.Now()
select {
case <-wait:
case <-proc.Closing():
}
})
})
}
func TestTickerGoSeq(t *testing.T) {
testSeq(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return TickerGo(time.Tick(interval), func(proc gp.Process) {
times <- time.Now()
})
})
}
func TestTickerGoParallel(t *testing.T) {
testParallel(t, func(times chan<- time.Time, wait <-chan struct{}) (proc gp.Process) {
return TickerGo(time.Tick(interval), func(proc gp.Process) {
times <- time.Now()
select {
case <-wait:
case <-proc.Closing():
}
})
})
}