diff options
author | Xiaoshuang LU <luxiaoshuang@qiyi.com> | 2017-11-23 08:00:33 +0800 |
---|---|---|
committer | Carl Mastrangelo <notcarl@google.com> | 2017-11-22 16:00:33 -0800 |
commit | cdb1f54794e647848dbccf5233b47b1935529813 (patch) | |
tree | 2ffe367717797ab5b12185e51e52d3add09ff53a /netty | |
parent | 7fd199f32e1ef78a98ebb1b76de7b63fea6d2d90 (diff) | |
download | grpc-grpc-java-cdb1f54794e647848dbccf5233b47b1935529813.tar.gz |
netty: make server sockets be configurable
Diffstat (limited to 'netty')
4 files changed, 108 insertions, 2 deletions
diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index 6fb594ad2..d09fffb61 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -34,6 +34,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; @@ -42,7 +43,9 @@ import io.netty.util.ReferenceCounted; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -56,6 +59,7 @@ class NettyServer implements InternalServer, WithLogId { private final LogId logId = LogId.allocate(getClass().getName()); private final SocketAddress address; private final Class<? extends ServerChannel> channelType; + private final Map<ChannelOption<?>, ?> channelOptions; private final ProtocolNegotiator protocolNegotiator; private final int maxStreamsPerConnection; private final boolean usingSharedBossGroup; @@ -80,6 +84,7 @@ class NettyServer implements InternalServer, WithLogId { NettyServer( SocketAddress address, Class<? extends ServerChannel> channelType, + Map<ChannelOption<?>, ?> channelOptions, @Nullable EventLoopGroup bossGroup, @Nullable EventLoopGroup workerGroup, ProtocolNegotiator protocolNegotiator, List<ServerStreamTracer.Factory> streamTracerFactories, TransportTracer.Factory transportTracerFactory, @@ -90,6 +95,8 @@ class NettyServer implements InternalServer, WithLogId { boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos) { this.address = address; this.channelType = checkNotNull(channelType, "channelType"); + checkNotNull(channelOptions, "channelOptions"); + this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions); this.bossGroup = bossGroup; this.workerGroup = workerGroup; this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator"); @@ -136,6 +143,15 @@ class NettyServer implements InternalServer, WithLogId { b.option(SO_BACKLOG, 128); b.childOption(SO_KEEPALIVE, true); } + + if (channelOptions != null) { + for (Map.Entry<ChannelOption<?>, ?> entry : channelOptions.entrySet()) { + @SuppressWarnings("unchecked") + ChannelOption<Object> key = (ChannelOption<Object>) entry.getKey(); + b.childOption(key, entry.getValue()); + } + } + b.childHandler(new ChannelInitializer<Channel>() { @Override public void initChannel(Channel ch) throws Exception { diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java index 40480f212..8847de89d 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java @@ -32,6 +32,7 @@ import io.grpc.internal.AbstractServerImplBuilder; import io.grpc.internal.GrpcUtil; import io.grpc.internal.KeepAliveManager; import io.grpc.internal.TransportTracer; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; @@ -39,7 +40,9 @@ import io.netty.handler.ssl.SslContext; import java.io.File; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import javax.annotation.CheckReturnValue; import javax.annotation.Nullable; @@ -65,6 +68,8 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer private final SocketAddress address; private Class<? extends ServerChannel> channelType = NioServerSocketChannel.class; + private final Map<ChannelOption<?>, Object> channelOptions = + new HashMap<ChannelOption<?>, Object>(); @Nullable private EventLoopGroup bossEventLoopGroup; @Nullable @@ -124,6 +129,17 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer } /** + * Specifies a channel option. As the underlying channel as well as network implementation may + * ignore this value applications should consider it a hint. + * + * @since 1.9.0 + */ + public <T> NettyServerBuilder withChildOption(ChannelOption<T> option, T value) { + this.channelOptions.put(option, value); + return this; + } + + /** * Provides the boss EventGroupLoop to the server. * * <p>It's an optional parameter. If the user has not provided one when the server is built, the @@ -403,7 +419,7 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer } return new NettyServer( - address, channelType, bossEventLoopGroup, workerEventLoopGroup, + address, channelType, channelOptions, bossEventLoopGroup, workerEventLoopGroup, negotiator, streamTracerFactories, transportTracerFactory, maxConcurrentCallsPerConnection, flowControlWindow, maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos, diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java index a62e0ecfb..dc4c44e79 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java @@ -556,7 +556,9 @@ public class NettyClientTransportTest { private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) throws IOException { server = new NettyServer( TestUtils.testServerAddress(0), - NioServerSocketChannel.class, group, group, negotiator, + NioServerSocketChannel.class, + new HashMap<ChannelOption<?>, Object>(), + group, group, negotiator, Collections.<ServerStreamTracer.Factory>emptyList(), TransportTracer.getDefaultFactory(), maxStreamsPerConnection, diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java index c47a205c8..0a46c822b 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java @@ -23,9 +23,18 @@ import io.grpc.internal.ServerListener; import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransportListener; import io.grpc.internal.TransportTracer; +import io.netty.channel.Channel; +import io.netty.channel.ChannelOption; +import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.net.InetSocketAddress; +import java.net.Socket; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -39,6 +48,7 @@ public class NettyServerTest { NettyServer ns = new NettyServer( addr, NioServerSocketChannel.class, + new HashMap<ChannelOption<?>, Object>(), null, // no boss group null, // no event group new ProtocolNegotiators.PlaintextNegotiator(), @@ -75,6 +85,7 @@ public class NettyServerTest { NettyServer ns = new NettyServer( addr, NioServerSocketChannel.class, + new HashMap<ChannelOption<?>, Object>(), null, // no boss group null, // no event group new ProtocolNegotiators.PlaintextNegotiator(), @@ -91,4 +102,65 @@ public class NettyServerTest { assertThat(ns.getPort()).isEqualTo(-1); } + + @Test(timeout = 60000) + public void childChannelOptions() throws Exception { + final int originalLowWaterMark = 2097169; + final int originalHighWaterMark = 2097211; + + Map<ChannelOption<?>, Object> channelOptions = new HashMap<ChannelOption<?>, Object>(); + + channelOptions.put(ChannelOption.WRITE_BUFFER_WATER_MARK, + new WriteBufferWaterMark(originalLowWaterMark, originalHighWaterMark)); + + final AtomicInteger lowWaterMark = new AtomicInteger(0); + final AtomicInteger highWaterMark = new AtomicInteger(0); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + + NettyServer ns = new NettyServer( + new InetSocketAddress(9999), + NioServerSocketChannel.class, + channelOptions, + null, // no boss group + null, // no event group + new ProtocolNegotiators.PlaintextNegotiator(), + Collections.<ServerStreamTracer.Factory>emptyList(), + TransportTracer.getDefaultFactory(), + 1, // ignore + 1, // ignore + 1, // ignore + 1, // ignore + 1, // ignore + 1, 1, // ignore + 1, 1, // ignore + true, 0); // ignore + ns.start(new ServerListener() { + @Override + public ServerTransportListener transportCreated(ServerTransport transport) { + Channel channel = ((NettyServerTransport)transport).channel(); + WriteBufferWaterMark writeBufferWaterMark = channel.config() + .getOption(ChannelOption.WRITE_BUFFER_WATER_MARK); + lowWaterMark.set(writeBufferWaterMark.low()); + highWaterMark.set(writeBufferWaterMark.high()); + + countDownLatch.countDown(); + + return null; + } + + @Override + public void serverShutdown() {} + }); + + Socket socket = new Socket(); + socket.connect(new InetSocketAddress("localhost", 9999), 8000); + countDownLatch.await(); + socket.close(); + + assertThat(lowWaterMark.get()).isEqualTo(originalLowWaterMark); + assertThat(highWaterMark.get()).isEqualTo(originalHighWaterMark); + + ns.shutdown(); + } } |