Tech 기록지/Data Engineering

[Data Engineering-8] Airflow DAGs test with python class

Lio Grande 2024. 7. 30. 13:58

Python script를 통해 DAGs를 등록하는 예제(LINK)를 수행하다가 python 내부에서 class로 구조를 구성하면 어떨까 테스트 해보았다.

 

#yolo_inference example from https://da2so.tistory.com/40
from datetime import datetime
from pathlib import Path
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator


class DAGinstance:
    def __init__(self, args) -> None:
        self.dag_id = args['dag_id']
        self.start_date = args['start_date']
        self.tags = args['tags']
        self.schedule_interval = args['schedule_interval']
        self.catchup = args['catchup']

        self.image_dir = "/mnt/c/Users/minsi/OneDrive/바탕\ 화면/mskim/sample_imgs"

    def gen_dag(self):
        # Define DAG
        dag = DAG(
            description="https://da2so.tistory.com/40", 
            dag_id= self.dag_id, 
            start_date= self.start_date, 
            tags= self.tags, 
            schedule_interval= self.schedule_interval, 
            catchup= self.catchup
        )

        with dag:
            # Struct DAGs tasks
            task1 = self.task_py_operator(dag)
            task2 = self.task_bash_operator(dag)

            task1 >> task2

        return dag

    def mk_img_store(self):
        Path(self.image_dir).mkdir(exist_ok=True, parents=True)

    def task_py_operator(self, dag):
        # Define Task #1
        make_image_store = PythonOperator(
            task_id="make_image_store", 
            python_callable=self.mk_img_store, 
            dag=dag
        )
        return make_image_store
    
    def task_bash_operator(self, dag):
        # Define Task #2
        download_person_picture = BashOperator(
            task_id="download_person_pricture", 
            bash_command=f"curl -L https://images.inc.com/uploaded_files/image/1024x576/getty_561098557_970647970450064_87098.jpg --output {Path(self.image_dir)/'image_via_instance.png'}", 
            dag=dag
        )
        return download_person_picture


# Default args for DAG
default_args = {
    'dag_id': 'yolo_sample_with_instance', 
    'start_date': datetime(2024, 7, 30), 
    'tags' : ["yolo5"], 
    'schedule_interval': None, 
    'catchup': False
}

# Create class instance
dag_instance = DAGinstance(default_args)
# Run method of the class instance
a = dag_instance.gen_dag()

 

해당 코드를 작성하면서 이해한 바는 다음과 같다.

 

1. DAG object는 global variable 단위로 올려야 Airflow UI에서 제대로 인식이 되는것 같다. (참고 LINK)

2. gen_dag()로 구현된 함수는 반환값을 가져야 하며, if __name__ == '__main__' 으로 감싸면 안된다.

3. Airflow set downstream 부분 (>>)은 with 문법을 통해 object 종속성을 구현한다