아파치 에어플로우 DAG 수동으로 시작하기

이전 글에서 에어플로우의 기본 개념을 살피며 두니아 생존자들이 끼니를 준비하는 과정을 DAG로 표현할 수 있음을 보였습니다.  이렇게 정의된 DAG는 schedule_interval로 전달되는 crontab 표현식을 통해 정해진 타이밍에 실행될 수 있습니다.

만약 정해진 시간이 아니라, 필요할 때 DAG를 실행하려면 어떻게 해야 할까요?  두니아 생존의 예에서는 매일 오전 10시 오후 4시가 아니라, 배가 고플 때 식사를 준비해야 한다면요?

에어플로우는 schedule_interval에 더해 외부에서 DAG를 실행할 수 있는 방식도 제공합니다.  이렇게 수동으로 실행되는 DAG의 경우, schedule_interval에 None을 대입하는 것이 일반적입니다.

 

1. DAG이 다른 DAG를 실행하는 경우

에어플로우 연산자 가운데는 다른 DAG를 실행하기 위해 존재하는 것이 있습니다.  바로 TriggerDagRunOperator가 그것입니다.  에어플로우 웹서버를 실행하면 기본적으로 보여지는 예제 가운데 example_trigger_controller_dag와 example_trigger_target_dag가 있습니다.

전자가 후자 DAG을 실행하는데, 이 때 사용되는 연산자가 TriggerDagRunOperator입니다.  example_trigger_controller_dag를 클릭하면 아래 화면이 나타납니다.

상단의 “Code”를 클릭하면 이 DAG의 코드를 확인할 수 있습니다.

from airflow import DAG
from airflow.operators.dagrun_operator import TriggerDagRunOperator
from datetime import datetime

import pprint

pp = pprint.PrettyPrinter(indent=4)


def conditionally_trigger(context, dag_run_obj):
    """This function decides whether or not to Trigger the remote DAG"""
    c_p =context['params']['condition_param']
    print("Controller DAG : conditionally_trigger = {}".format(c_p))
    if context['params']['condition_param']:
        dag_run_obj.payload = {'message': context['params']['message']}
        pp.pprint(dag_run_obj.payload)
        return dag_run_obj


# Define the DAG
dag = DAG(dag_id='example_trigger_controller_dag',
          default_args={"owner": "airflow",
                        "start_date": datetime.now()},
          schedule_interval='@once')


# Define the single task in this controller example DAG
trigger = TriggerDagRunOperator(task_id='test_trigger_dagrun',
                                trigger_dag_id="example_trigger_target_dag",
                                python_callable=conditionally_trigger,
                                params={'condition_param': True,
                                        'message': 'Hello World'},
                                dag=dag)

위 코드의 conditionally_trigger 함수는example_trigger_target_dag를 실행할지, 만약 그렇다면 어떤 context를 payload로 target_dag로 전달할지 결정해 줍니다.

DAG의 schedule_interval은 @once이므로 이 DAG는 단 한 번 실행되며, 이 때 target_dag를 실행하거나 아무 일도 하지 않게 됩니다.

TriggerDagRunOperator의 params 값에서 condition_param이 True로 설정되어 있으므로 target_dag는 항상 실행됩니다.  message는 target_dag의 payload로 전달됩니다.

 

2. CLI에서 airflow 명령을 사용하는 경우

커맨드 라인에서 airflow trigger_dag 명령을 이용해서 DAG를 실행할 수도 있습니다.  이 때는 실행하고자 하는 DAG의 dag_id만 전달하면 됩니다.  만약 dag_id가 dunia라면 명령은 다음과 같습니다.

airflow trigger_dag dunia

옵션으로 실행 설정이나 시간을 함께 전달할 수도 있습니다.

 

3. REST API를 사용하는 방법

만약 타 어플리케이션에서 DAG를 실행하고 싶다면 REST API를 이용하는 것이 가장 깔끔한 방식일 것입니다.  아래 링크에서 REST API를 통해 에어플로우의 여러 기능을 이용할 수 있게 하는 플러그인인 airflow-rest-api-plugin을 다운로드 받을 수 있습니다.

https://github.com/teamclairvoyant/airflow-rest-api-plugin

다운로드 받은 zip파일을 풀어서 에어플로우 플러그인 디렉토리에 복사하고, 에어플로우 웹 서버를 재시동 하기만 하면 바로 사용할 수 있습니다.  trigger_dab 뿐만 아니라 CLI로 제공되는 모든 기능들을 REST API 인터페이스로 실행할 수 있습니다.

 

이상으로 DAG를 수동으로 실행할 수 있는 세 가지 방법을 살펴 봤습니다.



댓글 남기기