aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--core/src/main/java/io/grpc/inprocess/InProcessServer.java7
-rw-r--r--core/src/main/java/io/grpc/internal/Channelz.java44
-rw-r--r--core/src/main/java/io/grpc/internal/InternalServer.java7
-rw-r--r--core/src/main/java/io/grpc/internal/ServerImpl.java4
-rw-r--r--core/src/test/java/io/grpc/internal/ServerImplTest.java6
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyClientTransport.java13
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyServer.java92
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyServerBuilder.java2
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyServerTransport.java13
-rw-r--r--netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java5
-rw-r--r--netty/src/test/java/io/grpc/netty/NettyServerTest.java74
-rw-r--r--services/src/main/java/io/grpc/services/ChannelzProtoUtil.java27
-rw-r--r--services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java80
-rw-r--r--services/src/test/java/io/grpc/services/ChannelzTestHelper.java33
14 files changed, 373 insertions, 34 deletions
diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServer.java b/core/src/main/java/io/grpc/inprocess/InProcessServer.java
index 5859c2528..2ba9fcfcd 100644
--- a/core/src/main/java/io/grpc/inprocess/InProcessServer.java
+++ b/core/src/main/java/io/grpc/inprocess/InProcessServer.java
@@ -19,6 +19,8 @@ package io.grpc.inprocess;
import static com.google.common.base.Preconditions.checkNotNull;
import io.grpc.ServerStreamTracer;
+import io.grpc.internal.Channelz.SocketStats;
+import io.grpc.internal.Instrumented;
import io.grpc.internal.InternalServer;
import io.grpc.internal.ObjectPool;
import io.grpc.internal.ServerListener;
@@ -77,6 +79,11 @@ final class InProcessServer implements InternalServer {
}
@Override
+ public List<Instrumented<SocketStats>> getListenSockets() {
+ return Collections.emptyList();
+ }
+
+ @Override
public void shutdown() {
if (!registry.remove(name, this)) {
throw new AssertionError();
diff --git a/core/src/main/java/io/grpc/internal/Channelz.java b/core/src/main/java/io/grpc/internal/Channelz.java
index 9b4a15a62..fe91970ed 100644
--- a/core/src/main/java/io/grpc/internal/Channelz.java
+++ b/core/src/main/java/io/grpc/internal/Channelz.java
@@ -42,8 +42,8 @@ public final class Channelz {
= new ConcurrentSkipListMap<Long, Instrumented<ChannelStats>>();
private final ConcurrentMap<Long, Instrumented<ChannelStats>> subchannels
= new ConcurrentHashMap<Long, Instrumented<ChannelStats>>();
- // An InProcessTransport can appear in both clientSockets and perServerSockets simultaneously
- private final ConcurrentMap<Long, Instrumented<SocketStats>> clientSockets
+ // An InProcessTransport can appear in both otherSockets and perServerSockets simultaneously
+ private final ConcurrentMap<Long, Instrumented<SocketStats>> otherSockets
= new ConcurrentHashMap<Long, Instrumented<SocketStats>>();
private final ConcurrentMap<Long, ServerSocketMap> perServerSockets
= new ConcurrentHashMap<Long, ServerSocketMap>();
@@ -81,7 +81,11 @@ public final class Channelz {
/** Adds a socket. */
public void addClientSocket(Instrumented<SocketStats> socket) {
- add(clientSockets, socket);
+ add(otherSockets, socket);
+ }
+
+ public void addListenSocket(Instrumented<SocketStats> socket) {
+ add(otherSockets, socket);
}
/** Adds a server socket. */
@@ -108,7 +112,11 @@ public final class Channelz {
}
public void removeClientSocket(Instrumented<SocketStats> socket) {
- remove(clientSockets, socket);
+ remove(otherSockets, socket);
+ }
+
+ public void removeListenSocket(Instrumented<SocketStats> socket) {
+ remove(otherSockets, socket);
}
/** Removes a server socket. */
@@ -174,7 +182,7 @@ public final class Channelz {
/** Returns a socket. */
@Nullable
public Instrumented<SocketStats> getSocket(long id) {
- Instrumented<SocketStats> clientSocket = clientSockets.get(id);
+ Instrumented<SocketStats> clientSocket = otherSockets.get(id);
if (clientSocket != null) {
return clientSocket;
}
@@ -207,7 +215,7 @@ public final class Channelz {
@VisibleForTesting
public boolean containsClientSocket(LogId transportRef) {
- return contains(clientSockets, transportRef);
+ return contains(otherSockets, transportRef);
}
private static <T extends Instrumented<?>> void add(Map<Long, T> map, T object) {
@@ -263,7 +271,7 @@ public final class Channelz {
public final long callsSucceeded;
public final long callsFailed;
public final long lastCallStartedMillis;
- // TODO(zpencer): add listen sockets
+ public final List<Instrumented<SocketStats>> listenSockets;
/**
* Creates an instance.
@@ -272,11 +280,13 @@ public final class Channelz {
long callsStarted,
long callsSucceeded,
long callsFailed,
- long lastCallStartedMillis) {
+ long lastCallStartedMillis,
+ List<Instrumented<SocketStats>> listenSockets) {
this.callsStarted = callsStarted;
this.callsSucceeded = callsSucceeded;
this.callsFailed = callsFailed;
this.lastCallStartedMillis = lastCallStartedMillis;
+ this.listenSockets = Preconditions.checkNotNull(listenSockets);
}
public static final class Builder {
@@ -284,6 +294,7 @@ public final class Channelz {
private long callsSucceeded;
private long callsFailed;
private long lastCallStartedMillis;
+ public List<Instrumented<SocketStats>> listenSockets = Collections.emptyList();
public Builder setCallsStarted(long callsStarted) {
this.callsStarted = callsStarted;
@@ -305,6 +316,14 @@ public final class Channelz {
return this;
}
+ /** Sets the listen sockets. */
+ public Builder setListenSockets(List<Instrumented<SocketStats>> listenSockets) {
+ Preconditions.checkNotNull(listenSockets);
+ this.listenSockets = Collections.unmodifiableList(
+ new ArrayList<Instrumented<SocketStats>>(listenSockets));
+ return this;
+ }
+
/**
* Builds an instance.
*/
@@ -313,7 +332,8 @@ public final class Channelz {
callsStarted,
callsSucceeded,
callsFailed,
- lastCallStartedMillis);
+ lastCallStartedMillis,
+ listenSockets);
}
}
}
@@ -434,11 +454,11 @@ public final class Channelz {
}
public static final class SocketStats {
- public final TransportStats data;
+ @Nullable public final TransportStats data;
public final SocketAddress local;
- public final SocketAddress remote;
- public final Security security;
+ @Nullable public final SocketAddress remote;
public final SocketOptions socketOptions;
+ @Nullable public final Security security;
/** Creates an instance. */
public SocketStats(
diff --git a/core/src/main/java/io/grpc/internal/InternalServer.java b/core/src/main/java/io/grpc/internal/InternalServer.java
index 72e1c56d5..2937e1f19 100644
--- a/core/src/main/java/io/grpc/internal/InternalServer.java
+++ b/core/src/main/java/io/grpc/internal/InternalServer.java
@@ -16,7 +16,9 @@
package io.grpc.internal;
+import io.grpc.internal.Channelz.SocketStats;
import java.io.IOException;
+import java.util.List;
import javax.annotation.concurrent.ThreadSafe;
/**
@@ -47,4 +49,9 @@ public interface InternalServer {
* available or does not make sense.
*/
int getPort();
+
+ /**
+ * Returns the listen sockets of this server. May return an empty list but never returns null.
+ */
+ List<Instrumented<SocketStats>> getListenSockets();
}
diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java
index 6f67bc581..f9f646b1f 100644
--- a/core/src/main/java/io/grpc/internal/ServerImpl.java
+++ b/core/src/main/java/io/grpc/internal/ServerImpl.java
@@ -571,7 +571,9 @@ public final class ServerImpl extends io.grpc.Server implements Instrumented<Ser
@Override
public ListenableFuture<ServerStats> getStats() {
- ServerStats.Builder builder = new ServerStats.Builder();
+ ServerStats.Builder builder
+ = new ServerStats.Builder()
+ .setListenSockets(transportServer.getListenSockets());
serverCallTracer.updateBuilder(builder);
SettableFuture<ServerStats> ret = SettableFuture.create();
ret.set(builder.build());
diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java
index 2686fa88a..6ca82216c 100644
--- a/core/src/test/java/io/grpc/internal/ServerImplTest.java
+++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java
@@ -81,6 +81,7 @@ import java.io.InputStream;
import java.net.SocketAddress;
import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collections;
import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.CyclicBarrier;
@@ -1464,6 +1465,11 @@ public class ServerImplTest {
}
@Override
+ public List<Instrumented<SocketStats>> getListenSockets() {
+ return Collections.emptyList();
+ }
+
+ @Override
public void shutdown() {
listener.serverShutdown();
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
index 0b3c48b41..0020c54f5 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
@@ -50,6 +50,8 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException;
import io.netty.util.AsciiString;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
@@ -339,7 +341,16 @@ class NettyClientTransport implements ConnectionClientTransport {
Utils.getSocketOptions(channel),
new Security()));
}
- });
+ })
+ .addListener(
+ new GenericFutureListener<Future<Object>>() {
+ @Override
+ public void operationComplete(Future<Object> future) throws Exception {
+ if (!future.isSuccess()) {
+ result.setException(future.cause());
+ }
+ }
+ });
return result;
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java
index 81c09cb98..2e3dcaf5c 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServer.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServer.java
@@ -21,7 +21,14 @@ import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED
import static io.netty.channel.ChannelOption.SO_BACKLOG;
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import io.grpc.ServerStreamTracer;
+import io.grpc.internal.Channelz;
+import io.grpc.internal.Channelz.SocketStats;
+import io.grpc.internal.Instrumented;
import io.grpc.internal.InternalServer;
import io.grpc.internal.LogId;
import io.grpc.internal.ServerListener;
@@ -41,6 +48,8 @@ import io.netty.channel.ServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCounted;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@@ -82,6 +91,10 @@ class NettyServer implements InternalServer, WithLogId {
private final ReferenceCounted eventLoopReferenceCounter = new EventLoopReferenceCounter();
private final List<ServerStreamTracer.Factory> streamTracerFactories;
private final TransportTracer.Factory transportTracerFactory;
+ private final Channelz channelz;
+ // Only set once during start(). This code assumes all listen sockets are created at startup
+ // and never changed. In the future we may have >1 listen socket.
+ private ImmutableList<Instrumented<SocketStats>> listenSockets;
NettyServer(
SocketAddress address, Class<? extends ServerChannel> channelType,
@@ -93,7 +106,8 @@ class NettyServer implements InternalServer, WithLogId {
long keepAliveTimeInNanos, long keepAliveTimeoutInNanos,
long maxConnectionIdleInNanos,
long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos,
- boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos) {
+ boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos,
+ Channelz channelz) {
this.address = address;
this.channelType = checkNotNull(channelType, "channelType");
checkNotNull(channelOptions, "channelOptions");
@@ -116,6 +130,7 @@ class NettyServer implements InternalServer, WithLogId {
this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls;
this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos;
+ this.channelz = Preconditions.checkNotNull(channelz);
}
@Override
@@ -131,6 +146,11 @@ class NettyServer implements InternalServer, WithLogId {
}
@Override
+ public List<Instrumented<SocketStats>> getListenSockets() {
+ return listenSockets;
+ }
+
+ @Override
public void start(ServerListener serverListener) throws IOException {
listener = checkNotNull(serverListener, "serverListener");
@@ -220,7 +240,14 @@ class NettyServer implements InternalServer, WithLogId {
}
});
// Bind and start to accept incoming connections.
- ChannelFuture future = b.bind(address);
+ ChannelFuture future = b.bind(address).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture f) throws Exception {
+ Instrumented<SocketStats> listenSocket = new ListenSocket(f.channel());
+ listenSockets = ImmutableList.of(listenSocket);
+ channelz.addListenSocket(listenSocket);
+ }
+ });
try {
future.await();
} catch (InterruptedException ex) {
@@ -245,6 +272,10 @@ class NettyServer implements InternalServer, WithLogId {
if (!future.isSuccess()) {
log.log(Level.WARNING, "Error shutting down server", future.cause());
}
+ for (Instrumented<SocketStats> listenSocket : listenSockets) {
+ channelz.removeListenSocket(listenSocket);
+ }
+ listenSockets = null;
synchronized (NettyServer.this) {
listener.serverShutdown();
}
@@ -291,4 +322,61 @@ class NettyServer implements InternalServer, WithLogId {
return this;
}
}
+
+ /**
+ * A class that can answer channelz queries about the server listen sockets.
+ */
+ private static final class ListenSocket implements Instrumented<SocketStats> {
+ private final LogId id = LogId.allocate(getClass().getName());
+ private final Channel ch;
+
+ ListenSocket(Channel ch) {
+ this.ch = ch;
+ }
+
+ @Override
+ public ListenableFuture<SocketStats> getStats() {
+ final SettableFuture<SocketStats> ret = SettableFuture.create();
+ if (ch.eventLoop().inEventLoop()) {
+ // This is necessary, otherwise we will block forever if we get the future from inside
+ // the event loop.
+ ret.set(new SocketStats(
+ /*data=*/ null,
+ ch.localAddress(),
+ /*remoteAddress=*/ null,
+ Utils.getSocketOptions(ch),
+ /*security=*/ null));
+ return ret;
+ }
+ ch.eventLoop()
+ .submit(
+ new Runnable() {
+ @Override
+ public void run() {
+ ret.set(new SocketStats(
+ /*data=*/ null,
+ ch.localAddress(),
+ /*remoteAddress=*/ null,
+ Utils.getSocketOptions(ch),
+ /*security=*/ null));
+ }
+ })
+ .addListener(
+ new GenericFutureListener<Future<Object>>() {
+ @Override
+ public void operationComplete(Future<Object> future) throws Exception {
+ if (!future.isSuccess()) {
+ ret.setException(future.cause());
+ }
+ }
+ });
+
+ return ret;
+ }
+
+ @Override
+ public LogId getLogId() {
+ return id;
+ }
+ }
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
index 8847de89d..4e7df6380 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
@@ -425,7 +425,7 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionIdleInNanos,
maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
- permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos);
+ permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, channelz);
}
@Override
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java
index fe905bb2d..c76ca6673 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java
@@ -33,6 +33,8 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPromise;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
@@ -222,7 +224,16 @@ class NettyServerTransport implements ServerTransport {
Utils.getSocketOptions(channel),
/*security=*/ null));
}
- });
+ })
+ .addListener(
+ new GenericFutureListener<Future<Object>>() {
+ @Override
+ public void operationComplete(Future<Object> future) throws Exception {
+ if (!future.isSuccess()) {
+ result.setException(future.cause());
+ }
+ }
+ });
return result;
}
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
index 7e0723820..fd2120837 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
@@ -48,6 +48,7 @@ import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusException;
+import io.grpc.internal.Channelz;
import io.grpc.internal.ClientStream;
import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ClientTransport;
@@ -107,6 +108,7 @@ public class NettyClientTransportTest {
private final List<NettyClientTransport> transports = new ArrayList<NettyClientTransport>();
private final NioEventLoopGroup group = new NioEventLoopGroup(1);
private final EchoServerListener serverListener = new EchoServerListener();
+ private final Channelz channelz = new Channelz();
private Runnable tooManyPingsRunnable = new Runnable() {
// Throwing is useless in this method, because Netty doesn't propagate the exception
@Override public void run() {}
@@ -604,7 +606,8 @@ public class NettyClientTransportTest {
DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, maxHeaderListSize,
DEFAULT_SERVER_KEEPALIVE_TIME_NANOS, DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS,
MAX_CONNECTION_IDLE_NANOS_DISABLED,
- MAX_CONNECTION_AGE_NANOS_DISABLED, MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE, true, 0);
+ MAX_CONNECTION_AGE_NANOS_DISABLED, MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE, true, 0,
+ channelz);
server.start(serverListener);
address = TestUtils.testServerAddress(server.getPort());
authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java
index a984f71a3..e6d64f490 100644
--- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java
@@ -16,9 +16,18 @@
package io.grpc.netty;
+import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.truth.Truth.assertThat;
+import static io.grpc.internal.Channelz.id;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import com.google.common.util.concurrent.SettableFuture;
import io.grpc.ServerStreamTracer;
+import io.grpc.internal.Channelz;
+import io.grpc.internal.Channelz.SocketStats;
+import io.grpc.internal.Instrumented;
import io.grpc.internal.ServerListener;
import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener;
@@ -41,6 +50,7 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class NettyServerTest {
+ private final Channelz channelz = new Channelz();
@Test
public void getPort() throws Exception {
@@ -61,7 +71,8 @@ public class NettyServerTest {
1, // ignore
1, 1, // ignore
1, 1, // ignore
- true, 0); // ignore
+ true, 0, // ignore
+ channelz);
ns.start(new ServerListener() {
@Override
public ServerTransportListener transportCreated(ServerTransport transport) {
@@ -98,7 +109,8 @@ public class NettyServerTest {
1, // ignore
1, 1, // ignore
1, 1, // ignore
- true, 0); // ignore
+ true, 0, // ignore
+ channelz);
assertThat(ns.getPort()).isEqualTo(-1);
}
@@ -135,7 +147,8 @@ public class NettyServerTest {
1, // ignore
1, 1, // ignore
1, 1, // ignore
- true, 0); // ignore
+ true, 0, // ignore
+ channelz);
ns.start(new ServerListener() {
@Override
public ServerTransportListener transportCreated(ServerTransport transport) {
@@ -164,4 +177,59 @@ public class NettyServerTest {
ns.shutdown();
}
+
+ @Test
+ public void channelzListenSocket() throws Exception {
+ InetSocketAddress addr = new InetSocketAddress(0);
+ NettyServer ns = new NettyServer(
+ addr,
+ NioServerSocketChannel.class,
+ new HashMap<ChannelOption<?>, Object>(),
+ null, // no boss group
+ null, // no event group
+ new ProtocolNegotiators.PlaintextNegotiator(),
+ Collections.<ServerStreamTracer.Factory>emptyList(),
+ TransportTracer.getDefaultFactory(),
+ 1, // ignore
+ 1, // ignore
+ 1, // ignore
+ 1, // ignore
+ 1, // ignore
+ 1, 1, // ignore
+ 1, 1, // ignore
+ true, 0, // ignore
+ channelz);
+ final SettableFuture<Void> shutdownCompleted = SettableFuture.create();
+ ns.start(new ServerListener() {
+ @Override
+ public ServerTransportListener transportCreated(ServerTransport transport) {
+ return null;
+ }
+
+ @Override
+ public void serverShutdown() {
+ shutdownCompleted.set(null);
+ }
+ });
+ assertThat(ns.getPort()).isGreaterThan(0);
+
+ Instrumented<SocketStats> listenSocket = getOnlyElement(ns.getListenSockets());
+ assertSame(listenSocket, channelz.getSocket(id(listenSocket)));
+
+ // very basic sanity check of the contents
+ SocketStats socketStats = listenSocket.getStats().get();
+ assertEquals(ns.getPort(), ((InetSocketAddress) socketStats.local).getPort());
+ assertNull(socketStats.remote);
+
+ // TODO(zpencer): uncomment when sock options are exposed
+ // by default, there are some socket options set on the listen socket
+ // assertThat(socketStats.socketOptions.additional).isNotEmpty();
+
+ // Cleanup
+ ns.shutdown();
+ shutdownCompleted.get();
+
+ // listen socket is removed
+ assertNull(channelz.getSocket(id(listenSocket)));
+ }
}
diff --git a/services/src/main/java/io/grpc/services/ChannelzProtoUtil.java b/services/src/main/java/io/grpc/services/ChannelzProtoUtil.java
index ca8f58228..dea047d04 100644
--- a/services/src/main/java/io/grpc/services/ChannelzProtoUtil.java
+++ b/services/src/main/java/io/grpc/services/ChannelzProtoUtil.java
@@ -40,6 +40,7 @@ import io.grpc.channelz.v1.Server;
import io.grpc.channelz.v1.ServerData;
import io.grpc.channelz.v1.ServerRef;
import io.grpc.channelz.v1.Socket;
+import io.grpc.channelz.v1.Socket.Builder;
import io.grpc.channelz.v1.SocketData;
import io.grpc.channelz.v1.SocketOption;
import io.grpc.channelz.v1.SocketOptionLinger;
@@ -106,11 +107,14 @@ final class ChannelzProtoUtil {
static Server toServer(Instrumented<ServerStats> obj) {
ServerStats stats = getFuture(obj.getStats());
- return Server
+ Server.Builder builder = Server
.newBuilder()
.setRef(toServerRef(obj))
- .setData(toServerData(stats))
- .build();
+ .setData(toServerData(stats));
+ for (Instrumented<SocketStats> listenSocket : stats.listenSockets) {
+ builder.addListenSocket(toSocketRef(listenSocket));
+ }
+ return builder.build();
}
static ServerData toServerData(ServerStats stats) {
@@ -125,15 +129,21 @@ final class ChannelzProtoUtil {
static Socket toSocket(Instrumented<SocketStats> obj) {
SocketStats socketStats = getFuture(obj.getStats());
- return Socket.newBuilder()
+ Builder builder = Socket.newBuilder()
.setRef(toSocketRef(obj))
- .setRemote(toAddress(socketStats.remote))
- .setLocal(toAddress(socketStats.local))
- .setData(extractSocketData(socketStats))
- .build();
+ .setLocal(toAddress(socketStats.local));
+ // listen sockets do not have remote nor data
+ if (socketStats.remote != null) {
+ builder.setRemote(toAddress(socketStats.remote));
+ }
+ if (socketStats.data != null) {
+ builder.setData(extractSocketData(socketStats));
+ }
+ return builder.build();
}
static Address toAddress(SocketAddress address) {
+ Preconditions.checkNotNull(address);
Address.Builder builder = Address.newBuilder();
if (address instanceof InetSocketAddress) {
InetSocketAddress inetAddress = (InetSocketAddress) address;
@@ -142,6 +152,7 @@ final class ChannelzProtoUtil {
.newBuilder()
.setIpAddress(
ByteString.copyFrom(inetAddress.getAddress().getAddress()))
+ .setPort(inetAddress.getPort())
.build());
} else if (address.getClass().getName().endsWith("io.netty.channel.unix.DomainSocketAddress")) {
builder.setUdsAddress(
diff --git a/services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java b/services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java
index 83e3c6dad..e3697f4b9 100644
--- a/services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java
+++ b/services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java
@@ -17,6 +17,7 @@
package io.grpc.services;
import static com.google.common.truth.Truth.assertThat;
+import static io.grpc.internal.Channelz.id;
import static org.junit.Assert.assertEquals;
import com.google.common.collect.ImmutableList;
@@ -55,9 +56,11 @@ import io.grpc.internal.Channelz.ServerList;
import io.grpc.internal.Channelz.ServerSocketsList;
import io.grpc.internal.Channelz.ServerStats;
import io.grpc.internal.Channelz.SocketOptions;
+import io.grpc.internal.Channelz.SocketStats;
import io.grpc.internal.Instrumented;
import io.grpc.internal.WithLogId;
import io.grpc.services.ChannelzTestHelper.TestChannel;
+import io.grpc.services.ChannelzTestHelper.TestListenSocket;
import io.grpc.services.ChannelzTestHelper.TestServer;
import io.grpc.services.ChannelzTestHelper.TestSocket;
import io.netty.channel.unix.DomainSocketAddress;
@@ -168,6 +171,23 @@ public final class ChannelzProtoUtilTest {
.setValue("some-made-up-value")
.build();
+ private final TestListenSocket listenSocket = new TestListenSocket();
+ private final SocketRef listenSocketRef = SocketRef
+ .newBuilder()
+ .setName(listenSocket.toString())
+ .setSocketId(id(listenSocket))
+ .build();
+ private final Address listenAddress = Address
+ .newBuilder()
+ .setTcpipAddress(
+ TcpIpAddress
+ .newBuilder()
+ .setIpAddress(ByteString.copyFrom(
+ ((InetSocketAddress) listenSocket.listenAddress).getAddress().getAddress()))
+ .setPort(1234)
+ .build())
+ .build();
+
private final TestSocket socket = new TestSocket();
private final SocketRef socketRef = SocketRef
.newBuilder()
@@ -196,6 +216,7 @@ public final class ChannelzProtoUtilTest {
.newBuilder()
.setIpAddress(ByteString.copyFrom(
((InetSocketAddress) socket.local).getAddress().getAddress()))
+ .setPort(1000)
.build())
.build();
private final Address remoteAddress = Address
@@ -205,6 +226,7 @@ public final class ChannelzProtoUtilTest {
.newBuilder()
.setIpAddress(ByteString.copyFrom(
((InetSocketAddress) socket.remote).getAddress().getAddress()))
+ .setPort(1000)
.build())
.build();
@@ -271,6 +293,26 @@ public final class ChannelzProtoUtilTest {
}
@Test
+ public void toSocket_listenSocket() {
+ assertEquals(
+ Socket
+ .newBuilder()
+ .setRef(listenSocketRef)
+ .setLocal(listenAddress)
+ .build(),
+ ChannelzProtoUtil.toSocket(listenSocket));
+ }
+
+ @Test
+ public void toSocketData() throws Exception {
+ assertEquals(
+ socketDataNoSockOpts
+ .toBuilder()
+ .build(),
+ ChannelzProtoUtil.extractSocketData(socket.getStats().get()));
+ }
+
+ @Test
public void toAddress_inet() throws Exception {
InetSocketAddress inet4 = new InetSocketAddress(Inet4Address.getByName("10.0.0.1"), 1000);
assertEquals(
@@ -278,6 +320,7 @@ public final class ChannelzProtoUtilTest {
TcpIpAddress
.newBuilder()
.setIpAddress(ByteString.copyFrom(inet4.getAddress().getAddress()))
+ .setPort(1000)
.build())
.build(),
ChannelzProtoUtil.toAddress(inet4));
@@ -318,7 +361,34 @@ public final class ChannelzProtoUtilTest {
@Test
public void toServer() throws Exception {
+ // no listen sockets
assertEquals(serverProto, ChannelzProtoUtil.toServer(server));
+
+ // 1 listen socket
+ server.serverStats = toBuilder(server.serverStats)
+ .setListenSockets(ImmutableList.<Instrumented<SocketStats>>of(listenSocket))
+ .build();
+ assertEquals(
+ serverProto
+ .toBuilder()
+ .addListenSocket(listenSocketRef)
+ .build(),
+ ChannelzProtoUtil.toServer(server));
+
+ // multiple listen sockets
+ TestListenSocket otherListenSocket = new TestListenSocket();
+ SocketRef otherListenSocketRef = ChannelzProtoUtil.toSocketRef(otherListenSocket);
+ server.serverStats = toBuilder(server.serverStats)
+ .setListenSockets(
+ ImmutableList.<Instrumented<SocketStats>>of(listenSocket, otherListenSocket))
+ .build();
+ assertEquals(
+ serverProto
+ .toBuilder()
+ .addListenSocket(listenSocketRef)
+ .addListenSocket(otherListenSocketRef)
+ .build(),
+ ChannelzProtoUtil.toServer(server));
}
@Test
@@ -612,6 +682,7 @@ public final class ChannelzProtoUtilTest {
return builder;
}
+
private static SocketOptions.Builder toBuilder(SocketOptions options) {
SocketOptions.Builder builder = new SocketOptions.Builder()
.setSocketOptionTimeoutMillis(options.soTimeoutMillis)
@@ -621,4 +692,13 @@ public final class ChannelzProtoUtilTest {
}
return builder;
}
+
+ private static ServerStats.Builder toBuilder(ServerStats stats) {
+ return new ServerStats.Builder()
+ .setCallsStarted(stats.callsStarted)
+ .setCallsSucceeded(stats.callsSucceeded)
+ .setCallsFailed(stats.callsFailed)
+ .setLastCallStartedMillis(stats.lastCallStartedMillis)
+ .setListenSockets(stats.listenSockets);
+ }
}
diff --git a/services/src/test/java/io/grpc/services/ChannelzTestHelper.java b/services/src/test/java/io/grpc/services/ChannelzTestHelper.java
index baadc10f3..4530a7cc7 100644
--- a/services/src/test/java/io/grpc/services/ChannelzTestHelper.java
+++ b/services/src/test/java/io/grpc/services/ChannelzTestHelper.java
@@ -23,6 +23,7 @@ import io.grpc.internal.Channelz;
import io.grpc.internal.Channelz.ChannelStats;
import io.grpc.internal.Channelz.Security;
import io.grpc.internal.Channelz.ServerStats;
+import io.grpc.internal.Channelz.SocketOptions;
import io.grpc.internal.Channelz.SocketStats;
import io.grpc.internal.Channelz.TransportStats;
import io.grpc.internal.Instrumented;
@@ -76,13 +77,37 @@ final class ChannelzTestHelper {
}
}
+ static final class TestListenSocket implements Instrumented<SocketStats> {
+ private final LogId id = LogId.allocate("listensocket");
+ SocketAddress listenAddress = new InetSocketAddress("10.0.0.1", 1234);
+
+ @Override
+ public ListenableFuture<SocketStats> getStats() {
+ SettableFuture<SocketStats> ret = SettableFuture.create();
+ ret.set(
+ new SocketStats(
+ /*data=*/ null,
+ listenAddress,
+ /*remoteAddress=*/ null,
+ new SocketOptions.Builder().build(),
+ /*security=*/ null));
+ return ret;
+ }
+
+ @Override
+ public LogId getLogId() {
+ return id;
+ }
+ }
+
static final class TestServer implements Instrumented<ServerStats> {
private final LogId id = LogId.allocate("server");
ServerStats serverStats = new ServerStats(
- /*callsStarted=*/ 1,
- /*callsSucceeded=*/ 2,
- /*callsFailed=*/ 3,
- /*lastCallStartedMillis=*/ 4);
+ /*callsStarted=*/ 1,
+ /*callsSucceeded=*/ 2,
+ /*callsFailed=*/ 3,
+ /*lastCallStartedMillis=*/ 4,
+ Collections.<Instrumented<SocketStats>>emptyList());
@Override
public ListenableFuture<ServerStats> getStats() {