728x90
반응형
Airflow에서 Operator는 작업(Task)을 정의하고 실행하는 객체입니다. Operator는 DAG (Directed Acyclic Graph)의 노드로서, 작업의 실행 논리를 정의하며, 데이터 처리, 시스템 운영, 모니터링 등 다양한 작업을 정의하고 실행하는 데 사용됩니다.
Operator의 주요 특징
- Operator는 단일 작업을 정의하고 실행함.
- 예를 들어 BashOperator는 Bash 명령을 실행하고, PythonOperator는 Python 함수를 실행함.
- Operator는 BaseOperator 클래스를 상속받아 구현되며, Airflow에는 다양한 기본 제공 Operator가 있으며, 사용자 정의 Operator도 작성할 수 있음.
- 예를 들어 BashOperator, PythonOperator, MySqlOperator 등이 있음.
- Operator는 Jinja 템플릿을 지원하므로 동적으로 값을 전달할 수 있음.
- 하나의 Operator 인스턴스가 하나의 Task를 구성함.
주요 Operator 유형
BashOperator:
쉘 명령어를 실행할 때 사용합니다.
from airflow.operators.bash import BashOperator
bash_task = BashOperator(
task_id='run_bash_command',
bash_command='echo "Hello, Airflow!"',
dag=dag,
)
PythonOperator:
Python 함수를 실행할 때 사용합니다.
from airflow.operators.python import PythonOperator
def my_function():
print("Hello, Airflow!")
python_task = PythonOperator(
task_id='run_python_function',
python_callable=my_function,
dag=dag,
)
EmailOperator:
이메일을 보낼 때 사용합니다.
from airflow.operators.email import EmailOperator
email_task = EmailOperator(
task_id='send_email',
to='example@example.com',
subject='Airflow Alert',
html_content='This is a test email sent from Airflow!',
dag=dag,
)
S3Operator:
AWS S3와 상호작용할 때 사용합니다.
from airflow.operators.email import EmailOperator
email_task = EmailOperator(
task_id='send_email',
to='example@example.com',
subject='Airflow Alert',
html_content='This is a test email sent from Airflow!',
dag=dag,
)
MySqlOperator:
MySQL 데이터베이스 쿼리를 실행할 때 사용합니다.
from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator
upload_task = LocalFilesystemToS3Operator(
task_id='upload_file_to_s3',
filename='/path/to/file',
bucket_name='my-bucket',
key='my-file',
dag=dag,
)
커스텀 Operator 작성
Airflow에서 제공하는 기본 Operator 외에도 필요에 따라 커스텀 Operator를 작성할 수 있습니다. 커스텀 Operator를 작성하면 특정 요구 사항에 맞는 기능을 구현할 수 있습니다.
from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults
class CustomOperator(BaseOperator):
@apply_defaults
def __init__(self, my_param, *args, **kwargs):
super(CustomOperator, self).__init__(*args, **kwargs)
self.my_param = my_param
def execute(self, context):
print(f"Custom parameter value: {self.my_param}")
custom_task = CustomOperator(
task_id='run_custom_task',
my_param='Hello, custom parameter!',
dag=dag,
)
DAG에서의 Operator 사용 예시
Operator는 DAG 내에서 정의되며, 서로 의존 관계를 설정할 수 있습니다.
from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime
dag = DAG(
'example_dag',
default_args={
'owner': 'airflow',
'start_date': datetime(2023, 5, 20),
'retries': 1,
},
schedule_interval='@daily',
)
start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)
start >> bash_task >> python_task >> email_task >> end
위 예시에서는 start에서 시작하여 bash_task, python_task, email_task를 차례로 실행하고 end에서 마무리하는 흐름을 보여줍니다.
Apache Airflow의 Operator는 워크플로우의 각 단계를 정의하고 자동화된 작업을 수행하는 핵심 요소로, 다양한 작업을 손쉽게 구현할 수 있는 유연성을 제공합니다.
728x90
반응형
'Bigdata > Airflow' 카테고리의 다른 글
Airflow Plugins (0) | 2024.05.28 |
---|---|
Airflow PYTHONPATH (0) | 2024.05.28 |
Airflow XCcom(Cross-Communication) 정의 및 예시 (0) | 2024.05.23 |
Airflow Hook 정의 및 예제 (0) | 2024.05.22 |
Airflow 설치 및 Tutorial 실행 (0) | 2024.05.20 |