이번 편에서 다루는 것
AI 추천 시스템의 첫 번째 단계는 데이터 확보다. 아무리 좋은 알고리즘도 데이터가 없으면 무용지물이다.
이번 편에서는 외부 학사 API에서 데이터를 가져와 PostgreSQL에 저장하는 파이프라인을 어떻게 설계하고 구현했는지 다룬다.
데이터 소스 분석
필요한 데이터
추천 시스템에 필요한 데이터를 먼저 정리했다.
데이터 용도 특징
| 학생 정보 | 추천 대상 식별, 학과 정보 | 5,000건+, 학기마다 변동 |
| 강좌 정보 | 추천 후보군 | 2,000건+, 매 학기 갱신 |
| 학과 정보 | 학과 기반 필터링 | 50건, 거의 고정 |
| 비교과 활동 | 추천 후보군 | 500건+, 수시 등록 |
| 관심분야 | 개인화 요소 | 100건, 고정 |
| 학생별 관심분야 | 개인화 매칭 | 10,000건+, 학생이 직접 선택 |
외부 API 구조
학사 시스템은 REST API를 제공했다. 각 데이터별로 별도 엔드포인트가 있었다.
GET /api/students # 학생 정보
GET /api/courses # 강좌 정보
GET /api/departments # 학과 정보
GET /api/activities # 비교과 활동
GET /api/interests # 관심분야 목록
GET /api/user-settings # 학생별 관심분야 선택
첫 번째 도전: 불안정한 외부 API
문제 상황
외부 API를 호출하면서 여러 문제를 마주했다.
- 간헐적 타임아웃: 네트워크 상태에 따라 응답이 늦어짐
- 일시적 서버 오류: 500 에러가 가끔 발생
- 대용량 응답: 학생 5,000명 데이터를 한 번에 못 가져옴
해결: 재시도 로직 구현
일시적 오류에 대응하기 위해 Exponential Backoff 방식의 재시도 로직을 구현했다.
import time
import requests
from typing import Optional, Dict, Any
class APIClient:
def __init__(self, base_url: str, api_key: str):
self.base_url = base_url
self.api_key = api_key
self.max_retries = 3
self.base_delay = 1 # 초
def _request_with_retry(
self,
endpoint: str,
params: Optional[Dict] = None
) -> Dict[str, Any]:
"""재시도 로직이 포함된 API 요청"""
last_exception = None
for attempt in range(self.max_retries):
try:
response = requests.get(
f"{self.base_url}{endpoint}",
params=params,
headers={"Authorization": f"Bearer {self.api_key}"},
timeout=30
)
response.raise_for_status()
return response.json()
except (requests.Timeout, requests.HTTPError) as e:
last_exception = e
if attempt < self.max_retries - 1:
# Exponential Backoff: 1초, 2초, 4초...
delay = self.base_delay * (2 ** attempt)
print(f"요청 실패, {delay}초 후 재시도 ({attempt + 1}/{self.max_retries})")
time.sleep(delay)
raise last_exception
설계 의도:
- 첫 실패 후 1초, 두 번째 실패 후 2초, 세 번째 실패 후 4초 대기
- 일시적 오류는 대부분 3회 내에 복구됨
- 지수적 증가로 서버에 부담을 주지 않음
두 번째 도전: 대용량 데이터 처리
문제 상황
학생 데이터 5,000건을 한 번에 요청하면:
- 응답 시간이 너무 길어 타임아웃 발생
- 메모리 사용량 급증
- 중간에 실패하면 처음부터 다시 시작
해결: 자동 페이징 처리
API가 페이징을 지원했기에 이를 활용했다.
from typing import Generator, Dict, Any
class APIClient:
# ... (이전 코드)
def fetch_all(self, endpoint: str) -> Generator[Dict[str, Any], None, None]:
"""페이징을 자동 처리하며 전체 데이터를 가져오는 제너레이터"""
page = 1
page_size = 100 # 한 번에 100건씩
while True:
params = {
"page": page,
"size": page_size
}
response = self._request_with_retry(endpoint, params)
records = response.get("data", [])
if not records:
break # 더 이상 데이터 없음
# 레코드를 하나씩 yield
for record in records:
yield record
# 마지막 페이지 체크
if len(records) < page_size:
break
page += 1
print(f"페이지 {page} 처리 중...")
제너레이터를 사용한 이유:
- 메모리 효율: 전체 데이터를 메모리에 올리지 않음
- 스트리밍 처리: 가져오는 즉시 DB에 저장 가능
- 실패 지점 파악 용이
세 번째 도전: 중복 데이터 처리
문제 상황
마이그레이션을 반복 실행하면:
- 이미 있는 데이터가 중복 삽입됨
- Primary Key 충돌로 에러 발생
- 업데이트된 데이터가 반영 안 됨
해결: PostgreSQL Upsert
PostgreSQL의 ON CONFLICT DO UPDATE 구문을 활용했다.
from sqlalchemy.dialects.postgresql import insert
from sqlalchemy.orm import Session
from typing import List, Dict, Any
class DataMigrator:
def __init__(self, session: Session):
self.session = session
self.batch_size = 500
def upsert_batch(self, table, records: List[Dict[str, Any]], key_columns: List[str]):
"""배치 단위 Upsert 처리"""
if not records:
return
stmt = insert(table).values(records)
# Primary Key 충돌 시 업데이트할 컬럼들
update_columns = {
col.name: col
for col in stmt.excluded
if col.name not in key_columns
}
upsert_stmt = stmt.on_conflict_do_update(
index_elements=key_columns,
set_=update_columns
)
self.session.execute(upsert_stmt)
self.session.commit()
Upsert의 장점:
- 존재하면 업데이트, 없으면 삽입
- 단일 쿼리로 처리 → 성능 우수
- 트랜잭션 안전성 보장
배치 처리로 성능 최적화
레코드를 하나씩 처리하면 너무 느리다. 500건씩 배치로 처리했다.
def migrate_endpoint(self, endpoint: str, table, key_columns: List[str]):
"""특정 엔드포인트의 전체 데이터를 마이그레이션"""
batch = []
total_count = 0
for record in self.api_client.fetch_all(endpoint):
# 데이터 정제
cleaned = self._clean_record(record)
batch.append(cleaned)
# 배치가 차면 DB에 저장
if len(batch) >= self.batch_size:
self.upsert_batch(table, batch, key_columns)
total_count += len(batch)
print(f"{total_count}건 처리 완료")
batch = []
# 남은 배치 처리
if batch:
self.upsert_batch(table, batch, key_columns)
total_count += len(batch)
print(f"총 {total_count}건 마이그레이션 완료")
네 번째 도전: 효율적인 동기화
문제 상황
매일 전체 데이터를 다시 가져오면:
- 불필요한 API 호출
- 네트워크 비용 증가
- 처리 시간 낭비
해결: 증분 동기화 (Incremental Sync)
변경된 데이터만 가져오도록 구현했다.
from datetime import datetime, timedelta
class DataMigrator:
# ... (이전 코드)
def incremental_sync(self, endpoint: str, table, key_columns: List[str], since_date: str):
"""특정 날짜 이후 변경된 데이터만 동기화"""
batch = []
total_count = 0
# 날짜 필터 파라미터 추가
for record in self.api_client.fetch_all(
endpoint,
extra_params={"modified_after": since_date}
):
cleaned = self._clean_record(record)
batch.append(cleaned)
if len(batch) >= self.batch_size:
self.upsert_batch(table, batch, key_columns)
total_count += len(batch)
batch = []
if batch:
self.upsert_batch(table, batch, key_columns)
total_count += len(batch)
print(f"증분 동기화: {total_count}건 업데이트")
동기화 전략
상황 전략 주기
| 최초 구축 | 전체 마이그레이션 | 1회 |
| 일상 운영 | 증분 동기화 | 매일 새벽 |
| 학기 초 | 전체 마이그레이션 | 학기당 1회 |
| 장애 복구 | 전체 마이그레이션 | 필요시 |
데이터 정제 처리
외부 API 데이터는 그대로 쓸 수 없는 경우가 많았다.
마주한 데이터 품질 이슈
- NULL 값 처리: 빈 문자열 vs NULL 혼재
- 날짜 형식: 다양한 포맷 (YYYYMMDD, YYYY-MM-DD 등)
- 불필요한 공백: 앞뒤 공백, 중간 이중 공백
- 인코딩 문제: 특수문자 깨짐
정제 로직 구현
from typing import Dict, Any, Optional
import re
class DataCleaner:
@staticmethod
def clean_string(value: Optional[str]) -> Optional[str]:
"""문자열 정제"""
if value is None:
return None
# 빈 문자열은 NULL로
value = value.strip()
if not value:
return None
# 이중 공백 제거
value = re.sub(r'\\s+', ' ', value)
return value
@staticmethod
def parse_date(value: Optional[str]) -> Optional[str]:
"""날짜 형식 통일 (ISO 8601)"""
if not value:
return None
# YYYYMMDD -> YYYY-MM-DD
if re.match(r'^\\d{8}$', value):
return f"{value[:4]}-{value[4:6]}-{value[6:8]}"
return value
def clean_record(self, record: Dict[str, Any]) -> Dict[str, Any]:
"""레코드 전체 정제"""
cleaned = {}
for key, value in record.items():
if isinstance(value, str):
cleaned[key] = self.clean_string(value)
else:
cleaned[key] = value
# 날짜 필드 처리
date_fields = ['created_at', 'updated_at', 'start_date', 'end_date']
for field in date_fields:
if field in cleaned:
cleaned[field] = self.parse_date(cleaned[field])
return cleaned
CLI 인터페이스 설계
마이그레이션을 쉽게 실행할 수 있도록 CLI를 구현했다.
import typer
from typing import Optional, List
app = typer.Typer()
@app.command()
def migrate(
api: Optional[List[str]] = typer.Option(
None, "--api", "-a",
help="마이그레이션할 API (students, courses, activities 등)"
),
incremental: bool = typer.Option(
False, "--incremental", "-i",
help="증분 동기화 모드"
),
date: Optional[str] = typer.Option(
None, "--date", "-d",
help="증분 동기화 기준 날짜 (YYYYMMDD)"
),
test: bool = typer.Option(
False, "--test", "-t",
help="연결 테스트만 수행"
),
verbose: bool = typer.Option(
False, "--verbose", "-v",
help="상세 로그 출력"
)
):
"""외부 API 데이터를 DB로 마이그레이션"""
if test:
# 연결 테스트
test_connections()
return
migrator = DataMigrator()
# API 미지정시 전체 실행
targets = api or ['students', 'courses', 'departments', 'activities', 'interests', 'user_settings']
for target in targets:
if incremental and date:
migrator.incremental_sync(target, date)
else:
migrator.full_migrate(target)
실행 예시
# 전체 마이그레이션
python main.py migrate
# 특정 API만
python main.py migrate --api students --api courses
# 증분 동기화
python main.py migrate --incremental --date 20240101
# 연결 테스트
python main.py migrate --test
최종 파이프라인 구조
┌─────────────────────────────────────────────────────────┐
│ 외부 학사 API │
└─────────────────────────┬───────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ API Client │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 재시도 로직 │ │ 자동 페이징 │ │ 인증 처리 │ │
│ │ (3회, 지수) │ │ (100건/페이지)│ │ (API Key) │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────┬───────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Data Cleaner │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ NULL 처리 │ │ 날짜 통일 │ │ 공백 제거 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────┬───────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ Migrator │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────┐ │
│ │ 배치 처리 │ │ Upsert │ │ 트랜잭션 │ │
│ │ (500건) │ │ (충돌 처리) │ │ 관리 │ │
│ └─────────────┘ └─────────────┘ └─────────────┘ │
└─────────────────────────┬───────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────┐
│ PostgreSQL │
│ students | courses | departments | activities | ... │
└─────────────────────────────────────────────────────────┘
성과 및 교훈
성과
지표 수치
| 전체 마이그레이션 | 약 18,000건 / 5분 |
| 증분 동기화 | 평균 500건 / 30초 |
| 재시도 성공률 | 99.5% |
| 데이터 정합성 | 100% (Upsert 덕분) |
배운 점
- 외부 시스템은 믿지 말 것: 항상 실패를 가정하고 설계
- 배치 처리는 필수: 대용량 데이터는 나눠서 처리
- Upsert로 멱등성 확보: 몇 번을 실행해도 같은 결과
- CLI는 운영의 핵심: 명령 한 줄로 복잡한 작업 수행
다음 편 예고
데이터 파이프라인이 완성되었으니, 다음 편에서는 XLM-R로 강좌 임베딩 구축하기를 다룬다.
- 왜 XLM-R을 선택했는가?
- 강좌 설명을 벡터로 변환하는 과정
- 배치 임베딩 처리
- 임베딩 품질 검증 방법
시리즈 목차
- 프로젝트 소개 - 왜 AI 추천이 필요했나
- 추천을 위한 데이터 파이프라인 설계 ← 현재 글
- XLM-R로 강좌 임베딩 구축하기
- 추천 알고리즘 설계 - 가중치 기반 개인화
- LLM으로 강좌 설명 보강하기
- 성능 최적화 - CPU에서 27배 빠르게
- API 서버 구축과 배포
'프로그래밍 > AI 교과 추천 시스템 개발기' 카테고리의 다른 글
| 6편. 성능 최적화 - CPU에서 27배 빠르게 (0) | 2026.02.20 |
|---|---|
| 5편. LLM으로 강좌 설명 보강하기 (0) | 2026.02.19 |
| 4편. 추천 알고리즘 설계 - 가중치 기반 개인화 (0) | 2026.02.18 |
| 3편. XLM-R로 강좌 임베딩 구축하기 (0) | 2026.02.17 |
| 1편. 프로젝트 소개 - 왜 AI 추천이 필요했나 (0) | 2026.02.15 |