This article walks through an example of building a configurable thread pool in Java with the help of a custom annotation.
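Preface
Our project needs to run a number of tasks on multiple threads. To make this easy for every service to reuse, I wrapped the logic in a shared utility class. The code follows.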
PoolConfig (core thread pool parameters):
/**
 * <h1>Core thread pool settings (<b>core pool size, maximum pool size, queue capacity, keep-alive seconds (default 60s)</b>)</h1>
 *
 * <blockquote><code>
 * <table border="1px" width="100%"><tbody>
 * <tr><th>Property</th><th>Meaning</th></tr>
 * <tr><td>count</td><td>Core pool size</td></tr>
 * <tr><td>maxCount</td><td>Maximum pool size</td></tr>
 * <tr><td>queueCapacity</td><td>Queue capacity (default 200)</td></tr>
 * <tr><td>aliveSec</td><td>Keep-alive time for idle threads, in seconds (default 60s)</td></tr>
 * </tbody></table>
 * </code></blockquote>
 */
public class PoolConfig {
    // Capacity of the task queue.
    private int queueCapacity = 200;
    // Core pool size; 0 means it is derived from the per-task counts.
    private int count = 0;
    // Maximum pool size; 0 leaves the executor's default untouched.
    private int maxCount = 0;
    // Keep-alive seconds; 0 leaves the executor's default (60s) untouched.
    private int aliveSec;

    public int getQueueCapacity() {
        return queueCapacity;
    }
    public void setQueueCapacity(int queueCapacity) {
        this.queueCapacity = queueCapacity;
    }
    public void setCount(int count) {
        this.count = count;
    }
    public void setMaxCount(int maxCount) {
        this.maxCount = maxCount;
    }
    public void setAliveSec(int aliveSec) {
        this.aliveSec = aliveSec;
    }
    public int getCount() {
        return count;
    }
    public int getMaxCount() {
        return maxCount;
    }
    public int getAliveSec() {
        return aliveSec;
    }
}
ThreadPoolConfig (thread pool configuration; its configuration keys start with "thread"):
import java.util.HashMap;
import java.util.Map;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;
/**
 * <h1>Thread pool configuration (<b>core pool settings, number of task instances per business task</b>)</h1>
 *
 * <blockquote><code>
 * <table border="1px" width="100%"><tbody>
 * <tr><th>Property</th><th>Meaning</th></tr>
 * <tr><td>pool</td><td>Core thread pool settings [{@link PoolConfig}]</td></tr>
 * <tr><td>count</td><td>Number of instances to submit for each business task, keyed by task name</td></tr>
 * </tbody></table>
 * </code></blockquote>
 */
@Component
@ConfigurationProperties(prefix = "thread")
public class ThreadPoolConfig {
    private PoolConfig pool = new PoolConfig();
    // Bound from thread.count.<taskName>=<n>
    private final Map<String, Integer> count = new HashMap<>();

    public PoolConfig getPool() {
        return pool;
    }
    public void setPool(PoolConfig pool) {
        this.pool = pool;
    }
    public Map<String, Integer> getCount() {
        return count;
    }
}
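As a rough illustration of what binding the `thread.count.*` keys into a map amounts to, the sketch below collects prefixed keys from a `java.util.Properties` object. It is a plain-Java simplification and not how Spring Boot implements `@ConfigurationProperties`; the class name `ThreadCountBindingSketch` and the task name `reportTask` are made up for the example.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

final class ThreadCountBindingSketch {
    private static final String PREFIX = "thread.count.";

    // Collects every "thread.count.<task>" entry into a task -> count map.
    static Map<String, Integer> bindCounts(Properties props) {
        Map<String, Integer> counts = new HashMap<>();
        for (String key : props.stringPropertyNames()) {
            if (key.startsWith(PREFIX)) {
                String taskName = key.substring(PREFIX.length());
                counts.put(taskName, Integer.parseInt(props.getProperty(key).trim()));
            }
        }
        return counts;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("thread.count.reportTask", "4"); // hypothetical task name
        props.setProperty("thread.pool.aliveSec", "300");  // ignored: different prefix
        System.out.println(bindCounts(props));
    }
}
```

Only keys under the `thread.count.` prefix end up in the map; everything else is left for the pool settings.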
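Define a task annotation to make tasks easy to declare:
import java.lang.annotation.Documented;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import org.springframework.stereotype.Component;

@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
@Documented
@Component
public @interface ExcutorTask {
    /**
     * The value may indicate a suggestion for a logical ExcutorTask name,
     * to be turned into a Spring bean in case of an autodetected ExcutorTask.
     * @return the suggested ExcutorTask name, if any
     */
    String value() default "";
}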
Collect the classes that carry the task annotation, using classpath scanning and reflection:
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import org.springframework.beans.factory.config.BeanDefinition;
import org.springframework.context.annotation.ClassPathScanningCandidateComponentProvider;
import org.springframework.core.type.filter.AnnotationTypeFilter;

public class Beans {
    private static final char PREFIX = '.';

    // Returns a map of task name (alias or simple class name) to fully qualified class name.
    public static ConcurrentMap<String, String> scanBeanClassNames() {
        ConcurrentMap<String, String> beanClassNames = new ConcurrentHashMap<>();
        ClassPathScanningCandidateComponentProvider provider = new ClassPathScanningCandidateComponentProvider(false);
        provider.addIncludeFilter(new AnnotationTypeFilter(ExcutorTask.class));
        for (Package pkg : Package.getPackages()) {
            String basePackage = pkg.getName();
            Set<BeanDefinition> components = provider.findCandidateComponents(basePackage);
            for (BeanDefinition component : components) {
                String beanClassName = component.getBeanClassName();
                try {
                    Class<?> clazz = Class.forName(beanClassName);
                    boolean isAnnotationPresent = clazz.isAnnotationPresent(ExcutorTask.class);
                    if (isAnnotationPresent) {
                        ExcutorTask task = clazz.getAnnotation(ExcutorTask.class);
                        String aliasName = task.value();
                        // Register the alias given in the annotation, if any.
                        if (aliasName != null && !"".equals(aliasName)) {
                            beanClassNames.put(aliasName, beanClassName);
                        }
                    }
                } catch (ClassNotFoundException e) {
                    e.printStackTrace();
                }
                // Always register the simple class name as well.
                beanClassNames.put(beanClassName.substring(beanClassName.lastIndexOf(PREFIX) + 1), beanClassName);
            }
        }
        return beanClassNames;
    }
}
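The registration rule above (an optional alias plus the simple class name) can be tried without Spring. A minimal sketch, assuming a hypothetical `DemoTask` annotation that stands in for the article's annotation, and an explicit list of candidate classes instead of classpath scanning:

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

final class AliasRegistrySketch {
    // Hypothetical stand-in for the task annotation; RUNTIME retention makes it visible to reflection.
    @Target(ElementType.TYPE)
    @Retention(RetentionPolicy.RUNTIME)
    @interface DemoTask {
        String value() default "";
    }

    @DemoTask("mailer")
    static class MailTask implements Runnable {
        @Override public void run() { }
    }

    @DemoTask
    static class CleanupTask implements Runnable {
        @Override public void run() { }
    }

    // Maps alias (if present) and simple class name to the fully qualified class name.
    static Map<String, String> register(Class<?>... candidates) {
        Map<String, String> names = new ConcurrentHashMap<>();
        for (Class<?> clazz : candidates) {
            DemoTask task = clazz.getAnnotation(DemoTask.class);
            if (task == null) {
                continue; // not annotated: skip
            }
            if (!task.value().isEmpty()) {
                names.put(task.value(), clazz.getName());
            }
            names.put(clazz.getSimpleName(), clazz.getName());
        }
        return names;
    }

    public static void main(String[] args) {
        register(MailTask.class, CleanupTask.class, String.class)
                .forEach((name, className) -> System.out.println(name + " -> " + className));
    }
}
```

`String.class` is passed only to show that classes without the annotation are ignored.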
Task execution class TaskPool:
import java.beans.Introspector;
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import javax.annotation.PostConstruct;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.ApplicationContext;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Component;

@Component
public class TaskPool {
    public ThreadPoolTaskExecutor poolTaskExecutor;
    @Autowired
    private ThreadPoolConfig threadPoolConfig;
    @Autowired
    private ApplicationContext context;
    // Hard upper bound for both the core and the maximum pool size.
    private static final int MAX_POOL_SIZE = 2000;
    private PoolConfig poolCfg;
    private Map<String, Integer> tasksCount;
    private ConcurrentMap<String, String> beanClassNames;

    @PostConstruct
    public void init() {
        beanClassNames = Beans.scanBeanClassNames();
        poolTaskExecutor = new ThreadPoolTaskExecutor();
        poolCfg = threadPoolConfig.getPool();
        tasksCount = threadPoolConfig.getCount();
        int corePoolSize = poolCfg.getCount(),
            maxPoolSize = poolCfg.getMaxCount(),
            queueCapacity = poolCfg.getQueueCapacity(),
            minPoolSize = 0, maxCount = (corePoolSize << 1);
        // The core size must cover every configured task instance.
        for (String taskName : tasksCount.keySet()) {
            minPoolSize += tasksCount.get(taskName);
        }
        if (corePoolSize > 0) {
            if (corePoolSize <= minPoolSize) {
                corePoolSize = minPoolSize;
            }
        } else {
            corePoolSize = minPoolSize;
        }
        if (queueCapacity > 0) {
            poolTaskExecutor.setQueueCapacity(queueCapacity);
        }
        if (corePoolSize > 0) {
            if (MAX_POOL_SIZE < corePoolSize) {
                corePoolSize = MAX_POOL_SIZE;
            }
            poolTaskExecutor.setCorePoolSize(corePoolSize);
        }
        if (maxPoolSize > 0) {
            // The maximum is at least twice the configured core size.
            if (maxPoolSize <= maxCount) {
                maxPoolSize = maxCount;
            }
            if (MAX_POOL_SIZE < maxPoolSize) {
                maxPoolSize = MAX_POOL_SIZE;
            }
            poolTaskExecutor.setMaxPoolSize(maxPoolSize);
        }
        if (poolCfg.getAliveSec() > 0) {
            poolTaskExecutor.setKeepAliveSeconds(poolCfg.getAliveSec());
        }
        poolTaskExecutor.initialize();
    }

    // Runs the given task classes; each is submitted as many times as configured.
    public void execute(Class<?>... clazz) {
        for (Class<?> taskClass : clazz) {
            // The original looked the count up by loop index, which always yields null on a
            // Map<String, Integer>. Assumption: counts are keyed by the default bean name,
            // i.e. the decapitalized simple class name.
            Integer taskCount = tasksCount.get(Introspector.decapitalize(taskClass.getSimpleName()));
            if (taskCount == null) {
                continue; // no count configured for this task
            }
            for (int t = 0; t < taskCount; t++) {
                try {
                    Object taskObj = context.getBean(taskClass);
                    if (taskObj != null) {
                        poolTaskExecutor.execute((Runnable) taskObj);
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }

    // Runs the tasks with the given names (bean name or scanned alias).
    public void execute(String... args) {
        for (String taskName : args) {
            // Look the count up by task name rather than by loop index.
            Integer taskCount = tasksCount.get(taskName);
            if (taskCount == null) {
                continue; // no count configured for this task
            }
            for (int t = 0; t < taskCount; t++) {
                try {
                    Object taskObj = null;
                    if (context.containsBean(taskName)) {
                        taskObj = context.getBean(taskName);
                    } else {
                        if (beanClassNames.containsKey(taskName.toLowerCase())) {
                            Class<?> clazz = Class.forName(beanClassNames.get(taskName.toLowerCase()));
                            taskObj = context.getBean(clazz);
                        }
                    }
                    if (taskObj != null) {
                        poolTaskExecutor.execute((Runnable) taskObj);
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }

    // Runs every task that has a configured count.
    public void execute() {
        for (String taskName : tasksCount.keySet()) {
            Integer taskCount = tasksCount.get(taskName);
            for (int t = 0; t < taskCount; t++) {
                try {
                    Object taskObj = null;
                    if (context.containsBean(taskName)) {
                        taskObj = context.getBean(taskName);
                    } else {
                        if (beanClassNames.containsKey(taskName)) {
                            Class<?> clazz = Class.forName(beanClassNames.get(taskName));
                            taskObj = context.getBean(clazz);
                        }
                    }
                    if (taskObj != null) {
                        poolTaskExecutor.execute((Runnable) taskObj);
                    }
                } catch (Exception ex) {
                    ex.printStackTrace();
                }
            }
        }
    }
}
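The sizing rules applied in `init()` are easier to check in isolation. The sketch below restates them as two pure functions; the class and method names are made up for illustration, and a return value of 0 is this sketch's convention for "the setter is not called, so the executor keeps its default".

```java
final class PoolSizingSketch {
    static final int MAX_POOL_SIZE = 2000;

    // Core size: the larger of the configured value and the sum of all task counts, capped.
    static int resolveCore(int configuredCore, int sumOfTaskCounts) {
        int core = Math.max(configuredCore, sumOfTaskCounts);
        if (core <= 0) {
            return 0; // nothing configured: keep the executor's default
        }
        return Math.min(core, MAX_POOL_SIZE);
    }

    // Maximum size: at least twice the configured core size, capped.
    static int resolveMax(int configuredMax, int configuredCore) {
        if (configuredMax <= 0) {
            return 0; // not configured: keep the executor's default
        }
        int max = Math.max(configuredMax, configuredCore << 1);
        return Math.min(max, MAX_POOL_SIZE);
    }

    public static void main(String[] args) {
        System.out.println("core=" + resolveCore(0, 4) + " max=" + resolveMax(0, 0));
        System.out.println("core=" + resolveCore(10, 4) + " max=" + resolveMax(15, 10));
        System.out.println("core=" + resolveCore(5000, 4) + " max=" + resolveMax(3000, 5000));
    }
}
```

One thing these rules do not guard against: the maximum is derived from the configured core size, not from the adjusted one. With no core size configured, task counts summing to 10, and a configured maximum of 5, the result is a core of 10 and a maximum of 5, a combination that `java.util.concurrent.ThreadPoolExecutor` rejects with an `IllegalArgumentException`.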
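How do you use it? (Might as well finish the job properly ^_^)
1. Since this is a Spring Boot project, add the following to application.properties or application.yml:
# Number of threads (task instances) to run for each task
thread.count.needExcutorTask=4
# Keep-alive time for idle threads (the value is passed to setKeepAliveSeconds, so the unit is seconds)
thread.pool.aliveSec=300000
# Other settings follow the same pattern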
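2. Load the thread pool into the project and start the configured tasks:
import javax.annotation.PostConstruct;
import javax.annotation.Resource;
import org.springframework.context.annotation.Configuration;

@Configuration
public class TaskManager {
    @Resource
    private TaskPool taskPool;

    @PostConstruct
    public void executor() {
        taskPool.execute();
    }
}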
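3. Write a task:
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

@ExcutorTask
public class NeedExcutorTask implements Runnable {
    private static final Logger log = LoggerFactory.getLogger(NeedExcutorTask.class);

    @Override
    public void run() {
        try {
            Thread.sleep(1000L);
            log.info("====== task executed =====");
        } catch (InterruptedException e) {
            // run() cannot throw checked exceptions, so restore the interrupt flag instead.
            Thread.currentThread().interrupt();
        }
    }
}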
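To see the "submit the same task N times" behaviour without a Spring context, here is a minimal sketch that uses `java.util.concurrent.ThreadPoolExecutor` directly. It is only an approximation of what the pool in this article does: the class name `SubmitCopiesSketch`, the fixed queue capacity of 200, and the 10 second wait are choices made for the example.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

final class SubmitCopiesSketch {
    // Submits the same Runnable `copies` times (copies must be > 0) and returns how many runs completed.
    static int runCopies(Runnable task, int copies) {
        AtomicInteger completed = new AtomicInteger();
        ExecutorService pool = new ThreadPoolExecutor(
                copies, copies, 60L, TimeUnit.SECONDS, new LinkedBlockingQueue<>(200));
        for (int i = 0; i < copies; i++) {
            pool.execute(() -> {
                task.run();
                completed.incrementAndGet();
            });
        }
        pool.shutdown(); // no new tasks; already submitted ones still run
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return completed.get();
    }

    public static void main(String[] args) {
        int done = runCopies(
                () -> System.out.println(Thread.currentThread().getName() + " running"), 4);
        System.out.println("completed runs: " + done);
    }
}
```

Because one `Runnable` instance is shared by all submissions, any state it holds must be thread-safe. The same applies to a singleton task bean that is submitted several times.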
That covers the configuration needed for an extensible thread pool (feedback is welcome). I hope it helps with your own work.
Original article: https://blog.csdn.net/u011663149/article/details/86497456