aboutsummaryrefslogtreecommitdiff
path: root/core
diff options
context:
space:
mode:
authorCarl Mastrangelo <notcarl@google.com>2018-10-03 14:09:00 -0700
committerGitHub <noreply@github.com>2018-10-03 14:09:00 -0700
commit6b7fa40378464a0ce4d9986ca751d1219d591952 (patch)
tree0dcfdbabe51b8cab8da2f830a835a43d9110614d /core
parent2c9bdd1cdae170dd933446f33f9cc482fddb8b9b (diff)
downloadgrpc-grpc-java-6b7fa40378464a0ce4d9986ca751d1219d591952.tar.gz
core: name anonymous classes in ManagedChannel for clear stacktraces
Diffstat (limited to 'core')
-rw-r--r--core/src/main/java/io/grpc/internal/ManagedChannelImpl.java587
1 files changed, 319 insertions, 268 deletions
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
index 0278ce42a..e1f840ee3 100644
--- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
+++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java
@@ -125,13 +125,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
private final TimeProvider timeProvider;
private final int maxTraceEvents;
- private final ChannelExecutor channelExecutor = new ChannelExecutor() {
- @Override
- void handleUncaughtThrowable(Throwable t) {
- super.handleUncaughtThrowable(t);
- panic(t);
- }
- };
+ private final ChannelExecutor channelExecutor = new PanicChannelExecutor();
private boolean fullStreamDecompression;
@@ -236,34 +230,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
// Called from channelExecutor
private final ManagedClientTransport.Listener delayedTransportListener =
- new ManagedClientTransport.Listener() {
- @Override
- public void transportShutdown(Status s) {
- checkState(shutdown.get(), "Channel must have been shut down");
- }
-
- @Override
- public void transportReady() {
- // Don't care
- }
-
- @Override
- public void transportInUse(final boolean inUse) {
- inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
- }
-
- @Override
- public void transportTerminated() {
- checkState(shutdown.get(), "Channel must have been shut down");
- terminating = true;
- shutdownNameResolverAndLoadBalancer(false);
- // No need to call channelStateManager since we are already in SHUTDOWN state.
- // Until LoadBalancer is shutdown, it may still create new subchannels. We catch them
- // here.
- maybeShutdownNowSubchannels();
- maybeTerminateChannel();
- }
- };
+ new DelayedTransportListener();
// Must be called from channelExecutor
private void maybeShutdownNowSubchannels() {
@@ -279,27 +246,12 @@ final class ManagedChannelImpl extends ManagedChannel implements
// Must be accessed from channelExecutor
@VisibleForTesting
- final InUseStateAggregator<Object> inUseStateAggregator =
- new InUseStateAggregator<Object>() {
- @Override
- void handleInUse() {
- exitIdleMode();
- }
-
- @Override
- void handleNotInUse() {
- if (shutdown.get()) {
- return;
- }
- rescheduleIdleTimer();
- }
- };
+ final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator();
@Override
public ListenableFuture<ChannelStats> getStats() {
final SettableFuture<ChannelStats> ret = SettableFuture.create();
- // subchannels and oobchannels can only be accessed from channelExecutor
- channelExecutor.executeLater(new Runnable() {
+ final class StatsFetcher implements Runnable {
@Override
public void run() {
ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder();
@@ -314,7 +266,10 @@ final class ManagedChannelImpl extends ManagedChannel implements
builder.setSubchannels(children);
ret.set(builder.build());
}
- }).drain();
+ }
+
+ // subchannels and oobchannels can only be accessed from channelExecutor
+ channelExecutor.executeLater(new StatsFetcher()).drain();
return ret;
}
@@ -463,7 +418,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
}
}
- private final ClientTransportProvider transportProvider = new ClientTransportProvider() {
+ private final class ChannelTransportProvider implements ClientTransportProvider {
@Override
public ClientTransport get(PickSubchannelArgs args) {
SubchannelPicker pickerCopy = subchannelPicker;
@@ -473,12 +428,14 @@ final class ManagedChannelImpl extends ManagedChannel implements
return delayedTransport;
}
if (pickerCopy == null) {
- channelExecutor.executeLater(new Runnable() {
- @Override
- public void run() {
- exitIdleMode();
- }
- }).drain();
+ final class ExitIdleModeForTransport implements Runnable {
+ @Override
+ public void run() {
+ exitIdleMode();
+ }
+ }
+
+ channelExecutor.executeLater(new ExitIdleModeForTransport()).drain();
return delayedTransport;
}
// There is no need to reschedule the idle timer here.
@@ -507,11 +464,21 @@ final class ManagedChannelImpl extends ManagedChannel implements
final Metadata headers,
final Context context) {
checkState(retryEnabled, "retry should be enabled");
- return new RetriableStream<ReqT>(
- method, headers, channelBufferUsed, perRpcBufferLimit, channelBufferLimit,
- getCallExecutor(callOptions), transportFactory.getScheduledExecutorService(),
- callOptions.getOption(RETRY_POLICY_KEY), callOptions.getOption(HEDGING_POLICY_KEY),
- throttle) {
+ final class RetryStream extends RetriableStream<ReqT> {
+ RetryStream() {
+ super(
+ method,
+ headers,
+ channelBufferUsed,
+ perRpcBufferLimit,
+ channelBufferLimit,
+ getCallExecutor(callOptions),
+ transportFactory.getScheduledExecutorService(),
+ callOptions.getOption(RETRY_POLICY_KEY),
+ callOptions.getOption(HEDGING_POLICY_KEY),
+ throttle);
+ }
+
@Override
Status prestart() {
return uncommittedRetriableStreamsRegistry.add(this);
@@ -534,9 +501,13 @@ final class ManagedChannelImpl extends ManagedChannel implements
context.detach(origContext);
}
}
- };
+ }
+
+ return new RetryStream();
}
- };
+ }
+
+ private final ClientTransportProvider transportProvider = new ChannelTransportProvider();
private final Rescheduler idleTimer;
@@ -614,12 +585,14 @@ final class ManagedChannelImpl extends ManagedChannel implements
this.channelBufferLimit = builder.retryBufferSize;
this.perRpcBufferLimit = builder.perRpcBufferLimit;
- this.callTracerFactory = new CallTracer.Factory() {
+ final class ChannelCallTracerFactory implements CallTracer.Factory {
@Override
public CallTracer create() {
return new CallTracer(timeProvider);
}
- };
+ }
+
+ this.callTracerFactory = new ChannelCallTracerFactory();
channelCallTracer = callTracerFactory.create();
this.channelz = checkNotNull(builder.channelz);
channelz.addRootChannel(this);
@@ -687,7 +660,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
// delayedTransport.shutdown() may also add some tasks into the queue. But some things inside
// delayedTransport.shutdown() like setting delayedTransport.shutdown = true are not run in the
// channelExecutor's queue and should not be blocked, so we do not drain() immediately here.
- channelExecutor.executeLater(new Runnable() {
+ final class Shutdown implements Runnable {
@Override
public void run() {
if (channelTracer != null) {
@@ -699,15 +672,19 @@ final class ManagedChannelImpl extends ManagedChannel implements
}
channelStateManager.gotoState(SHUTDOWN);
}
- });
+ }
+
+ channelExecutor.executeLater(new Shutdown());
uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS);
- channelExecutor.executeLater(new Runnable() {
- @Override
- public void run() {
- cancelIdleTimer(/* permanent= */ true);
- }
- }).drain();
+ final class CancelIdleTimer implements Runnable {
+ @Override
+ public void run() {
+ cancelIdleTimer(/* permanent= */ true);
+ }
+ }
+
+ channelExecutor.executeLater(new CancelIdleTimer()).drain();
logger.log(Level.FINE, "[{0}] Shutting down", getLogId());
return this;
}
@@ -722,16 +699,18 @@ final class ManagedChannelImpl extends ManagedChannel implements
logger.log(Level.FINE, "[{0}] shutdownNow() called", getLogId());
shutdown();
uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS);
- channelExecutor.executeLater(new Runnable() {
- @Override
- public void run() {
- if (shutdownNowed) {
- return;
- }
- shutdownNowed = true;
- maybeShutdownNowSubchannels();
+ final class ShutdownNow implements Runnable {
+ @Override
+ public void run() {
+ if (shutdownNowed) {
+ return;
}
- }).drain();
+ shutdownNowed = true;
+ maybeShutdownNowSubchannels();
+ }
+ }
+
+ channelExecutor.executeLater(new ShutdownNow()).drain();
return this;
}
@@ -745,16 +724,18 @@ final class ManagedChannelImpl extends ManagedChannel implements
panicMode = true;
cancelIdleTimer(/* permanent= */ true);
shutdownNameResolverAndLoadBalancer(false);
- SubchannelPicker newPicker = new SubchannelPicker() {
- final PickResult panicPickResult =
+ final class PanicSubchannelPicker extends SubchannelPicker {
+ private final PickResult panicPickResult =
PickResult.withDrop(
Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t));
+
@Override
public PickResult pickSubchannel(PickSubchannelArgs args) {
return panicPickResult;
}
- };
- updateSubchannelPicker(newPicker);
+ }
+
+ updateSubchannelPicker(new PanicSubchannelPicker());
if (channelTracer != null) {
channelTracer.reportEvent(
new ChannelTrace.Event.Builder()
@@ -863,60 +844,61 @@ final class ManagedChannelImpl extends ManagedChannel implements
public ConnectivityState getState(boolean requestConnection) {
ConnectivityState savedChannelState = channelStateManager.getState();
if (requestConnection && savedChannelState == IDLE) {
- channelExecutor.executeLater(
- new Runnable() {
- @Override
- public void run() {
- exitIdleMode();
- if (subchannelPicker != null) {
- subchannelPicker.requestConnection();
- }
- }
- }).drain();
+ final class RequestConnection implements Runnable {
+ @Override
+ public void run() {
+ exitIdleMode();
+ if (subchannelPicker != null) {
+ subchannelPicker.requestConnection();
+ }
+ }
+ }
+
+ channelExecutor.executeLater(new RequestConnection()).drain();
}
return savedChannelState;
}
@Override
public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) {
- channelExecutor.executeLater(
- new Runnable() {
- @Override
- public void run() {
- channelStateManager.notifyWhenStateChanged(callback, executor, source);
- }
- }).drain();
+ final class NotifyStateChanged implements Runnable {
+ @Override
+ public void run() {
+ channelStateManager.notifyWhenStateChanged(callback, executor, source);
+ }
+ }
+
+ channelExecutor.executeLater(new NotifyStateChanged()).drain();
}
@Override
public void resetConnectBackoff() {
- channelExecutor
- .executeLater(
- new Runnable() {
- @Override
- public void run() {
- if (shutdown.get()) {
- return;
- }
- if (nameResolverRefreshFuture != null) {
- checkState(nameResolverStarted, "name resolver must be started");
- cancelNameResolverBackoff();
- nameResolver.refresh();
- }
- for (InternalSubchannel subchannel : subchannels) {
- subchannel.resetConnectBackoff();
- }
- for (OobChannel oobChannel : oobChannels) {
- oobChannel.resetConnectBackoff();
- }
- }
- })
- .drain();
+ final class ResetConnectBackoff implements Runnable {
+ @Override
+ public void run() {
+ if (shutdown.get()) {
+ return;
+ }
+ if (nameResolverRefreshFuture != null) {
+ checkState(nameResolverStarted, "name resolver must be started");
+ cancelNameResolverBackoff();
+ nameResolver.refresh();
+ }
+ for (InternalSubchannel subchannel : subchannels) {
+ subchannel.resetConnectBackoff();
+ }
+ for (OobChannel oobChannel : oobChannels) {
+ oobChannel.resetConnectBackoff();
+ }
+ }
+ }
+
+ channelExecutor.executeLater(new ResetConnectBackoff()).drain();
}
@Override
public void enterIdle() {
- class PrepareToLoseNetworkRunnable implements Runnable {
+ final class PrepareToLoseNetworkRunnable implements Runnable {
@Override
public void run() {
if (shutdown.get() || lbHelper == null) {
@@ -940,7 +922,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
final Object lock = new Object();
@GuardedBy("lock")
- Collection<ClientStream> uncommittedRetriableStreams = new HashSet<ClientStream>();
+ Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>();
@GuardedBy("lock")
Status shutdownStatus;
@@ -1003,7 +985,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
shutdownStatusCopy = shutdownStatus;
// Because retriable transport is long-lived, we take this opportunity to down-size the
// hashmap.
- uncommittedRetriableStreams = new HashSet<ClientStream>();
+ uncommittedRetriableStreams = new HashSet<>();
}
}
@@ -1041,6 +1023,36 @@ final class ManagedChannelImpl extends ManagedChannel implements
if (maxTraceEvents > 0) {
subchannelTracer = new ChannelTracer(maxTraceEvents, subchannelCreationTime, "Subchannel");
}
+
+ final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback {
+ // All callbacks are run in channelExecutor
+ @Override
+ void onTerminated(InternalSubchannel is) {
+ subchannels.remove(is);
+ channelz.removeSubchannel(is);
+ maybeTerminateChannel();
+ }
+
+ @Override
+ void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
+ handleInternalSubchannelState(newState);
+ // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match.
+ if (LbHelperImpl.this == ManagedChannelImpl.this.lbHelper) {
+ lb.handleSubchannelState(subchannel, newState);
+ }
+ }
+
+ @Override
+ void onInUse(InternalSubchannel is) {
+ inUseStateAggregator.updateObjectInUse(is, true);
+ }
+
+ @Override
+ void onNotInUse(InternalSubchannel is) {
+ inUseStateAggregator.updateObjectInUse(is, false);
+ }
+ }
+
final InternalSubchannel internalSubchannel = new InternalSubchannel(
addressGroups,
authority(),
@@ -1050,34 +1062,7 @@ final class ManagedChannelImpl extends ManagedChannel implements
transportFactory.getScheduledExecutorService(),
stopwatchSupplier,
channelExecutor,
- new InternalSubchannel.Callback() {
- // All callbacks are run in channelExecutor
- @Override
- void onTerminated(InternalSubchannel is) {
- subchannels.remove(is);
- channelz.removeSubchannel(is);
- maybeTerminateChannel();
- }
-
- @Override
- void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
- handleInternalSubchannelState(newState);
- // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match.
- if (LbHelperImpl.this == ManagedChannelImpl.this.lbHelper) {
- lb.handleSubchannelState(subchannel, newState);
- }
- }
-
- @Override
- void onInUse(InternalSubchannel is) {
- inUseStateAggregator.updateObjectInUse(is, true);
- }
-
- @Override
- void onNotInUse(InternalSubchannel is) {
- inUseStateAggregator.updateObjectInUse(is, false);
- }
- },
+ new ManagedInternalSubchannelCallback(),
channelz,
callTracerFactory.create(),
subchannelTracer,
@@ -1094,24 +1079,27 @@ final class ManagedChannelImpl extends ManagedChannel implements
subchannel.subchannel = internalSubchannel;
logger.log(Level.FINE, "[{0}] {1} created for {2}",
new Object[] {getLogId(), internalSubchannel.getLogId(), addressGroups});
- runSerialized(new Runnable() {
- @Override
- public void run() {
- if (terminating) {
- // Because runSerialized() doesn't guarantee the runnable has been executed upon when
- // returning, the subchannel may still be returned to the balancer without being
- // shutdown even if "terminating" is already true. The subchannel will not be used in
- // this case, because delayed transport has terminated when "terminating" becomes
- // true, and no more requests will be sent to balancer beyond this point.
- internalSubchannel.shutdown(SHUTDOWN_STATUS);
- }
- if (!terminated) {
- // If channel has not terminated, it will track the subchannel and block termination
- // for it.
- subchannels.add(internalSubchannel);
- }
+
+ final class AddSubchannel implements Runnable {
+ @Override
+ public void run() {
+ if (terminating) {
+ // Because runSerialized() doesn't guarantee the runnable has been executed upon when
+ // returning, the subchannel may still be returned to the balancer without being
+ // shutdown even if "terminating" is already true. The subchannel will not be used in
+ // this case, because delayed transport has terminated when "terminating" becomes
+ // true, and no more requests will be sent to balancer beyond this point.
+ internalSubchannel.shutdown(SHUTDOWN_STATUS);
}
- });
+ if (!terminated) {
+ // If channel has not terminated, it will track the subchannel and block termination
+ // for it.
+ subchannels.add(internalSubchannel);
+ }
+ }
+ }
+
+ runSerialized(new AddSubchannel());
return subchannel;
}
@@ -1120,30 +1108,30 @@ final class ManagedChannelImpl extends ManagedChannel implements
final ConnectivityState newState, final SubchannelPicker newPicker) {
checkNotNull(newState, "newState");
checkNotNull(newPicker, "newPicker");
-
- runSerialized(
- new Runnable() {
- @Override
- public void run() {
- if (LbHelperImpl.this != lbHelper) {
- return;
- }
- updateSubchannelPicker(newPicker);
- // It's not appropriate to report SHUTDOWN state from lb.
- // Ignore the case of newState == SHUTDOWN for now.
- if (newState != SHUTDOWN) {
- if (channelTracer != null) {
- channelTracer.reportEvent(
- new ChannelTrace.Event.Builder()
- .setDescription("Entering " + newState + " state")
- .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
- .setTimestampNanos(timeProvider.currentTimeNanos())
- .build());
- }
- channelStateManager.gotoState(newState);
- }
+ final class UpdateBalancingState implements Runnable {
+ @Override
+ public void run() {
+ if (LbHelperImpl.this != lbHelper) {
+ return;
+ }
+ updateSubchannelPicker(newPicker);
+ // It's not appropriate to report SHUTDOWN state from lb.
+ // Ignore the case of newState == SHUTDOWN for now.
+ if (newState != SHUTDOWN) {
+ if (channelTracer != null) {
+ channelTracer.reportEvent(
+ new ChannelTrace.Event.Builder()
+ .setDescription("Entering " + newState + " state")
+ .setSeverity(ChannelTrace.Event.Severity.CT_INFO)
+ .setTimestampNanos(timeProvider.currentTimeNanos())
+ .build());
}
- });
+ channelStateManager.gotoState(newState);
+ }
+ }
+ }
+
+ runSerialized(new UpdateBalancingState());
}
@Override
@@ -1176,26 +1164,28 @@ final class ManagedChannelImpl extends ManagedChannel implements
.build());
subchannelTracer = new ChannelTracer(maxTraceEvents, oobChannelCreationTime, "Subchannel");
}
+ final class ManagedOobChannelCallback extends InternalSubchannel.Callback {
+ @Override
+ void onTerminated(InternalSubchannel is) {
+ oobChannels.remove(oobChannel);
+ channelz.removeSubchannel(is);
+ oobChannel.handleSubchannelTerminated();
+ maybeTerminateChannel();
+ }
+
+ @Override
+ void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
+ handleInternalSubchannelState(newState);
+ oobChannel.handleSubchannelStateChange(newState);
+ }
+ }
+
final InternalSubchannel internalSubchannel = new InternalSubchannel(
Collections.singletonList(addressGroup),
authority, userAgent, backoffPolicyProvider, transportFactory,
transportFactory.getScheduledExecutorService(), stopwatchSupplier, channelExecutor,
// All callback methods are run from channelExecutor
- new InternalSubchannel.Callback() {
- @Override
- void onTerminated(InternalSubchannel is) {
- oobChannels.remove(oobChannel);
- channelz.removeSubchannel(is);
- oobChannel.handleSubchannelTerminated();
- maybeTerminateChannel();
- }
-
- @Override
- void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) {
- handleInternalSubchannelState(newState);
- oobChannel.handleSubchannelStateChange(newState);
- }
- },
+ new ManagedOobChannelCallback(),
channelz,
callTracerFactory.create(),
subchannelTracer,
@@ -1211,19 +1201,21 @@ final class ManagedChannelImpl extends ManagedChannel implements
channelz.addSubchannel(oobChannel);
channelz.addSubchannel(internalSubchannel);
oobChannel.setSubchannel(internalSubchannel);
- runSerialized(new Runnable() {
- @Override
- public void run() {
- if (terminating) {
- oobChannel.shutdown();
- }
- if (!terminated) {
- // If channel has not terminated, it will track the subchannel and block termination
- // for it.
- oobChannels.add(oobChannel);
- }
+ final class AddOobChannel implements Runnable {
+ @Override
+ public void run() {
+ if (terminating) {
+ oobChannel.shutdown();
+ }
+ if (!terminated) {
+ // If channel has not terminated, it will track the subchannel and block termination
+ // for it.
+ oobChannels.add(oobChannel);
}
- });
+ }
+ }
+
+ runSerialized(new AddOobChannel());
return oobChannel;
}
@@ -1332,41 +1324,40 @@ final class ManagedChannelImpl extends ManagedChannel implements
.build());
haveBackends = false;
}
- channelExecutor
- .executeLater(
- new Runnable() {
- @Override
- public void run() {
- // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match.
- if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) {
- return;
- }
- helper.lb.handleNameResolutionError(error);
- if (nameResolverRefreshFuture != null) {
- // The name resolver may invoke onError multiple times, but we only want to
- // schedule one backoff attempt
- // TODO(ericgribkoff) Update contract of NameResolver.Listener or decide if we
- // want to reset the backoff interval upon repeated onError() calls
- return;
- }
- if (nameResolverBackoffPolicy == null) {
- nameResolverBackoffPolicy = backoffPolicyProvider.get();
- }
- long delayNanos = nameResolverBackoffPolicy.nextBackoffNanos();
- if (logger.isLoggable(Level.FINE)) {
- logger.log(
- Level.FINE,
- "[{0}] Scheduling DNS resolution backoff for {1} ns",
- new Object[] {logId, delayNanos});
- }
- nameResolverRefresh = new NameResolverRefresh();
- nameResolverRefreshFuture =
- transportFactory
- .getScheduledExecutorService()
- .schedule(nameResolverRefresh, delayNanos, TimeUnit.NANOSECONDS);
- }
- })
- .drain();
+ final class NameResolverErrorHandler implements Runnable {
+ @Override
+ public void run() {
+ // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match.
+ if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) {
+ return;
+ }
+ helper.lb.handleNameResolutionError(error);
+ if (nameResolverRefreshFuture != null) {
+ // The name resolver may invoke onError multiple times, but we only want to
+ // schedule one backoff attempt
+ // TODO(ericgribkoff) Update contract of NameResolver.Listener or decide if we
+ // want to reset the backoff interval upon repeated onError() calls
+ return;
+ }
+ if (nameResolverBackoffPolicy == null) {
+ nameResolverBackoffPolicy = backoffPolicyProvider.get();
+ }
+ long delayNanos = nameResolverBackoffPolicy.nextBackoffNanos();
+ if (logger.isLoggable(Level.FINE)) {
+ logger.log(
+ Level.FINE,
+ "[{0}] Scheduling DNS resolution backoff for {1} ns",
+ new Object[] {logId, delayNanos});
+ }
+ nameResolverRefresh = new NameResolverRefresh();
+ nameResolverRefreshFuture =
+ transportFactory
+ .getScheduledExecutorService()
+ .schedule(nameResolverRefresh, delayNanos, TimeUnit.NANOSECONDS);
+ }
+ }
+
+ channelExecutor.executeLater(new NameResolverErrorHandler()).drain();
}
}
@@ -1426,14 +1417,16 @@ final class ManagedChannelImpl extends ManagedChannel implements
// TODO(zhangkun83): consider a better approach
// (https://github.com/grpc/grpc-java/issues/2562).
if (!terminating) {
+ final class ShutdownSubchannel implements Runnable {
+ @Override
+ public void run() {
+ subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
+ }
+ }
+
delayedShutdownTask = transportFactory.getScheduledExecutorService().schedule(
new LogExceptionRunnable(
- new Runnable() {
- @Override
- public void run() {
- subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS);
- }
- }), SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
+ new ShutdownSubchannel()), SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS);
return;
}
}
@@ -1470,4 +1463,62 @@ final class ManagedChannelImpl extends ManagedChannel implements
.add("target", target)
.toString();
}
+
+ private final class PanicChannelExecutor extends ChannelExecutor {
+ @Override
+ void handleUncaughtThrowable(Throwable t) {
+ super.handleUncaughtThrowable(t);
+ panic(t);
+ }
+ }
+
+ /**
+ * Called from channelExecutor.
+ */
+ private final class DelayedTransportListener implements ManagedClientTransport.Listener {
+ @Override
+ public void transportShutdown(Status s) {
+ checkState(shutdown.get(), "Channel must have been shut down");
+ }
+
+ @Override
+ public void transportReady() {
+ // Don't care
+ }
+
+ @Override
+ public void transportInUse(final boolean inUse) {
+ inUseStateAggregator.updateObjectInUse(delayedTransport, inUse);
+ }
+
+ @Override
+ public void transportTerminated() {
+ checkState(shutdown.get(), "Channel must have been shut down");
+ terminating = true;
+ shutdownNameResolverAndLoadBalancer(false);
+ // No need to call channelStateManager since we are already in SHUTDOWN state.
+ // Until LoadBalancer is shutdown, it may still create new subchannels. We catch them
+ // here.
+ maybeShutdownNowSubchannels();
+ maybeTerminateChannel();
+ }
+ }
+
+ /**
+ * Must be accessed from channelExecutor.
+ */
+ private final class IdleModeStateAggregator extends InUseStateAggregator<Object> {
+ @Override
+ void handleInUse() {
+ exitIdleMode();
+ }
+
+ @Override
+ void handleNotInUse() {
+ if (shutdown.get()) {
+ return;
+ }
+ rescheduleIdleTimer();
+ }
+ }
}