Files
web-utils/backend/app/tools/image_ops.py
2026-01-28 15:33:47 +09:00

145 lines
5.2 KiB
Python

import shutil
import subprocess
import uuid
import os
import zipfile
from pathlib import Path
from typing import List
from fastapi import APIRouter, UploadFile, File, HTTPException, BackgroundTasks
from fastapi.responses import FileResponse, JSONResponse
from app.core.cleanup import TEMP_DIR
router = APIRouter()
def get_png_color_type(file_path: Path) -> int | None:
"""PNG 파일의 color type 확인 (참조 문서 로직)"""
try:
with open(file_path, "rb") as f:
header = f.read(26)
if header[:8] != b'\x89PNG\r\n\x1a\n':
return None
return header[25]
except Exception:
return None
@router.post("/png-compress")
async def compress_png_files(files: List[UploadFile] = File(...)):
"""
업로드된 PNG 파일들을 pngquant로 압축하고 통계를 반환합니다.
"""
# 작업 ID 생성 (폴더 분리용)
job_id = str(uuid.uuid4())
job_dir = TEMP_DIR / job_id
job_dir.mkdir(parents=True, exist_ok=True)
results = []
compressed_files = []
try:
for file in files:
if not file.filename.lower().endswith('.png'):
continue
# 1. 파일 저장
file_path = job_dir / file.filename
with open(file_path, "wb") as buffer:
shutil.copyfileobj(file.file, buffer)
orig_size = file_path.stat().st_size
# 2. 이미 압축된 파일인지 확인 (Color Type 3 = Indexed)
color_type = get_png_color_type(file_path)
if color_type == 3:
# 이미 압축됨 -> 그냥 결과에 추가 (압축 안 함)
results.append({
"filename": file.filename,
"original_size": orig_size,
"compressed_size": orig_size,
"ratio": 0.0,
"status": "Skipped (Already Indexed)"
})
compressed_files.append(file_path)
continue
# 3. pngquant 실행
output_path = job_dir / f"compressed_{file.filename}"
try:
# --force: 덮어쓰기 허용, --quality: 품질 설정
cmd = [
"pngquant",
"--quality", "65-80",
"--force",
"--output", str(output_path),
str(file_path)
]
subprocess.run(cmd, check=True, capture_output=True)
if output_path.exists():
comp_size = output_path.stat().st_size
ratio = (1 - comp_size / orig_size) * 100
results.append({
"filename": file.filename,
"original_size": orig_size,
"compressed_size": comp_size,
"ratio": round(ratio, 1),
"status": "Success"
})
compressed_files.append(output_path)
else:
raise Exception("Output file not created")
except subprocess.CalledProcessError:
# 압축 실패 (품질 기준 미달 등) -> 원본 유지
results.append({
"filename": file.filename,
"original_size": orig_size,
"compressed_size": orig_size,
"ratio": 0.0,
"status": "Failed (Quality check)"
})
compressed_files.append(file_path)
# 4. ZIP 생성
zip_filename = f"compressed_images_{job_id}.zip"
zip_path = TEMP_DIR / zip_filename
with zipfile.ZipFile(zip_path, 'w') as zipf:
for f in compressed_files:
# ZIP 내부에는 원래 파일명으로 저장
arcname = f.name.replace("compressed_", "")
zipf.write(f, arcname)
return {
"job_id": job_id,
"zip_filename": zip_filename,
"stats": results
}
except Exception as e:
cleanup_files([job_dir])
raise HTTPException(status_code=500, detail=str(e))
@router.get("/download/{filename}")
async def download_file(filename: str):
file_path = TEMP_DIR / filename
if not file_path.exists():
raise HTTPException(status_code=404, detail="File not found")
# 즉시 삭제하지 않고 스케줄러(10분 후)에 맡김
return FileResponse(file_path, filename=filename)
@router.get("/download-single/{job_id}/{filename}")
async def download_single_file(job_id: str, filename: str):
"""개별 파일 다운로드 (삭제 예약 없음 - 전체 ZIP 다운로드 등을 위해 유지)"""
file_path = TEMP_DIR / job_id / f"compressed_{filename}"
# 만약 compressed_ 접두사가 없다면 원본 파일일 수도 있음 (압축 실패 시)
if not file_path.exists():
file_path = TEMP_DIR / job_id / filename
if not file_path.exists():
raise HTTPException(status_code=404, detail="File not found")
return FileResponse(file_path, filename=filename)