在使用 Java 高级别的客户端批量 API 创建动态弹性搜索索引时需要帮助

分享于2023年04月15日 elasticsearch java rest-client 问答
【问题标题】:Need help in creating dynamic elasticsearch index using Java high level rest client bulk API在使用 Java 高级别的客户端批量 API 创建动态弹性搜索索引时需要帮助
【发布时间】:2023-04-14 22:44:02
【问题描述】:

我正在创建一个将与我们的 Java 代码库集成的 Elasticsearch 集群。我想创建一个 Elasticsearch 索引并将来自多个数据库的 SQL Query 数据插入其中。所有数据库的查询结果都应该插入到同一个索引中。为此,我正在使用 Java High level Rest Client。 但我不太确定如何执行此操作,因为旧 API 中的许多方法已被弃用。 我也不太确定如何处理 createIndexResponse 实例。 任何人都可以帮助我吗?

        public static void method_1(Connection con) throws Exception {

            Statement statement = con.createStatement();
            try {
                ResultSet result = statement.executeQuery("SELECT Field_1, Field_2, Field_3 from Table_1");
                int counter = 1;
                CreateIndexRequest createIndexRequest = new CreateIndexRequest("index_name");
                createIndexRequest.settings(new Settings.Builder()
                        .put("cluster.name", "my_cluster")
                        .put("http.enabled", true)
                        .put("node.data", true)
                        .put("index.number_of_shards", 3)
                        .put("index.number_of_replicas", 1)
                        .build());
                CreateIndexResponse createIndexResponse = ElasticSearch.eclient.indices().create(createIndexRequest, RequestOptions.DEFAULT);
                BulkRequest bulkRequest = new BulkRequest();
                while (result.next()) {
                    String field_1 = result.getString("Field_1");
                    int field_2 = result.getInt("Field_2");
                    String field_3 = result.getString("Field_3");
                    XContentBuilder builder = XContentFactory.jsonBuilder()
                            .startObject()
                            .field("Field 1", field_1)
                            .field("Field 2", field_2)
                            .field("Field 3", field_3)
                            .endObject();
                    UpdateRequest updateRequest = new UpdateRequest("index_name", "_doc", Integer.toString(counter));
                    updateRequest.doc(builder);
                    bulkRequest.add(updateRequest);
                }
                BulkResponse response = ElasticSearch.eclient.bulk(bulkRequest, RequestOptions.DEFAULT);
                if (response.hasFailures()) {
                    for (BulkItemResponse item : response.getItems()) {
                        System.out.println(item.getFailureMessage());
                    }
                }
                counter++;
                statement.close();
            } catch (Exception ex) {
                ex.printStackTrace();
            }
        }

  • 标题误导了 IMO。你应该修复它。

【解决方案1】:

您没有提供您使用的版本。无论如何,我正在考虑您使用的是 7.0.0。

这是一个我猜应该没问题的例子:

try (RestHighLevelClient client = new RestHighLevelClient(
        RestClient.builder(HttpHost.create("http://localhost:9200")))) {
    CreateIndexRequest createIndexRequest = new CreateIndexRequest("index_name");
    createIndexRequest.settings(Settings.builder()
            .put("index.number_of_shards", 3)
            .put("index.number_of_replicas", 1)
            .build());
    client.indices().create(createIndexRequest, RequestOptions.DEFAULT);
    BulkRequest bulkRequest = new BulkRequest();
    XContentBuilder builder = XContentFactory.jsonBuilder()
            .startObject()
                .field("foo", "bar")
            .endObject();
    IndexRequest indexRequest = new IndexRequest("index_name");
    indexRequest.source(builder);
    bulkRequest.add(indexRequest);
    BulkResponse response = client.bulk(bulkRequest, RequestOptions.DEFAULT);
    if (response.hasFailures()) {
        for (BulkItemResponse item : response.getItems()) {
            System.out.println(item.getFailureMessage());
        }
    }
} catch (IOException e) {
    e.printStackTrace();
}

我建议使用更容易处理 IMO 的 BulkProcessor 类。我有 a full demo repository ,我会在每个新版本中更新它。它也可能对您有所帮助。

【讨论】:

  • 感谢您提供有用的见解。所以我想对于循环插入文档,我不必使用UpdateRequest,IndexRequest本身就足够了。
  • 更新 API 对于只更新非常大的文档的一小部分(如您想要更新单个字段的 10mb 文档)很有用。通常,只需使用 Index Document API 来插入和更新文档。这是首选方式。
  • 我还注意到另一件事。在更新的 IntelliJ 版本中,方法“create(createIndexRequest, RequestOptions.DEFAULT)”显示为已弃用(删除)。有什么解决方法吗?