mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-01 19:24:14 +08:00
chore(util) remove forward
License: MIT Signed-off-by: Brian Tiger Chow <brian@perfmode.com>
This commit is contained in:
@ -1,34 +0,0 @@
|
|||||||
package async
|
|
||||||
|
|
||||||
import (
|
|
||||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
|
||||||
"github.com/jbenet/go-ipfs/blocks"
|
|
||||||
u "github.com/jbenet/go-ipfs/util"
|
|
||||||
)
|
|
||||||
|
|
||||||
var log = u.Logger("async")
|
|
||||||
|
|
||||||
// ForwardN forwards up to |num| blocks to the returned channel.
|
|
||||||
func ForwardN(ctx context.Context, in <-chan *blocks.Block, num int) <-chan *blocks.Block {
|
|
||||||
out := make(chan *blocks.Block)
|
|
||||||
go func() {
|
|
||||||
defer close(out)
|
|
||||||
for i := 0; i < num; i++ {
|
|
||||||
select {
|
|
||||||
case block, ok := <-in:
|
|
||||||
if !ok {
|
|
||||||
log.Error("Forwarder exiting early!")
|
|
||||||
return // otherwise nil value is forwarded to output
|
|
||||||
}
|
|
||||||
select {
|
|
||||||
case out <- block:
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
return out
|
|
||||||
}
|
|
@ -1,58 +0,0 @@
|
|||||||
package async
|
|
||||||
|
|
||||||
import (
|
|
||||||
"testing"
|
|
||||||
|
|
||||||
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
|
|
||||||
"github.com/jbenet/go-ipfs/blocks"
|
|
||||||
)
|
|
||||||
|
|
||||||
func TestForwardNThenClose(t *testing.T) {
|
|
||||||
const n = 2
|
|
||||||
const buf = 2 * n
|
|
||||||
in := make(chan *blocks.Block, buf)
|
|
||||||
ctx := context.Background()
|
|
||||||
out := ForwardN(ctx, in, n)
|
|
||||||
|
|
||||||
for i := 0; i < buf; i++ {
|
|
||||||
in <- blocks.NewBlock([]byte(""))
|
|
||||||
}
|
|
||||||
|
|
||||||
for i := 0; i < n; i++ {
|
|
||||||
_ = <-out
|
|
||||||
}
|
|
||||||
|
|
||||||
_, ok := <-out // closed
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
t.Fatal("channel still open after receiving n blocks")
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestCloseInput(t *testing.T) {
|
|
||||||
const n = 2
|
|
||||||
in := make(chan *blocks.Block, 0)
|
|
||||||
ctx := context.Background()
|
|
||||||
out := ForwardN(ctx, in, n)
|
|
||||||
|
|
||||||
close(in)
|
|
||||||
_, ok := <-out // closed
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
t.Fatal("input channel closed, but output channel not")
|
|
||||||
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestContextClosedWhenBlockingOnInput(t *testing.T) {
|
|
||||||
const n = 1 // but we won't ever send a block
|
|
||||||
ctx, cancel := context.WithCancel(context.Background())
|
|
||||||
out := ForwardN(ctx, make(chan *blocks.Block), n)
|
|
||||||
|
|
||||||
cancel() // before sending anything
|
|
||||||
_, ok := <-out
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
t.Fail()
|
|
||||||
}
|
|
Reference in New Issue
Block a user