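1. Overview

Elasticsearch is a search and analytics engine that is well suited to scenarios requiring flexible filtering. Sometimes we need not only the query results but also aggregated statistics about them. In this tutorial, we'll look at how to retrieve both in a single request.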

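2. Adding an Aggregation to an Elasticsearch Query

Let's start with Elasticsearch's native aggregation support. Assuming an Elasticsearch instance is already running locally, we first create an index named store-items and insert a few test documents: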

POST http://localhost:9200/store-items/_doc
{
    "type": "Multimedia",
    "name": "PC Monitor",
    "price": 1000
}
...
POST http://localhost:9200/store-items/_doc
{
    "type": "Pets",
    "name": "Dog Toy",
    "price": 10
}

Now let's run a query without any filter:

GET http://localhost:9200/store-items/_search

The response looks like this:

{
...
    "hits": {
        "total": {
            "value": 5,
            "relation": "eq"
        },
        "max_score": 1.0,
        "hits": [
            {
                "_index": "store-items",
                "_type": "_doc",
                "_id": "J49VVI8B6ADL84Kpbm8A",
                "_score": 1.0,
                "_source": {
                    "_class": "com.baeldung.model.StoreItem",
                    "type": "Multimedia",
                    "name": "PC Monitor",
                    "price": 1000
                }
            },
            {
                "_index": "store-items",
                "_type": "_doc",
                "_id": "KI9VVI8B6ADL84Kpbm8A",
                "_score": 1.0,
                "_source": {
                    "type": "Pets",
                    "name": "Dog Toy",
                    "price": 10
                }
            },
 ...
        ]
    }
}

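Now suppose we need to count how many items there are of each type. Let's add an aggregation section to the request body and search again: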

GET http://localhost:9200/store-items/_search
{
    "aggs": {
        "type_aggregation": {
            "terms": {
                "field": "type"
            }
        }
    }
}

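We added an aggregation named type_aggregation that uses the terms aggregation type. This works when the type field is mapped as keyword, as the StoreItem entity in section 3 does; if the index was created through default dynamic mapping, the field is text and the aggregation would have to target type.keyword instead. The response now contains an aggregations section with the number of items of each type: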

{
...
    "aggregations": {
        "type_aggregation": {
            "doc_count_error_upper_bound": 0,
            "sum_other_doc_count": 0,
            "buckets": [
                {
                    "key": "Multimedia",
                    "doc_count": 2
                },
                {
                    "key": "Pets",
                    "doc_count": 2
                },
                {
                    "key": "Home tech",
                    "doc_count": 1
                }
            ]
        }
    }
}
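
To make the response fields more concrete, here is a minimal plain-Java sketch of what a terms aggregation computes. It is a model, not Elasticsearch code: the class TermsAggregationSketch, its Result holder, and the terms() method are names made up for this illustration. It relies on two documented defaults of the terms aggregation: buckets are ordered by doc_count in descending order, and only the top size buckets (10 by default) are returned, with the documents of the remaining buckets summed into sum_other_doc_count. Breaking ties by key is included here to keep the output deterministic, and doc_count_error_upper_bound is not modeled.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

// Plain-Java model of a terms aggregation; hypothetical names, not Elasticsearch code.
class TermsAggregationSketch {

    // Mirrors two parts of the response: the buckets and sum_other_doc_count.
    static final class Result {
        final Map<String, Long> buckets;  // key -> doc_count, largest count first
        final long sumOtherDocCount;      // documents in buckets that were cut off by size

        Result(Map<String, Long> buckets, long sumOtherDocCount) {
            this.buckets = buckets;
            this.sumOtherDocCount = sumOtherDocCount;
        }
    }

    static Result terms(List<String> fieldValues, int size) {
        // Count the documents for each distinct field value.
        Map<String, Long> counts = fieldValues.stream()
            .collect(Collectors.groupingBy(value -> value, Collectors.counting()));

        // Order by count descending; ties are broken by key ascending.
        List<Map.Entry<String, Long>> sorted = new ArrayList<>(counts.entrySet());
        sorted.sort(Map.Entry.<String, Long>comparingByValue(Comparator.reverseOrder())
            .thenComparing(Map.Entry.comparingByKey()));

        // Keep the first `size` buckets and sum the counts of the rest.
        Map<String, Long> top = new LinkedHashMap<>();
        long other = 0;
        for (int i = 0; i < sorted.size(); i++) {
            Map.Entry<String, Long> entry = sorted.get(i);
            if (i < size) {
                top.put(entry.getKey(), entry.getValue());
            } else {
                other += entry.getValue();
            }
        }
        return new Result(top, other);
    }

    public static void main(String[] args) {
        // The "type" values of the five sample documents.
        List<String> types = List.of("Multimedia", "Multimedia", "Home tech", "Pets", "Pets");

        Result all = terms(types, 10);
        System.out.println(all.buckets + " other=" + all.sumOtherDocCount);

        Result topTwo = terms(types, 2);
        System.out.println(topTwo.buckets + " other=" + topTwo.sumOtherDocCount);
    }
}
```

With a size of 10, all three buckets fit and sum_other_doc_count is 0, as in the response above. With a size of 2, the single Home tech document moves into sum_other_doc_count.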

3. Implementing a Query with Aggregations in Spring Data Elasticsearch

Now let's implement the same functionality with Spring Data Elasticsearch. First, we add the dependency:

<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-elasticsearch</artifactId>
</dependency>

Next, we create the Elasticsearch configuration class:

@Configuration
@EnableElasticsearchRepositories(basePackages = "com.baeldung.spring.data.es.aggregation.repository")
@ComponentScan(basePackages = "com.baeldung.spring.data.es.aggregation")
public class ElasticSearchConfig {

    @Bean
    public RestClient elasticsearchRestClient() {
        return RestClient.builder(HttpHost.create("localhost:9200"))
          .setHttpClientConfigCallback(httpClientBuilder -> {
              httpClientBuilder.addInterceptorLast((HttpResponseInterceptor) (response, context) ->
                  response.addHeader("X-Elastic-Product", "Elasticsearch"));
              return httpClientBuilder;
            })
          .build();
    }

    @Bean
    public ElasticsearchClient elasticsearchClient(RestClient restClient) {
        return ElasticsearchClients.createImperative(restClient);
    }

    @Bean(name = { "elasticsearchOperations", "elasticsearchTemplate" })
    public ElasticsearchOperations elasticsearchOperations(
        ElasticsearchClient elasticsearchClient) {

        ElasticsearchTemplate template = new ElasticsearchTemplate(elasticsearchClient);
        template.setRefreshPolicy(null);

        return template;
    }
}

Here we configured the low-level REST client and a wrapper bean that implements the ElasticsearchOperations interface. Next, let's create the StoreItem entity class:

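@Document(indexName = "store-items")
public class StoreItem {
    @Id
    private String id;

    // Keyword fields are indexed as exact values, so the terms aggregation can bucket on them
    @Field(type = FieldType.Keyword)
    private String type;

    @Field(type = FieldType.Keyword)
    private String name;

    // price is numeric, so it is mapped as a long rather than a keyword
    @Field(type = FieldType.Long)
    private Long price;

    // constructors, getters and setters
}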

Spring Data repositories cannot return aggregation results directly, so we need a custom extension. First, we define the extension interface:

public interface StoreItemRepositoryExtension {
    SearchPage<StoreItem> findAllWithAggregations(Pageable pageable);
}

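The interface declares the findAllWithAggregations() method, which takes a Pageable argument and returns a SearchPage that carries the aggregation results. Next, we implement the interface: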

@Component
public class StoreItemRepositoryExtensionImpl implements StoreItemRepositoryExtension {

    @Autowired
    private ElasticsearchOperations elasticsearchOperations;

    @Override
    public SearchPage<StoreItem> findAllWithAggregations(Pageable pageable) {
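        // Build a native query that carries only the terms aggregation on the "type" field
        Query query = NativeQuery.builder()
          .withAggregation("type_aggregation",
            Aggregation.of(b -> b.terms(t -> t.field("type"))))
          .build();
        SearchHits<StoreItem> response = elasticsearchOperations.search(query, StoreItem.class);
        // Note: pageable only wraps the hits here; it is not applied to the query itself
        return SearchHitSupport.searchPageFor(response, pageable);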
    }
}

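We built a native query and added the aggregation to it. We reuse the aggregation name type_aggregation from the previous section and use the terms aggregation type, which counts the documents for each distinct value of the specified field.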
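
The builder call describes the same aggregation as the aggs entry of the REST request in section 2. As a rough illustration of that correspondence, the sketch below renders the JSON body for a given aggregation name and field. The class AggregationBodySketch and the method termsAggregationBody() are hypothetical helpers written for this example, not part of Spring Data or the Elasticsearch client, and the sketch assumes both arguments need no JSON escaping. It does not claim to reproduce the exact bytes the client sends.

```java
// Hypothetical helper; renders the "aggs" body by hand for illustration only.
class AggregationBodySketch {

    // Assumes aggregationName and field contain no characters that require JSON escaping.
    static String termsAggregationBody(String aggregationName, String field) {
        return String.format(
            "{\"aggs\":{\"%s\":{\"terms\":{\"field\":\"%s\"}}}}",
            aggregationName, field);
    }

    public static void main(String[] args) {
        // Same aggregation name and field as in the native query above.
        System.out.println(termsAggregationBody("type_aggregation", "type"));
    }
}
```

Building the aggregation through the typed builder instead of a hand-written string leaves escaping and structure to the client library, which is one reason to prefer it in application code.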

Finally, let's create the Spring Data repository, which extends both ElasticsearchRepository and our custom extension:

@Repository
public interface StoreItemRepository extends ElasticsearchRepository<StoreItem, String>,
  StoreItemRepositoryExtension {
}

Let's write a test to verify the aggregation:

@ExtendWith(SpringExtension.class)
@ContextConfiguration(classes = ElasticSearchConfig.class)
public class ElasticSearchAggregationManualTest {

    private static final List<StoreItem> EXPECTED_ITEMS = List.of(
      new StoreItem("Multimedia", "PC Monitor", 1000L),
      new StoreItem("Multimedia", "Headphones", 200L), 
      new StoreItem("Home tech", "Barbecue Grill", 2000L), 
      new StoreItem("Pets", "Dog Toy", 10L),
      new StoreItem("Pets", "Cat shampoo", 5L));
...

    @BeforeEach
    public void before() {
        repository.saveAll(EXPECTED_ITEMS);
    }

...
} 

The test data set contains five items spread across three types. We populate the index before each test runs. Now let's call findAllWithAggregations() and verify the result:

@Test
void givenStoreItems_whenFindAllWithAggregations_thenHitsAndBucketsAreReturned() {
    SearchHits<StoreItem> searchHits = repository.findAllWithAggregations(Pageable.ofSize(2))
      .getSearchHits();
    List<StoreItem> data = searchHits.getSearchHits()
      .stream()
      .map(SearchHit::getContent)
      .toList();

    Assertions.assertThat(data).containsAll(EXPECTED_ITEMS);

    Map<String, Long> aggregatedData = ((ElasticsearchAggregations) searchHits
      .getAggregations())
      .get("type_aggregation")
      .aggregation()
      .getAggregate()
      .sterms()
      .buckets()
      .array()
      .stream()
      .collect(Collectors.toMap(bucket -> bucket.key()
        .stringValue(), MultiBucketBase::docCount));

    Assertions.assertThat(aggregatedData).containsExactlyInAnyOrderEntriesOf(
      Map.of("Multimedia", 2L, "Home tech", 1L, "Pets", 2L));
}

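The verification has two parts:

  1. The raw query results obtained from searchHits
  2. The statistics extracted from the aggregation results, which hold the number of items of each type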
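
These two parts are independent of each other. In Elasticsearch, paging parameters limit only the returned hits, while aggregations are computed over every document that matches the query. The plain-Java sketch below models that behavior for the case where a page size is applied to the hits. The class PagedResultSketch and its methods are hypothetical and do not use Spring Data or Elasticsearch; each item is represented as a {type, name} array taken from the test data.

```java
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.stream.Collectors;

// Plain-Java model of a paged search response; hypothetical names, not Spring Data code.
class PagedResultSketch {

    // Each item is a {type, name} pair from the test data set.
    static List<String[]> sampleItems() {
        return List.of(
            new String[] {"Multimedia", "PC Monitor"},
            new String[] {"Multimedia", "Headphones"},
            new String[] {"Home tech", "Barbecue Grill"},
            new String[] {"Pets", "Dog Toy"},
            new String[] {"Pets", "Cat shampoo"});
    }

    // Hits for one page: at most pageSize names, starting at pageNumber * pageSize.
    static List<String> pageOfHits(List<String[]> items, int pageNumber, int pageSize) {
        return items.stream()
            .skip((long) pageNumber * pageSize)
            .limit(pageSize)
            .map(item -> item[1])
            .collect(Collectors.toList());
    }

    // Bucket counts over every matching item, regardless of paging.
    static Map<String, Long> countByType(List<String[]> items) {
        return items.stream()
            .collect(Collectors.groupingBy(item -> item[0], TreeMap::new, Collectors.counting()));
    }

    public static void main(String[] args) {
        List<String[]> items = sampleItems();
        // Only two hits come back for the first page...
        System.out.println(pageOfHits(items, 0, 2));
        // ...but the counts still cover all five items.
        System.out.println(countByType(items));
    }
}
```

This is why a single request can return a small page of hits together with statistics for the whole result set.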

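4. Conclusion

In this article, we showed how to integrate Elasticsearch aggregations into Spring Data repositories. We used the terms aggregation as an example, but Elasticsearch supports many other aggregation types that cover more complex statistical needs.

The complete source code is available in the GitHub repository.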


Original title: Add an Aggregation to an Elasticsearch Query | Baeldung