aboutsummaryrefslogtreecommitdiff
path: root/okhttp
diff options
context:
space:
mode:
authorZHANG Dapeng <zdapeng@google.com>2018-03-12 14:12:46 -0700
committerGitHub <noreply@github.com>2018-03-12 14:12:46 -0700
commita83f67a706fdf9cd5b5c5026f1c413fbad1af839 (patch)
tree48e522d792b6cacc93becb5746c233301495458d /okhttp
parentc6fe4deb33686a7f0e65be6f3f509768ce5d04a6 (diff)
downloadgrpc-grpc-java-a83f67a706fdf9cd5b5c5026f1c413fbad1af839.tar.gz
core,netty,okhttp: Transparent retry
Changes: - `ClientStreamListener.onClose(Status status, RpcProgress rpcProgress, Metadata trailers)` added. - `AbstractClientStream.transportReportStatus(Status status, RpcProgress rpcProgress, boolean stopDelivery, Metadata trailers)` added - `ClientCallImpl.ClientStreamListenerImpl` will ignore the arg `rpcProgress` (non retry) - `RetriableStream.SubListener` will handle `rpcProgress` and decide if transparent retry. - `NettyClientHandler` and `OkHttpClientTransport` will pass `RpcProgress.REFUSED` to client stream listener for later stream ids when received GOAWAY, or for stream received a RST_STREAM frame with REFUSED code. - All other files are just a result of refactoring.
Diffstat (limited to 'okhttp')
-rw-r--r--okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java15
-rw-r--r--okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java19
-rw-r--r--okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java21
-rw-r--r--okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java97
4 files changed, 127 insertions, 25 deletions
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java
index 60bdcf733..4cb6153a9 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientStream.java
@@ -18,6 +18,7 @@ package io.grpc.okhttp;
import static com.google.common.base.Preconditions.checkNotNull;
import static com.google.common.base.Preconditions.checkState;
+import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED;
import com.google.common.io.BaseEncoding;
import io.grpc.Attributes;
@@ -307,8 +308,11 @@ class OkHttpClientStream extends AbstractClientStream {
window -= length;
if (window < 0) {
frameWriter.rstStream(id(), ErrorCode.FLOW_CONTROL_ERROR);
- transport.finishStream(id(), Status.INTERNAL.withDescription(
- "Received data size exceeded our receiving window size"), false, null, null);
+ transport.finishStream(
+ id(),
+ Status.INTERNAL.withDescription(
+ "Received data size exceeded our receiving window size"),
+ PROCESSED, false, null, null);
return;
}
super.transportDataReceived(new OkHttpReadableBuffer(frame), endOfStream);
@@ -319,9 +323,9 @@ class OkHttpClientStream extends AbstractClientStream {
if (!framer().isClosed()) {
// If server's end-of-stream is received before client sends end-of-stream, we just send a
// reset to server to fully close the server side stream.
- transport.finishStream(id(), null, false, ErrorCode.CANCEL, null);
+ transport.finishStream(id(),null, PROCESSED, false, ErrorCode.CANCEL, null);
} else {
- transport.finishStream(id(), null, false, null, null);
+ transport.finishStream(id(), null, PROCESSED, false, null, null);
}
}
@@ -344,7 +348,8 @@ class OkHttpClientStream extends AbstractClientStream {
} else {
// If pendingData is null, start must have already been called, which means synStream has
// been called as well.
- transport.finishStream(id(), reason, stopDelivery, ErrorCode.CANCEL, trailers);
+ transport.finishStream(
+ id(), reason, PROCESSED, stopDelivery, ErrorCode.CANCEL, trailers);
}
}
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
index 5a0a70d10..a1fc9fa1c 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
@@ -39,6 +39,7 @@ import io.grpc.Status.Code;
import io.grpc.StatusException;
import io.grpc.internal.Channelz.Security;
import io.grpc.internal.Channelz.SocketStats;
+import io.grpc.internal.ClientStreamListener.RpcProgress;
import io.grpc.internal.ConnectionClientTransport;
import io.grpc.internal.GrpcUtil;
import io.grpc.internal.Http2Ping;
@@ -733,12 +734,14 @@ class OkHttpClientTransport implements ConnectionClientTransport {
Map.Entry<Integer, OkHttpClientStream> entry = it.next();
if (entry.getKey() > lastKnownStreamId) {
it.remove();
- entry.getValue().transportState().transportReportStatus(status, false, new Metadata());
+ entry.getValue().transportState().transportReportStatus(
+ status, RpcProgress.REFUSED, false, new Metadata());
}
}
for (OkHttpClientStream stream : pendingStreams) {
- stream.transportState().transportReportStatus(status, true, new Metadata());
+ stream.transportState().transportReportStatus(
+ status, RpcProgress.REFUSED, true, new Metadata());
}
pendingStreams.clear();
maybeClearInUse();
@@ -765,6 +768,7 @@ class OkHttpClientTransport implements ConnectionClientTransport {
void finishStream(
int streamId,
@Nullable Status status,
+ RpcProgress rpcProgress,
boolean stopDelivery,
@Nullable ErrorCode errorCode,
@Nullable Metadata trailers) {
@@ -779,6 +783,7 @@ class OkHttpClientTransport implements ConnectionClientTransport {
.transportState()
.transportReportStatus(
status,
+ rpcProgress,
stopDelivery,
trailers != null ? trailers : new Metadata());
}
@@ -1020,7 +1025,10 @@ class OkHttpClientTransport implements ConnectionClientTransport {
Status status = toGrpcStatus(errorCode).augmentDescription("Rst Stream");
boolean stopDelivery =
(status.getCode() == Code.CANCELLED || status.getCode() == Code.DEADLINE_EXCEEDED);
- finishStream(streamId, status, stopDelivery, null, null);
+ finishStream(
+ streamId, status,
+ errorCode == ErrorCode.REFUSED_STREAM ? RpcProgress.REFUSED : RpcProgress.PROCESSED,
+ stopDelivery, null, null);
}
@Override
@@ -1112,8 +1120,9 @@ class OkHttpClientTransport implements ConnectionClientTransport {
if (streamId == 0) {
onError(ErrorCode.PROTOCOL_ERROR, errorMsg);
} else {
- finishStream(streamId,
- Status.INTERNAL.withDescription(errorMsg), false, ErrorCode.PROTOCOL_ERROR, null);
+ finishStream(
+ streamId, Status.INTERNAL.withDescription(errorMsg), RpcProgress.PROCESSED, false,
+ ErrorCode.PROTOCOL_ERROR, null);
}
return;
}
diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java
index 007e35f64..79340a716 100644
--- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java
+++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientStreamTest.java
@@ -17,6 +17,7 @@
package io.grpc.okhttp;
import static com.google.common.truth.Truth.assertThat;
+import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.eq;
@@ -30,8 +31,8 @@ import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.MethodDescriptor.MethodType;
import io.grpc.Status;
-import io.grpc.internal.ClientStreamListener;
import io.grpc.internal.GrpcUtil;
+import io.grpc.internal.NoopClientStreamListener;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportTracer;
import io.grpc.okhttp.internal.framed.ErrorCode;
@@ -102,7 +103,8 @@ public class OkHttpClientStreamTest {
final AtomicReference<Status> statusRef = new AtomicReference<Status>();
stream.start(new BaseClientStreamListener() {
@Override
- public void closed(Status status, Metadata trailers) {
+ public void closed(
+ Status status, RpcProgress rpcProgress, Metadata trailers) {
statusRef.set(status);
assertTrue(Thread.holdsLock(lock));
}
@@ -123,11 +125,12 @@ public class OkHttpClientStreamTest {
assertTrue(Thread.holdsLock(lock));
return null;
}
- }).when(transport).finishStream(1234, Status.CANCELLED, true, ErrorCode.CANCEL, null);
+ }).when(transport).finishStream(
+ 1234, Status.CANCELLED, PROCESSED, true, ErrorCode.CANCEL, null);
stream.cancel(Status.CANCELLED);
- verify(transport).finishStream(1234, Status.CANCELLED, true, ErrorCode.CANCEL, null);
+ verify(transport).finishStream(1234, Status.CANCELLED, PROCESSED,true, ErrorCode.CANCEL, null);
}
@Test
@@ -213,20 +216,12 @@ public class OkHttpClientStreamTest {
// TODO(carl-mastrangelo): extract this out into a testing/ directory and remove other definitions
// of it.
- private static class BaseClientStreamListener implements ClientStreamListener {
- @Override
- public void onReady() {}
+ private static class BaseClientStreamListener extends NoopClientStreamListener {
@Override
public void messagesAvailable(MessageProducer producer) {
while (producer.next() != null) {}
}
-
- @Override
- public void headersRead(Metadata headers) {}
-
- @Override
- public void closed(Status status, Metadata trailers) {}
}
}
diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java
index b9d706177..bac6a8e67 100644
--- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java
+++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java
@@ -18,6 +18,8 @@ package io.grpc.okhttp;
import static com.google.common.base.Charsets.UTF_8;
import static com.google.common.truth.Truth.assertThat;
+import static io.grpc.internal.ClientStreamListener.RpcProgress.PROCESSED;
+import static io.grpc.internal.ClientStreamListener.RpcProgress.REFUSED;
import static io.grpc.internal.GrpcUtil.DEFAULT_MAX_MESSAGE_SIZE;
import static io.grpc.okhttp.Headers.CONTENT_TYPE_HEADER;
import static io.grpc.okhttp.Headers.METHOD_HEADER;
@@ -128,6 +130,7 @@ public class OkHttpClientTransportTest {
private static final ProxyParameters NO_PROXY = null;
private static final String NO_USER = null;
private static final String NO_PW = null;
+ private static final int DEFAULT_START_STREAM_ID = 3;
@Rule public final Timeout globalTimeout = Timeout.seconds(10);
@@ -168,7 +171,7 @@ public class OkHttpClientTransportTest {
}
private void initTransport() throws Exception {
- startTransport(3, null, true, DEFAULT_MAX_MESSAGE_SIZE, null);
+ startTransport(DEFAULT_START_STREAM_ID, null, true, DEFAULT_MAX_MESSAGE_SIZE, null);
}
private void initTransport(int startId) throws Exception {
@@ -177,7 +180,8 @@ public class OkHttpClientTransportTest {
private void initTransportAndDelayConnected() throws Exception {
delayConnectedCallback = new DelayConnectedCallback();
- startTransport(3, delayConnectedCallback, false, DEFAULT_MAX_MESSAGE_SIZE, null);
+ startTransport(
+ DEFAULT_START_STREAM_ID, delayConnectedCallback, false, DEFAULT_MAX_MESSAGE_SIZE, null);
}
private void startTransport(int startId, @Nullable Runnable connectingCallback,
@@ -1663,6 +1667,88 @@ public class OkHttpClientTransportTest {
shutdownAndVerify();
}
+ @Test
+ public void goAway_streamListenerRpcProgress() throws Exception {
+ initTransport();
+ setMaxConcurrentStreams(2);
+ MockStreamListener listener1 = new MockStreamListener();
+ MockStreamListener listener2 = new MockStreamListener();
+ MockStreamListener listener3 = new MockStreamListener();
+ OkHttpClientStream stream1 =
+ clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
+ stream1.start(listener1);
+ OkHttpClientStream stream2 =
+ clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
+ stream2.start(listener2);
+ OkHttpClientStream stream3 =
+ clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
+ stream3.start(listener3);
+ waitForStreamPending(1);
+
+ assertEquals(2, activeStreamCount());
+ assertContainStream(DEFAULT_START_STREAM_ID);
+ assertContainStream(DEFAULT_START_STREAM_ID + 2);
+
+ frameHandler()
+ .goAway(DEFAULT_START_STREAM_ID, ErrorCode.CANCEL, ByteString.encodeUtf8("blablabla"));
+
+ listener2.waitUntilStreamClosed();
+ listener3.waitUntilStreamClosed();
+ assertNull(listener1.rpcProgress);
+ assertEquals(REFUSED, listener2.rpcProgress);
+ assertEquals(REFUSED, listener3.rpcProgress);
+ assertEquals(1, activeStreamCount());
+ assertContainStream(DEFAULT_START_STREAM_ID);
+
+ getStream(DEFAULT_START_STREAM_ID).cancel(Status.CANCELLED);
+
+ listener1.waitUntilStreamClosed();
+ assertEquals(PROCESSED, listener1.rpcProgress);
+
+ shutdownAndVerify();
+ }
+
+ @Test
+ public void reset_streamListenerRpcProgress() throws Exception {
+ initTransport();
+ MockStreamListener listener1 = new MockStreamListener();
+ MockStreamListener listener2 = new MockStreamListener();
+ MockStreamListener listener3 = new MockStreamListener();
+ OkHttpClientStream stream1 =
+ clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
+ stream1.start(listener1);
+ OkHttpClientStream stream2 =
+ clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
+ stream2.start(listener2);
+ OkHttpClientStream stream3 =
+ clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
+ stream3.start(listener3);
+
+ assertEquals(3, activeStreamCount());
+ assertContainStream(DEFAULT_START_STREAM_ID);
+ assertContainStream(DEFAULT_START_STREAM_ID + 2);
+ assertContainStream(DEFAULT_START_STREAM_ID + 4);
+
+ frameHandler().rstStream(DEFAULT_START_STREAM_ID + 2, ErrorCode.REFUSED_STREAM);
+
+ listener2.waitUntilStreamClosed();
+ assertNull(listener1.rpcProgress);
+ assertEquals(REFUSED, listener2.rpcProgress);
+ assertNull(listener3.rpcProgress);
+
+ frameHandler().rstStream(DEFAULT_START_STREAM_ID, ErrorCode.CANCEL);
+ listener1.waitUntilStreamClosed();
+ assertEquals(PROCESSED, listener1.rpcProgress);
+ assertNull(listener3.rpcProgress);
+
+ getStream(DEFAULT_START_STREAM_ID + 4).cancel(Status.CANCELLED);
+
+ listener3.waitUntilStreamClosed();
+ assertEquals(PROCESSED, listener3.rpcProgress);
+
+ shutdownAndVerify();
+ }
+
private int activeStreamCount() {
return clientTransport.getActiveStreams().length;
}
@@ -1813,6 +1899,7 @@ public class OkHttpClientTransportTest {
Status status;
Metadata headers;
Metadata trailers;
+ RpcProgress rpcProgress;
CountDownLatch closed = new CountDownLatch(1);
ArrayList<String> messages = new ArrayList<String>();
boolean onReadyCalled;
@@ -1838,8 +1925,14 @@ public class OkHttpClientTransportTest {
@Override
public void closed(Status status, Metadata trailers) {
+ closed(status, PROCESSED, trailers);
+ }
+
+ @Override
+ public void closed(Status status, RpcProgress rpcProgress, Metadata trailers) {
this.status = status;
this.trailers = trailers;
+ this.rpcProgress = rpcProgress;
closed.countDown();
}