aboutsummaryrefslogtreecommitdiff
path: root/context
diff options
context:
space:
mode:
authorKun Zhang <zhangkun@google.com>2016-09-01 16:00:43 -0700
committerKun Zhang <zhangkun@google.com>2016-09-02 13:18:35 -0700
commitc4f7f5c4fd3d4f2909ea92f879c4b882098032a9 (patch)
tree2c69ec97ce39765f6e408aa93a7eae20d2d4fd90 /context
parent58d78dd0aae93b23d9351cd03876baaf37164b73 (diff)
downloadgrpc-grpc-java-c4f7f5c4fd3d4f2909ea92f879c4b882098032a9.tar.gz
core: split Context into a separate grpc-context artifact.
The Context API is not particularly gRPC-specific, and will be used by Census as its context propagation mechanism. Removed all dependencies to make it easy for other libraries to depend on.
Diffstat (limited to 'context')
-rw-r--r--context/build.gradle14
-rw-r--r--context/src/main/java/io/grpc/Context.java858
-rw-r--r--context/src/main/java/io/grpc/Deadline.java186
-rw-r--r--context/src/test/java/io/grpc/ContextTest.java746
-rw-r--r--context/src/test/java/io/grpc/DeadlineTest.java287
5 files changed, 2091 insertions, 0 deletions
diff --git a/context/build.gradle b/context/build.gradle
new file mode 100644
index 000000000..0566f6a0f
--- /dev/null
+++ b/context/build.gradle
@@ -0,0 +1,14 @@
+plugins {
+ id "be.insaneprogramming.gradle.animalsniffer" version "1.4.0"
+}
+
+description = 'gRPC: Context'
+
+dependencies {
+ testCompile project(':grpc-testing')
+}
+
+// Configure the animal sniffer plugin
+animalsniffer {
+ signature = "org.codehaus.mojo.signature:java16:+@signature"
+}
diff --git a/context/src/main/java/io/grpc/Context.java b/context/src/main/java/io/grpc/Context.java
new file mode 100644
index 000000000..27a736750
--- /dev/null
+++ b/context/src/main/java/io/grpc/Context.java
@@ -0,0 +1,858 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc;
+
+import java.util.ArrayList;
+import java.util.concurrent.Callable;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.logging.Level;
+import java.util.logging.Logger;
+
+/**
+ * A context propagation mechanism which can carry scoped-values across API boundaries and between
+ * threads. Examples of state propagated via context include:
+ * <ul>
+ * <li>Security principals and credentials.</li>
+ * <li>Local and distributed tracing information.</li>
+ * </ul>
+ *
+ * <p>Context objects make their state available by being attached to the executing thread using
+ * a {@link ThreadLocal}. The context object bound to a thread is considered {@link #current()}.
+ * Context objects are immutable and inherit state from their parent. To add or overwrite the
+ * current state a new context object must be created and then attached to the thread replacing the
+ * previously bound context. For example:
+ * <pre>
+ * Context withCredential = Context.current().withValue(CRED_KEY, cred);
+ * executorService.execute(withCredential.wrap(new Runnable() {
+ * public void run() {
+ * readUserRecords(userId, CRED_KEY.get());
+ * }
+ * }));
+
+ * </pre>
+ *
+ *
+ * <p>Contexts are also used to represent a scoped unit of work. When the unit of work is
+ * done the context can be cancelled. This cancellation will also cascade to all descendant
+ * contexts. You can add a {@link CancellationListener} to a context to be notified when it or
+ * one of its ancestors has been cancelled. Cancellation does not release the state stored by
+ * a context and it's perfectly valid to {@link #attach()} an already cancelled context to a
+ * thread to make it current. To cancel a context (and its descendants) you first create a
+ * {@link CancellableContext} and when you need to signal cancellation call
+ * {@link CancellableContext#cancel} or {@link CancellableContext#detachAndCancel}. For example:
+ * <pre>
+ * CancellableContext withCancellation = Context.current().withCancellation();
+ * try {
+ * executorService.execute(withCancellation.wrap(new Runnable() {
+ * public void run() {
+ * while (waitingForData() &amp;&amp; !Context.current().isCancelled()) {}
+ * }
+ * });
+ * doSomeWork();
+ * } catch (Throwable t) {
+ * withCancellation.cancel(t);
+ * }
+ * </pre>
+ *
+ * <p>Contexts can also be created with a timeout relative to the system nano clock which will
+ * cause it to automatically cancel at the desired time.
+ *
+ *
+ * <p>Notes and cautions on use:
+ * <ul>
+ * <li>While Context objects are immutable they do not place such a restriction on the state
+ * they store.</li>
+ * <li>Context is not intended for passing optional parameters to an API and developers should
+ * take care to avoid excessive dependence on context when designing an API.</li>
+ * <li>If Context is being used in an environment that needs to support class unloading it is the
+ * responsibility of the application to ensure that all contexts are properly cancelled.</li>
+ * </ul>
+ */
+public class Context {
+
+ private static final Logger log = Logger.getLogger(Context.class.getName());
+
+ private static final Object[][] EMPTY_ENTRIES = new Object[0][2];
+
+ private static final Key<Deadline> DEADLINE_KEY = new Key<Deadline>("deadline");
+
+ /**
+ * The logical root context which is {@link #current()} if no other context is bound. This context
+ * is not cancellable and so will not cascade cancellation or retain listeners.
+ */
+ public static final Context ROOT = new Context(null);
+
+ /**
+ * Currently bound context.
+ */
+ private static final ThreadLocal<Context> localContext = new ThreadLocal<Context>() {
+ @Override
+ protected Context initialValue() {
+ return ROOT;
+ }
+ };
+
+ /**
+ * Create a {@link Key} with the given debug name. Multiple different keys may have the same name;
+ * the name is intended for debugging purposes and does not impact behavior.
+ */
+ public static <T> Key<T> key(String name) {
+ return new Key<T>(name);
+ }
+
+ /**
+ * Create a {@link Key} with the given debug name and default value. Multiple different keys may
+ * have the same name; the name is intended for debugging purposes and does not impact behavior.
+ */
+ public static <T> Key<T> keyWithDefault(String name, T defaultValue) {
+ return new Key<T>(name, defaultValue);
+ }
+
+ /**
+ * Return the context associated with the current thread, will never return {@code null} as
+ * the {@link #ROOT} context is implicitly associated with all threads.
+ *
+ * <p>Will never return {@link CancellableContext} even if one is attached, instead a
+ * {@link Context} is returned with the same properties and lifetime. This is to avoid
+ * code stealing the ability to cancel arbitrarily.
+ */
+ public static Context current() {
+ Context current = localContext.get();
+ if (current == null) {
+ return ROOT;
+ }
+ return current;
+ }
+
+ private final Context parent;
+ private final Object[][] keyValueEntries;
+ private final boolean cascadesCancellation;
+ private ArrayList<ExecutableListener> listeners;
+ private CancellationListener parentListener = new ParentListener();
+ private final boolean canBeCancelled;
+
+ /**
+ * Construct a context that cannot be cancelled and will not cascade cancellation from its parent.
+ */
+ private Context(Context parent) {
+ this.parent = parent;
+ // Not inheriting cancellation implies not inheriting a deadline too.
+ keyValueEntries = new Object[][]{{DEADLINE_KEY, null}};
+ cascadesCancellation = false;
+ canBeCancelled = false;
+ }
+
+ /**
+ * Construct a context that cannot be cancelled but will cascade cancellation from its parent if
+ * it is cancellable.
+ */
+ private Context(Context parent, Object[][] keyValueEntries) {
+ this.parent = parent;
+ this.keyValueEntries = keyValueEntries;
+ cascadesCancellation = true;
+ canBeCancelled = this.parent != null && this.parent.canBeCancelled;
+ }
+
+ /**
+ * Construct a context that can be cancelled and will cascade cancellation from its parent if
+ * it is cancellable.
+ */
+ private Context(Context parent, Object[][] keyValueEntries, boolean isCancellable) {
+ this.parent = parent;
+ this.keyValueEntries = keyValueEntries;
+ cascadesCancellation = true;
+ canBeCancelled = isCancellable;
+ }
+
+ /**
+ * Create a new context which is independently cancellable and also cascades cancellation from
+ * its parent. Callers should ensure that either {@link CancellableContext#cancel(Throwable)}
+ * or {@link CancellableContext#detachAndCancel(Context, Throwable)} are called to notify
+ * listeners and release the resources associated with them.
+ *
+ * <p>Sample usage:
+ * <pre>
+ * Context.CancellableContext withCancellation = Context.current().withCancellation();
+ * try {
+ * executorService.execute(withCancellation.wrap(new Runnable() {
+ * public void run() {
+ * Context current = Context.current();
+ * while (!current.isCancelled()) {
+ * keepWorking();
+ * }
+ * }
+ * });
+ * doSomethingRelatedWork();
+ * } catch (Throwable t) {
+ * withCancellation.cancel(t);
+ * }
+ * </pre>
+ */
+ public CancellableContext withCancellation() {
+ return new CancellableContext(this);
+ }
+
+ /**
+ * Create a new context which will cancel itself after the given {@code duration} from now.
+ * The returned context will cascade cancellation of its parent. Callers may explicitly cancel
+ * the returned context prior to the deadline just as for {@link #withCancellation()},
+ *
+ * <p>Sample usage:
+ * <pre>
+ * Context.CancellableContext withDeadline = Context.current().withDeadlineAfter(5,
+ * TimeUnit.SECONDS, scheduler);
+ * executorService.execute(withDeadline.wrap(new Runnable() {
+ * public void run() {
+ * Context current = Context.current();
+ * while (!current.isCancelled()) {
+ * keepWorking();
+ * }
+ * }
+ * });
+ * </pre>
+ */
+ public CancellableContext withDeadlineAfter(long duration, TimeUnit unit,
+ ScheduledExecutorService scheduler) {
+ return withDeadline(Deadline.after(duration, unit), scheduler);
+ }
+
+ /**
+ * Create a new context which will cancel itself at the given {@link Deadline}.
+ * The returned context will cascade cancellation of its parent. Callers may explicitly cancel
+ * the returned context prior to the deadline just as for {@link #withCancellation()},
+ *
+ * <p>Sample usage:
+ * <pre>
+ * Context.CancellableContext withDeadline = Context.current()
+ * .withDeadline(someReceivedDeadline);
+ * executorService.execute(withDeadline.wrap(new Runnable() {
+ * public void run() {
+ * Context current = Context.current();
+ * while (!current.isCancelled()) {
+ * keepWorking();
+ * }
+ * }
+ * });
+ * </pre>
+ */
+ public CancellableContext withDeadline(Deadline deadline,
+ ScheduledExecutorService scheduler) {
+ checkNotNull(deadline, "deadline");
+ checkNotNull(scheduler, "scheduler");
+ return new CancellableContext(this, deadline, scheduler);
+ }
+
+ /**
+ * Create a new context with the given key value set. The new context will cascade cancellation
+ * from its parent.
+ *
+ <pre>
+ * Context withCredential = Context.current().withValue(CRED_KEY, cred);
+ * executorService.execute(withCredential.wrap(new Runnable() {
+ * public void run() {
+ * readUserRecords(userId, CRED_KEY.get());
+ * }
+ * }));
+ * </pre>
+ *
+ */
+ public <V> Context withValue(Key<V> k1, V v1) {
+ return new Context(this, new Object[][]{{k1, v1}});
+ }
+
+ /**
+ * Create a new context with the given key value set. The new context will cascade cancellation
+ * from its parent.
+ */
+ public <V1, V2> Context withValues(Key<V1> k1, V1 v1, Key<V2> k2, V2 v2) {
+ return new Context(this, new Object[][]{{k1, v1}, {k2, v2}});
+ }
+
+ /**
+ * Create a new context with the given key value set. The new context will cascade cancellation
+ * from its parent.
+ */
+ public <V1, V2, V3> Context withValues(Key<V1> k1, V1 v1, Key<V2> k2, V2 v2, Key<V3> k3, V3 v3) {
+ return new Context(this, new Object[][]{{k1, v1}, {k2, v2}, {k3, v3}});
+ }
+
+ /**
+ * Create a new context which propagates the values of this context but does not cascade its
+ * cancellation.
+ */
+ public Context fork() {
+ return new Context(this);
+ }
+
+ boolean canBeCancelled() {
+ // A context is cancellable if it cascades from its parent and its parent is
+ // cancellable or is itself directly cancellable..
+ return canBeCancelled;
+ }
+
+ /**
+ * Attach this context to the thread and make it {@link #current}, the previously current context
+ * is returned. It is allowed to attach contexts where {@link #isCancelled()} is {@code true}.
+ *
+ * <p>Instead of using {@link #attach()} & {@link #detach(Context)} most use-cases are better
+ * served by using the {@link #run(Runnable)} or {@link #call(java.util.concurrent.Callable)}
+ * to execute work immediately within a context. If work needs to be done in other threads
+ * it is recommended to use the 'wrap' methods or to use a propagating executor.
+ */
+ public Context attach() {
+ Context previous = current();
+ localContext.set(this);
+ return previous;
+ }
+
+ /**
+ * Detach the current context from the thread and attach the provided replacement. If this
+ * context is not {@link #current()} a SEVERE message will be logged but the context to attach
+ * will still be bound.
+ */
+ public void detach(Context toAttach) {
+ checkNotNull(toAttach, "toAttach");
+ if (toAttach.attach() != this) {
+ // Log a severe message instead of throwing an exception as the context to attach is assumed
+ // to be the correct one and the unbalanced state represents a coding mistake in a lower
+ // layer in the stack that cannot be recovered from here.
+ log.log(Level.SEVERE, "Context was not attached when detaching",
+ new Throwable().fillInStackTrace());
+ }
+ }
+
+ // Visible for testing
+ boolean isCurrent() {
+ return current() == this;
+ }
+
+ /**
+ * Is this context cancelled.
+ */
+ public boolean isCancelled() {
+ if (parent == null || !cascadesCancellation) {
+ return false;
+ } else {
+ return parent.isCancelled();
+ }
+ }
+
+ /**
+ * If a context {@link #isCancelled()} then return the cause of the cancellation or
+ * {@code null} if context was cancelled without a cause. If the context is not yet cancelled
+ * will always return {@code null}.
+ *
+ * <p>The cancellation cause is provided for informational purposes only and implementations
+ * should generally assume that it has already been handled and logged properly.
+ */
+ public Throwable cancellationCause() {
+ if (parent == null || !cascadesCancellation) {
+ return null;
+ } else {
+ return parent.cancellationCause();
+ }
+ }
+
+ /**
+ * A context may have an associated {@link Deadline} at which it will be automatically cancelled.
+ * @return A {@link io.grpc.Deadline} or {@code null} if no deadline is set.
+ */
+ public Deadline getDeadline() {
+ return DEADLINE_KEY.get(this);
+ }
+
+ /**
+ * Add a listener that will be notified when the context becomes cancelled.
+ */
+ public void addListener(final CancellationListener cancellationListener,
+ final Executor executor) {
+ checkNotNull(cancellationListener, "cancellationListener");
+ checkNotNull(executor, "executor");
+ if (canBeCancelled) {
+ ExecutableListener executableListener =
+ new ExecutableListener(executor, cancellationListener);
+ synchronized (this) {
+ if (isCancelled()) {
+ executableListener.deliver();
+ } else {
+ if (listeners == null) {
+ // Now that we have a listener we need to listen to our parent so
+ // we can cascade listener notification.
+ listeners = new ArrayList<ExecutableListener>();
+ listeners.add(executableListener);
+ parent.addListener(parentListener, DirectExecutor.INSTANCE);
+ } else {
+ listeners.add(executableListener);
+ }
+ }
+ }
+ }
+ }
+
+ /**
+ * Remove a {@link CancellationListener}.
+ */
+ public void removeListener(CancellationListener cancellationListener) {
+ if (!canBeCancelled) {
+ return;
+ }
+ synchronized (this) {
+ if (listeners != null) {
+ for (int i = listeners.size() - 1; i >= 0; i--) {
+ if (listeners.get(i).listener == cancellationListener) {
+ listeners.remove(i);
+ // Just remove the first matching listener, given that we allow duplicate
+ // adds we should allow for duplicates after remove.
+ break;
+ }
+ }
+ // We have no listeners so no need to listen to our parent
+ if (listeners.isEmpty()) {
+ parent.removeListener(parentListener);
+ listeners = null;
+ }
+ }
+ }
+ }
+
+ /**
+ * Notify all listeners that this context has been cancelled and immediately release
+ * any reference to them so that they may be garbage collected.
+ */
+ void notifyAndClearListeners() {
+ if (!canBeCancelled) {
+ return;
+ }
+ ArrayList<ExecutableListener> tmpListeners;
+ synchronized (this) {
+ if (listeners == null) {
+ return;
+ }
+ tmpListeners = listeners;
+ listeners = null;
+ }
+ // Deliver events to non-child context listeners before we notify child contexts. We do this
+ // to cancel higher level units of work before child units. This allows for a better error
+ // handling paradigm where the higher level unit of work knows it is cancelled and so can
+ // ignore errors that bubble up as a result of cancellation of lower level units.
+ for (int i = 0; i < tmpListeners.size(); i++) {
+ if (!(tmpListeners.get(i).listener instanceof ParentListener)) {
+ tmpListeners.get(i).deliver();
+ }
+ }
+ for (int i = 0; i < tmpListeners.size(); i++) {
+ if (tmpListeners.get(i).listener instanceof ParentListener) {
+ tmpListeners.get(i).deliver();
+ }
+ }
+ parent.removeListener(parentListener);
+ }
+
+ // Used in tests to ensure that listeners are defined and released when cancellation cascades.
+ // It's very important to ensure that we do not accidentally retain listeners.
+ int listenerCount() {
+ synchronized (this) {
+ return listeners == null ? 0 : listeners.size();
+ }
+ }
+
+ /**
+ * Immediately run a {@link Runnable} with this context as the {@link #current} context.
+ * @param r {@link Runnable} to run.
+ */
+ public void run(Runnable r) {
+ Context previous = attach();
+ try {
+ r.run();
+ } finally {
+ detach(previous);
+ }
+ }
+
+ /**
+ * Immediately call a {@link Callable} with this context as the {@link #current} context.
+ * @param c {@link Callable} to call.
+ * @return result of call.
+ */
+ public <V> V call(Callable<V> c) throws Exception {
+ Context previous = attach();
+ try {
+ return c.call();
+ } finally {
+ detach(previous);
+ }
+ }
+
+ /**
+ * Wrap a {@link Runnable} so that it executes with this context as the {@link #current} context.
+ */
+ public Runnable wrap(final Runnable r) {
+ return new Runnable() {
+ @Override
+ public void run() {
+ Context previous = attach();
+ try {
+ r.run();
+ } finally {
+ detach(previous);
+ }
+ }
+ };
+ }
+
+ /**
+ * Wrap a {@link Callable} so that it executes with this context as the {@link #current} context.
+ */
+ public <C> Callable<C> wrap(final Callable<C> c) {
+ return new Callable<C>() {
+ @Override
+ public C call() throws Exception {
+ Context previous = attach();
+ try {
+ return c.call();
+ } finally {
+ detach(previous);
+ }
+ }
+ };
+ }
+
+ /**
+ * Wrap an {@link Executor} so that it always executes with this context as the {@link #current}
+ * context. It is generally expected that {@link #currentContextExecutor(Executor)} would be
+ * used more commonly than this method.
+ *
+ * <p>One scenario in which this executor may be useful is when a single thread is sharding work
+ * to multiple threads.
+ *
+ * @see #currentContextExecutor(Executor)
+ */
+ public Executor fixedContextExecutor(final Executor e) {
+ class FixedContextExecutor implements Executor {
+ @Override
+ public void execute(Runnable r) {
+ e.execute(wrap(r));
+ }
+ }
+
+ return new FixedContextExecutor();
+ }
+
+ /**
+ * Create an executor that propagates the {@link #current} context when {@link Executor#execute}
+ * is called as the {@link #current} context of the {@code Runnable} scheduled. <em>Note that this
+ * is a static method.</em>
+ *
+ * @see #fixedContextExecutor(Executor)
+ */
+ public static Executor currentContextExecutor(final Executor e) {
+ class CurrentContextExecutor implements Executor {
+ @Override
+ public void execute(Runnable r) {
+ e.execute(Context.current().wrap(r));
+ }
+ }
+
+ return new CurrentContextExecutor();
+ }
+
+ /**
+ * Lookup the value for a key in the context inheritance chain.
+ */
+ private Object lookup(Key<?> key) {
+ for (int i = 0; i < keyValueEntries.length; i++) {
+ if (key.equals(keyValueEntries[i][0])) {
+ return keyValueEntries[i][1];
+ }
+ }
+ if (parent == null) {
+ return null;
+ }
+ return parent.lookup(key);
+ }
+
+ /**
+ * A context which inherits cancellation from its parent but which can also be independently
+ * cancelled and which will propagate cancellation to its descendants.
+ */
+ public static final class CancellableContext extends Context {
+
+ private boolean cancelled;
+ private Throwable cancellationCause;
+ private final Context uncancellableSurrogate;
+ private ScheduledFuture<?> pendingDeadline;
+
+ /**
+ * If the parent deadline is before the given deadline there is no need to install the value
+ * or listen for its expiration as the parent context will already be listening for it.
+ */
+ private static Object[][] deriveDeadline(Context parent, Deadline deadline) {
+ Deadline parentDeadline = DEADLINE_KEY.get(parent);
+ return parentDeadline == null || deadline.isBefore(parentDeadline)
+ ? new Object[][]{{ DEADLINE_KEY, deadline}} :
+ EMPTY_ENTRIES;
+ }
+
+ /**
+ * Create a cancellable context that does not have a deadline.
+ */
+ private CancellableContext(Context parent) {
+ super(parent, EMPTY_ENTRIES, true);
+ // Create a surrogate that inherits from this to attach so that you cannot retrieve a
+ // cancellable context from Context.current()
+ uncancellableSurrogate = new Context(this, EMPTY_ENTRIES);
+ }
+
+ /**
+ * Create a cancellable context that has a deadline.
+ */
+ private CancellableContext(Context parent, Deadline deadline,
+ ScheduledExecutorService scheduler) {
+ super(parent, deriveDeadline(parent, deadline), true);
+ if (DEADLINE_KEY.get(this) == deadline) {
+ final TimeoutException cause = new TimeoutException("context timed out");
+ if (!deadline.isExpired()) {
+ // The parent deadline was after the new deadline so we need to install a listener
+ // on the new earlier deadline to trigger expiration for this context.
+ pendingDeadline = deadline.runOnExpiration(new Runnable() {
+ @Override
+ public void run() {
+ try {
+ cancel(cause);
+ } catch (Throwable t) {
+ log.log(Level.SEVERE, "Cancel threw an exception, which should not happen", t);
+ }
+ }
+ }, scheduler);
+ } else {
+ // Cancel immediately if the deadline is already expired.
+ cancel(cause);
+ }
+ }
+ uncancellableSurrogate = new Context(this, EMPTY_ENTRIES);
+ }
+
+
+ @Override
+ public Context attach() {
+ return uncancellableSurrogate.attach();
+ }
+
+ @Override
+ public void detach(Context toAttach) {
+ uncancellableSurrogate.detach(toAttach);
+ }
+
+ @Override
+ public boolean isCurrent() {
+ return uncancellableSurrogate.isCurrent();
+ }
+
+ /**
+ * Cancel this context and optionally provide a cause (can be {@code null}) for the
+ * cancellation. This will trigger notification of listeners.
+ *
+ * @return {@code true} if this context cancelled the context and notified listeners,
+ * {@code false} if the context was already cancelled.
+ */
+ public boolean cancel(Throwable cause) {
+ boolean triggeredCancel = false;
+ synchronized (this) {
+ if (!cancelled) {
+ cancelled = true;
+ if (pendingDeadline != null) {
+ // If we have a scheduled cancellation pending attempt to cancel it.
+ pendingDeadline.cancel(false);
+ pendingDeadline = null;
+ }
+ this.cancellationCause = cause;
+ triggeredCancel = true;
+ }
+ }
+ if (triggeredCancel) {
+ notifyAndClearListeners();
+ }
+ return triggeredCancel;
+ }
+
+ /**
+ * Cancel this context and detach it as the current context from the thread.
+ *
+ * @param toAttach context to make current.
+ * @param cause of cancellation, can be {@code null}.
+ */
+ public void detachAndCancel(Context toAttach, Throwable cause) {
+ try {
+ detach(toAttach);
+ } finally {
+ cancel(cause);
+ }
+ }
+
+ @Override
+ public boolean isCancelled() {
+ synchronized (this) {
+ if (cancelled) {
+ return true;
+ }
+ }
+ // Detect cancellation of parent in the case where we have no listeners and
+ // record it.
+ if (super.isCancelled()) {
+ cancel(super.cancellationCause());
+ return true;
+ }
+ return false;
+ }
+
+ @Override
+ public Throwable cancellationCause() {
+ if (isCancelled()) {
+ return cancellationCause;
+ }
+ return null;
+ }
+ }
+
+ /**
+ * A listener notified on context cancellation.
+ */
+ public interface CancellationListener {
+ /**
+ * @param context the newly cancelled context.
+ */
+ public void cancelled(Context context);
+ }
+
+ /**
+ * Key for indexing values stored in a context.
+ */
+ public static class Key<T> {
+
+ private final String name;
+ private final T defaultValue;
+
+ Key(String name) {
+ this(name, null);
+ }
+
+ Key(String name, T defaultValue) {
+ this.name = checkNotNull(name, "name");
+ this.defaultValue = defaultValue;
+ }
+
+ /**
+ * Get the value from the {@link #current()} context for this key.
+ */
+ @SuppressWarnings("unchecked")
+ public T get() {
+ return get(Context.current());
+ }
+
+ /**
+ * Get the value from the specified context for this key.
+ */
+ @SuppressWarnings("unchecked")
+ public T get(Context context) {
+ T value = (T) context.lookup(this);
+ return value == null ? defaultValue : value;
+ }
+
+ @Override
+ public String toString() {
+ return name;
+ }
+ }
+
+ /**
+ * Stores listener & executor pair.
+ */
+ private class ExecutableListener implements Runnable {
+ private final Executor executor;
+ private final CancellationListener listener;
+
+ private ExecutableListener(Executor executor, CancellationListener listener) {
+ this.executor = executor;
+ this.listener = listener;
+ }
+
+ private void deliver() {
+ try {
+ executor.execute(this);
+ } catch (Throwable t) {
+ log.log(Level.INFO, "Exception notifying context listener", t);
+ }
+ }
+
+ @Override
+ public void run() {
+ listener.cancelled(Context.this);
+ }
+ }
+
+ private class ParentListener implements CancellationListener {
+ @Override
+ public void cancelled(Context context) {
+ if (Context.this instanceof CancellableContext) {
+ // Record cancellation with its cancellationCause.
+ ((CancellableContext) Context.this).cancel(context.cancellationCause());
+ } else {
+ notifyAndClearListeners();
+ }
+ }
+ }
+
+ private static <T> T checkNotNull(T reference, Object errorMessage) {
+ if (reference == null) {
+ throw new NullPointerException(String.valueOf(errorMessage));
+ }
+ return reference;
+ }
+
+ private enum DirectExecutor implements Executor {
+ INSTANCE;
+
+ @Override
+ public void execute(Runnable command) {
+ command.run();
+ }
+
+ @Override
+ public String toString() {
+ return "Context.DirectExecutor";
+ }
+ }
+}
diff --git a/context/src/main/java/io/grpc/Deadline.java b/context/src/main/java/io/grpc/Deadline.java
new file mode 100644
index 000000000..c3ab53e32
--- /dev/null
+++ b/context/src/main/java/io/grpc/Deadline.java
@@ -0,0 +1,186 @@
+/*
+ * Copyright 2016, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc;
+
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledFuture;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * An absolute deadline in system time.
+ */
+public final class Deadline implements Comparable<Deadline> {
+ private static final SystemTicker SYSTEM_TICKER = new SystemTicker();
+ // nanoTime has a range of just under 300 years. Only allow up to 100 years in the past or future
+ // to prevent wraparound as long as process runs for less than ~100 years.
+ private static final long MAX_OFFSET = TimeUnit.DAYS.toNanos(100 * 365);
+ private static final long MIN_OFFSET = -MAX_OFFSET;
+
+ /**
+ * Create a deadline that will expire at the specified offset from the current system clock.
+ * @param duration A non-negative duration.
+ * @param units The time unit for the duration.
+ * @return A new deadline.
+ */
+ public static Deadline after(long duration, TimeUnit units) {
+ return after(duration, units, SYSTEM_TICKER);
+ }
+
+ // For testing
+ static Deadline after(long duration, TimeUnit units, Ticker ticker) {
+ checkNotNull(units, "units");
+ return new Deadline(ticker, units.toNanos(duration), true);
+ }
+
+ private final Ticker ticker;
+ private final long deadlineNanos;
+ private volatile boolean expired;
+
+ private Deadline(Ticker ticker, long offset, boolean baseInstantAlreadyExpired) {
+ this(ticker, ticker.read(), offset, baseInstantAlreadyExpired);
+ }
+
+ private Deadline(Ticker ticker, long baseInstant, long offset,
+ boolean baseInstantAlreadyExpired) {
+ this.ticker = ticker;
+ // Clamp to range [MIN_OFFSET, MAX_OFFSET]
+ offset = Math.min(MAX_OFFSET, Math.max(MIN_OFFSET, offset));
+ deadlineNanos = baseInstant + offset;
+ expired = baseInstantAlreadyExpired && offset <= 0;
+ }
+
+ /**
+ * Has this deadline expired
+ * @return {@code true} if it has, otherwise {@code false}.
+ */
+ public boolean isExpired() {
+ if (!expired) {
+ if (deadlineNanos - ticker.read() <= 0) {
+ expired = true;
+ } else {
+ return false;
+ }
+ }
+ return true;
+ }
+
+ /**
+ * Is {@code this} deadline before another.
+ */
+ public boolean isBefore(Deadline other) {
+ return this.deadlineNanos - other.deadlineNanos < 0;
+ }
+
+ /**
+ * Return the minimum deadline of {@code this} or an other deadline.
+ * @param other deadline to compare with {@code this}.
+ */
+ public Deadline minimum(Deadline other) {
+ return isBefore(other) ? this : other;
+ }
+
+ /**
+ * Create a new deadline that is offset from {@code this}.
+ */
+ // TODO(ejona): This method can cause deadlines to grow too far apart. For example:
+ // Deadline.after(100 * 365, DAYS).offset(100 * 365, DAYS) would be less than
+ // Deadline.after(-100 * 365, DAYS)
+ public Deadline offset(long offset, TimeUnit units) {
+ // May already be expired
+ if (offset == 0) {
+ return this;
+ }
+ return new Deadline(ticker, deadlineNanos, units.toNanos(offset), isExpired());
+ }
+
+ /**
+ * How much time is remaining in the specified time unit. Internal units are maintained as
+ * nanoseconds and conversions are subject to the constraints documented for
+ * {@link TimeUnit#convert}. If there is no time remaining, the returned duration is how
+ * long ago the deadline expired.
+ */
+ public long timeRemaining(TimeUnit unit) {
+ final long nowNanos = ticker.read();
+ if (!expired && deadlineNanos - nowNanos <= 0) {
+ expired = true;
+ }
+ return unit.convert(deadlineNanos - nowNanos, TimeUnit.NANOSECONDS);
+ }
+
+ /**
+ * Schedule a task to be run when the deadline expires.
+ * @param task to run on expiration
+ * @param scheduler used to execute the task
+ * @return {@link ScheduledFuture} which can be used to cancel execution of the task
+ */
+ public ScheduledFuture<?> runOnExpiration(Runnable task, ScheduledExecutorService scheduler) {
+ checkNotNull(task, "task");
+ checkNotNull(scheduler, "scheduler");
+ return scheduler.schedule(task, deadlineNanos - ticker.read(), TimeUnit.NANOSECONDS);
+ }
+
+ @Override
+ public String toString() {
+ return timeRemaining(TimeUnit.NANOSECONDS) + " ns from now";
+ }
+
+ @Override
+ public int compareTo(Deadline that) {
+ long diff = this.deadlineNanos - that.deadlineNanos;
+ if (diff < 0) {
+ return -1;
+ } else if (diff > 0) {
+ return 1;
+ }
+ return 0;
+ }
+
+ /** Time source representing nanoseconds since fixed but arbitrary point in time. */
+ abstract static class Ticker {
+ /** Returns the number of nanoseconds since this source's epoch. */
+ public abstract long read();
+ }
+
+ private static class SystemTicker extends Ticker {
+ @Override
+ public long read() {
+ return System.nanoTime();
+ }
+ }
+
+ private static <T> T checkNotNull(T reference, Object errorMessage) {
+ if (reference == null) {
+ throw new NullPointerException(String.valueOf(errorMessage));
+ }
+ return reference;
+ }
+}
diff --git a/context/src/test/java/io/grpc/ContextTest.java b/context/src/test/java/io/grpc/ContextTest.java
new file mode 100644
index 000000000..f36cdfae4
--- /dev/null
+++ b/context/src/test/java/io/grpc/ContextTest.java
@@ -0,0 +1,746 @@
+/*
+ * Copyright 2015, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc;
+
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNotSame;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+import com.google.common.util.concurrent.MoreExecutors;
+import com.google.common.util.concurrent.SettableFuture;
+
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.JUnit4;
+
+import java.util.ArrayDeque;
+import java.util.Queue;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.Executor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicReference;
+import java.util.logging.Handler;
+import java.util.logging.Level;
+import java.util.logging.LogRecord;
+import java.util.logging.Logger;
+
+/**
+ * Tests for {@link Context}.
+ */
+@RunWith(JUnit4.class)
+public class ContextTest {
+
+ private static final Context.Key<String> PET = Context.key("pet");
+ private static final Context.Key<String> FOOD = Context.keyWithDefault("food", "lasagna");
+ private static final Context.Key<String> COLOR = Context.key("color");
+ private static final Context.Key<Object> FAVORITE = Context.key("favorite");
+
+ private Context listenerNotifedContext;
+ private CountDownLatch deadlineLatch = new CountDownLatch(1);
+ private Context.CancellationListener cancellationListener = new Context.CancellationListener() {
+ @Override
+ public void cancelled(Context context) {
+ listenerNotifedContext = context;
+ deadlineLatch.countDown();
+ }
+ };
+
+ private Context observed;
+ private Runnable runner = new Runnable() {
+ @Override
+ public void run() {
+ observed = Context.current();
+ }
+ };
+ private ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
+
+ @Before
+ public void setUp() throws Exception {
+ Context.ROOT.attach();
+ }
+
+ @After
+ public void tearDown() throws Exception {
+ scheduler.shutdown();
+ }
+
+ @Test
+ public void rootIsInitialContext() {
+ assertNotNull(Context.ROOT);
+ assertTrue(Context.ROOT.isCurrent());
+ }
+
+ @Test
+ public void rootIsAlwaysBound() throws Exception {
+ final SettableFuture<Boolean> rootIsBound = SettableFuture.create();
+ new Thread(new Runnable() {
+ @Override
+ public void run() {
+ rootIsBound.set(Context.current() == Context.ROOT);
+ }
+ }).start();
+ assertTrue(rootIsBound.get(5, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void rootCanBeAttached() {
+ Context fork = Context.ROOT.fork();
+ fork.attach();
+ Context.ROOT.attach();
+ assertTrue(Context.ROOT.isCurrent());
+ fork.attach();
+ assertTrue(fork.isCurrent());
+ }
+
+ @Test
+ public void rootCanNeverHaveAListener() {
+ Context root = Context.current();
+ root.addListener(cancellationListener, MoreExecutors.directExecutor());
+ assertEquals(0, root.listenerCount());
+ }
+
+ @Test
+ public void rootIsNotCancelled() {
+ assertFalse(Context.ROOT.isCancelled());
+ assertNull(Context.ROOT.cancellationCause());
+ }
+
+ @Test
+ public void attachedCancellableContextCannotBeCastFromCurrent() {
+ Context initial = Context.current();
+ Context.CancellableContext base = initial.withCancellation();
+ base.attach();
+ assertFalse(Context.current() instanceof Context.CancellableContext);
+ assertNotSame(base, Context.current());
+ assertNotSame(initial, Context.current());
+ base.detachAndCancel(initial, null);
+ assertSame(initial, Context.current());
+ }
+
+ @Test
+ public void attachingNonCurrentReturnsCurrent() {
+ Context initial = Context.current();
+ Context base = initial.withValue(PET, "dog");
+ assertSame(initial, base.attach());
+ assertSame(base, initial.attach());
+ }
+
+ @Test
+ public void detachingNonCurrentLogsSevereMessage() {
+ final AtomicReference<LogRecord> logRef = new AtomicReference<LogRecord>();
+ Handler handler = new Handler() {
+ @Override
+ public void publish(LogRecord record) {
+ logRef.set(record);
+ }
+
+ @Override
+ public void flush() {
+ }
+
+ @Override
+ public void close() throws SecurityException {
+ }
+ };
+ Logger logger = Logger.getLogger(Context.class.getName());
+ try {
+ logger.addHandler(handler);
+ Context initial = Context.current();
+ Context base = initial.withValue(PET, "dog");
+ // Base is not attached
+ base.detach(initial);
+ assertSame(initial, Context.current());
+ assertNotNull(logRef.get());
+ assertEquals(Level.SEVERE, logRef.get().getLevel());
+ } finally {
+ logger.removeHandler(handler);
+ }
+ }
+
+ @Test
+ public void valuesAndOverrides() {
+ Context base = Context.current().withValue(PET, "dog");
+ Context child = base.withValues(PET, null, FOOD, "cheese");
+
+ base.attach();
+
+ assertEquals("dog", PET.get());
+ assertEquals("lasagna", FOOD.get());
+ assertNull(COLOR.get());
+
+ child.attach();
+
+ assertNull(PET.get());
+ assertEquals("cheese", FOOD.get());
+ assertNull(COLOR.get());
+
+ child.detach(base);
+
+ // Should have values from base
+ assertEquals("dog", PET.get());
+ assertEquals("lasagna", FOOD.get());
+ assertNull(COLOR.get());
+
+ base.detach(Context.ROOT);
+
+ assertNull(PET.get());
+ assertEquals("lasagna", FOOD.get());
+ assertNull(COLOR.get());
+ }
+
+ @Test
+ public void withValuesThree() {
+ Object fav = new Object();
+ Context base = Context.current().withValues(PET, "dog", COLOR, "blue");
+ Context child = base.withValues(PET, "cat", FOOD, "cheese", FAVORITE, fav);
+
+ child.attach();
+
+ assertEquals("cat", PET.get());
+ assertEquals("cheese", FOOD.get());
+ assertEquals("blue", COLOR.get());
+ assertEquals(fav, FAVORITE.get());
+
+ base.attach();
+ }
+
+ @Test
+ public void cancelReturnsFalseIfAlreadyCancelled() {
+ Context.CancellableContext base = Context.current().withCancellation();
+ assertTrue(base.cancel(null));
+ assertTrue(base.isCancelled());
+ assertFalse(base.cancel(null));
+ }
+
+ @Test
+ public void notifyListenersOnCancel() {
+ class SetContextCancellationListener implements Context.CancellationListener {
+ private final AtomicReference<Context> observed;
+
+ public SetContextCancellationListener(AtomicReference<Context> observed) {
+ this.observed = observed;
+ }
+
+ @Override
+ public void cancelled(Context context) {
+ observed.set(context);
+ }
+ }
+
+ Context.CancellableContext base = Context.current().withCancellation();
+ final AtomicReference<Context> observed1 = new AtomicReference<Context>();
+ base.addListener(new SetContextCancellationListener(observed1), MoreExecutors.directExecutor());
+ final AtomicReference<Context> observed2 = new AtomicReference<Context>();
+ base.addListener(new SetContextCancellationListener(observed2), MoreExecutors.directExecutor());
+ assertNull(observed1.get());
+ assertNull(observed2.get());
+ base.cancel(null);
+ assertSame(base, observed1.get());
+ assertSame(base, observed2.get());
+
+ final AtomicReference<Context> observed3 = new AtomicReference<Context>();
+ base.addListener(new SetContextCancellationListener(observed3), MoreExecutors.directExecutor());
+ assertSame(base, observed3.get());
+ }
+
+ @Test
+ public void exceptionOfExecutorDoesntThrow() {
+ final AtomicReference<Throwable> loggedThrowable = new AtomicReference<Throwable>();
+ Handler logHandler = new Handler() {
+ @Override
+ public void publish(LogRecord record) {
+ Throwable thrown = record.getThrown();
+ if (thrown != null) {
+ if (loggedThrowable.get() == null) {
+ loggedThrowable.set(thrown);
+ } else {
+ loggedThrowable.set(new RuntimeException("Too many exceptions", thrown));
+ }
+ }
+ }
+
+ @Override
+ public void close() {}
+
+ @Override
+ public void flush() {}
+ };
+ Logger logger = Logger.getLogger(Context.class.getName());
+ logger.addHandler(logHandler);
+ try {
+ Context.CancellableContext base = Context.current().withCancellation();
+ final AtomicReference<Runnable> observed1 = new AtomicReference<Runnable>();
+ final Error err = new Error();
+ base.addListener(cancellationListener, new Executor() {
+ @Override
+ public void execute(Runnable runnable) {
+ observed1.set(runnable);
+ throw err;
+ }
+ });
+ assertNull(observed1.get());
+ assertNull(loggedThrowable.get());
+ base.cancel(null);
+ assertNotNull(observed1.get());
+ assertSame(err, loggedThrowable.get());
+
+ final Error err2 = new Error();
+ loggedThrowable.set(null);
+ final AtomicReference<Runnable> observed2 = new AtomicReference<Runnable>();
+ base.addListener(cancellationListener, new Executor() {
+ @Override
+ public void execute(Runnable runnable) {
+ observed2.set(runnable);
+ throw err2;
+ }
+ });
+ assertNotNull(observed2.get());
+ assertSame(err2, loggedThrowable.get());
+ } finally {
+ logger.removeHandler(logHandler);
+ }
+ }
+
+ @Test
+ public void cascadingCancellationNotifiesChild() {
+ // Root is not cancellable so we can't cascade from it
+ Context.CancellableContext base = Context.current().withCancellation();
+ assertEquals(0, base.listenerCount());
+ Context child = base.withValue(FOOD, "lasagna");
+ assertEquals(0, child.listenerCount());
+ child.addListener(cancellationListener, MoreExecutors.directExecutor());
+ assertEquals(1, child.listenerCount());
+ assertEquals(1, base.listenerCount()); // child is now listening to base
+ assertFalse(base.isCancelled());
+ assertFalse(child.isCancelled());
+ IllegalStateException cause = new IllegalStateException();
+ base.cancel(cause);
+ assertTrue(base.isCancelled());
+ assertSame(cause, base.cancellationCause());
+ assertSame(child, listenerNotifedContext);
+ assertTrue(child.isCancelled());
+ assertSame(cause, child.cancellationCause());
+ assertEquals(0, base.listenerCount());
+ assertEquals(0, child.listenerCount());
+ }
+
+ @Test
+ public void cascadingCancellationWithoutListener() {
+ Context.CancellableContext base = Context.current().withCancellation();
+ Context child = base.withCancellation();
+ Throwable t = new Throwable();
+ base.cancel(t);
+ assertTrue(child.isCancelled());
+ assertSame(t, child.cancellationCause());
+ }
+
+ @Test
+ public void cancellableContextIsAttached() {
+ Context.CancellableContext base = Context.current().withValue(FOOD, "fish").withCancellation();
+ assertFalse(base.isCurrent());
+ base.attach();
+
+ Context attached = Context.current();
+ assertSame("fish", FOOD.get());
+ assertFalse(attached.isCancelled());
+ assertNull(attached.cancellationCause());
+ assertTrue(attached.canBeCancelled());
+ assertTrue(attached.isCurrent());
+ assertTrue(base.isCurrent());
+
+ attached.addListener(cancellationListener, MoreExecutors.directExecutor());
+ Throwable t = new Throwable();
+ base.cancel(t);
+ assertTrue(attached.isCancelled());
+ assertSame(t, attached.cancellationCause());
+ assertSame(attached, listenerNotifedContext);
+
+ Context.ROOT.attach();
+ }
+
+ @Test
+ public void cancellableContextCascadesFromCancellableParent() {
+ // Root is not cancellable so we can't cascade from it
+ Context.CancellableContext base = Context.current().withCancellation();
+ Context child = base.withCancellation();
+ child.addListener(cancellationListener, MoreExecutors.directExecutor());
+ assertFalse(base.isCancelled());
+ assertFalse(child.isCancelled());
+ IllegalStateException cause = new IllegalStateException();
+ base.cancel(cause);
+ assertTrue(base.isCancelled());
+ assertSame(cause, base.cancellationCause());
+ assertSame(child, listenerNotifedContext);
+ assertTrue(child.isCancelled());
+ assertSame(cause, child.cancellationCause());
+ assertEquals(0, base.listenerCount());
+ assertEquals(0, child.listenerCount());
+ }
+
+ @Test
+ public void nonCascadingCancellationDoesNotNotifyForked() {
+ Context.CancellableContext base = Context.current().withCancellation();
+ Context fork = base.fork();
+ fork.addListener(cancellationListener, MoreExecutors.directExecutor());
+ assertEquals(0, base.listenerCount());
+ assertEquals(0, fork.listenerCount());
+ assertTrue(base.cancel(new Throwable()));
+ assertNull(listenerNotifedContext);
+ assertFalse(fork.isCancelled());
+ assertNull(fork.cancellationCause());
+ }
+
+ @Test
+ public void testWrapRunnable() throws Exception {
+ Context base = Context.current().withValue(PET, "cat");
+ Context current = Context.current().withValue(PET, "fish");
+ current.attach();
+
+ base.wrap(runner).run();
+ assertSame(base, observed);
+ assertSame(current, Context.current());
+
+ current.wrap(runner).run();
+ assertSame(current, observed);
+ assertSame(current, Context.current());
+
+ final Error err = new Error();
+ try {
+ base.wrap(new Runnable() {
+ @Override
+ public void run() {
+ throw err;
+ }
+ }).run();
+ fail("Expected exception");
+ } catch (Error ex) {
+ assertSame(err, ex);
+ }
+ assertSame(current, Context.current());
+
+ current.detach(Context.ROOT);
+ }
+
+ @Test
+ public void testWrapCallable() throws Exception {
+ Context base = Context.current().withValue(PET, "cat");
+ Context current = Context.current().withValue(PET, "fish");
+ current.attach();
+
+ final Object ret = new Object();
+ Callable<Object> callable = new Callable<Object>() {
+ @Override
+ public Object call() {
+ runner.run();
+ return ret;
+ }
+ };
+
+ assertSame(ret, base.wrap(callable).call());
+ assertSame(base, observed);
+ assertSame(current, Context.current());
+
+ assertSame(ret, current.wrap(callable).call());
+ assertSame(current, observed);
+ assertSame(current, Context.current());
+
+ final Error err = new Error();
+ try {
+ base.wrap(new Callable<Object>() {
+ @Override
+ public Object call() {
+ throw err;
+ }
+ }).call();
+ fail("Excepted exception");
+ } catch (Error ex) {
+ assertSame(err, ex);
+ }
+ assertSame(current, Context.current());
+
+ current.detach(Context.ROOT);
+ }
+
+ @Test
+ public void currentContextExecutor() throws Exception {
+ QueuedExecutor queuedExecutor = new QueuedExecutor();
+ Executor executor = Context.currentContextExecutor(queuedExecutor);
+ Context base = Context.current().withValue(PET, "cat");
+ Context previous = base.attach();
+ try {
+ executor.execute(runner);
+ } finally {
+ base.detach(previous);
+ }
+ assertEquals(1, queuedExecutor.runnables.size());
+ queuedExecutor.runnables.remove().run();
+ assertSame(base, observed);
+ }
+
+ @Test
+ public void fixedContextExecutor() throws Exception {
+ Context base = Context.current().withValue(PET, "cat");
+ QueuedExecutor queuedExecutor = new QueuedExecutor();
+ base.fixedContextExecutor(queuedExecutor).execute(runner);
+ assertEquals(1, queuedExecutor.runnables.size());
+ queuedExecutor.runnables.remove().run();
+ assertSame(base, observed);
+ }
+
+ @Test
+ public void typicalTryFinallyHandling() throws Exception {
+ Context base = Context.current().withValue(COLOR, "blue");
+ Context previous = base.attach();
+ try {
+ assertTrue(base.isCurrent());
+ // Do something
+ } finally {
+ base.detach(previous);
+ }
+ assertFalse(base.isCurrent());
+ }
+
+ @Test
+ public void typicalCancellableTryCatchFinallyHandling() throws Exception {
+ Context.CancellableContext base = Context.current().withCancellation();
+ Context previous = base.attach();
+ try {
+ // Do something
+ throw new IllegalStateException("Argh");
+ } catch (IllegalStateException ise) {
+ base.cancel(ise);
+ } finally {
+ base.detachAndCancel(previous, null);
+ }
+ assertTrue(base.isCancelled());
+ assertNotNull(base.cancellationCause());
+ }
+
+ @Test
+ public void rootHasNoDeadline() {
+ assertNull(Context.ROOT.getDeadline());
+ }
+
+ @Test
+ public void contextWithDeadlineHasDeadline() {
+ Context.CancellableContext cancellableContext =
+ Context.ROOT.withDeadlineAfter(1, TimeUnit.SECONDS, scheduler);
+ assertNotNull(cancellableContext.getDeadline());
+ }
+
+ @Test
+ public void earlierParentDeadlineTakesPrecedenceOverLaterChildDeadline() throws Exception {
+ final Deadline sooner = Deadline.after(100, TimeUnit.MILLISECONDS);
+ final Deadline later = Deadline.after(1, TimeUnit.MINUTES);
+ Context.CancellableContext parent = Context.ROOT.withDeadline(sooner, scheduler);
+ Context.CancellableContext child = parent.withDeadline(later, scheduler);
+ assertSame(parent.getDeadline(), sooner);
+ assertSame(child.getDeadline(), sooner);
+ final CountDownLatch latch = new CountDownLatch(1);
+ final AtomicReference<Exception> error = new AtomicReference<Exception>();
+ child.addListener(new Context.CancellationListener() {
+ @Override
+ public void cancelled(Context context) {
+ try {
+ assertTrue(sooner.isExpired());
+ assertFalse(later.isExpired());
+ } catch (Exception e) {
+ error.set(e);
+ }
+ latch.countDown();
+ }
+ }, MoreExecutors.directExecutor());
+ latch.await(3, TimeUnit.SECONDS);
+ if (error.get() != null) {
+ throw error.get();
+ }
+ }
+
+ @Test
+ public void earlierChldDeadlineTakesPrecedenceOverLaterParentDeadline() {
+ Deadline sooner = Deadline.after(1, TimeUnit.HOURS);
+ Deadline later = Deadline.after(1, TimeUnit.DAYS);
+ Context.CancellableContext parent = Context.ROOT.withDeadline(later, scheduler);
+ Context.CancellableContext child = parent.withDeadline(sooner, scheduler);
+ assertSame(parent.getDeadline(), later);
+ assertSame(child.getDeadline(), sooner);
+ }
+
+ @Test
+ public void forkingContextDoesNotCarryDeadline() {
+ Deadline deadline = Deadline.after(1, TimeUnit.HOURS);
+ Context.CancellableContext parent = Context.ROOT.withDeadline(deadline, scheduler);
+ Context fork = parent.fork();
+ assertNull(fork.getDeadline());
+ }
+
+ @Test
+ public void cancellationDoesNotExpireDeadline() {
+ Deadline deadline = Deadline.after(1, TimeUnit.HOURS);
+ Context.CancellableContext parent = Context.ROOT.withDeadline(deadline, scheduler);
+ parent.cancel(null);
+ assertFalse(deadline.isExpired());
+ }
+
+ @Test
+ public void absoluteDeadlineTriggersAndPropagates() throws Exception {
+ Context base = Context.current().withDeadline(Deadline.after(1, TimeUnit.SECONDS), scheduler);
+ Context child = base.withValue(FOOD, "lasagna");
+ child.addListener(cancellationListener, MoreExecutors.directExecutor());
+ assertFalse(base.isCancelled());
+ assertFalse(child.isCancelled());
+ assertTrue(deadlineLatch.await(2, TimeUnit.SECONDS));
+ assertTrue(base.isCancelled());
+ assertTrue(base.cancellationCause() instanceof TimeoutException);
+ assertSame(child, listenerNotifedContext);
+ assertTrue(child.isCancelled());
+ assertSame(base.cancellationCause(), child.cancellationCause());
+ }
+
+ @Test
+ public void relativeDeadlineTriggersAndPropagates() throws Exception {
+ Context base = Context.current().withDeadline(Deadline.after(1, TimeUnit.SECONDS), scheduler);
+ Context child = base.withValue(FOOD, "lasagna");
+ child.addListener(cancellationListener, MoreExecutors.directExecutor());
+ assertFalse(base.isCancelled());
+ assertFalse(child.isCancelled());
+ assertTrue(deadlineLatch.await(2, TimeUnit.SECONDS));
+ assertTrue(base.isCancelled());
+ assertTrue(base.cancellationCause() instanceof TimeoutException);
+ assertSame(child, listenerNotifedContext);
+ assertTrue(child.isCancelled());
+ assertSame(base.cancellationCause(), child.cancellationCause());
+ }
+
+ @Test
+ public void innerDeadlineCompletesBeforeOuter() throws Exception {
+ Context base = Context.current().withDeadline(Deadline.after(2, TimeUnit.SECONDS), scheduler);
+ Context child = base.withDeadline(Deadline.after(1, TimeUnit.SECONDS), scheduler);
+ child.addListener(cancellationListener, MoreExecutors.directExecutor());
+ assertFalse(base.isCancelled());
+ assertFalse(child.isCancelled());
+ assertTrue(deadlineLatch.await(2, TimeUnit.SECONDS));
+ assertFalse(base.isCancelled());
+ assertSame(child, listenerNotifedContext);
+ assertTrue(child.isCancelled());
+ assertTrue(child.cancellationCause() instanceof TimeoutException);
+
+ deadlineLatch = new CountDownLatch(1);
+ base.addListener(cancellationListener, MoreExecutors.directExecutor());
+ assertTrue(deadlineLatch.await(2, TimeUnit.SECONDS));
+ assertTrue(base.isCancelled());
+ assertTrue(base.cancellationCause() instanceof TimeoutException);
+ assertNotSame(base.cancellationCause(), child.cancellationCause());
+ }
+
+ @Test
+ public void cancellationCancelsScheduledTask() {
+ ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1);
+ try {
+ assertEquals(0, executor.getQueue().size());
+ Context.CancellableContext base
+ = Context.current().withDeadline(Deadline.after(1, TimeUnit.DAYS), executor);
+ assertEquals(1, executor.getQueue().size());
+ base.cancel(null);
+ executor.purge();
+ assertEquals(0, executor.getQueue().size());
+ } finally {
+ executor.shutdown();
+ }
+ }
+
+ private static class QueuedExecutor implements Executor {
+ private final Queue<Runnable> runnables = new ArrayDeque<Runnable>();
+
+ @Override
+ public void execute(Runnable r) {
+ runnables.add(r);
+ }
+ }
+
+ @Test
+ public void childContextListenerNotifiedAfterParentListener() {
+ Context.CancellableContext parent = Context.current().withCancellation();
+ Context child = parent.withValue(COLOR, "red");
+ final AtomicBoolean childAfterParent = new AtomicBoolean();
+ final AtomicBoolean parentCalled = new AtomicBoolean();
+ child.addListener(new Context.CancellationListener() {
+ @Override
+ public void cancelled(Context context) {
+ if (parentCalled.get()) {
+ childAfterParent.set(true);
+ }
+ }
+ }, MoreExecutors.directExecutor());
+ parent.addListener(new Context.CancellationListener() {
+ @Override
+ public void cancelled(Context context) {
+ parentCalled.set(true);
+ }
+ }, MoreExecutors.directExecutor());
+ parent.cancel(null);
+ assertTrue(parentCalled.get());
+ assertTrue(childAfterParent.get());
+ }
+
+ @Test
+ public void expiredDeadlineShouldCancelContextImmediately() {
+ Context parent = Context.current();
+ assertFalse(parent.isCancelled());
+
+ Context.CancellableContext context = parent.withDeadlineAfter(0, TimeUnit.SECONDS, scheduler);
+ assertTrue(context.isCancelled());
+ assertThat(context.cancellationCause(), instanceOf(TimeoutException.class));
+
+ assertFalse(parent.isCancelled());
+ Deadline deadline = Deadline.after(-10, TimeUnit.SECONDS);
+ assertTrue(deadline.isExpired());
+ context = parent.withDeadline(deadline, scheduler);
+ assertTrue(context.isCancelled());
+ assertThat(context.cancellationCause(), instanceOf(TimeoutException.class));
+ }
+}
diff --git a/context/src/test/java/io/grpc/DeadlineTest.java b/context/src/test/java/io/grpc/DeadlineTest.java
new file mode 100644
index 000000000..f85edf18c
--- /dev/null
+++ b/context/src/test/java/io/grpc/DeadlineTest.java
@@ -0,0 +1,287 @@
+/*
+ * Copyright 2016, Google Inc. All rights reserved.
+ *
+ * Redistribution and use in source and binary forms, with or without
+ * modification, are permitted provided that the following conditions are
+ * met:
+ *
+ * * Redistributions of source code must retain the above copyright
+ * notice, this list of conditions and the following disclaimer.
+ * * Redistributions in binary form must reproduce the above
+ * copyright notice, this list of conditions and the following disclaimer
+ * in the documentation and/or other materials provided with the
+ * distribution.
+ *
+ * * Neither the name of Google Inc. nor the names of its
+ * contributors may be used to endorse or promote products derived from
+ * this software without specific prior written permission.
+ *
+ * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
+ * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
+ * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
+ * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
+ * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
+ * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
+ * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
+ * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
+ * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
+ * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
+ * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
+ */
+
+package io.grpc;
+
+import static com.google.common.truth.Truth.assertAbout;
+import static io.grpc.testing.DeadlineSubject.deadline;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertSame;
+import static org.junit.Assert.assertTrue;
+import static org.mockito.Matchers.eq;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.verify;
+
+import com.google.common.truth.Truth;
+
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameters;
+import org.mockito.ArgumentCaptor;
+
+import java.util.Arrays;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+/**
+ * Tests for {@link Context}.
+ */
+@RunWith(Parameterized.class)
+public class DeadlineTest {
+ /** Ticker epochs to vary testing. */
+ @Parameters
+ public static Iterable<Object[]> data() {
+ return Arrays.asList(new Object[][] {
+ // MAX_VALUE / 2 is important because the signs are generally the same for past and future
+ // deadlines.
+ {Long.MAX_VALUE / 2}, {0}, {Long.MAX_VALUE}, {Long.MIN_VALUE}
+ });
+ }
+
+ private FakeTicker ticker = new FakeTicker();
+
+ public DeadlineTest(long epoch) {
+ ticker.reset(epoch);
+ }
+
+ @Test
+ public void defaultTickerIsSystemTicker() {
+ Deadline d = Deadline.after(0, TimeUnit.SECONDS);
+ ticker.reset(System.nanoTime());
+ Deadline reference = Deadline.after(0, TimeUnit.SECONDS, ticker);
+ // Allow inaccuracy to account for system time advancing during test.
+ assertAbout(deadline()).that(d).isWithin(1, TimeUnit.SECONDS).of(reference);
+ }
+
+ @Test
+ public void timeCanOverflow() {
+ ticker.reset(Long.MAX_VALUE);
+ Deadline d = Deadline.after(10, TimeUnit.DAYS, ticker);
+ assertEquals(10, d.timeRemaining(TimeUnit.DAYS));
+ assertTrue(Deadline.after(0, TimeUnit.DAYS, ticker).isBefore(d));
+ assertFalse(d.isExpired());
+
+ ticker.increment(10, TimeUnit.DAYS);
+ assertTrue(d.isExpired());
+ }
+
+ @Test
+ public void timeCanUnderflow() {
+ ticker.reset(Long.MIN_VALUE);
+ Deadline d = Deadline.after(-10, TimeUnit.DAYS, ticker);
+ assertEquals(-10, d.timeRemaining(TimeUnit.DAYS));
+ assertTrue(d.isBefore(Deadline.after(0, TimeUnit.DAYS, ticker)));
+ assertTrue(d.isExpired());
+ }
+
+ @Test
+ public void deadlineClamps() {
+ Deadline d = Deadline.after(-300 * 365, TimeUnit.DAYS, ticker);
+ Deadline d2 = Deadline.after(300 * 365, TimeUnit.DAYS, ticker);
+ assertTrue(d.isBefore(d2));
+
+ Deadline d3 = Deadline.after(-200 * 365, TimeUnit.DAYS, ticker);
+ // d and d3 are equal
+ assertFalse(d.isBefore(d3));
+ assertFalse(d3.isBefore(d));
+ }
+
+ @Test
+ public void immediateDeadlineIsExpired() {
+ Deadline deadline = Deadline.after(0, TimeUnit.SECONDS, ticker);
+ assertTrue(deadline.isExpired());
+ }
+
+ @Test
+ public void shortDeadlineEventuallyExpires() throws Exception {
+ Deadline d = Deadline.after(100, TimeUnit.MILLISECONDS, ticker);
+ assertTrue(d.timeRemaining(TimeUnit.NANOSECONDS) > 0);
+ assertFalse(d.isExpired());
+ ticker.increment(101, TimeUnit.MILLISECONDS);
+
+ assertTrue(d.isExpired());
+ assertEquals(-1, d.timeRemaining(TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void deadlineMatchesLongValue() {
+ assertEquals(10, Deadline.after(10, TimeUnit.MINUTES, ticker).timeRemaining(TimeUnit.MINUTES));
+ }
+
+ @Test
+ public void pastDeadlineIsExpired() {
+ Deadline d = Deadline.after(-1, TimeUnit.SECONDS, ticker);
+ assertTrue(d.isExpired());
+ assertEquals(-1000, d.timeRemaining(TimeUnit.MILLISECONDS));
+ }
+
+ @Test
+ public void deadlineDoesNotOverflowOrUnderflow() {
+ Deadline after = Deadline.after(Long.MAX_VALUE, TimeUnit.NANOSECONDS, ticker);
+ assertFalse(after.isExpired());
+
+ Deadline before = Deadline.after(-Long.MAX_VALUE, TimeUnit.NANOSECONDS, ticker);
+ assertTrue(before.isExpired());
+
+ assertTrue(before.isBefore(after));
+ }
+
+ @Test
+ public void beforeExpiredDeadlineIsExpired() {
+ Deadline base = Deadline.after(0, TimeUnit.SECONDS, ticker);
+ assertTrue(base.isExpired());
+ assertTrue(base.offset(-1, TimeUnit.SECONDS).isExpired());
+ }
+
+ @Test
+ public void beforeNotExpiredDeadlineMayBeExpired() {
+ Deadline base = Deadline.after(10, TimeUnit.SECONDS, ticker);
+ assertFalse(base.isExpired());
+ assertFalse(base.offset(-1, TimeUnit.SECONDS).isExpired());
+ assertTrue(base.offset(-11, TimeUnit.SECONDS).isExpired());
+ }
+
+ @Test
+ public void afterExpiredDeadlineMayBeExpired() {
+ Deadline base = Deadline.after(-10, TimeUnit.SECONDS, ticker);
+ assertTrue(base.isExpired());
+ assertTrue(base.offset(1, TimeUnit.SECONDS).isExpired());
+ assertFalse(base.offset(11, TimeUnit.SECONDS).isExpired());
+ }
+
+ @Test
+ public void zeroOffsetIsSameDeadline() {
+ Deadline base = Deadline.after(0, TimeUnit.SECONDS, ticker);
+ assertSame(base, base.offset(0, TimeUnit.SECONDS));
+ }
+
+ @Test
+ public void runOnEventualExpirationIsExecuted() throws Exception {
+ Deadline base = Deadline.after(50, TimeUnit.MICROSECONDS, ticker);
+ ScheduledExecutorService mockScheduler = mock(ScheduledExecutorService.class);
+ final AtomicBoolean executed = new AtomicBoolean();
+ base.runOnExpiration(
+ new Runnable() {
+ @Override
+ public void run() {
+ executed.set(true);
+ }
+ }, mockScheduler);
+ assertFalse(executed.get());
+ ArgumentCaptor<Runnable> runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
+ verify(mockScheduler).schedule(runnableCaptor.capture(), eq(50000L), eq(TimeUnit.NANOSECONDS));
+ runnableCaptor.getValue().run();
+ assertTrue(executed.get());
+ }
+
+ @Test
+ public void runOnAlreadyExpiredIsExecutedOnExecutor() throws Exception {
+ Deadline base = Deadline.after(0, TimeUnit.MICROSECONDS, ticker);
+ ScheduledExecutorService mockScheduler = mock(ScheduledExecutorService.class);
+ final AtomicBoolean executed = new AtomicBoolean();
+ base.runOnExpiration(
+ new Runnable() {
+ @Override
+ public void run() {
+ executed.set(true);
+ }
+ }, mockScheduler);
+ assertFalse(executed.get());
+ ArgumentCaptor<Runnable> runnableCaptor = ArgumentCaptor.forClass(Runnable.class);
+ verify(mockScheduler).schedule(runnableCaptor.capture(), eq(0L), eq(TimeUnit.NANOSECONDS));
+ runnableCaptor.getValue().run();
+ assertTrue(executed.get());
+ }
+
+ @Test
+ public void toString_exact() {
+ Deadline d = Deadline.after(0, TimeUnit.MILLISECONDS, ticker);
+ assertEquals("0 ns from now", d.toString());
+ }
+
+ @Test
+ public void toString_after() {
+ Deadline d = Deadline.after(-1, TimeUnit.MINUTES, ticker);
+ assertEquals("-60000000000 ns from now", d.toString());
+ }
+
+ @Test
+ public void compareTo_greater() {
+ Deadline d1 = Deadline.after(10, TimeUnit.SECONDS, ticker);
+ ticker.increment(1, TimeUnit.NANOSECONDS);
+ Deadline d2 = Deadline.after(10, TimeUnit.SECONDS, ticker);
+ Truth.assertThat(d2).isGreaterThan(d1);
+ }
+
+ @Test
+ public void compareTo_less() {
+ Deadline d1 = Deadline.after(10, TimeUnit.SECONDS, ticker);
+ ticker.increment(1, TimeUnit.NANOSECONDS);
+ Deadline d2 = Deadline.after(10, TimeUnit.SECONDS, ticker);
+ Truth.assertThat(d1).isLessThan(d2);
+ }
+
+ @Test
+ public void compareTo_same() {
+ Deadline d1 = Deadline.after(10, TimeUnit.SECONDS, ticker);
+ Deadline d2 = Deadline.after(10, TimeUnit.SECONDS, ticker);
+ Truth.assertThat(d1).isEquivalentAccordingToCompareTo(d2);
+ }
+
+ @Test
+ public void toString_before() {
+ Deadline d = Deadline.after(12, TimeUnit.MICROSECONDS, ticker);
+ assertEquals("12000 ns from now", d.toString());
+ }
+
+ private static class FakeTicker extends Deadline.Ticker {
+ private long time;
+
+ @Override
+ public long read() {
+ return time;
+ }
+
+ public void reset(long time) {
+ this.time = time;
+ }
+
+ public void increment(long period, TimeUnit unit) {
+ if (period < 0) {
+ throw new IllegalArgumentException();
+ }
+ this.time += unit.toNanos(period);
+ }
+ }
+}