mirror of
https://github.com/ipfs/kubo.git
synced 2025-06-29 17:36:38 +08:00
feat(net:message) get net package from e2430ae4279
fix(net:msg) use vendored imports
This commit is contained in:
@ -6,38 +6,52 @@ import (
|
|||||||
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
|
proto "github.com/jbenet/go-ipfs/Godeps/_workspace/src/code.google.com/p/goprotobuf/proto"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Message represents a packet of information sent to or received from a
|
type NetMessage interface {
|
||||||
|
Peer() *peer.Peer
|
||||||
|
Data() []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func New(p *peer.Peer, data []byte) NetMessage {
|
||||||
|
return &message{peer: p, data: data}
|
||||||
|
}
|
||||||
|
|
||||||
|
// message represents a packet of information sent to or received from a
|
||||||
// particular Peer.
|
// particular Peer.
|
||||||
type Message struct {
|
type message struct {
|
||||||
// To or from, depending on direction.
|
// To or from, depending on direction.
|
||||||
Peer *peer.Peer
|
peer *peer.Peer
|
||||||
|
|
||||||
// Opaque data
|
// Opaque data
|
||||||
Data []byte
|
data []byte
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *message) Peer() *peer.Peer {
|
||||||
|
return m.peer
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *message) Data() []byte {
|
||||||
|
return m.data
|
||||||
}
|
}
|
||||||
|
|
||||||
// FromObject creates a message from a protobuf-marshallable message.
|
// FromObject creates a message from a protobuf-marshallable message.
|
||||||
func FromObject(p *peer.Peer, data proto.Message) (*Message, error) {
|
func FromObject(p *peer.Peer, data proto.Message) (NetMessage, error) {
|
||||||
bytes, err := proto.Marshal(data)
|
bytes, err := proto.Marshal(data)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
return &Message{
|
return New(p, bytes), nil
|
||||||
Peer: p,
|
|
||||||
Data: bytes,
|
|
||||||
}, nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// Pipe objects represent a bi-directional message channel.
|
// Pipe objects represent a bi-directional message channel.
|
||||||
type Pipe struct {
|
type Pipe struct {
|
||||||
Incoming chan *Message
|
Incoming chan NetMessage
|
||||||
Outgoing chan *Message
|
Outgoing chan NetMessage
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewPipe constructs a pipe with channels of a given buffer size.
|
// NewPipe constructs a pipe with channels of a given buffer size.
|
||||||
func NewPipe(bufsize int) *Pipe {
|
func NewPipe(bufsize int) *Pipe {
|
||||||
return &Pipe{
|
return &Pipe{
|
||||||
Incoming: make(chan *Message, bufsize),
|
Incoming: make(chan NetMessage, bufsize),
|
||||||
Outgoing: make(chan *Message, bufsize),
|
Outgoing: make(chan NetMessage, bufsize),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -87,15 +87,15 @@ func (m *Muxer) handleIncomingMessages(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// handleIncomingMessage routes message to the appropriate protocol.
|
// handleIncomingMessage routes message to the appropriate protocol.
|
||||||
func (m *Muxer) handleIncomingMessage(ctx context.Context, m1 *msg.Message) {
|
func (m *Muxer) handleIncomingMessage(ctx context.Context, m1 msg.NetMessage) {
|
||||||
|
|
||||||
data, pid, err := unwrapData(m1.Data)
|
data, pid, err := unwrapData(m1.Data())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
u.PErr("muxer de-serializing error: %v\n", err)
|
u.PErr("muxer de-serializing error: %v\n", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
m2 := &msg.Message{Peer: m1.Peer, Data: data}
|
m2 := msg.New(m1.Peer(), data)
|
||||||
proto, found := m.Protocols[pid]
|
proto, found := m.Protocols[pid]
|
||||||
if !found {
|
if !found {
|
||||||
u.PErr("muxer unknown protocol %v\n", pid)
|
u.PErr("muxer unknown protocol %v\n", pid)
|
||||||
@ -125,14 +125,14 @@ func (m *Muxer) handleOutgoingMessages(ctx context.Context, pid ProtocolID, prot
|
|||||||
}
|
}
|
||||||
|
|
||||||
// handleOutgoingMessage wraps out a message and sends it out the
|
// handleOutgoingMessage wraps out a message and sends it out the
|
||||||
func (m *Muxer) handleOutgoingMessage(ctx context.Context, pid ProtocolID, m1 *msg.Message) {
|
func (m *Muxer) handleOutgoingMessage(ctx context.Context, pid ProtocolID, m1 msg.NetMessage) {
|
||||||
data, err := wrapData(m1.Data, pid)
|
data, err := wrapData(m1.Data(), pid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
u.PErr("muxer serializing error: %v\n", err)
|
u.PErr("muxer serializing error: %v\n", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
m2 := &msg.Message{Peer: m1.Peer, Data: data}
|
m2 := msg.New(m1.Peer(), data)
|
||||||
select {
|
select {
|
||||||
case m.GetPipe().Outgoing <- m2:
|
case m.GetPipe().Outgoing <- m2:
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
|
@ -32,14 +32,14 @@ func newPeer(t *testing.T, id string) *peer.Peer {
|
|||||||
return &peer.Peer{ID: peer.ID(mh)}
|
return &peer.Peer{ID: peer.ID(mh)}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testMsg(t *testing.T, m *msg.Message, data []byte) {
|
func testMsg(t *testing.T, m msg.NetMessage, data []byte) {
|
||||||
if !bytes.Equal(data, m.Data) {
|
if !bytes.Equal(data, m.Data()) {
|
||||||
t.Errorf("Data does not match: %v != %v", data, m.Data)
|
t.Errorf("Data does not match: %v != %v", data, m.Data())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func testWrappedMsg(t *testing.T, m *msg.Message, pid ProtocolID, data []byte) {
|
func testWrappedMsg(t *testing.T, m msg.NetMessage, pid ProtocolID, data []byte) {
|
||||||
data2, pid2, err := unwrapData(m.Data)
|
data2, pid2, err := unwrapData(m.Data())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
@ -76,7 +76,7 @@ func TestSimpleMuxer(t *testing.T) {
|
|||||||
|
|
||||||
// test outgoing p1
|
// test outgoing p1
|
||||||
for _, s := range []string{"foo", "bar", "baz"} {
|
for _, s := range []string{"foo", "bar", "baz"} {
|
||||||
p1.Outgoing <- &msg.Message{Peer: peer1, Data: []byte(s)}
|
p1.Outgoing <- msg.New(peer1, []byte(s))
|
||||||
testWrappedMsg(t, <-mux1.Outgoing, pid1, []byte(s))
|
testWrappedMsg(t, <-mux1.Outgoing, pid1, []byte(s))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -86,13 +86,13 @@ func TestSimpleMuxer(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
mux1.Incoming <- &msg.Message{Peer: peer1, Data: d}
|
mux1.Incoming <- msg.New(peer1, d)
|
||||||
testMsg(t, <-p1.Incoming, []byte(s))
|
testMsg(t, <-p1.Incoming, []byte(s))
|
||||||
}
|
}
|
||||||
|
|
||||||
// test outgoing p2
|
// test outgoing p2
|
||||||
for _, s := range []string{"foo", "bar", "baz"} {
|
for _, s := range []string{"foo", "bar", "baz"} {
|
||||||
p2.Outgoing <- &msg.Message{Peer: peer1, Data: []byte(s)}
|
p2.Outgoing <- msg.New(peer1, []byte(s))
|
||||||
testWrappedMsg(t, <-mux1.Outgoing, pid2, []byte(s))
|
testWrappedMsg(t, <-mux1.Outgoing, pid2, []byte(s))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -102,7 +102,7 @@ func TestSimpleMuxer(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
mux1.Incoming <- &msg.Message{Peer: peer1, Data: d}
|
mux1.Incoming <- msg.New(peer1, d)
|
||||||
testMsg(t, <-p2.Incoming, []byte(s))
|
testMsg(t, <-p2.Incoming, []byte(s))
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -139,7 +139,7 @@ func TestSimultMuxer(t *testing.T) {
|
|||||||
for i := 0; i < size; i++ {
|
for i := 0; i < size; i++ {
|
||||||
<-limiter
|
<-limiter
|
||||||
s := fmt.Sprintf("proto %v out %v", pid, i)
|
s := fmt.Sprintf("proto %v out %v", pid, i)
|
||||||
m := &msg.Message{Peer: peer1, Data: []byte(s)}
|
m := msg.New(peer1, []byte(s))
|
||||||
mux1.Protocols[pid].GetPipe().Outgoing <- m
|
mux1.Protocols[pid].GetPipe().Outgoing <- m
|
||||||
counts[pid][0][0]++
|
counts[pid][0][0]++
|
||||||
u.DOut("sent %v\n", s)
|
u.DOut("sent %v\n", s)
|
||||||
@ -156,7 +156,7 @@ func TestSimultMuxer(t *testing.T) {
|
|||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
m := &msg.Message{Peer: peer1, Data: d}
|
m := msg.New(peer1, d)
|
||||||
mux1.Incoming <- m
|
mux1.Incoming <- m
|
||||||
counts[pid][1][0]++
|
counts[pid][1][0]++
|
||||||
u.DOut("sent %v\n", s)
|
u.DOut("sent %v\n", s)
|
||||||
@ -167,7 +167,7 @@ func TestSimultMuxer(t *testing.T) {
|
|||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case m := <-mux1.Outgoing:
|
case m := <-mux1.Outgoing:
|
||||||
data, pid, err := unwrapData(m.Data)
|
data, pid, err := unwrapData(m.Data())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
@ -186,7 +186,7 @@ func TestSimultMuxer(t *testing.T) {
|
|||||||
select {
|
select {
|
||||||
case m := <-mux1.Protocols[pid].GetPipe().Incoming:
|
case m := <-mux1.Protocols[pid].GetPipe().Incoming:
|
||||||
counts[pid][0][1]++
|
counts[pid][0][1]++
|
||||||
u.DOut("got %v\n", string(m.Data))
|
u.DOut("got %v\n", string(m.Data()))
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -239,7 +239,7 @@ func TestStopping(t *testing.T) {
|
|||||||
|
|
||||||
// test outgoing p1
|
// test outgoing p1
|
||||||
for _, s := range []string{"foo", "bar", "baz"} {
|
for _, s := range []string{"foo", "bar", "baz"} {
|
||||||
p1.Outgoing <- &msg.Message{Peer: peer1, Data: []byte(s)}
|
p1.Outgoing <- msg.New(peer1, []byte(s))
|
||||||
testWrappedMsg(t, <-mux1.Outgoing, pid1, []byte(s))
|
testWrappedMsg(t, <-mux1.Outgoing, pid1, []byte(s))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -249,7 +249,7 @@ func TestStopping(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
mux1.Incoming <- &msg.Message{Peer: peer1, Data: d}
|
mux1.Incoming <- msg.New(peer1, d)
|
||||||
testMsg(t, <-p1.Incoming, []byte(s))
|
testMsg(t, <-p1.Incoming, []byte(s))
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -260,7 +260,7 @@ func TestStopping(t *testing.T) {
|
|||||||
|
|
||||||
// test outgoing p1
|
// test outgoing p1
|
||||||
for _, s := range []string{"foo", "bar", "baz"} {
|
for _, s := range []string{"foo", "bar", "baz"} {
|
||||||
p1.Outgoing <- &msg.Message{Peer: peer1, Data: []byte(s)}
|
p1.Outgoing <- msg.New(peer1, []byte(s))
|
||||||
select {
|
select {
|
||||||
case <-mux1.Outgoing:
|
case <-mux1.Outgoing:
|
||||||
t.Error("should not have received anything.")
|
t.Error("should not have received anything.")
|
||||||
@ -274,7 +274,7 @@ func TestStopping(t *testing.T) {
|
|||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
mux1.Incoming <- &msg.Message{Peer: peer1, Data: d}
|
mux1.Incoming <- msg.New(peer1, d)
|
||||||
select {
|
select {
|
||||||
case <-p1.Incoming:
|
case <-p1.Incoming:
|
||||||
t.Error("should not have received anything.")
|
t.Error("should not have received anything.")
|
||||||
|
@ -75,7 +75,7 @@ type Request struct {
|
|||||||
PeerID peer.ID
|
PeerID peer.ID
|
||||||
|
|
||||||
// Response is the channel of incoming responses.
|
// Response is the channel of incoming responses.
|
||||||
Response chan *msg.Message
|
Response chan msg.NetMessage
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewRequest creates a request for given peer.ID
|
// NewRequest creates a request for given peer.ID
|
||||||
@ -88,7 +88,7 @@ func NewRequest(pid peer.ID) (*Request, error) {
|
|||||||
return &Request{
|
return &Request{
|
||||||
ID: id,
|
ID: id,
|
||||||
PeerID: pid,
|
PeerID: pid,
|
||||||
Response: make(chan *msg.Message, 1),
|
Response: make(chan msg.NetMessage, 1),
|
||||||
}, nil
|
}, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,7 +16,7 @@ type Handler interface {
|
|||||||
|
|
||||||
// HandleMessage receives an incoming message, and potentially returns
|
// HandleMessage receives an incoming message, and potentially returns
|
||||||
// a response message to send back.
|
// a response message to send back.
|
||||||
HandleMessage(context.Context, *msg.Message) (*msg.Message, error)
|
HandleMessage(context.Context, msg.NetMessage) (msg.NetMessage, error)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Service is a networking component that protocols can use to multiplex
|
// Service is a networking component that protocols can use to multiplex
|
||||||
@ -74,16 +74,16 @@ func (s *Service) GetPipe() *msg.Pipe {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SendMessage sends a message out
|
// SendMessage sends a message out
|
||||||
func (s *Service) SendMessage(ctx context.Context, m *msg.Message, rid RequestID) error {
|
func (s *Service) SendMessage(ctx context.Context, m msg.NetMessage, rid RequestID) error {
|
||||||
|
|
||||||
// serialize ServiceMessage wrapper
|
// serialize ServiceMessage wrapper
|
||||||
data, err := wrapData(m.Data, rid)
|
data, err := wrapData(m.Data(), rid)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
// send message
|
// send message
|
||||||
m2 := &msg.Message{Peer: m.Peer, Data: data}
|
m2 := msg.New(m.Peer(), data)
|
||||||
select {
|
select {
|
||||||
case s.Outgoing <- m2:
|
case s.Outgoing <- m2:
|
||||||
case <-ctx.Done():
|
case <-ctx.Done():
|
||||||
@ -94,10 +94,10 @@ func (s *Service) SendMessage(ctx context.Context, m *msg.Message, rid RequestID
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SendRequest sends a request message out and awaits a response.
|
// SendRequest sends a request message out and awaits a response.
|
||||||
func (s *Service) SendRequest(ctx context.Context, m *msg.Message) (*msg.Message, error) {
|
func (s *Service) SendRequest(ctx context.Context, m msg.NetMessage) (msg.NetMessage, error) {
|
||||||
|
|
||||||
// create a request
|
// create a request
|
||||||
r, err := NewRequest(m.Peer.ID)
|
r, err := NewRequest(m.Peer().ID)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, err
|
return nil, err
|
||||||
}
|
}
|
||||||
@ -150,14 +150,14 @@ func (s *Service) handleIncomingMessages(ctx context.Context) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (s *Service) handleIncomingMessage(ctx context.Context, m *msg.Message) {
|
func (s *Service) handleIncomingMessage(ctx context.Context, m msg.NetMessage) {
|
||||||
|
|
||||||
// unwrap the incoming message
|
// unwrap the incoming message
|
||||||
data, rid, err := unwrapData(m.Data)
|
data, rid, err := unwrapData(m.Data())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
u.PErr("de-serializing error: %v\n", err)
|
u.PErr("de-serializing error: %v\n", err)
|
||||||
}
|
}
|
||||||
m2 := &msg.Message{Peer: m.Peer, Data: data}
|
m2 := msg.New(m.Peer(), data)
|
||||||
|
|
||||||
// if it's a request (or has no RequestID), handle it
|
// if it's a request (or has no RequestID), handle it
|
||||||
if rid == nil || rid.IsRequest() {
|
if rid == nil || rid.IsRequest() {
|
||||||
@ -182,7 +182,7 @@ func (s *Service) handleIncomingMessage(ctx context.Context, m *msg.Message) {
|
|||||||
u.PErr("RequestID should identify a response here.\n")
|
u.PErr("RequestID should identify a response here.\n")
|
||||||
}
|
}
|
||||||
|
|
||||||
key := RequestKey(m.Peer.ID, RequestID(rid))
|
key := RequestKey(m.Peer().ID, RequestID(rid))
|
||||||
s.RequestsLock.RLock()
|
s.RequestsLock.RLock()
|
||||||
r, found := s.Requests[key]
|
r, found := s.Requests[key]
|
||||||
s.RequestsLock.RUnlock()
|
s.RequestsLock.RUnlock()
|
||||||
|
@ -15,15 +15,15 @@ import (
|
|||||||
// ReverseHandler reverses all Data it receives and sends it back.
|
// ReverseHandler reverses all Data it receives and sends it back.
|
||||||
type ReverseHandler struct{}
|
type ReverseHandler struct{}
|
||||||
|
|
||||||
func (t *ReverseHandler) HandleMessage(ctx context.Context, m *msg.Message) (
|
func (t *ReverseHandler) HandleMessage(ctx context.Context, m msg.NetMessage) (
|
||||||
*msg.Message, error) {
|
msg.NetMessage, error) {
|
||||||
|
|
||||||
d := m.Data
|
d := m.Data()
|
||||||
for i, j := 0, len(d)-1; i < j; i, j = i+1, j-1 {
|
for i, j := 0, len(d)-1; i < j; i, j = i+1, j-1 {
|
||||||
d[i], d[j] = d[j], d[i]
|
d[i], d[j] = d[j], d[i]
|
||||||
}
|
}
|
||||||
|
|
||||||
return &msg.Message{Peer: m.Peer, Data: d}, nil
|
return msg.New(m.Peer(), d), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func newPeer(t *testing.T, id string) *peer.Peer {
|
func newPeer(t *testing.T, id string) *peer.Peer {
|
||||||
@ -47,11 +47,11 @@ func TestServiceHandler(t *testing.T) {
|
|||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
m1 := &msg.Message{Peer: peer1, Data: d}
|
m1 := msg.New(peer1, d)
|
||||||
s.Incoming <- m1
|
s.Incoming <- m1
|
||||||
m2 := <-s.Outgoing
|
m2 := <-s.Outgoing
|
||||||
|
|
||||||
d, rid, err := unwrapData(m2.Data)
|
d, rid, err := unwrapData(m2.Data())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
@ -85,14 +85,14 @@ func TestServiceRequest(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
m1 := &msg.Message{Peer: peer1, Data: []byte("beep")}
|
m1 := msg.New(peer1, []byte("beep"))
|
||||||
m2, err := s1.SendRequest(ctx, m1)
|
m2, err := s1.SendRequest(ctx, m1)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
t.Error(err)
|
t.Error(err)
|
||||||
}
|
}
|
||||||
|
|
||||||
if !bytes.Equal(m2.Data, []byte("peeb")) {
|
if !bytes.Equal(m2.Data(), []byte("peeb")) {
|
||||||
t.Errorf("service handler data incorrect: %v != %v", m2.Data, "oof")
|
t.Errorf("service handler data incorrect: %v != %v", m2.Data(), "oof")
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -117,7 +117,7 @@ func TestServiceRequestTimeout(t *testing.T) {
|
|||||||
}
|
}
|
||||||
}()
|
}()
|
||||||
|
|
||||||
m1 := &msg.Message{Peer: peer1, Data: []byte("beep")}
|
m1 := msg.New(peer1, []byte("beep"))
|
||||||
m2, err := s1.SendRequest(ctx, m1)
|
m2, err := s1.SendRequest(ctx, m1)
|
||||||
if err == nil || m2 != nil {
|
if err == nil || m2 != nil {
|
||||||
t.Error("should've timed out")
|
t.Error("should've timed out")
|
||||||
|
Reference in New Issue
Block a user