diff options
author | zpencer <spencerfang@google.com> | 2018-04-03 10:05:47 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-04-03 10:05:47 -0700 |
commit | 724e32fe57db8ba21a180632cb144d5af0a814c4 (patch) | |
tree | 6e6d4fe3d04681e12d845a24388d489c6c9a522e /netty | |
parent | d99d8d99d8c878ab53e29d920aa589eadd7fb213 (diff) | |
download | grpc-grpc-java-724e32fe57db8ba21a180632cb144d5af0a814c4.tar.gz |
core,netty,services: add server listen sockets to channelz proto service (#4220)
Server listen sockets differ from normal sockets in that they do not
have a remote address, do not have stats on calls started/failed/etc,
and do not have security info.
Diffstat (limited to 'netty')
6 files changed, 190 insertions, 9 deletions
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java index 0b3c48b41..0020c54f5 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java @@ -50,6 +50,8 @@ import io.netty.channel.EventLoopGroup; import io.netty.channel.socket.nio.NioSocketChannel; import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException; import io.netty.util.AsciiString; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import java.net.SocketAddress; import java.nio.channels.ClosedChannelException; import java.util.Map; @@ -339,7 +341,16 @@ class NettyClientTransport implements ConnectionClientTransport { Utils.getSocketOptions(channel), new Security())); } - }); + }) + .addListener( + new GenericFutureListener<Future<Object>>() { + @Override + public void operationComplete(Future<Object> future) throws Exception { + if (!future.isSuccess()) { + result.setException(future.cause()); + } + } + }); return result; } diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index 81c09cb98..2e3dcaf5c 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -21,7 +21,14 @@ import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED import static io.netty.channel.ChannelOption.SO_BACKLOG; import static io.netty.channel.ChannelOption.SO_KEEPALIVE; +import com.google.common.base.Preconditions; +import com.google.common.collect.ImmutableList; +import com.google.common.util.concurrent.ListenableFuture; +import com.google.common.util.concurrent.SettableFuture; import io.grpc.ServerStreamTracer; +import io.grpc.internal.Channelz; +import io.grpc.internal.Channelz.SocketStats; +import io.grpc.internal.Instrumented; import io.grpc.internal.InternalServer; import io.grpc.internal.LogId; import io.grpc.internal.ServerListener; @@ -41,6 +48,8 @@ import io.netty.channel.ServerChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; import io.netty.util.AbstractReferenceCounted; import io.netty.util.ReferenceCounted; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; @@ -82,6 +91,10 @@ class NettyServer implements InternalServer, WithLogId { private final ReferenceCounted eventLoopReferenceCounter = new EventLoopReferenceCounter(); private final List<ServerStreamTracer.Factory> streamTracerFactories; private final TransportTracer.Factory transportTracerFactory; + private final Channelz channelz; + // Only set once during start(). This code assumes all listen sockets are created at startup + // and never changed. In the future we may have >1 listen socket. + private ImmutableList<Instrumented<SocketStats>> listenSockets; NettyServer( SocketAddress address, Class<? extends ServerChannel> channelType, @@ -93,7 +106,8 @@ class NettyServer implements InternalServer, WithLogId { long keepAliveTimeInNanos, long keepAliveTimeoutInNanos, long maxConnectionIdleInNanos, long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos, - boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos) { + boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos, + Channelz channelz) { this.address = address; this.channelType = checkNotNull(channelType, "channelType"); checkNotNull(channelOptions, "channelOptions"); @@ -116,6 +130,7 @@ class NettyServer implements InternalServer, WithLogId { this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos; this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls; this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos; + this.channelz = Preconditions.checkNotNull(channelz); } @Override @@ -131,6 +146,11 @@ class NettyServer implements InternalServer, WithLogId { } @Override + public List<Instrumented<SocketStats>> getListenSockets() { + return listenSockets; + } + + @Override public void start(ServerListener serverListener) throws IOException { listener = checkNotNull(serverListener, "serverListener"); @@ -220,7 +240,14 @@ class NettyServer implements InternalServer, WithLogId { } }); // Bind and start to accept incoming connections. - ChannelFuture future = b.bind(address); + ChannelFuture future = b.bind(address).addListener(new ChannelFutureListener() { + @Override + public void operationComplete(ChannelFuture f) throws Exception { + Instrumented<SocketStats> listenSocket = new ListenSocket(f.channel()); + listenSockets = ImmutableList.of(listenSocket); + channelz.addListenSocket(listenSocket); + } + }); try { future.await(); } catch (InterruptedException ex) { @@ -245,6 +272,10 @@ class NettyServer implements InternalServer, WithLogId { if (!future.isSuccess()) { log.log(Level.WARNING, "Error shutting down server", future.cause()); } + for (Instrumented<SocketStats> listenSocket : listenSockets) { + channelz.removeListenSocket(listenSocket); + } + listenSockets = null; synchronized (NettyServer.this) { listener.serverShutdown(); } @@ -291,4 +322,61 @@ class NettyServer implements InternalServer, WithLogId { return this; } } + + /** + * A class that can answer channelz queries about the server listen sockets. + */ + private static final class ListenSocket implements Instrumented<SocketStats> { + private final LogId id = LogId.allocate(getClass().getName()); + private final Channel ch; + + ListenSocket(Channel ch) { + this.ch = ch; + } + + @Override + public ListenableFuture<SocketStats> getStats() { + final SettableFuture<SocketStats> ret = SettableFuture.create(); + if (ch.eventLoop().inEventLoop()) { + // This is necessary, otherwise we will block forever if we get the future from inside + // the event loop. + ret.set(new SocketStats( + /*data=*/ null, + ch.localAddress(), + /*remoteAddress=*/ null, + Utils.getSocketOptions(ch), + /*security=*/ null)); + return ret; + } + ch.eventLoop() + .submit( + new Runnable() { + @Override + public void run() { + ret.set(new SocketStats( + /*data=*/ null, + ch.localAddress(), + /*remoteAddress=*/ null, + Utils.getSocketOptions(ch), + /*security=*/ null)); + } + }) + .addListener( + new GenericFutureListener<Future<Object>>() { + @Override + public void operationComplete(Future<Object> future) throws Exception { + if (!future.isSuccess()) { + ret.setException(future.cause()); + } + } + }); + + return ret; + } + + @Override + public LogId getLogId() { + return id; + } + } } diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java index 8847de89d..4e7df6380 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java @@ -425,7 +425,7 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos, maxConnectionIdleInNanos, maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos, - permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos); + permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, channelz); } @Override diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java index fe905bb2d..c76ca6673 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java @@ -33,6 +33,8 @@ import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelHandler; import io.netty.channel.ChannelPromise; +import io.netty.util.concurrent.Future; +import io.netty.util.concurrent.GenericFutureListener; import java.io.IOException; import java.util.List; import java.util.concurrent.ScheduledExecutorService; @@ -222,7 +224,16 @@ class NettyServerTransport implements ServerTransport { Utils.getSocketOptions(channel), /*security=*/ null)); } - }); + }) + .addListener( + new GenericFutureListener<Future<Object>>() { + @Override + public void operationComplete(Future<Object> future) throws Exception { + if (!future.isSuccess()) { + result.setException(future.cause()); + } + } + }); return result; } diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index 7e0723820..fd2120837 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -48,6 +48,7 @@ import io.grpc.ServerStreamTracer; import io.grpc.Status; import io.grpc.Status.Code; import io.grpc.StatusException; +import io.grpc.internal.Channelz; import io.grpc.internal.ClientStream; import io.grpc.internal.ClientStreamListener; import io.grpc.internal.ClientTransport; @@ -107,6 +108,7 @@ public class NettyClientTransportTest { private final List<NettyClientTransport> transports = new ArrayList<NettyClientTransport>(); private final NioEventLoopGroup group = new NioEventLoopGroup(1); private final EchoServerListener serverListener = new EchoServerListener(); + private final Channelz channelz = new Channelz(); private Runnable tooManyPingsRunnable = new Runnable() { // Throwing is useless in this method, because Netty doesn't propagate the exception @Override public void run() {} @@ -604,7 +606,8 @@ public class NettyClientTransportTest { DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, maxHeaderListSize, DEFAULT_SERVER_KEEPALIVE_TIME_NANOS, DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS, MAX_CONNECTION_IDLE_NANOS_DISABLED, - MAX_CONNECTION_AGE_NANOS_DISABLED, MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE, true, 0); + MAX_CONNECTION_AGE_NANOS_DISABLED, MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE, true, 0, + channelz); server.start(serverListener); address = TestUtils.testServerAddress(server.getPort()); authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort()); diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java index a984f71a3..e6d64f490 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java @@ -16,9 +16,18 @@ package io.grpc.netty; +import static com.google.common.collect.Iterables.getOnlyElement; import static com.google.common.truth.Truth.assertThat; +import static io.grpc.internal.Channelz.id; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import com.google.common.util.concurrent.SettableFuture; import io.grpc.ServerStreamTracer; +import io.grpc.internal.Channelz; +import io.grpc.internal.Channelz.SocketStats; +import io.grpc.internal.Instrumented; import io.grpc.internal.ServerListener; import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransportListener; @@ -41,6 +50,7 @@ import org.junit.runners.JUnit4; @RunWith(JUnit4.class) public class NettyServerTest { + private final Channelz channelz = new Channelz(); @Test public void getPort() throws Exception { @@ -61,7 +71,8 @@ public class NettyServerTest { 1, // ignore 1, 1, // ignore 1, 1, // ignore - true, 0); // ignore + true, 0, // ignore + channelz); ns.start(new ServerListener() { @Override public ServerTransportListener transportCreated(ServerTransport transport) { @@ -98,7 +109,8 @@ public class NettyServerTest { 1, // ignore 1, 1, // ignore 1, 1, // ignore - true, 0); // ignore + true, 0, // ignore + channelz); assertThat(ns.getPort()).isEqualTo(-1); } @@ -135,7 +147,8 @@ public class NettyServerTest { 1, // ignore 1, 1, // ignore 1, 1, // ignore - true, 0); // ignore + true, 0, // ignore + channelz); ns.start(new ServerListener() { @Override public ServerTransportListener transportCreated(ServerTransport transport) { @@ -164,4 +177,59 @@ public class NettyServerTest { ns.shutdown(); } + + @Test + public void channelzListenSocket() throws Exception { + InetSocketAddress addr = new InetSocketAddress(0); + NettyServer ns = new NettyServer( + addr, + NioServerSocketChannel.class, + new HashMap<ChannelOption<?>, Object>(), + null, // no boss group + null, // no event group + new ProtocolNegotiators.PlaintextNegotiator(), + Collections.<ServerStreamTracer.Factory>emptyList(), + TransportTracer.getDefaultFactory(), + 1, // ignore + 1, // ignore + 1, // ignore + 1, // ignore + 1, // ignore + 1, 1, // ignore + 1, 1, // ignore + true, 0, // ignore + channelz); + final SettableFuture<Void> shutdownCompleted = SettableFuture.create(); + ns.start(new ServerListener() { + @Override + public ServerTransportListener transportCreated(ServerTransport transport) { + return null; + } + + @Override + public void serverShutdown() { + shutdownCompleted.set(null); + } + }); + assertThat(ns.getPort()).isGreaterThan(0); + + Instrumented<SocketStats> listenSocket = getOnlyElement(ns.getListenSockets()); + assertSame(listenSocket, channelz.getSocket(id(listenSocket))); + + // very basic sanity check of the contents + SocketStats socketStats = listenSocket.getStats().get(); + assertEquals(ns.getPort(), ((InetSocketAddress) socketStats.local).getPort()); + assertNull(socketStats.remote); + + // TODO(zpencer): uncomment when sock options are exposed + // by default, there are some socket options set on the listen socket + // assertThat(socketStats.socketOptions.additional).isNotEmpty(); + + // Cleanup + ns.shutdown(); + shutdownCompleted.get(); + + // listen socket is removed + assertNull(channelz.getSocket(id(listenSocket))); + } } |