1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-10-25 02:16:56 +08:00

moved util/ctx to github.com/jbenet/go-context

License: MIT
Signed-off-by: Juan Batiz-Benet <juan@benet.ai>
This commit is contained in:
Juan Batiz-Benet
2015-07-10 17:48:54 -07:00
parent 83311212ea
commit 11937be180
9 changed files with 103 additions and 53 deletions

13
Godeps/Godeps.json generated
View File

@ -14,11 +14,6 @@
"Comment": "null-5",
"Rev": "75cd24fc2f2c2a2088577d12123ddee5f54e0675"
},
{
"ImportPath": "code.google.com/p/goprotobuf/proto",
"Comment": "go.r60-152",
"Rev": "36be16571e14f67e114bb0af619e5de2c1591679"
},
{
"ImportPath": "github.com/ActiveState/tail",
"Rev": "068b72961a6bc5b4a82cf4fc14ccc724c0cfa73a"
@ -137,6 +132,14 @@
"ImportPath": "github.com/jbenet/go-base58",
"Rev": "6237cf65f3a6f7111cd8a42be3590df99a66bc7d"
},
{
"ImportPath": "github.com/jbenet/go-context/frac",
"Rev": "d14ea06fba99483203c19d92cfcd13ebe73135f4"
},
{
"ImportPath": "github.com/jbenet/go-context/io",
"Rev": "d14ea06fba99483203c19d92cfcd13ebe73135f4"
},
{
"ImportPath": "github.com/jbenet/go-datastore",
"Rev": "7d6acaf7c0164c335f2ca4100f8fe30a7e2943dd"

View File

@ -0,0 +1,62 @@
// Package ctxext provides multiple useful context constructors.
package ctxext
import (
"time"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
// WithDeadlineFraction returns a Context with a fraction of the
// original context's timeout. This is useful in sequential pipelines
// of work, where one might try options and fall back to others
// depending on the time available, or failure to respond. For example:
//
// // getPicture returns a picture from our encrypted database
// // we have a pipeline of multiple steps. we need to:
// // - get the data from a database
// // - decrypt it
// // - apply many transforms
// //
// // we **know** that each step takes increasingly more time.
// // The transforms are much more expensive than decryption, and
// // decryption is more expensive than the database lookup.
// // If our database takes too long (i.e. >0.2 of available time),
// // there's no use in continuing.
// func getPicture(ctx context.Context, key string) ([]byte, error) {
// // fractional timeout contexts to the rescue!
//
// // try the database with 0.2 of remaining time.
// ctx1, _ := ctxext.WithDeadlineFraction(ctx, 0.2)
// val, err := db.Get(ctx1, key)
// if err != nil {
// return nil, err
// }
//
// // try decryption with 0.3 of remaining time.
// ctx2, _ := ctxext.WithDeadlineFraction(ctx, 0.3)
// if val, err = decryptor.Decrypt(ctx2, val); err != nil {
// return nil, err
// }
//
// // try transforms with all remaining time. hopefully it's enough!
// return transformer.Transform(ctx, val)
// }
//
//
func WithDeadlineFraction(ctx context.Context, fraction float64) (
context.Context, context.CancelFunc) {
d, found := ctx.Deadline()
if !found { // no deadline
return context.WithCancel(ctx)
}
left := d.Sub(time.Now())
if left < 0 { // already passed...
return context.WithCancel(ctx)
}
left = time.Duration(float64(left) * fraction)
return context.WithTimeout(ctx, left)
}

View File

@ -1,17 +1,16 @@
package ctxutil
package ctxext
import (
"os"
"testing"
"time"
travis "github.com/ipfs/go-ipfs/util/testutil/ci/travis"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
// this test is on the context tool itself, not our stuff. it's for sanity on ours.
func TestDeadline(t *testing.T) {
if travis.IsRunning() {
if os.Getenv("TRAVIS") == "true" {
t.Skip("timeouts don't work reliably on travis")
}
@ -43,7 +42,7 @@ func TestDeadlineFractionForever(t *testing.T) {
}
func TestDeadlineFractionHalf(t *testing.T) {
if travis.IsRunning() {
if os.Getenv("TRAVIS") == "true" {
t.Skip("timeouts don't work reliably on travis")
}

View File

@ -1,4 +1,14 @@
package ctxutil
// Package ctxio provides io.Reader and io.Writer wrappers that
// respect context.Contexts. Use these at the interface between
// your context code and your io.
//
// WARNING: read the code. see how writes and reads will continue
// until you cancel the io. Maybe this package should provide
// versions of io.ReadCloser and io.WriteCloser that automatically
// call .Close when the context expires. But for now -- since in my
// use cases I have long-lived connections with ephemeral io wrappers
// -- this has yet to be a need.
package ctxio
import (
"io"

View File

@ -1,4 +1,4 @@
package ctxutil
package ctxio
import (
"bytes"
@ -49,8 +49,8 @@ func TestReader(t *testing.T) {
}
func TestWriter(t *testing.T) {
buf := new(bytes.Buffer)
w := NewWriter(context.Background(), buf)
var buf bytes.Buffer
w := NewWriter(context.Background(), &buf)
// write three
n, err := w.Write([]byte("abc"))

View File

@ -13,15 +13,14 @@ import (
ggio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/io"
proto "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/proto"
ctxio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-context/io"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
pb "github.com/ipfs/go-ipfs/diagnostics/pb"
host "github.com/ipfs/go-ipfs/p2p/host"
inet "github.com/ipfs/go-ipfs/p2p/net"
peer "github.com/ipfs/go-ipfs/p2p/peer"
protocol "github.com/ipfs/go-ipfs/p2p/protocol"
util "github.com/ipfs/go-ipfs/util"
ctxutil "github.com/ipfs/go-ipfs/util/ctx"
)
var log = util.Logger("diagnostics")
@ -209,8 +208,8 @@ func (d *Diagnostics) getDiagnosticFromPeer(ctx context.Context, p peer.ID, pmes
return nil, err
}
cr := ctxutil.NewReader(ctx, s) // ok to use. we defer close stream in this func
cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func
cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func
cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(cw)
@ -267,8 +266,8 @@ func newMessage(diagID string) *pb.Message {
func (d *Diagnostics) HandleMessage(ctx context.Context, s inet.Stream) error {
cr := ctxutil.NewReader(ctx, s)
cw := ctxutil.NewWriter(ctx, s)
cr := ctxio.NewReader(ctx, s)
cw := ctxio.NewWriter(ctx, s)
r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax) // maxsize
w := ggio.NewDelimitedWriter(cw)

View File

@ -4,13 +4,12 @@ import (
"errors"
"time"
ggio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/io"
ctxio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-context/io"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
inet "github.com/ipfs/go-ipfs/p2p/net"
peer "github.com/ipfs/go-ipfs/p2p/peer"
pb "github.com/ipfs/go-ipfs/routing/dht/pb"
ctxutil "github.com/ipfs/go-ipfs/util/ctx"
ggio "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/gogo/protobuf/io"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
// handleNewStream implements the inet.StreamHandler
@ -22,8 +21,8 @@ func (dht *IpfsDHT) handleNewMessage(s inet.Stream) {
defer s.Close()
ctx := dht.Context()
cr := ctxutil.NewReader(ctx, s) // ok to use. we defer close stream in this func
cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func
cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func
cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(cw)
mPeer := s.Conn().RemotePeer()
@ -78,8 +77,8 @@ func (dht *IpfsDHT) sendRequest(ctx context.Context, p peer.ID, pmes *pb.Message
}
defer s.Close()
cr := ctxutil.NewReader(ctx, s) // ok to use. we defer close stream in this func
cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func
cr := ctxio.NewReader(ctx, s) // ok to use. we defer close stream in this func
cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
r := ggio.NewDelimitedReader(cr, inet.MessageSizeMax)
w := ggio.NewDelimitedWriter(cw)
@ -116,7 +115,7 @@ func (dht *IpfsDHT) sendMessage(ctx context.Context, p peer.ID, pmes *pb.Message
}
defer s.Close()
cw := ctxutil.NewWriter(ctx, s) // ok to use. we defer close stream in this func
cw := ctxio.NewWriter(ctx, s) // ok to use. we defer close stream in this func
w := ggio.NewDelimitedWriter(cw)
if err := w.WriteMsg(pmes); err != nil {

View File

@ -3,13 +3,13 @@ package dht
import (
"fmt"
ctxfrac "github.com/ipfs/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-context/frac"
"github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
ci "github.com/ipfs/go-ipfs/p2p/crypto"
peer "github.com/ipfs/go-ipfs/p2p/peer"
routing "github.com/ipfs/go-ipfs/routing"
pb "github.com/ipfs/go-ipfs/routing/dht/pb"
record "github.com/ipfs/go-ipfs/routing/record"
ctxutil "github.com/ipfs/go-ipfs/util/ctx"
)
func (dht *IpfsDHT) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, error) {
@ -22,7 +22,7 @@ func (dht *IpfsDHT) GetPublicKey(ctx context.Context, p peer.ID) (ci.PubKey, err
}
// ok, try the node itself. if they're overwhelmed or slow we can move on.
ctxT, cancelFunc := ctxutil.WithDeadlineFraction(ctx, 0.3)
ctxT, cancelFunc := ctxfrac.WithDeadlineFraction(ctx, 0.3)
defer cancelFunc()
if pk, err := dht.getPublicKeyFromNode(ctx, p); err == nil {
err := dht.peerstore.AddPubKey(p, pk)

View File

@ -1,22 +0,0 @@
package ctxutil
import (
"time"
context "github.com/ipfs/go-ipfs/Godeps/_workspace/src/golang.org/x/net/context"
)
func WithDeadlineFraction(ctx context.Context, fraction float64) (context.Context, context.CancelFunc) {
d, found := ctx.Deadline()
if !found { // no deadline
return context.WithCancel(ctx)
}
left := d.Sub(time.Now())
if left < 0 { // already passed...
return context.WithCancel(ctx)
}
left = time.Duration(float64(left) * fraction)
return context.WithTimeout(ctx, left)
}