diff options
author | Carl Mastrangelo <notcarl@google.com> | 2018-10-03 14:09:00 -0700 |
---|---|---|
committer | GitHub <noreply@github.com> | 2018-10-03 14:09:00 -0700 |
commit | 6b7fa40378464a0ce4d9986ca751d1219d591952 (patch) | |
tree | 0dcfdbabe51b8cab8da2f830a835a43d9110614d /core | |
parent | 2c9bdd1cdae170dd933446f33f9cc482fddb8b9b (diff) | |
download | grpc-grpc-java-6b7fa40378464a0ce4d9986ca751d1219d591952.tar.gz |
core: name anonymous classes in ManagedChannel for clear stacktraces
Diffstat (limited to 'core')
-rw-r--r-- | core/src/main/java/io/grpc/internal/ManagedChannelImpl.java | 587 |
1 files changed, 319 insertions, 268 deletions
diff --git a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java index 0278ce42a..e1f840ee3 100644 --- a/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java +++ b/core/src/main/java/io/grpc/internal/ManagedChannelImpl.java @@ -125,13 +125,7 @@ final class ManagedChannelImpl extends ManagedChannel implements private final TimeProvider timeProvider; private final int maxTraceEvents; - private final ChannelExecutor channelExecutor = new ChannelExecutor() { - @Override - void handleUncaughtThrowable(Throwable t) { - super.handleUncaughtThrowable(t); - panic(t); - } - }; + private final ChannelExecutor channelExecutor = new PanicChannelExecutor(); private boolean fullStreamDecompression; @@ -236,34 +230,7 @@ final class ManagedChannelImpl extends ManagedChannel implements // Called from channelExecutor private final ManagedClientTransport.Listener delayedTransportListener = - new ManagedClientTransport.Listener() { - @Override - public void transportShutdown(Status s) { - checkState(shutdown.get(), "Channel must have been shut down"); - } - - @Override - public void transportReady() { - // Don't care - } - - @Override - public void transportInUse(final boolean inUse) { - inUseStateAggregator.updateObjectInUse(delayedTransport, inUse); - } - - @Override - public void transportTerminated() { - checkState(shutdown.get(), "Channel must have been shut down"); - terminating = true; - shutdownNameResolverAndLoadBalancer(false); - // No need to call channelStateManager since we are already in SHUTDOWN state. - // Until LoadBalancer is shutdown, it may still create new subchannels. We catch them - // here. - maybeShutdownNowSubchannels(); - maybeTerminateChannel(); - } - }; + new DelayedTransportListener(); // Must be called from channelExecutor private void maybeShutdownNowSubchannels() { @@ -279,27 +246,12 @@ final class ManagedChannelImpl extends ManagedChannel implements // Must be accessed from channelExecutor @VisibleForTesting - final InUseStateAggregator<Object> inUseStateAggregator = - new InUseStateAggregator<Object>() { - @Override - void handleInUse() { - exitIdleMode(); - } - - @Override - void handleNotInUse() { - if (shutdown.get()) { - return; - } - rescheduleIdleTimer(); - } - }; + final InUseStateAggregator<Object> inUseStateAggregator = new IdleModeStateAggregator(); @Override public ListenableFuture<ChannelStats> getStats() { final SettableFuture<ChannelStats> ret = SettableFuture.create(); - // subchannels and oobchannels can only be accessed from channelExecutor - channelExecutor.executeLater(new Runnable() { + final class StatsFetcher implements Runnable { @Override public void run() { ChannelStats.Builder builder = new InternalChannelz.ChannelStats.Builder(); @@ -314,7 +266,10 @@ final class ManagedChannelImpl extends ManagedChannel implements builder.setSubchannels(children); ret.set(builder.build()); } - }).drain(); + } + + // subchannels and oobchannels can only be accessed from channelExecutor + channelExecutor.executeLater(new StatsFetcher()).drain(); return ret; } @@ -463,7 +418,7 @@ final class ManagedChannelImpl extends ManagedChannel implements } } - private final ClientTransportProvider transportProvider = new ClientTransportProvider() { + private final class ChannelTransportProvider implements ClientTransportProvider { @Override public ClientTransport get(PickSubchannelArgs args) { SubchannelPicker pickerCopy = subchannelPicker; @@ -473,12 +428,14 @@ final class ManagedChannelImpl extends ManagedChannel implements return delayedTransport; } if (pickerCopy == null) { - channelExecutor.executeLater(new Runnable() { - @Override - public void run() { - exitIdleMode(); - } - }).drain(); + final class ExitIdleModeForTransport implements Runnable { + @Override + public void run() { + exitIdleMode(); + } + } + + channelExecutor.executeLater(new ExitIdleModeForTransport()).drain(); return delayedTransport; } // There is no need to reschedule the idle timer here. @@ -507,11 +464,21 @@ final class ManagedChannelImpl extends ManagedChannel implements final Metadata headers, final Context context) { checkState(retryEnabled, "retry should be enabled"); - return new RetriableStream<ReqT>( - method, headers, channelBufferUsed, perRpcBufferLimit, channelBufferLimit, - getCallExecutor(callOptions), transportFactory.getScheduledExecutorService(), - callOptions.getOption(RETRY_POLICY_KEY), callOptions.getOption(HEDGING_POLICY_KEY), - throttle) { + final class RetryStream extends RetriableStream<ReqT> { + RetryStream() { + super( + method, + headers, + channelBufferUsed, + perRpcBufferLimit, + channelBufferLimit, + getCallExecutor(callOptions), + transportFactory.getScheduledExecutorService(), + callOptions.getOption(RETRY_POLICY_KEY), + callOptions.getOption(HEDGING_POLICY_KEY), + throttle); + } + @Override Status prestart() { return uncommittedRetriableStreamsRegistry.add(this); @@ -534,9 +501,13 @@ final class ManagedChannelImpl extends ManagedChannel implements context.detach(origContext); } } - }; + } + + return new RetryStream(); } - }; + } + + private final ClientTransportProvider transportProvider = new ChannelTransportProvider(); private final Rescheduler idleTimer; @@ -614,12 +585,14 @@ final class ManagedChannelImpl extends ManagedChannel implements this.channelBufferLimit = builder.retryBufferSize; this.perRpcBufferLimit = builder.perRpcBufferLimit; - this.callTracerFactory = new CallTracer.Factory() { + final class ChannelCallTracerFactory implements CallTracer.Factory { @Override public CallTracer create() { return new CallTracer(timeProvider); } - }; + } + + this.callTracerFactory = new ChannelCallTracerFactory(); channelCallTracer = callTracerFactory.create(); this.channelz = checkNotNull(builder.channelz); channelz.addRootChannel(this); @@ -687,7 +660,7 @@ final class ManagedChannelImpl extends ManagedChannel implements // delayedTransport.shutdown() may also add some tasks into the queue. But some things inside // delayedTransport.shutdown() like setting delayedTransport.shutdown = true are not run in the // channelExecutor's queue and should not be blocked, so we do not drain() immediately here. - channelExecutor.executeLater(new Runnable() { + final class Shutdown implements Runnable { @Override public void run() { if (channelTracer != null) { @@ -699,15 +672,19 @@ final class ManagedChannelImpl extends ManagedChannel implements } channelStateManager.gotoState(SHUTDOWN); } - }); + } + + channelExecutor.executeLater(new Shutdown()); uncommittedRetriableStreamsRegistry.onShutdown(SHUTDOWN_STATUS); - channelExecutor.executeLater(new Runnable() { - @Override - public void run() { - cancelIdleTimer(/* permanent= */ true); - } - }).drain(); + final class CancelIdleTimer implements Runnable { + @Override + public void run() { + cancelIdleTimer(/* permanent= */ true); + } + } + + channelExecutor.executeLater(new CancelIdleTimer()).drain(); logger.log(Level.FINE, "[{0}] Shutting down", getLogId()); return this; } @@ -722,16 +699,18 @@ final class ManagedChannelImpl extends ManagedChannel implements logger.log(Level.FINE, "[{0}] shutdownNow() called", getLogId()); shutdown(); uncommittedRetriableStreamsRegistry.onShutdownNow(SHUTDOWN_NOW_STATUS); - channelExecutor.executeLater(new Runnable() { - @Override - public void run() { - if (shutdownNowed) { - return; - } - shutdownNowed = true; - maybeShutdownNowSubchannels(); + final class ShutdownNow implements Runnable { + @Override + public void run() { + if (shutdownNowed) { + return; } - }).drain(); + shutdownNowed = true; + maybeShutdownNowSubchannels(); + } + } + + channelExecutor.executeLater(new ShutdownNow()).drain(); return this; } @@ -745,16 +724,18 @@ final class ManagedChannelImpl extends ManagedChannel implements panicMode = true; cancelIdleTimer(/* permanent= */ true); shutdownNameResolverAndLoadBalancer(false); - SubchannelPicker newPicker = new SubchannelPicker() { - final PickResult panicPickResult = + final class PanicSubchannelPicker extends SubchannelPicker { + private final PickResult panicPickResult = PickResult.withDrop( Status.INTERNAL.withDescription("Panic! This is a bug!").withCause(t)); + @Override public PickResult pickSubchannel(PickSubchannelArgs args) { return panicPickResult; } - }; - updateSubchannelPicker(newPicker); + } + + updateSubchannelPicker(new PanicSubchannelPicker()); if (channelTracer != null) { channelTracer.reportEvent( new ChannelTrace.Event.Builder() @@ -863,60 +844,61 @@ final class ManagedChannelImpl extends ManagedChannel implements public ConnectivityState getState(boolean requestConnection) { ConnectivityState savedChannelState = channelStateManager.getState(); if (requestConnection && savedChannelState == IDLE) { - channelExecutor.executeLater( - new Runnable() { - @Override - public void run() { - exitIdleMode(); - if (subchannelPicker != null) { - subchannelPicker.requestConnection(); - } - } - }).drain(); + final class RequestConnection implements Runnable { + @Override + public void run() { + exitIdleMode(); + if (subchannelPicker != null) { + subchannelPicker.requestConnection(); + } + } + } + + channelExecutor.executeLater(new RequestConnection()).drain(); } return savedChannelState; } @Override public void notifyWhenStateChanged(final ConnectivityState source, final Runnable callback) { - channelExecutor.executeLater( - new Runnable() { - @Override - public void run() { - channelStateManager.notifyWhenStateChanged(callback, executor, source); - } - }).drain(); + final class NotifyStateChanged implements Runnable { + @Override + public void run() { + channelStateManager.notifyWhenStateChanged(callback, executor, source); + } + } + + channelExecutor.executeLater(new NotifyStateChanged()).drain(); } @Override public void resetConnectBackoff() { - channelExecutor - .executeLater( - new Runnable() { - @Override - public void run() { - if (shutdown.get()) { - return; - } - if (nameResolverRefreshFuture != null) { - checkState(nameResolverStarted, "name resolver must be started"); - cancelNameResolverBackoff(); - nameResolver.refresh(); - } - for (InternalSubchannel subchannel : subchannels) { - subchannel.resetConnectBackoff(); - } - for (OobChannel oobChannel : oobChannels) { - oobChannel.resetConnectBackoff(); - } - } - }) - .drain(); + final class ResetConnectBackoff implements Runnable { + @Override + public void run() { + if (shutdown.get()) { + return; + } + if (nameResolverRefreshFuture != null) { + checkState(nameResolverStarted, "name resolver must be started"); + cancelNameResolverBackoff(); + nameResolver.refresh(); + } + for (InternalSubchannel subchannel : subchannels) { + subchannel.resetConnectBackoff(); + } + for (OobChannel oobChannel : oobChannels) { + oobChannel.resetConnectBackoff(); + } + } + } + + channelExecutor.executeLater(new ResetConnectBackoff()).drain(); } @Override public void enterIdle() { - class PrepareToLoseNetworkRunnable implements Runnable { + final class PrepareToLoseNetworkRunnable implements Runnable { @Override public void run() { if (shutdown.get() || lbHelper == null) { @@ -940,7 +922,7 @@ final class ManagedChannelImpl extends ManagedChannel implements final Object lock = new Object(); @GuardedBy("lock") - Collection<ClientStream> uncommittedRetriableStreams = new HashSet<ClientStream>(); + Collection<ClientStream> uncommittedRetriableStreams = new HashSet<>(); @GuardedBy("lock") Status shutdownStatus; @@ -1003,7 +985,7 @@ final class ManagedChannelImpl extends ManagedChannel implements shutdownStatusCopy = shutdownStatus; // Because retriable transport is long-lived, we take this opportunity to down-size the // hashmap. - uncommittedRetriableStreams = new HashSet<ClientStream>(); + uncommittedRetriableStreams = new HashSet<>(); } } @@ -1041,6 +1023,36 @@ final class ManagedChannelImpl extends ManagedChannel implements if (maxTraceEvents > 0) { subchannelTracer = new ChannelTracer(maxTraceEvents, subchannelCreationTime, "Subchannel"); } + + final class ManagedInternalSubchannelCallback extends InternalSubchannel.Callback { + // All callbacks are run in channelExecutor + @Override + void onTerminated(InternalSubchannel is) { + subchannels.remove(is); + channelz.removeSubchannel(is); + maybeTerminateChannel(); + } + + @Override + void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { + handleInternalSubchannelState(newState); + // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. + if (LbHelperImpl.this == ManagedChannelImpl.this.lbHelper) { + lb.handleSubchannelState(subchannel, newState); + } + } + + @Override + void onInUse(InternalSubchannel is) { + inUseStateAggregator.updateObjectInUse(is, true); + } + + @Override + void onNotInUse(InternalSubchannel is) { + inUseStateAggregator.updateObjectInUse(is, false); + } + } + final InternalSubchannel internalSubchannel = new InternalSubchannel( addressGroups, authority(), @@ -1050,34 +1062,7 @@ final class ManagedChannelImpl extends ManagedChannel implements transportFactory.getScheduledExecutorService(), stopwatchSupplier, channelExecutor, - new InternalSubchannel.Callback() { - // All callbacks are run in channelExecutor - @Override - void onTerminated(InternalSubchannel is) { - subchannels.remove(is); - channelz.removeSubchannel(is); - maybeTerminateChannel(); - } - - @Override - void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { - handleInternalSubchannelState(newState); - // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. - if (LbHelperImpl.this == ManagedChannelImpl.this.lbHelper) { - lb.handleSubchannelState(subchannel, newState); - } - } - - @Override - void onInUse(InternalSubchannel is) { - inUseStateAggregator.updateObjectInUse(is, true); - } - - @Override - void onNotInUse(InternalSubchannel is) { - inUseStateAggregator.updateObjectInUse(is, false); - } - }, + new ManagedInternalSubchannelCallback(), channelz, callTracerFactory.create(), subchannelTracer, @@ -1094,24 +1079,27 @@ final class ManagedChannelImpl extends ManagedChannel implements subchannel.subchannel = internalSubchannel; logger.log(Level.FINE, "[{0}] {1} created for {2}", new Object[] {getLogId(), internalSubchannel.getLogId(), addressGroups}); - runSerialized(new Runnable() { - @Override - public void run() { - if (terminating) { - // Because runSerialized() doesn't guarantee the runnable has been executed upon when - // returning, the subchannel may still be returned to the balancer without being - // shutdown even if "terminating" is already true. The subchannel will not be used in - // this case, because delayed transport has terminated when "terminating" becomes - // true, and no more requests will be sent to balancer beyond this point. - internalSubchannel.shutdown(SHUTDOWN_STATUS); - } - if (!terminated) { - // If channel has not terminated, it will track the subchannel and block termination - // for it. - subchannels.add(internalSubchannel); - } + + final class AddSubchannel implements Runnable { + @Override + public void run() { + if (terminating) { + // Because runSerialized() doesn't guarantee the runnable has been executed upon when + // returning, the subchannel may still be returned to the balancer without being + // shutdown even if "terminating" is already true. The subchannel will not be used in + // this case, because delayed transport has terminated when "terminating" becomes + // true, and no more requests will be sent to balancer beyond this point. + internalSubchannel.shutdown(SHUTDOWN_STATUS); } - }); + if (!terminated) { + // If channel has not terminated, it will track the subchannel and block termination + // for it. + subchannels.add(internalSubchannel); + } + } + } + + runSerialized(new AddSubchannel()); return subchannel; } @@ -1120,30 +1108,30 @@ final class ManagedChannelImpl extends ManagedChannel implements final ConnectivityState newState, final SubchannelPicker newPicker) { checkNotNull(newState, "newState"); checkNotNull(newPicker, "newPicker"); - - runSerialized( - new Runnable() { - @Override - public void run() { - if (LbHelperImpl.this != lbHelper) { - return; - } - updateSubchannelPicker(newPicker); - // It's not appropriate to report SHUTDOWN state from lb. - // Ignore the case of newState == SHUTDOWN for now. - if (newState != SHUTDOWN) { - if (channelTracer != null) { - channelTracer.reportEvent( - new ChannelTrace.Event.Builder() - .setDescription("Entering " + newState + " state") - .setSeverity(ChannelTrace.Event.Severity.CT_INFO) - .setTimestampNanos(timeProvider.currentTimeNanos()) - .build()); - } - channelStateManager.gotoState(newState); - } + final class UpdateBalancingState implements Runnable { + @Override + public void run() { + if (LbHelperImpl.this != lbHelper) { + return; + } + updateSubchannelPicker(newPicker); + // It's not appropriate to report SHUTDOWN state from lb. + // Ignore the case of newState == SHUTDOWN for now. + if (newState != SHUTDOWN) { + if (channelTracer != null) { + channelTracer.reportEvent( + new ChannelTrace.Event.Builder() + .setDescription("Entering " + newState + " state") + .setSeverity(ChannelTrace.Event.Severity.CT_INFO) + .setTimestampNanos(timeProvider.currentTimeNanos()) + .build()); } - }); + channelStateManager.gotoState(newState); + } + } + } + + runSerialized(new UpdateBalancingState()); } @Override @@ -1176,26 +1164,28 @@ final class ManagedChannelImpl extends ManagedChannel implements .build()); subchannelTracer = new ChannelTracer(maxTraceEvents, oobChannelCreationTime, "Subchannel"); } + final class ManagedOobChannelCallback extends InternalSubchannel.Callback { + @Override + void onTerminated(InternalSubchannel is) { + oobChannels.remove(oobChannel); + channelz.removeSubchannel(is); + oobChannel.handleSubchannelTerminated(); + maybeTerminateChannel(); + } + + @Override + void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { + handleInternalSubchannelState(newState); + oobChannel.handleSubchannelStateChange(newState); + } + } + final InternalSubchannel internalSubchannel = new InternalSubchannel( Collections.singletonList(addressGroup), authority, userAgent, backoffPolicyProvider, transportFactory, transportFactory.getScheduledExecutorService(), stopwatchSupplier, channelExecutor, // All callback methods are run from channelExecutor - new InternalSubchannel.Callback() { - @Override - void onTerminated(InternalSubchannel is) { - oobChannels.remove(oobChannel); - channelz.removeSubchannel(is); - oobChannel.handleSubchannelTerminated(); - maybeTerminateChannel(); - } - - @Override - void onStateChange(InternalSubchannel is, ConnectivityStateInfo newState) { - handleInternalSubchannelState(newState); - oobChannel.handleSubchannelStateChange(newState); - } - }, + new ManagedOobChannelCallback(), channelz, callTracerFactory.create(), subchannelTracer, @@ -1211,19 +1201,21 @@ final class ManagedChannelImpl extends ManagedChannel implements channelz.addSubchannel(oobChannel); channelz.addSubchannel(internalSubchannel); oobChannel.setSubchannel(internalSubchannel); - runSerialized(new Runnable() { - @Override - public void run() { - if (terminating) { - oobChannel.shutdown(); - } - if (!terminated) { - // If channel has not terminated, it will track the subchannel and block termination - // for it. - oobChannels.add(oobChannel); - } + final class AddOobChannel implements Runnable { + @Override + public void run() { + if (terminating) { + oobChannel.shutdown(); + } + if (!terminated) { + // If channel has not terminated, it will track the subchannel and block termination + // for it. + oobChannels.add(oobChannel); } - }); + } + } + + runSerialized(new AddOobChannel()); return oobChannel; } @@ -1332,41 +1324,40 @@ final class ManagedChannelImpl extends ManagedChannel implements .build()); haveBackends = false; } - channelExecutor - .executeLater( - new Runnable() { - @Override - public void run() { - // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. - if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) { - return; - } - helper.lb.handleNameResolutionError(error); - if (nameResolverRefreshFuture != null) { - // The name resolver may invoke onError multiple times, but we only want to - // schedule one backoff attempt - // TODO(ericgribkoff) Update contract of NameResolver.Listener or decide if we - // want to reset the backoff interval upon repeated onError() calls - return; - } - if (nameResolverBackoffPolicy == null) { - nameResolverBackoffPolicy = backoffPolicyProvider.get(); - } - long delayNanos = nameResolverBackoffPolicy.nextBackoffNanos(); - if (logger.isLoggable(Level.FINE)) { - logger.log( - Level.FINE, - "[{0}] Scheduling DNS resolution backoff for {1} ns", - new Object[] {logId, delayNanos}); - } - nameResolverRefresh = new NameResolverRefresh(); - nameResolverRefreshFuture = - transportFactory - .getScheduledExecutorService() - .schedule(nameResolverRefresh, delayNanos, TimeUnit.NANOSECONDS); - } - }) - .drain(); + final class NameResolverErrorHandler implements Runnable { + @Override + public void run() { + // Call LB only if it's not shutdown. If LB is shutdown, lbHelper won't match. + if (NameResolverListenerImpl.this.helper != ManagedChannelImpl.this.lbHelper) { + return; + } + helper.lb.handleNameResolutionError(error); + if (nameResolverRefreshFuture != null) { + // The name resolver may invoke onError multiple times, but we only want to + // schedule one backoff attempt + // TODO(ericgribkoff) Update contract of NameResolver.Listener or decide if we + // want to reset the backoff interval upon repeated onError() calls + return; + } + if (nameResolverBackoffPolicy == null) { + nameResolverBackoffPolicy = backoffPolicyProvider.get(); + } + long delayNanos = nameResolverBackoffPolicy.nextBackoffNanos(); + if (logger.isLoggable(Level.FINE)) { + logger.log( + Level.FINE, + "[{0}] Scheduling DNS resolution backoff for {1} ns", + new Object[] {logId, delayNanos}); + } + nameResolverRefresh = new NameResolverRefresh(); + nameResolverRefreshFuture = + transportFactory + .getScheduledExecutorService() + .schedule(nameResolverRefresh, delayNanos, TimeUnit.NANOSECONDS); + } + } + + channelExecutor.executeLater(new NameResolverErrorHandler()).drain(); } } @@ -1426,14 +1417,16 @@ final class ManagedChannelImpl extends ManagedChannel implements // TODO(zhangkun83): consider a better approach // (https://github.com/grpc/grpc-java/issues/2562). if (!terminating) { + final class ShutdownSubchannel implements Runnable { + @Override + public void run() { + subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS); + } + } + delayedShutdownTask = transportFactory.getScheduledExecutorService().schedule( new LogExceptionRunnable( - new Runnable() { - @Override - public void run() { - subchannel.shutdown(SUBCHANNEL_SHUTDOWN_STATUS); - } - }), SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); + new ShutdownSubchannel()), SUBCHANNEL_SHUTDOWN_DELAY_SECONDS, TimeUnit.SECONDS); return; } } @@ -1470,4 +1463,62 @@ final class ManagedChannelImpl extends ManagedChannel implements .add("target", target) .toString(); } + + private final class PanicChannelExecutor extends ChannelExecutor { + @Override + void handleUncaughtThrowable(Throwable t) { + super.handleUncaughtThrowable(t); + panic(t); + } + } + + /** + * Called from channelExecutor. + */ + private final class DelayedTransportListener implements ManagedClientTransport.Listener { + @Override + public void transportShutdown(Status s) { + checkState(shutdown.get(), "Channel must have been shut down"); + } + + @Override + public void transportReady() { + // Don't care + } + + @Override + public void transportInUse(final boolean inUse) { + inUseStateAggregator.updateObjectInUse(delayedTransport, inUse); + } + + @Override + public void transportTerminated() { + checkState(shutdown.get(), "Channel must have been shut down"); + terminating = true; + shutdownNameResolverAndLoadBalancer(false); + // No need to call channelStateManager since we are already in SHUTDOWN state. + // Until LoadBalancer is shutdown, it may still create new subchannels. We catch them + // here. + maybeShutdownNowSubchannels(); + maybeTerminateChannel(); + } + } + + /** + * Must be accessed from channelExecutor. + */ + private final class IdleModeStateAggregator extends InUseStateAggregator<Object> { + @Override + void handleInUse() { + exitIdleMode(); + } + + @Override + void handleNotInUse() { + if (shutdown.get()) { + return; + } + rescheduleIdleTimer(); + } + } } |