본문 바로가기

Development

세상에서 가장 간단한 Airflow 튜토리얼

 

머신 러닝 파이프라인의 필요성

일반적으로 학계에서 딥 러닝을 연구한 연구자들은 잘 정제되어 있는 벤치마크 데이터셋에서 명확한 평가 지표를 갖고, 성능을 향상시키기 위한 모델 개선을 고민하는 데에 익숙합니다.

하지만 이러한 연구자들이 현실의 문제를 해결하기 위해 Wild Environment에 발을 내딛으면 여러 종류의 애로사항에 직면합니다. 정제되어 있는 데이터셋은 없으며, 주기적으로 라벨이 수정되거나 새로운 데이터가 끊임 없이 계속 쌓입니다. 그때마다 새로 학습 데이터셋을 구성해야 하며, 평가 데이터셋 또한 주기적으로 업데이트해야 할 것입니다. 또한, 좀 더 유저 경험을 고려한 새로운 평가 지표를 개발해야 하며, 때로는 단일 모델만 사용하는 게 아니라 여러 개의 모델과 연동하기 위해 각각의 모델을 위한 데이터셋을 구성해 학습하고 다시 각각의 모델을 결합해 인퍼런스를 수행하고 성능을 측정해야 합니다.

여기에 더 나아가 제품 탑재를 고려해 서빙이나 중간 과정 모니터링, 데이터와 모델의 버전 컨트롤까지 고려한다면 해야 할 게 너무나 많습니다.

 

MLOps의 설명 글에서 항상 나오는 그 그림입니다. 출처: https://developers.google.com/machine-learning/testing-debugging/pipeline/overview

 

데이터가 업데이트될 때마다, 모델이 변경될 때마다, 연동되는 모델의 구성이 달라질 때마다, 중간에 에러가 발생하거나 원하는 수준의 성능이 나오지 않을 때마다, 매번 일일이 데이터셋을 다시 구성하고 학습 스크립트를 실행하고, 평가를 진행하고, 연동된 모델들의 성능을 측정하는 등 모든 과정에서 수많은 노력과 시간을 필요로 하게 됩니다. 결국 연구자가 가장 잘할 수 있는 모델 개선이 아닌 다른 반복되는 일들에 시간과 노력을 쏟아야 하는 상황이 온다는 말입니다.

이렇게 비효율적으로 반복되는 과정을 개선하기 위해, 각 과정들의 입출력만 표준화하고 각 테스크의 결과에 따른 조건부 처리를 미리 정의하고, 특정 조건에 따른 트리거를 설정하여 모든 과정들을 파이프라인에 맞춰 실행하는 Workflow를 생각해볼 수 있을 것입니다. 이러한 Workflow가 한 번 잘 구축된다면, 연구자는 정말 모델 개선과 연구 자체에만 집중할 수 있을 것입니다.

 

 


Apache Airflow는 여러가지 태스크들(데이터셋 생성, 모델 학습 등)을 일련의 그래프로 연결하고 스케줄링, 모니터링 등 파이프라인 관리를 위한 다양한 기능을 제공하고 있는 Workflow Management Platform입니다.

 

Airflow 설치

Argo, Luigi, Kubeflow 등 머신 러닝 파이프라인을 위한 다양한 툴들이 있습니다. 이번 글에서는 Pipeline 및 Workflow 기능 자체에 충실하면서도 다양한 부가 기능을 제공하며, 많은 사용자와 Maintainer를 보유하고 있는 Apache Airflow에 대해 다뤄보도록 하겠습니다.


Airflow를 사용하기 위해서는 메타데이터를 저장하는 MySQL, Celery backend를 제공하는 Redis, 그리고 무엇보다 필수적인 Airflow를 설치해야 합니다. 이번 글에서는 CeleryExecutor를 사용하지 않기 때문에 Redis를 반드시 설치해야 하는 것은 아니지만, 편의상 포함하도록 하겠습니다.

MySQL, Redis, Airflow를 설치하는 방법은 다양하지만, 이번 글에서는 Docker Container 기반 설치를 소개하겠습니다. 또한 하나의 Container 안에 MySQL, Redis, Airflow를 다 설치할 수도 있겠지만, 이번 글에서는 MySQL Container, Redis Container를 따로 구성하고 로컬에서 Airflow를 설치하는 방법을 설명합니다.

먼저 MySQL과 Redis Container를 만들어보겠습니다.

docker pull mysql:8.0
docker pull redis:5.0
docker run -d --name mysql -p 3306:3306 -v /data/mysql:/var/lib/mysql -e MYSQL_ROOT_PASSWORD=root mysql:8.0
docker run -d --name redis -p 6379:6379 -v /data/redis:/data -e REDIS_PASSWORD=airflow redis:5.0

이렇게 Container 두 개를 생성했습니다.
다음은 Airflow를 설치합니다. 이 포스팅에서는 MySQL, Redis를 사용할 예정이기 때문에 아래의 명령어로 설치를 합니다.

pip install 'apache-airflow[mysql,redis]==1.10.5'

설치 후 MySQL에 Airflow용 데이터베이스를 생성합니다.

mysql -h127.0.0.1 -uroot -proot
mysql > create user 'airflow'@'%' identified by 'airflow'
mysql > grant all privileges on *.* to 'airflow'@'%'
mysql > flush privileges
mysql > create database airflow

데이터베이스를 생성하는데 주의할 것이 있습니다. 첫번째 줄에 127.0.0.1로 기입이 되어 있지만, 이 포스팅은 다른 도커 컨테이너 내에서 Airflow가 Mysql에 접속을 시도하기 때문에 해당 노드(컴퓨터)의 ip를 직접 기입해야 합니다. 예를 들어 설치한 호스트의 IP가 "183.27.11.28"일 때 -h127.0.0.1 -h183.27.11.28과 같이 수정해야 합니다.


다음으로 Airflow의 Configuration을 해당 포스팅의 설치법에 맞게 수정합니다. 일반적으로 Airflow의 Configuration파일은 /root/airflow에 위치해 있으며 파일 이름은 airflow.cfg입니다.

# executor = SquentialExecutor
executor = LocalExecutor

# sql_alchemy_conn = sqlite:////home/airflow/airflow/airflow.db
sql_alchemy_conn = mysql+pymysql://airflow:airflow@xx.xx.xx.xx:3306/airflow

# catchup_by_default = True
catchup_by_default = False

# broker_url = sqla+mysql://airflow:airflow@127.0.0.1:3306/airflow
broker_url = redis://airflow@xx.xx.xx.xx:6379

# result_backend = db+mysql://airflow:airflow@localhost:3306/airflow
result_backend = db+mysql://airflow:airflow@xx.xx.xx.xx:3306/airflow

# load_examples = True
load_examples = False

 

이제 설정 및 설치가 다 끝났습니다. Airflow를 실행시켜봅시다.

airflow initdb
airflow webserver
airflow scheduler
airflow worker
airflow flower

airflow webserver를 실행 했을 때 잘 설치되었을 경우 나타나는 모습입니다.

 

Airflow에서 Worflow 만들어서 실행하기

이제 설치도 끝냈고 Airflow 실행도 해보았으니 Workflow를 만들어서 실행시켜봅시다.
가장 먼저 어떤 workflow를 만들어볼까요? 여기서는 간단하게 파일을 쓰고 읽고 삭제하는 하나의 workflow를 만들어 보겠습니다.

cd /root/airflow/dags && echo hello airflow > test.txt
cd /root/airflow/dags && cat test.txt
cd /root/airflow/dags && rm test.txt

위 코드 라인을 순서대로 설명하자면,

  1. /root/airflow/dags로 이동하여 hello airflow를 test.txt에 작성하고 저장합니다.
  2. 잘 저장이 되었는지 cat 명령어로 읽어봅니다.
  3. 다음으로는 test.txt파일을 삭제합니다.

위에서 설명한 3가지 단계를 Airflow를 사용해서 프로세스 자동화를 해보겠습니다. 먼저 workflow.py를 생성해봅니다. 여기서 중요한 것은 airflow.cfg의 가장 상단에 있는 dags_folder에 존재하는 Workflow 파이썬 파일만 인식한다는 것입니다.

이번에는 airflow.cfg에서 dags_folder를 /root/airflow/dags로 수정했기 때문에 해당 폴더로 이동해서 아래의 파일(echo_test.py)을 생성하도록 합니다. 이때 파일의 이름은 중요하지 않습니다. Airflow는 파이썬 파일 내의 dag_id만을 이용하여 등록하기 때문입니다.

from airflow import models
from airflow.operators.bash_operator import BashOperator
from datetime import datetime, timedelta

default_args = {
        'owner': 'airflow',
        'depends_on_past': False,
        'start_date': datetime(2020, 2, 9),
        'retries': 1,
        'retry_delay': timedelta(minutes=5)}

with models.DAG(
        dag_id='echo_test', description='echo_test',
        schedule_interval=None,
        max_active_runs=5,
        concurrency=10,
        default_args=default_args) as dag:

    text_file_path = '/root/airflow/dags'

    #### create txt file  --> 텍스트 파일을 생성합니다
    create_text_file_command = f'cd {text_file_path} && echo hello airflow > test.txt'
    create_text_file = BashOperator(
            task_id='create_text_file',
            bash_command=create_text_file_command,
            dag=dag)

    #### cat txt file  --> 텍스트 파일을 읽습니다
    read_text_file_command = f'cd {text_file_path} && cat test.txt'
    read_text_file = BashOperator(
            task_id='cat_text_file',
            bash_command=read_text_file_command,
            dag=dag)

    #### remove txt file  --> 텍스트 파일을 삭제합니다.
    remove_text_file_command = f'cd {text_file_path} && rm test.txt'
    remove_text_file = BashOperator(
            task_id='remove_text_file',
            bash_command=remove_text_file_command,
            dag=dag)

    create_text_file >> read_text_file >> remove_text_file  # 이것은 위의 task를 이어주는 줄입니다.

위의 파이썬 파일을 작성하고 난 후 airflow scheduler를 프롬프트에 입력함으로써 등록합니다. 등록하고 웹서버에 접속하면 아래와 같은 그림이 나옵니다. 

echo_test라는 DAG가 잘 등록되어 있는 것을 확인할 수 있습니다.

echo_test 왼쪽의 Off 버튼을 클릭하여 On으로 수정한 후 echo_test를 클릭하여 내부로 들어갑니다.

들어가서 GraphView 버튼을 눌러 Workflow가 잘 만들어졌는지 확인을 하고 Trigger DAG 버튼을 눌러 Workflow를 실행합니다. 완료되면 초록색 선으로 박스의 윤곽선이 바뀌면서 완료되었음을 확인할 수 있습니다.

다음으로 cat_text_file이 잘 수행되었는지 확인해야 합니다. cat_text_file을 클릭하고 ViewLog를 클릭하여 제대로 읽었는지 확인합니다.

빨간색 박스를 확인해보니 hello airflow가 잘 출력된 것을 확인할 수 있습니다.

여기까지 Airflow를 설치하고, 세상에서 가장 간단한 Workflow를 만든 후 실행해보았습니다.

이 포스팅에서는 세상에서 가장 간단한 튜토리얼을 수행해보았지만, 이를 조금 응용하여 더 많은 Task에 대해 복잡한 Worflow를 만들면 아래와 같은 복잡한 파이프라인을 만들 수 있습니다.

일반적인 Detection + Classification의 Workflow입니다. 데이터셋 생성부터 학습, 평가까지 하나의 Workflow로 만들고 각 모델의 성능평가와 연동 성능을 평가합니다.



이와 같이 머신 러닝 파이프라인을 만들어, 비용이 많이 드는 반복되는 과정을 자동화하여 연구자들이 모델 개선과 같은 연구 자체에만 집중할 수 있는 환경을 만들고, 제품에 투입할 모델을 제공하기 위해 좀 더 안정적인 모델 학습 및 평가 파이프라인을 구축할 수 있을 것입니다. 마찬가지로 Scale-Up을 고려하면 수십 개의 Task나 제품군을 운영하는 상황에서 또한 머신 러닝 파이프라인 구축이 필수적입니다.

이번 글을 통해 딥 러닝 모델을 실제 제품에 운영할 때 고려해야 하는 머신 러닝 파이프라인의 필요성과 간단한 구축 방법을 파악할 수 있으셨길 바랍니다.

 

 

[##차금강##]