1. Introduction
In this article, we'll look at how to import CSV file data into Elasticsearch using Spring Boot. CSV import is a common requirement when migrating data from legacy systems or external sources, and it's also a practical way to prepare test datasets.
2. Setting Up Elasticsearch with Docker
Docker is the quickest way to run Elasticsearch locally. First, pull the image:
docker pull docker.elastic.co/elasticsearch/elasticsearch:8.17.0
Then start the container. For local testing we disable security so the cluster is reachable over plain HTTP:
docker run -d --name elasticsearch -p 9200:9200 -e "discovery.type=single-node" -e "xpack.security.enabled=false" docker.elastic.co/elasticsearch/elasticsearch:8.17.0
Prepare a test data file, products.csv:
id,name,category,price,stock
1,Microwave,Appliances,705.77,136
2,Vacuum Cleaner,Appliances,1397.23,92
...
3. Processing CSV Data with a Manual for Loop
This approach reads the CSV records in a loop and indexes them into Elasticsearch one by one. We'll use Apache Commons CSV to parse the file and the Elasticsearch high-level REST client to index the documents:
3.1 Adding the Dependencies
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-csv</artifactId>
<version>1.12.0</version>
</dependency>
<dependency>
<groupId>org.elasticsearch.client</groupId>
<artifactId>elasticsearch-rest-high-level-client</artifactId>
<version>7.17.11</version>
</dependency>
3.2 Configuring the Elasticsearch Client
@Configuration
public class ElasticsearchConfig {
@Bean
public RestHighLevelClient restHighLevelClient() {
return RestClients.create(ClientConfiguration.builder()
.connectedTo("localhost:9200")
.build()).rest();
}
}
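Before importing anything, it can be worth a quick sanity check that this client actually reaches the running container. A minimal, optional snippet (where you run it is up to you):
// ping() returns true when the cluster on localhost:9200 answers
boolean clusterIsUp = restHighLevelClient.ping(RequestOptions.DEFAULT);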
3.3 Creating the Entity Class
@Document(indexName = "products")
public class Product {
@Id
private String id;
private String name;
private String category;
private double price;
private int stock;
// Getters and setters
}
3.4 Implementing the Import Service
@Autowired
private RestHighLevelClient restHighLevelClient;
public void importCSV(File file) {
try (Reader reader = new FileReader(file)) {
Iterable<CSVRecord> records = CSVFormat.DEFAULT
.withHeader("id", "name", "category", "price", "stock")
.withFirstRecordAsHeader()
.parse(reader);
for (CSVRecord record : records) {
IndexRequest request = new IndexRequest("products")
.id(record.get("id"))
.source(Map.of(
"name", record.get("name"),
"category", record.get("category"),
"price", Double.parseDouble(record.get("price")),
"stock", Integer.parseInt(record.get("stock"))
));
restHighLevelClient.index(request, RequestOptions.DEFAULT);
}
} catch (Exception e) {
        // log or handle parsing/indexing errors here
}
}
3.5 Running the Import
File csvFile = Paths.get("src", "test", "resources", "products.csv").toFile();
importCSV(csvFile);
Verify the data. In a unit test, we can replace restHighLevelClient with a Mockito mock and capture every IndexRequest the service sends. The setup below is a minimal sketch under that assumption:
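// Test-side sketch (assumptions): restHighLevelClient is a Mockito mock injected into the import service,
// and the Mockito inline mock maker is in use so the client's final methods can be mocked
ArgumentCaptor<IndexRequest> captor = ArgumentCaptor.forClass(IndexRequest.class);
verify(restHighLevelClient, atLeastOnce()).index(captor.capture(), eq(RequestOptions.DEFAULT));
With the requests captured, we can assert that the first indexed document matches the first CSV row: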
IndexRequest firstRequest = captor.getAllValues().get(0);
assertEquals(Map.of(
"name", "Microwave",
"category", "Appliances",
"price", 705.77,
"stock", 136
), firstRequest.sourceAsMap());
⚠️ This approach is simple and direct, but it's only suitable for small amounts of data: indexing documents one at a time makes it inefficient for large files.
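If the manual approach has to cope with somewhat larger files, one common mitigation is to group documents into bulk requests instead of indexing them one by one. A rough sketch of the same loop using the client's bulk API (the batch size of 500 is an arbitrary choice):
BulkRequest bulkRequest = new BulkRequest();
for (CSVRecord record : records) {
    bulkRequest.add(new IndexRequest("products")
        .id(record.get("id"))
        .source(Map.of(
            "name", record.get("name"),
            "category", record.get("category"),
            "price", Double.parseDouble(record.get("price")),
            "stock", Integer.parseInt(record.get("stock")))));
    if (bulkRequest.numberOfActions() >= 500) { // flush a full batch
        restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
        bulkRequest = new BulkRequest();
    }
}
if (bulkRequest.numberOfActions() > 0) { // flush the remainder
    restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);
}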
4. Scalable Data Imports with Spring Batch
Spring Batch is Java's batch-processing framework and is well suited to large-scale data imports.
4.1 Adding the Dependency
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-batch</artifactId>
<version>3.4.1</version>
</dependency>
4.2 Defining the Batch Configuration
@Configuration
@EnableBatchProcessing
public class BatchConfig {
@Autowired
private RestHighLevelClient restHighLevelClient;
// ...
}
4.3 Defining the Reader
@Bean
public FlatFileItemReader<Product> reader() {
return new FlatFileItemReaderBuilder<Product>()
.name("productReader")
.resource(new FileSystemResource("products.csv"))
.delimited()
.names("id", "name", "category", "price", "stock")
.fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
setTargetType(Product.class);
}})
.build();
}
✅ Key configuration:
- name(): sets the reader's name
- resource(): points to the CSV file
- delimited(): declares a comma-delimited format
- fieldSetMapper(): maps each CSV line to a Product object (see the sketch below for a custom alternative)
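BeanWrapperFieldSetMapper matches CSV columns to Product properties by name. If the mapping ever needs custom logic (trimming, validation, defaults), a hand-written FieldSetMapper is a drop-in alternative. A minimal sketch, assuming the usual bean setters on Product:
public class ProductFieldSetMapper implements FieldSetMapper<Product> {
    @Override
    public Product mapFieldSet(FieldSet fieldSet) {
        // map each named column to the corresponding property
        Product product = new Product();
        product.setId(fieldSet.readString("id"));
        product.setName(fieldSet.readString("name"));
        product.setCategory(fieldSet.readString("category"));
        product.setPrice(fieldSet.readDouble("price"));
        product.setStock(fieldSet.readInt("stock"));
        return product;
    }
}
It would then be plugged into the reader with .fieldSetMapper(new ProductFieldSetMapper()).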
4.4 Defining the Writer
@Bean
public ItemWriter<Product> writer(RestHighLevelClient restHighLevelClient) {
return products -> {
for (Product product : products) {
IndexRequest request = new IndexRequest("products")
.id(product.getId())
.source(Map.of(
"name", product.getName(),
"category", product.getCategory(),
"price", product.getPrice(),
"stock", product.getStock()
));
restHighLevelClient.index(request, RequestOptions.DEFAULT);
}
};
}
4.5 Defining the Spring Batch Job
@Bean
public Job importJob(JobRepository jobRepository, PlatformTransactionManager transactionManager,
RestHighLevelClient restHighLevelClient) {
return new JobBuilder("importJob", jobRepository)
.start(new StepBuilder("step1", jobRepository)
.<Product, Product>chunk(10, transactionManager)
.reader(reader())
.writer(writer(restHighLevelClient))
.build())
.build();
}
✅ Core configuration:
- chunk(10, transactionManager): processes records in chunks of 10 and commits a transaction after each chunk
- reader() and writer(): wire the reading and writing sides of the data flow together
4.6 Running the Batch Job
@Configuration
public class JobRunnerConfig {
@Autowired
private JobLauncher jobLauncher;
@Autowired
private Job importJob;
@Bean
public CommandLineRunner runJob() {
return args -> {
try {
JobExecution execution = jobLauncher.run(importJob, new JobParameters());
} catch (Exception e) {
        // log or handle job launch failures here
}
};
}
}
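Note that Spring Batch identifies job instances by their parameters, so launching importJob again with the same empty JobParameters is treated as an already-completed instance and will not re-import the data. A common workaround, sketched here, is to add a unique parameter per run:
JobParameters params = new JobParametersBuilder()
    .addLong("startedAt", System.currentTimeMillis()) // unique value => new job instance on every run
    .toJobParameters();
JobExecution execution = jobLauncher.run(importJob, params);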
Verify the result:
curl -X GET "http://localhost:9200/products/_search" \
-H "Content-Type: application/json" \
-d '{
"query": {
"match_all": {}
}
}'
Expected response:
{
...
"hits": {
"total": {
"value": 25,
"relation": "eq"
},
"max_score": 1.0,
"hits": [
{
"_index": "products",
"_type": "_doc",
"_id": "1",
"_score": 1.0,
"_source": {
"id": "1",
"name": "Microwave",
"category": "Appliances",
"price": 705.77,
"stock": 136
}
},
{
"_index": "products",
"_type": "_doc",
"_id": "2",
"_score": 1.0,
"_source": {
"id": "2",
"name": "Vacuum Cleaner",
"category": "Appliances",
"price": 1397.23,
"stock": 92
}
}
...
]
}
}
⚠️ The configuration is more involved, but it scales well, which makes it a good fit for large-volume imports in production.
5. Importing CSV Data with Logstash
Logstash is the data-processing component of the Elastic Stack.
5.1 Deploying Logstash
docker pull docker.elastic.co/logstash/logstash:8.17.0
5.2 Creating the Configuration File csv-to-es.conf
input {
file {
path => "/usr/share/logstash/products.csv"
start_position => "beginning"
sincedb_path => "/dev/null"
}
}
filter {
csv {
separator => ","
columns => ["id", "name", "category", "price", "stock"]
}
mutate {
convert => { "price" => "float" }
convert => { "stock" => "integer" }
}
}
output {
elasticsearch {
hosts => ["http://elasticsearch:9200"]
index => "products"
}
stdout {
codec => json_lines
}
}
✅ Configuration breakdown:
- input: reads the CSV file from the given path
- filter: parses the columns and converts price and stock to numeric types
- output: writes each document to Elasticsearch (and echoes it to stdout as JSON)
5.3 Running the Import
Mount the configuration file and the CSV file into the container and start Logstash. Note that the elasticsearch host referenced in the output block must be resolvable from the Logstash container, for example by attaching both containers to the same Docker network:
docker run --rm -v $(pwd)/csv-to-es.conf:/usr/share/logstash/pipeline/logstash.conf \
-v $(pwd)/products.csv:/usr/share/logstash/products.csv \
docker.elastic.co/logstash/logstash:8.17.0
✅ Advantages:
- No code to write
- Handles large datasets efficiently
- Supports complex data transformations
❌ Drawbacks:
- Requires deploying an extra component (Logstash)
- Its configuration syntax has a learning curve
6. Comparing the Approaches
Approach | Pros | Cons |
---|---|---|
Manual for loop | Simple to implement; full control over the flow | Inefficient for large files; not suited to production |
Spring Batch | Scales well; transaction support | More complex configuration; steeper learning curve |
Logstash | No code required; high performance; mature ecosystem | Extra component to deploy; its own configuration syntax to learn |
7. Conclusion
In this article, we covered three ways to import CSV data into Elasticsearch:
- Manual loop: good for quick prototypes and small test datasets
- Spring Batch: the first choice for large-volume imports in production
- Logstash: a no-code option for automated data pipelines
Choose based on your scenario:
- ✅ Small datasets / one-off needs → manual loop
- ✅ Production / large volumes → Spring Batch
- ✅ Automated data pipelines → Logstash
Each approach has its place; understanding how it works under the hood is what lets you avoid the common pitfalls and get the job done efficiently.