自动化数据集成调度引擎Apache Airflow

Airflow 支持海量、多源、异构平台、数据集成开发与同步。也支持调度作业复杂依赖较多的数据集成场景。

了解Airflow 前先看下Airflow 可以做什么。

Airflow使用的典型场景为:

  • 多源数据集成对接
  • 数据开发ETL
  • 数据建仓
  • 实时报表
  • 数据迁移
  • 数据建模分析
  • 数据维护
  • 日志转存
  • 数据修复
  • 统一任务调度

Airflow 功能非常强大,只要是工作中有和数据打交道,上面大部分使用场景都会遇到,所以Airflow应该是一个比较全能的数据开发平台。所以一起先深入了解下Airflow内的一些核心知识点,以便可以更深入的了解Airflow。

Airflow主要特点:

动态的:Airflow工作流是使用Python进行配置动态工作流(DAG)。并允许编写动态实例化工作流的代码。
可扩展的:轻松定义您自己的operators、executors并扩展库,使其符合满足您的环境的抽象级别。
优雅的:设计简洁优雅。使用模板引擎将参数化脚本内置到 Airflow 的核心中。
可伸缩的:Airflow 具有模块化架构,并使用消息队列来编排任意数量的workers。Airflow为无限扩展而生。

  • Airflow.cfg:这个是 Airflow 的配置文件,定义所有其他模块需要的配置。例如:meta database、scheduler.
  • Metadata Database:Airflow 使用 SQL Database 存储 meta 信息。比如 DAG、DAG RUN、task、task instance 等信息。
  • Scheduler:Airflow Scheduler 是一个独立的进程,通过读取 meta database 的信息来进行 task 调度,根据 DAGs 定义生成的任务,提交到消息中间队列中(Redis 等)。
  • Webserver:Airflow Webserver 也是一个独立的进程,提供 web 端服务, 定时生成子进程扫描对应的 DAG 信息,以 UI 的方式展示 DAG 或者 task 的信息。
  • Worker:Airflow Worker 是独立的进程,分布在相同 / 不同的机器上,是 task 的执行节点,通过监听消息中间件(redis)领取并且执行任务。

DAGs(有向无环图):

Directed Acyclic Graph 中文解释为有向无环图,是一种被广泛是用的拓扑算法,与区块链的形式有类似性,但更为复杂。DAG原本是计算机领域一种常用数据结构,因为独特的拓扑结构所带来的优异特性,经常被用于处理动态规划、导航中寻求最短路径、数据压缩等多种算法场景。所以在任务调度中与大数据 ETL 中使用 DAGs 来描述定义调度任务就是最合适不过的了。
下图则是一段 DAG 的定义文件:描述不同的算子操作与任务执行调度顺序。

from airflow import DAG

def default_options():
    default_args = {
        'owner': 'airflow',
    }
    return default_args


# 定义DAG
def dag_httpRequest(dag):
    t = "echo 'hallo world'"
    task = HttpRequest(
        task_id='h1',
    )
    return task


def dag_SQLBuild():
    current_time = str(datetime.today())
    print('hello world at {}'.format(current_time))


def dag_SQLBuild(dag):
    # PythonOperator
    task = SQLBuild(
        task_id='h2',
        dag=dag)
    return task

def dag_DataSource(dag):
    # PythonOperator
    task = DataSource(
        task_id='h3',
        dag=dag)
    return task

def dag_HTTPResponse(dag):
    # DummyOperator
    task = HTTPResponse(
        task_id='H4',
        dag=dag)
    return task


with DAG(
        'test_task',  # dag_id
        default_args=default_options(),   
) as d:
    task1 = dag_httpRequest(d)
    task2 = dag_SQLBuild(d)
    task3 = dag_DataSource(d)
    task4 = dag_HTTPResponse(d)
    task1 >> task2 >> task3 >> task4 # 指定执行顺序

Operator(操作算子)

Operator 是 Airflow 内的操作任务执行原子能力组件。Airflow 内置的组件十分丰富,满足我们日常数据集成 ETL常规的任务操作。如下图所示,用户也可以基于 Airflow 的基础操作算子扩展更高级的算子。

Airflow 支持的算子模型(部分)

包含很多大数据平台,在异构处理方面功能强大。

Airflow内置的算子覆盖基本所有的数据类的系统,有兴趣可以取查看实际源码。

https://github.com/apache/airflow/tree/main/airflow/providers

我们以一个内置Redis算子为例:

from typing import TYPE_CHECKING, Sequence

from airflow.models import BaseOperator
from airflow.providers.redis.hooks.redis import RedisHook

if TYPE_CHECKING:
    from airflow.utils.context import Context


class RedisPublishOperator(BaseOperator):
    """
    Publish a message to Redis.
    :param channel: redis channel to which the message is published (templated)
    :param message: the message to publish (templated)
    :param redis_conn_id: redis connection to use
    """

    template_fields: Sequence[str] = ('channel', 'message')

    def __init__(self, *, channel: str, message: str, redis_conn_id: str = 'redis_default', **kwargs) -> None:

        super().__init__(**kwargs)
        self.redis_conn_id = redis_conn_id
        self.channel = channel
        self.message = message

    def execute(self, context: 'Context') -> None:
        """
        Publish the message to Redis channel
        :param context: the context object
        """
        redis_hook = RedisHook(redis_conn_id=self.redis_conn_id)

        self.log.info('Sending message %s to Redis on channel %s', self.message, self.channel)

        result = redis_hook.get_conn().publish(channel=self.channel, message=self.message)

        self.log.info('Result of publishing %s', result)

代码中 execute 部分是实际业务处理逻辑。

我们也可以根据自己的需要来自定义自己的操作算子。借用平台的机制完成一些自定义业务,也可以在其中调用大数据套件,来处理海量大数据,同时 Airflow 也支持丰富的接口便于和其他系统集成来实现自动化的处理与维护。其也可以做为一个数据集成引擎供其他系统集成和调用。

发表回复

您的邮箱地址不会被公开。 必填项已用 * 标注