aboutsummaryrefslogtreecommitdiff
path: root/okhttp
diff options
context:
space:
mode:
authorcreamsoup <jihuncho@google.com>2018-08-17 11:08:16 -0700
committerGitHub <noreply@github.com>2018-08-17 11:08:16 -0700
commit30a4bfb2f0d1daf38d3b59e633f300b8edba9faf (patch)
tree031b002e7406fb14a720a641870b04c96550c0aa /okhttp
parent3bc6e314bd87eadcd7d2f6e49f03baf56b215512 (diff)
downloadgrpc-grpc-java-30a4bfb2f0d1daf38d3b59e633f300b8edba9faf.tar.gz
Implement flush coalescing in OkHttp. (#4763)
okhttp: Implement flush coalescing.
Diffstat (limited to 'okhttp')
-rw-r--r--okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java29
-rw-r--r--okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java6
-rw-r--r--okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java124
3 files changed, 151 insertions, 8 deletions
diff --git a/okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java b/okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java
index 4c9aed597..3049c5bd6 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java
@@ -25,6 +25,7 @@ import io.grpc.okhttp.internal.framed.Settings;
import java.io.IOException;
import java.net.Socket;
import java.util.List;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.logging.Level;
import java.util.logging.Logger;
import okio.Buffer;
@@ -36,10 +37,12 @@ class AsyncFrameWriter implements FrameWriter {
// Although writes are thread-safe, we serialize them to prevent consuming many Threads that are
// just waiting on each other.
private final SerializingExecutor executor;
- private final OkHttpClientTransport transport;
+ private final TransportExceptionHandler transportExceptionHandler;
+ private final AtomicLong flushCounter = new AtomicLong();
- public AsyncFrameWriter(OkHttpClientTransport transport, SerializingExecutor executor) {
- this.transport = transport;
+ public AsyncFrameWriter(
+ TransportExceptionHandler transportExceptionHandler, SerializingExecutor executor) {
+ this.transportExceptionHandler = transportExceptionHandler;
this.executor = executor;
}
@@ -89,10 +92,17 @@ class AsyncFrameWriter implements FrameWriter {
@Override
public void flush() {
+ // keep track of version of flushes to skip flush if another flush task is queued.
+ final long flushCount = flushCounter.incrementAndGet();
+
executor.execute(new WriteRunnable() {
@Override
public void doRun() throws IOException {
- frameWriter.flush();
+ // There can be a flush starvation if there are continuous flood of flush is queued, this
+ // is not an issue with OkHttp since it flushes if the buffer is full.
+ if (flushCounter.get() == flushCount) {
+ frameWriter.flush();
+ }
}
});
}
@@ -219,9 +229,9 @@ class AsyncFrameWriter implements FrameWriter {
}
doRun();
} catch (RuntimeException e) {
- transport.onException(e);
+ transportExceptionHandler.onException(e);
} catch (Exception e) {
- transport.onException(e);
+ transportExceptionHandler.onException(e);
}
}
@@ -233,4 +243,11 @@ class AsyncFrameWriter implements FrameWriter {
return frameWriter == null ? 0x4000 /* 16384, the minimum required by the HTTP/2 spec */
: frameWriter.maxDataLength();
}
+
+ /** A class that handles transport exception. */
+ interface TransportExceptionHandler {
+
+ /** Handles exception. */
+ void onException(Throwable throwable);
+ }
}
diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
index 6a3c0881e..27c60a809 100644
--- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
+++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java
@@ -55,6 +55,7 @@ import io.grpc.internal.SerializingExecutor;
import io.grpc.internal.SharedResourceHolder;
import io.grpc.internal.StatsTraceContext;
import io.grpc.internal.TransportTracer;
+import io.grpc.okhttp.AsyncFrameWriter.TransportExceptionHandler;
import io.grpc.okhttp.internal.ConnectionSpec;
import io.grpc.okhttp.internal.framed.ErrorCode;
import io.grpc.okhttp.internal.framed.FrameReader;
@@ -98,7 +99,7 @@ import okio.Timeout;
/**
* A okhttp-based {@link ConnectionClientTransport} implementation.
*/
-class OkHttpClientTransport implements ConnectionClientTransport {
+class OkHttpClientTransport implements ConnectionClientTransport, TransportExceptionHandler {
private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS = buildErrorCodeToStatusMap();
private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName());
private static final OkHttpClientStream[] EMPTY_STREAM_ARRAY = new OkHttpClientStream[0];
@@ -728,7 +729,8 @@ class OkHttpClientTransport implements ConnectionClientTransport {
/**
* Finish all active streams due to an IOException, then close the transport.
*/
- void onException(Throwable failureCause) {
+ @Override
+ public void onException(Throwable failureCause) {
Preconditions.checkNotNull(failureCause, "failureCause");
Status status = Status.UNAVAILABLE.withCause(failureCause);
startGoAway(0, ErrorCode.INTERNAL_ERROR, status);
diff --git a/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java
new file mode 100644
index 000000000..479a35a78
--- /dev/null
+++ b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java
@@ -0,0 +1,124 @@
+/*
+ * Copyright 2018 The gRPC Authors
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package io.grpc.okhttp;
+
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.inOrder;
+import static org.mockito.Mockito.verify;
+import static org.mockito.internal.verification.VerificationModeFactory.times;
+
+import io.grpc.internal.SerializingExecutor;
+import io.grpc.okhttp.AsyncFrameWriter.TransportExceptionHandler;
+import io.grpc.okhttp.internal.framed.FrameWriter;
+import java.io.IOException;
+import java.net.Socket;
+import java.util.Queue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.Executor;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.mockito.InOrder;
+import org.mockito.Mock;
+import org.mockito.runners.MockitoJUnitRunner;
+
+@RunWith(MockitoJUnitRunner.class)
+public class AsyncFrameWriterTest {
+
+ @Mock private Socket socket;
+ @Mock private FrameWriter frameWriter;
+
+ private QueueingExecutor queueingExecutor = new QueueingExecutor();
+ private TransportExceptionHandler transportExceptionHandler =
+ new EscalatingTransportErrorHandler();
+ private AsyncFrameWriter asyncFrameWriter =
+ new AsyncFrameWriter(transportExceptionHandler, new SerializingExecutor(queueingExecutor));
+
+ @Before
+ public void setUp() throws Exception {
+ asyncFrameWriter.becomeConnected(frameWriter, socket);
+ }
+
+ @Test
+ public void noCoalesceRequired() throws IOException {
+ asyncFrameWriter.ping(true, 0, 1);
+ asyncFrameWriter.flush();
+ queueingExecutor.runAll();
+
+ verify(frameWriter, times(1)).ping(anyBoolean(), anyInt(), anyInt());
+ verify(frameWriter, times(1)).flush();
+ }
+
+ @Test
+ public void flushCoalescing_shouldNotMergeTwoDistinctFlushes() throws IOException {
+ asyncFrameWriter.ping(true, 0, 1);
+ asyncFrameWriter.flush();
+ queueingExecutor.runAll();
+
+ asyncFrameWriter.ping(true, 0, 2);
+ asyncFrameWriter.flush();
+ queueingExecutor.runAll();
+
+ verify(frameWriter, times(2)).ping(anyBoolean(), anyInt(), anyInt());
+ verify(frameWriter, times(2)).flush();
+ }
+
+ @Test
+ public void flushCoalescing_shouldMergeTwoQueuedFlushes() throws IOException {
+ asyncFrameWriter.ping(true, 0, 1);
+ asyncFrameWriter.flush();
+ asyncFrameWriter.ping(true, 0, 2);
+ asyncFrameWriter.flush();
+
+ queueingExecutor.runAll();
+
+ InOrder inOrder = inOrder(frameWriter);
+ inOrder.verify(frameWriter, times(2)).ping(anyBoolean(), anyInt(), anyInt());
+ inOrder.verify(frameWriter).flush();
+ }
+
+ /**
+ * Executor queues incoming runnables instead of running it. Runnables can be invoked via {@link
+ * QueueingExecutor#runAll} in serial order.
+ */
+ private static class QueueingExecutor implements Executor {
+
+ private final Queue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>();
+
+ @Override
+ public void execute(Runnable command) {
+ runnables.add(command);
+ }
+
+ public void runAll() {
+ Runnable r;
+ while ((r = runnables.poll()) != null) {
+ r.run();
+ }
+ }
+ }
+
+ /** Rethrows as Assertion error. */
+ private static class EscalatingTransportErrorHandler implements TransportExceptionHandler {
+
+ @Override
+ public void onException(Throwable throwable) {
+ throw new AssertionError(throwable);
+ }
+ }
+}