- DAGs: a Directed Acyclic Graph organizes all the tasks to be run according to their dependencies and describes the execution order of all tasks.
- Operators: roughly speaking, a class that describes what a particular task in a DAG actually does. Airflow ships with many built-in operators, e.g. BashOperator runs a bash command, PythonOperator calls an arbitrary Python function, EmailOperator sends email, HTTPOperator sends HTTP requests, SqlOperator executes SQL statements, and so on. Users can also write custom operators, which makes Airflow very flexible.
- Tasks: a Task is an instance of an Operator, i.e. a node in a DAG.
- Task Instance: a single run of a task. In the web UI a task instance has its own state, such as "running", "success", "failed", "skipped", "up for retry", and so on.
- Task Relationships: tasks in a DAG can depend on one another, e.g. Task1 >> Task2 means Task2 depends on Task1.
By combining DAGs and Operators, users can build all kinds of complex workflows.
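As a quick illustration of these terms, here is a minimal sketch (the DAG id, dates and task ids below are made up for illustration): two BashOperator instances become tasks, and the >> operator declares their relationship.

from datetime import datetime
from airflow import DAG
from airflow.operators.bash import BashOperator

# A DAG groups tasks and defines the order they run in.
with DAG('concept_demo', start_date=datetime(2021, 1, 1), schedule_interval=None) as dag:
    # Each Operator instance becomes a task (a node in the DAG).
    extract = BashOperator(task_id='extract', bash_command='echo extract')
    load = BashOperator(task_id='load', bash_command='echo load')

    # Task relationship: load depends on extract, so extract runs first.
    extract >> load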
pip3 install apache-airflow[all]
The command above installs Airflow with all optional extras and may fail with build errors. A few fixes are listed below.
FIX:
python3 -m pip install -U pip
python3 -m pip install -U setuptools
FIX:
yum install mysql-devel gcc gcc-devel python-devel
FIX:
yum -y install cyrus-sasl cyrus-sasl-devel cyrus-sasl-lib
yum install unixODBC-devel
yum install python-devel openldap-devel
airflow db init
airflow users create --username admin --firstname YongHeng --lastname Wang --role Admin --email your_email@email.com
Enter the password (e.g. 123456) when prompted.
yum update
yum install sqlite
If the version from yum is still too old for Airflow, download the source code from https://sqlite.org/ and build and install it locally.
1) Download the source
[root@stg-airflow001 ~]$ wget https://www.sqlite.org/2019/sqlite-autoconf-3290000.tar.gz
2) Build
[root@stg-airflow001 ~]$ tar zxvf sqlite-autoconf-3290000.tar.gz
[root@stg-airflow001 ~]$ cd sqlite-autoconf-3290000/
[root@stg-airflow001 ~/sqlite-autoconf-3290000]$ ./configure --prefix=/usr/local
[root@stg-airflow001 ~/sqlite-autoconf-3290000]$ make && make install
3) Replace the system's older sqlite3
[root@stg-airflow001 ~/sqlite-autoconf-3290000]$ cd
[root@stg-airflow001 ~]$ mv /usr/bin/sqlite3 /usr/bin/sqlite3_old
[root@stg-airflow001 ~]$ ln -s /usr/local/bin/sqlite3 /usr/bin/sqlite3
[root@stg-airflow001 ~]$ echo "/usr/local/lib" > /etc/ld.so.conf.d/sqlite3.conf
[root@stg-airflow001 ~]$ ldconfig
[root@stg-airflow001 ~]$ sqlite3 -version
3.29.0 2019-07-10 17:32:03 fc82b73eaac8b36950e527f12c4b5dc1e147e6f4ad2217ae43ad82882a88bfa6
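Airflow's SQLite backend needs a reasonably recent library, so it is worth confirming that the Python interpreter Airflow runs under actually picks up the freshly built library (loaded via the ld.so.conf.d entry above) instead of the old system one; a quick check from Python:

import sqlite3

# sqlite_version is the version of the SQLite C library Python is linked against;
# after the steps above it should report 3.29.0 rather than the old system version.
print(sqlite3.sqlite_version)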
airflow webserver --port 8282
airflow scheduler
airflow db init
airflow dags list
airflow tasks list tutorial
airflow tasks list tutorial --tree
airflow tasks test wyh_test print_date 2020-06-01
airflow tasks test wyh_test hello_task 2020-06-01
Go to the airflow/dags directory and add a TestDag.py with the following content:
from datetime import datetime, timedelta
from textwrap import dedent

from airflow import DAG
from airflow.operators.bash import BashOperator
from airflow.operators.python import PythonOperator

default_args = {
    'owner': 'wyh',
    'depends_on_past': False,
    'email': ['your@email.com'],
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
    'retry_delay': timedelta(minutes=5),
}

with DAG(
    'wyh_test',
    default_args=default_args,
    description='A simple DAG',
    schedule_interval=timedelta(days=1),
    start_date=datetime(2021, 11, 1),
    catchup=False,
    tags=['test'],
) as dag:
    # t1 and t2 run shell commands via BashOperator
    t1 = BashOperator(
        task_id='print_date',
        bash_command='date',
    )

    t2 = BashOperator(
        task_id='sleep',
        depends_on_past=False,
        bash_command='sleep 5',
        retries=3,
    )

    t1.doc_md = dedent(
        """\
        #### Task Documentation
        You can document your task using the attributes `doc_md` (markdown),
        `doc` (plain text), `doc_rst`, `doc_json`, `doc_yaml`, which get
        rendered in the UI's Task Instance Details page.

        ![img](http://montcs.bloomu.edu/~bobmon/Semesters/2012-01/491/import%20soul.png)
        """
    )

    dag.doc_md = __doc__  # uses the module docstring (if any) as the DAG documentation

    def print_hello():
        print("Hello_Airflow")
        return 'Hello world!'

    # t3 calls a Python function via PythonOperator
    t3 = PythonOperator(
        task_id='hello_task',
        python_callable=print_hello,
    )

    # t2 and t3 both depend on t1
    t1 >> [t2, t3]
Run python3 TestDag.py to make sure the file has no errors.
Then run airflow dags list
and you will see your DAG listed, with its dag_id, filepath, owner and paused status.
Refresh the web UI and the new DAG appears there as well.
CREATE DATABASE airflow CHARACTER SET utf8 COLLATE utf8_unicode_ci;
CREATE USER 'airflow' IDENTIFIED BY 'airflow';
GRANT ALL PRIVILEGES ON airflow.* TO 'airflow';
FLUSH PRIVILEGES;
Modify the following in the $AIRFLOW_HOME/airflow.cfg file:
sql_alchemy_conn = mysql+pymysql://airflow:airflow@127.0.0.1/airflow
executor = LocalExecutor
airflow db reset
airflow db init
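If airflow db init fails to connect, it can help to confirm that the URI in sql_alchemy_conn is actually reachable from Python. A minimal sketch, assuming SQLAlchemy and the pymysql driver are installed and using the same credentials as above (adjust host and password to your setup):

from sqlalchemy import create_engine, text

# Same URI as sql_alchemy_conn in airflow.cfg
engine = create_engine("mysql+pymysql://airflow:airflow@127.0.0.1/airflow")
with engine.connect() as conn:
    # Prints the MySQL server version if the connection works
    print(conn.execute(text("SELECT VERSION()")).scalar())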
Note: you also need to change MySQL's configuration, otherwise airflow db init fails with an error like:
Exception: Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql
mysql -u root
show global variables like '%timestamp%';
set global explicit_defaults_for_timestamp =TRUE;
exit;
service mysql restart
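Note that a SET GLOBAL change does not survive the restart; to make it permanent, also add explicit_defaults_for_timestamp = 1 under the [mysqld] section of my.cnf. To double-check the setting from the Airflow host with the same driver Airflow uses, a small sketch with pymysql (credentials are the ones created above; adjust them to your environment):

import pymysql

conn = pymysql.connect(host="127.0.0.1", user="airflow",
                       password="airflow", database="airflow")
with conn.cursor() as cur:
    cur.execute("SHOW GLOBAL VARIABLES LIKE 'explicit_defaults_for_timestamp'")
    # Expect ('explicit_defaults_for_timestamp', 'ON')
    print(cur.fetchone())
conn.close()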
Then re-create the role/user as before,
and start the webserver and scheduler.
To use the CeleryExecutor, set up Redis first, then edit the configuration:
vim airflow.cfg
executor = CeleryExecutor
broker_url = redis://10.11.ip.ip:6379/0
result_backend = db+mysql://airflow:airflow@10.11.ip.ip/airflow
Then start a worker:
airflow celery worker -D
Under the hood, Celery is used to hand tasks to other machines in the cluster, so set up a few machines in the Worker role.
Install Airflow with the same configuration on those machines; the configuration files must be identical.
Then run airflow celery worker on each of them
and a Worker is activated.
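Before starting workers on additional machines, it can save time to check that each of them can reach the Redis broker; a minimal sketch using the redis Python package (10.11.ip.ip is the placeholder host from the config above, replace it with your Redis address):

import redis

# broker_url from airflow.cfg
r = redis.Redis.from_url("redis://10.11.ip.ip:6379/0")
print(r.ping())  # True if this machine can reach the broker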