1. 概述
Elasticsearch 是一个强大的搜索和分析引擎,特别适合需要灵活过滤的场景。有时我们不仅需要获取查询结果,还需要同时获取这些结果的聚合统计信息。本文将带你实现这个功能。
2. 在Elasticsearch查询中添加聚合
先来了解Elasticsearch原生的聚合功能。假设我们已经在本地运行了Elasticsearch实例,首先创建一个名为 store-items
的索引并插入几条测试数据:
POST http://localhost:9200/store-items/_doc
{
"type": "Multimedia",
"name": "PC Monitor",
"price": 1000
}
...
POST http://localhost:9200/store-items/_doc
{
"type": "Pets",
"name": "Dog Toy",
"price": 10
}
现在执行一个不带任何过滤的查询:
GET http://localhost:9200/store-items/_search
响应结果如下:
{
...
"hits": {
"total": {
"value": 5,
"relation": "eq"
},
"max_score": 1.0,
"hits": [
{
"_index": "store-items",
"_type": "_doc",
"_id": "J49VVI8B6ADL84Kpbm8A",
"_score": 1.0,
"_source": {
"_class": "com.baeldung.model.StoreItem",
"type": "Multimedia",
"name": "PC Monitor",
"price": 1000
}
},
{
"_index": "store-items",
"_type": "_doc",
"_id": "KI9VVI8B6ADL84Kpbm8A",
"_score": 1.0,
"_source": {
"type": "Pets",
"name": "Dog Toy",
"price": 10
}
},
...
]
}
}
现在假设我们需要统计每种商品类型的数量。在请求体中添加聚合部分再次查询:
GET http://localhost:9200/store-items/_search
{
"aggs": {
"type_aggregation": {
"terms": {
"field": "type"
}
}
}
}
我们添加了一个名为 type_aggregation
的聚合,使用 terms
聚合类型。响应中新增了 aggregations
部分,展示了每种类型的商品数量:
{
...
"aggregations": {
"type_aggregation": {
"doc_count_error_upper_bound": 0,
"sum_other_doc_count": 0,
"buckets": [
{
"key": "Multimedia",
"doc_count": 2
},
{
"key": "Pets",
"doc_count": 2
},
{
"key": "Home tech",
"doc_count": 1
}
]
}
}
}
3. 在Spring Data Elasticsearch中实现带聚合的查询
现在用Spring Data Elasticsearch实现相同的功能。首先添加依赖:
<dependency>
<groupId>org.springframework.data</groupId>
<artifactId>spring-data-elasticsearch</artifactId>
</dependency>
创建Elasticsearch配置类:
@Configuration
@EnableElasticsearchRepositories(basePackages = "com.baeldung.spring.data.es.aggregation.repository")
@ComponentScan(basePackages = "com.baeldung.spring.data.es.aggregation")
public class ElasticSearchConfig {
@Bean
public RestClient elasticsearchRestClient() {
return RestClient.builder(HttpHost.create("localhost:9200"))
.setHttpClientConfigCallback(httpClientBuilder -> {
httpClientBuilder.addInterceptorLast((HttpResponseInterceptor) (response, context) ->
response.addHeader("X-Elastic-Product", "Elasticsearch"));
return httpClientBuilder;
})
.build();
}
@Bean
public ElasticsearchClient elasticsearchClient(RestClient restClient) {
return ElasticsearchClients.createImperative(restClient);
}
@Bean(name = { "elasticsearchOperations", "elasticsearchTemplate" })
public ElasticsearchOperations elasticsearchOperations(
ElasticsearchClient elasticsearchClient) {
ElasticsearchTemplate template = new ElasticsearchTemplate(elasticsearchClient);
template.setRefreshPolicy(null);
return template;
}
}
这里配置了底层REST客户端和实现了 ElasticsearchOperations
接口的包装Bean。接下来创建 StoreItem
实体类:
@Document(indexName = "store-items")
public class StoreItem {
@Id
private String id;
@Field(type = Keyword)
private String type;
@Field(type = Keyword)
private String name;
@Field(type = Keyword)
private Long price;
//getters and setters
}
由于Spring Data仓库无法直接获取聚合结果,我们需要创建自定义扩展。先定义扩展接口:
public interface StoreItemRepositoryExtension {
SearchPage<StoreItem> findAllWithAggregations(Pageable pageable);
}
接口包含 findAllWithAggregations()
方法,接收分页参数并返回包含聚合结果的 SearchPage
。然后实现该接口:
@Component
public class StoreItemRepositoryExtensionImpl implements StoreItemRepositoryExtension {
@Autowired
private ElasticsearchOperations elasticsearchOperations;
@Override
public SearchPage<StoreItem> findAllWithAggregations(Pageable pageable) {
Query query = NativeQuery.builder()
.withAggregation("type_aggregation",
Aggregation.of(b -> b.terms(t -> t.field("type"))))
.build();
SearchHits<StoreItem> response = elasticsearchOperations.search(query, StoreItem.class);
return SearchHitSupport.searchPageFor(response, pageable);
}
}
我们构建了原生查询并添加了聚合部分。沿用前例的聚合名称 type_aggregation
,使用 terms
聚合类型统计指定字段的文档数量。
最后创建Spring Data仓库,同时继承 ElasticsearchRepository
和自定义扩展:
@Repository
public interface StoreItemRepository extends ElasticsearchRepository<StoreItem, String>,
StoreItemRepositoryExtension {
}
编写测试验证聚合功能:
@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = ElasticSearchConfig.class)
public class ElasticSearchAggregationManualTest {
private static final List<StoreItem> EXPECTED_ITEMS = List.of(
new StoreItem("Multimedia", "PC Monitor", 1000L),
new StoreItem("Multimedia", "Headphones", 200L),
new StoreItem("Home tech", "Barbecue Grill", 2000L),
new StoreItem("Pets", "Dog Toy", 10L),
new StoreItem("Pets", "Cat shampoo", 5L));
...
@BeforeEach
public void before() {
repository.saveAll(EXPECTED_ITEMS);
}
...
}
测试数据包含5种商品,每种类型有不同数量的商品。测试执行前会初始化这些数据。现在调用 findAllWithAggregations()
方法并验证结果:
@Test
void givenFullTitle_whenRunMatchQuery_thenDocIsFound() {
SearchHits<StoreItem> searchHits = repository.findAllWithAggregations(Pageable.ofSize(2))
.getSearchHits();
List<StoreItem> data = searchHits.getSearchHits()
.stream()
.map(SearchHit::getContent)
.toList();
Assertions.assertThat(data).containsAll(EXPECTED_ITEMS);
Map<String, Long> aggregatedData = ((ElasticsearchAggregations) searchHits
.getAggregations())
.get("type_aggregation")
.aggregation()
.getAggregate()
.sterms()
.buckets()
.array()
.stream()
.collect(Collectors.toMap(bucket -> bucket.key()
.stringValue(), MultiBucketBase::docCount));
Assertions.assertThat(aggregatedData).containsExactlyInAnyOrderEntriesOf(
Map.of("Multimedia", 2L, "Home tech", 1L, "Pets", 2L));
}
✅ 验证结果包含两部分:
- 从
searchHits
获取的原始查询结果 - 从聚合结果中提取的统计信息,包含每种商品类型的数量
4. 总结
本文展示了如何在Spring Data仓库中集成Elasticsearch的聚合功能。我们使用了 terms
聚合作为示例,但Elasticsearch还支持多种其他聚合类型,可以满足更复杂的统计需求。
完整源码可在GitHub仓库获取。