1. 概述
在现代数据驱动的组织中,为了做出更明智的业务决策,通常需要构建大量的数据流水线来完成数据的获取、转换、提取和存储等操作。这些流程如果缺乏统一管理,很容易变得难以维护。因此,许多组织选择使用工作流自动化平台来管理这些流程,Kestra 就是其中之一。
本文将介绍 Kestra 这个开源的工作流自动化平台,帮助你快速理解其核心概念、安装方式,并通过一个实际案例演示如何构建自动化流程。
2. Kestra 简介
Kestra 是一个开源的工作流自动化平台。它的一个显著特点是:将工作流管理视为基础设施即代码(Infrastructure as Code)。这意味着所有的流程定义和配置都通过 YAML 文件进行管理,并支持版本控制。
这种设计带来了以下优势:
✅ 易于版本控制
✅ 提高流程变更的可追溯性
✅ 增强流程配置的健壮性
3. 核心概念
在使用 Kestra 前,需要了解几个核心概念:
3.1 流程(Flow)
一个 Flow 是 Kestra 中的基本工作单元,由唯一的命名空间(Namespace)和 ID 标识。每个 Flow 可以包含多个任务(Tasks)。
3.2 任务(Task)
任务是流程中的最小执行单元,分为以下三类:
- Flowable Tasks:控制流程逻辑,例如条件判断、分支等,类似编程语言中的 if-else 和 switch。
- 示例:
io.kestra.plugin.core.flow.If
- 示例:
- Runnable Tasks:执行数据处理或与外部系统交互,例如发送 HTTP 请求、执行数据库查询。
- 示例:
io.kestra.plugin.core.http.Request
- 示例:
- Script Tasks:在独立的 Docker 容器中执行任意脚本,支持多种语言,如 Python、Node.js、Shell 等。
- 示例:
io.kestra.plugin.scripts.python.Script
- 示例:
3.3 触发器(Trigger)
触发器用于自动启动流程,主要有以下几种类型:
- Schedule Trigger:基于 Cron 表达式定时触发流程。
- Flow Trigger:当另一个流程执行完成后触发当前流程。
- Webhook Trigger:通过 HTTP Webhook 调用触发流程。
4. 安装与运行 Kestra
Kestra 支持多种部署方式,包括 Docker 容器和 Kubernetes 集群。为方便演示,我们使用 Docker 来运行 Kestra:
docker run --pull=always --rm -it -p 8080:8080 -d -v /tmp:/tmp kestra/kestra:latest server local
⚠️ 说明:local
模式使用 H2 本地数据库,仅适用于测试环境。生产环境请参考官方文档进行部署。
启动后,访问 http://localhost:8080
即可进入 Kestra 的 Web 界面:
5. 实战案例:比特币价格抓取流程
为了演示 Kestra 的完整使用流程,我们构建一个每 5 分钟抓取一次比特币价格并存储到数据库的自动化流程。
5.1 使用的 API
我们使用以下 API 获取比特币价格数据:
https://blockchain.info/ticker
示例输出:
{
"USD": {
"15m": 30000.0,
"last": 30000.0,
"buy": 30000.0,
"sell": 30000.0,
"symbol": "USD"
}
}
5.2 创建流程
进入 Kestra 的 Flows 页面,点击右上角的 Create 按钮,创建一个新流程。设置如下:
id: bitcoin-price-scraper
namespace: finance
5.3 定义任务
任务 1:获取比特币价格(HTTP 请求)
- id: fetch_btc_price
type: io.kestra.plugin.core.http.Request
uri: "https://blockchain.info/ticker"
method: GET
任务 2:解析响应(Python 脚本)
- id: parse_response
type: io.kestra.plugin.scripts.python.Script
script: |
import json
from kestra import Kestra
from datetime import datetime
data = json.loads('{{ outputs.fetch_btc_price.body }}')
Kestra.outputs({"price": data["USD"]["last"], "timestamp": datetime.utcnow().isoformat()})
任务 3:存储到数据库(PostgreSQL)
- id: store_to_db
type: io.kestra.plugin.jdbc.postgresql.Query
url: "jdbc:postgresql://localhost:5432/exchange"
username: "user"
password: "password"
sql: |
INSERT INTO exchange_rates (currency, last_price, timestamp)
VALUES (:currency, CAST(:last_price AS DECIMAL), CAST(:timestamp AS TIMESTAMP));
parameters:
currency: "USD"
last_price: "{{ outputs.parse_response.vars.price }}"
timestamp: "{{ outputs.parse_response.vars.timestamp }}"
5.4 定义触发器
每 5 分钟执行一次:
triggers:
- id: periodic_trigger
type: io.kestra.plugin.core.trigger.Schedule
cron: "*/5 * * * *"
5.5 完整流程定义
id: bitcoin-price-scraper
namespace: finance
triggers:
- id: periodic_trigger
type: io.kestra.plugin.core.trigger.Schedule
cron: "*/5 * * * *"
tasks:
- id: fetch_btc_price
type: io.kestra.plugin.core.http.Request
uri: "https://blockchain.info/ticker"
method: GET
- id: parse_response
type: io.kestra.plugin.scripts.python.Script
script: |
import json
from kestra import Kestra
from datetime import datetime
data = json.loads('{{ outputs.fetch_btc_price.body }}')
Kestra.outputs({"price": data["USD"]["last"], "timestamp": datetime.utcnow().isoformat()})
- id: store_to_db
type: io.kestra.plugin.jdbc.postgresql.Query
url: "jdbc:postgresql://localhost:5432/exchange"
username: "user"
password: "password"
sql: |
INSERT INTO exchange_rates (currency, last_price, timestamp)
VALUES (:currency, CAST(:last_price AS DECIMAL), CAST(:timestamp AS TIMESTAMP));
parameters:
currency: "USD"
last_price: "{{ outputs.parse_response.vars.price }}"
timestamp: "{{ outputs.parse_response.vars.timestamp }}"
保存流程后,Kestra 会自动按照触发器设定的时间执行流程。
5.6 查看执行记录
进入流程详情页,点击 Executions 标签页,可以看到流程的执行历史记录:
你还可以查看每个执行的输出变量、日志以及甘特图(Gantt 图),帮助排查流程执行中的问题。
6. 总结
本文介绍了 Kestra 这个强大的开源工作流自动化平台,重点讲解了其核心概念、安装方式,并通过一个实际的比特币价格抓取流程展示了如何构建自动化流程。
通过 Kestra,你可以:
✅ 将工作流代码化,便于版本管理和协作
✅ 利用丰富插件快速构建复杂流程
✅ 自动化定时任务、依赖处理、数据流转等操作
如果你正在寻找一个轻量级、灵活且支持 YAML 配置的工作流平台,Kestra 是一个非常值得尝试的工具。