1. 概述

本教程将介绍一个强大的事件流处理平台 Redpanda。它是业界事实标准流处理平台 Kafka 的竞争者,有趣的是,它还兼容 Kafka API。

我们将探讨 Redpanda 的核心组件、特性和应用场景,并编写 Java 程序实现向 Redpanda 主题发布消息以及从中读取消息。

2. Redpanda vs Kafka

既然 Redpanda 声称是 Kafka 的竞争者,我们从几个关键维度进行对比:

特性 Redpanda Kafka
开发体验 ✅ 单一二进制包,安装简单
✅ 无需 JVM 和第三方工具依赖
❌ 依赖 Zookeeper 或 KRaft
❌ 安装需要更多专业知识
性能 ✅ 每核每秒处理 1GB 数据
✅ C++ 实现,p99999 延迟仅 16ms
✅ 支持自动内核调优
❌ Java 实现,p99999 延迟 1.8 秒
❌ 未针对现代多核 CPU 优化
成本 ✅ 基础设施成本仅为 Kafka 的 1/6 ❌ 需要更多资源支撑同等性能
连接器 ⚠️ Redpanda Cloud 提供少量托管连接器 ✅ 生态成熟,提供丰富开箱即用连接器
社区支持 ⚠️ 社区仍在成长中
✅ 提供 Slack 频道
✅ 各行业广泛采用,社区极其成熟

3. Redpanda 架构

Redpanda 架构不仅简单,而且极易理解。它采用单一二进制安装包,极大简化了部署流程,让开发者能快速上手。同时,它提供了高吞吐量的高性能流处理平台。

3.1 核心组件与特性

以下是 Redpanda 实现高可靠性和高性能的关键组件:

Redpanda 集群架构

控制平面支持 Kafka API,可用于管理 Broker、创建消息主题、发布和消费消息等操作。这使得依赖 Kafka 的遗留系统可以低成本迁移到 Redpanda。但需注意,Redpanda 集群管理使用独立的 Admin API。

支持分层存储:可配置将数据日志从本地缓存卸载到云端低成本对象存储。当消费者需要时,数据会实时从远程对象存储回传到本地缓存。

内置 Raft 共识算法实现层,在节点间复制主题分区数据。这确保了故障发生时不会丢失数据,提供了高数据安全性和容错能力。

提供强大的认证与授权支持,支持 SASL、OAuth、OIDC、基础认证、Kerberos 等多种认证方式。通过基于角色的访问控制(RBAC)实现细粒度资源访问控制。

Schema Registry 用于定义 Broker、消费者和生产者之间交换的数据结构。Schema Registry API 负责注册和修改这些模式。

HTTP 代理(pandaproxy)API 提供便捷的 REST 接口,支持列出主题和 Broker、获取事件、生产事件等基础操作。

提供监控指标接口,可配置 Prometheus 拉取关键指标并在 Grafana 仪表盘 展示。

3.2 单一二进制安装包

Redpanda 采用单一二进制安装包,安装过程显著简化。与 Kafka 不同,它不依赖 JVM 或 Zookeeper 等集群管理工具,运维极其简单。

基于 C++ 开发,采用每核线程模型,能最优利用 CPU 核心、内存和网络资源。这大幅降低了硬件部署成本,同时实现低延迟和高吞吐。

Redpanda 集群由多个节点组成,每个节点可承担数据平面或控制平面角色。只需在节点上安装单一二进制包并适当配置即可。若节点具备高性能计算能力,可同时承担两种角色而不会造成性能瓶颈。

3.3 管理工具

提供两种管理工具Web 控制台 和名为 Redpanda Keeper (RPK) 的 CLI 工具。控制台是集群管理员使用的友好 Web 应用。

RPK 主要用于底层集群管理和调优,而控制台提供数据流可见性,支持故障排查和集群管理。

4. 部署方式

Redpanda 支持自托管和 Redpanda 云部署

自托管部署允许客户在私有数据中心或公有云 VPC 中部署 Redpanda 集群。支持物理机、虚拟机和 Kubernetes 部署。最佳实践是为每个 Broker 分配专用节点。当前支持 RHEL/CentOS 和 Ubuntu 操作系统。

分层存储可使用 AWS S3、Azure Blob Storage (ABS) 或 Google Cloud Storage (GCS)。

客户也可选择Redpanda 云托管服务:可将整个集群部署在 Redpanda 云上,或选择在私有数据中心/公有云账户中运行数据平面,控制平面仍由 Redpanda 云负责监控、配置和升级。

5. 核心应用场景

得益于简洁架构和易用性,Redpanda 是开发者构建流处理平台的理想选择。主要应用场景如下:

核心应用场景

流处理平台参与者包括:

  • 源系统生成数据流(监控事件、指标、通知等)
  • 集群中的 Broker 管理主题
  • 生产者从源系统读取数据并发布到主题
  • 消费者持续轮询订阅的主题
  • 目标系统接收消费者处理后的消息

Redpanda 能以比 Kafka 低 10 倍的平均延迟,确保来自监控工具、合规安全平台、IoT 设备等源的实时数据流可靠传递。

支持生产者-消费者模型处理实时数据流:生产者从源系统读取数据发布到 Redpanda 主题,集群中的 Broker 高可靠且容错,保证消息传递。

消费者应用订阅集群主题,读取数据后经进一步处理发送到目标系统,如分析平台、NoSQL 数据库、关系型数据库或其他流处理平台。

在微服务架构中,Redpanda 通过服务间异步通信实现解耦。

典型行业应用包括:

  • 可观测性平台(事件/日志处理、报告、故障排查、自愈)
  • 实时合规与欺诈检测系统
  • 实时分析仪表盘与应用

6. 使用 Kafka API 实现 Redpanda 客户端

Redpanda 兼容 Kafka API,因此我们使用 Kafka 客户端编写与 Redpanda 交互的程序。

示例使用 Java Testcontainers 在 Windows 桌面部署单节点 Redpanda。我们将演示主题创建、消息发布和消费的基础操作,不深入探讨 Kafka API 细节。

6.1 前置准备

添加 Kafka 客户端库的 Maven 依赖

<dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka-clients</artifactId>
    <version>3.6.1</version>
</dependency>

6.2 创建主题

使用 Kafka 客户端库的 AdminClient 创建主题:

AdminClient createAdminClient() {
    Properties adminProps = new Properties();
    adminProps.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
    return KafkaAdminClient.create(adminProps);
}

通过获取 Broker URL 并传递给静态 create() 方法初始化 AdminClient

创建主题的实现:

void createTopic(String topicName) {

    try (AdminClient adminClient = createAdminClient()) {
        NewTopic topic = new NewTopic(topicName, 1, (short) 1);
        adminClient.createTopics(Collections.singleton(topic));
    } catch (Exception e) {
        LOGGER.error("Error occurred during topic creation:", e);
    }
}

AdminClientcreateTopics() 方法接收 NewTopic 对象作为参数。

验证主题创建:

@Test
void whenCreateTopic_thenSuccess() throws ExecutionException, InterruptedException {
    String topic = "test-topic";
    createTopic(topic);
    try(AdminClient adminClient = createAdminClient()) {
        assertTrue(adminClient.listTopics()
          .names()
          .get()
          .contains(topic));
    }
}

程序成功在 Redpanda 创建 test-topic 主题,并通过 listTopics() 验证其存在。

6.3 发布消息到主题

生产者应用的核心需求是向主题发布消息。使用 KafkaProducer 实现:

KafkaProducer<String, String> createProducer() {
    Properties producerProps = new Properties();
    producerProps.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
    producerProps.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    producerProps.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

    return new KafkaProducer<String, String>(producerProps);
}

通过提供 Broker URL 和 StringSerializer 类等必要属性初始化生产者。

发布消息的实现:

void publishMessage(String msgKey, String msg, String topic, KafkaProducer<String, String> producer)
    throws ExecutionException, InterruptedException {
    ProducerRecord<String, String> record = new ProducerRecord<>(topic, msgKey, msg);
    producer.send(record).get();
}

创建 ProducerRecord 对象后,传递给 KafkaProducersend() 方法。send() 是异步操作,调用 get() 确保阻塞直到消息发布完成。

发布消息测试:

@Test
void givenTopic_whenPublishMsg_thenSuccess() {
    try (final KafkaProducer<String, String> producer = createProducer()) {
        assertDoesNotThrow(() -> publishMessage("test_msg_key_2", "Hello Redpanda!", "baeldung-topic", producer));
    }
}

通过 createProducer() 创建 KafkaProducer,调用 publishMessage() 将消息 "Hello Redpanda!" 发布到 baeldung-topic 主题。

6.4 从主题消费消息

使用 KafkaConsumer 消费消息:

KafkaConsumer<String, String> createConsumer() {
    Properties consumerProps = new Properties();
    consumerProps.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
    consumerProps.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, getBrokerUrl());
    consumerProps.put(ConsumerConfig.GROUP_ID_CONFIG, "test-consumer-group");
    consumerProps.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
    consumerProps.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());

    return new KafkaConsumer<String, String>(consumerProps);
}

通过提供 Broker URL、StringDeSerializer 类等属性初始化消费者。设置 AUTO_OFFSET_RESET_CONFIG"earliest" 确保从偏移量 0 开始消费。

消费消息测试:

@Test
void givenTopic_whenConsumeMessage_thenSuccess() {

    try (KafkaConsumer<String, String> kafkaConsumer = createConsumer()) {
        kafkaConsumer.subscribe(Collections.singletonList(TOPIC_NAME));

        while(true) {
            ConsumerRecords<String, String> records = kafkaConsumer.poll(Duration.ofMillis(1000));
            if(records.count() == 0) {
                continue;
            }
            assertTrue(records.count() >= 1);
            break;
        }
    }
}

创建 KafkaConsumer 后订阅主题,每 1000ms 轮询一次读取消息。实际应用中会持续轮询并处理消息,此处为演示仅读取一次。

7. 总结

本教程探讨了 Redpanda 流处理平台。概念上它与 Apache Kafka 类似,但安装、监控和管理更简单。在更少的计算和内存资源下,它能实现高性能和高容错。

然而,与 Kafka 相比,Redpanda 在行业采用度上仍有差距,社区支持也不如 Kafka 强大。

由于兼容 Kafka API,应用从 Kafka 迁移到 Redpanda 的成本显著降低。

本文代码可在 GitHub 获取。


原始标题:Introduction to Redpanda