From 49c8bdb60ac8783eda098aa42375b837342edde3 Mon Sep 17 00:00:00 2001 From: zpencer Date: Wed, 4 Apr 2018 17:04:29 -0700 Subject: netty: fix getListenSockets race (#4301) Move registration to separate future and wait for it. --- netty/src/main/java/io/grpc/netty/NettyServer.java | 28 +++++++++++++--------- 1 file changed, 17 insertions(+), 11 deletions(-) (limited to 'netty') diff --git a/netty/src/main/java/io/grpc/netty/NettyServer.java b/netty/src/main/java/io/grpc/netty/NettyServer.java index 9de2c918f..f9efdf03e 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServer.java +++ b/netty/src/main/java/io/grpc/netty/NettyServer.java @@ -92,9 +92,9 @@ class NettyServer implements InternalServer, WithLogId { private final List streamTracerFactories; private final TransportTracer.Factory transportTracerFactory; private final Channelz channelz; - // Only set once during start(). This code assumes all listen sockets are created at startup - // and never changed. In the future we may have >1 listen socket. - private ImmutableList> listenSockets; + // Only modified in event loop but safe to read any time. Set at startup and unset at shutdown. + // In the future we may have >1 listen socket. + private volatile ImmutableList> listenSockets = ImmutableList.of(); NettyServer( SocketAddress address, Class channelType, @@ -240,14 +240,7 @@ class NettyServer implements InternalServer, WithLogId { } }); // Bind and start to accept incoming connections. - ChannelFuture future = b.bind(address).addListener(new ChannelFutureListener() { - @Override - public void operationComplete(ChannelFuture f) throws Exception { - Instrumented listenSocket = new ListenSocket(f.channel()); - listenSockets = ImmutableList.of(listenSocket); - channelz.addListenSocket(listenSocket); - } - }); + ChannelFuture future = b.bind(address); try { future.await(); } catch (InterruptedException ex) { @@ -258,6 +251,19 @@ class NettyServer implements InternalServer, WithLogId { throw new IOException("Failed to bind", future.cause()); } channel = future.channel(); + Future channelzFuture = channel.eventLoop().submit(new Runnable() { + @Override + public void run() { + Instrumented listenSocket = new ListenSocket(channel); + listenSockets = ImmutableList.of(listenSocket); + channelz.addListenSocket(listenSocket); + } + }); + try { + channelzFuture.await(); + } catch (InterruptedException ex) { + throw new RuntimeException("Interrupted while registering listen socket to channelz", ex); + } } @Override -- cgit v1.2.3