1. 简介

Apache Pulsar 是由 Yahoo 开发的开源分布式发布/订阅消息系统

它最初是为支撑 Yahoo 的关键业务应用(如 Yahoo Mail、Yahoo Finance、Yahoo Sports 等)而设计的。2016 年,该项目被开源,并由 Apache 软件基金会进行孵化。

2. 架构设计

Pulsar 是一个支持多租户、高性能的服务器间消息通信解决方案。它由一组 Broker 和 Bookie 组成,并内建使用 Apache ZooKeeper 来进行配置和管理。其中,Bookie 来自于 *Apache BookKeeper*,负责在消息被消费前进行持久化存储。

在一个典型的集群中,我们会看到以下组件:

✅ 多个 Broker 用于接收来自生产者的消息,并将消息分发给消费者
✅ Apache BookKeeper 用于支持消息持久化
✅ Apache ZooKeeper 用于存储集群配置信息

为了更直观地理解其架构,我们可以参考官方文档中的架构图:

pulsar system architecture

3. 核心特性

我们先快速过一下 Pulsar 的几个关键特性:

✅ 内置支持多集群部署
✅ 支持跨多个集群的消息地理复制(Geo-replication)
✅ 提供多种订阅模式
✅ 可扩展至百万级 Topic
✅ 使用 Apache BookKeeper 保证消息不丢失
✅ 高性能低延迟

接下来我们详细聊聊其中几个核心特性。

3.1. 消息模型

Pulsar 提供了灵活的消息模型。传统的消息系统通常有两种模型:队列(Queue)和发布/订阅(Pub/Sub)。前者是点对点通信,后者是广播式消息分发。

Pulsar 将这两种模型整合到一个统一的 API 中。生产者将消息发布到 Topic,然后这些消息会被广播到所有订阅该 Topic 的消费者。

消费者可以选择不同的订阅方式来消费消息,包括:

  • Exclusive(独占)
  • Shared(共享)
  • Failover(故障转移)

这些订阅模式我们将在后续章节详细介绍。

3.2. 部署方式

Pulsar 支持多种环境部署,这意味着你可以在本地物理机、Kubernetes 集群、Google Cloud 或 AWS 上运行它。

对于开发和测试,Pulsar 还支持单节点运行模式。在这种模式下,Broker、BookKeeper 和 ZooKeeper 都运行在同一个进程中。

3.3. 地理复制(Geo-Replication)

Pulsar 提供开箱即用的地理复制能力。通过配置不同的地理区域,可以轻松实现跨集群的消息复制。

消息复制是近实时的。即使网络中断,数据也会安全地存储在 BookKeeper 中,直到恢复后继续复制。

此外,这一功能还允许企业在多个云厂商之间部署 Pulsar 并实现数据同步,从而避免对特定云厂商 API 的依赖。

3.4. 消息持久性

一旦 Pulsar 成功读取并确认消息,就保证消息不会丢失。消息的持久性取决于存储节点的磁盘数量。

BookKeeper 通过在多个存储节点(bookie)上复制数据来确保持久性。每当 bookie 接收到消息,它会先写入内存并记录到 WAL(Write Ahead Log)中,类似数据库的 WAL 机制。这样即使机器宕机,数据也不会丢失。

此外,Pulsar 还能容忍多个节点同时故障。它会将数据复制到多个 bookie 上,并在确认后才向生产者返回成功响应。这种机制确保了即使发生多次硬件故障,也依然不会丢数据。

4. 单节点部署

接下来我们看看如何搭建一个单节点的 Pulsar 集群。

**Apache 还提供了 client API,支持 Java、Python 和 C++**。我们后面会用 Java 来演示生产者和消费者的例子。

4.1. 安装步骤

Pulsar 提供了二进制发行包,我们可以直接下载:

wget https://archive.apache.org/dist/incubator/pulsar/pulsar-2.1.1-incubating/apache-pulsar-2.1.1-incubating-bin.tar.gz

下载完成后解压,你会看到如下目录结构:bin, conf, example, licenseslib

接着我们还需要下载内置的 Connectors,它们现在是独立的包:

wget https://archive.apache.org/dist/incubator/pulsar/pulsar-2.1.1-incubating/apache-pulsar-io-connectors-2.1.1-incubating-bin.tar.gz

解压后,将 Connectors 文件夹复制到 Pulsar 主目录中。

4.2. 启动实例

使用以下命令即可启动单节点实例:

bin/pulsar standalone

5. Java 客户端示例

下面我们将创建一个 Java 项目,演示如何生产和消费消息,并展示不同的订阅模式。

5.1. 项目依赖配置

首先添加 pulsar-client 依赖:

<dependency>
    <groupId>org.apache.pulsar</groupId>
    <artifactId>pulsar-client</artifactId>
    <version>2.1.1-incubating</version>
</dependency>

5.2. 生产者示例

我们来创建一个 Producer 示例,定义 Topic 和 Producer:

**首先创建 PulsarClient,连接到指定的 Pulsar 服务地址和端口**,多个生产者和消费者可以共享一个客户端实例。

private static final String SERVICE_URL = "pulsar://localhost:6650";
private static final String TOPIC_NAME = "test-topic";
PulsarClient client = PulsarClient.builder()
  .serviceUrl(SERVICE_URL)
  .build();

Producer<byte[]> producer = client.newProducer()
  .topic(TOPIC_NAME)
  .compressionType(CompressionType.LZ4)
  .create();

发送 5 条消息:

IntStream.range(1, 5).forEach(i -> {
    String content = String.format("hi-pulsar-%d", i);

    Message<byte[]> msg = MessageBuilder.create()
      .setContent(content.getBytes())
      .build();
    MessageId msgId = producer.send(msg);
});

5.3. 消费者示例

消费者也需要使用同一个 PulsarClient 连接服务端:

Consumer<byte[]> consumer = client.newConsumer()
  .topic(TOPIC_NAME)
  .subscriptionType(SubscriptionType.Shared)
  .subscriptionName(SUBSCRIPTION_NAME)
  .subscribe();

这里我们使用了 Shared 订阅类型,允许多个消费者订阅同一个订阅名并消费消息。

5.4. 消费者订阅类型

在上面的例子中,我们用了 Shared 订阅类型。Pulsar 还支持以下几种订阅方式:

  • Exclusive:仅允许一个消费者订阅
  • Failover:主备模式,当主消费者失败时,备用消费者接管

如下图所示(来自 Apache 官方文档):

pulsar subscription modes

6. 总结

本文介绍了 Apache Pulsar 的核心特性,包括其灵活的消息模型、地理复制能力、以及强大的持久化保障。

我们还演示了如何搭建单节点环境,并使用 Java 客户端完成消息的生产和消费。

完整代码示例可以在 GitHub 仓库 中找到。


原始标题:Introduction to Apache Pulsar