아파치 에어플로우 유용한 팁

이전 글에서 아파치 에어플로우의 기본 개념에 대해 알아 봤습니다.  이 포스트에서는 에어플로우 사용자가 알아두면 유용한 팁과 주의점을 정리해 보겠습니다.

에어플로우의 구성 요소

에어플로우는 다음 네 가지 요소로 구성됩니다.

  1. 데이터베이스: DAG 실행 정보를 보관한다.
  2. 웹 서버: GUI를 구동한다.
  3. 스케쥴러: DAG를 읽어 적절한 타이밍에 DAG를 실행한다.
  4. 작업자(workers): 작업을 실행한다.

데이터베이스

Metadata 데이터베이스는 에어플로우를 설치하는 과정 중에 다음 명령을 통해 생성됩니다.

airflow initdb  # 설치 시 한 번만 실행함

웹 서버

에어플로우가 설치되면 웹 인터페이스를 통해 DAG와 작업을 모니터하고 상태를 관리할 수 있습니다.  웹 서버는 다음 명령을 통해 8080 포트에서 실행됩니다.

airflow webserver

스케쥴러

스케쥴러는 각 DAG의 시작 시간(start_date)와 주기(schedule_interval)를 통해 어떤 DAG를 언제 실행할 지 결정합니다.

이 때 시작 시간이 과거로 설정되어 있으면 스케쥴러는 과거 시간부터 현재까지 실행되었어야 하는 DAG들을 모조리 실행 합니다.  이를 backfill이라고 부르며, DAG 옵션인 catchup을 False로 설정함으로써 비활성화 할 수 있습니다.

작업자

작업자들은 DAG를 구성하는 작업들을 실행합니다.  작업자를 어떻게 준비할지는 실행자(executor) 설정으로 결정합니다.  세 가지 대표 실행자는 다음과 같습니다.

  1. SequentialExecutor(기본): 작업자가 스케쥴러와 같은 프로세스 상에서 하나씩 구동됩니다.
  2. LocalExecutor: 작업자들이 에어플로우와 같은 서버에서 별개의 프로세스로 구동됩니다.
  3. CeleryExecutor: 작업자들이 메시지 큐 플랫폼인 Celery를 통해 작업을 할당 받습니다.

DAG 디버깅 및 테스트 단계에서는 SequentialExecutor를, 스케일을 고려해야 하는 단계에서는 CeleryExecutor를 사용합니다.

 

DAG/작업 스케쥴링

주어진 작업자 설정에 맞춰 한꺼번에 실행될 수 있는 DagRun과 작업 인스턴스의 개수의 상한선을 정해둘 필요가 있습니다.  이 때 필요한 DAG 파라미터가 max_active_runs와 concurrency입니다.

  1. max_active_runs: 동시에 실행될 수 있는 DagRun의 최대치
  2. concurrency: 동시에 실행될 수 있는 작업 인스턴스의 최대치

Backfill이 수행될 때, 이 두 파라미터 값은 무시된다고 합니다.

작업들은 서로 독립된 일들을 하게끔 디자인 합니다.  또한 DAG와 작업 간에 데이터 공유와 의존성을 최소화 합니다.

에어플로우의 기본 가정은 작업이 실패할 수 있다는 것입니다.  적절한 설정을 통해 실패된 작업은 재시도 될 수 있으므로, idempotency, 즉 작업이 반복되어도 같은 결과를 도출하게끔 작업 연산자을 작성해야 합니다.

만약, 순서대로 실행되는 작업들이 있고, 이들이 한꺼번에 성공, 실패해야 한다면 subdag을 사용합니다.  subdag의 retries 인자를 True로, 각 작업의 retries 인자를 False로 설정하고 첫 번째 작업이 초기화를 담당하게 합니다.

작업은 이외에도 다음과 같은 유용한 파라미터들을 갖습니다.

  1. pool (작업자 풀을 설정)
  2. queue (Celery를 위한 설정)
  3. execution_timeout (타임아웃 설정)
  4. trigger_rule (본 작업이 실행되기 위한 이전 작업 조건 설정)
  5. environment variables (Bash 연산자를 위한 환경변수 값)

DAG와 작업의 파라미터들은 기회가 되면 더 자세히 알아 보겠습니다.

 

동적으로 생성 가능한 DAG

DAG은 파이썬 프로그램으로 표현됩니다.  예를 들어, 동적으로 주어지는 특정 파일 리스트에 대한 DAG는 다음과 같이 작성될 수 있습니다.

for f in files:
  task = airflow.operators.PythonOperator(
    task_id='parsing_{}'.format(f),
    python_callable=process_file
    op_kwargs={'fn': f},
    dag=DAG)
  task.set_upstream(init_task)
  task.set_downstream(clean_up_task)

DAG는 스케쥴러가 시작할 때 한 번만 로드되며, 각 DagRun에 대해 다른 모양의 그래프를 지정해 줄 수 없습니다.  예를 들어, 바로 위의 예에서 files의 내용이 스케쥴러가 실행되는 동안 변화한다고 해서 각 DagRun의 그래프 형태가 변화되는 것은 아닙니다.



댓글 남기기