티스토리 뷰

카테고리 없음

Apache Airflow, 설치

미니대왕님 2024. 6. 14. 14:09

1. Airflow란?

Apache Airflow는 초기 에어비엔비(Airfbnb) 엔지니어링 팀에서 개발한 워크플로우 오픈 소스 플랫폼
** 워크플로우란? : 의존성으로 연결된 작업(Task)들의 집합
(ex) ETL의 경우 Extractaction > Transformation > Loading 의 작업의 흐름

프로그래밍 방식으로 워크플로우를 작성, 예약 및 모니터링

 


2. Airflow 기본 구성 및 작동 원리

(1) Airflow Key Concept

a. DAG (Directed Acyclic Graph)

  • 단어 뜻 그대로 순환하지 않는 그래프, DAG(대그)라고 부름
  • 반복이나 순환을 허용하지 않음
  • 순차적으로 작업(task)이 이루어지며, 순환 실행을 방지하기 때문에 매우 중요함
    → 논리적 오류는 교착상태(deadlock)로 이어짐

b. Operator

  • Operator(Task)를 정의하는데 사용
  • Operator Type
    • Action Operators
      • 기능이나 명령을 실행하는 오퍼레이터
      • 실제 연산을 수행, 데이터 추출 및 프로세싱
      • (참고) 내장 Operators는 BashOperator, PythonOperator, EmailOperator
        • 이외의 오퍼레이터는 공식 documnet 참고 (링크)
    • Transfer Operater
      • 하나의 시스템을 다른 시스템으로 옮김 (데이터를 Source에서 Destination으로 전송 등 )
      • 예를 들어, Presto에서 MySQL로 데이터를 전송하는데에 사용
    • Sensor Operators
      • 조건이 만족할 때까지 기다렸다가, 조건이 충족되면 다음 Task를 실행시킴

c. Task & Task Instance

  • Task : 데이터 파이프라인에 존재하는 Operator를 의미
    • Operator를 실행하면 Task가 됨
  • Task Instance
    • 데이터 파이프 라인이 Trigger되어 실행될 때 생성된 Task를 Task Instance라고 함
    • 태스크 실행 순서 정의
      • 오른쪽 시프트 연산자(binary right shift operator), 즉 rshift(>>)를 사용하여 태스크의 의존성 정의함
      • Task1 >> Task2 >> Task3 [Task1, Task2] >> Task3
  • Task VS Operator
    • 사용자 관점에서는 두 용어를 같은 의미지만 Task 는 작업의 올바른 실행을 보장하기 위한 Manager임
    • 사용자는 Operator를 사용해서 수행할 작업에 집중하며, Airflow는 태스크를 통해 작업을 올바르게 실행함
    → 사용자는 각 환경별 작업이 잘 이루어지는지 확인하기 위해서 Operator내 코드를 구성하는데에 집중하고, Airflow는 각 Operator 내의 구성 요소들이 전부 잘 맞아야 작업이 이루어지는 형태의 차이로 보임

(2) Airflow component

  • 에어플로우는 웹서버, 스케줄러, metastore, Executor, Worker로 크게 5개의 기본 구성으로 이루어져 있음
    • 웹서버 : 웹 대시보드 UI로 스케줄러에서 분석한 Dag를 시각화하고 DAG 실행과 결과를 확인할 수 있는 인터페이스를 제공함
    • 스케줄러 : DAG를 분석하고 현재 시점에서 Dag의 스케줄이 지난 경우 airflow worker에 DAG의 태스크를 예약함
    • Worker : 예약된 태스크를 실제로 실행시키는 것
    • Metastore : 에어플로우에 있는 Dag, Task등의 메타데이터 관리
    • Executor : 태스크가 어떻게 실행되는지 정의

Airflow 구성요소

  • Airflow 스케줄러
    DAG를 분석하고 Airflow 워커에 DAG 태스크를 예약합니다.
  • Airflow 워커
    예약된 태스크를 선택하고 실행
  • Airflow 웹서버
    스케줄러에서 분석한 DAG를 시각화하고 DAG 실행과 결과를 확인할 수 있는 인터페이스 제공

 

(3) Airflow 기본 동작 원리

  1. 유저가 새로운 Dag를 작성 → Dags Foolder 안에 py 파일 배치
  2. Web Server와 Scheuler가 파싱하여 읽어옴
  3. Scheduler가 Metastore를 통해 DagRun 오브젝터를 생성함
    1. DagRun은 사용자가 작성한 Dag의 인스턴스임
      DagRun Status : Running
  4. 스케줄러는 Task Instance Object를 스케줄링함
    1. Dag Run object의 인스턴스임
  5. 트리거가 상황이 맞으면 Scheduler가 Task Instance를 Executor로 보냄
  6. Exeutor는 Task Instance를 실행시킴
  7. 완료 후 → MetaStore에 완료했다고 보고함
    1. 완료된 Task Instance는 Dag Run에 업데이트됨
    2. Scheduler는 Dag 실행이 완료되었는지 Metastore를 통해 확인 후에 Dag Run의 생태를 완료로 바꿈
      DagRun Status : Completed
  8. Metastore가 Webserver에 업데이트해서 사용자도 확인

3. Airflow 장단점


(1) 장점

  • 파이썬 코드를 이용하여 파이프라인을 구현하므로 파이썬 언어에서 구현할 수 있는 대부분의 방법을 사용하여 복잡한 커스텀 파이프라인을 만들 수 있음
    • 확장성 - 파이썬 기반으로 쉽게 확장 가능하고 다양한 시스템과 통합이 가능
      • 다양한 유형의 DB, Cloud Service 등 통합 가능
  • 데이터 인프라 관리, 데이터 웨어하우스 구축, 머신러닝/분석/실험에 데이터 환경 구성에 유용함
  • 에어플로우의 스케줄링 기능으로 DAG에 정의된 특정 시점에 트리거할 수 있을 뿐만 아니라 최종 시점과 예상되는 다음 스케줄 주기를 상세하게 알려줌
  • 백필 기능을 사용하면 과거 데이터를 손쉽게 재처리할 수 있기에 코드를 변경후 재생성이 필요한 데이터 재처리가 가능함

(2) 단점

  • 파이썬 경험이 없는 경우 DAG 구성의 어려움이 있을 수 있음
  • 초기 설치는 간단해 보일지라도 작은 환경 변화에도 작동에 오류가 나는 경우가 있어 롤백하는 경우가 꽤 있음

(3) 주의 사항

  • Data Streaming Solution 적용하기엔 적합하지 않음
    • 초단위의(그 이하) 데이터 처리가 필요한 경우에 사용하기에는 부적절함
    • 에어플로우는 반복적이거나 배치 태스크를 실행하는 기능에 초점이 맞춰져 있음
  • Data Processing Framework (like Flink, Spark, Hadoop, etc ..)로 사용하는 것은 부적절함
    • 데이터 프로세싱 작업에 최적화 되어있지않아서 매우 느림
    • 경우에 따라 메모리 부족으로 작업이 진행되지 않을 수도 있음
       SparkSubmitOperator와 같은 Operator를 이용하여, 데이터 프로세싱은 Spark와 같은 외부 Framework로 처리
  • 파이프라인 규모가 커지면 파이썬 코드가 굉장히 복잡해질 수 있음 → 초기 사용 시점에 엄격한 관리를 해야함

 

참조 : https://gngsn.tistory.com/264

 

Apache Airflow, 어렵지 않게 시작하기

Airflow의 간단한 Demo를 제작하며 Airflow에 익숙해지는 것이 본 포스팅의 목표입니다. 안녕하세요. 이번에는 짧게 Airflow 시리즈를 작성해보려고 합니다. 본 포스팅은 Airflow에 대한 가장 기본이 되는

gngsn.tistory.com

 

#1. Setting venv

먼저, Python 가상환경을 생성합니다.

 

1.1. venv 생성

가상 환경을 생성합니다.

 

$ python -m venv .cymvenv

 

 

1.2. 가상 환경 활성화: Activate venv

가상 환경을 생성했다면, 이번엔 가상 환경을 활성화 시킵니다.

N개의 가상 환경이 있을 때, 사용할 가상 환경을 선택해서 해당 환경만을 활성화 시켜야겠죠.

 

$ source .cymvenv/bin/activate

 

 

1.3. 가상 환경 활성 여부 확인

가상 환경의 파이썬을 사용하는지 which 명령어로 확인해보겠습니다.

현재 제작한 프로젝트 하위에 잘 생성된 것을 확인할 수 있습니다.

 

$ pwd
/data/Python-3.9.5

$ which python3
/data/Python-3.9.5/.cymvenv/bin/python3

활성 전의 명령과 비교해보실 수 있습니다.

 

$ which python3
/data/Python-3.9.5/.cymvenv/bin/python3

 

$ deactivate {{your_env}}

# example.
$ deactivate .cymvenv

 

 

#2. Install Airfow

🔗 Official link

 

 

2.1. Airflow Home 설정

환경 변수를 통해 Airflow Home 위치를 설정합니다.

 

$ export AIRFLOW_HOME=~/airflow

 

 

2.2. Airflow 설치

Airflow를 설치하기 전에, 어떤 Airflow 버전과 Dependencies를 설치할 지 알아보도록 하겠습니다.

 

AIRFLOW_VERSION=2.7.1
PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"

 

 이제, Airflow를 설치할 수 있습니다.

 

$ pip3 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"

 

 

일반적인 방식의 Python 모듈 설치에 비해 조금 번거로운 과정을 거쳐야 하는데요. Airflow는 library인 동시에 application이기 때문에, 설치 과정에서 버전 호환이나 호환되지 않는 Dependencies 문제가 발생할 수 있습니다.

 

그래서 Airflow 팀은 버전을 명시하게 하도록 setup.cfgsetup.py 를 통해서도 사용자들이 버전을 지정할 수 있게 제작했습니다.

 

#3. Verify installed Airflow

먼저, 모든 Dependencies에 문제가 없는지 확인합니다.

 

$ pip3 check
No broken requirements found.

 

이후, Airflow가 정상적으로 설치되었는지 확인합니다.

 

$ python3 -m airflow
Usage: airflow [-h] GROUP_OR_COMMAND ...

Positional Arguments:
  GROUP_OR_COMMAND 
...

 

 

 

#4. Initializing database

이번에는 Airflow의 초기 설정을 진행합니다.

아래 명령을 톡해 Metastore와 DB 등을 초기 생성하도록 하겠습니다.

 

$ airflow db init

 

이후, AIRFLOW_HOME 변수 명으로 아래와 같은 airflow 폴더 아래의 파일들을 확인할 수 있습니다.

 

$ cd $AIRFLOW_HOME
$ pwd
/Users/gyeongsun/airflow   # "~/airflow"과 동일

 

airflow.cfg : airflow 설정 파일
airflow.db : SQLite DB 파일
/logs : Log directory

 

 

FYI.

개인적으로, Airflow에서 제공하는 기본 예제들을 포함하지 않고 실행하고 싶기 때문에 Example 로드 설정을 off로 설정하겠습니다.

 

# ~/airflow/airflow.cfg

 100 # Whether to load the DAG examples that ship with Airflow. It's good to
 101 # get started, but you probably want to set this to ``False`` in a production
 102 # environment
 103 # 
 104 # Variable: AIRFLOW__CORE__LOAD_EXAMPLES
 105 #
 106 load_examples = False

 

 

 

 

#5. Create User

airflow에 접근할 사용자를 생성합니다.

 

$ airflow users create \
    --username admin \
    --firstname gngsn \
    --lastname park \
    --role Admin \
    --email gngsn@example.com

Password: <<your_password>>
Repeat for confirmation: <<your_password>>

# [Log] User "admin" created with role "Admin"

 

 

 

#6. Run Airlfow webserver

이번엔 ariflorw가 제공하는 webserver를 실행시킵니다.

 

$ airflow webserver --port 9090

 

그리고, username과 입력한 password를 사용해서 airlfow webserver에 로그인합니다.

 

 

 

 

위와 같은 Airflow Webserver 페이지를 확인할 수 있습니다.

 

 

 

 

Troubleshooting

만약, 아래와 같이 airflow 명령어 실행이 실패했다면,

 

bash: airflow: command not found"

 

Airflow 명령을 실행시켜주기 위해 airflow 명령 파일의 위치를 PATH 시스템 환경 변수에 추가해줍니다.

 

PATH=$PATH:~/.local/bin

 

 

 

 

 

Docker

이번에는 Docker를 통해 Airflow를 실행하는 방식을 다뤄보겠습니다.

 

 

#1. Fetch docker-compose file

가장 먼저, docker-compose 파일을 가져오겠습니다.

기호에 따라, Airflow를 관리하기 위한 폴더 (airflow-local) 를 생성합니다.

 

# optional
~$ mkdir airflow-local
~$ cd airflow-local

 

Airflow docker production을 curl 명령어를 통해 다운받습니다.

 

~/airflow-local$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.7.1/docker-compose.yaml'

 

docker-compose.yaml 에는 아래와 같은 서비스들이 정의됩니다.

 

✔️ airflow-scheduler

Scheduler는 모든 Task와 DAG를 모니터링하고, Task이 실행될 수 있는 상태일 때 (이전 의존성이 실행 완료되었을 때) 해당 Task를 실행합니다.

 

✔️ airflow-webserver

Airflow의 웹 서버는 기본적으로 http://localhost:8080 을 통해 접근할 수 있습니다.

✔️ airflow-worker

Worker들은 스케줄러에 의해 전달 받은 Task를 실행합니다.

✔️ airflow-triggerer

Triggerer는 지연 가능한 작업 deferrable tasks 에 대해 이벤트 루프를 실행합니다.


✔️ airflow-init

Airflow 사용을 위해 서비스의 초기 설정을 진행합니다.

✔️ postgres

Airflow가 사용하는 database 입니다.

✔️ redis

Redis는 Scheduler에서 Worker로 메세지를 전송하는 브로커의 역할로 실행됩니다.

 

 

 

 

#2. Initializing Environment

2.1. Airflow가 사용할 폴더 생성

 

# docker-compose.yaml과 동일한 위치에 생성
~/airflow-local$ mkdir -p ./dags ./logs ./plugins ./config

 

./dags : DAG 파일 보관 
./logs : Task 실행 시, 혹은 Scheduler의 로그 보관
./config : 커스텀 log parser를 추가하거나 Cluster 정책을 위한 airflow_local_settings.py를 추가할 수 있음
./plugins : 커스텀 Plugin 보관

 

 

 

2.2 .env 파일 생성 

.env 파일을 통해 Docker 내의 Airflow UID를 설정해줍니다. 

 

~/airflow-local$ echo -e "AIRFLOW_UID=$(id -u)" > .env

 

확인해보면 아래와 같은 .env 파일의 내용을 확인할 수 있습니다.

 

~/airflow-local$ cat .env
AIRFLOW_UID=501

 

Airflow의 docker-compose.yaml의 주석을 참고해서 .env 에 설정 값을 추가할 수 있습니다. 

 

# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME      - Docker image name used to run Airflow.
#                                                       Default: apache/airflow:2.7.1
# AIRFLOW_UID                         - User ID in Airflow containers
#                                                       Default: 50000
# AIRFLOW_PROJ_DIR             - Base path to which all the files will be volumed.
#                                                       Default: .
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME   - Username for the administrator account (if requested).
#                                Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD   - Password for the administrator account (if requested).
#                                Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
#                                Use this option ONLY for quick checks. Installing requirements at container
#                                startup is done EVERY TIME the service is started.
#                                A better way is to build a custom image or extend the official image
#                                as described in https://airflow.apache.org/docs/docker-stack/build.html.
#                                Default: ''

 

 

(Optional) docker-compose customizing

 

실행하기 전에, 몇가지 설정을 변경합니다.

현재 다른 프로젝트를 8080 포트로 사용하고 있기 때문에, docker 외부에서 접근할 port 번호를 9090 으로 설정하겠습니다.

 

ports: - "9090:8080"

 

Airflow는 기본적으로, 처음 실행 시 꽤 많은 양의 예시를 제공합니다. 

Airflow의 예제를 확인하고 싶지 않다면 아래 옵션을 'true' → 'false' 로 설정합니다.

 

AIRFLOW__CORE__LOAD_EXAMPLES: 'false'

 

 

 

#3. Running Containers

database migration 과 첫 사용자 계정을 설정하기 위해 아래 명령어를 실행하세요.

 

~/airflow-local$ docker compose up airflow-init

 

가령, docker compose 파일을 통해 airflow를 실행시키면 아래와 같은 Container들이 실행됩니다.

 

~/airflow-local$ docker compose up
[+] Running 7/7
 ✔ Container gngsn-airflow-lab-redis-1              Running                                                                                         0.0s 
 ✔ Container gngsn-airflow-lab-postgres-1           Running                                                                                         0.0s 
 ✔ Container gngsn-airflow-lab-airflow-init-1       Created                                                                                         0.0s 
 ✔ Container gngsn-airflow-lab-airflow-triggerer-1  Created                                                                                         0.1s 
 ✔ Container gngsn-airflow-lab-airflow-worker-1     Created                                                                                         0.1s 
 ✔ Container gngsn-airflow-lab-airflow-scheduler-1  Created                                                                                         0.1s 
 ✔ Container gngsn-airflow-lab-airflow-webserver-1  Created                                                                                         0.1s 
Attaching to gngsn-airflow-lab-airflow-init-1, gngsn-airflow-lab-airflow-scheduler-1, gngsn-airflow-lab-airflow-triggerer-1, gngsn-airflow-lab-airflow-webserver-1, gngsn-airflow-lab-airflow-worker-1, gngsn-airflow-lab-postgres-1, gngsn-airflow-lab-redis-1

 

그럼 아래와 같은 로그를 확인할 수 있는데,

docker-compose 파일에서 설정된 아이디와 비밀번호가 모두 airflow 인 사용자를 생성합니다.

 

gngsn-airflow-lab-airflow-init-1  | User "airflow" created with role "Admin"

 

✔️ Login 정보

ID: airflow
Password: airflow

 

 

#4. Verify installed Airflow

아래와 같이 airflow-worker를 통해 airflow 명령어를 입력합니다.

 

$ docker compose run airflow-worker airflow info

 

혹은, Linux나 Mac OS를 사용하고 있다면, wrapper script를 다운받아 airflow를 조금 더 쉽게 사용할 수 있습니다.

 

$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.7.1/airflow.sh'
$ chmod +x airflow.sh

 

위와 같이 airlfow 실행 파일을 설치하고, 해당 파일을 통해 아래와 같이 airflow 명령어를 실행할 수 있습니다.

 

$ ./airflow.sh info

 

 

 

 

 

Running a DAG

이번엔 위에 간단히 정의한 DAG를 실행해보도록 하겠습니다.

먼저, 간단한 DAG 정의를 먼저 알아본 후, 그 중 하나를 통해 실행 해보겠습니다.

 

 

Create a DAG

먼저, DAG를 정의하는 방법은 3가지가 있습니다.

 

첫 번째 방법은 python의 with와 함께 아래와 같이 DAG를 정의하는 방식입니다.

 

import datetime

from airflow import DAG
from airflow.operators.empty import EmptyOperator

with DAG(
     dag_id="my_dag_name",
     start_date=datetime.datetime(2021, 1, 1),
     schedule="@daily",
): EmptyOperator(task_id="task")

 

두 번째 방법은 기본 생성자로 생성해서 Operator의 dag 인자로 넘겨주는 것입니다.

 

 import datetime

 from airflow import DAG
 from airflow.operators.empty import EmptyOperator

 my_dag = DAG(
     dag_id="my_dag_name",
     start_date=datetime.datetime(2021, 1, 1),
     schedule="@daily",
 )
 EmptyOperator(task_id="task", dag=my_dag)

 

세 번째 방법은 @dag 어노테이션을 사용해 DAG Generator를 활성화할 수 있습니다.

 

import datetime

from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator

@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
def generate_dag():
    EmptyOperator(task_id="task")

generate_dag()

 

 

Test a DAG

이번에는 해당 DAG를 실행시키도록 하겠습니다. 

DAG를 실행하기 위해서는 ~/airflow/dags 하위에 DAG를 정의한 파일을 추가하면 됩니다.

docker 로 실행하고 있다면, 위에서 생성한 /dags 하위에 생성합니다.

 

# ~/airflow/dags/say-hello.py

from datetime import datetime

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

# ① A DAG represents a workflow, a collection of tasks
with DAG(dag_id="demo", start_date=datetime(2023, 10, 1), schedule="0 0 * * *") as dag:

    # ② Tasks are represented as operators
    hello = BashOperator(task_id="hello", bash_command="echo hello")

    @task()
    def airflow():
        print("airflow")

    # ③ Set dependencies between tasks
    hello >> airflow()

 

위와 같이 정의한 후, 아래의 명령어로 airflow가 활성되었는지를 확인합니다.

 

✔️ venv

$  airflow dags list
dag_id | filepath     | owner   | paused
=======+==============+=========+=======
demo   | say-hello.py | airflow | None

 

✔️ docker

$ docker compose run airflow-worker airflow dags list
[+] Creating 3/0
 ✔ Container airflow-local-redis-1         Running                                                                                                                    0.0s 
 ✔ Container airflow-local-postgres-1      Running                                                                                                                    0.0s 
 ✔ Container airflow-local-airflow-init-1  Created                                                                                                                    0.0s 
[+] Running 3/3
 ✔ Container airflow-local-redis-1         Healthy                                                                                                                    0.5s 
 ✔ Container airflow-local-postgres-1      Healthy                                                                                                                    0.5s 
 ✔ Container airflow-local-airflow-init-1  Started                                                                                                                    0.2s 

dag_id | filepath     | owner   | paused
=======+==============+=========+=======
demo   | say-hello.py | airflow | None

 

 

이제, 아래 명령어로 DAG를 실행합니다.

 

✔️ venv

$ airflow dags test "demo"
...
[2023-10-04T00:03:37.223+0900] {subprocess.py:86} INFO - Output:
[2023-10-04T00:03:37.226+0900] {subprocess.py:93} INFO - hello
...

 

 

✔️ docker

$ docker compose run airflow-worker airflow dags test "demo"
...
[2023-10-03T15:24:02.488+0000] {subprocess.py:86} INFO - Output:
[2023-10-03T15:24:02.489+0000] {subprocess.py:93} INFO - hello
...

 

 

다음에는 airflow 명령어를 확인해보면서, 조금 더 정밀한 스케줄링을 할 수 있도록 만들어보겠습니다.

 

 

 

 

 

 

 

| Reference |

 

https://airflow.apache.org/docs/

https://www.altexsoft.com/blog/apache-airflow-pros-cons/

⌜Data Pipelines with Apache Airflow⌟ - Oreilly

https://www.altexsoft.com/blog/apache-airflow-pros-cons/

 

출처: https://gngsn.tistory.com/264 [ENFJ.dev:티스토리]

댓글