分布式定时任务 xxl-jb
20.1.简介
xxl-job 是一个分布式的定时任务执行框架,有调度中心和执行器两部分组成 调度中心:负责定时任务的调用 执行器:服务定时任务的执行 本篇文章介绍如何基于 tio-boot 创建一个执行器项目并注册的调度中心
20.2.安装 mysql
20.2.1.docker-run-mysql-8
docker pull mysql/mysql-server:8.0.32
docker run --restart=always -d --name mysql_8 --hostname mysql \
-p 3306:3306 \
-e 'MYSQL_ROOT_PASSWORD=robot_123456#' -e 'MYSQL_ROOT_HOST=%' -e 'MYSQL_DATABASE=robot_ex' \
mysql/mysql-server:8.0.32 \
--character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci --lower_case_table_names=1
使用命令连接
mysql -uroot -p"robot_123456#" -h127.0.0.1 -P3306
20.3.服务端-任务调度中心 xxl-job-admin 部署
20.3.1.解压文件
下载地址 https://github.com/litongjava/java-packge/releases/download/v1.0.0/xxl-job-admin-2.4.0-SNAPSHOT.tar.gz
mkdir /opt/pakcage/xxl-job-admin -p
cd /opt/pakcage/xxl-job-admin
#upload xxl-job-admin-2.4.0-SNAPSHOT.tar.gz here
mkdir /opt/xxl-job
tar -xf xxl-job-admin-2.4.0-SNAPSHOT.tar.gz -C /opt/xxl-job
cd /opt/xxl-job/xxl-job-admin-2.4.0-SNAPSHOT
20.3.2.初始化数据
创建数据库 xxl_job 数据库 创建 xxl_job 用户 授权 xxl_job 用户拥有 xxl_job 的所有权限
mysql 8
CREATE DATABASE xxl_job;
CREATE USER 'xxl_job'@'%' IDENTIFIED WITH MYSQL_NATIVE_PASSWORD BY 'Litong@2023';
GRANT all privileges ON xxl_job.* TO 'xxl_job'@'%';
FLUSH PRIVILEGES;
如果是 mysql 8 创建用户的语句如下
使用初始化脚本为新数据库创建数据表,将 tables_xxl_job.sql 导入到数据库
mysql -uxxl_job -pLitong@2023 -h127.0.0.1 -Dxxl_job<db/tables_xxl_job.sql
tables_xxl_job.sql 在解压后的 db 目录下
20.3.3.配置数据库连接
vi resources/application.properties
spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=xxl_job
spring.datasource.password=Litong@2023
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver
20.3.4.启动
启动调度中心
cd /opt/xxl-job/xxl-job-admin-2.4.0-SNAPSHOT
./spring-boot-v1.2.sh restart
查看日志
tail -f logs/spring-boot-v1.2.sh.log
测试访问 http://127.0.0.1:10407/xxl-job-admin/ 默认的用户名和密码是 admin/123456
20.3.5.设置防火墙
20.3.5.1.firewalld
设置入栈规则
firewall-cmd --zone=public --add-port=10407/tcp --permanent
firewall-cmd --reload
20.3.6.设置为开机自启动
20.3.6.1.CentOS-7
vi /lib/systemd/system/xxl-job-admin.service xxl-job-admin.service 内容如下
[Unit]
Description=xxl-job-admin
After=network.target network-online.target syslog.target
Wants=network.target network-online.target
[Service]
ExecStart=/opt/xxl-job/xxl-job-admin-2.4.0-SNAPSHOT/spring-boot-v1.2.sh start
ExecStop=/opt/xxl-job/xxl-job-admin-2.4.0-SNAPSHOT/spring-boot-v1.2.sh stop
ExecReload=/opt/xxl-job/xxl-job-admin-2.4.0-SNAPSHOT/spring-boot-v1.2.sh restart
Type=forking
PrivateTmp=true
[Install]
WantedBy=multi-user.target
启动,开机自启,查看状态
systemctl start xxl-job-admin
systemctl enable xxl-job-admin
systemctl status xxl-job-admin
20.3.7.配置 nginx
location /xxl-job-admin {
auth_basic "Please input password";
auth_basic_user_file /etc/nginx/passwd;
proxy_pass http://127.0.0.1:10407;
proxy_http_version 1.1;
proxy_read_timeout 300;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "upgrade";
proxy_set_header Host $host:$server_port;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Real-PORT $remote_port;
}
20.4.客户端 xxl-job 执行器开发
20.4.1.添加依赖
<!-- xxl-rpc-core -->
<!-- https://mvnrepository.com/artifact/com.xuxueli/xxl-job-core -->
<dependency>
<groupId>com.xuxueli</groupId>
<artifactId>xxl-job-core</artifactId>
<version>2.4.0</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.90.Final</version>
</dependency>
xxl-job-core 依赖 4.1.90.Final 版本,为了防止和其他依赖的 netty 冲突,这里手动指定组件版本
20.4.2.2.添加配置文件
src\main\resources\xxl-job-executor.properties 添加配置文件指定 xxl.job.admin.addresses 为你的任务调度中心地址
### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://127.0.0.1:10407/xxl-job-admin
### xxl-job, access token
xxl.job.accessToken=default_token
### xxl-job executor appname
xxl.job.executor.appname=xxl-job-executor-sample
### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
xxl.job.executor.address=
### xxl-job executor server-info
xxl.job.executor.ip=
xxl.job.executor.port=9998
### xxl-job executor log-path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### xxl-job executor log-retention-days
xxl.job.executor.logretentiondays=30
20.4.3.添加执行器配置类 XxlJobExecutorConfig
package com.litongjava.tio.boot.hello.config;
import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Properties;
import com.litongjava.jfinal.aop.annotation.ABean;
import com.litongjava.tio.boot.hello.job.MyJobHandler;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.executor.impl.XxlJobSimpleExecutor;
import lombok.extern.slf4j.Slf4j;
@Slf4j
//@Configuration
public class XxlJobExecutorConfig {
@ABean(destroyMethod = "destroy")
public XxlJobSimpleExecutor xxlJobSimpleExecutor() {
// load executor prop
Properties xxlJobProp = loadProperties("xxl-job-executor.properties");
// init executor
XxlJobSimpleExecutor xxlJobExecutor = new XxlJobSimpleExecutor();
xxlJobExecutor.setAdminAddresses(xxlJobProp.getProperty("xxl.job.admin.addresses"));
xxlJobExecutor.setAccessToken(xxlJobProp.getProperty("xxl.job.accessToken"));
xxlJobExecutor.setAppname(xxlJobProp.getProperty("xxl.job.executor.appname"));
xxlJobExecutor.setAddress(xxlJobProp.getProperty("xxl.job.executor.address"));
xxlJobExecutor.setIp(xxlJobProp.getProperty("xxl.job.executor.ip"));
xxlJobExecutor.setPort(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.port")));
xxlJobExecutor.setLogPath(xxlJobProp.getProperty("xxl.job.executor.logpath"));
xxlJobExecutor.setLogRetentionDays(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.logretentiondays")));
// registry job bean
// xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new SyncXxlJob()));
XxlJobExecutor.registJobHandler("my_job", new MyJobHandler());
// start executor
try {
xxlJobExecutor.start();
} catch (Exception e) {
log.error(e.getMessage(), e);
}
return xxlJobExecutor;
}
public static Properties loadProperties(String propertyFileName) {
InputStreamReader in = null;
try {
ClassLoader loder = Thread.currentThread().getContextClassLoader();
in = new InputStreamReader(loder.getResourceAsStream(propertyFileName), "UTF-8");
if (in != null) {
Properties prop = new Properties();
prop.load(in);
return prop;
}
} catch (IOException e) {
log.error("load {} error!", propertyFileName);
} finally {
if (in != null) {
try {
in.close();
} catch (IOException e) {
log.error("close {} error!", propertyFileName);
}
}
}
return null;
}
}
这段 Java 代码是用于在使用 tio-boot 框架的项目中设置 XXL-Job 执行器的配置类。以下是代码的详细解释:
类定义
- 类
XxlJobExecutorConfig
被注解@Slf4j
标记,这是 Lombok 的注解,用于注入日志记录器;@AConfiguration
是 tio-boot 的注解,表示这个类用于配置目的。
Bean 定义
- 类中有一个名为
xxlJobSimpleExecutor()
的方法,该方法被@ABean(destroyMethod = "destroy")
注解标记。这表明该方法返回一个 Bean(由 tio-boot 框架管理的对象),destroyMethod = "destroy"
指定当应用上下文关闭时要调用的方法。
加载 XXL-Job 属性
- 方法
xxlJobSimpleExecutor()
首先从名为xxl-job-executor.properties
的文件加载属性。这通过loadProperties()
方法实现,该方法读取属性文件并加载配置。
初始化执行器
- 创建了一个
XxlJobSimpleExecutor
的实例。 - 设置了执行器的属性,如管理员地址、访问令牌、应用名称、地址、IP、端口、日志路径和日志保留天数,这些都基于加载的属性。
注册作业处理器
- 通过调用
XxlJobExecutor.registJobHandler("my_job", new MyJobHandler())
配置了执行器的作业处理器。这样注册了一个名为"my_job"
的作业处理器,它是MyJobHandler
的一个实例。MyJobHandler
类应该定义当触发此处理器的作业时要执行的逻辑。
启动执行器
- 使用
xxlJobExecutor.start()
启动了执行器。此调用可能抛出异常,异常被捕获并记录。
错误处理
loadProperties()
方法包含了在加载属性文件过程中的错误处理。
总结
这个配置类为处理分布式任务执行框架中的作业设置了 XXL-Job 执行器。它涉及加载配置属性、用这些属性初始化执行器、注册作业处理器,以及处理这些过程中可能出现的任何潜在错误。这种设置对于将 XXL-Job 集成到基于 tio-boot 的应用程序中至关重要,使其能够有效执行分布式的定时任务。 在执行器配类类中已经使用 registJobHandler 添加类执行任务
20.4.4.添加 job
MyJobHandler 是具体的执行任务,代码如下
package com.litongjava.tio.boot.hello.job;
import com.xxl.job.core.handler.IJobHandler;
import lombok.extern.slf4j.Slf4j;
@Slf4j
public class MyJobHandler extends IJobHandler {
@Override
public void execute() throws Exception {
log.info("执行代码");
}
}
这段代码是用于定义一个 XXL-Job 的作业处理器类,名为 MyJobHandler
。以下是该代码的详细解释:
导入
- 导入了 XXL-Job 的核心处理器接口
com.xxl.job.core.handler.IJobHandler
,用于实现自定义的任务逻辑。 - 导入了 Lombok 的
@Slf4j
注解,用于提供日志记录功能。
类定义
MyJobHandler
类继承自IJobHandler
,这意味着它必须实现IJobHandler
接口中定义的方法。IJobHandler
是 XXL-Job 框架中用于定义任务执行逻辑的核心接口。- 类使用了
@Slf4j
注解,这会在类中自动创建一个日志对象log
,用于记录日志。
方法实现
- 类中重写了
execute()
方法。这是任务执行的核心方法,当任务触发时,XXL-Job 框架将调用此方法。 execute()
方法中的代码是执行任务时的具体逻辑。在这个例子中,它仅包含一个日志记录操作,即log.info("执行代码")
。这条日志表示任务已被触发并正在执行。在实际应用中,这里可以填充具体的业务逻辑。
异常处理
execute()
方法声明抛出了Exception
异常。这意味着在执行任务过程中如果遇到任何异常,都可以通过抛出异常的方式进行处理。XXL-Job 框架将捕获并处理这些异常。
总结
MyJobHandler
类是一个 XXL-Job 任务处理器的实现,用于定义当任务被调度时需要执行的具体逻辑。这个简单的例子中,它只是记录了一条日志,但在实际应用中,可以在这里实现任何需要定时执行的业务逻辑。通过继承 IJobHandler
并重写 execute()
方法,开发者可以自定义任务执行的行为。
20.4.5.启动服务
启动 tio-boot,出现下面的日志表示连接 xxl-job-admin 成功,启动成功,客户端会启动 9998 用于和 xxl-job-admin 进行通讯
2023-12-16 00:57:01.056 [Thread-5] INFO EmbedServer.run:82 - >>>>>>>>>>> xxl-job remoting server start success, nettype = class com.xxl.job.core.server.EmbedServer, port = 9998
在浏览器上测试访问http://127.0.0.1:9998/返回下面的信息说明执行器启动成功
{"code":500,"msg":"invalid request, HttpMethod not support."}
20.4.5.在调度中心中添加定时任务并执行
将上面的配置后,执行器项目在启动后会注册到调度中心,但是定时任务仍然需要手动添加, 登录调度中心-->任务管理-->新增-->
基础配置 执行器 示例执行器 任务描述 my_job 负责人 Tong Li 报警邮件 可以为空
调度配置 调度类型 CRON Cron* 0/1****?
任务配置 运行模式 BEAN JobHandler* my_job 设置为 XxlJobExecutor.registJobHandler 是的名称 任务参数 请输入任务参数
高级配置 路由策略 第一个 子任务 ID 可以为空 调度过期策略 忽略 阻塞处理策略 单机串行 任务超时时间 0 失败重试次数 0
执行任务 添加完成后选择操作-->启动,在执行 "操作"-->"执行一次" 点击"执行一次"之后任务调动中心会执行器发送 Post 请求,执行器手动请求后会执行对于的任务
20.5.手动触发执行任务
你可以手动想执行器发送 post 请求触发执行任务,发送的消息格式如下
POST /run HTTP/1.1
Content-Type: application/json;charset=UTF-8
Accept-Charset: application/json;charset=UTF-8
XXL-JOB-ACCESS-TOKEN: default_token
Cache-Control: no-cache
Pragma: no-cache
User-Agent: Java/1.8.0_211
Host: 192.168.3.8:9998
Accept: text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2
Connection: keep-alive
Content-Length: 265
{"jobId":2,"executorHandler":"my_job","executorParams":"","executorBlockStrategy":"SERIAL_EXECUTION","executorTimeout":0,"logId":1357,"logDateTime":1702727731010,"glueType":"BEAN","glueSource":"","glueUpdatetime":1702724718000,"broadcastIndex":0,"broadcastTotal":1}
推荐使用使用我开发的ide-rest-client发送 http 请求,将上面内容新建一个后缀名为.http 文件,使用 ide-rest-client 打开即可发送