1. 概述
本教程将介绍一个强大的事件流处理平台 Redpanda。它是业界事实标准流处理平台 Kafka 的竞争者,有趣的是,它还兼容 Kafka API。
我们将探讨 Redpanda 的核心组件、特性和应用场景,并编写 Java 程序实现向 Redpanda 主题发布消息以及从中读取消息。
2. Redpanda vs Kafka
既然 Redpanda 声称是 Kafka 的竞争者,我们从几个关键维度进行对比:
特性 | Redpanda | Kafka |
---|---|---|
开发体验 | ✅ 单一二进制包,安装简单 ✅ 无需 JVM 和第三方工具依赖 |
❌ 依赖 Zookeeper 或 KRaft ❌ 安装需要更多专业知识 |
性能 | ✅ 每核每秒处理 1GB 数据 ✅ C++ 实现,p99999 延迟仅 16ms ✅ 支持自动内核调优 |
❌ Java 实现,p99999 延迟 1.8 秒 ❌ 未针对现代多核 CPU 优化 |
成本 | ✅ 基础设施成本仅为 Kafka 的 1/6 | ❌ 需要更多资源支撑同等性能 |
连接器 | ⚠️ Redpanda Cloud 提供少量托管连接器 | ✅ 生态成熟,提供丰富开箱即用连接器 |
社区支持 | ⚠️ 社区仍在成长中 ✅ 提供 Slack 频道 |
✅ 各行业广泛采用,社区极其成熟 |
3. Redpanda 架构
Redpanda 架构不仅简单,而且极易理解。它采用单一二进制安装包,极大简化了部署流程,让开发者能快速上手。同时,它提供了高吞吐量的高性能流处理平台。
3.1 核心组件与特性
以下是 Redpanda 实现高可靠性和高性能的关键组件:
控制平面支持 Kafka API,可用于管理 Broker、创建消息主题、发布和消费消息等操作。这使得依赖 Kafka 的遗留系统可以低成本迁移到 Redpanda。但需注意,Redpanda 集群管理使用独立的 Admin API。
支持分层存储:可配置将数据日志从本地缓存卸载到云端低成本对象存储。当消费者需要时,数据会实时从远程对象存储回传到本地缓存。
内置 Raft 共识算法实现层,在节点间复制主题分区数据。这确保了故障发生时不会丢失数据,提供了高数据安全性和容错能力。
提供强大的认证与授权支持,支持 SASL、OAuth、OIDC、基础认证、Kerberos 等多种认证方式。通过基于角色的访问控制(RBAC)实现细粒度资源访问控制。
Schema Registry 用于定义 Broker、消费者和生产者之间交换的数据结构。Schema Registry API 负责注册和修改这些模式。
HTTP 代理(pandaproxy)API 提供便捷的 REST 接口,支持列出主题和 Broker、获取事件、生产事件等基础操作。
提供监控指标接口,可配置 Prometheus 拉取关键指标并在 Grafana 仪表盘 展示。
3.2 单一二进制安装包
Redpanda 采用单一二进制安装包,安装过程显著简化。与 Kafka 不同,它不依赖 JVM 或 Zookeeper 等集群管理工具,运维极其简单。
基于 C++ 开发,采用每核线程模型,能最优利用 CPU 核心、内存和网络资源。这大幅降低了硬件部署成本,同时实现低延迟和高吞吐。
Redpanda 集群由多个节点组成,每个节点可承担数据平面或控制平面角色。只需在节点上安装单一二进制包并适当配置即可。若节点具备高性能计算能力,可同时承担两种角色而不会造成性能瓶颈。
3.3 管理工具
提供两种管理工具:Web 控制台 和名为 Redpanda Keeper (RPK) 的 CLI 工具。控制台是集群管理员使用的友好 Web 应用。
RPK 主要用于底层集群管理和调优,而控制台提供数据流可见性,支持故障排查和集群管理。
4. 部署方式
Redpanda 支持自托管和 Redpanda 云部署。
自托管部署允许客户在私有数据中心或公有云 VPC 中部署 Redpanda 集群。支持物理机、虚拟机和 Kubernetes 部署。最佳实践是为每个 Broker 分配专用节点。当前支持 RHEL/CentOS 和 Ubuntu 操作系统。
分层存储可使用 AWS S3、Azure Blob Storage (ABS) 或 Google Cloud Storage (GCS)。
客户也可选择Redpanda 云托管服务:可将整个集群部署在 Redpanda 云上,或选择在私有数据中心/公有云账户中运行数据平面,控制平面仍由 Redpanda 云负责监控、配置和升级。
5. 核心应用场景
得益于简洁架构和易用性,Redpanda 是开发者构建流处理平台的理想选择。主要应用场景如下:
流处理平台参与者包括:
- 源系统生成数据流(监控事件、指标、通知等)
- 集群中的 Broker 管理主题
- 生产者从源系统读取数据并发布到主题
- 消费者持续轮询订阅的主题
- 目标系统接收消费者处理后的消息
Redpanda 能以比 Kafka 低 10 倍的平均延迟,确保来自监控工具、合规安全平台、IoT 设备等源的实时数据流可靠传递。
支持生产者-消费者模型处理实时数据流:生产者从源系统读取数据发布到 Redpanda 主题,集群中的 Broker 高可靠且容错,保证消息传递。
消费者应用订阅集群主题,读取数据后经进一步处理发送到目标系统,如分析平台、NoSQL 数据库、关系型数据库或其他流处理平台。
在微服务架构中,Redpanda 通过服务间异步通信实现解耦。
典型行业应用包括:
- 可观测性平台(事件/日志处理、报告、故障排查、自愈)
- 实时合规与欺诈检测系统
- 实时分析仪表盘与应用
6. 使用 Kafka API 实现 Redpanda 客户端
Redpanda 兼容 Kafka API,因此我们使用 Kafka 客户端编写与 Redpanda 交互的程序。
示例使用 Java Testcontainers 在 Windows 桌面部署单节点 Redpanda。我们将演示主题创建、消息发布和消费的基础操作,不深入探讨 Kafka API 细节。
6.1 前置准备
添加 Kafka 客户端库的 Maven 依赖:
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>kafka-clients</artifactId>
<version>3.6.1</version>
</dependency>
6.2 创建主题
使用 Kafka 客户端库的 AdminClient 创建主题:
AdminClient createAdminClient() {
Properties adminProps = new Properties();
adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
return KafkaAdminClient.create(adminProps);
}
通过获取 Broker URL 并传递给静态 create()
方法初始化 AdminClient
。
创建主题的实现:
void createTopic(String topicName) {
try (AdminClient adminClient = createAdminClient()) {
NewTopic topic = new NewTopic(topicName, 1, (short) 1);
adminClient.createTopics(Collections.singleton(topic));
} catch (Exception e) {
LOGGER.error("Error occurred during topic creation:", e);
}
}
AdminClient
的 createTopics()
方法接收 NewTopic
对象作为参数。
验证主题创建:
@Test
void whenCreateTopic_thenSuccess() throws ExecutionException, InterruptedException {
String topic = "test-topic";
createTopic(topic);
try(AdminClient adminClient = createAdminClient()) {
assertTrue(adminClient.listTopics()
.names()
.get()
.contains(topic));
}
}
程序成功在 Redpanda 创建 test-topic
主题,并通过 listTopics()
验证其存在。
6.3 发布消息到主题
生产者应用的核心需求是向主题发布消息。使用 KafkaProducer 实现:
KafkaProducer<String, String> createProducer() {
Properties producerProps = new Properties();
producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
return new KafkaProducer<String, String>(producerProps);
}
通过提供 Broker URL 和 StringSerializer
类等必要属性初始化生产者。
发布消息的实现:
void publishMessage(String msgKey, String msg, String topic, KafkaProducer<String, String> producer)
throws ExecutionException, InterruptedException {
ProducerRecord<String, String> record = new ProducerRecord<>(topic, msgKey, msg);
producer.send(record).get();
}
创建 ProducerRecord
对象后,传递给 KafkaProducer
的 send()
方法。send()
是异步操作,调用 get()
确保阻塞直到消息发布完成。
发布消息测试:
@Test
void givenTopic_whenPublishMsg_thenSuccess() {
try (final KafkaProducer<String, String> producer = createProducer()) {
assertDoesNotThrow(() -> publishMessage("test_msg_key_2", "Hello Redpanda!", "baeldung-topic", producer));
}
}
通过 createProducer()
创建 KafkaProducer
,调用 publishMessage()
将消息 "Hello Redpanda!"
发布到 baeldung-topic
主题。
6.4 从主题消费消息
使用 KafkaConsumer 消费消息:
KafkaConsumer<String, String> createConsumer() {
Properties consumerProps = new Properties();
consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
return new KafkaConsumer<String, String>(consumerProps);
}
通过提供 Broker URL、StringDeSerializer
类等属性初始化消费者。设置 AUTO_OFFSET_RESET_CONFIG
为 "earliest"
确保从偏移量 0 开始消费。
消费消息测试:
@Test
void givenTopic_whenConsumeMessage_thenSuccess() {
try (KafkaConsumer<String, String> kafkaConsumer = createConsumer()) {
kafkaConsumer.subscribe(Collections.singletonList(TOPIC_NAME));
while(true) {
ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
if(records.count() == 0) {
continue;
}
assertTrue(records.count() >= 1);
break;
}
}
}
创建 KafkaConsumer
后订阅主题,每 1000ms 轮询一次读取消息。实际应用中会持续轮询并处理消息,此处为演示仅读取一次。
7. 总结
本教程探讨了 Redpanda 流处理平台。概念上它与 Apache Kafka 类似,但安装、监控和管理更简单。在更少的计算和内存资源下,它能实现高性能和高容错。
然而,与 Kafka 相比,Redpanda 在行业采用度上仍有差距,社区支持也不如 Kafka 强大。
由于兼容 Kafka API,应用从 Kafka 迁移到 Redpanda 的成本显著降低。
本文代码可在 GitHub 获取。