AirFlow Usage

  • DAGs: a Directed Acyclic Graph that organizes all the tasks to be run according to their dependencies and describes the order in which they execute.
  • Operators: can be thought of simply as a class that describes what a particular task in the DAG actually does. Airflow ships with many built-in operators, e.g. BashOperator runs a bash command, PythonOperator calls an arbitrary Python function, EmailOperator sends email, HTTPOperator sends HTTP requests, SqlOperator runs SQL statements, and so on. Users can also define their own operators, which makes Airflow very flexible.
  • Tasks: a Task is an instance of an Operator, i.e. a node in a DAG.
  • Task Instance: a single run of a task. In the web UI each task instance has its own state, such as "running", "success", "failed", "skipped", "up for retry", etc.
  • Task Relationships: tasks in a DAG can depend on one another, e.g. Task1 >> Task2 means that Task2 depends on Task1.
    By combining DAGs and Operators, users can build all kinds of complex workflows; see the minimal sketch below.
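A minimal sketch tying these concepts together (the dag_id concept_demo and the task ids are made up for illustration): two BashOperator tasks inside one DAG, with >> declaring that the second depends on the first.

from datetime import datetime

from airflow import DAG
from airflow.operators.bash import BashOperator

# A DAG holding two tasks; dag_id and task_ids here are hypothetical examples.
with DAG(
    'concept_demo',
    start_date=datetime(2021, 11, 1),
    schedule_interval=None,   # no schedule, trigger manually
    catchup=False,
) as dag:
    # Each Operator instance becomes a Task (a node) in the DAG.
    task1 = BashOperator(task_id='task1', bash_command='echo Task1')
    task2 = BashOperator(task_id='task2', bash_command='echo Task2')

    # Task Relationship: task2 runs only after task1 succeeds.
    task1 >> task2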
pip3 install apache-airflow[all]

The command above installs Airflow with all of its extra packages and may fail with errors. Several possible fixes are listed below.

FIX:

    python3 -m pip install -U pip
    python3 -m pip install -U setuptools

FIX:

    yum install mysql-devel gcc gcc-devel python-devel

FIX:

    yum -y install cyrus-sasl cyrus-sasl-devel cyrus-sasl-lib
    yum install unixODBC-devel
    yum install python-devel openldap-devel
After installation succeeds, initialize the metadata database and create an admin user:

airflow db init
airflow users create --username admin --firstname YongHeng --lastname Wang --role Admin --email your_email@email.com
password: 123456
If airflow db init fails because the system SQLite is too old, try upgrading it:

    yum update
    yum install sqlite

If the version provided by yum is still not new enough, download the source code from https://sqlite.org/ and build and install it locally.
1) Download the source code
[root@stg-airflow001 ~]$ wget https://www.sqlite.org/2019/sqlite-autoconf-3290000.tar.gz

2) Build
[root@stg-airflow001 ~]$ tar zxvf sqlite-autoconf-3290000.tar.gz
[root@stg-airflow001 ~]$ cd sqlite-autoconf-3290000/
[root@stg-airflow001 ~/sqlite-autoconf-3290000]$ ./configure --prefix=/usr/local
[root@stg-airflow001 ~/sqlite-autoconf-3290000]$ make && make install

3) Replace the older system sqlite3
[root@stg-airflow001 ~/sqlite-autoconf-3290000]$ cd
[root@stg-airflow001 ~]$ mv /usr/bin/sqlite3  /usr/bin/sqlite3_old
[root@stg-airflow001 ~]$ ln -s /usr/local/bin/sqlite3   /usr/bin/sqlite3
[root@stg-airflow001 ~]$ echo "/usr/local/lib" > /etc/ld.so.conf.d/sqlite3.conf
[root@stg-airflow001 ~]$ ldconfig
[root@stg-airflow001 ~]$ sqlite3 -version
3.29.0 2019-07-10 17:32:03 fc82b73eaac8b36950e527f12c4b5dc1e147e6f4ad2217ae43ad82882a88bfa6
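
Since Airflow accesses SQLite through Python's built-in sqlite3 module, it is worth confirming that the interpreter now links against the newly built library rather than the old system one:

import sqlite3

# Version of the libsqlite3 the Python interpreter is actually linked against;
# it should now report the newly installed 3.29.0 instead of the old system version.
print(sqlite3.sqlite_version)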
With SQLite upgraded, start the webserver and the scheduler:

airflow webserver --port 8282
airflow scheduler

airflow db init

airflow dags list

airflow tasks list tutorial

airflow tasks list tutorial --tree

airflow tasks test wyh_test print_date 2020-06-01
airflow tasks test wyh_test hello_task 2020-06-01

Find the airflow/dags directory and add a file named TestDag.py:

from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG

from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'wyh',
    'depends_on_past': False,
    'email': ['your@email.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),

}
with DAG(
    'wyh_test',
    default_args=default_args,
    description='A simple DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 11, 1),
    catchup=False,
    tags=['test'],
) as dag:

    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )
    t1.doc_md = dedent(
        """\
    #### Task Documentation
    You can document your task using the attributes doc_md (markdown),
    doc (plain text), doc_rst, doc_json, doc_yaml which gets
    rendered in the UI's Task Instance Details page.

    ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)

"""
    )

    dag.doc_md = __doc__

    def print_hello():
        print("Hello_Airflow")
        return 'Hello world!'

    t3 = PythonOperator(
        task_id='hello_task',
        python_callable=print_hello,
    )

    t1 >> [t2, t3]

Run python3 TestDag.py to check that the file has no errors. Then airflow dags list should show your own DAG with its dag_id, filepath, owner and paused status. Refresh the web UI page and it will appear there as well.
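
Besides running python3 TestDag.py directly, a small script built on Airflow's DagBag can confirm that every file in the dags folder imports cleanly (the folder path below is an assumption; point it at your own AIRFLOW_HOME/dags):

from airflow.models import DagBag

# Load every DAG file from the dags folder; the path is an example, adjust it to your AIRFLOW_HOME.
dag_bag = DagBag(dag_folder='/root/airflow/dags', include_examples=False)

# Files that failed to import are listed here together with their tracebacks.
print(dag_bag.import_errors)

# Once TestDag.py imports cleanly, the DAG defined above shows up by its dag_id.
print('wyh_test' in dag_bag.dags)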

To switch the metadata database to MySQL, first create a database and a user for Airflow:

CREATE DATABASE airflow CHARACTER SET utf8 COLLATE utf8_unicode_ci;
CREATE USER 'airflow' IDENTIFIED BY 'airflow';
GRANT ALL PRIVILEGES ON airflow.* TO 'airflow';
flush privileges;

Then change the following settings in the $AIRFLOW_HOME/airflow.cfg file:

sql_alchemy_conn = mysql+pymysql://airflow:airflow@127.0.0.1/airflow

executor = LocalExecutor
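
Before re-initializing, it can help to verify that this connection string actually works from Python. A minimal sketch, assuming the pymysql driver has been installed (e.g. pip3 install pymysql):

from sqlalchemy import create_engine, text

# The same connection string as sql_alchemy_conn in airflow.cfg.
engine = create_engine('mysql+pymysql://airflow:airflow@127.0.0.1/airflow')

with engine.connect() as conn:
    # A trivial query; if this prints 1, Airflow should be able to reach the database too.
    print(conn.execute(text('SELECT 1')).scalar())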


Then reset and re-initialize the metadata database:

airflow db reset

airflow db init

Note that MySQL itself also needs a configuration change, otherwise the following error is raised:
Exception: Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql

mysql -u root
show global variables like '%timestamp%';
set global explicit_defaults_for_timestamp =TRUE;
exit;
service mysql restart

Note that a value set with SET GLOBAL does not survive a server restart; to make the change permanent, also add explicit_defaults_for_timestamp = 1 under the [mysqld] section of my.cnf.

Next, re-create the admin user (the reset wiped the metadata database) and start the webserver and the scheduler.

To run tasks across multiple machines with the CeleryExecutor, configure Redis first.

vim airflow.cfg
executor = CeleryExecutor
broker_url = redis://10.11.ip.ip:6379/0
result_backend = db+mysql://airflow:airflow@10.11.ip.ip/airflow
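
Before starting any worker, a quick way to confirm that a machine can reach the broker is to ping Redis from Python. A minimal sketch, assuming the redis package is installed and using a placeholder host (substitute the address from broker_url above):

import redis

# Placeholder broker host; use the same address as broker_url in airflow.cfg.
r = redis.Redis(host='10.11.0.1', port=6379, db=0)

# Returns True when the broker is reachable.
print(r.ping())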
After that, start a worker:
airflow celery worker -D

Under the hood this uses Celery to dispatch tasks to the other machines in the cluster, so set up a few machines in the Worker role. Install Airflow with the same configuration on those machines; the configuration files must be identical. Running airflow celery worker on each of them activates a Worker.

Original: https://blog.csdn.net/wyh1618/article/details/124737786
Author: Loganer
Title: AirFlow_使用
