From f56ac76b353be506688d02c2dcd7c284f9393013 Mon Sep 17 00:00:00 2001 From: zpencer Date: Sat, 10 Mar 2018 14:58:34 -0800 Subject: core,services: add ChannelzService class (#4205) This implements the methods to expose the stats as a gRPC service. GetServerSockets is still unimplemented and will require a follow up change. --- .../java/io/grpc/services/ChannelzProtoUtil.java | 253 ++++++++++++ .../java/io/grpc/services/ChannelzService.java | 141 +++++++ .../io/grpc/services/ChannelzProtoUtilTest.java | 457 +++++++++++++++++++++ .../java/io/grpc/services/ChannelzServiceTest.java | 235 +++++++++++ .../java/io/grpc/services/ChannelzTestHelper.java | 118 ++++++ 5 files changed, 1204 insertions(+) create mode 100644 services/src/main/java/io/grpc/services/ChannelzProtoUtil.java create mode 100644 services/src/main/java/io/grpc/services/ChannelzService.java create mode 100644 services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java create mode 100644 services/src/test/java/io/grpc/services/ChannelzServiceTest.java create mode 100644 services/src/test/java/io/grpc/services/ChannelzTestHelper.java (limited to 'services') diff --git a/services/src/main/java/io/grpc/services/ChannelzProtoUtil.java b/services/src/main/java/io/grpc/services/ChannelzProtoUtil.java new file mode 100644 index 000000000..9ee4015ea --- /dev/null +++ b/services/src/main/java/io/grpc/services/ChannelzProtoUtil.java @@ -0,0 +1,253 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.protobuf.ByteString; +import com.google.protobuf.Int64Value; +import com.google.protobuf.util.Timestamps; +import io.grpc.ConnectivityState; +import io.grpc.Status; +import io.grpc.channelz.v1.Address; +import io.grpc.channelz.v1.Address.OtherAddress; +import io.grpc.channelz.v1.Address.TcpIpAddress; +import io.grpc.channelz.v1.Address.UdsAddress; +import io.grpc.channelz.v1.Channel; +import io.grpc.channelz.v1.ChannelData; +import io.grpc.channelz.v1.ChannelData.State; +import io.grpc.channelz.v1.ChannelRef; +import io.grpc.channelz.v1.GetServersResponse; +import io.grpc.channelz.v1.GetTopChannelsResponse; +import io.grpc.channelz.v1.Server; +import io.grpc.channelz.v1.ServerData; +import io.grpc.channelz.v1.ServerRef; +import io.grpc.channelz.v1.Socket; +import io.grpc.channelz.v1.SocketData; +import io.grpc.channelz.v1.SocketRef; +import io.grpc.channelz.v1.Subchannel; +import io.grpc.channelz.v1.SubchannelRef; +import io.grpc.internal.Channelz; +import io.grpc.internal.Channelz.ChannelStats; +import io.grpc.internal.Channelz.RootChannelList; +import io.grpc.internal.Channelz.ServerList; +import io.grpc.internal.Channelz.ServerStats; +import io.grpc.internal.Channelz.SocketStats; +import io.grpc.internal.Channelz.TransportStats; +import io.grpc.internal.Instrumented; +import io.grpc.internal.WithLogId; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.concurrent.ExecutionException; + +/** + * A static utility class for turning internal data structures into protos. + */ +final class ChannelzProtoUtil { + private ChannelzProtoUtil() { + // do not instantiate. + } + + static ChannelRef toChannelRef(WithLogId obj) { + return ChannelRef + .newBuilder() + .setChannelId(obj.getLogId().getId()) + .setName(obj.toString()) + .build(); + } + + static SubchannelRef toSubchannelRef(WithLogId obj) { + return SubchannelRef + .newBuilder() + .setSubchannelId(obj.getLogId().getId()) + .setName(obj.toString()) + .build(); + } + + static ServerRef toServerRef(WithLogId obj) { + return ServerRef + .newBuilder() + .setServerId(obj.getLogId().getId()) + .setName(obj.toString()) + .build(); + } + + static SocketRef toSocketRef(WithLogId obj) { + return SocketRef + .newBuilder() + .setSocketId(obj.getLogId().getId()) + .setName(obj.toString()) + .build(); + } + + static Server toServer(Instrumented obj) { + ServerStats stats = getFuture(obj.getStats()); + return Server + .newBuilder() + .setRef(toServerRef(obj)) + .setData(toServerData(stats)) + .build(); + } + + static ServerData toServerData(ServerStats stats) { + return ServerData + .newBuilder() + .setCallsStarted(stats.callsStarted) + .setCallsSucceeded(stats.callsSucceeded) + .setCallsFailed(stats.callsFailed) + .setLastCallStartedTimestamp(Timestamps.fromMillis(stats.lastCallStartedMillis)) + .build(); + } + + static Socket toSocket(Instrumented obj) { + SocketStats socketStats = getFuture(obj.getStats()); + return Socket.newBuilder() + .setRef(toSocketRef(obj)) + .setRemote(toAddress(socketStats.remote)) + .setLocal(toAddress(socketStats.local)) + .setData(toSocketData(socketStats.data)) + .build(); + } + + static Address toAddress(SocketAddress address) { + Address.Builder builder = Address.newBuilder(); + if (address instanceof InetSocketAddress) { + InetSocketAddress inetAddress = (InetSocketAddress) address; + builder.setTcpipAddress( + TcpIpAddress + .newBuilder() + .setIpAddress( + ByteString.copyFrom(inetAddress.getAddress().getAddress())) + .build()); + } else if (address.getClass().getName().endsWith("io.netty.channel.unix.DomainSocketAddress")) { + builder.setUdsAddress( + UdsAddress + .newBuilder() + .setFilename(address.toString()) // DomainSocketAddress.toString returns filename + .build()); + } else { + builder.setOtherAddress(OtherAddress.newBuilder().setName(address.toString()).build()); + } + return builder.build(); + } + + static SocketData toSocketData(TransportStats s) { + return SocketData + .newBuilder() + .setStreamsStarted(s.streamsStarted) + .setStreamsSucceeded(s.streamsSucceeded) + .setStreamsFailed(s.streamsFailed) + .setMessagesSent(s.messagesSent) + .setMessagesReceived(s.messagesReceived) + .setKeepAlivesSent(s.keepAlivesSent) + .setLastLocalStreamCreatedTimestamp( + Timestamps.fromNanos(s.lastLocalStreamCreatedTimeNanos)) + .setLastRemoteStreamCreatedTimestamp( + Timestamps.fromNanos(s.lastRemoteStreamCreatedTimeNanos)) + .setLastMessageSentTimestamp( + Timestamps.fromNanos(s.lastMessageSentTimeNanos)) + .setLastMessageReceivedTimestamp( + Timestamps.fromNanos(s.lastMessageReceivedTimeNanos)) + .setLocalFlowControlWindow( + Int64Value.newBuilder().setValue(s.localFlowControlWindow).build()) + .setRemoteFlowControlWindow( + Int64Value.newBuilder().setValue(s.remoteFlowControlWindow).build()) + .build(); + } + + static Channel toChannel(Instrumented channel) { + ChannelStats stats = getFuture(channel.getStats()); + Channel.Builder channelBuilder = Channel + .newBuilder() + .setRef(toChannelRef(channel)) + .setData(extractChannelData(stats)); + for (WithLogId subchannel : stats.subchannels) { + channelBuilder.addSubchannelRef(toSubchannelRef(subchannel)); + } + + return channelBuilder.build(); + } + + static ChannelData extractChannelData(Channelz.ChannelStats stats) { + return ChannelData + .newBuilder() + .setTarget(stats.target) + .setState(toState(stats.state)) + .setCallsStarted(stats.callsStarted) + .setCallsSucceeded(stats.callsSucceeded) + .setCallsFailed(stats.callsFailed) + .setLastCallStartedTimestamp(Timestamps.fromMillis(stats.lastCallStartedMillis)) + .build(); + } + + static State toState(ConnectivityState state) { + if (state == null) { + return State.UNKNOWN; + } + try { + return Enum.valueOf(State.class, state.name()); + } catch (IllegalArgumentException e) { + return State.UNKNOWN; + } + } + + static Subchannel toSubchannel(Instrumented subchannel) { + ChannelStats stats = getFuture(subchannel.getStats()); + Subchannel.Builder subchannelBuilder = Subchannel + .newBuilder() + .setRef(toSubchannelRef(subchannel)) + .setData(extractChannelData(stats)); + Preconditions.checkState(stats.sockets.isEmpty() || stats.subchannels.isEmpty()); + for (WithLogId childSocket : stats.sockets) { + subchannelBuilder.addSocketRef(toSocketRef(childSocket)); + } + for (WithLogId childSubchannel : stats.subchannels) { + subchannelBuilder.addSubchannelRef(toSubchannelRef(childSubchannel)); + } + return subchannelBuilder.build(); + } + + static GetTopChannelsResponse toGetTopChannelResponse(RootChannelList rootChannels) { + GetTopChannelsResponse.Builder responseBuilder = GetTopChannelsResponse + .newBuilder() + .setEnd(rootChannels.end); + for (Instrumented c : rootChannels.channels) { + responseBuilder.addChannel(ChannelzProtoUtil.toChannel(c)); + } + return responseBuilder.build(); + } + + static GetServersResponse toGetServersResponse(ServerList servers) { + GetServersResponse.Builder responseBuilder = GetServersResponse + .newBuilder() + .setEnd(servers.end); + for (Instrumented s : servers.servers) { + responseBuilder.addServer(ChannelzProtoUtil.toServer(s)); + } + return responseBuilder.build(); + } + + private static T getFuture(ListenableFuture future) { + try { + return future.get(); + } catch (InterruptedException e) { + throw Status.INTERNAL.withCause(e).asRuntimeException(); + } catch (ExecutionException e) { + throw Status.INTERNAL.withCause(e).asRuntimeException(); + } + } +} diff --git a/services/src/main/java/io/grpc/services/ChannelzService.java b/services/src/main/java/io/grpc/services/ChannelzService.java new file mode 100644 index 000000000..848d5df9c --- /dev/null +++ b/services/src/main/java/io/grpc/services/ChannelzService.java @@ -0,0 +1,141 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import com.google.common.annotations.VisibleForTesting; +import io.grpc.ExperimentalApi; +import io.grpc.Status; +import io.grpc.channelz.v1.ChannelzGrpc; +import io.grpc.channelz.v1.GetChannelRequest; +import io.grpc.channelz.v1.GetChannelResponse; +import io.grpc.channelz.v1.GetServerSocketsRequest; +import io.grpc.channelz.v1.GetServerSocketsResponse; +import io.grpc.channelz.v1.GetServersRequest; +import io.grpc.channelz.v1.GetServersResponse; +import io.grpc.channelz.v1.GetSocketRequest; +import io.grpc.channelz.v1.GetSocketResponse; +import io.grpc.channelz.v1.GetSubchannelRequest; +import io.grpc.channelz.v1.GetSubchannelResponse; +import io.grpc.channelz.v1.GetTopChannelsRequest; +import io.grpc.channelz.v1.GetTopChannelsResponse; +import io.grpc.internal.Channelz; +import io.grpc.internal.Channelz.ChannelStats; +import io.grpc.internal.Channelz.ServerList; +import io.grpc.internal.Channelz.SocketStats; +import io.grpc.internal.Instrumented; +import io.grpc.stub.StreamObserver; + +/** + * The channelz service provides stats about a running gRPC process. + */ +@ExperimentalApi("https://github.com/grpc/grpc-java/issues/4206") +public final class ChannelzService extends ChannelzGrpc.ChannelzImplBase { + private final Channelz channelz; + private final int maxPageSize; + + public ChannelzService newInstance(int maxPageSize) { + return new ChannelzService(Channelz.instance(), maxPageSize); + } + + @VisibleForTesting + ChannelzService(Channelz channelz, int maxPageSize) { + this.channelz = channelz; + this.maxPageSize = maxPageSize; + } + + /** Returns top level channel aka {@link io.grpc.internal.ManagedChannelImpl}. */ + @Override + public void getTopChannels( + GetTopChannelsRequest request, StreamObserver responseObserver) { + Channelz.RootChannelList rootChannels + = channelz.getRootChannels(request.getStartChannelId(), maxPageSize); + + responseObserver.onNext(ChannelzProtoUtil.toGetTopChannelResponse(rootChannels)); + responseObserver.onCompleted(); + } + + /** Returns a top level channel aka {@link io.grpc.internal.ManagedChannelImpl}. */ + @Override + public void getChannel( + GetChannelRequest request, StreamObserver responseObserver) { + Instrumented s = channelz.getRootChannel(request.getChannelId()); + if (s == null) { + responseObserver.onError(Status.NOT_FOUND.asRuntimeException()); + return; + } + + responseObserver.onNext( + GetChannelResponse + .newBuilder() + .setChannel(ChannelzProtoUtil.toChannel(s)) + .build()); + responseObserver.onCompleted(); + } + + /** Returns servers. */ + @Override + public void getServers( + GetServersRequest request, StreamObserver responseObserver) { + ServerList servers = channelz.getServers(request.getStartServerId(), maxPageSize); + + responseObserver.onNext(ChannelzProtoUtil.toGetServersResponse(servers)); + responseObserver.onCompleted(); + } + + /** Returns a subchannel. */ + @Override + public void getSubchannel( + GetSubchannelRequest request, StreamObserver responseObserver) { + Instrumented s = channelz.getSubchannel(request.getSubchannelId()); + if (s == null) { + responseObserver.onError(Status.NOT_FOUND.asRuntimeException()); + return; + } + + responseObserver.onNext( + GetSubchannelResponse + .newBuilder() + .setSubchannel(ChannelzProtoUtil.toSubchannel(s)) + .build()); + responseObserver.onCompleted(); + } + + /** Returns a socket. */ + @Override + public void getSocket( + GetSocketRequest request, StreamObserver responseObserver) { + Instrumented s = channelz.getSocket(request.getSocketId()); + if (s == null) { + responseObserver.onError(Status.NOT_FOUND.asRuntimeException()); + return; + } + + responseObserver.onNext( + GetSocketResponse + .newBuilder() + .setSocket(ChannelzProtoUtil.toSocket(s)) + .build()); + responseObserver.onCompleted(); + } + + @Override + public void getServerSockets( + GetServerSocketsRequest request, StreamObserver responseObserver) { + // TODO(zpencer): fill this one out after refactoring channelz class + responseObserver.onError(Status.UNIMPLEMENTED.asRuntimeException()); + } +} diff --git a/services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java b/services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java new file mode 100644 index 000000000..897aafb3b --- /dev/null +++ b/services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java @@ -0,0 +1,457 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import static org.junit.Assert.assertEquals; + +import com.google.common.collect.ImmutableList; +import com.google.protobuf.ByteString; +import com.google.protobuf.Int64Value; +import com.google.protobuf.util.Timestamps; +import io.grpc.ConnectivityState; +import io.grpc.channelz.v1.Address; +import io.grpc.channelz.v1.Address.OtherAddress; +import io.grpc.channelz.v1.Address.TcpIpAddress; +import io.grpc.channelz.v1.Address.UdsAddress; +import io.grpc.channelz.v1.Channel; +import io.grpc.channelz.v1.ChannelData; +import io.grpc.channelz.v1.ChannelData.State; +import io.grpc.channelz.v1.ChannelRef; +import io.grpc.channelz.v1.GetServersResponse; +import io.grpc.channelz.v1.GetTopChannelsResponse; +import io.grpc.channelz.v1.Server; +import io.grpc.channelz.v1.ServerData; +import io.grpc.channelz.v1.ServerRef; +import io.grpc.channelz.v1.Socket; +import io.grpc.channelz.v1.SocketData; +import io.grpc.channelz.v1.SocketRef; +import io.grpc.channelz.v1.Subchannel; +import io.grpc.channelz.v1.SubchannelRef; +import io.grpc.internal.Channelz.ChannelStats; +import io.grpc.internal.Channelz.RootChannelList; +import io.grpc.internal.Channelz.ServerList; +import io.grpc.internal.Channelz.ServerStats; +import io.grpc.internal.Instrumented; +import io.grpc.internal.WithLogId; +import io.grpc.services.ChannelzTestHelper.TestChannel; +import io.grpc.services.ChannelzTestHelper.TestServer; +import io.grpc.services.ChannelzTestHelper.TestSocket; +import io.netty.channel.unix.DomainSocketAddress; +import java.net.Inet4Address; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Collections; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +@RunWith(JUnit4.class) +public final class ChannelzProtoUtilTest { + + private final TestChannel channel = new TestChannel(); + private final ChannelRef channelRef = ChannelRef + .newBuilder() + .setName(channel.toString()) + .setChannelId(channel.getLogId().getId()) + .build(); + private final ChannelData channelData = ChannelData + .newBuilder() + .setTarget("sometarget") + .setState(State.READY) + .setCallsStarted(1) + .setCallsSucceeded(2) + .setCallsFailed(3) + .setLastCallStartedTimestamp(Timestamps.fromMillis(4)) + .build(); + private final Channel channelProto = Channel + .newBuilder() + .setRef(channelRef) + .setData(channelData) + .build(); + + private final TestChannel subchannel = new TestChannel(); + private final SubchannelRef subchannelRef = SubchannelRef + .newBuilder() + .setName(subchannel.toString()) + .setSubchannelId(subchannel.getLogId().getId()) + .build(); + private final ChannelData subchannelData = ChannelData + .newBuilder() + .setTarget("sometarget") + .setState(State.READY) + .setCallsStarted(1) + .setCallsSucceeded(2) + .setCallsFailed(3) + .setLastCallStartedTimestamp(Timestamps.fromMillis(4)) + .build(); + private final Subchannel subchannelProto = Subchannel + .newBuilder() + .setRef(subchannelRef) + .setData(subchannelData) + .build(); + + private final TestServer server = new TestServer(); + private final ServerRef serverRef = ServerRef + .newBuilder() + .setName(server.toString()) + .setServerId(server.getLogId().getId()) + .build(); + private final ServerData serverData = ServerData + .newBuilder() + .setCallsStarted(1) + .setCallsSucceeded(2) + .setCallsFailed(3) + .setLastCallStartedTimestamp(Timestamps.fromMillis(4)) + .build(); + private final Server serverProto = Server + .newBuilder() + .setRef(serverRef) + .setData(serverData) + .build(); + + private final TestSocket socket = new TestSocket(); + private final SocketRef socketRef = SocketRef + .newBuilder() + .setName(socket.toString()) + .setSocketId(socket.getLogId().getId()) + .build(); + private final SocketData socketData = SocketData + .newBuilder() + .setStreamsStarted(1) + .setLastLocalStreamCreatedTimestamp(Timestamps.fromNanos(2)) + .setLastRemoteStreamCreatedTimestamp(Timestamps.fromNanos(3)) + .setStreamsSucceeded(4) + .setStreamsFailed(5) + .setMessagesSent(6) + .setMessagesReceived(7) + .setKeepAlivesSent(8) + .setLastMessageSentTimestamp(Timestamps.fromNanos(9)) + .setLastMessageReceivedTimestamp(Timestamps.fromNanos(10)) + .setLocalFlowControlWindow(Int64Value.newBuilder().setValue(11).build()) + .setRemoteFlowControlWindow(Int64Value.newBuilder().setValue(12).build()) + .build(); + private final Address localAddress = Address + .newBuilder() + .setTcpipAddress( + TcpIpAddress + .newBuilder() + .setIpAddress(ByteString.copyFrom( + ((InetSocketAddress) socket.local).getAddress().getAddress())) + .build()) + .build(); + private final Address remoteAddress = Address + .newBuilder() + .setTcpipAddress( + TcpIpAddress + .newBuilder() + .setIpAddress(ByteString.copyFrom( + ((InetSocketAddress) socket.remote).getAddress().getAddress())) + .build()) + .build(); + + @Test + public void toChannelRef() { + assertEquals(channelRef, ChannelzProtoUtil.toChannelRef(channel)); + } + + @Test + public void toSubchannelRef() { + assertEquals(subchannelRef, ChannelzProtoUtil.toSubchannelRef(subchannel)); + } + + @Test + public void toServerRef() { + assertEquals(serverRef, ChannelzProtoUtil.toServerRef(server)); + } + + @Test + public void toSocketRef() { + assertEquals(socketRef, ChannelzProtoUtil.toSocketRef(socket)); + } + + @Test + public void toState() { + for (ConnectivityState connectivityState : ConnectivityState.values()) { + assertEquals( + connectivityState.name(), + ChannelzProtoUtil.toState(connectivityState).getValueDescriptor().getName()); + } + assertEquals(State.UNKNOWN, ChannelzProtoUtil.toState(null)); + } + + @Test + public void toSocket() throws Exception { + assertEquals( + Socket + .newBuilder() + .setRef(socketRef) + .setLocal(localAddress) + .setRemote(remoteAddress) + .setData(socketData) + .build(), + ChannelzProtoUtil.toSocket(socket)); + } + + @Test + public void toSocketData() { + assertEquals( + socketData, + ChannelzProtoUtil.toSocketData(socket.transportStats)); + } + + @Test + public void toAddress_inet() throws Exception { + InetSocketAddress inet4 = new InetSocketAddress(Inet4Address.getByName("10.0.0.1"), 1000); + assertEquals( + Address.newBuilder().setTcpipAddress( + TcpIpAddress + .newBuilder() + .setIpAddress(ByteString.copyFrom(inet4.getAddress().getAddress())) + .build()) + .build(), + ChannelzProtoUtil.toAddress(inet4)); + } + + @Test + public void toAddress_uds() throws Exception { + String path = "/tmp/foo"; + DomainSocketAddress uds = new DomainSocketAddress(path); + assertEquals( + Address.newBuilder().setUdsAddress( + UdsAddress + .newBuilder() + .setFilename(path) + .build()) + .build(), + ChannelzProtoUtil.toAddress(uds)); + } + + @Test + public void toAddress_other() throws Exception { + final String name = "my name"; + SocketAddress other = new SocketAddress() { + @Override + public String toString() { + return name; + } + }; + assertEquals( + Address.newBuilder().setOtherAddress( + OtherAddress + .newBuilder() + .setName(name) + .build()) + .build(), + ChannelzProtoUtil.toAddress(other)); + } + + @Test + public void toServer() throws Exception { + assertEquals(serverProto, ChannelzProtoUtil.toServer(server)); + } + + @Test + public void toServerData() throws Exception { + assertEquals(serverData, ChannelzProtoUtil.toServerData(server.serverStats)); + } + + @Test + public void toChannel() throws Exception { + assertEquals(channelProto, ChannelzProtoUtil.toChannel(channel)); + + channel.stats = toBuilder(channel.stats) + .setSubchannels(ImmutableList.of(subchannel)) + .build(); + + assertEquals( + channelProto + .toBuilder() + .addSubchannelRef(subchannelRef) + .build(), + ChannelzProtoUtil.toChannel(channel)); + + TestChannel otherSubchannel = new TestChannel(); + channel.stats = toBuilder(channel.stats) + .setSubchannels(ImmutableList.of(subchannel, otherSubchannel)) + .build(); + assertEquals( + channelProto + .toBuilder() + .addSubchannelRef(subchannelRef) + .addSubchannelRef(ChannelzProtoUtil.toSubchannelRef(otherSubchannel)) + .build(), + ChannelzProtoUtil.toChannel(channel)); + } + + @Test + public void extractChannelData() { + assertEquals(channelData, ChannelzProtoUtil.extractChannelData(channel.stats)); + } + + @Test + public void toSubchannel_noChildren() throws Exception { + assertEquals( + subchannelProto, + ChannelzProtoUtil.toSubchannel(subchannel)); + } + + @Test + public void toSubchannel_socketChildren() throws Exception { + subchannel.stats = toBuilder(subchannel.stats) + .setSockets(ImmutableList.of(socket)) + .build(); + + assertEquals( + subchannelProto.toBuilder() + .addSocketRef(socketRef) + .build(), + ChannelzProtoUtil.toSubchannel(subchannel)); + + TestSocket otherSocket = new TestSocket(); + subchannel.stats = toBuilder(subchannel.stats) + .setSockets(ImmutableList.of(socket, otherSocket)) + .build(); + assertEquals( + subchannelProto + .toBuilder() + .addSocketRef(socketRef) + .addSocketRef(ChannelzProtoUtil.toSocketRef(otherSocket)) + .build(), + ChannelzProtoUtil.toSubchannel(subchannel)); + } + + @Test + public void toSubchannel_subchannelChildren() throws Exception { + TestChannel subchannel1 = new TestChannel(); + subchannel.stats = toBuilder(subchannel.stats) + .setSubchannels(ImmutableList.of(subchannel1)) + .build(); + assertEquals( + subchannelProto.toBuilder() + .addSubchannelRef(ChannelzProtoUtil.toSubchannelRef(subchannel1)) + .build(), + ChannelzProtoUtil.toSubchannel(subchannel)); + + TestChannel subchannel2 = new TestChannel(); + subchannel.stats = toBuilder(subchannel.stats) + .setSubchannels(ImmutableList.of(subchannel1, subchannel2)) + .build(); + assertEquals( + subchannelProto + .toBuilder() + .addSubchannelRef(ChannelzProtoUtil.toSubchannelRef(subchannel1)) + .addSubchannelRef(ChannelzProtoUtil.toSubchannelRef(subchannel2)) + .build(), + ChannelzProtoUtil.toSubchannel(subchannel)); + } + + @Test + public void toGetTopChannelsResponse() { + // empty results + assertEquals( + GetTopChannelsResponse.newBuilder().setEnd(true).build(), + ChannelzProtoUtil.toGetTopChannelResponse( + new RootChannelList(Collections.>emptyList(), true))); + + // 1 result, paginated + assertEquals( + GetTopChannelsResponse + .newBuilder() + .addChannel(channelProto) + .build(), + ChannelzProtoUtil.toGetTopChannelResponse( + new RootChannelList(ImmutableList.>of(channel), false))); + + // 1 result, end + assertEquals( + GetTopChannelsResponse + .newBuilder() + .addChannel(channelProto) + .setEnd(true) + .build(), + ChannelzProtoUtil.toGetTopChannelResponse( + new RootChannelList(ImmutableList.>of(channel), true))); + + // 2 results, end + TestChannel channel2 = new TestChannel(); + assertEquals( + GetTopChannelsResponse + .newBuilder() + .addChannel(channelProto) + .addChannel(ChannelzProtoUtil.toChannel(channel2)) + .build(), + ChannelzProtoUtil.toGetTopChannelResponse( + new RootChannelList( + ImmutableList.>of(channel, channel2), false))); + } + + @Test + public void toGetServersResponse() { + // empty results + assertEquals( + GetServersResponse.getDefaultInstance(), + ChannelzProtoUtil.toGetServersResponse( + new ServerList(Collections.>emptyList(), false))); + + // 1 result, paginated + assertEquals( + GetServersResponse + .newBuilder() + .addServer(serverProto) + .build(), + ChannelzProtoUtil.toGetServersResponse( + new ServerList(ImmutableList.>of(server), false))); + + // 1 result, end + assertEquals( + GetServersResponse + .newBuilder() + .addServer(serverProto) + .setEnd(true) + .build(), + ChannelzProtoUtil.toGetServersResponse( + new ServerList(ImmutableList.>of(server), true))); + + TestServer server2 = new TestServer(); + // 2 results, end + assertEquals( + GetServersResponse + .newBuilder() + .addServer(serverProto) + .addServer(ChannelzProtoUtil.toServer(server2)) + .build(), + ChannelzProtoUtil.toGetServersResponse( + new ServerList(ImmutableList.>of(server, server2), false))); + } + + private static ChannelStats.Builder toBuilder(ChannelStats stats) { + ChannelStats.Builder builder = new ChannelStats.Builder() + .setTarget(stats.target) + .setState(stats.state) + .setCallsStarted(stats.callsStarted) + .setCallsSucceeded(stats.callsSucceeded) + .setCallsFailed(stats.callsFailed) + .setLastCallStartedMillis(stats.lastCallStartedMillis); + if (!stats.subchannels.isEmpty()) { + builder.setSubchannels(stats.subchannels); + } + if (!stats.sockets.isEmpty()) { + builder.setSockets(stats.sockets); + } + return builder; + } +} diff --git a/services/src/test/java/io/grpc/services/ChannelzServiceTest.java b/services/src/test/java/io/grpc/services/ChannelzServiceTest.java new file mode 100644 index 000000000..99f4ec37b --- /dev/null +++ b/services/src/test/java/io/grpc/services/ChannelzServiceTest.java @@ -0,0 +1,235 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import static org.junit.Assert.assertEquals; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import io.grpc.Status; +import io.grpc.StatusRuntimeException; +import io.grpc.channelz.v1.GetChannelRequest; +import io.grpc.channelz.v1.GetChannelResponse; +import io.grpc.channelz.v1.GetServersRequest; +import io.grpc.channelz.v1.GetServersResponse; +import io.grpc.channelz.v1.GetSocketRequest; +import io.grpc.channelz.v1.GetSocketResponse; +import io.grpc.channelz.v1.GetSubchannelRequest; +import io.grpc.channelz.v1.GetSubchannelResponse; +import io.grpc.channelz.v1.GetTopChannelsRequest; +import io.grpc.channelz.v1.GetTopChannelsResponse; +import io.grpc.internal.Channelz; +import io.grpc.services.ChannelzTestHelper.TestChannel; +import io.grpc.services.ChannelzTestHelper.TestServer; +import io.grpc.services.ChannelzTestHelper.TestSocket; +import io.grpc.stub.StreamObserver; +import java.util.concurrent.ExecutionException; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; +import org.mockito.ArgumentCaptor; + +@RunWith(JUnit4.class) +public class ChannelzServiceTest { + // small value to force pagination + private static final int MAX_PAGE_SIZE = 1; + + private final Channelz channelz = new Channelz(); + private ChannelzService service = new ChannelzService(channelz, MAX_PAGE_SIZE); + + @Test + public void getTopChannels_empty() { + assertEquals( + GetTopChannelsResponse.newBuilder().setEnd(true).build(), + getTopChannelHelper(0)); + } + + @Test + public void getTopChannels_onePage() throws Exception { + TestChannel root = new TestChannel(); + channelz.addRootChannel(root); + + assertEquals( + GetTopChannelsResponse + .newBuilder() + .addChannel(ChannelzProtoUtil.toChannel(root)) + .setEnd(true) + .build(), + getTopChannelHelper(0)); + } + + @Test + public void getChannel() throws ExecutionException, InterruptedException { + TestChannel root = new TestChannel(); + assertChannelNotFound(root.getLogId().getId()); + + channelz.addRootChannel(root); + assertEquals( + GetChannelResponse + .newBuilder() + .setChannel(ChannelzProtoUtil.toChannel(root)) + .build(), + getChannelHelper(root.getLogId().getId())); + + channelz.removeRootChannel(root); + assertChannelNotFound(root.getLogId().getId()); + } + + @Test + public void getSubchannel() throws Exception { + TestChannel subchannel = new TestChannel(); + assertSubchannelNotFound(subchannel.getLogId().getId()); + + channelz.addSubchannel(subchannel); + assertEquals( + GetSubchannelResponse + .newBuilder() + .setSubchannel(ChannelzProtoUtil.toSubchannel(subchannel)) + .build(), + getSubchannelHelper(subchannel.getLogId().getId())); + + channelz.removeSubchannel(subchannel); + assertSubchannelNotFound(subchannel.getLogId().getId()); + } + + @Test + public void getServers_empty() { + assertEquals( + GetServersResponse.newBuilder().setEnd(true).build(), + getServersHelper(0)); + } + + @Test + public void getServers_onePage() throws Exception { + TestServer server = new TestServer(); + channelz.addServer(server); + + assertEquals( + GetServersResponse + .newBuilder() + .addServer(ChannelzProtoUtil.toServer(server)) + .setEnd(true) + .build(), + getServersHelper(0)); + } + + @Test + public void getSocket() throws Exception { + TestSocket socket = new TestSocket(); + assertSocketNotFound(socket.getLogId().getId()); + + channelz.addSocket(socket); + assertEquals( + GetSocketResponse + .newBuilder() + .setSocket(ChannelzProtoUtil.toSocket(socket)) + .build(), + getSocketHelper(socket.getLogId().getId())); + + channelz.removeSocket(socket); + assertSocketNotFound(socket.getLogId().getId()); + } + + private GetTopChannelsResponse getTopChannelHelper(long startId) { + @SuppressWarnings("unchecked") + StreamObserver observer = mock(StreamObserver.class); + ArgumentCaptor responseCaptor + = ArgumentCaptor.forClass(GetTopChannelsResponse.class); + service.getTopChannels( + GetTopChannelsRequest.newBuilder().setStartChannelId(startId).build(), + observer); + verify(observer).onNext(responseCaptor.capture()); + verify(observer).onCompleted(); + return responseCaptor.getValue(); + } + + private GetChannelResponse getChannelHelper(long id) { + @SuppressWarnings("unchecked") + StreamObserver observer = mock(StreamObserver.class); + ArgumentCaptor response + = ArgumentCaptor.forClass(GetChannelResponse.class); + service.getChannel(GetChannelRequest.newBuilder().setChannelId(id).build(), observer); + verify(observer).onNext(response.capture()); + verify(observer).onCompleted(); + return response.getValue(); + } + + private void assertChannelNotFound(long id) { + @SuppressWarnings("unchecked") + StreamObserver observer = mock(StreamObserver.class); + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + service.getChannel(GetChannelRequest.newBuilder().setChannelId(id).build(), observer); + verify(observer).onError(exceptionCaptor.capture()); + Exception exception = exceptionCaptor.getValue(); + assertEquals(Status.NOT_FOUND, ((StatusRuntimeException) exception).getStatus()); + } + + private GetSubchannelResponse getSubchannelHelper(long id) { + @SuppressWarnings("unchecked") + StreamObserver observer = mock(StreamObserver.class); + ArgumentCaptor response + = ArgumentCaptor.forClass(GetSubchannelResponse.class); + service.getSubchannel(GetSubchannelRequest.newBuilder().setSubchannelId(id).build(), observer); + verify(observer).onNext(response.capture()); + verify(observer).onCompleted(); + return response.getValue(); + } + + private void assertSubchannelNotFound(long id) { + @SuppressWarnings("unchecked") + StreamObserver observer = mock(StreamObserver.class); + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + service.getSubchannel(GetSubchannelRequest.newBuilder().setSubchannelId(id).build(), observer); + verify(observer).onError(exceptionCaptor.capture()); + Exception exception = exceptionCaptor.getValue(); + assertEquals(Status.NOT_FOUND, ((StatusRuntimeException) exception).getStatus()); + } + + private GetServersResponse getServersHelper(long startId) { + @SuppressWarnings("unchecked") + StreamObserver observer = mock(StreamObserver.class); + ArgumentCaptor responseCaptor + = ArgumentCaptor.forClass(GetServersResponse.class); + service.getServers( + GetServersRequest.newBuilder().setStartServerId(startId).build(), + observer); + verify(observer).onNext(responseCaptor.capture()); + verify(observer).onCompleted(); + return responseCaptor.getValue(); + } + + private void assertSocketNotFound(long id) { + @SuppressWarnings("unchecked") + StreamObserver observer = mock(StreamObserver.class); + ArgumentCaptor exceptionCaptor = ArgumentCaptor.forClass(Exception.class); + service.getSocket(GetSocketRequest.newBuilder().setSocketId(id).build(), observer); + verify(observer).onError(exceptionCaptor.capture()); + Exception exception = exceptionCaptor.getValue(); + assertEquals(Status.NOT_FOUND, ((StatusRuntimeException) exception).getStatus()); + } + + private GetSocketResponse getSocketHelper(long id) { + @SuppressWarnings("unchecked") + StreamObserver observer = mock(StreamObserver.class); + ArgumentCaptor response + = ArgumentCaptor.forClass(GetSocketResponse.class); + service.getSocket(GetSocketRequest.newBuilder().setSocketId(id).build(), observer); + verify(observer).onNext(response.capture()); + verify(observer).onCompleted(); + return response.getValue(); + } +} diff --git a/services/src/test/java/io/grpc/services/ChannelzTestHelper.java b/services/src/test/java/io/grpc/services/ChannelzTestHelper.java new file mode 100644 index 000000000..7389e2d6a --- /dev/null +++ b/services/src/test/java/io/grpc/services/ChannelzTestHelper.java @@ -0,0 +1,118 @@ +/* + * Copyright 2018, gRPC Authors All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.services; + +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; +import io.grpc.ConnectivityState; +import io.grpc.internal.Channelz.ChannelStats; +import io.grpc.internal.Channelz.Security; +import io.grpc.internal.Channelz.ServerStats; +import io.grpc.internal.Channelz.SocketStats; +import io.grpc.internal.Channelz.TransportStats; +import io.grpc.internal.Instrumented; +import io.grpc.internal.LogId; +import io.grpc.internal.WithLogId; +import java.net.InetSocketAddress; +import java.net.SocketAddress; +import java.util.Collections; + +/** + * Test class definitions that will be used in the proto utils test as well as + * channelz service test. + */ +final class ChannelzTestHelper { + + static final class TestSocket implements Instrumented { + private final LogId id = LogId.allocate("socket"); + TransportStats transportStats = new TransportStats( + /*streamsStarted=*/ 1, + /*lastLocalStreamCreatedTimeNanos=*/ 2, + /*lastRemoteStreamCreatedTimeNanos=*/ 3, + /*streamsSucceeded=*/ 4, + /*streamsFailed=*/ 5, + /*messagesSent=*/ 6, + /*messagesReceived=*/ 7, + /*keepAlivesSent=*/ 8, + /*lastMessageSentTimeNanos=*/ 9, + /*lastMessageReceivedTimeNanos=*/ 10, + /*localFlowControlWindow=*/ 11, + /*remoteFlowControlWindow=*/ 12); + SocketAddress local = new InetSocketAddress("10.0.0.1", 1000); + SocketAddress remote = new InetSocketAddress("10.0.0.2", 1000); + + + @Override + public ListenableFuture getStats() { + SettableFuture ret = SettableFuture.create(); + ret.set(new SocketStats(transportStats, local, remote, new Security())); + return ret; + } + + @Override + public LogId getLogId() { + return id; + } + } + + static final class TestServer implements Instrumented { + private final LogId id = LogId.allocate("server"); + ServerStats serverStats = new ServerStats( + /*callsStarted=*/ 1, + /*callsSucceeded=*/ 2, + /*callsFailed=*/ 3, + /*lastCallStartedMillis=*/ 4); + + @Override + public ListenableFuture getStats() { + SettableFuture ret = SettableFuture.create(); + ret.set(serverStats); + return ret; + } + + @Override + public LogId getLogId() { + return id; + } + } + + static final class TestChannel implements Instrumented { + private final LogId id = LogId.allocate("channel-or-subchannel"); + + ChannelStats stats = new ChannelStats( + /*target=*/ "sometarget", + /*connectivityState=*/ ConnectivityState.READY, + /*callsStarted=*/ 1, + /*callsSucceeded=*/ 2, + /*callsFailed=*/ 3, + /*lastCallStartedMillis=*/ 4, + /*subchannels=*/ Collections.emptyList(), + /*sockets=*/ Collections.emptyList()); + + @Override + public ListenableFuture getStats() { + SettableFuture ret = SettableFuture.create(); + ret.set(stats); + return ret; + } + + @Override + public LogId getLogId() { + return id; + } + } +} -- cgit v1.2.3