使用 api-table 连接 MySQL 和 TDengine 多数据源

本文介绍如何在 tio-boot 应用程序中配置多个数据源,这里以 MySQL 和 TDengine 为例。

ActiveRecordPluginConfiguration 类

import javax.sql.DataSource;

import com.jfinal.template.Engine;
import com.jfinal.template.source.ClassPathSourceFactory;
import com.litongjava.data.utils.TioRequestParamUtils;
import com.litongjava.jfinal.aop.annotation.AConfiguration;
import com.litongjava.jfinal.aop.annotation.AInitialization;
import com.litongjava.jfinal.plugin.activerecord.ActiveRecordPlugin;
import com.litongjava.jfinal.plugin.activerecord.OrderedFieldContainerFactory;
import com.litongjava.jfinal.plugin.activerecord.dialect.TdEngineDialect;
import com.litongjava.jfinal.plugin.hikaricp.DsContainer;
import com.litongjava.tio.boot.constatns.ConfigKeys;
import com.litongjava.tio.boot.server.TioBootServer;
import com.litongjava.tio.utils.environment.EnvUtils;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

/**
 * Configures two ActiveRecordPlugin instances backed by HikariCP pools:
 * one created without an explicit config name for MySQL, and one named
 * {@code "tdengine"} for TDengine.
 *
 * <p>Connection settings are read through {@link EnvUtils}. Every pool and
 * plugin created here is registered with
 * {@code TioBootServer.addDestroyMethod} so it is closed/stopped on shutdown.
 */
@AConfiguration
public class ActiveRecordPluginConfiguration {

  /** Value of {@code ConfigKeys.APP_ENV} that enables dev mode and SQL logging. */
  private static final String DEV_ENV = "dev";

  /**
   * Initialization entry point: sets up the MySQL plugin first, then the
   * TDengine plugin.
   *
   * @throws Exception declared for the initialization hook; nothing in this
   *                   method throws a checked exception itself
   */
  @AInitialization
  public void activeRecordPlugin() throws Exception {
    configArpForMysql();
    configArpForTdEngine();
  }

  /**
   * Builds the HikariCP pool for MySQL from {@code jdbc.url},
   * {@code jdbc.user}, {@code jdbc.pswd} and {@code jdbc.MaximumPoolSize}
   * (pool size defaults to 2 when the key is absent).
   *
   * @return a started pool whose {@code close} is registered as a destroy method
   */
  public DataSource getMysqlDataSource() {
    String jdbcUrl = EnvUtils.get("jdbc.url");
    String jdbcUser = EnvUtils.get("jdbc.user");
    String jdbcPswd = EnvUtils.get("jdbc.pswd");
    int maximumPoolSize = EnvUtils.getInt("jdbc.MaximumPoolSize", 2);

    // Basic connection parameters
    HikariConfig config = new HikariConfig();
    config.setJdbcUrl(jdbcUrl);
    config.setUsername(jdbcUser);
    config.setPassword(jdbcPswd);
    config.setMaximumPoolSize(maximumPoolSize);

    HikariDataSource hikariDataSource = new HikariDataSource(config);
    TioBootServer.addDestroyMethod(hikariDataSource::close);
    return hikariDataSource;
  }

  /**
   * Creates, configures and starts the ActiveRecordPlugin for MySQL.
   *
   * <p>SQL templates are loaded from the classpath
   * ({@code /sql/all_sqls.sql}). In the dev environment, dev mode and SQL
   * logging are switched on.
   *
   * @return the started plugin; its {@code stop} is registered as a destroy method
   */
  public ActiveRecordPlugin configArpForMysql() {
    ActiveRecordPlugin mysqlArp = new ActiveRecordPlugin(getMysqlDataSource());
    mysqlArp.setContainerFactory(new OrderedFieldContainerFactory());

    Engine engine = mysqlArp.getEngine();
    engine.setSourceFactory(new ClassPathSourceFactory());
    // The original code called setCompressorOn(' ') and then
    // setCompressorOn('\n'); the second call replaces the compressor set by
    // the first, so only the effective '\n' separator is kept.
    engine.setCompressorOn('\n');

    if (isDevEnv()) {
      mysqlArp.setDevMode(true);
      engine.setDevMode(true);
      mysqlArp.setShowSql(true);
    }

    mysqlArp.addSqlTemplate("/sql/all_sqls.sql");
    mysqlArp.start();
    TioBootServer.addDestroyMethod(mysqlArp::stop);

    // Register "bigint" with the request-parameter utilities.
    // NOTE(review): presumably this lets api-table recognise bigint-typed
    // request parameters — confirm against TioRequestParamUtils.
    TioRequestParamUtils.types.add("bigint");
    return mysqlArp;
  }

  /**
   * Builds the HikariCP pool for TDengine from the {@code tdengine.*} keys
   * (host, port, username, password, database, driverClassName) and
   * registers it with {@link DsContainer}.
   *
   * @return a started pool whose {@code close} is registered as a destroy method
   */
  public DataSource getTdengineDataSource() {
    // JDBC properties
    String host = EnvUtils.get("tdengine.host");
    int port = EnvUtils.getInt("tdengine.port");
    String user = EnvUtils.get("tdengine.username");
    String pswd = EnvUtils.get("tdengine.password");
    String dbName = EnvUtils.get("tdengine.database");
    // e.g. com.taosdata.jdbc.TSDBDriver
    String driverClassName = EnvUtils.get("tdengine.driverClassName");

    HikariConfig config = new HikariConfig();
    config.setJdbcUrl(getTdEngineJdbcUrl(host, port, user, pswd, dbName));
    config.setDriverClassName(driverClassName);

    // Connection pool settings
    config.setMinimumIdle(10); // minimum number of idle connections
    config.setMaximumPoolSize(10); // maximum number of connections in the pool
    config.setConnectionTimeout(30000); // max milliseconds to wait for a connection from the pool
    config.setMaxLifetime(0); // maximum lifetime of each connection
    config.setIdleTimeout(0); // maximum idle time before an idle connection is recycled
    config.setConnectionTestQuery("select server_status()"); // validation query

    HikariDataSource ds = new HikariDataSource(config);
    DsContainer.setDataSource(ds);
    TioBootServer.addDestroyMethod(ds::close);

    return ds;
  }

  /**
   * Assembles the TDengine REST JDBC URL ({@code jdbc:TAOS-RS://...}) with
   * the credentials as query parameters.
   *
   * <p>NOTE(review): {@code user} and {@code pswd} are concatenated as-is; a
   * value containing characters such as {@code &} or {@code =} would corrupt
   * the query string. Confirm whether the driver version in use expects
   * URL-encoded credentials before changing this.
   */
  private String getTdEngineJdbcUrl(String host, int port, String user, String pswd, String dbName) {
    // With batchfetch=true added, the connection obtained is a WebSocket connection
    return "jdbc:TAOS-RS://" + host + ":" + port + "/" + dbName + "?user=" + user + "&password=" + pswd
        + "&batchfetch=true";
  }

  /**
   * Creates, configures and starts the ActiveRecordPlugin for TDengine under
   * the config name {@code "tdengine"}, using {@link TdEngineDialect}.
   * In the dev environment, dev mode and SQL logging are switched on.
   */
  public void configArpForTdEngine() {
    // Register under the name "tdengine" so callers can select it via Db.use("tdengine")
    ActiveRecordPlugin tdengineArp = new ActiveRecordPlugin("tdengine", getTdengineDataSource());
    tdengineArp.setDialect(new TdEngineDialect());
    tdengineArp.setContainerFactory(new OrderedFieldContainerFactory());

    if (isDevEnv()) {
      tdengineArp.setDevMode(true);
      tdengineArp.setShowSql(true);
    }

    tdengineArp.start();
    TioBootServer.addDestroyMethod(tdengineArp::stop);
  }

  /** Returns whether {@code ConfigKeys.APP_ENV} is set to {@code "dev"}. */
  private boolean isDevEnv() {
    return DEV_ENV.equals(EnvUtils.get(ConfigKeys.APP_ENV));
  }
}
  1. 数据源配置:

    • getMysqlDataSource(): 此方法创建并配置 MySQL 的数据源。使用 HikariCP 作为连接池管理器。
    • getTdengineDataSource(): 此方法创建并配置 TDengine 的数据源。同样使用 HikariCP,并设置了针对 TDengine 特有的属性和连接测试查询。
  2. ActiveRecordPlugin 配置:

    • configArpForMysql(): 为 MySQL 配置 ActiveRecordPlugin。设置了 SQL 模板路径、开发模式、SQL 显示等。
    • configArpForTdEngine(): 为 TDengine 配置 ActiveRecordPlugin。同样设置了开发模式和 SQL 显示,数据库标识名为 "tdengine"。
  3. 资源清理: 在 getMysqlDataSource() 和 getTdengineDataSource() 方法中,使用 TioBootServer.addDestroyMethod 来确保应用停止时数据源能够正确关闭;两个 ActiveRecordPlugin 的 stop 方法也以同样的方式注册。

测试类

  1. ActiveRecordPluginConfigurationTest:
    • 使用 TioBootTest.before 方法初始化测试环境。
    • test() 方法中演示了如何使用 DbJsonService 从 MySQL 查询数据,并使用 Db.use("tdengine") 从 TDengine 查询数据。
import org.junit.Before;
import org.junit.Test;

import com.jfinal.kit.Kv;
import com.litongjava.data.model.DbJsonBean;
import com.litongjava.data.services.DbJsonService;
import com.litongjava.jfinal.aop.Aop;
import com.litongjava.jfinal.plugin.activerecord.Db;
import com.litongjava.jfinal.plugin.activerecord.DbPro;
import com.litongjava.jfinal.plugin.activerecord.Page;
import com.litongjava.jfinal.plugin.activerecord.Record;
import com.litongjava.jfinal.plugin.activerecord.SqlPara;
import com.litongjava.tio.boot.tesing.TioBootTest;

import lombok.extern.slf4j.Slf4j;

/**
 * Smoke test for the multi-data-source setup: runs one paginated query
 * through DbJsonService and one through the plugin registered as "tdengine".
 */
@Slf4j
public class ActiveRecordPluginConfigurationTest {

  /** Boots the tio-boot test environment with the data-source configuration class. */
  @Before
  public void before() {
    TioBootTest.before(ActiveRecordPluginConfiguration.class);
  }

  /** Pages through one table per data source and logs the page size of each result. */
  @Test
  public void test() {
    // Query MySQL
    DbJsonBean<Page<Record>> mysqlResult = DbJsonService.getInstance().page("sys_user_info", Kv.create());
    log.info("size:{}", mysqlResult.getData().getPageSize());

    // Query TDengine through the plugin registered under the name "tdengine"
    SqlPara tdengineQuery = new SqlPara();
    // NOTE(review): the backticks around * are kept exactly as in the original —
    // confirm they are intended and not an escaping artifact.
    tdengineQuery.setSql("select `*` from sensor_data");
    DbPro tdengineDb = Db.use("tdengine");
    Page<Record> tdengineResult = tdengineDb.paginate(1, 10, tdengineQuery);
    log.info("page size:{}", tdengineResult.getPageSize());
  }

}

控制器测试类

  1. PenRawDataController:
    • 提供了一个 page() 方法,用于处理分页请求。
    • 通过 DbJsonService 和 Db.use("tdengine") 从特定的数据源(在这里是 TDengine)查询数据。
import java.util.Map;

import com.jfinal.kit.Kv;
import com.litongjava.data.model.DbJsonBean;
import com.litongjava.data.model.DbPage;
import com.litongjava.data.services.DbJsonService;
import com.litongjava.data.utils.DbJsonBeanUtils;
import com.litongjava.data.utils.KvUtils;
import com.litongjava.data.utils.TioRequestParamUtils;
import com.litongjava.jfinal.aop.annotation.AAutowired;
import com.litongjava.jfinal.plugin.activerecord.Db;
import com.litongjava.jfinal.plugin.activerecord.DbPro;
import com.litongjava.tio.http.common.HttpRequest;
import com.litongjava.tio.http.server.annotation.RequestPath;

import lombok.extern.slf4j.Slf4j;

/**
 * HTTP controller mounted at {@code /pen/raw/data} that serves paginated rows
 * from the data source registered as "tdengine".
 */
@Slf4j
@RequestPath("/pen/raw/data")
public class PenRawDataController {

  @AAutowired
  private DbJsonService dbJsonService;

  /**
   * Handles {@code /pen/raw/data/page}: turns the request parameters into
   * query conditions and pages through {@code pen_raw_data_stable}.
   *
   * @param request the incoming HTTP request whose parameters become the query conditions
   * @return the page result converted by {@code DbJsonBeanUtils.pageToDbPage}
   */
  @RequestPath("/page")
  public DbJsonBean<DbPage<Kv>> page(HttpRequest request) {
    final String tableName = "pen_raw_data_stable";

    Map<String, Object> params = TioRequestParamUtils.getRequestMap(request);
    // The table is fixed above, so any "f" entry sent by the client is dropped.
    // NOTE(review): presumably "f" is the parameter that normally carries the table name — confirm.
    params.remove("f");

    // Convert camelCase parameter names to underscore style
    Kv conditions = KvUtils.camelToUnderscore(params);
    log.info("tableName:{},kv:{}", tableName, conditions);

    DbPro tdengineDb = Db.use("tdengine");
    return DbJsonBeanUtils.pageToDbPage(dbJsonService.page(tdengineDb, tableName, conditions));
  }
}

测试参数查询

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

import org.junit.Before;
import org.junit.Test;

import com.jfinal.kit.Kv;
import com.litongjava.data.model.DbJsonBean;
import com.litongjava.data.services.DbJsonService;
import com.litongjava.jfinal.aop.Aop;
import com.litongjava.jfinal.plugin.activerecord.Config;
import com.litongjava.jfinal.plugin.activerecord.Db;
import com.litongjava.jfinal.plugin.activerecord.DbPro;
import com.litongjava.jfinal.plugin.activerecord.Record;
import com.litongjava.jfinal.plugin.activerecord.RecordBuilder;
import com.litongjava.jfinal.plugin.hikaricp.DsContainer;
import com.litongjava.tio.boot.tesing.TioBootTest;

import lombok.Cleanup;

/**
 * Runs the same parameterized TDengine query three ways: plain JDBC on the
 * pooled connection, {@code Db.use("tdengine").find}, and
 * {@code DbJsonService.list}.
 */
public class PenRawDataControllerTest {

  /** Parameterized query shared by the JDBC and Db tests. */
  private static final String QUERY_BY_USER_ID =
      "select payload_ts,data_ts,client_id,user_id from manliang_pen.pen_page_stable where user_id=?";

  /** Sample user id bound as the query parameter in every test. */
  private static final String USER_ID = "18374686479671623681";

  /** Boots the tio-boot test environment with the data-source configuration class. */
  @Before
  public void before() {
    TioBootTest.before(ActiveRecordPluginConfiguration.class);
  }

  /**
   * Executes the query with plain JDBC on a connection taken from
   * {@code DsContainer.ds} and maps the rows with {@code RecordBuilder}.
   *
   * @throws SQLException if obtaining the connection or running the query fails
   */
  @Test
  public void query() throws SQLException {
    Config config = Db.use("tdengine").getConfig();

    List<Record> list;
    // Connection, statement and result set are all closed here, in reverse
    // order, even when the query or the mapping throws.
    try (Connection conn = DsContainer.ds.getConnection();
        PreparedStatement pstmt = conn.prepareStatement(QUERY_BY_USER_ID)) {
      pstmt.setString(1, USER_ID);
      try (ResultSet resultSet = pstmt.executeQuery()) {
        list = RecordBuilder.me.build(config, resultSet);
      }
    }
    System.out.println(list);
  }

  /** Executes the same query through the DbPro registered as "tdengine". */
  @Test
  public void queryWithDb() {
    List<Record> find = Db.use("tdengine").find(QUERY_BY_USER_ID, USER_ID);
    System.out.println(find);
  }

  /**
   * Executes the equivalent query through {@code DbJsonService.list}, passing
   * the table, the columns and the user_id condition as Kv entries.
   */
  @Test
  public void queryWithDbJsonService() {
    DbJsonService dbJsonService = DbJsonService.getInstance();
    Kv kv = Kv.create();
    kv.set("table_name", "manliang_pen.pen_page_stable");
    kv.set("columns", "payload_ts,data_ts,client_id,user_id");
    kv.set("user_id", USER_ID);
    DbPro dbPro = Db.use("tdengine");
    DbJsonBean<List<Record>> dbJsonBean = dbJsonService.list(dbPro, kv);
    System.out.println(dbJsonBean);
  }
}
···