Files
waiting-system/auth.py
Jun-dev f699a29a85 Add waiting system application files
- Add main application files (main.py, models.py, schemas.py, etc.)
- Add routers for all features (waiting, attendance, members, etc.)
- Add HTML templates for admin and user interfaces
- Add migration scripts and utility files
- Add Docker configuration
- Add documentation files
- Add .gitignore to exclude database and cache files

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-14 00:29:39 +09:00

301 lines
10 KiB
Python

"""
인증 유틸리티
- 비밀번호 해싱 및 검증
- JWT 토큰 생성 및 검증
- 현재 사용자 가져오기 (dependency)
"""
from datetime import datetime, timedelta
from typing import Optional
from fastapi import Depends, HTTPException, status, Cookie, Request, Query
from fastapi.security import OAuth2PasswordBearer
from jose import JWTError, jwt
import bcrypt
from sqlalchemy.orm import Session
from database import get_db
from models import User, Store
from schemas import TokenData
# 설정
SECRET_KEY = "your-secret-key-change-this-in-production-123456789" # 실제 환경에서는 환경변수로 관리
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_HOURS = 8
# OAuth2 (선택적 토큰, 쿠키도 지원)
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="/api/auth/login", auto_error=False)
def verify_password(plain_password: str, hashed_password: str) -> bool:
"""비밀번호 검증"""
try:
password_bytes = plain_password.encode('utf-8')
hashed_bytes = hashed_password.encode('utf-8')
return bcrypt.checkpw(password_bytes, hashed_bytes)
except Exception:
return False
def get_password_hash(password: str) -> str:
"""비밀번호 해싱"""
password_bytes = password.encode('utf-8')
salt = bcrypt.gensalt()
hashed = bcrypt.hashpw(password_bytes, salt)
return hashed.decode('utf-8')
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None) -> str:
"""JWT 액세스 토큰 생성"""
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(hours=ACCESS_TOKEN_EXPIRE_HOURS)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
def decode_access_token(token: str) -> Optional[TokenData]:
"""JWT 토큰 디코딩"""
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
return None
return TokenData(username=username)
except JWTError:
return None
async def get_current_user(
token: Optional[str] = Depends(oauth2_scheme),
access_token: Optional[str] = Cookie(None),
token_query: Optional[str] = Query(None, alias="token"),
db: Session = Depends(get_db)
) -> User:
"""현재 로그인한 사용자 가져오기
Args (토큰 우선순위):
1. Authorization 헤더의 Bearer 토큰
2. Cookie의 access_token
3. Query Parameter의 token (SSE용)
Raises:
HTTPException: 인증 실패 시
"""
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="인증이 필요합니다",
headers={"WWW-Authenticate": "Bearer"},
)
# 토큰 가져오기 (Authorization 헤더 우선, 그 다음 쿠키, 마지막으로 쿼리 파라미터)
token_to_use = token if token else (access_token if access_token else token_query)
if not token_to_use:
raise credentials_exception
# 토큰 디코딩
token_data = decode_access_token(token_to_use)
if token_data is None or token_data.username is None:
raise credentials_exception
# 사용자 조회
user = db.query(User).filter(User.username == token_data.username).first()
if user is None:
raise credentials_exception
if not user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="비활성화된 사용자입니다"
)
return user
async def get_current_active_user(
current_user: User = Depends(get_current_user)
) -> User:
"""현재 활성화된 사용자 가져오기"""
if not current_user.is_active:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="비활성화된 사용자입니다"
)
return current_user
async def require_franchise_admin(
current_user: User = Depends(get_current_user)
) -> User:
"""프랜차이즈 관리자(최종/중간) 또는 시스템 관리자 권한 필요"""
if current_user.role not in ["franchise_admin", "system_admin", "franchise_manager"]:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="프랜차이즈 관리자 권한이 필요합니다"
)
return current_user
async def require_system_admin(
current_user: User = Depends(get_current_user)
) -> User:
"""시스템 관리자 권한 필요 (최상위 관리자)"""
if current_user.role != "system_admin":
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="시스템 관리자 권한이 필요합니다"
)
return current_user
async def get_current_store(
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db),
store_id: int = None,
request: Request = None
) -> Store:
"""현재 사용자의 매장 가져오기
- franchise_admin: 매장 선택 필요 (헤더, 쿼리 파라미터, 또는 첫 번째 매장)
- franchise_manager: 관리 권한이 있는 매장 중에서만 선택 가능
- store_admin: 자신의 매장 자동 반환
"""
if current_user.role == "store_admin":
if not current_user.store_id:
raise HTTPException(
status_code=status.HTTP_400_BAD_REQUEST,
detail="매장 정보가 없습니다"
)
store = db.query(Store).filter(Store.id == current_user.store_id).first()
if not store:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="매장을 찾을 수 없습니다"
)
return store
# system_admin은 모든 매장 접근 가능 (활성화 상태 무관)
if current_user.role == "system_admin":
selected_store_id = store_id
if not selected_store_id and request:
# X-Store-Id 헤더에서 가져오기
store_id_header = request.headers.get('X-Store-Id')
if store_id_header:
try:
selected_store_id = int(store_id_header)
except ValueError:
pass
if selected_store_id:
store = db.query(Store).filter(Store.id == selected_store_id).first()
if store:
return store
# 기본값: 첫 번째 매장 (활성화 상태 무관)
store = db.query(Store).first()
if not store:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="매장이 없습니다"
)
return store
# franchise_admin 또는 franchise_manager인 경우
selected_store_id = store_id
if not selected_store_id and request:
# X-Store-Id 헤더에서 가져오기
store_id_header = request.headers.get('X-Store-Id')
if store_id_header:
try:
selected_store_id = int(store_id_header)
except ValueError:
pass
if selected_store_id:
# 선택된 매장 검증
if current_user.role == 'franchise_manager':
# 관리 매장인지 확인
# active check is theoretically needed but relationship might include inactive
managed_ids = [s.id for s in current_user.managed_stores]
if selected_store_id not in managed_ids:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="해당 매장에 대한 접근 권한이 없습니다"
)
# 선택된 매장 가져오기 (프랜차이즈 소유 확인, 활성화 상태 무관)
store = db.query(Store).filter(
Store.id == selected_store_id,
Store.franchise_id == current_user.franchise_id
).first()
if store:
return store
# 기본값 처리 (선택된 매장이 없거나 잘못된 경우)
if current_user.role == 'franchise_manager':
# 첫 번째 관리 매장
if current_user.managed_stores:
return current_user.managed_stores[0]
else:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="관리할 수 있는 매장이 없습니다"
)
# franchise_admin: 프랜차이즈의 첫 번째 활성 매장
store = db.query(Store).filter(
Store.franchise_id == current_user.franchise_id,
Store.is_active == True
).first()
if not store:
raise HTTPException(
status_code=status.HTTP_404_NOT_FOUND,
detail="활성화된 매장이 없습니다"
)
return store
def require_store_access(store_id: int):
"""특정 매장 접근 권한 체크"""
async def check_access(
current_user: User = Depends(get_current_user),
db: Session = Depends(get_db)
):
# franchise_admin은 모든 매장 접근 가능
if current_user.role == "franchise_admin":
return current_user
# franchise_manager는 관리 매장만 접근 가능
if current_user.role == "franchise_manager":
managed_ids = [s.id for s in current_user.managed_stores]
if store_id not in managed_ids:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="해당 매장에 대한 권한이 없습니다"
)
return current_user
# store_admin은 자신의 매장만 접근 가능
if current_user.role == "store_admin":
if current_user.store_id != store_id:
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="해당 매장에 대한 권한이 없습니다"
)
return current_user
raise HTTPException(
status_code=status.HTTP_403_FORBIDDEN,
detail="권한이 없습니다"
)
return check_access