diff --git a/balancer/balancer.go b/balancer/balancer.go index 219a2940..499acbd9 100644 --- a/balancer/balancer.go +++ b/balancer/balancer.go @@ -126,6 +126,8 @@ type BuildOptions struct { // to a remote load balancer server. The Balancer implementations // can ignore this if it doesn't need to talk to remote balancer. Dialer func(context.Context, string) (net.Conn, error) + // ChannelzParentID is the entity parent's channelz unique identification number. + ChannelzParentID int64 } // Builder creates a balancer. diff --git a/channelz/funcs.go b/channelz/funcs.go new file mode 100644 index 00000000..586a0336 --- /dev/null +++ b/channelz/funcs.go @@ -0,0 +1,573 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +// Package channelz defines APIs for enabling channelz service, entry +// registration/deletion, and accessing channelz data. It also defines channelz +// metric struct formats. +// +// All APIs in this package are experimental. +package channelz + +import ( + "sort" + "sync" + "sync/atomic" + + "google.golang.org/grpc/grpclog" +) + +var ( + db dbWrapper + idGen idGenerator + // EntryPerPage defines the number of channelz entries to be shown on a web page. + EntryPerPage = 50 + curState int32 +) + +// TurnOn turns on channelz data collection. +func TurnOn() { + if !IsOn() { + NewChannelzStorage() + atomic.StoreInt32(&curState, 1) + } +} + +// IsOn returns whether channelz data collection is on. +func IsOn() bool { + return atomic.CompareAndSwapInt32(&curState, 1, 1) +} + +// dbWarpper wraps around a reference to internal channelz data storage, and +// provide synchronized functionality to set and get the reference. +type dbWrapper struct { + mu sync.RWMutex + DB *channelMap +} + +func (d *dbWrapper) set(db *channelMap) { + d.mu.Lock() + d.DB = db + d.mu.Unlock() +} + +func (d *dbWrapper) get() *channelMap { + d.mu.RLock() + defer d.mu.RUnlock() + return d.DB +} + +// NewChannelzStorage initializes channelz data storage and id generator. +// +// Note: This function is exported for testing purpose only. User should not call +// it in most cases. +func NewChannelzStorage() { + db.set(&channelMap{ + topLevelChannels: make(map[int64]struct{}), + channels: make(map[int64]*channel), + listenSockets: make(map[int64]*listenSocket), + normalSockets: make(map[int64]*normalSocket), + servers: make(map[int64]*server), + subChannels: make(map[int64]*subChannel), + }) + idGen.reset() +} + +// GetTopChannels returns a slice of top channel's ChannelMetric, along with a +// boolean indicating whether there's more top channels to be queried for. +// +// The arg id specifies that only top channel with id at or above it will be included +// in the result. The returned slice is up to a length of EntryPerPage, and is +// sorted in ascending id order. +func GetTopChannels(id int64) ([]*ChannelMetric, bool) { + return db.get().GetTopChannels(id) +} + +// GetServers returns a slice of server's ServerMetric, along with a +// boolean indicating whether there's more servers to be queried for. +// +// The arg id specifies that only server with id at or above it will be included +// in the result. The returned slice is up to a length of EntryPerPage, and is +// sorted in ascending id order. +func GetServers(id int64) ([]*ServerMetric, bool) { + return db.get().GetServers(id) +} + +// GetServerSockets returns a slice of server's (identified by id) normal socket's +// SocketMetric, along with a boolean indicating whether there's more sockets to +// be queried for. +// +// The arg startID specifies that only sockets with id at or above it will be +// included in the result. The returned slice is up to a length of EntryPerPage, +// and is sorted in ascending id order. +func GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) { + return db.get().GetServerSockets(id, startID) +} + +// GetChannel returns the ChannelMetric for the channel (identified by id). +func GetChannel(id int64) *ChannelMetric { + return db.get().GetChannel(id) +} + +// GetSubChannel returns the SubChannelMetric for the subchannel (identified by id). +func GetSubChannel(id int64) *SubChannelMetric { + return db.get().GetSubChannel(id) +} + +// GetSocket returns the SocketInternalMetric for the socket (identified by id). +func GetSocket(id int64) *SocketMetric { + return db.get().GetSocket(id) +} + +// RegisterChannel registers the given channel c in channelz database with ref +// as its reference name, and add it to the child list of its parent (identified +// by pid). pid = 0 means no parent. It returns the unique channelz tracking id +// assigned to this channel. +func RegisterChannel(c Channel, pid int64, ref string) int64 { + id := idGen.genID() + cn := &channel{ + refName: ref, + c: c, + subChans: make(map[int64]string), + nestedChans: make(map[int64]string), + id: id, + pid: pid, + } + if pid == 0 { + db.get().addChannel(id, cn, true, pid, ref) + } else { + db.get().addChannel(id, cn, false, pid, ref) + } + return id +} + +// RegisterSubChannel registers the given channel c in channelz database with ref +// as its reference name, and add it to the child list of its parent (identified +// by pid). It returns the unique channelz tracking id assigned to this subchannel. +func RegisterSubChannel(c Channel, pid int64, ref string) int64 { + if pid == 0 { + grpclog.Error("a SubChannel's parent id cannot be 0") + return 0 + } + id := idGen.genID() + sc := &subChannel{ + refName: ref, + c: c, + sockets: make(map[int64]string), + id: id, + pid: pid, + } + db.get().addSubChannel(id, sc, pid, ref) + return id +} + +// RegisterServer registers the given server s in channelz database. It returns +// the unique channelz tracking id assigned to this server. +func RegisterServer(s Server, ref string) int64 { + id := idGen.genID() + svr := &server{ + refName: ref, + s: s, + sockets: make(map[int64]string), + listenSockets: make(map[int64]string), + id: id, + } + db.get().addServer(id, svr) + return id +} + +// RegisterListenSocket registers the given listen socket s in channelz database +// with ref as its reference name, and add it to the child list of its parent +// (identified by pid). It returns the unique channelz tracking id assigned to +// this listen socket. +func RegisterListenSocket(s Socket, pid int64, ref string) int64 { + if pid == 0 { + grpclog.Error("a ListenSocket's parent id cannot be 0") + return 0 + } + id := idGen.genID() + ls := &listenSocket{refName: ref, s: s, id: id, pid: pid} + db.get().addListenSocket(id, ls, pid, ref) + return id +} + +// RegisterNormalSocket registers the given normal socket s in channelz database +// with ref as its reference name, and add it to the child list of its parent +// (identified by pid). It returns the unique channelz tracking id assigned to +// this normal socket. +func RegisterNormalSocket(s Socket, pid int64, ref string) int64 { + if pid == 0 { + grpclog.Error("a NormalSocket's parent id cannot be 0") + return 0 + } + id := idGen.genID() + ns := &normalSocket{refName: ref, s: s, id: id, pid: pid} + db.get().addNormalSocket(id, ns, pid, ref) + return id +} + +// RemoveEntry removes an entry with unique channelz trakcing id to be id from +// channelz database. +func RemoveEntry(id int64) { + db.get().removeEntry(id) +} + +// channelMap is the storage data structure for channelz. +// Methods of channelMap can be divided in two two categories with respect to locking. +// 1. Methods acquire the global lock. +// 2. Methods that can only be called when global lock is held. +// A second type of method need always to be called inside a first type of method. +type channelMap struct { + mu sync.RWMutex + topLevelChannels map[int64]struct{} + servers map[int64]*server + channels map[int64]*channel + subChannels map[int64]*subChannel + listenSockets map[int64]*listenSocket + normalSockets map[int64]*normalSocket +} + +func (c *channelMap) addServer(id int64, s *server) { + c.mu.Lock() + s.cm = c + c.servers[id] = s + c.mu.Unlock() +} + +func (c *channelMap) addChannel(id int64, cn *channel, isTopChannel bool, pid int64, ref string) { + c.mu.Lock() + cn.cm = c + c.channels[id] = cn + if isTopChannel { + c.topLevelChannels[id] = struct{}{} + } else { + c.findEntry(pid).addChild(id, cn) + } + c.mu.Unlock() +} + +func (c *channelMap) addSubChannel(id int64, sc *subChannel, pid int64, ref string) { + c.mu.Lock() + sc.cm = c + c.subChannels[id] = sc + c.findEntry(pid).addChild(id, sc) + c.mu.Unlock() +} + +func (c *channelMap) addListenSocket(id int64, ls *listenSocket, pid int64, ref string) { + c.mu.Lock() + ls.cm = c + c.listenSockets[id] = ls + c.findEntry(pid).addChild(id, ls) + c.mu.Unlock() +} + +func (c *channelMap) addNormalSocket(id int64, ns *normalSocket, pid int64, ref string) { + c.mu.Lock() + ns.cm = c + c.normalSockets[id] = ns + c.findEntry(pid).addChild(id, ns) + c.mu.Unlock() +} + +// removeEntry triggers the removal of an entry, which may not indeed delete the +// entry, if it has to wait on the deletion of its children, or may lead to a chain +// of entry deletion. For example, deleting the last socket of a gracefully shutting +// down server will lead to the server being also deleted. +func (c *channelMap) removeEntry(id int64) { + c.mu.Lock() + c.findEntry(id).triggerDelete() + c.mu.Unlock() +} + +// c.mu must be held by the caller. +func (c *channelMap) findEntry(id int64) entry { + var v entry + var ok bool + if v, ok = c.channels[id]; ok { + return v + } + if v, ok = c.subChannels[id]; ok { + return v + } + if v, ok = c.servers[id]; ok { + return v + } + if v, ok = c.listenSockets[id]; ok { + return v + } + if v, ok = c.normalSockets[id]; ok { + return v + } + return &dummyEntry{idNotFound: id} +} + +// c.mu must be held by the caller +// deleteEntry simply deletes an entry from the channelMap. Before calling this +// method, caller must check this entry is ready to be deleted, i.e removeEntry() +// has been called on it, and no children still exist. +// Conditionals are ordered by the expected frequency of deletion of each entity +// type, in order to optimize performance. +func (c *channelMap) deleteEntry(id int64) { + var ok bool + if _, ok = c.normalSockets[id]; ok { + delete(c.normalSockets, id) + return + } + if _, ok = c.subChannels[id]; ok { + delete(c.subChannels, id) + return + } + if _, ok = c.channels[id]; ok { + delete(c.channels, id) + delete(c.topLevelChannels, id) + return + } + if _, ok = c.listenSockets[id]; ok { + delete(c.listenSockets, id) + return + } + if _, ok = c.servers[id]; ok { + delete(c.servers, id) + return + } +} + +type int64Slice []int64 + +func (s int64Slice) Len() int { return len(s) } +func (s int64Slice) Swap(i, j int) { s[i], s[j] = s[j], s[i] } +func (s int64Slice) Less(i, j int) bool { return s[i] < s[j] } + +func copyMap(m map[int64]string) map[int64]string { + n := make(map[int64]string) + for k, v := range m { + n[k] = v + } + return n +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} + +func (c *channelMap) GetTopChannels(id int64) ([]*ChannelMetric, bool) { + c.mu.RLock() + l := len(c.topLevelChannels) + ids := make([]int64, 0, l) + cns := make([]*channel, 0, min(l, EntryPerPage)) + + for k := range c.topLevelChannels { + ids = append(ids, k) + } + sort.Sort(int64Slice(ids)) + idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id }) + count := 0 + var end bool + var t []*ChannelMetric + for i, v := range ids[idx:] { + if count == EntryPerPage { + break + } + if cn, ok := c.channels[v]; ok { + cns = append(cns, cn) + t = append(t, &ChannelMetric{ + NestedChans: copyMap(cn.nestedChans), + SubChans: copyMap(cn.subChans), + }) + count++ + } + if i == len(ids[idx:])-1 { + end = true + break + } + } + c.mu.RUnlock() + if count == 0 { + end = true + } + + for i, cn := range cns { + t[i].ChannelData = cn.c.ChannelzMetric() + t[i].ID = cn.id + t[i].RefName = cn.refName + } + return t, end +} + +func (c *channelMap) GetServers(id int64) ([]*ServerMetric, bool) { + c.mu.RLock() + l := len(c.servers) + ids := make([]int64, 0, l) + ss := make([]*server, 0, min(l, EntryPerPage)) + for k := range c.servers { + ids = append(ids, k) + } + sort.Sort(int64Slice(ids)) + idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id }) + count := 0 + var end bool + var s []*ServerMetric + for i, v := range ids[idx:] { + if count == EntryPerPage { + break + } + if svr, ok := c.servers[v]; ok { + ss = append(ss, svr) + s = append(s, &ServerMetric{ + ListenSockets: copyMap(svr.listenSockets), + }) + count++ + } + if i == len(ids[idx:])-1 { + end = true + break + } + } + c.mu.RUnlock() + if count == 0 { + end = true + } + + for i, svr := range ss { + s[i].ServerData = svr.s.ChannelzMetric() + s[i].ID = svr.id + s[i].RefName = svr.refName + } + return s, end +} + +func (c *channelMap) GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) { + var svr *server + var ok bool + c.mu.RLock() + if svr, ok = c.servers[id]; !ok { + // server with id doesn't exist. + c.mu.RUnlock() + return nil, true + } + svrskts := svr.sockets + l := len(svrskts) + ids := make([]int64, 0, l) + sks := make([]*normalSocket, 0, min(l, EntryPerPage)) + for k := range svrskts { + ids = append(ids, k) + } + sort.Sort((int64Slice(ids))) + idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id }) + count := 0 + var end bool + for i, v := range ids[idx:] { + if count == EntryPerPage { + break + } + if ns, ok := c.normalSockets[v]; ok { + sks = append(sks, ns) + count++ + } + if i == len(ids[idx:])-1 { + end = true + break + } + } + c.mu.RUnlock() + if count == 0 { + end = true + } + var s []*SocketMetric + for _, ns := range sks { + sm := &SocketMetric{} + sm.SocketData = ns.s.ChannelzMetric() + sm.ID = ns.id + sm.RefName = ns.refName + s = append(s, sm) + } + return s, end +} + +func (c *channelMap) GetChannel(id int64) *ChannelMetric { + cm := &ChannelMetric{} + var cn *channel + var ok bool + c.mu.RLock() + if cn, ok = c.channels[id]; !ok { + // channel with id doesn't exist. + c.mu.RUnlock() + return nil + } + cm.NestedChans = copyMap(cn.nestedChans) + cm.SubChans = copyMap(cn.subChans) + c.mu.RUnlock() + cm.ChannelData = cn.c.ChannelzMetric() + cm.ID = cn.id + cm.RefName = cn.refName + return cm +} + +func (c *channelMap) GetSubChannel(id int64) *SubChannelMetric { + cm := &SubChannelMetric{} + var sc *subChannel + var ok bool + c.mu.RLock() + if sc, ok = c.subChannels[id]; !ok { + // subchannel with id doesn't exist. + c.mu.RUnlock() + return nil + } + cm.Sockets = copyMap(sc.sockets) + c.mu.RUnlock() + cm.ChannelData = sc.c.ChannelzMetric() + cm.ID = sc.id + cm.RefName = sc.refName + return cm +} + +func (c *channelMap) GetSocket(id int64) *SocketMetric { + sm := &SocketMetric{} + c.mu.RLock() + if ls, ok := c.listenSockets[id]; ok { + c.mu.RUnlock() + sm.SocketData = ls.s.ChannelzMetric() + sm.ID = ls.id + sm.RefName = ls.refName + return sm + } + if ns, ok := c.normalSockets[id]; ok { + c.mu.RUnlock() + sm.SocketData = ns.s.ChannelzMetric() + sm.ID = ns.id + sm.RefName = ns.refName + return sm + } + c.mu.RUnlock() + return nil +} + +type idGenerator struct { + id int64 +} + +func (i *idGenerator) reset() { + atomic.StoreInt64(&i.id, 0) +} + +func (i *idGenerator) genID() int64 { + return atomic.AddInt64(&i.id, 1) +} diff --git a/channelz/types.go b/channelz/types.go new file mode 100644 index 00000000..b84abfe8 --- /dev/null +++ b/channelz/types.go @@ -0,0 +1,415 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package channelz + +import ( + "net" + "time" + + "google.golang.org/grpc/connectivity" + "google.golang.org/grpc/grpclog" +) + +// entry represents a node in the channelz database. +type entry interface { + // addChild adds a child e, whose channelz id is id to child list + addChild(id int64, e entry) + // deleteChild deletes a child with channelz id to be id from child list + deleteChild(id int64) + // triggerDelete tries to delete self from channelz database. However, if child + // list is not empty, then deletion from the database is on hold until the last + // child is deleted from database. + triggerDelete() + // deleteSelfIfReady check whether triggerDelete() has been called before, and whether child + // list is now empty. If both conditions are met, then delete self from database. + deleteSelfIfReady() +} + +// dummyEntry is a fake entry to handle entry not found case. +type dummyEntry struct { + idNotFound int64 +} + +func (d *dummyEntry) addChild(id int64, e entry) { + // Note: It is possible for a normal program to reach here under race condition. + // For example, there could be a race between ClientConn.Close() info being propagated + // to addrConn and http2Client. ClientConn.Close() cancel the context and result + // in http2Client to error. The error info is then caught by transport monitor + // and before addrConn.tearDown() is called in side ClientConn.Close(). Therefore, + // the addrConn will create a new transport. And when registering the new transport in + // channelz, its parent addrConn could have already been torn down and deleted + // from channelz tracking, and thus reach the code here. + grpclog.Infof("attempt to add child of type %T with id %d to a parent (id=%d) that doesn't currently exist", e, id, d.idNotFound) +} + +func (d *dummyEntry) deleteChild(id int64) { + // It is possible for a normal program to reach here under race condition. + // Refer to the example described in addChild(). + grpclog.Infof("attempt to delete child with id %d from a parent (id=%d) that doesn't currently exist", id, d.idNotFound) +} + +func (d *dummyEntry) triggerDelete() { + grpclog.Warningf("attempt to delete an entry (id=%d) that doesn't currently exist", d.idNotFound) +} + +func (*dummyEntry) deleteSelfIfReady() { + // code should not reach here. deleteSelfIfReady is always called on an existing entry. +} + +// ChannelMetric defines the info channelz provides for a specific Channel, which +// includes ChannelInternalMetric and channelz-specific data, such as channelz id, +// child list, etc. +type ChannelMetric struct { + // ID is the channelz id of this channel. + ID int64 + // RefName is the human readable reference string of this channel. + RefName string + // ChannelData contains channel internal metric reported by the channel through + // ChannelzMetric(). + ChannelData *ChannelInternalMetric + // NestedChans tracks the nested channel type children of this channel in the format of + // a map from nested channel channelz id to corresponding reference string. + NestedChans map[int64]string + // SubChans tracks the subchannel type children of this channel in the format of a + // map from subchannel channelz id to corresponding reference string. + SubChans map[int64]string + // Sockets tracks the socket type children of this channel in the format of a map + // from socket channelz id to corresponding reference string. + // Note current grpc implementation doesn't allow channel having sockets directly, + // therefore, this is field is unused. + Sockets map[int64]string +} + +// SubChannelMetric defines the info channelz provides for a specific SubChannel, +// which includes ChannelInternalMetric and channelz-specific data, such as +// channelz id, child list, etc. +type SubChannelMetric struct { + // ID is the channelz id of this subchannel. + ID int64 + // RefName is the human readable reference string of this subchannel. + RefName string + // ChannelData contains subchannel internal metric reported by the subchannel + // through ChannelzMetric(). + ChannelData *ChannelInternalMetric + // NestedChans tracks the nested channel type children of this subchannel in the format of + // a map from nested channel channelz id to corresponding reference string. + // Note current grpc implementation doesn't allow subchannel to have nested channels + // as children, therefore, this field is unused. + NestedChans map[int64]string + // SubChans tracks the subchannel type children of this subchannel in the format of a + // map from subchannel channelz id to corresponding reference string. + // Note current grpc implementation doesn't allow subchannel to have subchannels + // as children, therefore, this field is unused. + SubChans map[int64]string + // Sockets tracks the socket type children of this subchannel in the format of a map + // from socket channelz id to corresponding reference string. + Sockets map[int64]string +} + +// ChannelInternalMetric defines the struct that the implementor of Channel interface +// should return from ChannelzMetric(). +type ChannelInternalMetric struct { + // current connectivity state of the channel. + State connectivity.State + // The target this channel originally tried to connect to. May be absent + Target string + // The number of calls started on the channel. + CallsStarted int64 + // The number of calls that have completed with an OK status. + CallsSucceeded int64 + // The number of calls that have a completed with a non-OK status. + CallsFailed int64 + // The last time a call was started on the channel. + LastCallStartedTimestamp time.Time + //TODO: trace +} + +// Channel is the interface that should be satisfied in order to be tracked by +// channelz as Channel or SubChannel. +type Channel interface { + ChannelzMetric() *ChannelInternalMetric +} + +type channel struct { + refName string + c Channel + closeCalled bool + nestedChans map[int64]string + subChans map[int64]string + id int64 + pid int64 + cm *channelMap +} + +func (c *channel) addChild(id int64, e entry) { + switch v := e.(type) { + case *subChannel: + c.subChans[id] = v.refName + case *channel: + c.nestedChans[id] = v.refName + default: + grpclog.Errorf("cannot add a child (id = %d) of type %T to a channel", id, e) + } +} + +func (c *channel) deleteChild(id int64) { + delete(c.subChans, id) + delete(c.nestedChans, id) + c.deleteSelfIfReady() +} + +func (c *channel) triggerDelete() { + c.closeCalled = true + c.deleteSelfIfReady() +} + +func (c *channel) deleteSelfIfReady() { + if !c.closeCalled || len(c.subChans)+len(c.nestedChans) != 0 { + return + } + c.cm.deleteEntry(c.id) + // not top channel + if c.pid != 0 { + c.cm.findEntry(c.pid).deleteChild(c.id) + } +} + +type subChannel struct { + refName string + c Channel + closeCalled bool + sockets map[int64]string + id int64 + pid int64 + cm *channelMap +} + +func (sc *subChannel) addChild(id int64, e entry) { + if v, ok := e.(*normalSocket); ok { + sc.sockets[id] = v.refName + } else { + grpclog.Errorf("cannot add a child (id = %d) of type %T to a subChannel", id, e) + } +} + +func (sc *subChannel) deleteChild(id int64) { + delete(sc.sockets, id) + sc.deleteSelfIfReady() +} + +func (sc *subChannel) triggerDelete() { + sc.closeCalled = true + sc.deleteSelfIfReady() +} + +func (sc *subChannel) deleteSelfIfReady() { + if !sc.closeCalled || len(sc.sockets) != 0 { + return + } + sc.cm.deleteEntry(sc.id) + sc.cm.findEntry(sc.pid).deleteChild(sc.id) +} + +// SocketMetric defines the info channelz provides for a specific Socket, which +// includes SocketInternalMetric and channelz-specific data, such as channelz id, etc. +type SocketMetric struct { + // ID is the channelz id of this socket. + ID int64 + // RefName is the human readable reference string of this socket. + RefName string + // SocketData contains socket internal metric reported by the socket through + // ChannelzMetric(). + SocketData *SocketInternalMetric +} + +// SocketInternalMetric defines the struct that the implementor of Socket interface +// should return from ChannelzMetric(). +type SocketInternalMetric struct { + // The number of streams that have been started. + StreamsStarted int64 + // The number of streams that have ended successfully with the EoS bit set for + // both end points. + StreamsSucceeded int64 + // The number of incoming streams that have a completed with a non-OK status. + StreamsFailed int64 + // The number of messages successfully sent on this socket. + MessagesSent int64 + MessagesReceived int64 + // The number of keep alives sent. This is typically implemented with HTTP/2 + // ping messages. + KeepAlivesSent int64 + // The last time a stream was created by this endpoint. Usually unset for + // servers. + LastLocalStreamCreatedTimestamp time.Time + // The last time a stream was created by the remote endpoint. Usually unset + // for clients. + LastRemoteStreamCreatedTimestamp time.Time + // The last time a message was sent by this endpoint. + LastMessageSentTimestamp time.Time + // The last time a message was received by this endpoint. + LastMessageReceivedTimestamp time.Time + // The amount of window, granted to the local endpoint by the remote endpoint. + // This may be slightly out of date due to network latency. This does NOT + // include stream level or TCP level flow control info. + LocalFlowControlWindow int64 + // The amount of window, granted to the remote endpoint by the local endpoint. + // This may be slightly out of date due to network latency. This does NOT + // include stream level or TCP level flow control info. + RemoteFlowControlWindow int64 + // The locally bound address. + LocalAddr net.Addr + // The remote bound address. May be absent. + RemoteAddr net.Addr + // Optional, represents the name of the remote endpoint, if different than + // the original target name. + RemoteName string + //TODO: socket options + //TODO: Security +} + +// Socket is the interface that should be satisfied in order to be tracked by +// channelz as Socket. +type Socket interface { + ChannelzMetric() *SocketInternalMetric +} + +type listenSocket struct { + refName string + s Socket + id int64 + pid int64 + cm *channelMap +} + +func (ls *listenSocket) addChild(id int64, e entry) { + grpclog.Errorf("cannot add a child (id = %d) of type %T to a listen socket", id, e) +} + +func (ls *listenSocket) deleteChild(id int64) { + grpclog.Errorf("cannot delete a child (id = %d) from a listen socket", id) +} + +func (ls *listenSocket) triggerDelete() { + ls.cm.deleteEntry(ls.id) + ls.cm.findEntry(ls.pid).deleteChild(ls.id) +} + +func (ls *listenSocket) deleteSelfIfReady() { + grpclog.Errorf("cannot call deleteSelfIfReady on a listen socket") +} + +type normalSocket struct { + refName string + s Socket + id int64 + pid int64 + cm *channelMap +} + +func (ns *normalSocket) addChild(id int64, e entry) { + grpclog.Errorf("cannot add a child (id = %d) of type %T to a normal socket", id, e) +} + +func (ns *normalSocket) deleteChild(id int64) { + grpclog.Errorf("cannot delete a child (id = %d) from a normal socket", id) +} + +func (ns *normalSocket) triggerDelete() { + ns.cm.deleteEntry(ns.id) + ns.cm.findEntry(ns.pid).deleteChild(ns.id) +} + +func (ns *normalSocket) deleteSelfIfReady() { + grpclog.Errorf("cannot call deleteSelfIfReady on a normal socket") +} + +// ServerMetric defines the info channelz provides for a specific Server, which +// includes ServerInternalMetric and channelz-specific data, such as channelz id, +// child list, etc. +type ServerMetric struct { + // ID is the channelz id of this server. + ID int64 + // RefName is the human readable reference string of this server. + RefName string + // ServerData contains server internal metric reported by the server through + // ChannelzMetric(). + ServerData *ServerInternalMetric + // ListenSockets tracks the listener socket type children of this server in the + // format of a map from socket channelz id to corresponding reference string. + ListenSockets map[int64]string +} + +// ServerInternalMetric defines the struct that the implementor of Server interface +// should return from ChannelzMetric(). +type ServerInternalMetric struct { + // The number of incoming calls started on the server. + CallsStarted int64 + // The number of incoming calls that have completed with an OK status. + CallsSucceeded int64 + // The number of incoming calls that have a completed with a non-OK status. + CallsFailed int64 + // The last time a call was started on the server. + LastCallStartedTimestamp time.Time + //TODO: trace +} + +// Server is the interface to be satisfied in order to be tracked by channelz as +// Server. +type Server interface { + ChannelzMetric() *ServerInternalMetric +} + +type server struct { + refName string + s Server + closeCalled bool + sockets map[int64]string + listenSockets map[int64]string + id int64 + cm *channelMap +} + +func (s *server) addChild(id int64, e entry) { + switch v := e.(type) { + case *normalSocket: + s.sockets[id] = v.refName + case *listenSocket: + s.listenSockets[id] = v.refName + default: + grpclog.Errorf("cannot add a child (id = %d) of type %T to a server", id, e) + } +} + +func (s *server) deleteChild(id int64) { + delete(s.sockets, id) + delete(s.listenSockets, id) + s.deleteSelfIfReady() +} + +func (s *server) triggerDelete() { + s.closeCalled = true + s.deleteSelfIfReady() +} + +func (s *server) deleteSelfIfReady() { + if !s.closeCalled || len(s.sockets)+len(s.listenSockets) != 0 { + return + } + s.cm.deleteEntry(s.id) +} diff --git a/clientconn.go b/clientconn.go index 513fb3fa..3fbfe297 100644 --- a/clientconn.go +++ b/clientconn.go @@ -32,6 +32,7 @@ import ( "golang.org/x/net/trace" "google.golang.org/grpc/balancer" _ "google.golang.org/grpc/balancer/roundrobin" // To register roundrobin. + "google.golang.org/grpc/channelz" "google.golang.org/grpc/codes" "google.golang.org/grpc/connectivity" "google.golang.org/grpc/credentials" @@ -109,6 +110,7 @@ type dialOptions struct { // This is to support grpclb. resolverBuilder resolver.Builder waitForHandshake bool + channelzParentID int64 } const ( @@ -116,6 +118,12 @@ const ( defaultClientMaxSendMessageSize = math.MaxInt32 ) +// RegisterChannelz turns on channelz service. +// This is an EXPERIMENTAL API. +func RegisterChannelz() { + channelz.TurnOn() +} + // DialOption configures how we set up the connection. type DialOption func(*dialOptions) @@ -396,6 +404,14 @@ func WithAuthority(a string) DialOption { } } +// WithChannelzParentID returns a DialOption that specifies the channelz ID of current ClientConn's +// parent. This function is used in nested channel creation (e.g. grpclb dial). +func WithChannelzParentID(id int64) DialOption { + return func(o *dialOptions) { + o.channelzParentID = id + } +} + // Dial creates a client connection to the given target. func Dial(target string, opts ...DialOption) (*ClientConn, error) { return DialContext(context.Background(), target, opts...) @@ -423,6 +439,14 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * opt(&cc.dopts) } + if channelz.IsOn() { + if cc.dopts.channelzParentID != 0 { + cc.channelzID = channelz.RegisterChannel(cc, cc.dopts.channelzParentID, target) + } else { + cc.channelzID = channelz.RegisterChannel(cc, 0, target) + } + } + if !cc.dopts.insecure { if cc.dopts.copts.TransportCredentials == nil { return nil, errNoTransportSecurity @@ -538,8 +562,9 @@ func DialContext(ctx context.Context, target string, opts ...DialOption) (conn * credsClone = creds.Clone() } cc.balancerBuildOpts = balancer.BuildOptions{ - DialCreds: credsClone, - Dialer: cc.dopts.copts.Dialer, + DialCreds: credsClone, + Dialer: cc.dopts.copts.Dialer, + ChannelzParentID: cc.channelzID, } // Build the resolver. @@ -641,6 +666,8 @@ type ClientConn struct { preBalancerName string // previous balancer name. curAddresses []resolver.Address balancerWrapper *ccBalancerWrapper + + channelzID int64 // channelz unique identification number } // WaitForStateChange waits until the connectivity.State of ClientConn changes from sourceState or @@ -804,6 +831,9 @@ func (cc *ClientConn) newAddrConn(addrs []resolver.Address) (*addrConn, error) { cc.mu.Unlock() return nil, ErrClientConnClosing } + if channelz.IsOn() { + ac.channelzID = channelz.RegisterSubChannel(ac, cc.channelzID, "") + } cc.conns[ac] = struct{}{} cc.mu.Unlock() return ac, nil @@ -822,6 +852,12 @@ func (cc *ClientConn) removeAddrConn(ac *addrConn, err error) { ac.tearDown(err) } +// ChannelzMetric returns ChannelInternalMetric of current ClientConn. +// This is an EXPERIMENTAL API. +func (cc *ClientConn) ChannelzMetric() *channelz.ChannelInternalMetric { + return &channelz.ChannelInternalMetric{} +} + // connect starts to creating transport and also starts the transport monitor // goroutine for this ac. // It does nothing if the ac is not IDLE. @@ -979,6 +1015,9 @@ func (cc *ClientConn) Close() error { for ac := range conns { ac.tearDown(ErrClientConnClosing) } + if channelz.IsOn() { + channelz.RemoveEntry(cc.channelzID) + } return nil } @@ -1012,6 +1051,8 @@ type addrConn struct { // connectDeadline is the time by which all connection // negotiations must complete. connectDeadline time.Time + + channelzID int64 // channelz unique identification number } // adjustParams updates parameters used to create transports upon @@ -1153,6 +1194,9 @@ func (ac *addrConn) createTransport(connectRetryNum, ridx int, backoffDeadline, // Do not cancel in the success path because of // this issue in Go1.6: https://github.com/golang/go/issues/15078. connectCtx, cancel := context.WithDeadline(ac.ctx, connectDeadline) + if channelz.IsOn() { + copts.ChannelzParentID = ac.channelzID + } newTr, err := transport.NewClientTransport(connectCtx, ac.cc.ctx, target, copts, onPrefaceReceipt) if err != nil { cancel() @@ -1399,6 +1443,9 @@ func (ac *addrConn) tearDown(err error) { close(ac.ready) ac.ready = nil } + if channelz.IsOn() { + channelz.RemoveEntry(ac.channelzID) + } return } @@ -1408,6 +1455,10 @@ func (ac *addrConn) getState() connectivity.State { return ac.state } +func (ac *addrConn) ChannelzMetric() *channelz.ChannelInternalMetric { + return &channelz.ChannelInternalMetric{} +} + // ErrClientConnTimeout indicates that the ClientConn cannot establish the // underlying connections within the specified timeout. // diff --git a/grpclb_remote_balancer.go b/grpclb_remote_balancer.go index 1b580df2..f87e9733 100644 --- a/grpclb_remote_balancer.go +++ b/grpclb_remote_balancer.go @@ -26,6 +26,8 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc/balancer" + "google.golang.org/grpc/channelz" + "google.golang.org/grpc/connectivity" lbpb "google.golang.org/grpc/grpclb/grpc_lb_v1/messages" "google.golang.org/grpc/grpclog" @@ -168,6 +170,7 @@ func (lb *lbBalancer) sendLoadReport(s *balanceLoadClientStream, interval time.D } } } + func (lb *lbBalancer) callRemoteBalancer() error { lbClient := &loadBalancerClient{cc: lb.ccRemoteLB} ctx, cancel := context.WithCancel(context.Background()) @@ -243,9 +246,13 @@ func (lb *lbBalancer) dialRemoteLB(remoteLBName string) { // Explicitly set pickfirst as the balancer. dopts = append(dopts, WithBalancerName(PickFirstBalancerName)) dopts = append(dopts, withResolverBuilder(lb.manualResolver)) - // Dial using manualResolver.Scheme, which is a random scheme generated + if channelz.IsOn() { + dopts = append(dopts, WithChannelzParentID(lb.opt.ChannelzParentID)) + } + + // DialContext using manualResolver.Scheme, which is a random scheme generated // when init grpclb. The target name is not important. - cc, err := Dial("grpclb:///grpclb.server", dopts...) + cc, err := DialContext(context.Background(), "grpclb:///grpclb.server", dopts...) if err != nil { grpclog.Fatalf("failed to dial: %v", err) } diff --git a/server.go b/server.go index c6b413b9..b5363572 100644 --- a/server.go +++ b/server.go @@ -37,6 +37,8 @@ import ( "golang.org/x/net/context" "golang.org/x/net/http2" "golang.org/x/net/trace" + + "google.golang.org/grpc/channelz" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/encoding" @@ -97,11 +99,14 @@ type Server struct { m map[string]*service // service name -> service info events trace.EventLog - quit chan struct{} - done chan struct{} - quitOnce sync.Once - doneOnce sync.Once - serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop + quit chan struct{} + done chan struct{} + quitOnce sync.Once + doneOnce sync.Once + channelzRemoveOnce sync.Once + serveWG sync.WaitGroup // counts active Serve goroutines for GracefulStop + + channelzID int64 // channelz unique identification number } type options struct { @@ -343,6 +348,10 @@ func NewServer(opt ...ServerOption) *Server { _, file, line, _ := runtime.Caller(1) s.events = trace.NewEventLog("grpc.Server", fmt.Sprintf("%s:%d", file, line)) } + + if channelz.IsOn() { + s.channelzID = channelz.RegisterServer(s, "") + } return s } @@ -458,6 +467,23 @@ func (s *Server) useTransportAuthenticator(rawConn net.Conn) (net.Conn, credenti return s.opts.creds.ServerHandshake(rawConn) } +type listenSocket struct { + net.Listener + channelzID int64 +} + +func (l *listenSocket) ChannelzMetric() *channelz.SocketInternalMetric { + return &channelz.SocketInternalMetric{} +} + +func (l *listenSocket) Close() error { + err := l.Listener.Close() + if channelz.IsOn() { + channelz.RemoveEntry(l.channelzID) + } + return err +} + // Serve accepts incoming connections on the listener lis, creating a new // ServerTransport and service goroutine for each. The service goroutines // read gRPC requests and then call the registered handlers to reply to them. @@ -482,17 +508,29 @@ func (s *Server) Serve(lis net.Listener) error { // Stop or GracefulStop called; block until done and return nil. case <-s.quit: <-s.done + + s.channelzRemoveOnce.Do(func() { + if channelz.IsOn() { + channelz.RemoveEntry(s.channelzID) + } + }) default: } }() - s.lis[lis] = true + ls := &listenSocket{Listener: lis} + s.lis[ls] = true + + if channelz.IsOn() { + ls.channelzID = channelz.RegisterListenSocket(ls, s.channelzID, "") + } s.mu.Unlock() + defer func() { s.mu.Lock() - if s.lis != nil && s.lis[lis] { - lis.Close() - delete(s.lis, lis) + if s.lis != nil && s.lis[ls] { + ls.Close() + delete(s.lis, ls) } s.mu.Unlock() }() @@ -614,6 +652,7 @@ func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) tr InitialConnWindowSize: s.opts.initialConnWindowSize, WriteBufferSize: s.opts.writeBufferSize, ReadBufferSize: s.opts.readBufferSize, + ChannelzParentID: s.channelzID, } st, err := transport.NewServerTransport("http2", c, config) if err != nil { @@ -624,6 +663,7 @@ func (s *Server) newHTTP2Transport(c net.Conn, authInfo credentials.AuthInfo) tr grpclog.Warningln("grpc: Server.Serve failed to create ServerTransport: ", err) return nil } + return st } @@ -751,6 +791,12 @@ func (s *Server) removeConn(c io.Closer) { } } +// ChannelzMetric returns ServerInternalMetric of current server. +// This is an EXPERIMENTAL API. +func (s *Server) ChannelzMetric() *channelz.ServerInternalMetric { + return &channelz.ServerInternalMetric{} +} + func (s *Server) sendResponse(t transport.ServerTransport, stream *transport.Stream, msg interface{}, cp Compressor, opts *transport.Options, comp encoding.Compressor) error { var ( outPayload *stats.OutPayload @@ -1224,6 +1270,12 @@ func (s *Server) Stop() { }) }() + s.channelzRemoveOnce.Do(func() { + if channelz.IsOn() { + channelz.RemoveEntry(s.channelzID) + } + }) + s.mu.Lock() listeners := s.lis s.lis = nil @@ -1262,11 +1314,17 @@ func (s *Server) GracefulStop() { }) }() + s.channelzRemoveOnce.Do(func() { + if channelz.IsOn() { + channelz.RemoveEntry(s.channelzID) + } + }) s.mu.Lock() if s.conns == nil { s.mu.Unlock() return } + for lis := range s.lis { lis.Close() } diff --git a/test/channelz_test.go b/test/channelz_test.go new file mode 100644 index 00000000..572bf5a0 --- /dev/null +++ b/test/channelz_test.go @@ -0,0 +1,350 @@ +/* + * + * Copyright 2018 gRPC authors. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package test + +import ( + "net" + "testing" + "time" + + "google.golang.org/grpc" + + "google.golang.org/grpc/channelz" + "google.golang.org/grpc/resolver" + "google.golang.org/grpc/resolver/manual" + testpb "google.golang.org/grpc/test/grpc_testing" + "google.golang.org/grpc/test/leakcheck" +) + +func init() { + channelz.TurnOn() +} + +func (te *test) startServers(ts testpb.TestServiceServer, num int) { + for i := 0; i < num; i++ { + te.startServer(ts) + te.srvs = append(te.srvs, te.srv) + te.srvAddrs = append(te.srvAddrs, te.srvAddr) + te.srv = nil + te.srvAddr = "" + } +} + +func TestCZServerRegistrationAndDeletion(t *testing.T) { + defer leakcheck.Check(t) + testcases := []struct { + total int + start int64 + length int + end bool + }{ + {total: channelz.EntryPerPage, start: 0, length: channelz.EntryPerPage, end: true}, + {total: channelz.EntryPerPage - 1, start: 0, length: channelz.EntryPerPage - 1, end: true}, + {total: channelz.EntryPerPage + 1, start: 0, length: channelz.EntryPerPage, end: false}, + {total: channelz.EntryPerPage + 1, start: int64(2*(channelz.EntryPerPage+1) + 1), length: 0, end: true}, + } + + for _, c := range testcases { + channelz.NewChannelzStorage() + e := tcpClearRREnv + te := newTest(t, e) + te.startServers(&testServer{security: e.security}, c.total) + + ss, end := channelz.GetServers(c.start) + if len(ss) != c.length || end != c.end { + t.Fatalf("GetServers(%d) = %+v (len of which: %d), end: %+v, want len(GetServers(%d)) = %d, end: %+v", c.start, ss, len(ss), end, c.start, c.length, c.end) + } + te.tearDown() + ss, end = channelz.GetServers(c.start) + if len(ss) != 0 || !end { + t.Fatalf("GetServers(0) = %+v (len of which: %d), end: %+v, want len(GetServers(0)) = 0, end: true", ss, len(ss), end) + } + } +} + +func TestCZTopChannelRegistrationAndDeletion(t *testing.T) { + defer leakcheck.Check(t) + testcases := []struct { + total int + start int64 + length int + end bool + }{ + {total: channelz.EntryPerPage, start: 0, length: channelz.EntryPerPage, end: true}, + {total: channelz.EntryPerPage - 1, start: 0, length: channelz.EntryPerPage - 1, end: true}, + {total: channelz.EntryPerPage + 1, start: 0, length: channelz.EntryPerPage, end: false}, + {total: channelz.EntryPerPage + 1, start: int64(2*(channelz.EntryPerPage+1) + 1), length: 0, end: true}, + } + + for _, c := range testcases { + channelz.NewChannelzStorage() + e := tcpClearRREnv + te := newTest(t, e) + var ccs []*grpc.ClientConn + for i := 0; i < c.total; i++ { + cc := te.clientConn() + te.cc = nil + // avoid making next dial blocking + te.srvAddr = "" + ccs = append(ccs, cc) + } + time.Sleep(10 * time.Millisecond) + tcs, end := channelz.GetTopChannels(c.start) + if len(tcs) != c.length || end != c.end { + t.Fatalf("GetTopChannels(%d) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(%d)) = %d, end: %+v", c.start, tcs, len(tcs), end, c.start, c.length, c.end) + } + for _, cc := range ccs { + cc.Close() + } + tcs, end = channelz.GetTopChannels(c.start) + if len(tcs) != 0 || !end { + t.Fatalf("GetTopChannels(0) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(0)) = 0, end: true", tcs, len(tcs), end) + } + + te.tearDown() + } +} + +func TestCZNestedChannelRegistrationAndDeletion(t *testing.T) { + defer leakcheck.Check(t) + channelz.NewChannelzStorage() + e := tcpClearRREnv + // avoid calling API to set balancer type, which will void service config's change of balancer. + e.balancer = "" + te := newTest(t, e) + r, cleanup := manual.GenerateAndRegisterManualResolver() + defer cleanup() + resolvedAddrs := []resolver.Address{{Addr: "127.0.0.1:0", Type: resolver.GRPCLB, ServerName: "grpclb.server"}} + r.InitialAddrs(resolvedAddrs) + te.resolverScheme = r.Scheme() + te.clientConn() + defer te.tearDown() + time.Sleep(10 * time.Millisecond) + tcs, _ := channelz.GetTopChannels(0) + if len(tcs) != 1 { + t.Fatalf("There should only be one top channel, not %d", len(tcs)) + } + if len(tcs[0].NestedChans) != 1 { + t.Fatalf("There should be one nested channel from grpclb, not %d", len(tcs[0].NestedChans)) + } + + r.NewServiceConfig(`{"loadBalancingPolicy": "round_robin"}`) + r.NewAddress([]resolver.Address{{Addr: "127.0.0.1:0"}}) + + // wait for the shutdown of grpclb balancer + time.Sleep(10 * time.Millisecond) + tcs, _ = channelz.GetTopChannels(0) + if len(tcs) != 1 { + t.Fatalf("There should only be one top channel, not %d", len(tcs)) + } + if len(tcs[0].NestedChans) != 0 { + t.Fatalf("There should be 0 nested channel from grpclb, not %d", len(tcs[0].NestedChans)) + } + +} + +func TestCZClientSubChannelSocketRegistrationAndDeletion(t *testing.T) { + defer leakcheck.Check(t) + channelz.NewChannelzStorage() + e := tcpClearRREnv + num := 3 // number of backends + te := newTest(t, e) + var svrAddrs []resolver.Address + te.startServers(&testServer{security: e.security}, num) + r, cleanup := manual.GenerateAndRegisterManualResolver() + defer cleanup() + for _, a := range te.srvAddrs { + svrAddrs = append(svrAddrs, resolver.Address{Addr: a}) + } + r.InitialAddrs(svrAddrs) + te.resolverScheme = r.Scheme() + te.clientConn() + defer te.tearDown() + // Here, we just wait for all sockets to be up. In the future, if we implement + // IDLE, we may need to make several rpc calls to create the sockets. + time.Sleep(100 * time.Millisecond) + tcs, _ := channelz.GetTopChannels(0) + if len(tcs) != 1 { + t.Fatalf("There should only be one top channel, not %d", len(tcs)) + } + if len(tcs[0].SubChans) != num { + t.Fatalf("There should be %d subchannel not %d", num, len(tcs[0].SubChans)) + } + count := 0 + for k := range tcs[0].SubChans { + sc := channelz.GetSubChannel(k) + if sc == nil { + t.Fatalf("got subchannel") + } + count += len(sc.Sockets) + } + if count != num { + t.Fatalf("There should be %d sockets not %d", num, count) + } + + r.NewAddress(svrAddrs[:len(svrAddrs)-1]) + time.Sleep(100 * time.Millisecond) + tcs, _ = channelz.GetTopChannels(0) + if len(tcs[0].SubChans) != num-1 { + t.Fatalf("There should be %d subchannel not %d", num-1, len(tcs[0].SubChans)) + } + count = 0 + for k := range tcs[0].SubChans { + sc := channelz.GetSubChannel(k) + if sc == nil { + t.Fatalf("got subchannel") + } + count += len(sc.Sockets) + } + if count != num-1 { + t.Fatalf("There should be %d sockets not %d", num-1, count) + } + +} + +func TestCZServerSocketRegistrationAndDeletion(t *testing.T) { + defer leakcheck.Check(t) + channelz.NewChannelzStorage() + e := tcpClearRREnv + num := 3 // number of clients + te := newTest(t, e) + te.startServer(&testServer{security: e.security}) + defer te.tearDown() + var ccs []*grpc.ClientConn + for i := 0; i < num; i++ { + cc := te.clientConn() + te.cc = nil + ccs = append(ccs, cc) + } + defer func() { + for _, c := range ccs[:len(ccs)-1] { + c.Close() + } + }() + time.Sleep(10 * time.Millisecond) + ss, _ := channelz.GetServers(0) + if len(ss) != 1 { + t.Fatalf("There should only be one server, not %d", len(ss)) + } + if len(ss[0].ListenSockets) != 1 { + t.Fatalf("There should only be one server listen socket, not %d", len(ss[0].ListenSockets)) + } + ns, _ := channelz.GetServerSockets(ss[0].ID, 0) + if len(ns) != num { + t.Fatalf("There should be %d normal sockets not %d", num, len(ns)) + } + + ccs[len(ccs)-1].Close() + time.Sleep(10 * time.Millisecond) + + ns, _ = channelz.GetServerSockets(ss[0].ID, 0) + if len(ns) != num-1 { + t.Fatalf("There should be %d normal sockets not %d", num-1, len(ns)) + } +} + +func TestCZServerListenSocketDeletion(t *testing.T) { + defer leakcheck.Check(t) + channelz.NewChannelzStorage() + s := grpc.NewServer() + lis, err := net.Listen("tcp", "localhost:0") + if err != nil { + t.Fatalf("failed to listen: %v", err) + } + go s.Serve(lis) + time.Sleep(10 * time.Millisecond) + ss, _ := channelz.GetServers(0) + if len(ss) != 1 { + t.Fatalf("There should only be one server, not %d", len(ss)) + } + if len(ss[0].ListenSockets) != 1 { + t.Fatalf("There should only be one server listen socket, not %d", len(ss[0].ListenSockets)) + } + + lis.Close() + time.Sleep(10 * time.Millisecond) + ss, _ = channelz.GetServers(0) + if len(ss) != 1 { + t.Fatalf("There should be 1 server, not %d", len(ss)) + } + s.Stop() +} + +type dummyChannel struct{} + +func (d *dummyChannel) ChannelzMetric() *channelz.ChannelInternalMetric { + return &channelz.ChannelInternalMetric{} +} + +type dummySocket struct{} + +func (d *dummySocket) ChannelzMetric() *channelz.SocketInternalMetric { + return &channelz.SocketInternalMetric{} +} + +func TestCZRecusivelyDeletionOfEntry(t *testing.T) { + // +--+TopChan+---+ + // | | + // v v + // +-+SubChan1+--+ SubChan2 + // | | + // v v + // Socket1 Socket2 + channelz.NewChannelzStorage() + topChanID := channelz.RegisterChannel(&dummyChannel{}, 0, "") + subChanID1 := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "") + subChanID2 := channelz.RegisterSubChannel(&dummyChannel{}, topChanID, "") + sktID1 := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "") + sktID2 := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "") + + tcs, _ := channelz.GetTopChannels(0) + if tcs == nil || len(tcs) != 1 { + t.Fatalf("There should be one TopChannel entry") + } + if len(tcs[0].SubChans) != 2 { + t.Fatalf("There should be two SubChannel entries") + } + sc := channelz.GetSubChannel(subChanID1) + if sc == nil || len(sc.Sockets) != 2 { + t.Fatalf("There should be two Socket entries") + } + + channelz.RemoveEntry(topChanID) + tcs, _ = channelz.GetTopChannels(0) + if tcs == nil || len(tcs) != 1 { + t.Fatalf("There should be one TopChannel entry") + } + + channelz.RemoveEntry(subChanID1) + channelz.RemoveEntry(subChanID2) + tcs, _ = channelz.GetTopChannels(0) + if tcs == nil || len(tcs) != 1 { + t.Fatalf("There should be one TopChannel entry") + } + if len(tcs[0].SubChans) != 1 { + t.Fatalf("There should be one SubChannel entry") + } + + channelz.RemoveEntry(sktID1) + channelz.RemoveEntry(sktID2) + tcs, _ = channelz.GetTopChannels(0) + if tcs != nil { + t.Fatalf("There should be no TopChannel entry") + } +} diff --git a/test/end2end_test.go b/test/end2end_test.go index ffd13961..3ca30414 100644 --- a/test/end2end_test.go +++ b/test/end2end_test.go @@ -68,6 +68,10 @@ import ( "google.golang.org/grpc/testdata" ) +func init() { + grpc.RegisterChannelz() +} + var ( // For headers: testMetadata = metadata.MD{ @@ -478,6 +482,10 @@ type test struct { srv *grpc.Server srvAddr string + // srvs and srvAddrs are set once startServers is called. + srvs []*grpc.Server + srvAddrs []string + cc *grpc.ClientConn // nil until requested via clientConn restoreLogs func() // nil unless declareLogNoise is used } @@ -498,6 +506,11 @@ func (te *test) tearDown() { if te.srv != nil { te.srv.Stop() } + if len(te.srvs) != 0 { + for _, s := range te.srvs { + s.Stop() + } + } } // newTest returns a new test using the provided testing.T and diff --git a/transport/handler_server.go b/transport/handler_server.go index 9d0c88c7..a7533800 100644 --- a/transport/handler_server.go +++ b/transport/handler_server.go @@ -420,6 +420,10 @@ func (ht *serverHandlerTransport) runStream() { } } +func (ht *serverHandlerTransport) IncrMsgSent() {} + +func (ht *serverHandlerTransport) IncrMsgRecv() {} + func (ht *serverHandlerTransport) Drain() { panic("Drain() is not implemented") } diff --git a/transport/http2_client.go b/transport/http2_client.go index 560dc397..115a275b 100644 --- a/transport/http2_client.go +++ b/transport/http2_client.go @@ -30,6 +30,8 @@ import ( "golang.org/x/net/context" "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" + + "google.golang.org/grpc/channelz" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" @@ -101,6 +103,8 @@ type http2Client struct { // goAwayReason records the http2.ErrCode and debug data received with the // GoAway frame. goAwayReason GoAwayReason + + channelzID int64 // channelz unique identification number } func dial(ctx context.Context, fn func(context.Context, string) (net.Conn, error), addr string) (net.Conn, error) { @@ -238,6 +242,9 @@ func newHTTP2Client(connectCtx, ctx context.Context, addr TargetInfo, opts Conne } t.statsHandler.HandleConn(t.ctx, connBegin) } + if channelz.IsOn() { + t.channelzID = channelz.RegisterNormalSocket(t, opts.ChannelzParentID, "") + } // Start the reader goroutine for incoming message. Each transport has // a dedicated goroutine which reads HTTP2 frame from network. Then it // dispatches the frame to the corresponding stream entity. @@ -665,6 +672,9 @@ func (t *http2Client) Close() error { t.controlBuf.finish() t.cancel() err := t.conn.Close() + if channelz.IsOn() { + channelz.RemoveEntry(t.channelzID) + } // Notify all active streams. for _, s := range streams { t.closeStream(s, ErrConnClosing, false, http2.ErrCodeNo, nil, nil) @@ -1188,3 +1198,10 @@ func (t *http2Client) Error() <-chan struct{} { func (t *http2Client) GoAway() <-chan struct{} { return t.goAway } + +func (t *http2Client) ChannelzMetric() *channelz.SocketInternalMetric { + return &channelz.SocketInternalMetric{} +} + +func (t *http2Client) IncrMsgSent() {} +func (t *http2Client) IncrMsgRecv() {} diff --git a/transport/http2_server.go b/transport/http2_server.go index cd9f25a6..7f5cf206 100644 --- a/transport/http2_server.go +++ b/transport/http2_server.go @@ -35,6 +35,8 @@ import ( "golang.org/x/net/context" "golang.org/x/net/http2" "golang.org/x/net/http2/hpack" + + "google.golang.org/grpc/channelz" "google.golang.org/grpc/codes" "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" @@ -90,6 +92,8 @@ type http2Server struct { initialWindowSize int32 bdpEst *bdpEstimator + channelzID int64 // channelz unique identification number + mu sync.Mutex // guard the following // drainChan is initialized when drain(...) is called the first time. @@ -218,6 +222,9 @@ func newHTTP2Server(conn net.Conn, config *ServerConfig) (_ ServerTransport, err connBegin := &stats.ConnBegin{} t.stats.HandleConn(t.ctx, connBegin) } + if channelz.IsOn() { + t.channelzID = channelz.RegisterNormalSocket(t, config.ChannelzParentID, "") + } t.framer.writer.Flush() defer func() { @@ -907,6 +914,9 @@ func (t *http2Server) Close() error { t.controlBuf.finish() t.cancel() err := t.conn.Close() + if channelz.IsOn() { + channelz.RemoveEntry(t.channelzID) + } // Cancel all active streams. for _, s := range streams { s.cancel() @@ -1027,6 +1037,13 @@ func (t *http2Server) outgoingGoAwayHandler(g *goAway) (bool, error) { return false, nil } +func (t *http2Server) ChannelzMetric() *channelz.SocketInternalMetric { + return &channelz.SocketInternalMetric{} +} + +func (t *http2Server) IncrMsgSent() {} +func (t *http2Server) IncrMsgRecv() {} + var rgen = rand.New(rand.NewSource(time.Now().UnixNano())) func getJitter(v time.Duration) time.Duration { diff --git a/transport/transport.go b/transport/transport.go index fc7658db..1afbd13a 100644 --- a/transport/transport.go +++ b/transport/transport.go @@ -413,6 +413,7 @@ type ServerConfig struct { InitialConnWindowSize int32 WriteBufferSize int ReadBufferSize int + ChannelzParentID int64 } // NewServerTransport creates a ServerTransport with conn or non-nil error @@ -448,6 +449,8 @@ type ConnectOptions struct { WriteBufferSize int // ReadBufferSize sets the size of read buffer, which in turn determines how much data can be read at most for one read syscall. ReadBufferSize int + // ChannelzParentID sets the addrConn id which initiate the creation of this client transport. + ChannelzParentID int64 } // TargetInfo contains the information of the target such as network address and metadata. @@ -547,6 +550,12 @@ type ClientTransport interface { // GetGoAwayReason returns the reason why GoAway frame was received. GetGoAwayReason() GoAwayReason + + // IncrMsgSent increments the number of message sent through this transport. + IncrMsgSent() + + // IncrMsgRecv increments the number of message received through this transport. + IncrMsgRecv() } // ServerTransport is the common interface for all gRPC server-side transport @@ -580,6 +589,12 @@ type ServerTransport interface { // Drain notifies the client this ServerTransport stops accepting new RPCs. Drain() + + // IncrMsgSent increments the number of message sent through this transport. + IncrMsgSent() + + // IncrMsgRecv increments the number of message received through this transport. + IncrMsgRecv() } // streamErrorf creates an StreamError with the specified error code and description.