1. Introduction

In this tutorial, we'll look at how to import CSV file data into Elasticsearch using Spring Boot. Importing CSVs is a common requirement when migrating data from legacy systems or external sources, and it's also a practical way to prepare test datasets.

2. Setting Up Elasticsearch with Docker

Docker gives us a quick way to run Elasticsearch locally:

docker pull docker.elastic.co/elasticsearch/elasticsearch:8.17.0

Start the container (for local testing we also disable security so the HTTP API can be reached without credentials):

docker run -d --name elasticsearch -p 9200:9200 -e "discovery.type=single-node" -e "xpack.security.enabled=false" docker.elastic.co/elasticsearch/elasticsearch:8.17.0
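
Once the container is up, we can confirm that the node responds before moving on:

curl http://localhost:9200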

Next, prepare the test data file products.csv:

id,name,category,price,stock
1,Microwave,Appliances,705.77,136
2,Vacuum Cleaner,Appliances,1397.23,92
...

3. Processing CSV Data with a Manual for Loop

With this approach, we read the CSV records in a manual loop and index them into Elasticsearch one by one. We'll use Apache Commons CSV to parse the file and the Elasticsearch REST high-level client to write the documents:

3.1 Adding the Dependencies

<dependency>
    <groupId>org.apache.commons</groupId>
    <artifactId>commons-csv</artifactId>
    <version>1.12.0</version>
</dependency>
<dependency>
    <groupId>org.elasticsearch.client</groupId>
    <artifactId>elasticsearch-rest-high-level-client</artifactId>
    <version>7.17.11</version>
</dependency>

3.2 Configuring the Elasticsearch Client

@Configuration
public class ElasticsearchConfig {
    @Bean
    public RestHighLevelClient restHighLevelClient() {
        return RestClients.create(ClientConfiguration.builder()
          .connectedTo("localhost:9200")
          .build()).rest();
    }
}

3.3 Creating the Entity Class

@Document(indexName = "products")
public class Product {
    @Id
    private String id;
    private String name;
    private String category;
    private double price;
    private int stock;

    // Getters and setters
}
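
Note that RestClients, ClientConfiguration, and the @Document / @Id annotations are not provided by the two dependencies above; they come from Spring Data Elasticsearch. The snippets therefore assume that dependency is also on the classpath, for example via the Spring Boot starter (with a version whose API matches the client code used here):

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>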

3.4 Implementing the Import Service

@Autowired
private RestHighLevelClient restHighLevelClient;

public void importCSV(File file) {
    try (Reader reader = new FileReader(file)) {
        Iterable<CSVRecord> records = CSVFormat.DEFAULT
          .withHeader("id", "name", "category", "price", "stock")
          .withFirstRecordAsHeader()
          .parse(reader);

        for (CSVRecord record : records) {
            IndexRequest request = new IndexRequest("products")
              .id(record.get("id"))
              .source(Map.of(
                "name", record.get("name"),
                "category", record.get("category"),
                "price", Double.parseDouble(record.get("price")),
                "stock", Integer.parseInt(record.get("stock"))
              ));

            restHighLevelClient.index(request, RequestOptions.DEFAULT);
        }
    } catch (Exception e) {
        // exception handling
    }
}

3.5 Running the Import

File csvFile = Paths.get("src", "test", "resources", "products.csv").toFile();
importCSV(csvFile);

To verify the data, we can unit test importCSV against a mocked client and capture the IndexRequest instances it builds:

IndexRequest firstRequest = captor.getAllValues().get(0);
assertEquals(Map.of(
  "name", "Microwave",
  "category", "Appliances",
  "price", 705.77,
  "stock", 136
), firstRequest.sourceAsMap());
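
The captor above is not defined in the service itself; it comes from a unit test in which the client is mocked. A minimal sketch of that setup, assuming Mockito is on the test classpath (names are illustrative):

// static imports from org.mockito.Mockito and org.mockito.ArgumentMatchers assumed
RestHighLevelClient restHighLevelClient = mock(RestHighLevelClient.class);
ArgumentCaptor<IndexRequest> captor = ArgumentCaptor.forClass(IndexRequest.class);

// run the import against the service wired with the mocked client
importCSV(csvFile);

// collect every IndexRequest the service passed to index()
verify(restHighLevelClient, atLeastOnce()).index(captor.capture(), any(RequestOptions.class));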

⚠️ This approach is simple and gives us full control, but it only suits small datasets: every record triggers its own request, so processing large files is slow and error-prone.
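
If we stay with the manual approach but want to reduce the number of round trips, one option (a sketch, not part of the original example) is to accumulate the documents in a BulkRequest and send them in a single call:

BulkRequest bulkRequest = new BulkRequest();
for (CSVRecord record : records) {
    bulkRequest.add(new IndexRequest("products")
      .id(record.get("id"))
      .source(Map.of(
        "name", record.get("name"),
        "category", record.get("category"),
        "price", Double.parseDouble(record.get("price")),
        "stock", Integer.parseInt(record.get("stock")))));
}
// one HTTP request for the whole batch instead of one per document
restHighLevelClient.bulk(bulkRequest, RequestOptions.DEFAULT);

For very large files we would still flush the bulk request every few thousand records to keep memory usage bounded.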

4. Scalable Imports with Spring Batch

Spring Batch is a batch-processing framework for Java that is well suited to large-scale data imports.

4.1 Adding the Dependency

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-batch</artifactId>
    <version>3.4.1</version>
</dependency>

4.2 Defining the Spring Configuration Class

@Configuration
@EnableBatchProcessing
public class BatchConfig {
    @Autowired
    private RestHighLevelClient restHighLevelClient;
    // ...
}

4.3 Defining the Reader

@Bean
public FlatFileItemReader<Product> reader() {
    return new FlatFileItemReaderBuilder<Product>()
      .name("productReader")
      .resource(new FileSystemResource("products.csv"))
      .linesToSkip(1)
      .delimited()
      .names("id", "name", "category", "price", "stock")
      .fieldSetMapper(new BeanWrapperFieldSetMapper<>() {{
          setTargetType(Product.class);
      }})
      .build();
}

✅ Key configuration notes:

  • name(): sets the reader's name
  • resource(): points to the CSV file
  • linesToSkip(1): skips the CSV header row
  • delimited(): declares the comma-delimited format
  • fieldSetMapper(): maps each CSV row to a Product object automatically

4.4 Defining the Writer

@Bean
public ItemWriter<Product> writer(RestHighLevelClient restHighLevelClient) {
    return products -> {
        for (Product product : products) {
            IndexRequest request = new IndexRequest("products")
              .id(product.getId())
              .source(Map.of(
                "name", product.getName(),
                "category", product.getCategory(),
                "price", product.getPrice(),
                "stock", product.getStock()
              ));
            restHighLevelClient.index(request, RequestOptions.DEFAULT);
        }
    };
}

4.5 Defining the Spring Batch Job

@Bean
public Job importJob(JobRepository jobRepository, PlatformTransactionManager transactionManager, 
  RestHighLevelClient restHighLevelClient) {
    return new JobBuilder("importJob", jobRepository)
      .start(new StepBuilder("step1", jobRepository)
        .<Product, Product>chunk(10, transactionManager)
        .reader(reader())
        .writer(writer(restHighLevelClient))
        .build())
      .build();
}

✅ Core configuration:

  • chunk(10, transactionManager): processes records in chunks of 10, committing a transaction per chunk
  • reader() / writer(): wire the reader and writer into the step's data flow

4.6 Running the Batch Job

@Configuration
public class JobRunnerConfig {
    @Autowired
    private JobLauncher jobLauncher;
    @Autowired
    private Job importJob;

    @Bean
    public CommandLineRunner runJob() {
        return args -> {
            try {
                JobExecution execution = jobLauncher.run(importJob, new JobParameters());
            } catch (Exception e) {
                // exception handling
            }
        };
    }
}
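
One caveat: launching the job a second time with the same (here empty) JobParameters fails, because Spring Batch treats it as an already completed job instance. A common workaround, sketched below, is to pass a unique parameter such as a timestamp:

JobParameters params = new JobParametersBuilder()
  .addLong("startedAt", System.currentTimeMillis())
  .toJobParameters();
JobExecution execution = jobLauncher.run(importJob, params);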

Verifying the result:

curl -X GET "http://localhost:9200/products/_search" \
  -H "Content-Type: application/json" \
  -d '{
        "query": {
          "match_all": {}
        }
      }'

Expected response:

{
  ...
  "hits": {
    "total": {
      "value": 25,
      "relation": "eq"
    },
    "max_score": 1.0,
    "hits": [
      {
        "_index": "products",
        "_type": "_doc",
        "_id": "1",
        "_score": 1.0,
        "_source": {
          "id": "1",
          "name": "Microwave",
          "category": "Appliances",
          "price": 705.77,
          "stock": 136
        }
      },
      {
        "_index": "products",
        "_type": "_doc",
        "_id": "2",
        "_score": 1.0,
        "_source": {
          "id": "2",
          "name": "Vacuum Cleaner",
          "category": "Appliances",
          "price": 1397.23,
          "stock": 92
        }
      }
      ...
    ]
  }
}

⚠️ The configuration is more involved, but the approach scales well, which makes it a good fit for large-volume imports in production.

5. Importing CSV Data with Logstash

Logstash is the data-processing component of the Elastic Stack.

5.1 Deploying Logstash

docker pull docker.elastic.co/logstash/logstash:8.17.0

5.2 Creating the Configuration File csv-to-es.conf

input {
    file {
        path => "/usr/share/logstash/products.csv"
        start_position => "beginning"
        sincedb_path => "/dev/null"
    }
}

filter {
    csv {
        separator => ","
        columns => ["id", "name", "category", "price", "stock"]
    }

    # drop the header row so it is not indexed as a document
    if [id] == "id" {
        drop { }
    }

    mutate {
        convert => { "price" => "float" }
        convert => { "stock" => "integer" }
    }
}

output {
    elasticsearch {
        hosts => ["http://elasticsearch:9200"]
        index => "products"
    }

    stdout {
        codec => json_lines
    }
}

✅ Configuration breakdown:

  • input: points to the CSV file path
  • filter: transforms the data (drops the header row and converts field types)
  • output: writes the documents to Elasticsearch

5.3 Running the Import

docker run --rm -v $(pwd)/csv-to-es.conf:/usr/share/logstash/pipeline/logstash.conf \
  -v $(pwd)/products.csv:/usr/share/logstash/products.csv \
  docker.elastic.co/logstash/logstash:8.17.0
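
Since the output block addresses Elasticsearch by the hostname elasticsearch, the Logstash container must be able to resolve that name. One way to arrange this, assuming the Elasticsearch container started earlier in this article, is to put both containers on a shared Docker network (the network name here is just an example):

docker network create elastic
docker network connect elastic elasticsearch

docker run --rm --network elastic \
  -v $(pwd)/csv-to-es.conf:/usr/share/logstash/pipeline/logstash.conf \
  -v $(pwd)/products.csv:/usr/share/logstash/products.csv \
  docker.elastic.co/logstash/logstash:8.17.0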

✅ Advantages:

  • No code to write
  • High performance on large datasets
  • Supports complex data transformations

❌ Drawbacks:

  • Requires deploying Logstash separately
  • Its configuration syntax has a learning curve

6. Comparing the Approaches

Approach | Advantages | Drawbacks
Manual for loop | Simple to implement; full control over the flow | Inefficient for large files; not suited to production
Spring Batch | Highly scalable; transaction support | More complex configuration; steeper learning curve
Logstash | No code; high performance; mature ecosystem | Separate deployment; its own configuration syntax

7. Conclusion

In this article, we covered three ways to import CSV data into Elasticsearch:

  1. Manual loop: good for quick prototypes and small test datasets
  2. Spring Batch: the first choice for large-volume imports in production
  3. Logstash: an automated data pipeline that requires no coding

To choose between them:

  • ✅ Small datasets / one-off needs → manual loop
  • ✅ Production / large datasets → Spring Batch
  • ✅ Automated data pipelines → Logstash

Each approach has its place; understanding how it works under the hood is what allows us to avoid the pitfalls and get the job done efficiently.


Original title: Import CSV in Elasticsearch Using Spring Boot | Baeldung