aboutsummaryrefslogtreecommitdiff
path: root/netty
diff options
context:
space:
mode:
authorZHANG Dapeng <zdapeng@google.com>2018-03-28 15:58:31 -0700
committerGitHub <noreply@github.com>2018-03-28 15:58:31 -0700
commitbdecdaea22ba4e56949be7534aa47306e5b5f758 (patch)
tree9c78fddaba1800514e4948a15c043c9ca4e1d88c /netty
parent03a00aa8cf6e928a007a35a45f013e0a7c58ad07 (diff)
downloadgrpc-grpc-java-bdecdaea22ba4e56949be7534aa47306e5b5f758.tar.gz
netty: http2 server transport graceful shutdown sends 2 GOAWAYs
resolves #3442
Diffstat (limited to 'netty')
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyServerHandler.java133
-rw-r--r--netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java238
2 files changed, 331 insertions, 40 deletions
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
index 21c1e6de7..47c628e0c 100644
--- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java
@@ -82,6 +82,7 @@ import io.netty.handler.logging.LogLevel;
import io.netty.util.AsciiString;
import io.netty.util.ReferenceCountUtil;
import java.util.List;
+import java.util.concurrent.Future;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.logging.Level;
@@ -96,6 +97,8 @@ import javax.annotation.Nullable;
class NettyServerHandler extends AbstractNettyHandler {
private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName());
private static final long KEEPALIVE_PING = 0xDEADL;
+ private static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L;
+ private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10);
private final Http2Connection.PropertyKey streamKey;
private final ServerTransportListener transportListener;
@@ -121,6 +124,8 @@ class NettyServerHandler extends AbstractNettyHandler {
private MaxConnectionIdleManager maxConnectionIdleManager;
@CheckForNull
private ScheduledFuture<?> maxConnectionAgeMonitor;
+ @CheckForNull
+ private GracefulShutdown gracefulShutdown;
static NettyServerHandler newHandler(
ServerTransportListener transportListener,
@@ -250,17 +255,10 @@ class NettyServerHandler extends AbstractNettyHandler {
maxConnectionIdleManager = new MaxConnectionIdleManager(maxConnectionIdleInNanos) {
@Override
void close(ChannelHandlerContext ctx) {
- goAway(
- ctx,
- Integer.MAX_VALUE,
- Http2Error.NO_ERROR.code(),
- ByteBufUtil.writeAscii(ctx.alloc(), "max_idle"),
- ctx.newPromise());
- ctx.flush();
- try {
- NettyServerHandler.this.close(ctx, ctx.newPromise());
- } catch (Exception e) {
- onError(ctx, /* outbound= */ true, e);
+ if (gracefulShutdown == null) {
+ gracefulShutdown = new GracefulShutdown("max_idle", null);
+ gracefulShutdown.start(ctx);
+ ctx.flush();
}
}
};
@@ -321,25 +319,10 @@ class NettyServerHandler extends AbstractNettyHandler {
new LogExceptionRunnable(new Runnable() {
@Override
public void run() {
- // send GO_AWAY
- ByteBuf debugData = ByteBufUtil.writeAscii(ctx.alloc(), "max_age");
- goAway(
- ctx,
- Integer.MAX_VALUE,
- Http2Error.NO_ERROR.code(),
- debugData,
- ctx.newPromise());
-
- // gracefully shutdown with specified grace time
- long savedGracefulShutdownTime = gracefulShutdownTimeoutMillis();
- try {
- gracefulShutdownTimeoutMillis(
- TimeUnit.NANOSECONDS.toMillis(maxConnectionAgeGraceInNanos));
- close(ctx, ctx.newPromise());
- } catch (Exception e) {
- onError(ctx, /* outbound= */ true, e);
- } finally {
- gracefulShutdownTimeoutMillis(savedGracefulShutdownTime);
+ if (gracefulShutdown == null) {
+ gracefulShutdown = new GracefulShutdown("max_age", maxConnectionAgeGraceInNanos);
+ gracefulShutdown.start(ctx);
+ ctx.flush();
}
}
}),
@@ -787,6 +770,13 @@ class NettyServerHandler extends AbstractNettyHandler {
logger.log(Level.FINE, String.format("Window: %d",
decoder().flowController().initialWindowSize(connection().connectionStream())));
}
+ } else if (data == GRACEFUL_SHUTDOWN_PING) {
+ if (gracefulShutdown == null) {
+ // this should never happen
+ logger.warning("Received GRACEFUL_SHUTDOWN_PING Ack but gracefulShutdown is null");
+ } else {
+ gracefulShutdown.secondGoAwayAndClose(ctx);
+ }
} else if (data != KEEPALIVE_PING) {
logger.warning("Received unexpected ping ack. No ping outstanding");
}
@@ -803,7 +793,6 @@ class NettyServerHandler extends AbstractNettyHandler {
@Override
public void ping() {
ChannelFuture pingFuture = encoder().writePing(
- // slice KEEPALIVE_PING because tls handler may modify the reader index
ctx, false /* isAck */, KEEPALIVE_PING, ctx.newPromise());
ctx.flush();
if (transportTracer != null) {
@@ -837,6 +826,88 @@ class NettyServerHandler extends AbstractNettyHandler {
}
}
+ private final class GracefulShutdown {
+ String goAwayMessage;
+
+ /**
+ * The grace time between starting graceful shutdown and closing the netty channel,
+ * {@code null} is unspecified.
+ */
+ @CheckForNull
+ Long graceTimeInNanos;
+
+ /**
+ * True if ping is Acked or ping is timeout.
+ */
+ boolean pingAckedOrTimeout;
+
+ Future<?> pingFuture;
+
+ GracefulShutdown(String goAwayMessage,
+ @Nullable Long graceTimeInNanos) {
+ this.goAwayMessage = goAwayMessage;
+ this.graceTimeInNanos = graceTimeInNanos;
+ }
+
+ /**
+ * Sends out first GOAWAY and ping, and schedules second GOAWAY and close.
+ */
+ void start(final ChannelHandlerContext ctx) {
+ goAway(
+ ctx,
+ Integer.MAX_VALUE,
+ Http2Error.NO_ERROR.code(),
+ ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
+ ctx.newPromise());
+
+ long gracefulShutdownPingTimeout = GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS;
+ pingFuture = ctx.executor().schedule(
+ new Runnable() {
+ @Override
+ public void run() {
+ secondGoAwayAndClose(ctx);
+ }
+ },
+ GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS,
+ TimeUnit.NANOSECONDS);
+
+ encoder().writePing(ctx, false /* isAck */, GRACEFUL_SHUTDOWN_PING, ctx.newPromise());
+ }
+
+ void secondGoAwayAndClose(ChannelHandlerContext ctx) {
+ if (pingAckedOrTimeout) {
+ return;
+ }
+ pingAckedOrTimeout = true;
+
+ checkNotNull(pingFuture, "pingFuture");
+ pingFuture.cancel(false);
+
+ // send the second GOAWAY with last stream id
+ goAway(
+ ctx,
+ connection().remote().lastStreamCreated(),
+ Http2Error.NO_ERROR.code(),
+ ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage),
+ ctx.newPromise());
+
+ // gracefully shutdown with specified grace time
+ long savedGracefulShutdownTimeMillis = gracefulShutdownTimeoutMillis();
+ long gracefulShutdownTimeoutMillis = savedGracefulShutdownTimeMillis;
+ if (graceTimeInNanos != null) {
+ gracefulShutdownTimeoutMillis = TimeUnit.NANOSECONDS.toMillis(graceTimeInNanos);
+ }
+ try {
+ gracefulShutdownTimeoutMillis(gracefulShutdownTimeoutMillis);
+ close(ctx, ctx.newPromise());
+ } catch (Exception e) {
+ onError(ctx, /* outbound= */ true, e);
+ } finally {
+ gracefulShutdownTimeoutMillis(savedGracefulShutdownTimeMillis);
+ }
+ }
+ }
+
// Use a frame writer so that we know when frames are through flow control and actually being
// written.
private static class WriteMonitoringFrameWriter extends DecoratingHttp2FrameWriter {
diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java
index b53a69010..8be0ba6ec 100644
--- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java
@@ -697,24 +697,72 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
}
@Test
- public void maxConnectionIdle_goAwaySent() throws Exception {
+ public void maxConnectionIdle_goAwaySent_pingAck() throws Exception {
maxConnectionIdleInNanos = TimeUnit.MILLISECONDS.toNanos(10L);
manualSetUp();
assertTrue(channel().isOpen());
fakeClock().forwardNanos(maxConnectionIdleInNanos);
- // GO_AWAY sent
+ // first GO_AWAY sent
verifyWrite().writeGoAway(
eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
+ // ping sent
+ verifyWrite().writePing(
+ eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class));
+ verifyWrite(never()).writeGoAway(
+ eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
+ any(ChannelPromise.class));
+ channelRead(pingFrame(true /* isAck */, 0xDEADL)); // irrelevant ping Ack
+ verifyWrite(never()).writeGoAway(
+ eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
+ any(ChannelPromise.class));
+ assertTrue(channel().isOpen());
+
+ channelRead(pingFrame(true /* isAck */, 0x97ACEF001L));
+
+ // second GO_AWAY sent
+ verifyWrite().writeGoAway(
+ eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
+ any(ChannelPromise.class));
// channel closed
assertTrue(!channel().isOpen());
}
@Test
- public void maxConnectionIdle_activeThenRst() throws Exception {
+ public void maxConnectionIdle_goAwaySent_pingTimeout() throws Exception {
+ maxConnectionIdleInNanos = TimeUnit.MILLISECONDS.toNanos(10L);
+ manualSetUp();
+ assertTrue(channel().isOpen());
+
+ fakeClock().forwardNanos(maxConnectionIdleInNanos);
+
+ // first GO_AWAY sent
+ verifyWrite().writeGoAway(
+ eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
+ any(ChannelPromise.class));
+ // ping sent
+ verifyWrite().writePing(
+ eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class));
+ verifyWrite(never()).writeGoAway(
+ eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
+ any(ChannelPromise.class));
+ assertTrue(channel().isOpen());
+
+ fakeClock().forwardTime(10, TimeUnit.SECONDS);
+
+ // second GO_AWAY sent
+ verifyWrite().writeGoAway(
+ eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
+ any(ChannelPromise.class));
+ // channel closed
+ assertTrue(!channel().isOpen());
+ }
+
+ @Test
+ public void maxConnectionIdle_activeThenRst_pingAck() throws Exception {
maxConnectionIdleInNanos = TimeUnit.MILLISECONDS.toNanos(10L);
manualSetUp();
createStream();
@@ -731,11 +779,64 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
fakeClock().forwardNanos(maxConnectionIdleInNanos);
- // GO_AWAY sent
+ // first GO_AWAY sent
verifyWrite().writeGoAway(
eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
+ // ping sent
+ verifyWrite().writePing(
+ eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class));
+ verifyWrite(never()).writeGoAway(
+ eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
+ any(ChannelPromise.class));
+ assertTrue(channel().isOpen());
+ fakeClock().forwardTime(10, TimeUnit.SECONDS);
+
+ // second GO_AWAY sent
+ verifyWrite().writeGoAway(
+ eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
+ any(ChannelPromise.class));
+ // channel closed
+ assertTrue(!channel().isOpen());
+ }
+
+ @Test
+ public void maxConnectionIdle_activeThenRst_pingTimeoutk() throws Exception {
+ maxConnectionIdleInNanos = TimeUnit.MILLISECONDS.toNanos(10L);
+ manualSetUp();
+ createStream();
+
+ fakeClock().forwardNanos(maxConnectionIdleInNanos);
+
+ // GO_AWAY not sent when active
+ verifyWrite(never()).writeGoAway(
+ any(ChannelHandlerContext.class), any(Integer.class), any(Long.class), any(ByteBuf.class),
+ any(ChannelPromise.class));
+ assertTrue(channel().isOpen());
+
+ channelRead(rstStreamFrame(STREAM_ID, (int) Http2Error.CANCEL.code()));
+
+ fakeClock().forwardNanos(maxConnectionIdleInNanos);
+
+ // first GO_AWAY sent
+ verifyWrite().writeGoAway(
+ eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
+ any(ChannelPromise.class));
+ // ping sent
+ verifyWrite().writePing(
+ eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class));
+ verifyWrite(never()).writeGoAway(
+ eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
+ any(ChannelPromise.class));
+ assertTrue(channel().isOpen());
+
+ channelRead(pingFrame(true /* isAck */, 0x97ACEF001L));
+
+ // second GO_AWAY sent
+ verifyWrite().writeGoAway(
+ eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
+ any(ChannelPromise.class));
// channel closed
assertTrue(!channel().isOpen());
}
@@ -755,18 +856,68 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
}
@Test
- public void maxConnectionAge_goAwaySent() throws Exception {
+ public void maxConnectionAge_goAwaySent_pingAck() throws Exception {
+
maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L);
manualSetUp();
assertTrue(channel().isOpen());
fakeClock().forwardNanos(maxConnectionAgeInNanos);
- // GO_AWAY sent
+ // first GO_AWAY sent
verifyWrite().writeGoAway(
eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
+ // ping sent
+ verifyWrite().writePing(
+ eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class));
+ verifyWrite(never()).writeGoAway(
+ eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
+ any(ChannelPromise.class));
+
+ channelRead(pingFrame(true /* isAck */, 0xDEADL)); // irrelevant ping Ack
+ verifyWrite(never()).writeGoAway(
+ eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
+ any(ChannelPromise.class));
+ assertTrue(channel().isOpen());
+
+ channelRead(pingFrame(true /* isAck */, 0x97ACEF001L));
+
+ // second GO_AWAY sent
+ verifyWrite().writeGoAway(
+ eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
+ any(ChannelPromise.class));
+ // channel closed
+ assertTrue(!channel().isOpen());
+ }
+
+ @Test
+ public void maxConnectionAge_goAwaySent_pingTimeout() throws Exception {
+
+ maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L);
+ manualSetUp();
+ assertTrue(channel().isOpen());
+
+ fakeClock().forwardNanos(maxConnectionAgeInNanos);
+ // first GO_AWAY sent
+ verifyWrite().writeGoAway(
+ eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
+ any(ChannelPromise.class));
+ // ping sent
+ verifyWrite().writePing(
+ eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class));
+ verifyWrite(never()).writeGoAway(
+ eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
+ any(ChannelPromise.class));
+ assertTrue(channel().isOpen());
+
+ fakeClock().forwardTime(10, TimeUnit.SECONDS);
+
+ // second GO_AWAY sent
+ verifyWrite().writeGoAway(
+ eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
+ any(ChannelPromise.class));
// channel closed
assertTrue(!channel().isOpen());
}
@@ -780,31 +931,100 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand
fakeClock().forwardNanos(maxConnectionAgeInNanos);
+ // first GO_AWAY sent
verifyWrite().writeGoAway(
eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
+ // ping sent
+ verifyWrite().writePing(
+ eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class));
+ verifyWrite(never()).writeGoAway(
+ eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
+ any(ChannelPromise.class));
fakeClock().forwardTime(20, TimeUnit.MINUTES);
+ // second GO_AWAY sent
+ verifyWrite().writeGoAway(
+ eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
+ any(ChannelPromise.class));
// channel not closed yet
assertTrue(channel().isOpen());
}
@Test
- public void maxConnectionAgeGrace_channelClosedAfterGracePeriod() throws Exception {
+ public void maxConnectionAgeGrace_channelClosedAfterGracePeriod_withPingTimeout()
+ throws Exception {
maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L);
- maxConnectionAgeGraceInNanos = TimeUnit.MINUTES.toNanos(30L);
+ maxConnectionAgeGraceInNanos = TimeUnit.MINUTES.toNanos(30L); // greater than ping timeout
+ manualSetUp();
+ createStream();
+
+ fakeClock().forwardNanos(maxConnectionAgeInNanos);
+
+ // first GO_AWAY sent
+ verifyWrite().writeGoAway(
+ eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
+ any(ChannelPromise.class));
+ // ping sent
+ verifyWrite().writePing(
+ eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class));
+ verifyWrite(never()).writeGoAway(
+ eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
+ any(ChannelPromise.class));
+
+ fakeClock().forwardNanos(TimeUnit.SECONDS.toNanos(10));
+
+ // second GO_AWAY sent
+ verifyWrite().writeGoAway(
+ eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
+ any(ChannelPromise.class));
+
+ fakeClock().forwardNanos(maxConnectionAgeGraceInNanos - 2);
+
+ assertTrue(channel().isOpen());
+
+ fakeClock().forwardTime(2, TimeUnit.MILLISECONDS);
+
+ // channel closed
+ assertTrue(!channel().isOpen());
+ }
+
+ @Test
+ public void maxConnectionAgeGrace_channelClosedAfterGracePeriod_withPingAck()
+ throws Exception {
+ maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L);
+ maxConnectionAgeGraceInNanos = TimeUnit.MINUTES.toNanos(30L); // greater than ping timeout
manualSetUp();
createStream();
fakeClock().forwardNanos(maxConnectionAgeInNanos);
+ // first GO_AWAY sent
verifyWrite().writeGoAway(
eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
any(ChannelPromise.class));
+ // ping sent
+ verifyWrite().writePing(
+ eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class));
+ verifyWrite(never()).writeGoAway(
+ eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
+ any(ChannelPromise.class));
+
+ long pingRoundTripMillis = 100; // less than ping timeout
+ fakeClock().forwardTime(pingRoundTripMillis, TimeUnit.MILLISECONDS);
+ channelRead(pingFrame(true /* isAck */, 0x97ACEF001L));
+
+ // second GO_AWAY sent
+ verifyWrite().writeGoAway(
+ eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class),
+ any(ChannelPromise.class));
+
+ fakeClock().forwardNanos(maxConnectionAgeGraceInNanos - TimeUnit.MILLISECONDS.toNanos(2));
+
assertTrue(channel().isOpen());
- fakeClock().forwardNanos(maxConnectionAgeGraceInNanos);
+ fakeClock().forwardTime(2, TimeUnit.MILLISECONDS);
// channel closed
assertTrue(!channel().isOpen());