aboutsummaryrefslogtreecommitdiff
path: root/netty
diff options
context:
space:
mode:
authorEric Gribkoff <ericgribkoff@google.com>2018-03-22 12:38:42 -0700
committerGitHub <noreply@github.com>2018-03-22 12:38:42 -0700
commit5337728fa7b36c79664759ff408d3469c101e324 (patch)
tree0afece5416d2e8db406369acc84af4a6c2b31486 /netty
parentca55b6f7e7ea8521543c36e062e62e394ff14716 (diff)
downloadgrpc-grpc-java-5337728fa7b36c79664759ff408d3469c101e324.tar.gz
core,netty: client sends rst stream when server half-closes (#4222)
Diffstat (limited to 'netty')
-rw-r--r--netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java8
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyClientHandler.java5
-rw-r--r--netty/src/main/java/io/grpc/netty/NettyClientStream.java3
-rw-r--r--netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java24
4 files changed, 35 insertions, 5 deletions
diff --git a/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java b/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java
index 9ea2725cd..f13863e25 100644
--- a/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java
+++ b/netty/src/main/java/io/grpc/netty/CancelClientStreamCommand.java
@@ -18,18 +18,19 @@ package io.grpc.netty;
import com.google.common.base.Preconditions;
import io.grpc.Status;
+import javax.annotation.Nullable;
/**
* Command sent from a Netty client stream to the handler to cancel the stream.
*/
class CancelClientStreamCommand extends WriteQueue.AbstractQueuedCommand {
private final NettyClientStream.TransportState stream;
- private final Status reason;
+ @Nullable private final Status reason;
CancelClientStreamCommand(NettyClientStream.TransportState stream, Status reason) {
this.stream = Preconditions.checkNotNull(stream, "stream");
- Preconditions.checkNotNull(reason, "reason");
- Preconditions.checkArgument(!reason.isOk(), "Should not cancel with OK status");
+ Preconditions.checkArgument(
+ reason == null || !reason.isOk(), "Should not cancel with OK status");
this.reason = reason;
}
@@ -37,6 +38,7 @@ class CancelClientStreamCommand extends WriteQueue.AbstractQueuedCommand {
return stream;
}
+ @Nullable
Status reason() {
return reason;
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
index 552d174bb..fd16746b4 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientHandler.java
@@ -524,7 +524,10 @@ class NettyClientHandler extends AbstractNettyHandler {
private void cancelStream(ChannelHandlerContext ctx, CancelClientStreamCommand cmd,
ChannelPromise promise) {
NettyClientStream.TransportState stream = cmd.stream();
- stream.transportReportStatus(cmd.reason(), true, new Metadata());
+ Status reason = cmd.reason();
+ if (reason != null) {
+ stream.transportReportStatus(reason, true, new Metadata());
+ }
encoder().writeRstStream(ctx, stream.id(), Http2Error.CANCEL.code(), promise);
}
diff --git a/netty/src/main/java/io/grpc/netty/NettyClientStream.java b/netty/src/main/java/io/grpc/netty/NettyClientStream.java
index 0e3ecfc87..e569ca9d8 100644
--- a/netty/src/main/java/io/grpc/netty/NettyClientStream.java
+++ b/netty/src/main/java/io/grpc/netty/NettyClientStream.java
@@ -293,6 +293,9 @@ class NettyClientStream extends AbstractClientStream {
void transportHeadersReceived(Http2Headers headers, boolean endOfStream) {
if (endOfStream) {
+ if (!isOutboundClosed()) {
+ handler.getWriteQueue().enqueue(new CancelClientStreamCommand(this, null), true);
+ }
transportTrailersReceived(Utils.convertTrailers(headers));
} else {
transportHeadersReceived(Utils.convertHeaders(headers));
diff --git a/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java b/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java
index fe6731323..4ff3b4f64 100644
--- a/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java
+++ b/netty/src/test/java/io/grpc/netty/NettyClientStreamTest.java
@@ -262,6 +262,28 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
}
@Test
+ public void inboundTrailersBeforeHalfCloseSendsRstStream() {
+ stream().transportState().setId(STREAM_ID);
+ stream().transportState().transportHeadersReceived(grpcResponseHeaders(), false);
+ stream().transportState().transportHeadersReceived(grpcResponseTrailers(Status.OK), true);
+
+ // Verify a cancel stream with reason=null is sent to the handler.
+ ArgumentCaptor<CancelClientStreamCommand> captor = ArgumentCaptor
+ .forClass(CancelClientStreamCommand.class);
+ verify(writeQueue).enqueue(captor.capture(), eq(true));
+ assertNull(captor.getValue().reason());
+ }
+
+ @Test
+ public void inboundTrailersAfterHalfCloseDoesNotSendRstStream() {
+ stream().transportState().setId(STREAM_ID);
+ stream().transportState().transportHeadersReceived(grpcResponseHeaders(), false);
+ stream.halfClose();
+ stream().transportState().transportHeadersReceived(grpcResponseTrailers(Status.OK), true);
+ verify(writeQueue, never()).enqueue(isA(CancelClientStreamCommand.class), eq(true));
+ }
+
+ @Test
public void inboundStatusShouldSetStatus() throws Exception {
stream().transportState().setId(STREAM_ID);
@@ -293,7 +315,7 @@ public class NettyClientStreamTest extends NettyStreamTestBase<NettyClientStream
stream().transportState().transportDataReceived(Unpooled.buffer(1000).writeZero(1000), false);
// Now verify that cancel is sent and an error is reported to the listener
- verify(writeQueue).enqueue(any(CancelClientStreamCommand.class), eq(true));
+ verify(writeQueue).enqueue(isA(CancelClientStreamCommand.class), eq(true));
ArgumentCaptor<Status> captor = ArgumentCaptor.forClass(Status.class);
ArgumentCaptor<Metadata> metadataCaptor = ArgumentCaptor.forClass(Metadata.class);
verify(listener).closed(captor.capture(), same(PROCESSED), metadataCaptor.capture());