1. 概述

在现代数据驱动的组织中,为了做出更明智的业务决策,通常需要构建大量的数据流水线来完成数据的获取、转换、提取和存储等操作。这些流程如果缺乏统一管理,很容易变得难以维护。因此,许多组织选择使用工作流自动化平台来管理这些流程,Kestra 就是其中之一。

本文将介绍 Kestra 这个开源的工作流自动化平台,帮助你快速理解其核心概念、安装方式,并通过一个实际案例演示如何构建自动化流程。

2. Kestra 简介

Kestra 是一个开源的工作流自动化平台。它的一个显著特点是:将工作流管理视为基础设施即代码(Infrastructure as Code)。这意味着所有的流程定义和配置都通过 YAML 文件进行管理,并支持版本控制。

这种设计带来了以下优势:

✅ 易于版本控制
✅ 提高流程变更的可追溯性
✅ 增强流程配置的健壮性

3. 核心概念

在使用 Kestra 前,需要了解几个核心概念:

3.1 流程(Flow)

一个 Flow 是 Kestra 中的基本工作单元,由唯一的命名空间(Namespace)和 ID 标识。每个 Flow 可以包含多个任务(Tasks)。

3.2 任务(Task)

任务是流程中的最小执行单元,分为以下三类:

  • Flowable Tasks:控制流程逻辑,例如条件判断、分支等,类似编程语言中的 if-else 和 switch。
    • 示例:io.kestra.plugin.core.flow.If
  • Runnable Tasks:执行数据处理或与外部系统交互,例如发送 HTTP 请求、执行数据库查询。
    • 示例:io.kestra.plugin.core.http.Request
  • Script Tasks:在独立的 Docker 容器中执行任意脚本,支持多种语言,如 Python、Node.js、Shell 等。
    • 示例:io.kestra.plugin.scripts.python.Script

3.3 触发器(Trigger)

触发器用于自动启动流程,主要有以下几种类型:

  • Schedule Trigger:基于 Cron 表达式定时触发流程。
  • Flow Trigger:当另一个流程执行完成后触发当前流程。
  • Webhook Trigger:通过 HTTP Webhook 调用触发流程。

4. 安装与运行 Kestra

Kestra 支持多种部署方式,包括 Docker 容器和 Kubernetes 集群。为方便演示,我们使用 Docker 来运行 Kestra:

docker run --pull=always --rm -it -p 8080:8080 -d -v /tmp:/tmp kestra/kestra:latest server local

⚠️ 说明:local 模式使用 H2 本地数据库,仅适用于测试环境。生产环境请参考官方文档进行部署。

启动后,访问 http://localhost:8080 即可进入 Kestra 的 Web 界面:

image showing the main page to Kestra workflow automation platform

5. 实战案例:比特币价格抓取流程

为了演示 Kestra 的完整使用流程,我们构建一个每 5 分钟抓取一次比特币价格并存储到数据库的自动化流程。

5.1 使用的 API

我们使用以下 API 获取比特币价格数据:

https://blockchain.info/ticker

示例输出:

{
  "USD": {
    "15m": 30000.0,
    "last": 30000.0,
    "buy": 30000.0,
    "sell": 30000.0,
    "symbol": "USD"
  }
}

5.2 创建流程

进入 Kestra 的 Flows 页面,点击右上角的 Create 按钮,创建一个新流程。设置如下:

id: bitcoin-price-scraper
namespace: finance

5.3 定义任务

任务 1:获取比特币价格(HTTP 请求)

- id: fetch_btc_price
  type: io.kestra.plugin.core.http.Request
  uri: "https://blockchain.info/ticker"
  method: GET

任务 2:解析响应(Python 脚本)

- id: parse_response
  type: io.kestra.plugin.scripts.python.Script
  script: |
    import json
    from kestra import Kestra
    from datetime import datetime

    data = json.loads('{{ outputs.fetch_btc_price.body }}')
    Kestra.outputs({"price": data["USD"]["last"], "timestamp": datetime.utcnow().isoformat()})

任务 3:存储到数据库(PostgreSQL)

- id: store_to_db
  type: io.kestra.plugin.jdbc.postgresql.Query
  url: "jdbc:postgresql://localhost:5432/exchange"
  username: "user"
  password: "password"
  sql: |
    INSERT INTO exchange_rates (currency, last_price, timestamp)
    VALUES (:currency, CAST(:last_price AS DECIMAL), CAST(:timestamp AS TIMESTAMP));
  parameters:
    currency: "USD"
    last_price: "{{ outputs.parse_response.vars.price }}"
    timestamp: "{{ outputs.parse_response.vars.timestamp }}"

5.4 定义触发器

每 5 分钟执行一次:

triggers:
  - id: periodic_trigger
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "*/5 * * * *"

5.5 完整流程定义

id: bitcoin-price-scraper
namespace: finance

triggers:
  - id: periodic_trigger
    type: io.kestra.plugin.core.trigger.Schedule
    cron: "*/5 * * * *"

tasks:
  - id: fetch_btc_price
    type: io.kestra.plugin.core.http.Request
    uri: "https://blockchain.info/ticker"
    method: GET

  - id: parse_response
    type: io.kestra.plugin.scripts.python.Script
    script: |
      import json
      from kestra import Kestra
      from datetime import datetime

      data = json.loads('{{ outputs.fetch_btc_price.body }}')
      Kestra.outputs({"price": data["USD"]["last"], "timestamp": datetime.utcnow().isoformat()})

  - id: store_to_db
    type: io.kestra.plugin.jdbc.postgresql.Query
    url: "jdbc:postgresql://localhost:5432/exchange"
    username: "user"
    password: "password"
    sql: |
      INSERT INTO exchange_rates (currency, last_price, timestamp)
      VALUES (:currency, CAST(:last_price AS DECIMAL), CAST(:timestamp AS TIMESTAMP));
    parameters:
      currency: "USD"
      last_price: "{{ outputs.parse_response.vars.price }}"
      timestamp: "{{ outputs.parse_response.vars.timestamp }}"

保存流程后,Kestra 会自动按照触发器设定的时间执行流程。

5.6 查看执行记录

进入流程详情页,点击 Executions 标签页,可以看到流程的执行历史记录:

Image showing the flow details page. It shows the execution history of the flow

你还可以查看每个执行的输出变量、日志以及甘特图(Gantt 图),帮助排查流程执行中的问题。

6. 总结

本文介绍了 Kestra 这个强大的开源工作流自动化平台,重点讲解了其核心概念、安装方式,并通过一个实际的比特币价格抓取流程展示了如何构建自动化流程。

通过 Kestra,你可以:

✅ 将工作流代码化,便于版本管理和协作
✅ 利用丰富插件快速构建复杂流程
✅ 自动化定时任务、依赖处理、数据流转等操作

如果你正在寻找一个轻量级、灵活且支持 YAML 配置的工作流平台,Kestra 是一个非常值得尝试的工具。


原始标题:Workflow Automation Using Kestra