티스토리 뷰
1. Airflow란?
Apache Airflow는 초기 에어비엔비(Airfbnb) 엔지니어링 팀에서 개발한 워크플로우 오픈 소스 플랫폼
** 워크플로우란? : 의존성으로 연결된 작업(Task)들의 집합
(ex) ETL의 경우 Extractaction > Transformation > Loading 의 작업의 흐름
프로그래밍 방식으로 워크플로우를 작성, 예약 및 모니터링
2. Airflow 기본 구성 및 작동 원리
(1) Airflow Key Concept
a. DAG (Directed Acyclic Graph)
- 단어 뜻 그대로 순환하지 않는 그래프, DAG(대그)라고 부름
- 반복이나 순환을 허용하지 않음
- 순차적으로 작업(task)이 이루어지며, 순환 실행을 방지하기 때문에 매우 중요함
→ 논리적 오류는 교착상태(deadlock)로 이어짐
b. Operator
- Operator(Task)를 정의하는데 사용
- Operator Type
- Action Operators
- 기능이나 명령을 실행하는 오퍼레이터
- 실제 연산을 수행, 데이터 추출 및 프로세싱
- (참고) 내장 Operators는 BashOperator, PythonOperator, EmailOperator
- 이외의 오퍼레이터는 공식 documnet 참고 (링크)
- Transfer Operater
- 하나의 시스템을 다른 시스템으로 옮김 (데이터를 Source에서 Destination으로 전송 등 )
- 예를 들어, Presto에서 MySQL로 데이터를 전송하는데에 사용
- Sensor Operators
- 조건이 만족할 때까지 기다렸다가, 조건이 충족되면 다음 Task를 실행시킴
- Action Operators
c. Task & Task Instance
- Task : 데이터 파이프라인에 존재하는 Operator를 의미
- Operator를 실행하면 Task가 됨
- Task Instance
- 데이터 파이프 라인이 Trigger되어 실행될 때 생성된 Task를 Task Instance라고 함
- 태스크 실행 순서 정의
- 오른쪽 시프트 연산자(binary right shift operator), 즉 rshift(>>)를 사용하여 태스크의 의존성 정의함
- Task1 >> Task2 >> Task3 [Task1, Task2] >> Task3
- Task VS Operator
- 사용자 관점에서는 두 용어를 같은 의미지만 Task 는 작업의 올바른 실행을 보장하기 위한 Manager임
- 사용자는 Operator를 사용해서 수행할 작업에 집중하며, Airflow는 태스크를 통해 작업을 올바르게 실행함
(2) Airflow component
- 에어플로우는 웹서버, 스케줄러, metastore, Executor, Worker로 크게 5개의 기본 구성으로 이루어져 있음
- 웹서버 : 웹 대시보드 UI로 스케줄러에서 분석한 Dag를 시각화하고 DAG 실행과 결과를 확인할 수 있는 인터페이스를 제공함
- 스케줄러 : DAG를 분석하고 현재 시점에서 Dag의 스케줄이 지난 경우 airflow worker에 DAG의 태스크를 예약함
- Worker : 예약된 태스크를 실제로 실행시키는 것
- Metastore : 에어플로우에 있는 Dag, Task등의 메타데이터 관리
- Executor : 태스크가 어떻게 실행되는지 정의
Airflow 구성요소
- Airflow 스케줄러
DAG를 분석하고 Airflow 워커에 DAG 태스크를 예약합니다. - Airflow 워커
예약된 태스크를 선택하고 실행 - Airflow 웹서버
스케줄러에서 분석한 DAG를 시각화하고 DAG 실행과 결과를 확인할 수 있는 인터페이스 제공
(3) Airflow 기본 동작 원리
- 유저가 새로운 Dag를 작성 → Dags Foolder 안에 py 파일 배치
- Web Server와 Scheuler가 파싱하여 읽어옴
- Scheduler가 Metastore를 통해 DagRun 오브젝터를 생성함
- DagRun은 사용자가 작성한 Dag의 인스턴스임
DagRun Status : Running
- DagRun은 사용자가 작성한 Dag의 인스턴스임
- 스케줄러는 Task Instance Object를 스케줄링함
- Dag Run object의 인스턴스임
- 트리거가 상황이 맞으면 Scheduler가 Task Instance를 Executor로 보냄
- Exeutor는 Task Instance를 실행시킴
- 완료 후 → MetaStore에 완료했다고 보고함
- 완료된 Task Instance는 Dag Run에 업데이트됨
- Scheduler는 Dag 실행이 완료되었는지 Metastore를 통해 확인 후에 Dag Run의 생태를 완료로 바꿈
DagRun Status : Completed
- Metastore가 Webserver에 업데이트해서 사용자도 확인
3. Airflow 장단점
(1) 장점
- 파이썬 코드를 이용하여 파이프라인을 구현하므로 파이썬 언어에서 구현할 수 있는 대부분의 방법을 사용하여 복잡한 커스텀 파이프라인을 만들 수 있음
- 확장성 - 파이썬 기반으로 쉽게 확장 가능하고 다양한 시스템과 통합이 가능
- 다양한 유형의 DB, Cloud Service 등 통합 가능
- 확장성 - 파이썬 기반으로 쉽게 확장 가능하고 다양한 시스템과 통합이 가능
- 데이터 인프라 관리, 데이터 웨어하우스 구축, 머신러닝/분석/실험에 데이터 환경 구성에 유용함
- 에어플로우의 스케줄링 기능으로 DAG에 정의된 특정 시점에 트리거할 수 있을 뿐만 아니라 최종 시점과 예상되는 다음 스케줄 주기를 상세하게 알려줌
- 백필 기능을 사용하면 과거 데이터를 손쉽게 재처리할 수 있기에 코드를 변경후 재생성이 필요한 데이터 재처리가 가능함
(2) 단점
- 파이썬 경험이 없는 경우 DAG 구성의 어려움이 있을 수 있음
- 초기 설치는 간단해 보일지라도 작은 환경 변화에도 작동에 오류가 나는 경우가 있어 롤백하는 경우가 꽤 있음
(3) 주의 사항
- Data Streaming Solution 적용하기엔 적합하지 않음
- 초단위의(그 이하) 데이터 처리가 필요한 경우에 사용하기에는 부적절함
- 에어플로우는 반복적이거나 배치 태스크를 실행하는 기능에 초점이 맞춰져 있음
- Data Processing Framework (like Flink, Spark, Hadoop, etc ..)로 사용하는 것은 부적절함
- 데이터 프로세싱 작업에 최적화 되어있지않아서 매우 느림
- 경우에 따라 메모리 부족으로 작업이 진행되지 않을 수도 있음
→ SparkSubmitOperator와 같은 Operator를 이용하여, 데이터 프로세싱은 Spark와 같은 외부 Framework로 처리
- 파이프라인 규모가 커지면 파이썬 코드가 굉장히 복잡해질 수 있음 → 초기 사용 시점에 엄격한 관리를 해야함
참조 : https://gngsn.tistory.com/264
#1. Setting venv
먼저, Python 가상환경을 생성합니다.
1.1. venv 생성
가상 환경을 생성합니다.
$ python -m venv .cymvenv
1.2. 가상 환경 활성화: Activate venv
가상 환경을 생성했다면, 이번엔 가상 환경을 활성화 시킵니다.
N개의 가상 환경이 있을 때, 사용할 가상 환경을 선택해서 해당 환경만을 활성화 시켜야겠죠.
$ source .cymvenv/bin/activate
1.3. 가상 환경 활성 여부 확인
가상 환경의 파이썬을 사용하는지 which 명령어로 확인해보겠습니다.
현재 제작한 프로젝트 하위에 잘 생성된 것을 확인할 수 있습니다.
$ pwd
/data/Python-3.9.5
$ which python3
/data/Python-3.9.5/.cymvenv/bin/python3
활성 전의 명령과 비교해보실 수 있습니다.
$ which python3
/data/Python-3.9.5/.cymvenv/bin/python3
$ deactivate {{your_env}}
# example.
$ deactivate .cymvenv
#2. Install Airfow
2.1. Airflow Home 설정
환경 변수를 통해 Airflow Home 위치를 설정합니다.
$ export AIRFLOW_HOME=~/airflow
2.2. Airflow 설치
Airflow를 설치하기 전에, 어떤 Airflow 버전과 Dependencies를 설치할 지 알아보도록 하겠습니다.
AIRFLOW_VERSION=2.7.1
PYTHON_VERSION="$(python3 --version | cut -d " " -f 2 | cut -d "." -f 1-2)"
CONSTRAINT_URL="https://raw.githubusercontent.com/apache/airflow/constraints-${AIRFLOW_VERSION}/constraints-${PYTHON_VERSION}.txt"
이제, Airflow를 설치할 수 있습니다.
$ pip3 install "apache-airflow==${AIRFLOW_VERSION}" --constraint "${CONSTRAINT_URL}"
일반적인 방식의 Python 모듈 설치에 비해 조금 번거로운 과정을 거쳐야 하는데요. Airflow는 library인 동시에 application이기 때문에, 설치 과정에서 버전 호환이나 호환되지 않는 Dependencies 문제가 발생할 수 있습니다.
그래서 Airflow 팀은 버전을 명시하게 하도록 setup.cfg
나 setup.py
를 통해서도 사용자들이 버전을 지정할 수 있게 제작했습니다.
#3. Verify installed Airflow
먼저, 모든 Dependencies에 문제가 없는지 확인합니다.
$ pip3 check
No broken requirements found.
이후, Airflow가 정상적으로 설치되었는지 확인합니다.
$ python3 -m airflow
Usage: airflow [-h] GROUP_OR_COMMAND ...
Positional Arguments:
GROUP_OR_COMMAND
...
#4. Initializing database
이번에는 Airflow의 초기 설정을 진행합니다.
아래 명령을 톡해 Metastore와 DB 등을 초기 생성하도록 하겠습니다.
$ airflow db init
이후, AIRFLOW_HOME 변수 명으로 아래와 같은 airflow 폴더 아래의 파일들을 확인할 수 있습니다.
$ cd $AIRFLOW_HOME
$ pwd
/Users/gyeongsun/airflow # "~/airflow"과 동일
airflow.cfg
: airflow 설정 파일airflow.db
: SQLite DB 파일/logs
: Log directory
FYI.
개인적으로, Airflow에서 제공하는 기본 예제들을 포함하지 않고 실행하고 싶기 때문에 Example 로드 설정을 off로 설정하겠습니다.
# ~/airflow/airflow.cfg
100 # Whether to load the DAG examples that ship with Airflow. It's good to
101 # get started, but you probably want to set this to ``False`` in a production
102 # environment
103 #
104 # Variable: AIRFLOW__CORE__LOAD_EXAMPLES
105 #
106 load_examples = False
#5. Create User
airflow에 접근할 사용자를 생성합니다.
$ airflow users create \
--username admin \
--firstname gngsn \
--lastname park \
--role Admin \
--email gngsn@example.com
Password: <<your_password>>
Repeat for confirmation: <<your_password>>
# [Log] User "admin" created with role "Admin"
#6. Run Airlfow webserver
이번엔 ariflorw가 제공하는 webserver를 실행시킵니다.
$ airflow webserver --port 9090
그리고, username과 입력한 password를 사용해서 airlfow webserver에 로그인합니다.
위와 같은 Airflow Webserver 페이지를 확인할 수 있습니다.
Troubleshooting
만약, 아래와 같이 airflow 명령어 실행이 실패했다면,
bash: airflow: command not found"
Airflow 명령을 실행시켜주기 위해 airflow 명령 파일의 위치를 PATH 시스템 환경 변수에 추가해줍니다.
PATH=$PATH:~/.local/bin
Docker
이번에는 Docker를 통해 Airflow를 실행하는 방식을 다뤄보겠습니다.
#1. Fetch docker-compose file
가장 먼저, docker-compose 파일을 가져오겠습니다.
기호에 따라, Airflow를 관리하기 위한 폴더 (airflow-local) 를 생성합니다.
# optional
~$ mkdir airflow-local
~$ cd airflow-local
Airflow docker production을 curl 명령어를 통해 다운받습니다.
~/airflow-local$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.7.1/docker-compose.yaml'
docker-compose.yaml 에는 아래와 같은 서비스들이 정의됩니다.
✔️ airflow-scheduler
Scheduler는 모든 Task와 DAG를 모니터링하고, Task이 실행될 수 있는 상태일 때 (이전 의존성이 실행 완료되었을 때) 해당 Task를 실행합니다.
✔️ airflow-webserver
Airflow의 웹 서버는 기본적으로 http://localhost:8080 을 통해 접근할 수 있습니다.
✔️ airflow-worker
Worker들은 스케줄러에 의해 전달 받은 Task를 실행합니다.
✔️ airflow-triggerer
Triggerer는 지연 가능한 작업 deferrable tasks 에 대해 이벤트 루프를 실행합니다.
✔️ airflow-init
Airflow 사용을 위해 서비스의 초기 설정을 진행합니다.
✔️ postgres
Airflow가 사용하는 database 입니다.
✔️ redis
Redis는 Scheduler에서 Worker로 메세지를 전송하는 브로커의 역할로 실행됩니다.
#2. Initializing Environment
2.1. Airflow가 사용할 폴더 생성
# docker-compose.yaml과 동일한 위치에 생성
~/airflow-local$ mkdir -p ./dags ./logs ./plugins ./config
./dags
: DAG 파일 보관 ./logs
: Task 실행 시, 혹은 Scheduler의 로그 보관./config
: 커스텀 log parser를 추가하거나 Cluster 정책을 위한 airflow_local_settings.py
를 추가할 수 있음./plugins
: 커스텀 Plugin 보관
2.2 .env 파일 생성
.env
파일을 통해 Docker 내의 Airflow UID를 설정해줍니다.
~/airflow-local$ echo -e "AIRFLOW_UID=$(id -u)" > .env
확인해보면 아래와 같은 .env
파일의 내용을 확인할 수 있습니다.
~/airflow-local$ cat .env
AIRFLOW_UID=501
Airflow의 docker-compose.yaml
의 주석을 참고해서 .env
에 설정 값을 추가할 수 있습니다.
# This configuration supports basic configuration using environment variables or an .env file
# The following variables are supported:
#
# AIRFLOW_IMAGE_NAME - Docker image name used to run Airflow.
# Default: apache/airflow:2.7.1
# AIRFLOW_UID - User ID in Airflow containers
# Default: 50000
# AIRFLOW_PROJ_DIR - Base path to which all the files will be volumed.
# Default: .
# Those configurations are useful mostly in case of standalone testing/running Airflow in test/try-out mode
#
# _AIRFLOW_WWW_USER_USERNAME - Username for the administrator account (if requested).
# Default: airflow
# _AIRFLOW_WWW_USER_PASSWORD - Password for the administrator account (if requested).
# Default: airflow
# _PIP_ADDITIONAL_REQUIREMENTS - Additional PIP requirements to add when starting all containers.
# Use this option ONLY for quick checks. Installing requirements at container
# startup is done EVERY TIME the service is started.
# A better way is to build a custom image or extend the official image
# as described in https://airflow.apache.org/docs/docker-stack/build.html.
# Default: ''
(Optional) docker-compose customizing
실행하기 전에, 몇가지 설정을 변경합니다.
현재 다른 프로젝트를 8080 포트로 사용하고 있기 때문에, docker 외부에서 접근할 port 번호를 9090 으로 설정하겠습니다.
ports: - "9090:8080"
Airflow는 기본적으로, 처음 실행 시 꽤 많은 양의 예시를 제공합니다.
Airflow의 예제를 확인하고 싶지 않다면 아래 옵션을 'true' → 'false' 로 설정합니다.
AIRFLOW__CORE__LOAD_EXAMPLES: 'false'
#3. Running Containers
database migration 과 첫 사용자 계정을 설정하기 위해 아래 명령어를 실행하세요.
~/airflow-local$ docker compose up airflow-init
가령, docker compose 파일을 통해 airflow를 실행시키면 아래와 같은 Container들이 실행됩니다.
~/airflow-local$ docker compose up
[+] Running 7/7
✔ Container gngsn-airflow-lab-redis-1 Running 0.0s
✔ Container gngsn-airflow-lab-postgres-1 Running 0.0s
✔ Container gngsn-airflow-lab-airflow-init-1 Created 0.0s
✔ Container gngsn-airflow-lab-airflow-triggerer-1 Created 0.1s
✔ Container gngsn-airflow-lab-airflow-worker-1 Created 0.1s
✔ Container gngsn-airflow-lab-airflow-scheduler-1 Created 0.1s
✔ Container gngsn-airflow-lab-airflow-webserver-1 Created 0.1s
Attaching to gngsn-airflow-lab-airflow-init-1, gngsn-airflow-lab-airflow-scheduler-1, gngsn-airflow-lab-airflow-triggerer-1, gngsn-airflow-lab-airflow-webserver-1, gngsn-airflow-lab-airflow-worker-1, gngsn-airflow-lab-postgres-1, gngsn-airflow-lab-redis-1
그럼 아래와 같은 로그를 확인할 수 있는데,
docker-compose 파일에서 설정된 아이디와 비밀번호가 모두 airflow 인 사용자를 생성합니다.
gngsn-airflow-lab-airflow-init-1 | User "airflow" created with role "Admin"
✔️ Login 정보
ID: airflow
Password: airflow
#4. Verify installed Airflow
아래와 같이 airflow-worker를 통해 airflow 명령어를 입력합니다.
$ docker compose run airflow-worker airflow info
혹은, Linux나 Mac OS를 사용하고 있다면, wrapper script를 다운받아 airflow를 조금 더 쉽게 사용할 수 있습니다.
$ curl -LfO 'https://airflow.apache.org/docs/apache-airflow/2.7.1/airflow.sh'
$ chmod +x airflow.sh
위와 같이 airlfow 실행 파일을 설치하고, 해당 파일을 통해 아래와 같이 airflow 명령어를 실행할 수 있습니다.
$ ./airflow.sh info
Running a DAG
이번엔 위에 간단히 정의한 DAG를 실행해보도록 하겠습니다.
먼저, 간단한 DAG 정의를 먼저 알아본 후, 그 중 하나를 통해 실행 해보겠습니다.
Create a DAG
먼저, DAG를 정의하는 방법은 3가지가 있습니다.
첫 번째 방법은 python의 with와 함께 아래와 같이 DAG를 정의하는 방식입니다.
import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
with DAG(
dag_id="my_dag_name",
start_date=datetime.datetime(2021, 1, 1),
schedule="@daily",
): EmptyOperator(task_id="task")
두 번째 방법은 기본 생성자로 생성해서 Operator의 dag 인자로 넘겨주는 것입니다.
import datetime
from airflow import DAG
from airflow.operators.empty import EmptyOperator
my_dag = DAG(
dag_id="my_dag_name",
start_date=datetime.datetime(2021, 1, 1),
schedule="@daily",
)
EmptyOperator(task_id="task", dag=my_dag)
세 번째 방법은 @dag 어노테이션을 사용해 DAG Generator를 활성화할 수 있습니다.
import datetime
from airflow.decorators import dag
from airflow.operators.empty import EmptyOperator
@dag(start_date=datetime.datetime(2021, 1, 1), schedule="@daily")
def generate_dag():
EmptyOperator(task_id="task")
generate_dag()
Test a DAG
이번에는 해당 DAG를 실행시키도록 하겠습니다.
DAG를 실행하기 위해서는 ~/airflow/dags 하위에 DAG를 정의한 파일을 추가하면 됩니다.
docker 로 실행하고 있다면, 위에서 생성한 /dags 하위에 생성합니다.
# ~/airflow/dags/say-hello.py
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
# ① A DAG represents a workflow, a collection of tasks
with DAG(dag_id="demo", start_date=datetime(2023, 10, 1), schedule="0 0 * * *") as dag:
# ② Tasks are represented as operators
hello = BashOperator(task_id="hello", bash_command="echo hello")
@task()
def airflow():
print("airflow")
# ③ Set dependencies between tasks
hello >> airflow()
위와 같이 정의한 후, 아래의 명령어로 airflow가 활성되었는지를 확인합니다.
✔️ venv
$ airflow dags list
dag_id | filepath | owner | paused
=======+==============+=========+=======
demo | say-hello.py | airflow | None
✔️ docker
$ docker compose run airflow-worker airflow dags list
[+] Creating 3/0
✔ Container airflow-local-redis-1 Running 0.0s
✔ Container airflow-local-postgres-1 Running 0.0s
✔ Container airflow-local-airflow-init-1 Created 0.0s
[+] Running 3/3
✔ Container airflow-local-redis-1 Healthy 0.5s
✔ Container airflow-local-postgres-1 Healthy 0.5s
✔ Container airflow-local-airflow-init-1 Started 0.2s
dag_id | filepath | owner | paused
=======+==============+=========+=======
demo | say-hello.py | airflow | None
이제, 아래 명령어로 DAG를 실행합니다.
✔️ venv
$ airflow dags test "demo"
...
[2023-10-04T00:03:37.223+0900] {subprocess.py:86} INFO - Output:
[2023-10-04T00:03:37.226+0900] {subprocess.py:93} INFO - hello
...
✔️ docker
$ docker compose run airflow-worker airflow dags test "demo"
...
[2023-10-03T15:24:02.488+0000] {subprocess.py:86} INFO - Output:
[2023-10-03T15:24:02.489+0000] {subprocess.py:93} INFO - hello
...
다음에는 airflow 명령어를 확인해보면서, 조금 더 정밀한 스케줄링을 할 수 있도록 만들어보겠습니다.
| Reference |
https://airflow.apache.org/docs/
https://www.altexsoft.com/blog/apache-airflow-pros-cons/
⌜Data Pipelines with Apache Airflow⌟ - Oreilly
https://www.altexsoft.com/blog/apache-airflow-pros-cons/
출처: https://gngsn.tistory.com/264 [ENFJ.dev:티스토리]
- Total
- Today
- Yesterday
- 5.4.0.1072
- CVE 취약점 점검
- 쿠버네티스
- ubuntu
- 튜닝
- [오라클 튜닝] instance 튜닝2
- 여러서버 컨트롤
- 오라클 인스턴트클라이언트(InstantClient) 설치하기(HP-UX)
- MSA
- 키알리
- 오라클
- K8s
- startup 에러
- pod 상태
- 트리이스
- directory copy 후 startup 에러
- 앤시블
- (InstantClient) 설치하기(HP-UX)
- 스토리지 클레스
- 우분투
- [오라클 튜닝] sql 튜닝
- 커널
- 코로나19
- 오라클 트러블 슈팅(성능 고도화 원리와 해법!)
- 테라폼
- 설치하기(HP-UX)
- Oracle
- 버쳐박스
- 오라클 홈디렉토리 copy 후 startup 에러
- ORACLE 트러블 슈팅(성능 고도화 원리와 해법!)
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | |||
5 | 6 | 7 | 8 | 9 | 10 | 11 |
12 | 13 | 14 | 15 | 16 | 17 | 18 |
19 | 20 | 21 | 22 | 23 | 24 | 25 |
26 | 27 | 28 | 29 | 30 | 31 |