分布式定时任务 xxl-jb

20.1.简介

xxl-job 是一个分布式的定时任务执行框架,有调度中心和执行器两部分组成 调度中心:负责定时任务的调用 执行器:服务定时任务的执行 本篇文章介绍如何基于 tio-boot 创建一个执行器项目并注册的调度中心

20.2.安装 mysql

20.2.1.docker-run-mysql-8

docker pull mysql/mysql-server:8.0.32
docker run --restart=always -d --name mysql_8 --hostname mysql \
-p 3306:3306 \
-e 'MYSQL_ROOT_PASSWORD=robot_123456#' -e 'MYSQL_ROOT_HOST=%' -e 'MYSQL_DATABASE=robot_ex' \
mysql/mysql-server:8.0.32 \
--character-set-server=utf8mb4 --collation-server=utf8mb4_unicode_ci --lower_case_table_names=1

使用命令连接

mysql -uroot -p"robot_123456#" -h127.0.0.1 -P3306

20.3.服务端-任务调度中心 xxl-job-admin 部署

20.3.1.解压文件

下载地址 https://github.com/litongjava/java-packge/releases/download/v1.0.0/xxl-job-admin-2.4.0-SNAPSHOT.tar.gz

mkdir /opt/pakcage/xxl-job-admin -p
cd /opt/pakcage/xxl-job-admin
#upload xxl-job-admin-2.4.0-SNAPSHOT.tar.gz here
mkdir /opt/xxl-job
tar -xf xxl-job-admin-2.4.0-SNAPSHOT.tar.gz -C /opt/xxl-job
cd /opt/xxl-job/xxl-job-admin-2.4.0-SNAPSHOT

20.3.2.初始化数据

创建数据库 xxl_job 数据库 创建 xxl_job 用户 授权 xxl_job 用户拥有 xxl_job 的所有权限

mysql 8

CREATE DATABASE xxl_job;
CREATE USER 'xxl_job'@'%' IDENTIFIED WITH MYSQL_NATIVE_PASSWORD BY 'Litong@2023';
GRANT all privileges ON xxl_job.* TO 'xxl_job'@'%';
FLUSH PRIVILEGES;

如果是 mysql 8 创建用户的语句如下

使用初始化脚本为新数据库创建数据表,将 tables_xxl_job.sql 导入到数据库

mysql -uxxl_job -pLitong@2023 -h127.0.0.1 -Dxxl_job<db/tables_xxl_job.sql

tables_xxl_job.sql 在解压后的 db 目录下

20.3.3.配置数据库连接

vi resources/application.properties

spring.datasource.url=jdbc:mysql://127.0.0.1:3306/xxl_job?useUnicode=true&characterEncoding=UTF-8&autoReconnect=true&serverTimezone=Asia/Shanghai
spring.datasource.username=xxl_job
spring.datasource.password=Litong@2023
spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver

20.3.4.启动

启动调度中心

cd /opt/xxl-job/xxl-job-admin-2.4.0-SNAPSHOT
./spring-boot-v1.2.sh restart

查看日志

tail -f logs/spring-boot-v1.2.sh.log

测试访问 http://127.0.0.1:10407/xxl-job-admin/ 默认的用户名和密码是 admin/123456

20.3.5.设置防火墙

20.3.5.1.firewalld

设置入栈规则

firewall-cmd --zone=public --add-port=10407/tcp --permanent
firewall-cmd --reload

20.3.6.设置为开机自启动

20.3.6.1.CentOS-7

vi /lib/systemd/system/xxl-job-admin.service xxl-job-admin.service 内容如下

[Unit]
Description=xxl-job-admin
After=network.target network-online.target syslog.target
Wants=network.target network-online.target

[Service]
ExecStart=/opt/xxl-job/xxl-job-admin-2.4.0-SNAPSHOT/spring-boot-v1.2.sh start
ExecStop=/opt/xxl-job/xxl-job-admin-2.4.0-SNAPSHOT/spring-boot-v1.2.sh stop
ExecReload=/opt/xxl-job/xxl-job-admin-2.4.0-SNAPSHOT/spring-boot-v1.2.sh restart
Type=forking
PrivateTmp=true

[Install]
WantedBy=multi-user.target

启动,开机自启,查看状态

systemctl start xxl-job-admin
systemctl enable xxl-job-admin
systemctl status xxl-job-admin

20.3.7.配置 nginx

  location /xxl-job-admin {
    auth_basic "Please input password";
    auth_basic_user_file /etc/nginx/passwd;
    proxy_pass http://127.0.0.1:10407;
    proxy_http_version 1.1;
    proxy_read_timeout 300;
    proxy_set_header Upgrade $http_upgrade;
    proxy_set_header Connection "upgrade";
    proxy_set_header Host $host:$server_port;
    proxy_set_header X-Real-IP $remote_addr;
    proxy_set_header X-Real-PORT $remote_port;
  }

20.4.客户端 xxl-job 执行器开发

20.4.1.添加依赖

<!-- xxl-rpc-core -->
<!-- https://mvnrepository.com/artifact/com.xuxueli/xxl-job-core -->
<dependency>
  <groupId>com.xuxueli</groupId>
  <artifactId>xxl-job-core</artifactId>
  <version>2.4.0</version>
</dependency>

<dependency>
  <groupId>io.netty</groupId>
  <artifactId>netty-all</artifactId>
  <version>4.1.90.Final</version>
</dependency>

xxl-job-core 依赖 4.1.90.Final 版本,为了防止和其他依赖的 netty 冲突,这里手动指定组件版本

20.4.2.2.添加配置文件

src\main\resources\xxl-job-executor.properties 添加配置文件指定 xxl.job.admin.addresses 为你的任务调度中心地址

### xxl-job admin address list, such as "http://address" or "http://address01,http://address02"
xxl.job.admin.addresses=http://127.0.0.1:10407/xxl-job-admin
### xxl-job, access token
xxl.job.accessToken=default_token
### xxl-job executor appname
xxl.job.executor.appname=xxl-job-executor-sample
### xxl-job executor registry-address: default use address to registry , otherwise use ip:port if address is null
xxl.job.executor.address=
### xxl-job executor server-info
xxl.job.executor.ip=
xxl.job.executor.port=9998
### xxl-job executor log-path
xxl.job.executor.logpath=/data/applogs/xxl-job/jobhandler
### xxl-job executor log-retention-days
xxl.job.executor.logretentiondays=30

20.4.3.添加执行器配置类 XxlJobExecutorConfig

package com.litongjava.tio.boot.hello.config;

import java.io.IOException;
import java.io.InputStreamReader;
import java.util.Properties;

import com.litongjava.jfinal.aop.annotation.ABean;
import com.litongjava.tio.boot.hello.job.MyJobHandler;
import com.xxl.job.core.executor.XxlJobExecutor;
import com.xxl.job.core.executor.impl.XxlJobSimpleExecutor;

import lombok.extern.slf4j.Slf4j;

@Slf4j
//@Configuration
public class XxlJobExecutorConfig {

  @ABean(destroyMethod = "destroy")
  public XxlJobSimpleExecutor xxlJobSimpleExecutor() {
    // load executor prop
    Properties xxlJobProp = loadProperties("xxl-job-executor.properties");
    // init executor
    XxlJobSimpleExecutor xxlJobExecutor = new XxlJobSimpleExecutor();
    xxlJobExecutor.setAdminAddresses(xxlJobProp.getProperty("xxl.job.admin.addresses"));
    xxlJobExecutor.setAccessToken(xxlJobProp.getProperty("xxl.job.accessToken"));
    xxlJobExecutor.setAppname(xxlJobProp.getProperty("xxl.job.executor.appname"));
    xxlJobExecutor.setAddress(xxlJobProp.getProperty("xxl.job.executor.address"));
    xxlJobExecutor.setIp(xxlJobProp.getProperty("xxl.job.executor.ip"));
    xxlJobExecutor.setPort(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.port")));
    xxlJobExecutor.setLogPath(xxlJobProp.getProperty("xxl.job.executor.logpath"));
    xxlJobExecutor.setLogRetentionDays(Integer.valueOf(xxlJobProp.getProperty("xxl.job.executor.logretentiondays")));
    // registry job bean
    // xxlJobExecutor.setXxlJobBeanList(Arrays.asList(new SyncXxlJob()));
    XxlJobExecutor.registJobHandler("my_job", new MyJobHandler());
    // start executor
    try {
      xxlJobExecutor.start();
    } catch (Exception e) {
      log.error(e.getMessage(), e);
    }
    return xxlJobExecutor;
  }

  public static Properties loadProperties(String propertyFileName) {
    InputStreamReader in = null;
    try {
      ClassLoader loder = Thread.currentThread().getContextClassLoader();
      in = new InputStreamReader(loder.getResourceAsStream(propertyFileName), "UTF-8");
      if (in != null) {
        Properties prop = new Properties();
        prop.load(in);
        return prop;
      }
    } catch (IOException e) {
      log.error("load {} error!", propertyFileName);
    } finally {
      if (in != null) {
        try {
          in.close();
        } catch (IOException e) {
          log.error("close {} error!", propertyFileName);
        }
      }
    }
    return null;
  }
}

这段 Java 代码是用于在使用 tio-boot 框架的项目中设置 XXL-Job 执行器的配置类。以下是代码的详细解释:

类定义
  • XxlJobExecutorConfig 被注解 @Slf4j 标记,这是 Lombok 的注解,用于注入日志记录器;@AConfiguration 是 tio-boot 的注解,表示这个类用于配置目的。
Bean 定义
  • 类中有一个名为 xxlJobSimpleExecutor() 的方法,该方法被 @ABean(destroyMethod = "destroy") 注解标记。这表明该方法返回一个 Bean(由 tio-boot 框架管理的对象),destroyMethod = "destroy" 指定当应用上下文关闭时要调用的方法。
加载 XXL-Job 属性
  • 方法 xxlJobSimpleExecutor() 首先从名为 xxl-job-executor.properties 的文件加载属性。这通过 loadProperties() 方法实现,该方法读取属性文件并加载配置。
初始化执行器
  • 创建了一个 XxlJobSimpleExecutor 的实例。
  • 设置了执行器的属性,如管理员地址、访问令牌、应用名称、地址、IP、端口、日志路径和日志保留天数,这些都基于加载的属性。
注册作业处理器
  • 通过调用 XxlJobExecutor.registJobHandler("my_job", new MyJobHandler()) 配置了执行器的作业处理器。这样注册了一个名为 "my_job" 的作业处理器,它是 MyJobHandler 的一个实例。MyJobHandler 类应该定义当触发此处理器的作业时要执行的逻辑。
启动执行器
  • 使用 xxlJobExecutor.start() 启动了执行器。此调用可能抛出异常,异常被捕获并记录。
错误处理
  • loadProperties() 方法包含了在加载属性文件过程中的错误处理。
总结

这个配置类为处理分布式任务执行框架中的作业设置了 XXL-Job 执行器。它涉及加载配置属性、用这些属性初始化执行器、注册作业处理器,以及处理这些过程中可能出现的任何潜在错误。这种设置对于将 XXL-Job 集成到基于 tio-boot 的应用程序中至关重要,使其能够有效执行分布式的定时任务。 在执行器配类类中已经使用 registJobHandler 添加类执行任务

20.4.4.添加 job

MyJobHandler 是具体的执行任务,代码如下

package com.litongjava.tio.boot.hello.job;

import com.xxl.job.core.handler.IJobHandler;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class MyJobHandler extends IJobHandler {
  @Override
  public void execute() throws Exception {
    log.info("执行代码");
  }
}

这段代码是用于定义一个 XXL-Job 的作业处理器类,名为 MyJobHandler。以下是该代码的详细解释:

导入
  • 导入了 XXL-Job 的核心处理器接口 com.xxl.job.core.handler.IJobHandler,用于实现自定义的任务逻辑。
  • 导入了 Lombok 的 @Slf4j 注解,用于提供日志记录功能。
类定义
  • MyJobHandler 类继承自 IJobHandler,这意味着它必须实现 IJobHandler 接口中定义的方法。IJobHandler 是 XXL-Job 框架中用于定义任务执行逻辑的核心接口。
  • 类使用了 @Slf4j 注解,这会在类中自动创建一个日志对象 log,用于记录日志。
方法实现
  • 类中重写了 execute() 方法。这是任务执行的核心方法,当任务触发时,XXL-Job 框架将调用此方法。
  • execute() 方法中的代码是执行任务时的具体逻辑。在这个例子中,它仅包含一个日志记录操作,即 log.info("执行代码")。这条日志表示任务已被触发并正在执行。在实际应用中,这里可以填充具体的业务逻辑。
异常处理
  • execute() 方法声明抛出了 Exception 异常。这意味着在执行任务过程中如果遇到任何异常,都可以通过抛出异常的方式进行处理。XXL-Job 框架将捕获并处理这些异常。
总结

MyJobHandler 类是一个 XXL-Job 任务处理器的实现,用于定义当任务被调度时需要执行的具体逻辑。这个简单的例子中,它只是记录了一条日志,但在实际应用中,可以在这里实现任何需要定时执行的业务逻辑。通过继承 IJobHandler 并重写 execute() 方法,开发者可以自定义任务执行的行为。

20.4.5.启动服务

启动 tio-boot,出现下面的日志表示连接 xxl-job-admin 成功,启动成功,客户端会启动 9998 用于和 xxl-job-admin 进行通讯

2023-12-16 00:57:01.056 [Thread-5] INFO  EmbedServer.run:82 - >>>>>>>>>>> xxl-job remoting server start success, nettype = class com.xxl.job.core.server.EmbedServer, port = 9998

在浏览器上测试访问http://127.0.0.1:9998/返回下面的信息说明执行器启动成功

{"code":500,"msg":"invalid request, HttpMethod not support."}

20.4.5.在调度中心中添加定时任务并执行

将上面的配置后,执行器项目在启动后会注册到调度中心,但是定时任务仍然需要手动添加, 登录调度中心-->任务管理-->新增-->

  • 基础配置 执行器 示例执行器 任务描述 my_job 负责人 Tong Li 报警邮件 可以为空

  • 调度配置 调度类型 CRON Cron* 0/1****?

  • 任务配置 运行模式 BEAN JobHandler* my_job 设置为 XxlJobExecutor.registJobHandler 是的名称 任务参数 请输入任务参数

  • 高级配置 路由策略 第一个 子任务 ID 可以为空 调度过期策略 忽略 阻塞处理策略 单机串行 任务超时时间 0 失败重试次数 0

  • 执行任务 添加完成后选择操作-->启动,在执行 "操作"-->"执行一次" 点击"执行一次"之后任务调动中心会执行器发送 Post 请求,执行器手动请求后会执行对于的任务

20.5.手动触发执行任务

你可以手动想执行器发送 post 请求触发执行任务,发送的消息格式如下

POST /run HTTP/1.1
Content-Type: application/json;charset=UTF-8
Accept-Charset: application/json;charset=UTF-8
XXL-JOB-ACCESS-TOKEN: default_token
Cache-Control: no-cache
Pragma: no-cache
User-Agent: Java/1.8.0_211
Host: 192.168.3.8:9998
Accept: text/html, image/gif, image/jpeg, *; q=.2, */*; q=.2
Connection: keep-alive
Content-Length: 265

{"jobId":2,"executorHandler":"my_job","executorParams":"","executorBlockStrategy":"SERIAL_EXECUTION","executorTimeout":0,"logId":1357,"logDateTime":1702727731010,"glueType":"BEAN","glueSource":"","glueUpdatetime":1702724718000,"broadcastIndex":0,"broadcastTotal":1}

推荐使用使用我开发的ide-rest-clientopen in new window发送 http 请求,将上面内容新建一个后缀名为.http 文件,使用 ide-rest-client 打开即可发送