diff options
author | Eric Gribkoff <ericgribkoff@google.com> | 2018-03-22 12:38:42 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-03-22 12:38:42 -0700 |
commit | 5337728fa7b36c79664759ff408d3469c101e324 (patch) | |
tree | 0afece5416d2e8db406369acc84af4a6c2b31486 /netty | |
parent | ca55b6f7e7ea8521543c36e062e62e394ff14716 (diff) | |
download | grpc-grpc-java-5337728fa7b36c79664759ff408d3469c101e324.tar.gz |
core,netty: client sends rst stream when server half-closes (#4222)
Diffstat (limited to 'netty')
4 files changed, 35 insertions, 5 deletions
diff --git a/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java b/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java index 9ea2725cd..f13863e25 100644 --- a/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java +++ b/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java @@ -18,18 +18,19 @@ package io.grpc.netty; import com.google.common.base.Preconditions; import io.grpc.Status; +import javax.annotation.Nullable; /** * Command sent from a Netty client stream to the handler to cancel the stream. */ class CancelClientStreamCommand extends WriteQueue.AbstractQueuedCommand { private final NettyClientStream.TransportState stream; - private final Status reason; + @Nullable private final Status reason; CancelClientStreamCommand(NettyClientStream.TransportState stream, Status reason) { this.stream = Preconditions.checkNotNull(stream, "stream"); - Preconditions.checkNotNull(reason, "reason"); - Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status"); + Preconditions.checkArgument( + reason == null || !reason.isOk(), "Should not cancel with OK status"); this.reason = reason; } @@ -37,6 +38,7 @@ class CancelClientStreamCommand extends WriteQueue.AbstractQueuedCommand { return stream; } + @Nullable Status reason() { return reason; } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java index 552d174bb..fd16746b4 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java @@ -524,7 +524,10 @@ class NettyClientHandler extends AbstractNettyHandler { private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd, ChannelPromise promise) { NettyClientStream.TransportState stream = cmd.stream(); - stream.transportReportStatus(cmd.reason(), true, new Metadata()); + Status reason = cmd.reason(); + if (reason != null) { + stream.transportReportStatus(reason, true, new Metadata()); + } encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise); } diff --git a/netty/src/main/java/io/grpc/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/netty/NettyClientStream.java index 0e3ecfc87..e569ca9d8 100644 --- a/netty/src/main/java/io/grpc/netty/NettyClientStream.java +++ b/netty/src/main/java/io/grpc/netty/NettyClientStream.java @@ -293,6 +293,9 @@ class NettyClientStream extends AbstractClientStream { void transportHeadersReceived(Http2Headers headers, boolean endOfStream) { if (endOfStream) { + if (!isOutboundClosed()) { + handler.getWriteQueue().enqueue(new CancelClientStreamCommand(this, null), true); + } transportTrailersReceived(Utils.convertTrailers(headers)); } else { transportHeadersReceived(Utils.convertHeaders(headers)); diff --git a/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java b/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java index fe6731323..4ff3b4f64 100644 --- a/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java +++ b/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java @@ -262,6 +262,28 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream } @Test + public void inboundTrailersBeforeHalfCloseSendsRstStream() { + stream().transportState().setId(STREAM_ID); + stream().transportState().transportHeadersReceived(grpcResponseHeaders(), false); + stream().transportState().transportHeadersReceived(grpcResponseTrailers(Status.OK), true); + + // Verify a cancel stream with reason=null is sent to the handler. + ArgumentCaptor<CancelClientStreamCommand> captor = ArgumentCaptor + .forClass(CancelClientStreamCommand.class); + verify(writeQueue).enqueue(captor.capture(), eq(true)); + assertNull(captor.getValue().reason()); + } + + @Test + public void inboundTrailersAfterHalfCloseDoesNotSendRstStream() { + stream().transportState().setId(STREAM_ID); + stream().transportState().transportHeadersReceived(grpcResponseHeaders(), false); + stream.halfClose(); + stream().transportState().transportHeadersReceived(grpcResponseTrailers(Status.OK), true); + verify(writeQueue, never()).enqueue(isA(CancelClientStreamCommand.class), eq(true)); + } + + @Test public void inboundStatusShouldSetStatus() throws Exception { stream().transportState().setId(STREAM_ID); @@ -293,7 +315,7 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream stream().transportState().transportDataReceived(Unpooled.buffer(1000).writeZero(1000), false); // Now verify that cancel is sent and an error is reported to the listener - verify(writeQueue).enqueue(any(CancelClientStreamCommand.class), eq(true)); + verify(writeQueue).enqueue(isA(CancelClientStreamCommand.class), eq(true)); ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class); ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class); verify(listener).closed(captor.capture(), same(PROCESSED), metadataCaptor.capture()); |