1. 概述
变更数据捕获(Change Data Capture,CDC) 是一种在分布式系统中实现数据实时同步的技术,它通过捕获数据库的实时变更来驱动下游系统更新。Maxwell 是一款专为 MySQL 设计的开源 CDC 工具。
在本文中,我们将介绍 CDC 的基本原理,并通过一个完整的实战案例演示如何使用 Maxwell 捕获 MySQL 数据库中的数据和结构变更。我们会搭建 MySQL、Maxwell 和 Kafka 的 Docker 环境,并验证变更数据是否能被正确捕获并推送到 Kafka 中。
2. 变更数据捕获(CDC)
大多数数据库都会通过事务日志记录数据和结构的变更,例如 PostgreSQL 使用 WAL(Write-Ahead Log)机制,MySQL 则使用 binlog(二进制日志)。
✅ CDC 的核心思想是:直接读取这些事务日志,实时获取数据变更,避免传统轮询方式带来的延迟和性能损耗。
传统的数据同步方式通常依赖定时任务查询数据库并对比差异,这种方式存在以下几个问题:
- 延迟高
- 对源数据库造成额外负载
- 难以做到实时同步
而 CDC 通过解析数据库的事务日志,将变更事件实时推送给下游系统,显著提升了效率和响应速度。
3. Maxwell 简介
Maxwell 是一款专为 MySQL 设计的 CDC 工具。它通过读取并解析 MySQL 的 binlog,将数据变更转化为结构化的 JSON 消息,并发布到配置的“生产者”中,例如 Kafka、RabbitMQ、Redis 等。
3.1. binlog 格式
MySQL 的 binlog 记录了所有对数据库的 DML(INSERT、UPDATE、DELETE)和 DDL(CREATE、ALTER、DROP)操作。
⚠️ 要注意的是:binlog 是二进制格式的,必须用专用解析器读取。Maxwell 的价值之一就是将这些二进制内容转换为 JSON 格式,便于下游系统处理。
3.2. 变更处理机制
Maxwell 会将表结构信息存储在自己的 maxwell
数据库中,以便正确解析 binlog 中的变更事件。
当有 DDL 操作(如新增列)时,Maxwell 会更新其内部的 schema 信息,确保后续的 DML 操作能被正确识别和处理。
以下是一个典型的 JSON 输出示例,展示插入操作的变更事件:
{
"database": "ecommerce",
"table": "products",
"type": "insert",
"ts": 1691234567,
"data": {
"id": 1001,
"name": "Wireless Headphones",
"price": 99.99,
"category": "Electronics"
}
}
变更类型包括:
- insert
- delete
- update
对于更新操作,Maxwell 会额外提供 old
字段,表示变更前的值:
{
"database": "ecommerce",
"table": "products",
"type": "update",
"ts": 1691234890,
"data": {
"id": 1001,
"name": "Wireless Headphones",
"price": 89.99,
"category": "Electronics"
},
"old": {
"price": 99.99
}
}
4. 使用 Maxwell 捕获 MySQL 数据变更
我们通过一个完整的实战案例演示 Maxwell 的使用流程:
4.1. 使用 Docker 搭建 MySQL
首先,创建一个 MySQL 配置文件 mysql.cnf
,启用 binlog 并设置格式为 ROW:
$ cat mysql.cnf
[mysqld]
server-id = 1
log-bin = mysql-bin
binlog_format = ROW
binlog_row_image = FULL
⚠️ 踩坑提示:必须设置 binlog_format = ROW
,否则 Maxwell 无法捕获到行级变更
然后启动 MySQL 容器:
$ docker run -d \
--name mysql-cdc \
--network host \
-e MYSQL_ROOT_PASSWORD=password \
-e MYSQL_DATABASE=ecommerce \
-v $(pwd)/mysql.cnf:/etc/mysql/conf.d/mysql.cnf \
mysql:8.0
创建 products
表:
$ docker exec -i mysql-cdc mysql -u root -ppassword <<EOF
USE ecommerce;
CREATE TABLE products (
id INT PRIMARY KEY AUTO_INCREMENT,
name VARCHAR(255) NOT NULL,
price DECIMAL(10,2) NOT NULL,
category VARCHAR(100),
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
);
EOF
4.2. 安装与运行 Maxwell
创建 config.properties
配置文件:
$ cat config.properties
# MySQL connection
host=localhost
port=3306
user=root
password=password
producer=kafka
# Kafka configuration
kafka.bootstrap.servers=localhost:9092
kafka_topic=maxwell
kafka_partition_by=database
kafka_key_format=hash
# Output configuration
output_format=json
include_schema_id=true
include_commit_transaction=true
✅ 推荐做法:生产环境中应为 Maxwell 创建专用数据库用户,避免使用 root
启动 Maxwell 容器:
$ docker run -it -d --rm \
--name maxwell \
--network host \
-v $(pwd)/config.properties:/app/config.properties \
zendesk/maxwell:v1.40.2 \
bin/maxwell --config=/app/config.properties
4.3. 搭建 Kafka 生产者
启动 Kafka 容器(KRaft 模式,无需 Zookeeper):
$ docker run -d \
--name=kafka \
--network=host \
-e KAFKA_NODE_ID=1 \
-e CLUSTER_ID=uXke6Yw_Q6u4T0S6b1zVzw \
-e KAFKA_PROCESS_ROLES=broker,controller \
-e KAFKA_CONTROLLER_QUORUM_VOTERS=1@localhost:9093 \
-e KAFKA_LISTENERS=PLAINTEXT://localhost:9092,CONTROLLER://localhost:9093 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://localhost:9092 \
-e KAFKA_CONTROLLER_LISTENER_NAMES=CONTROLLER \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
-e KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT \
-e KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,CONTROLLER:PLAINTEXT \
confluentinc/cp-server:8.0.0
4.4. 触发变更并观察输出
执行插入操作:
$ docker exec -it mysql-cdc mysql -u root -ppassword -e "USE ecommerce; INSERT INTO products
(name, price, category) VALUES ('Wireless Mouse', 29.99, 'Electronics');"
使用 Kafka 控制台消费者查看输出:
$ docker exec -it kafka kafka-console-consumer --bootstrap-server localhost:9092 --topic maxwell --from-beginning
{"database":"ecommerce","table":"products","type":"insert","ts":1754123961,"xid":322,"commit":true,"data":{"id":1,"name":"Wireless Mouse","price":29.99,"category":"Electronics","created_at":"2025-08-02 08:39:21","updated_at":"2025-08-02 08:39:21"}}
✅ 输出中包含完整的变更信息,包括时间戳、操作类型、原始数据和提交状态。
5. 总结
本文介绍了 Maxwell 的工作原理及其在 MySQL CDC 场景中的应用。
✅ Maxwell 的优势在于:
- 实时性强,基于 binlog 解析,延迟低
- 支持多种消息中间件作为输出目标(如 Kafka)
- 输出为 JSON 格式,易于集成和处理
我们通过搭建 MySQL、Maxwell 和 Kafka 的 Docker 环境,成功验证了 Maxwell 能够实时捕获数据库变更,并将变更事件推送到 Kafka 主题中。
✅ 实际应用中,可以将 Kafka 消费者对接到数据湖、搜索引擎、数据仓库等系统,实现数据实时同步与集成。