Tech 기록지/Data Engineering
[Data Engineering-8] Airflow DAGs test with python class
Lio Grande
2024. 7. 30. 13:58
Python script를 통해 DAGs를 등록하는 예제(LINK)를 수행하다가 python 내부에서 class로 구조를 구성하면 어떨까 테스트 해보았다.
#yolo_inference example from https://da2so.tistory.com/40
from datetime import datetime
from pathlib import Path
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
class DAGinstance:
def __init__(self, args) -> None:
self.dag_id = args['dag_id']
self.start_date = args['start_date']
self.tags = args['tags']
self.schedule_interval = args['schedule_interval']
self.catchup = args['catchup']
self.image_dir = "/mnt/c/Users/minsi/OneDrive/바탕\ 화면/mskim/sample_imgs"
def gen_dag(self):
# Define DAG
dag = DAG(
description="https://da2so.tistory.com/40",
dag_id= self.dag_id,
start_date= self.start_date,
tags= self.tags,
schedule_interval= self.schedule_interval,
catchup= self.catchup
)
with dag:
# Struct DAGs tasks
task1 = self.task_py_operator(dag)
task2 = self.task_bash_operator(dag)
task1 >> task2
return dag
def mk_img_store(self):
Path(self.image_dir).mkdir(exist_ok=True, parents=True)
def task_py_operator(self, dag):
# Define Task #1
make_image_store = PythonOperator(
task_id="make_image_store",
python_callable=self.mk_img_store,
dag=dag
)
return make_image_store
def task_bash_operator(self, dag):
# Define Task #2
download_person_picture = BashOperator(
task_id="download_person_pricture",
bash_command=f"curl -L https://images.inc.com/uploaded_files/image/1024x576/getty_561098557_970647970450064_87098.jpg --output {Path(self.image_dir)/'image_via_instance.png'}",
dag=dag
)
return download_person_picture
# Default args for DAG
default_args = {
'dag_id': 'yolo_sample_with_instance',
'start_date': datetime(2024, 7, 30),
'tags' : ["yolo5"],
'schedule_interval': None,
'catchup': False
}
# Create class instance
dag_instance = DAGinstance(default_args)
# Run method of the class instance
a = dag_instance.gen_dag()
해당 코드를 작성하면서 이해한 바는 다음과 같다.
1. DAG object는 global variable 단위로 올려야 Airflow UI에서 제대로 인식이 되는것 같다. (참고 LINK)
2. gen_dag()로 구현된 함수는 반환값을 가져야 하며, if __name__ == '__main__' 으로 감싸면 안된다.
3. Airflow set downstream 부분 (>>)은 with 문법을 통해 object 종속성을 구현한다