diff options
author | Eric Anderson <ejona@google.com> | 2014-12-19 16:41:03 -0800 |
---|---|---|
committer | Eric Anderson <ejona@google.com> | 2015-01-27 09:20:24 -0800 |
commit | aeeebb7cdbdefadcf4e200974e8ef504fae05756 (patch) | |
tree | a346f7035e25dba974c2ac939c50080c78e4dc01 | |
parent | 19052499f79381551a31ccd0ccf8f37a6b8e611b (diff) | |
download | grpc-grpc-java-aeeebb7cdbdefadcf4e200974e8ef504fae05756.tar.gz |
Remove Service API from ServerImpl
Fixes #21
9 files changed, 234 insertions, 334 deletions
diff --git a/core/src/main/java/com/google/net/stubby/AbstractChannelBuilder.java b/core/src/main/java/com/google/net/stubby/AbstractChannelBuilder.java index 0ecda3d2e..16b9d52f2 100644 --- a/core/src/main/java/com/google/net/stubby/AbstractChannelBuilder.java +++ b/core/src/main/java/com/google/net/stubby/AbstractChannelBuilder.java @@ -31,12 +31,13 @@ package com.google.net.stubby; -import static com.google.net.stubby.AbstractServiceBuilder.DEFAULT_EXECUTOR; - import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.net.stubby.SharedResourceHolder.Resource; import com.google.net.stubby.transport.ClientTransportFactory; import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import javax.annotation.Nullable; @@ -46,6 +47,26 @@ import javax.annotation.Nullable; * @param <BuilderT> The concrete type of this builder. */ public abstract class AbstractChannelBuilder<BuilderT extends AbstractChannelBuilder<BuilderT>> { + static final Resource<ExecutorService> DEFAULT_EXECUTOR = + new Resource<ExecutorService>() { + private static final String name = "grpc-default-executor"; + @Override + public ExecutorService create() { + return Executors.newCachedThreadPool(new ThreadFactoryBuilder() + .setNameFormat(name + "-%d").build()); + } + + @Override + public void close(ExecutorService instance) { + instance.shutdown(); + } + + @Override + public String toString() { + return name; + } + }; + @Nullable private ExecutorService userExecutor; diff --git a/core/src/main/java/com/google/net/stubby/AbstractServerBuilder.java b/core/src/main/java/com/google/net/stubby/AbstractServerBuilder.java index 516a44065..09d934686 100644 --- a/core/src/main/java/com/google/net/stubby/AbstractServerBuilder.java +++ b/core/src/main/java/com/google/net/stubby/AbstractServerBuilder.java @@ -31,21 +31,26 @@ package com.google.net.stubby; +import static com.google.net.stubby.AbstractChannelBuilder.DEFAULT_EXECUTOR; + import com.google.common.base.Preconditions; import com.google.common.util.concurrent.Service; import com.google.net.stubby.transport.ServerListener; import java.util.concurrent.ExecutorService; +import javax.annotation.Nullable; + /** * The base class for server builders. * * @param <BuilderT> The concrete type for this builder. */ -public abstract class AbstractServerBuilder<BuilderT extends AbstractServerBuilder<BuilderT>> - extends AbstractServiceBuilder<ServerImpl, BuilderT> { +public abstract class AbstractServerBuilder<BuilderT extends AbstractServerBuilder<BuilderT>> { private final HandlerRegistry registry; + @Nullable + private ExecutorService userExecutor; /** * Constructs using a given handler registry. @@ -62,6 +67,21 @@ public abstract class AbstractServerBuilder<BuilderT extends AbstractServerBuild } /** + * Provides a custom executor. + * + * <p>It's an optional parameter. If the user has not provided an executor when the server is + * built, the builder will use a static cached thread pool. + * + * <p>The server won't take ownership of the given executor. It's caller's responsibility to + * shut down the executor when it's desired. + */ + @SuppressWarnings("unchecked") + public final BuilderT executor(ExecutorService executor) { + userExecutor = executor; + return (BuilderT) this; + } + + /** * Adds a service implementation to the handler registry. * * <p>This is supported only if the user didn't provide a handler registry, or the provided one is @@ -76,10 +96,33 @@ public abstract class AbstractServerBuilder<BuilderT extends AbstractServerBuild throw new UnsupportedOperationException("Underlying HandlerRegistry is not mutable"); } - @Override - protected final ServerImpl buildImpl(ExecutorService executor) { + /** + * Builds a server using the given parameters. + * + * <p>The returned service will not been started or be bound a port. You will need to start it + * with {@link ServerImpl#start()}. + */ + public ServerImpl build() { + final ExecutorService executor; + final boolean releaseExecutor; + if (userExecutor != null) { + executor = userExecutor; + releaseExecutor = false; + } else { + executor = SharedResourceHolder.get(DEFAULT_EXECUTOR); + releaseExecutor = true; + } + ServerImpl server = new ServerImpl(executor, registry); server.setTransportServer(buildTransportServer(server.serverListener())); + server.setTerminationRunnable(new Runnable() { + @Override + public void run() { + if (releaseExecutor) { + SharedResourceHolder.release(DEFAULT_EXECUTOR, executor); + } + } + }); return server; } diff --git a/core/src/main/java/com/google/net/stubby/AbstractServiceBuilder.java b/core/src/main/java/com/google/net/stubby/AbstractServiceBuilder.java deleted file mode 100644 index 2c23a02e9..000000000 --- a/core/src/main/java/com/google/net/stubby/AbstractServiceBuilder.java +++ /dev/null @@ -1,194 +0,0 @@ -/* - * Copyright 2014, Google Inc. All rights reserved. - * - * Redistribution and use in source and binary forms, with or without - * modification, are permitted provided that the following conditions are - * met: - * - * * Redistributions of source code must retain the above copyright - * notice, this list of conditions and the following disclaimer. - * * Redistributions in binary form must reproduce the above - * copyright notice, this list of conditions and the following disclaimer - * in the documentation and/or other materials provided with the - * distribution. - * - * * Neither the name of Google Inc. nor the names of its - * contributors may be used to endorse or promote products derived from - * this software without specific prior written permission. - * - * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS - * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT - * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR - * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT - * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, - * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT - * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, - * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY - * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT - * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE - * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. - */ - -package com.google.net.stubby; - -import com.google.common.base.Throwables; -import com.google.common.util.concurrent.MoreExecutors; -import com.google.common.util.concurrent.Service; -import com.google.common.util.concurrent.ThreadFactoryBuilder; -import com.google.net.stubby.SharedResourceHolder.Resource; - -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.TimeoutException; - -import javax.annotation.Nullable; - -/** - * Base class for channel builders and server builders. - * - * <p>The ownership rule: a builder generally does not take ownership of any objects passed to it. - * The caller is responsible for closing them if needed. The builder is only responsible for the - * life-cycle of objects created inside. - * - * @param <ProductT> The product that is built by this builder. - * @param <BuilderT> The concrete type of this builder. - */ -abstract class AbstractServiceBuilder<ProductT extends Service, - BuilderT extends AbstractServiceBuilder<ProductT, BuilderT>> { - - static final Resource<ExecutorService> DEFAULT_EXECUTOR = - new Resource<ExecutorService>() { - private static final String name = "grpc-default-executor"; - @Override - public ExecutorService create() { - return Executors.newCachedThreadPool(new ThreadFactoryBuilder() - .setNameFormat(name + "-%d").build()); - } - - @Override - public void close(ExecutorService instance) { - instance.shutdown(); - } - - @Override - public String toString() { - return name; - } - }; - - @Nullable - private ExecutorService userExecutor; - - /** - * Provides a custom executor. - * - * <p>It's an optional parameter. If the user has not provided an executor when the service is - * built, the builder will use a static cached thread pool. - * - * <p>The service won't take ownership of the given executor. It's caller's responsibility to - * shut down the executor when it's desired. - */ - @SuppressWarnings("unchecked") - public final BuilderT executor(ExecutorService executor) { - userExecutor = executor; - return (BuilderT) this; - } - - /** - * Builds a service using the given parameters. - * - * <p>The returned service has not been started at this point. You will need to start it by - * yourself or use {@link #buildAndStart()}. - */ - public ProductT build() { - final ExecutorService executor = (userExecutor == null) - ? SharedResourceHolder.get(DEFAULT_EXECUTOR) : userExecutor; - ProductT service = buildImpl(executor); - // We shut down the executor only if we created it. - if (userExecutor == null) { - service.addListener(new ClosureHook() { - @Override - protected void onClosed() { - SharedResourceHolder.release(DEFAULT_EXECUTOR, executor); - } - }, MoreExecutors.directExecutor()); - } - return service; - } - - /** - * Builds and starts a service. - * - * <p>The service may not be running when this method returns. If you want to wait until it's up - * and running, either use {@link Service#awaitRunning()} or {@link #buildAndWaitForRunning()}. - * - * @return the service that has just been built and started - */ - public final ProductT buildAndStart() { - ProductT service = build(); - service.startAsync(); - return service; - } - - /** - * Builds and starts a service, and wait until it's up and running. - * - * @return the service that has just been built and is now running. - */ - public final ProductT buildAndWaitForRunning() { - ProductT service = buildAndStart(); - try { - service.awaitRunning(); - } catch (Exception e) { - service.stopAsync(); - throw Throwables.propagate(e); - } - return service; - } - - /** - * Builds and starts a service, and wait until it's up and running, with a timeout. - * - * @return the service that has just been built and is now running. - * @throws TimeoutException if the service didn't become running within the given timeout. - */ - public final ProductT buildAndWaitForRunning(long timeout, TimeUnit unit) - throws TimeoutException { - ProductT service = buildAndStart(); - try { - service.awaitRunning(timeout, unit); - } catch (Exception e) { - service.stopAsync(); - if (e instanceof TimeoutException) { - throw (TimeoutException) e; - } else { - throw Throwables.propagate(e); - } - } - return service; - } - - /** - * Subclasses may use this as a convenient listener for cleaning up after the built service. - */ - protected abstract static class ClosureHook extends Service.Listener { - protected abstract void onClosed(); - - @Override - public void terminated(Service.State from) { - onClosed(); - } - - @Override - public void failed(Service.State from, Throwable failure) { - onClosed(); - } - } - - /** - * Implemented by subclasses to build the actual service object. The given executor is owned by - * this base class. - */ - protected abstract ProductT buildImpl(ExecutorService executor); -} diff --git a/core/src/main/java/com/google/net/stubby/Server.java b/core/src/main/java/com/google/net/stubby/Server.java index e413777d9..fbc2e94e5 100644 --- a/core/src/main/java/com/google/net/stubby/Server.java +++ b/core/src/main/java/com/google/net/stubby/Server.java @@ -31,8 +31,6 @@ package com.google.net.stubby; -import com.google.common.util.concurrent.Service; - import javax.annotation.concurrent.ThreadSafe; /** @@ -40,4 +38,4 @@ import javax.annotation.concurrent.ThreadSafe; * not expected to be implemented by application code or interceptors. */ @ThreadSafe -public interface Server extends Service {} +public interface Server {} diff --git a/core/src/main/java/com/google/net/stubby/ServerImpl.java b/core/src/main/java/com/google/net/stubby/ServerImpl.java index df933fe00..5883d4880 100644 --- a/core/src/main/java/com/google/net/stubby/ServerImpl.java +++ b/core/src/main/java/com/google/net/stubby/ServerImpl.java @@ -33,7 +33,6 @@ package com.google.net.stubby; import com.google.common.base.Preconditions; import com.google.common.base.Throwables; -import com.google.common.util.concurrent.AbstractService; import com.google.common.util.concurrent.MoreExecutors; import com.google.common.util.concurrent.Service; import com.google.net.stubby.transport.ServerListener; @@ -47,6 +46,7 @@ import java.util.Collection; import java.util.Collections; import java.util.HashSet; import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; /** * Default implementation of {@link Server}, for creation by transports. @@ -64,7 +64,7 @@ import java.util.concurrent.Executor; * <p>Starting the server starts the underlying transport for servicing requests. Stopping the * server stops servicing new requests and waits for all connections to terminate. */ -public class ServerImpl extends AbstractService implements Server { +public class ServerImpl implements Server { private static final ServerStreamListener NOOP_LISTENER = new NoopListener(); private final ServerListener serverListener = new ServerListenerImpl(); @@ -72,11 +72,14 @@ public class ServerImpl extends AbstractService implements Server { /** Executor for application processing. */ private final Executor executor; private final HandlerRegistry registry; + private boolean started; + private boolean shutdown; + private boolean terminated; + private Runnable terminationRunnable; /** Service encapsulating something similar to an accept() socket. */ private Service transportServer; /** {@code transportServer} and services encapsulating something similar to a TCP connection. */ - private final Collection<Service> transports - = Collections.synchronizedSet(new HashSet<Service>()); + private final Collection<Service> transports = new HashSet<Service>(); /** * Construct a server. {@link #setTransportServer(Service)} must be called before starting the @@ -96,18 +99,17 @@ public class ServerImpl extends AbstractService implements Server { * * @return this object */ - public ServerImpl setTransportServer(Service transportServer) { - Preconditions.checkState(state() == Server.State.NEW, "server must be in NEW state"); + public synchronized ServerImpl setTransportServer(Service transportServer) { + if (shutdown) { + throw new IllegalStateException("Already shutdown"); + } Preconditions.checkState(this.transportServer == null, "transportServer already set"); this.transportServer = Preconditions.checkNotNull(transportServer); Preconditions.checkArgument( - transportServer.state() == Server.State.NEW, "transport server not in NEW state"); - transportServer.addListener(new TransportLifecycleListener(), MoreExecutors.directExecutor()); + transportServer.state() == Service.State.NEW, "transport server not in NEW state"); + transportServer.addListener(new TransportServiceListener(transportServer), + MoreExecutors.directExecutor()); transports.add(transportServer); - // We assume that transport.state() won't change by another thread before we return from this - // call. - Preconditions.checkState( - transportServer.state() == Server.State.NEW, "transport server changed state!"); return this; } @@ -116,64 +118,114 @@ public class ServerImpl extends AbstractService implements Server { return serverListener; } - @Override - protected void doStart() { - Preconditions.checkState(transportServer != null, "setTransportServer not called"); - transportServer.startAsync(); - } - - @Override - protected void doStop() { - stopTransports(); + /** Hack to allow executors to auto-shutdown. Not for general use. */ + // TODO(ejona): Replace with a real API. + synchronized void setTerminationRunnable(Runnable runnable) { + this.terminationRunnable = runnable; } /** - * Remove transport service from accounting list and notify of complete shutdown if necessary. + * Bind and start the server. * - * @param transport service to remove - * @return {@code true} if shutting down and it is now complete + * @return {@code this} object + * @throws IllegalStateException if already started */ - private boolean transportClosed(Service transport) { - boolean shutdownComplete; - synchronized (transports) { - if (!transports.remove(transport)) { - throw new AssertionError("Transport already removed"); - } - shutdownComplete = transports.isEmpty(); + public synchronized ServerImpl start() { + if (started) { + throw new IllegalStateException("Already started"); } - if (shutdownComplete) { - Service.State state = state(); - if (state == Service.State.STOPPING) { - notifyStopped(); - } else if (state == Service.State.FAILED) { - // NOOP: already failed - } else { - notifyFailed(new IllegalStateException("server transport terminated unexpectedly")); + started = true; + try { + // Start and wait for any port to actually be bound. + transportServer.startAsync().awaitRunning(); + } catch (IllegalStateException ex) { + Throwable t = transportServer.failureCause(); + if (t != null) { + throw Throwables.propagate(t); } + throw ex; } - return shutdownComplete; + return this; } /** - * The transport server closed, so cleanup its resources and start shutdown. + * Initiates an orderly shutdown in which preexisting calls continue but new calls are rejected. */ - private void transportServerClosed() { - boolean shutdownComplete = transportClosed(transportServer); - if (shutdownComplete) { - return; + public synchronized ServerImpl shutdown() { + shutdown = true; + // transports collection can be modified during stopAsync(), even if we hold the lock, due to + // reentrancy. + for (Service transport : transports.toArray(new Service[transports.size()])) { + transport.stopAsync(); } - stopTransports(); + return this; } /** - * Shutdown all transports (including transportServer). Safe to be called even if previously - * called. + * Initiates a forceful shutdown in which preexisting and new calls are rejected. Although + * forceful, the shutdown process is still not instantaneous; {@link #isTerminated()} will likely + * return {@code false} immediately after this method returns. + * + * <p>NOT YET IMPLEMENTED. This method currently behaves identically to shutdown(). */ - private void stopTransports() { - for (Service transport : transports.toArray(new Service[0])) { - // transports list can be modified during this call, even if we hold the lock, due to - // reentrancy. - transport.stopAsync(); + // TODO(ejona): cancel preexisting calls. + public synchronized ServerImpl shutdownNow() { + shutdown(); + return this; + } + + /** + * Returns whether the server is shutdown. Shutdown servers reject any new calls, but may still + * have some calls being processed. + * + * @see #shutdown() + * @see #isTerminated() + */ + public synchronized boolean isShutdown() { + return shutdown; + } + + /** + * Waits for the server to become terminated, giving up if the timeout is reached. + * + * @return whether the server is terminated, as would be done by {@link #isTerminated()}. + */ + public synchronized boolean awaitTerminated(long timeout, TimeUnit unit) + throws InterruptedException { + long timeoutNanos = unit.toNanos(timeout); + long endTimeNanos = System.nanoTime() + timeoutNanos; + while (!terminated && (timeoutNanos = endTimeNanos - System.nanoTime()) > 0) { + TimeUnit.NANOSECONDS.timedWait(this, timeoutNanos); + } + return terminated; + } + + /** + * Returns whether the server is terminated. Terminated servers have no running calls and + * relevant resources released (like TCP connections). + * + * @see #isShutdown() + */ + public synchronized boolean isTerminated() { + return terminated; + } + + /** + * Remove transport service from accounting collection and notify of complete shutdown if + * necessary. + * + * @param transport service to remove + */ + private synchronized void transportClosed(Service transport) { + if (!transports.remove(transport)) { + throw new AssertionError("Transport already removed"); + } + if (shutdown && transports.isEmpty()) { + terminated = true; + notifyAll(); + if (terminationRunnable != null) { + terminationRunnable.run(); + } } } @@ -184,12 +236,14 @@ public class ServerImpl extends AbstractService implements Server { Preconditions.checkArgument( transportState == Service.State.STARTING || transportState == Service.State.RUNNING, "Created transport should be starting or running"); - if (state() != Server.State.RUNNING) { - transport.stopAsync(); - return serverTransportListener; + synchronized (this) { + if (shutdown) { + transport.stopAsync(); + return serverTransportListener; + } + transports.add(transport); } - transports.add(transport); - // transports list can be modified during this call, even if we hold the lock, due to + // transports collection can be modified during this call, even if we hold the lock, due to // reentrancy. transport.addListener(new TransportServiceListener(transport), MoreExecutors.directExecutor()); @@ -201,28 +255,6 @@ public class ServerImpl extends AbstractService implements Server { } } - /** Listens for lifecycle changes to the "accept() socket." */ - private class TransportLifecycleListener extends Service.Listener { - @Override - public void running() { - notifyStarted(); - } - - @Override - public void terminated(Service.State from) { - transportServerClosed(); - } - - @Override - public void failed(Service.State from, Throwable failure) { - // TODO(ejona): Ideally we would want to force-stop transports before notifying application of - // failure, but that would cause us to have an unrepresentative state since we would be - // RUNNING but not accepting connections. - notifyFailed(failure); - transportServerClosed(); - } - } - /** Listens for lifecycle changes to a "TCP connection." */ private class TransportServiceListener extends Service.Listener { private final Service transport; diff --git a/core/src/test/java/com/google/net/stubby/ServerImplTest.java b/core/src/test/java/com/google/net/stubby/ServerImplTest.java index 9cb6cf4b8..e99e3281d 100644 --- a/core/src/test/java/com/google/net/stubby/ServerImplTest.java +++ b/core/src/test/java/com/google/net/stubby/ServerImplTest.java @@ -33,7 +33,10 @@ package com.google.net.stubby; import static com.google.common.base.Charsets.UTF_8; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; import static org.mockito.Matchers.eq; import static org.mockito.Matchers.isNull; import static org.mockito.Matchers.notNull; @@ -68,6 +71,7 @@ import java.util.concurrent.CyclicBarrier; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; /** Unit tests for {@link ServerImpl}. */ @@ -92,8 +96,7 @@ public class ServerImplTest { public void startup() { MockitoAnnotations.initMocks(this); - server.startAsync(); - server.awaitRunning(); + server.start(); } @After @@ -102,50 +105,33 @@ public class ServerImplTest { } @Test - public void startStopImmediate() { + public void startStopImmediate() throws InterruptedException { Service transportServer = new NoopService(); - Server server = new ServerImpl(executor, registry).setTransportServer(transportServer); - assertEquals(Service.State.NEW, server.state()); + ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer); assertEquals(Service.State.NEW, transportServer.state()); - server.startAsync(); - server.awaitRunning(); - assertEquals(Service.State.RUNNING, server.state()); + server.start(); assertEquals(Service.State.RUNNING, transportServer.state()); - server.stopAsync(); - server.awaitTerminated(); - assertEquals(Service.State.TERMINATED, server.state()); + server.shutdown(); + assertTrue(server.awaitTerminated(100, TimeUnit.MILLISECONDS)); assertEquals(Service.State.TERMINATED, transportServer.state()); } @Test - public void transportServerFailureFailsServer() { - class FailableService extends NoopService { - public void doNotifyFailed(Throwable cause) { - notifyFailed(cause); - } - } - FailableService transportServer = new FailableService(); - Server server = new ServerImpl(executor, registry).setTransportServer(transportServer); - server.startAsync(); - server.awaitRunning(); - RuntimeException ex = new RuntimeException("force failure"); - transportServer.doNotifyFailed(ex); - assertEquals(Service.State.FAILED, server.state()); - assertEquals(ex, server.failureCause()); - } - - @Test public void transportServerFailsStartup() { + final Exception ex = new RuntimeException(); class FailingStartupService extends NoopService { @Override public void doStart() { - notifyFailed(new RuntimeException()); + notifyFailed(ex); } } FailingStartupService transportServer = new FailingStartupService(); - Server server = new ServerImpl(executor, registry).setTransportServer(transportServer); - server.startAsync(); - assertEquals(Service.State.FAILED, server.state()); + ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer); + try { + server.start(); + } catch (Exception e) { + assertSame(ex, e); + } } @Test @@ -159,20 +145,20 @@ public class ServerImplTest { public void doStop() {} // Don't notify. } NoopService transportServer = new NoopService(); - ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer); - server.startAsync(); - server.awaitRunning(); + ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer) + .start(); ManualStoppedService transport = new ManualStoppedService(); transport.startAsync(); server.serverListener().transportCreated(transport); - server.stopAsync(); + server.shutdown(); assertEquals(Service.State.STOPPING, transport.state()); assertEquals(Service.State.TERMINATED, transportServer.state()); - assertEquals(Service.State.STOPPING, server.state()); + assertTrue(server.isShutdown()); + assertFalse(server.isTerminated()); transport.doNotifyStopped(); assertEquals(Service.State.TERMINATED, transport.state()); - assertEquals(Service.State.TERMINATED, server.state()); + assertTrue(server.isTerminated()); } @Test @@ -186,20 +172,20 @@ public class ServerImplTest { public void doStop() {} // Don't notify. } ManualStoppedService transportServer = new ManualStoppedService(); - ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer); - server.startAsync(); - server.awaitRunning(); + ServerImpl server = new ServerImpl(executor, registry).setTransportServer(transportServer) + .start(); Service transport = new NoopService(); transport.startAsync(); server.serverListener().transportCreated(transport); - server.stopAsync(); + server.shutdown(); assertEquals(Service.State.TERMINATED, transport.state()); assertEquals(Service.State.STOPPING, transportServer.state()); - assertEquals(Service.State.STOPPING, server.state()); + assertTrue(server.isShutdown()); + assertFalse(server.isTerminated()); transportServer.doNotifyStopped(); assertEquals(Service.State.TERMINATED, transportServer.state()); - assertEquals(Service.State.TERMINATED, server.state()); + assertTrue(server.isTerminated()); } @Test diff --git a/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java b/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java index cadb10a67..0b4d983a5 100644 --- a/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java +++ b/integration-testing/src/main/java/com/google/net/stubby/testing/integration/AbstractTransportTest.java @@ -99,11 +99,11 @@ public abstract class AbstractTransportTest { .addService(ServerInterceptors.intercept( TestServiceGrpc.bindService(new TestServiceImpl(testServiceExecutor)), TestUtils.echoRequestHeadersInterceptor(Util.METADATA_KEY))) - .buildAndWaitForRunning(); + .build().start(); } protected static void stopStaticServer() { - server.stopAsync(); + server.shutdownNow(); testServiceExecutor.shutdown(); } diff --git a/integration-testing/src/main/java/com/google/net/stubby/testing/integration/TestServiceServer.java b/integration-testing/src/main/java/com/google/net/stubby/testing/integration/TestServiceServer.java index 7452e719d..726930aec 100644 --- a/integration-testing/src/main/java/com/google/net/stubby/testing/integration/TestServiceServer.java +++ b/integration-testing/src/main/java/com/google/net/stubby/testing/integration/TestServiceServer.java @@ -138,14 +138,14 @@ public class TestServiceServer { .addService(ServerInterceptors.intercept( TestServiceGrpc.bindService(new TestServiceImpl(executor)), TestUtils.echoRequestHeadersInterceptor(Util.METADATA_KEY))) - .build(); - server.startAsync(); - server.awaitRunning(5, TimeUnit.SECONDS); + .build().start(); } private void stop() throws Exception { - server.stopAsync(); - server.awaitTerminated(); + server.shutdownNow(); + if (!server.awaitTerminated(5, TimeUnit.SECONDS)) { + System.err.println("Timed out waiting for server shutdown"); + } MoreExecutors.shutdownAndAwaitTermination(executor, 5, TimeUnit.SECONDS); } } diff --git a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerBuilder.java b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerBuilder.java index 5cf90ceea..6c309597f 100644 --- a/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerBuilder.java +++ b/netty/src/main/java/com/google/net/stubby/transport/netty/NettyServerBuilder.java @@ -143,4 +143,18 @@ public final class NettyServerBuilder extends AbstractServerBuilder<NettyServerB } return server; } + + private abstract static class ClosureHook extends Service.Listener { + protected abstract void onClosed(); + + @Override + public void terminated(Service.State from) { + onClosed(); + } + + @Override + public void failed(Service.State from, Throwable failure) { + onClosed(); + } + } } |