After the ZooKeeper Master failover completes, scheduling is taken over again by the Scheduler thread in DolphinScheduler: it traverses the DAG to find tasks in the "running" and "submitted successfully" states. For "running" tasks it monitors the state of their task instances; for "submitted successfully" tasks it checks whether the task already exists in the Task Queue: if it does, it likewise monitors the task instance state; if not, it resubmits the task instance.
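A minimal sketch of this recovery pass is shown below; it is illustrative only, assuming hypothetical helper types and methods (TaskQueue, monitorTaskInstance, submitTaskInstance), not the real Master classes, although ExecutionStatus.RUNNING_EXECUTION and SUBMITTED_SUCCESS mirror the states named above.

// Hypothetical sketch only: TaskInstance, TaskQueue and the monitor/submit helpers are
// illustrative placeholders, not the actual DolphinScheduler failover code.
void recoverAfterFailover(List<TaskInstance> dagTasks, TaskQueue taskQueue) {
    for (TaskInstance task : dagTasks) {
        if (task.getState() == ExecutionStatus.RUNNING_EXECUTION) {
            monitorTaskInstance(task);            // running: keep watching its state
        } else if (task.getState() == ExecutionStatus.SUBMITTED_SUCCESS) {
            if (taskQueue.contains(task.getId())) {
                monitorTaskInstance(task);        // already queued: watch it as well
            } else {
                submitTaskInstance(task);         // lost before queueing: submit it again
            }
        }
    }
}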
# local working directory, used to store temporary files
data.basedir.path=/tmp/dolphinscheduler
# resource file storage type: HDFS, S3, NONE
resource.storage.type=NONE
# resource file storage path
resource.upload.path=/dolphinscheduler
# whether kerberos authentication is enabled for hadoop
hadoop.security.authentication.startup.state=false
# kerberos configuration file path
java.security.krb5.conf.path=/opt/krb5.conf
# kerberos login user
login.user.keytab.username=hdfs-mycluster@ESZ.COM
# keytab of the kerberos login user
login.user.keytab.path=/opt/hdfs.headless.keytab
# kerberos expire time, an integer, the unit is hour
kerberos.expire.time=2
# if the storage type is HDFS, configure a user that has the required permissions
hdfs.root.user=hdfs
# if resource.storage.type=S3, this value looks like s3a://dolphinscheduler; if resource.storage.type=HDFS and hadoop is configured with HA, copy core-site.xml and hdfs-site.xml into the conf directory
fs.defaultFS=hdfs://mycluster:8020
aws.access.key.id=minioadmin
aws.secret.access.key=minioadmin
aws.region=us-east-1
aws.endpoint=http://localhost:9000
# resourcemanager port, the default value is 8088 if not specified
resource.manager.httpaddress.port=8088
# yarn resourcemanager address; if resourcemanager HA is enabled, enter the HA IP addresses separated by commas; if resourcemanager is a single node, leave this value empty
yarn.resourcemanager.ha.rm.ids=192.168.xx.xx,192.168.xx.xx
# if resourcemanager HA is enabled or no resourcemanager is used, keep the default value; if resourcemanager is a single node, replace ds1 with the resourcemanager hostname
yarn.application.status.address=http://ds1:%s/ws/v1/cluster/apps/%s
# job history status url when application number threshold is reached(default 10000, maybe it was set to 1000)
yarn.job.history.status.address=http://ds1:19888/ws/v1/history/mapreduce/jobs/%s
# datasource encryption enable
datasource.encryption.enable=false
# datasource encryption salt
datasource.encryption.salt=!@#$%^&*
# data quality option
data-quality.jar.name=dolphinscheduler-data-quality-dev-SNAPSHOT.jar
#data-quality.error.output.path=/tmp/data-quality-error-data
# Network IP gets priority, default inner outer
# Whether hive SQL is executed in the same session
support.hive.oneSession=false
# use sudo or not, if set true, executing user is tenant user and deploy user needs sudo permissions; if set false, executing user is the deploy user and doesn't need sudo permissions
sudo.enable=true
# network interface preferred like eth0, default: empty
#dolphin.scheduler.network.interface.preferred=
# network IP gets priority, default: inner outer
#dolphin.scheduler.network.priority.strategy=default
# system env path
#dolphinscheduler.env.path=dolphinscheduler_env.sh
# whether to run in development mode
development.state=false
# rpc port
alert.rpc.port=50052
# Url endpoint for zeppelin RESTful API
zeppelin.rest.url=http://localhost:8080
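The keys above are plain Java properties. For reference, a minimal, hypothetical sketch of reading a few of them with java.util.Properties (this is not DolphinScheduler's own PropertyUtils, and assumes common.properties is on the classpath):

import java.io.InputStream;
import java.util.Properties;

// Illustrative only: loads common.properties from the classpath and reads a few keys.
public class CommonPropertiesDemo {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        try (InputStream in = CommonPropertiesDemo.class.getClassLoader()
                .getResourceAsStream("common.properties")) {
            if (in == null) {
                throw new IllegalStateException("common.properties not found on classpath");
            }
            props.load(in);
        }
        String storageType = props.getProperty("resource.storage.type", "NONE");
        String uploadPath = props.getProperty("resource.upload.path", "/dolphinscheduler");
        boolean kerberos = Boolean.parseBoolean(
                props.getProperty("hadoop.security.authentication.startup.state", "false"));
        System.out.printf("storage=%s, path=%s, kerberos=%b%n", storageType, uploadPath, kerberos);
    }
}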
dolphinscheduler-api application.yaml
server:
  port: 12345
  servlet:
    session:
      timeout: 120m
    context-path: /dolphinscheduler/
  compression:
    enabled: true
    mime-types: text/html,text/xml,text/plain,text/css,text/javascript,application/javascript,application/json,application/xml
  jetty:
    max-http-form-post-size: 5000000

spring:
  application:
    name: api-server
  banner:
    charset: UTF-8
  jackson:
    time-zone: UTC
    date-format: "yyyy-MM-dd HH:mm:ss"
  servlet:
    multipart:
      max-file-size: 1024MB
      max-request-size: 1024MB
  messages:
    basename: i18n/messages
  datasource:
    # driver-class-name: org.postgresql.Driver
    # url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
    username: root
    password: root
    hikari:
      connection-test-query: select 1
      minimum-idle: 5
      auto-commit: true
      validation-timeout: 3000
      pool-name: DolphinScheduler
      maximum-pool-size: 50
      connection-timeout: 30000
      idle-timeout: 600000
      leak-detection-threshold: 0
      initialization-fail-timeout: 1
  quartz:
    auto-startup: false
    job-store-type: jdbc
    jdbc:
      initialize-schema: never
    properties:
      org.quartz.threadPool:threadPriority: 5
      org.quartz.jobStore.isClustered: true
      org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
      org.quartz.scheduler.instanceId: AUTO
      org.quartz.jobStore.tablePrefix: QRTZ_
      org.quartz.jobStore.acquireTriggersWithinLock: true
      org.quartz.scheduler.instanceName: DolphinScheduler
      org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
      org.quartz.jobStore.useProperties: false
      org.quartz.threadPool.makeThreadsDaemons: true
      org.quartz.threadPool.threadCount: 25
      org.quartz.jobStore.misfireThreshold: 60000
      org.quartz.scheduler.makeSchedulerThreadDaemon: true
      # org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
      org.quartz.jobStore.clusterCheckinInterval: 5000

management:
  endpoints:
    web:
      exposure:
        include: '*'
  metrics:
    tags:
      application: ${spring.application.name}

registry:
  type: zookeeper
  zookeeper:
    namespace: dolphinscheduler
    # connect-string: localhost:2181
    connect-string: 10.255.158.70:2181
    retry-policy:
      base-sleep-time: 60ms
      max-sleep: 300ms
      max-retries: 5
    session-timeout: 30s
    connection-timeout: 9s
    block-until-connected: 600ms
    digest: ~

audit:
  enabled: false

metrics:
  enabled: true

python-gateway:
  # Whether to enable the python gateway server or not. The default value is true.
  enabled: true
  # The address the Python gateway server binds to. Set it to `0.0.0.0` if your Python API runs on a
  # different host than the Python gateway server; it can also be a specific address like `127.0.0.1` or `localhost`.
  gateway-server-address: 0.0.0.0
  # The port the Python gateway server listens on, i.e. the port the Python API side connects to.
  gateway-server-port: 25333
  # The address of the Python callback client.
  python-address: 127.0.0.1
  # The port of the Python callback client.
  python-port: 25334
  # Close the socket server connection if no request is accepted within x milliseconds
  # (0 = infinite: the socket server never closes, even if no requests arrive).
  connect-timeout: 0
  # Close each active socket server connection if the python program is inactive for x milliseconds
  # (0 = infinite: connections are never closed).
  read-timeout: 0

# Override by profile
---
spring:
  config:
    activate:
      on-profile: mysql
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&characterEncoding=UTF-8
  quartz:
    properties:
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
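The registry block in this file is served by an Apache Curator client inside the server. A rough, hypothetical sketch of how those settings could translate into Curator calls (illustrative only, not DolphinScheduler's actual registry implementation):

import java.util.concurrent.TimeUnit;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.retry.ExponentialBackoffRetry;

// Hypothetical sketch: maps registry.zookeeper.* settings onto a Curator client.
public class RegistryClientSketch {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.builder()
                .connectString("10.255.158.70:2181")                  // connect-string
                .namespace("dolphinscheduler")                        // namespace
                .sessionTimeoutMs(30_000)                             // session-timeout: 30s
                .connectionTimeoutMs(9_000)                           // connection-timeout: 9s
                .retryPolicy(new ExponentialBackoffRetry(60, 5, 300)) // base-sleep-time, max-retries, max-sleep
                .build();
        client.start();
        // block-until-connected: 600ms
        boolean connected = client.blockUntilConnected(600, TimeUnit.MILLISECONDS);
        System.out.println("connected: " + connected);
        client.close();
    }
}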
dolphinscheduler-master application.yaml
spring:
  banner:
    charset: UTF-8
  application:
    name: master-server
  jackson:
    time-zone: UTC
    date-format: "yyyy-MM-dd HH:mm:ss"
  cache:
    # default enable cache, you can disable by `type: none`
    type: none
    cache-names:
      - tenant
      - user
      - processDefinition
      - processTaskRelation
      - taskDefinition
    caffeine:
      spec: maximumSize=100,expireAfterWrite=300s,recordStats
  datasource:
    #driver-class-name: org.postgresql.Driver
    #url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
    username: root
    password:
    hikari:
      connection-test-query: select 1
      minimum-idle: 5
      auto-commit: true
      validation-timeout: 3000
      pool-name: DolphinScheduler
      maximum-pool-size: 50
      connection-timeout: 30000
      idle-timeout: 600000
      leak-detection-threshold: 0
      initialization-fail-timeout: 1
  quartz:
    job-store-type: jdbc
    jdbc:
      initialize-schema: never
    properties:
      org.quartz.threadPool:threadPriority: 5
      org.quartz.jobStore.isClustered: true
      org.quartz.jobStore.class: org.quartz.impl.jdbcjobstore.JobStoreTX
      org.quartz.scheduler.instanceId: AUTO
      org.quartz.jobStore.tablePrefix: QRTZ_
      org.quartz.jobStore.acquireTriggersWithinLock: true
      org.quartz.scheduler.instanceName: DolphinScheduler
      org.quartz.threadPool.class: org.quartz.simpl.SimpleThreadPool
      org.quartz.jobStore.useProperties: false
      org.quartz.threadPool.makeThreadsDaemons: true
      org.quartz.threadPool.threadCount: 25
      org.quartz.jobStore.misfireThreshold: 60000
      org.quartz.scheduler.makeSchedulerThreadDaemon: true
      # org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.PostgreSQLDelegate
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
      org.quartz.jobStore.clusterCheckinInterval: 5000

registry:
  type: zookeeper
  zookeeper:
    namespace: dolphinscheduler
    # connect-string: localhost:2181
    connect-string: 10.255.158.70:2181
    retry-policy:
      base-sleep-time: 60ms
      max-sleep: 300ms
      max-retries: 5
    session-timeout: 30s
    connection-timeout: 9s
    block-until-connected: 600ms
    digest: ~

master:
  listen-port: 5678
  # master fetch command num
  fetch-command-num: 10
  # master prepare execute thread number to limit handle commands in parallel
  pre-exec-threads: 10
  # master execute thread number to limit process instances in parallel
  exec-threads: 100
  # master dispatch task number per batch
  dispatch-task-number: 3
  # master host selector to select a suitable worker, default value: LowerWeight. Optional values include random, round_robin, lower_weight
  host-selector: lower_weight
  # master heartbeat interval, the unit is second
  heartbeat-interval: 10
  # master commit task retry times
  task-commit-retry-times: 5
  # master commit task interval, the unit is millisecond
  task-commit-interval: 1000
  state-wheel-interval: 5
  # master max cpuload avg; the master only schedules when this value is higher than the current system cpu load average. default value -1: the number of cpu cores * 2
  max-cpu-load-avg: -1
  # master reserved memory; the master only schedules when this value is lower than the available system memory. default value 0.3, the unit is G
  reserved-memory: 0.3
  # failover interval, the unit is minute
  failover-interval: 10
  # kill the yarn job when failing over a taskInstance, default true
  kill-yarn-job-when-task-failover: true

server:
  port: 5679

management:
  endpoints:
    web:
      exposure:
        include: '*'
  metrics:
    tags:
      application: ${spring.application.name}

metrics:
  enabled: true

# Override by profile
---
spring:
  config:
    activate:
      on-profile: mysql
  datasource:
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
  quartz:
    properties:
      org.quartz.jobStore.driverDelegateClass: org.quartz.impl.jdbcjobstore.StdJDBCDelegate
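The host-selector: lower_weight setting above makes the master dispatch each task to the worker with the most spare capacity. A hypothetical, simplified sketch of weight-based selection (the scoring factors are made up for illustration; this is not the real LowerWeightHostManager):

import java.util.Comparator;
import java.util.List;
import java.util.Optional;

// Hypothetical sketch of a "lower weight" selector: the candidate with the lowest
// load-derived score wins. HostWeight is an illustrative record, not the real class.
public class LowerWeightSelectorSketch {

    record HostWeight(String host, double cpuLoadAvg, double memoryUsage, int waitingTasks) {
        double score() {
            // a smaller score means more spare capacity; weights here are arbitrary
            return cpuLoadAvg * 0.5 + memoryUsage * 0.3 + waitingTasks * 0.2;
        }
    }

    static Optional<HostWeight> select(List<HostWeight> candidates) {
        return candidates.stream().min(Comparator.comparingDouble(HostWeight::score));
    }

    public static void main(String[] args) {
        List<HostWeight> workers = List.of(
                new HostWeight("worker-1:1234", 0.7, 0.6, 12),
                new HostWeight("worker-2:1234", 0.2, 0.3, 3));
        select(workers).ifPresent(w -> System.out.println("dispatch to " + w.host()));
    }
}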
dolphinscheduler-worker application.yaml
spring:
  banner:
    charset: UTF-8
  application:
    name: worker-server
  jackson:
    time-zone: UTC
    date-format: "yyyy-MM-dd HH:mm:ss"
  datasource:
    #driver-class-name: org.postgresql.Driver
    #url: jdbc:postgresql://127.0.0.1:5432/dolphinscheduler
    driver-class-name: com.mysql.jdbc.Driver
    url: jdbc:mysql://127.0.0.1:3306/dolphinscheduler?useUnicode=true&serverTimezone=UTC&characterEncoding=UTF-8&autoReconnect=true&useSSL=false&zeroDateTimeBehavior=convertToNull
    username: root
    #password: root
    password:
    hikari:
      connection-test-query: select 1
      minimum-idle: 5
      auto-commit: true
      validation-timeout: 3000
      pool-name: DolphinScheduler
      maximum-pool-size: 50
      connection-timeout: 30000
      idle-timeout: 600000
      leak-detection-threshold: 0
      initialization-fail-timeout: 1

registry:
  type: zookeeper
  zookeeper:
    namespace: dolphinscheduler
    # connect-string: localhost:2181
    connect-string: 10.255.158.70:2181
    retry-policy:
      base-sleep-time: 60ms
      max-sleep: 300ms
      max-retries: 5
    session-timeout: 30s
    connection-timeout: 9s
    block-until-connected: 600ms
    digest: ~

worker:
  # worker listener port
  listen-port: 1234
  # worker execute thread number to limit task instances in parallel
  exec-threads: 100
  # worker heartbeat interval, the unit is second
  heartbeat-interval: 10
  # worker host weight to dispatch tasks, default value 100
  host-weight: 100
  # worker tenant auto create
  tenant-auto-create: true
  # worker max cpuload avg; tasks are dispatched to the worker only when this value is higher than the current system cpu load average. default value -1: the number of cpu cores * 2
  max-cpu-load-avg: -1
  # worker reserved memory; tasks are dispatched to the worker only when this value is lower than the available system memory. default value 0.3, the unit is G
  reserved-memory: 0.3
  # default worker groups separated by comma, like 'worker.groups=default,test'
  groups:
    - default
  # alert server listen host
  alert-listen-host: localhost
  alert-listen-port: 50052

server:
  port: 1235

management:
  endpoints:
    web:
      exposure:
        include: '*'
  metrics:
    tags:
      application: ${spring.application.name}

metrics:
  enabled: true
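max-cpu-load-avg and reserved-memory act as an admission gate reported with the worker heartbeat. A hypothetical, simplified sketch of that check (using the JDK's OperatingSystemMXBean; the real worker uses its own OSUtils helpers):

import java.lang.management.ManagementFactory;
import com.sun.management.OperatingSystemMXBean;

// Hypothetical sketch: decide whether the worker may accept new tasks based on
// max-cpu-load-avg and reserved-memory (in GB). Illustrative only.
public class WorkerAdmissionSketch {

    static boolean canAcceptTasks(double maxCpuLoadAvg, double reservedMemoryGb) {
        OperatingSystemMXBean os =
                (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
        // -1 means "use cpu cores * 2" as the threshold, mirroring the config comment
        double cpuThreshold = maxCpuLoadAvg > 0
                ? maxCpuLoadAvg
                : Runtime.getRuntime().availableProcessors() * 2.0;
        double loadAvg = os.getSystemLoadAverage();   // may be negative on some platforms
        double freeGb = os.getFreePhysicalMemorySize() / 1024.0 / 1024.0 / 1024.0;
        return loadAvg < cpuThreshold && freeGb > reservedMemoryGb;
    }

    public static void main(String[] args) {
        System.out.println("worker can accept tasks: " + canAcceptTasks(-1, 0.3));
    }
}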
public Map<String, Object> setScheduleState(User loginUser, long projectCode, Integer id, ReleaseState scheduleStatus) {
    Map<String, Object> result = new HashMap<>();
    Project project = projectMapper.queryByCode(projectCode);
    // check project auth
    boolean hasProjectAndPerm = projectService.hasProjectAndPerm(loginUser, project, result);
    if (!hasProjectAndPerm) {
        return result;
    }
    // check schedule exists
    Schedule scheduleObj = scheduleMapper.selectById(id);
    if (scheduleObj == null) {
        putMsg(result, Status.SCHEDULE_CRON_NOT_EXISTS, id);
        return result;
    }
    // check schedule release state
    if (scheduleObj.getReleaseState() == scheduleStatus) {
        logger.info("schedule release is already {},needn't to change schedule id: {} from {} to {}",
                scheduleObj.getReleaseState(), scheduleObj.getId(), scheduleObj.getReleaseState(), scheduleStatus);
        putMsg(result, Status.SCHEDULE_CRON_REALEASE_NEED_NOT_CHANGE, scheduleStatus);
        return result;
    }
    ProcessDefinition processDefinition = processDefinitionMapper.queryByCode(scheduleObj.getProcessDefinitionCode());
    if (processDefinition == null || projectCode != processDefinition.getProjectCode()) {
        putMsg(result, Status.PROCESS_DEFINE_NOT_EXIST, String.valueOf(scheduleObj.getProcessDefinitionCode()));
        return result;
    }
    List<ProcessTaskRelation> processTaskRelations = processTaskRelationMapper.queryByProcessCode(projectCode, scheduleObj.getProcessDefinitionCode());
    if (processTaskRelations.isEmpty()) {
        putMsg(result, Status.PROCESS_DAG_IS_EMPTY);
        return result;
    }
    if (scheduleStatus == ReleaseState.ONLINE) {
        // check process definition release state
        if (processDefinition.getReleaseState() != ReleaseState.ONLINE) {
            logger.info("not release process definition id: {} , name : {}",
                    processDefinition.getId(), processDefinition.getName());
            putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, processDefinition.getName());
            return result;
        }
        // check sub process definition release state
        List<Long> subProcessDefineCodes = new ArrayList<>();
        processService.recurseFindSubProcess(processDefinition.getCode(), subProcessDefineCodes);
        if (!subProcessDefineCodes.isEmpty()) {
            List<ProcessDefinition> subProcessDefinitionList = processDefinitionMapper.queryByCodes(subProcessDefineCodes);
            if (subProcessDefinitionList != null && !subProcessDefinitionList.isEmpty()) {
                for (ProcessDefinition subProcessDefinition : subProcessDefinitionList) {
                    /**
                     * if there is no online process, exit directly
                     */
                    if (subProcessDefinition.getReleaseState() != ReleaseState.ONLINE) {
                        logger.info("not release process definition id: {} , name : {}",
                                subProcessDefinition.getId(), subProcessDefinition.getName());
                        putMsg(result, Status.PROCESS_DEFINE_NOT_RELEASE, String.valueOf(subProcessDefinition.getId()));
                        return result;
                    }
                }
            }
        }
    }
    // check master server exists
    List<Server> masterServers = monitorService.getServerListFromRegistry(true);
    if (masterServers.isEmpty()) {
        putMsg(result, Status.MASTER_NOT_EXISTS);
        return result;
    }
    // set status
    scheduleObj.setReleaseState(scheduleStatus);
    scheduleMapper.updateById(scheduleObj);
    try {
        switch (scheduleStatus) {
            case ONLINE:
                logger.info("Call master client set schedule online, project id: {}, flow id: {},host: {}",
                        project.getId(), processDefinition.getId(), masterServers);
                setSchedule(project.getId(), scheduleObj);
                break;
            case OFFLINE:
                logger.info("Call master client set schedule offline, project id: {}, flow id: {},host: {}",
                        project.getId(), processDefinition.getId(), masterServers);
                deleteSchedule(project.getId(), id);
                break;
            default:
                putMsg(result, Status.SCHEDULE_STATUS_UNKNOWN, scheduleStatus.toString());
                return result;
        }
    } catch (Exception e) {
        result.put(Constants.MSG, scheduleStatus == ReleaseState.ONLINE ? "set online failure" : "set offline failure");
        throw new ServiceException(result.get(Constants.MSG).toString(), e);
    }
    putMsg(result, Status.SUCCESS);
    return result;
}
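For orientation, a hypothetical call site (not the actual controller code; the project code, schedule id and user are made-up values) that brings a schedule online might look like:

// Hypothetical usage sketch of setScheduleState.
Map<String, Object> result =
        schedulerService.setScheduleState(loginUser, 1234567890L, 1, ReleaseState.ONLINE);
if (result.get(Constants.STATUS) != Status.SUCCESS) {
    logger.warn("failed to bring schedule online: {}", result);
}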
public void setSchedule(int projectId, Schedule schedule) {
    logger.info("set schedule, project id: {}, scheduleId: {}", projectId, schedule.getId());
    quartzExecutor.addJob(ProcessScheduleJob.class, projectId, schedule);
}
public void addJob(Class<? extends Job> clazz, int projectId, final Schedule schedule) {
    String jobName = this.buildJobName(schedule.getId());
    String jobGroupName = this.buildJobGroupName(projectId);
    Map<String, Object> jobDataMap = this.buildDataMap(projectId, schedule);
    String cronExpression = schedule.getCrontab();
    String timezoneId = schedule.getTimezoneId();
    /**
     * transform from server default timezone to schedule timezone
     * e.g. server default timezone is `UTC`
     * user set a schedule with startTime `2022-04-28 10:00:00`, timezone is `Asia/Shanghai`,
     * api skip to transform it and save into databases directly, startTime `2022-04-28 10:00:00`, timezone is `UTC`, which actually added 8 hours,
     * so when add job to quartz, it should recover by transform timezone
     */
    Date startDate = DateUtils.transformTimezoneDate(schedule.getStartTime(), timezoneId);
    Date endDate = DateUtils.transformTimezoneDate(schedule.getEndTime(), timezoneId);
    lock.writeLock().lock();
    try {
        JobKey jobKey = new JobKey(jobName, jobGroupName);
        JobDetail jobDetail;
        //add a task (if this task already exists, return this task directly)
        if (scheduler.checkExists(jobKey)) {
            jobDetail = scheduler.getJobDetail(jobKey);
            jobDetail.getJobDataMap().putAll(jobDataMap);
        } else {
            jobDetail = newJob(clazz).withIdentity(jobKey).build();
            jobDetail.getJobDataMap().putAll(jobDataMap);
            scheduler.addJob(jobDetail, false, true);
            logger.info("Add job, job name: {}, group name: {}",
                    jobName, jobGroupName);
        }
        TriggerKey triggerKey = new TriggerKey(jobName, jobGroupName);
        /*
         * Instructs the Scheduler that upon a mis-fire
         * situation, the CronTrigger wants to have it's
         * next-fire-time updated to the next time in the schedule after the
         * current time (taking into account any associated Calendar),
         * but it does not want to be fired now.
         */
        CronTrigger cronTrigger = newTrigger()
                .withIdentity(triggerKey)
                .startAt(startDate)
                .endAt(endDate)
                .withSchedule(
                        cronSchedule(cronExpression)
                                .withMisfireHandlingInstructionDoNothing()
                                .inTimeZone(DateUtils.getTimezone(timezoneId))
                )
                .forJob(jobDetail).build();
        if (scheduler.checkExists(triggerKey)) {
            // updateProcessInstance scheduler trigger when scheduler cycle changes
            CronTrigger oldCronTrigger = (CronTrigger) scheduler.getTrigger(triggerKey);
            String oldCronExpression = oldCronTrigger.getCronExpression();
            if (!StringUtils.equalsIgnoreCase(cronExpression, oldCronExpression)) {
                // reschedule job trigger
                scheduler.rescheduleJob(triggerKey, cronTrigger);
                logger.info("reschedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
                        jobName, jobGroupName, cronExpression, startDate, endDate);
            }
        } else {
            scheduler.scheduleJob(cronTrigger);
            logger.info("schedule job trigger, triggerName: {}, triggerGroupName: {}, cronExpression: {}, startDate: {}, endDate: {}",
                    jobName, jobGroupName, cronExpression, startDate, endDate);
        }
    } catch (Exception e) {
        throw new ServiceException("add job failed", e);
    } finally {
        lock.writeLock().unlock();
    }
}
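The timezone recovery described in the comment inside addJob can be pictured as re-interpreting the stored wall-clock time in the schedule's timezone. A rough sketch under that assumption (illustrative only, not the actual DateUtils.transformTimezoneDate implementation):

import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.Date;
import java.util.TimeZone;

// Hypothetical sketch: take a Date that was parsed in the server's default timezone and
// re-interpret the same wall-clock value in the schedule's timezone, so Quartz fires at
// the local time the user intended.
public class TimezoneTransformSketch {

    static Date transformTimezoneDate(Date date, String timezoneId) {
        if (date == null || timezoneId == null) {
            return date;
        }
        // format in the server default timezone to recover the wall-clock string
        String wallClock = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").format(date);
        // parse the same wall-clock string in the schedule's timezone
        SimpleDateFormat target = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        target.setTimeZone(TimeZone.getTimeZone(timezoneId));
        try {
            return target.parse(wallClock);
        } catch (ParseException e) {
            throw new IllegalArgumentException("cannot parse date: " + wallClock, e);
        }
    }

    public static void main(String[] args) {
        System.out.println(transformTimezoneDate(new Date(), "Asia/Shanghai"));
    }
}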
public TaskCallbackService() {
    final NettyClientConfig clientConfig = new NettyClientConfig();
    this.nettyRemotingClient = new NettyRemotingClient(clientConfig);
    // register the processors that handle the master's ACKs for the worker's
    // "task execute running" and "task execute response" callbacks
    this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RUNNING_ACK, taskExecuteRunningProcessor);
    this.nettyRemotingClient.registerProcessor(CommandType.TASK_EXECUTE_RESPONSE_ACK, taskExecuteResponseAckProcessor);
}