From c3280ce88559d0ba1859fadd28ca70d04ef20d0c Mon Sep 17 00:00:00 2001 From: Jeromy Date: Tue, 8 Sep 2015 21:15:53 -0700 Subject: [PATCH] use new methods from goprocess/context, remove thirdparty/waitable License: MIT Signed-off-by: Jeromy --- Godeps/Godeps.json | 2 +- .../jbenet/goprocess/context/derive.go | 59 +++++++++++++++++++ .../github.com/jbenet/goprocess/goprocess.go | 4 ++ .../github.com/jbenet/goprocess/impl-mutex.go | 5 ++ blockservice/worker/worker.go | 4 +- core/bootstrap.go | 4 +- exchange/bitswap/workers.go | 6 +- routing/dht/ext_test.go | 2 +- routing/dht/query.go | 4 +- thirdparty/waitable/waitable.go | 19 ------ 10 files changed, 79 insertions(+), 30 deletions(-) create mode 100644 Godeps/_workspace/src/github.com/jbenet/goprocess/context/derive.go delete mode 100644 thirdparty/waitable/waitable.go diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 544fa32d2..6c0d949a5 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -199,7 +199,7 @@ }, { "ImportPath": "github.com/jbenet/goprocess", - "Rev": "4562d0c5780b8f060df2b84a8945bb8678bfc023" + "Rev": "64a8220330a485070813201cc05b0c6777f6a516" }, { "ImportPath": "github.com/kardianos/osext", diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/context/derive.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/context/derive.go new file mode 100644 index 000000000..b0aecee52 --- /dev/null +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/context/derive.go @@ -0,0 +1,59 @@ +package goprocessctx + +import ( + "errors" + "time" + + goprocess "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" +) + +const ( + closing = iota + closed +) + +type procContext struct { + done <-chan struct{} + which int +} + +// OnClosingContext derives a context from a given goprocess that will +// be 'Done' when the process is closing +func OnClosingContext(p goprocess.Process) context.Context { + return &procContext{ + done: p.Closing(), + which: closing, + } +} + +// OnClosedContext derives a context from a given goprocess that will +// be 'Done' when the process is closed +func OnClosedContext(p goprocess.Process) context.Context { + return &procContext{ + done: p.Closed(), + which: closed, + } +} + +func (c *procContext) Done() <-chan struct{} { + return c.done +} + +func (c *procContext) Deadline() (time.Time, bool) { + return time.Time{}, false +} + +func (c *procContext) Err() error { + if c.which == closing { + return errors.New("process closing") + } else if c.which == closed { + return errors.New("process closed") + } else { + panic("unrecognized process context type") + } +} + +func (c *procContext) Value(key interface{}) interface{} { + return nil +} diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go index b027d8214..a81ec8b92 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/goprocess.go @@ -139,6 +139,10 @@ type Process interface { // _after_ Close has completed; teardown has finished. The primary use case // of Closed is waiting for a Process to Close without _causing_ the Close. Closed() <-chan struct{} + + // Err waits until the process is closed, and then returns any error that + // occurred during shutdown. + Err() error } // TeardownFunc is a function used to cleanup state at the end of the diff --git a/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go b/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go index a67deea75..fb86fcce2 100644 --- a/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go +++ b/Godeps/_workspace/src/github.com/jbenet/goprocess/impl-mutex.go @@ -163,6 +163,11 @@ func (p *process) Closed() <-chan struct{} { return p.closed } +func (p *process) Err() error { + <-p.Closed() + return p.closeErr +} + // the _actual_ close process. func (p *process) doClose() { // this function is only be called once (protected by p.Lock()). diff --git a/blockservice/worker/worker.go b/blockservice/worker/worker.go index 88cf4c326..88149add3 100644 --- a/blockservice/worker/worker.go +++ b/blockservice/worker/worker.go @@ -7,11 +7,11 @@ import ( "time" process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + procctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context" ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit" blocks "github.com/ipfs/go-ipfs/blocks" key "github.com/ipfs/go-ipfs/blocks/key" exchange "github.com/ipfs/go-ipfs/exchange" - waitable "github.com/ipfs/go-ipfs/thirdparty/waitable" util "github.com/ipfs/go-ipfs/util" ) @@ -121,7 +121,7 @@ func (w *Worker) start(c Config) { // reads from |workerChan| until w.process closes limiter := ratelimit.NewRateLimiter(w.process, c.NumWorkers) limiter.Go(func(proc process.Process) { - ctx := waitable.Context(proc) // shut down in-progress HasBlock when time to die + ctx := procctx.OnClosingContext(proc) // shut down in-progress HasBlock when time to die for { select { case <-proc.Closing(): diff --git a/core/bootstrap.go b/core/bootstrap.go index a7508de1f..bf95f351b 100644 --- a/core/bootstrap.go +++ b/core/bootstrap.go @@ -79,7 +79,7 @@ func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) { // the periodic bootstrap function -- the connection supervisor periodic := func(worker goprocess.Process) { - ctx := procctx.WithProcessClosing(context.Background(), worker) + ctx := procctx.OnClosingContext(worker) defer log.EventBegin(ctx, "periodicBootstrap", n.Identity).Done() if err := bootstrapRound(ctx, n.PeerHost, cfg); err != nil { @@ -96,7 +96,7 @@ func Bootstrap(n *IpfsNode, cfg BootstrapConfig) (io.Closer, error) { // kick off Routing.Bootstrap if n.Routing != nil { - ctx := procctx.WithProcessClosing(context.Background(), proc) + ctx := procctx.OnClosingContext(proc) if err := n.Routing.Bootstrap(ctx); err != nil { proc.Close() return nil, err diff --git a/exchange/bitswap/workers.go b/exchange/bitswap/workers.go index e19cf2fbc..b33ea9221 100644 --- a/exchange/bitswap/workers.go +++ b/exchange/bitswap/workers.go @@ -4,9 +4,9 @@ import ( "time" process "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess" + procctx "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/context" ratelimit "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/goprocess/ratelimit" context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" - waitable "github.com/ipfs/go-ipfs/thirdparty/waitable" key "github.com/ipfs/go-ipfs/blocks/key" eventlog "github.com/ipfs/go-ipfs/thirdparty/eventlog" @@ -80,7 +80,7 @@ func (bs *Bitswap) provideWorker(px process.Process) { ev := eventlog.LoggableMap{"ID": wid} limiter.LimitedGo(func(px process.Process) { - ctx := waitable.Context(px) // derive ctx from px + ctx := procctx.OnClosingContext(px) // derive ctx from px defer log.EventBegin(ctx, "Bitswap.ProvideWorker.Work", ev, &k).Done() ctx, cancel := context.WithTimeout(ctx, provideTimeout) // timeout ctx @@ -97,7 +97,7 @@ func (bs *Bitswap) provideWorker(px process.Process) { limiter.Go(func(px process.Process) { for wid := 2; ; wid++ { ev := eventlog.LoggableMap{"ID": 1} - log.Event(waitable.Context(px), "Bitswap.ProvideWorker.Loop", ev) + log.Event(procctx.OnClosingContext(px), "Bitswap.ProvideWorker.Loop", ev) select { case <-px.Closing(): diff --git a/routing/dht/ext_test.go b/routing/dht/ext_test.go index 75219da5c..710a9afca 100644 --- a/routing/dht/ext_test.go +++ b/routing/dht/ext_test.go @@ -51,7 +51,7 @@ func TestGetFailures(t *testing.T) { err = merr[0] } - if err != context.DeadlineExceeded && err != context.Canceled { + if err.Error() != "process closing" { t.Fatal("Got different error than we expected", err) } } else { diff --git a/routing/dht/query.go b/routing/dht/query.go index a6c8a14b3..9906e5189 100644 --- a/routing/dht/query.go +++ b/routing/dht/query.go @@ -85,7 +85,7 @@ type dhtQueryRunner struct { func newQueryRunner(q *dhtQuery) *dhtQueryRunner { proc := process.WithParent(process.Background()) - ctx := ctxproc.WithProcessClosing(context.Background(), proc) + ctx := ctxproc.OnClosingContext(proc) return &dhtQueryRunner{ query: q, peersToQuery: queue.NewChanQueue(ctx, queue.NewXORDistancePQ(q.key)), @@ -210,7 +210,7 @@ func (r *dhtQueryRunner) queryPeer(proc process.Process, p peer.ID) { // ok let's do this! // create a context from our proc. - ctx := ctxproc.WithProcessClosing(context.Background(), proc) + ctx := ctxproc.OnClosingContext(proc) // make sure we do this when we exit defer func() { diff --git a/thirdparty/waitable/waitable.go b/thirdparty/waitable/waitable.go deleted file mode 100644 index 79aa32cc8..000000000 --- a/thirdparty/waitable/waitable.go +++ /dev/null @@ -1,19 +0,0 @@ -package waitable - -import ( - context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context" -) - -type Waitable interface { - Closing() <-chan struct{} -} - -// Context returns a context that cancels when the waitable is closing. -func Context(w Waitable) context.Context { - ctx, cancel := context.WithCancel(context.Background()) - go func() { - <-w.Closing() - cancel() - }() - return ctx -}