aboutsummaryrefslogtreecommitdiff
path: root/services
diff options
context:
space:
mode:
authorzpencer <spencerfang@google.com>2018-03-10 14:58:34 -0800
committerGitHub <noreply@github.com>2018-03-10 14:58:34 -0800
commitf56ac76b353be506688d02c2dcd7c284f9393013 (patch)
tree394e9b82e199d568d0b4af0175741a650a8c67e8 /services
parent06784999659e3d184dad6dc504c4a175ab1e05ba (diff)
downloadgrpc-grpc-java-f56ac76b353be506688d02c2dcd7c284f9393013.tar.gz
core,services: add ChannelzService class (#4205)
This implements the methods to expose the stats as a gRPC service. GetServerSockets is still unimplemented and will require a follow up change.
Diffstat (limited to 'services')
-rw-r--r--services/src/main/java/io/grpc/services/ChannelzProtoUtil.java253
-rw-r--r--services/src/main/java/io/grpc/services/ChannelzService.java141
-rw-r--r--services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java457
-rw-r--r--services/src/test/java/io/grpc/services/ChannelzServiceTest.java235
-rw-r--r--services/src/test/java/io/grpc/services/ChannelzTestHelper.java118
5 files changed, 1204 insertions, 0 deletions
diff --git a/services/src/main/java/io/grpc/services/ChannelzProtoUtil.java b/services/src/main/java/io/grpc/services/ChannelzProtoUtil.java
new file mode 100644
index 000000000..9ee4015ea
--- /dev/null
+++ b/services/src/main/java/io/grpc/services/ChannelzProtoUtil.java
@@ -0,0 +1,253 @@
+/*
+ * Copyright 2018, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.services;
+
+import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Int64Value;
+import com.google.protobuf.util.Timestamps;
+import io.grpc.ConnectivityState;
+import io.grpc.Status;
+import io.grpc.channelz.v1.Address;
+import io.grpc.channelz.v1.Address.OtherAddress;
+import io.grpc.channelz.v1.Address.TcpIpAddress;
+import io.grpc.channelz.v1.Address.UdsAddress;
+import io.grpc.channelz.v1.Channel;
+import io.grpc.channelz.v1.ChannelData;
+import io.grpc.channelz.v1.ChannelData.State;
+import io.grpc.channelz.v1.ChannelRef;
+import io.grpc.channelz.v1.GetServersResponse;
+import io.grpc.channelz.v1.GetTopChannelsResponse;
+import io.grpc.channelz.v1.Server;
+import io.grpc.channelz.v1.ServerData;
+import io.grpc.channelz.v1.ServerRef;
+import io.grpc.channelz.v1.Socket;
+import io.grpc.channelz.v1.SocketData;
+import io.grpc.channelz.v1.SocketRef;
+import io.grpc.channelz.v1.Subchannel;
+import io.grpc.channelz.v1.SubchannelRef;
+import io.grpc.internal.Channelz;
+import io.grpc.internal.Channelz.ChannelStats;
+import io.grpc.internal.Channelz.RootChannelList;
+import io.grpc.internal.Channelz.ServerList;
+import io.grpc.internal.Channelz.ServerStats;
+import io.grpc.internal.Channelz.SocketStats;
+import io.grpc.internal.Channelz.TransportStats;
+import io.grpc.internal.Instrumented;
+import io.grpc.internal.WithLogId;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.concurrent.ExecutionException;
+
+/**
+ * A static utility class for turning internal data structures into protos.
+ */
+final class ChannelzProtoUtil {
+ private ChannelzProtoUtil() {
+ // do not instantiate.
+ }
+
+ static ChannelRef toChannelRef(WithLogId obj) {
+ return ChannelRef
+ .newBuilder()
+ .setChannelId(obj.getLogId().getId())
+ .setName(obj.toString())
+ .build();
+ }
+
+ static SubchannelRef toSubchannelRef(WithLogId obj) {
+ return SubchannelRef
+ .newBuilder()
+ .setSubchannelId(obj.getLogId().getId())
+ .setName(obj.toString())
+ .build();
+ }
+
+ static ServerRef toServerRef(WithLogId obj) {
+ return ServerRef
+ .newBuilder()
+ .setServerId(obj.getLogId().getId())
+ .setName(obj.toString())
+ .build();
+ }
+
+ static SocketRef toSocketRef(WithLogId obj) {
+ return SocketRef
+ .newBuilder()
+ .setSocketId(obj.getLogId().getId())
+ .setName(obj.toString())
+ .build();
+ }
+
+ static Server toServer(Instrumented<ServerStats> obj) {
+ ServerStats stats = getFuture(obj.getStats());
+ return Server
+ .newBuilder()
+ .setRef(toServerRef(obj))
+ .setData(toServerData(stats))
+ .build();
+ }
+
+ static ServerData toServerData(ServerStats stats) {
+ return ServerData
+ .newBuilder()
+ .setCallsStarted(stats.callsStarted)
+ .setCallsSucceeded(stats.callsSucceeded)
+ .setCallsFailed(stats.callsFailed)
+ .setLastCallStartedTimestamp(Timestamps.fromMillis(stats.lastCallStartedMillis))
+ .build();
+ }
+
+ static Socket toSocket(Instrumented<SocketStats> obj) {
+ SocketStats socketStats = getFuture(obj.getStats());
+ return Socket.newBuilder()
+ .setRef(toSocketRef(obj))
+ .setRemote(toAddress(socketStats.remote))
+ .setLocal(toAddress(socketStats.local))
+ .setData(toSocketData(socketStats.data))
+ .build();
+ }
+
+ static Address toAddress(SocketAddress address) {
+ Address.Builder builder = Address.newBuilder();
+ if (address instanceof InetSocketAddress) {
+ InetSocketAddress inetAddress = (InetSocketAddress) address;
+ builder.setTcpipAddress(
+ TcpIpAddress
+ .newBuilder()
+ .setIpAddress(
+ ByteString.copyFrom(inetAddress.getAddress().getAddress()))
+ .build());
+ } else if (address.getClass().getName().endsWith("io.netty.channel.unix.DomainSocketAddress")) {
+ builder.setUdsAddress(
+ UdsAddress
+ .newBuilder()
+ .setFilename(address.toString()) // DomainSocketAddress.toString returns filename
+ .build());
+ } else {
+ builder.setOtherAddress(OtherAddress.newBuilder().setName(address.toString()).build());
+ }
+ return builder.build();
+ }
+
+ static SocketData toSocketData(TransportStats s) {
+ return SocketData
+ .newBuilder()
+ .setStreamsStarted(s.streamsStarted)
+ .setStreamsSucceeded(s.streamsSucceeded)
+ .setStreamsFailed(s.streamsFailed)
+ .setMessagesSent(s.messagesSent)
+ .setMessagesReceived(s.messagesReceived)
+ .setKeepAlivesSent(s.keepAlivesSent)
+ .setLastLocalStreamCreatedTimestamp(
+ Timestamps.fromNanos(s.lastLocalStreamCreatedTimeNanos))
+ .setLastRemoteStreamCreatedTimestamp(
+ Timestamps.fromNanos(s.lastRemoteStreamCreatedTimeNanos))
+ .setLastMessageSentTimestamp(
+ Timestamps.fromNanos(s.lastMessageSentTimeNanos))
+ .setLastMessageReceivedTimestamp(
+ Timestamps.fromNanos(s.lastMessageReceivedTimeNanos))
+ .setLocalFlowControlWindow(
+ Int64Value.newBuilder().setValue(s.localFlowControlWindow).build())
+ .setRemoteFlowControlWindow(
+ Int64Value.newBuilder().setValue(s.remoteFlowControlWindow).build())
+ .build();
+ }
+
+ static Channel toChannel(Instrumented<ChannelStats> channel) {
+ ChannelStats stats = getFuture(channel.getStats());
+ Channel.Builder channelBuilder = Channel
+ .newBuilder()
+ .setRef(toChannelRef(channel))
+ .setData(extractChannelData(stats));
+ for (WithLogId subchannel : stats.subchannels) {
+ channelBuilder.addSubchannelRef(toSubchannelRef(subchannel));
+ }
+
+ return channelBuilder.build();
+ }
+
+ static ChannelData extractChannelData(Channelz.ChannelStats stats) {
+ return ChannelData
+ .newBuilder()
+ .setTarget(stats.target)
+ .setState(toState(stats.state))
+ .setCallsStarted(stats.callsStarted)
+ .setCallsSucceeded(stats.callsSucceeded)
+ .setCallsFailed(stats.callsFailed)
+ .setLastCallStartedTimestamp(Timestamps.fromMillis(stats.lastCallStartedMillis))
+ .build();
+ }
+
+ static State toState(ConnectivityState state) {
+ if (state == null) {
+ return State.UNKNOWN;
+ }
+ try {
+ return Enum.valueOf(State.class, state.name());
+ } catch (IllegalArgumentException e) {
+ return State.UNKNOWN;
+ }
+ }
+
+ static Subchannel toSubchannel(Instrumented<ChannelStats> subchannel) {
+ ChannelStats stats = getFuture(subchannel.getStats());
+ Subchannel.Builder subchannelBuilder = Subchannel
+ .newBuilder()
+ .setRef(toSubchannelRef(subchannel))
+ .setData(extractChannelData(stats));
+ Preconditions.checkState(stats.sockets.isEmpty() || stats.subchannels.isEmpty());
+ for (WithLogId childSocket : stats.sockets) {
+ subchannelBuilder.addSocketRef(toSocketRef(childSocket));
+ }
+ for (WithLogId childSubchannel : stats.subchannels) {
+ subchannelBuilder.addSubchannelRef(toSubchannelRef(childSubchannel));
+ }
+ return subchannelBuilder.build();
+ }
+
+ static GetTopChannelsResponse toGetTopChannelResponse(RootChannelList rootChannels) {
+ GetTopChannelsResponse.Builder responseBuilder = GetTopChannelsResponse
+ .newBuilder()
+ .setEnd(rootChannels.end);
+ for (Instrumented<ChannelStats> c : rootChannels.channels) {
+ responseBuilder.addChannel(ChannelzProtoUtil.toChannel(c));
+ }
+ return responseBuilder.build();
+ }
+
+ static GetServersResponse toGetServersResponse(ServerList servers) {
+ GetServersResponse.Builder responseBuilder = GetServersResponse
+ .newBuilder()
+ .setEnd(servers.end);
+ for (Instrumented<ServerStats> s : servers.servers) {
+ responseBuilder.addServer(ChannelzProtoUtil.toServer(s));
+ }
+ return responseBuilder.build();
+ }
+
+ private static <T> T getFuture(ListenableFuture<T> future) {
+ try {
+ return future.get();
+ } catch (InterruptedException e) {
+ throw Status.INTERNAL.withCause(e).asRuntimeException();
+ } catch (ExecutionException e) {
+ throw Status.INTERNAL.withCause(e).asRuntimeException();
+ }
+ }
+}
diff --git a/services/src/main/java/io/grpc/services/ChannelzService.java b/services/src/main/java/io/grpc/services/ChannelzService.java
new file mode 100644
index 000000000..848d5df9c
--- /dev/null
+++ b/services/src/main/java/io/grpc/services/ChannelzService.java
@@ -0,0 +1,141 @@
+/*
+ * Copyright 2018, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.services;
+
+import com.google.common.annotations.VisibleForTesting;
+import io.grpc.ExperimentalApi;
+import io.grpc.Status;
+import io.grpc.channelz.v1.ChannelzGrpc;
+import io.grpc.channelz.v1.GetChannelRequest;
+import io.grpc.channelz.v1.GetChannelResponse;
+import io.grpc.channelz.v1.GetServerSocketsRequest;
+import io.grpc.channelz.v1.GetServerSocketsResponse;
+import io.grpc.channelz.v1.GetServersRequest;
+import io.grpc.channelz.v1.GetServersResponse;
+import io.grpc.channelz.v1.GetSocketRequest;
+import io.grpc.channelz.v1.GetSocketResponse;
+import io.grpc.channelz.v1.GetSubchannelRequest;
+import io.grpc.channelz.v1.GetSubchannelResponse;
+import io.grpc.channelz.v1.GetTopChannelsRequest;
+import io.grpc.channelz.v1.GetTopChannelsResponse;
+import io.grpc.internal.Channelz;
+import io.grpc.internal.Channelz.ChannelStats;
+import io.grpc.internal.Channelz.ServerList;
+import io.grpc.internal.Channelz.SocketStats;
+import io.grpc.internal.Instrumented;
+import io.grpc.stub.StreamObserver;
+
+/**
+ * The channelz service provides stats about a running gRPC process.
+ */
+@ExperimentalApi("https://github.com/grpc/grpc-java/issues/4206")
+public final class ChannelzService extends ChannelzGrpc.ChannelzImplBase {
+ private final Channelz channelz;
+ private final int maxPageSize;
+
+ public ChannelzService newInstance(int maxPageSize) {
+ return new ChannelzService(Channelz.instance(), maxPageSize);
+ }
+
+ @VisibleForTesting
+ ChannelzService(Channelz channelz, int maxPageSize) {
+ this.channelz = channelz;
+ this.maxPageSize = maxPageSize;
+ }
+
+ /** Returns top level channel aka {@link io.grpc.internal.ManagedChannelImpl}. */
+ @Override
+ public void getTopChannels(
+ GetTopChannelsRequest request, StreamObserver<GetTopChannelsResponse> responseObserver) {
+ Channelz.RootChannelList rootChannels
+ = channelz.getRootChannels(request.getStartChannelId(), maxPageSize);
+
+ responseObserver.onNext(ChannelzProtoUtil.toGetTopChannelResponse(rootChannels));
+ responseObserver.onCompleted();
+ }
+
+ /** Returns a top level channel aka {@link io.grpc.internal.ManagedChannelImpl}. */
+ @Override
+ public void getChannel(
+ GetChannelRequest request, StreamObserver<GetChannelResponse> responseObserver) {
+ Instrumented<ChannelStats> s = channelz.getRootChannel(request.getChannelId());
+ if (s == null) {
+ responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
+ return;
+ }
+
+ responseObserver.onNext(
+ GetChannelResponse
+ .newBuilder()
+ .setChannel(ChannelzProtoUtil.toChannel(s))
+ .build());
+ responseObserver.onCompleted();
+ }
+
+ /** Returns servers. */
+ @Override
+ public void getServers(
+ GetServersRequest request, StreamObserver<GetServersResponse> responseObserver) {
+ ServerList servers = channelz.getServers(request.getStartServerId(), maxPageSize);
+
+ responseObserver.onNext(ChannelzProtoUtil.toGetServersResponse(servers));
+ responseObserver.onCompleted();
+ }
+
+ /** Returns a subchannel. */
+ @Override
+ public void getSubchannel(
+ GetSubchannelRequest request, StreamObserver<GetSubchannelResponse> responseObserver) {
+ Instrumented<ChannelStats> s = channelz.getSubchannel(request.getSubchannelId());
+ if (s == null) {
+ responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
+ return;
+ }
+
+ responseObserver.onNext(
+ GetSubchannelResponse
+ .newBuilder()
+ .setSubchannel(ChannelzProtoUtil.toSubchannel(s))
+ .build());
+ responseObserver.onCompleted();
+ }
+
+ /** Returns a socket. */
+ @Override
+ public void getSocket(
+ GetSocketRequest request, StreamObserver<GetSocketResponse> responseObserver) {
+ Instrumented<SocketStats> s = channelz.getSocket(request.getSocketId());
+ if (s == null) {
+ responseObserver.onError(Status.NOT_FOUND.asRuntimeException());
+ return;
+ }
+
+ responseObserver.onNext(
+ GetSocketResponse
+ .newBuilder()
+ .setSocket(ChannelzProtoUtil.toSocket(s))
+ .build());
+ responseObserver.onCompleted();
+ }
+
+ @Override
+ public void getServerSockets(
+ GetServerSocketsRequest request, StreamObserver<GetServerSocketsResponse> responseObserver) {
+ // TODO(zpencer): fill this one out after refactoring channelz class
+ responseObserver.onError(Status.UNIMPLEMENTED.asRuntimeException());
+ }
+}
diff --git a/services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java b/services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java
new file mode 100644
index 000000000..897aafb3b
--- /dev/null
+++ b/services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java
@@ -0,0 +1,457 @@
+/*
+ * Copyright 2018, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.services;
+
+import static org.junit.Assert.assertEquals;
+
+import com.google.common.collect.ImmutableList;
+import com.google.protobuf.ByteString;
+import com.google.protobuf.Int64Value;
+import com.google.protobuf.util.Timestamps;
+import io.grpc.ConnectivityState;
+import io.grpc.channelz.v1.Address;
+import io.grpc.channelz.v1.Address.OtherAddress;
+import io.grpc.channelz.v1.Address.TcpIpAddress;
+import io.grpc.channelz.v1.Address.UdsAddress;
+import io.grpc.channelz.v1.Channel;
+import io.grpc.channelz.v1.ChannelData;
+import io.grpc.channelz.v1.ChannelData.State;
+import io.grpc.channelz.v1.ChannelRef;
+import io.grpc.channelz.v1.GetServersResponse;
+import io.grpc.channelz.v1.GetTopChannelsResponse;
+import io.grpc.channelz.v1.Server;
+import io.grpc.channelz.v1.ServerData;
+import io.grpc.channelz.v1.ServerRef;
+import io.grpc.channelz.v1.Socket;
+import io.grpc.channelz.v1.SocketData;
+import io.grpc.channelz.v1.SocketRef;
+import io.grpc.channelz.v1.Subchannel;
+import io.grpc.channelz.v1.SubchannelRef;
+import io.grpc.internal.Channelz.ChannelStats;
+import io.grpc.internal.Channelz.RootChannelList;
+import io.grpc.internal.Channelz.ServerList;
+import io.grpc.internal.Channelz.ServerStats;
+import io.grpc.internal.Instrumented;
+import io.grpc.internal.WithLogId;
+import io.grpc.services.ChannelzTestHelper.TestChannel;
+import io.grpc.services.ChannelzTestHelper.TestServer;
+import io.grpc.services.ChannelzTestHelper.TestSocket;
+import io.netty.channel.unix.DomainSocketAddress;
+import java.net.Inet4Address;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Collections;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+@RunWith(JUnit4.class)
+public final class ChannelzProtoUtilTest {
+
+ private final TestChannel channel = new TestChannel();
+ private final ChannelRef channelRef = ChannelRef
+ .newBuilder()
+ .setName(channel.toString())
+ .setChannelId(channel.getLogId().getId())
+ .build();
+ private final ChannelData channelData = ChannelData
+ .newBuilder()
+ .setTarget("sometarget")
+ .setState(State.READY)
+ .setCallsStarted(1)
+ .setCallsSucceeded(2)
+ .setCallsFailed(3)
+ .setLastCallStartedTimestamp(Timestamps.fromMillis(4))
+ .build();
+ private final Channel channelProto = Channel
+ .newBuilder()
+ .setRef(channelRef)
+ .setData(channelData)
+ .build();
+
+ private final TestChannel subchannel = new TestChannel();
+ private final SubchannelRef subchannelRef = SubchannelRef
+ .newBuilder()
+ .setName(subchannel.toString())
+ .setSubchannelId(subchannel.getLogId().getId())
+ .build();
+ private final ChannelData subchannelData = ChannelData
+ .newBuilder()
+ .setTarget("sometarget")
+ .setState(State.READY)
+ .setCallsStarted(1)
+ .setCallsSucceeded(2)
+ .setCallsFailed(3)
+ .setLastCallStartedTimestamp(Timestamps.fromMillis(4))
+ .build();
+ private final Subchannel subchannelProto = Subchannel
+ .newBuilder()
+ .setRef(subchannelRef)
+ .setData(subchannelData)
+ .build();
+
+ private final TestServer server = new TestServer();
+ private final ServerRef serverRef = ServerRef
+ .newBuilder()
+ .setName(server.toString())
+ .setServerId(server.getLogId().getId())
+ .build();
+ private final ServerData serverData = ServerData
+ .newBuilder()
+ .setCallsStarted(1)
+ .setCallsSucceeded(2)
+ .setCallsFailed(3)
+ .setLastCallStartedTimestamp(Timestamps.fromMillis(4))
+ .build();
+ private final Server serverProto = Server
+ .newBuilder()
+ .setRef(serverRef)
+ .setData(serverData)
+ .build();
+
+ private final TestSocket socket = new TestSocket();
+ private final SocketRef socketRef = SocketRef
+ .newBuilder()
+ .setName(socket.toString())
+ .setSocketId(socket.getLogId().getId())
+ .build();
+ private final SocketData socketData = SocketData
+ .newBuilder()
+ .setStreamsStarted(1)
+ .setLastLocalStreamCreatedTimestamp(Timestamps.fromNanos(2))
+ .setLastRemoteStreamCreatedTimestamp(Timestamps.fromNanos(3))
+ .setStreamsSucceeded(4)
+ .setStreamsFailed(5)
+ .setMessagesSent(6)
+ .setMessagesReceived(7)
+ .setKeepAlivesSent(8)
+ .setLastMessageSentTimestamp(Timestamps.fromNanos(9))
+ .setLastMessageReceivedTimestamp(Timestamps.fromNanos(10))
+ .setLocalFlowControlWindow(Int64Value.newBuilder().setValue(11).build())
+ .setRemoteFlowControlWindow(Int64Value.newBuilder().setValue(12).build())
+ .build();
+ private final Address localAddress = Address
+ .newBuilder()
+ .setTcpipAddress(
+ TcpIpAddress
+ .newBuilder()
+ .setIpAddress(ByteString.copyFrom(
+ ((InetSocketAddress) socket.local).getAddress().getAddress()))
+ .build())
+ .build();
+ private final Address remoteAddress = Address
+ .newBuilder()
+ .setTcpipAddress(
+ TcpIpAddress
+ .newBuilder()
+ .setIpAddress(ByteString.copyFrom(
+ ((InetSocketAddress) socket.remote).getAddress().getAddress()))
+ .build())
+ .build();
+
+ @Test
+ public void toChannelRef() {
+ assertEquals(channelRef, ChannelzProtoUtil.toChannelRef(channel));
+ }
+
+ @Test
+ public void toSubchannelRef() {
+ assertEquals(subchannelRef, ChannelzProtoUtil.toSubchannelRef(subchannel));
+ }
+
+ @Test
+ public void toServerRef() {
+ assertEquals(serverRef, ChannelzProtoUtil.toServerRef(server));
+ }
+
+ @Test
+ public void toSocketRef() {
+ assertEquals(socketRef, ChannelzProtoUtil.toSocketRef(socket));
+ }
+
+ @Test
+ public void toState() {
+ for (ConnectivityState connectivityState : ConnectivityState.values()) {
+ assertEquals(
+ connectivityState.name(),
+ ChannelzProtoUtil.toState(connectivityState).getValueDescriptor().getName());
+ }
+ assertEquals(State.UNKNOWN, ChannelzProtoUtil.toState(null));
+ }
+
+ @Test
+ public void toSocket() throws Exception {
+ assertEquals(
+ Socket
+ .newBuilder()
+ .setRef(socketRef)
+ .setLocal(localAddress)
+ .setRemote(remoteAddress)
+ .setData(socketData)
+ .build(),
+ ChannelzProtoUtil.toSocket(socket));
+ }
+
+ @Test
+ public void toSocketData() {
+ assertEquals(
+ socketData,
+ ChannelzProtoUtil.toSocketData(socket.transportStats));
+ }
+
+ @Test
+ public void toAddress_inet() throws Exception {
+ InetSocketAddress inet4 = new InetSocketAddress(Inet4Address.getByName("10.0.0.1"), 1000);
+ assertEquals(
+ Address.newBuilder().setTcpipAddress(
+ TcpIpAddress
+ .newBuilder()
+ .setIpAddress(ByteString.copyFrom(inet4.getAddress().getAddress()))
+ .build())
+ .build(),
+ ChannelzProtoUtil.toAddress(inet4));
+ }
+
+ @Test
+ public void toAddress_uds() throws Exception {
+ String path = "/tmp/foo";
+ DomainSocketAddress uds = new DomainSocketAddress(path);
+ assertEquals(
+ Address.newBuilder().setUdsAddress(
+ UdsAddress
+ .newBuilder()
+ .setFilename(path)
+ .build())
+ .build(),
+ ChannelzProtoUtil.toAddress(uds));
+ }
+
+ @Test
+ public void toAddress_other() throws Exception {
+ final String name = "my name";
+ SocketAddress other = new SocketAddress() {
+ @Override
+ public String toString() {
+ return name;
+ }
+ };
+ assertEquals(
+ Address.newBuilder().setOtherAddress(
+ OtherAddress
+ .newBuilder()
+ .setName(name)
+ .build())
+ .build(),
+ ChannelzProtoUtil.toAddress(other));
+ }
+
+ @Test
+ public void toServer() throws Exception {
+ assertEquals(serverProto, ChannelzProtoUtil.toServer(server));
+ }
+
+ @Test
+ public void toServerData() throws Exception {
+ assertEquals(serverData, ChannelzProtoUtil.toServerData(server.serverStats));
+ }
+
+ @Test
+ public void toChannel() throws Exception {
+ assertEquals(channelProto, ChannelzProtoUtil.toChannel(channel));
+
+ channel.stats = toBuilder(channel.stats)
+ .setSubchannels(ImmutableList.<WithLogId>of(subchannel))
+ .build();
+
+ assertEquals(
+ channelProto
+ .toBuilder()
+ .addSubchannelRef(subchannelRef)
+ .build(),
+ ChannelzProtoUtil.toChannel(channel));
+
+ TestChannel otherSubchannel = new TestChannel();
+ channel.stats = toBuilder(channel.stats)
+ .setSubchannels(ImmutableList.<WithLogId>of(subchannel, otherSubchannel))
+ .build();
+ assertEquals(
+ channelProto
+ .toBuilder()
+ .addSubchannelRef(subchannelRef)
+ .addSubchannelRef(ChannelzProtoUtil.toSubchannelRef(otherSubchannel))
+ .build(),
+ ChannelzProtoUtil.toChannel(channel));
+ }
+
+ @Test
+ public void extractChannelData() {
+ assertEquals(channelData, ChannelzProtoUtil.extractChannelData(channel.stats));
+ }
+
+ @Test
+ public void toSubchannel_noChildren() throws Exception {
+ assertEquals(
+ subchannelProto,
+ ChannelzProtoUtil.toSubchannel(subchannel));
+ }
+
+ @Test
+ public void toSubchannel_socketChildren() throws Exception {
+ subchannel.stats = toBuilder(subchannel.stats)
+ .setSockets(ImmutableList.<WithLogId>of(socket))
+ .build();
+
+ assertEquals(
+ subchannelProto.toBuilder()
+ .addSocketRef(socketRef)
+ .build(),
+ ChannelzProtoUtil.toSubchannel(subchannel));
+
+ TestSocket otherSocket = new TestSocket();
+ subchannel.stats = toBuilder(subchannel.stats)
+ .setSockets(ImmutableList.<WithLogId>of(socket, otherSocket))
+ .build();
+ assertEquals(
+ subchannelProto
+ .toBuilder()
+ .addSocketRef(socketRef)
+ .addSocketRef(ChannelzProtoUtil.toSocketRef(otherSocket))
+ .build(),
+ ChannelzProtoUtil.toSubchannel(subchannel));
+ }
+
+ @Test
+ public void toSubchannel_subchannelChildren() throws Exception {
+ TestChannel subchannel1 = new TestChannel();
+ subchannel.stats = toBuilder(subchannel.stats)
+ .setSubchannels(ImmutableList.<WithLogId>of(subchannel1))
+ .build();
+ assertEquals(
+ subchannelProto.toBuilder()
+ .addSubchannelRef(ChannelzProtoUtil.toSubchannelRef(subchannel1))
+ .build(),
+ ChannelzProtoUtil.toSubchannel(subchannel));
+
+ TestChannel subchannel2 = new TestChannel();
+ subchannel.stats = toBuilder(subchannel.stats)
+ .setSubchannels(ImmutableList.<WithLogId>of(subchannel1, subchannel2))
+ .build();
+ assertEquals(
+ subchannelProto
+ .toBuilder()
+ .addSubchannelRef(ChannelzProtoUtil.toSubchannelRef(subchannel1))
+ .addSubchannelRef(ChannelzProtoUtil.toSubchannelRef(subchannel2))
+ .build(),
+ ChannelzProtoUtil.toSubchannel(subchannel));
+ }
+
+ @Test
+ public void toGetTopChannelsResponse() {
+ // empty results
+ assertEquals(
+ GetTopChannelsResponse.newBuilder().setEnd(true).build(),
+ ChannelzProtoUtil.toGetTopChannelResponse(
+ new RootChannelList(Collections.<Instrumented<ChannelStats>>emptyList(), true)));
+
+ // 1 result, paginated
+ assertEquals(
+ GetTopChannelsResponse
+ .newBuilder()
+ .addChannel(channelProto)
+ .build(),
+ ChannelzProtoUtil.toGetTopChannelResponse(
+ new RootChannelList(ImmutableList.<Instrumented<ChannelStats>>of(channel), false)));
+
+ // 1 result, end
+ assertEquals(
+ GetTopChannelsResponse
+ .newBuilder()
+ .addChannel(channelProto)
+ .setEnd(true)
+ .build(),
+ ChannelzProtoUtil.toGetTopChannelResponse(
+ new RootChannelList(ImmutableList.<Instrumented<ChannelStats>>of(channel), true)));
+
+ // 2 results, end
+ TestChannel channel2 = new TestChannel();
+ assertEquals(
+ GetTopChannelsResponse
+ .newBuilder()
+ .addChannel(channelProto)
+ .addChannel(ChannelzProtoUtil.toChannel(channel2))
+ .build(),
+ ChannelzProtoUtil.toGetTopChannelResponse(
+ new RootChannelList(
+ ImmutableList.<Instrumented<ChannelStats>>of(channel, channel2), false)));
+ }
+
+ @Test
+ public void toGetServersResponse() {
+ // empty results
+ assertEquals(
+ GetServersResponse.getDefaultInstance(),
+ ChannelzProtoUtil.toGetServersResponse(
+ new ServerList(Collections.<Instrumented<ServerStats>>emptyList(), false)));
+
+ // 1 result, paginated
+ assertEquals(
+ GetServersResponse
+ .newBuilder()
+ .addServer(serverProto)
+ .build(),
+ ChannelzProtoUtil.toGetServersResponse(
+ new ServerList(ImmutableList.<Instrumented<ServerStats>>of(server), false)));
+
+ // 1 result, end
+ assertEquals(
+ GetServersResponse
+ .newBuilder()
+ .addServer(serverProto)
+ .setEnd(true)
+ .build(),
+ ChannelzProtoUtil.toGetServersResponse(
+ new ServerList(ImmutableList.<Instrumented<ServerStats>>of(server), true)));
+
+ TestServer server2 = new TestServer();
+ // 2 results, end
+ assertEquals(
+ GetServersResponse
+ .newBuilder()
+ .addServer(serverProto)
+ .addServer(ChannelzProtoUtil.toServer(server2))
+ .build(),
+ ChannelzProtoUtil.toGetServersResponse(
+ new ServerList(ImmutableList.<Instrumented<ServerStats>>of(server, server2), false)));
+ }
+
+ private static ChannelStats.Builder toBuilder(ChannelStats stats) {
+ ChannelStats.Builder builder = new ChannelStats.Builder()
+ .setTarget(stats.target)
+ .setState(stats.state)
+ .setCallsStarted(stats.callsStarted)
+ .setCallsSucceeded(stats.callsSucceeded)
+ .setCallsFailed(stats.callsFailed)
+ .setLastCallStartedMillis(stats.lastCallStartedMillis);
+ if (!stats.subchannels.isEmpty()) {
+ builder.setSubchannels(stats.subchannels);
+ }
+ if (!stats.sockets.isEmpty()) {
+ builder.setSockets(stats.sockets);
+ }
+ return builder;
+ }
+}
diff --git a/services/src/test/java/io/grpc/services/ChannelzServiceTest.java b/services/src/test/java/io/grpc/services/ChannelzServiceTest.java
new file mode 100644
index 000000000..99f4ec37b
--- /dev/null
+++ b/services/src/test/java/io/grpc/services/ChannelzServiceTest.java
@@ -0,0 +1,235 @@
+/*
+ * Copyright 2018, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.services;
+
+import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import io.grpc.Status;
+import io.grpc.StatusRuntimeException;
+import io.grpc.channelz.v1.GetChannelRequest;
+import io.grpc.channelz.v1.GetChannelResponse;
+import io.grpc.channelz.v1.GetServersRequest;
+import io.grpc.channelz.v1.GetServersResponse;
+import io.grpc.channelz.v1.GetSocketRequest;
+import io.grpc.channelz.v1.GetSocketResponse;
+import io.grpc.channelz.v1.GetSubchannelRequest;
+import io.grpc.channelz.v1.GetSubchannelResponse;
+import io.grpc.channelz.v1.GetTopChannelsRequest;
+import io.grpc.channelz.v1.GetTopChannelsResponse;
+import io.grpc.internal.Channelz;
+import io.grpc.services.ChannelzTestHelper.TestChannel;
+import io.grpc.services.ChannelzTestHelper.TestServer;
+import io.grpc.services.ChannelzTestHelper.TestSocket;
+import io.grpc.stub.StreamObserver;
+import java.util.concurrent.ExecutionException;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+import org.mockito.ArgumentCaptor;
+
+@RunWith(JUnit4.class)
+public class ChannelzServiceTest {
+ // small value to force pagination
+ private static final int MAX_PAGE_SIZE = 1;
+
+ private final Channelz channelz = new Channelz();
+ private ChannelzService service = new ChannelzService(channelz, MAX_PAGE_SIZE);
+
+ @Test
+ public void getTopChannels_empty() {
+ assertEquals(
+ GetTopChannelsResponse.newBuilder().setEnd(true).build(),
+ getTopChannelHelper(0));
+ }
+
+ @Test
+ public void getTopChannels_onePage() throws Exception {
+ TestChannel root = new TestChannel();
+ channelz.addRootChannel(root);
+
+ assertEquals(
+ GetTopChannelsResponse
+ .newBuilder()
+ .addChannel(ChannelzProtoUtil.toChannel(root))
+ .setEnd(true)
+ .build(),
+ getTopChannelHelper(0));
+ }
+
+ @Test
+ public void getChannel() throws ExecutionException, InterruptedException {
+ TestChannel root = new TestChannel();
+ assertChannelNotFound(root.getLogId().getId());
+
+ channelz.addRootChannel(root);
+ assertEquals(
+ GetChannelResponse
+ .newBuilder()
+ .setChannel(ChannelzProtoUtil.toChannel(root))
+ .build(),
+ getChannelHelper(root.getLogId().getId()));
+
+ channelz.removeRootChannel(root);
+ assertChannelNotFound(root.getLogId().getId());
+ }
+
+ @Test
+ public void getSubchannel() throws Exception {
+ TestChannel subchannel = new TestChannel();
+ assertSubchannelNotFound(subchannel.getLogId().getId());
+
+ channelz.addSubchannel(subchannel);
+ assertEquals(
+ GetSubchannelResponse
+ .newBuilder()
+ .setSubchannel(ChannelzProtoUtil.toSubchannel(subchannel))
+ .build(),
+ getSubchannelHelper(subchannel.getLogId().getId()));
+
+ channelz.removeSubchannel(subchannel);
+ assertSubchannelNotFound(subchannel.getLogId().getId());
+ }
+
+ @Test
+ public void getServers_empty() {
+ assertEquals(
+ GetServersResponse.newBuilder().setEnd(true).build(),
+ getServersHelper(0));
+ }
+
+ @Test
+ public void getServers_onePage() throws Exception {
+ TestServer server = new TestServer();
+ channelz.addServer(server);
+
+ assertEquals(
+ GetServersResponse
+ .newBuilder()
+ .addServer(ChannelzProtoUtil.toServer(server))
+ .setEnd(true)
+ .build(),
+ getServersHelper(0));
+ }
+
+ @Test
+ public void getSocket() throws Exception {
+ TestSocket socket = new TestSocket();
+ assertSocketNotFound(socket.getLogId().getId());
+
+ channelz.addSocket(socket);
+ assertEquals(
+ GetSocketResponse
+ .newBuilder()
+ .setSocket(ChannelzProtoUtil.toSocket(socket))
+ .build(),
+ getSocketHelper(socket.getLogId().getId()));
+
+ channelz.removeSocket(socket);
+ assertSocketNotFound(socket.getLogId().getId());
+ }
+
+ private GetTopChannelsResponse getTopChannelHelper(long startId) {
+ @SuppressWarnings("unchecked")
+ StreamObserver<GetTopChannelsResponse> observer = mock(StreamObserver.class);
+ ArgumentCaptor<GetTopChannelsResponse> responseCaptor
+ = ArgumentCaptor.forClass(GetTopChannelsResponse.class);
+ service.getTopChannels(
+ GetTopChannelsRequest.newBuilder().setStartChannelId(startId).build(),
+ observer);
+ verify(observer).onNext(responseCaptor.capture());
+ verify(observer).onCompleted();
+ return responseCaptor.getValue();
+ }
+
+ private GetChannelResponse getChannelHelper(long id) {
+ @SuppressWarnings("unchecked")
+ StreamObserver<GetChannelResponse> observer = mock(StreamObserver.class);
+ ArgumentCaptor<GetChannelResponse> response
+ = ArgumentCaptor.forClass(GetChannelResponse.class);
+ service.getChannel(GetChannelRequest.newBuilder().setChannelId(id).build(), observer);
+ verify(observer).onNext(response.capture());
+ verify(observer).onCompleted();
+ return response.getValue();
+ }
+
+ private void assertChannelNotFound(long id) {
+ @SuppressWarnings("unchecked")
+ StreamObserver<GetChannelResponse> observer = mock(StreamObserver.class);
+ ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
+ service.getChannel(GetChannelRequest.newBuilder().setChannelId(id).build(), observer);
+ verify(observer).onError(exceptionCaptor.capture());
+ Exception exception = exceptionCaptor.getValue();
+ assertEquals(Status.NOT_FOUND, ((StatusRuntimeException) exception).getStatus());
+ }
+
+ private GetSubchannelResponse getSubchannelHelper(long id) {
+ @SuppressWarnings("unchecked")
+ StreamObserver<GetSubchannelResponse> observer = mock(StreamObserver.class);
+ ArgumentCaptor<GetSubchannelResponse> response
+ = ArgumentCaptor.forClass(GetSubchannelResponse.class);
+ service.getSubchannel(GetSubchannelRequest.newBuilder().setSubchannelId(id).build(), observer);
+ verify(observer).onNext(response.capture());
+ verify(observer).onCompleted();
+ return response.getValue();
+ }
+
+ private void assertSubchannelNotFound(long id) {
+ @SuppressWarnings("unchecked")
+ StreamObserver<GetSubchannelResponse> observer = mock(StreamObserver.class);
+ ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
+ service.getSubchannel(GetSubchannelRequest.newBuilder().setSubchannelId(id).build(), observer);
+ verify(observer).onError(exceptionCaptor.capture());
+ Exception exception = exceptionCaptor.getValue();
+ assertEquals(Status.NOT_FOUND, ((StatusRuntimeException) exception).getStatus());
+ }
+
+ private GetServersResponse getServersHelper(long startId) {
+ @SuppressWarnings("unchecked")
+ StreamObserver<GetServersResponse> observer = mock(StreamObserver.class);
+ ArgumentCaptor<GetServersResponse> responseCaptor
+ = ArgumentCaptor.forClass(GetServersResponse.class);
+ service.getServers(
+ GetServersRequest.newBuilder().setStartServerId(startId).build(),
+ observer);
+ verify(observer).onNext(responseCaptor.capture());
+ verify(observer).onCompleted();
+ return responseCaptor.getValue();
+ }
+
+ private void assertSocketNotFound(long id) {
+ @SuppressWarnings("unchecked")
+ StreamObserver<GetSocketResponse> observer = mock(StreamObserver.class);
+ ArgumentCaptor<Exception> exceptionCaptor = ArgumentCaptor.forClass(Exception.class);
+ service.getSocket(GetSocketRequest.newBuilder().setSocketId(id).build(), observer);
+ verify(observer).onError(exceptionCaptor.capture());
+ Exception exception = exceptionCaptor.getValue();
+ assertEquals(Status.NOT_FOUND, ((StatusRuntimeException) exception).getStatus());
+ }
+
+ private GetSocketResponse getSocketHelper(long id) {
+ @SuppressWarnings("unchecked")
+ StreamObserver<GetSocketResponse> observer = mock(StreamObserver.class);
+ ArgumentCaptor<GetSocketResponse> response
+ = ArgumentCaptor.forClass(GetSocketResponse.class);
+ service.getSocket(GetSocketRequest.newBuilder().setSocketId(id).build(), observer);
+ verify(observer).onNext(response.capture());
+ verify(observer).onCompleted();
+ return response.getValue();
+ }
+}
diff --git a/services/src/test/java/io/grpc/services/ChannelzTestHelper.java b/services/src/test/java/io/grpc/services/ChannelzTestHelper.java
new file mode 100644
index 000000000..7389e2d6a
--- /dev/null
+++ b/services/src/test/java/io/grpc/services/ChannelzTestHelper.java
@@ -0,0 +1,118 @@
+/*
+ * Copyright 2018, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.services;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
+import io.grpc.ConnectivityState;
+import io.grpc.internal.Channelz.ChannelStats;
+import io.grpc.internal.Channelz.Security;
+import io.grpc.internal.Channelz.ServerStats;
+import io.grpc.internal.Channelz.SocketStats;
+import io.grpc.internal.Channelz.TransportStats;
+import io.grpc.internal.Instrumented;
+import io.grpc.internal.LogId;
+import io.grpc.internal.WithLogId;
+import java.net.InetSocketAddress;
+import java.net.SocketAddress;
+import java.util.Collections;
+
+/**
+ * Test class definitions that will be used in the proto utils test as well as
+ * channelz service test.
+ */
+final class ChannelzTestHelper {
+
+ static final class TestSocket implements Instrumented<SocketStats> {
+ private final LogId id = LogId.allocate("socket");
+ TransportStats transportStats = new TransportStats(
+ /*streamsStarted=*/ 1,
+ /*lastLocalStreamCreatedTimeNanos=*/ 2,
+ /*lastRemoteStreamCreatedTimeNanos=*/ 3,
+ /*streamsSucceeded=*/ 4,
+ /*streamsFailed=*/ 5,
+ /*messagesSent=*/ 6,
+ /*messagesReceived=*/ 7,
+ /*keepAlivesSent=*/ 8,
+ /*lastMessageSentTimeNanos=*/ 9,
+ /*lastMessageReceivedTimeNanos=*/ 10,
+ /*localFlowControlWindow=*/ 11,
+ /*remoteFlowControlWindow=*/ 12);
+ SocketAddress local = new InetSocketAddress("10.0.0.1", 1000);
+ SocketAddress remote = new InetSocketAddress("10.0.0.2", 1000);
+
+
+ @Override
+ public ListenableFuture<SocketStats> getStats() {
+ SettableFuture<SocketStats> ret = SettableFuture.create();
+ ret.set(new SocketStats(transportStats, local, remote, new Security()));
+ return ret;
+ }
+
+ @Override
+ public LogId getLogId() {
+ return id;
+ }
+ }
+
+ static final class TestServer implements Instrumented<ServerStats> {
+ private final LogId id = LogId.allocate("server");
+ ServerStats serverStats = new ServerStats(
+ /*callsStarted=*/ 1,
+ /*callsSucceeded=*/ 2,
+ /*callsFailed=*/ 3,
+ /*lastCallStartedMillis=*/ 4);
+
+ @Override
+ public ListenableFuture<ServerStats> getStats() {
+ SettableFuture<ServerStats> ret = SettableFuture.create();
+ ret.set(serverStats);
+ return ret;
+ }
+
+ @Override
+ public LogId getLogId() {
+ return id;
+ }
+ }
+
+ static final class TestChannel implements Instrumented<ChannelStats> {
+ private final LogId id = LogId.allocate("channel-or-subchannel");
+
+ ChannelStats stats = new ChannelStats(
+ /*target=*/ "sometarget",
+ /*connectivityState=*/ ConnectivityState.READY,
+ /*callsStarted=*/ 1,
+ /*callsSucceeded=*/ 2,
+ /*callsFailed=*/ 3,
+ /*lastCallStartedMillis=*/ 4,
+ /*subchannels=*/ Collections.<WithLogId>emptyList(),
+ /*sockets=*/ Collections.<WithLogId>emptyList());
+
+ @Override
+ public ListenableFuture<ChannelStats> getStats() {
+ SettableFuture<ChannelStats> ret = SettableFuture.create();
+ ret.set(stats);
+ return ret;
+ }
+
+ @Override
+ public LogId getLogId() {
+ return id;
+ }
+ }
+}