/**
 * inspects a fully initialized bean for scheduled-annotated methods and hands
 * every (annotation, method) pair to {@code processscheduled}.
 *
 * <p>aop infrastructure beans, task schedulers and scheduled executor services are
 * returned untouched. target classes found to carry no annotated methods are
 * remembered in {@code nonannotatedclasses} so they are not introspected again.
 *
 * @param bean the bean instance that has just been initialized
 * @param beanname the name of the bean (only used in the trace message)
 * @return the same {@code bean} instance, never a replacement
 */
@override
public object postprocessafterinitialization(object bean, string beanname) {
    boolean infrastructure = bean instanceof aopinfrastructurebean
            || bean instanceof taskscheduler
            || bean instanceof scheduledexecutorservice;
    if (infrastructure) {
        // ignore aop infrastructure such as scoped proxies.
        return bean;
    }

    class<?> beantype = aopproxyutils.ultimatetargetclass(bean);
    if (this.nonannotatedclasses.contains(beantype)) {
        // already known to carry no annotated methods
        return bean;
    }

    // per-method lookup: the merged annotations, or null when the method has none
    methodintrospector.metadatalookup<set<scheduled>> lookup = candidate -> {
        set<scheduled> found = annotatedelementutils.getmergedrepeatableannotations(
                candidate, scheduled.class, schedules.class);
        if (found.isempty()) {
            return null;
        }
        return found;
    };
    map<method, set<scheduled>> bymethod = methodintrospector.selectmethods(beantype, lookup);

    if (bymethod.isempty()) {
        this.nonannotatedclasses.add(beantype);
        if (logger.istraceenabled()) {
            logger.trace("no @scheduled annotations found on bean class: " + beantype);
        }
        return bean;
    }

    // non-empty set of methods: register each annotation found on each method
    bymethod.foreach((target, annotations) ->
            annotations.foreach(annotation -> processscheduled(annotation, target, bean)));
    if (logger.istraceenabled()) {
        logger.trace(bymethod.size() + " @scheduled methods processed on bean '" + beanname +
                "': " + bymethod);
    }
    return bean;
}
1、处理scheduled注解,通过scheduledtaskregistrar注册定时任务。
/**
 * completes the registrar setup and then triggers it.
 *
 * <p>steps, in order:
 * <ol>
 * <li>if a scheduler was set on this object, pass it to the registrar;</li>
 * <li>if the bean factory is a listablebeanfactory, let every schedulingconfigurer
 * bean (sorted with annotationawareordercomparator) configure the registrar;</li>
 * <li>if the registrar has tasks but still no scheduler, look one up in the bean
 * factory: first a taskscheduler, then a scheduledexecutorservice;</li>
 * <li>call {@code afterpropertiesset()} on the registrar, whether or not a
 * scheduler was found.</li>
 * </ol>
 *
 * <p>lookup failures are only logged; none of the exceptions caught here is rethrown.
 * NOTE(review): the boolean passed to resolveschedulerbean is {@code false} on the first
 * attempt and {@code true} on the retry; the log messages suggest the retry is a lookup
 * by the name 'taskscheduler' -- confirm against resolveschedulerbean.
 */
private void finishregistration() {
// an explicitly configured scheduler takes precedence over any bean lookup below
if (this.scheduler != null) {
this.registrar.setscheduler(this.scheduler);
}
// give every schedulingconfigurer bean a chance to customize the registrar, in sorted order
if (this.beanfactory instanceof listablebeanfactory) {
map<string, schedulingconfigurer> beans =
((listablebeanfactory) this.beanfactory).getbeansoftype(schedulingconfigurer.class);
list<schedulingconfigurer> configurers = new arraylist<>(beans.values());
annotationawareordercomparator.sort(configurers);
for (schedulingconfigurer configurer : configurers) {
configurer.configuretasks(this.registrar);
}
}
// only search the bean factory when there is work to schedule and nothing to run it on yet
if (this.registrar.hastasks() && this.registrar.getscheduler() == null) {
assert.state(this.beanfactory != null, "beanfactory must be set to find scheduler by type");
try {
// search for taskscheduler bean...
this.registrar.settaskscheduler(resolveschedulerbean(this.beanfactory, taskscheduler.class, false));
}
catch (nouniquebeandefinitionexception ex) {
// several taskscheduler candidates: retry with the flag set to true
logger.trace("could not find unique taskscheduler bean", ex);
try {
this.registrar.settaskscheduler(resolveschedulerbean(this.beanfactory, taskscheduler.class, true));
}
catch (nosuchbeandefinitionexception ex2) {
// the retry failed too: report the candidate names carried by the first exception
if (logger.isinfoenabled()) {
logger.info("more than one taskscheduler bean exists within the context, and " +
"none is named 'taskscheduler'. mark one of them as primary or name it 'taskscheduler' " +
"(possibly as an alias); or implement the schedulingconfigurer interface and call " +
"scheduledtaskregistrar#setscheduler explicitly within the configuretasks() callback: " +
ex.getbeannamesfound());
}
}
}
catch (nosuchbeandefinitionexception ex) {
logger.trace("could not find default taskscheduler bean", ex);
// search for scheduledexecutorservice bean next...
try {
this.registrar.setscheduler(resolveschedulerbean(this.beanfactory, scheduledexecutorservice.class, false));
}
catch (nouniquebeandefinitionexception ex2) {
// several scheduledexecutorservice candidates: retry with the flag set to true
logger.trace("could not find unique scheduledexecutorservice bean", ex2);
try {
this.registrar.setscheduler(resolveschedulerbean(this.beanfactory, scheduledexecutorservice.class, true));
}
catch (nosuchbeandefinitionexception ex3) {
// the retry failed too: report the candidate names carried by ex2
if (logger.isinfoenabled()) {
logger.info("more than one scheduledexecutorservice bean exists within the context, and " +
"none is named 'taskscheduler'. mark one of them as primary or name it 'taskscheduler' " +
"(possibly as an alias); or implement the schedulingconfigurer interface and call " +
"scheduledtaskregistrar#setscheduler explicitly within the configuretasks() callback: " +
ex2.getbeannamesfound());
}
}
}
catch (nosuchbeandefinitionexception ex2) {
logger.trace("could not find default scheduledexecutorservice bean", ex2);
// giving up -> falling back to default scheduler within the registrar...
logger.info("no taskscheduler/scheduledexecutorservice bean found for scheduled processing");
}
}
}
// runs unconditionally, including when no scheduler bean could be resolved above
this.registrar.afterpropertiesset();
}
/**
 * creates a worker with the given first task and a thread obtained from the
 * thread factory. the worker passes itself to the factory as the thread's
 * runnable; the thread is only created here, it is not started in this
 * constructor.
 * @param firsttask the first task (null if none)
 */
worker(runnable firsttask) {
setstate(-1); // inhibit interrupts until runworker
this.firsttask = firsttask;
// "this" is the runnable handed to the new thread
this.thread = getthreadfactory().newthread(this);
}
/**
 * delegates main run loop to outer runworker, passing this worker as the argument.
 */
public void run() {
runworker(this);
}
1、worker中维护了thread对象,thread对象的runnable实例即worker自身
2、threadpoolexecutor#addworker方法中会创建worker对象,然后拿到worker中的thread实例并start,这样就创建了线程池中的一个线程实例
3、worker的run方法会调用threadpoolexecutor#runworker方法,这才是任务最终被执行的地方,该方法示意如下
(1)首先取传入的task执行,如果task是null,只要该线程池处于运行状态,就会通过gettask方法从workqueue中取任务。threadpoolexecutor的execute方法会在无法产生core线程的时候向workqueue队列中offer任务。
gettask方法从队列中取task的时候会根据相关配置决定是否阻塞和阻塞多久。如果gettask方法结束,返回的是null,runworker循环结束,执行processworkerexit方法。
至此,该线程结束自己的使命,从线程池中“消失”。
(2)在开始执行任务之前,会调用worker的lock方法,目的是防止task在执行过程中被interrupt;这一点是通过调用clearinterruptsfortaskrun方法来保证的(后面可以看一下这个方法),调用之后该线程自身的中断状态(interrupt status)已被清除。
(3)beforeexecute和afterexecute方法用于在执行任务前后执行一些自定义的操作,这两个方法是空的,留给继承类去填充功能。
我们可以在beforeexecute方法中抛出异常,这样task不会被执行,而且在跳出该循环的时候completedabruptly的值是true,表示the worker died due to user exception,会用decrementworkercount调整wc。
(4)因为runnable的run方法不能抛出throwable,所以这里将异常重新包装后再抛出;抛出的异常会使当前线程死掉,可以在afterexecute中对异常做一些处理。
(5)afterexecute方法也可能抛出异常,同样可能使当前线程死掉。

四、动态创建定时任务

taskconfiguration 配置类
/**
 * scheduling configuration that enables scheduling and registers the executor
 * used for scheduled processing: a scheduled thread pool of 5 threads.
 */
@configuration
@enablescheduling
@role(beandefinition.role_infrastructure)
public class taskconfiguration {

    /**
     * exposes the scheduled executor under the name held by
     * {@code scheduledannotationbeanpostprocessor.default_task_scheduler_bean_name}.
     *
     * @return a scheduled thread pool with 5 threads created by {@code defaultthreadfactory}
     */
    @bean(name = scheduledannotationbeanpostprocessor.default_task_scheduler_bean_name)
    @role(beandefinition.role_infrastructure)
    public scheduledexecutorservice scheduledannotationprocessor() {
        threadfactory factory = new defaultthreadfactory();
        return executors.newscheduledthreadpool(5, factory);
    }

    /**
     * thread factory producing non-daemon, normal-priority threads named
     * {@code pool-<poolnumber>-schedule-<threadnumber>}.
     */
    private static class defaultthreadfactory implements threadfactory {

        /** counter shared by all factories; supplies the pool part of the thread name. */
        private static final atomicinteger poolnumber = new atomicinteger(1);

        /** per-factory counter; supplies the thread part of the thread name. */
        private final atomicinteger threadnumber = new atomicinteger(1);

        private final threadgroup group;

        private final string nameprefix;

        defaultthreadfactory() {
            // use the security manager's thread group when one is installed,
            // otherwise the group of the thread constructing this factory
            securitymanager manager = system.getsecuritymanager();
            if (manager != null) {
                group = manager.getthreadgroup();
            }
            else {
                group = thread.currentthread().getthreadgroup();
            }
            nameprefix = "pool-" + poolnumber.getandincrement() + "-schedule-";
        }

        /**
         * creates a thread for the given runnable in this factory's group.
         *
         * @param r the runnable the new thread will execute
         * @return a new, not yet started, non-daemon thread with normal priority
         */
        @override
        public thread newthread(runnable r) {
            string threadname = nameprefix + threadnumber.getandincrement();
            thread created = new thread(group, r, threadname, 0);
            // a new thread inherits daemon status and priority from its creator; normalize both
            if (created.isdaemon()) {
                created.setdaemon(false);
            }
            if (created.getpriority() != thread.norm_priority) {
                created.setpriority(thread.norm_priority);
            }
            return created;
        }
    }
}