author     Jakob Buchgraber <buchgr@google.com>  2015-02-10 16:01:26 -0800
committer  Jakob Buchgraber <buchgr@google.com>  2015-02-11 17:47:45 -0800
commit     3fd7d0675cd8ec60f6c6f46a01266c04be65f7c2 (patch)
tree       09e95fe404652280eee8fc9d29fe2a6f7fe1b205
parent     127270bd5f58c59a14ef5badd3bcf290c233759d (diff)
download   grpc-grpc-java-3fd7d0675cd8ec60f6c6f46a01266c04be65f7c2.tar.gz
Add QPS Client to perform throughput and latency tests.
-rw-r--r--  benchmarks/build.gradle                                        41
-rw-r--r--  benchmarks/src/main/java/io/grpc/benchmarks/qps/Client.java   360
-rw-r--r--  benchmarks/src/main/proto/qpstest.proto                       158
-rw-r--r--  build.gradle                                                     2
-rw-r--r--  settings.gradle                                                  2
5 files changed, 563 insertions, 0 deletions
diff --git a/benchmarks/build.gradle b/benchmarks/build.gradle
new file mode 100644
index 000000000..0b688d283
--- /dev/null
+++ b/benchmarks/build.gradle
@@ -0,0 +1,41 @@
+apply plugin: 'application'
+apply plugin: 'protobuf'
+
+description = "gRPC Benchmarks"
+
+mainClassName = "io.grpc.benchmarks.qps.Client"
+
+buildscript {
+ repositories {
+ mavenCentral()
+ }
+ dependencies {
+ classpath libraries.protobuf_plugin
+ }
+}
+
+dependencies {
+ compile project(':grpc-core'),
+ project(':grpc-netty'),
+ project(':grpc-okhttp'),
+ project(':grpc-stub'),
+ project(':grpc-testing'),
+ libraries.junit,
+ libraries.mockito,
+ libraries.hdrhistogram
+}
+
+protobufCodeGenPlugins = ["java_plugin:$rootDir/compiler/build/binaries/java_pluginExecutable/java_plugin"]
+compileJava.dependsOn = ['generateProto']
+
+// Allow execution of test client and server.
+task execute(dependsOn: classes, type: JavaExec) {
+ main = project.hasProperty('mainClass') ? project.mainClass : 'io.grpc.benchmarks.qps.Client'
+ classpath = sourceSets.main.runtimeClasspath
+ workingDir = project.rootDir
+
+ // If appArgs were provided, set the program arguments.
+ if (project.hasProperty("appArgs")) {
+ args = Eval.me(appArgs)
+ }
+}
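The 'execute' task above reads its program arguments from the optional 'appArgs' project property and evaluates them as a Groovy list. A minimal invocation sketch, assuming the repository's Gradle wrapper, a previously built codegen plugin under compiler/, and a QPS server already listening on the chosen port (the port and flag values are placeholders):

./gradlew :grpc-benchmarks:execute -PappArgs="['--server_port=8080', '--client_threads=4', '--num_rpcs=100000']"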
diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/qps/Client.java b/benchmarks/src/main/java/io/grpc/benchmarks/qps/Client.java
new file mode 100644
index 000000000..a1a2c1bcc
--- /dev/null
+++ b/benchmarks/src/main/java/io/grpc/benchmarks/qps/Client.java
@@ -0,0 +1,360 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc.benchmarks.qps;
+
+import static grpc.testing.TestServiceGrpc.TestServiceStub;
+import static grpc.testing.Qpstest.SimpleRequest;
+import static grpc.testing.Qpstest.SimpleResponse;
+import static java.lang.Math.max;
+
+import com.google.common.base.Preconditions;
+
+import grpc.testing.Qpstest.PayloadType;
+import grpc.testing.TestServiceGrpc;
+import io.grpc.Channel;
+import io.grpc.ChannelImpl;
+import io.grpc.Status;
+import io.grpc.stub.StreamObserver;
+import io.grpc.transport.netty.NegotiationType;
+import io.grpc.transport.netty.NettyChannelBuilder;
+import io.grpc.transport.okhttp.OkHttpChannelBuilder;
+import org.HdrHistogram.Histogram;
+import org.HdrHistogram.HistogramIterationValue;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.CancellationException;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.logging.Logger;
+
+// TODO: Add OkHttp and Netty TLS Support
+
+/**
+ * Runs lots of RPCs against a QPS Server to test for throughput and latency.
+ * It's a Java clone of the C++ version at
+ * https://github.com/grpc/grpc/blob/master/test/cpp/qps/client.cc
+ */
+public class Client {
+ private static final Logger log = Logger.getLogger(Client.class.getName());
+
+ // Can record values between 1 ns and 1 min (60 BILLION NS)
+ private static final long HISTOGRAM_MAX_VALUE = 60000000000L;
+ private static final int HISTOGRAM_PRECISION = 3;
+ // How long (in ns) to run warm-up RPCs before measurements start counting
+ private static final long WARMUP_TIME = 5000000000L;
+
+ private int clientChannels = 4;
+ private int clientThreads = 4;
+ private int numRpcs = 100000;
+ private int payloadSize = 1;
+ private String serverHost = "127.0.0.1";
+ private int serverPort;
+ private boolean okhttp;
+
+ public static void main(String... args) throws Exception {
+ Client c = new Client();
+ c.run(args);
+ }
+
+ private void run(String[] args) throws Exception {
+ if (!parseArgs(args)) {
+ return;
+ }
+
+ SimpleRequest req = SimpleRequest.newBuilder()
+ .setResponseType(PayloadType.COMPRESSABLE)
+ .setResponseSize(payloadSize)
+ .build();
+
+ List<Channel> channels = new ArrayList<Channel>(clientChannels);
+ for (int i = 0; i < clientChannels; i++) {
+ channels.add(newChannel());
+ }
+
+ long warmupEnd = System.nanoTime() + WARMUP_TIME;
+ do {
+ doRpcs(channels.get(0), req, 10000).get();
+ } while (System.nanoTime() < warmupEnd);
+
+ long startTime = System.nanoTime();
+
+ List<Future<Histogram>> futures = new ArrayList<Future<Histogram>>(clientThreads);
+ for (int i = 0; i < clientThreads; i++) {
+ // The channel-to-thread assignment works a bit differently than in the C++ version.
+ // It's the same for the "interesting cases": clientThreads == clientChannels and
+ // clientChannels == 1.
+ // However, it doesn't support the "cache thrashing" scenario mentioned in the comments
+ // of the C++ client; that scenario doesn't make sense for our API, as we neither use
+ // fixed threads per call nor pin them to specific cores.
+ Channel channel = channels.get(i % clientChannels);
+ futures.add(doRpcs(channel, req, numRpcs));
+ }
+
+ List<Histogram> histograms = new ArrayList<Histogram>(futures.size());
+ for (Future<Histogram> future : futures) {
+ histograms.add(future.get());
+ }
+
+ long elapsedTime = System.nanoTime() - startTime;
+
+ Histogram merged = merge(histograms);
+
+ assert merged.getTotalCount() == numRpcs * clientThreads;
+
+ printStats(merged, elapsedTime);
+
+ // shutdown
+ for (Channel channel : channels) {
+ ((ChannelImpl) channel).shutdown();
+ }
+ }
+
+ private Channel newChannel() {
+ if (okhttp) {
+ return OkHttpChannelBuilder.forAddress(serverHost, serverPort).build();
+ } else {
+ return NettyChannelBuilder.forAddress(serverHost, serverPort)
+ .negotiationType(NegotiationType.PLAINTEXT)
+ .build();
+ }
+ }
+
+ private boolean parseArgs(String[] args) {
+ try {
+ boolean hasServerPort = false;
+
+ for (String arg : args) {
+ if (!arg.startsWith("--")) {
+ System.err.println("All arguments must start with '--': " + arg);
+ printUsage();
+ return false;
+ }
+
+ String[] pair = arg.substring(2).split("=", 2);
+ if (pair.length < 2) {
+ continue;
+ }
+ String key = pair[0];
+ String value = pair[1];
+
+ if ("client_channels".equals(key)) {
+ clientChannels = max(Integer.parseInt(value), 1);
+ } else if ("client_threads".equals(key)) {
+ clientThreads = max(Integer.parseInt(value), 1);
+ } else if ("num_rpcs".equals(key)) {
+ numRpcs = max(Integer.parseInt(value), 1);
+ } else if ("payload_size".equals(key)) {
+ payloadSize = max(Integer.parseInt(value), 0);
+ } else if ("server_host".equals(key)) {
+ serverHost = value;
+ } else if ("server_port".equals(key)) {
+ serverPort = Integer.parseInt(value);
+ hasServerPort = true;
+ } else if ("transport".equals(key)) {
+ okhttp = "okhttp".equals(value);
+ }
+ }
+
+ if (!hasServerPort) {
+ System.err.println("'--server_port' was not specified.");
+ printUsage();
+ return false;
+ }
+ } catch (Exception e) {
+ e.printStackTrace();
+ printUsage();
+ return false;
+ }
+
+ return true;
+ }
+
+ private void printUsage() {
+ Client c = new Client();
+ System.out.println(
+ "Usage: [ARGS...]"
+ + "\n"
+ + "\n --server_port=INT Port of the server. Required. No default."
+ + "\n --server_host=STR Hostname of the server. Default " + c.serverHost
+ + "\n --client_channels=INT Number of client channels. Default " + c.clientChannels
+ + "\n --client_threads=INT Number of client threads. Default " + c.clientThreads
+ + "\n --num_rpcs=INT Number of RPCs per thread. Default " + c.numRpcs
+ + "\n --payload_size=INT Payload size in bytes. Default " + c.payloadSize
+ + "\n --transport=(okhttp|netty) The transport to use. Default netty"
+ );
+ }
+
+ private Future<Histogram> doRpcs(Channel channel,
+ final SimpleRequest request,
+ final int numRpcs) {
+ final TestServiceStub stub = TestServiceGrpc.newStub(channel);
+ final CountDownLatch remainingRpcs = new CountDownLatch(numRpcs);
+ final Histogram histogram = new Histogram(HISTOGRAM_MAX_VALUE, HISTOGRAM_PRECISION);
+ final HistogramFuture future = new HistogramFuture(histogram, remainingRpcs);
+
+ stub.unaryCall(request, new StreamObserver<SimpleResponse>() {
+ long lastCall = System.nanoTime();
+
+ @Override
+ public void onValue(SimpleResponse value) {
+ PayloadType type = value.getPayload().getType();
+ int actualSize = value.getPayload().getBody().size();
+
+ if (!PayloadType.COMPRESSABLE.equals(type)) {
+ throw new RuntimeException("type was '" + type + "', expected '" +
+ PayloadType.COMPRESSABLE + "'.");
+ }
+
+ if (payloadSize != actualSize) {
+ throw new RuntimeException("size was '" + actualSize + "', expected '" +
+ payloadSize + "'");
+ }
+ }
+
+ @Override
+ public void onError(Throwable t) {
+ Status status = Status.fromThrowable(t);
+ System.err.println("onError called: " + status);
+
+ future.cancel(true);
+ }
+
+ @Override
+ public void onCompleted() {
+ long now = System.nanoTime();
+ histogram.recordValue(now - lastCall);
+ lastCall = now;
+
+ remainingRpcs.countDown();
+
+ if (remainingRpcs.getCount() > 0) {
+ stub.unaryCall(request, this);
+ }
+ }
+ });
+
+ return future;
+ }
+
+ private Histogram merge(List<Histogram> histograms) {
+ Histogram merged = new Histogram(HISTOGRAM_MAX_VALUE, HISTOGRAM_PRECISION);
+ for (Histogram histogram : histograms) {
+ for (HistogramIterationValue value : histogram.allValues()) {
+ long latency = value.getValueIteratedTo();
+ long count = value.getCountAtValueIteratedTo();
+ merged.recordValueWithCount(latency, count);
+ }
+ }
+ return merged;
+ }
+
+ private void printStats(Histogram histogram, long elapsedTime) {
+ double[] percentiles = {50, 90, 95, 99, 99.9, 99.99};
+
+ // Generate comma-separated header and value rows
+ StringBuilder header = new StringBuilder();
+ StringBuilder values = new StringBuilder();
+
+ header.append("Threads, Channels, Payload Size, ");
+ values.append(String.format("%s, %d, %d, ", clientThreads, clientChannels, payloadSize));
+
+ for (double percentile : percentiles) {
+ header.append(percentile).append("%ile").append(", ");
+ values.append(histogram.getValueAtPercentile(percentile)).append(", ");
+ }
+
+ header.append("QPS");
+ values.append((histogram.getTotalCount() * 1000000000L) / elapsedTime);
+
+ System.out.println(header.toString());
+ System.out.println(values.toString());
+ }
+
+ private static class HistogramFuture implements Future<Histogram> {
+ private final Histogram histogram;
+ private final CountDownLatch latch;
+
+ private final AtomicBoolean canceled = new AtomicBoolean();
+
+ HistogramFuture(Histogram histogram, CountDownLatch latch) {
+ Preconditions.checkNotNull(histogram, "histogram");
+ Preconditions.checkNotNull(histogram, "latch");
+
+ this.histogram = histogram;
+ this.latch = latch;
+ }
+
+ @Override
+ public boolean cancel(boolean mayInterruptIfRunning) {
+ if (latch.getCount() > 0 && canceled.compareAndSet(false, true)) {
+ while (latch.getCount() > 0) {
+ latch.countDown();
+ }
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public boolean isCancelled() {
+ return canceled.get();
+ }
+
+ @Override
+ public boolean isDone() {
+ return latch.getCount() == 0 || canceled.get();
+ }
+
+ @Override
+ public Histogram get() throws InterruptedException, ExecutionException {
+ latch.await();
+
+ if (canceled.get()) {
+ throw new CancellationException();
+ }
+
+ return histogram;
+ }
+
+ @Override
+ public Histogram get(long timeout, TimeUnit unit) throws InterruptedException,
+ ExecutionException,
+ TimeoutException {
+ throw new UnsupportedOperationException();
+ }
+ }
+}
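For illustration, the flags handled by parseArgs above can also be supplied to Client.main from another Java program. A minimal sketch, assuming a QPS server is already listening locally; every flag value below is a placeholder rather than something prescribed by this change:

// Hypothetical launcher class; the port and other flag values are placeholders.
public final class RunQpsClient {
  public static void main(String[] ignored) throws Exception {
    io.grpc.benchmarks.qps.Client.main(
        "--server_port=8080",      // required, no default
        "--server_host=127.0.0.1",
        "--client_channels=4",
        "--client_threads=4",
        "--num_rpcs=100000",
        "--payload_size=1",
        "--transport=netty");      // or "okhttp"
  }
}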
diff --git a/benchmarks/src/main/proto/qpstest.proto b/benchmarks/src/main/proto/qpstest.proto
new file mode 100644
index 000000000..8acbe19b1
--- /dev/null
+++ b/benchmarks/src/main/proto/qpstest.proto
@@ -0,0 +1,158 @@
+// An integration test service that covers all the method signature permutations
+// of unary/streaming requests/responses.
+syntax = "proto2";
+
+package grpc.testing;
+
+enum PayloadType {
+ // Compressable text format.
+ COMPRESSABLE = 1;
+
+ // Uncompressable binary format.
+ UNCOMPRESSABLE = 2;
+
+ // Randomly chosen from all other formats defined in this enum.
+ RANDOM = 3;
+}
+
+message StatsRequest {
+ // run number
+ optional int32 test_num = 1;
+}
+
+message ServerStats {
+ // wall clock time for timestamp
+ required double time_now = 1;
+
+ // user time used by the server process and threads
+ required double time_user = 2;
+
+ // system time used by the server process and all threads
+ required double time_system = 3;
+
+ // RPC count so far
+ optional int32 num_rpcs = 4;
+}
+
+message Payload {
+ // The type of data in body.
+ optional PayloadType type = 1;
+ // Primary contents of payload.
+ optional bytes body = 2;
+}
+
+message Latencies {
+ required double l_50 = 1;
+ required double l_90 = 2;
+ required double l_99 = 3;
+ required double l_999 = 4;
+}
+
+message StartArgs {
+ required string server_host = 1;
+ required int32 server_port = 2;
+ optional bool enable_ssl = 3 [default = false];
+ optional int32 client_threads = 4 [default = 1];
+ optional int32 client_channels = 5 [default = -1];
+ optional int32 num_rpcs = 6 [default = 1];
+ optional int32 payload_size = 7 [default = 1];
+}
+
+message StartResult {
+ required Latencies latencies = 1;
+ required int32 num_rpcs = 2;
+ required double time_elapsed = 3;
+ required double time_user = 4;
+ required double time_system = 5;
+}
+
+message SimpleRequest {
+ // Desired payload type in the response from the server.
+ // If response_type is RANDOM, server randomly chooses one from other formats.
+ optional PayloadType response_type = 1 [default = COMPRESSABLE];
+
+ // Desired payload size in the response from the server.
+ // If response_type is COMPRESSABLE, this denotes the size before compression.
+ optional int32 response_size = 2;
+
+ // Optional input payload sent along with the request.
+ optional Payload payload = 3;
+}
+
+message SimpleResponse {
+ optional Payload payload = 1;
+}
+
+message StreamingInputCallRequest {
+ // Optional input payload sent along with the request.
+ optional Payload payload = 1;
+
+ // Not expecting any payload from the response.
+}
+
+message StreamingInputCallResponse {
+ // Aggregated size of payloads received from the client.
+ optional int32 aggregated_payload_size = 1;
+}
+
+message ResponseParameters {
+ // Desired payload sizes in responses from the server.
+ // If response_type is COMPRESSABLE, this denotes the size before compression.
+ required int32 size = 1;
+
+ // Desired interval between consecutive responses in the response stream in
+ // microseconds.
+ required int32 interval_us = 2;
+}
+
+message StreamingOutputCallRequest {
+ // Desired payload type in the response from the server.
+ // If response_type is RANDOM, the payload from each response in the stream
+ // might be of different types. This is to simulate a mixed type of payload
+ // stream.
+ optional PayloadType response_type = 1 [default = COMPRESSABLE];
+
+ repeated ResponseParameters response_parameters = 2;
+
+ // Optional input payload sent along with the request.
+ optional Payload payload = 3;
+}
+
+message StreamingOutputCallResponse {
+ optional Payload payload = 1;
+}
+
+service TestService {
+ // Start test with specified workload
+ rpc StartTest(StartArgs) returns (Latencies);
+
+ // Collect stats from server, ignore request content
+ rpc CollectServerStats(StatsRequest) returns (ServerStats);
+
+ // One request followed by one response.
+ // The server returns the client payload as-is.
+ rpc UnaryCall(SimpleRequest) returns (SimpleResponse);
+
+ // One request followed by a sequence of responses (streamed download).
+ // The server returns the payload with client desired type and sizes.
+ rpc StreamingOutputCall(StreamingOutputCallRequest)
+ returns (stream StreamingOutputCallResponse);
+
+ // A sequence of requests followed by one response (streamed upload).
+ // The server returns the aggregated size of client payload as the result.
+ rpc StreamingInputCall(stream StreamingInputCallRequest)
+ returns (StreamingInputCallResponse);
+
+ // A sequence of requests with each request served by the server immediately.
+ // As one request could lead to multiple responses, this interface
+ // demonstrates the idea of full duplexing.
+ rpc FullDuplexCall(stream StreamingOutputCallRequest)
+ returns (stream StreamingOutputCallResponse);
+
+ // A sequence of requests followed by a sequence of responses.
+ // The server buffers all the client requests and then serves them in order. A
+ // stream of responses is returned to the client once the server starts processing
+ // the first request.
+ rpc HalfDuplexCall(stream StreamingOutputCallRequest)
+ returns (stream StreamingOutputCallResponse);
+}
diff --git a/build.gradle b/build.gradle
index 1bcdf1b28..0e0c039f3 100644
--- a/build.gradle
+++ b/build.gradle
@@ -28,6 +28,8 @@ subprojects {
hpack: 'com.twitter:hpack:0.9.1',
protobuf_plugin: 'ws.antonov.gradle.plugins:gradle-plugin-protobuf:0.9.1',
okhttp: 'com.squareup.okhttp:okhttp:2.2.0',
+ // used to collect benchmark results
+ hdrhistogram: 'org.hdrhistogram:HdrHistogram:2.1.4',
// TODO: Unreleased dependencies.
// These must already be installed in the local maven repository.
diff --git a/settings.gradle b/settings.gradle
index 41d99c09e..1175a681d 100644
--- a/settings.gradle
+++ b/settings.gradle
@@ -8,6 +8,7 @@ include ":grpc-testing"
include ":grpc-compiler"
include ":grpc-integration-testing"
include ":grpc-all"
+include ":grpc-benchmarks"
project(':grpc-core').projectDir = "$rootDir/core" as File
project(':grpc-stub').projectDir = "$rootDir/stub" as File
@@ -18,3 +19,4 @@ project(':grpc-testing').projectDir = "$rootDir/testing" as File
project(':grpc-compiler').projectDir = "$rootDir/compiler" as File
project(':grpc-integration-testing').projectDir = "$rootDir/integration-testing" as File
project(':grpc-all').projectDir = "$rootDir/all" as File
+project(':grpc-benchmarks').projectDir = "$rootDir/benchmarks" as File