1. 简介

Amazon Kinesis 是 AWS 提供的实时数据流处理服务,广泛用于构建事件驱动架构。它能高效地收集、处理和分析流式数据,是现代微服务和大数据系统中的关键组件。

本文将带你使用 Spring 框架对接 Kinesis,实现数据的生产与消费。我们会覆盖三种主流方式:

✅ 原生 AWS SDK
✅ KCL / KPL 客户端库
✅ Spring Cloud Stream Binder Kinesis

文中代码仅为演示核心逻辑,实际生产环境需补充错误处理、重试、监控等机制


2. 前置条件

在开始编码前,需完成以下准备:

  1. ✅ 创建一个 Spring Boot 项目(建议使用 spring-boot-starter-web + spring-boot-starter
  2. ✅ 在 AWS 控制台或通过 CLI 创建 Kinesis Data Stream(例如命名为 live-ips
  3. ✅ 获取 IAM 凭据(Access Key + Secret Key)并配置权限(kinesis:PutRecord, kinesis:GetRecords 等)
  4. ✅ 明确 AWS 区域(如 eu-central-1

我们模拟的场景是:生产者持续写入伪造的 IP 地址,消费者从流中读取并打印到控制台。


3. 使用 AWS SDK for Java

这是最底层的方式,直接调用 AWS 提供的 Java SDK。优点是控制粒度最细;缺点是需要手动处理分片(shard)、游标(shard iterator)、错误重试、消费者守护等,生产环境使用成本高

3.1 Maven 依赖

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>aws-java-sdk-kinesis</artifactId>
    <version>1.12.380</version>
</dependency>

⚠️ 注意:这是 V1 版本 SDK,V2 已重构,API 不同。

3.2 Spring 配置

AmazonKinesis 客户端注册为 Spring Bean:

@Bean
public AmazonKinesis buildAmazonKinesis() {
    BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
    return AmazonKinesisClientBuilder.standard()
      .withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
      .withRegion(Regions.EU_CENTRAL_1)
      .build();
}

application.properties 中配置凭证:

aws.access.key=AKIAIOSFODNN7EXAMPLE
aws.secret.key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY

通过 @Value 注入:

@Value("${aws.access.key}")
private String accessKey;

@Value("${aws.secret.key}")
private String secretKey;

3.3 消费者实现

AWS SDK 使用拉模型(pull model),需主动从分片获取数据。

首先初始化分片迭代器(shard iterator):

private GetShardIteratorResult shardIterator;

@PostConstruct
private void buildShardIterator() {
    GetShardIteratorRequest readShardsRequest = new GetShardIteratorRequest();
    readShardsRequest.setStreamName("live-ips");
    readShardsRequest.setShardIteratorType(ShardIteratorType.LATEST);
    readShardsRequest.setShardId("shardId-000000000000");

    this.shardIterator = kinesis.getShardIterator(readShardsRequest);
}

然后轮询获取记录:

GetRecordsRequest recordsRequest = new GetRecordsRequest();
recordsRequest.setShardIterator(shardIterator.getShardIterator());
recordsRequest.setLimit(25);

GetRecordsResult recordsResult = kinesis.getRecords(recordsRequest);
while (!recordsResult.getRecords().isEmpty()) {
    recordsResult.getRecords().stream()
      .map(record -> new String(record.getData().array()))
      .forEach(System.out::println);

    recordsRequest.setShardIterator(recordsResult.getNextShardIterator());
    recordsResult = kinesis.getRecords(recordsRequest);
}

⚠️ 踩坑提示:GetRecordsResult 返回的记录可能为空(无新数据),需配合 Thread.sleep() 避免空转。

3.4 生产者实现

使用 PutRecordsRequest 批量写入数据:

List<PutRecordsRequestEntry> entries = IntStream.range(1, 200).mapToObj(ipSuffix -> {
    PutRecordsRequestEntry entry = new PutRecordsRequestEntry();
    entry.setData(ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes()));
    entry.setPartitionKey("ip-partition-key");
    return entry;
}).collect(Collectors.toList());

PutRecordsRequest createRecordsRequest = new PutRecordsRequest();
createRecordsRequest.setStreamName("live-ips");
createRecordsRequest.setRecords(entries);

kinesis.putRecords(createRecordsRequest);

✅ 简单粗暴,但注意 PutRecords 有大小和数量限制(最大 500 条 / 5MB)。


4. 使用 KCL 与 KPL

Kinesis Client Library(KCL)和 Kinesis Producer Library(KPL)是对 SDK 的封装,极大简化开发。

  • KCL:自动处理分片分配、故障转移、检查点(checkpoint)、resharding
  • KPL:自动批处理、重试、多线程发送,提升吞吐量

适合需要高可用、高性能的场景。

4.1 Maven 依赖

<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-producer</artifactId>
    <version>0.13.1</version>
</dependency>
<dependency>
    <groupId>com.amazonaws</groupId>
    <artifactId>amazon-kinesis-client</artifactId>
    <version>1.14.9</version>
</dependency>

4.2 Spring 配置

与 SDK 类似,只需准备好 IAM 凭证即可。

4.3 消费者实现

实现 IRecordProcessor 接口定义处理逻辑:

public class IpProcessor implements IRecordProcessor {
    @Override
    public void initialize(InitializationInput initializationInput) { }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        processRecordsInput.getRecords()
          .forEach(record -> System.out.println(new String(record.getData().array())));
    }

    @Override
    public void shutdown(ShutdownInput shutdownInput) { }
}

创建工厂类:

public class IpProcessorFactory implements IRecordProcessorFactory {
    @Override
    public IRecordProcessor createProcessor() {
        return new IpProcessor();
    }
}

启动 Worker(自动管理生命周期):

BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
KinesisClientLibConfiguration consumerConfig = new KinesisClientLibConfiguration(
  "KinesisKCLConsumer",
  "live-ips",
  new AWSStaticCredentialsProvider(awsCredentials),
  "KinesisKCLConsumer"
).withRegionName(Regions.EU_CENTRAL_1.getName());

final Worker worker = new Worker.Builder()
 .recordProcessorFactory(new IpProcessorFactory())
 .config(consumerConfig)
 .build();

CompletableFuture.runAsync(worker::run);

✅ KCL 会自动在 DynamoDB 中维护检查点,避免重复消费。

4.4 生产者实现

配置 KPL 并发送数据:

KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration()
  .setCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials))
  .setRegion(Regions.EU_CENTRAL_1.getName());

this.kinesisProducer = new KinesisProducer(producerConfig);

发送记录(自动批处理):

IntStream.range(1, 200)
  .mapToObj(ipSuffix -> ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes()))
  .forEach(entry -> kinesisProducer.addUserRecord("live-ips", "ip-partition-key", entry));

⚠️ 注意:addUserRecord 是异步的,需监听回调处理失败。


5. 使用 Spring Cloud Stream Binder Kinesis

这是最推荐的方式,完全契合 Spring 编程模型,通过函数式编程(Function as a Service)实现声明式绑定。

底层基于 Spring Cloud Stream,屏蔽了客户端细节,开发效率极高。

5.1 Maven 依赖

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kinesis</artifactId>
    <version>2.2.0</version>
</dependency>

5.2 Spring 配置

application.properties 中配置 AWS 信息:

cloud.aws.credentials.access-key=AKIAIOSFODNN7EXAMPLE
cloud.aws.credentials.secret-key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
cloud.aws.region.static=eu-central-1
cloud.aws.stack.auto=false

Spring Cloud Stream 提供了三个预定义接口:

  • Supplier<T>:数据源(生产者)
  • Consumer<T>:接收器(消费者)
  • Function<T, R>:处理器(转换)

5.3 消费者实现

配置绑定目标:

spring.cloud.stream.bindings.input-in-0.destination=live-ips
spring.cloud.stream.bindings.input-in-0.group=ip-consumer-group
spring.cloud.stream.bindings.input-in-0.content-type=text/plain
spring.cloud.stream.function.definition=input

定义消费者 Bean:

@Configuration
public class ConsumerBinder {
    @Bean
    public Consumer<String> input() {
        return str -> {
            System.out.println("Received: " + str);
        };
    }
}

group 对应 KCL 的消费者组,实现负载均衡。

5.4 生产者实现

配置输出绑定:

spring.cloud.stream.bindings.output-out-0.destination=live-ips
spring.cloud.stream.bindings.output-out-0.content-type=text/plain
spring.cloud.stream.poller.fixed-delay=3000

定义生产者 Bean:

@Configuration
public class ProducerBinder {
    @Bean
    public Supplier<Message<String>> output() {
        return () -> IntStream.range(1, 10)
                 .mapToObj(i -> "192.168.0." + i)
                 .findFirst()
                 .map(ip -> MessageBuilder.withPayload(ip).build())
                 .orElse(null);
    }
}

poller.fixed-delay 控制发送频率,简单易用。


6. 总结

方式 适用场景 推荐指数
AWS SDK 学习原理、特殊定制 ⭐⭐
KCL/KPL 高性能、高可用需求 ⭐⭐⭐⭐
Spring Cloud Stream Binder 快速开发、Spring 生态集成 ⭐⭐⭐⭐⭐

生产环境强烈推荐使用 Spring Cloud Stream Binder Kinesis,它让开发者专注业务逻辑,而非基础设施细节。

完整示例代码见:GitHub - spring-cloud-stream-kinesis


原始标题:Integrating Spring with AWS Kinesis