Airflow 支持海量、多源、异构平台、数据集成开发与同步。也支持调度作业复杂依赖较多的数据集成场景。
了解Airflow 前先看下Airflow 可以做什么。
Airflow使用的典型场景为:
- 多源数据集成对接
- 数据开发ETL
- 数据建仓
- 实时报表
- 数据迁移
- 数据建模分析
- 数据维护
- 日志转存
- 数据修复
- 统一任务调度
Airflow 功能非常强大,只要是工作中有和数据打交道,上面大部分使用场景都会遇到,所以Airflow应该是一个比较全能的数据开发平台。所以一起先深入了解下Airflow内的一些核心知识点,以便可以更深入的了解Airflow。
Airflow主要特点:
动态的:Airflow工作流是使用Python进行配置动态工作流(DAG)。并允许编写动态实例化工作流的代码。
可扩展的:轻松定义您自己的operators、executors并扩展库,使其符合满足您的环境的抽象级别。
优雅的:设计简洁优雅。使用模板引擎将参数化脚本内置到 Airflow 的核心中。
可伸缩的:Airflow 具有模块化架构,并使用消息队列来编排任意数量的workers。Airflow为无限扩展而生。
- Airflow.cfg:这个是 Airflow 的配置文件,定义所有其他模块需要的配置。例如:meta database、scheduler.
- Metadata Database:Airflow 使用 SQL Database 存储 meta 信息。比如 DAG、DAG RUN、task、task instance 等信息。
- Scheduler:Airflow Scheduler 是一个独立的进程,通过读取 meta database 的信息来进行 task 调度,根据 DAGs 定义生成的任务,提交到消息中间队列中(Redis 等)。
- Webserver:Airflow Webserver 也是一个独立的进程,提供 web 端服务, 定时生成子进程扫描对应的 DAG 信息,以 UI 的方式展示 DAG 或者 task 的信息。
- Worker:Airflow Worker 是独立的进程,分布在相同 / 不同的机器上,是 task 的执行节点,通过监听消息中间件(redis)领取并且执行任务。
DAGs(有向无环图):
Directed Acyclic Graph 中文解释为有向无环图,是一种被广泛是用的拓扑算法,与区块链的形式有类似性,但更为复杂。DAG原本是计算机领域一种常用数据结构,因为独特的拓扑结构所带来的优异特性,经常被用于处理动态规划、导航中寻求最短路径、数据压缩等多种算法场景。所以在任务调度中与大数据 ETL 中使用 DAGs 来描述定义调度任务就是最合适不过的了。
下图则是一段 DAG 的定义文件:描述不同的算子操作与任务执行调度顺序。
from airflow import DAG
def default_options():
default_args = {
'owner': 'airflow',
}
return default_args
# 定义DAG
def dag_httpRequest(dag):
t = "echo 'hallo world'"
task = HttpRequest(
task_id='h1',
)
return task
def dag_SQLBuild():
current_time = str(datetime.today())
print('hello world at {}'.format(current_time))
def dag_SQLBuild(dag):
# PythonOperator
task = SQLBuild(
task_id='h2',
dag=dag)
return task
def dag_DataSource(dag):
# PythonOperator
task = DataSource(
task_id='h3',
dag=dag)
return task
def dag_HTTPResponse(dag):
# DummyOperator
task = HTTPResponse(
task_id='H4',
dag=dag)
return task
with DAG(
'test_task', # dag_id
default_args=default_options(),
) as d:
task1 = dag_httpRequest(d)
task2 = dag_SQLBuild(d)
task3 = dag_DataSource(d)
task4 = dag_HTTPResponse(d)
task1 >> task2 >> task3 >> task4 # 指定执行顺序
Operator(操作算子)
Operator 是 Airflow 内的操作任务执行原子能力组件。Airflow 内置的组件十分丰富,满足我们日常数据集成 ETL常规的任务操作。如下图所示,用户也可以基于 Airflow 的基础操作算子扩展更高级的算子。
Airflow 支持的算子模型(部分)
包含很多大数据平台,在异构处理方面功能强大。
Airflow内置的算子覆盖基本所有的数据类的系统,有兴趣可以取查看实际源码。
https://github.com/apache/airflow/tree/main/airflow/providers
我们以一个内置Redis算子为例:
from typing import TYPE_CHECKING, Sequence
from airflow.models import BaseOperator
from airflow.providers.redis.hooks.redis import RedisHook
if TYPE_CHECKING:
from airflow.utils.context import Context
class RedisPublishOperator(BaseOperator):
"""
Publish a message to Redis.
:param channel: redis channel to which the message is published (templated)
:param message: the message to publish (templated)
:param redis_conn_id: redis connection to use
"""
template_fields: Sequence[str] = ('channel', 'message')
def __init__(self, *, channel: str, message: str, redis_conn_id: str = 'redis_default', **kwargs) -> None:
super().__init__(**kwargs)
self.redis_conn_id = redis_conn_id
self.channel = channel
self.message = message
def execute(self, context: 'Context') -> None:
"""
Publish the message to Redis channel
:param context: the context object
"""
redis_hook = RedisHook(redis_conn_id=self.redis_conn_id)
self.log.info('Sending message %s to Redis on channel %s', self.message, self.channel)
result = redis_hook.get_conn().publish(channel=self.channel, message=self.message)
self.log.info('Result of publishing %s', result)
代码中 execute 部分是实际业务处理逻辑。
我们也可以根据自己的需要来自定义自己的操作算子。借用平台的机制完成一些自定义业务,也可以在其中调用大数据套件,来处理海量大数据,同时 Airflow 也支持丰富的接口便于和其他系统集成来实现自动化的处理与维护。其也可以做为一个数据集成引擎供其他系统集成和调用。