diff options
author | creamsoup <jihuncho@google.com> | 2018-08-17 11:08:16 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-08-17 11:08:16 -0700 |
commit | 30a4bfb2f0d1daf38d3b59e633f300b8edba9faf (patch) | |
tree | 031b002e7406fb14a720a641870b04c96550c0aa /okhttp | |
parent | 3bc6e314bd87eadcd7d2f6e49f03baf56b215512 (diff) | |
download | grpc-grpc-java-30a4bfb2f0d1daf38d3b59e633f300b8edba9faf.tar.gz |
Implement flush coalescing in OkHttp. (#4763)
okhttp: Implement flush coalescing.
Diffstat (limited to 'okhttp')
3 files changed, 151 insertions, 8 deletions
diff --git a/okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java b/okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java index 4c9aed597..3049c5bd6 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java +++ b/okhttp/src/main/java/io/grpc/okhttp/AsyncFrameWriter.java @@ -25,6 +25,7 @@ import io.grpc.okhttp.internal.framed.Settings; import java.io.IOException; import java.net.Socket; import java.util.List; +import java.util.concurrent.atomic.AtomicLong; import java.util.logging.Level; import java.util.logging.Logger; import okio.Buffer; @@ -36,10 +37,12 @@ class AsyncFrameWriter implements FrameWriter { // Although writes are thread-safe, we serialize them to prevent consuming many Threads that are // just waiting on each other. private final SerializingExecutor executor; - private final OkHttpClientTransport transport; + private final TransportExceptionHandler transportExceptionHandler; + private final AtomicLong flushCounter = new AtomicLong(); - public AsyncFrameWriter(OkHttpClientTransport transport, SerializingExecutor executor) { - this.transport = transport; + public AsyncFrameWriter( + TransportExceptionHandler transportExceptionHandler, SerializingExecutor executor) { + this.transportExceptionHandler = transportExceptionHandler; this.executor = executor; } @@ -89,10 +92,17 @@ class AsyncFrameWriter implements FrameWriter { @Override public void flush() { + // keep track of version of flushes to skip flush if another flush task is queued. + final long flushCount = flushCounter.incrementAndGet(); + executor.execute(new WriteRunnable() { @Override public void doRun() throws IOException { - frameWriter.flush(); + // There can be a flush starvation if there are continuous flood of flush is queued, this + // is not an issue with OkHttp since it flushes if the buffer is full. + if (flushCounter.get() == flushCount) { + frameWriter.flush(); + } } }); } @@ -219,9 +229,9 @@ class AsyncFrameWriter implements FrameWriter { } doRun(); } catch (RuntimeException e) { - transport.onException(e); + transportExceptionHandler.onException(e); } catch (Exception e) { - transport.onException(e); + transportExceptionHandler.onException(e); } } @@ -233,4 +243,11 @@ class AsyncFrameWriter implements FrameWriter { return frameWriter == null ? 0x4000 /* 16384, the minimum required by the HTTP/2 spec */ : frameWriter.maxDataLength(); } + + /** A class that handles transport exception. */ + interface TransportExceptionHandler { + + /** Handles exception. */ + void onException(Throwable throwable); + } } diff --git a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java index 6a3c0881e..27c60a809 100644 --- a/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java +++ b/okhttp/src/main/java/io/grpc/okhttp/OkHttpClientTransport.java @@ -55,6 +55,7 @@ import io.grpc.internal.SerializingExecutor; import io.grpc.internal.SharedResourceHolder; import io.grpc.internal.StatsTraceContext; import io.grpc.internal.TransportTracer; +import io.grpc.okhttp.AsyncFrameWriter.TransportExceptionHandler; import io.grpc.okhttp.internal.ConnectionSpec; import io.grpc.okhttp.internal.framed.ErrorCode; import io.grpc.okhttp.internal.framed.FrameReader; @@ -98,7 +99,7 @@ import okio.Timeout; /** * A okhttp-based {@link ConnectionClientTransport} implementation. */ -class OkHttpClientTransport implements ConnectionClientTransport { +class OkHttpClientTransport implements ConnectionClientTransport, TransportExceptionHandler { private static final Map<ErrorCode, Status> ERROR_CODE_TO_STATUS = buildErrorCodeToStatusMap(); private static final Logger log = Logger.getLogger(OkHttpClientTransport.class.getName()); private static final OkHttpClientStream[] EMPTY_STREAM_ARRAY = new OkHttpClientStream[0]; @@ -728,7 +729,8 @@ class OkHttpClientTransport implements ConnectionClientTransport { /** * Finish all active streams due to an IOException, then close the transport. */ - void onException(Throwable failureCause) { + @Override + public void onException(Throwable failureCause) { Preconditions.checkNotNull(failureCause, "failureCause"); Status status = Status.UNAVAILABLE.withCause(failureCause); startGoAway(0, ErrorCode.INTERNAL_ERROR, status); diff --git a/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java new file mode 100644 index 000000000..479a35a78 --- /dev/null +++ b/okhttp/src/test/java/io/grpc/okhttp/AsyncFrameWriterTest.java @@ -0,0 +1,124 @@ +/* + * Copyright 2018 The gRPC Authors + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package io.grpc.okhttp; + +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.anyInt; +import static org.mockito.Mockito.inOrder; +import static org.mockito.Mockito.verify; +import static org.mockito.internal.verification.VerificationModeFactory.times; + +import io.grpc.internal.SerializingExecutor; +import io.grpc.okhttp.AsyncFrameWriter.TransportExceptionHandler; +import io.grpc.okhttp.internal.framed.FrameWriter; +import java.io.IOException; +import java.net.Socket; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; +import java.util.concurrent.Executor; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.InOrder; +import org.mockito.Mock; +import org.mockito.runners.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class AsyncFrameWriterTest { + + @Mock private Socket socket; + @Mock private FrameWriter frameWriter; + + private QueueingExecutor queueingExecutor = new QueueingExecutor(); + private TransportExceptionHandler transportExceptionHandler = + new EscalatingTransportErrorHandler(); + private AsyncFrameWriter asyncFrameWriter = + new AsyncFrameWriter(transportExceptionHandler, new SerializingExecutor(queueingExecutor)); + + @Before + public void setUp() throws Exception { + asyncFrameWriter.becomeConnected(frameWriter, socket); + } + + @Test + public void noCoalesceRequired() throws IOException { + asyncFrameWriter.ping(true, 0, 1); + asyncFrameWriter.flush(); + queueingExecutor.runAll(); + + verify(frameWriter, times(1)).ping(anyBoolean(), anyInt(), anyInt()); + verify(frameWriter, times(1)).flush(); + } + + @Test + public void flushCoalescing_shouldNotMergeTwoDistinctFlushes() throws IOException { + asyncFrameWriter.ping(true, 0, 1); + asyncFrameWriter.flush(); + queueingExecutor.runAll(); + + asyncFrameWriter.ping(true, 0, 2); + asyncFrameWriter.flush(); + queueingExecutor.runAll(); + + verify(frameWriter, times(2)).ping(anyBoolean(), anyInt(), anyInt()); + verify(frameWriter, times(2)).flush(); + } + + @Test + public void flushCoalescing_shouldMergeTwoQueuedFlushes() throws IOException { + asyncFrameWriter.ping(true, 0, 1); + asyncFrameWriter.flush(); + asyncFrameWriter.ping(true, 0, 2); + asyncFrameWriter.flush(); + + queueingExecutor.runAll(); + + InOrder inOrder = inOrder(frameWriter); + inOrder.verify(frameWriter, times(2)).ping(anyBoolean(), anyInt(), anyInt()); + inOrder.verify(frameWriter).flush(); + } + + /** + * Executor queues incoming runnables instead of running it. Runnables can be invoked via {@link + * QueueingExecutor#runAll} in serial order. + */ + private static class QueueingExecutor implements Executor { + + private final Queue<Runnable> runnables = new ConcurrentLinkedQueue<Runnable>(); + + @Override + public void execute(Runnable command) { + runnables.add(command); + } + + public void runAll() { + Runnable r; + while ((r = runnables.poll()) != null) { + r.run(); + } + } + } + + /** Rethrows as Assertion error. */ + private static class EscalatingTransportErrorHandler implements TransportExceptionHandler { + + @Override + public void onException(Throwable throwable) { + throw new AssertionError(throwable); + } + } +} |