channelz: respect max_results in listing methods (#2516)

This commit is contained in:
Masahiro Sano
2018-12-21 06:53:39 +09:00
committed by lyuxuan
parent 463950a151
commit 1838dedee3
5 changed files with 180 additions and 139 deletions

View File

@ -259,7 +259,7 @@ func socketMetricToProto(sm *channelz.SocketMetric) *channelzpb.Socket {
}
func (s *serverImpl) GetTopChannels(ctx context.Context, req *channelzpb.GetTopChannelsRequest) (*channelzpb.GetTopChannelsResponse, error) {
metrics, end := channelz.GetTopChannels(req.GetStartChannelId())
metrics, end := channelz.GetTopChannels(req.GetStartChannelId(), req.GetMaxResults())
resp := &channelzpb.GetTopChannelsResponse{}
for _, m := range metrics {
resp.Channel = append(resp.Channel, channelMetricToProto(m))
@ -290,7 +290,7 @@ func serverMetricToProto(sm *channelz.ServerMetric) *channelzpb.Server {
}
func (s *serverImpl) GetServers(ctx context.Context, req *channelzpb.GetServersRequest) (*channelzpb.GetServersResponse, error) {
metrics, end := channelz.GetServers(req.GetStartServerId())
metrics, end := channelz.GetServers(req.GetStartServerId(), req.GetMaxResults())
resp := &channelzpb.GetServersResponse{}
for _, m := range metrics {
resp.Server = append(resp.Server, serverMetricToProto(m))
@ -300,7 +300,7 @@ func (s *serverImpl) GetServers(ctx context.Context, req *channelzpb.GetServersR
}
func (s *serverImpl) GetServerSockets(ctx context.Context, req *channelzpb.GetServerSocketsRequest) (*channelzpb.GetServerSocketsResponse, error) {
metrics, end := channelz.GetServerSockets(req.GetServerId(), req.GetStartSocketId())
metrics, end := channelz.GetServerSockets(req.GetServerId(), req.GetStartSocketId(), req.GetMaxResults())
resp := &channelzpb.GetServerSocketsResponse{}
for _, m := range metrics {
resp.SocketRef = append(resp.SocketRef, &channelzpb.SocketRef{SocketId: m.ID, Name: m.RefName})

View File

@ -40,7 +40,7 @@ var (
db dbWrapper
idGen idGenerator
// EntryPerPage defines the number of channelz entries to be shown on a web page.
EntryPerPage = 50
EntryPerPage = int64(50)
curState int32
maxTraceEntry = defaultMaxTraceEntry
)
@ -113,20 +113,20 @@ func NewChannelzStorage() {
// boolean indicating whether there's more top channels to be queried for.
//
// The arg id specifies that only top channel with id at or above it will be included
// in the result. The returned slice is up to a length of EntryPerPage, and is
// sorted in ascending id order.
func GetTopChannels(id int64) ([]*ChannelMetric, bool) {
return db.get().GetTopChannels(id)
// in the result. The returned slice is up to a length of the arg maxResults or
// EntryPerPage if maxResults is zero, and is sorted in ascending id order.
func GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
return db.get().GetTopChannels(id, maxResults)
}
// GetServers returns a slice of server's ServerMetric, along with a
// boolean indicating whether there's more servers to be queried for.
//
// The arg id specifies that only server with id at or above it will be included
// in the result. The returned slice is up to a length of EntryPerPage, and is
// sorted in ascending id order.
func GetServers(id int64) ([]*ServerMetric, bool) {
return db.get().GetServers(id)
// in the result. The returned slice is up to a length of the arg maxResults or
// EntryPerPage if maxResults is zero, and is sorted in ascending id order.
func GetServers(id int64, maxResults int64) ([]*ServerMetric, bool) {
return db.get().GetServers(id, maxResults)
}
// GetServerSockets returns a slice of server's (identified by id) normal socket's
@ -134,10 +134,10 @@ func GetServers(id int64) ([]*ServerMetric, bool) {
// be queried for.
//
// The arg startID specifies that only sockets with id at or above it will be
// included in the result. The returned slice is up to a length of EntryPerPage,
// and is sorted in ascending id order.
func GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) {
return db.get().GetServerSockets(id, startID)
// included in the result. The returned slice is up to a length of the arg maxResults
// or EntryPerPage if maxResults is zero, and is sorted in ascending id order.
func GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
return db.get().GetServerSockets(id, startID, maxResults)
}
// GetChannel returns the ChannelMetric for the channel (identified by id).
@ -447,29 +447,32 @@ func copyMap(m map[int64]string) map[int64]string {
return n
}
func min(a, b int) int {
func min(a, b int64) int64 {
if a < b {
return a
}
return b
}
func (c *channelMap) GetTopChannels(id int64) ([]*ChannelMetric, bool) {
func (c *channelMap) GetTopChannels(id int64, maxResults int64) ([]*ChannelMetric, bool) {
if maxResults <= 0 {
maxResults = EntryPerPage
}
c.mu.RLock()
l := len(c.topLevelChannels)
l := int64(len(c.topLevelChannels))
ids := make([]int64, 0, l)
cns := make([]*channel, 0, min(l, EntryPerPage))
cns := make([]*channel, 0, min(l, maxResults))
for k := range c.topLevelChannels {
ids = append(ids, k)
}
sort.Sort(int64Slice(ids))
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
count := 0
count := int64(0)
var end bool
var t []*ChannelMetric
for i, v := range ids[idx:] {
if count == EntryPerPage {
if count == maxResults {
break
}
if cn, ok := c.channels[v]; ok {
@ -499,21 +502,24 @@ func (c *channelMap) GetTopChannels(id int64) ([]*ChannelMetric, bool) {
return t, end
}
func (c *channelMap) GetServers(id int64) ([]*ServerMetric, bool) {
func (c *channelMap) GetServers(id, maxResults int64) ([]*ServerMetric, bool) {
if maxResults <= 0 {
maxResults = EntryPerPage
}
c.mu.RLock()
l := len(c.servers)
l := int64(len(c.servers))
ids := make([]int64, 0, l)
ss := make([]*server, 0, min(l, EntryPerPage))
ss := make([]*server, 0, min(l, maxResults))
for k := range c.servers {
ids = append(ids, k)
}
sort.Sort(int64Slice(ids))
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= id })
count := 0
count := int64(0)
var end bool
var s []*ServerMetric
for i, v := range ids[idx:] {
if count == EntryPerPage {
if count == maxResults {
break
}
if svr, ok := c.servers[v]; ok {
@ -541,7 +547,10 @@ func (c *channelMap) GetServers(id int64) ([]*ServerMetric, bool) {
return s, end
}
func (c *channelMap) GetServerSockets(id int64, startID int64) ([]*SocketMetric, bool) {
func (c *channelMap) GetServerSockets(id int64, startID int64, maxResults int64) ([]*SocketMetric, bool) {
if maxResults <= 0 {
maxResults = EntryPerPage
}
var svr *server
var ok bool
c.mu.RLock()
@ -551,18 +560,18 @@ func (c *channelMap) GetServerSockets(id int64, startID int64) ([]*SocketMetric,
return nil, true
}
svrskts := svr.sockets
l := len(svrskts)
l := int64(len(svrskts))
ids := make([]int64, 0, l)
sks := make([]*normalSocket, 0, min(l, EntryPerPage))
sks := make([]*normalSocket, 0, min(l, maxResults))
for k := range svrskts {
ids = append(ids, k)
}
sort.Sort(int64Slice(ids))
idx := sort.Search(len(ids), func(i int) bool { return ids[i] >= startID })
count := 0
count := int64(0)
var end bool
for i, v := range ids[idx:] {
if count == EntryPerPage {
if count == maxResults {
break
}
if ns, ok := c.normalSockets[v]; ok {

View File

@ -51,7 +51,7 @@ func testCZSocketMetricsSocketOption(t *testing.T, e env) {
doSuccessfulUnaryCall(tc, t)
time.Sleep(10 * time.Millisecond)
ss, _ := channelz.GetServers(0)
ss, _ := channelz.GetServers(0, 0)
if len(ss) != 1 {
t.Fatalf("There should be one server, not %d", len(ss))
}
@ -64,7 +64,7 @@ func testCZSocketMetricsSocketOption(t *testing.T, e env) {
t.Fatalf("Unable to get server listen socket options")
}
}
ns, _ := channelz.GetServerSockets(ss[0].ID, 0)
ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0)
if len(ns) != 1 {
t.Fatalf("There should be one server normal socket, not %d", len(ns))
}
@ -72,7 +72,7 @@ func testCZSocketMetricsSocketOption(t *testing.T, e env) {
t.Fatalf("Unable to get server normal socket options")
}
tchan, _ := channelz.GetTopChannels(0)
tchan, _ := channelz.GetTopChannels(0, 0)
if len(tchan) != 1 {
t.Fatalf("There should only be one top channel, not %d", len(tchan))
}

View File

@ -72,13 +72,16 @@ func TestCZServerRegistrationAndDeletion(t *testing.T) {
testcases := []struct {
total int
start int64
length int
max int64
length int64
end bool
}{
{total: channelz.EntryPerPage, start: 0, length: channelz.EntryPerPage, end: true},
{total: channelz.EntryPerPage - 1, start: 0, length: channelz.EntryPerPage - 1, end: true},
{total: channelz.EntryPerPage + 1, start: 0, length: channelz.EntryPerPage, end: false},
{total: channelz.EntryPerPage + 1, start: int64(2*(channelz.EntryPerPage+1) + 1), length: 0, end: true},
{total: int(channelz.EntryPerPage), start: 0, max: 0, length: channelz.EntryPerPage, end: true},
{total: int(channelz.EntryPerPage) - 1, start: 0, max: 0, length: channelz.EntryPerPage - 1, end: true},
{total: int(channelz.EntryPerPage) + 1, start: 0, max: 0, length: channelz.EntryPerPage, end: false},
{total: int(channelz.EntryPerPage) + 1, start: int64(2*(channelz.EntryPerPage+1) + 1), max: 0, length: 0, end: true},
{total: int(channelz.EntryPerPage), start: 0, max: 1, length: 1, end: false},
{total: int(channelz.EntryPerPage), start: 0, max: channelz.EntryPerPage - 1, length: channelz.EntryPerPage - 1, end: false},
}
for _, c := range testcases {
@ -87,12 +90,12 @@ func TestCZServerRegistrationAndDeletion(t *testing.T) {
te := newTest(t, e)
te.startServers(&testServer{security: e.security}, c.total)
ss, end := channelz.GetServers(c.start)
if len(ss) != c.length || end != c.end {
ss, end := channelz.GetServers(c.start, c.max)
if int64(len(ss)) != c.length || end != c.end {
t.Fatalf("GetServers(%d) = %+v (len of which: %d), end: %+v, want len(GetServers(%d)) = %d, end: %+v", c.start, ss, len(ss), end, c.start, c.length, c.end)
}
te.tearDown()
ss, end = channelz.GetServers(c.start)
ss, end = channelz.GetServers(c.start, c.max)
if len(ss) != 0 || !end {
t.Fatalf("GetServers(0) = %+v (len of which: %d), end: %+v, want len(GetServers(0)) = 0, end: true", ss, len(ss), end)
}
@ -104,13 +107,16 @@ func TestCZTopChannelRegistrationAndDeletion(t *testing.T) {
testcases := []struct {
total int
start int64
length int
max int64
length int64
end bool
}{
{total: channelz.EntryPerPage, start: 0, length: channelz.EntryPerPage, end: true},
{total: channelz.EntryPerPage - 1, start: 0, length: channelz.EntryPerPage - 1, end: true},
{total: channelz.EntryPerPage + 1, start: 0, length: channelz.EntryPerPage, end: false},
{total: channelz.EntryPerPage + 1, start: int64(2*(channelz.EntryPerPage+1) + 1), length: 0, end: true},
{total: int(channelz.EntryPerPage), start: 0, max: 0, length: channelz.EntryPerPage, end: true},
{total: int(channelz.EntryPerPage) - 1, start: 0, max: 0, length: channelz.EntryPerPage - 1, end: true},
{total: int(channelz.EntryPerPage) + 1, start: 0, max: 0, length: channelz.EntryPerPage, end: false},
{total: int(channelz.EntryPerPage) + 1, start: int64(2*(channelz.EntryPerPage+1) + 1), max: 0, length: 0, end: true},
{total: int(channelz.EntryPerPage), start: 0, max: 1, length: 1, end: false},
{total: int(channelz.EntryPerPage), start: 0, max: channelz.EntryPerPage - 1, length: channelz.EntryPerPage - 1, end: false},
}
for _, c := range testcases {
@ -126,7 +132,7 @@ func TestCZTopChannelRegistrationAndDeletion(t *testing.T) {
ccs = append(ccs, cc)
}
if err := verifyResultWithDelay(func() (bool, error) {
if tcs, end := channelz.GetTopChannels(c.start); len(tcs) != c.length || end != c.end {
if tcs, end := channelz.GetTopChannels(c.start, c.max); int64(len(tcs)) != c.length || end != c.end {
return false, fmt.Errorf("getTopChannels(%d) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(%d)) = %d, end: %+v", c.start, tcs, len(tcs), end, c.start, c.length, c.end)
}
return true, nil
@ -139,7 +145,7 @@ func TestCZTopChannelRegistrationAndDeletion(t *testing.T) {
}
if err := verifyResultWithDelay(func() (bool, error) {
if tcs, end := channelz.GetTopChannels(c.start); len(tcs) != 0 || !end {
if tcs, end := channelz.GetTopChannels(c.start, c.max); len(tcs) != 0 || !end {
return false, fmt.Errorf("getTopChannels(0) = %+v (len of which: %d), end: %+v, want len(GetTopChannels(0)) = 0, end: true", tcs, len(tcs), end)
}
return true, nil
@ -166,7 +172,7 @@ func TestCZNestedChannelRegistrationAndDeletion(t *testing.T) {
defer te.tearDown()
if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0)
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
}
@ -183,7 +189,7 @@ func TestCZNestedChannelRegistrationAndDeletion(t *testing.T) {
// wait for the shutdown of grpclb balancer
if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0)
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
}
@ -216,7 +222,7 @@ func TestCZClientSubChannelSocketRegistrationAndDeletion(t *testing.T) {
// Here, we just wait for all sockets to be up. In the future, if we implement
// IDLE, we may need to make several rpc calls to create the sockets.
if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0)
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
}
@ -243,7 +249,7 @@ func TestCZClientSubChannelSocketRegistrationAndDeletion(t *testing.T) {
r.NewAddress(svrAddrs[:len(svrAddrs)-1])
if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0)
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
}
@ -270,52 +276,78 @@ func TestCZClientSubChannelSocketRegistrationAndDeletion(t *testing.T) {
func TestCZServerSocketRegistrationAndDeletion(t *testing.T) {
defer leakcheck.Check(t)
channelz.NewChannelzStorage()
e := tcpClearRREnv
num := 3 // number of clients
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
defer te.tearDown()
var ccs []*grpc.ClientConn
for i := 0; i < num; i++ {
cc := te.clientConn()
te.cc = nil
ccs = append(ccs, cc)
}
defer func() {
for _, c := range ccs[:len(ccs)-1] {
c.Close()
}
}()
var svrID int64
if err := verifyResultWithDelay(func() (bool, error) {
ss, _ := channelz.GetServers(0)
if len(ss) != 1 {
return false, fmt.Errorf("there should only be one server, not %d", len(ss))
}
if len(ss[0].ListenSockets) != 1 {
return false, fmt.Errorf("there should only be one server listen socket, not %d", len(ss[0].ListenSockets))
}
ns, _ := channelz.GetServerSockets(ss[0].ID, 0)
if len(ns) != num {
return false, fmt.Errorf("there should be %d normal sockets not %d", num, len(ns))
}
svrID = ss[0].ID
return true, nil
}); err != nil {
t.Fatal(err)
testcases := []struct {
total int
start int64
max int64
length int64
end bool
}{
{total: int(channelz.EntryPerPage), start: 0, max: 0, length: channelz.EntryPerPage, end: true},
{total: int(channelz.EntryPerPage) - 1, start: 0, max: 0, length: channelz.EntryPerPage - 1, end: true},
{total: int(channelz.EntryPerPage) + 1, start: 0, max: 0, length: channelz.EntryPerPage, end: false},
{total: int(channelz.EntryPerPage), start: 1, max: 0, length: channelz.EntryPerPage - 1, end: true},
{total: int(channelz.EntryPerPage) + 1, start: channelz.EntryPerPage + 1, max: 0, length: 0, end: true},
{total: int(channelz.EntryPerPage), start: 0, max: 1, length: 1, end: false},
{total: int(channelz.EntryPerPage), start: 0, max: channelz.EntryPerPage - 1, length: channelz.EntryPerPage - 1, end: false},
}
ccs[len(ccs)-1].Close()
if err := verifyResultWithDelay(func() (bool, error) {
ns, _ := channelz.GetServerSockets(svrID, 0)
if len(ns) != num-1 {
return false, fmt.Errorf("there should be %d normal sockets not %d", num-1, len(ns))
for _, c := range testcases {
channelz.NewChannelzStorage()
e := tcpClearRREnv
te := newTest(t, e)
te.startServer(&testServer{security: e.security})
defer te.tearDown()
var ccs []*grpc.ClientConn
for i := 0; i < c.total; i++ {
cc := te.clientConn()
te.cc = nil
ccs = append(ccs, cc)
}
var svrID int64
if err := verifyResultWithDelay(func() (bool, error) {
ss, _ := channelz.GetServers(0, 0)
if len(ss) != 1 {
return false, fmt.Errorf("there should only be one server, not %d", len(ss))
}
if len(ss[0].ListenSockets) != 1 {
return false, fmt.Errorf("there should only be one server listen socket, not %d", len(ss[0].ListenSockets))
}
startID := c.start
if startID != 0 {
ns, _ := channelz.GetServerSockets(ss[0].ID, 0, int64(c.total))
if int64(len(ns)) < c.start {
return false, fmt.Errorf("there should more than %d sockets, not %d", len(ns), c.start)
}
startID = ns[c.start-1].ID + 1
}
ns, end := channelz.GetServerSockets(ss[0].ID, startID, c.max)
if int64(len(ns)) != c.length || end != c.end {
return false, fmt.Errorf("GetServerSockets(%d) = %+v (len of which: %d), end: %+v, want len(GetServerSockets(%d)) = %d, end: %+v", c.start, ns, len(ns), end, c.start, c.length, c.end)
}
svrID = ss[0].ID
return true, nil
}); err != nil {
t.Fatal(err)
}
for _, cc := range ccs {
cc.Close()
}
if err := verifyResultWithDelay(func() (bool, error) {
ns, _ := channelz.GetServerSockets(svrID, c.start, c.max)
if len(ns) != 0 {
return false, fmt.Errorf("there should be %d normal sockets not %d", 0, len(ns))
}
return true, nil
}); err != nil {
t.Fatal(err)
}
return true, nil
}); err != nil {
t.Fatal(err)
}
}
@ -329,7 +361,7 @@ func TestCZServerListenSocketDeletion(t *testing.T) {
}
go s.Serve(lis)
if err := verifyResultWithDelay(func() (bool, error) {
ss, _ := channelz.GetServers(0)
ss, _ := channelz.GetServers(0, 0)
if len(ss) != 1 {
return false, fmt.Errorf("there should only be one server, not %d", len(ss))
}
@ -343,7 +375,7 @@ func TestCZServerListenSocketDeletion(t *testing.T) {
lis.Close()
if err := verifyResultWithDelay(func() (bool, error) {
ss, _ := channelz.GetServers(0)
ss, _ := channelz.GetServers(0, 0)
if len(ss) != 1 {
return false, fmt.Errorf("there should be 1 server, not %d", len(ss))
}
@ -381,7 +413,7 @@ func TestCZRecusivelyDeletionOfEntry(t *testing.T) {
sktID1 := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "")
sktID2 := channelz.RegisterNormalSocket(&dummySocket{}, subChanID1, "")
tcs, _ := channelz.GetTopChannels(0)
tcs, _ := channelz.GetTopChannels(0, 0)
if tcs == nil || len(tcs) != 1 {
t.Fatalf("There should be one TopChannel entry")
}
@ -394,14 +426,14 @@ func TestCZRecusivelyDeletionOfEntry(t *testing.T) {
}
channelz.RemoveEntry(topChanID)
tcs, _ = channelz.GetTopChannels(0)
tcs, _ = channelz.GetTopChannels(0, 0)
if tcs == nil || len(tcs) != 1 {
t.Fatalf("There should be one TopChannel entry")
}
channelz.RemoveEntry(subChanID1)
channelz.RemoveEntry(subChanID2)
tcs, _ = channelz.GetTopChannels(0)
tcs, _ = channelz.GetTopChannels(0, 0)
if tcs == nil || len(tcs) != 1 {
t.Fatalf("There should be one TopChannel entry")
}
@ -411,7 +443,7 @@ func TestCZRecusivelyDeletionOfEntry(t *testing.T) {
channelz.RemoveEntry(sktID1)
channelz.RemoveEntry(sktID2)
tcs, _ = channelz.GetTopChannels(0)
tcs, _ = channelz.GetTopChannels(0, 0)
if tcs != nil {
t.Fatalf("There should be no TopChannel entry")
}
@ -465,7 +497,7 @@ func TestCZChannelMetrics(t *testing.T) {
// Here, we just wait for all sockets to be up. In the future, if we implement
// IDLE, we may need to make several rpc calls to create the sockets.
if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0)
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
}
@ -543,7 +575,7 @@ func TestCZServerMetrics(t *testing.T) {
defer stream.CloseSend()
if err := verifyResultWithDelay(func() (bool, error) {
ss, _ := channelz.GetServers(0)
ss, _ := channelz.GetServers(0, 0)
if len(ss) != 1 {
return false, fmt.Errorf("there should only be one server, not %d", len(ss))
}
@ -806,7 +838,7 @@ func TestCZClientSocketMetricsStreamsAndMessagesCount(t *testing.T) {
doSuccessfulUnaryCall(tc, t)
var scID, skID int64
if err := verifyResultWithDelay(func() (bool, error) {
tchan, _ := channelz.GetTopChannels(0)
tchan, _ := channelz.GetTopChannels(0, 0)
if len(tchan) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
}
@ -907,7 +939,7 @@ func TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *testi
doServerSideInitiatedFailedStreamWithClientBreakFlowControl(tc, t, dw)
if err := verifyResultWithDelay(func() (bool, error) {
tchan, _ := channelz.GetTopChannels(0)
tchan, _ := channelz.GetTopChannels(0, 0)
if len(tchan) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
}
@ -933,12 +965,12 @@ func TestCZClientAndServerSocketMetricsStreamsCountFlowControlRSTStream(t *testi
if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 0 || sktData.StreamsFailed != 1 {
return false, fmt.Errorf("channelz.GetSocket(%d), want (StreamsStarted, StreamsSucceeded, StreamsFailed) = (1, 0, 1), got (%d, %d, %d)", skt.ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed)
}
ss, _ := channelz.GetServers(0)
ss, _ := channelz.GetServers(0, 0)
if len(ss) != 1 {
return false, fmt.Errorf("there should only be one server, not %d", len(ss))
}
ns, _ := channelz.GetServerSockets(ss[0].ID, 0)
ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0)
if len(ns) != 1 {
return false, fmt.Errorf("there should be one server normal socket, not %d", len(ns))
}
@ -973,7 +1005,7 @@ func TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) {
var cliSktID, svrSktID int64
if err := verifyResultWithDelay(func() (bool, error) {
tchan, _ := channelz.GetTopChannels(0)
tchan, _ := channelz.GetTopChannels(0, 0)
if len(tchan) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
}
@ -1000,11 +1032,11 @@ func TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) {
if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65486 {
return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
}
ss, _ := channelz.GetServers(0)
ss, _ := channelz.GetServers(0, 0)
if len(ss) != 1 {
return false, fmt.Errorf("there should only be one server, not %d", len(ss))
}
ns, _ := channelz.GetServerSockets(ss[0].ID, 0)
ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0)
sktData = ns[0].SocketData
if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65486 {
return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
@ -1025,11 +1057,11 @@ func TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) {
if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 55475 {
return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65486, 55475), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
}
ss, _ := channelz.GetServers(0)
ss, _ := channelz.GetServers(0, 0)
if len(ss) != 1 {
return false, fmt.Errorf("there should only be one server, not %d", len(ss))
}
ns, _ := channelz.GetServerSockets(svrSktID, 0)
ns, _ := channelz.GetServerSockets(svrSktID, 0, 0)
sktData = ns[0].SocketData
if sktData.LocalFlowControlWindow != 55475 || sktData.RemoteFlowControlWindow != 65486 {
return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (55475, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
@ -1050,11 +1082,11 @@ func TestCZClientAndServerSocketMetricsFlowControl(t *testing.T) {
if sktData.LocalFlowControlWindow != 65486 || sktData.RemoteFlowControlWindow != 65536 {
return false, fmt.Errorf("client: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65486, 65536), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
}
ss, _ := channelz.GetServers(0)
ss, _ := channelz.GetServers(0, 0)
if len(ss) != 1 {
return false, fmt.Errorf("there should only be one server, not %d", len(ss))
}
ns, _ := channelz.GetServerSockets(svrSktID, 0)
ns, _ := channelz.GetServerSockets(svrSktID, 0, 0)
sktData = ns[0].SocketData
if sktData.LocalFlowControlWindow != 65536 || sktData.RemoteFlowControlWindow != 65486 {
return false, fmt.Errorf("server: (LocalFlowControlWindow, RemoteFlowControlWindow) size should be (65536, 65486), not (%d, %d)", sktData.LocalFlowControlWindow, sktData.RemoteFlowControlWindow)
@ -1078,7 +1110,7 @@ func TestCZClientSocketMetricsKeepAlive(t *testing.T) {
doIdleCallToInvokeKeepAlive(tc, t)
if err := verifyResultWithDelay(func() (bool, error) {
tchan, _ := channelz.GetTopChannels(0)
tchan, _ := channelz.GetTopChannels(0, 0)
if len(tchan) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
}
@ -1123,7 +1155,7 @@ func TestCZServerSocketMetricsStreamsAndMessagesCount(t *testing.T) {
var svrID int64
if err := verifyResultWithDelay(func() (bool, error) {
ss, _ := channelz.GetServers(0)
ss, _ := channelz.GetServers(0, 0)
if len(ss) != 1 {
return false, fmt.Errorf("there should only be one server, not %d", len(ss))
}
@ -1135,7 +1167,7 @@ func TestCZServerSocketMetricsStreamsAndMessagesCount(t *testing.T) {
doSuccessfulUnaryCall(tc, t)
if err := verifyResultWithDelay(func() (bool, error) {
ns, _ := channelz.GetServerSockets(svrID, 0)
ns, _ := channelz.GetServerSockets(svrID, 0, 0)
sktData := ns[0].SocketData
if sktData.StreamsStarted != 1 || sktData.StreamsSucceeded != 1 || sktData.StreamsFailed != 0 || sktData.MessagesSent != 1 || sktData.MessagesReceived != 1 {
return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, MessagesSent, MessagesReceived) = (1, 1, 1, 1), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
@ -1147,7 +1179,7 @@ func TestCZServerSocketMetricsStreamsAndMessagesCount(t *testing.T) {
doServerSideFailedUnaryCall(tc, t)
if err := verifyResultWithDelay(func() (bool, error) {
ns, _ := channelz.GetServerSockets(svrID, 0)
ns, _ := channelz.GetServerSockets(svrID, 0, 0)
sktData := ns[0].SocketData
if sktData.StreamsStarted != 2 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 0 || sktData.MessagesSent != 1 || sktData.MessagesReceived != 1 {
return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (2, 2, 0, 1, 1), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
@ -1159,7 +1191,7 @@ func TestCZServerSocketMetricsStreamsAndMessagesCount(t *testing.T) {
doClientSideInitiatedFailedStream(tc, t)
if err := verifyResultWithDelay(func() (bool, error) {
ns, _ := channelz.GetServerSockets(svrID, 0)
ns, _ := channelz.GetServerSockets(svrID, 0, 0)
sktData := ns[0].SocketData
if sktData.StreamsStarted != 3 || sktData.StreamsSucceeded != 2 || sktData.StreamsFailed != 1 || sktData.MessagesSent != 2 || sktData.MessagesReceived != 2 {
return false, fmt.Errorf("server socket metric with ID %d, want (StreamsStarted, StreamsSucceeded, StreamsFailed, MessagesSent, MessagesReceived) = (3, 2, 1, 2, 2), got (%d, %d, %d, %d, %d)", ns[0].ID, sktData.StreamsStarted, sktData.StreamsSucceeded, sktData.StreamsFailed, sktData.MessagesSent, sktData.MessagesReceived)
@ -1183,11 +1215,11 @@ func TestCZServerSocketMetricsKeepAlive(t *testing.T) {
doIdleCallToInvokeKeepAlive(tc, t)
if err := verifyResultWithDelay(func() (bool, error) {
ss, _ := channelz.GetServers(0)
ss, _ := channelz.GetServers(0, 0)
if len(ss) != 1 {
return false, fmt.Errorf("there should be one server, not %d", len(ss))
}
ns, _ := channelz.GetServerSockets(ss[0].ID, 0)
ns, _ := channelz.GetServerSockets(ss[0].ID, 0, 0)
if len(ns) != 1 {
return false, fmt.Errorf("there should be one server normal socket, not %d", len(ns))
}
@ -1235,7 +1267,7 @@ func TestCZSocketGetSecurityValueTLS(t *testing.T) {
defer te.tearDown()
te.clientConn()
if err := verifyResultWithDelay(func() (bool, error) {
tchan, _ := channelz.GetTopChannels(0)
tchan, _ := channelz.GetTopChannels(0, 0)
if len(tchan) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tchan))
}
@ -1292,7 +1324,7 @@ func TestCZChannelTraceCreationDeletion(t *testing.T) {
defer te.tearDown()
var nestedConn int64
if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0)
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
}
@ -1327,7 +1359,7 @@ func TestCZChannelTraceCreationDeletion(t *testing.T) {
// wait for the shutdown of grpclb balancer
if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0)
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
}
@ -1369,7 +1401,7 @@ func TestCZSubChannelTraceCreationDeletion(t *testing.T) {
// Here, we just wait for all sockets to be up. In the future, if we implement
// IDLE, we may need to make several rpc calls to create the sockets.
if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0)
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
}
@ -1405,7 +1437,7 @@ func TestCZSubChannelTraceCreationDeletion(t *testing.T) {
r.NewAddress([]resolver.Address{})
if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0)
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
}
@ -1449,7 +1481,7 @@ func TestCZChannelAddressResolutionChange(t *testing.T) {
// Here, we just wait for all sockets to be up. In the future, if we implement
// IDLE, we may need to make several rpc calls to create the sockets.
if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0)
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
}
@ -1564,7 +1596,7 @@ func TestCZSubChannelPickedNewAddress(t *testing.T) {
// Here, we just wait for all sockets to be up. In the future, if we implement
// IDLE, we may need to make several rpc calls to create the sockets.
if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0)
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
}
@ -1623,7 +1655,7 @@ func TestCZSubChannelConnectivityState(t *testing.T) {
// we need to obtain the SubChannel id before it gets deleted from Channel's children list (due
// to effect of r.NewAddress([]resolver.Address{}))
if subConn == 0 {
tcs, _ := channelz.GetTopChannels(0)
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
}
@ -1714,7 +1746,7 @@ func TestCZChannelConnectivityState(t *testing.T) {
}
te.srv.Stop()
if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0)
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
}
@ -1768,7 +1800,7 @@ func TestCZTraceOverwriteChannelDeletion(t *testing.T) {
defer te.tearDown()
var nestedConn int64
if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0)
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
}
@ -1788,7 +1820,7 @@ func TestCZTraceOverwriteChannelDeletion(t *testing.T) {
// wait for the shutdown of grpclb balancer
if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0)
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
}
@ -1830,7 +1862,7 @@ func TestCZTraceOverwriteSubChannelDeletion(t *testing.T) {
// Here, we just wait for all sockets to be up. In the future, if we implement
// IDLE, we may need to make several rpc calls to create the sockets.
if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0)
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
}
@ -1848,7 +1880,7 @@ func TestCZTraceOverwriteSubChannelDeletion(t *testing.T) {
r.NewAddress([]resolver.Address{})
if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0)
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
}
@ -1887,7 +1919,7 @@ func TestCZTraceTopChannelDeletionTraceClear(t *testing.T) {
// Here, we just wait for all sockets to be up. In the future, if we implement
// IDLE, we may need to make several rpc calls to create the sockets.
if err := verifyResultWithDelay(func() (bool, error) {
tcs, _ := channelz.GetTopChannels(0)
tcs, _ := channelz.GetTopChannels(0, 0)
if len(tcs) != 1 {
return false, fmt.Errorf("there should only be one top channel, not %d", len(tcs))
}

View File

@ -903,7 +903,7 @@ func TestHealthCheckChannelzCountingCallSuccess(t *testing.T) {
r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
if err := verifyResultWithDelay(func() (bool, error) {
cm, _ := channelz.GetTopChannels(0)
cm, _ := channelz.GetTopChannels(0, 0)
if len(cm) == 0 {
return false, errors.New("channelz.GetTopChannels return 0 top channel")
}
@ -960,7 +960,7 @@ func TestHealthCheckChannelzCountingCallFailure(t *testing.T) {
r.NewAddress([]resolver.Address{{Addr: lis.Addr().String()}})
if err := verifyResultWithDelay(func() (bool, error) {
cm, _ := channelz.GetTopChannels(0)
cm, _ := channelz.GetTopChannels(0, 0)
if len(cm) == 0 {
return false, errors.New("channelz.GetTopChannels return 0 top channel")
}