From cdb1f54794e647848dbccf5233b47b1935529813 Mon Sep 17 00:00:00 2001 From: Xiaoshuang LU Date: Thu, 23 Nov 2017 08:00:33 +0800 Subject: netty: make server sockets be configurable --- netty/src/main/java/io/grpc/netty/NettyServer.java | 16 +++++ .../java/io/grpc/netty/NettyServerBuilder.java | 18 +++++- .../io/grpc/netty/NettyClientTransportTest.java | 4 +- .../test/java/io/grpc/netty/NettyServerTest.java | 72 ++++++++++++++++++++++ 4 files changed, 108 insertions(+), 2 deletions(-) (limited to 'netty') diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index 6fb594ad2..d09fffb61 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -34,6 +34,7 @@ import io.netty.channel.Channel; import io.netty.channel.ChannelFuture; import io.netty.channel.ChannelFutureListener; import io.netty.channel.ChannelInitializer; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; @@ -42,7 +43,9 @@ import io.netty.util.ReferenceCounted; import java.io.IOException; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.logging.Level; import java.util.logging.Logger; import javax.annotation.Nullable; @@ -56,6 +59,7 @@ class NettyServer implements InternalServer, WithLogId { private final LogId logId = LogId.allocate(getClass().getName()); private final SocketAddress address; private final Class channelType; + private final Map, ?> channelOptions; private final ProtocolNegotiator protocolNegotiator; private final int maxStreamsPerConnection; private final boolean usingSharedBossGroup; @@ -80,6 +84,7 @@ class NettyServer implements InternalServer, WithLogId { NettyServer( SocketAddress address, Class channelType, + Map, ?> channelOptions, @Nullable EventLoopGroup bossGroup, @Nullable EventLoopGroup workerGroup, ProtocolNegotiator protocolNegotiator, List streamTracerFactories, TransportTracer.Factory transportTracerFactory, @@ -90,6 +95,8 @@ class NettyServer implements InternalServer, WithLogId { boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos) { this.address = address; this.channelType = checkNotNull(channelType, "channelType"); + checkNotNull(channelOptions, "channelOptions"); + this.channelOptions = new HashMap, Object>(channelOptions); this.bossGroup = bossGroup; this.workerGroup = workerGroup; this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator"); @@ -136,6 +143,15 @@ class NettyServer implements InternalServer, WithLogId { b.option(SO_BACKLOG, 128); b.childOption(SO_KEEPALIVE, true); } + + if (channelOptions != null) { + for (Map.Entry, ?> entry : channelOptions.entrySet()) { + @SuppressWarnings("unchecked") + ChannelOption key = (ChannelOption) entry.getKey(); + b.childOption(key, entry.getValue()); + } + } + b.childHandler(new ChannelInitializer() { @Override public void initChannel(Channel ch) throws Exception { diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java index 40480f212..8847de89d 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java @@ -32,6 +32,7 @@ import io.grpc.internal.AbstractServerImplBuilder; import io.grpc.internal.GrpcUtil; import io.grpc.internal.KeepAliveManager; import io.grpc.internal.TransportTracer; +import io.netty.channel.ChannelOption; import io.netty.channel.EventLoopGroup; import io.netty.channel.ServerChannel; import io.netty.channel.socket.nio.NioServerSocketChannel; @@ -39,7 +40,9 @@ import io.netty.handler.ssl.SslContext; import java.io.File; import java.net.InetSocketAddress; import java.net.SocketAddress; +import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import javax.annotation.CheckReturnValue; import javax.annotation.Nullable; @@ -65,6 +68,8 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder channelType = NioServerSocketChannel.class; + private final Map, Object> channelOptions = + new HashMap, Object>(); @Nullable private EventLoopGroup bossEventLoopGroup; @Nullable @@ -123,6 +128,17 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder NettyServerBuilder withChildOption(ChannelOption option, T value) { + this.channelOptions.put(option, value); + return this; + } + /** * Provides the boss EventGroupLoop to the server. * @@ -403,7 +419,7 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder, Object>(), + group, group, negotiator, Collections.emptyList(), TransportTracer.getDefaultFactory(), maxStreamsPerConnection, diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java index c47a205c8..0a46c822b 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java @@ -23,9 +23,18 @@ import io.grpc.internal.ServerListener; import io.grpc.internal.ServerTransport; import io.grpc.internal.ServerTransportListener; import io.grpc.internal.TransportTracer; +import io.netty.channel.Channel; +import io.netty.channel.ChannelOption; +import io.netty.channel.WriteBufferWaterMark; import io.netty.channel.socket.nio.NioServerSocketChannel; import java.net.InetSocketAddress; +import java.net.Socket; import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicInteger; + import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; @@ -39,6 +48,7 @@ public class NettyServerTest { NettyServer ns = new NettyServer( addr, NioServerSocketChannel.class, + new HashMap, Object>(), null, // no boss group null, // no event group new ProtocolNegotiators.PlaintextNegotiator(), @@ -75,6 +85,7 @@ public class NettyServerTest { NettyServer ns = new NettyServer( addr, NioServerSocketChannel.class, + new HashMap, Object>(), null, // no boss group null, // no event group new ProtocolNegotiators.PlaintextNegotiator(), @@ -91,4 +102,65 @@ public class NettyServerTest { assertThat(ns.getPort()).isEqualTo(-1); } + + @Test(timeout = 60000) + public void childChannelOptions() throws Exception { + final int originalLowWaterMark = 2097169; + final int originalHighWaterMark = 2097211; + + Map, Object> channelOptions = new HashMap, Object>(); + + channelOptions.put(ChannelOption.WRITE_BUFFER_WATER_MARK, + new WriteBufferWaterMark(originalLowWaterMark, originalHighWaterMark)); + + final AtomicInteger lowWaterMark = new AtomicInteger(0); + final AtomicInteger highWaterMark = new AtomicInteger(0); + + final CountDownLatch countDownLatch = new CountDownLatch(1); + + NettyServer ns = new NettyServer( + new InetSocketAddress(9999), + NioServerSocketChannel.class, + channelOptions, + null, // no boss group + null, // no event group + new ProtocolNegotiators.PlaintextNegotiator(), + Collections.emptyList(), + TransportTracer.getDefaultFactory(), + 1, // ignore + 1, // ignore + 1, // ignore + 1, // ignore + 1, // ignore + 1, 1, // ignore + 1, 1, // ignore + true, 0); // ignore + ns.start(new ServerListener() { + @Override + public ServerTransportListener transportCreated(ServerTransport transport) { + Channel channel = ((NettyServerTransport)transport).channel(); + WriteBufferWaterMark writeBufferWaterMark = channel.config() + .getOption(ChannelOption.WRITE_BUFFER_WATER_MARK); + lowWaterMark.set(writeBufferWaterMark.low()); + highWaterMark.set(writeBufferWaterMark.high()); + + countDownLatch.countDown(); + + return null; + } + + @Override + public void serverShutdown() {} + }); + + Socket socket = new Socket(); + socket.connect(new InetSocketAddress("localhost", 9999), 8000); + countDownLatch.await(); + socket.close(); + + assertThat(lowWaterMark.get()).isEqualTo(originalLowWaterMark); + assertThat(highWaterMark.get()).isEqualTo(originalHighWaterMark); + + ns.shutdown(); + } } -- cgit v1.2.3