如何优雅的关闭 Java线程池
如何優(yōu)雅的關閉 Java線程池
簡介 在開發(fā)中使用線程池去執(zhí)行異步任務是比較普遍的操作 ,然而雖然有些異步操作我們并不十分要求可靠性和實時性,但總歸業(yè)務還是需要的 。如果在每次的服務發(fā)版過程中,我們不去介入線程池的停機邏輯 ,那么很有可能就會造成線程池中隊列的任務還未執(zhí)行完成 ,自然就會造成數(shù)據(jù)的丟失 。
探究
注意,本文所有前提是對進程進行下線時使用的是kill -15
我們知道Spring已經實現(xiàn)了自己的優(yōu)雅停機方案,詳細請參考org.springframework.context.support.AbstractApplicationContext#registerShutdownHook ,然后主要看調用的org.springframework.context.support.AbstractApplicationContext#doClose, 在這個方法里定義了容器銷毀的執(zhí)行順序
protected void doClose() { // Check whether an actual close attempt is necessary... if (this.active.get() && this.closed.compareAndSet(false, true)) { if (logger.isDebugEnabled()) { logger.debug("Closing " + this); } LiveBeansView.unregisterApplicationContext(this); try { // Publish shutdown event. publishEvent(new ContextClosedEvent(this)); } catch (Throwable ex) { logger.warn("Exception thrown from ApplicationListener handling ContextClosedEvent", ex); } // Stop all Lifecycle beans, to avoid delays during inpidual destruction. if (this.lifecycleProcessor != null) { try { this.lifecycleProcessor.onClose(); } catch (Throwable ex) { logger.warn("Exception thrown from LifecycleProcessor on context close", ex); } } // Destroy all cached singletons in the context's BeanFactory. destroyBeans(); // Close the state of this context itself. closeBeanFactory(); // Let subclasses do some final clean-up if they wish... onClose(); // Reset local application listeners to pre-refresh state. if (this.earlyApplicationListeners != null) { this.applicationListeners.clear(); this.applicationListeners.addAll(this.earlyApplicationListeners); } // Switch to inactive. this.active.set(false); } }我們先主要關注下destroyBeans這個方法,看bean的銷毀邏輯是什么 ,然后看到了下面的一個bean的銷毀順序邏輯,具體方法在org.springframework.beans.factory.support.DefaultSingletonBeanRegistry#destroySingletons
private final MapdisposableBeans = new LinkedHashMap<>(); public void destroySingletons() { if (logger.isTraceEnabled()) { logger.trace("Destroying singletons in " + this); } synchronized (this.singletonObjects) { this.singletonsCurrentlyInDestruction = true; } String[] disposableBeanNames; synchronized (this.disposableBeans) { disposableBeanNames = StringUtils.toStringArray(this.disposableBeans.keySet()); } for (int i = disposableBeanNames.length - 1; i >= 0; i--) { destroySingleton(disposableBeanNames[i]); } this.containedBeanMap.clear(); this.dependentBeanMap.clear(); this.dependenciesForBeanMap.clear(); clearSingletonCache(); } 可以看到最至關重要的就是一個屬性disposableBeans,這個屬性是一個LinkedHashMap , 因此屬性是有序的 ,所以銷毀的時候也是按照某種規(guī)則保持和放入一樣的順序進行銷毀的 ,現(xiàn)在就是要確認這個屬性里到底存的是什么 。
經過調試發(fā)現(xiàn) ,在創(chuàng)建bean的org.springframework.beans.factory.support.AbstractAutowireCapableBeanFactory#doCreateBean方法中,會調用一個方法org.springframework.beans.factory.support.AbstractBeanFactory#registerDisposableBeanIfNecessary, 在這個方法中會調用org.springframework.beans.factory.support.DefaultSingletonBeanRegistry#registerDisposableBean然后將當前創(chuàng)建的bean放入到屬性disposableBeans中 ,那么現(xiàn)在來看一下放入的邏輯什么 ?
相關代碼貼一下
protected void registerDisposableBeanIfNecessary(String beanName, Object bean, RootBeanDefinition mbd) { AccessControlContext acc = (System.getSecurityManager() != null ? getAccessControlContext() : null); if (!mbd.isPrototype() && requiresDestruction(bean, mbd)) { if (mbd.isSingleton()) { // Register a DisposableBean implementation that performs all destruction // work for the given bean: DestructionAwareBeanPostProcessors, // DisposableBean interface, custom destroy method. registerDisposableBean(beanName, new DisposableBeanAdapter(bean, beanName, mbd, getBeanPostProcessors(), acc)); } else { // A bean with a custom scope... Scope scope = this.scopes.get(mbd.getScope()); if (scope == null) { throw new IllegalStateException("No Scope registered for scope name '" + mbd.getScope() + "'"); } scope.registerDestructionCallback(beanName, new DisposableBeanAdapter(bean, beanName, mbd, getBeanPostProcessors(), acc)); } }}org.springframework.beans.factory.support.AbstractBeanFactory#requiresDestruction
protected boolean requiresDestruction(Object bean, RootBeanDefinition mbd) { return (bean != null && (DisposableBeanAdapter.hasDestroyMethod(bean, mbd) || (hasDestructionAwareBeanPostProcessors() && DisposableBeanAdapter.hasApplicableProcessors(bean, getBeanPostProcessors())))); }經過兩個方法可以看到如果一個bean的scope是singleton并且這個bean實現(xiàn)了org.springframework.beans.factory.DisposableBean這個接口的destroy()方法,那么就會滿足條件 。
現(xiàn)在可以確定一點 ,如果我們將線程池交給Spring管理 ,并且實現(xiàn)它的close方法,就可以在應用收到下線信號的時候執(zhí)行這個bean的銷毀方法,那么我們就可以在銷毀方法中寫線程池的停機邏輯 。
我們知道Spring提供了線程池的封裝,在Spring中如果我們要定義線程池一般會使用org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor以及用于任務調度的org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler,先來簡單看個定義ThreadPoolTaskExecutor線程池的例子
@Configurationpublic class ThreadConfig { @Bean public ThreadPoolTaskExecutor testExecutor() { ThreadPoolTaskExecutor threadPoolTaskExecutor = new ThreadPoolTaskExecutor(); threadPoolTaskExecutor.setThreadNamePrefix("test-shutdown-pool-"); threadPoolTaskExecutor.setCorePoolSize(1); threadPoolTaskExecutor.setMaxPoolSize(1); threadPoolTaskExecutor.setKeepAliveSeconds(60); threadPoolTaskExecutor.setQueueCapacity(1000); threadPoolTaskExecutor.setRejectedExecutionHandler(new ThreadPoolExecutor.AbortPolicy()); return threadPoolTaskExecutor; }}現(xiàn)在來一下線程池的這個類結構 ,ThreadPoolTaskExecutor繼承了org.springframework.scheduling.concurrent.ExecutorConfigurationSupport, 實現(xiàn)了org.springframework.beans.factory.DisposableBean,完整結構如下
public class ThreadPoolTaskExecutor extends ExecutorConfigurationSupport implements AsyncListenableTaskExecutor, SchedulingTaskExecutor { }public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory implements BeanNameAware, InitializingBean, DisposableBean { }從這里就能看到其實線程池類ThreadPoolTaskExecutor是滿足最開始看到的銷毀條件的,那么現(xiàn)在就來看下在父類ExecutorConfigurationSupport中定義的destroy()方法,將其中關鍵部分代碼摘錄下來
public abstract class ExecutorConfigurationSupport extends CustomizableThreadFactory implements BeanNameAware, InitializingBean, DisposableBean { private boolean waitForTasksToCompleteOnShutdown = false; private long awaitTerminationMillis = 0; @Nullable private ExecutorService executor; @Override public void destroy() { shutdown(); } /** * Perform a shutdown on the underlying ExecutorService. * @see java.util.concurrent.ExecutorService#shutdown() * @see java.util.concurrent.ExecutorService#shutdownNow() */ public void shutdown() { if (logger.isInfoEnabled()) { logger.info("Shutting down ExecutorService" + (this.beanName != null ? " '" + this.beanName + "'" : "")); } if (this.executor != null) { if (this.waitForTasksToCompleteOnShutdown) { this.executor.shutdown(); } else { for (Runnable remainingTask : this.executor.shutdownNow()) { cancelRemainingTask(remainingTask); } } awaitTerminationIfNecessary(this.executor); } } private void awaitTerminationIfNecessary(ExecutorService executor) { if (this.awaitTerminationMillis >0) { try { if (!executor.awaitTermination(this.awaitTerminationMillis, TimeUnit.MILLISECONDS)) { if (logger.isWarnEnabled()) { logger.warn("Timed out while waiting for executor" + (this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate"); } } } catch (InterruptedException ex) { if (logger.isWarnEnabled()) { logger.warn("Interrupted while waiting for executor" + (this.beanName != null ? " '" + this.beanName + "'" : "") + " to terminate"); } Thread.currentThread().interrupt(); } } } protected void cancelRemainingTask(Runnable task) { if (task instanceof Future) { ((Future<?>) task).cancel(true); } }}整個的邏輯還是比較清晰的