[MLOps] Apache Airflow 기능 및 용어 정리

Apache Airflow의 핵심 구성 요소인 DAG, Operator 등을 활용하여 복잡한 데이터 파이프라인을 자동화하는 방법을 소개합니다. 파이썬 코드로 워크플로우를 손쉽게 관리할 수 있습니다.

개요

24일 일요일 운좋게 Airflow 원데이 클래스를 확인하여 수강했습니다.

배웠던 이론들과 실습 코드들을 필요한만큼 정리하였습니다.

이번 주 8월 24일(일) 진행되는 Airflow 무료 특강에 무려 110분이 신청해 주셨습니다. 🙌 링크드인 포스트를 보고 신청해 주신 분들도 많을 텐데, 진심으로 감사드립니다. 🙏 사전 질문도 20개 이상 들어와 오늘부터 차근차근 답변을 달며 준비하고 있습니다. 코드잇 정규 강의에서는 2~3일에 걸쳐 강의하던 내용을 4시간에 압축해서… | Hyunsoo (Ryan) Lee | 10 comments
이번 주 8월 24일(일) 진행되는 Airflow 무료 특강에 무려 110분이 신청해 주셨습니다. 🙌 링크드인 포스트를 보고 신청해 주신 분들도 많을 텐데, 진심으로 감사드립니다. 🙏 사전 질문도 20개 이상 들어와 오늘부터 차근차근 답변을 달며 준비하고 있습니다. 코드잇 정규 강의에서는 2~3일에 걸쳐 강의하던 내용을 4시간에 압축해서 전달하려고 하다 보니 어떻게 하면 효율적인 강의가 가능할까 하는 고민도 생기고, 더 많은 내용들을 알려드리고 싶은 욕심도 생기네요.🤔(PPT 슬라이드가 75장.....!🤯) 남은 기간 동안 꼼꼼히 준비해서 여러분의 주말 4시간이 충분히 보람찬 시간이 될 수 있도록 하겠습니다. 💡 질문에 대한 답변은 잘 정리해서 강의 이후 링크드인에도 공유할 수 있도록 할게요! | 10 comments on LinkedIn

Apache Airflow 용어 정리

DAG(Directed Acyclie Graph)

Airflow에서 하나의 온전한 파이프라인 스크립트를 DAG라고 칭한다. → 순환되는 구조가 없어야한다

Operator

XCom

Sensor

Connection

Jinja Template

Hook

Cron 표현식

Variable

Directory 구조

Console

Notion Image

Airflow Architecture

Notion Image

DAG 스크립트 구조

import pendulum
from airflow.sdk import DAG
from airflow.providers.standard.operators.empty import EmptyOperator

default_args = dict(
    owner = 'bda', # 개별 DAG 관리자
    email = ['bda@airflow.com'],
    email_on_failure = False,
    retries = 3
    )

with DAG(
    dag_id="01_tutorial_dag",
    start_date=pendulum.datetime(2025, 8, 1, tz='Asia/Seoul'),
    schedule="30 10 * * *", # cron 표현식
    tags = ['20250824', 'BASIC'],
    default_args = default_args,
    catchup=False
):

    task1 = EmptyOperator(task_id="task1")
    task2 = EmptyOperator(task_id="task2")
    task3 = EmptyOperator(task_id="task3")
    task4 = EmptyOperator(task_id="task4")
    task5 = EmptyOperator(task_id="task5")

# task1 >> task2 >> task3 >> task4 >> task5
task1 >> [task2, task3] >> task4 >> task5
Notion Image
import pendulum, random
from airflow.sdk import DAG, task
"""
🔸 TaskFlow API
    - 파이썬 함수를 데코레이터로 감싸서 태스크로 변환하는 방식
    - 함수의 반환값이 다음 태스크 함수의 인자로 자동 전달되어 데이터 흐름을 간단히 표현 가능
    - AIRFLOW의 의존성 설정(task1 >> task2) 방식 대신, 함수 호출만으로 태스크 간 종속성 정의 가능
    - Pythonic한 방식으로 코드 작성이 가능하기 때문에 PythonOperator 대신 사용하는 것을 권장!
"""

default_args = dict(
    owner = 'bda',
    email = ['bda@airflow.com'],
    email_on_failure = False,
    retries = 3
    )

with DAG(
    dag_id="03_python_taskflow_dag",
    start_date=pendulum.datetime(2025, 8, 1, tz='Asia/Seoul'),
    schedule="30 10 * * *",
    tags = ['20250824','BASIC'],
    default_args = default_args,
    catchup=False
):
    @task(task_id='select_lang')
    def random_language():
        lang_list = ["PYTHON", 'JAVA', 'RUST']
        lang = random.sample(lang_list, 1)[0]
        print("SELECTED LANGUAGE : ", lang)
        return lang

    # @task(task_id='one')
    # def list_one():
    #     return [1,2,3]

    # @task(task_id='two')
    # def list_two(lst):
    #     return lst + [4,5,6]

    # @task(task_id='three')
    # def list_three(lst):
    #     return lst + [7,8,9]

    select_lang = random_language()
    # one = list_one()
    # two = list_two(one)
    # three = list_three(two)

    select_lang
Notion Image

Return이 없다면 결과적으론 XCom에서 표시되지 않음.

@task(task_id='get_brewery_api')
def get_brewery_api():
    URL = 'https://fakerapi.it/api/v2/users'
    response = requests.get(URL)

    res = response.json()['data']

    return res

@task(task_id='api_to_dataframe')
def api_to_dataframe(api_data):
    df = pd.json_normalize(api_data)

    for row in df.head().to_dict(orient='records'):
        pprint(row)
    print("df.shape : ", df.shape)

Jinja Template을 통해 활용 가능 - get_current_context()

import pendulum
from pprint import pprint
from airflow.sdk import DAG, task, get_current_context

"""
🔸 Airflow의 컨텍스트(Context)
    - 태스크가 실행될 때 DAG, Task, 실행 날짜, 매크로, 파라미터 등 실행 환경 관련 메타데이터를 담고 있는 딕셔너리
    - 시간 관련 데이터도 포함하고 있기 때문에 실행별로 값이 달라짐
    - 'ti' 객체를 활용하여 XCOM 저장소 데이터를 저장/조회 가능
"""

default_args = dict(
    owner = 'bda',
    email = ['bda@airflow.com'],
    email_on_failure = False,
    retries = 3
    )

with DAG(
    dag_id="05_context_check_dag",
    start_date=pendulum.datetime(2025, 8, 1, tz='Asia/Seoul'),
    schedule="30 10 * * *",
    tags = ['20250824','BASIC'],
    default_args = default_args,
    catchup=False
):

    @task(task_id='context_check')
    def context_check():
        ctx = get_current_context()
        pprint(ctx)

    context_check()
Notion Image
Notion Image

Airflow 개발 관련 Tip

진행하고자하는 Operator 기능을 찾고 거기에 맞는 코드를 작성하면 Python에 있는 모든 작업을 자동화할 수 있습니다.