From 3fd7d0675cd8ec60f6c6f46a01266c04be65f7c2 Mon Sep 17 00:00:00 2001 From: Jakob Buchgraber Date: Tue, 10 Feb 2015 16:01:26 -0800 Subject: Add QPS Client to perform throughput and latency tests. --- benchmarks/build.gradle | 41 +++ .../main/java/io/grpc/benchmarks/qps/Client.java | 360 +++++++++++++++++++++ benchmarks/src/main/proto/qpstest.proto | 158 +++++++++ build.gradle | 2 + settings.gradle | 2 + 5 files changed, 563 insertions(+) create mode 100644 benchmarks/build.gradle create mode 100644 benchmarks/src/main/java/io/grpc/benchmarks/qps/Client.java create mode 100644 benchmarks/src/main/proto/qpstest.proto diff --git a/benchmarks/build.gradle b/benchmarks/build.gradle new file mode 100644 index 000000000..0b688d283 --- /dev/null +++ b/benchmarks/build.gradle @@ -0,0 +1,41 @@ +apply plugin: 'application' +apply plugin: 'protobuf' + +description = "gRPC Benchmarks" + +mainClassName = "io.grpc.benchmarks.qps.Client" + +buildscript { + repositories { + mavenCentral() + } + dependencies { + classpath libraries.protobuf_plugin + } +} + +dependencies { + compile project(':grpc-core'), + project(':grpc-netty'), + project(':grpc-okhttp'), + project(':grpc-stub'), + project(':grpc-testing'), + libraries.junit, + libraries.mockito, + libraries.hdrhistogram +} + +protobufCodeGenPlugins = ["java_plugin:$rootDir/compiler/build/binaries/java_pluginExecutable/java_plugin"] +compileJava.dependsOn = ['generateProto'] + +// Allow execution of test client and server. +task execute(dependsOn: classes, type:JavaExec) { + main = project.hasProperty('mainClass') ? project.mainClass : 'io.grpc.benchmarks.qps.Client' + classpath = sourceSets.main.runtimeClasspath + workingDir = project.rootDir + + // If appArgs were provided, set the program arguments. + if (project.hasProperty("appArgs")) { + args = Eval.me(appArgs) + } +} diff --git a/benchmarks/src/main/java/io/grpc/benchmarks/qps/Client.java b/benchmarks/src/main/java/io/grpc/benchmarks/qps/Client.java new file mode 100644 index 000000000..a1a2c1bcc --- /dev/null +++ b/benchmarks/src/main/java/io/grpc/benchmarks/qps/Client.java @@ -0,0 +1,360 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc.benchmarks.qps; + +import static grpc.testing.TestServiceGrpc.TestServiceStub; +import static grpc.testing.Qpstest.SimpleRequest; +import static grpc.testing.Qpstest.SimpleResponse; +import static java.lang.Math.max; + +import com.google.common.base.Preconditions; + +import grpc.testing.Qpstest.PayloadType; +import grpc.testing.TestServiceGrpc; +import io.grpc.Channel; +import io.grpc.ChannelImpl; +import io.grpc.Status; +import io.grpc.stub.StreamObserver; +import io.grpc.transport.netty.NegotiationType; +import io.grpc.transport.netty.NettyChannelBuilder; +import io.grpc.transport.okhttp.OkHttpChannelBuilder; +import org.HdrHistogram.Histogram; +import org.HdrHistogram.HistogramIterationValue; + +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.CancellationException; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.Future; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.logging.Logger; + +// TODO: Add OkHttp and Netty TLS Support + +/** + * Runs lots of RPCs against a QPS Server to test for throughput and latency. + * It's a Java clone of the C version at + * https://github.com/grpc/grpc/blob/master/test/cpp/qps/client.cc + */ +public class Client { + private static final Logger log = Logger.getLogger(Client.class.getName()); + + // Can record values between 1 ns and 1 min (60 BILLION NS) + private static final long HISTOGRAM_MAX_VALUE = 60000000000L; + private static final int HISTOGRAM_PRECISION = 3; + // How long (in ns) to do RPCs before it counts + private static final long WARMUP_TIME = 5000000000L; + + private int clientChannels = 4; + private int clientThreads = 4; + private int numRpcs = 100000; + private int payloadSize = 1; + private String serverHost = "127.0.0.1"; + private int serverPort; + private boolean okhttp; + + public static void main(String... args) throws Exception { + Client c = new Client(); + c.run(args); + } + + private void run(String[] args) throws Exception { + if (!parseArgs(args)) { + return; + } + + SimpleRequest req = SimpleRequest.newBuilder() + .setResponseType(PayloadType.COMPRESSABLE) + .setResponseSize(payloadSize) + .build(); + + List channels = new ArrayList(clientChannels); + for (int i = 0; i < clientChannels; i++) { + channels.add(newChannel()); + } + + long warmupEnd = System.nanoTime() + WARMUP_TIME; + do { + doRpcs(channels.get(0), req, 10000).get(); + } while (System.nanoTime() < warmupEnd); + + long startTime = System.nanoTime(); + + List> futures = new ArrayList>(clientThreads); + for (int i = 0; i < clientThreads; i++) { + // The channel to thread assignment works a bit different than in the C++ version. + // It's the same for the "interesting cases": clientThreads == clientChannels and + // clientChannels == 1. + // It however doesn't support "cache thrashing" as mentioned in the comments of the + // C++ client. That's because it's my understanding that it doesn't make sense for our API + // as we neither use fixed threads per call nor do we pin them to specific cores. + Channel channel = channels.get(i % clientChannels); + futures.add(doRpcs(channel, req, numRpcs)); + } + + List histograms = new ArrayList(futures.size()); + for (Future future : futures) { + histograms.add(future.get()); + } + + long elapsedTime = System.nanoTime() - startTime; + + Histogram merged = merge(histograms); + + assert merged.getTotalCount() == numRpcs * clientThreads; + + printStats(merged, elapsedTime); + + // shutdown + for (Channel channel : channels) { + ((ChannelImpl) channel).shutdown(); + } + } + + private Channel newChannel() { + if (okhttp) { + return OkHttpChannelBuilder.forAddress(serverHost, serverPort).build(); + } else { + return NettyChannelBuilder.forAddress(serverHost, serverPort) + .negotiationType(NegotiationType.PLAINTEXT) + .build(); + } + } + + private boolean parseArgs(String[] args) { + try { + boolean hasServerPort = false; + + for (String arg : args) { + if (!arg.startsWith("--")) { + System.err.println("All arguments must start with '--': " + arg); + printUsage(); + return false; + } + + String[] pair = arg.substring(2).split("=", 2); + if (pair.length < 2) { + continue; + } + String key = pair[0]; + String value = pair[1]; + + if ("client_channels".equals(key)) { + clientChannels = max(Integer.parseInt(value), 1); + } else if ("client_threads".equals(key)) { + clientThreads = max(Integer.parseInt(value), 1); + } else if ("num_rpcs".equals(key)) { + numRpcs = max(Integer.parseInt(value), 1); + } else if ("payload_size".equals(key)) { + payloadSize = max(Integer.parseInt(value), 0); + } else if ("server_host".equals(key)) { + serverHost = value; + } else if ("server_port".equals(key)) { + serverPort = Integer.parseInt(value); + hasServerPort = true; + } else if ("transport".equals(key)) { + okhttp = "okhttp".equals(value); + } + } + + if (!hasServerPort) { + System.err.println("'--server_port' was not specified."); + printUsage(); + return false; + } + } catch (Exception e) { + e.printStackTrace(); + printUsage(); + return false; + } + + return true; + } + + private void printUsage() { + Client c = new Client(); + System.out.println( + "Usage: [ARGS...]" + + "\n" + + "\n --server_port=INT Port of the server. Required. No default." + + "\n --server_host=STR Hostname of the server. Default " + c.serverHost + + "\n --client_channels=INT Number of client channels. Default " + c.clientChannels + + "\n --client_threads=INT Number of client threads. Default " + c.clientThreads + + "\n --num_rpcs=INT Number of RPCs per thread. Default " + c.numRpcs + + "\n --payload_size=INT Payload size in bytes. Default " + c.payloadSize + + "\n --transport=(okhttp|netty) The transport to use. Default netty" + ); + } + + private Future doRpcs(Channel channel, + final SimpleRequest request, + final int numRpcs) { + final TestServiceStub stub = TestServiceGrpc.newStub(channel); + final CountDownLatch remainingRpcs = new CountDownLatch(numRpcs); + final Histogram histogram = new Histogram(HISTOGRAM_MAX_VALUE, HISTOGRAM_PRECISION); + final HistogramFuture future = new HistogramFuture(histogram, remainingRpcs); + + stub.unaryCall(request, new StreamObserver() { + long lastCall = System.nanoTime(); + + @Override + public void onValue(SimpleResponse value) { + PayloadType type = value.getPayload().getType(); + int actualSize = value.getPayload().getBody().size(); + + if (!PayloadType.COMPRESSABLE.equals(type)) { + throw new RuntimeException("type was '" + type + "', expected '" + + PayloadType.COMPRESSABLE + "'."); + } + + if (payloadSize != actualSize) { + throw new RuntimeException("size was '" + actualSize + "', expected '" + + payloadSize + "'"); + } + } + + @Override + public void onError(Throwable t) { + Status status = Status.fromThrowable(t); + System.err.println("onError called: " + status); + + future.cancel(true); + } + + @Override + public void onCompleted() { + long now = System.nanoTime(); + histogram.recordValue(now - lastCall); + lastCall = now; + + remainingRpcs.countDown(); + + if (remainingRpcs.getCount() > 0) { + stub.unaryCall(request, this); + } + } + }); + + return future; + } + + private Histogram merge(List histograms) { + Histogram merged = new Histogram(HISTOGRAM_MAX_VALUE, HISTOGRAM_PRECISION); + for (Histogram histogram : histograms) { + for (HistogramIterationValue value : histogram.allValues()) { + long latency = value.getValueIteratedTo(); + long count = value.getCountAtValueIteratedTo(); + merged.recordValueWithCount(latency, count); + } + } + return merged; + } + + private void printStats(Histogram histogram, long elapsedTime) { + double percentiles[] = {50, 90, 95, 99, 99.9, 99.99}; + + // Generate a comma-separated string of percentiles + StringBuilder header = new StringBuilder(); + StringBuilder values = new StringBuilder(); + + header.append("Threads, Channels, Payload Size, "); + values.append(String.format("%s, %d, %d, ", clientThreads, clientChannels, payloadSize)); + + for (double percentile : percentiles) { + header.append(percentile).append("%ile").append(", "); + values.append(histogram.getValueAtPercentile(percentile)).append(", "); + } + + header.append("QPS"); + values.append((histogram.getTotalCount() * 1000000000L) / elapsedTime); + + System.out.println(header.toString()); + System.out.println(values.toString()); + } + + private static class HistogramFuture implements Future { + private final Histogram histogram; + private final CountDownLatch latch; + + private final AtomicBoolean canceled = new AtomicBoolean(); + + HistogramFuture(Histogram histogram, CountDownLatch latch) { + Preconditions.checkNotNull(histogram, "histogram"); + Preconditions.checkNotNull(histogram, "latch"); + + this.histogram = histogram; + this.latch = latch; + } + + @Override + public boolean cancel(boolean mayInterruptIfRunning) { + if (latch.getCount() > 0 && canceled.compareAndSet(false, true)) { + while (latch.getCount() > 0) { + latch.countDown(); + } + return true; + } + return false; + } + + @Override + public boolean isCancelled() { + return canceled.get(); + } + + @Override + public boolean isDone() { + return latch.getCount() == 0 || canceled.get(); + } + + @Override + public Histogram get() throws InterruptedException, ExecutionException { + latch.await(); + + if (canceled.get()) { + throw new CancellationException(); + } + + return histogram; + } + + @Override + public Histogram get(long timeout, TimeUnit unit) throws InterruptedException, + ExecutionException, + TimeoutException { + throw new UnsupportedOperationException(); + } + } +} diff --git a/benchmarks/src/main/proto/qpstest.proto b/benchmarks/src/main/proto/qpstest.proto new file mode 100644 index 000000000..8acbe19b1 --- /dev/null +++ b/benchmarks/src/main/proto/qpstest.proto @@ -0,0 +1,158 @@ +// An integration test service that covers all the method signature permutations +// of unary/streaming requests/responses. +syntax = "proto2"; + +package grpc.testing; + +enum PayloadType { + // Compressable text format. + COMPRESSABLE= 1; + + // Uncompressable binary format. + UNCOMPRESSABLE = 2; + + // Randomly chosen from all other formats defined in this enum. + RANDOM = 3; +} + +message StatsRequest { + // run number + optional int32 test_num = 1; +} + +message ServerStats { + // wall clock time for timestamp + required double time_now = 1; + + // user time used by the server process and threads + required double time_user = 2; + + // server time used by the server process and all threads + required double time_system = 3; + + // RPC count so far + optional int32 num_rpcs = 4; +} + +message Payload { + // The type of data in body. + optional PayloadType type = 1; + // Primary contents of payload. + optional bytes body = 2; +} + +message Latencies { + required double l_50 = 1; + required double l_90 = 2; + required double l_99 = 3; + required double l_999 = 4; +} + +message StartArgs { + required string server_host = 1; + required int32 server_port = 2; + optional bool enable_ssl = 3 [default = false]; + optional int32 client_threads = 4 [default = 1]; + optional int32 client_channels = 5 [default = -1]; + optional int32 num_rpcs = 6 [default = 1]; + optional int32 payload_size = 7 [default = 1]; +} + +message StartResult { + required Latencies latencies = 1; + required int32 num_rpcs = 2; + required double time_elapsed = 3; + required double time_user = 4; + required double time_system = 5; +} + +message SimpleRequest { + // Desired payload type in the response from the server. + // If response_type is RANDOM, server randomly chooses one from other formats. + optional PayloadType response_type = 1 [default=COMPRESSABLE]; + + // Desired payload size in the response from the server. + // If response_type is COMPRESSABLE, this denotes the size before compression. + optional int32 response_size = 2; + + // Optional input payload sent along with the request. + optional Payload payload = 3; +} + +message SimpleResponse { + optional Payload payload = 1; +} + +message StreamingInputCallRequest { + // Optional input payload sent along with the request. + optional Payload payload = 1; + + // Not expecting any payload from the response. +} + +message StreamingInputCallResponse { + // Aggregated size of payloads received from the client. + optional int32 aggregated_payload_size = 1; +} + +message ResponseParameters { + // Desired payload sizes in responses from the server. + // If response_type is COMPRESSABLE, this denotes the size before compression. + required int32 size = 1; + + // Desired interval between consecutive responses in the response stream in + // microseconds. + required int32 interval_us = 2; +} + +message StreamingOutputCallRequest { + // Desired payload type in the response from the server. + // If response_type is RANDOM, the payload from each response in the stream + // might be of different types. This is to simulate a mixed type of payload + // stream. + optional PayloadType response_type = 1 [default=COMPRESSABLE]; + + repeated ResponseParameters response_parameters = 2; + + // Optional input payload sent along with the request. + optional Payload payload = 3; +} + +message StreamingOutputCallResponse { + optional Payload payload = 1; +} + +service TestService { + // Start test with specified workload + rpc StartTest(StartArgs) returns (Latencies); + + // Collect stats from server, ignore request content + rpc CollectServerStats(StatsRequest) returns (ServerStats); + + // One request followed by one response. + // The server returns the client payload as-is. + rpc UnaryCall(SimpleRequest) returns (SimpleResponse); + + // One request followed by a sequence of responses (streamed download). + // The server returns the payload with client desired type and sizes. + rpc StreamingOutputCall(StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); + + // A sequence of requests followed by one response (streamed upload). + // The server returns the aggregated size of client payload as the result. + rpc StreamingInputCall(stream StreamingInputCallRequest) + returns (StreamingInputCallResponse); + + // A sequence of requests with each request served by the server immediately. + // As one request could lead to multiple responses, this interface + // demonstrates the idea of full duplexing. + rpc FullDuplexCall(stream StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); + + // A sequence of requests followed by a sequence of responses. + // The server buffers all the client requests and then serves them in order. A + // stream of responses are returned to the client when the server starts with + // first request. + rpc HalfDuplexCall(stream StreamingOutputCallRequest) + returns (stream StreamingOutputCallResponse); +} diff --git a/build.gradle b/build.gradle index 1bcdf1b28..0e0c039f3 100644 --- a/build.gradle +++ b/build.gradle @@ -28,6 +28,8 @@ subprojects { hpack: 'com.twitter:hpack:0.9.1', protobuf_plugin: 'ws.antonov.gradle.plugins:gradle-plugin-protobuf:0.9.1', okhttp: 'com.squareup.okhttp:okhttp:2.2.0', + // used to collect benchmark results + hdrhistogram: 'org.hdrhistogram:HdrHistogram:2.1.4', // TODO: Unreleased dependencies. // These must already be installed in the local maven repository. diff --git a/settings.gradle b/settings.gradle index 41d99c09e..1175a681d 100644 --- a/settings.gradle +++ b/settings.gradle @@ -8,6 +8,7 @@ include ":grpc-testing" include ":grpc-compiler" include ":grpc-integration-testing" include ":grpc-all" +include ":grpc-benchmarks" project(':grpc-core').projectDir = "$rootDir/core" as File project(':grpc-stub').projectDir = "$rootDir/stub" as File @@ -18,3 +19,4 @@ project(':grpc-testing').projectDir = "$rootDir/testing" as File project(':grpc-compiler').projectDir = "$rootDir/compiler" as File project(':grpc-integration-testing').projectDir = "$rootDir/integration-testing" as File project(':grpc-all').projectDir = "$rootDir/all" as File +project(':grpc-benchmarks').projectDir = "$rootDir/benchmarks" as File -- cgit v1.2.3