Elastic 工具类使用指南

本章介绍了 JavaDB 提供的 Elastic 工具类的使用方法,包括索引的创建、查看、删除,以及文档的批量导入、查询、更新和删除等操作。

索引操作

创建索引

创建索引的命令如下:

curl -X PUT "http://127.0.0.1:9200/service" -H 'Content-Type: application/json' -u elastic:YourElasticPassword -d'
{
  "settings": {
    "index": {
      "max_result_window": 10000000
    }
  }
}'

查看索引

要查看索引的设置,可以使用以下命令:

curl -X GET "http://127.0.0.1:9200/service/_settings" -u elastic:YourElasticPassword

示例返回结果:

{
  "service": {
    "settings": {
      "index": {
        "routing": {
          "allocation": {
            "include": {
              "_tier_preference": "data_content"
            }
          }
        },
        "number_of_shards": "1",
        "provided_name": "service",
        "max_result_window": "10000000",
        "creation_date": "1724328711514",
        "number_of_replicas": "1",
        "uuid": "x-I-6xfZQGyeqwvvqWg8XA",
        "version": {
          "created": "7100099"
        }
      }
    }
  }
}

删除索引

删除索引的命令如下(注意:`_all` 会删除集群中的所有索引;如果只想删除 service 索引,请将 `_all` 替换为 `service`):

curl -X DELETE "http://127.0.0.1:9200/_all" -u elastic:YourElasticPassword

Java 代码示例

import org.elasticsearch.action.admin.indices.create.CreateIndexRequest;
import org.elasticsearch.action.admin.indices.delete.DeleteIndexRequest;
import org.elasticsearch.action.support.master.AcknowledgedResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.common.settings.Settings;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.jupiter.api.Test;

import com.litongjava.es.client.Elastic;
import com.litongjava.tio.utils.environment.EnvUtils;

/**
 * Index-level examples: creating the {@code service} index and deleting all indices.
 *
 * <p>Each test obtains the client from {@code Elastic.getClient()} and closes it when done.
 */
public class IndexTest {

  /**
   * Loads the environment, applies the Elasticsearch configuration and returns the client.
   */
  private static RestHighLevelClient openClient() {
    EnvUtils.load();
    new ElasticSearchConfig().config();
    return Elastic.getClient();
  }

  /**
   * Creates the {@code service} index with {@code index.max_result_window} set to 10000000
   * and prints the name of the created index.
   */
  @Test
  public void createIndex() {
    // try-with-resources closes the client even when index creation throws
    try (RestHighLevelClient client = openClient()) {
      CreateIndexRequest request = new CreateIndexRequest("service");
      request.settings(Settings.builder().put("index.max_result_window", 10000000));
      // Empty JSON body: no mappings or aliases are supplied in this example
      request.source("{}", XContentType.JSON);

      org.elasticsearch.action.admin.indices.create.CreateIndexResponse createIndexResponse =
          Elastic.createIndex(request, RequestOptions.DEFAULT);
      System.out.println("Index created: " + createIndexResponse.index());
    } catch (Exception e) {
      e.printStackTrace();
    }
  }

  /**
   * Deletes every index by targeting {@code _all} and prints whether the request was
   * acknowledged.
   */
  @Test
  public void deleteAllIndex() {
    // try-with-resources closes the client even when the delete request throws
    try (RestHighLevelClient client = openClient()) {
      DeleteIndexRequest request = new DeleteIndexRequest("_all");
      AcknowledgedResponse deleteIndexResponse = client.indices().delete(request, RequestOptions.DEFAULT);
      System.out.println("All indices deleted: " + deleteIndexResponse.isAcknowledged());
    } catch (Exception e) {
      e.printStackTrace();
    }
  }
}

数据文档操作

批量导入

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.elasticsearch.action.bulk.BulkRequest;
import org.elasticsearch.action.bulk.BulkResponse;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.jupiter.api.Test;

import com.alibaba.fastjson.JSON;
import com.litongjava.db.activerecord.Db;
import com.litongjava.db.activerecord.Page;
import com.litongjava.db.activerecord.Record;
import com.litongjava.db.activerecord.SqlPara;
import com.litongjava.es.client.Elastic;
import com.litongjava.tio.utils.environment.EnvUtils;

/**
 * Bulk-import example: copies every row of the database table into the Elasticsearch index of
 * the same name, one page of rows per bulk request.
 */
public class DocumentTest {

  /** Number of rows read from the database and sent to Elasticsearch per bulk request. */
  private static final int PAGE_SIZE = 1000;

  String tableName = "service";

  /**
   * Pages through the table ordered by {@code service_id} and indexes each row, using the row's
   * {@code service_id} as the document id.
   */
  @Test
  public void importService() {
    EnvUtils.load();
    new DbConfig().configWithoutSpring();
    new ElasticSearchConfig().config();

    // Ceiling division: an exact multiple of PAGE_SIZE must not schedule an extra, empty page
    Long count = Db.countTable(tableName);
    long totalPages = (count + PAGE_SIZE - 1) / PAGE_SIZE;

    String sql = "select * from " + tableName + " order by service_id";
    for (int pageNo = 1; pageNo <= totalPages; pageNo++) {
      SqlPara sqlPara = SqlPara.by(sql);
      Page<Record> page = Db.paginate(pageNo, PAGE_SIZE, sqlPara);

      List<Record> list = page.getList();
      if (list == null || list.isEmpty()) {
        // Nothing left to import (e.g. rows were removed after counting); a bulk request
        // without any action is rejected, so stop here instead of sending one
        break;
      }

      BulkRequest bulkRequest = new BulkRequest();
      System.out.println("start:" + pageNo);
      for (Record r : list) {
        IndexRequest source = new IndexRequest(tableName).id(r.getLong("service_id").toString())
            .source(JSON.toJSONString(r.toMap()), XContentType.JSON);
        bulkRequest.add(source);
      }
      System.out.println("finish:" + pageNo);

      // Send this page to Elasticsearch
      BulkResponse bulk = Elastic.bulk(bulkRequest, RequestOptions.DEFAULT);
      System.out.println(bulk);
    }
  }
}

查询所有文档

  /**
   * Reads every document of the {@code tableName} index with a match-all query, paging with
   * from/size, and prints how many documents were collected.
   */
  @Test
  public void selectAllDocument() {
    EnvUtils.load();
    new DbConfig().configWithoutSpring();
    new ElasticSearchConfig().config();

    final int pageSize = 10000;
    List<Map<String, Object>> list = new ArrayList<>();

    for (int pageIndex = 0;; pageIndex++) {
      // trackTotalHits(true) already requests an accurate total, so it is set only once
      SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
          .query(QueryBuilders.matchAllQuery())
          .trackTotalHits(true)
          .from(pageIndex * pageSize)
          .size(pageSize);
      SearchRequest searchRequest = new SearchRequest(tableName).source(sourceBuilder);

      SearchResponse search = Elastic.search(searchRequest, RequestOptions.DEFAULT);
      SearchHit[] pageHits = search.getHits().getHits();
      for (SearchHit hit : pageHits) {
        list.add(hit.getSourceAsMap());
      }

      // A short (or empty) page is the last one; this avoids one extra empty request
      if (pageHits.length < pageSize) {
        break;
      }
    }

    System.out.println(list.size());
  }

部分返回数据示例:

{
  "service_title": "",
  "service_state": 1,
  "service_is_show": 0,
  ...
}

查看所有索引

curl -X GET "http://localhost:9200/_cat/indices?v" -u elastic:YourElasticPassword

示例返回结果:

health status index       uuid                   pri rep docs.count docs.deleted store.size pri.store.size
yellow open   service     x-I-6xfZQGyeqwvvqWg8XA   1   1      53864        44958     53.8mb         53.8mb

在 Elasticsearch 中,docs.deleted 字段表示被标记为删除但尚未被真正删除的文档数量。出现这种情况的原因通常如下:

  1. 文档更新:在 Elasticsearch 中,更新文档实际上是删除旧文档并插入新文档。因此,即使没有显式执行删除操作,但如果文档进行了更新,docs.deleted 的计数也会增加。

  2. 自动合并(Merge)操作:Elasticsearch 会定期执行自动合并操作,将已删除的文档清理掉并回收磁盘空间。合并操作不是立即执行的,因此在合并完成之前,被删除的文档仍然会被计数。

  3. 覆盖写入或删除文档:使用已存在的 ID 再次索引文档(例如重复执行上面的批量导入),或显式删除文档时,Elasticsearch 只会先将旧文档标记为删除,因此 docs.deleted 的计数也会增加。

处理 docs.deleted 的建议

  1. 手动触发合并:可以手动触发合并操作来清理已删除的文档。不过,在生产环境中频繁手动执行合并可能会影响性能。

    curl -X POST "http://localhost:9200/service/_forcemerge?only_expunge_deletes=true" -u elastic:YourElasticPassword
    
  2. 监控和优化索引:定期监控 docs.deleted 的数量,并在必要时优化索引。

  3. 确认是否有未预期的文档更新:检查索引逻辑,确认是否有意外的文档更新或操作,导致文档被标记为删除。

搜索文档

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.index.query.MatchQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.Test;

import com.litongjava.es.client.Elastic;
import com.litongjava.tio.utils.environment.EnvUtils;

/**
 * Search example: collects every document of the {@code service} index whose
 * {@code service_title} matches a given text.
 */
public class DocumentTitleSearch {

  /** Number of hits requested per search call. */
  private static final int PAGE_SIZE = 10000;

  /**
   * Runs a match query on {@code service_title}, pages through the results with from/size and
   * prints how many documents were collected.
   */
  @Test
  public void searchByTitle() {
    EnvUtils.load();
    new DbConfig().configWithoutSpring();
    new ElasticSearchConfig().config();

    String title = "服务好"; // the title text to search for

    List<Map<String, Object>> list = new ArrayList<>();
    for (int pageIndex = 0;; pageIndex++) {
      // trackTotalHits(true) already requests an accurate total, so it is set only once
      SearchSourceBuilder sourceBuilder = new SearchSourceBuilder()
          .query(QueryBuilders.matchQuery("service_title", title))
          .trackTotalHits(true)
          .from(pageIndex * PAGE_SIZE)
          .size(PAGE_SIZE);
      SearchRequest searchRequest = new SearchRequest("service").source(sourceBuilder);

      SearchResponse search = Elastic.search(searchRequest, RequestOptions.DEFAULT);
      SearchHit[] pageHits = search.getHits().getHits();
      for (SearchHit hit : pageHits) {
        list.add(hit.getSourceAsMap());
      }

      // A short (or empty) page is the last one; this avoids one extra empty request
      if (pageHits.length < PAGE_SIZE) {
        break;
      }
    }

    System.out.println(list.size());
  }
}

更新文档

package com.elasticsearch;

import java.util.HashMap;
import java.util.Map;

import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.action.index.IndexResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.common.xcontent.XContentType;
import org.junit.jupiter.api.Test;

import com.alibaba.fastjson2.JSON;
import com.litongjava.es.client.Elastic;

/**
 * Update example: re-indexes a document under its existing id, which replaces the stored
 * document.
 */
public class DocumentUpdateTest {

  /**
   * Builds a sample document, indexes it into {@code service} under its {@code service_id}
   * and prints the result reported by Elasticsearch.
   *
   * <p>NOTE(review): this snippet does not configure the client itself; presumably the
   * Elasticsearch configuration has been applied beforehand — confirm.
   */
  @Test
  public void testUpdate() {
    // Sample data; without a service_id there is no document id to index under
    Map<String, Object> service = new HashMap<>();
    service.put("service_id", 1L);
    service.put("service_title", "Sample service");
    service.put("service_state", 1);

    try {
      IndexRequest indexRequest = new IndexRequest("service");
      indexRequest.id(String.valueOf(service.get("service_id")));
      String jsonService = JSON.toJSONString(service);
      indexRequest.source(jsonService, XContentType.JSON);

      IndexResponse index = Elastic.index(indexRequest, RequestOptions.DEFAULT);
      System.out.println(index.getResult());
    } catch (Exception es) {
      es.printStackTrace();
    }
  }
}

删除文档

package com.elasticsearch;

import org.elasticsearch.action.delete.DeleteRequest;
import org.elasticsearch.client.RequestOptions;
import org.junit.jupiter.api.Test;

import com.litongjava.es.client.Elastic;

/**
 * Delete example: removes a single document from the {@code service} index by id.
 */
public class DocumentDeleteTest {

  /**
   * Sends a delete request for the given document id; any failure is printed, not rethrown.
   *
   * <p>NOTE(review): the id is an empty string here; presumably the reader is expected to
   * substitute a real document id — confirm.
   */
  @Test
  public void testDelete() {
    final String id = "";
    try {
      Elastic.delete(new DeleteRequest("service", id), RequestOptions.DEFAULT);
    } catch (Exception failure) {
      failure.printStackTrace();
    }
  }
}

搜索详解

分页查询

package com.elasticsearch;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;

import org.elasticsearch.action.search.SearchRequest;
import org.elasticsearch.action.search.SearchResponse;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.index.query.BoolQueryBuilder;
import org.elasticsearch.index.query.QueryBuilders;
import org.elasticsearch.search.SearchHit;
import org.elasticsearch.search.builder.SearchSourceBuilder;
import org.junit.jupiter.api.Test;

import com.litongjava.es.client.Elastic;
import com.litongjava.tio.utils.environment.EnvUtils;
import com.litongjava.tio.utils.json.JsonUtils;

import cn.hutool.core.util.StrUtil;

/**
 * Paged search example: filters the {@code service} index by state and city, optionally by a
 * keyword, and prints the first hit of the requested page.
 */
public class DocumentPageSearch {

  /**
   * Builds a bool query, requests page {@code pageNo} of size {@code pageSize} and prints the
   * first document of that page as JSON, or a message when the page is empty.
   */
  @Test
  public void pageSearch() {
    EnvUtils.load();
    new ElasticSearchConfig().config();

    String keyWord = "";
    int pageNo = 1;
    int pageSize = 9;
    String indexName = "service";

    List<Map<String, Object>> list = new ArrayList<>();
    try {
      SearchRequest searchRequest = new SearchRequest(indexName);
      SearchSourceBuilder sourceBuilder = new SearchSourceBuilder();
      BoolQueryBuilder boolQuery = QueryBuilders.boolQuery();
      boolQuery.must(QueryBuilders.termsQuery("service_state", "1"));
      boolQuery.must(QueryBuilders.matchPhrasePrefixQuery("service_city", "澳门特别行政区"));

      // The keyword clause is added only when a keyword was actually supplied
      if (StrUtil.isNotBlank(keyWord)) {
        boolQuery.must(QueryBuilders.queryStringQuery(keyWord));
      }
      sourceBuilder.query(boolQuery);
      // Page numbers are 1-based, the from offset is 0-based
      sourceBuilder.from((pageNo - 1) * pageSize);
      sourceBuilder.size(pageSize);
      searchRequest.source(sourceBuilder);
      SearchResponse search = Elastic.search(searchRequest, RequestOptions.DEFAULT);
      for (SearchHit hits : search.getHits().getHits()) {
        Map<String, Object> sourceAsMap = hits.getSourceAsMap();
        list.add(sourceAsMap);
      }
    } catch (Exception e) {
      e.printStackTrace();
    }

    // The list is empty when nothing matched or the search failed; get(0) would then throw
    if (list.isEmpty()) {
      System.out.println("No matching documents found");
    } else {
      System.out.println(JsonUtils.toJson(list.get(0)));
    }
  }
}

通过 ID 查询

  /**
   * Fetches one document from the {@code service} index by id.
   *
   * @param id the document id to look up
   * @return the document source as a map, or {@code null} when the lookup throws (the
   *         exception is printed); whatever {@code getSourceAsMap()} yields is returned as is
   */
  public static Map<String, Object> getService(String id) {
    Map<String, Object> document = null;
    try {
      GetResponse response = Elastic.get(new GetRequest("service", id), RequestOptions.DEFAULT);
      document = response.getSourceAsMap();
    } catch (Exception failure) {
      failure.printStackTrace();
    }
    return document;
  }