1. 简介
Amazon Kinesis 是 AWS 提供的实时数据流处理服务,广泛用于构建事件驱动架构。它能高效地收集、处理和分析流式数据,是现代微服务和大数据系统中的关键组件。
本文将带你使用 Spring 框架对接 Kinesis,实现数据的生产与消费。我们会覆盖三种主流方式:
✅ 原生 AWS SDK
✅ KCL / KPL 客户端库
✅ Spring Cloud Stream Binder Kinesis
文中代码仅为演示核心逻辑,实际生产环境需补充错误处理、重试、监控等机制。
2. 前置条件
在开始编码前,需完成以下准备:
- ✅ 创建一个 Spring Boot 项目(建议使用
spring-boot-starter-web
+spring-boot-starter
) - ✅ 在 AWS 控制台或通过 CLI 创建 Kinesis Data Stream(例如命名为
live-ips
) - ✅ 获取 IAM 凭据(Access Key + Secret Key)并配置权限(
kinesis:PutRecord
,kinesis:GetRecords
等) - ✅ 明确 AWS 区域(如
eu-central-1
)
我们模拟的场景是:生产者持续写入伪造的 IP 地址,消费者从流中读取并打印到控制台。
3. 使用 AWS SDK for Java
这是最底层的方式,直接调用 AWS 提供的 Java SDK。优点是控制粒度最细;缺点是需要手动处理分片(shard)、游标(shard iterator)、错误重试、消费者守护等,生产环境使用成本高。
3.1 Maven 依赖
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk-kinesis</artifactId>
<version>1.12.380</version>
</dependency>
⚠️ 注意:这是 V1 版本 SDK,V2 已重构,API 不同。
3.2 Spring 配置
将 AmazonKinesis
客户端注册为 Spring Bean:
@Bean
public AmazonKinesis buildAmazonKinesis() {
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
return AmazonKinesisClientBuilder.standard()
.withCredentials(new AWSStaticCredentialsProvider(awsCredentials))
.withRegion(Regions.EU_CENTRAL_1)
.build();
}
在 application.properties
中配置凭证:
aws.access.key=AKIAIOSFODNN7EXAMPLE
aws.secret.key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
通过 @Value
注入:
@Value("${aws.access.key}")
private String accessKey;
@Value("${aws.secret.key}")
private String secretKey;
3.3 消费者实现
AWS SDK 使用拉模型(pull model),需主动从分片获取数据。
首先初始化分片迭代器(shard iterator):
private GetShardIteratorResult shardIterator;
@PostConstruct
private void buildShardIterator() {
GetShardIteratorRequest readShardsRequest = new GetShardIteratorRequest();
readShardsRequest.setStreamName("live-ips");
readShardsRequest.setShardIteratorType(ShardIteratorType.LATEST);
readShardsRequest.setShardId("shardId-000000000000");
this.shardIterator = kinesis.getShardIterator(readShardsRequest);
}
然后轮询获取记录:
GetRecordsRequest recordsRequest = new GetRecordsRequest();
recordsRequest.setShardIterator(shardIterator.getShardIterator());
recordsRequest.setLimit(25);
GetRecordsResult recordsResult = kinesis.getRecords(recordsRequest);
while (!recordsResult.getRecords().isEmpty()) {
recordsResult.getRecords().stream()
.map(record -> new String(record.getData().array()))
.forEach(System.out::println);
recordsRequest.setShardIterator(recordsResult.getNextShardIterator());
recordsResult = kinesis.getRecords(recordsRequest);
}
⚠️ 踩坑提示:GetRecordsResult
返回的记录可能为空(无新数据),需配合 Thread.sleep()
避免空转。
3.4 生产者实现
使用 PutRecordsRequest
批量写入数据:
List<PutRecordsRequestEntry> entries = IntStream.range(1, 200).mapToObj(ipSuffix -> {
PutRecordsRequestEntry entry = new PutRecordsRequestEntry();
entry.setData(ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes()));
entry.setPartitionKey("ip-partition-key");
return entry;
}).collect(Collectors.toList());
PutRecordsRequest createRecordsRequest = new PutRecordsRequest();
createRecordsRequest.setStreamName("live-ips");
createRecordsRequest.setRecords(entries);
kinesis.putRecords(createRecordsRequest);
✅ 简单粗暴,但注意 PutRecords
有大小和数量限制(最大 500 条 / 5MB)。
4. 使用 KCL 与 KPL
Kinesis Client Library(KCL)和 Kinesis Producer Library(KPL)是对 SDK 的封装,极大简化开发。
- ✅ KCL:自动处理分片分配、故障转移、检查点(checkpoint)、resharding
- ✅ KPL:自动批处理、重试、多线程发送,提升吞吐量
适合需要高可用、高性能的场景。
4.1 Maven 依赖
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-producer</artifactId>
<version>0.13.1</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>amazon-kinesis-client</artifactId>
<version>1.14.9</version>
</dependency>
4.2 Spring 配置
与 SDK 类似,只需准备好 IAM 凭证即可。
4.3 消费者实现
实现 IRecordProcessor
接口定义处理逻辑:
public class IpProcessor implements IRecordProcessor {
@Override
public void initialize(InitializationInput initializationInput) { }
@Override
public void processRecords(ProcessRecordsInput processRecordsInput) {
processRecordsInput.getRecords()
.forEach(record -> System.out.println(new String(record.getData().array())));
}
@Override
public void shutdown(ShutdownInput shutdownInput) { }
}
创建工厂类:
public class IpProcessorFactory implements IRecordProcessorFactory {
@Override
public IRecordProcessor createProcessor() {
return new IpProcessor();
}
}
启动 Worker(自动管理生命周期):
BasicAWSCredentials awsCredentials = new BasicAWSCredentials(accessKey, secretKey);
KinesisClientLibConfiguration consumerConfig = new KinesisClientLibConfiguration(
"KinesisKCLConsumer",
"live-ips",
new AWSStaticCredentialsProvider(awsCredentials),
"KinesisKCLConsumer"
).withRegionName(Regions.EU_CENTRAL_1.getName());
final Worker worker = new Worker.Builder()
.recordProcessorFactory(new IpProcessorFactory())
.config(consumerConfig)
.build();
CompletableFuture.runAsync(worker::run);
✅ KCL 会自动在 DynamoDB 中维护检查点,避免重复消费。
4.4 生产者实现
配置 KPL 并发送数据:
KinesisProducerConfiguration producerConfig = new KinesisProducerConfiguration()
.setCredentialsProvider(new AWSStaticCredentialsProvider(awsCredentials))
.setRegion(Regions.EU_CENTRAL_1.getName());
this.kinesisProducer = new KinesisProducer(producerConfig);
发送记录(自动批处理):
IntStream.range(1, 200)
.mapToObj(ipSuffix -> ByteBuffer.wrap(("192.168.0." + ipSuffix).getBytes()))
.forEach(entry -> kinesisProducer.addUserRecord("live-ips", "ip-partition-key", entry));
⚠️ 注意:addUserRecord
是异步的,需监听回调处理失败。
5. 使用 Spring Cloud Stream Binder Kinesis
这是最推荐的方式,完全契合 Spring 编程模型,通过函数式编程(Function as a Service)实现声明式绑定。
底层基于 Spring Cloud Stream,屏蔽了客户端细节,开发效率极高。
5.1 Maven 依赖
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-stream-binder-kinesis</artifactId>
<version>2.2.0</version>
</dependency>
5.2 Spring 配置
在 application.properties
中配置 AWS 信息:
cloud.aws.credentials.access-key=AKIAIOSFODNN7EXAMPLE
cloud.aws.credentials.secret-key=wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY
cloud.aws.region.static=eu-central-1
cloud.aws.stack.auto=false
Spring Cloud Stream 提供了三个预定义接口:
Supplier<T>
:数据源(生产者)Consumer<T>
:接收器(消费者)Function<T, R>
:处理器(转换)
5.3 消费者实现
配置绑定目标:
spring.cloud.stream.bindings.input-in-0.destination=live-ips
spring.cloud.stream.bindings.input-in-0.group=ip-consumer-group
spring.cloud.stream.bindings.input-in-0.content-type=text/plain
spring.cloud.stream.function.definition=input
定义消费者 Bean:
@Configuration
public class ConsumerBinder {
@Bean
public Consumer<String> input() {
return str -> {
System.out.println("Received: " + str);
};
}
}
✅ group
对应 KCL 的消费者组,实现负载均衡。
5.4 生产者实现
配置输出绑定:
spring.cloud.stream.bindings.output-out-0.destination=live-ips
spring.cloud.stream.bindings.output-out-0.content-type=text/plain
spring.cloud.stream.poller.fixed-delay=3000
定义生产者 Bean:
@Configuration
public class ProducerBinder {
@Bean
public Supplier<Message<String>> output() {
return () -> IntStream.range(1, 10)
.mapToObj(i -> "192.168.0." + i)
.findFirst()
.map(ip -> MessageBuilder.withPayload(ip).build())
.orElse(null);
}
}
✅ poller.fixed-delay
控制发送频率,简单易用。
6. 总结
方式 | 适用场景 | 推荐指数 |
---|---|---|
AWS SDK | 学习原理、特殊定制 | ⭐⭐ |
KCL/KPL | 高性能、高可用需求 | ⭐⭐⭐⭐ |
Spring Cloud Stream Binder | 快速开发、Spring 生态集成 | ⭐⭐⭐⭐⭐ |
生产环境强烈推荐使用 Spring Cloud Stream Binder Kinesis,它让开发者专注业务逻辑,而非基础设施细节。