aboutsummaryrefslogtreecommitdiff
path: root/netty
diff options
context:
space:
mode:
authorzpencer <spencerfang@google.com>2018-04-03 10:05:47 -0700
committerGitHub <noreply@github.com>2018-04-03 10:05:47 -0700
commit724e32fe57db8ba21a180632cb144d5af0a814c4 (patch)
tree6e6d4fe3d04681e12d845a24388d489c6c9a522e /netty
parentd99d8d99d8c878ab53e29d920aa589eadd7fb213 (diff)
downloadgrpc-grpc-java-724e32fe57db8ba21a180632cb144d5af0a814c4.tar.gz
core,netty,services: add server listen sockets to channelz proto service (#4220)
Server listen sockets differ from normal sockets in that they do not have a remote address, do not have stats on calls started/failed/etc, and do not have security info.
Diffstat (limited to 'netty')
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyClientTransport.java13
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyServer.java92
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyServerBuilder.java2
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyServerTransport.java13
-rw-r--r--netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java5
-rw-r--r--netty/src/test/java/io/grpc/netty/NettyServerTest.java74
6 files changed, 190 insertions, 9 deletions
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
index 0b3c48b41..0020c54f5 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
@@ -50,6 +50,8 @@ import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
import io.netty.handler.codec.http2.StreamBufferingEncoder.Http2ChannelClosedException;
import io.netty.util.AsciiString;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
@@ -339,7 +341,16 @@ class NettyClientTransport implements ConnectionClientTransport {
Utils.getSocketOptions(channel),
new Security()));
}
- });
+ })
+ .addListener(
+ new GenericFutureListener<Future<Object>>() {
+ @Override
+ public void operationComplete(Future<Object> future) throws Exception {
+ if (!future.isSuccess()) {
+ result.setException(future.cause());
+ }
+ }
+ });
return result;
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java
index 81c09cb98..2e3dcaf5c 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServer.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServer.java
@@ -21,7 +21,14 @@ import static io.grpc.netty.NettyServerBuilder.MAX_CONNECTION_AGE_NANOS_DISABLED
import static io.netty.channel.ChannelOption.SO_BACKLOG;
import static io.netty.channel.ChannelOption.SO_KEEPALIVE;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.util.concurrent.ListenableFuture;
+import com.google.common.util.concurrent.SettableFuture;
import io.grpc.ServerStreamTracer;
+import io.grpc.internal.Channelz;
+import io.grpc.internal.Channelz.SocketStats;
+import io.grpc.internal.Instrumented;
import io.grpc.internal.InternalServer;
import io.grpc.internal.LogId;
import io.grpc.internal.ServerListener;
@@ -41,6 +48,8 @@ import io.netty.channel.ServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import io.netty.util.AbstractReferenceCounted;
import io.netty.util.ReferenceCounted;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
@@ -82,6 +91,10 @@ class NettyServer implements InternalServer, WithLogId {
private final ReferenceCounted eventLoopReferenceCounter = new EventLoopReferenceCounter();
private final List<ServerStreamTracer.Factory> streamTracerFactories;
private final TransportTracer.Factory transportTracerFactory;
+ private final Channelz channelz;
+ // Only set once during start(). This code assumes all listen sockets are created at startup
+ // and never changed. In the future we may have >1 listen socket.
+ private ImmutableList<Instrumented<SocketStats>> listenSockets;
NettyServer(
SocketAddress address, Class<? extends ServerChannel> channelType,
@@ -93,7 +106,8 @@ class NettyServer implements InternalServer, WithLogId {
long keepAliveTimeInNanos, long keepAliveTimeoutInNanos,
long maxConnectionIdleInNanos,
long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos,
- boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos) {
+ boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos,
+ Channelz channelz) {
this.address = address;
this.channelType = checkNotNull(channelType, "channelType");
checkNotNull(channelOptions, "channelOptions");
@@ -116,6 +130,7 @@ class NettyServer implements InternalServer, WithLogId {
this.maxConnectionAgeGraceInNanos = maxConnectionAgeGraceInNanos;
this.permitKeepAliveWithoutCalls = permitKeepAliveWithoutCalls;
this.permitKeepAliveTimeInNanos = permitKeepAliveTimeInNanos;
+ this.channelz = Preconditions.checkNotNull(channelz);
}
@Override
@@ -131,6 +146,11 @@ class NettyServer implements InternalServer, WithLogId {
}
@Override
+ public List<Instrumented<SocketStats>> getListenSockets() {
+ return listenSockets;
+ }
+
+ @Override
public void start(ServerListener serverListener) throws IOException {
listener = checkNotNull(serverListener, "serverListener");
@@ -220,7 +240,14 @@ class NettyServer implements InternalServer, WithLogId {
}
});
// Bind and start to accept incoming connections.
- ChannelFuture future = b.bind(address);
+ ChannelFuture future = b.bind(address).addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture f) throws Exception {
+ Instrumented<SocketStats> listenSocket = new ListenSocket(f.channel());
+ listenSockets = ImmutableList.of(listenSocket);
+ channelz.addListenSocket(listenSocket);
+ }
+ });
try {
future.await();
} catch (InterruptedException ex) {
@@ -245,6 +272,10 @@ class NettyServer implements InternalServer, WithLogId {
if (!future.isSuccess()) {
log.log(Level.WARNING, "Error shutting down server", future.cause());
}
+ for (Instrumented<SocketStats> listenSocket : listenSockets) {
+ channelz.removeListenSocket(listenSocket);
+ }
+ listenSockets = null;
synchronized (NettyServer.this) {
listener.serverShutdown();
}
@@ -291,4 +322,61 @@ class NettyServer implements InternalServer, WithLogId {
return this;
}
}
+
+ /**
+ * A class that can answer channelz queries about the server listen sockets.
+ */
+ private static final class ListenSocket implements Instrumented<SocketStats> {
+ private final LogId id = LogId.allocate(getClass().getName());
+ private final Channel ch;
+
+ ListenSocket(Channel ch) {
+ this.ch = ch;
+ }
+
+ @Override
+ public ListenableFuture<SocketStats> getStats() {
+ final SettableFuture<SocketStats> ret = SettableFuture.create();
+ if (ch.eventLoop().inEventLoop()) {
+ // This is necessary, otherwise we will block forever if we get the future from inside
+ // the event loop.
+ ret.set(new SocketStats(
+ /*data=*/ null,
+ ch.localAddress(),
+ /*remoteAddress=*/ null,
+ Utils.getSocketOptions(ch),
+ /*security=*/ null));
+ return ret;
+ }
+ ch.eventLoop()
+ .submit(
+ new Runnable() {
+ @Override
+ public void run() {
+ ret.set(new SocketStats(
+ /*data=*/ null,
+ ch.localAddress(),
+ /*remoteAddress=*/ null,
+ Utils.getSocketOptions(ch),
+ /*security=*/ null));
+ }
+ })
+ .addListener(
+ new GenericFutureListener<Future<Object>>() {
+ @Override
+ public void operationComplete(Future<Object> future) throws Exception {
+ if (!future.isSuccess()) {
+ ret.setException(future.cause());
+ }
+ }
+ });
+
+ return ret;
+ }
+
+ @Override
+ public LogId getLogId() {
+ return id;
+ }
+ }
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
index 8847de89d..4e7df6380 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
@@ -425,7 +425,7 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
maxConnectionIdleInNanos,
maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
- permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos);
+ permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos, channelz);
}
@Override
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java
index fe905bb2d..c76ca6673 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerTransport.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerTransport.java
@@ -33,6 +33,8 @@ import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelPromise;
+import io.netty.util.concurrent.Future;
+import io.netty.util.concurrent.GenericFutureListener;
import java.io.IOException;
import java.util.List;
import java.util.concurrent.ScheduledExecutorService;
@@ -222,7 +224,16 @@ class NettyServerTransport implements ServerTransport {
Utils.getSocketOptions(channel),
/*security=*/ null));
}
- });
+ })
+ .addListener(
+ new GenericFutureListener<Future<Object>>() {
+ @Override
+ public void operationComplete(Future<Object> future) throws Exception {
+ if (!future.isSuccess()) {
+ result.setException(future.cause());
+ }
+ }
+ });
return result;
}
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
index 7e0723820..fd2120837 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
@@ -48,6 +48,7 @@ import io.grpc.ServerStreamTracer;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.StatusException;
+import io.grpc.internal.Channelz;
import io.grpc.internal.ClientStream;
import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.ClientTransport;
@@ -107,6 +108,7 @@ public class NettyClientTransportTest {
private final List<NettyClientTransport> transports = new ArrayList<NettyClientTransport>();
private final NioEventLoopGroup group = new NioEventLoopGroup(1);
private final EchoServerListener serverListener = new EchoServerListener();
+ private final Channelz channelz = new Channelz();
private Runnable tooManyPingsRunnable = new Runnable() {
// Throwing is useless in this method, because Netty doesn't propagate the exception
@Override public void run() {}
@@ -604,7 +606,8 @@ public class NettyClientTransportTest {
DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, maxHeaderListSize,
DEFAULT_SERVER_KEEPALIVE_TIME_NANOS, DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS,
MAX_CONNECTION_IDLE_NANOS_DISABLED,
- MAX_CONNECTION_AGE_NANOS_DISABLED, MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE, true, 0);
+ MAX_CONNECTION_AGE_NANOS_DISABLED, MAX_CONNECTION_AGE_GRACE_NANOS_INFINITE, true, 0,
+ channelz);
server.start(serverListener);
address = TestUtils.testServerAddress(server.getPort());
authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java
index a984f71a3..e6d64f490 100644
--- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java
@@ -16,9 +16,18 @@
package io.grpc.netty;
+import static com.google.common.collect.Iterables.getOnlyElement;
import static com.google.common.truth.Truth.assertThat;
+import static io.grpc.internal.Channelz.id;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import com.google.common.util.concurrent.SettableFuture;
import io.grpc.ServerStreamTracer;
+import io.grpc.internal.Channelz;
+import io.grpc.internal.Channelz.SocketStats;
+import io.grpc.internal.Instrumented;
import io.grpc.internal.ServerListener;
import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener;
@@ -41,6 +50,7 @@ import org.junit.runners.JUnit4;
@RunWith(JUnit4.class)
public class NettyServerTest {
+ private final Channelz channelz = new Channelz();
@Test
public void getPort() throws Exception {
@@ -61,7 +71,8 @@ public class NettyServerTest {
1, // ignore
1, 1, // ignore
1, 1, // ignore
- true, 0); // ignore
+ true, 0, // ignore
+ channelz);
ns.start(new ServerListener() {
@Override
public ServerTransportListener transportCreated(ServerTransport transport) {
@@ -98,7 +109,8 @@ public class NettyServerTest {
1, // ignore
1, 1, // ignore
1, 1, // ignore
- true, 0); // ignore
+ true, 0, // ignore
+ channelz);
assertThat(ns.getPort()).isEqualTo(-1);
}
@@ -135,7 +147,8 @@ public class NettyServerTest {
1, // ignore
1, 1, // ignore
1, 1, // ignore
- true, 0); // ignore
+ true, 0, // ignore
+ channelz);
ns.start(new ServerListener() {
@Override
public ServerTransportListener transportCreated(ServerTransport transport) {
@@ -164,4 +177,59 @@ public class NettyServerTest {
ns.shutdown();
}
+
+ @Test
+ public void channelzListenSocket() throws Exception {
+ InetSocketAddress addr = new InetSocketAddress(0);
+ NettyServer ns = new NettyServer(
+ addr,
+ NioServerSocketChannel.class,
+ new HashMap<ChannelOption<?>, Object>(),
+ null, // no boss group
+ null, // no event group
+ new ProtocolNegotiators.PlaintextNegotiator(),
+ Collections.<ServerStreamTracer.Factory>emptyList(),
+ TransportTracer.getDefaultFactory(),
+ 1, // ignore
+ 1, // ignore
+ 1, // ignore
+ 1, // ignore
+ 1, // ignore
+ 1, 1, // ignore
+ 1, 1, // ignore
+ true, 0, // ignore
+ channelz);
+ final SettableFuture<Void> shutdownCompleted = SettableFuture.create();
+ ns.start(new ServerListener() {
+ @Override
+ public ServerTransportListener transportCreated(ServerTransport transport) {
+ return null;
+ }
+
+ @Override
+ public void serverShutdown() {
+ shutdownCompleted.set(null);
+ }
+ });
+ assertThat(ns.getPort()).isGreaterThan(0);
+
+ Instrumented<SocketStats> listenSocket = getOnlyElement(ns.getListenSockets());
+ assertSame(listenSocket, channelz.getSocket(id(listenSocket)));
+
+ // very basic sanity check of the contents
+ SocketStats socketStats = listenSocket.getStats().get();
+ assertEquals(ns.getPort(), ((InetSocketAddress) socketStats.local).getPort());
+ assertNull(socketStats.remote);
+
+ // TODO(zpencer): uncomment when sock options are exposed
+ // by default, there are some socket options set on the listen socket
+ // assertThat(socketStats.socketOptions.additional).isNotEmpty();
+
+ // Cleanup
+ ns.shutdown();
+ shutdownCompleted.get();
+
+ // listen socket is removed
+ assertNull(channelz.getSocket(id(listenSocket)));
+ }
}