[MLOps] 대용량 데이터셋을 다루는 여러가지 도구(Spark, Kafka, Flink)

Apache Spark, Kafka, Flink를 이용해 대규모 데이터와 실시간 스트림을 빠르고 안정적으로 처리하여 비즈니스 가치를 만드는 핵심 기술을 알아봅니다.

개요

소규모, 중규모의 데이터를 넘어 기가, 페타바이트 단위의 큰 데이터셋을 다루는 방법을 정리하였습니다.

Large Datasets - Apache Spark

실제 사용 사례에서는 수천개의 파일과 대용량 크기의 데이터를 다루는 일이 빈번합니다.

1. 대규모 데이터셋과 분산 처리

2. RDD (Resilient Distributed Datasets)

3. 인메모리(In-memory) 컴퓨팅과 속도

4. 실시간 처리와 머신러닝(ML) 통합

5. 통합 API와 유연성 💻

Streaming Datasets - Kafka & Apache Flink

실시간 스트리밍 데이터를 다루는 도구를 알아봅니다.

Kafka에서 Apache Flink를 통해 데이터를 실시간으로 집계

집계된 데이터는 데이터베이스 내 다른 데이터간 집계 가능

Apache kafka

실시간 데이터 스트림을 관리하는 도구로 사용하며, 실시간 데이터 스트림을 통해 낮은 지연 시간 처리와 높은 처리량을 제공합니다.

따라서 데이터 일관성과 속도가 중요한 애플리케이션에 많이 사용되고 있습니다.

Kafka가 실시간으로 수집한다면, 이를 처리할 강력한 도구가 필요합니다.

Apache Kafka Demo

기본적인 설정을 구현해보고 Kafka가 메시지 버스로 어떻게 작동하는지 확인합니다.

  1. 환경 설정을 진행합니다.
sudo apt update
sudo apt install -y python3-pip python3-venv

python3 -m venv kafka_venv
source kafka_venv/bin/activate
  1. Kafka 실행
version: '3'
services:
  # Zookeeper service
  zookeeper:
    image: confluentinc/cp-zookeeper:7.6.6
    container_name: admin-zookeeper-1
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - "2181:2181"

  # Kafka service
  kafka:
    image: confluentinc/cp-kafka:7.6.6
    container_name: admin-kafka-1
    depends_on:
      - zookeeper
    ports:
      - "9092:9092" # Kafka port exposed to the host port
    environment:
      KAFKA_BROKER_ID: 1
      # Zookeeper connection zookeeper service name and port
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      # Kafka advertised listeners
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 0
  1. Topic 생성하기
docker exec admin-kafka-1 kafka-topics \
  --create \
  --topic sample-topic \
  --bootstrap-server localhost:9092 \
  --partitions 1 \
  --replication-factor 1
  1. 생산자 샘플 코드
"""
This script is a Python producer that sends messages to a Kafka topic
"""
#!/usr/bin/env python3

from kafka import KafkaProducer
import json
import time
from datetime import datetime

def create_producer():
    """
    Create and return a Kafka producer with JSON serialization.

    Returns:
        KafkaProducer: A configured Kafka producer that serializes messages to JSON.
    """
    return KafkaProducer(
        bootstrap_servers=['localhost:9092'],
        value_serializer=lambda x: json.dumps(x).encode('utf-8')
    )

def generate_message():
    """
    Generate a sample message with a timestamp and a random value.

    Returns:
        dict: A dictionary containing a timestamp and a random value.
    """
    return {
        'timestamp': datetime.now().strftime('%Y-%m-%d %H:%M:%S'),
        'value': round(time.time() % 100, 2)
    }

def main():
    """
    Main function to create a Kafka producer and continuously send messages.

    Sends messages to a Kafka topic every second until interrupted.
    Handles keyboard interrupt to gracefully close the producer.
    """
    producer = create_producer()
    topic_name = 'sample-topic'

    try:
        while True:
            # Generate and send a message to the Kafka topic
            message = generate_message()
            producer.send(topic_name, value=message)
            print(f"Produced message: {message}")
            time.sleep(1)

    except KeyboardInterrupt:
        print("Stopping producer...")
        producer.close()

if __name__ == "__main__":
    main()
Notion Image