diff --git a/Godeps/Godeps.json b/Godeps/Godeps.json index 532d9f3ba..76c9cfea1 100644 --- a/Godeps/Godeps.json +++ b/Godeps/Godeps.json @@ -81,6 +81,10 @@ "ImportPath": "github.com/gorilla/mux", "Rev": "4b8fbc56f3b2400a7c7ea3dba9b3539787c486b6" }, + { + "ImportPath": "github.com/h2so5/utp", + "Rev": "654d875bb65e96729678180215cf080fe2810371" + }, { "ImportPath": "github.com/inconshreveable/go-update", "Rev": "221d034a558b4c21b0624b2a450c076913854a57" @@ -112,7 +116,7 @@ }, { "ImportPath": "github.com/jbenet/go-multiaddr-net", - "Rev": "625fac6e5073702ac9cd6028b9dc2457fd9cbf9d" + "Rev": "b6265d8119558acf3912db44abb34d97c30c3220" }, { "ImportPath": "github.com/jbenet/go-multihash", diff --git a/Godeps/_workspace/src/github.com/h2so5/utp/.gitignore b/Godeps/_workspace/src/github.com/h2so5/utp/.gitignore new file mode 100644 index 000000000..485e6d2e2 --- /dev/null +++ b/Godeps/_workspace/src/github.com/h2so5/utp/.gitignore @@ -0,0 +1,26 @@ +# Compiled Object files, Static and Dynamic libs (Shared Objects) +*.o +*.a +*.so + +# Folders +_obj +_test + +# Architecture specific extensions/prefixes +*.[568vq] +[568vq].out + +*.cgo1.go +*.cgo2.c +_cgo_defun.c +_cgo_gotypes.go +_cgo_export.* + +_testmain.go + +*.exe +*.test +*.prof + +_ucat_test/libutp diff --git a/Godeps/_workspace/src/github.com/h2so5/utp/.travis.yml b/Godeps/_workspace/src/github.com/h2so5/utp/.travis.yml new file mode 100644 index 000000000..935bde506 --- /dev/null +++ b/Godeps/_workspace/src/github.com/h2so5/utp/.travis.yml @@ -0,0 +1,7 @@ +language: go + +script: + - GO_UTP_LOGGING=2 go test -v -bench . + - go test -v -race + - GO_UTP_LOGGING=2 go run benchmark/main.go -h + - GO_UTP_LOGGING=2 cd _ucat_test; make test diff --git a/Godeps/_workspace/src/github.com/h2so5/utp/LICENSE b/Godeps/_workspace/src/github.com/h2so5/utp/LICENSE new file mode 100644 index 000000000..8ef628adc --- /dev/null +++ b/Godeps/_workspace/src/github.com/h2so5/utp/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright (c) 2014 Ron Hashimoto + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/Godeps/_workspace/src/github.com/h2so5/utp/README.md b/Godeps/_workspace/src/github.com/h2so5/utp/README.md new file mode 100644 index 000000000..8fca2b3e6 --- /dev/null +++ b/Godeps/_workspace/src/github.com/h2so5/utp/README.md @@ -0,0 +1,57 @@ +utp +=== + +μTP (Micro Transport Protocol) implementation + +[![Build status](https://ci.appveyor.com/api/projects/status/j1be8y7p6nd2wqqw?svg=true)](https://ci.appveyor.com/project/h2so5/utp) +[![Build Status](https://travis-ci.org/h2so5/utp.svg)](https://travis-ci.org/h2so5/utp) +[![GoDoc](https://godoc.org/github.com/h2so5/utp?status.svg)](http://godoc.org/github.com/h2so5/utp) + +http://www.bittorrent.org/beps/bep_0029.html + +**warning: This is a buggy alpha version.** + +## Benchmark History + +[![Benchmark status](http://107.170.244.57:80/go-utp-bench.php)]() + +## Installation + +``` +go get github.com/h2so5/utp +``` + +## Example + +Echo server + +```go +package main + +import ( + "time" + + "github.com/h2so5/utp" +) + +func main() { + ln, _ := utp.Listen("utp", ":11000") + defer ln.Close() + + conn, _ := ln.AcceptUTP() + conn.SetKeepAlive(time.Minute) + defer conn.Close() + + for { + var buf [1024]byte + l, err := conn.Read(buf[:]) + if err != nil { + break + } + _, err = conn.Write(buf[:l]) + if err != nil { + break + } + } +} +``` diff --git a/Godeps/_workspace/src/github.com/h2so5/utp/addr.go b/Godeps/_workspace/src/github.com/h2so5/utp/addr.go new file mode 100644 index 000000000..bb5af1d10 --- /dev/null +++ b/Godeps/_workspace/src/github.com/h2so5/utp/addr.go @@ -0,0 +1,34 @@ +package utp + +import "net" + +type UTPAddr struct { + net.Addr +} + +func (a UTPAddr) Network() string { return "utp" } + +func utp2udp(n string) (string, error) { + switch n { + case "utp": + return "udp", nil + case "utp4": + return "udp4", nil + case "utp6": + return "udp6", nil + default: + return "", net.UnknownNetworkError(n) + } +} + +func ResolveUTPAddr(n, addr string) (*UTPAddr, error) { + udpnet, err := utp2udp(n) + if err != nil { + return nil, err + } + udp, err := net.ResolveUDPAddr(udpnet, addr) + if err != nil { + return nil, err + } + return &UTPAddr{Addr: udp}, nil +} diff --git a/Godeps/_workspace/src/github.com/h2so5/utp/benchmark/main.go b/Godeps/_workspace/src/github.com/h2so5/utp/benchmark/main.go new file mode 100644 index 000000000..fb29e6a72 --- /dev/null +++ b/Godeps/_workspace/src/github.com/h2so5/utp/benchmark/main.go @@ -0,0 +1,276 @@ +package main + +import ( + "bytes" + "crypto/md5" + "flag" + "fmt" + "io" + "log" + "math/rand" + "sync" + "time" + + "github.com/davecheney/profile" + "github.com/dustin/go-humanize" + "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/h2so5/utp" +) + +type RandReader struct{} + +func (r RandReader) Read(p []byte) (n int, err error) { + for i := range p { + p[i] = byte(rand.Int()) + } + return len(p), nil +} + +type ByteCounter struct { + n int64 + mutex sync.RWMutex +} + +func (b *ByteCounter) Write(p []byte) (n int, err error) { + b.mutex.Lock() + defer b.mutex.Unlock() + b.n += int64(len(p)) + return len(p), nil +} + +func (b *ByteCounter) Length() int64 { + b.mutex.RLock() + defer b.mutex.RUnlock() + return b.n +} + +var h = flag.Bool("h", false, "Human readable") + +func main() { + var l = flag.Int("c", 10485760, "Payload length (bytes)") + var s = flag.Bool("s", false, "Stream mode(Low memory usage, but Slow)") + flag.Parse() + + defer profile.Start(profile.CPUProfile).Stop() + + if *h { + fmt.Printf("Payload: %s\n", humanize.IBytes(uint64(*l))) + } else { + fmt.Printf("Payload: %d\n", *l) + } + + c2s := c2s(int64(*l), *s) + n, p := humanize.ComputeSI(c2s) + if *h { + fmt.Printf("C2S: %f%sbps\n", n, p) + } else { + fmt.Printf("C2S: %f\n", c2s) + } + + s2c := s2c(int64(*l), *s) + n, p = humanize.ComputeSI(s2c) + if *h { + fmt.Printf("S2C: %f%sbps\n", n, p) + } else { + fmt.Printf("S2C: %f\n", s2c) + } + + avg := (c2s + s2c) / 2.0 + n, p = humanize.ComputeSI(avg) + + if *h { + fmt.Printf("AVG: %f%sbps\n", n, p) + } else { + fmt.Printf("AVG: %f\n", avg) + } +} + +func c2s(l int64, stream bool) float64 { + ln, err := utp.Listen("utp", "127.0.0.1:0") + if err != nil { + log.Fatal(err) + } + + raddr, err := utp.ResolveUTPAddr("utp", ln.Addr().String()) + if err != nil { + log.Fatal(err) + } + + c, err := utp.DialUTPTimeout("utp", nil, raddr, 1000*time.Millisecond) + if err != nil { + log.Fatal(err) + } + defer c.Close() + + if err != nil { + log.Fatal(err) + } + + s, err := ln.Accept() + if err != nil { + log.Fatal(err) + } + defer s.Close() + ln.Close() + + rch := make(chan int) + + sendHash := md5.New() + readHash := md5.New() + counter := ByteCounter{} + + var bps float64 + if stream { + go func() { + defer c.Close() + io.Copy(io.MultiWriter(c, sendHash, &counter), io.LimitReader(RandReader{}, l)) + }() + + go func() { + io.Copy(readHash, s) + close(rch) + }() + + go func() { + for { + select { + case <-time.After(time.Second): + if *h { + fmt.Printf("\r <--> %s ", humanize.IBytes(uint64(counter.Length()))) + } else { + fmt.Printf("\r <--> %d ", counter.Length()) + } + case <-rch: + fmt.Printf("\r") + return + } + } + }() + + start := time.Now() + <-rch + bps = float64(l*8) / (float64(time.Now().Sub(start)) / float64(time.Second)) + + } else { + var sendBuf, readBuf bytes.Buffer + io.Copy(io.MultiWriter(&sendBuf, sendHash), io.LimitReader(RandReader{}, l)) + + go func() { + defer c.Close() + io.Copy(c, &sendBuf) + }() + + go func() { + io.Copy(&readBuf, s) + rch <- 0 + }() + + start := time.Now() + <-rch + bps = float64(l*8) / (float64(time.Now().Sub(start)) / float64(time.Second)) + + io.Copy(sendHash, &sendBuf) + io.Copy(readHash, &readBuf) + } + + if !bytes.Equal(sendHash.Sum(nil), readHash.Sum(nil)) { + log.Fatal("Broken payload") + } + + return bps +} + +func s2c(l int64, stream bool) float64 { + ln, err := utp.Listen("utp", "127.0.0.1:0") + if err != nil { + log.Fatal(err) + } + + raddr, err := utp.ResolveUTPAddr("utp", ln.Addr().String()) + if err != nil { + log.Fatal(err) + } + + c, err := utp.DialUTPTimeout("utp", nil, raddr, 1000*time.Millisecond) + if err != nil { + log.Fatal(err) + } + defer c.Close() + + if err != nil { + log.Fatal(err) + } + + s, err := ln.Accept() + if err != nil { + log.Fatal(err) + } + defer s.Close() + ln.Close() + + rch := make(chan int) + + sendHash := md5.New() + readHash := md5.New() + counter := ByteCounter{} + + var bps float64 + + if stream { + go func() { + defer s.Close() + io.Copy(io.MultiWriter(s, sendHash, &counter), io.LimitReader(RandReader{}, l)) + }() + + go func() { + io.Copy(readHash, c) + close(rch) + }() + + go func() { + for { + select { + case <-time.After(time.Second): + if *h { + fmt.Printf("\r <--> %s ", humanize.IBytes(uint64(counter.Length()))) + } else { + fmt.Printf("\r <--> %d ", counter.Length()) + } + case <-rch: + fmt.Printf("\r") + return + } + } + }() + + start := time.Now() + <-rch + bps = float64(l*8) / (float64(time.Now().Sub(start)) / float64(time.Second)) + + } else { + var sendBuf, readBuf bytes.Buffer + io.Copy(io.MultiWriter(&sendBuf, sendHash), io.LimitReader(RandReader{}, l)) + + go func() { + defer s.Close() + io.Copy(s, &sendBuf) + }() + + go func() { + io.Copy(&readBuf, c) + rch <- 0 + }() + + start := time.Now() + <-rch + bps = float64(l*8) / (float64(time.Now().Sub(start)) / float64(time.Second)) + + io.Copy(sendHash, &sendBuf) + io.Copy(readHash, &readBuf) + } + + if !bytes.Equal(sendHash.Sum(nil), readHash.Sum(nil)) { + log.Fatal("Broken payload") + } + + return bps +} diff --git a/Godeps/_workspace/src/github.com/h2so5/utp/buffer.go b/Godeps/_workspace/src/github.com/h2so5/utp/buffer.go new file mode 100644 index 000000000..d889b5b30 --- /dev/null +++ b/Godeps/_workspace/src/github.com/h2so5/utp/buffer.go @@ -0,0 +1,230 @@ +package utp + +import ( + "errors" + "math" + "time" +) + +type packetBuffer struct { + root *packetBufferNode + size int + begin int +} + +type packetBufferNode struct { + p *packet + next *packetBufferNode + pushed time.Time +} + +func newPacketBuffer(size, begin int) *packetBuffer { + return &packetBuffer{ + size: size, + begin: begin, + } +} + +func (b *packetBuffer) push(p *packet) error { + if int(p.header.seq) > b.begin+b.size-1 { + return errors.New("out of bounds") + } else if int(p.header.seq) < b.begin { + if int(p.header.seq)+math.MaxUint16 > b.begin+b.size-1 { + return errors.New("out of bounds") + } + } + if b.root == nil { + b.root = &packetBufferNode{} + } + n := b.root + i := b.begin + for { + if i == int(p.header.seq) { + n.p = p + n.pushed = time.Now() + return nil + } else if n.next == nil { + n.next = &packetBufferNode{} + } + n = n.next + i = (i + 1) % (math.MaxUint16 + 1) + } + return nil +} + +func (b *packetBuffer) fetch(id uint16) *packet { + for p := b.root; p != nil; p = p.next { + if p.p != nil { + if p.p.header.seq < id { + p.p = nil + } else if p.p.header.seq == id { + r := p.p + p.p = nil + return r + } + } + } + return nil +} + +func (b *packetBuffer) compact() { + for b.root != nil && b.root.p == nil { + b.root = b.root.next + b.begin = (b.begin + 1) % (math.MaxUint16 + 1) + } +} + +func (b *packetBuffer) first() *packet { + if b.root == nil || b.root.p == nil { + return nil + } + return b.root.p +} + +func (b *packetBuffer) frontPushedTime() (time.Time, error) { + if b.root == nil || b.root.p == nil { + return time.Time{}, errors.New("no first packet") + } + return b.root.pushed, nil +} + +func (b *packetBuffer) fetchSequence() []*packet { + var a []*packet + for ; b.root != nil && b.root.p != nil; b.root = b.root.next { + a = append(a, b.root.p) + b.begin = (b.begin + 1) % (math.MaxUint16 + 1) + } + return a +} + +func (b *packetBuffer) sequence() []*packet { + var a []*packet + n := b.root + for ; n != nil && n.p != nil; n = n.next { + a = append(a, n.p) + } + return a +} + +func (b *packetBuffer) space() int { + s := b.size + for p := b.root; p != nil; p = p.next { + s-- + } + return s +} + +func (b *packetBuffer) empty() bool { + return b.root == nil +} + +// test use only +func (b *packetBuffer) all() []*packet { + var a []*packet + for p := b.root; p != nil; p = p.next { + if p.p != nil { + a = append(a, p.p) + } + } + return a +} + +func (b *packetBuffer) generateSelectiveACK() []byte { + if b.empty() { + return nil + } + + var ack []byte + var bit uint + var octet byte + for p := b.root.next; p != nil; p = p.next { + if p.p != nil { + octet |= (1 << bit) + } + bit++ + if bit == 8 { + ack = append(ack, octet) + bit = 0 + octet = 0 + } + } + + if bit != 0 { + ack = append(ack, octet) + } + + for len(ack) > 0 && ack[len(ack)-1] == 0 { + ack = ack[:len(ack)-1] + } + + return ack +} + +func (b *packetBuffer) processSelectiveACK(ack []byte) { + if b.empty() { + return + } + + p := b.root.next + if p == nil { + return + } + + for _, a := range ack { + for i := 0; i < 8; i++ { + acked := (a & 1) != 0 + a >>= 1 + if acked { + p.p = nil + } + p = p.next + if p == nil { + return + } + } + } +} + +type timedBuffer struct { + d time.Duration + root *timedBufferNode +} + +type timedBufferNode struct { + val float64 + next *timedBufferNode + pushed time.Time +} + +func (b *timedBuffer) push(val float64) { + var before *timedBufferNode + for n := b.root; n != nil; n = n.next { + if time.Now().Sub(n.pushed) >= b.d { + if before != nil { + before.next = nil + } else { + b.root = nil + } + break + } + before = n + } + b.root = &timedBufferNode{ + val: val, + next: b.root, + pushed: time.Now(), + } +} + +func (b *timedBuffer) min() float64 { + if b.root == nil { + return 0 + } + min := b.root.val + for n := b.root; n != nil; n = n.next { + if min > n.val { + min = n.val + } + } + return min +} diff --git a/Godeps/_workspace/src/github.com/h2so5/utp/conn.go b/Godeps/_workspace/src/github.com/h2so5/utp/conn.go new file mode 100644 index 000000000..ee41e973d --- /dev/null +++ b/Godeps/_workspace/src/github.com/h2so5/utp/conn.go @@ -0,0 +1,761 @@ +package utp + +import ( + "bytes" + "errors" + "io" + "math" + "math/rand" + "net" + "syscall" + "time" +) + +type UTPConn struct { + conn net.PacketConn + raddr net.Addr + rid, sid, seq, ack, lastAck uint16 + rtt, rttVar, minRtt, rto, dupAck int64 + diff, maxWindow uint32 + rdeadline, wdeadline time.Time + + state state + lastTimedOut time.Time + + outch chan *outgoingPacket + outchch chan int + sendch chan *outgoingPacket + sendchch chan int + recvch chan *packet + recvchch chan int + readch chan []byte + readchch chan int + winch chan uint32 + quitch chan int + activech chan int + connch chan error + finch chan int + closech chan<- uint16 + eofid uint16 + keepalivech chan time.Duration + + readbuf bytes.Buffer + recvbuf *packetBuffer + sendbuf *packetBuffer + + stat statistics +} + +type statistics struct { + sentPackets int + resentPackets int + receivedPackets int + receivedDuplicatedACKs int + packetTimedOuts int + sentSelectiveACKs int + receivedSelectiveACKs int + + rtoSum int + rtoCount int +} + +func dial(n string, laddr, raddr *UTPAddr, timeout time.Duration) (*UTPConn, error) { + udpnet, err := utp2udp(n) + if err != nil { + return nil, err + } + + // TODO extract + if laddr == nil { + addr, err := net.ResolveUDPAddr(udpnet, ":0") + if err != nil { + return nil, err + } + laddr = &UTPAddr{Addr: addr} + } + + conn, err := net.ListenPacket(udpnet, laddr.Addr.String()) + if err != nil { + return nil, err + } + + id := uint16(rand.Intn(math.MaxUint16)) + + c := newUTPConn() + c.conn = conn + c.raddr = raddr.Addr + c.rid = id + c.sid = id + 1 + c.seq = 1 + c.state = state_syn_sent + c.sendbuf = newPacketBuffer(window_size, 1) + + go c.recv() + go c.loop() + + select { + case c.sendch <- &outgoingPacket{st_syn, nil, nil}: + case <-c.sendchch: + return nil, errors.New("use of closed network connection") + } + + var t <-chan time.Time + if timeout != 0 { + t = time.After(timeout) + } + + select { + case err := <-c.connch: + if err != nil { + c.closed() + return nil, err + } + ulog.Printf(1, "Conn(%v): Connected", c.LocalAddr()) + return c, nil + case <-t: + c.quitch <- 0 + return nil, &timeoutError{} + } +} + +func newUTPConn() *UTPConn { + rto := 60 + + return &UTPConn{ + minRtt: math.MaxInt64, + maxWindow: mtu, + rto: int64(rto), + + outch: make(chan *outgoingPacket, 1), + outchch: make(chan int), + sendch: make(chan *outgoingPacket, 1), + sendchch: make(chan int), + recvch: make(chan *packet, 2), + recvchch: make(chan int), + winch: make(chan uint32, 1), + quitch: make(chan int), + activech: make(chan int), + readch: make(chan []byte, 1), + readchch: make(chan int), + connch: make(chan error, 1), + finch: make(chan int, 1), + + keepalivech: make(chan time.Duration), + + stat: statistics{ + rtoSum: rto, + rtoCount: 1, + }, + } +} + +func (c *UTPConn) ok() bool { return c != nil && c.conn != nil } + +func (c *UTPConn) Close() error { + if !c.ok() { + return syscall.EINVAL + } + + select { + case <-c.activech: + default: + c.quitch <- 0 + ulog.Printf(2, "Conn(%v): Wait for close", c.LocalAddr()) + <-c.finch + } + + return nil +} + +func (c *UTPConn) LocalAddr() net.Addr { + return &UTPAddr{Addr: c.conn.LocalAddr()} +} + +func (c *UTPConn) RemoteAddr() net.Addr { + return &UTPAddr{Addr: c.raddr} +} + +func (c *UTPConn) Read(b []byte) (int, error) { + if !c.ok() { + return 0, syscall.EINVAL + } + + if c.readbuf.Len() == 0 { + var timeout <-chan time.Time + if !c.rdeadline.IsZero() { + timeout = time.After(c.rdeadline.Sub(time.Now())) + } + + select { + case b := <-c.readch: + if b == nil { + return 0, io.EOF + } + _, err := c.readbuf.Write(b) + if err != nil { + return 0, err + } + case <-c.readchch: + loop: + for { + select { + case b := <-c.readch: + _, err := c.readbuf.Write(b) + if err != nil { + return 0, err + } + default: + break loop + } + } + if c.readbuf.Len() == 0 { + return 0, io.EOF + } + case <-timeout: + return 0, &timeoutError{} + } + } + return c.readbuf.Read(b) +} + +func (c *UTPConn) Write(b []byte) (int, error) { + if !c.ok() { + return 0, syscall.EINVAL + } + + var wrote uint64 + for { + l := uint64(len(b)) - wrote + if l > mss { + l = mss + } + select { + case c.outch <- &outgoingPacket{st_data, nil, b[wrote : wrote+l]}: + case <-c.outchch: + return 0, errors.New("use of closed network connection") + } + + wrote += l + ulog.Printf(4, "Conn(%v): Write %d/%d bytes", c.LocalAddr(), wrote, len(b)) + if l < mss { + break + } + } + + return len(b), nil +} + +func (c *UTPConn) SetDeadline(t time.Time) error { + if !c.ok() { + return syscall.EINVAL + } + if err := c.SetReadDeadline(t); err != nil { + return err + } + if err := c.SetWriteDeadline(t); err != nil { + return err + } + return nil +} + +func (c *UTPConn) SetReadDeadline(t time.Time) error { + if !c.ok() { + return syscall.EINVAL + } + c.rdeadline = t + return nil +} + +func (c *UTPConn) SetWriteDeadline(t time.Time) error { + if !c.ok() { + return syscall.EINVAL + } + c.wdeadline = t + return nil +} + +func (c *UTPConn) SetKeepAlive(d time.Duration) error { + if !c.ok() { + return syscall.EINVAL + } + select { + case <-c.activech: + default: + c.keepalivech <- d + } + return nil +} + +func readPacket(data []byte) (*packet, error) { + p := globalPool.get() + err := p.UnmarshalBinary(data) + if err != nil { + return nil, err + } + if p.header.ver != version { + return nil, errors.New("unsupported header version") + } + return p, nil +} + +func (c *UTPConn) recv() { + for { + var buf [mtu]byte + len, addr, err := c.conn.ReadFrom(buf[:]) + if err != nil { + return + } + if addr.String() != c.raddr.String() { + continue + } + p, err := readPacket(buf[:len]) + if err == nil { + select { + case c.recvch <- p: + case <-c.recvchch: + return + } + } + } +} + +func (c *UTPConn) loop() { + var recvExit, sendExit bool + var lastReceived time.Time + var keepalive <-chan time.Time + + go func() { + var window uint32 = window_size * mtu + for { + if window >= mtu { + select { + case b := <-c.outch: + select { + case c.sendch <- b: + window -= mtu + case <-c.sendchch: + return + } + case <-c.outchch: + return + case w := <-c.winch: + window = w + } + } else { + window = <-c.winch + } + } + }() + + for { + select { + case <-c.sendchch: + sendExit = true + default: + } + select { + case <-c.recvchch: + recvExit = true + default: + } + select { + case p := <-c.recvch: + ack := c.processPacket(p) + lastReceived = time.Now() + if ack { + out := &outgoingPacket{st_state, nil, nil} + selack := c.sendbuf.generateSelectiveACK() + if len(selack) > 0 { + out.ext = []extension{ + extension{ + typ: ext_selective_ack, + payload: selack, + }, + } + c.stat.sentSelectiveACKs++ + } + c.sendPacket(out) + } + + case b := <-c.sendch: + c.sendPacket(b) + + case <-time.After(time.Duration(c.rto) * time.Millisecond): + if !c.state.active && time.Now().Sub(lastReceived) > reset_timeout { + ulog.Printf(2, "Conn(%v): Connection timed out", c.LocalAddr()) + c.sendPacket(&outgoingPacket{st_reset, nil, nil}) + c.close() + } else { + t, err := c.sendbuf.frontPushedTime() + if err == nil && c.lastTimedOut != t && time.Now().Sub(t) > time.Duration(c.rto)*time.Millisecond { + c.lastTimedOut = t + c.stat.packetTimedOuts++ + c.maxWindow /= 2 + if c.maxWindow < mtu { + c.maxWindow = mtu + } + for _, p := range c.sendbuf.sequence() { + c.resendPacket(p) + } + } + } + case d := <-c.keepalivech: + if d <= 0 { + keepalive = nil + } else { + keepalive = time.Tick(d) + } + case <-keepalive: + ulog.Printf(2, "Conn(%v): Send keepalive", c.LocalAddr()) + c.sendPacket(&outgoingPacket{st_state, nil, nil}) + + case <-c.quitch: + if c.state.exit != nil { + c.state.exit(c) + } + } + if recvExit && sendExit { + return + } + } +} + +func (c *UTPConn) sendPacket(b *outgoingPacket) { + p := c.makePacket(b) + bin, err := p.MarshalBinary() + if err == nil { + ulog.Printf(3, "SEND %v -> %v: %v", c.conn.LocalAddr(), c.raddr, p.String()) + c.stat.sentPackets++ + _, err = c.conn.WriteTo(bin, c.raddr) + if err != nil { + return + } + if b.typ != st_state { + c.sendbuf.push(p) + } else { + globalPool.put(p) + } + } +} + +func (c *UTPConn) resendPacket(p *packet) { + bin, err := p.MarshalBinary() + if err == nil { + ulog.Printf(3, "RESEND %v -> %v: %v", c.conn.LocalAddr(), c.raddr, p.String()) + c.stat.resentPackets++ + _, err = c.conn.WriteTo(bin, c.raddr) + if err != nil { + return + } + } +} + +func currentMicrosecond() uint32 { + return uint32(time.Now().Nanosecond() / 1000) +} + +func (c *UTPConn) processPacket(p *packet) bool { + var ack bool + + if p.header.t == 0 { + c.diff = 0 + } else { + t := currentMicrosecond() + if t > p.header.t { + c.diff = t - p.header.t + if c.minRtt > int64(c.diff) { + c.minRtt = int64(c.diff) + } + } + } + + ulog.Printf(3, "RECV %v -> %v: %v", c.raddr, c.conn.LocalAddr(), p.String()) + c.stat.receivedPackets++ + + if p.header.typ == st_state { + + f := c.sendbuf.first() + if f != nil && p.header.ack == f.header.seq { + for _, e := range p.ext { + if e.typ == ext_selective_ack { + ulog.Printf(3, "Conn(%v): Receive Selective ACK", c.LocalAddr()) + c.stat.receivedSelectiveACKs++ + c.sendbuf.processSelectiveACK(e.payload) + } + } + } + + s := c.sendbuf.fetch(p.header.ack) + if s != nil { + current := currentMicrosecond() + if current > s.header.t { + e := int64(current-s.header.t) / 1000 + if c.rtt == 0 { + c.rtt = e + c.rttVar = e / 2 + } else { + d := c.rtt - e + if d < 0 { + d = -d + } + c.rttVar += (d - c.rttVar) / 4 + c.rtt = c.rtt - c.rtt/8 + e/8 + } + c.rto = c.rtt + c.rttVar*4 + if c.rto < 60 { + c.rto = 60 + } else if c.rto > 1000 { + c.rto = 1000 + } + c.stat.rtoSum += int(c.rto) + c.stat.rtoCount++ + } + + if c.diff != 0 { + ourDelay := float64(c.diff) + offTarget := 100000.0 - ourDelay + windowFactor := float64(mtu) / float64(c.maxWindow) + delayFactor := offTarget / 100000.0 + gain := 3000.0 * delayFactor * windowFactor + c.maxWindow = uint32(int(c.maxWindow) + int(gain)) + if c.maxWindow < mtu { + c.maxWindow = mtu + } + ulog.Printf(4, "Conn(%v): Update maxWindow: %d", c.LocalAddr(), c.maxWindow) + } + globalPool.put(s) + } + c.sendbuf.compact() + if c.lastAck == p.header.ack { + c.dupAck++ + if c.dupAck >= 2 { + ulog.Printf(3, "Conn(%v): Receive 3 duplicated acks: %d", c.LocalAddr(), p.header.ack) + c.stat.receivedDuplicatedACKs++ + p := c.sendbuf.first() + if p != nil { + c.maxWindow /= 2 + if c.maxWindow < mtu { + c.maxWindow = mtu + } + ulog.Printf(4, "Conn(%v): Update maxWindow: %d", c.LocalAddr(), c.maxWindow) + c.resendPacket(p) + } + c.dupAck = 0 + } + } else { + c.dupAck = 0 + } + c.lastAck = p.header.ack + if p.header.ack == c.seq-1 { + wnd := p.header.wnd + if wnd > c.maxWindow { + wnd = c.maxWindow + } + ulog.Printf(4, "Conn(%v): Reset window: %d", c.LocalAddr(), wnd) + go func() { + c.winch <- wnd + }() + } + if c.state.state != nil { + c.state.state(c, p) + } + globalPool.put(p) + } else if p.header.typ == st_reset { + globalPool.put(p) + c.close() + } else { + if c.recvbuf == nil { + return false + } + ack = true + c.recvbuf.push(p) + for _, s := range c.recvbuf.fetchSequence() { + c.ack = s.header.seq + switch s.header.typ { + case st_data: + if c.state.data != nil { + c.state.data(c, s) + } + case st_fin: + if c.state.fin != nil { + c.state.fin(c, s) + } + case st_state: + if c.state.state != nil { + c.state.state(c, s) + } + } + globalPool.put(s) + } + } + return ack +} + +func (c *UTPConn) makePacket(b *outgoingPacket) *packet { + wnd := window_size * mtu + if c.recvbuf != nil { + wnd = c.recvbuf.space() * mtu + } + id := c.sid + if b.typ == st_syn { + id = c.rid + } + p := globalPool.get() + p.header.typ = b.typ + p.header.ver = version + p.header.id = id + p.header.t = currentMicrosecond() + p.header.diff = c.diff + p.header.wnd = uint32(wnd) + p.header.seq = c.seq + p.header.ack = c.ack + if b.typ == st_fin { + c.eofid = c.seq + } + if !(b.typ == st_state && len(b.payload) == 0) { + c.seq++ + } + p.payload = p.payload[:len(b.payload)] + copy(p.payload, b.payload) + return p +} + +func (c *UTPConn) close() { + if !c.state.closed { + close(c.outchch) + close(c.readchch) + close(c.sendchch) + close(c.recvchch) + close(c.activech) + close(c.finch) + c.closed() + + // Accepted connection + if c.closech != nil { + c.closech <- c.sid + } else { + c.conn.Close() + } + + ulog.Printf(1, "Conn(%v): Closed", c.LocalAddr()) + ulog.Printf(1, "Conn(%v): * SentPackets: %d", c.LocalAddr(), c.stat.sentPackets) + ulog.Printf(1, "Conn(%v): * ResentPackets: %d", c.LocalAddr(), c.stat.resentPackets) + ulog.Printf(1, "Conn(%v): * ReceivedPackets: %d", c.LocalAddr(), c.stat.receivedPackets) + ulog.Printf(1, "Conn(%v): * ReceivedDuplicatedACKs: %d", c.LocalAddr(), c.stat.receivedDuplicatedACKs) + ulog.Printf(1, "Conn(%v): * PacketTimedOuts: %d", c.LocalAddr(), c.stat.packetTimedOuts) + ulog.Printf(1, "Conn(%v): * SentSelectiveACKs: %d", c.LocalAddr(), c.stat.sentSelectiveACKs) + ulog.Printf(1, "Conn(%v): * ReceivedSelectiveACKs: %d", c.LocalAddr(), c.stat.receivedSelectiveACKs) + ulog.Printf(1, "Conn(%v): * AverageRTO: %d", c.LocalAddr(), c.stat.rtoSum/c.stat.rtoCount) + } +} + +func (c *UTPConn) closed() { + ulog.Printf(2, "Conn(%v): Change state: CLOSED", c.LocalAddr()) + c.state = state_closed +} + +func (c *UTPConn) closing() { + ulog.Printf(2, "Conn(%v): Change state: CLOSING", c.LocalAddr()) + c.state = state_closing +} + +func (c *UTPConn) syn_sent() { + ulog.Printf(2, "Conn(%v): Change state: SYN_SENT", c.LocalAddr()) + c.state = state_syn_sent +} + +func (c *UTPConn) connected() { + ulog.Printf(2, "Conn(%v): Change state: CONNECTED", c.LocalAddr()) + c.state = state_connected +} + +func (c *UTPConn) fin_sent() { + ulog.Printf(2, "Conn(%v): Change state: FIN_SENT", c.LocalAddr()) + c.state = state_fin_sent +} + +type state struct { + data func(c *UTPConn, p *packet) + fin func(c *UTPConn, p *packet) + state func(c *UTPConn, p *packet) + exit func(c *UTPConn) + active bool + closed bool +} + +var state_closed state = state{ + closed: true, +} + +var state_closing state = state{ + data: func(c *UTPConn, p *packet) { + select { + case c.readch <- append([]byte(nil), p.payload...): + case <-c.readchch: + } + if c.recvbuf.empty() && c.sendbuf.empty() { + c.close() + } + }, + state: func(c *UTPConn, p *packet) { + if c.recvbuf.empty() && c.sendbuf.empty() { + c.close() + } + }, +} + +var state_syn_sent state = state{ + state: func(c *UTPConn, p *packet) { + c.recvbuf = newPacketBuffer(window_size, int(p.header.seq)) + c.connected() + c.connch <- nil + }, + exit: func(c *UTPConn) { + go func() { + select { + case c.outch <- &outgoingPacket{st_fin, nil, nil}: + case <-c.outchch: + } + }() + c.fin_sent() + }, + active: true, +} + +var state_connected state = state{ + data: func(c *UTPConn, p *packet) { + select { + case c.readch <- append([]byte(nil), p.payload...): + case <-c.readchch: + } + }, + fin: func(c *UTPConn, p *packet) { + if c.recvbuf.empty() && c.sendbuf.empty() { + c.close() + } else { + c.closing() + } + }, + exit: func(c *UTPConn) { + go func() { + select { + case c.outch <- &outgoingPacket{st_fin, nil, nil}: + case <-c.outchch: + } + }() + c.fin_sent() + }, + active: true, +} + +var state_fin_sent state = state{ + state: func(c *UTPConn, p *packet) { + if p.header.ack == c.eofid { + if c.recvbuf.empty() && c.sendbuf.empty() { + c.close() + } else { + c.closing() + } + } + }, +} diff --git a/Godeps/_workspace/src/github.com/h2so5/utp/dial.go b/Godeps/_workspace/src/github.com/h2so5/utp/dial.go new file mode 100644 index 000000000..10e76fab8 --- /dev/null +++ b/Godeps/_workspace/src/github.com/h2so5/utp/dial.go @@ -0,0 +1,68 @@ +package utp + +import ( + "errors" + "net" + "time" +) + +func Dial(n, addr string) (*UTPConn, error) { + raddr, err := ResolveUTPAddr(n, addr) + if err != nil { + return nil, err + } + return DialUTP(n, nil, raddr) +} + +func DialUTP(n string, laddr, raddr *UTPAddr) (*UTPConn, error) { + return dial(n, laddr, raddr, 0) +} + +func DialUTPTimeout(n string, laddr, raddr *UTPAddr, timeout time.Duration) (*UTPConn, error) { + return dial(n, laddr, raddr, timeout) +} + +// A Dialer contains options for connecting to an address. +// +// The zero value for each field is equivalent to dialing without +// that option. Dialing with the zero value of Dialer is therefore +// equivalent to just calling the Dial function. +type Dialer struct { + // Timeout is the maximum amount of time a dial will wait for + // a connect to complete. If Deadline is also set, it may fail + // earlier. + // + // The default is no timeout. + // + // With or without a timeout, the operating system may impose + // its own earlier timeout. For instance, TCP timeouts are + // often around 3 minutes. + Timeout time.Duration + + // LocalAddr is the local address to use when dialing an + // address. The address must be of a compatible type for the + // network being dialed. + // If nil, a local address is automatically chosen. + LocalAddr net.Addr +} + +// Dial connects to the address on the named network. +// +// See func Dial for a description of the network and address parameters. +func (d *Dialer) Dial(n, addr string) (*UTPConn, error) { + raddr, err := ResolveUTPAddr(n, addr) + if err != nil { + return nil, err + } + + var laddr *UTPAddr + if d.LocalAddr != nil { + var ok bool + laddr, ok = d.LocalAddr.(*UTPAddr) + if !ok { + return nil, errors.New("Dialer.LocalAddr is not a UTPAddr") + } + } + + return DialUTPTimeout(n, laddr, raddr, d.Timeout) +} diff --git a/Godeps/_workspace/src/github.com/h2so5/utp/listener.go b/Godeps/_workspace/src/github.com/h2so5/utp/listener.go new file mode 100644 index 000000000..25eb0ef0f --- /dev/null +++ b/Godeps/_workspace/src/github.com/h2so5/utp/listener.go @@ -0,0 +1,329 @@ +package utp + +import ( + "errors" + "math" + "math/rand" + "net" + "syscall" + "time" +) + +type UTPListener struct { + // RawConn represents an out-of-band connection. + // This allows a single socket to handle multiple protocols. + RawConn net.PacketConn + + conn net.PacketConn + conns map[uint16]*UTPConn + accept chan (*UTPConn) + err chan (error) + lasterr error + deadline time.Time + closech chan int + connch chan uint16 + closed bool +} + +func Listen(n, laddr string) (*UTPListener, error) { + addr, err := ResolveUTPAddr(n, laddr) + if err != nil { + return nil, err + } + return ListenUTP(n, addr) +} + +func ListenUTP(n string, laddr *UTPAddr) (*UTPListener, error) { + udpnet, err := utp2udp(n) + if err != nil { + return nil, err + } + conn, err := listenPacket(udpnet, laddr.Addr.String()) + if err != nil { + return nil, err + } + + l := UTPListener{ + RawConn: newRawConn(conn), + conn: conn, + conns: make(map[uint16]*UTPConn), + accept: make(chan (*UTPConn), 10), + err: make(chan (error), 1), + closech: make(chan int), + connch: make(chan uint16), + lasterr: nil, + } + + l.listen() + return &l, nil +} + +type incoming struct { + p *packet + addr net.Addr +} + +func (l *UTPListener) listen() { + inch := make(chan incoming) + raw := l.RawConn.(*rawConn) + + // reads udp packets + go func() { + for { + var buf [mtu]byte + len, addr, err := l.conn.ReadFrom(buf[:]) + if err != nil { + l.err <- err + return + } + p, err := readPacket(buf[:len]) + if err == nil { + inch <- incoming{p, addr} + } else { + select { + case <-raw.closed: + default: + i := rawIncoming{b: buf[:len], addr: addr} + select { + case raw.in <- i: + default: + // discard the oldest packet + <-raw.in + raw.in <- i + } + } + } + } + }() + + go func() { + for { + select { + case i := <-inch: + l.processPacket(i.p, i.addr) + case <-l.closech: + ulog.Printf(2, "Listener(%v): Stop listening", l.conn.LocalAddr()) + close(l.accept) + l.closed = true + case id := <-l.connch: + if _, ok := l.conns[id]; !ok { + delete(l.conns, id+1) + ulog.Printf(2, "Listener(%v): Connection closed #%d (alive: %d)", l.conn.LocalAddr(), id, len(l.conns)) + if l.closed && len(l.conns) == 0 { + ulog.Printf(2, "Listener(%v): All accepted connections are closed", l.conn.LocalAddr()) + l.conn.Close() + ulog.Printf(1, "Listener(%v): Closed", l.conn.LocalAddr()) + return + } + } + } + } + }() + + ulog.Printf(1, "Listener(%v): Start listening", l.conn.LocalAddr()) +} + +func listenPacket(n, addr string) (net.PacketConn, error) { + if n == "mem" { + return nil, errors.New("TODO implement in-memory packet connection") + } + return net.ListenPacket(n, addr) +} + +func (l *UTPListener) processPacket(p *packet, addr net.Addr) { + switch p.header.typ { + case st_data, st_fin, st_state, st_reset: + if c, ok := l.conns[p.header.id]; ok { + select { + case c.recvch <- p: + case <-c.recvchch: + } + } + case st_syn: + if l.closed { + return + } + sid := p.header.id + 1 + if _, ok := l.conns[p.header.id]; !ok { + seq := rand.Intn(math.MaxUint16) + + c := newUTPConn() + c.conn = l.conn + c.raddr = addr + c.rid = p.header.id + 1 + c.sid = p.header.id + c.seq = uint16(seq) + c.ack = p.header.seq + c.diff = currentMicrosecond() - p.header.t + c.state = state_connected + c.closech = l.connch + c.recvbuf = newPacketBuffer(window_size, int(p.header.seq)) + c.sendbuf = newPacketBuffer(window_size, seq) + + go c.loop() + select { + case c.recvch <- p: + case <-c.recvchch: + } + + l.conns[sid] = c + ulog.Printf(2, "Listener(%v): New incoming connection #%d from %v (alive: %d)", l.conn.LocalAddr(), sid, addr, len(l.conns)) + + l.accept <- c + } + } +} + +func (l *UTPListener) Accept() (net.Conn, error) { + return l.AcceptUTP() +} + +func (l *UTPListener) AcceptUTP() (*UTPConn, error) { + if l == nil || l.conn == nil { + return nil, syscall.EINVAL + } + if l.lasterr != nil { + return nil, l.lasterr + } + var timeout <-chan time.Time + if !l.deadline.IsZero() { + timeout = time.After(l.deadline.Sub(time.Now())) + } + select { + case conn := <-l.accept: + if conn == nil { + return nil, errors.New("use of closed network connection") + } + return conn, nil + case err := <-l.err: + l.lasterr = err + return nil, err + case <-timeout: + return nil, &timeoutError{} + } +} + +func (l *UTPListener) Addr() net.Addr { + return &UTPAddr{Addr: l.conn.LocalAddr()} +} + +func (l *UTPListener) Close() error { + if l == nil || l.conn == nil { + return syscall.EINVAL + } + l.closech <- 0 + l.RawConn.Close() + return nil +} + +func (l *UTPListener) SetDeadline(t time.Time) error { + if l == nil || l.conn == nil { + return syscall.EINVAL + } + l.deadline = t + return nil +} + +type rawIncoming struct { + b []byte + addr net.Addr +} + +type rawConn struct { + conn net.PacketConn + rdeadline, wdeadline time.Time + in chan rawIncoming + closed chan int +} + +func newRawConn(conn net.PacketConn) *rawConn { + return &rawConn{ + conn: conn, + in: make(chan rawIncoming, 100), + closed: make(chan int), + } +} + +func (c *rawConn) ok() bool { return c != nil && c.conn != nil } + +func (c *rawConn) ReadFrom(b []byte) (n int, addr net.Addr, err error) { + if !c.ok() { + return 0, nil, syscall.EINVAL + } + select { + case <-c.closed: + return 0, nil, errors.New("use of closed network connection") + default: + } + var timeout <-chan time.Time + if !c.rdeadline.IsZero() { + timeout = time.After(c.rdeadline.Sub(time.Now())) + } + select { + case r := <-c.in: + return copy(b, r.b), r.addr, nil + case <-timeout: + return 0, nil, &timeoutError{} + } +} + +func (c *rawConn) WriteTo(b []byte, addr net.Addr) (n int, err error) { + if !c.ok() { + return 0, syscall.EINVAL + } + select { + case <-c.closed: + return 0, errors.New("use of closed network connection") + default: + } + return c.conn.WriteTo(b, addr) +} + +func (c *rawConn) Close() error { + if !c.ok() { + return syscall.EINVAL + } + select { + case <-c.closed: + return errors.New("use of closed network connection") + default: + close(c.closed) + } + return nil +} + +func (c *rawConn) LocalAddr() net.Addr { + if !c.ok() { + return nil + } + return c.conn.LocalAddr() +} + +func (c *rawConn) SetDeadline(t time.Time) error { + if !c.ok() { + return syscall.EINVAL + } + if err := c.SetReadDeadline(t); err != nil { + return err + } + if err := c.SetWriteDeadline(t); err != nil { + return err + } + return nil +} + +func (c *rawConn) SetReadDeadline(t time.Time) error { + if !c.ok() { + return syscall.EINVAL + } + c.rdeadline = t + return nil +} + +func (c *rawConn) SetWriteDeadline(t time.Time) error { + if !c.ok() { + return syscall.EINVAL + } + c.wdeadline = t + return nil +} diff --git a/Godeps/_workspace/src/github.com/h2so5/utp/log.go b/Godeps/_workspace/src/github.com/h2so5/utp/log.go new file mode 100644 index 000000000..dbf4ddcd3 --- /dev/null +++ b/Godeps/_workspace/src/github.com/h2so5/utp/log.go @@ -0,0 +1,50 @@ +package utp + +import ( + "log" + "os" + "strconv" +) + +type logger struct { + level int +} + +var ulog *logger + +func init() { + logenv := os.Getenv("GO_UTP_LOGGING") + + var level int + if len(logenv) > 0 { + l, err := strconv.Atoi(logenv) + if err != nil { + log.Print("warning: GO_UTP_LOGGING must be numeric") + } else { + level = l + } + } + + ulog = &logger{level} +} + +func (l *logger) Print(level int, v ...interface{}) { + if l.level < level { + return + } + log.Print(v...) +} + +func (l *logger) Printf(level int, format string, v ...interface{}) { + if l.level < level { + return + } + log.Printf(format, v...) +} + +func (l *logger) Println(level int, v ...interface{}) { + if l.level < level { + return + } + log.Println(v...) +} diff --git a/Godeps/_workspace/src/github.com/h2so5/utp/packet.go b/Godeps/_workspace/src/github.com/h2so5/utp/packet.go new file mode 100644 index 000000000..b23cac108 --- /dev/null +++ b/Godeps/_workspace/src/github.com/h2so5/utp/packet.go @@ -0,0 +1,240 @@ +package utp + +import ( + "bytes" + "encoding/binary" + "fmt" + "io" + "sync" +) + +type header struct { + typ, ver int + id uint16 + t, diff, wnd uint32 + seq, ack uint16 +} + +type extension struct { + typ int + payload []byte +} + +type packet struct { + header header + ext []extension + payload []byte +} + +type outgoingPacket struct { + typ int + ext []extension + payload []byte +} + +func (p *packet) MarshalBinary() ([]byte, error) { + firstExt := ext_none + if len(p.ext) > 0 { + firstExt = p.ext[0].typ + } + buf := new(bytes.Buffer) + var beforeExt = []interface{}{ + // | type | ver | + uint8(((byte(p.header.typ) << 4) & 0xF0) | (byte(p.header.ver) & 0xF)), + // | extension | + uint8(firstExt), + } + var afterExt = []interface{}{ + // | connection_id | + uint16(p.header.id), + // | timestamp_microseconds | + uint32(p.header.t), + // | timestamp_difference_microseconds | + uint32(p.header.diff), + // | wnd_size | + uint32(p.header.wnd), + // | seq_nr | + uint16(p.header.seq), + // | ack_nr | + uint16(p.header.ack), + } + + for _, v := range beforeExt { + err := binary.Write(buf, binary.BigEndian, v) + if err != nil { + return nil, err + } + } + + if len(p.ext) > 0 { + for i, e := range p.ext { + next := ext_none + if i < len(p.ext)-1 { + next = p.ext[i+1].typ + } + var ext = []interface{}{ + // | extension | + uint8(next), + // | len | + uint8(len(e.payload)), + } + for _, v := range ext { + err := binary.Write(buf, binary.BigEndian, v) + if err != nil { + return nil, err + } + } + _, err := buf.Write(e.payload) + if err != nil { + return nil, err + } + } + } + + for _, v := range afterExt { + err := binary.Write(buf, binary.BigEndian, v) + if err != nil { + return nil, err + } + } + + _, err := buf.Write(p.payload) + if err != nil { + return nil, err + } + return buf.Bytes(), nil +} + +func (p *packet) UnmarshalBinary(data []byte) error { + p.ext = nil + buf := bytes.NewReader(data) + var tv, e uint8 + + var beforeExt = []interface{}{ + // | type | ver | + (*uint8)(&tv), + // | extension | + (*uint8)(&e), + } + for _, v := range beforeExt { + err := binary.Read(buf, binary.BigEndian, v) + if err != nil { + return err + } + } + + for e != ext_none { + currentExt := int(e) + var l uint8 + var ext = []interface{}{ + // | extension | + (*uint8)(&e), + // | len | + (*uint8)(&l), + } + for _, v := range ext { + err := binary.Read(buf, binary.BigEndian, v) + if err != nil { + return err + } + } + payload := make([]byte, l) + size, err := buf.Read(payload[:]) + if err != nil { + return err + } + if size != len(payload) { + return io.EOF + } + p.ext = append(p.ext, extension{typ: currentExt, payload: payload}) + } + + var afterExt = []interface{}{ + // | connection_id | + (*uint16)(&p.header.id), + // | timestamp_microseconds | + (*uint32)(&p.header.t), + // | timestamp_difference_microseconds | + (*uint32)(&p.header.diff), + // | wnd_size | + (*uint32)(&p.header.wnd), + // | seq_nr | + (*uint16)(&p.header.seq), + // | ack_nr | + (*uint16)(&p.header.ack), + } + for _, v := range afterExt { + err := binary.Read(buf, binary.BigEndian, v) + if err != nil { + return err + } + } + + p.header.typ = int((tv >> 4) & 0xF) + p.header.ver = int(tv & 0xF) + + l := buf.Len() + if l > 0 { + p.payload = p.payload[:l] + _, err := buf.Read(p.payload[:]) + if err != nil { + return err + } + } + + return nil +} + +func (p packet) String() string { + var s string = fmt.Sprintf("[%d ", p.header.id) + switch p.header.typ { + case st_data: + s += "ST_DATA" + case st_fin: + s += "ST_FIN" + case st_state: + s += "ST_STATE" + case st_reset: + s += "ST_RESET" + case st_syn: + s += "ST_SYN" + } + s += fmt.Sprintf(" seq:%d ack:%d len:%d", p.header.seq, p.header.ack, len(p.payload)) + s += "]" + return s +} + +var globalPool packetPool + +type packetPool struct { + root *packetPoolNode + mutex sync.Mutex +} + +type packetPoolNode struct { + p *packet + next *packetPoolNode +} + +func (o *packetPool) get() *packet { + o.mutex.Lock() + defer o.mutex.Unlock() + r := o.root + if r != nil { + o.root = o.root.next + return r.p + } else { + return &packet{ + payload: make([]byte, 0, mss), + } + } +} + +func (o *packetPool) put(p *packet) { + o.mutex.Lock() + defer o.mutex.Unlock() + o.root = &packetPoolNode{ + p: p, + next: o.root, + } +} diff --git a/Godeps/_workspace/src/github.com/h2so5/utp/ucat/.gitignore b/Godeps/_workspace/src/github.com/h2so5/utp/ucat/.gitignore new file mode 100644 index 000000000..aab84c12d --- /dev/null +++ b/Godeps/_workspace/src/github.com/h2so5/utp/ucat/.gitignore @@ -0,0 +1,3 @@ +ucat +random +.trash/ diff --git a/Godeps/_workspace/src/github.com/h2so5/utp/ucat/Makefile b/Godeps/_workspace/src/github.com/h2so5/utp/ucat/Makefile new file mode 100644 index 000000000..6009f77a2 --- /dev/null +++ b/Godeps/_workspace/src/github.com/h2so5/utp/ucat/Makefile @@ -0,0 +1,34 @@ +# Run tests + +testnames=simple +tests=$(addprefix test_, $(testnames)) +trash=.trash/ + +all: ucat + +test: clean ucat ${tests} + @echo ${tests} + @echo "*** tests passed ***" + +# not sue why this doesn't work: +# test_%: test_%.sh +test_simple: test_simple.sh + mkdir -p ${trash} + @echo "*** running $@ ***" + ./$@.sh + +clean: + @echo "*** $@ ***" + -rm -r ${trash} + +deps: random ucat + +ucat: + go build + +random: + @echo "*** installing $@ ***" + go get github.com/jbenet/go-random/random + go build -o random github.com/jbenet/go-random/random + +.PHONY: clean ucat ${tests} diff --git a/Godeps/_workspace/src/github.com/h2so5/utp/ucat/test_simple.sh b/Godeps/_workspace/src/github.com/h2so5/utp/ucat/test_simple.sh new file mode 100644 index 000000000..5e7842ca2 --- /dev/null +++ b/Godeps/_workspace/src/github.com/h2so5/utp/ucat/test_simple.sh @@ -0,0 +1,49 @@ +#!/bin/sh + +set -e # exit on error +# set -v # verbose + +log() { + echo "--> $1" +} + +test_send() { + file=$1_ + count=$2 + addr=localhost:8765 + + # generate random data + log "generating $count bytes of random data" + ./random $count $RANDOM > ${file}expected + + # dialer sends + log "sending from dialer" + ./ucat -v $addr 2>&1 <${file}expected | sed "s/^/ dialer1: /" & + ./ucat -v -l $addr 2>&1 >${file}actual1 | sed "s/^/listener1: /" + diff ${file}expected ${file}actual1 + if test $? != 0; then + log "sending from dialer failed. compare with:\n" + log "diff ${file}expected ${file}actual1" + exit 1 + fi + + # listener sends + log "sending from listener" + ./ucat -v -l $addr 2>&1 <${file}expected | sed "s/^/listener2: /" & + ./ucat -v $addr 2>&1 >${file}actual2 | sed "s/^/ dialer2: /" + diff ${file}expected ${file}actual2 + if test $? != 0; then + log "sending from listener failed. compare with:\n" + log "diff ${file}expected ${file}actual2" + exit 1 + fi + + echo rm ${file}{expected,actual1,actual2} + rm ${file}{expected,actual1,actual2} + return 0 +} + + +test_send ".trash/1KB" 1024 +test_send ".trash/1MB" 1048576 +test_send ".trash/1GB" 1073741824 diff --git a/Godeps/_workspace/src/github.com/h2so5/utp/ucat/ucat.go b/Godeps/_workspace/src/github.com/h2so5/utp/ucat/ucat.go new file mode 100644 index 000000000..4382f6e0c --- /dev/null +++ b/Godeps/_workspace/src/github.com/h2so5/utp/ucat/ucat.go @@ -0,0 +1,188 @@ +// package ucat provides an implementation of netcat using the go utp package. +// It is meant to exercise the utp implementation. +// Usage: +// ucat [] +// ucat -l +// +// Address format is: [host]:port +// +// Note that uTP's congestion control gives priority to tcp flows (web traffic), +// so you could use this ucat tool to transfer massive files without hogging +// all the bandwidth. +package main + +import ( + "flag" + "fmt" + "io" + "net" + "os" + "os/signal" + "syscall" + + utp "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/h2so5/utp" +) + +var verbose = false + +// Usage prints out the usage of this module. +// Assumes flags use go stdlib flag pacakage. +var Usage = func() { + text := `ucat - uTP netcat in Go + +Usage: + + listen: %s [] + dial: %s -l + +Address format is Go's: [host]:port +` + + fmt.Fprintf(os.Stderr, text, os.Args[0], os.Args[0]) + flag.PrintDefaults() +} + +type args struct { + listen bool + verbose bool + localAddr string + remoteAddr string +} + +func parseArgs() args { + var a args + + // setup + parse flags + flag.BoolVar(&a.listen, "listen", false, "listen for connections") + flag.BoolVar(&a.listen, "l", false, "listen for connections (short)") + flag.BoolVar(&a.verbose, "v", false, "verbose debugging") + flag.Usage = Usage + flag.Parse() + osArgs := flag.Args() + + if len(osArgs) < 1 { + exit("") + } + + if a.listen { + a.localAddr = osArgs[0] + } else { + if len(osArgs) > 1 { + a.localAddr = osArgs[0] + a.remoteAddr = osArgs[1] + } else { + a.remoteAddr = osArgs[0] + } + } + + return a +} + +func main() { + args := parseArgs() + verbose = args.verbose + + var err error + if args.listen { + err = Listen(args.localAddr) + } else { + err = Dial(args.localAddr, args.remoteAddr) + } + + if err != nil { + exit("%s", err) + } +} + +func exit(format string, vals ...interface{}) { + if format != "" { + fmt.Fprintf(os.Stderr, "ucat error: "+format+"\n", vals...) + } + Usage() + os.Exit(1) +} + +func log(format string, vals ...interface{}) { + if verbose { + fmt.Fprintf(os.Stderr, "ucat log: "+format+"\n", vals...) + } +} + +// Listen listens and accepts one incoming uTP connection on a given port, +// and pipes all incoming data to os.Stdout. +func Listen(localAddr string) error { + l, err := utp.Listen("utp", localAddr) + if err != nil { + return err + } + log("listening at %s", l.Addr()) + + c, err := l.Accept() + if err != nil { + return err + } + log("accepted connection from %s", c.RemoteAddr()) + + // should be able to close listener here, but utp.Listener.Close + // closes all open connections. + defer l.Close() + + netcat(c) + return c.Close() +} + +// Dial connects to a remote address and pipes all os.Stdin to the remote end. +// If localAddr is set, uses it to Dial from. +func Dial(localAddr, remoteAddr string) error { + + var laddr net.Addr + var err error + if localAddr != "" { + laddr, err = utp.ResolveUTPAddr("utp", localAddr) + if err != nil { + return fmt.Errorf("failed to resolve address %s", localAddr) + } + } + + if laddr != nil { + log("dialing %s from %s", remoteAddr, laddr) + } else { + log("dialing %s", remoteAddr) + } + + d := utp.Dialer{LocalAddr: laddr} + c, err := d.Dial("utp", remoteAddr) + if err != nil { + return err + } + log("connected to %s", c.RemoteAddr()) + + netcat(c) + return c.Close() +} + +func netcat(c net.Conn) { + log("piping stdio to connection") + + done := make(chan struct{}) + + go func() { + n, _ := io.Copy(c, os.Stdin) + log("sent %d bytes", n) + done <- struct{}{} + }() + go func() { + n, _ := io.Copy(os.Stdout, c) + log("received %d bytes", n) + done <- struct{}{} + }() + + // wait until we exit. + sigc := make(chan os.Signal, 1) + signal.Notify(sigc, syscall.SIGHUP, syscall.SIGINT, + syscall.SIGTERM, syscall.SIGQUIT) + select { + case <-done: + case <-sigc: + } +} diff --git a/Godeps/_workspace/src/github.com/h2so5/utp/utp.go b/Godeps/_workspace/src/github.com/h2so5/utp/utp.go new file mode 100644 index 000000000..5a92bae64 --- /dev/null +++ b/Godeps/_workspace/src/github.com/h2so5/utp/utp.go @@ -0,0 +1,29 @@ +package utp + +import "time" + +const ( + version = 1 + + st_data = 0 + st_fin = 1 + st_state = 2 + st_reset = 3 + st_syn = 4 + + ext_none = 0 + ext_selective_ack = 1 + + header_size = 20 + mtu = 3200 + mss = mtu - header_size + window_size = 100 + + reset_timeout = time.Second +) + +type timeoutError struct{} + +func (e *timeoutError) Error() string { return "i/o timeout" } +func (e *timeoutError) Timeout() bool { return true } +func (e *timeoutError) Temporary() bool { return true } diff --git a/Godeps/_workspace/src/github.com/h2so5/utp/utp_test.go b/Godeps/_workspace/src/github.com/h2so5/utp/utp_test.go new file mode 100644 index 000000000..7e4f11a79 --- /dev/null +++ b/Godeps/_workspace/src/github.com/h2so5/utp/utp_test.go @@ -0,0 +1,601 @@ +package utp + +import ( + "bytes" + "io" + "io/ioutil" + "math" + "math/rand" + "net" + "reflect" + "testing" + "time" +) + +func init() { + rand.Seed(time.Now().Unix()) +} + +func TestReadWrite(t *testing.T) { + ln, err := Listen("utp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + + raddr, err := ResolveUTPAddr("utp", ln.Addr().String()) + if err != nil { + t.Fatal(err) + } + + c, err := DialUTPTimeout("utp", nil, raddr, 1000*time.Millisecond) + if err != nil { + t.Fatal(err) + } + defer c.Close() + + err = ln.SetDeadline(time.Now().Add(1000 * time.Millisecond)) + if err != nil { + t.Fatal(err) + } + + s, err := ln.Accept() + if err != nil { + t.Fatal(err) + } + ln.Close() + + payload := []byte("Hello!") + _, err = c.Write(payload) + if err != nil { + t.Fatal(err) + } + + err = s.SetDeadline(time.Now().Add(1000 * time.Millisecond)) + if err != nil { + t.Fatal(err) + } + + var buf [256]byte + l, err := s.Read(buf[:]) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(payload, buf[:l]) { + t.Errorf("expected payload of %v; got %v", payload, buf[:l]) + } + + payload2 := []byte("World!") + _, err = s.Write(payload2) + if err != nil { + t.Fatal(err) + } + + err = c.SetDeadline(time.Now().Add(1000 * time.Millisecond)) + if err != nil { + t.Fatal(err) + } + + l, err = c.Read(buf[:]) + if err != nil { + t.Fatal(err) + } + + if !bytes.Equal(payload2, buf[:l]) { + t.Errorf("expected payload of %v; got %v", payload2, buf[:l]) + } +} + +func TestRawReadWrite(t *testing.T) { + ln, err := Listen("utp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + raddr, err := net.ResolveUDPAddr("udp", ln.Addr().String()) + if err != nil { + t.Fatal(err) + } + + c, err := net.DialUDP("udp", nil, raddr) + if err != nil { + t.Fatal(err) + } + defer c.Close() + + payload := []byte("Hello!") + _, err = c.Write(payload) + if err != nil { + t.Fatal(err) + } + + var buf [256]byte + n, addr, err := ln.RawConn.ReadFrom(buf[:]) + if !bytes.Equal(payload, buf[:n]) { + t.Errorf("expected payload of %v; got %v", payload, buf[:n]) + } + if addr.String() != c.LocalAddr().String() { + t.Errorf("expected addr of %v; got %v", c.LocalAddr(), addr.String()) + } +} + +func TestLongReadWriteC2S(t *testing.T) { + ln, err := Listen("utp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + + raddr, err := ResolveUTPAddr("utp", ln.Addr().String()) + if err != nil { + t.Fatal(err) + } + + c, err := DialUTPTimeout("utp", nil, raddr, 1000*time.Millisecond) + if err != nil { + t.Fatal(err) + } + defer c.Close() + + err = ln.SetDeadline(time.Now().Add(1000 * time.Millisecond)) + if err != nil { + t.Fatal(err) + } + + s, err := ln.Accept() + if err != nil { + t.Fatal(err) + } + defer s.Close() + ln.Close() + + var payload [10485760]byte + for i := range payload { + payload[i] = byte(rand.Int()) + } + + rch := make(chan []byte) + ech := make(chan error, 2) + + go func() { + defer c.Close() + _, err := c.Write(payload[:]) + if err != nil { + ech <- err + } + }() + + go func() { + b, err := ioutil.ReadAll(s) + if err != nil { + ech <- err + rch <- nil + } else { + ech <- nil + rch <- b + } + }() + + err = <-ech + if err != nil { + t.Fatal(err) + } + + r := <-rch + if r == nil { + return + } + + if !bytes.Equal(r, payload[:]) { + t.Errorf("expected payload of %d; got %d", len(payload[:]), len(r)) + } +} + +func TestLongReadWriteS2C(t *testing.T) { + ln, err := Listen("utp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + + raddr, err := ResolveUTPAddr("utp", ln.Addr().String()) + if err != nil { + t.Fatal(err) + } + + c, err := DialUTPTimeout("utp", nil, raddr, 1000*time.Millisecond) + if err != nil { + t.Fatal(err) + } + defer c.Close() + + err = ln.SetDeadline(time.Now().Add(1000 * time.Millisecond)) + if err != nil { + t.Fatal(err) + } + + s, err := ln.Accept() + if err != nil { + t.Fatal(err) + } + defer s.Close() + ln.Close() + + var payload [10485760]byte + for i := range payload { + payload[i] = byte(rand.Int()) + } + + rch := make(chan []byte) + ech := make(chan error, 2) + + go func() { + defer s.Close() + _, err := s.Write(payload[:]) + if err != nil { + ech <- err + } + }() + + go func() { + b, err := ioutil.ReadAll(c) + if err != nil { + ech <- err + rch <- nil + } else { + ech <- nil + rch <- b + } + }() + + err = <-ech + if err != nil { + t.Fatal(err) + } + + r := <-rch + if r == nil { + return + } + + if !bytes.Equal(r, payload[:]) { + t.Errorf("expected payload of %d; got %d", len(payload[:]), len(r)) + } +} + +func TestAccept(t *testing.T) { + ln, err := Listen("utp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + c, err := DialUTPTimeout("utp", nil, ln.Addr().(*UTPAddr), 200*time.Millisecond) + if err != nil { + t.Fatal(err) + } + defer c.Close() + + err = ln.SetDeadline(time.Now().Add(100 * time.Millisecond)) + _, err = ln.Accept() + if err != nil { + t.Fatal(err) + } +} + +func TestAcceptDeadline(t *testing.T) { + ln, err := Listen("utp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + err = ln.SetDeadline(time.Now().Add(time.Millisecond)) + _, err = ln.Accept() + if err == nil { + t.Fatal("Accept should failed") + } +} + +func TestAcceptClosedListener(t *testing.T) { + ln, err := Listen("utp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + err = ln.Close() + if err != nil { + t.Fatal(err) + } + _, err = ln.Accept() + if err == nil { + t.Fatal("Accept should failed") + } + _, err = ln.Accept() + if err == nil { + t.Fatal("Accept should failed") + } +} + +func TestDialer(t *testing.T) { + ln, err := Listen("utp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + d := Dialer{} + c, err := d.Dial("utp", ln.Addr().String()) + if err != nil { + t.Fatal(err) + } + defer c.Close() +} + +func TestDialerAddrs(t *testing.T) { + ln, err := Listen("utp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + laddr, err := ResolveUTPAddr("utp", "127.0.0.1:45678") + if err != nil { + t.Fatal(err) + } + + d := Dialer{LocalAddr: laddr} + c1, err := d.Dial("utp", ln.Addr().String()) + if err != nil { + t.Fatal(err) + } + defer c1.Close() + + c2, err := ln.Accept() + if err != nil { + t.Fatal(err) + } + defer c2.Close() + + eq := func(a, b net.Addr) bool { + return a.String() == b.String() + } + + if !eq(d.LocalAddr, c2.RemoteAddr()) { + t.Fatal("dialer.LocalAddr not equal to c2.RemoteAddr ") + } + if !eq(c1.LocalAddr(), c2.RemoteAddr()) { + t.Fatal("c1.LocalAddr not equal to c2.RemoteAddr ") + } + if !eq(c2.LocalAddr(), c1.RemoteAddr()) { + t.Fatal("c2.LocalAddr not equal to c1.RemoteAddr ") + } +} + +func TestDialerTimeout(t *testing.T) { + timeout := time.Millisecond * 200 + d := Dialer{Timeout: timeout} + done := make(chan struct{}) + + go func() { + _, err := d.Dial("utp", "127.0.0.1:34567") + if err == nil { + t.Fatal("should not connect") + } + done <- struct{}{} + }() + + select { + case <-time.After(timeout * 2): + t.Fatal("should have ended already") + case <-done: + } +} + +func TestPacketBinary(t *testing.T) { + h := header{ + typ: st_fin, + ver: version, + id: 100, + t: 50000, + diff: 10000, + wnd: 65535, + seq: 100, + ack: 200, + } + + e := []extension{ + extension{ + typ: ext_selective_ack, + payload: []byte{0, 1, 0, 1}, + }, + extension{ + typ: ext_selective_ack, + payload: []byte{100, 0, 200, 0}, + }, + } + + p := packet{ + header: h, + ext: e, + payload: []byte("abcdefg"), + } + + b, err := p.MarshalBinary() + if err != nil { + t.Fatal(err) + } + + p2 := packet{payload: make([]byte, 0, mss)} + err = p2.UnmarshalBinary(b) + if err != nil { + t.Fatal(err) + } + + if !reflect.DeepEqual(p, p2) { + t.Errorf("expected packet of %v; got %v", p, p2) + } +} + +func TestUnmarshalShortPacket(t *testing.T) { + b := make([]byte, 18) + p := packet{} + err := p.UnmarshalBinary(b) + + if err == nil { + t.Fatal("UnmarshalBinary should fail") + } else if err != io.EOF { + t.Fatal(err) + } +} + +func TestWriteOnClosedChannel(t *testing.T) { + ln, err := Listen("utp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + c, err := DialUTPTimeout("utp", nil, ln.Addr().(*UTPAddr), 200*time.Millisecond) + if err != nil { + t.Fatal(err) + } + + go func() { + for { + _, err := c.Write([]byte{100}) + if err != nil { + return + } + } + }() + + c.Close() +} + +func TestReadOnClosedChannel(t *testing.T) { + ln, err := Listen("utp", "127.0.0.1:0") + if err != nil { + t.Fatal(err) + } + defer ln.Close() + + c, err := DialUTPTimeout("utp", nil, ln.Addr().(*UTPAddr), 200*time.Millisecond) + if err != nil { + t.Fatal(err) + } + + go func() { + for { + var buf [16]byte + _, err := c.Read(buf[:]) + if err != nil { + return + } + } + }() + + c.Close() +} + +func TestPacketBuffer(t *testing.T) { + size := 12 + b := newPacketBuffer(12, 1) + + if b.space() != size { + t.Errorf("expected space == %d; got %d", size, b.space()) + } + + for i := 1; i <= size; i++ { + b.push(&packet{header: header{seq: uint16(i)}}) + } + + if b.space() != 0 { + t.Errorf("expected space == 0; got %d", b.space()) + } + + a := []byte{255, 7} + ack := b.generateSelectiveACK() + if !bytes.Equal(a, ack) { + t.Errorf("expected ack == %v; got %v", a, ack) + } + + err := b.push(&packet{header: header{seq: 15}}) + if err == nil { + t.Fatal("push should fail") + } + + all := b.all() + if len(all) != size { + t.Errorf("expected %d packets sequence; got %d", size, len(all)) + } + + f := b.fetch(6) + if f == nil { + t.Fatal("fetch should not fail") + } + + b.compact() + + err = b.push(&packet{header: header{seq: 15}}) + if err != nil { + t.Fatal(err) + } + + err = b.push(&packet{header: header{seq: 17}}) + if err != nil { + t.Fatal(err) + } + + for i := 7; i <= size; i++ { + f := b.fetch(uint16(i)) + if f == nil { + t.Fatal("fetch should not fail") + } + } + + a = []byte{128, 2} + ack = b.generateSelectiveACK() + if !bytes.Equal(a, ack) { + t.Errorf("expected ack == %v; got %v", a, ack) + } + + all = b.all() + if len(all) != 2 { + t.Errorf("expected 2 packets sequence; got %d", len(all)) + } + + b.compact() + if b.space() != 9 { + t.Errorf("expected space == 9; got %d", b.space()) + } + + ack = b.generateSelectiveACK() + b.processSelectiveACK(ack) + + all = b.all() + if len(all) != 1 { + t.Errorf("expected size == 1; got %d", len(all)) + } +} + +func TestPacketBufferBoundary(t *testing.T) { + begin := math.MaxUint16 - 3 + b := newPacketBuffer(12, begin) + for i := begin; i != 5; i = (i + 1) % (math.MaxUint16 + 1) { + err := b.push(&packet{header: header{seq: uint16(i)}}) + if err != nil { + t.Fatal(err) + } + } +} + +func TestTimedBufferNode(t *testing.T) { + b := timedBuffer{d: time.Millisecond * 100} + b.push(100) + b.push(200) + time.Sleep(time.Millisecond * 200) + b.push(300) + b.push(400) + m := b.min() + if m != 300 { + t.Errorf("expected min == 300; got %d", m) + } +} diff --git a/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/convert.go b/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/convert.go index 18a8f79e5..05f00a66c 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/convert.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/convert.go @@ -5,7 +5,7 @@ import ( "net" "strings" - utp "github.com/h2so5/utp" + utp "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/h2so5/utp" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ) @@ -154,6 +154,7 @@ func DialArgs(m ma.Multiaddr) (string, string, error) { if parts[2] == "udp" && len(parts) > 4 && parts[4] == "utp" { network = parts[4] } + var host string switch parts[0] { case "ip4": diff --git a/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/convert_test.go b/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/convert_test.go index bd0974aa1..de4b338b1 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/convert_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/convert_test.go @@ -4,7 +4,7 @@ import ( "net" "testing" - utp "github.com/h2so5/utp" + utp "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/h2so5/utp" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ) diff --git a/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/net.go b/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/net.go index a585d3650..3e19dba38 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/net.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/net.go @@ -4,7 +4,7 @@ import ( "fmt" "net" - utp "github.com/h2so5/utp" + utp "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/h2so5/utp" ma "github.com/jbenet/go-ipfs/Godeps/_workspace/src/github.com/jbenet/go-multiaddr" ) @@ -226,14 +226,11 @@ func Listen(laddr ma.Multiaddr) (Listener, error) { switch lnet { case "utp": nl, err = utp.Listen(lnet, lnaddr) - if err != nil { - return nil, err - } - case "tcp": + default: nl, err = net.Listen(lnet, lnaddr) - if err != nil { - return nil, err - } + } + if err != nil { + return nil, err } return &maListener{ diff --git a/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/net_test.go b/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/net_test.go index 91d5d7e6e..f87dd837e 100644 --- a/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/net_test.go +++ b/Godeps/_workspace/src/github.com/jbenet/go-multiaddr-net/net_test.go @@ -138,6 +138,36 @@ func TestListen(t *testing.T) { wg.Wait() } +func TestListenAddrs(t *testing.T) { + + test := func(addr string, succeed bool) { + + maddr := newMultiaddr(t, addr) + l, err := Listen(maddr) + if !succeed { + if err == nil { + t.Fatal("succeeded in listening", addr) + } + return + } + if succeed && err != nil { + t.Fatal("failed to listen", addr, err) + } + if l == nil { + t.Fatal("failed to listen", addr, succeed, err) + } + + if err = l.Close(); err != nil { + t.Fatal("failed to close listener", addr, err) + } + } + + test("/ip4/127.0.0.1/tcp/4324", true) + test("/ip4/127.0.0.1/udp/4325", false) + test("/ip4/127.0.0.1/udp/4326/udt", false) + test("/ip4/127.0.0.1/udp/4326/utp", true) +} + func TestListenAndDial(t *testing.T) { maddr := newMultiaddr(t, "/ip4/127.0.0.1/tcp/4323")