author    zpencer <spencerfang@google.com>  2017-06-22 22:07:51 -0700
committer GitHub <noreply@github.com>       2017-06-22 22:07:51 -0700
commit    0fa222562330887b88da3cf82f2cfd7e07d1093c (patch)
tree      61e2cf67c879b73e0cf5aa551127998bdd7cb70d /netty
parent    b7e50ddd054934b14a94c62f853a2d0133ac699f (diff)
download  grpc-grpc-java-0fa222562330887b88da3cf82f2cfd7e07d1093c.tar.gz
netty: fix race condition for listeners attaching to connect future (#3122)
Sadly, the serverNotListening test is still flaky after this change, but this PR fixes a legitimate problem. The listener on the connect future depends on the channel pipeline being intact. But the way it is attached allows the connect attempt to fail, and the entire pipeline to be torn down by Netty, before the .addListener actually runs. The result is that the listener is attached to an already completed future, and its logic is applied to an empty pipeline.

The fundamental problem is that there are two threads: the gRPC thread and the Netty thread. This PR moves the listener-attaching code into the Netty thread, guaranteeing the listener is attached before any connection is made. It makes more sense for the code to live inside AbstractBufferingHandler, since handlers are generally free to swallow exceptions (the alternative is to make NettyClientHandler forward exceptions up the pipeline from itself). AbstractBufferingHandler needs the special guarantees, so it is the one with the special code.
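To make the threading point concrete, here is a rough, self-contained sketch of the pattern (not the actual grpc-java code: the handler name ConnectFailureSketchHandler is made up, and in the real change the logic lives in AbstractBufferingHandler, where a failed connect is routed to fail(ctx, cause) so buffered writes are failed). By overriding connect() in a ChannelDuplexHandler, the listener is attached on the Netty event loop as part of issuing the connect, so it cannot race with the connect attempt completing and the pipeline being torn down.

import io.netty.channel.ChannelDuplexHandler;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelPromise;
import java.net.SocketAddress;

// Hypothetical handler illustrating the pattern; not part of this commit.
class ConnectFailureSketchHandler extends ChannelDuplexHandler {
  @Override
  public void connect(final ChannelHandlerContext ctx, SocketAddress remoteAddress,
      SocketAddress localAddress, ChannelPromise promise) throws Exception {
    // Issue the connect first so the promise tracks the real connect attempt.
    super.connect(ctx, remoteAddress, localAddress, promise);
    // This runs on the channel's event loop, so the listener is registered
    // before the connect attempt can complete and tear down the pipeline.
    promise.addListener(new ChannelFutureListener() {
      @Override
      public void operationComplete(ChannelFuture future) {
        if (!future.isSuccess()) {
          // The real AbstractBufferingHandler calls fail(ctx, future.cause()) here
          // to fail any buffered writes; a plain handler can only propagate it.
          ctx.fireExceptionCaught(future.cause());
        }
      }
    });
  }
}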
Diffstat (limited to 'netty')
-rw-r--r--  netty/src/main/java/io/grpc/netty/NettyClientTransport.java  | 17
-rw-r--r--  netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java   | 21
2 files changed, 22 insertions(+), 16 deletions(-)
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
index 1c27222c2..96636ab6e 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientTransport.java
@@ -40,7 +40,6 @@ import io.netty.bootstrap.Bootstrap;
import io.netty.channel.Channel;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelFutureListener;
-import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOption;
import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
@@ -222,21 +221,7 @@ class NettyClientTransport implements ConnectionClientTransport {
// Start the write queue as soon as the channel is constructed
handler.startWriteQueue(channel);
// Start the connection operation to the server.
- channel.connect(address).addListener(new ChannelFutureListener() {
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- ChannelHandlerContext ctx = future.channel().pipeline().context(handler);
- if (ctx != null) {
- // NettyClientHandler doesn't propagate exceptions, but the negotiator will need the
- // exception to fail any writes. Note that this fires after handler, because it is as if
- // handler was propagating the notification.
- ctx.fireExceptionCaught(future.cause());
- }
- future.channel().pipeline().fireExceptionCaught(future.cause());
- }
- }
- });
+ channel.connect(address);
// This write will have no effect, yet it will only complete once the negotiationHandler
// flushes any pending writes.
channel.write(NettyClientHandler.NOOP_MESSAGE).addListener(new ChannelFutureListener() {
diff --git a/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java b/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java
index 2a2f9c719..c7dc2f3e6 100644
--- a/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java
+++ b/netty/src/main/java/io/grpc/netty/ProtocolNegotiators.java
@@ -27,6 +27,8 @@ import io.grpc.Internal;
import io.grpc.Status;
import io.grpc.internal.GrpcUtil;
import io.netty.channel.ChannelDuplexHandler;
+import io.netty.channel.ChannelFuture;
+import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerAdapter;
import io.netty.channel.ChannelHandlerContext;
@@ -446,6 +448,25 @@ public final class ProtocolNegotiators {
}
/**
+ * Do not rely on channel handlers to propagate exceptions to us.
+ * {@link NettyClientHandler} is an example of a class that does not propagate exceptions.
+ * Add a listener to the connect future directly and do appropriate error handling.
+ */
+ @Override
+ public void connect(final ChannelHandlerContext ctx, SocketAddress remoteAddress,
+ SocketAddress localAddress, ChannelPromise promise) throws Exception {
+ super.connect(ctx, remoteAddress, localAddress, promise);
+ promise.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ fail(ctx, future.cause());
+ }
+ }
+ });
+ }
+
+ /**
* If we encounter an exception, then notify all buffered writes that we failed.
*/
@Override