diff options
author | creamsoup <jihuncho@google.com> | 2018-09-06 15:52:02 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-09-06 15:52:02 -0700 |
commit | 554210da2a9199c7c8e0ea3613a32e9d71d57031 (patch) | |
tree | 3705a0ea1585f350535a1b6b342e9f5c6abcabc5 /okhttp | |
parent | 2fca42feb93f1bda1a80f6649d1e6304e9a67b08 (diff) | |
download | grpc-grpc-java-554210da2a9199c7c8e0ea3613a32e9d71d57031.tar.gz |
okhttp: settings acks back after apply settings before sending any data (#4825)
okhttp: setting acks back after apply settings before sending any data as a result of the change.
Resolves #4809 also, make #4816 the not flaky.
Diffstat (limited to 'okhttp')
3 files changed, 60 insertions, 10 deletions
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index cf70d7e20..9bca807eb 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -1070,6 +1070,7 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep @Override public void settings(boolean clearPrevious, Settings settings) { + boolean outboundWindowSizeIncreased = false; synchronized (lock) { if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.MAX_CONCURRENT_STREAMS)) { int receivedMaxConcurrentStreams = OkHttpSettingsUtil.get( @@ -1080,16 +1081,24 @@ class OkHttpClientTransport implements ConnectionClientTransport, TransportExcep if (OkHttpSettingsUtil.isSet(settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE)) { int initialWindowSize = OkHttpSettingsUtil.get( settings, OkHttpSettingsUtil.INITIAL_WINDOW_SIZE); - outboundFlow.initialOutboundWindowSize(initialWindowSize); + outboundWindowSizeIncreased = outboundFlow.initialOutboundWindowSize(initialWindowSize); } if (firstSettings) { listener.transportReady(); firstSettings = false; } + + // The changed settings are not finalized until SETTINGS acknowledgment frame is sent. Any + // writes due to update in settings must be sent after SETTINGS acknowledgment frame, + // otherwise it will cause a stream error (RST_STREAM). + frameWriter.ackSettings(settings); + + // send any pending bytes / streams + if (outboundWindowSizeIncreased) { + outboundFlow.writeStreams(); + } startPendingStreams(); } - - frameWriter.ackSettings(settings); } @Override diff --git a/okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java b/okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java index fdce027a1..c81c32304 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OutboundFlowController.java @@ -46,9 +46,15 @@ class OutboundFlowController { } /** - * Must be called with holding transport lock. + * Adjusts outbound window size requested by peer. When window size is increased, it does not send + * any pending frames. If this method returns {@code true}, the caller should call {@link + * #writeStreams()} after settings ack. + * + * <p>Must be called with holding transport lock. + * + * @return true, if new window size is increased, false otherwise. */ - void initialOutboundWindowSize(int newWindowSize) { + boolean initialOutboundWindowSize(int newWindowSize) { if (newWindowSize < 0) { throw new IllegalArgumentException("Invalid initial window size: " + newWindowSize); } @@ -66,10 +72,7 @@ class OutboundFlowController { } } - if (delta > 0) { - // The window size increased, send any pending frames for all streams. - writeStreams(); - } + return delta > 0; } /** @@ -163,8 +166,10 @@ class OutboundFlowController { /** * Writes as much data for all the streams as possible given the current flow control windows. + * + * <p>Must be called with holding transport lock. */ - private void writeStreams() { + void writeStreams() { OkHttpClientStream[] streams = transport.getActiveStreams(); int connectionWindow = connectionState.window(); for (int numStreams = streams.length; numStreams > 0 && connectionWindow > 0;) { diff --git a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java index 44212ca43..5f43ad965 100644 --- a/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java +++ b/okhttp/src/test/java/io/grpc/okhttp/OkHttpClientTransportTest.java @@ -38,6 +38,7 @@ import static org.mockito.Matchers.anyInt; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isA; import static org.mockito.Matchers.same; +import static org.mockito.Mockito.inOrder; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.never; import static org.mockito.Mockito.reset; @@ -113,6 +114,7 @@ import org.junit.rules.Timeout; import org.junit.runner.RunWith; import org.junit.runners.JUnit4; import org.mockito.ArgumentCaptor; +import org.mockito.InOrder; import org.mockito.Matchers; import org.mockito.Mock; import org.mockito.MockitoAnnotations; @@ -759,6 +761,40 @@ public class OkHttpClientTransportTest { } @Test + public void outboundFlowControlWithInitialWindowSizeChangeInMiddleOfStream() throws Exception { + initTransport(); + MockStreamListener listener = new MockStreamListener(); + OkHttpClientStream stream = + clientTransport.newStream(method, new Metadata(), CallOptions.DEFAULT); + stream.start(listener); + int messageLength = 20; + setInitialWindowSize(HEADER_LENGTH + 10); + InputStream input = new ByteArrayInputStream(new byte[messageLength]); + stream.writeMessage(input); + stream.flush(); + // part of the message can be sent. + verify(frameWriter, timeout(TIME_OUT_MS)) + .data(eq(false), eq(3), any(Buffer.class), eq(HEADER_LENGTH + 10)); + // Avoid connection flow control. + frameHandler().windowUpdate(0, HEADER_LENGTH + 20); + + // Increase initial window size + setInitialWindowSize(HEADER_LENGTH + 20); + + // wait until pending frames sent (inOrder doesn't support timeout) + verify(frameWriter, timeout(TIME_OUT_MS).atLeastOnce()) + .data(eq(false), eq(3), any(Buffer.class), eq(10)); + // It should ack the settings, then send remaining message. + InOrder inOrder = inOrder(frameWriter); + inOrder.verify(frameWriter).ackSettings(any(Settings.class)); + inOrder.verify(frameWriter).data(eq(false), eq(3), any(Buffer.class), eq(10)); + + stream.cancel(Status.CANCELLED); + listener.waitUntilStreamClosed(); + shutdownAndVerify(); + } + + @Test public void stopNormally() throws Exception { initTransport(); MockStreamListener listener1 = new MockStreamListener(); |