1
0
mirror of https://github.com/ipfs/kubo.git synced 2025-08-06 11:31:54 +08:00

handler muxer

This commit is contained in:
Juan Batiz-Benet
2014-12-16 02:56:11 -08:00
parent 6bc26f1752
commit 9c11970554
2 changed files with 170 additions and 0 deletions

105
net/mux2/mux.go Normal file
View File

@ -0,0 +1,105 @@
package mux
import (
"errors"
"fmt"
"io"
context "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/go.net/context"
swarm "github.com/jbenet/go-ipfs/net/swarm2"
eventlog "github.com/jbenet/go-ipfs/util/eventlog"
lgbl "github.com/jbenet/go-ipfs/util/eventlog/loggables"
)
var log = eventlog.Logger("mux2")
// Mux provides simple stream multixplexing.
// It helps you precisely when:
// * You have many streams
// * You have function handlers
//
// We use a totally ad-hoc encoding:
//
// <1 byte length in bytes><string name>
//
// So "bitswap" is 0x0762697473776170
//
// NOTE: only the dialer specifies this muxing line.
// This is because we're using Streams :)
//
// WARNING: this datastructure IS NOT threadsafe.
// do not modify it once it's begun serving.
type Mux struct {
Default StreamHandler
Handlers map[string]StreamHandler
}
type StreamHandler func(s *swarm.Stream)
// NextName reads the stream and returns the next protocol name
// according to the muxer encoding.
func (m *Mux) NextName(s io.Reader) (string, error) {
// c-string identifier
// the first byte is our length
l := make([]byte, 1)
if _, err := io.ReadFull(s, l); err != nil {
return "", err
}
length := int(l[0])
// the next are our identifier
name := make([]byte, length)
if _, err := io.ReadFull(s, name); err != nil {
return "", err
}
return string(name), nil
}
// NextHandler reads the stream and returns the next Handler function
// according to the muxer encoding.
func (m *Mux) NextHandler(s io.Reader) (string, StreamHandler, error) {
name, err := m.NextName(s)
if err != nil {
return "", nil, err
}
h, found := m.Handlers[name]
if !found {
if m.Default == nil {
return name, nil, errors.New("no handler with name: " + name)
}
return name, m.Default, nil
}
return name, h, nil
}
// Handle reads the next name off the Stream, and calls a function
func (m *Mux) Handle(s *swarm.Stream) {
ctx := context.Background()
name, handler, err := m.NextHandler(s)
if err != nil {
err = fmt.Errorf("protocol mux error: %s", err)
log.Error(err)
log.Event(ctx, "muxError", lgbl.Error(err))
return
}
log.Info("muxer handle protocol: %s", name)
log.Event(ctx, "muxHandle", eventlog.Metadata{"protocol": name})
handler(s)
}
// Write writes the name into Writer with a length-byte-prefix.
func Write(w io.Writer, name string) error {
s := make([]byte, len(name)+1)
s[0] = byte(len(name))
copy(s[1:], []byte(name))
_, err := w.Write(s)
return err
}

65
net/mux2/mux_test.go Normal file
View File

@ -0,0 +1,65 @@
package mux
import (
"bytes"
"testing"
swarm "github.com/jbenet/go-ipfs/net/swarm2"
)
var testCases = map[string]string{
"bitswap": "\u0007bitswap",
"dht": "\u0003dht",
"ipfs": "\u0004ipfs",
"ipfsdksnafkasnfkdajfkdajfdsjadosiaaodjasofdias": ".ipfsdksnafkasnfkdajfkdajfdsjadosiaaodjasofdias",
}
func TestWrite(t *testing.T) {
for k, v := range testCases {
var buf bytes.Buffer
Write(&buf, k)
v2 := buf.Bytes()
if !bytes.Equal(v2, []byte(v)) {
t.Errorf("failed: %s - %v != %v", k, []byte(v), v2)
}
}
}
func TestHandler(t *testing.T) {
outs := make(chan string, 10)
h := func(n string) func(s *swarm.Stream) {
return func(s *swarm.Stream) {
outs <- n
}
}
m := Mux{Handlers: map[string]StreamHandler{}}
m.Default = h("default")
m.Handlers["dht"] = h("bitswap")
// m.Handlers["ipfs"] = h("bitswap") // default!
m.Handlers["bitswap"] = h("bitswap")
m.Handlers["ipfsdksnafkasnfkdajfkdajfdsjadosiaaodjasofdias"] = h("bitswap")
for k, v := range testCases {
var buf bytes.Buffer
if _, err := buf.Write([]byte(v)); err != nil {
t.Error(err)
continue
}
name, _, err := m.NextHandler(&buf)
if err != nil {
t.Error(err)
continue
}
if name != k {
t.Errorf("name mismatch: %s != %s", k, name)
continue
}
}
}