diff options
author | ZHANG Dapeng <zdapeng@google.com> | 2018-03-12 14:12:46 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-03-12 14:12:46 -0700 |
commit | a83f67a706fdf9cd5b5c5026f1c413fbad1af839 (patch) | |
tree | 48e522d792b6cacc93becb5746c233301495458d /okhttp | |
parent | c6fe4deb33686a7f0e65be6f3f509768ce5d04a6 (diff) | |
download | grpc-grpc-java-a83f67a706fdf9cd5b5c5026f1c413fbad1af839.tar.gz |
core,netty,okhttp: Transparent retry
Changes:
- `ClientStreamListener.onClose(Status status, RpcProgress rpcProgress, Metadata trailers)` added.
- `AbstractClientStream.transportReportStatus(Status status, RpcProgress rpcProgress, boolean stopDelivery, Metadata trailers)` added
- `ClientCallImpl.ClientStreamListenerImpl` will ignore the arg `rpcProgress` (non retry)
- `RetriableStream.SubListener` will handle `rpcProgress` and decide if transparent retry.
- `NettyClientHandler` and `OkHttpClientTransport` will pass `RpcProgress.REFUSED` to client stream listener for later stream ids when received GOAWAY, or for stream received a RST_STREAM frame with REFUSED code.
- All other files are just a result of refactoring.
Diffstat (limited to 'okhttp')
4 files changed, 127 insertions, 25 deletions
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java index 60bdcf733..4cb6153a9 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java @@ -18,6 +18,7 @@ package io.grpc.okhttp; import static com.google.common.base.Preconditions.checkNotNull; import static com.google.common.base.Preconditions.checkState; +import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED; import com.google.common.io.BaseEncoding; import io.grpc.Attributes; @@ -307,8 +308,11 @@ class OkHttpClientStream extends AbstractClientStream { window -= length; if (window < 0) { frameWriter.rstStream(id(), ErrorCode.FLOW_CONTROL_ERROR); - transport.finishStream(id(), Status.INTERNAL.withDescription( - "Received data size exceeded our receiving window size"), false, null, null); + transport.finishStream( + id(), + Status.INTERNAL.withDescription( + "Received data size exceeded our receiving window size"), + PROCESSED, false, null, null); return; } super.transportDataReceived(new OkHttpReadableBuffer(frame), endOfStream); @@ -319,9 +323,9 @@ class OkHttpClientStream extends AbstractClientStream { if (!framer().isClosed()) { // If server's end-of-stream is received before client sends end-of-stream, we just send a // reset to server to fully close the server side stream. - transport.finishStream(id(), null, false, ErrorCode.CANCEL, null); + transport.finishStream(id(),null, PROCESSED, false, ErrorCode.CANCEL, null); } else { - transport.finishStream(id(), null, false, null, null); + transport.finishStream(id(), null, PROCESSED, false, null, null); } } @@ -344,7 +348,8 @@ class OkHttpClientStream extends AbstractClientStream { } else { // If pendingData is null, start must have already been called, which means synStream has // been called as well. - transport.finishStream(id(), reason, stopDelivery, ErrorCode.CANCEL, trailers); + transport.finishStream( + id(), reason, PROCESSED, stopDelivery, ErrorCode.CANCEL, trailers); } } diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 5a0a70d10..a1fc9fa1c 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -39,6 +39,7 @@ import io.grpc.Status.Code; import io.grpc.StatusException; import io.grpc.internal.Channelz.Security; import io.grpc.internal.Channelz.SocketStats; +import io.grpc.internal.ClientStreamListener.RpcProgress; import io.grpc.internal.ConnectionClientTransport; import io.grpc.internal.GrpcUtil; import io.grpc.internal.Http2Ping; @@ -733,12 +734,14 @@ class OkHttpClientTransport implements ConnectionClientTransport { Map.Entry<Integer, OkHttpClientStream> entry = it.next(); if (entry.getKey() > lastKnownStreamId) { it.remove(); - entry.getValue().transportState().transportReportStatus(status, false, new Metadata()); + entry.getValue().transportState().transportReportStatus( + status, RpcProgress.REFUSED, false, new Metadata()); } } for (OkHttpClientStream stream : pendingStreams) { - stream.transportState().transportReportStatus(status, true, new Metadata()); + stream.transportState().transportReportStatus( + status, RpcProgress.REFUSED, true, new Metadata()); } pendingStreams.clear(); maybeClearInUse(); @@ -765,6 +768,7 @@ class OkHttpClientTransport implements ConnectionClientTransport { void finishStream( int streamId, @Nullable Status status, + RpcProgress rpcProgress, boolean stopDelivery, @Nullable ErrorCode errorCode, @Nullable Metadata trailers) { @@ -779,6 +783,7 @@ class OkHttpClientTransport implements ConnectionClientTransport { .transportState() .transportReportStatus( status, + rpcProgress, stopDelivery, trailers != null ? trailers : new Metadata()); } @@ -1020,7 +1025,10 @@ class OkHttpClientTransport implements ConnectionClientTransport { Status status = toGrpcStatus(errorCode).augmentDescription("Rst Stream"); boolean stopDelivery = (status.getCode() == Code.CANCELLED || status.getCode() == Code.DEADLINE_EXCEEDED); - finishStream(streamId, status, stopDelivery, null, null); + finishStream( + streamId, status, + errorCode == ErrorCode.REFUSED_STREAM ? RpcProgress.REFUSED : RpcProgress.PROCESSED, + stopDelivery, null, null); } @Override @@ -1112,8 +1120,9 @@ class OkHttpClientTransport implements ConnectionClientTransport { if (streamId == 0) { onError(ErrorCode.PROTOCOL_ERROR, errorMsg); } else { - finishStream(streamId, - Status.INTERNAL.withDescription(errorMsg), false, ErrorCode.PROTOCOL_ERROR, null); + finishStream( + streamId, Status.INTERNAL.withDescription(errorMsg), RpcProgress.PROCESSED, false, + ErrorCode.PROTOCOL_ERROR, null); } return; } diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java index 007e35f64..79340a716 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java @@ -17,6 +17,7 @@ package io.grpc.okhttp; import static com.google.common.truth.Truth.assertThat; +import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.eq; @@ -30,8 +31,8 @@ import io.grpc.Metadata; import io.grpc.MethodDescriptor; import io.grpc.MethodDescriptor.MethodType; import io.grpc.Status; -import io.grpc.internal.ClientStreamListener; import io.grpc.internal.GrpcUtil; +import io.grpc.internal.NoopClientStreamListener; import io.grpc.internal.StatsTraceContext; import io.grpc.internal.TransportTracer; import io.grpc.okhttp.internal.framed.ErrorCode; @@ -102,7 +103,8 @@ public class OkHttpClientStreamTest { final AtomicReference<Status> statusRef = new AtomicReference<Status>(); stream.start(new BaseClientStreamListener() { @Override - public void closed(Status status, Metadata trailers) { + public void closed( + Status status, RpcProgress rpcProgress, Metadata trailers) { statusRef.set(status); assertTrue(Thread.holdsLock(lock)); } @@ -123,11 +125,12 @@ public class OkHttpClientStreamTest { assertTrue(Thread.holdsLock(lock)); return null; } - }).when(transport).finishStream(1234, Status.CANCELLED, true, ErrorCode.CANCEL, null); + }).when(transport).finishStream( + 1234, Status.CANCELLED, PROCESSED, true, ErrorCode.CANCEL, null); stream.cancel(Status.CANCELLED); - verify(transport).finishStream(1234, Status.CANCELLED, true, ErrorCode.CANCEL, null); + verify(transport).finishStream(1234, Status.CANCELLED, PROCESSED,true, ErrorCode.CANCEL, null); } @Test @@ -213,20 +216,12 @@ public class OkHttpClientStreamTest { // TODO(carl-mastrangelo): extract this out into a testing/ directory and remove other definitions // of it. - private static class BaseClientStreamListener implements ClientStreamListener { - @Override - public void onReady() {} + private static class BaseClientStreamListener extends NoopClientStreamListener { @Override public void messagesAvailable(MessageProducer producer) { while (producer.next() != null) {} } - - @Override - public void headersRead(Metadata headers) {} - - @Override - public void closed(Status status, Metadata trailers) {} } } diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index b9d706177..bac6a8e67 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -18,6 +18,8 @@ package io.grpc.okhttp; import static com.google.common.base.Charsets.UTF_8; import static com.google.common.truth.Truth.assertThat; +import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED; +import static io.grpc.internal.ClientStreamListener.RpcProgress.REFUSED; import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE; import static io.grpc.okhttp.Headers.CONTENT_TYPE_HEADER; import static io.grpc.okhttp.Headers.METHOD_HEADER; @@ -128,6 +130,7 @@ public class OkHttpClientTransportTest { private static final ProxyParameters NO_PROXY = null; private static final String NO_USER = null; private static final String NO_PW = null; + private static final int DEFAULT_START_STREAM_ID = 3; @Rule public final Timeout globalTimeout = Timeout.seconds(10); @@ -168,7 +171,7 @@ public class OkHttpClientTransportTest { } private void initTransport() throws Exception { - startTransport(3, null, true, DEFAULT_MAX_MESSAGE_SIZE, null); + startTransport(DEFAULT_START_STREAM_ID, null, true, DEFAULT_MAX_MESSAGE_SIZE, null); } private void initTransport(int startId) throws Exception { @@ -177,7 +180,8 @@ public class OkHttpClientTransportTest { private void initTransportAndDelayConnected() throws Exception { delayConnectedCallback = new DelayConnectedCallback(); - startTransport(3, delayConnectedCallback, false, DEFAULT_MAX_MESSAGE_SIZE, null); + startTransport( + DEFAULT_START_STREAM_ID, delayConnectedCallback, false, DEFAULT_MAX_MESSAGE_SIZE, null); } private void startTransport(int startId, @Nullable Runnable connectingCallback, @@ -1663,6 +1667,88 @@ public class OkHttpClientTransportTest { shutdownAndVerify(); } + @Test + public void goAway_streamListenerRpcProgress() throws Exception { + initTransport(); + setMaxConcurrentStreams(2); + MockStreamListener listener1 = new MockStreamListener(); + MockStreamListener listener2 = new MockStreamListener(); + MockStreamListener listener3 = new MockStreamListener(); + OkHttpClientStream stream1 = + clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); + stream1.start(listener1); + OkHttpClientStream stream2 = + clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); + stream2.start(listener2); + OkHttpClientStream stream3 = + clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); + stream3.start(listener3); + waitForStreamPending(1); + + assertEquals(2, activeStreamCount()); + assertContainStream(DEFAULT_START_STREAM_ID); + assertContainStream(DEFAULT_START_STREAM_ID + 2); + + frameHandler() + .goAway(DEFAULT_START_STREAM_ID, ErrorCode.CANCEL, ByteString.encodeUtf8("blablabla")); + + listener2.waitUntilStreamClosed(); + listener3.waitUntilStreamClosed(); + assertNull(listener1.rpcProgress); + assertEquals(REFUSED, listener2.rpcProgress); + assertEquals(REFUSED, listener3.rpcProgress); + assertEquals(1, activeStreamCount()); + assertContainStream(DEFAULT_START_STREAM_ID); + + getStream(DEFAULT_START_STREAM_ID).cancel(Status.CANCELLED); + + listener1.waitUntilStreamClosed(); + assertEquals(PROCESSED, listener1.rpcProgress); + + shutdownAndVerify(); + } + + @Test + public void reset_streamListenerRpcProgress() throws Exception { + initTransport(); + MockStreamListener listener1 = new MockStreamListener(); + MockStreamListener listener2 = new MockStreamListener(); + MockStreamListener listener3 = new MockStreamListener(); + OkHttpClientStream stream1 = + clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); + stream1.start(listener1); + OkHttpClientStream stream2 = + clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); + stream2.start(listener2); + OkHttpClientStream stream3 = + clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); + stream3.start(listener3); + + assertEquals(3, activeStreamCount()); + assertContainStream(DEFAULT_START_STREAM_ID); + assertContainStream(DEFAULT_START_STREAM_ID + 2); + assertContainStream(DEFAULT_START_STREAM_ID + 4); + + frameHandler().rstStream(DEFAULT_START_STREAM_ID + 2, ErrorCode.REFUSED_STREAM); + + listener2.waitUntilStreamClosed(); + assertNull(listener1.rpcProgress); + assertEquals(REFUSED, listener2.rpcProgress); + assertNull(listener3.rpcProgress); + + frameHandler().rstStream(DEFAULT_START_STREAM_ID, ErrorCode.CANCEL); + listener1.waitUntilStreamClosed(); + assertEquals(PROCESSED, listener1.rpcProgress); + assertNull(listener3.rpcProgress); + + getStream(DEFAULT_START_STREAM_ID + 4).cancel(Status.CANCELLED); + + listener3.waitUntilStreamClosed(); + assertEquals(PROCESSED, listener3.rpcProgress); + + shutdownAndVerify(); + } + private int activeStreamCount() { return clientTransport.getActiveStreams().length; } @@ -1813,6 +1899,7 @@ public class OkHttpClientTransportTest { Status status; Metadata headers; Metadata trailers; + RpcProgress rpcProgress; CountDownLatch closed = new CountDownLatch(1); ArrayList<String> messages = new ArrayList<String>(); boolean onReadyCalled; @@ -1838,8 +1925,14 @@ public class OkHttpClientTransportTest { @Override public void closed(Status status, Metadata trailers) { + closed(status, PROCESSED, trailers); + } + + @Override + public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) { this.status = status; this.trailers = trailers; + this.rpcProgress = rpcProgress; closed.countDown(); } |