diff options
author | Kun Zhang <zhangkun@google.com> | 2016-09-01 16:00:43 -0700 |
---|---|---|
committer | Kun Zhang <zhangkun@google.com> | 2016-09-02 13:18:35 -0700 |
commit | c4f7f5c4fd3d4f2909ea92f879c4b882098032a9 (patch) | |
tree | 2c69ec97ce39765f6e408aa93a7eae20d2d4fd90 /context | |
parent | 58d78dd0aae93b23d9351cd03876baaf37164b73 (diff) | |
download | grpc-grpc-java-c4f7f5c4fd3d4f2909ea92f879c4b882098032a9.tar.gz |
core: split Context into a separate grpc-context artifact.
The Context API is not particularly gRPC-specific, and will be used by
Census as its context propagation mechanism.
Removed all dependencies to make it easy for other libraries to depend
on.
Diffstat (limited to 'context')
-rw-r--r-- | context/build.gradle | 14 | ||||
-rw-r--r-- | context/src/main/java/io/grpc/Context.java | 858 | ||||
-rw-r--r-- | context/src/main/java/io/grpc/Deadline.java | 186 | ||||
-rw-r--r-- | context/src/test/java/io/grpc/ContextTest.java | 746 | ||||
-rw-r--r-- | context/src/test/java/io/grpc/DeadlineTest.java | 287 |
5 files changed, 2091 insertions, 0 deletions
diff --git a/context/build.gradle b/context/build.gradle new file mode 100644 index 000000000..0566f6a0f --- /dev/null +++ b/context/build.gradle @@ -0,0 +1,14 @@ +plugins { + id "be.insaneprogramming.gradle.animalsniffer" version "1.4.0" +} + +description = 'gRPC: Context' + +dependencies { + testCompile project(':grpc-testing') +} + +// Configure the animal sniffer plugin +animalsniffer { + signature = "org.codehaus.mojo.signature:java16:+@signature" +} diff --git a/context/src/main/java/io/grpc/Context.java b/context/src/main/java/io/grpc/Context.java new file mode 100644 index 000000000..27a736750 --- /dev/null +++ b/context/src/main/java/io/grpc/Context.java @@ -0,0 +1,858 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +import java.util.ArrayList; +import java.util.concurrent.Callable; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.logging.Level; +import java.util.logging.Logger; + +/** + * A context propagation mechanism which can carry scoped-values across API boundaries and between + * threads. Examples of state propagated via context include: + * <ul> + * <li>Security principals and credentials.</li> + * <li>Local and distributed tracing information.</li> + * </ul> + * + * <p>Context objects make their state available by being attached to the executing thread using + * a {@link ThreadLocal}. The context object bound to a thread is considered {@link #current()}. + * Context objects are immutable and inherit state from their parent. To add or overwrite the + * current state a new context object must be created and then attached to the thread replacing the + * previously bound context. For example: + * <pre> + * Context withCredential = Context.current().withValue(CRED_KEY, cred); + * executorService.execute(withCredential.wrap(new Runnable() { + * public void run() { + * readUserRecords(userId, CRED_KEY.get()); + * } + * })); + + * </pre> + * + * + * <p>Contexts are also used to represent a scoped unit of work. When the unit of work is + * done the context can be cancelled. This cancellation will also cascade to all descendant + * contexts. You can add a {@link CancellationListener} to a context to be notified when it or + * one of its ancestors has been cancelled. Cancellation does not release the state stored by + * a context and it's perfectly valid to {@link #attach()} an already cancelled context to a + * thread to make it current. To cancel a context (and its descendants) you first create a + * {@link CancellableContext} and when you need to signal cancellation call + * {@link CancellableContext#cancel} or {@link CancellableContext#detachAndCancel}. For example: + * <pre> + * CancellableContext withCancellation = Context.current().withCancellation(); + * try { + * executorService.execute(withCancellation.wrap(new Runnable() { + * public void run() { + * while (waitingForData() && !Context.current().isCancelled()) {} + * } + * }); + * doSomeWork(); + * } catch (Throwable t) { + * withCancellation.cancel(t); + * } + * </pre> + * + * <p>Contexts can also be created with a timeout relative to the system nano clock which will + * cause it to automatically cancel at the desired time. + * + * + * <p>Notes and cautions on use: + * <ul> + * <li>While Context objects are immutable they do not place such a restriction on the state + * they store.</li> + * <li>Context is not intended for passing optional parameters to an API and developers should + * take care to avoid excessive dependence on context when designing an API.</li> + * <li>If Context is being used in an environment that needs to support class unloading it is the + * responsibility of the application to ensure that all contexts are properly cancelled.</li> + * </ul> + */ +public class Context { + + private static final Logger log = Logger.getLogger(Context.class.getName()); + + private static final Object[][] EMPTY_ENTRIES = new Object[0][2]; + + private static final Key<Deadline> DEADLINE_KEY = new Key<Deadline>("deadline"); + + /** + * The logical root context which is {@link #current()} if no other context is bound. This context + * is not cancellable and so will not cascade cancellation or retain listeners. + */ + public static final Context ROOT = new Context(null); + + /** + * Currently bound context. + */ + private static final ThreadLocal<Context> localContext = new ThreadLocal<Context>() { + @Override + protected Context initialValue() { + return ROOT; + } + }; + + /** + * Create a {@link Key} with the given debug name. Multiple different keys may have the same name; + * the name is intended for debugging purposes and does not impact behavior. + */ + public static <T> Key<T> key(String name) { + return new Key<T>(name); + } + + /** + * Create a {@link Key} with the given debug name and default value. Multiple different keys may + * have the same name; the name is intended for debugging purposes and does not impact behavior. + */ + public static <T> Key<T> keyWithDefault(String name, T defaultValue) { + return new Key<T>(name, defaultValue); + } + + /** + * Return the context associated with the current thread, will never return {@code null} as + * the {@link #ROOT} context is implicitly associated with all threads. + * + * <p>Will never return {@link CancellableContext} even if one is attached, instead a + * {@link Context} is returned with the same properties and lifetime. This is to avoid + * code stealing the ability to cancel arbitrarily. + */ + public static Context current() { + Context current = localContext.get(); + if (current == null) { + return ROOT; + } + return current; + } + + private final Context parent; + private final Object[][] keyValueEntries; + private final boolean cascadesCancellation; + private ArrayList<ExecutableListener> listeners; + private CancellationListener parentListener = new ParentListener(); + private final boolean canBeCancelled; + + /** + * Construct a context that cannot be cancelled and will not cascade cancellation from its parent. + */ + private Context(Context parent) { + this.parent = parent; + // Not inheriting cancellation implies not inheriting a deadline too. + keyValueEntries = new Object[][]{{DEADLINE_KEY, null}}; + cascadesCancellation = false; + canBeCancelled = false; + } + + /** + * Construct a context that cannot be cancelled but will cascade cancellation from its parent if + * it is cancellable. + */ + private Context(Context parent, Object[][] keyValueEntries) { + this.parent = parent; + this.keyValueEntries = keyValueEntries; + cascadesCancellation = true; + canBeCancelled = this.parent != null && this.parent.canBeCancelled; + } + + /** + * Construct a context that can be cancelled and will cascade cancellation from its parent if + * it is cancellable. + */ + private Context(Context parent, Object[][] keyValueEntries, boolean isCancellable) { + this.parent = parent; + this.keyValueEntries = keyValueEntries; + cascadesCancellation = true; + canBeCancelled = isCancellable; + } + + /** + * Create a new context which is independently cancellable and also cascades cancellation from + * its parent. Callers should ensure that either {@link CancellableContext#cancel(Throwable)} + * or {@link CancellableContext#detachAndCancel(Context, Throwable)} are called to notify + * listeners and release the resources associated with them. + * + * <p>Sample usage: + * <pre> + * Context.CancellableContext withCancellation = Context.current().withCancellation(); + * try { + * executorService.execute(withCancellation.wrap(new Runnable() { + * public void run() { + * Context current = Context.current(); + * while (!current.isCancelled()) { + * keepWorking(); + * } + * } + * }); + * doSomethingRelatedWork(); + * } catch (Throwable t) { + * withCancellation.cancel(t); + * } + * </pre> + */ + public CancellableContext withCancellation() { + return new CancellableContext(this); + } + + /** + * Create a new context which will cancel itself after the given {@code duration} from now. + * The returned context will cascade cancellation of its parent. Callers may explicitly cancel + * the returned context prior to the deadline just as for {@link #withCancellation()}, + * + * <p>Sample usage: + * <pre> + * Context.CancellableContext withDeadline = Context.current().withDeadlineAfter(5, + * TimeUnit.SECONDS, scheduler); + * executorService.execute(withDeadline.wrap(new Runnable() { + * public void run() { + * Context current = Context.current(); + * while (!current.isCancelled()) { + * keepWorking(); + * } + * } + * }); + * </pre> + */ + public CancellableContext withDeadlineAfter(long duration, TimeUnit unit, + ScheduledExecutorService scheduler) { + return withDeadline(Deadline.after(duration, unit), scheduler); + } + + /** + * Create a new context which will cancel itself at the given {@link Deadline}. + * The returned context will cascade cancellation of its parent. Callers may explicitly cancel + * the returned context prior to the deadline just as for {@link #withCancellation()}, + * + * <p>Sample usage: + * <pre> + * Context.CancellableContext withDeadline = Context.current() + * .withDeadline(someReceivedDeadline); + * executorService.execute(withDeadline.wrap(new Runnable() { + * public void run() { + * Context current = Context.current(); + * while (!current.isCancelled()) { + * keepWorking(); + * } + * } + * }); + * </pre> + */ + public CancellableContext withDeadline(Deadline deadline, + ScheduledExecutorService scheduler) { + checkNotNull(deadline, "deadline"); + checkNotNull(scheduler, "scheduler"); + return new CancellableContext(this, deadline, scheduler); + } + + /** + * Create a new context with the given key value set. The new context will cascade cancellation + * from its parent. + * + <pre> + * Context withCredential = Context.current().withValue(CRED_KEY, cred); + * executorService.execute(withCredential.wrap(new Runnable() { + * public void run() { + * readUserRecords(userId, CRED_KEY.get()); + * } + * })); + * </pre> + * + */ + public <V> Context withValue(Key<V> k1, V v1) { + return new Context(this, new Object[][]{{k1, v1}}); + } + + /** + * Create a new context with the given key value set. The new context will cascade cancellation + * from its parent. + */ + public <V1, V2> Context withValues(Key<V1> k1, V1 v1, Key<V2> k2, V2 v2) { + return new Context(this, new Object[][]{{k1, v1}, {k2, v2}}); + } + + /** + * Create a new context with the given key value set. The new context will cascade cancellation + * from its parent. + */ + public <V1, V2, V3> Context withValues(Key<V1> k1, V1 v1, Key<V2> k2, V2 v2, Key<V3> k3, V3 v3) { + return new Context(this, new Object[][]{{k1, v1}, {k2, v2}, {k3, v3}}); + } + + /** + * Create a new context which propagates the values of this context but does not cascade its + * cancellation. + */ + public Context fork() { + return new Context(this); + } + + boolean canBeCancelled() { + // A context is cancellable if it cascades from its parent and its parent is + // cancellable or is itself directly cancellable.. + return canBeCancelled; + } + + /** + * Attach this context to the thread and make it {@link #current}, the previously current context + * is returned. It is allowed to attach contexts where {@link #isCancelled()} is {@code true}. + * + * <p>Instead of using {@link #attach()} & {@link #detach(Context)} most use-cases are better + * served by using the {@link #run(Runnable)} or {@link #call(java.util.concurrent.Callable)} + * to execute work immediately within a context. If work needs to be done in other threads + * it is recommended to use the 'wrap' methods or to use a propagating executor. + */ + public Context attach() { + Context previous = current(); + localContext.set(this); + return previous; + } + + /** + * Detach the current context from the thread and attach the provided replacement. If this + * context is not {@link #current()} a SEVERE message will be logged but the context to attach + * will still be bound. + */ + public void detach(Context toAttach) { + checkNotNull(toAttach, "toAttach"); + if (toAttach.attach() != this) { + // Log a severe message instead of throwing an exception as the context to attach is assumed + // to be the correct one and the unbalanced state represents a coding mistake in a lower + // layer in the stack that cannot be recovered from here. + log.log(Level.SEVERE, "Context was not attached when detaching", + new Throwable().fillInStackTrace()); + } + } + + // Visible for testing + boolean isCurrent() { + return current() == this; + } + + /** + * Is this context cancelled. + */ + public boolean isCancelled() { + if (parent == null || !cascadesCancellation) { + return false; + } else { + return parent.isCancelled(); + } + } + + /** + * If a context {@link #isCancelled()} then return the cause of the cancellation or + * {@code null} if context was cancelled without a cause. If the context is not yet cancelled + * will always return {@code null}. + * + * <p>The cancellation cause is provided for informational purposes only and implementations + * should generally assume that it has already been handled and logged properly. + */ + public Throwable cancellationCause() { + if (parent == null || !cascadesCancellation) { + return null; + } else { + return parent.cancellationCause(); + } + } + + /** + * A context may have an associated {@link Deadline} at which it will be automatically cancelled. + * @return A {@link io.grpc.Deadline} or {@code null} if no deadline is set. + */ + public Deadline getDeadline() { + return DEADLINE_KEY.get(this); + } + + /** + * Add a listener that will be notified when the context becomes cancelled. + */ + public void addListener(final CancellationListener cancellationListener, + final Executor executor) { + checkNotNull(cancellationListener, "cancellationListener"); + checkNotNull(executor, "executor"); + if (canBeCancelled) { + ExecutableListener executableListener = + new ExecutableListener(executor, cancellationListener); + synchronized (this) { + if (isCancelled()) { + executableListener.deliver(); + } else { + if (listeners == null) { + // Now that we have a listener we need to listen to our parent so + // we can cascade listener notification. + listeners = new ArrayList<ExecutableListener>(); + listeners.add(executableListener); + parent.addListener(parentListener, DirectExecutor.INSTANCE); + } else { + listeners.add(executableListener); + } + } + } + } + } + + /** + * Remove a {@link CancellationListener}. + */ + public void removeListener(CancellationListener cancellationListener) { + if (!canBeCancelled) { + return; + } + synchronized (this) { + if (listeners != null) { + for (int i = listeners.size() - 1; i >= 0; i--) { + if (listeners.get(i).listener == cancellationListener) { + listeners.remove(i); + // Just remove the first matching listener, given that we allow duplicate + // adds we should allow for duplicates after remove. + break; + } + } + // We have no listeners so no need to listen to our parent + if (listeners.isEmpty()) { + parent.removeListener(parentListener); + listeners = null; + } + } + } + } + + /** + * Notify all listeners that this context has been cancelled and immediately release + * any reference to them so that they may be garbage collected. + */ + void notifyAndClearListeners() { + if (!canBeCancelled) { + return; + } + ArrayList<ExecutableListener> tmpListeners; + synchronized (this) { + if (listeners == null) { + return; + } + tmpListeners = listeners; + listeners = null; + } + // Deliver events to non-child context listeners before we notify child contexts. We do this + // to cancel higher level units of work before child units. This allows for a better error + // handling paradigm where the higher level unit of work knows it is cancelled and so can + // ignore errors that bubble up as a result of cancellation of lower level units. + for (int i = 0; i < tmpListeners.size(); i++) { + if (!(tmpListeners.get(i).listener instanceof ParentListener)) { + tmpListeners.get(i).deliver(); + } + } + for (int i = 0; i < tmpListeners.size(); i++) { + if (tmpListeners.get(i).listener instanceof ParentListener) { + tmpListeners.get(i).deliver(); + } + } + parent.removeListener(parentListener); + } + + // Used in tests to ensure that listeners are defined and released when cancellation cascades. + // It's very important to ensure that we do not accidentally retain listeners. + int listenerCount() { + synchronized (this) { + return listeners == null ? 0 : listeners.size(); + } + } + + /** + * Immediately run a {@link Runnable} with this context as the {@link #current} context. + * @param r {@link Runnable} to run. + */ + public void run(Runnable r) { + Context previous = attach(); + try { + r.run(); + } finally { + detach(previous); + } + } + + /** + * Immediately call a {@link Callable} with this context as the {@link #current} context. + * @param c {@link Callable} to call. + * @return result of call. + */ + public <V> V call(Callable<V> c) throws Exception { + Context previous = attach(); + try { + return c.call(); + } finally { + detach(previous); + } + } + + /** + * Wrap a {@link Runnable} so that it executes with this context as the {@link #current} context. + */ + public Runnable wrap(final Runnable r) { + return new Runnable() { + @Override + public void run() { + Context previous = attach(); + try { + r.run(); + } finally { + detach(previous); + } + } + }; + } + + /** + * Wrap a {@link Callable} so that it executes with this context as the {@link #current} context. + */ + public <C> Callable<C> wrap(final Callable<C> c) { + return new Callable<C>() { + @Override + public C call() throws Exception { + Context previous = attach(); + try { + return c.call(); + } finally { + detach(previous); + } + } + }; + } + + /** + * Wrap an {@link Executor} so that it always executes with this context as the {@link #current} + * context. It is generally expected that {@link #currentContextExecutor(Executor)} would be + * used more commonly than this method. + * + * <p>One scenario in which this executor may be useful is when a single thread is sharding work + * to multiple threads. + * + * @see #currentContextExecutor(Executor) + */ + public Executor fixedContextExecutor(final Executor e) { + class FixedContextExecutor implements Executor { + @Override + public void execute(Runnable r) { + e.execute(wrap(r)); + } + } + + return new FixedContextExecutor(); + } + + /** + * Create an executor that propagates the {@link #current} context when {@link Executor#execute} + * is called as the {@link #current} context of the {@code Runnable} scheduled. <em>Note that this + * is a static method.</em> + * + * @see #fixedContextExecutor(Executor) + */ + public static Executor currentContextExecutor(final Executor e) { + class CurrentContextExecutor implements Executor { + @Override + public void execute(Runnable r) { + e.execute(Context.current().wrap(r)); + } + } + + return new CurrentContextExecutor(); + } + + /** + * Lookup the value for a key in the context inheritance chain. + */ + private Object lookup(Key<?> key) { + for (int i = 0; i < keyValueEntries.length; i++) { + if (key.equals(keyValueEntries[i][0])) { + return keyValueEntries[i][1]; + } + } + if (parent == null) { + return null; + } + return parent.lookup(key); + } + + /** + * A context which inherits cancellation from its parent but which can also be independently + * cancelled and which will propagate cancellation to its descendants. + */ + public static final class CancellableContext extends Context { + + private boolean cancelled; + private Throwable cancellationCause; + private final Context uncancellableSurrogate; + private ScheduledFuture<?> pendingDeadline; + + /** + * If the parent deadline is before the given deadline there is no need to install the value + * or listen for its expiration as the parent context will already be listening for it. + */ + private static Object[][] deriveDeadline(Context parent, Deadline deadline) { + Deadline parentDeadline = DEADLINE_KEY.get(parent); + return parentDeadline == null || deadline.isBefore(parentDeadline) + ? new Object[][]{{ DEADLINE_KEY, deadline}} : + EMPTY_ENTRIES; + } + + /** + * Create a cancellable context that does not have a deadline. + */ + private CancellableContext(Context parent) { + super(parent, EMPTY_ENTRIES, true); + // Create a surrogate that inherits from this to attach so that you cannot retrieve a + // cancellable context from Context.current() + uncancellableSurrogate = new Context(this, EMPTY_ENTRIES); + } + + /** + * Create a cancellable context that has a deadline. + */ + private CancellableContext(Context parent, Deadline deadline, + ScheduledExecutorService scheduler) { + super(parent, deriveDeadline(parent, deadline), true); + if (DEADLINE_KEY.get(this) == deadline) { + final TimeoutException cause = new TimeoutException("context timed out"); + if (!deadline.isExpired()) { + // The parent deadline was after the new deadline so we need to install a listener + // on the new earlier deadline to trigger expiration for this context. + pendingDeadline = deadline.runOnExpiration(new Runnable() { + @Override + public void run() { + try { + cancel(cause); + } catch (Throwable t) { + log.log(Level.SEVERE, "Cancel threw an exception, which should not happen", t); + } + } + }, scheduler); + } else { + // Cancel immediately if the deadline is already expired. + cancel(cause); + } + } + uncancellableSurrogate = new Context(this, EMPTY_ENTRIES); + } + + + @Override + public Context attach() { + return uncancellableSurrogate.attach(); + } + + @Override + public void detach(Context toAttach) { + uncancellableSurrogate.detach(toAttach); + } + + @Override + public boolean isCurrent() { + return uncancellableSurrogate.isCurrent(); + } + + /** + * Cancel this context and optionally provide a cause (can be {@code null}) for the + * cancellation. This will trigger notification of listeners. + * + * @return {@code true} if this context cancelled the context and notified listeners, + * {@code false} if the context was already cancelled. + */ + public boolean cancel(Throwable cause) { + boolean triggeredCancel = false; + synchronized (this) { + if (!cancelled) { + cancelled = true; + if (pendingDeadline != null) { + // If we have a scheduled cancellation pending attempt to cancel it. + pendingDeadline.cancel(false); + pendingDeadline = null; + } + this.cancellationCause = cause; + triggeredCancel = true; + } + } + if (triggeredCancel) { + notifyAndClearListeners(); + } + return triggeredCancel; + } + + /** + * Cancel this context and detach it as the current context from the thread. + * + * @param toAttach context to make current. + * @param cause of cancellation, can be {@code null}. + */ + public void detachAndCancel(Context toAttach, Throwable cause) { + try { + detach(toAttach); + } finally { + cancel(cause); + } + } + + @Override + public boolean isCancelled() { + synchronized (this) { + if (cancelled) { + return true; + } + } + // Detect cancellation of parent in the case where we have no listeners and + // record it. + if (super.isCancelled()) { + cancel(super.cancellationCause()); + return true; + } + return false; + } + + @Override + public Throwable cancellationCause() { + if (isCancelled()) { + return cancellationCause; + } + return null; + } + } + + /** + * A listener notified on context cancellation. + */ + public interface CancellationListener { + /** + * @param context the newly cancelled context. + */ + public void cancelled(Context context); + } + + /** + * Key for indexing values stored in a context. + */ + public static class Key<T> { + + private final String name; + private final T defaultValue; + + Key(String name) { + this(name, null); + } + + Key(String name, T defaultValue) { + this.name = checkNotNull(name, "name"); + this.defaultValue = defaultValue; + } + + /** + * Get the value from the {@link #current()} context for this key. + */ + @SuppressWarnings("unchecked") + public T get() { + return get(Context.current()); + } + + /** + * Get the value from the specified context for this key. + */ + @SuppressWarnings("unchecked") + public T get(Context context) { + T value = (T) context.lookup(this); + return value == null ? defaultValue : value; + } + + @Override + public String toString() { + return name; + } + } + + /** + * Stores listener & executor pair. + */ + private class ExecutableListener implements Runnable { + private final Executor executor; + private final CancellationListener listener; + + private ExecutableListener(Executor executor, CancellationListener listener) { + this.executor = executor; + this.listener = listener; + } + + private void deliver() { + try { + executor.execute(this); + } catch (Throwable t) { + log.log(Level.INFO, "Exception notifying context listener", t); + } + } + + @Override + public void run() { + listener.cancelled(Context.this); + } + } + + private class ParentListener implements CancellationListener { + @Override + public void cancelled(Context context) { + if (Context.this instanceof CancellableContext) { + // Record cancellation with its cancellationCause. + ((CancellableContext) Context.this).cancel(context.cancellationCause()); + } else { + notifyAndClearListeners(); + } + } + } + + private static <T> T checkNotNull(T reference, Object errorMessage) { + if (reference == null) { + throw new NullPointerException(String.valueOf(errorMessage)); + } + return reference; + } + + private enum DirectExecutor implements Executor { + INSTANCE; + + @Override + public void execute(Runnable command) { + command.run(); + } + + @Override + public String toString() { + return "Context.DirectExecutor"; + } + } +} diff --git a/context/src/main/java/io/grpc/Deadline.java b/context/src/main/java/io/grpc/Deadline.java new file mode 100644 index 000000000..c3ab53e32 --- /dev/null +++ b/context/src/main/java/io/grpc/Deadline.java @@ -0,0 +1,186 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledFuture; +import java.util.concurrent.TimeUnit; + +/** + * An absolute deadline in system time. + */ +public final class Deadline implements Comparable<Deadline> { + private static final SystemTicker SYSTEM_TICKER = new SystemTicker(); + // nanoTime has a range of just under 300 years. Only allow up to 100 years in the past or future + // to prevent wraparound as long as process runs for less than ~100 years. + private static final long MAX_OFFSET = TimeUnit.DAYS.toNanos(100 * 365); + private static final long MIN_OFFSET = -MAX_OFFSET; + + /** + * Create a deadline that will expire at the specified offset from the current system clock. + * @param duration A non-negative duration. + * @param units The time unit for the duration. + * @return A new deadline. + */ + public static Deadline after(long duration, TimeUnit units) { + return after(duration, units, SYSTEM_TICKER); + } + + // For testing + static Deadline after(long duration, TimeUnit units, Ticker ticker) { + checkNotNull(units, "units"); + return new Deadline(ticker, units.toNanos(duration), true); + } + + private final Ticker ticker; + private final long deadlineNanos; + private volatile boolean expired; + + private Deadline(Ticker ticker, long offset, boolean baseInstantAlreadyExpired) { + this(ticker, ticker.read(), offset, baseInstantAlreadyExpired); + } + + private Deadline(Ticker ticker, long baseInstant, long offset, + boolean baseInstantAlreadyExpired) { + this.ticker = ticker; + // Clamp to range [MIN_OFFSET, MAX_OFFSET] + offset = Math.min(MAX_OFFSET, Math.max(MIN_OFFSET, offset)); + deadlineNanos = baseInstant + offset; + expired = baseInstantAlreadyExpired && offset <= 0; + } + + /** + * Has this deadline expired + * @return {@code true} if it has, otherwise {@code false}. + */ + public boolean isExpired() { + if (!expired) { + if (deadlineNanos - ticker.read() <= 0) { + expired = true; + } else { + return false; + } + } + return true; + } + + /** + * Is {@code this} deadline before another. + */ + public boolean isBefore(Deadline other) { + return this.deadlineNanos - other.deadlineNanos < 0; + } + + /** + * Return the minimum deadline of {@code this} or an other deadline. + * @param other deadline to compare with {@code this}. + */ + public Deadline minimum(Deadline other) { + return isBefore(other) ? this : other; + } + + /** + * Create a new deadline that is offset from {@code this}. + */ + // TODO(ejona): This method can cause deadlines to grow too far apart. For example: + // Deadline.after(100 * 365, DAYS).offset(100 * 365, DAYS) would be less than + // Deadline.after(-100 * 365, DAYS) + public Deadline offset(long offset, TimeUnit units) { + // May already be expired + if (offset == 0) { + return this; + } + return new Deadline(ticker, deadlineNanos, units.toNanos(offset), isExpired()); + } + + /** + * How much time is remaining in the specified time unit. Internal units are maintained as + * nanoseconds and conversions are subject to the constraints documented for + * {@link TimeUnit#convert}. If there is no time remaining, the returned duration is how + * long ago the deadline expired. + */ + public long timeRemaining(TimeUnit unit) { + final long nowNanos = ticker.read(); + if (!expired && deadlineNanos - nowNanos <= 0) { + expired = true; + } + return unit.convert(deadlineNanos - nowNanos, TimeUnit.NANOSECONDS); + } + + /** + * Schedule a task to be run when the deadline expires. + * @param task to run on expiration + * @param scheduler used to execute the task + * @return {@link ScheduledFuture} which can be used to cancel execution of the task + */ + public ScheduledFuture<?> runOnExpiration(Runnable task, ScheduledExecutorService scheduler) { + checkNotNull(task, "task"); + checkNotNull(scheduler, "scheduler"); + return scheduler.schedule(task, deadlineNanos - ticker.read(), TimeUnit.NANOSECONDS); + } + + @Override + public String toString() { + return timeRemaining(TimeUnit.NANOSECONDS) + " ns from now"; + } + + @Override + public int compareTo(Deadline that) { + long diff = this.deadlineNanos - that.deadlineNanos; + if (diff < 0) { + return -1; + } else if (diff > 0) { + return 1; + } + return 0; + } + + /** Time source representing nanoseconds since fixed but arbitrary point in time. */ + abstract static class Ticker { + /** Returns the number of nanoseconds since this source's epoch. */ + public abstract long read(); + } + + private static class SystemTicker extends Ticker { + @Override + public long read() { + return System.nanoTime(); + } + } + + private static <T> T checkNotNull(T reference, Object errorMessage) { + if (reference == null) { + throw new NullPointerException(String.valueOf(errorMessage)); + } + return reference; + } +} diff --git a/context/src/test/java/io/grpc/ContextTest.java b/context/src/test/java/io/grpc/ContextTest.java new file mode 100644 index 000000000..f36cdfae4 --- /dev/null +++ b/context/src/test/java/io/grpc/ContextTest.java @@ -0,0 +1,746 @@ +/* + * Copyright 2015, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +import static org.hamcrest.core.IsInstanceOf.instanceOf; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; + +import com.google.common.util.concurrent.MoreExecutors; +import com.google.common.util.concurrent.SettableFuture; + +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.JUnit4; + +import java.util.ArrayDeque; +import java.util.Queue; +import java.util.concurrent.Callable; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Executor; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; +import java.util.logging.Handler; +import java.util.logging.Level; +import java.util.logging.LogRecord; +import java.util.logging.Logger; + +/** + * Tests for {@link Context}. + */ +@RunWith(JUnit4.class) +public class ContextTest { + + private static final Context.Key<String> PET = Context.key("pet"); + private static final Context.Key<String> FOOD = Context.keyWithDefault("food", "lasagna"); + private static final Context.Key<String> COLOR = Context.key("color"); + private static final Context.Key<Object> FAVORITE = Context.key("favorite"); + + private Context listenerNotifedContext; + private CountDownLatch deadlineLatch = new CountDownLatch(1); + private Context.CancellationListener cancellationListener = new Context.CancellationListener() { + @Override + public void cancelled(Context context) { + listenerNotifedContext = context; + deadlineLatch.countDown(); + } + }; + + private Context observed; + private Runnable runner = new Runnable() { + @Override + public void run() { + observed = Context.current(); + } + }; + private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor(); + + @Before + public void setUp() throws Exception { + Context.ROOT.attach(); + } + + @After + public void tearDown() throws Exception { + scheduler.shutdown(); + } + + @Test + public void rootIsInitialContext() { + assertNotNull(Context.ROOT); + assertTrue(Context.ROOT.isCurrent()); + } + + @Test + public void rootIsAlwaysBound() throws Exception { + final SettableFuture<Boolean> rootIsBound = SettableFuture.create(); + new Thread(new Runnable() { + @Override + public void run() { + rootIsBound.set(Context.current() == Context.ROOT); + } + }).start(); + assertTrue(rootIsBound.get(5, TimeUnit.SECONDS)); + } + + @Test + public void rootCanBeAttached() { + Context fork = Context.ROOT.fork(); + fork.attach(); + Context.ROOT.attach(); + assertTrue(Context.ROOT.isCurrent()); + fork.attach(); + assertTrue(fork.isCurrent()); + } + + @Test + public void rootCanNeverHaveAListener() { + Context root = Context.current(); + root.addListener(cancellationListener, MoreExecutors.directExecutor()); + assertEquals(0, root.listenerCount()); + } + + @Test + public void rootIsNotCancelled() { + assertFalse(Context.ROOT.isCancelled()); + assertNull(Context.ROOT.cancellationCause()); + } + + @Test + public void attachedCancellableContextCannotBeCastFromCurrent() { + Context initial = Context.current(); + Context.CancellableContext base = initial.withCancellation(); + base.attach(); + assertFalse(Context.current() instanceof Context.CancellableContext); + assertNotSame(base, Context.current()); + assertNotSame(initial, Context.current()); + base.detachAndCancel(initial, null); + assertSame(initial, Context.current()); + } + + @Test + public void attachingNonCurrentReturnsCurrent() { + Context initial = Context.current(); + Context base = initial.withValue(PET, "dog"); + assertSame(initial, base.attach()); + assertSame(base, initial.attach()); + } + + @Test + public void detachingNonCurrentLogsSevereMessage() { + final AtomicReference<LogRecord> logRef = new AtomicReference<LogRecord>(); + Handler handler = new Handler() { + @Override + public void publish(LogRecord record) { + logRef.set(record); + } + + @Override + public void flush() { + } + + @Override + public void close() throws SecurityException { + } + }; + Logger logger = Logger.getLogger(Context.class.getName()); + try { + logger.addHandler(handler); + Context initial = Context.current(); + Context base = initial.withValue(PET, "dog"); + // Base is not attached + base.detach(initial); + assertSame(initial, Context.current()); + assertNotNull(logRef.get()); + assertEquals(Level.SEVERE, logRef.get().getLevel()); + } finally { + logger.removeHandler(handler); + } + } + + @Test + public void valuesAndOverrides() { + Context base = Context.current().withValue(PET, "dog"); + Context child = base.withValues(PET, null, FOOD, "cheese"); + + base.attach(); + + assertEquals("dog", PET.get()); + assertEquals("lasagna", FOOD.get()); + assertNull(COLOR.get()); + + child.attach(); + + assertNull(PET.get()); + assertEquals("cheese", FOOD.get()); + assertNull(COLOR.get()); + + child.detach(base); + + // Should have values from base + assertEquals("dog", PET.get()); + assertEquals("lasagna", FOOD.get()); + assertNull(COLOR.get()); + + base.detach(Context.ROOT); + + assertNull(PET.get()); + assertEquals("lasagna", FOOD.get()); + assertNull(COLOR.get()); + } + + @Test + public void withValuesThree() { + Object fav = new Object(); + Context base = Context.current().withValues(PET, "dog", COLOR, "blue"); + Context child = base.withValues(PET, "cat", FOOD, "cheese", FAVORITE, fav); + + child.attach(); + + assertEquals("cat", PET.get()); + assertEquals("cheese", FOOD.get()); + assertEquals("blue", COLOR.get()); + assertEquals(fav, FAVORITE.get()); + + base.attach(); + } + + @Test + public void cancelReturnsFalseIfAlreadyCancelled() { + Context.CancellableContext base = Context.current().withCancellation(); + assertTrue(base.cancel(null)); + assertTrue(base.isCancelled()); + assertFalse(base.cancel(null)); + } + + @Test + public void notifyListenersOnCancel() { + class SetContextCancellationListener implements Context.CancellationListener { + private final AtomicReference<Context> observed; + + public SetContextCancellationListener(AtomicReference<Context> observed) { + this.observed = observed; + } + + @Override + public void cancelled(Context context) { + observed.set(context); + } + } + + Context.CancellableContext base = Context.current().withCancellation(); + final AtomicReference<Context> observed1 = new AtomicReference<Context>(); + base.addListener(new SetContextCancellationListener(observed1), MoreExecutors.directExecutor()); + final AtomicReference<Context> observed2 = new AtomicReference<Context>(); + base.addListener(new SetContextCancellationListener(observed2), MoreExecutors.directExecutor()); + assertNull(observed1.get()); + assertNull(observed2.get()); + base.cancel(null); + assertSame(base, observed1.get()); + assertSame(base, observed2.get()); + + final AtomicReference<Context> observed3 = new AtomicReference<Context>(); + base.addListener(new SetContextCancellationListener(observed3), MoreExecutors.directExecutor()); + assertSame(base, observed3.get()); + } + + @Test + public void exceptionOfExecutorDoesntThrow() { + final AtomicReference<Throwable> loggedThrowable = new AtomicReference<Throwable>(); + Handler logHandler = new Handler() { + @Override + public void publish(LogRecord record) { + Throwable thrown = record.getThrown(); + if (thrown != null) { + if (loggedThrowable.get() == null) { + loggedThrowable.set(thrown); + } else { + loggedThrowable.set(new RuntimeException("Too many exceptions", thrown)); + } + } + } + + @Override + public void close() {} + + @Override + public void flush() {} + }; + Logger logger = Logger.getLogger(Context.class.getName()); + logger.addHandler(logHandler); + try { + Context.CancellableContext base = Context.current().withCancellation(); + final AtomicReference<Runnable> observed1 = new AtomicReference<Runnable>(); + final Error err = new Error(); + base.addListener(cancellationListener, new Executor() { + @Override + public void execute(Runnable runnable) { + observed1.set(runnable); + throw err; + } + }); + assertNull(observed1.get()); + assertNull(loggedThrowable.get()); + base.cancel(null); + assertNotNull(observed1.get()); + assertSame(err, loggedThrowable.get()); + + final Error err2 = new Error(); + loggedThrowable.set(null); + final AtomicReference<Runnable> observed2 = new AtomicReference<Runnable>(); + base.addListener(cancellationListener, new Executor() { + @Override + public void execute(Runnable runnable) { + observed2.set(runnable); + throw err2; + } + }); + assertNotNull(observed2.get()); + assertSame(err2, loggedThrowable.get()); + } finally { + logger.removeHandler(logHandler); + } + } + + @Test + public void cascadingCancellationNotifiesChild() { + // Root is not cancellable so we can't cascade from it + Context.CancellableContext base = Context.current().withCancellation(); + assertEquals(0, base.listenerCount()); + Context child = base.withValue(FOOD, "lasagna"); + assertEquals(0, child.listenerCount()); + child.addListener(cancellationListener, MoreExecutors.directExecutor()); + assertEquals(1, child.listenerCount()); + assertEquals(1, base.listenerCount()); // child is now listening to base + assertFalse(base.isCancelled()); + assertFalse(child.isCancelled()); + IllegalStateException cause = new IllegalStateException(); + base.cancel(cause); + assertTrue(base.isCancelled()); + assertSame(cause, base.cancellationCause()); + assertSame(child, listenerNotifedContext); + assertTrue(child.isCancelled()); + assertSame(cause, child.cancellationCause()); + assertEquals(0, base.listenerCount()); + assertEquals(0, child.listenerCount()); + } + + @Test + public void cascadingCancellationWithoutListener() { + Context.CancellableContext base = Context.current().withCancellation(); + Context child = base.withCancellation(); + Throwable t = new Throwable(); + base.cancel(t); + assertTrue(child.isCancelled()); + assertSame(t, child.cancellationCause()); + } + + @Test + public void cancellableContextIsAttached() { + Context.CancellableContext base = Context.current().withValue(FOOD, "fish").withCancellation(); + assertFalse(base.isCurrent()); + base.attach(); + + Context attached = Context.current(); + assertSame("fish", FOOD.get()); + assertFalse(attached.isCancelled()); + assertNull(attached.cancellationCause()); + assertTrue(attached.canBeCancelled()); + assertTrue(attached.isCurrent()); + assertTrue(base.isCurrent()); + + attached.addListener(cancellationListener, MoreExecutors.directExecutor()); + Throwable t = new Throwable(); + base.cancel(t); + assertTrue(attached.isCancelled()); + assertSame(t, attached.cancellationCause()); + assertSame(attached, listenerNotifedContext); + + Context.ROOT.attach(); + } + + @Test + public void cancellableContextCascadesFromCancellableParent() { + // Root is not cancellable so we can't cascade from it + Context.CancellableContext base = Context.current().withCancellation(); + Context child = base.withCancellation(); + child.addListener(cancellationListener, MoreExecutors.directExecutor()); + assertFalse(base.isCancelled()); + assertFalse(child.isCancelled()); + IllegalStateException cause = new IllegalStateException(); + base.cancel(cause); + assertTrue(base.isCancelled()); + assertSame(cause, base.cancellationCause()); + assertSame(child, listenerNotifedContext); + assertTrue(child.isCancelled()); + assertSame(cause, child.cancellationCause()); + assertEquals(0, base.listenerCount()); + assertEquals(0, child.listenerCount()); + } + + @Test + public void nonCascadingCancellationDoesNotNotifyForked() { + Context.CancellableContext base = Context.current().withCancellation(); + Context fork = base.fork(); + fork.addListener(cancellationListener, MoreExecutors.directExecutor()); + assertEquals(0, base.listenerCount()); + assertEquals(0, fork.listenerCount()); + assertTrue(base.cancel(new Throwable())); + assertNull(listenerNotifedContext); + assertFalse(fork.isCancelled()); + assertNull(fork.cancellationCause()); + } + + @Test + public void testWrapRunnable() throws Exception { + Context base = Context.current().withValue(PET, "cat"); + Context current = Context.current().withValue(PET, "fish"); + current.attach(); + + base.wrap(runner).run(); + assertSame(base, observed); + assertSame(current, Context.current()); + + current.wrap(runner).run(); + assertSame(current, observed); + assertSame(current, Context.current()); + + final Error err = new Error(); + try { + base.wrap(new Runnable() { + @Override + public void run() { + throw err; + } + }).run(); + fail("Expected exception"); + } catch (Error ex) { + assertSame(err, ex); + } + assertSame(current, Context.current()); + + current.detach(Context.ROOT); + } + + @Test + public void testWrapCallable() throws Exception { + Context base = Context.current().withValue(PET, "cat"); + Context current = Context.current().withValue(PET, "fish"); + current.attach(); + + final Object ret = new Object(); + Callable<Object> callable = new Callable<Object>() { + @Override + public Object call() { + runner.run(); + return ret; + } + }; + + assertSame(ret, base.wrap(callable).call()); + assertSame(base, observed); + assertSame(current, Context.current()); + + assertSame(ret, current.wrap(callable).call()); + assertSame(current, observed); + assertSame(current, Context.current()); + + final Error err = new Error(); + try { + base.wrap(new Callable<Object>() { + @Override + public Object call() { + throw err; + } + }).call(); + fail("Excepted exception"); + } catch (Error ex) { + assertSame(err, ex); + } + assertSame(current, Context.current()); + + current.detach(Context.ROOT); + } + + @Test + public void currentContextExecutor() throws Exception { + QueuedExecutor queuedExecutor = new QueuedExecutor(); + Executor executor = Context.currentContextExecutor(queuedExecutor); + Context base = Context.current().withValue(PET, "cat"); + Context previous = base.attach(); + try { + executor.execute(runner); + } finally { + base.detach(previous); + } + assertEquals(1, queuedExecutor.runnables.size()); + queuedExecutor.runnables.remove().run(); + assertSame(base, observed); + } + + @Test + public void fixedContextExecutor() throws Exception { + Context base = Context.current().withValue(PET, "cat"); + QueuedExecutor queuedExecutor = new QueuedExecutor(); + base.fixedContextExecutor(queuedExecutor).execute(runner); + assertEquals(1, queuedExecutor.runnables.size()); + queuedExecutor.runnables.remove().run(); + assertSame(base, observed); + } + + @Test + public void typicalTryFinallyHandling() throws Exception { + Context base = Context.current().withValue(COLOR, "blue"); + Context previous = base.attach(); + try { + assertTrue(base.isCurrent()); + // Do something + } finally { + base.detach(previous); + } + assertFalse(base.isCurrent()); + } + + @Test + public void typicalCancellableTryCatchFinallyHandling() throws Exception { + Context.CancellableContext base = Context.current().withCancellation(); + Context previous = base.attach(); + try { + // Do something + throw new IllegalStateException("Argh"); + } catch (IllegalStateException ise) { + base.cancel(ise); + } finally { + base.detachAndCancel(previous, null); + } + assertTrue(base.isCancelled()); + assertNotNull(base.cancellationCause()); + } + + @Test + public void rootHasNoDeadline() { + assertNull(Context.ROOT.getDeadline()); + } + + @Test + public void contextWithDeadlineHasDeadline() { + Context.CancellableContext cancellableContext = + Context.ROOT.withDeadlineAfter(1, TimeUnit.SECONDS, scheduler); + assertNotNull(cancellableContext.getDeadline()); + } + + @Test + public void earlierParentDeadlineTakesPrecedenceOverLaterChildDeadline() throws Exception { + final Deadline sooner = Deadline.after(100, TimeUnit.MILLISECONDS); + final Deadline later = Deadline.after(1, TimeUnit.MINUTES); + Context.CancellableContext parent = Context.ROOT.withDeadline(sooner, scheduler); + Context.CancellableContext child = parent.withDeadline(later, scheduler); + assertSame(parent.getDeadline(), sooner); + assertSame(child.getDeadline(), sooner); + final CountDownLatch latch = new CountDownLatch(1); + final AtomicReference<Exception> error = new AtomicReference<Exception>(); + child.addListener(new Context.CancellationListener() { + @Override + public void cancelled(Context context) { + try { + assertTrue(sooner.isExpired()); + assertFalse(later.isExpired()); + } catch (Exception e) { + error.set(e); + } + latch.countDown(); + } + }, MoreExecutors.directExecutor()); + latch.await(3, TimeUnit.SECONDS); + if (error.get() != null) { + throw error.get(); + } + } + + @Test + public void earlierChldDeadlineTakesPrecedenceOverLaterParentDeadline() { + Deadline sooner = Deadline.after(1, TimeUnit.HOURS); + Deadline later = Deadline.after(1, TimeUnit.DAYS); + Context.CancellableContext parent = Context.ROOT.withDeadline(later, scheduler); + Context.CancellableContext child = parent.withDeadline(sooner, scheduler); + assertSame(parent.getDeadline(), later); + assertSame(child.getDeadline(), sooner); + } + + @Test + public void forkingContextDoesNotCarryDeadline() { + Deadline deadline = Deadline.after(1, TimeUnit.HOURS); + Context.CancellableContext parent = Context.ROOT.withDeadline(deadline, scheduler); + Context fork = parent.fork(); + assertNull(fork.getDeadline()); + } + + @Test + public void cancellationDoesNotExpireDeadline() { + Deadline deadline = Deadline.after(1, TimeUnit.HOURS); + Context.CancellableContext parent = Context.ROOT.withDeadline(deadline, scheduler); + parent.cancel(null); + assertFalse(deadline.isExpired()); + } + + @Test + public void absoluteDeadlineTriggersAndPropagates() throws Exception { + Context base = Context.current().withDeadline(Deadline.after(1, TimeUnit.SECONDS), scheduler); + Context child = base.withValue(FOOD, "lasagna"); + child.addListener(cancellationListener, MoreExecutors.directExecutor()); + assertFalse(base.isCancelled()); + assertFalse(child.isCancelled()); + assertTrue(deadlineLatch.await(2, TimeUnit.SECONDS)); + assertTrue(base.isCancelled()); + assertTrue(base.cancellationCause() instanceof TimeoutException); + assertSame(child, listenerNotifedContext); + assertTrue(child.isCancelled()); + assertSame(base.cancellationCause(), child.cancellationCause()); + } + + @Test + public void relativeDeadlineTriggersAndPropagates() throws Exception { + Context base = Context.current().withDeadline(Deadline.after(1, TimeUnit.SECONDS), scheduler); + Context child = base.withValue(FOOD, "lasagna"); + child.addListener(cancellationListener, MoreExecutors.directExecutor()); + assertFalse(base.isCancelled()); + assertFalse(child.isCancelled()); + assertTrue(deadlineLatch.await(2, TimeUnit.SECONDS)); + assertTrue(base.isCancelled()); + assertTrue(base.cancellationCause() instanceof TimeoutException); + assertSame(child, listenerNotifedContext); + assertTrue(child.isCancelled()); + assertSame(base.cancellationCause(), child.cancellationCause()); + } + + @Test + public void innerDeadlineCompletesBeforeOuter() throws Exception { + Context base = Context.current().withDeadline(Deadline.after(2, TimeUnit.SECONDS), scheduler); + Context child = base.withDeadline(Deadline.after(1, TimeUnit.SECONDS), scheduler); + child.addListener(cancellationListener, MoreExecutors.directExecutor()); + assertFalse(base.isCancelled()); + assertFalse(child.isCancelled()); + assertTrue(deadlineLatch.await(2, TimeUnit.SECONDS)); + assertFalse(base.isCancelled()); + assertSame(child, listenerNotifedContext); + assertTrue(child.isCancelled()); + assertTrue(child.cancellationCause() instanceof TimeoutException); + + deadlineLatch = new CountDownLatch(1); + base.addListener(cancellationListener, MoreExecutors.directExecutor()); + assertTrue(deadlineLatch.await(2, TimeUnit.SECONDS)); + assertTrue(base.isCancelled()); + assertTrue(base.cancellationCause() instanceof TimeoutException); + assertNotSame(base.cancellationCause(), child.cancellationCause()); + } + + @Test + public void cancellationCancelsScheduledTask() { + ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); + try { + assertEquals(0, executor.getQueue().size()); + Context.CancellableContext base + = Context.current().withDeadline(Deadline.after(1, TimeUnit.DAYS), executor); + assertEquals(1, executor.getQueue().size()); + base.cancel(null); + executor.purge(); + assertEquals(0, executor.getQueue().size()); + } finally { + executor.shutdown(); + } + } + + private static class QueuedExecutor implements Executor { + private final Queue<Runnable> runnables = new ArrayDeque<Runnable>(); + + @Override + public void execute(Runnable r) { + runnables.add(r); + } + } + + @Test + public void childContextListenerNotifiedAfterParentListener() { + Context.CancellableContext parent = Context.current().withCancellation(); + Context child = parent.withValue(COLOR, "red"); + final AtomicBoolean childAfterParent = new AtomicBoolean(); + final AtomicBoolean parentCalled = new AtomicBoolean(); + child.addListener(new Context.CancellationListener() { + @Override + public void cancelled(Context context) { + if (parentCalled.get()) { + childAfterParent.set(true); + } + } + }, MoreExecutors.directExecutor()); + parent.addListener(new Context.CancellationListener() { + @Override + public void cancelled(Context context) { + parentCalled.set(true); + } + }, MoreExecutors.directExecutor()); + parent.cancel(null); + assertTrue(parentCalled.get()); + assertTrue(childAfterParent.get()); + } + + @Test + public void expiredDeadlineShouldCancelContextImmediately() { + Context parent = Context.current(); + assertFalse(parent.isCancelled()); + + Context.CancellableContext context = parent.withDeadlineAfter(0, TimeUnit.SECONDS, scheduler); + assertTrue(context.isCancelled()); + assertThat(context.cancellationCause(), instanceOf(TimeoutException.class)); + + assertFalse(parent.isCancelled()); + Deadline deadline = Deadline.after(-10, TimeUnit.SECONDS); + assertTrue(deadline.isExpired()); + context = parent.withDeadline(deadline, scheduler); + assertTrue(context.isCancelled()); + assertThat(context.cancellationCause(), instanceOf(TimeoutException.class)); + } +} diff --git a/context/src/test/java/io/grpc/DeadlineTest.java b/context/src/test/java/io/grpc/DeadlineTest.java new file mode 100644 index 000000000..f85edf18c --- /dev/null +++ b/context/src/test/java/io/grpc/DeadlineTest.java @@ -0,0 +1,287 @@ +/* + * Copyright 2016, Google Inc. All rights reserved. + * + * Redistribution and use in source and binary forms, with or without + * modification, are permitted provided that the following conditions are + * met: + * + * * Redistributions of source code must retain the above copyright + * notice, this list of conditions and the following disclaimer. + * * Redistributions in binary form must reproduce the above + * copyright notice, this list of conditions and the following disclaimer + * in the documentation and/or other materials provided with the + * distribution. + * + * * Neither the name of Google Inc. nor the names of its + * contributors may be used to endorse or promote products derived from + * this software without specific prior written permission. + * + * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS + * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT + * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR + * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT + * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, + * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT + * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, + * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY + * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT + * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE + * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. + */ + +package io.grpc; + +import static com.google.common.truth.Truth.assertAbout; +import static io.grpc.testing.DeadlineSubject.deadline; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertSame; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.verify; + +import com.google.common.truth.Truth; + +import org.junit.Test; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; +import org.junit.runners.Parameterized.Parameters; +import org.mockito.ArgumentCaptor; + +import java.util.Arrays; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Tests for {@link Context}. + */ +@RunWith(Parameterized.class) +public class DeadlineTest { + /** Ticker epochs to vary testing. */ + @Parameters + public static Iterable<Object[]> data() { + return Arrays.asList(new Object[][] { + // MAX_VALUE / 2 is important because the signs are generally the same for past and future + // deadlines. + {Long.MAX_VALUE / 2}, {0}, {Long.MAX_VALUE}, {Long.MIN_VALUE} + }); + } + + private FakeTicker ticker = new FakeTicker(); + + public DeadlineTest(long epoch) { + ticker.reset(epoch); + } + + @Test + public void defaultTickerIsSystemTicker() { + Deadline d = Deadline.after(0, TimeUnit.SECONDS); + ticker.reset(System.nanoTime()); + Deadline reference = Deadline.after(0, TimeUnit.SECONDS, ticker); + // Allow inaccuracy to account for system time advancing during test. + assertAbout(deadline()).that(d).isWithin(1, TimeUnit.SECONDS).of(reference); + } + + @Test + public void timeCanOverflow() { + ticker.reset(Long.MAX_VALUE); + Deadline d = Deadline.after(10, TimeUnit.DAYS, ticker); + assertEquals(10, d.timeRemaining(TimeUnit.DAYS)); + assertTrue(Deadline.after(0, TimeUnit.DAYS, ticker).isBefore(d)); + assertFalse(d.isExpired()); + + ticker.increment(10, TimeUnit.DAYS); + assertTrue(d.isExpired()); + } + + @Test + public void timeCanUnderflow() { + ticker.reset(Long.MIN_VALUE); + Deadline d = Deadline.after(-10, TimeUnit.DAYS, ticker); + assertEquals(-10, d.timeRemaining(TimeUnit.DAYS)); + assertTrue(d.isBefore(Deadline.after(0, TimeUnit.DAYS, ticker))); + assertTrue(d.isExpired()); + } + + @Test + public void deadlineClamps() { + Deadline d = Deadline.after(-300 * 365, TimeUnit.DAYS, ticker); + Deadline d2 = Deadline.after(300 * 365, TimeUnit.DAYS, ticker); + assertTrue(d.isBefore(d2)); + + Deadline d3 = Deadline.after(-200 * 365, TimeUnit.DAYS, ticker); + // d and d3 are equal + assertFalse(d.isBefore(d3)); + assertFalse(d3.isBefore(d)); + } + + @Test + public void immediateDeadlineIsExpired() { + Deadline deadline = Deadline.after(0, TimeUnit.SECONDS, ticker); + assertTrue(deadline.isExpired()); + } + + @Test + public void shortDeadlineEventuallyExpires() throws Exception { + Deadline d = Deadline.after(100, TimeUnit.MILLISECONDS, ticker); + assertTrue(d.timeRemaining(TimeUnit.NANOSECONDS) > 0); + assertFalse(d.isExpired()); + ticker.increment(101, TimeUnit.MILLISECONDS); + + assertTrue(d.isExpired()); + assertEquals(-1, d.timeRemaining(TimeUnit.MILLISECONDS)); + } + + @Test + public void deadlineMatchesLongValue() { + assertEquals(10, Deadline.after(10, TimeUnit.MINUTES, ticker).timeRemaining(TimeUnit.MINUTES)); + } + + @Test + public void pastDeadlineIsExpired() { + Deadline d = Deadline.after(-1, TimeUnit.SECONDS, ticker); + assertTrue(d.isExpired()); + assertEquals(-1000, d.timeRemaining(TimeUnit.MILLISECONDS)); + } + + @Test + public void deadlineDoesNotOverflowOrUnderflow() { + Deadline after = Deadline.after(Long.MAX_VALUE, TimeUnit.NANOSECONDS, ticker); + assertFalse(after.isExpired()); + + Deadline before = Deadline.after(-Long.MAX_VALUE, TimeUnit.NANOSECONDS, ticker); + assertTrue(before.isExpired()); + + assertTrue(before.isBefore(after)); + } + + @Test + public void beforeExpiredDeadlineIsExpired() { + Deadline base = Deadline.after(0, TimeUnit.SECONDS, ticker); + assertTrue(base.isExpired()); + assertTrue(base.offset(-1, TimeUnit.SECONDS).isExpired()); + } + + @Test + public void beforeNotExpiredDeadlineMayBeExpired() { + Deadline base = Deadline.after(10, TimeUnit.SECONDS, ticker); + assertFalse(base.isExpired()); + assertFalse(base.offset(-1, TimeUnit.SECONDS).isExpired()); + assertTrue(base.offset(-11, TimeUnit.SECONDS).isExpired()); + } + + @Test + public void afterExpiredDeadlineMayBeExpired() { + Deadline base = Deadline.after(-10, TimeUnit.SECONDS, ticker); + assertTrue(base.isExpired()); + assertTrue(base.offset(1, TimeUnit.SECONDS).isExpired()); + assertFalse(base.offset(11, TimeUnit.SECONDS).isExpired()); + } + + @Test + public void zeroOffsetIsSameDeadline() { + Deadline base = Deadline.after(0, TimeUnit.SECONDS, ticker); + assertSame(base, base.offset(0, TimeUnit.SECONDS)); + } + + @Test + public void runOnEventualExpirationIsExecuted() throws Exception { + Deadline base = Deadline.after(50, TimeUnit.MICROSECONDS, ticker); + ScheduledExecutorService mockScheduler = mock(ScheduledExecutorService.class); + final AtomicBoolean executed = new AtomicBoolean(); + base.runOnExpiration( + new Runnable() { + @Override + public void run() { + executed.set(true); + } + }, mockScheduler); + assertFalse(executed.get()); + ArgumentCaptor<Runnable> runnableCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(mockScheduler).schedule(runnableCaptor.capture(), eq(50000L), eq(TimeUnit.NANOSECONDS)); + runnableCaptor.getValue().run(); + assertTrue(executed.get()); + } + + @Test + public void runOnAlreadyExpiredIsExecutedOnExecutor() throws Exception { + Deadline base = Deadline.after(0, TimeUnit.MICROSECONDS, ticker); + ScheduledExecutorService mockScheduler = mock(ScheduledExecutorService.class); + final AtomicBoolean executed = new AtomicBoolean(); + base.runOnExpiration( + new Runnable() { + @Override + public void run() { + executed.set(true); + } + }, mockScheduler); + assertFalse(executed.get()); + ArgumentCaptor<Runnable> runnableCaptor = ArgumentCaptor.forClass(Runnable.class); + verify(mockScheduler).schedule(runnableCaptor.capture(), eq(0L), eq(TimeUnit.NANOSECONDS)); + runnableCaptor.getValue().run(); + assertTrue(executed.get()); + } + + @Test + public void toString_exact() { + Deadline d = Deadline.after(0, TimeUnit.MILLISECONDS, ticker); + assertEquals("0 ns from now", d.toString()); + } + + @Test + public void toString_after() { + Deadline d = Deadline.after(-1, TimeUnit.MINUTES, ticker); + assertEquals("-60000000000 ns from now", d.toString()); + } + + @Test + public void compareTo_greater() { + Deadline d1 = Deadline.after(10, TimeUnit.SECONDS, ticker); + ticker.increment(1, TimeUnit.NANOSECONDS); + Deadline d2 = Deadline.after(10, TimeUnit.SECONDS, ticker); + Truth.assertThat(d2).isGreaterThan(d1); + } + + @Test + public void compareTo_less() { + Deadline d1 = Deadline.after(10, TimeUnit.SECONDS, ticker); + ticker.increment(1, TimeUnit.NANOSECONDS); + Deadline d2 = Deadline.after(10, TimeUnit.SECONDS, ticker); + Truth.assertThat(d1).isLessThan(d2); + } + + @Test + public void compareTo_same() { + Deadline d1 = Deadline.after(10, TimeUnit.SECONDS, ticker); + Deadline d2 = Deadline.after(10, TimeUnit.SECONDS, ticker); + Truth.assertThat(d1).isEquivalentAccordingToCompareTo(d2); + } + + @Test + public void toString_before() { + Deadline d = Deadline.after(12, TimeUnit.MICROSECONDS, ticker); + assertEquals("12000 ns from now", d.toString()); + } + + private static class FakeTicker extends Deadline.Ticker { + private long time; + + @Override + public long read() { + return time; + } + + public void reset(long time) { + this.time = time; + } + + public void increment(long period, TimeUnit unit) { + if (period < 0) { + throw new IllegalArgumentException(); + } + this.time += unit.toNanos(period); + } + } +} |