使用 ApiTable 连接 TDEngine

本节列出了整合 TDEngine 所需的 Maven 依赖。这些依赖包括 tio-boot 用于构建基于事件驱动的应用程序,以及相关的 JDBC 驱动和连接池用于数据库连接。

连接 TDEngine

添加依赖

<ApiTable.version>1.3.0</ApiTable.version>

<dependency>
  <groupId>com.litongjava</groupId>
  <artifactId>ApiTable</artifactId>
  <version>${ApiTable.version}</version>
</dependency>

<dependency>
  <groupId>com.taosdata.jdbc</groupId>
  <artifactId>taos-jdbcdriver</artifactId>
  <version>3.2.7</version>
</dependency>

<dependency>
  <groupId>com.zaxxer</groupId>
  <artifactId>HikariCP</artifactId>
  <version>4.0.3</version>
</dependency>

添加配置信息

app.properties

tdengine.host=192.168.3.9
tdengine.port=6041
tdengine.username=root
tdengine.password=root123
tdengine.database=manliang_pen
tdengine.driverClassName=com.taosdata.jdbc.rs.RestfulDriver

启动类

package com.litongjava.tio.web.hello;

import com.litongjava.hotswap.wrapper.tio.boot.TioApplicationWrapper;
import com.litongjava.jfinal.aop.annotation.AComponentScan;

@AComponentScan
public class HelloApp {
  public static void main(String[] args) {
    long start = System.currentTimeMillis();
    TioApplicationWrapper.run(HelloApp.class, args);
    long end = System.currentTimeMillis();
    System.out.println((end - start) + "ms");
  }
}

ActiveRecordPluginConfiguration 类

import javax.sql.DataSource;

import com.jfinal.template.Engine;
import com.jfinal.template.source.ClassPathSourceFactory;
import com.litongjava.data.utils.TioRequestParamUtils;
import com.litongjava.jfinal.aop.annotation.AConfiguration;
import com.litongjava.jfinal.aop.annotation.AInitialization;
import com.litongjava.jfinal.plugin.activerecord.ActiveRecordPlugin;
import com.litongjava.jfinal.plugin.activerecord.OrderedFieldContainerFactory;
import com.litongjava.jfinal.plugin.activerecord.dialect.TdEngineDialect;
import com.litongjava.jfinal.plugin.hikaricp.DsContainer;
import com.litongjava.tio.boot.constatns.ConfigKeys;
import com.litongjava.tio.boot.server.TioBootServer;
import com.litongjava.tio.utils.environment.EnvUtils;
import com.zaxxer.hikari.HikariConfig;
import com.zaxxer.hikari.HikariDataSource;

@AConfiguration
public class ActiveRecordPluginConfiguration {

  @AInitialization
  public void activeRecordPlugin() throws Exception {
    configArpForTdEngine();
  }

  public DataSource getTdengineDataSource() {
    HikariConfig config = new HikariConfig();
    // jdbc properties
    String host = EnvUtils.get("tdengine.host");
    int port = EnvUtils.getInt("tdengine.port");
    String user = EnvUtils.get("tdengine.username");
    String pswd = EnvUtils.get("tdengine.password");
    String dbName = EnvUtils.get("tdengine.database");
    String driverClassName = EnvUtils.get("tdengine.driverClassName");

    // String driverClassName = "com.taosdata.jdbc.TSDBDriver";

    String jdbcUrl = getTdEngineJdbcUrl(host, port, user, pswd, dbName);
    config.setJdbcUrl(jdbcUrl);
    config.setDriverClassName(driverClassName);
    // connection pool configurations
    config.setMinimumIdle(10); // minimum number of idle connection
    config.setMaximumPoolSize(10); // maximum number of connection in the pool
    config.setConnectionTimeout(30000); // maximum wait milliseconds for get connection from pool
    config.setMaxLifetime(0); // maximum life time for each connection
    config.setIdleTimeout(0); // max idle time for recycle idle connection
    config.setConnectionTestQuery("select server_status()"); // validation query

    HikariDataSource ds = new HikariDataSource(config); // create datasource
    DsContainer.setDataSource(ds);
    TioBootServer.addDestroyMethod(ds::close);

    return ds;
  }

  private String getTdEngineJdbcUrl(String host, int port, String user, String pswd, String dbName) {
    // 添加batchfetch=true属性后得到的Websocket连接
    return "jdbc:TAOS-RS://" + host + ":" + port + "/" + dbName + "?user=" + user + "&password=" + pswd
        + "&batchfetch=true";
  }

  public void configArpForTdEngine() {
    String property = EnvUtils.get(ConfigKeys.APP_ENV);
    DataSource tdengineDataSource = getTdengineDataSource();
    // 指定名称为 tdengine
    ActiveRecordPlugin tdengineArp = new ActiveRecordPlugin("tdengine", tdengineDataSource);
    tdengineArp.setDialect(new TdEngineDialect());
    tdengineArp.setContainerFactory(new OrderedFieldContainerFactory());

    if ("dev".equals(property)) {
      tdengineArp.setDevMode(true);
      tdengineArp.setShowSql(true);
    }

    tdengineArp.start();
    TioBootServer.addDestroyMethod(tdengineArp::stop);
  }
}
  1. 数据源配置:

    • getTdengineDataSource(): 此方法创建并配置 TDengine 的数据源。同样使用 HikariCP,并设置了针对 TDengine 特有的属性和连接测试查询。
  2. ActiveRecordPlugin 配置:

    • configArpForTdEngine(): 为 TDengine 配置 ActiveRecordPlugin。同样设置了开发模式和 SQL 显示,数据库标识名为 "tdengine"。
  3. 资源清理: 在 getTdengineDataSource() 方法中,使用 TioBootServer.addDestroyMethod 来确保应用停止时数据源能够正确关闭。

测试类

  1. ActiveRecordPluginConfigurationTest:
    • 使用 TioBootTest.before 方法初始化测试环境。
    • test() 方法中演示了如何使用 DbJsonService 从 MySQL 查询数据,并使用 Db.use("tdengine") 从 TDengine 查询数据。
import org.junit.Before;
import org.junit.Test;

import com.jfinal.kit.Kv;
import com.litongjava.data.model.DbJsonBean;
import com.litongjava.data.services.DbJsonService;
import com.litongjava.jfinal.aop.Aop;
import com.litongjava.jfinal.plugin.activerecord.Db;
import com.litongjava.jfinal.plugin.activerecord.DbPro;
import com.litongjava.jfinal.plugin.activerecord.Page;
import com.litongjava.jfinal.plugin.activerecord.Record;
import com.litongjava.jfinal.plugin.activerecord.SqlPara;
import com.litongjava.tio.boot.tesing.TioBootTest;

import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ActiveRecordPluginConfigurationTest {

  @Before
  public void before() {
    TioBootTest.before(ActiveRecordPluginConfiguration.class);
  }

  @Test
  public void test() {
    DbJsonService dbJsonService = DbJsonService.getInstance();
    // 查询 mysql
    DbJsonBean<Page<Record>> page = dbJsonService.page("sys_user_info", Kv.create());
    log.info("size:{}", page.getData().getPageSize());
    // 查询 tdgnine
    DbPro dbPro = Db.use("tdengine");
    SqlPara sqlPara = new SqlPara();
    sqlPara.setSql("select `*` from sensor_data");
    Page<Record> page2 = dbPro.paginate(1, 10, sqlPara);
    log.info("page size:{}", page2.getPageSize());
  }

}

控制器测试类

  1. PenRawDataController:
    • 提供了一个 page() 方法,用于处理分页请求。
    • 通过 DbJsonServiceDb.use("tdengine") 从特定的数据源(在这里是 TDengine)查询数据。
import java.util.Map;

import com.jfinal.kit.Kv;
import com.litongjava.data.model.DbJsonBean;
import com.litongjava.data.model.DbPage;
import com.litongjava.data.services.DbJsonService;
import com.litongjava.data.utils.DbJsonBeanUtils;
import com.litongjava.data.utils.KvUtils;
import com.litongjava.data.utils.TioRequestParamUtils;
import com.litongjava.jfinal.aop.annotation.AAutowired;
import com.litongjava.jfinal.plugin.activerecord.Db;
import com.litongjava.jfinal.plugin.activerecord.DbPro;
import com.litongjava.tio.http.common.HttpRequest;
import com.litongjava.tio.http.server.annotation.RequestPath;

import lombok.extern.slf4j.Slf4j;

@Slf4j
@RequestPath("/pen/raw/data")
public class PenRawDataController {

  @AAutowired
  private DbJsonService dbJsonService;

  @RequestPath("/page")
  public DbJsonBean<DbPage<Kv>> page(HttpRequest request) {
    String f = "pen_raw_data_stable";
    Map<String, Object> map = TioRequestParamUtils.getRequestMap(request);
    map.remove("f");
    Kv kv = KvUtils.camelToUnderscore(map);
    log.info("tableName:{},kv:{}", f, kv);
    DbPro dbPro = Db.use("tdengine");
    return DbJsonBeanUtils.pageToDbPage(dbJsonService.page(dbPro, f, kv));
  }
}

测试参数查询

import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.List;

import org.junit.Before;
import org.junit.Test;

import com.jfinal.kit.Kv;
import com.litongjava.data.model.DbJsonBean;
import com.litongjava.data.services.DbJsonService;
import com.litongjava.jfinal.aop.Aop;
import com.litongjava.jfinal.plugin.activerecord.Config;
import com.litongjava.jfinal.plugin.activerecord.Db;
import com.litongjava.jfinal.plugin.activerecord.DbPro;
import com.litongjava.jfinal.plugin.activerecord.Record;
import com.litongjava.jfinal.plugin.activerecord.RecordBuilder;
import com.litongjava.jfinal.plugin.hikaricp.DsContainer;
import com.litongjava.tio.boot.tesing.TioBootTest;

import lombok.Cleanup;

public class PenRawDataControllerTest {

  @Before
  public void before() {
    TioBootTest.before(ActiveRecordPluginConfiguration.class);
  }

  @Test
  public void query() throws SQLException {
    String sql = "select payload_ts,data_ts,client_id,user_id from manliang_pen.pen_page_stable where user_id=?";
    String userId = "18374686479671623681";
    @Cleanup
    Connection conn = DsContainer.ds.getConnection();
    Config config = Db.use("tdengine").getConfig();

    List<Record> list = null;
    try (PreparedStatement pstmt = conn.prepareStatement(sql)) {
      pstmt.setString(1, userId);
      ResultSet executeQuery = pstmt.executeQuery();
      list = RecordBuilder.me.build(config, executeQuery);
    }
    System.out.println(list);
  }

  @Test
  public void queryWithDb() {
    String sql = "select payload_ts,data_ts,client_id,user_id from manliang_pen.pen_page_stable where user_id=?";
    String userId = "18374686479671623681";
    List<Record> find = Db.use("tdengine").find(sql, userId);
    System.out.println(find);
  }

  @Test
  public void queryWithDbJsonService() {
    DbJsonService dbJsonService = DbJsonService.getInstance();
    Kv kv = Kv.create();
    kv.set("table_name", "manliang_pen.pen_page_stable");
    kv.set("columns", "payload_ts,data_ts,client_id,user_id");
    kv.set("user_id", "18374686479671623681");
    DbPro dbPro = Db.use("tdengine");
    DbJsonBean<List<Record>> dbJsonBean = dbJsonService.list(dbPro, kv);
    System.out.println(dbJsonBean);
  }
}

基操作示例

增删改查

package com.litongjava.tio.web.hello.controller;

import java.sql.Connection;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.sql.Timestamp;
import java.util.List;
import java.util.Random;

import com.litongjava.jfinal.plugin.activerecord.Record;
import com.litongjava.jfinal.plugin.activerecord.RecordBuilder;
import com.litongjava.tio.http.server.annotation.RequestPath;
import com.litongjava.tio.web.hello.config.utils.TDUtils;
import com.taosdata.jdbc.ws.TSWSPreparedStatement;

import lombok.Cleanup;

@RequestPath("/tdeingien/test")
public class TbEngineTestController {

  public String connection() throws SQLException {

    @Cleanup
    Connection connection = TDUtils.ds.getConnection();
    String string = connection.toString();
    return string;
  }

  /**
   * 创建表和数据库
   * @throws SQLException
   */
  public String init() throws SQLException {
    int BINARY_COLUMN_SIZE = 30;
    String[] schemaList = {
        "create table stable1(ts timestamp, f1 tinyint, f2 smallint, f3 int, f4 bigint) tags(t1 tinyint, t2 smallint, t3 int, t4 bigint)",
        "create table stable2(ts timestamp, f1 float, f2 double) tags(t1 float, t2 double)",
        "create table stable3(ts timestamp, f1 bool) tags(t1 bool)",
        "create table stable4(ts timestamp, f1 binary(" + BINARY_COLUMN_SIZE + ")) tags(t1 binary(" + BINARY_COLUMN_SIZE
            + "))",
        "create table stable5(ts timestamp, f1 nchar(" + BINARY_COLUMN_SIZE + ")) tags(t1 nchar(" + BINARY_COLUMN_SIZE
            + "))" };

    @Cleanup
    Connection conn = TDUtils.ds.getConnection();

    try (Statement stmt = conn.createStatement()) {
      stmt.execute("drop database if exists test_ws_parabind");
      stmt.execute("create database if not exists test_ws_parabind");
      stmt.execute("use test_ws_parabind");
      for (int i = 0; i < schemaList.length; i++) {
        stmt.execute(schemaList[i]);
      }
    }
    return "success";
  }

  /**
   * init类型参数查询
   */
  public String bindInteger() throws SQLException {
    String sql = "insert into ? using stable1 tags(?,?,?,?) values(?,?,?,?,?)";
    int numOfSubTable = 10, numOfRow = 10;
    Random random = new Random(System.currentTimeMillis());
    @Cleanup
    Connection conn = TDUtils.ds.getConnection();

    try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {

      pstmt.execute("use test_ws_parabind");

      for (int i = 1; i <= numOfSubTable; i++) {
        // set table name
        pstmt.setTableName("t1_" + i);
        // set tags
        pstmt.setTagByte(1, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
        pstmt.setTagShort(2, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
        pstmt.setTagInt(3, random.nextInt(Integer.MAX_VALUE));
        pstmt.setTagLong(4, random.nextLong());
        // set columns
        long current = System.currentTimeMillis();
        for (int j = 0; j < numOfRow; j++) {
          pstmt.setTimestamp(1, new Timestamp(current + j));
          pstmt.setByte(2, Byte.parseByte(Integer.toString(random.nextInt(Byte.MAX_VALUE))));
          pstmt.setShort(3, Short.parseShort(Integer.toString(random.nextInt(Short.MAX_VALUE))));
          pstmt.setInt(4, random.nextInt(Integer.MAX_VALUE));
          pstmt.setLong(5, random.nextLong());
          pstmt.addBatch();
        }
        pstmt.executeBatch();
      }
    }
    return "success";
  }

  public String bindFloat() throws SQLException {
    String sql = "insert into ? using stable2 tags(?,?) values(?,?,?)";
    Random random = new Random(System.currentTimeMillis());
    @Cleanup
    Connection conn = TDUtils.ds.getConnection();

    int numOfSubTable = 10, numOfRow = 10;

    try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {

      pstmt.execute("use test_ws_parabind");

      for (int i = 1; i <= numOfSubTable; i++) {
        // set table name
        pstmt.setTableName("t2_" + i);
        // set tags
        pstmt.setTagFloat(1, random.nextFloat());
        pstmt.setTagDouble(2, random.nextDouble());
        // set columns
        long current = System.currentTimeMillis();
        for (int j = 0; j < numOfRow; j++) {
          pstmt.setTimestamp(1, new Timestamp(current + j));
          pstmt.setFloat(2, random.nextFloat());
          pstmt.setDouble(3, random.nextDouble());
          pstmt.addBatch();
        }
        pstmt.executeBatch();
      }
    }

    return "success";
  }

  public String bindBoolean() throws SQLException {
    String sql = "insert into ? using stable3 tags(?) values(?,?)";
    int numOfSubTable = 10, numOfRow = 10;
    Random random = new Random(System.currentTimeMillis());
    @Cleanup
    Connection conn = TDUtils.ds.getConnection();
    try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {
      for (int i = 1; i <= numOfSubTable; i++) {
        // set table name
        pstmt.setTableName("t3_" + i);
        // set tags
        pstmt.setTagBoolean(1, random.nextBoolean());
        // set columns
        long current = System.currentTimeMillis();
        for (int j = 0; j < numOfRow; j++) {
          pstmt.setTimestamp(1, new Timestamp(current + j));
          pstmt.setBoolean(2, random.nextBoolean());
          pstmt.addBatch();
        }
        pstmt.executeBatch();
      }
    }
    return "success";
  }

  public String bindBytes() throws SQLException {
    String sql = "insert into ? using stable4 tags(?) values(?,?)";
    int numOfSubTable = 10, numOfRow = 10;
    @Cleanup
    Connection conn = TDUtils.ds.getConnection();
    try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {

      for (int i = 1; i <= numOfSubTable; i++) {
        // set table name
        pstmt.setTableName("t4_" + i);
        // set tags
        pstmt.setTagString(1, new String("abc"));

        // set columns
        long current = System.currentTimeMillis();
        for (int j = 0; j < numOfRow; j++) {
          pstmt.setTimestamp(1, new Timestamp(current + j));
          pstmt.setString(2, "abc");
          pstmt.addBatch();
        }
        pstmt.executeBatch();
      }
    }
    return "success";
  }

  public String bindString() throws SQLException {
    String sql = "insert into ? using stable5 tags(?) values(?,?)";
    int numOfSubTable = 10, numOfRow = 10;
    @Cleanup
    Connection conn = TDUtils.ds.getConnection();

    try (TSWSPreparedStatement pstmt = conn.prepareStatement(sql).unwrap(TSWSPreparedStatement.class)) {

      for (int i = 1; i <= numOfSubTable; i++) {
        // set table name
        pstmt.setTableName("t5_" + i);
        // set tags
        pstmt.setTagNString(1, "California.SanFrancisco");

        // set columns
        long current = System.currentTimeMillis();
        for (int j = 0; j < numOfRow; j++) {
          pstmt.setTimestamp(0, new Timestamp(current + j));
          pstmt.setNString(1, "California.SanFrancisco");
          pstmt.addBatch();
        }
        pstmt.executeBatch();
      }
    }
    return "success";
  }

  public List<Record> selectStable1() throws SQLException {
    String sql = "select * from test.stable1";
    //List<Record> records = Db.find(sql);
    //return records;
    @Cleanup
    Connection conn = TDUtils.ds.getConnection();
    @Cleanup
    Statement stmt = conn.createStatement();
    stmt.execute("use test_ws_parabind");


    ResultSet resultSet = stmt.executeQuery(sql);
    RecordBuilder.me.build(null, resultSet);

    return null;
  }
}

查询示例

package com.litongjava.tio.web.hello.controller;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.litongjava.jfinal.plugin.activerecord.Db;
import com.litongjava.jfinal.plugin.activerecord.Record;
import com.litongjava.tio.http.server.annotation.RequestPath;

@RequestPath("/stable1")
public class Stable1Controller {

  public List<Map<String, Object>> list() {
    List<Record> records = Db.findAll("stable1");
    List<Map<String, Object>> result = records.stream().map((t) -> t.toMap()).collect(Collectors.toList());
    return result;
  }
}

package com.litongjava.tio.web.hello.controller;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.litongjava.jfinal.plugin.activerecord.Db;
import com.litongjava.jfinal.plugin.activerecord.Record;
import com.litongjava.tio.http.server.annotation.RequestPath;

@RequestPath("/stable2")
public class Stable2Controller {

  public List<Map<String, Object>> list() {
    List<Record> records = Db.findAll("stable2");
    List<Map<String, Object>> result = records.stream().map((t) -> t.toMap()).collect(Collectors.toList());
    return result;
  }
}

package com.litongjava.tio.web.hello.controller;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.litongjava.jfinal.plugin.activerecord.Db;
import com.litongjava.jfinal.plugin.activerecord.Record;
import com.litongjava.tio.http.server.annotation.RequestPath;

@RequestPath("/stable3")
public class Stable3Controller {

  public List<Map<String, Object>> list() {
    List<Record> records = Db.findAll("stable3");
    List<Map<String, Object>> result = records.stream().map((t) -> t.toMap()).collect(Collectors.toList());
    return result;
  }
}

package com.litongjava.tio.web.hello.controller;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.litongjava.jfinal.plugin.activerecord.Db;
import com.litongjava.jfinal.plugin.activerecord.Record;
import com.litongjava.tio.http.server.annotation.RequestPath;

@RequestPath("/stable4")
public class Stable4Controller {

  public List<Map<String, Object>> list() {
    List<Record> records = Db.findAll("stable4");
    List<Map<String, Object>> result = records.stream().map((t) -> t.toMap()).collect(Collectors.toList());
    return result;
  }
}

package com.litongjava.tio.web.hello.controller;

import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

import com.litongjava.jfinal.plugin.activerecord.Db;
import com.litongjava.jfinal.plugin.activerecord.Record;
import com.litongjava.tio.http.server.annotation.RequestPath;

@RequestPath("/stable5")
public class Stable5Controller {

  public List<Map<String, Object>> list() {
    List<Record> records = Db.findAll("stable5");
    List<Map<String, Object>> result = records.stream().map((t) -> t.toMap()).collect(Collectors.toList());
    return result;
  }
}

测试

通过以下 URL 可以测试应用程序的不同功能,验证数据的创建、插入和查询是否正常工作。

  • http://localhost/stable1/list
  • http://localhost/stable2/list
  • http://localhost/stable3/list
  • http://localhost/stable4/list
  • http://localhost/stable5/list