批量操作
批量操作方法
在实际开发中,批量操作能够显著提高数据库的处理效率,减少单条操作带来的性能开销。本文将以 Java-db 框架中的Db
类为例,详细介绍批量操作的使用方法,包括批量保存和批量更新。
类名:Db.java
方法签名
public static int[] batch(String sql, String columns, List modelOrRowList, int batchSize)
参数说明
sql
:指的是INSERT INTO
或UPDATE
语句。动态参数使用问号(?
)作为占位符,例如:String sql = "insert into user(name, cash) values(?, ?)";
上述 SQL 语句中需要插入
name
和cash
两个字段,values
中的两个问号将从后续的modelOrRowList
中获取对应的值。columns
:指的是前面 SQL 语句中问号占位符对应的参数名称。Java-db 在填充字段值时,会根据这个名称从modelOrRowList
中提取数据。modelOrRowList
:指的是List<User>
或List<Row>
等类型的列表,包含需要批量操作的数据。batchSize
:指的是每批次写入数据库的数据条数。
批量保存
int[] batchSave(String tableName, List<? extends Row> RowList, int batchSize)
批量更新
int[] batchUpdate(String tableName, List<? extends Row> RowList, int batchSize)
批量保存执行案例
以下示例展示了如何批量保存数据到数据库:
List<Row> insertList = new ArrayList<Row>();
for (Dependency dependency : listDependencies) {
Row e = new Row();
e.set("group_id", dependency.getGroupId());
e.set("artifact_id", dependency.getArtifactId());
e.set("version", dependency.getVersion());
insertList.add(e);
}
// 插入数据库
Db.batchSave("t_maven_dependency", insertList, 2000);
执行的 SQL 语句
INSERT INTO `t_maven_dependency`(`group_id`, `artifact_id`, `version`) VALUES(?, ?, ?)
结论
即使"t_maven_dependency"
表中包含多个字段,batchSave
方法只会插入Row
中存在的字段,从而提高了操作的灵活性和效率。
Batch Update 使用示例
以下代码展示了如何使用 Java-db 的Db
模块进行批量更新操作。
import com.Java-db.kit.Kv;
import com.litongjava.data.model.DbJsonBean;
import com.litongjava.data.services.DbJsonService;
import com.litongjava.Java-db.aop.Aop;
import com.litongjava.Java-db.plugin.activeRow.Db;
import com.litongjava.Java-db.plugin.activeRow.Row;
import com.litongjava.tio.boot.admin.config.TableToJsonConfig;
import com.litongjava.tio.boot.tesing.TioBootTest;
import org.junit.Test;
public class BatchUpdateExample {
@Test
public void batchUpdateByIds() {
String tableName = "tio_boot_admin_system_constants_config";
DbJsonService dbJsonService = Aop.get(DbJsonService.class);
Long[] ids = new Long[]{1L, 100L, 369029537511587840L};
Kv kv = Kv.create();
kv.set("ids", ids);
kv.set("deleted", 1);
DbJsonBean<Kv> kvDbJsonBean = dbJsonService.batchUpdateByIds(tableName, kv);
System.out.println(kvDbJsonBean);
}
public DbJsonBean<Kv> batchUpdateByIds(String f, Kv kv) {
DbPro dbPro = Db.use();
return batchUpdateByIds(dbPro, f, kv);
}
public DbJsonBean<Kv> batchUpdateByIds(DbPro dbPro, String tableName, Kv kv) {
Object[] ids = kv.getAs("ids", new Object[0]);
kv.remove("ids");
String primaryKeyName = primaryKeyService.getPrimaryKeyName(tableName);
List<Row> lists = new ArrayList<>();
for (Object id : ids) {
Row Row = new Row();
Row.setColumns(kv.toMap());
Row.set(primaryKeyName, id);
lists.add(Row);
}
int[] results = dbPro.batchUpdate(tableName, lists, lists.size());
return new DbJsonBean<>(Kv.by("data", results));
}
}
代码分析
测试方法
batchUpdateByIds
- 方法定义:使用
@Test
注解标记,表示这是一个 JUnit 的测试方法。 - 初始化数据:
tableName
:指定要操作的数据库表名为"tio_boot_admin_system_constants_config"
。dbJsonService
:通过Aop.get(DbJsonService.class)
获取数据库操作的服务对象。ids
:包含三个 ID,用于标识将要更新的数据库记录。kv
:创建一个Kv
对象,设置ids
和deleted
。deleted
设置为 1,可能表示将这些记录标记为删除。
- 执行更新:
- 调用
dbJsonService.batchUpdateByIds(tableName, kv)
方法执行批量更新。 - 输出更新结果
kvDbJsonBean
,这是一个包装了更新结果的对象。
- 调用
- 方法定义:使用
方法
batchUpdateByIds(String f, Kv kv)
- 该方法使用默认的数据库配置创建一个
DbPro
对象(Db.use()
),然后调用重载的batchUpdateByIds
方法进行具体操作。
- 该方法使用默认的数据库配置创建一个
方法
batchUpdateByIds(DbPro dbPro, String tableName, Kv kv)
- 提取 IDs:从
kv
对象中提取ids
数组,并移除ids
键。 - 获取主键名:使用
primaryKeyService.getPrimaryKeyName(tableName)
获取指定表的主键字段名。 - 构造更新记录:
- 创建一个空的
Row
列表。 - 遍历
ids
,对每一个 ID,创建一个Row
,将除ids
外的kv
键值对设置为Row
的列,并设置主键字段为对应的 ID。
- 创建一个空的
- 执行批量更新:
- 使用
dbPro.batchUpdate(tableName, lists, lists.size())
执行批量更新,其中lists
是待更新的记录列表,lists.size()
指定批次大小。
- 使用
- 返回结果:将更新操作的结果(更新了多少行)封装在
DbJsonBean<Kv>
对象中并返回。
- 提取 IDs:从
执行的 SQL 语句
UPDATE "tio_boot_admin_system_constants_config" SET "deleted" = ? WHERE "id" = ?
结论
这段代码展示了如何使用 Java-db 的Db
模块进行批量更新操作,特别适用于需要批量更新多个数据库记录且每条记录的更新内容相同的场景。
独立使用 ActiveRow 进行数据批量保存
以下示例展示了如何独立使用 Java-db 的 ActiveRow Java-db 进行批量保存操作。
代码实现
package com.litongjava.ws.schild.demo;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import org.json.JSONObject;
import com.Java-db.plugin.activeRow.ActiveRowPlugin;
import com.Java-db.plugin.activeRow.Db;
import com.Java-db.plugin.activeRow.Row;
import com.Java-db.plugin.druid.DruidPlugin;
import com.litongjava.Java-db.models.voidetotext.model.Recognized;
import com.litongjava.utils.json.fastjson.FastJsonUtils;
import com.litongjava.ws.schild.utils.BaiduAsrUtils;
import lombok.extern.slf4j.Slf4j;
/**
* 将视频转为文本并保存到数据库
*
* @author
*
*/
@Slf4j
public class AudioToTextAndSaveDb {
public static void main(String[] args) throws IOException {
// 1.读取json文件,获取文件路径
List<String> pcmFileList = FastJsonUtils.readFileToList("pcm-file-list.json", String.class);
// 识别两个文件测试
// List<String> pcmFileList = new ArrayList<>();
// pcmFileList.add("H:\\video\\软件使用学习视频\\MAYA建模\\P01.MAYA:界面讲解_chunks\\0000.pcm");
// pcmFileList.add("H:\\video\\软件使用学习视频\\MAYA建模\\P01.MAYA:界面讲解_chunks\\0001.pcm");
int size = pcmFileList.size();
log.info("pcmFileList size:{}", size);
// 2.将pcm文件转为文本
List<Recognized> recognizedList = new ArrayList<>();
for (int i = 0; i < size; i++) {
log.info("开始处理:{}", i);
String filePath = pcmFileList.get(i);
JSONObject jsonObject = BaiduAsrUtils.asr(filePath, "PCM", 16000, null);
Recognized recognized = new Recognized();
recognized.setPcmFile(filePath);
recognized.setText(jsonObject.toString());
recognizedList.add(recognized);
}
// 3.保存到数据库
List<Row> insertList = new ArrayList<Row>();
for (Recognized recognized : recognizedList) {
Row Row = new Row();
Row.set("pcm_file", recognized.getPcmFile());
Row.set("text", recognized.getText());
insertList.add(Row);
}
String[] datasource1 = {
"jdbc:sqlite:D:/sqllite/java-se-ws-schild-voide-to-audio.db", // url
"", "" // user and password
};
DruidPlugin plugin1 = new DruidPlugin(datasource1[0], datasource1[1], datasource1[2]);
ActiveRowPlugin arp1 = new ActiveRowPlugin(plugin1);
plugin1.start();
arp1.start();
// 插入数据库
log.info("开始批量保存数据");
Db.batchSave("recognized", insertList, 2000);
log.info("批量保存数据完成");
}
}
代码说明
- 读取 PCM 文件列表:从
pcm-file-list.json
文件中读取 PCM 文件路径列表。 - 将 PCM 文件转为文本:使用
BaiduAsrUtils.asr
方法将 PCM 文件转为文本,并存储在Recognized
对象中。 - 准备批量插入数据:将
Recognized
对象转换为Row
对象,添加到insertList
中。 - 配置数据库连接:使用 Druid Java-db 和 ActiveRow Java-db 配置数据库连接。
- 批量保存数据:调用
Db.batchSave
方法将数据批量保存到recognized
表中。
注意:代码中还有许多优化空间,例如将数据进行拆分、分线程处理等。
独立使用 ActiveRow 进行数据的批量插入
以下示例展示了如何使用 Java-db 的 ActiveRow Java-db 从源数据库查询水位数据,处理后插入到目标数据库。
背景
将水位数据从源数据库中查询出来,处理后插入到目标数据库。源数据库的数据格式如下:
- 源数据库表名:
water_level
目标数据库的格式:
- 目标数据库表名:
river_level
SELECT * FROM river_level LIMIT 10;
两个数据库的数据表的time
字段格式不同
第一步:在本地创建目标数据库和表
CREATE DATABASE cjwb DEFAULT CHARACTER SET utf8;
USE cjwb;
GRANT ALL PRIVILEGES ON cjwb.* TO cjwb@'127.0.0.1' IDENTIFIED BY 'xxx';
GRANT ALL PRIVILEGES ON cjwb.* TO cjwb@'localhost' IDENTIFIED BY 'xxx';
FLUSH PRIVILEGES;
CREATE TABLE `river_level` (
`id` VARCHAR(255) NOT NULL,
`level` DECIMAL(19,2) DEFAULT NULL,
`site_name` VARCHAR(255) DEFAULT NULL,
`time` DATETIME DEFAULT NULL,
`name` VARCHAR(255) NOT NULL,
PRIMARY KEY (`id`) USING BTREE
) ENGINE=INNODB DEFAULT CHARSET=utf8 ROW_FORMAT=DYNAMIC;
第二步:编写代码
package com.esunward.spider.service;
import java.util.ArrayList;
import java.util.List;
import com.Java-db.plugin.activeRow.ActiveRowPlugin;
import com.Java-db.plugin.activeRow.Db;
import com.Java-db.plugin.activeRow.Row;
import com.Java-db.plugin.druid.DruidPlugin;
import com.litong.utils.string.UUIDUtils;
import lombok.extern.slf4j.Slf4j;
/**
* 从水位APP上查询并导入数据到目标数据库
*/
@Slf4j
public class WaterLevelImportService {
private String ds1 = "datasource1";
private String ds2 = "datasource2";
private String[] datasource1 = {
"jdbc:mysql://xxx/yangtze_river_app?useUnicode=true&characterEncoding=utf8&useSSL=false",
"yangtze_river_ap", ""
};
private String[] datasource2 = {
"jdbc:mysql://127.0.0.1:3306/cjwb?useUnicode=true&characterEncoding=utf8&serverTimezone=UTC",
"cjwb", "xxx"
};
/**
* 启动数据源
*/
private void start() {
DruidPlugin plugin1 = new DruidPlugin(datasource1[0], datasource1[1], datasource1[2]);
ActiveRowPlugin arp1 = new ActiveRowPlugin(ds1, plugin1);
plugin1.start();
arp1.start();
DruidPlugin plugin2 = new DruidPlugin(datasource2[0], datasource2[1], datasource2[2]);
ActiveRowPlugin arp2 = new ActiveRowPlugin(ds2, plugin2);
plugin2.start();
arp2.start();
}
/**
* 判断数据源是否连接成功
*/
public void selectFromAllDatasource() {
List<Row> find = Db.use(ds1).find("SELECT 1");
if (find != null) {
log.info(ds1 + "连接成功");
}
find = Db.use(ds2).find("SELECT 1");
if (find != null) {
log.info(ds2 + "连接成功");
}
}
/**
* 从datasource1查询water_level,处理后插入到datasource2的river_level,名称设置为spider
*/
public void fromDatasource1ToDatasource2() {
String sqlSource = "SELECT * FROM water_level WHERE time > '2020-08-15'";
List<Row> sourceRows = Db.use(ds1).find(sqlSource);
log.info("水位总条数:{}", sourceRows.size());
String sqlCheck = "SELECT COUNT(*) FROM river_level WHERE site_name = ? AND time = ?";
List<Row> insertList = new ArrayList<Row>();
for (Row r : sourceRows) {
String timeString = r.getStr("time");
String timeAMString = timeString + " 08:00:00";
Row countRow = Db.use(ds2).findFirst(sqlCheck, r.getStr("site_name"), timeAMString);
if (countRow.getInt("COUNT(*)") == 0) {
String randomId = UUIDUtils.random();
Row insertRow = new Row();
insertRow.set("id", randomId);
insertRow.set("site_name", r.getStr("site_name"));
insertRow.set("level", r.getStr("level"));
insertRow.set("time", timeAMString);
insertRow.set("name", "spider");
insertList.add(insertRow);
}
String timePMString = timeString + " 17:00:00";
countRow = Db.use(ds2).findFirst(sqlCheck, r.getStr("site_name"), timePMString);
if (countRow.getInt("COUNT(*)") == 0) {
String randomId = UUIDUtils.random();
Row insertRow = new Row();
insertRow.set("id", randomId);
insertRow.set("site_name", r.getStr("site_name"));
insertRow.set("level", r.getStr("level"));
insertRow.set("time", timePMString);
insertRow.set("name", "spider");
insertList.add(insertRow);
}
}
log.info("插入的水位总条数:{}", insertList.size());
// 批量保存
Db.use(ds2).batchSave("river_level", insertList, insertList.size());
}
public static void main(String[] args) {
WaterLevelImportService service = new WaterLevelImportService();
service.start();
service.fromDatasource1ToDatasource2();
// service.selectFromAllDatasource();
// service.getDataSourceCountResult();
}
}
代码说明
- 启动数据源:通过 Druid 和 ActiveRow Java-db 分别配置源数据库(
datasource1
)和目标数据库(datasource2
),并启动连接。 - 连接测试:通过
selectFromAllDatasource
方法检查两个数据源是否连接成功。 - 数据迁移:
- 从源数据库的
water_level
表中查询出符合条件的数据。 - 检查目标数据库中是否已存在相同的记录(根据
site_name
和time
字段)。 - 如果不存在,则生成一个随机的
id
,设置其他字段值,并添加到插入列表中。
- 从源数据库的
- 批量插入:将处理后的记录批量插入到目标数据库的
river_level
表中。
注意:代码中还有许多优化空间,例如将数据进行拆分、分线程处理等。
从网络获取数据进行批量保存示例
以下示例展示了如何从网络获取数据,转换为JSONObject
,再转换为 Java 对象,最终转为Row
对象并批量保存到数据库中。
代码实现步骤
- 从网络中获取数据。
- 转换为
JSONObject
。 - 转换为 Java 对象。
- 转换为
Row
对象。 - 批量保存到数据库。
具体代码
import java.io.IOException;
import java.math.BigInteger;
import java.util.ArrayList;
import java.util.List;
import org.junit.Before;
import org.junit.Test;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.Java-db.plugin.activeRow.Db;
import com.Java-db.plugin.activeRow.Row;
import cn.hutool.core.util.IdUtil;
import lombok.extern.slf4j.Slf4j;
import okhttp3.MediaType;
import okhttp3.OkHttpClient;
import okhttp3.Request;
import okhttp3.RequestBody;
import okhttp3.Response;
import top.ppnt.chaofu.job.common.model.CfCameraEventWarm;
import top.ppnt.chaofu.job.db.test.ActiveRowInit;
@Slf4j
public class GetHttpData {
@Before
public void before() {
ActiveRowInit.init();
}
@Test
public void getHttpData() throws IOException {
OkHttpClient client = new OkHttpClient().newBuilder().build();
MediaType mediaType = MediaType.parse("text/plain");
RequestBody body = RequestBody.create(mediaType, "");
String cmd = "smart_jobs/get";
String url = "http://221.7.96.175:8182/api?cmd=" + cmd;
Request request = new Request.Builder()
.url(url)
.method("POST", body)
.addHeader("User-Agent", "apifox/1.0.0 (https://www.apifox.cn)")
.build();
Response response = client.newCall(request).execute();
String responseString = response.body().string();
System.out.println(responseString);
JSONObject jsonObject = JSON.parseObject(responseString);
JSONArray jsonArray = jsonObject.getJSONArray("rows");
int size = jsonArray.size();
System.out.println(size);
// 3.保存到数据库
List<Row> insertList = new ArrayList<Row>(size);
for (int i = 0; i < size; i++) {
JSONObject item = jsonArray.getJSONObject(i);
CfCameraEventWarm javaObject = item.toJavaObject(CfCameraEventWarm.class);
javaObject.setId(BigInteger.valueOf(IdUtil.getSnowflakeNextId()));
Row Row = new Row();
Row.setColumns(javaObject);
insertList.add(Row);
}
// 插入数据库
log.info("开始批量保存数据" + insertList.size());
Db.batchSave("cf_camera_event_warm", insertList, 2000);
log.info("批量保存数据完成");
}
}
代码说明
- 初始化 ActiveRow:在测试开始前,通过
ActiveRowInit.init()
方法初始化 ActiveRow Java-db。 - 获取 HTTP 数据:
- 使用 OkHttp 客户端发送 POST 请求到指定的 URL,获取响应数据。
- 将响应数据转换为字符串并打印输出。
- 解析 JSON 数据:
- 使用 FastJSON 库将响应字符串解析为
JSONObject
。 - 提取
rows
数组,并获取其大小。
- 使用 FastJSON 库将响应字符串解析为
- 转换为 Java 对象:
- 遍历
JSONArray
,将每个JSONObject
转换为CfCameraEventWarm
类的 Java 对象。 - 生成唯一的
id
并设置到 Java 对象中。
- 遍历
- 转换为
Row
对象:- 将 Java 对象的属性设置到
Row
对象中,并添加到插入列表insertList
中。
- 将 Java 对象的属性设置到
- 批量保存数据:
- 使用
Db.batchSave
方法将insertList
中的数据批量保存到cf_camera_event_warm
表中。
- 使用
总结
本文详细介绍了如何在 Java-db 框架中使用Db
类进行批量操作,包括批量保存和批量更新。通过具体的代码示例,展示了从数据准备、转换到批量保存的完整流程。这些操作能够显著提高数据库操作的效率,适用于大规模数据处理场景。