198 lines
7.4 KiB
Python
198 lines
7.4 KiB
Python
from functools import wraps
|
|
from typing import Callable, Any
|
|
import inspect
|
|
|
|
from fastapi import Depends
|
|
from app.core.auth_dependency import get_current_user, AuthContext
|
|
from app.core.responses import ApiError
|
|
|
|
|
|
def require_app_permission(permission: str):
|
|
"""Decorator برای بررسی دسترسی در سطح اپلیکیشن"""
|
|
def decorator(func: Callable) -> Callable:
|
|
@wraps(func)
|
|
async def wrapper(*args, **kwargs) -> Any:
|
|
# پیدا کردن AuthContext در kwargs
|
|
ctx = None
|
|
for key, value in kwargs.items():
|
|
if isinstance(value, AuthContext):
|
|
ctx = value
|
|
break
|
|
|
|
if not ctx:
|
|
raise ApiError("UNAUTHORIZED", "Authentication required", http_status=401)
|
|
|
|
if not ctx.has_app_permission(permission):
|
|
raise ApiError("FORBIDDEN", f"Missing app permission: {permission}", http_status=403)
|
|
return await func(*args, **kwargs)
|
|
return wrapper
|
|
return decorator
|
|
|
|
|
|
def require_business_permission(section: str, action: str):
|
|
"""Decorator برای بررسی دسترسی در سطح کسب و کار"""
|
|
def decorator(func: Callable) -> Callable:
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs) -> Any:
|
|
ctx = get_current_user()
|
|
if not ctx.has_business_permission(section, action):
|
|
raise ApiError("FORBIDDEN", f"Missing business permission: {section}.{action}", http_status=403)
|
|
return func(*args, **kwargs)
|
|
return wrapper
|
|
return decorator
|
|
|
|
|
|
def require_any_permission(section: str, action: str):
|
|
"""Decorator برای بررسی دسترسی در هر دو سطح (app یا business)"""
|
|
def decorator(func: Callable) -> Callable:
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs) -> Any:
|
|
ctx = get_current_user()
|
|
if not ctx.has_any_permission(section, action):
|
|
raise ApiError("FORBIDDEN", f"Missing permission: {section}.{action}", http_status=403)
|
|
return func(*args, **kwargs)
|
|
return wrapper
|
|
return decorator
|
|
|
|
|
|
def require_superadmin():
|
|
"""Decorator برای بررسی superadmin بودن"""
|
|
def decorator(func: Callable) -> Callable:
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs) -> Any:
|
|
ctx = get_current_user()
|
|
if not ctx.is_superadmin():
|
|
raise ApiError("FORBIDDEN", "Superadmin access required", http_status=403)
|
|
return func(*args, **kwargs)
|
|
return wrapper
|
|
return decorator
|
|
|
|
|
|
def require_business_access(business_id_param: str = "business_id"):
|
|
"""Decorator برای بررسی دسترسی به کسب و کار خاص.
|
|
امضای اصلی endpoint حفظ میشود و Request از آرگومانها استخراج میگردد.
|
|
"""
|
|
def decorator(func: Callable) -> Callable:
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs) -> Any:
|
|
import logging
|
|
from fastapi import Request
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# یافتن Request در args/kwargs
|
|
request = None
|
|
for arg in args:
|
|
if isinstance(arg, Request):
|
|
request = arg
|
|
break
|
|
if request is None:
|
|
request = kwargs.get('request')
|
|
if request is None:
|
|
logger.error("Request not found in function arguments")
|
|
raise ApiError("INTERNAL_ERROR", "Request not found", http_status=500)
|
|
|
|
# دسترسی به DB و کاربر
|
|
from adapters.db.session import get_db
|
|
db = next(get_db())
|
|
ctx = get_current_user(request, db)
|
|
|
|
# استخراج business_id از kwargs یا path params
|
|
business_id = kwargs.get(business_id_param)
|
|
if business_id is None:
|
|
try:
|
|
business_id = request.path_params.get(business_id_param)
|
|
except Exception:
|
|
business_id = None
|
|
|
|
if business_id and not ctx.can_access_business(int(business_id)):
|
|
logger.warning(f"User {ctx.get_user_id()} does not have access to business {business_id}")
|
|
raise ApiError("FORBIDDEN", f"No access to business {business_id}", http_status=403)
|
|
|
|
return func(*args, **kwargs)
|
|
# Preserve original signature so FastAPI sees correct parameters (including Request)
|
|
wrapper.__signature__ = inspect.signature(func) # type: ignore[attr-defined]
|
|
return wrapper
|
|
return decorator
|
|
|
|
|
|
# Decorator های ترکیبی برای استفاده آسان
|
|
def require_sales_write():
|
|
"""دسترسی نوشتن در بخش فروش"""
|
|
return require_any_permission("sales", "write")
|
|
|
|
|
|
def require_sales_delete():
|
|
"""دسترسی حذف در بخش فروش"""
|
|
return require_any_permission("sales", "delete")
|
|
|
|
|
|
def require_sales_approve():
|
|
"""دسترسی تأیید در بخش فروش"""
|
|
return require_any_permission("sales", "approve")
|
|
|
|
|
|
def require_purchases_write():
|
|
"""دسترسی نوشتن در بخش خرید"""
|
|
return require_any_permission("purchases", "write")
|
|
|
|
|
|
def require_accounting_write():
|
|
"""دسترسی نوشتن در بخش حسابداری"""
|
|
return require_any_permission("accounting", "write")
|
|
|
|
|
|
def require_inventory_write():
|
|
"""دسترسی نوشتن در بخش موجودی"""
|
|
return require_any_permission("inventory", "write")
|
|
|
|
|
|
def require_reports_export():
|
|
"""دسترسی صادرات گزارش"""
|
|
return require_any_permission("reports", "export")
|
|
|
|
|
|
def require_settings_manage_users():
|
|
"""دسترسی مدیریت کاربران کسب و کار"""
|
|
return require_any_permission("settings", "manage_users")
|
|
|
|
|
|
def require_user_management():
|
|
"""دسترسی مدیریت کاربران در سطح اپلیکیشن"""
|
|
return require_app_permission("user_management")
|
|
|
|
|
|
def require_business_management():
|
|
"""دسترسی مدیریت کسب و کارها"""
|
|
return require_app_permission("business_management")
|
|
|
|
|
|
def require_system_settings():
|
|
"""دسترسی تنظیمات سیستم"""
|
|
return require_app_permission("system_settings")
|
|
|
|
|
|
def require_permission(permission: str):
|
|
"""Decorator عمومی برای بررسی دسترسی - wrapper برای require_app_permission"""
|
|
return require_app_permission(permission)
|
|
|
|
|
|
# =========================
|
|
# FastAPI Dependencies (for Depends)
|
|
# =========================
|
|
def require_app_permission_dep(permission: str):
|
|
"""FastAPI dependency جهت بررسی دسترسی در سطح اپلیکیشن.
|
|
|
|
استفاده:
|
|
_: None = Depends(require_app_permission_dep("business_management"))
|
|
"""
|
|
def _dependency(auth_context: AuthContext = Depends(get_current_user)) -> None:
|
|
if not auth_context.has_app_permission(permission):
|
|
raise ApiError("FORBIDDEN", f"Missing app permission: {permission}", http_status=403)
|
|
return _dependency
|
|
|
|
|
|
def require_business_management_dep(auth_context: AuthContext = Depends(get_current_user)) -> None:
|
|
"""FastAPI dependency برای بررسی مجوز مدیریت کسب و کارها."""
|
|
if not auth_context.has_app_permission("business_management"):
|
|
raise ApiError("FORBIDDEN", "Missing app permission: business_management", http_status=403)
|