aboutsummaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorEric Anderson <ejona@google.com>2014-12-19 16:41:03 -0800
committerEric Anderson <ejona@google.com>2015-01-27 09:20:24 -0800
commitaeeebb7cdbdefadcf4e200974e8ef504fae05756 (patch)
treea346f7035e25dba974c2ac939c50080c78e4dc01
parent19052499f79381551a31ccd0ccf8f37a6b8e611b (diff)
downloadgrpc-grpc-java-aeeebb7cdbdefadcf4e200974e8ef504fae05756.tar.gz
Remove Service API from ServerImpl
Fixes #21
-rw-r--r--core/src/main/java/com/google/net/stubby/AbstractChannelBuilder.java25
-rw-r--r--core/src/main/java/com/google/net/stubby/AbstractServerBuilder.java51
-rw-r--r--core/src/main/java/com/google/net/stubby/AbstractServiceBuilder.java194
-rw-r--r--core/src/main/java/com/google/net/stubby/Server.java4
-rw-r--r--core/src/main/java/com/google/net/stubby/ServerImpl.java192
-rw-r--r--core/src/test/java/com/google/net/stubby/ServerImplTest.java74
-rw-r--r--integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java4
-rw-r--r--integration-testing/src/main/java/com/google/net/stubby/testing/integration/TestServiceServer.java10
-rw-r--r--netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerBuilder.java14
9 files changed, 234 insertions, 334 deletions
diff --git a/core/src/main/java/com/google/net/stubby/AbstractChannelBuilder.java b/core/src/main/java/com/google/net/stubby/AbstractChannelBuilder.java
index 0ecda3d2e..16b9d52f2 100644
--- a/core/src/main/java/com/google/net/stubby/AbstractChannelBuilder.java
+++ b/core/src/main/java/com/google/net/stubby/AbstractChannelBuilder.java
@@ -31,12 +31,13 @@
package com.google.net.stubby;
-import static com.google.net.stubby.AbstractServiceBuilder.DEFAULT_EXECUTOR;
-
import com.google.common.base.Preconditions;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.net.stubby.SharedResourceHolder.Resource;
import com.google.net.stubby.transport.ClientTransportFactory;
import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
import javax.annotation.Nullable;
@@ -46,6 +47,26 @@ import javax.annotation.Nullable;
* @param <BuilderT> The concrete type of this builder.
*/
public abstract class AbstractChannelBuilder<BuilderT extends AbstractChannelBuilder<BuilderT>> {
+ static final Resource<ExecutorService> DEFAULT_EXECUTOR =
+ new Resource<ExecutorService>() {
+ private static final String name = "grpc-default-executor";
+ @Override
+ public ExecutorService create() {
+ return Executors.newCachedThreadPool(new ThreadFactoryBuilder()
+ .setNameFormat(name + "-%d").build());
+ }
+
+ @Override
+ public void close(ExecutorService instance) {
+ instance.shutdown();
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+ };
+
@Nullable
private ExecutorService userExecutor;
diff --git a/core/src/main/java/com/google/net/stubby/AbstractServerBuilder.java b/core/src/main/java/com/google/net/stubby/AbstractServerBuilder.java
index 516a44065..09d934686 100644
--- a/core/src/main/java/com/google/net/stubby/AbstractServerBuilder.java
+++ b/core/src/main/java/com/google/net/stubby/AbstractServerBuilder.java
@@ -31,21 +31,26 @@
package com.google.net.stubby;
+import static com.google.net.stubby.AbstractChannelBuilder.DEFAULT_EXECUTOR;
+
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.Service;
import com.google.net.stubby.transport.ServerListener;
import java.util.concurrent.ExecutorService;
+import javax.annotation.Nullable;
+
/**
* The base class for server builders.
*
* @param <BuilderT> The concrete type for this builder.
*/
-public abstract class AbstractServerBuilder<BuilderT extends AbstractServerBuilder<BuilderT>>
- extends AbstractServiceBuilder<ServerImpl, BuilderT> {
+public abstract class AbstractServerBuilder<BuilderT extends AbstractServerBuilder<BuilderT>> {
private final HandlerRegistry registry;
+ @Nullable
+ private ExecutorService userExecutor;
/**
* Constructs using a given handler registry.
@@ -62,6 +67,21 @@ public abstract class AbstractServerBuilder<BuilderT extends AbstractServerBuild
}
/**
+ * Provides a custom executor.
+ *
+ * <p>It's an optional parameter. If the user has not provided an executor when the server is
+ * built, the builder will use a static cached thread pool.
+ *
+ * <p>The server won't take ownership of the given executor. It's caller's responsibility to
+ * shut down the executor when it's desired.
+ */
+ @SuppressWarnings("unchecked")
+ public final BuilderT executor(ExecutorService executor) {
+ userExecutor = executor;
+ return (BuilderT) this;
+ }
+
+ /**
* Adds a service implementation to the handler registry.
*
* <p>This is supported only if the user didn't provide a handler registry, or the provided one is
@@ -76,10 +96,33 @@ public abstract class AbstractServerBuilder<BuilderT extends AbstractServerBuild
throw new UnsupportedOperationException("Underlying HandlerRegistry is not mutable");
}
- @Override
- protected final ServerImpl buildImpl(ExecutorService executor) {
+ /**
+ * Builds a server using the given parameters.
+ *
+ * <p>The returned service will not been started or be bound a port. You will need to start it
+ * with {@link ServerImpl#start()}.
+ */
+ public ServerImpl build() {
+ final ExecutorService executor;
+ final boolean releaseExecutor;
+ if (userExecutor != null) {
+ executor = userExecutor;
+ releaseExecutor = false;
+ } else {
+ executor = SharedResourceHolder.get(DEFAULT_EXECUTOR);
+ releaseExecutor = true;
+ }
+
ServerImpl server = new ServerImpl(executor, registry);
server.setTransportServer(buildTransportServer(server.serverListener()));
+ server.setTerminationRunnable(new Runnable() {
+ @Override
+ public void run() {
+ if (releaseExecutor) {
+ SharedResourceHolder.release(DEFAULT_EXECUTOR, executor);
+ }
+ }
+ });
return server;
}
diff --git a/core/src/main/java/com/google/net/stubby/AbstractServiceBuilder.java b/core/src/main/java/com/google/net/stubby/AbstractServiceBuilder.java
deleted file mode 100644
index 2c23a02e9..000000000
--- a/core/src/main/java/com/google/net/stubby/AbstractServiceBuilder.java
+++ /dev/null
@@ -1,194 +0,0 @@
-/*
- * Copyright 2014, Google Inc. All rights reserved.
- *
- * Redistribution and use in source and binary forms, with or without
- * modification, are permitted provided that the following conditions are
- * met:
- *
- * * Redistributions of source code must retain the above copyright
- * notice, this list of conditions and the following disclaimer.
- * * Redistributions in binary form must reproduce the above
- * copyright notice, this list of conditions and the following disclaimer
- * in the documentation and/or other materials provided with the
- * distribution.
- *
- * * Neither the name of Google Inc. nor the names of its
- * contributors may be used to endorse or promote products derived from
- * this software without specific prior written permission.
- *
- * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
- * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
- * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
- * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
- * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
- * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
- * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
- * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
- * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
- * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
- * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
- */
-
-package com.google.net.stubby;
-
-import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.MoreExecutors;
-import com.google.common.util.concurrent.Service;
-import com.google.common.util.concurrent.ThreadFactoryBuilder;
-import com.google.net.stubby.SharedResourceHolder.Resource;
-
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-
-import javax.annotation.Nullable;
-
-/**
- * Base class for channel builders and server builders.
- *
- * <p>The ownership rule: a builder generally does not take ownership of any objects passed to it.
- * The caller is responsible for closing them if needed. The builder is only responsible for the
- * life-cycle of objects created inside.
- *
- * @param <ProductT> The product that is built by this builder.
- * @param <BuilderT> The concrete type of this builder.
- */
-abstract class AbstractServiceBuilder<ProductT extends Service,
- BuilderT extends AbstractServiceBuilder<ProductT, BuilderT>> {
-
- static final Resource<ExecutorService> DEFAULT_EXECUTOR =
- new Resource<ExecutorService>() {
- private static final String name = "grpc-default-executor";
- @Override
- public ExecutorService create() {
- return Executors.newCachedThreadPool(new ThreadFactoryBuilder()
- .setNameFormat(name + "-%d").build());
- }
-
- @Override
- public void close(ExecutorService instance) {
- instance.shutdown();
- }
-
- @Override
- public String toString() {
- return name;
- }
- };
-
- @Nullable
- private ExecutorService userExecutor;
-
- /**
- * Provides a custom executor.
- *
- * <p>It's an optional parameter. If the user has not provided an executor when the service is
- * built, the builder will use a static cached thread pool.
- *
- * <p>The service won't take ownership of the given executor. It's caller's responsibility to
- * shut down the executor when it's desired.
- */
- @SuppressWarnings("unchecked")
- public final BuilderT executor(ExecutorService executor) {
- userExecutor = executor;
- return (BuilderT) this;
- }
-
- /**
- * Builds a service using the given parameters.
- *
- * <p>The returned service has not been started at this point. You will need to start it by
- * yourself or use {@link #buildAndStart()}.
- */
- public ProductT build() {
- final ExecutorService executor = (userExecutor == null)
- ? SharedResourceHolder.get(DEFAULT_EXECUTOR) : userExecutor;
- ProductT service = buildImpl(executor);
- // We shut down the executor only if we created it.
- if (userExecutor == null) {
- service.addListener(new ClosureHook() {
- @Override
- protected void onClosed() {
- SharedResourceHolder.release(DEFAULT_EXECUTOR, executor);
- }
- }, MoreExecutors.directExecutor());
- }
- return service;
- }
-
- /**
- * Builds and starts a service.
- *
- * <p>The service may not be running when this method returns. If you want to wait until it's up
- * and running, either use {@link Service#awaitRunning()} or {@link #buildAndWaitForRunning()}.
- *
- * @return the service that has just been built and started
- */
- public final ProductT buildAndStart() {
- ProductT service = build();
- service.startAsync();
- return service;
- }
-
- /**
- * Builds and starts a service, and wait until it's up and running.
- *
- * @return the service that has just been built and is now running.
- */
- public final ProductT buildAndWaitForRunning() {
- ProductT service = buildAndStart();
- try {
- service.awaitRunning();
- } catch (Exception e) {
- service.stopAsync();
- throw Throwables.propagate(e);
- }
- return service;
- }
-
- /**
- * Builds and starts a service, and wait until it's up and running, with a timeout.
- *
- * @return the service that has just been built and is now running.
- * @throws TimeoutException if the service didn't become running within the given timeout.
- */
- public final ProductT buildAndWaitForRunning(long timeout, TimeUnit unit)
- throws TimeoutException {
- ProductT service = buildAndStart();
- try {
- service.awaitRunning(timeout, unit);
- } catch (Exception e) {
- service.stopAsync();
- if (e instanceof TimeoutException) {
- throw (TimeoutException) e;
- } else {
- throw Throwables.propagate(e);
- }
- }
- return service;
- }
-
- /**
- * Subclasses may use this as a convenient listener for cleaning up after the built service.
- */
- protected abstract static class ClosureHook extends Service.Listener {
- protected abstract void onClosed();
-
- @Override
- public void terminated(Service.State from) {
- onClosed();
- }
-
- @Override
- public void failed(Service.State from, Throwable failure) {
- onClosed();
- }
- }
-
- /**
- * Implemented by subclasses to build the actual service object. The given executor is owned by
- * this base class.
- */
- protected abstract ProductT buildImpl(ExecutorService executor);
-}
diff --git a/core/src/main/java/com/google/net/stubby/Server.java b/core/src/main/java/com/google/net/stubby/Server.java
index e413777d9..fbc2e94e5 100644
--- a/core/src/main/java/com/google/net/stubby/Server.java
+++ b/core/src/main/java/com/google/net/stubby/Server.java
@@ -31,8 +31,6 @@
package com.google.net.stubby;
-import com.google.common.util.concurrent.Service;
-
import javax.annotation.concurrent.ThreadSafe;
/**
@@ -40,4 +38,4 @@ import javax.annotation.concurrent.ThreadSafe;
* not expected to be implemented by application code or interceptors.
*/
@ThreadSafe
-public interface Server extends Service {}
+public interface Server {}
diff --git a/core/src/main/java/com/google/net/stubby/ServerImpl.java b/core/src/main/java/com/google/net/stubby/ServerImpl.java
index df933fe00..5883d4880 100644
--- a/core/src/main/java/com/google/net/stubby/ServerImpl.java
+++ b/core/src/main/java/com/google/net/stubby/ServerImpl.java
@@ -33,7 +33,6 @@ package com.google.net.stubby;
import com.google.common.base.Preconditions;
import com.google.common.base.Throwables;
-import com.google.common.util.concurrent.AbstractService;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.Service;
import com.google.net.stubby.transport.ServerListener;
@@ -47,6 +46,7 @@ import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
/**
* Default implementation of {@link Server}, for creation by transports.
@@ -64,7 +64,7 @@ import java.util.concurrent.Executor;
* <p>Starting the server starts the underlying transport for servicing requests. Stopping the
* server stops servicing new requests and waits for all connections to terminate.
*/
-public class ServerImpl extends AbstractService implements Server {
+public class ServerImpl implements Server {
private static final ServerStreamListener NOOP_LISTENER = new NoopListener();
private final ServerListener serverListener = new ServerListenerImpl();
@@ -72,11 +72,14 @@ public class ServerImpl extends AbstractService implements Server {
/** Executor for application processing. */
private final Executor executor;
private final HandlerRegistry registry;
+ private boolean started;
+ private boolean shutdown;
+ private boolean terminated;
+ private Runnable terminationRunnable;
/** Service encapsulating something similar to an accept() socket. */
private Service transportServer;
/** {@code transportServer} and services encapsulating something similar to a TCP connection. */
- private final Collection<Service> transports
- = Collections.synchronizedSet(new HashSet<Service>());
+ private final Collection<Service> transports = new HashSet<Service>();
/**
* Construct a server. {@link #setTransportServer(Service)} must be called before starting the
@@ -96,18 +99,17 @@ public class ServerImpl extends AbstractService implements Server {
*
* @return this object
*/
- public ServerImpl setTransportServer(Service transportServer) {
- Preconditions.checkState(state() == Server.State.NEW, "server must be in NEW state");
+ public synchronized ServerImpl setTransportServer(Service transportServer) {
+ if (shutdown) {
+ throw new IllegalStateException("Already shutdown");
+ }
Preconditions.checkState(this.transportServer == null, "transportServer already set");
this.transportServer = Preconditions.checkNotNull(transportServer);
Preconditions.checkArgument(
- transportServer.state() == Server.State.NEW, "transport server not in NEW state");
- transportServer.addListener(new TransportLifecycleListener(), MoreExecutors.directExecutor());
+ transportServer.state() == Service.State.NEW, "transport server not in NEW state");
+ transportServer.addListener(new TransportServiceListener(transportServer),
+ MoreExecutors.directExecutor());
transports.add(transportServer);
- // We assume that transport.state() won't change by another thread before we return from this
- // call.
- Preconditions.checkState(
- transportServer.state() == Server.State.NEW, "transport server changed state!");
return this;
}
@@ -116,64 +118,114 @@ public class ServerImpl extends AbstractService implements Server {
return serverListener;
}
- @Override
- protected void doStart() {
- Preconditions.checkState(transportServer != null, "setTransportServer not called");
- transportServer.startAsync();
- }
-
- @Override
- protected void doStop() {
- stopTransports();
+ /** Hack to allow executors to auto-shutdown. Not for general use. */
+ // TODO(ejona): Replace with a real API.
+ synchronized void setTerminationRunnable(Runnable runnable) {
+ this.terminationRunnable = runnable;
}
/**
- * Remove transport service from accounting list and notify of complete shutdown if necessary.
+ * Bind and start the server.
*
- * @param transport service to remove
- * @return {@code true} if shutting down and it is now complete
+ * @return {@code this} object
+ * @throws IllegalStateException if already started
*/
- private boolean transportClosed(Service transport) {
- boolean shutdownComplete;
- synchronized (transports) {
- if (!transports.remove(transport)) {
- throw new AssertionError("Transport already removed");
- }
- shutdownComplete = transports.isEmpty();
+ public synchronized ServerImpl start() {
+ if (started) {
+ throw new IllegalStateException("Already started");
}
- if (shutdownComplete) {
- Service.State state = state();
- if (state == Service.State.STOPPING) {
- notifyStopped();
- } else if (state == Service.State.FAILED) {
- // NOOP: already failed
- } else {
- notifyFailed(new IllegalStateException("server transport terminated unexpectedly"));
+ started = true;
+ try {
+ // Start and wait for any port to actually be bound.
+ transportServer.startAsync().awaitRunning();
+ } catch (IllegalStateException ex) {
+ Throwable t = transportServer.failureCause();
+ if (t != null) {
+ throw Throwables.propagate(t);
}
+ throw ex;
}
- return shutdownComplete;
+ return this;
}
/**
- * The transport server closed, so cleanup its resources and start shutdown.
+ * Initiates an orderly shutdown in which preexisting calls continue but new calls are rejected.
*/
- private void transportServerClosed() {
- boolean shutdownComplete = transportClosed(transportServer);
- if (shutdownComplete) {
- return;
+ public synchronized ServerImpl shutdown() {
+ shutdown = true;
+ // transports collection can be modified during stopAsync(), even if we hold the lock, due to
+ // reentrancy.
+ for (Service transport : transports.toArray(new Service[transports.size()])) {
+ transport.stopAsync();
}
- stopTransports();
+ return this;
}
/**
- * Shutdown all transports (including transportServer). Safe to be called even if previously
- * called.
+ * Initiates a forceful shutdown in which preexisting and new calls are rejected. Although
+ * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely
+ * return {@code false} immediately after this method returns.
+ *
+ * <p>NOT YET IMPLEMENTED. This method currently behaves identically to shutdown().
*/
- private void stopTransports() {
- for (Service transport : transports.toArray(new Service[0])) {
- // transports list can be modified during this call, even if we hold the lock, due to
- // reentrancy.
- transport.stopAsync();
+ // TODO(ejona): cancel preexisting calls.
+ public synchronized ServerImpl shutdownNow() {
+ shutdown();
+ return this;
+ }
+
+ /**
+ * Returns whether the server is shutdown. Shutdown servers reject any new calls, but may still
+ * have some calls being processed.
+ *
+ * @see #shutdown()
+ * @see #isTerminated()
+ */
+ public synchronized boolean isShutdown() {
+ return shutdown;
+ }
+
+ /**
+ * Waits for the server to become terminated, giving up if the timeout is reached.
+ *
+ * @return whether the server is terminated, as would be done by {@link #isTerminated()}.
+ */
+ public synchronized boolean awaitTerminated(long timeout, TimeUnit unit)
+ throws InterruptedException {
+ long timeoutNanos = unit.toNanos(timeout);
+ long endTimeNanos = System.nanoTime() + timeoutNanos;
+ while (!terminated && (timeoutNanos = endTimeNanos - System.nanoTime()) > 0) {
+ TimeUnit.NANOSECONDS.timedWait(this, timeoutNanos);
+ }
+ return terminated;
+ }
+
+ /**
+ * Returns whether the server is terminated. Terminated servers have no running calls and
+ * relevant resources released (like TCP connections).
+ *
+ * @see #isShutdown()
+ */
+ public synchronized boolean isTerminated() {
+ return terminated;
+ }
+
+ /**
+ * Remove transport service from accounting collection and notify of complete shutdown if
+ * necessary.
+ *
+ * @param transport service to remove
+ */
+ private synchronized void transportClosed(Service transport) {
+ if (!transports.remove(transport)) {
+ throw new AssertionError("Transport already removed");
+ }
+ if (shutdown && transports.isEmpty()) {
+ terminated = true;
+ notifyAll();
+ if (terminationRunnable != null) {
+ terminationRunnable.run();
+ }
}
}
@@ -184,12 +236,14 @@ public class ServerImpl extends AbstractService implements Server {
Preconditions.checkArgument(
transportState == Service.State.STARTING || transportState == Service.State.RUNNING,
"Created transport should be starting or running");
- if (state() != Server.State.RUNNING) {
- transport.stopAsync();
- return serverTransportListener;
+ synchronized (this) {
+ if (shutdown) {
+ transport.stopAsync();
+ return serverTransportListener;
+ }
+ transports.add(transport);
}
- transports.add(transport);
- // transports list can be modified during this call, even if we hold the lock, due to
+ // transports collection can be modified during this call, even if we hold the lock, due to
// reentrancy.
transport.addListener(new TransportServiceListener(transport),
MoreExecutors.directExecutor());
@@ -201,28 +255,6 @@ public class ServerImpl extends AbstractService implements Server {
}
}
- /** Listens for lifecycle changes to the "accept() socket." */
- private class TransportLifecycleListener extends Service.Listener {
- @Override
- public void running() {
- notifyStarted();
- }
-
- @Override
- public void terminated(Service.State from) {
- transportServerClosed();
- }
-
- @Override
- public void failed(Service.State from, Throwable failure) {
- // TODO(ejona): Ideally we would want to force-stop transports before notifying application of
- // failure, but that would cause us to have an unrepresentative state since we would be
- // RUNNING but not accepting connections.
- notifyFailed(failure);
- transportServerClosed();
- }
- }
-
/** Listens for lifecycle changes to a "TCP connection." */
private class TransportServiceListener extends Service.Listener {
private final Service transport;
diff --git a/core/src/test/java/com/google/net/stubby/ServerImplTest.java b/core/src/test/java/com/google/net/stubby/ServerImplTest.java
index 9cb6cf4b8..e99e3281d 100644
--- a/core/src/test/java/com/google/net/stubby/ServerImplTest.java
+++ b/core/src/test/java/com/google/net/stubby/ServerImplTest.java
@@ -33,7 +33,10 @@ package com.google.net.stubby;
import static com.google.common.base.Charsets.UTF_8;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.eq;
import static org.mockito.Matchers.isNull;
import static org.mockito.Matchers.notNull;
@@ -68,6 +71,7 @@ import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
/** Unit tests for {@link ServerImpl}. */
@@ -92,8 +96,7 @@ public class ServerImplTest {
public void startup() {
MockitoAnnotations.initMocks(this);
- server.startAsync();
- server.awaitRunning();
+ server.start();
}
@After
@@ -102,50 +105,33 @@ public class ServerImplTest {
}
@Test
- public void startStopImmediate() {
+ public void startStopImmediate() throws InterruptedException {
Service transportServer = new NoopService();
- Server server = new ServerImpl(executor, registry).setTransportServer(transportServer);
- assertEquals(Service.State.NEW, server.state());
+ ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer);
assertEquals(Service.State.NEW, transportServer.state());
- server.startAsync();
- server.awaitRunning();
- assertEquals(Service.State.RUNNING, server.state());
+ server.start();
assertEquals(Service.State.RUNNING, transportServer.state());
- server.stopAsync();
- server.awaitTerminated();
- assertEquals(Service.State.TERMINATED, server.state());
+ server.shutdown();
+ assertTrue(server.awaitTerminated(100, TimeUnit.MILLISECONDS));
assertEquals(Service.State.TERMINATED, transportServer.state());
}
@Test
- public void transportServerFailureFailsServer() {
- class FailableService extends NoopService {
- public void doNotifyFailed(Throwable cause) {
- notifyFailed(cause);
- }
- }
- FailableService transportServer = new FailableService();
- Server server = new ServerImpl(executor, registry).setTransportServer(transportServer);
- server.startAsync();
- server.awaitRunning();
- RuntimeException ex = new RuntimeException("force failure");
- transportServer.doNotifyFailed(ex);
- assertEquals(Service.State.FAILED, server.state());
- assertEquals(ex, server.failureCause());
- }
-
- @Test
public void transportServerFailsStartup() {
+ final Exception ex = new RuntimeException();
class FailingStartupService extends NoopService {
@Override
public void doStart() {
- notifyFailed(new RuntimeException());
+ notifyFailed(ex);
}
}
FailingStartupService transportServer = new FailingStartupService();
- Server server = new ServerImpl(executor, registry).setTransportServer(transportServer);
- server.startAsync();
- assertEquals(Service.State.FAILED, server.state());
+ ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer);
+ try {
+ server.start();
+ } catch (Exception e) {
+ assertSame(ex, e);
+ }
}
@Test
@@ -159,20 +145,20 @@ public class ServerImplTest {
public void doStop() {} // Don't notify.
}
NoopService transportServer = new NoopService();
- ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer);
- server.startAsync();
- server.awaitRunning();
+ ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer)
+ .start();
ManualStoppedService transport = new ManualStoppedService();
transport.startAsync();
server.serverListener().transportCreated(transport);
- server.stopAsync();
+ server.shutdown();
assertEquals(Service.State.STOPPING, transport.state());
assertEquals(Service.State.TERMINATED, transportServer.state());
- assertEquals(Service.State.STOPPING, server.state());
+ assertTrue(server.isShutdown());
+ assertFalse(server.isTerminated());
transport.doNotifyStopped();
assertEquals(Service.State.TERMINATED, transport.state());
- assertEquals(Service.State.TERMINATED, server.state());
+ assertTrue(server.isTerminated());
}
@Test
@@ -186,20 +172,20 @@ public class ServerImplTest {
public void doStop() {} // Don't notify.
}
ManualStoppedService transportServer = new ManualStoppedService();
- ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer);
- server.startAsync();
- server.awaitRunning();
+ ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer)
+ .start();
Service transport = new NoopService();
transport.startAsync();
server.serverListener().transportCreated(transport);
- server.stopAsync();
+ server.shutdown();
assertEquals(Service.State.TERMINATED, transport.state());
assertEquals(Service.State.STOPPING, transportServer.state());
- assertEquals(Service.State.STOPPING, server.state());
+ assertTrue(server.isShutdown());
+ assertFalse(server.isTerminated());
transportServer.doNotifyStopped();
assertEquals(Service.State.TERMINATED, transportServer.state());
- assertEquals(Service.State.TERMINATED, server.state());
+ assertTrue(server.isTerminated());
}
@Test
diff --git a/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java b/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java
index cadb10a67..0b4d983a5 100644
--- a/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java
+++ b/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java
@@ -99,11 +99,11 @@ public abstract class AbstractTransportTest {
.addService(ServerInterceptors.intercept(
TestServiceGrpc.bindService(new TestServiceImpl(testServiceExecutor)),
TestUtils.echoRequestHeadersInterceptor(Util.METADATA_KEY)))
- .buildAndWaitForRunning();
+ .build().start();
}
protected static void stopStaticServer() {
- server.stopAsync();
+ server.shutdownNow();
testServiceExecutor.shutdown();
}
diff --git a/integration-testing/src/main/java/com/google/net/stubby/testing/integration/TestServiceServer.java b/integration-testing/src/main/java/com/google/net/stubby/testing/integration/TestServiceServer.java
index 7452e719d..726930aec 100644
--- a/integration-testing/src/main/java/com/google/net/stubby/testing/integration/TestServiceServer.java
+++ b/integration-testing/src/main/java/com/google/net/stubby/testing/integration/TestServiceServer.java
@@ -138,14 +138,14 @@ public class TestServiceServer {
.addService(ServerInterceptors.intercept(
TestServiceGrpc.bindService(new TestServiceImpl(executor)),
TestUtils.echoRequestHeadersInterceptor(Util.METADATA_KEY)))
- .build();
- server.startAsync();
- server.awaitRunning(5, TimeUnit.SECONDS);
+ .build().start();
}
private void stop() throws Exception {
- server.stopAsync();
- server.awaitTerminated();
+ server.shutdownNow();
+ if (!server.awaitTerminated(5, TimeUnit.SECONDS)) {
+ System.err.println("Timed out waiting for server shutdown");
+ }
MoreExecutors.shutdownAndAwaitTermination(executor, 5, TimeUnit.SECONDS);
}
}
diff --git a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerBuilder.java b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerBuilder.java
index 5cf90ceea..6c309597f 100644
--- a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerBuilder.java
+++ b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerBuilder.java
@@ -143,4 +143,18 @@ public final class NettyServerBuilder extends AbstractServerBuilder<NettyServerB
}
return server;
}
+
+ private abstract static class ClosureHook extends Service.Listener {
+ protected abstract void onClosed();
+
+ @Override
+ public void terminated(Service.State from) {
+ onClosed();
+ }
+
+ @Override
+ public void failed(Service.State from, Throwable failure) {
+ onClosed();
+ }
+ }
}