Notice
Recent Posts
Recent Comments
Link
일 | 월 | 화 | 수 | 목 | 금 | 토 |
---|---|---|---|---|---|---|
1 | 2 | 3 | 4 | 5 | 6 | |
7 | 8 | 9 | 10 | 11 | 12 | 13 |
14 | 15 | 16 | 17 | 18 | 19 | 20 |
21 | 22 | 23 | 24 | 25 | 26 | 27 |
28 | 29 | 30 |
Tags
- elastic
- grok
- CSV
- ubuntu
- Crontab
- ELK
- devtools
- package.json
- DSL
- json
- query
- venv
- Tutorial
- elasticsearch
- KoA
- node.js
- OPCUA
- windows
- 7.7.1
- configure
- typescript
- PYTHON
- filebeat
- path.data
- dense_vector
- framework
- airflow
- Data Engineering
- logstash
- kibana
Archives
- Today
- Total
Gibbs Kim's playground
[Data Engineering-8] Airflow DAGs test with python class 본문
Tech 기록지/Data Engineering
[Data Engineering-8] Airflow DAGs test with python class
Lio Grande 2024. 7. 30. 13:58Python script를 통해 DAGs를 등록하는 예제(LINK)를 수행하다가 python 내부에서 class로 구조를 구성하면 어떨까 테스트 해보았다.
#yolo_inference example from https://da2so.tistory.com/40
from datetime import datetime
from pathlib import Path
from airflow import DAG
from airflow.operators.python import PythonOperator
from airflow.operators.bash import BashOperator
class DAGinstance:
def __init__(self, args) -> None:
self.dag_id = args['dag_id']
self.start_date = args['start_date']
self.tags = args['tags']
self.schedule_interval = args['schedule_interval']
self.catchup = args['catchup']
self.image_dir = "/mnt/c/Users/minsi/OneDrive/바탕\ 화면/mskim/sample_imgs"
def gen_dag(self):
# Define DAG
dag = DAG(
description="https://da2so.tistory.com/40",
dag_id= self.dag_id,
start_date= self.start_date,
tags= self.tags,
schedule_interval= self.schedule_interval,
catchup= self.catchup
)
with dag:
# Struct DAGs tasks
task1 = self.task_py_operator(dag)
task2 = self.task_bash_operator(dag)
task1 >> task2
return dag
def mk_img_store(self):
Path(self.image_dir).mkdir(exist_ok=True, parents=True)
def task_py_operator(self, dag):
# Define Task #1
make_image_store = PythonOperator(
task_id="make_image_store",
python_callable=self.mk_img_store,
dag=dag
)
return make_image_store
def task_bash_operator(self, dag):
# Define Task #2
download_person_picture = BashOperator(
task_id="download_person_pricture",
bash_command=f"curl -L https://images.inc.com/uploaded_files/image/1024x576/getty_561098557_970647970450064_87098.jpg --output {Path(self.image_dir)/'image_via_instance.png'}",
dag=dag
)
return download_person_picture
# Default args for DAG
default_args = {
'dag_id': 'yolo_sample_with_instance',
'start_date': datetime(2024, 7, 30),
'tags' : ["yolo5"],
'schedule_interval': None,
'catchup': False
}
# Create class instance
dag_instance = DAGinstance(default_args)
# Run method of the class instance
a = dag_instance.gen_dag()
해당 코드를 작성하면서 이해한 바는 다음과 같다.
1. DAG object는 global variable 단위로 올려야 Airflow UI에서 제대로 인식이 되는것 같다. (참고 LINK)
2. gen_dag()로 구현된 함수는 반환값을 가져야 하며, if __name__ == '__main__' 으로 감싸면 안된다.
3. Airflow set downstream 부분 (>>)은 with 문법을 통해 object 종속성을 구현한다
'Tech 기록지 > Data Engineering' 카테고리의 다른 글
[Data Engineering-9] Python venv install specific version (Windows) (0) | 2024.08.05 |
---|---|
[Data Engineering-7] Ubuntu (Linux) Symbolic Link (0) | 2024.07.30 |
[Data Engineering-6] 윈도우 검색 findstr (0) | 2024.07.26 |
[Data Engineering-5] Airflow 설치 및 실행 테스트 (with postgreSQL) (1) | 2024.07.26 |
[Data Engineering-4] Ubuntu 고정(static) IP 설정하기 (with Netplan) (0) | 2024.07.15 |