본문 바로가기
Bigdata/Airflow

Airflow Hook 정의 및 예제

by 빅경 2024. 5. 22.
728x90
반응형

Airflow에서 Hook은 외부 시스템(데이터베이스, 클라우드 서비스, API 등)과 통신하기 위한 인터페이스를 제공하는 추상화된 API입니다. Airflow Hook은 주로 Operator와 함께 사용되며, DAG 코드에서 직접 사용할 수도 있습니다.

주요 특징

  • 외부 시스템(데이터베이스, API 등)과 상호작용하기 위한 인터페이스를 제공하는 모듈로, 연결을 표준화
    • 예를 들어 MySqlHook, S3Hook, SlackHook 등
  • Hook은 Operator 내부에서 사용되거나 DAG 코드에서 직접 사용될 수 있음
  • 외부 시스템에 연결하기 위해 일반적으로 Connection ID만 필요
  • BaseHook 클래스를 상속받아 외부 시스템 연결 로직을 구현

주요 기능

  • 연결 설정: 외부 시스템과의 연결을 설정하고 유지함
  • 데이터 전송: 데이터베이스 쿼리 실행, 파일 업로드 및 다운로드, API 호출 등의 작업을 수행함
  • 인증 관리: OAuth, API 키 등 다양한 인증 방식을 지원함

작성된 커스텀 Hook은 Airflow의 Operator에서 사용될 수 있습니다.
Airflow Hook을 활용하면 다양한 외부 시스템과의 상호작용을 간편하게 처리할 수 있으며, 이를 통해 워크플로우를 더욱 유연하고 강력하게 구성할 수 있습니다.


예시: MySQL Hook

MySQL 데이터베이스에 연결하여 데이터를 조회하거나 조작할 때 사용되는 MySQL Hook 예시입니다.

from airflow.hooks.mysql_hook import MySqlHook

def fetch_data_from_mysql():
    mysql_hook = MySqlHook(mysql_conn_id='my_mysql_connection')
    connection = mysql_hook.get_conn()
    cursor = connection.cursor()
    cursor.execute("SELECT * FROM my_table")
    result = cursor.fetchall()
    return result

 

 

예시: S3 Hook

Amazon S3에 파일을 업로드하거나 다운로드할 때 사용되는 S3 Hook 예시입니다.

from airflow.hooks.S3_hook import S3Hook

def upload_to_s3(bucket_name, key, file_path):
    s3_hook = S3Hook(aws_conn_id='my_aws_connection')
    s3_hook.load_file(filename=file_path, bucket_name=bucket_name, replace=True, key=key)

def download_from_s3(bucket_name, key, file_path):
    s3_hook = S3Hook(aws_conn_id='my_aws_connection')
    s3_hook.download_file(key=key, bucket_name=bucket_name, local_path=file_path)

 

커스텀 Hook 작성

필요에 따라 커스텀 Hook을 작성할 수도 있습니다. 예를 들어, 특정 API와 통신하기 위한 Hook을 만들 수 있습니다.

from airflow.hooks.base import BaseHook
import requests

class CustomApiHook(BaseHook):

    def __init__(self, api_conn_id):
        super().__init__()
        self.api_conn_id = api_conn_id

    def get_conn(self):
        connection = self.get_connection(self.api_conn_id)
        return connection

    def make_request(self, endpoint):
        connection = self.get_conn()
        url = f"{connection.host}/{endpoint}"
        response = requests.get(url, headers={"Authorization": f"Bearer {connection.password}"})
        return response.json()


감사합니다.

728x90
반응형

'Bigdata > Airflow' 카테고리의 다른 글

Airflow Plugins  (0) 2024.05.28
Airflow PYTHONPATH  (0) 2024.05.28
Airflow XCcom(Cross-Communication) 정의 및 예시  (0) 2024.05.23
[Airflow]Operator 정의 및 예제  (0) 2024.05.23
Airflow 설치 및 Tutorial 실행  (0) 2024.05.20