Airflow에는 $AIRFLOW_HOME/plugins 폴더에 파일을 놓으면 외부 기능을 코어에 통합할 수 있는 플러그인 관리자가 내장되어 있습니다. 플러그인 폴더의 Python 모듈을 가져오고, 매크로와 웹 보기가 Airflow의 기본 컬렉션에 통합되어 사용할 수 있습니다.
airflow plugins 명령어: 로드된 플러그인에 대한 정보를 덤프
버전 2.0에서 변경됨: airflow.{operators,sensors,hooks}.<plugin_name>을 통해 플러그인에 추가된 연산자, 센서, 후크 가져오기는 더 이상 지원되지 않으며 이러한 확장은 일반 Python 모듈로 가져와야 합니다.
What for?
조직마다 스택과 요구 사항이 다릅니다. Airflow 플러그인을 사용하면 기업이 생태계를 반영하도록 Airflow 설치를 맞춤설정할 수 있습니다. 플러그인은 새로운 기능 세트를 작성, 공유 및 활성화하는 쉬운 방법으로 사용할 수 있습니다.
Examples
- Hive 로그를 구문 분석하고 Hive 메타데이터(CPU /IO / 단계/ 왜곡 /…)를 노출하는 도구 세트
- 사람들이 지표를 수집하고 임계값과 경고를 설정할 수 있는 이상 탐지 프레임워크
- 누가 무엇에 액세스하는지 이해하는 데 도움이 되는 감사 도구
- 구성 기반 SLA 모니터링 도구를 사용하면 모니터링되는 테이블과 테이블이 착륙해야 하는 시간을 설정하고, 사람들에게 경고하고, 중단에 대한 시각화를 노출할 수 있음
When are plugins (re)loaded? - 로드 시점
- 플러그인은 기본적으로 지연 로드(lazy load)되며, 일단 로드되면 다시 로드되지 않음(단, UI 플러그인은 웹 서버에 자동으로 로드됩니다).
- 각 Airflow 프로세스 시작 시 이를 로드하려면 airflow.cfg에서 [core]lazy_load_plugins = False로 성정해야 함
- 플러그인을 변경하고 웹 서버나 스케줄러가 새 코드를 사용하도록 하려면 해당 프로세스(웹 서버나 스케줄러)를 다시 시작해야 함
기본적으로 작업 실행은 forking을 사용합니다. 이렇게 하면 새로운 Python 인터프리터를 생성하여 Airflow의 모든 코드와 시작 루틴을 다시 파싱하는 것과 관련된 속도 저하를 방지할 수 있습니다.
이는 작업에 플러그인을 사용하고 이를 업데이트하려면 작업자(CeleryExecutor를 사용하는 경우) 또는 스케줄러(로컬 또는 순차 실행기)를 다시 시작해야 합니다.
다른 옵션은 시작 시 속도 저하를 허용하고 core.execute_tasks_new_python_interpreter 구성 설정을 True로 설정하여 작업에 대해 완전히 새로운 Python 인터프리터를 시작할 수 있다는 것입니다.
(반면에 DAG 파일로만 가져온 모듈에는 이 문제가 발생하지 않습니다. DAG 파일은 장기 실행 Airflow 프로세스에서 로드/파싱되지 않기 때문입니다.)
Interface
플러그인을 생성하려면 airflow.plugins_manager.AirflowPlugin 클래스를 파생시키고 Airflow에 연결하려는 객체를 참조해야 합니다. 파생해야 하는 클래스는 다음과 같습니다.
class AirflowPlugin:
# The name of your plugin (str)
name = None
# A list of class(es) derived from BaseHook
hooks = []
# A list of references to inject into the macros namespace
macros = []
# A list of Blueprint object created from flask.Blueprint. For use with the flask_appbuilder based GUI
flask_blueprints = []
# A list of dictionaries containing FlaskAppBuilder BaseView object and some metadata. See example below
appbuilder_views = []
# A list of dictionaries containing kwargs for FlaskAppBuilder add_link. See example below
appbuilder_menu_items = []
# A callback to perform actions when airflow starts and the plugin is loaded.
# NOTE: Ensure your plugin has *args, and **kwargs in the method definition
# to protect against extra parameters injected into the on_load(...)
# function in future changes
def on_load(*args, **kwargs):
# ... perform Plugin boot actions
pass
# A list of global operator extra links that can redirect users to
# external systems. These extra links will be available on the
# task page in the form of buttons.
#
# Note: the global operator extra link can be overridden at each
# operator level.
global_operator_extra_links = []
# A list of operator extra links to override or add operator links
# to existing Airflow Operators.
# These extra links will be available on the task page in form of
# buttons.
operator_extra_links = []
# A list of timetable classes to register so they can be used in DAGs.
timetables = []
# A list of Listeners that plugin provides. Listeners can register to
# listen to particular events that happen in Airflow, like
# TaskInstance state changes. Listeners are python modules.
listeners = []
상속을 통해 파생시킬 수 있습니다(아래 예시 참조). 예제에서는 모든 옵션이 클래스 속성으로 정의되었지만 추가 초기화를 수행해야 하는 경우 속성으로 정의할 수도 있습니다. 이 클래스 내의 이름을 지정해야 합니다.
Examples
# This is the class you derive to create a plugin
from airflow.plugins_manager import AirflowPlugin
from airflow.security import permissions
from airflow.www.auth import has_access
from flask import Blueprint
from flask_appbuilder import expose, BaseView as AppBuilderBaseView
# Importing base classes that we need to derive
from airflow.hooks.base import BaseHook
from airflow.providers.amazon.aws.transfers.gcs_to_s3 import GCSToS3Operator
# Will show up in Connections screen in a future version
class PluginHook(BaseHook):
pass
# Will show up under airflow.macros.test_plugin.plugin_macro
# and in templates through {{ macros.test_plugin.plugin_macro }}
def plugin_macro():
pass
# Creating a flask blueprint to integrate the templates and static folder
bp = Blueprint(
"test_plugin",
__name__,
template_folder="templates", # registers airflow/plugins/templates as a Jinja template folder
static_folder="static",
static_url_path="/static/test_plugin",
)
# Creating a flask appbuilder BaseView
class TestAppBuilderBaseView(AppBuilderBaseView):
default_view = "test"
@expose("/")
@has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
]
)
def test(self):
return self.render_template("test_plugin/test.html", content="Hello galaxy!")
# Creating a flask appbuilder BaseView
class TestAppBuilderBaseNoMenuView(AppBuilderBaseView):
default_view = "test"
@expose("/")
@has_access(
[
(permissions.ACTION_CAN_READ, permissions.RESOURCE_WEBSITE),
]
)
def test(self):
return self.render_template("test_plugin/test.html", content="Hello galaxy!")
v_appbuilder_view = TestAppBuilderBaseView()
v_appbuilder_package = {
"name": "Test View",
"category": "Test Plugin",
"view": v_appbuilder_view,
}
v_appbuilder_nomenu_view = TestAppBuilderBaseNoMenuView()
v_appbuilder_nomenu_package = {"view": v_appbuilder_nomenu_view}
# Creating flask appbuilder Menu Items
appbuilder_mitem = {
"name": "Google",
"href": "https://www.google.com",
"category": "Search",
}
appbuilder_mitem_toplevel = {
"name": "Apache",
"href": "https://www.apache.org/",
}
# Defining the plugin class
class AirflowTestPlugin(AirflowPlugin):
name = "test_plugin"
hooks = [PluginHook]
macros = [plugin_macro]
flask_blueprints = [bp]
appbuilder_views = [v_appbuilder_package, v_appbuilder_nomenu_package]
appbuilder_menu_items = [appbuilder_mitem, appbuilder_mitem_toplevel]
Plugins as Python packages
setuptools 진입점 메커니즘을 통해 플러그인을 로드할 수 있습니다. 이렇게 하려면 패키지의 진입점을 사용하여 플러그인을 연결하세요. 패키지가 설치되면 Airflow는 진입점 목록에서 등록된 플러그인을 자동으로 로드합니다.
# my_package/my_plugin.py
from airflow.plugins_manager import AirflowPlugin
from flask import Blueprint
# Creating a flask blueprint to integrate the templates and static folder
bp = Blueprint(
"test_plugin",
__name__,
template_folder="templates", # registers airflow/plugins/templates as a Jinja template folder
static_folder="static",
static_url_path="/static/test_plugin",
)
class MyAirflowPlugin(AirflowPlugin):
name = "my_namespace"
flask_blueprints = [bp]
from setuptools import setup
setup(
name="my-package",
# ...
entry_points={"airflow.plugins": ["my_plugin = my_package.my_plugin:MyAirflowPlugin"]},
)
Automatic reloading webserver
웹서버 자동 리로딩을 활성화하려면, 플러그인이 포함된 디렉터리의 변경 사항이 감지되었을 때 [webserver] 섹션의 reload_on_plugin_change 옵션을 True로 설정해야 합니다.
Troubleshooting
Flask CLI를 사용하여 문제를 해결할 수 있습니다. 이를 실행하려면 FLASK_APP 변수를 airflow.www.app:create_app 으로 설정해야 합니다.
For example, to print all routes, run:
FLASK_APP=airflow.www.app:create_app flask routes
#AIRFLOW_HOME/plugin/test/plugin_test.py 에 plugin 파일 생성
from airflow.models import BaseOperator
class PluginTestOperator(BaseOperator):
def __init__(self, *args, **kwargs):
super(PluginTestOperator, self).__init__(*args, **kwargs)
def execute(self, context):
print("PluginTest execute()")
from datetime import datetime
import pendulum
from airflow import DAG
from airflow.decorators import task
from airflow.operators.bash import BashOperator
from test.plugin_test import PluginTestOperator
default_args= {
'owner' : 'di',
'start_date': pendulum.today('Asia/Seoul'),
}
# A DAG represents a workflow, a collection of tasks
with DAG(
dag_id="plugin_test",
default_args=default_args,
schedule="0 * * * *"
) as dag:
DT="{{ ds }}"
# Tasks are represented as operators
t1 = BashOperator(
task_id="t1",
bash_command="echo hello",
)
t2 = PluginTestOperator(
task_id="t2"
)
@task()
def airflow():
print("airflow")
# Set dependencies between tasks
airflow() >> t1 >> t2
https://airflow.apache.org/docs/apache-airflow/stable/authoring-and-scheduling/plugins.html
'Bigdata > Airflow' 카테고리의 다른 글
alembic.util.exc.CommandError: Can't locate revision identified by '22ed7efa9da2' (0) | 2024.08.23 |
---|---|
Airflow PYTHONPATH (0) | 2024.05.28 |
Airflow XCcom(Cross-Communication) 정의 및 예시 (0) | 2024.05.23 |
[Airflow]Operator 정의 및 예제 (0) | 2024.05.23 |
Airflow Hook 정의 및 예제 (0) | 2024.05.22 |