Files
waiting-system/sse_manager.py
Jun-dev f699a29a85 Add waiting system application files
- Add main application files (main.py, models.py, schemas.py, etc.)
- Add routers for all features (waiting, attendance, members, etc.)
- Add HTML templates for admin and user interfaces
- Add migration scripts and utility files
- Add Docker configuration
- Add documentation files
- Add .gitignore to exclude database and cache files

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude Sonnet 4.5 <noreply@anthropic.com>
2025-12-14 00:29:39 +09:00

142 lines
5.6 KiB
Python

from typing import Dict, Set
import asyncio
from fastapi import Request
from starlette.responses import StreamingResponse
import json
class SSEConnectionManager:
"""SSE 연결 관리자"""
def __init__(self):
# store_id별로 연결된 클라이언트들을 관리
self.active_connections: Dict[str, Set[asyncio.Queue]] = {}
# franchise_id별로 연결된 클라이언트들을 관리 (프랜차이즈 관리자용)
self.franchise_connections: Dict[str, Set[asyncio.Queue]] = {}
async def connect(self, store_id: str) -> asyncio.Queue:
"""새로운 SSE 연결 추가 (매장용)"""
if store_id not in self.active_connections:
self.active_connections[store_id] = set()
queue = asyncio.Queue()
self.active_connections[store_id].add(queue)
return queue
async def connect_franchise(self, franchise_id: str) -> asyncio.Queue:
"""새로운 SSE 연결 추가 (프랜차이즈 관리자용)"""
if franchise_id not in self.franchise_connections:
self.franchise_connections[franchise_id] = set()
queue = asyncio.Queue()
self.franchise_connections[franchise_id].add(queue)
return queue
def disconnect(self, store_id: str, queue: asyncio.Queue):
"""SSE 연결 제거 (매장용)"""
if store_id in self.active_connections:
self.active_connections[store_id].discard(queue)
if not self.active_connections[store_id]:
del self.active_connections[store_id]
def disconnect_franchise(self, franchise_id: str, queue: asyncio.Queue):
"""SSE 연결 제거 (프랜차이즈 관리자용)"""
if franchise_id in self.franchise_connections:
self.franchise_connections[franchise_id].discard(queue)
if not self.franchise_connections[franchise_id]:
del self.franchise_connections[franchise_id]
async def broadcast(self, store_id: str, event_type: str, data: dict = None, franchise_id: str = None):
"""특정 매장의 모든 연결된 클라이언트에게 메시지 브로드캐스트 (프랜차이즈 관리자 포함)"""
message = {
"event": event_type,
"data": data or {},
"store_id": store_id # 프랜차이즈 관리자가 어떤 매장의 이벤트인지 알 수 있게 추가
}
# 1. 매장별 연결된 클라이언트에게 전송
if store_id in self.active_connections:
disconnected_queues = []
for queue in self.active_connections[store_id]:
try:
await queue.put(message)
except Exception:
disconnected_queues.append(queue)
for queue in disconnected_queues:
self.disconnect(store_id, queue)
# 2. 프랜차이즈 관리자에게 전송 (franchise_id가 제공된 경우)
if franchise_id and franchise_id in self.franchise_connections:
disconnected_franchise_queues = []
for queue in self.franchise_connections[franchise_id]:
try:
await queue.put(message)
except Exception:
disconnected_franchise_queues.append(queue)
for queue in disconnected_franchise_queues:
self.disconnect_franchise(franchise_id, queue)
async def send_personal_message(self, store_id: str, queue: asyncio.Queue, event_type: str, data: dict = None):
"""특정 클라이언트에게만 메시지 전송"""
message = {
"event": event_type,
"data": data or {}
}
try:
await queue.put(message)
except Exception:
self.disconnect(store_id, queue)
# 전역 SSE 매니저 인스턴스
sse_manager = SSEConnectionManager()
async def event_generator(queue: asyncio.Queue):
"""SSE 이벤트 스트림 생성기"""
try:
# 연결 확인용 초기 메시지 (표준 형식을 따름)
initial_message = {
"event": "connected",
"data": {}
}
yield f"data: {json.dumps(initial_message)}\n\n"
while True:
try:
# 큐에서 메시지 대기 (타임아웃 적용으로 heartbeat 구현)
# 30초 동안 메시지가 없으면 heartbeat 전송
message = await asyncio.wait_for(queue.get(), timeout=30.0)
# SSE 형식으로 메시지 전송
# 클라이언트의 일관된 처리를 위해 모든 이벤트를 'message' 타입으로 전송하고
# 실제 이벤트 타입은 데이터 페이로드 안에 포함시킴
event_type = message.get("event", "message")
data = message.get("data", {})
payload = {
"event": event_type,
"data": data
}
# 프랜차이즈 관리자를 위해 store_id 포함
if "store_id" in message:
payload["store_id"] = message["store_id"]
yield f"data: {json.dumps(payload)}\n\n"
except asyncio.TimeoutError:
# Heartbeat (keep-alive)
# 연결 유지를 위한 ping 메시지
ping_message = {
"event": "ping",
"data": {"timestamp": asyncio.get_event_loop().time()}
}
yield f"data: {json.dumps(ping_message)}\n\n"
except asyncio.CancelledError:
# 클라이언트 연결 종료
pass