aboutsummaryrefslogtreecommitdiff
path: root/okhttp
diff options
context:
space:
mode:
authorcreamsoup <jihuncho@google.com>2018-09-06 15:52:02 -0700
committerGitHub <noreply@github.com>2018-09-06 15:52:02 -0700
commit554210da2a9199c7c8e0ea3613a32e9d71d57031 (patch)
tree3705a0ea1585f350535a1b6b342e9f5c6abcabc5 /okhttp
parent2fca42feb93f1bda1a80f6649d1e6304e9a67b08 (diff)
downloadgrpc-grpc-java-554210da2a9199c7c8e0ea3613a32e9d71d57031.tar.gz
okhttp: settings acks back after apply settings before sending any data (#4825)
okhttp: setting acks back after apply settings before sending any data as a result of the change. Resolves #4809 also, make #4816 the not flaky.
Diffstat (limited to 'okhttp')
-rw-r--r--okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java15
-rw-r--r--okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java19
-rw-r--r--okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java36
3 files changed, 60 insertions, 10 deletions
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
index cf70d7e20..9bca807eb 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
@@ -1070,6 +1070,7 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
@Override
public void settings(boolean clearPrevious, Settings settings) {
+ boolean outboundWindowSizeIncreased = false;
synchronized (lock) {
if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS)) {
int receivedMaxConcurrentStreams = OkHttpSettingsUtil.get(
@@ -1080,16 +1081,24 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep
if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE)) {
int initialWindowSize = OkHttpSettingsUtil.get(
settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE);
- outboundFlow.initialOutboundWindowSize(initialWindowSize);
+ outboundWindowSizeIncreased = outboundFlow.initialOutboundWindowSize(initialWindowSize);
}
if (firstSettings) {
listener.transportReady();
firstSettings = false;
}
+
+ // The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any
+ // writes due to update in settings must be sent after SETTINGS acknowledgment frame,
+ // otherwise it will cause a stream error (RST_STREAM).
+ frameWriter.ackSettings(settings);
+
+ // send any pending bytes / streams
+ if (outboundWindowSizeIncreased) {
+ outboundFlow.writeStreams();
+ }
startPendingStreams();
}
-
- frameWriter.ackSettings(settings);
}
@Override
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java b/okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java
index fdce027a1..c81c32304 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java
@@ -46,9 +46,15 @@ class OutboundFlowController {
}
/**
- * Must be called with holding transport lock.
+ * Adjusts outbound window size requested by peer. When window size is increased, it does not send
+ * any pending frames. If this method returns {@code true}, the caller should call {@link
+ * #writeStreams()} after settings ack.
+ *
+ * <p>Must be called with holding transport lock.
+ *
+ * @return true, if new window size is increased, false otherwise.
*/
- void initialOutboundWindowSize(int newWindowSize) {
+ boolean initialOutboundWindowSize(int newWindowSize) {
if (newWindowSize < 0) {
throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize);
}
@@ -66,10 +72,7 @@ class OutboundFlowController {
}
}
- if (delta > 0) {
- // The window size increased, send any pending frames for all streams.
- writeStreams();
- }
+ return delta > 0;
}
/**
@@ -163,8 +166,10 @@ class OutboundFlowController {
/**
* Writes as much data for all the streams as possible given the current flow control windows.
+ *
+ * <p>Must be called with holding transport lock.
*/
- private void writeStreams() {
+ void writeStreams() {
OkHttpClientStream[] streams = transport.getActiveStreams();
int connectionWindow = connectionState.window();
for (int numStreams = streams.length; numStreams > 0 && connectionWindow > 0;) {
diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java
index 44212ca43..5f43ad965 100644
--- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java
+++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java
@@ -38,6 +38,7 @@ import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isA;
import static org.mockito.Matchers.same;
+import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.reset;
@@ -113,6 +114,7 @@ import org.junit.rules.Timeout;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.mockito.ArgumentCaptor;
+import org.mockito.InOrder;
import org.mockito.Matchers;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
@@ -759,6 +761,40 @@ public class OkHttpClientTransportTest {
}
@Test
+ public void outboundFlowControlWithInitialWindowSizeChangeInMiddleOfStream() throws Exception {
+ initTransport();
+ MockStreamListener listener = new MockStreamListener();
+ OkHttpClientStream stream =
+ clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT);
+ stream.start(listener);
+ int messageLength = 20;
+ setInitialWindowSize(HEADER_LENGTH + 10);
+ InputStream input = new ByteArrayInputStream(new byte[messageLength]);
+ stream.writeMessage(input);
+ stream.flush();
+ // part of the message can be sent.
+ verify(frameWriter, timeout(TIME_OUT_MS))
+ .data(eq(false), eq(3), any(Buffer.class), eq(HEADER_LENGTH + 10));
+ // Avoid connection flow control.
+ frameHandler().windowUpdate(0, HEADER_LENGTH + 20);
+
+ // Increase initial window size
+ setInitialWindowSize(HEADER_LENGTH + 20);
+
+ // wait until pending frames sent (inOrder doesn't support timeout)
+ verify(frameWriter, timeout(TIME_OUT_MS).atLeastOnce())
+ .data(eq(false), eq(3), any(Buffer.class), eq(10));
+ // It should ack the settings, then send remaining message.
+ InOrder inOrder = inOrder(frameWriter);
+ inOrder.verify(frameWriter).ackSettings(any(Settings.class));
+ inOrder.verify(frameWriter).data(eq(false), eq(3), any(Buffer.class), eq(10));
+
+ stream.cancel(Status.CANCELLED);
+ listener.waitUntilStreamClosed();
+ shutdownAndVerify();
+ }
+
+ @Test
public void stopNormally() throws Exception {
initTransport();
MockStreamListener listener1 = new MockStreamListener();