728x90
반응형
Apache Airflow에서 XCom (Cross-Communication)은 작업 간의 데이터를 교환하는 데 사용되는 메커니즘입니다. XCom은 DAG의 태스크 간에 작은 데이터 조각을 공유하거나 전달하는 데 유용합니다. 이를 통해 한 태스크의 출력 데이터를 다른 태스크의 입력 데이터로 사용할 수 있습니다.
주요 특징
- XCom의 저장 및 검색:
- XCom은 키-값 쌍의 형태로 데이터베이스에 저장됩니다.
- 각 XCom은 key, value, timestamp, task_id, dag_id 등의 메타데이터와 함께 저장됩니다.
- XCom 푸시:
- xcom_push 메서드를 사용하여 데이터를 푸시할 수 있습니다.
- 일반적으로 PythonOperator 내에서 return 문을 사용하면 자동으로 XCom에 푸시됩니다.
- XCom 풀:
- xcom_pull 메서드를 사용하여 데이터를 가져올 수 있습니다.
- 특정 task_id와 key를 지정하여 데이터를 가져옵니다.
- 자동 XCom 푸시:
- PythonOperator의 반환 값은 자동으로 XCom에 저장됩니다.
XCom 사용 시 유의사항
- 데이터 크기:
- XCom은 작은 데이터를 교환하는 데 적합합니다. 큰 데이터는 저장과 검색이 비효율적일 수 있으므로 주의해야 합니다.
- 보안:
- XCom을 사용하여 민감한 정보를 주고받을 때는 주의해야 합니다. 데이터베이스에 평문으로 저장되기 때문에 보안에 취약할 수 있습니다.
- 성능:
- 너무 많은 XCom 데이터를 사용하거나 자주 저장하면 Airflow 메타데이터 데이터베이스에 부하를 줄 수 있습니다.
사용방법
from airflow import DAG
from airflow.operators.python import PythonOperator
from datetime import datetime
def push_function(**kwargs):
kwargs['ti'].xcom_push(key='my_key', value='This is an XCom message')
def pull_function(**kwargs):
ti = kwargs['ti']
message = ti.xcom_pull(task_ids='push_task', key='my_key')
print(f"Pulled XCom message: {message}")
dag = DAG(
'xcom_full_example',
default_args={'owner': 'airflow', 'start_date': datetime(2023, 1, 1)},
schedule_interval='@daily',
)
push_task = PythonOperator(
task_id='push_task',
python_callable=push_function,
provide_context=True,
dag=dag,
)
pull_task = PythonOperator(
task_id='pull_task',
python_callable=pull_function,
provide_context=True,
dag=dag,
)
push_task >> pull_task
위 예제에서 push_function은 XCom에 메시지를 푸시하고, pull_function은 해당 메시지를 풀하여 출력합니다. 이처럼 XCom을 활용하면 작업 간 데이터를 쉽게 공유하고 전달할 수 있습니다.
728x90
반응형
'Bigdata > Airflow' 카테고리의 다른 글
Airflow Plugins (0) | 2024.05.28 |
---|---|
Airflow PYTHONPATH (0) | 2024.05.28 |
[Airflow]Operator 정의 및 예제 (0) | 2024.05.23 |
Airflow Hook 정의 및 예제 (0) | 2024.05.22 |
Airflow 설치 및 Tutorial 실행 (0) | 2024.05.20 |