aboutsummaryrefslogtreecommitdiff
path: root/netty
diff options
context:
space:
mode:
authorXiaoshuang LU <luxiaoshuang@qiyi.com>2017-11-23 08:00:33 +0800
committerCarl Mastrangelo <notcarl@google.com>2017-11-22 16:00:33 -0800
commitcdb1f54794e647848dbccf5233b47b1935529813 (patch)
tree2ffe367717797ab5b12185e51e52d3add09ff53a /netty
parent7fd199f32e1ef78a98ebb1b76de7b63fea6d2d90 (diff)
downloadgrpc-grpc-java-cdb1f54794e647848dbccf5233b47b1935529813.tar.gz
netty: make server sockets be configurable
Diffstat (limited to 'netty')
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyServer.java16
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyServerBuilder.java18
-rw-r--r--netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java4
-rw-r--r--netty/src/test/java/io/grpc/netty/NettyServerTest.java72
4 files changed, 108 insertions, 2 deletions
diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java
index 6fb594ad2..d09fffb61 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServer.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServer.java
@@ -34,6 +34,7 @@ import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelInitializer;
+import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
@@ -42,7 +43,9 @@ import io.netty.util.ReferenceCounted;
import java.io.IOException;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.logging.Level;
import java.util.logging.Logger;
import javax.annotation.Nullable;
@@ -56,6 +59,7 @@ class NettyServer implements InternalServer, WithLogId {
private final LogId logId = LogId.allocate(getClass().getName());
private final SocketAddress address;
private final Class<? extends ServerChannel> channelType;
+ private final Map<ChannelOption<?>, ?> channelOptions;
private final ProtocolNegotiator protocolNegotiator;
private final int maxStreamsPerConnection;
private final boolean usingSharedBossGroup;
@@ -80,6 +84,7 @@ class NettyServer implements InternalServer, WithLogId {
NettyServer(
SocketAddress address, Class<? extends ServerChannel> channelType,
+ Map<ChannelOption<?>, ?> channelOptions,
@Nullable EventLoopGroup bossGroup, @Nullable EventLoopGroup workerGroup,
ProtocolNegotiator protocolNegotiator, List<ServerStreamTracer.Factory> streamTracerFactories,
TransportTracer.Factory transportTracerFactory,
@@ -90,6 +95,8 @@ class NettyServer implements InternalServer, WithLogId {
boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos) {
this.address = address;
this.channelType = checkNotNull(channelType, "channelType");
+ checkNotNull(channelOptions, "channelOptions");
+ this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
this.bossGroup = bossGroup;
this.workerGroup = workerGroup;
this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator");
@@ -136,6 +143,15 @@ class NettyServer implements InternalServer, WithLogId {
b.option(SO_BACKLOG, 128);
b.childOption(SO_KEEPALIVE, true);
}
+
+ if (channelOptions != null) {
+ for (Map.Entry<ChannelOption<?>, ?> entry : channelOptions.entrySet()) {
+ @SuppressWarnings("unchecked")
+ ChannelOption<Object> key = (ChannelOption<Object>) entry.getKey();
+ b.childOption(key, entry.getValue());
+ }
+ }
+
b.childHandler(new ChannelInitializer<Channel>() {
@Override
public void initChannel(Channel ch) throws Exception {
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
index 40480f212..8847de89d 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
@@ -32,6 +32,7 @@ import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.TransportTracer;
+import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
@@ -39,7 +40,9 @@ import io.netty.handler.ssl.SslContext;
import java.io.File;
import java.net.InetSocketAddress;
import java.net.SocketAddress;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.TimeUnit;
import javax.annotation.CheckReturnValue;
import javax.annotation.Nullable;
@@ -65,6 +68,8 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
private final SocketAddress address;
private Class<? extends ServerChannel> channelType = NioServerSocketChannel.class;
+ private final Map<ChannelOption<?>, Object> channelOptions =
+ new HashMap<ChannelOption<?>, Object>();
@Nullable
private EventLoopGroup bossEventLoopGroup;
@Nullable
@@ -124,6 +129,17 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
}
/**
+ * Specifies a channel option. As the underlying channel as well as network implementation may
+ * ignore this value applications should consider it a hint.
+ *
+ * @since 1.9.0
+ */
+ public <T> NettyServerBuilder withChildOption(ChannelOption<T> option, T value) {
+ this.channelOptions.put(option, value);
+ return this;
+ }
+
+ /**
* Provides the boss EventGroupLoop to the server.
*
* <p>It's an optional parameter. If the user has not provided one when the server is built, the
@@ -403,7 +419,7 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
}
return new NettyServer(
- address, channelType, bossEventLoopGroup, workerEventLoopGroup,
+ address, channelType, channelOptions, bossEventLoopGroup, workerEventLoopGroup,
negotiator, streamTracerFactories, transportTracerFactory,
maxConcurrentCallsPerConnection, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
index a62e0ecfb..dc4c44e79 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
@@ -556,7 +556,9 @@ public class NettyClientTransportTest {
private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) throws IOException {
server = new NettyServer(
TestUtils.testServerAddress(0),
- NioServerSocketChannel.class, group, group, negotiator,
+ NioServerSocketChannel.class,
+ new HashMap<ChannelOption<?>, Object>(),
+ group, group, negotiator,
Collections.<ServerStreamTracer.Factory>emptyList(),
TransportTracer.getDefaultFactory(),
maxStreamsPerConnection,
diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java
index c47a205c8..0a46c822b 100644
--- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java
@@ -23,9 +23,18 @@ import io.grpc.internal.ServerListener;
import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.TransportTracer;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelOption;
+import io.netty.channel.WriteBufferWaterMark;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
+import java.net.Socket;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicInteger;
+
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
@@ -39,6 +48,7 @@ public class NettyServerTest {
NettyServer ns = new NettyServer(
addr,
NioServerSocketChannel.class,
+ new HashMap<ChannelOption<?>, Object>(),
null, // no boss group
null, // no event group
new ProtocolNegotiators.PlaintextNegotiator(),
@@ -75,6 +85,7 @@ public class NettyServerTest {
NettyServer ns = new NettyServer(
addr,
NioServerSocketChannel.class,
+ new HashMap<ChannelOption<?>, Object>(),
null, // no boss group
null, // no event group
new ProtocolNegotiators.PlaintextNegotiator(),
@@ -91,4 +102,65 @@ public class NettyServerTest {
assertThat(ns.getPort()).isEqualTo(-1);
}
+
+ @Test(timeout = 60000)
+ public void childChannelOptions() throws Exception {
+ final int originalLowWaterMark = 2097169;
+ final int originalHighWaterMark = 2097211;
+
+ Map<ChannelOption<?>, Object> channelOptions = new HashMap<ChannelOption<?>, Object>();
+
+ channelOptions.put(ChannelOption.WRITE_BUFFER_WATER_MARK,
+ new WriteBufferWaterMark(originalLowWaterMark, originalHighWaterMark));
+
+ final AtomicInteger lowWaterMark = new AtomicInteger(0);
+ final AtomicInteger highWaterMark = new AtomicInteger(0);
+
+ final CountDownLatch countDownLatch = new CountDownLatch(1);
+
+ NettyServer ns = new NettyServer(
+ new InetSocketAddress(9999),
+ NioServerSocketChannel.class,
+ channelOptions,
+ null, // no boss group
+ null, // no event group
+ new ProtocolNegotiators.PlaintextNegotiator(),
+ Collections.<ServerStreamTracer.Factory>emptyList(),
+ TransportTracer.getDefaultFactory(),
+ 1, // ignore
+ 1, // ignore
+ 1, // ignore
+ 1, // ignore
+ 1, // ignore
+ 1, 1, // ignore
+ 1, 1, // ignore
+ true, 0); // ignore
+ ns.start(new ServerListener() {
+ @Override
+ public ServerTransportListener transportCreated(ServerTransport transport) {
+ Channel channel = ((NettyServerTransport)transport).channel();
+ WriteBufferWaterMark writeBufferWaterMark = channel.config()
+ .getOption(ChannelOption.WRITE_BUFFER_WATER_MARK);
+ lowWaterMark.set(writeBufferWaterMark.low());
+ highWaterMark.set(writeBufferWaterMark.high());
+
+ countDownLatch.countDown();
+
+ return null;
+ }
+
+ @Override
+ public void serverShutdown() {}
+ });
+
+ Socket socket = new Socket();
+ socket.connect(new InetSocketAddress("localhost", 9999), 8000);
+ countDownLatch.await();
+ socket.close();
+
+ assertThat(lowWaterMark.get()).isEqualTo(originalLowWaterMark);
+ assertThat(highWaterMark.get()).isEqualTo(originalHighWaterMark);
+
+ ns.shutdown();
+ }
}