본문 바로가기
Bigdata/Airflow

[Airflow]Operator 정의 및 예제

by 빅경 2024. 5. 23.
728x90
반응형

Airflow에서 Operator는 작업(Task)을 정의하고 실행하는 객체입니다. Operator는 DAG (Directed Acyclic Graph)의 노드로서, 작업의 실행 논리를 정의하며, 데이터 처리, 시스템 운영, 모니터링 등 다양한 작업을 정의하고 실행하는 데 사용됩니다.

 

Operator의 주요 특징

  • Operator는 단일 작업을 정의하고 실행함.
    • 예를 들어 BashOperator는 Bash 명령을 실행하고, PythonOperator는 Python 함수를 실행함.
  •  Operator는 BaseOperator 클래스를 상속받아 구현되며, Airflow에는 다양한 기본 제공 Operator가 있으며, 사용자 정의 Operator도 작성할 수 있음.
    • 예를 들어 BashOperator, PythonOperator, MySqlOperator 등이 있음.
  • Operator는 Jinja 템플릿을 지원하므로 동적으로 값을 전달할 수 있음.
  • 하나의 Operator 인스턴스가 하나의 Task를 구성함.

 

주요 Operator 유형

BashOperator:

쉘 명령어를 실행할 때 사용합니다.

from airflow.operators.bash import BashOperator

bash_task = BashOperator(
    task_id='run_bash_command',
    bash_command='echo "Hello, Airflow!"',
    dag=dag,
)

 

PythonOperator:

Python 함수를 실행할 때 사용합니다.

from airflow.operators.python import PythonOperator

def my_function():
    print("Hello, Airflow!")

python_task = PythonOperator(
    task_id='run_python_function',
    python_callable=my_function,
    dag=dag,
)

 

EmailOperator:

이메일을 보낼 때 사용합니다.

from airflow.operators.email import EmailOperator

email_task = EmailOperator(
    task_id='send_email',
    to='example@example.com',
    subject='Airflow Alert',
    html_content='This is a test email sent from Airflow!',
    dag=dag,
)

 

 

S3Operator:

AWS S3와 상호작용할 때 사용합니다.

from airflow.operators.email import EmailOperator

email_task = EmailOperator(
    task_id='send_email',
    to='example@example.com',
    subject='Airflow Alert',
    html_content='This is a test email sent from Airflow!',
    dag=dag,
)

 

MySqlOperator:

MySQL 데이터베이스 쿼리를 실행할 때 사용합니다.

from airflow.providers.amazon.aws.transfers.local_to_s3 import LocalFilesystemToS3Operator

upload_task = LocalFilesystemToS3Operator(
    task_id='upload_file_to_s3',
    filename='/path/to/file',
    bucket_name='my-bucket',
    key='my-file',
    dag=dag,
)

 

커스텀 Operator 작성

Airflow에서 제공하는 기본 Operator 외에도 필요에 따라 커스텀 Operator를 작성할 수 있습니다. 커스텀 Operator를 작성하면 특정 요구 사항에 맞는 기능을 구현할 수 있습니다.

from airflow.models import BaseOperator
from airflow.utils.decorators import apply_defaults

class CustomOperator(BaseOperator):

    @apply_defaults
    def __init__(self, my_param, *args, **kwargs):
        super(CustomOperator, self).__init__(*args, **kwargs)
        self.my_param = my_param

    def execute(self, context):
        print(f"Custom parameter value: {self.my_param}")

custom_task = CustomOperator(
    task_id='run_custom_task',
    my_param='Hello, custom parameter!',
    dag=dag,
)

 

DAG에서의 Operator 사용 예시

Operator는 DAG 내에서 정의되며, 서로 의존 관계를 설정할 수 있습니다.

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from datetime import datetime

dag = DAG(
    'example_dag',
    default_args={
        'owner': 'airflow',
        'start_date': datetime(2023, 5, 20),
        'retries': 1,
    },
    schedule_interval='@daily',
)

start = DummyOperator(task_id='start', dag=dag)
end = DummyOperator(task_id='end', dag=dag)

start >> bash_task >> python_task >> email_task >> end

 

위 예시에서는 start에서 시작하여 bash_task, python_task, email_task를 차례로 실행하고 end에서 마무리하는 흐름을 보여줍니다.

 

Apache Airflow의 Operator는 워크플로우의 각 단계를 정의하고 자동화된 작업을 수행하는 핵심 요소로, 다양한 작업을 손쉽게 구현할 수 있는 유연성을 제공합니다.

 

728x90
반응형

'Bigdata > Airflow' 카테고리의 다른 글

Airflow Plugins  (0) 2024.05.28
Airflow PYTHONPATH  (0) 2024.05.28
Airflow XCcom(Cross-Communication) 정의 및 예시  (0) 2024.05.23
Airflow Hook 정의 및 예제  (0) 2024.05.22
Airflow 설치 및 Tutorial 실행  (0) 2024.05.20