1. 概述

变更数据捕获(Change Data Capture,CDC) 是一种在分布式系统中实现数据实时同步的技术,它通过捕获数据库的实时变更来驱动下游系统更新。Maxwell 是一款专为 MySQL 设计的开源 CDC 工具。

在本文中,我们将介绍 CDC 的基本原理,并通过一个完整的实战案例演示如何使用 Maxwell 捕获 MySQL 数据库中的数据和结构变更。我们会搭建 MySQL、Maxwell 和 Kafka 的 Docker 环境,并验证变更数据是否能被正确捕获并推送到 Kafka 中。

2. 变更数据捕获(CDC)

大多数数据库都会通过事务日志记录数据和结构的变更,例如 PostgreSQL 使用 WAL(Write-Ahead Log)机制,MySQL 则使用 binlog(二进制日志)。

✅ CDC 的核心思想是:直接读取这些事务日志,实时获取数据变更,避免传统轮询方式带来的延迟和性能损耗

传统的数据同步方式通常依赖定时任务查询数据库并对比差异,这种方式存在以下几个问题:

  • 延迟高
  • 对源数据库造成额外负载
  • 难以做到实时同步

而 CDC 通过解析数据库的事务日志,将变更事件实时推送给下游系统,显著提升了效率和响应速度。

3. Maxwell 简介

Maxwell 是一款专为 MySQL 设计的 CDC 工具。它通过读取并解析 MySQL 的 binlog,将数据变更转化为结构化的 JSON 消息,并发布到配置的“生产者”中,例如 Kafka、RabbitMQ、Redis 等。

3.1. binlog 格式

MySQL 的 binlog 记录了所有对数据库的 DML(INSERT、UPDATE、DELETE)和 DDL(CREATE、ALTER、DROP)操作。

⚠️ 要注意的是:binlog 是二进制格式的,必须用专用解析器读取。Maxwell 的价值之一就是将这些二进制内容转换为 JSON 格式,便于下游系统处理。

3.2. 变更处理机制

Maxwell 会将表结构信息存储在自己的 maxwell 数据库中,以便正确解析 binlog 中的变更事件。

当有 DDL 操作(如新增列)时,Maxwell 会更新其内部的 schema 信息,确保后续的 DML 操作能被正确识别和处理。

以下是一个典型的 JSON 输出示例,展示插入操作的变更事件:

{
  "database": "ecommerce",
  "table": "products",
  "type": "insert",
  "ts": 1691234567,
  "data": { 
    "id": 1001, 
    "name": "Wireless Headphones", 
    "price": 99.99, 
    "category": "Electronics" 
  }
}

变更类型包括:

  • insert
  • delete
  • update

对于更新操作,Maxwell 会额外提供 old 字段,表示变更前的值:

{
  "database": "ecommerce",
  "table": "products", 
  "type": "update",
  "ts": 1691234890,
  "data": { 
    "id": 1001, 
    "name": "Wireless Headphones", 
    "price": 89.99, 
    "category": "Electronics" 
  },
  "old": { 
    "price": 99.99 
  }
}

4. 使用 Maxwell 捕获 MySQL 数据变更

我们通过一个完整的实战案例演示 Maxwell 的使用流程:

4.1. 使用 Docker 搭建 MySQL

首先,创建一个 MySQL 配置文件 mysql.cnf,启用 binlog 并设置格式为 ROW:

$ cat mysql.cnf 
[mysqld]
server-id = 1
log-bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL

⚠️ 踩坑提示:必须设置 binlog_format = ROW,否则 Maxwell 无法捕获到行级变更

然后启动 MySQL 容器:

$ docker run -d \
  --name mysql-cdc \
  --network host \
  -e MYSQL_ROOT_PASSWORD=password \
  -e MYSQL_DATABASE=ecommerce \
  -v $(pwd)/mysql.cnf:/etc/mysql/conf.d/mysql.cnf \
  mysql:8.0

创建 products 表:

$ docker exec -i mysql-cdc mysql -u root -ppassword <<EOF
  USE ecommerce;
  CREATE TABLE products (
      id INT PRIMARY KEY AUTO_INCREMENT,
      name VARCHAR(255) NOT NULL,
      price DECIMAL(10,2) NOT NULL,
      category VARCHAR(100),
      created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
      updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
  );
  EOF

4.2. 安装与运行 Maxwell

创建 config.properties 配置文件:

$ cat config.properties
# MySQL connection
host=localhost
port=3306
user=root
password=password
producer=kafka

# Kafka configuration
kafka.bootstrap.servers=localhost:9092
kafka_topic=maxwell
kafka_partition_by=database
kafka_key_format=hash

# Output configuration
output_format=json
include_schema_id=true
include_commit_transaction=true

✅ 推荐做法:生产环境中应为 Maxwell 创建专用数据库用户,避免使用 root

启动 Maxwell 容器:

$ docker run -it -d --rm \
  --name maxwell \
  --network host \
  -v $(pwd)/config.properties:/app/config.properties \
  zendesk/maxwell:v1.40.2 \
  bin/maxwell --config=/app/config.properties

4.3. 搭建 Kafka 生产者

启动 Kafka 容器(KRaft 模式,无需 Zookeeper):

$ docker run -d \
  --name=kafka \
  --network=host \
  -e KAFKA_NODE_ID=1 \
  -e CLUSTER_ID=uXke6Yw_Q6u4T0S6b1zVzw \
  -e KAFKA_PROCESS_ROLES=broker,controller \
  -e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
  -e KAFKA_LISTENERS=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
  -e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  -e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
  -e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT \
confluentinc/cp-server:8.0.0

4.4. 触发变更并观察输出

执行插入操作:

$ docker exec -it mysql-cdc mysql -u root -ppassword -e "USE ecommerce; INSERT INTO products 
  (name, price, category) VALUES ('Wireless Mouse', 29.99, 'Electronics');"

使用 Kafka 控制台消费者查看输出:

$ docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic maxwell --from-beginning
{"database":"ecommerce","table":"products","type":"insert","ts":1754123961,"xid":322,"commit":true,"data":{"id":1,"name":"Wireless Mouse","price":29.99,"category":"Electronics","created_at":"2025-08-02 08:39:21","updated_at":"2025-08-02 08:39:21"}}

✅ 输出中包含完整的变更信息,包括时间戳、操作类型、原始数据和提交状态。

5. 总结

本文介绍了 Maxwell 的工作原理及其在 MySQL CDC 场景中的应用。

✅ Maxwell 的优势在于:

  • 实时性强,基于 binlog 解析,延迟低
  • 支持多种消息中间件作为输出目标(如 Kafka)
  • 输出为 JSON 格式,易于集成和处理

我们通过搭建 MySQL、Maxwell 和 Kafka 的 Docker 环境,成功验证了 Maxwell 能够实时捕获数据库变更,并将变更事件推送到 Kafka 主题中。

✅ 实际应用中,可以将 Kafka 消费者对接到数据湖、搜索引擎、数据仓库等系统,实现数据实时同步与集成。


原始标题:Capturing MySQL Data Change Using Maxwell