Installing and Deploying Airflow

Preface

Airflow is an open-source task scheduling and management platform from Airbnb, written in Python. It can replace crontab on Linux, but is considerably more powerful: tasks are created, scheduled, and monitored programmatically (as Python scripts), and dependencies between tasks are expressed as DAGs (directed acyclic graphs).
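To make the DAG idea concrete, below is a minimal sketch of a DAG definition against the Airflow 1.10.x API; the DAG id, task ids, dates, and commands are placeholder values, not part of the original setup:

    # minimal DAG sketch (Airflow 1.10.x); ids, dates, and commands are placeholders
    from datetime import datetime, timedelta

    from airflow import DAG
    from airflow.operators.bash_operator import BashOperator

    default_args = {
        'owner': 'airflow',
        'start_date': datetime(2019, 1, 1),
        'retries': 1,
        'retry_delay': timedelta(minutes=5),
    }

    dag = DAG('example_etl', default_args=default_args, schedule_interval='@daily')

    extract = BashOperator(task_id='extract', bash_command='echo extract', dag=dag)
    load = BashOperator(task_id='load', bash_command='echo load', dag=dag)

    extract >> load  # load runs only after extract succeeds

Saved under $AIRFLOW_HOME/dags/, a file like this is picked up by the scheduler, which then runs the tasks in dependency order.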

Installation Media

  • OS: Linux
  • Python 3.6.7
  • Airflow 1.10.1
  • MySQL 5.7
  • Redis 5.0
  • airflow-scheduler-failover-controller

Installing Python 3

  1. Unpack the source: tar xf Python-3.6.7.tar.xz
  2. Configure the install prefix:
    1. cd Python-3.6.7
    2. ./configure --prefix=/home/dip/python3 (installs under the dip user's own directory; if gcc is missing, install it as root: yum install gcc)
  3. Build and install: make && make install
  4. Add Python 3 to PATH:
    1. vi ~/.bash_profile
    2. Insert: export PATH=~/python3/bin:$PATH
    3. source ~/.bash_profile
  5. Verify: python3 -V && pip3 -V (a version number means the install succeeded)
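As an optional sanity check that the interpreter now on PATH is the freshly built one (the expected version below simply matches the package used in this guide):

    # confirm the interpreter on PATH is the 3.6.7 build installed above
    import sys

    print(sys.executable)        # should point under ~/python3
    print(sys.version_info[:3])  # expected: (3, 6, 7)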

Installing Airflow

With internet access, Airflow can be installed with a single command: pip3 install 'apache-airflow[all]'. The steps below cover an offline install.

  1. Unpack the bundle: tar zxvf airflow.tar.gz
  2. Set AIRFLOW_HOME:
    1. vi ~/.bash_profile
    2. Insert: export AIRFLOW_HOME=~/airflow
    3. source ~/.bash_profile
  3. Install Airflow offline:
    1. cd airflow
    2. export SLUGIFY_USES_TEXT_UNIDECODE=yes
    3. pip3 install 'apache-airflow[all]' --no-index -f ./
    4. Notes on common build errors:
    • OSError: mysql_config not found: yum install mysql-devel
    • gcc: error trying to exec 'cc1plus': execvp: No such file or directory: yum install gcc-c++
    • sasl/saslwrapper.h:22:23: error: sasl/sasl.h: No such file or directory: yum install cyrus-sasl-devel.x86_64
  4. cd $AIRFLOW_HOME, run airflow version to confirm the installed version, and check that airflow.cfg has been generated in the directory

    1. Edit the configuration file (airflow.cfg)

      • executor = LocalExecutor / CeleryExecutor (with CeleryExecutor, used for distributed tasks, airflow worker must also be started)
      • sql_alchemy_conn = mysql://{user_name}:{password}@{ip_address}:3306/database_name
      • broker_url = redis://127.0.0.1:6379/0
      • celery_result_backend = redis://127.0.0.1:6379/1
      • Timezone settings

        1. Default timezone: default_timezone = Asia/Shanghai
        2. Clock shown in the top-right corner of the web UI: in ${PYTHON_HOME}/lib/python3.6/site-packages/airflow/www/templates/admin/master.html, change the UTCseconds assignment to var UTCseconds = x.getTime()
        3. Last Run times in the webserver: in ${PYTHON_HOME}/lib/python3.6/site-packages/airflow/models.py, add the following above the get_last_dagrun method:

          def utc2local(self, utc):
              import time
              epoch = time.mktime(utc.timetuple())
              offset = datetime.fromtimestamp(epoch) - datetime.utcfromtimestamp(epoch)
              return utc + offset

          Then in ${PYTHON_HOME}/lib/python3.6/site-packages/airflow/www/templates/airflow/dags.html, replace last_run.execution_date.strftime("%Y-%m-%d %H:%M") and last_run.start_date.strftime("%Y-%m-%d %H:%M") with, respectively:
          dag.utc2local(last_run.execution_date).strftime("%Y-%m-%d %H:%M")
          dag.utc2local(last_run.start_date).strftime("%Y-%m-%d %H:%M")

    • SMTP (email) settings for notification mail (a quick send test is sketched after this configuration section)

      [smtp]
      # If you want airflow to send emails on retries, failure, and you want to use
      # the airflow.utils.email.send_email_smtp function, you have to configure an
      # smtp server here
      smtp_host = smtp.163.com
      smtp_starttls = True
      smtp_ssl = False
      # Uncomment and set the user/pass settings if you want to use SMTP AUTH
      smtp_user = mailExample@163.com
      smtp_password = password
      smtp_port = 25
      smtp_mail_from = mailExample@163.com
    • Change the log locations

      [core]
      base_log_folder = /servers/logs/airflow
      [scheduler]
      child_process_log_directory = /servers/logs/airflow/scheduler
    • Change the webserver address and port (default 8080)

      [webserver]
      base_url = http://host:port
    • Adjust task parallelism (the maximum number of task instances allowed to run concurrently)

      parallelism = 32 (default 32)
    • Change the interval at which DAG files are re-scanned for new DAGs

      min_file_process_interval = 0 (default 0; can be raised, e.g. to 10)
    • Skip loading the example DAGs

      load_examples = False
    • Web authentication

      authenticate = True
      auth_backend = airflow.contrib.auth.backends.password_auth
    1. airflow initdb (airflow resetdb wipes and re-creates the metadata tables)
    2. airflow webserver -D (-D runs it as a daemon)
    3. airflow scheduler -D
    4. airflow worker -D (needed on each node in a multi-node setup; airflow flower starts Flower to monitor the worker nodes)
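To verify the [smtp] settings before depending on failure notifications, a minimal send test with Python's standard smtplib can help; the host, port, addresses, and password below simply mirror the example values from the config section above:

    # minimal SMTP send test mirroring the example [smtp] settings
    import smtplib
    from email.mime.text import MIMEText

    msg = MIMEText('airflow smtp test')
    msg['Subject'] = 'airflow smtp test'
    msg['From'] = 'mailExample@163.com'
    msg['To'] = 'mailExample@163.com'

    server = smtplib.SMTP('smtp.163.com', 25)        # smtp_host, smtp_port
    server.starttls()                                # smtp_starttls = True
    server.login('mailExample@163.com', 'password')  # smtp_user, smtp_password
    server.sendmail(msg['From'], [msg['To']], msg.as_string())
    server.quit()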

Installing MySQL 5.7

  1. Installation steps omitted (online via apt or yum; offline via the deb or RPM packages from the official site)
  2. Create the user and database:
    1. Create the database: create database airflow;
    2. Create a new user:
      • reachable from any IP: create user 'airflow'@'%' identified by 'airflow';
      • local access only: create user 'airflow'@'localhost' identified by 'airflow';
    3. Grant privileges to the new user:
      • grant all on airflow.* to 'airflow'@'%';
      • flush privileges;
  3. Possible problem:
    • airflow initdb fails with: Global variable explicit_defaults_for_timestamp needs to be on (1) for mysql
      Locate the MySQL config file (mysql --help | grep my.cnf),
      add explicit_defaults_for_timestamp=true under the [mysqld] section,
      and restart MySQL
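Before running airflow initdb, it may be worth confirming that the sql_alchemy_conn URI actually reaches the new database; a small check with SQLAlchemy (installed as an Airflow dependency) might look like the sketch below, where the credentials and host are the example values from this guide:

    # connectivity check for the sql_alchemy_conn URI; adjust user/password/host
    from sqlalchemy import create_engine

    engine = create_engine('mysql://airflow:airflow@127.0.0.1:3306/airflow')
    with engine.connect() as conn:
        print(conn.execute('SELECT VERSION()').fetchone())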

Installing Redis

  1. Unpack: tar zxvf redis-5.0.2.tar.gz
  2. Build and install:
    1. cd redis-5.0.2
    2. Build: make
    3. Verify: make test
    4. Copy the config file next to the binaries: cp redis.conf src/
    5. Edit the config: decide whether bind should be changed to 0.0.0.0 (needed if other nodes must reach this Redis)
  3. Run in the background: nohup ./redis-server redis.conf > redis.log 2>&1 &
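As a quick check that the broker configured earlier (broker_url = redis://127.0.0.1:6379/0) is reachable, the redis-py client can be used (pip3 install redis if it is not already present); host, port, and db mirror that example URL:

    # ping the broker configured as redis://127.0.0.1:6379/0
    import redis

    r = redis.StrictRedis(host='127.0.0.1', port=6379, db=0)
    print(r.ping())  # True means the broker is reachable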

Scheduler active/standby high availability with airflow-scheduler-failover-controller

  1. Download the component package (not needed in offline mode)
  2. Install:
    1. cd {AIRFLOW_SCHEDULER_FAILOVER_CONTROLLER}
    2. pip3 install -e .
  3. Initialize the failover controller: scheduler_failover_controller init (this appends its settings to airflow.cfg)
  4. Update the failover settings in airflow.cfg: scheduler_nodes_in_cluster = host1,host2
  5. Set up passwordless SSH between the failover machines, then test: scheduler_failover_controller test_connection
  6. Start the failover controller: scheduler_failover_controller start

Common issues

  1. Generate a fernet key

    from cryptography.fernet import Fernet
    fernet_key = Fernet.generate_key()
    print(fernet_key)  # your fernet_key, keep it in a secure place!
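    The printed value goes into the fernet_key option in the [core] section of airflow.cfg; Airflow uses it to encrypt passwords stored in its connections table.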
  2. Create a web-auth user and password
    Navigate to the Airflow installation directory:

  • $ cd ~/airflow
  • $ python
    >>> import airflow
    >>> from airflow import models, settings
    >>> from airflow.contrib.auth.backends.password_auth import PasswordUser
    >>> user = PasswordUser(models.User())
    >>> user.username = 'new_user_name'
    >>> user.email = 'new_user_email@example.com'
    >>> user.password = 'set_the_password'
    >>> session = settings.Session()
    >>> session.add(user)
    >>> session.commit()
    >>> session.close()
    >>> exit()
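    With authenticate = True and the password backend enabled in airflow.cfg as configured earlier, the web UI now presents a login page; sign in with the username and password created above.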