mirror of
https://github.com/ipfs/kubo.git
synced 2025-07-03 04:37:30 +08:00
util: cleaner ByteChanReader.Read
This commit is contained in:
48
util/util.go
48
util/util.go
@ -60,40 +60,28 @@ func NewByteChanReader(in chan []byte) io.Reader {
|
|||||||
return &byteChanReader{in: in}
|
return &byteChanReader{in: in}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (bcr *byteChanReader) Read(b []byte) (int, error) {
|
func (bcr *byteChanReader) Read(output []byte) (int, error) {
|
||||||
if len(bcr.buf) == 0 {
|
remain := output
|
||||||
data, ok := <-bcr.in
|
remainLen := len(output)
|
||||||
if !ok {
|
outputLen := 0
|
||||||
return 0, io.EOF
|
more := false
|
||||||
}
|
next := bcr.buf
|
||||||
bcr.buf = data
|
|
||||||
|
for {
|
||||||
|
n := copy(remain, next)
|
||||||
|
remainLen -= n
|
||||||
|
outputLen += n
|
||||||
|
if remainLen == 0 {
|
||||||
|
bcr.buf = next[n:]
|
||||||
|
return outputLen, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
if len(bcr.buf) >= len(b) {
|
remain = remain[n:]
|
||||||
copy(b, bcr.buf)
|
next, more = <-bcr.in
|
||||||
bcr.buf = bcr.buf[len(b):]
|
if !more {
|
||||||
return len(b), nil
|
return outputLen, io.EOF
|
||||||
}
|
|
||||||
|
|
||||||
copy(b, bcr.buf)
|
|
||||||
b = b[len(bcr.buf):]
|
|
||||||
totread := len(bcr.buf)
|
|
||||||
|
|
||||||
for data := range bcr.in {
|
|
||||||
if len(data) > len(b) {
|
|
||||||
totread += len(b)
|
|
||||||
copy(b, data[:len(b)])
|
|
||||||
bcr.buf = data[len(b):]
|
|
||||||
return totread, nil
|
|
||||||
}
|
|
||||||
copy(b, data)
|
|
||||||
totread += len(data)
|
|
||||||
b = b[len(data):]
|
|
||||||
if len(b) == 0 {
|
|
||||||
return totread, nil
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return totread, io.EOF
|
|
||||||
}
|
}
|
||||||
|
|
||||||
type randGen struct {
|
type randGen struct {
|
||||||
|
@ -2,6 +2,7 @@ package util
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
"bytes"
|
||||||
|
"fmt"
|
||||||
"io/ioutil"
|
"io/ioutil"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
"testing"
|
"testing"
|
||||||
@ -30,22 +31,20 @@ func TestKey(t *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestByteChanReader(t *testing.T) {
|
func TestByteChanReader(t *testing.T) {
|
||||||
data := make([]byte, 1024*1024)
|
|
||||||
r := NewTimeSeededRand()
|
var data bytes.Buffer
|
||||||
r.Read(data)
|
|
||||||
dch := make(chan []byte, 8)
|
dch := make(chan []byte, 8)
|
||||||
|
randr := NewTimeSeededRand()
|
||||||
|
|
||||||
go func() {
|
go func() {
|
||||||
beg := 0
|
defer close(dch)
|
||||||
for i := 0; i < len(data); {
|
for i := 0; i < rand.Intn(100)+100; i++ {
|
||||||
i += rand.Intn(100) + 1
|
chunk := make([]byte, rand.Intn(100000)+10)
|
||||||
if i > len(data) {
|
randr.Read(chunk)
|
||||||
i = len(data)
|
data.Write(chunk)
|
||||||
|
fmt.Printf("chunk: %6.d %v\n", len(chunk), chunk[:10])
|
||||||
|
dch <- chunk
|
||||||
}
|
}
|
||||||
dch <- data[beg:i]
|
|
||||||
beg = i
|
|
||||||
}
|
|
||||||
close(dch)
|
|
||||||
}()
|
}()
|
||||||
|
|
||||||
read := NewByteChanReader(dch)
|
read := NewByteChanReader(dch)
|
||||||
@ -54,7 +53,8 @@ func TestByteChanReader(t *testing.T) {
|
|||||||
t.Fatal(err)
|
t.Fatal(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !bytes.Equal(out, data) {
|
// fmt.Printf("lens: %d == %d\n", len(out), len(data.Bytes()))
|
||||||
|
if !bytes.Equal(out, data.Bytes()) {
|
||||||
t.Fatal("Reader failed to stream correct bytes")
|
t.Fatal("Reader failed to stream correct bytes")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Reference in New Issue
Block a user