diff options
14 files changed, 373 insertions, 34 deletions
diff --git a/core/src/main/java/io/grpc/inprocess/InProcessServer.java b/core/src/main/java/io/grpc/inprocess/InProcessServer.java index 5859c2528..2ba9fcfcd 100644 --- a/core/src/main/java/io/grpc/inprocess/InProcessServer.java +++ b/core/src/main/java/io/grpc/inprocess/InProcessServer.java @@ -19,6 +19,8 @@ package io.grpc.inprocess; import static com.google.common.base.Preconditions.checkNotNull; import io.grpc.ServerStreamTracer; +import io.grpc.internal.Channelz.SocketStats; +import io.grpc.internal.Instrumented; import io.grpc.internal.InternalServer; import io.grpc.internal.ObjectPool; import io.grpc.internal.ServerListener; @@ -77,6 +79,11 @@ final class InProcessServer implements InternalServer { } @Override + public List<Instrumented<SocketStats>> getListenSockets() { + return Collections.emptyList(); + } + + @Override public void shutdown() { if (!registry.remove(name, this)) { throw new AssertionError(); diff --git a/core/src/main/java/io/grpc/internal/Channelz.java b/core/src/main/java/io/grpc/internal/Channelz.java index 9b4a15a62..fe91970ed 100644 --- a/core/src/main/java/io/grpc/internal/Channelz.java +++ b/core/src/main/java/io/grpc/internal/Channelz.java @@ -42,8 +42,8 @@ public final class Channelz { = new ConcurrentSkipListMap<Long, Instrumented<ChannelStats>>(); private final ConcurrentMap<Long, Instrumented<ChannelStats>> subchannels = new ConcurrentHashMap<Long, Instrumented<ChannelStats>>(); - // An InProcessTransport can appear in both clientSockets and perServerSockets simultaneously - private final ConcurrentMap<Long, Instrumented<SocketStats>> clientSockets + // An InProcessTransport can appear in both otherSockets and perServerSockets simultaneously + private final ConcurrentMap<Long, Instrumented<SocketStats>> otherSockets = new ConcurrentHashMap<Long, Instrumented<SocketStats>>(); private final ConcurrentMap<Long, ServerSocketMap> perServerSockets = new ConcurrentHashMap<Long, ServerSocketMap>(); @@ -81,7 +81,11 @@ public final class Channelz { /** Adds a socket. */ public void addClientSocket(Instrumented<SocketStats> socket) { - add(clientSockets, socket); + add(otherSockets, socket); + } + + public void addListenSocket(Instrumented<SocketStats> socket) { + add(otherSockets, socket); } /** Adds a server socket. */ @@ -108,7 +112,11 @@ public final class Channelz { } public void removeClientSocket(Instrumented<SocketStats> socket) { - remove(clientSockets, socket); + remove(otherSockets, socket); + } + + public void removeListenSocket(Instrumented<SocketStats> socket) { + remove(otherSockets, socket); } /** Removes a server socket. */ @@ -174,7 +182,7 @@ public final class Channelz { /** Returns a socket. */ @Nullable public Instrumented<SocketStats> getSocket(long id) { - Instrumented<SocketStats> clientSocket = clientSockets.get(id); + Instrumented<SocketStats> clientSocket = otherSockets.get(id); if (clientSocket != null) { return clientSocket; } @@ -207,7 +215,7 @@ public final class Channelz { @VisibleForTesting public boolean containsClientSocket(LogId transportRef) { - return contains(clientSockets, transportRef); + return contains(otherSockets, transportRef); } private static <T extends Instrumented<?>> void add(Map<Long, T> map, T object) { @@ -263,7 +271,7 @@ public final class Channelz { public final long callsSucceeded; public final long callsFailed; public final long lastCallStartedMillis; - // TODO(zpencer): add listen sockets + public final List<Instrumented<SocketStats>> listenSockets; /** * Creates an instance. @@ -272,11 +280,13 @@ public final class Channelz { long callsStarted, long callsSucceeded, long callsFailed, - long lastCallStartedMillis) { + long lastCallStartedMillis, + List<Instrumented<SocketStats>> listenSockets) { this.callsStarted = callsStarted; this.callsSucceeded = callsSucceeded; this.callsFailed = callsFailed; this.lastCallStartedMillis = lastCallStartedMillis; + this.listenSockets = Preconditions.checkNotNull(listenSockets); } public static final class Builder { @@ -284,6 +294,7 @@ public final class Channelz { private long callsSucceeded; private long callsFailed; private long lastCallStartedMillis; + public List<Instrumented<SocketStats>> listenSockets = Collections.emptyList(); public Builder setCallsStarted(long callsStarted) { this.callsStarted = callsStarted; @@ -305,6 +316,14 @@ public final class Channelz { return this; } + /** Sets the listen sockets. */ + public Builder setListenSockets(List<Instrumented<SocketStats>> listenSockets) { + Preconditions.checkNotNull(listenSockets); + this.listenSockets = Collections.unmodifiableList( + new ArrayList<Instrumented<SocketStats>>(listenSockets)); + return this; + } + /** * Builds an instance. */ @@ -313,7 +332,8 @@ public final class Channelz { callsStarted, callsSucceeded, callsFailed, - lastCallStartedMillis); + lastCallStartedMillis, + listenSockets); } } } @@ -434,11 +454,11 @@ public final class Channelz { } public static final class SocketStats { - public final TransportStats data; + @Nullable public final TransportStats data; public final SocketAddress local; - public final SocketAddress remote; - public final Security security; + @Nullable public final SocketAddress remote; public final SocketOptions socketOptions; + @Nullable public final Security security; /** Creates an instance. */ public SocketStats( diff --git a/core/src/main/java/io/grpc/internal/InternalServer.java b/core/src/main/java/io/grpc/internal/InternalServer.java index 72e1c56d5..2937e1f19 100644 --- a/core/src/main/java/io/grpc/internal/InternalServer.java +++ b/core/src/main/java/io/grpc/internal/InternalServer.java @@ -16,7 +16,9 @@ package io.grpc.internal; +import io.grpc.internal.Channelz.SocketStats; import java.io.IOException; +import java.util.List; import javax.annotation.concurrent.ThreadSafe; /** @@ -47,4 +49,9 @@ public interface InternalServer { * available or does not make sense. */ int getPort(); + + /** + * Returns the listen sockets of this server. May return an empty list but never returns null. + */ + List<Instrumented<SocketStats>> getListenSockets(); } diff --git a/core/src/main/java/io/grpc/internal/ServerImpl.java b/core/src/main/java/io/grpc/internal/ServerImpl.java index 6f67bc581..f9f646b1f 100644 --- a/core/src/main/java/io/grpc/internal/ServerImpl.java +++ b/core/src/main/java/io/grpc/internal/ServerImpl.java @@ -571,7 +571,9 @@ public final class ServerImpl extends io.grpc.Server implements Instrumented<Ser @Override public ListenableFuture<ServerStats> getStats() { - ServerStats.Builder builder = new ServerStats.Builder(); + ServerStats.Builder builder + = new ServerStats.Builder() + .setListenSockets(transportServer.getListenSockets()); serverCallTracer.updateBuilder(builder); SettableFuture<ServerStats> ret = SettableFuture.create(); ret.set(builder.build()); diff --git a/core/src/test/java/io/grpc/internal/ServerImplTest.java b/core/src/test/java/io/grpc/internal/ServerImplTest.java index 2686fa88a..6ca82216c 100644 --- a/core/src/test/java/io/grpc/internal/ServerImplTest.java +++ b/core/src/test/java/io/grpc/internal/ServerImplTest.java @@ -81,6 +81,7 @@ import java.io.InputStream; import java.net.SocketAddress; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.LinkedList; import java.util.List; import java.util.concurrent.CyclicBarrier; @@ -1464,6 +1465,11 @@ public class ServerImplTest { } @Override + public List<Instrumented<SocketStats>> getListenSockets() { + return Collections.emptyList(); + } + + @Override public void shutdown() { listener.serverShutdown(); } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 0b3c48b41..0020c54f5 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -50,6 +50,8 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException; import io.netty.util.AsciiString; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.util.Map; @@ -339,7 +341,16 @@ class NettyClientTransport implements ConnectionClientTransport { Utils.getSocketOptions(channel), new Security())); } - }); + }) + .addListener( + new GenericFutureListener<Future<Object>>() { + @Override + public void operationComplete(Future<Object> future) throws Exception { + if (!future.isSuccess()) { + result.setException(future.cause()); + } + } + }); return result; } diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index 81c09cb98..2e3dcaf5c 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -21,7 +21,14 @@ import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED import static io.netty.channel.ChannelOption.SO_BACKLOG; import static io.netty.channel.ChannelOption.SO_KEEPALIVE; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import io.grpc.ServerStreamTracer; +import io.grpc.internal.Channelz; +import io.grpc.internal.Channelz.SocketStats; +import io.grpc.internal.Instrumented; import io.grpc.internal.InternalServer; import io.grpc.internal.LogId; import io.grpc.internal.ServerListener; @@ -41,6 +48,8 @@ import io.netty.channel.ServerChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.AbstractReferenceCounted; import io.netty.util.ReferenceCounted; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -82,6 +91,10 @@ class NettyServer implements InternalServer, WithLogId { private final ReferenceCounted eventLoopReferenceCounter = new EventLoopReferenceCounter(); private final List<ServerStreamTracer.Factory> streamTracerFactories; private final TransportTracer.Factory transportTracerFactory; + private final Channelz channelz; + // Only set once during start(). This code assumes all listen sockets are created at startup + // and never changed. In the future we may have >1 listen socket. + private ImmutableList<Instrumented<SocketStats>> listenSockets; NettyServer( SocketAddress address, Class<? extends ServerChannel> channelType, @@ -93,7 +106,8 @@ class NettyServer implements InternalServer, WithLogId { long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, long maxConnectionIdleInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, - boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos) { + boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos, + Channelz channelz) { this.address = address; this.channelType = checkNotNull(channelType, "channelType"); checkNotNull(channelOptions, "channelOptions"); @@ -116,6 +130,7 @@ class NettyServer implements InternalServer, WithLogId { this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos; this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls; this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos; + this.channelz = Preconditions.checkNotNull(channelz); } @Override @@ -131,6 +146,11 @@ class NettyServer implements InternalServer, WithLogId { } @Override + public List<Instrumented<SocketStats>> getListenSockets() { + return listenSockets; + } + + @Override public void start(ServerListener serverListener) throws IOException { listener = checkNotNull(serverListener, "serverListener"); @@ -220,7 +240,14 @@ class NettyServer implements InternalServer, WithLogId { } }); // Bind and start to accept incoming connections. - ChannelFuture future = b.bind(address); + ChannelFuture future = b.bind(address).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture f) throws Exception { + Instrumented<SocketStats> listenSocket = new ListenSocket(f.channel()); + listenSockets = ImmutableList.of(listenSocket); + channelz.addListenSocket(listenSocket); + } + }); try { future.await(); } catch (InterruptedException ex) { @@ -245,6 +272,10 @@ class NettyServer implements InternalServer, WithLogId { if (!future.isSuccess()) { log.log(Level.WARNING, "Error shutting down server", future.cause()); } + for (Instrumented<SocketStats> listenSocket : listenSockets) { + channelz.removeListenSocket(listenSocket); + } + listenSockets = null; synchronized (NettyServer.this) { listener.serverShutdown(); } @@ -291,4 +322,61 @@ class NettyServer implements InternalServer, WithLogId { return this; } } + + /** + * A class that can answer channelz queries about the server listen sockets. + */ + private static final class ListenSocket implements Instrumented<SocketStats> { + private final LogId id = LogId.allocate(getClass().getName()); + private final Channel ch; + + ListenSocket(Channel ch) { + this.ch = ch; + } + + @Override + public ListenableFuture<SocketStats> getStats() { + final SettableFuture<SocketStats> ret = SettableFuture.create(); + if (ch.eventLoop().inEventLoop()) { + // This is necessary, otherwise we will block forever if we get the future from inside + // the event loop. + ret.set(new SocketStats( + /*data=*/ null, + ch.localAddress(), + /*remoteAddress=*/ null, + Utils.getSocketOptions(ch), + /*security=*/ null)); + return ret; + } + ch.eventLoop() + .submit( + new Runnable() { + @Override + public void run() { + ret.set(new SocketStats( + /*data=*/ null, + ch.localAddress(), + /*remoteAddress=*/ null, + Utils.getSocketOptions(ch), + /*security=*/ null)); + } + }) + .addListener( + new GenericFutureListener<Future<Object>>() { + @Override + public void operationComplete(Future<Object> future) throws Exception { + if (!future.isSuccess()) { + ret.setException(future.cause()); + } + } + }); + + return ret; + } + + @Override + public LogId getLogId() { + return id; + } + } } diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java index 8847de89d..4e7df6380 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java @@ -425,7 +425,7 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos, maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, - permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos); + permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, channelz); } @Override diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java index fe905bb2d..c76ca6673 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java @@ -33,6 +33,8 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelPromise; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import java.io.IOException; import java.util.List; import java.util.concurrent.ScheduledExecutorService; @@ -222,7 +224,16 @@ class NettyServerTransport implements ServerTransport { Utils.getSocketOptions(channel), /*security=*/ null)); } - }); + }) + .addListener( + new GenericFutureListener<Future<Object>>() { + @Override + public void operationComplete(Future<Object> future) throws Exception { + if (!future.isSuccess()) { + result.setException(future.cause()); + } + } + }); return result; } diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index 7e0723820..fd2120837 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -48,6 +48,7 @@ import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusException; +import io.grpc.internal.Channelz; import io.grpc.internal.ClientStream; import io.grpc.internal.ClientStreamListener; import io.grpc.internal.ClientTransport; @@ -107,6 +108,7 @@ public class NettyClientTransportTest { private final List<NettyClientTransport> transports = new ArrayList<NettyClientTransport>(); private final NioEventLoopGroup group = new NioEventLoopGroup(1); private final EchoServerListener serverListener = new EchoServerListener(); + private final Channelz channelz = new Channelz(); private Runnable tooManyPingsRunnable = new Runnable() { // Throwing is useless in this method, because Netty doesn't propagate the exception @Override public void run() {} @@ -604,7 +606,8 @@ public class NettyClientTransportTest { DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, maxHeaderListSize, DEFAULT_SERVER_KEEPALIVE_TIME_NANOS, DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS, MAX_CONNECTION_IDLE_NANOS_DISABLED, - MAX_CONNECTION_AGE_NANOS_DISABLED, MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE, true, 0); + MAX_CONNECTION_AGE_NANOS_DISABLED, MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE, true, 0, + channelz); server.start(serverListener); address = TestUtils.testServerAddress(server.getPort()); authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java index a984f71a3..e6d64f490 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java @@ -16,9 +16,18 @@ package io.grpc.netty; +import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.truth.Truth.assertThat; +import static io.grpc.internal.Channelz.id; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import com.google.common.util.concurrent.SettableFuture; import io.grpc.ServerStreamTracer; +import io.grpc.internal.Channelz; +import io.grpc.internal.Channelz.SocketStats; +import io.grpc.internal.Instrumented; import io.grpc.internal.ServerListener; import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransportListener; @@ -41,6 +50,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class NettyServerTest { + private final Channelz channelz = new Channelz(); @Test public void getPort() throws Exception { @@ -61,7 +71,8 @@ public class NettyServerTest { 1, // ignore 1, 1, // ignore 1, 1, // ignore - true, 0); // ignore + true, 0, // ignore + channelz); ns.start(new ServerListener() { @Override public ServerTransportListener transportCreated(ServerTransport transport) { @@ -98,7 +109,8 @@ public class NettyServerTest { 1, // ignore 1, 1, // ignore 1, 1, // ignore - true, 0); // ignore + true, 0, // ignore + channelz); assertThat(ns.getPort()).isEqualTo(-1); } @@ -135,7 +147,8 @@ public class NettyServerTest { 1, // ignore 1, 1, // ignore 1, 1, // ignore - true, 0); // ignore + true, 0, // ignore + channelz); ns.start(new ServerListener() { @Override public ServerTransportListener transportCreated(ServerTransport transport) { @@ -164,4 +177,59 @@ public class NettyServerTest { ns.shutdown(); } + + @Test + public void channelzListenSocket() throws Exception { + InetSocketAddress addr = new InetSocketAddress(0); + NettyServer ns = new NettyServer( + addr, + NioServerSocketChannel.class, + new HashMap<ChannelOption<?>, Object>(), + null, // no boss group + null, // no event group + new ProtocolNegotiators.PlaintextNegotiator(), + Collections.<ServerStreamTracer.Factory>emptyList(), + TransportTracer.getDefaultFactory(), + 1, // ignore + 1, // ignore + 1, // ignore + 1, // ignore + 1, // ignore + 1, 1, // ignore + 1, 1, // ignore + true, 0, // ignore + channelz); + final SettableFuture<Void> shutdownCompleted = SettableFuture.create(); + ns.start(new ServerListener() { + @Override + public ServerTransportListener transportCreated(ServerTransport transport) { + return null; + } + + @Override + public void serverShutdown() { + shutdownCompleted.set(null); + } + }); + assertThat(ns.getPort()).isGreaterThan(0); + + Instrumented<SocketStats> listenSocket = getOnlyElement(ns.getListenSockets()); + assertSame(listenSocket, channelz.getSocket(id(listenSocket))); + + // very basic sanity check of the contents + SocketStats socketStats = listenSocket.getStats().get(); + assertEquals(ns.getPort(), ((InetSocketAddress) socketStats.local).getPort()); + assertNull(socketStats.remote); + + // TODO(zpencer): uncomment when sock options are exposed + // by default, there are some socket options set on the listen socket + // assertThat(socketStats.socketOptions.additional).isNotEmpty(); + + // Cleanup + ns.shutdown(); + shutdownCompleted.get(); + + // listen socket is removed + assertNull(channelz.getSocket(id(listenSocket))); + } } diff --git a/services/src/main/java/io/grpc/services/ChannelzProtoUtil.java b/services/src/main/java/io/grpc/services/ChannelzProtoUtil.java index ca8f58228..dea047d04 100644 --- a/services/src/main/java/io/grpc/services/ChannelzProtoUtil.java +++ b/services/src/main/java/io/grpc/services/ChannelzProtoUtil.java @@ -40,6 +40,7 @@ import io.grpc.channelz.v1.Server; import io.grpc.channelz.v1.ServerData; import io.grpc.channelz.v1.ServerRef; import io.grpc.channelz.v1.Socket; +import io.grpc.channelz.v1.Socket.Builder; import io.grpc.channelz.v1.SocketData; import io.grpc.channelz.v1.SocketOption; import io.grpc.channelz.v1.SocketOptionLinger; @@ -106,11 +107,14 @@ final class ChannelzProtoUtil { static Server toServer(Instrumented<ServerStats> obj) { ServerStats stats = getFuture(obj.getStats()); - return Server + Server.Builder builder = Server .newBuilder() .setRef(toServerRef(obj)) - .setData(toServerData(stats)) - .build(); + .setData(toServerData(stats)); + for (Instrumented<SocketStats> listenSocket : stats.listenSockets) { + builder.addListenSocket(toSocketRef(listenSocket)); + } + return builder.build(); } static ServerData toServerData(ServerStats stats) { @@ -125,15 +129,21 @@ final class ChannelzProtoUtil { static Socket toSocket(Instrumented<SocketStats> obj) { SocketStats socketStats = getFuture(obj.getStats()); - return Socket.newBuilder() + Builder builder = Socket.newBuilder() .setRef(toSocketRef(obj)) - .setRemote(toAddress(socketStats.remote)) - .setLocal(toAddress(socketStats.local)) - .setData(extractSocketData(socketStats)) - .build(); + .setLocal(toAddress(socketStats.local)); + // listen sockets do not have remote nor data + if (socketStats.remote != null) { + builder.setRemote(toAddress(socketStats.remote)); + } + if (socketStats.data != null) { + builder.setData(extractSocketData(socketStats)); + } + return builder.build(); } static Address toAddress(SocketAddress address) { + Preconditions.checkNotNull(address); Address.Builder builder = Address.newBuilder(); if (address instanceof InetSocketAddress) { InetSocketAddress inetAddress = (InetSocketAddress) address; @@ -142,6 +152,7 @@ final class ChannelzProtoUtil { .newBuilder() .setIpAddress( ByteString.copyFrom(inetAddress.getAddress().getAddress())) + .setPort(inetAddress.getPort()) .build()); } else if (address.getClass().getName().endsWith("io.netty.channel.unix.DomainSocketAddress")) { builder.setUdsAddress( diff --git a/services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java b/services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java index 83e3c6dad..e3697f4b9 100644 --- a/services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java +++ b/services/src/test/java/io/grpc/services/ChannelzProtoUtilTest.java @@ -17,6 +17,7 @@ package io.grpc.services; import static com.google.common.truth.Truth.assertThat; +import static io.grpc.internal.Channelz.id; import static org.junit.Assert.assertEquals; import com.google.common.collect.ImmutableList; @@ -55,9 +56,11 @@ import io.grpc.internal.Channelz.ServerList; import io.grpc.internal.Channelz.ServerSocketsList; import io.grpc.internal.Channelz.ServerStats; import io.grpc.internal.Channelz.SocketOptions; +import io.grpc.internal.Channelz.SocketStats; import io.grpc.internal.Instrumented; import io.grpc.internal.WithLogId; import io.grpc.services.ChannelzTestHelper.TestChannel; +import io.grpc.services.ChannelzTestHelper.TestListenSocket; import io.grpc.services.ChannelzTestHelper.TestServer; import io.grpc.services.ChannelzTestHelper.TestSocket; import io.netty.channel.unix.DomainSocketAddress; @@ -168,6 +171,23 @@ public final class ChannelzProtoUtilTest { .setValue("some-made-up-value") .build(); + private final TestListenSocket listenSocket = new TestListenSocket(); + private final SocketRef listenSocketRef = SocketRef + .newBuilder() + .setName(listenSocket.toString()) + .setSocketId(id(listenSocket)) + .build(); + private final Address listenAddress = Address + .newBuilder() + .setTcpipAddress( + TcpIpAddress + .newBuilder() + .setIpAddress(ByteString.copyFrom( + ((InetSocketAddress) listenSocket.listenAddress).getAddress().getAddress())) + .setPort(1234) + .build()) + .build(); + private final TestSocket socket = new TestSocket(); private final SocketRef socketRef = SocketRef .newBuilder() @@ -196,6 +216,7 @@ public final class ChannelzProtoUtilTest { .newBuilder() .setIpAddress(ByteString.copyFrom( ((InetSocketAddress) socket.local).getAddress().getAddress())) + .setPort(1000) .build()) .build(); private final Address remoteAddress = Address @@ -205,6 +226,7 @@ public final class ChannelzProtoUtilTest { .newBuilder() .setIpAddress(ByteString.copyFrom( ((InetSocketAddress) socket.remote).getAddress().getAddress())) + .setPort(1000) .build()) .build(); @@ -271,6 +293,26 @@ public final class ChannelzProtoUtilTest { } @Test + public void toSocket_listenSocket() { + assertEquals( + Socket + .newBuilder() + .setRef(listenSocketRef) + .setLocal(listenAddress) + .build(), + ChannelzProtoUtil.toSocket(listenSocket)); + } + + @Test + public void toSocketData() throws Exception { + assertEquals( + socketDataNoSockOpts + .toBuilder() + .build(), + ChannelzProtoUtil.extractSocketData(socket.getStats().get())); + } + + @Test public void toAddress_inet() throws Exception { InetSocketAddress inet4 = new InetSocketAddress(Inet4Address.getByName("10.0.0.1"), 1000); assertEquals( @@ -278,6 +320,7 @@ public final class ChannelzProtoUtilTest { TcpIpAddress .newBuilder() .setIpAddress(ByteString.copyFrom(inet4.getAddress().getAddress())) + .setPort(1000) .build()) .build(), ChannelzProtoUtil.toAddress(inet4)); @@ -318,7 +361,34 @@ public final class ChannelzProtoUtilTest { @Test public void toServer() throws Exception { + // no listen sockets assertEquals(serverProto, ChannelzProtoUtil.toServer(server)); + + // 1 listen socket + server.serverStats = toBuilder(server.serverStats) + .setListenSockets(ImmutableList.<Instrumented<SocketStats>>of(listenSocket)) + .build(); + assertEquals( + serverProto + .toBuilder() + .addListenSocket(listenSocketRef) + .build(), + ChannelzProtoUtil.toServer(server)); + + // multiple listen sockets + TestListenSocket otherListenSocket = new TestListenSocket(); + SocketRef otherListenSocketRef = ChannelzProtoUtil.toSocketRef(otherListenSocket); + server.serverStats = toBuilder(server.serverStats) + .setListenSockets( + ImmutableList.<Instrumented<SocketStats>>of(listenSocket, otherListenSocket)) + .build(); + assertEquals( + serverProto + .toBuilder() + .addListenSocket(listenSocketRef) + .addListenSocket(otherListenSocketRef) + .build(), + ChannelzProtoUtil.toServer(server)); } @Test @@ -612,6 +682,7 @@ public final class ChannelzProtoUtilTest { return builder; } + private static SocketOptions.Builder toBuilder(SocketOptions options) { SocketOptions.Builder builder = new SocketOptions.Builder() .setSocketOptionTimeoutMillis(options.soTimeoutMillis) @@ -621,4 +692,13 @@ public final class ChannelzProtoUtilTest { } return builder; } + + private static ServerStats.Builder toBuilder(ServerStats stats) { + return new ServerStats.Builder() + .setCallsStarted(stats.callsStarted) + .setCallsSucceeded(stats.callsSucceeded) + .setCallsFailed(stats.callsFailed) + .setLastCallStartedMillis(stats.lastCallStartedMillis) + .setListenSockets(stats.listenSockets); + } } diff --git a/services/src/test/java/io/grpc/services/ChannelzTestHelper.java b/services/src/test/java/io/grpc/services/ChannelzTestHelper.java index baadc10f3..4530a7cc7 100644 --- a/services/src/test/java/io/grpc/services/ChannelzTestHelper.java +++ b/services/src/test/java/io/grpc/services/ChannelzTestHelper.java @@ -23,6 +23,7 @@ import io.grpc.internal.Channelz; import io.grpc.internal.Channelz.ChannelStats; import io.grpc.internal.Channelz.Security; import io.grpc.internal.Channelz.ServerStats; +import io.grpc.internal.Channelz.SocketOptions; import io.grpc.internal.Channelz.SocketStats; import io.grpc.internal.Channelz.TransportStats; import io.grpc.internal.Instrumented; @@ -76,13 +77,37 @@ final class ChannelzTestHelper { } } + static final class TestListenSocket implements Instrumented<SocketStats> { + private final LogId id = LogId.allocate("listensocket"); + SocketAddress listenAddress = new InetSocketAddress("10.0.0.1", 1234); + + @Override + public ListenableFuture<SocketStats> getStats() { + SettableFuture<SocketStats> ret = SettableFuture.create(); + ret.set( + new SocketStats( + /*data=*/ null, + listenAddress, + /*remoteAddress=*/ null, + new SocketOptions.Builder().build(), + /*security=*/ null)); + return ret; + } + + @Override + public LogId getLogId() { + return id; + } + } + static final class TestServer implements Instrumented<ServerStats> { private final LogId id = LogId.allocate("server"); ServerStats serverStats = new ServerStats( - /*callsStarted=*/ 1, - /*callsSucceeded=*/ 2, - /*callsFailed=*/ 3, - /*lastCallStartedMillis=*/ 4); + /*callsStarted=*/ 1, + /*callsSucceeded=*/ 2, + /*callsFailed=*/ 3, + /*lastCallStartedMillis=*/ 4, + Collections.<Instrumented<SocketStats>>emptyList()); @Override public ListenableFuture<ServerStats> getStats() { |