- Add main application files (main.py, models.py, schemas.py, etc.) - Add routers for all features (waiting, attendance, members, etc.) - Add HTML templates for admin and user interfaces - Add migration scripts and utility files - Add Docker configuration - Add documentation files - Add .gitignore to exclude database and cache files 🤖 Generated with [Claude Code](https://claude.com/claude-code) Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
301 lines
10 KiB
Python
301 lines
10 KiB
Python
"""
|
|
인증 유틸리티
|
|
- 비밀번호 해싱 및 검증
|
|
- JWT 토큰 생성 및 검증
|
|
- 현재 사용자 가져오기 (dependency)
|
|
"""
|
|
|
|
from datetime import datetime, timedelta
|
|
from typing import Optional
|
|
from fastapi import Depends, HTTPException, status, Cookie, Request, Query
|
|
from fastapi.security import OAuth2PasswordBearer
|
|
from jose import JWTError, jwt
|
|
import bcrypt
|
|
from sqlalchemy.orm import Session
|
|
|
|
from database import get_db
|
|
from models import User, Store
|
|
from schemas import TokenData
|
|
|
|
# 설정
|
|
SECRET_KEY = "your-secret-key-change-this-in-production-123456789" # 실제 환경에서는 환경변수로 관리
|
|
ALGORITHM = "HS256"
|
|
ACCESS_TOKEN_EXPIRE_HOURS = 8
|
|
|
|
# OAuth2 (선택적 토큰, 쿠키도 지원)
|
|
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login", auto_error=False)
|
|
|
|
|
|
def verify_password(plain_password: str, hashed_password: str) -> bool:
|
|
"""비밀번호 검증"""
|
|
try:
|
|
password_bytes = plain_password.encode('utf-8')
|
|
hashed_bytes = hashed_password.encode('utf-8')
|
|
return bcrypt.checkpw(password_bytes, hashed_bytes)
|
|
except Exception:
|
|
return False
|
|
|
|
|
|
def get_password_hash(password: str) -> str:
|
|
"""비밀번호 해싱"""
|
|
password_bytes = password.encode('utf-8')
|
|
salt = bcrypt.gensalt()
|
|
hashed = bcrypt.hashpw(password_bytes, salt)
|
|
return hashed.decode('utf-8')
|
|
|
|
|
|
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
|
|
"""JWT 액세스 토큰 생성"""
|
|
to_encode = data.copy()
|
|
if expires_delta:
|
|
expire = datetime.utcnow() + expires_delta
|
|
else:
|
|
expire = datetime.utcnow() + timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)
|
|
|
|
to_encode.update({"exp": expire})
|
|
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
|
|
return encoded_jwt
|
|
|
|
|
|
def decode_access_token(token: str) -> Optional[TokenData]:
|
|
"""JWT 토큰 디코딩"""
|
|
try:
|
|
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
|
|
username: str = payload.get("sub")
|
|
if username is None:
|
|
return None
|
|
return TokenData(username=username)
|
|
except JWTError:
|
|
return None
|
|
|
|
|
|
async def get_current_user(
|
|
token: Optional[str] = Depends(oauth2_scheme),
|
|
access_token: Optional[str] = Cookie(None),
|
|
token_query: Optional[str] = Query(None, alias="token"),
|
|
db: Session = Depends(get_db)
|
|
) -> User:
|
|
"""현재 로그인한 사용자 가져오기
|
|
|
|
Args (토큰 우선순위):
|
|
1. Authorization 헤더의 Bearer 토큰
|
|
2. Cookie의 access_token
|
|
3. Query Parameter의 token (SSE용)
|
|
|
|
Raises:
|
|
HTTPException: 인증 실패 시
|
|
"""
|
|
credentials_exception = HTTPException(
|
|
status_code=status.HTTP_401_UNAUTHORIZED,
|
|
detail="인증이 필요합니다",
|
|
headers={"WWW-Authenticate": "Bearer"},
|
|
)
|
|
|
|
# 토큰 가져오기 (Authorization 헤더 우선, 그 다음 쿠키, 마지막으로 쿼리 파라미터)
|
|
token_to_use = token if token else (access_token if access_token else token_query)
|
|
if not token_to_use:
|
|
raise credentials_exception
|
|
|
|
# 토큰 디코딩
|
|
token_data = decode_access_token(token_to_use)
|
|
if token_data is None or token_data.username is None:
|
|
raise credentials_exception
|
|
|
|
# 사용자 조회
|
|
user = db.query(User).filter(User.username == token_data.username).first()
|
|
if user is None:
|
|
raise credentials_exception
|
|
|
|
if not user.is_active:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="비활성화된 사용자입니다"
|
|
)
|
|
|
|
return user
|
|
|
|
|
|
async def get_current_active_user(
|
|
current_user: User = Depends(get_current_user)
|
|
) -> User:
|
|
"""현재 활성화된 사용자 가져오기"""
|
|
if not current_user.is_active:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="비활성화된 사용자입니다"
|
|
)
|
|
return current_user
|
|
|
|
|
|
async def require_franchise_admin(
|
|
current_user: User = Depends(get_current_user)
|
|
) -> User:
|
|
"""프랜차이즈 관리자(최종/중간) 또는 시스템 관리자 권한 필요"""
|
|
if current_user.role not in ["franchise_admin", "system_admin", "franchise_manager"]:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="프랜차이즈 관리자 권한이 필요합니다"
|
|
)
|
|
return current_user
|
|
|
|
|
|
async def require_system_admin(
|
|
current_user: User = Depends(get_current_user)
|
|
) -> User:
|
|
"""시스템 관리자 권한 필요 (최상위 관리자)"""
|
|
if current_user.role != "system_admin":
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="시스템 관리자 권한이 필요합니다"
|
|
)
|
|
return current_user
|
|
|
|
|
|
async def get_current_store(
|
|
current_user: User = Depends(get_current_user),
|
|
db: Session = Depends(get_db),
|
|
store_id: int = None,
|
|
request: Request = None
|
|
) -> Store:
|
|
"""현재 사용자의 매장 가져오기
|
|
|
|
- franchise_admin: 매장 선택 필요 (헤더, 쿼리 파라미터, 또는 첫 번째 매장)
|
|
- franchise_manager: 관리 권한이 있는 매장 중에서만 선택 가능
|
|
- store_admin: 자신의 매장 자동 반환
|
|
"""
|
|
if current_user.role == "store_admin":
|
|
if not current_user.store_id:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_400_BAD_REQUEST,
|
|
detail="매장 정보가 없습니다"
|
|
)
|
|
|
|
store = db.query(Store).filter(Store.id == current_user.store_id).first()
|
|
if not store:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="매장을 찾을 수 없습니다"
|
|
)
|
|
return store
|
|
|
|
# system_admin은 모든 매장 접근 가능 (활성화 상태 무관)
|
|
if current_user.role == "system_admin":
|
|
selected_store_id = store_id
|
|
|
|
if not selected_store_id and request:
|
|
# X-Store-Id 헤더에서 가져오기
|
|
store_id_header = request.headers.get('X-Store-Id')
|
|
if store_id_header:
|
|
try:
|
|
selected_store_id = int(store_id_header)
|
|
except ValueError:
|
|
pass
|
|
|
|
if selected_store_id:
|
|
store = db.query(Store).filter(Store.id == selected_store_id).first()
|
|
if store:
|
|
return store
|
|
|
|
# 기본값: 첫 번째 매장 (활성화 상태 무관)
|
|
store = db.query(Store).first()
|
|
if not store:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="매장이 없습니다"
|
|
)
|
|
return store
|
|
|
|
# franchise_admin 또는 franchise_manager인 경우
|
|
selected_store_id = store_id
|
|
|
|
if not selected_store_id and request:
|
|
# X-Store-Id 헤더에서 가져오기
|
|
store_id_header = request.headers.get('X-Store-Id')
|
|
if store_id_header:
|
|
try:
|
|
selected_store_id = int(store_id_header)
|
|
except ValueError:
|
|
pass
|
|
|
|
if selected_store_id:
|
|
# 선택된 매장 검증
|
|
if current_user.role == 'franchise_manager':
|
|
# 관리 매장인지 확인
|
|
# active check is theoretically needed but relationship might include inactive
|
|
managed_ids = [s.id for s in current_user.managed_stores]
|
|
if selected_store_id not in managed_ids:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="해당 매장에 대한 접근 권한이 없습니다"
|
|
)
|
|
|
|
# 선택된 매장 가져오기 (프랜차이즈 소유 확인, 활성화 상태 무관)
|
|
store = db.query(Store).filter(
|
|
Store.id == selected_store_id,
|
|
Store.franchise_id == current_user.franchise_id
|
|
).first()
|
|
|
|
if store:
|
|
return store
|
|
|
|
# 기본값 처리 (선택된 매장이 없거나 잘못된 경우)
|
|
if current_user.role == 'franchise_manager':
|
|
# 첫 번째 관리 매장
|
|
if current_user.managed_stores:
|
|
return current_user.managed_stores[0]
|
|
else:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="관리할 수 있는 매장이 없습니다"
|
|
)
|
|
|
|
# franchise_admin: 프랜차이즈의 첫 번째 활성 매장
|
|
store = db.query(Store).filter(
|
|
Store.franchise_id == current_user.franchise_id,
|
|
Store.is_active == True
|
|
).first()
|
|
|
|
if not store:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_404_NOT_FOUND,
|
|
detail="활성화된 매장이 없습니다"
|
|
)
|
|
return store
|
|
|
|
|
|
def require_store_access(store_id: int):
|
|
"""특정 매장 접근 권한 체크"""
|
|
async def check_access(
|
|
current_user: User = Depends(get_current_user),
|
|
db: Session = Depends(get_db)
|
|
):
|
|
# franchise_admin은 모든 매장 접근 가능
|
|
if current_user.role == "franchise_admin":
|
|
return current_user
|
|
|
|
# franchise_manager는 관리 매장만 접근 가능
|
|
if current_user.role == "franchise_manager":
|
|
managed_ids = [s.id for s in current_user.managed_stores]
|
|
if store_id not in managed_ids:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="해당 매장에 대한 권한이 없습니다"
|
|
)
|
|
return current_user
|
|
|
|
# store_admin은 자신의 매장만 접근 가능
|
|
if current_user.role == "store_admin":
|
|
if current_user.store_id != store_id:
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="해당 매장에 대한 권한이 없습니다"
|
|
)
|
|
return current_user
|
|
|
|
raise HTTPException(
|
|
status_code=status.HTTP_403_FORBIDDEN,
|
|
detail="권한이 없습니다"
|
|
)
|
|
|
|
return check_access
|