[MLOps] Data Pipeline Orchestration - Airflow, Perfect

데이터 오케스트레이션 툴은 복잡한 파이프라인의 종속성, 에러, 모니터링 과제를 해결합니다. 워크플로우를 자동화하여 안정적이고 효율적인 데이터 흐름을 구축할 수 있습니다.

개요

데이터 기반의 의사결정이 중요해지면서 수많은 데이터 파이프라인을 구축하고 운영하는 것은 기업의 핵심 과제가 되었습니다. 하지만 이 파이프라인들을 각자 관리하고 실행시키면 매우 복잡하고 방대한 작업을 필요로 합니다.

데이터 오케스트레이션 툴은 바로 이 복잡하게 얽힌 데이터 파이프라인의 흐름을 제어하는 역할을 수행하며, 전체 워크플로우를 안정적이고 효율적으로 관리합니다.

데이터 파이프라인의 도전 과제

데이터의 양과 처리 로직이 복잡해지면서 다음과 같은 도전 과제에 직면하게 됩니다.

데이터 오케스트레이션 툴의 핵심 기능과 장점

데이터 오케스트레이션 툴은 위와 같은 문제들을 해결하기 위해 다음과 같은 강력한 기능들을 제공합니다.

대표적인 툴: Airflow & Perfect

가장 널리 알려진 오픈소스 데이터 오케스트레이션 툴 중 하나는 Airflow입니다.

Airflow는 DAG(Directed Acyclic Graph, 방향성 비순환 그래프) 라는 핵심 개념을 사용하여 워크플로우를 정의합니다. 각 작업(Task)과 그들 사이의 실행 순서 및 의존 관계를 그래프 형태로 명확하게 표현합니다.

이 DAG 설계도를 통해 사용자는 어떤 작업들이 순차적으로 실행되어야 하고, 어떤 작업들이 동시에 병렬로 처리될 수 있는지 한눈에 파악할 수 있으며, 전체 파이프라인을 체계적으로 시각화하고 관리할 수 있습니다.

Airflow는 이 DAG를 기반으로 풍부한 UI, 강력한 스케줄링 및 모니터링 기능을 제공하여 복잡한 데이터 워크플로우를 안정적으로 운영할 수 있도록 지원합니다.

Airflow Demo

from airflow import DAG
from airflow.operators.dummy import DummyOperator
from airflow.operators.python import PythonOperator
from airflow.utils.dates import days_ago
from airflow.operators.email import EmailOperator
import random
import time

# Function to generate random IoT data
def generate_iot_data(**kwargs):
    data = []
    for _ in range(60):  # 60 seconds x 5 minutes = 300 readings (1 every second)
        data.append(random.choice([0, 1]))
        time.sleep(1)  # simulate 1-second intervals
    return data

# Function to aggregate the IoT data
def aggregate_machine_data(**kwargs):
    ti = kwargs['ti']
    data = ti.xcom_pull(task_ids='getting_iot_data')
    count_0 = data.count(0)
    count_1 = data.count(1)
    aggregated_data = {'count_0': count_0, 'count_1': count_1}
    return aggregated_data

# Email content generation
def create_email_content(**kwargs):
    ti = kwargs['ti']
    aggregated_data = ti.xcom_pull(task_ids='aggrigate_machine_data')
    return f"Aggregated IoT Data:\nCount of 0: {aggregated_data['count_0']}\nCount of 1: {aggregated_data['count_1']}"

# Default arguments for the DAG
default_args = {
    'owner': 'airflow',
    'start_date': days_ago(1),
    'email_on_failure': False,
    'email_on_retry': False,
    'retries': 1,
}

# Define the DAG
with DAG(
    dag_id='iot_data_pipeline',
    default_args=default_args,
    schedule_interval=None,
    catchup=False,
) as dag:

    start_task = DummyOperator(task_id='start_task')

    getting_iot_data = PythonOperator(
        task_id='getting_iot_data',
        python_callable=generate_iot_data,
    )

    aggregate_machine_data = PythonOperator(
        task_id='aggregate_machine_data',
        python_callable=aggregate_machine_data,
    )

    end_task = DummyOperator(task_id='end_task')

    # Task dependencies
    start_task >> getting_iot_data >> aggregate_machine_data >> end_task
Notion Image
Notion Image