aboutsummaryrefslogtreecommitdiff
path: root/netty
diff options
context:
space:
mode:
authorCarl Mastrangelo <notcarl@google.com>2017-09-12 19:56:25 -0700
committerGitHub <noreply@github.com>2017-09-12 19:56:25 -0700
commit332c46ff1a5f5d015502c9c22fe8297304760447 (patch)
treed70c7798e1e547de1c632debebfda4fd5481a8ef /netty
parentdf92533524ab6d02d1b90370197518b0f2cfde57 (diff)
downloadgrpc-grpc-java-332c46ff1a5f5d015502c9c22fe8297304760447.tar.gz
netty: hide ProtocolNegotiator, and expose initial ChannelHandler
* netty: hide ProtocolNegotiator, and expose initial ChannelHandler This change does two things: it hides the ProtocolNegotiator from NSB, and exposes an internal "init channel" on NSB and NCB. The reason for the change is that PN is not a powerful enough abstraction for internal Google use (and for some other outside users with highly specific uses). The new API exposes adding a ChannelHandler to the pipeline upon registration of the channel. To accomplish this, NettyClientTransport is modified to use ChannelInitializer. There is a comment explaining why it cannot be used, but after looking at the the original discussion, I believe the reasons for doing so are no longer applicable. Specifically, at the time that CI was removed, there was no WriteQueue class. The WQ class buffers all writes and executes them on the EventLoop. Prior to WQ it was not the case that all writes happened on the loop, so it could race. If the write was not on the loop, it would be put on the loops execution queue, but with the CI handler as the target. Since CI removed itself upon registration, the write wouldn get fired on the wrong handler. With the additional of WQ, this is no longer a problem. All writes go through WQ, and only execute on the loop, so pipeline changes are no longer racy. ...That is, except for the initial noop write. This does still experience the race. If the channel is failed during registration or connect, the lifecycle manager will fail for differing, racy reasons. ==== To make things more uniform across NCT and NST, I have put them both back to using CI. I have added listeners to each of the bootstrap futures. I have also moved the initial write to the CI, so that it always goes through the the buffering negotiation handler. Lastly, racy shutdown errors will be logged so that if multiple callbacks try to shutdown, it will be obvious where they came from and in which order they happened. I am not sure how to test the raciness of this code, but I *think* it is deterministic. From my reading, Promises are resolved before channel events so the first future to complete should be the winner. Since listeners are always added from the same thread, and resolved by the loop, I think this forces determinism. One last note: the negotiator has a scheme that is hard coded after the transport has started. This makes it impossible to change schemes after the channel is started. Thats okay, but it should be a use case we knowingly prevent. Others may want to do something more bold than we do.
Diffstat (limited to 'netty')
-rw-r--r--netty/src/main/java/io/grpc/netty/InternalNettyServerBuilder.java38
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java52
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyClientTransport.java116
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyServer.java31
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyServerBuilder.java52
-rw-r--r--netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java22
-rw-r--r--netty/src/test/java/io/grpc/netty/NettyServerTest.java2
7 files changed, 235 insertions, 78 deletions
diff --git a/netty/src/main/java/io/grpc/netty/InternalNettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/InternalNettyServerBuilder.java
new file mode 100644
index 000000000..606a2959e
--- /dev/null
+++ b/netty/src/main/java/io/grpc/netty/InternalNettyServerBuilder.java
@@ -0,0 +1,38 @@
+/*
+ * Copyright 2016, gRPC Authors All rights reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.netty;
+
+import io.grpc.Internal;
+import io.netty.channel.ChannelHandler;
+
+/**
+ * Internal {@link InternalNettyServerBuilder} accessor. This is intended for usage internal to
+ * the gRPC team. If you *really* think you need to use this, contact the gRPC team first.
+ */
+@Internal
+public final class InternalNettyServerBuilder {
+
+ /**
+ * Adds an initialization handler to the Netty child at Channel initialization time. It must be
+ * annotated {@link ChannelHandler.Sharable}.
+ */
+ public static NettyServerBuilder initHandler(NettyServerBuilder nsb, ChannelHandler initHandler) {
+ return nsb.initHandler(initHandler);
+ }
+
+ private InternalNettyServerBuilder() {}
+}
diff --git a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
index 2fc7a3851..4f545237a 100644
--- a/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/NettyChannelBuilder.java
@@ -38,6 +38,7 @@ import io.grpc.internal.GrpcUtil;
import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.SharedResourceHolder;
import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.socket.nio.NioSocketChannel;
@@ -73,6 +74,7 @@ public final class NettyChannelBuilder
@Nullable
private EventLoopGroup eventLoopGroup;
private SslContext sslContext;
+ @Nullable private ChannelHandler initHandler;
private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW;
private int maxHeaderListSize = GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE;
private long keepAliveTimeNanos = KEEPALIVE_TIME_NANOS_DISABLED;
@@ -221,6 +223,16 @@ public final class NettyChannelBuilder
return this;
}
+ final NettyChannelBuilder initHandler(ChannelHandler initHandler) {
+ if (initHandler != null) {
+ checkArgument(
+ initHandler.getClass().isAnnotationPresent(ChannelHandler.Sharable.class),
+ "initHandler must be sharable");
+ }
+ this.initHandler = initHandler;
+ return this;
+ }
+
/**
* Equivalent to using {@link #negotiationType(NegotiationType)} with {@code PLAINTEXT} or
* {@code PLAINTEXT_UPGRADE}.
@@ -322,9 +334,20 @@ public final class NettyChannelBuilder
@CheckReturnValue
@Internal
protected ClientTransportFactory buildTransportFactory() {
- return new NettyTransportFactory(dynamicParamsFactory, channelType, channelOptions,
- negotiationType, sslContext, eventLoopGroup, flowControlWindow, maxInboundMessageSize(),
- maxHeaderListSize, keepAliveTimeNanos, keepAliveTimeoutNanos, keepAliveWithoutCalls);
+ return new NettyTransportFactory(
+ dynamicParamsFactory,
+ channelType,
+ channelOptions,
+ negotiationType,
+ sslContext,
+ initHandler,
+ eventLoopGroup,
+ flowControlWindow,
+ maxInboundMessageSize(),
+ maxHeaderListSize,
+ keepAliveTimeNanos,
+ keepAliveTimeoutNanos,
+ keepAliveWithoutCalls);
}
@Override
@@ -433,6 +456,7 @@ public final class NettyChannelBuilder
private final Class<? extends Channel> channelType;
private final Map<ChannelOption<?>, ?> channelOptions;
private final NegotiationType negotiationType;
+ private final ChannelHandler initHandler;
private final EventLoopGroup group;
private final boolean usingSharedGroup;
private final int flowControlWindow;
@@ -444,11 +468,20 @@ public final class NettyChannelBuilder
private boolean closed;
- NettyTransportFactory(TransportCreationParamsFilterFactory transportCreationParamsFilterFactory,
- Class<? extends Channel> channelType, Map<ChannelOption<?>, ?> channelOptions,
- NegotiationType negotiationType, SslContext sslContext, EventLoopGroup group,
- int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
- long keepAliveTimeNanos, long keepAliveTimeoutNanos, boolean keepAliveWithoutCalls) {
+ NettyTransportFactory(
+ TransportCreationParamsFilterFactory transportCreationParamsFilterFactory,
+ Class<? extends Channel> channelType,
+ Map<ChannelOption<?>, ?> channelOptions,
+ NegotiationType negotiationType,
+ SslContext sslContext,
+ ChannelHandler initHandler,
+ EventLoopGroup group,
+ int flowControlWindow,
+ int maxMessageSize,
+ int maxHeaderListSize,
+ long keepAliveTimeNanos,
+ long keepAliveTimeoutNanos,
+ boolean keepAliveWithoutCalls) {
this.channelType = channelType;
this.negotiationType = negotiationType;
this.channelOptions = new HashMap<ChannelOption<?>, Object>(channelOptions);
@@ -458,6 +491,7 @@ public final class NettyChannelBuilder
new DefaultNettyTransportCreationParamsFilterFactory(sslContext);
}
this.transportCreationParamsFilterFactory = transportCreationParamsFilterFactory;
+ this.initHandler = initHandler;
this.flowControlWindow = flowControlWindow;
this.maxMessageSize = maxMessageSize;
@@ -491,7 +525,7 @@ public final class NettyChannelBuilder
};
NettyClientTransport transport = new NettyClientTransport(
dparams.getTargetServerAddress(), channelType, channelOptions, group,
- dparams.getProtocolNegotiator(), flowControlWindow,
+ dparams.getProtocolNegotiator(), initHandler, flowControlWindow,
maxMessageSize, maxHeaderListSize, keepAliveTimeNanosState.get(), keepAliveTimeoutNanos,
keepAliveWithoutCalls, dparams.getAuthority(), dparams.getUserAgent(),
tooManyPingsRunnable);
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
index 189b06a48..aa1651003 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
@@ -36,10 +36,13 @@ import io.grpc.internal.KeepAliveManager;
import io.grpc.internal.KeepAliveManager.ClientKeepAlivePinger;
import io.grpc.internal.LogId;
import io.grpc.internal.StatsTraceContext;
+import io.grpc.netty.ProtocolNegotiator.Handler;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
+import io.netty.channel.ChannelInitializer;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
@@ -49,19 +52,26 @@ import io.netty.util.AsciiString;
import java.net.SocketAddress;
import java.nio.channels.ClosedChannelException;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.concurrent.Executor;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Level;
+import java.util.logging.Logger;
import javax.annotation.Nullable;
/**
* A Netty-based {@link ConnectionClientTransport} implementation.
*/
class NettyClientTransport implements ConnectionClientTransport {
+ private static final Logger logger = Logger.getLogger(NettyClientTransport.class.getName());
+
private final LogId logId = LogId.allocate(getClass().getName());
private final Map<ChannelOption<?>, ?> channelOptions;
private final SocketAddress address;
private final Class<? extends Channel> channelType;
private final EventLoopGroup group;
private final ProtocolNegotiator negotiator;
+ private final ChannelHandler initHandler;
private final AsciiString authority;
private final AsciiString userAgent;
private final int flowControlWindow;
@@ -73,7 +83,7 @@ class NettyClientTransport implements ConnectionClientTransport {
private final boolean keepAliveWithoutCalls;
private final Runnable tooManyPingsRunnable;
- private ProtocolNegotiator.Handler negotiationHandler;
+ private AsciiString negotiatorScheme;
private NettyClientHandler handler;
// We should not send on the channel until negotiation completes. This is a hard requirement
// by SslHandler but is appropriate for HTTP/1.1 Upgrade as well.
@@ -82,15 +92,26 @@ class NettyClientTransport implements ConnectionClientTransport {
private Status statusExplainingWhyTheChannelIsNull;
/** Since not thread-safe, may only be used from event loop. */
private ClientTransportLifecycleManager lifecycleManager;
+ private final AtomicBoolean notifyClosed = new AtomicBoolean();
NettyClientTransport(
- SocketAddress address, Class<? extends Channel> channelType,
- Map<ChannelOption<?>, ?> channelOptions, EventLoopGroup group,
- ProtocolNegotiator negotiator, int flowControlWindow, int maxMessageSize,
- int maxHeaderListSize, long keepAliveTimeNanos, long keepAliveTimeoutNanos,
- boolean keepAliveWithoutCalls, String authority, @Nullable String userAgent,
+ SocketAddress address,
+ Class<? extends Channel> channelType,
+ Map<ChannelOption<?>, ?> channelOptions,
+ EventLoopGroup group,
+ ProtocolNegotiator negotiator,
+ @Nullable ChannelHandler initHandler,
+ int flowControlWindow,
+ int maxMessageSize,
+ int maxHeaderListSize,
+ long keepAliveTimeNanos,
+ long keepAliveTimeoutNanos,
+ boolean keepAliveWithoutCalls,
+ String authority,
+ @Nullable String userAgent,
Runnable tooManyPingsRunnable) {
this.negotiator = Preconditions.checkNotNull(negotiator, "negotiator");
+ this.initHandler = initHandler;
this.address = Preconditions.checkNotNull(address, "address");
this.group = Preconditions.checkNotNull(group, "group");
this.channelType = Preconditions.checkNotNull(channelType, "channelType");
@@ -151,8 +172,23 @@ class NettyClientTransport implements ConnectionClientTransport {
return NettyClientTransport.this.statusFromFailedFuture(f);
}
},
- method, headers, channel, authority, negotiationHandler.scheme(), userAgent,
- statsTraceCtx);
+ method, headers, channel, authority, negotiatorScheme, userAgent, statsTraceCtx);
+ }
+
+ private void lifecycleManagerNotifyTerminated(String reason, Throwable t) {
+ lifecycleManagerNotifyTerminated(reason, Utils.statusFromThrowable(t));
+ }
+
+ private void lifecycleManagerNotifyTerminated(String detail, Status s) {
+ if (!notifyClosed.compareAndSet(false, true)) {
+ logger.log(
+ Level.FINE, detail + "ignoring additional lifecycle errors", s.asRuntimeException());
+ return;
+ }
+ if (detail != null) {
+ s = s.augmentDescription(detail);
+ }
+ lifecycleManager.notifyTerminated(s);
}
@SuppressWarnings("unchecked")
@@ -171,8 +207,6 @@ class NettyClientTransport implements ConnectionClientTransport {
maxHeaderListSize, Ticker.systemTicker(), tooManyPingsRunnable);
NettyHandlerSettings.setAutoWindow(handler);
- negotiationHandler = negotiator.newHandler(handler);
-
Bootstrap b = new Bootstrap();
b.group(eventLoop);
b.channel(channelType);
@@ -186,13 +220,43 @@ class NettyClientTransport implements ConnectionClientTransport {
b.option((ChannelOption<Object>) entry.getKey(), entry.getValue());
}
- /**
- * We don't use a ChannelInitializer in the client bootstrap because its "initChannel" method
- * is executed in the event loop and we need this handler to be in the pipeline immediately so
- * that it may begin buffering writes.
- */
- b.handler(negotiationHandler);
+ final class LifecycleChannelFutureListener implements ChannelFutureListener {
+ private final String reason;
+
+ LifecycleChannelFutureListener(String reason) {
+ this.reason = reason;
+ }
+
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ lifecycleManagerNotifyTerminated(reason, future.cause());
+ }
+ }
+ }
+
+ final Handler negotiationHandler = negotiator.newHandler(handler);
+ negotiatorScheme = negotiationHandler.scheme();
+
+ b.handler(new ChannelInitializer<Channel>() {
+ @Override
+ protected void initChannel(Channel ch) throws Exception {
+ if (initHandler != null) {
+ ch.pipeline().addFirst(initHandler);
+ }
+
+ ch.pipeline().addLast(negotiationHandler);
+
+ // This write will have no effect, yet it will only complete once the negotiationHandler
+ // flushes any pending writes.
+ ch.writeAndFlush(NettyClientHandler.NOOP_MESSAGE)
+ .addListener(new LifecycleChannelFutureListener("noop write"));
+ }
+ });
+
ChannelFuture regFuture = b.register();
+ regFuture.addListener(new LifecycleChannelFutureListener("register"));
+
channel = regFuture.channel();
if (channel == null) {
// Initialization has failed badly. All new streams should be made to fail.
@@ -210,29 +274,15 @@ class NettyClientTransport implements ConnectionClientTransport {
// could use GlobalEventExecutor (which is what regFuture would use for notifying
// listeners in this case), but avoiding on-demand thread creation in an error case seems
// a good idea and is probably clearer threading.
- lifecycleManager.notifyTerminated(statusExplainingWhyTheChannelIsNull);
+ lifecycleManagerNotifyTerminated(null, statusExplainingWhyTheChannelIsNull);
}
};
}
// Start the write queue as soon as the channel is constructed
handler.startWriteQueue(channel);
- // This write will have no effect, yet it will only complete once the negotiationHandler
- // flushes any pending writes. We need it to be staged *before* the `connect` so that
- // the channel can't have been closed yet, removing all handlers. This write will sit in the
- // AbstractBufferingHandler's buffer, and will either be flushed on a successful connection,
- // or failed if the connection fails.
- channel.writeAndFlush(NettyClientHandler.NOOP_MESSAGE).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- // Need to notify of this failure, because NettyClientHandler may not have been added to
- // the pipeline before the error occurred.
- lifecycleManager.notifyTerminated(Utils.statusFromThrowable(future.cause()));
- }
- }
- });
+
// Start the connection operation to the server.
- channel.connect(address);
+ channel.connect(address).addListener(new LifecycleChannelFutureListener("connect"));
if (keepAliveManager != null) {
keepAliveManager.onTransportStarted();
diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java
index ad66540ef..db9380300 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServer.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServer.java
@@ -30,6 +30,7 @@ import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
+import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
@@ -52,6 +53,7 @@ class NettyServer implements InternalServer {
private final SocketAddress address;
private final Class<? extends ServerChannel> channelType;
+ private final ChannelHandler initHandler;
private final ProtocolNegotiator protocolNegotiator;
private final int maxStreamsPerConnection;
private final boolean usingSharedBossGroup;
@@ -74,19 +76,30 @@ class NettyServer implements InternalServer {
private final List<ServerStreamTracer.Factory> streamTracerFactories;
NettyServer(
- SocketAddress address, Class<? extends ServerChannel> channelType,
- @Nullable EventLoopGroup bossGroup, @Nullable EventLoopGroup workerGroup,
- ProtocolNegotiator protocolNegotiator, List<ServerStreamTracer.Factory> streamTracerFactories,
- int maxStreamsPerConnection, int flowControlWindow, int maxMessageSize, int maxHeaderListSize,
- long keepAliveTimeInNanos, long keepAliveTimeoutInNanos,
+ SocketAddress address,
+ Class<? extends ServerChannel> channelType,
+ @Nullable EventLoopGroup bossGroup,
+ @Nullable EventLoopGroup workerGroup,
+ ProtocolNegotiator protocolNegotiator,
+ @Nullable ChannelHandler initHandler,
+ List<ServerStreamTracer.Factory> streamTracerFactories,
+ int maxStreamsPerConnection,
+ int flowControlWindow,
+ int maxMessageSize,
+ int maxHeaderListSize,
+ long keepAliveTimeInNanos,
+ long keepAliveTimeoutInNanos,
long maxConnectionIdleInNanos,
- long maxConnectionAgeInNanos, long maxConnectionAgeGraceInNanos,
- boolean permitKeepAliveWithoutCalls, long permitKeepAliveTimeInNanos) {
+ long maxConnectionAgeInNanos,
+ long maxConnectionAgeGraceInNanos,
+ boolean permitKeepAliveWithoutCalls,
+ long permitKeepAliveTimeInNanos) {
this.address = address;
this.channelType = checkNotNull(channelType, "channelType");
this.bossGroup = bossGroup;
this.workerGroup = workerGroup;
this.protocolNegotiator = checkNotNull(protocolNegotiator, "protocolNegotiator");
+ this.initHandler = initHandler;
this.streamTracerFactories = checkNotNull(streamTracerFactories, "streamTracerFactories");
this.usingSharedBossGroup = bossGroup == null;
this.usingSharedWorkerGroup = workerGroup == null;
@@ -133,6 +146,10 @@ class NettyServer implements InternalServer {
@Override
public void initChannel(Channel ch) throws Exception {
+ if (initHandler != null) {
+ ch.pipeline().addFirst(initHandler);
+ }
+
long maxConnectionAgeInNanos = NettyServer.this.maxConnectionAgeInNanos;
if (maxConnectionAgeInNanos != MAX_CONNECTION_AGE_NANOS_DISABLED) {
// apply a random jitter of +/-10% to max connection age
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
index a2e0251eb..88ad65d61 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerBuilder.java
@@ -25,11 +25,11 @@ import static io.grpc.internal.GrpcUtil.SERVER_KEEPALIVE_TIME_NANOS_DISABLED;
import com.google.common.base.Preconditions;
import com.google.errorprone.annotations.CanIgnoreReturnValue;
import io.grpc.ExperimentalApi;
-import io.grpc.Internal;
import io.grpc.ServerStreamTracer;
import io.grpc.internal.AbstractServerImplBuilder;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.KeepAliveManager;
+import io.netty.channel.ChannelHandler;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.ServerChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
@@ -68,7 +68,7 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
@Nullable
private EventLoopGroup workerEventLoopGroup;
private SslContext sslContext;
- private ProtocolNegotiator protocolNegotiator;
+ private ChannelHandler initHandler;
private int maxConcurrentCallsPerConnection = Integer.MAX_VALUE;
private int flowControlWindow = DEFAULT_FLOW_CONTROL_WINDOW;
private int maxMessageSize = DEFAULT_MAX_MESSAGE_SIZE;
@@ -181,16 +181,14 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
return this;
}
- /**
- * Sets the {@link ProtocolNegotiator} to be used. If non-{@code null}, overrides the value
- * specified in {@link #sslContext(SslContext)}.
- *
- * <p>Default: {@code null}.
- */
- @Internal
- public final NettyServerBuilder protocolNegotiator(
- @Nullable ProtocolNegotiator protocolNegotiator) {
- this.protocolNegotiator = protocolNegotiator;
+ NettyServerBuilder initHandler(ChannelHandler initHandler) {
+ if (initHandler != null) {
+ checkArgument(
+ initHandler.getClass().isAnnotationPresent(ChannelHandler.Sharable.class),
+ "initHandler must be sharable");
+ }
+
+ this.initHandler = initHandler;
return this;
}
@@ -373,19 +371,29 @@ public final class NettyServerBuilder extends AbstractServerImplBuilder<NettySer
@CheckReturnValue
protected NettyServer buildTransportServer(
List<ServerStreamTracer.Factory> streamTracerFactories) {
- ProtocolNegotiator negotiator = protocolNegotiator;
- if (negotiator == null) {
- negotiator = sslContext != null ? ProtocolNegotiators.serverTls(sslContext) :
- ProtocolNegotiators.serverPlaintext();
- }
+ ProtocolNegotiator negotiator = sslContext != null
+ ? ProtocolNegotiators.serverTls(sslContext)
+ : ProtocolNegotiators.serverPlaintext();
return new NettyServer(
- address, channelType, bossEventLoopGroup, workerEventLoopGroup,
- negotiator, streamTracerFactories, maxConcurrentCallsPerConnection, flowControlWindow,
- maxMessageSize, maxHeaderListSize, keepAliveTimeInNanos, keepAliveTimeoutInNanos,
+ address,
+ channelType,
+ bossEventLoopGroup,
+ workerEventLoopGroup,
+ negotiator,
+ initHandler,
+ streamTracerFactories,
+ maxConcurrentCallsPerConnection,
+ flowControlWindow,
+ maxMessageSize,
+ maxHeaderListSize,
+ keepAliveTimeInNanos,
+ keepAliveTimeoutInNanos,
maxConnectionIdleInNanos,
- maxConnectionAgeInNanos, maxConnectionAgeGraceInNanos,
- permitKeepAliveWithoutCalls, permitKeepAliveTimeInNanos);
+ maxConnectionAgeInNanos,
+ maxConnectionAgeGraceInNanos,
+ permitKeepAliveWithoutCalls,
+ permitKeepAliveTimeInNanos);
}
@Override
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
index 6912e39d1..6df65b159 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientTransportTest.java
@@ -35,6 +35,7 @@ import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import com.google.common.io.ByteStreams;
+import com.google.common.truth.Truth;
import com.google.common.util.concurrent.SettableFuture;
import io.grpc.Attributes;
import io.grpc.CallOptions;
@@ -60,6 +61,7 @@ import io.grpc.internal.ServerTransport;
import io.grpc.internal.ServerTransportListener;
import io.grpc.internal.testing.TestUtils;
import io.netty.channel.ChannelConfig;
+import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelOption;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannelConfig;
@@ -167,7 +169,7 @@ public class NettyClientTransportTest {
int soLinger = 123;
channelOptions.put(ChannelOption.SO_LINGER, soLinger);
NettyClientTransport transport = new NettyClientTransport(
- address, NioSocketChannel.class, channelOptions, group, newNegotiator(),
+ address, NioSocketChannel.class, channelOptions, group, newNegotiator(), null,
DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE,
KEEPALIVE_TIME_NANOS_DISABLED, 1L, false, authority, null /* user agent */,
tooManyPingsRunnable);
@@ -264,7 +266,9 @@ public class NettyClientTransportTest {
rpc.waitForClose();
fail("expected exception");
} catch (ExecutionException ex) {
- assertSame(failureStatus, ((StatusException) ex.getCause()).getStatus());
+ StatusException se = (StatusException) ex.getCause();
+ assertSame(failureStatus.getCode(), se.getStatus().getCode());
+ Truth.assertThat(se.getStatus().getDescription()).contains(failureStatus.getDescription());
}
}
@@ -284,7 +288,9 @@ public class NettyClientTransportTest {
rpc.waitForClose();
fail("expected exception");
} catch (ExecutionException ex) {
- assertSame(failureStatus, ((StatusException) ex.getCause()).getStatus());
+ StatusException se = (StatusException) ex.getCause();
+ assertSame(failureStatus.getCode(), se.getStatus().getCode());
+ Truth.assertThat(se.getStatus().getDescription()).contains(failureStatus.getDescription());
}
}
@@ -315,7 +321,9 @@ public class NettyClientTransportTest {
rpc.waitForClose();
fail("expected exception");
} catch (ExecutionException ex) {
- assertSame(failureStatus, ((StatusException) ex.getCause()).getStatus());
+ StatusException se = (StatusException) ex.getCause();
+ assertSame(failureStatus.getCode(), se.getStatus().getCode());
+ Truth.assertThat(se.getStatus().getDescription()).contains(failureStatus.getDescription());
}
}
@@ -372,7 +380,7 @@ public class NettyClientTransportTest {
authority = GrpcUtil.authorityFromHostAndPort(address.getHostString(), address.getPort());
NettyClientTransport transport = new NettyClientTransport(
address, CantConstructChannel.class, new HashMap<ChannelOption<?>, Object>(), group,
- newNegotiator(), DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
+ newNegotiator(), null, DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE,
GrpcUtil.DEFAULT_MAX_HEADER_LIST_SIZE, KEEPALIVE_TIME_NANOS_DISABLED, 1, false, authority,
null, tooManyPingsRunnable);
transports.add(transport);
@@ -540,7 +548,7 @@ public class NettyClientTransportTest {
}
NettyClientTransport transport = new NettyClientTransport(
address, NioSocketChannel.class, new HashMap<ChannelOption<?>, Object>(), group, negotiator,
- DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize,
+ (ChannelHandler) null, DEFAULT_WINDOW_SIZE, maxMsgSize, maxHeaderListSize,
keepAliveTimeNano, keepAliveTimeoutNano,
false, authority, userAgent, tooManyPingsRunnable);
transports.add(transport);
@@ -554,7 +562,7 @@ public class NettyClientTransportTest {
private void startServer(int maxStreamsPerConnection, int maxHeaderListSize) throws IOException {
server = new NettyServer(
TestUtils.testServerAddress(0),
- NioServerSocketChannel.class, group, group, negotiator,
+ NioServerSocketChannel.class, group, group, negotiator, null,
Collections.<ServerStreamTracer.Factory>emptyList(), maxStreamsPerConnection,
DEFAULT_WINDOW_SIZE, DEFAULT_MAX_MESSAGE_SIZE, maxHeaderListSize,
DEFAULT_SERVER_KEEPALIVE_TIME_NANOS, DEFAULT_SERVER_KEEPALIVE_TIMEOUT_NANOS,
diff --git a/netty/src/test/java/io/grpc/netty/NettyServerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerTest.java
index 68e183d64..b6952b399 100644
--- a/netty/src/test/java/io/grpc/netty/NettyServerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyServerTest.java
@@ -41,6 +41,7 @@ public class NettyServerTest {
null, // no boss group
null, // no event group
new ProtocolNegotiators.PlaintextNegotiator(),
+ null, // no channel init
Collections.<ServerStreamTracer.Factory>emptyList(),
1, // ignore
1, // ignore
@@ -76,6 +77,7 @@ public class NettyServerTest {
null, // no boss group
null, // no event group
new ProtocolNegotiators.PlaintextNegotiator(),
+ null, // no channel init
Collections.<ServerStreamTracer.Factory>emptyList(),
1, // ignore
1, // ignore