본문 바로가기
Bigdata/Airflow

Airflow Plugins

by 빅경 2024. 5. 28.
728x90
반응형

Airflow에는 $AIRFLOW_HOME/plugins 폴더에 파일을 놓으면 외부 기능을 코어에 통합할 수 있는 플러그인 관리자가 내장되어 있습니다. 플러그인 폴더의 Python 모듈을 가져오고, 매크로와 웹 보기가 Airflow의 기본 컬렉션에 통합되어 사용할 수 있습니다.

airflow plugins 명령어: 로드된 플러그인에 대한 정보를 덤프

버전 2.0에서 변경됨: airflow.{operators,sensors,hooks}.<plugin_name>을 통해 플러그인에 추가된 연산자, 센서, 후크 가져오기는 더 이상 지원되지 않으며 이러한 확장은 일반 Python 모듈로 가져와야 합니다.

What for?

조직마다 스택과 요구 사항이 다릅니다. Airflow 플러그인을 사용하면 기업이 생태계를 반영하도록 Airflow 설치를 맞춤설정할 수 있습니다. 플러그인은 새로운 기능 세트를 작성, 공유 및 활성화하는 쉬운 방법으로 사용할 수 있습니다.

Examples

  • Hive 로그를 구문 분석하고 Hive 메타데이터(CPU /IO / 단계/ 왜곡 /…)를 노출하는 도구 세트
  • 사람들이 지표를 수집하고 임계값과 경고를 설정할 수 있는 이상 탐지 프레임워크
  • 누가 무엇에 액세스하는지 이해하는 데 도움이 되는 감사 도구
  • 구성 기반 SLA 모니터링 도구를 사용하면 모니터링되는 테이블과 테이블이 착륙해야 하는 시간을 설정하고, 사람들에게 경고하고, 중단에 대한 시각화를 노출할 수 있음

When are plugins (re)loaded? - 로드 시점

  • 플러그인은 기본적으로 지연 로드(lazy load)되며, 일단 로드되면 다시 로드되지 않음(단, UI 플러그인은 웹 서버에 자동으로 로드됩니다).
  • 각 Airflow 프로세스 시작 시 이를 로드하려면 airflow.cfg에서 [core]lazy_load_plugins = False로 성정해야 함
  • 플러그인을 변경하고 웹 서버나 스케줄러가 새 코드를 사용하도록 하려면 해당 프로세스(웹 서버나 스케줄러)를 다시 시작해야 함

 

기본적으로 작업 실행은 forking을 사용합니다. 이렇게 하면 새로운 Python 인터프리터를 생성하여 Airflow의 모든 코드와 시작 루틴을 다시 파싱하는 것과 관련된 속도 저하를 방지할 수 있습니다.

이는 작업에 플러그인을 사용하고 이를 업데이트하려면 작업자(CeleryExecutor를 사용하는 경우) 또는 스케줄러(로컬 또는 순차 실행기)를 다시 시작해야 합니다.

다른 옵션은 시작 시 속도 저하를 허용하고 core.execute_tasks_new_python_interpreter 구성 설정을 True로 설정하여 작업에 대해 완전히 새로운 Python 인터프리터를 시작할 수 있다는 것입니다.

(반면에 DAG 파일로만 가져온 모듈에는 이 문제가 발생하지 않습니다. DAG 파일은 장기 실행 Airflow 프로세스에서 로드/파싱되지 않기 때문입니다.)

Interface

플러그인을 생성하려면 airflow.plugins_manager.AirflowPlugin 클래스를 파생시키고 Airflow에 연결하려는 객체를 참조해야 합니다. 파생해야 하는 클래스는 다음과 같습니다.

class AirflowPlugin:
    # The name of your plugin (str)
    name = None
    # A list of class(es) derived from BaseHook
    hooks = []
    # A list of references to inject into the macros namespace
    macros = []
    # A list of Blueprint object created from flask.Blueprint. For use with the flask_appbuilder based GUI
    flask_blueprints = []
    # A list of dictionaries containing FlaskAppBuilder BaseView object and some metadata. See example below
    appbuilder_views = []
    # A list of dictionaries containing kwargs for FlaskAppBuilder add_link. See example below
    appbuilder_menu_items = []

    # A callback to perform actions when airflow starts and the plugin is loaded.
    # NOTE: Ensure your plugin has *args, and **kwargs in the method definition
    #   to protect against extra parameters injected into the on_load(...)
    #   function in future changes
    def on_load(*args, **kwargs):
        # ... perform Plugin boot actions
        pass

    # A list of global operator extra links that can redirect users to
    # external systems. These extra links will be available on the
    # task page in the form of buttons.
    #
    # Note: the global operator extra link can be overridden at each
    # operator level.
    global_operator_extra_links = []

    # A list of operator extra links to override or add operator links
    # to existing Airflow Operators.
    # These extra links will be available on the task page in form of
    # buttons.
    operator_extra_links = []

    # A list of timetable classes to register so they can be used in DAGs.
    timetables = []

    # A list of Listeners that plugin provides. Listeners can register to
    # listen to particular events that happen in Airflow, like
    # TaskInstance state changes. Listeners are python modules.
    listeners = []

상속을 통해 파생시킬 수 있습니다(아래 예시 참조). 예제에서는 모든 옵션이 클래스 속성으로 정의되었지만 추가 초기화를 수행해야 하는 경우 속성으로 정의할 수도 있습니다. 이 클래스 내의 이름을 지정해야 합니다.

 

 

Examples

# This is the class you derive to create a plugin
from airflow.plugins_manager import AirflowPlugin
from airflow.security import permissions
from airflow.www.auth import has_access

from flask import Blueprint
from flask_appbuilder import expose, BaseView as AppBuilderBaseView

# Importing base classes that we need to derive
from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator


# Will show up in Connections screen in a future version
class PluginHook(BaseHook):
    pass


# Will show up under airflow.macros.test_plugin.plugin_macro
# and in templates through {{ macros.test_plugin.plugin_macro }}
def plugin_macro():
    pass


# Creating a flask blueprint to integrate the templates and static folder
bp = Blueprint(
    "test_plugin",
    __name__,
    template_folder="templates",  # registers airflow/plugins/templates as a Jinja template folder
    static_folder="static",
    static_url_path="/static/test_plugin",
)


# Creating a flask appbuilder BaseView
class TestAppBuilderBaseView(AppBuilderBaseView):
    default_view = "test"

    @expose("/")
    @has_access(
        [
            (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
        ]
    )
    def test(self):
        return self.render_template("test_plugin/test.html", content="Hello galaxy!")


# Creating a flask appbuilder BaseView
class TestAppBuilderBaseNoMenuView(AppBuilderBaseView):
    default_view = "test"

    @expose("/")
    @has_access(
        [
            (permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
        ]
    )
    def test(self):
        return self.render_template("test_plugin/test.html", content="Hello galaxy!")


v_appbuilder_view = TestAppBuilderBaseView()
v_appbuilder_package = {
    "name": "Test View",
    "category": "Test Plugin",
    "view": v_appbuilder_view,
}

v_appbuilder_nomenu_view = TestAppBuilderBaseNoMenuView()
v_appbuilder_nomenu_package = {"view": v_appbuilder_nomenu_view}

# Creating flask appbuilder Menu Items
appbuilder_mitem = {
    "name": "Google",
    "href": "https://www.google.com",
    "category": "Search",
}
appbuilder_mitem_toplevel = {
    "name": "Apache",
    "href": "https://www.apache.org/",
}


# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
    name = "test_plugin"
    hooks = [PluginHook]
    macros = [plugin_macro]
    flask_blueprints = [bp]
    appbuilder_views = [v_appbuilder_package, v_appbuilder_nomenu_package]
    appbuilder_menu_items = [appbuilder_mitem, appbuilder_mitem_toplevel]

 

 

Plugins as Python packages

setuptools 진입점 메커니즘을 통해 플러그인을 로드할 수 있습니다. 이렇게 하려면 패키지의 진입점을 사용하여 플러그인을 연결하세요. 패키지가 설치되면 Airflow는 진입점 목록에서 등록된 플러그인을 자동으로 로드합니다.

# my_package/my_plugin.py
from airflow.plugins_manager import AirflowPlugin
from flask import Blueprint

# Creating a flask blueprint to integrate the templates and static folder
bp = Blueprint(
    "test_plugin",
    __name__,
    template_folder="templates",  # registers airflow/plugins/templates as a Jinja template folder
    static_folder="static",
    static_url_path="/static/test_plugin",
)


class MyAirflowPlugin(AirflowPlugin):
    name = "my_namespace"
    flask_blueprints = [bp]
from setuptools import setup

setup(
    name="my-package",
    # ...
    entry_points={"airflow.plugins": ["my_plugin = my_package.my_plugin:MyAirflowPlugin"]},
)

 

Automatic reloading webserver

웹서버 자동 리로딩을 활성화하려면, 플러그인이 포함된 디렉터리의 변경 사항이 감지되었을 때 [webserver] 섹션의 reload_on_plugin_change 옵션을 True로 설정해야 합니다.

Troubleshooting

Flask CLI를 사용하여 문제를 해결할 수 있습니다. 이를 실행하려면 FLASK_APP 변수를 airflow.www.app:create_app 으로 설정해야 합니다.

For example, to print all routes, run:

FLASK_APP=airflow.www.app:create_app flask routes

 

#AIRFLOW_HOME/plugin/test/plugin_test.py 에 plugin 파일 생성

from airflow.models import BaseOperator

class PluginTestOperator(BaseOperator):

    def __init__(self, *args, **kwargs):
        super(PluginTestOperator, self).__init__(*args, **kwargs)

    def execute(self, context):
        print("PluginTest execute()")

 

from datetime import datetime
import pendulum

from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator

from test.plugin_test import PluginTestOperator

default_args= {
    'owner' : 'di',
    'start_date': pendulum.today('Asia/Seoul'),
}

# A DAG represents a workflow, a collection of tasks
with DAG(
    dag_id="plugin_test",
    default_args=default_args,
    schedule="0 * * * *"
) as dag:
    DT="{{ ds }}"
    # Tasks are represented as operators
    t1 = BashOperator(
            task_id="t1",          
            bash_command="echo hello",
    )

    t2 = PluginTestOperator(
            task_id="t2"
    )
    
    @task()
    def airflow():
        print("airflow")

    # Set dependencies between tasks
    airflow() >> t1 >> t2

 

https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/plugins.html

728x90
반응형