diff options
author | ZHANG Dapeng <zdapeng@google.com> | 2018-03-28 15:58:31 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-03-28 15:58:31 -0700 |
commit | bdecdaea22ba4e56949be7534aa47306e5b5f758 (patch) | |
tree | 9c78fddaba1800514e4948a15c043c9ca4e1d88c /netty | |
parent | 03a00aa8cf6e928a007a35a45f013e0a7c58ad07 (diff) | |
download | grpc-grpc-java-bdecdaea22ba4e56949be7534aa47306e5b5f758.tar.gz |
netty: http2 server transport graceful shutdown sends 2 GOAWAYs
resolves #3442
Diffstat (limited to 'netty')
-rw-r--r-- | netty/src/main/java/io/grpc/netty/NettyServerHandler.java | 133 | ||||
-rw-r--r-- | netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java | 238 |
2 files changed, 331 insertions, 40 deletions
diff --git a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java index 21c1e6de7..47c628e0c 100644 --- a/netty/src/main/java/io/grpc/netty/NettyServerHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyServerHandler.java @@ -82,6 +82,7 @@ import io.netty.handler.logging.LogLevel; import io.netty.util.AsciiString; import io.netty.util.ReferenceCountUtil; import java.util.List; +import java.util.concurrent.Future; import java.util.concurrent.ScheduledFuture; import java.util.concurrent.TimeUnit; import java.util.logging.Level; @@ -96,6 +97,8 @@ import javax.annotation.Nullable; class NettyServerHandler extends AbstractNettyHandler { private static final Logger logger = Logger.getLogger(NettyServerHandler.class.getName()); private static final long KEEPALIVE_PING = 0xDEADL; + private static final long GRACEFUL_SHUTDOWN_PING = 0x97ACEF001L; + private static final long GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS = TimeUnit.SECONDS.toNanos(10); private final Http2Connection.PropertyKey streamKey; private final ServerTransportListener transportListener; @@ -121,6 +124,8 @@ class NettyServerHandler extends AbstractNettyHandler { private MaxConnectionIdleManager maxConnectionIdleManager; @CheckForNull private ScheduledFuture<?> maxConnectionAgeMonitor; + @CheckForNull + private GracefulShutdown gracefulShutdown; static NettyServerHandler newHandler( ServerTransportListener transportListener, @@ -250,17 +255,10 @@ class NettyServerHandler extends AbstractNettyHandler { maxConnectionIdleManager = new MaxConnectionIdleManager(maxConnectionIdleInNanos) { @Override void close(ChannelHandlerContext ctx) { - goAway( - ctx, - Integer.MAX_VALUE, - Http2Error.NO_ERROR.code(), - ByteBufUtil.writeAscii(ctx.alloc(), "max_idle"), - ctx.newPromise()); - ctx.flush(); - try { - NettyServerHandler.this.close(ctx, ctx.newPromise()); - } catch (Exception e) { - onError(ctx, /* outbound= */ true, e); + if (gracefulShutdown == null) { + gracefulShutdown = new GracefulShutdown("max_idle", null); + gracefulShutdown.start(ctx); + ctx.flush(); } } }; @@ -321,25 +319,10 @@ class NettyServerHandler extends AbstractNettyHandler { new LogExceptionRunnable(new Runnable() { @Override public void run() { - // send GO_AWAY - ByteBuf debugData = ByteBufUtil.writeAscii(ctx.alloc(), "max_age"); - goAway( - ctx, - Integer.MAX_VALUE, - Http2Error.NO_ERROR.code(), - debugData, - ctx.newPromise()); - - // gracefully shutdown with specified grace time - long savedGracefulShutdownTime = gracefulShutdownTimeoutMillis(); - try { - gracefulShutdownTimeoutMillis( - TimeUnit.NANOSECONDS.toMillis(maxConnectionAgeGraceInNanos)); - close(ctx, ctx.newPromise()); - } catch (Exception e) { - onError(ctx, /* outbound= */ true, e); - } finally { - gracefulShutdownTimeoutMillis(savedGracefulShutdownTime); + if (gracefulShutdown == null) { + gracefulShutdown = new GracefulShutdown("max_age", maxConnectionAgeGraceInNanos); + gracefulShutdown.start(ctx); + ctx.flush(); } } }), @@ -787,6 +770,13 @@ class NettyServerHandler extends AbstractNettyHandler { logger.log(Level.FINE, String.format("Window: %d", decoder().flowController().initialWindowSize(connection().connectionStream()))); } + } else if (data == GRACEFUL_SHUTDOWN_PING) { + if (gracefulShutdown == null) { + // this should never happen + logger.warning("Received GRACEFUL_SHUTDOWN_PING Ack but gracefulShutdown is null"); + } else { + gracefulShutdown.secondGoAwayAndClose(ctx); + } } else if (data != KEEPALIVE_PING) { logger.warning("Received unexpected ping ack. No ping outstanding"); } @@ -803,7 +793,6 @@ class NettyServerHandler extends AbstractNettyHandler { @Override public void ping() { ChannelFuture pingFuture = encoder().writePing( - // slice KEEPALIVE_PING because tls handler may modify the reader index ctx, false /* isAck */, KEEPALIVE_PING, ctx.newPromise()); ctx.flush(); if (transportTracer != null) { @@ -837,6 +826,88 @@ class NettyServerHandler extends AbstractNettyHandler { } } + private final class GracefulShutdown { + String goAwayMessage; + + /** + * The grace time between starting graceful shutdown and closing the netty channel, + * {@code null} is unspecified. + */ + @CheckForNull + Long graceTimeInNanos; + + /** + * True if ping is Acked or ping is timeout. + */ + boolean pingAckedOrTimeout; + + Future<?> pingFuture; + + GracefulShutdown(String goAwayMessage, + @Nullable Long graceTimeInNanos) { + this.goAwayMessage = goAwayMessage; + this.graceTimeInNanos = graceTimeInNanos; + } + + /** + * Sends out first GOAWAY and ping, and schedules second GOAWAY and close. + */ + void start(final ChannelHandlerContext ctx) { + goAway( + ctx, + Integer.MAX_VALUE, + Http2Error.NO_ERROR.code(), + ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage), + ctx.newPromise()); + + long gracefulShutdownPingTimeout = GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS; + pingFuture = ctx.executor().schedule( + new Runnable() { + @Override + public void run() { + secondGoAwayAndClose(ctx); + } + }, + GRACEFUL_SHUTDOWN_PING_TIMEOUT_NANOS, + TimeUnit.NANOSECONDS); + + encoder().writePing(ctx, false /* isAck */, GRACEFUL_SHUTDOWN_PING, ctx.newPromise()); + } + + void secondGoAwayAndClose(ChannelHandlerContext ctx) { + if (pingAckedOrTimeout) { + return; + } + pingAckedOrTimeout = true; + + checkNotNull(pingFuture, "pingFuture"); + pingFuture.cancel(false); + + // send the second GOAWAY with last stream id + goAway( + ctx, + connection().remote().lastStreamCreated(), + Http2Error.NO_ERROR.code(), + ByteBufUtil.writeAscii(ctx.alloc(), goAwayMessage), + ctx.newPromise()); + + // gracefully shutdown with specified grace time + long savedGracefulShutdownTimeMillis = gracefulShutdownTimeoutMillis(); + long gracefulShutdownTimeoutMillis = savedGracefulShutdownTimeMillis; + if (graceTimeInNanos != null) { + gracefulShutdownTimeoutMillis = TimeUnit.NANOSECONDS.toMillis(graceTimeInNanos); + } + try { + gracefulShutdownTimeoutMillis(gracefulShutdownTimeoutMillis); + close(ctx, ctx.newPromise()); + } catch (Exception e) { + onError(ctx, /* outbound= */ true, e); + } finally { + gracefulShutdownTimeoutMillis(savedGracefulShutdownTimeMillis); + } + } + } + // Use a frame writer so that we know when frames are through flow control and actually being // written. private static class WriteMonitoringFrameWriter extends DecoratingHttp2FrameWriter { diff --git a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java index b53a69010..8be0ba6ec 100644 --- a/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyServerHandlerTest.java @@ -697,24 +697,72 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand } @Test - public void maxConnectionIdle_goAwaySent() throws Exception { + public void maxConnectionIdle_goAwaySent_pingAck() throws Exception { maxConnectionIdleInNanos = TimeUnit.MILLISECONDS.toNanos(10L); manualSetUp(); assertTrue(channel().isOpen()); fakeClock().forwardNanos(maxConnectionIdleInNanos); - // GO_AWAY sent + // first GO_AWAY sent verifyWrite().writeGoAway( eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); + // ping sent + verifyWrite().writePing( + eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class)); + verifyWrite(never()).writeGoAway( + eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + channelRead(pingFrame(true /* isAck */, 0xDEADL)); // irrelevant ping Ack + verifyWrite(never()).writeGoAway( + eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + assertTrue(channel().isOpen()); + + channelRead(pingFrame(true /* isAck */, 0x97ACEF001L)); + + // second GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); // channel closed assertTrue(!channel().isOpen()); } @Test - public void maxConnectionIdle_activeThenRst() throws Exception { + public void maxConnectionIdle_goAwaySent_pingTimeout() throws Exception { + maxConnectionIdleInNanos = TimeUnit.MILLISECONDS.toNanos(10L); + manualSetUp(); + assertTrue(channel().isOpen()); + + fakeClock().forwardNanos(maxConnectionIdleInNanos); + + // first GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + // ping sent + verifyWrite().writePing( + eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class)); + verifyWrite(never()).writeGoAway( + eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + assertTrue(channel().isOpen()); + + fakeClock().forwardTime(10, TimeUnit.SECONDS); + + // second GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + // channel closed + assertTrue(!channel().isOpen()); + } + + @Test + public void maxConnectionIdle_activeThenRst_pingAck() throws Exception { maxConnectionIdleInNanos = TimeUnit.MILLISECONDS.toNanos(10L); manualSetUp(); createStream(); @@ -731,11 +779,64 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand fakeClock().forwardNanos(maxConnectionIdleInNanos); - // GO_AWAY sent + // first GO_AWAY sent verifyWrite().writeGoAway( eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); + // ping sent + verifyWrite().writePing( + eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class)); + verifyWrite(never()).writeGoAway( + eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + assertTrue(channel().isOpen()); + fakeClock().forwardTime(10, TimeUnit.SECONDS); + + // second GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + // channel closed + assertTrue(!channel().isOpen()); + } + + @Test + public void maxConnectionIdle_activeThenRst_pingTimeoutk() throws Exception { + maxConnectionIdleInNanos = TimeUnit.MILLISECONDS.toNanos(10L); + manualSetUp(); + createStream(); + + fakeClock().forwardNanos(maxConnectionIdleInNanos); + + // GO_AWAY not sent when active + verifyWrite(never()).writeGoAway( + any(ChannelHandlerContext.class), any(Integer.class), any(Long.class), any(ByteBuf.class), + any(ChannelPromise.class)); + assertTrue(channel().isOpen()); + + channelRead(rstStreamFrame(STREAM_ID, (int) Http2Error.CANCEL.code())); + + fakeClock().forwardNanos(maxConnectionIdleInNanos); + + // first GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + // ping sent + verifyWrite().writePing( + eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class)); + verifyWrite(never()).writeGoAway( + eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + assertTrue(channel().isOpen()); + + channelRead(pingFrame(true /* isAck */, 0x97ACEF001L)); + + // second GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); // channel closed assertTrue(!channel().isOpen()); } @@ -755,18 +856,68 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand } @Test - public void maxConnectionAge_goAwaySent() throws Exception { + public void maxConnectionAge_goAwaySent_pingAck() throws Exception { + maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L); manualSetUp(); assertTrue(channel().isOpen()); fakeClock().forwardNanos(maxConnectionAgeInNanos); - // GO_AWAY sent + // first GO_AWAY sent verifyWrite().writeGoAway( eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); + // ping sent + verifyWrite().writePing( + eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class)); + verifyWrite(never()).writeGoAway( + eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + + channelRead(pingFrame(true /* isAck */, 0xDEADL)); // irrelevant ping Ack + verifyWrite(never()).writeGoAway( + eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + assertTrue(channel().isOpen()); + + channelRead(pingFrame(true /* isAck */, 0x97ACEF001L)); + + // second GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + // channel closed + assertTrue(!channel().isOpen()); + } + + @Test + public void maxConnectionAge_goAwaySent_pingTimeout() throws Exception { + + maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L); + manualSetUp(); + assertTrue(channel().isOpen()); + + fakeClock().forwardNanos(maxConnectionAgeInNanos); + // first GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + // ping sent + verifyWrite().writePing( + eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class)); + verifyWrite(never()).writeGoAway( + eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + assertTrue(channel().isOpen()); + + fakeClock().forwardTime(10, TimeUnit.SECONDS); + + // second GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(0), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); // channel closed assertTrue(!channel().isOpen()); } @@ -780,31 +931,100 @@ public class NettyServerHandlerTest extends NettyHandlerTestBase<NettyServerHand fakeClock().forwardNanos(maxConnectionAgeInNanos); + // first GO_AWAY sent verifyWrite().writeGoAway( eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); + // ping sent + verifyWrite().writePing( + eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class)); + verifyWrite(never()).writeGoAway( + eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); fakeClock().forwardTime(20, TimeUnit.MINUTES); + // second GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); // channel not closed yet assertTrue(channel().isOpen()); } @Test - public void maxConnectionAgeGrace_channelClosedAfterGracePeriod() throws Exception { + public void maxConnectionAgeGrace_channelClosedAfterGracePeriod_withPingTimeout() + throws Exception { maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L); - maxConnectionAgeGraceInNanos = TimeUnit.MINUTES.toNanos(30L); + maxConnectionAgeGraceInNanos = TimeUnit.MINUTES.toNanos(30L); // greater than ping timeout + manualSetUp(); + createStream(); + + fakeClock().forwardNanos(maxConnectionAgeInNanos); + + // first GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + // ping sent + verifyWrite().writePing( + eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class)); + verifyWrite(never()).writeGoAway( + eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + + fakeClock().forwardNanos(TimeUnit.SECONDS.toNanos(10)); + + // second GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + + fakeClock().forwardNanos(maxConnectionAgeGraceInNanos - 2); + + assertTrue(channel().isOpen()); + + fakeClock().forwardTime(2, TimeUnit.MILLISECONDS); + + // channel closed + assertTrue(!channel().isOpen()); + } + + @Test + public void maxConnectionAgeGrace_channelClosedAfterGracePeriod_withPingAck() + throws Exception { + maxConnectionAgeInNanos = TimeUnit.MILLISECONDS.toNanos(10L); + maxConnectionAgeGraceInNanos = TimeUnit.MINUTES.toNanos(30L); // greater than ping timeout manualSetUp(); createStream(); fakeClock().forwardNanos(maxConnectionAgeInNanos); + // first GO_AWAY sent verifyWrite().writeGoAway( eq(ctx()), eq(Integer.MAX_VALUE), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), any(ChannelPromise.class)); + // ping sent + verifyWrite().writePing( + eq(ctx()), eq(false), eq(0x97ACEF001L), any(ChannelPromise.class)); + verifyWrite(never()).writeGoAway( + eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + + long pingRoundTripMillis = 100; // less than ping timeout + fakeClock().forwardTime(pingRoundTripMillis, TimeUnit.MILLISECONDS); + channelRead(pingFrame(true /* isAck */, 0x97ACEF001L)); + + // second GO_AWAY sent + verifyWrite().writeGoAway( + eq(ctx()), eq(STREAM_ID), eq(Http2Error.NO_ERROR.code()), any(ByteBuf.class), + any(ChannelPromise.class)); + + fakeClock().forwardNanos(maxConnectionAgeGraceInNanos - TimeUnit.MILLISECONDS.toNanos(2)); + assertTrue(channel().isOpen()); - fakeClock().forwardNanos(maxConnectionAgeGraceInNanos); + fakeClock().forwardTime(2, TimeUnit.MILLISECONDS); // channel closed assertTrue(!channel().isOpen()); |