Spring boot定时任务的原理及动态创建详解
这篇文章主要给大家介绍了关于Spring boot定时任务的原理及动态创建的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧v一、前言
定时任务一般是项目中都需要用到的,可以用于定时处理一些特殊的任务。这篇文章主要给大家介绍了关于spring boot定时任务的原理及动态创建的相关内容,下面来一起看看详细的介绍吧
上周工作遇到了一个需求,同步多个省份销号数据,解绑微信粉丝。分省定时将销号数据放到sftp服务器上,我需要开发定时任务去解析文件。因为是多省份,服务器、文件名规则、数据规则都不一定,所以要做成可配置是有一定难度的。数据规则这块必须强烈要求统一,服务器、文件名规则都可以从配置中心去读。每新增一个省份的配置,后台感知到后,动态生成定时任务。
v二、springboot引入定时任务核心配置
@target(elementtype.type)
@retention(retentionpolicy.runtime)
@import(schedulingconfiguration.class)
@documented
public @interface enablescheduling {
}
@configuration
@role(beandefinition.role_infrastructure)
public class schedulingconfiguration {
@bean(name = taskmanagementconfigutils.scheduled_annotation_processor_bean_name)
@role(beandefinition.role_infrastructure)
public scheduledannotationbeanpostprocessor scheduledannotationprocessor() {
return new scheduledannotationbeanpostprocessor();
}
}
接下来主要看一下这个核心后置处理器:scheduledannotationbeanpostprocessor 。
@override
public object postprocessafterinitialization(object bean, string beanname) {
if (bean instanceof aopinfrastructurebean || bean instanceof taskscheduler ||
bean instanceof scheduledexecutorservice) {
// ignore aop infrastructure such as scoped proxies.
return bean;
}
class<?> targetclass = aopproxyutils.ultimatetargetclass(bean);
if (!this.nonannotatedclasses.contains(targetclass)) {
map<method, set<scheduled>> annotatedmethods = methodintrospector.selectmethods(targetclass,
(methodintrospector.metadatalookup<set<scheduled>>) method -> {
set<scheduled> scheduledmethods = annotatedelementutils.getmergedrepeatableannotations(
method, scheduled.class, schedules.class);
return (!scheduledmethods.isempty() ? scheduledmethods : null);
});
if (annotatedmethods.isempty()) {
this.nonannotatedclasses.add(targetclass);
if (logger.istraceenabled()) {
logger.trace("no @scheduled annotations found on bean class: " + targetclass);
}
}
else {
// non-empty set of methods
annotatedmethods.foreach((method, scheduledmethods) ->
scheduledmethods.foreach(scheduled -> processscheduled(scheduled, method, bean)));
if (logger.istraceenabled()) {
logger.trace(annotatedmethods.size() + " @scheduled methods processed on bean '" + beanname +
"': " + annotatedmethods);
}
}
}
return bean;
}
1、处理scheduled注解,通过scheduledtaskregistrar注册定时任务。
private void finishregistration() {
if (this.scheduler != null) {
this.registrar.setscheduler(this.scheduler);
}
if (this.beanfactory instanceof listablebeanfactory) {
map<string, schedulingconfigurer> beans =
((listablebeanfactory) this.beanfactory).getbeansoftype(schedulingconfigurer.class);
list<schedulingconfigurer> configurers = new arraylist<>(beans.values());
annotationawareordercomparator.sort(configurers);
for (schedulingconfigurer configurer : configurers) {
configurer.configuretasks(this.registrar);
}
}
if (this.registrar.hastasks() && this.registrar.getscheduler() == null) {
assert.state(this.beanfactory != null, "beanfactory must be set to find scheduler by type");
try {
// search for taskscheduler bean...
this.registrar.settaskscheduler(resolveschedulerbean(this.beanfactory, taskscheduler.class, false));
}
catch (nouniquebeandefinitionexception ex) {
logger.trace("could not find unique taskscheduler bean", ex);
try {
this.registrar.settaskscheduler(resolveschedulerbean(this.beanfactory, taskscheduler.class, true));
}
catch (nosuchbeandefinitionexception ex2) {
if (logger.isinfoenabled()) {
logger.info("more than one taskscheduler bean exists within the context, and " +
"none is named 'taskscheduler'. mark one of them as primary or name it 'taskscheduler' " +
"(possibly as an alias); or implement the schedulingconfigurer interface and call " +
"scheduledtaskregistrar#setscheduler explicitly within the configuretasks() callback: " +
ex.getbeannamesfound());
}
}
}
catch (nosuchbeandefinitionexception ex) {
logger.trace("could not find default taskscheduler bean", ex);
// search for scheduledexecutorservice bean next...
try {
this.registrar.setscheduler(resolveschedulerbean(this.beanfactory, scheduledexecutorservice.class, false));
}
catch (nouniquebeandefinitionexception ex2) {
logger.trace("could not find unique scheduledexecutorservice bean", ex2);
try {
this.registrar.setscheduler(resolveschedulerbean(this.beanfactory, scheduledexecutorservice.class, true));
}
catch (nosuchbeandefinitionexception ex3) {
if (logger.isinfoenabled()) {
logger.info("more than one scheduledexecutorservice bean exists within the context, and " +
"none is named 'taskscheduler'. mark one of them as primary or name it 'taskscheduler' " +
"(possibly as an alias); or implement the schedulingconfigurer interface and call " +
"scheduledtaskregistrar#setscheduler explicitly within the configuretasks() callback: " +
ex2.getbeannamesfound());
}
}
}
catch (nosuchbeandefinitionexception ex2) {
logger.trace("could not find default scheduledexecutorservice bean", ex2);
// giving up -> falling back to default scheduler within the registrar...
logger.info("no taskscheduler/scheduledexecutorservice bean found for scheduled processing");
}
}
}
this.registrar.afterpropertiesset();
}
1、通过一系列的schedulingconfigurer动态配置scheduledtaskregistrar。
2、向scheduledtaskregistrar注册一个taskscheduler(用于对runnable的任务进行调度,它包含有多种触发规则)。
3、registrar.afterpropertiesset(),在这开始安排所有的定时任务开始执行了。
protected void scheduletasks() {
if (this.taskscheduler == null) {
this.localexecutor = executors.newsinglethreadscheduledexecutor();
this.taskscheduler = new concurrenttaskscheduler(this.localexecutor);
}
if (this.triggertasks != null) {
for (triggertask task : this.triggertasks) {
addscheduledtask(scheduletriggertask(task));
}
}
if (this.crontasks != null) {
for (crontask task : this.crontasks) {
addscheduledtask(schedulecrontask(task));
}
}
if (this.fixedratetasks != null) {
for (intervaltask task : this.fixedratetasks) {
addscheduledtask(schedulefixedratetask(task));
}
}
if (this.fixeddelaytasks != null) {
for (intervaltask task : this.fixeddelaytasks) {
addscheduledtask(schedulefixeddelaytask(task));
}
}
}
1、triggertask:动态定时任务。通过trigger#nextexecutiontime 给定的触发上下文确定下一个执行时间。
2、crontask:动态定时任务,triggertask子类。通过cron表达式确定的时间触发下一个任务执行。
3、intervaltask:一定时间延迟之后,周期性执行的任务。
4、taskscheduler 如果为空,默认是concurrenttaskscheduler,并使用默认单线程的scheduledexecutor。
v三、主要看一下crontask工作原理
scheduledtaskregistrar.java
@nullable
public scheduledtask schedulecrontask(crontask task) {
scheduledtask scheduledtask = this.unresolvedtasks.remove(task);
boolean newtask = false;
if (scheduledtask == null) {
scheduledtask = new scheduledtask(task);
newtask = true;
}
if (this.taskscheduler != null) {
scheduledtask.future = this.taskscheduler.schedule(task.getrunnable(), task.gettrigger());
}
else {
addcrontask(task);
this.unresolvedtasks.put(task, scheduledtask);
}
return (newtask ? scheduledtask : null);
}
concurrenttaskscheduler.java
@override
@nullable
public scheduledfuture<?> schedule(runnable task, trigger trigger) {
try {
if (this.enterpriseconcurrentscheduler) {
return new enterpriseconcurrenttriggerscheduler().schedule(decoratetask(task, true), trigger);
}
else {
errorhandler errorhandler =
(this.errorhandler != null ? this.errorhandler : taskutils.getdefaulterrorhandler(true));
return new reschedulingrunnable(task, trigger, this.scheduledexecutor, errorhandler).schedule();
}
}
catch (rejectedexecutionexception ex) {
throw new taskrejectedexception("executor [" + this.scheduledexecutor + "] did not accept task: " + task, ex);
}
}
reschedulingrunnable.java
@nullable
public scheduledfuture<?> schedule() {
synchronized (this.triggercontextmonitor) {
this.scheduledexecutiontime = this.trigger.nextexecutiontime(this.triggercontext);
if (this.scheduledexecutiontime == null) {
return null;
}
long initialdelay = this.scheduledexecutiontime.gettime() - system.currenttimemillis();
this.currentfuture = this.executor.schedule(this, initialdelay, timeunit.milliseconds);
return this;
}
}
private scheduledfuture<?> obtaincurrentfuture() {
assert.state(this.currentfuture != null, "no scheduled future");
return this.currentfuture;
}
@override
public void run() {
date actualexecutiontime = new date();
super.run();
date completiontime = new date();
synchronized (this.triggercontextmonitor) {
assert.state(this.scheduledexecutiontime != null, "no scheduled execution");
this.triggercontext.update(this.scheduledexecutiontime, actualexecutiontime, completiontime);
if (!obtaincurrentfuture().iscancelled()) {
schedule();
}
}
}
1、最终将task和trigger都封装到了reschedulingrunnable中。
2、reschedulingrunnable实现了任务重复调度(schedule方法中调用调度器executor并传入自身对象,executor会调用run方法,run方法又调用了schedule方法)。
3、reschedulingrunnable schedule方法加了同步锁,只能有一个线程拿到下次执行时间并加入执行器的调度。
4、不同的reschedulingrunnable对象之间在线程池够用的情况下是不会相互影响的,也就是说满足线程池的条件下,taskscheduler的schedule方法的多次调用是可以交叉执行的。
scheduledthreadpoolexecutor.java
public scheduledfuture<?> schedule(runnable command,
long delay,
timeunit unit) {
if (command == null || unit == null)
throw new nullpointerexception();
runnablescheduledfuture<?> t = decoratetask(command,
new scheduledfuturetask<void>(command, null,
triggertime(delay, unit)));
delayedexecute(t);
return t;
}
private void delayedexecute(runnablescheduledfuture<?> task) {
if (isshutdown())
reject(task);
else {
super.getqueue().add(task);
if (isshutdown() &&
!canrunincurrentrunstate(task.isperiodic()) &&
remove(task))
task.cancel(false);
else
ensureprestart();
}
}
scheduledfuturetask 工作原理如下图所示【太懒了,不想画图了,盗图一张】。
1、scheduledfuturetask会放入优先阻塞队列:scheduledthreadpoolexecutor.delayedworkqueue(二叉最小堆实现)
2、上图中的thread对象即threadpoolexecutor.worker,实现了runnable接口
/**
* creates with given first task and thread from threadfactory.
* @param firsttask the first task (null if none)
*/
worker(runnable firsttask) {
setstate(-1); // inhibit interrupts until runworker
this.firsttask = firsttask;
this.thread = getthreadfactory().newthread(this);
}
/** delegates main run loop to outer runworker */
public void run() {
runworker(this);
}
1、worker中维护了thread对象,thread对象的runnable实例即worker自身
2、threadpoolexecutor#addworker方法中会创建worker对象,然后拿到worker中的thread实例并start,这样就创建了线程池中的一个线程实例
3、worker的run方法会调用threadpoolexecutor#runworker方法,这才是任务最终被执行的地方,该方法示意如下
(1)首先取传入的task执行,如果task是null,只要该线程池处于运行状态,就会通过gettask方法从workqueue中取任务。threadpoolexecutor的execute方法会在无法产生core线程的时候向workqueue队列中offer任务。
gettask方法从队列中取task的时候会根据相关配置决定是否阻塞和阻塞多久。如果gettask方法结束,返回的是null,runworker循环结束,执行processworkerexit方法。
至此,该线程结束自己的使命,从线程池中“消失”。
(2)在开始执行任务之前,会调用worker的lock方法,目的是阻止task正在被执行的时候被interrupt,通过调用clearinterruptsfortaskrun方法来保证的(后面可以看一下这个方法),该线程没有自己的interrupt set了。
(3)beforeexecute和afterexecute方法用于在执行任务前后执行一些自定义的操作,这两个方法是空的,留给继承类去填充功能。
我们可以在beforeexecute方法中抛出异常,这样task不会被执行,而且在跳出该循环的时候completedabruptly的值是true,表示the worker died due to user exception,会用decrementworkercount调整wc。
(4)因为runnable的run方法不能抛出throwables异常,所以这里重新包装异常然后抛出,抛出的异常会使当当前线程死掉,可以在afterexecute中对异常做一些处理。
(5)afterexecute方法也可能抛出异常,也可能使当前线程死掉。
v四、动态创建定时任务
vtaskconfiguration 配置类
@configuration
@enablescheduling
@role(beandefinition.role_infrastructure)
public class taskconfiguration {
@bean(name = scheduledannotationbeanpostprocessor.default_task_scheduler_bean_name)
@role(beandefinition.role_infrastructure)
public scheduledexecutorservice scheduledannotationprocessor() {
return executors.newscheduledthreadpool(5, new defaultthreadfactory());
}
private static class defaultthreadfactory implements threadfactory {
private static final atomicinteger poolnumber = new atomicinteger(1);
private final threadgroup group;
private final atomicinteger threadnumber = new atomicinteger(1);
private final string nameprefix;
defaultthreadfactory() {
securitymanager s = system.getsecuritymanager();
group = (s != null) ? s.getthreadgroup() :
thread.currentthread().getthreadgroup();
nameprefix = "pool-" +
poolnumber.getandincrement() +
"-schedule-";
}
@override
public thread newthread(runnable r) {
thread t = new thread(group, r,
nameprefix + threadnumber.getandincrement(),
0);
if (t.isdaemon()) {
t.setdaemon(false);
}
if (t.getpriority() != thread.norm_priority) {
t.setpriority(thread.norm_priority);
}
return t;
}
}
}
1、保证concurrenttaskscheduler不使用默认单线程的scheduledexecutor,而是corepoolsize=5的线程池
2、自定义线程池工厂类
vdynamictask 动态定时任务
@configuration
public class dynamictask implements schedulingconfigurer {
private static logger logger = loggerfactory.getlogger(dynamictask.class);
private static final executorservice es = new threadpoolexecutor(10, 20,
0l, timeunit.milliseconds,
new linkedblockingqueue<>(10),
new dynamictaskconsumethreadfactory());
private volatile scheduledtaskregistrar registrar;
private final concurrenthashmap<string, scheduledfuture<?>> scheduledfutures = new concurrenthashmap<>();
private final concurrenthashmap<string, crontask> crontasks = new concurrenthashmap<>();
private volatile list<taskconstant> taskconstants = lists.newarraylist();
@override
public void configuretasks(scheduledtaskregistrar registrar) {
this.registrar = registrar;
this.registrar.addtriggertask(() -> {
if (!collectionutils.isempty(taskconstants)) {
logger.info("检测动态定时任务列表...");
list<timingtask> tts = new arraylist<>();
taskconstants
.foreach(taskconstant -> {
timingtask tt = new timingtask();
tt.setexpression(taskconstant.getcron());
tt.settaskid("dynamic-task-" + taskconstant.gettaskid());
tts.add(tt);
});
this.refreshtasks(tts);
}
}
, triggercontext -> new periodictrigger(5l, timeunit.seconds).nextexecutiontime(triggercontext));
}
public list<taskconstant> gettaskconstants() {
return taskconstants;
}
private void refreshtasks(list<timingtask> tasks) {
//取消已经删除的策略任务
set<string> taskids = scheduledfutures.keyset();
for (string taskid : taskids) {
if (!exists(tasks, taskid)) {
scheduledfutures.get(taskid).cancel(false);
}
}
for (timingtask tt : tasks) {
string expression = tt.getexpression();
if (stringutils.isblank(expression) || !cronsequencegenerator.isvalidexpression(expression)) {
logger.error("定时任务dynamictask cron表达式不合法: " + expression);
continue;
}
//如果配置一致,则不需要重新创建定时任务
if (scheduledfutures.containskey(tt.gettaskid())
&& crontasks.get(tt.gettaskid()).getexpression().equals(expression)) {
continue;
}
//如果策略执行时间发生了变化,则取消当前策略的任务
if (scheduledfutures.containskey(tt.gettaskid())) {
scheduledfutures.remove(tt.gettaskid()).cancel(false);
crontasks.remove(tt.gettaskid());
}
crontask task = new crontask(tt, expression);
scheduledfuture<?> future = registrar.getscheduler().schedule(task.getrunnable(), task.gettrigger());
crontasks.put(tt.gettaskid(), task);
scheduledfutures.put(tt.gettaskid(), future);
}
}
private boolean exists(list<timingtask> tasks, string taskid) {
for (timingtask task : tasks) {
if (task.gettaskid().equals(taskid)) {
return true;
}
}
return false;
}
@predestroy
public void destroy() {
this.registrar.destroy();
}
public static class taskconstant {
private string cron;
private string taskid;
public string getcron() {
return cron;
}
public void setcron(string cron) {
this.cron = cron;
}
public string gettaskid() {
return taskid;
}
public void settaskid(string taskid) {
this.taskid = taskid;
}
}
private class timingtask implements runnable {
private string expression;
private string taskid;
public string gettaskid() {
return taskid;
}
public void settaskid(string taskid) {
this.taskid = taskid;
}
@override
public void run() {
//设置队列大小10
logger.error("当前crontask: " + this);
dynamicblockingqueue queue = new dynamicblockingqueue(3);
es.submit(() -> {
while (!queue.isdone() || !queue.isempty()) {
try {
string content = queue.poll(500, timeunit.milliseconds);
if (stringutils.isblank(content)) {
return;
}
logger.info("dynamicblockingqueue 消费:" + content);
timeunit.milliseconds.sleep(500);
} catch (interruptedexception e) {
e.printstacktrace();
}
}
});
//队列放入数据
for (int i = 0; i < 5; ++i) {
try {
queue.put(string.valueof(i));
logger.info("dynamicblockingqueue 生产:" + i);
} catch (interruptedexception e) {
e.printstacktrace();
}
}
queue.setdone(true);
}
public string getexpression() {
return expression;
}
public void setexpression(string expression) {
this.expression = expression;
}
@override
public string tostring() {
return reflectiontostringbuilder.tostring(this
, tostringstyle.json_style
, false
, false
, timingtask.class);
}
}
/**
* 队列消费线程工厂类
*/
private static class dynamictaskconsumethreadfactory implements threadfactory {
private static final atomicinteger poolnumber = new atomicinteger(1);
private final threadgroup group;
private final atomicinteger threadnumber = new atomicinteger(1);
private final string nameprefix;
dynamictaskconsumethreadfactory() {
securitymanager s = system.getsecuritymanager();
group = (s != null) ? s.getthreadgroup() :
thread.currentthread().getthreadgroup();
nameprefix = "pool-" +
poolnumber.getandincrement() +
"-dynamic-task-";
}
@override
public thread newthread(runnable r) {
thread t = new thread(group, r,
nameprefix + threadnumber.getandincrement(),
0);
if (t.isdaemon()) {
t.setdaemon(false);
}
if (t.getpriority() != thread.norm_priority) {
t.setpriority(thread.norm_priority);
}
return t;
}
}
private static class dynamicblockingqueue extends linkedblockingqueue<string> {
dynamicblockingqueue(int capacity) {
super(capacity);
}
private volatile boolean done = false;
public boolean isdone() {
return done;
}
public void setdone(boolean done) {
this.done = done;
}
}
}
1、taskconstants 动态任务列表
2、scheduledtaskregistrar#addtriggertask 添加动态周期定时任务,检测动态任务列表的变化
crontask task = new crontask(tt, expression);
scheduledfuture<?> future = registrar.getscheduler().schedule(task.getrunnable(), task.gettrigger());
crontasks.put(tt.gettaskid(), task);
scheduledfutures.put(tt.gettaskid(), future);
3、动态创建cron定时任务,拿到scheduledfuture实例并缓存起来
4、在刷新任务列表时,通过缓存的scheduledfuture实例和crontask实例,来决定是否取消、移除失效的动态定时任务。
vdynamictasktest 动态定时任务测试类
@runwith(springrunner.class)
@springboottest
public class dynamictasktest {
@autowired
private dynamictask dynamictask;
@test
public void test() throws interruptedexception {
list<dynamictask.taskconstant> taskconstans = dynamictask.gettaskconstants();
dynamictask.taskconstant taskconstant = new dynamictask.taskconstant();
taskconstant.setcron("0/5 * * * * ?");
taskconstant.settaskid("test1");
taskconstans.add(taskconstant);
dynamictask.taskconstant taskconstant1 = new dynamictask.taskconstant();
taskconstant1.setcron("0/5 * * * * ?");
taskconstant1.settaskid("test2");
taskconstans.add(taskconstant1);
dynamictask.taskconstant taskconstant2 = new dynamictask.taskconstant();
taskconstant2.setcron("0/5 * * * * ?");
taskconstant2.settaskid("test3");
taskconstans.add(taskconstant2);
timeunit.seconds.sleep(40);
//移除并添加新的配置
taskconstans.remove(taskconstans.size() - 1);
dynamictask.taskconstant taskconstant3 = new dynamictask.taskconstant();
taskconstant3.setcron("0/5 * * * * ?");
taskconstant3.settaskid("test4");
taskconstans.add(taskconstant3);
//
timeunit.minutes.sleep(50);
}
}
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对CodeAE代码之家的支持。
原文链接:https://www.cnblogs.com/hujunzheng/p/10353390.html
http://www.zzvips.com/article/177217.html
页:
[1]