少即是多,ElasticSearch 中的 Bulk 操作
495 字大约 2 分钟2024年11月20日
使用场景
ElasticSearch 的 Bulk API 主要用于高性能操作,如索引、更新和删除大量数据。在需要高效地插入数百万条文档而不影响 ElasticSearch 集群时,非常有用。
原理
Bulk API 可以在一次调用中处理多个索引、更新和删除请求,显著减少多次 HTTP 请求带来的开销。
执行操作
为了执行批量操作,我会创建一个 BulkRequest
并添加多个 IndexRequest
、UpdateRequest
或 DeleteRequest
。
BulkRequest bulkRequest = new BulkRequest();
bulkRequest.add(new IndexRequest("index").id("1").source(jsonBuilder()
.startObject()
.field("field", "value")
.endObject()));
bulkRequest.add(new UpdateRequest("index", "1")
.doc(jsonBuilder()
.startObject()
.field("field", "new_value")
.endObject()));
bulkRequest.add(new DeleteRequest("index", "1"));
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
获取查看校验操作结果
我会检查响应以确保所有操作都成功。每个响应都有状态来指示成功或失败。
for (BulkItemResponse bulkItemResponse : bulkResponse) {
if (bulkItemResponse.isFailed()) {
BulkItemResponse.Failure failure = bulkItemResponse.getFailure();
System.out.println("Error: " + failure.getMessage());
} else {
System.out.println("Operation successful: " + bulkItemResponse.getId());
}
}
批量大小控制
控制批量大小对于避免影响 ElasticSearch 集群非常重要。如果出现错误或集群变得不稳定,建议发送较小的批次。
BulkRequest bulkRequest = new BulkRequest();
int batchSize = 1000;
for (int i = 0; i < documents.size(); i++) {
bulkRequest.add(new IndexRequest("index").id(documents.get(i).getId()).source(documents.get(i).getSource()));
if (i % batchSize == 0 && i != 0) {
BulkResponse bulkResponse = client.bulk(bulkRequest, RequestOptions.DEFAULT);
bulkRequest = new BulkRequest();
}
}