评论

收藏

[Java] Spring boot定时任务的原理及动态创建详解

编程语言 编程语言 发布于:2021-09-18 16:12 | 阅读数:529 | 评论:0

这篇文章主要给大家介绍了关于Spring boot定时任务的原理及动态创建的相关资料,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友们下面来一起学习学习吧
一、前言
定时任务一般是项目中都需要用到的,可以用于定时处理一些特殊的任务。这篇文章主要给大家介绍了关于spring boot定时任务的原理及动态创建的相关内容,下面来一起看看详细的介绍吧
上周工作遇到了一个需求,同步多个省份销号数据,解绑微信粉丝。分省定时将销号数据放到sftp服务器上,我需要开发定时任务去解析文件。因为是多省份,服务器、文件名规则、数据规则都不一定,所以要做成可配置是有一定难度的。数据规则这块必须强烈要求统一,服务器、文件名规则都可以从配置中心去读。每新增一个省份的配置,后台感知到后,动态生成定时任务。
二、Spring Boot引入定时任务核心配置
@target(elementtype.type)
@retention(retentionpolicy.runtime)
@import(schedulingconfiguration.class)
@documented
public @interface enablescheduling {
 
}
 
/**
 * Infrastructure configuration that registers the
 * {@link ScheduledAnnotationBeanPostProcessor} bean.
 */
@Configuration
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class SchedulingConfiguration {

    /**
     * Creates the post-processor bean under the name given by
     * {@code TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME}.
     *
     * @return a new {@link ScheduledAnnotationBeanPostProcessor}
     */
    @Bean(name = TaskManagementConfigUtils.SCHEDULED_ANNOTATION_PROCESSOR_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public ScheduledAnnotationBeanPostProcessor scheduledAnnotationProcessor() {
        return new ScheduledAnnotationBeanPostProcessor();
    }

}
接下来主要看一下这个核心后置处理器:ScheduledAnnotationBeanPostProcessor。
@override
public object postprocessafterinitialization(object bean, string beanname) {
 if (bean instanceof aopinfrastructurebean || bean instanceof taskscheduler ||
  bean instanceof scheduledexecutorservice) {
 // ignore aop infrastructure such as scoped proxies.
 return bean;
 }
 
 class<?> targetclass = aopproxyutils.ultimatetargetclass(bean);
 if (!this.nonannotatedclasses.contains(targetclass)) {
 map<method, set<scheduled>> annotatedmethods = methodintrospector.selectmethods(targetclass,
  (methodintrospector.metadatalookup<set<scheduled>>) method -> {
   set<scheduled> scheduledmethods = annotatedelementutils.getmergedrepeatableannotations(
  method, scheduled.class, schedules.class);
   return (!scheduledmethods.isempty() ? scheduledmethods : null);
  });
 if (annotatedmethods.isempty()) {
  this.nonannotatedclasses.add(targetclass);
  if (logger.istraceenabled()) {
  logger.trace("no @scheduled annotations found on bean class: " + targetclass);
  }
 }
 else {
  // non-empty set of methods
  annotatedmethods.foreach((method, scheduledmethods) ->
   scheduledmethods.foreach(scheduled -> processscheduled(scheduled, method, bean)));
  if (logger.istraceenabled()) {
  logger.trace(annotatedmethods.size() + " @scheduled methods processed on bean '" + beanname +
   "': " + annotatedmethods);
  }
 }
 }
 return bean;
}
1、处理@Scheduled注解,通过ScheduledTaskRegistrar注册定时任务。
private void finishregistration() {
 if (this.scheduler != null) {
 this.registrar.setscheduler(this.scheduler);
 }
 
 if (this.beanfactory instanceof listablebeanfactory) {
 map<string, schedulingconfigurer> beans =
  ((listablebeanfactory) this.beanfactory).getbeansoftype(schedulingconfigurer.class);
 list<schedulingconfigurer> configurers = new arraylist<>(beans.values());
 annotationawareordercomparator.sort(configurers);
 for (schedulingconfigurer configurer : configurers) {
  configurer.configuretasks(this.registrar);
 }
 }
 
 if (this.registrar.hastasks() && this.registrar.getscheduler() == null) {
 assert.state(this.beanfactory != null, "beanfactory must be set to find scheduler by type");
 try {
  // search for taskscheduler bean...
  this.registrar.settaskscheduler(resolveschedulerbean(this.beanfactory, taskscheduler.class, false));
 }
 catch (nouniquebeandefinitionexception ex) {
  logger.trace("could not find unique taskscheduler bean", ex);
  try {
  this.registrar.settaskscheduler(resolveschedulerbean(this.beanfactory, taskscheduler.class, true));
  }
  catch (nosuchbeandefinitionexception ex2) {
  if (logger.isinfoenabled()) {
   logger.info("more than one taskscheduler bean exists within the context, and " +
  "none is named 'taskscheduler'. mark one of them as primary or name it 'taskscheduler' " +
  "(possibly as an alias); or implement the schedulingconfigurer interface and call " +
  "scheduledtaskregistrar#setscheduler explicitly within the configuretasks() callback: " +
  ex.getbeannamesfound());
  }
  }
 }
 catch (nosuchbeandefinitionexception ex) {
  logger.trace("could not find default taskscheduler bean", ex);
  // search for scheduledexecutorservice bean next...
  try {
  this.registrar.setscheduler(resolveschedulerbean(this.beanfactory, scheduledexecutorservice.class, false));
  }
  catch (nouniquebeandefinitionexception ex2) {
  logger.trace("could not find unique scheduledexecutorservice bean", ex2);
  try {
   this.registrar.setscheduler(resolveschedulerbean(this.beanfactory, scheduledexecutorservice.class, true));
  }
  catch (nosuchbeandefinitionexception ex3) {
   if (logger.isinfoenabled()) {
   logger.info("more than one scheduledexecutorservice bean exists within the context, and " +
  "none is named 'taskscheduler'. mark one of them as primary or name it 'taskscheduler' " +
  "(possibly as an alias); or implement the schedulingconfigurer interface and call " +
  "scheduledtaskregistrar#setscheduler explicitly within the configuretasks() callback: " +
  ex2.getbeannamesfound());
   }
  }
  }
  catch (nosuchbeandefinitionexception ex2) {
  logger.trace("could not find default scheduledexecutorservice bean", ex2);
  // giving up -> falling back to default scheduler within the registrar...
  logger.info("no taskscheduler/scheduledexecutorservice bean found for scheduled processing");
  }
 }
 }
 
 this.registrar.afterpropertiesset();
}
1、通过一系列的SchedulingConfigurer动态配置ScheduledTaskRegistrar。
2、向ScheduledTaskRegistrar注册一个TaskScheduler(用于对Runnable的任务进行调度,它包含有多种触发规则)。
3、registrar.afterPropertiesSet(),在这里开始安排所有的定时任务执行。
/**
 * Schedules every registered task against the configured task scheduler.
 *
 * <p>If no scheduler has been set, a single-threaded scheduled executor is
 * created, kept in {@code localExecutor}, and wrapped in a
 * {@link ConcurrentTaskScheduler}. Trigger, cron, fixed-rate and fixed-delay
 * tasks are then scheduled in that order, and each result is passed to
 * {@code addScheduledTask}.
 */
protected void scheduleTasks() {
    if (this.taskScheduler == null) {
        this.localExecutor = Executors.newSingleThreadScheduledExecutor();
        this.taskScheduler = new ConcurrentTaskScheduler(this.localExecutor);
    }
    if (this.triggerTasks != null) {
        for (TriggerTask task : this.triggerTasks) {
            addScheduledTask(scheduleTriggerTask(task));
        }
    }
    if (this.cronTasks != null) {
        for (CronTask task : this.cronTasks) {
            addScheduledTask(scheduleCronTask(task));
        }
    }
    if (this.fixedRateTasks != null) {
        for (IntervalTask task : this.fixedRateTasks) {
            addScheduledTask(scheduleFixedRateTask(task));
        }
    }
    if (this.fixedDelayTasks != null) {
        for (IntervalTask task : this.fixedDelayTasks) {
            addScheduledTask(scheduleFixedDelayTask(task));
        }
    }
}
1、TriggerTask:动态定时任务。通过Trigger#nextExecutionTime 给定的触发上下文确定下一个执行时间。
2、CronTask:动态定时任务,TriggerTask子类。通过cron表达式确定的时间触发下一个任务执行。
3、IntervalTask:一定时间延迟之后,周期性执行的任务。
4、TaskScheduler 如果为空,默认是ConcurrentTaskScheduler,并使用默认单线程的ScheduledExecutor。
三、主要看一下CronTask工作原理
ScheduledTaskRegistrar.java
/**
 * Schedules the given cron task, or parks it until a scheduler is available.
 *
 * <p>If a {@link ScheduledTask} handle was previously parked for this task in
 * {@code unresolvedTasks}, that handle is reused; otherwise a new one is created.
 * With a scheduler present the task is scheduled immediately and the resulting
 * future stored on the handle. Without one, the task is added via
 * {@code addCronTask} and the handle parked again.
 *
 * @param task the cron task to schedule
 * @return the handle if it was newly created by this call, or {@code null} if an
 *         existing handle was reused
 */
@Nullable
public ScheduledTask scheduleCronTask(CronTask task) {
    ScheduledTask scheduledTask = this.unresolvedTasks.remove(task);
    boolean newTask = false;
    if (scheduledTask == null) {
        scheduledTask = new ScheduledTask(task);
        newTask = true;
    }
    if (this.taskScheduler != null) {
        scheduledTask.future = this.taskScheduler.schedule(task.getRunnable(), task.getTrigger());
    }
    else {
        addCronTask(task);
        this.unresolvedTasks.put(task, scheduledTask);
    }
    return (newTask ? scheduledTask : null);
}
 
ConcurrentTaskScheduler.java
/**
 * Schedules {@code task} to run at the times produced by {@code trigger}.
 *
 * <p>When {@code enterpriseConcurrentScheduler} is set, scheduling is delegated to
 * an {@code EnterpriseConcurrentTriggerScheduler}. Otherwise the task and trigger
 * are wrapped in a {@link ReschedulingRunnable} bound to this scheduler's executor
 * and error handler (or the default error handler when none is configured).
 *
 * @param task the work to run
 * @param trigger decides each execution time
 * @return the future returned by the delegate; may be {@code null}
 * @throws TaskRejectedException if the executor rejects the task
 */
@Override
@Nullable
public ScheduledFuture<?> schedule(Runnable task, Trigger trigger) {
    try {
        if (this.enterpriseConcurrentScheduler) {
            return new EnterpriseConcurrentTriggerScheduler().schedule(decorateTask(task, true), trigger);
        }
        else {
            ErrorHandler errorHandler =
                    (this.errorHandler != null ? this.errorHandler : TaskUtils.getDefaultErrorHandler(true));
            return new ReschedulingRunnable(task, trigger, this.scheduledExecutor, errorHandler).schedule();
        }
    }
    catch (RejectedExecutionException ex) {
        throw new TaskRejectedException("Executor [" + this.scheduledExecutor + "] did not accept task: " + task, ex);
    }
}
 
ReschedulingRunnable.java
/**
 * Asks the trigger for the next execution time and submits this runnable to the
 * executor with the corresponding delay.
 *
 * <p>Runs under the {@code triggerContextMonitor} lock so that computing the next
 * time and replacing {@code currentFuture} happen together.
 *
 * @return this object (which serves as the future), or {@code null} when the
 *         trigger yields no further execution time
 */
@Nullable
public ScheduledFuture<?> schedule() {
    synchronized (this.triggerContextMonitor) {
        this.scheduledExecutionTime = this.trigger.nextExecutionTime(this.triggerContext);
        if (this.scheduledExecutionTime == null) {
            return null;
        }
        // Delay is relative to "now"; it can be negative if the computed time is already past.
        long initialDelay = this.scheduledExecutionTime.getTime() - System.currentTimeMillis();
        this.currentFuture = this.executor.schedule(this, initialDelay, TimeUnit.MILLISECONDS);
        return this;
    }
}
 
private scheduledfuture<?> obtaincurrentfuture() {
 assert.state(this.currentfuture != null, "no scheduled future");
 return this.currentfuture;
}
 
@override
public void run() {
 date actualexecutiontime = new date();
 super.run();
 date completiontime = new date();
 synchronized (this.triggercontextmonitor) {
 assert.state(this.scheduledexecutiontime != null, "no scheduled execution");
 this.triggercontext.update(this.scheduledexecutiontime, actualexecutiontime, completiontime);
 if (!obtaincurrentfuture().iscancelled()) {
  schedule();
 }
 }
}
1、最终将task和trigger都封装到了ReschedulingRunnable中。
2、ReschedulingRunnable实现了任务重复调度(schedule方法中调用调度器executor并传入自身对象,executor会调用run方法,run方法又调用了schedule方法)。
3、ReschedulingRunnable的schedule方法加了同步锁,只能有一个线程拿到下次执行时间并加入执行器的调度。
4、不同的ReschedulingRunnable对象之间在线程池够用的情况下是不会相互影响的,也就是说满足线程池的条件下,TaskScheduler的schedule方法的多次调用是可以交叉执行的。
ScheduledThreadPoolExecutor.java
/**
 * Wraps {@code command} in a one-shot {@code ScheduledFutureTask} whose trigger
 * time is derived from the given delay, and hands it to {@code delayedExecute}.
 *
 * @param command the task to execute
 * @param delay the time from now to delay execution
 * @param unit the time unit of {@code delay}
 * @return the (possibly decorated) future representing the pending task
 * @throws NullPointerException if {@code command} or {@code unit} is null
 */
public ScheduledFuture<?> schedule(Runnable command,
                                   long delay,
                                   TimeUnit unit) {
    if (command == null || unit == null)
        throw new NullPointerException();
    RunnableScheduledFuture<?> t = decorateTask(command,
        new ScheduledFutureTask<Void>(command, null,
                                      triggerTime(delay, unit)));
    delayedExecute(t);
    return t;
}
 
 
/**
 * Enqueues a delayed or periodic task.
 *
 * <p>If the pool is already shut down the task is rejected. Otherwise it is added
 * to the work queue; the shutdown state is then re-checked, and if the pool was
 * shut down in the meantime and the task may not run in that state, the task is
 * removed and cancelled. In the normal case {@code ensurePrestart()} is called.
 *
 * @param task the task to enqueue
 */
private void delayedExecute(RunnableScheduledFuture<?> task) {
    if (isShutdown())
        reject(task);
    else {
        super.getQueue().add(task);
        // Re-check after enqueueing: shutdown may have raced with the add above.
        if (isShutdown() &&
            !canRunInCurrentRunState(task.isPeriodic()) &&
            remove(task))
            task.cancel(false);
        else
            ensurePrestart();
    }
}
ScheduledFutureTask 工作原理如下图所示【太懒了,不想画图了,盗图一张】。
DSC0000.jpg

1、ScheduledFutureTask会放入优先阻塞队列:ScheduledThreadPoolExecutor.DelayedWorkQueue(二叉最小堆实现)
2、上图中的Thread对象即ThreadPoolExecutor.Worker,实现了Runnable接口
/**
 * Creates with given first task and thread from ThreadFactory.
 *
 * <p>The new thread is created with this worker as its {@link Runnable}, so
 * starting the thread invokes this worker's {@code run()}.
 *
 * @param firstTask the first task (null if none)
 */
Worker(Runnable firstTask) {
    setState(-1); // inhibit interrupts until runWorker
    this.firstTask = firstTask;
    this.thread = getThreadFactory().newThread(this);
}
 
/** Delegates main run loop to outer runWorker. */
public void run() {
    runWorker(this);
}
1、Worker中维护了Thread对象,Thread对象的Runnable实例即Worker自身
2、ThreadPoolExecutor#addWorker方法中会创建Worker对象,然后拿到Worker中的Thread实例并start,这样就创建了线程池中的一个线程实例
3、Worker的run方法会调用ThreadPoolExecutor#runWorker方法,这才是任务最终被执行的地方,该方法示意如下
(1)首先取传入的task执行,如果task是null,只要该线程池处于运行状态,就会通过getTask方法从workQueue中取任务。ThreadPoolExecutor的execute方法会在无法产生core线程的时候向workQueue队列中offer任务。
getTask方法从队列中取task的时候会根据相关配置决定是否阻塞和阻塞多久。如果getTask方法结束,返回的是null,runWorker循环结束,执行processWorkerExit方法。
至此,该线程结束自己的使命,从线程池中“消失”。
(2)在开始执行任务之前,会调用Worker的lock方法,目的是阻止task正在被执行的时候被interrupt,通过调用clearInterruptsForTaskRun方法来保证的(后面可以看一下这个方法),该线程没有自己的interrupt set了。
(3)beforeExecute和afterExecute方法用于在执行任务前后执行一些自定义的操作,这两个方法是空的,留给继承类去填充功能。
我们可以在beforeExecute方法中抛出异常,这样task不会被执行,而且在跳出该循环的时候completedAbruptly的值是true,表示the worker died due to user exception,会用decrementWorkerCount调整wc。
(4)因为Runnable的run方法不能抛出Throwable异常,所以这里重新包装异常然后抛出,抛出的异常会使当前线程死掉,可以在afterExecute中对异常做一些处理。
(5)afterExecute方法也可能抛出异常,也可能使当前线程死掉。
四、动态创建定时任务
TaskConfiguration 配置类
/**
 * Enables scheduling and supplies a five-thread {@link ScheduledExecutorService}
 * registered under the bean name
 * {@code ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME}.
 */
@Configuration
@EnableScheduling
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
public class TaskConfiguration {

    /**
     * Creates the scheduled thread pool used for task execution.
     *
     * @return a scheduled pool with a core size of 5 and custom-named threads
     */
    @Bean(name = ScheduledAnnotationBeanPostProcessor.DEFAULT_TASK_SCHEDULER_BEAN_NAME)
    @Role(BeanDefinition.ROLE_INFRASTRUCTURE)
    public ScheduledExecutorService scheduledAnnotationProcessor() {
        return Executors.newScheduledThreadPool(5, new DefaultThreadFactory());
    }

    /**
     * Thread factory producing non-daemon, normal-priority threads named
     * {@code pool-<poolNumber>-schedule-<threadNumber>}.
     */
    private static class DefaultThreadFactory implements ThreadFactory {
        /** Counts factory instances; each factory takes the next pool number. */
        private static final AtomicInteger poolNumber = new AtomicInteger(1);
        private final ThreadGroup group;
        /** Counts threads created by this factory. */
        private final AtomicInteger threadNumber = new AtomicInteger(1);
        private final String namePrefix;

        DefaultThreadFactory() {
            // Use the security manager's thread group when one is installed,
            // otherwise the group of the thread constructing this factory.
            SecurityManager s = System.getSecurityManager();
            group = (s != null) ? s.getThreadGroup() :
                    Thread.currentThread().getThreadGroup();
            namePrefix = "pool-" +
                    poolNumber.getAndIncrement() +
                    "-schedule-";
        }

        @Override
        public Thread newThread(Runnable r) {
            Thread t = new Thread(group, r,
                    namePrefix + threadNumber.getAndIncrement(),
                    0);
            // Normalize attributes that would otherwise be inherited from the creating thread.
            if (t.isDaemon()) {
                t.setDaemon(false);
            }
            if (t.getPriority() != Thread.NORM_PRIORITY) {
                t.setPriority(Thread.NORM_PRIORITY);
            }
            return t;
        }
    }
}
1、保证ConcurrentTaskScheduler不使用默认单线程的ScheduledExecutor,而是corePoolSize=5的线程池
2、自定义线程池工厂类
DynamicTask 动态定时任务
@configuration
public class dynamictask implements schedulingconfigurer {
 private static logger logger = loggerfactory.getlogger(dynamictask.class);
 
 private static final executorservice es = new threadpoolexecutor(10, 20,
   0l, timeunit.milliseconds,
   new linkedblockingqueue<>(10),
   new dynamictaskconsumethreadfactory());
 
 
 private volatile scheduledtaskregistrar registrar;
 private final concurrenthashmap<string, scheduledfuture<?>> scheduledfutures = new concurrenthashmap<>();
 private final concurrenthashmap<string, crontask> crontasks = new concurrenthashmap<>();
 
 private volatile list<taskconstant> taskconstants = lists.newarraylist();
 
 @override
 public void configuretasks(scheduledtaskregistrar registrar) {
  this.registrar = registrar;
  this.registrar.addtriggertask(() -> {
   if (!collectionutils.isempty(taskconstants)) {
    logger.info("检测动态定时任务列表...");
    list<timingtask> tts = new arraylist<>();
    taskconstants
    .foreach(taskconstant -> {
     timingtask tt = new timingtask();
     tt.setexpression(taskconstant.getcron());
     tt.settaskid("dynamic-task-" + taskconstant.gettaskid());
     tts.add(tt);
    });
    this.refreshtasks(tts);
   }
  }
  , triggercontext -> new periodictrigger(5l, timeunit.seconds).nextexecutiontime(triggercontext));
 }
 
 
 public list<taskconstant> gettaskconstants() {
  return taskconstants;
 }
 
 private void refreshtasks(list<timingtask> tasks) {
  //取消已经删除的策略任务
  set<string> taskids = scheduledfutures.keyset();
  for (string taskid : taskids) {
   if (!exists(tasks, taskid)) {
  scheduledfutures.get(taskid).cancel(false);
   }
  }
  for (timingtask tt : tasks) {
   string expression = tt.getexpression();
   if (stringutils.isblank(expression) || !cronsequencegenerator.isvalidexpression(expression)) {
  logger.error("定时任务dynamictask cron表达式不合法: " + expression);
  continue;
   }
   //如果配置一致,则不需要重新创建定时任务
   if (scheduledfutures.containskey(tt.gettaskid())
   && crontasks.get(tt.gettaskid()).getexpression().equals(expression)) {
  continue;
   }
   //如果策略执行时间发生了变化,则取消当前策略的任务
   if (scheduledfutures.containskey(tt.gettaskid())) {
  scheduledfutures.remove(tt.gettaskid()).cancel(false);
  crontasks.remove(tt.gettaskid());
   }
   crontask task = new crontask(tt, expression);
   scheduledfuture<?> future = registrar.getscheduler().schedule(task.getrunnable(), task.gettrigger());
   crontasks.put(tt.gettaskid(), task);
   scheduledfutures.put(tt.gettaskid(), future);
  }
 }
 
 private boolean exists(list<timingtask> tasks, string taskid) {
  for (timingtask task : tasks) {
   if (task.gettaskid().equals(taskid)) {
  return true;
   }
  }
  return false;
 }
 
 @predestroy
 public void destroy() {
  this.registrar.destroy();
 }
 
 public static class taskconstant {
  private string cron;
  private string taskid;
 
  public string getcron() {
   return cron;
  }
 
  public void setcron(string cron) {
   this.cron = cron;
  }
 
  public string gettaskid() {
   return taskid;
  }
 
  public void settaskid(string taskid) {
   this.taskid = taskid;
  }
 }
 
 private class timingtask implements runnable {
  private string expression;
 
  private string taskid;
 
  public string gettaskid() {
   return taskid;
  }
 
  public void settaskid(string taskid) {
   this.taskid = taskid;
  }
 
  @override
  public void run() {
   //设置队列大小10
   logger.error("当前crontask: " + this);
   dynamicblockingqueue queue = new dynamicblockingqueue(3);
   es.submit(() -> {
  while (!queue.isdone() || !queue.isempty()) {
   try {
    string content = queue.poll(500, timeunit.milliseconds);
    if (stringutils.isblank(content)) {
     return;
    }
    logger.info("dynamicblockingqueue 消费:" + content);
    timeunit.milliseconds.sleep(500);
   } catch (interruptedexception e) {
    e.printstacktrace();
   }
  }
   });
 
   //队列放入数据
   for (int i = 0; i < 5; ++i) {
  try {
   queue.put(string.valueof(i));
   logger.info("dynamicblockingqueue 生产:" + i);
  } catch (interruptedexception e) {
   e.printstacktrace();
  }
   }
   queue.setdone(true);
  }
 
  public string getexpression() {
   return expression;
  }
 
  public void setexpression(string expression) {
   this.expression = expression;
  }
 
  @override
  public string tostring() {
   return reflectiontostringbuilder.tostring(this
   , tostringstyle.json_style
   , false
   , false
   , timingtask.class);
  }
 
 }
 
 /**
  * 队列消费线程工厂类
  */
 private static class dynamictaskconsumethreadfactory implements threadfactory {
  private static final atomicinteger poolnumber = new atomicinteger(1);
  private final threadgroup group;
  private final atomicinteger threadnumber = new atomicinteger(1);
  private final string nameprefix;
 
  dynamictaskconsumethreadfactory() {
   securitymanager s = system.getsecuritymanager();
   group = (s != null) ? s.getthreadgroup() :
   thread.currentthread().getthreadgroup();
   nameprefix = "pool-" +
   poolnumber.getandincrement() +
   "-dynamic-task-";
  }
 
  @override
  public thread newthread(runnable r) {
   thread t = new thread(group, r,
   nameprefix + threadnumber.getandincrement(),
   0);
   if (t.isdaemon()) {
  t.setdaemon(false);
   }
   if (t.getpriority() != thread.norm_priority) {
  t.setpriority(thread.norm_priority);
   }
   return t;
  }
 }
 
 private static class dynamicblockingqueue extends linkedblockingqueue<string> {
  dynamicblockingqueue(int capacity) {
   super(capacity);
  }
 
 
  private volatile boolean done = false;
 
  public boolean isdone() {
   return done;
  }
 
  public void setdone(boolean done) {
   this.done = done;
  }
 }
}
1、taskConstants 动态任务列表
2、ScheduledTaskRegistrar#addTriggerTask 添加动态周期定时任务,检测动态任务列表的变化
// Wrap the runnable and its cron expression, schedule it directly on the
// registrar's scheduler, then cache both the task and its future by task id.
CronTask task = new CronTask(tt, expression);
ScheduledFuture<?> future = registrar.getScheduler().schedule(task.getRunnable(), task.getTrigger());
cronTasks.put(tt.getTaskId(), task);
scheduledFutures.put(tt.getTaskId(), future);
3、动态创建cron定时任务,拿到ScheduledFuture实例并缓存起来
4、在刷新任务列表时,通过缓存的ScheduledFuture实例和CronTask实例,来决定是否取消、移除失效的动态定时任务。
DynamicTaskTest 动态定时任务测试类
@runwith(springrunner.class)
@springboottest
public class dynamictasktest {
 
 @autowired
 private dynamictask dynamictask;
 
 @test
 public void test() throws interruptedexception {
  list<dynamictask.taskconstant> taskconstans = dynamictask.gettaskconstants();
  dynamictask.taskconstant taskconstant = new dynamictask.taskconstant();
  taskconstant.setcron("0/5 * * * * ?");
  taskconstant.settaskid("test1");
  taskconstans.add(taskconstant);
 
 
  dynamictask.taskconstant taskconstant1 = new dynamictask.taskconstant();
  taskconstant1.setcron("0/5 * * * * ?");
  taskconstant1.settaskid("test2");
  taskconstans.add(taskconstant1);
 
  dynamictask.taskconstant taskconstant2 = new dynamictask.taskconstant();
  taskconstant2.setcron("0/5 * * * * ?");
  taskconstant2.settaskid("test3");
  taskconstans.add(taskconstant2);
 
  timeunit.seconds.sleep(40);
  //移除并添加新的配置
  taskconstans.remove(taskconstans.size() - 1);
  dynamictask.taskconstant taskconstant3 = new dynamictask.taskconstant();
  taskconstant3.setcron("0/5 * * * * ?");
  taskconstant3.settaskid("test4");
  taskconstans.add(taskconstant3);
//
  timeunit.minutes.sleep(50);
 }
}
总结
以上就是这篇文章的全部内容了,希望本文的内容对大家的学习或者工作具有一定的参考学习价值,如果有疑问大家可以留言交流,谢谢大家对CodeAE代码之家的支持。
原文链接:https://www.cnblogs.com/hujunzheng/p/10353390.html

关注下面的标签,发现更多相似文章