174 lines
6.4 KiB
Python
174 lines
6.4 KiB
Python
from __future__ import annotations
|
|
|
|
from functools import wraps
|
|
from typing import Callable, Any
|
|
|
|
from fastapi import Depends
|
|
from app.core.auth_dependency import get_current_user, AuthContext
|
|
from app.core.responses import ApiError
|
|
|
|
|
|
def require_app_permission(permission: str):
|
|
"""Decorator برای بررسی دسترسی در سطح اپلیکیشن"""
|
|
def decorator(func: Callable) -> Callable:
|
|
@wraps(func)
|
|
async def wrapper(*args, **kwargs) -> Any:
|
|
# پیدا کردن AuthContext در kwargs
|
|
ctx = None
|
|
for key, value in kwargs.items():
|
|
if isinstance(value, AuthContext):
|
|
ctx = value
|
|
break
|
|
|
|
if not ctx:
|
|
raise ApiError("UNAUTHORIZED", "Authentication required", http_status=401)
|
|
|
|
if not ctx.has_app_permission(permission):
|
|
raise ApiError("FORBIDDEN", f"Missing app permission: {permission}", http_status=403)
|
|
return await func(*args, **kwargs)
|
|
return wrapper
|
|
return decorator
|
|
|
|
|
|
def require_business_permission(section: str, action: str):
|
|
"""Decorator برای بررسی دسترسی در سطح کسب و کار"""
|
|
def decorator(func: Callable) -> Callable:
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs) -> Any:
|
|
ctx = get_current_user()
|
|
if not ctx.has_business_permission(section, action):
|
|
raise ApiError("FORBIDDEN", f"Missing business permission: {section}.{action}", http_status=403)
|
|
return func(*args, **kwargs)
|
|
return wrapper
|
|
return decorator
|
|
|
|
|
|
def require_any_permission(section: str, action: str):
|
|
"""Decorator برای بررسی دسترسی در هر دو سطح (app یا business)"""
|
|
def decorator(func: Callable) -> Callable:
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs) -> Any:
|
|
ctx = get_current_user()
|
|
if not ctx.has_any_permission(section, action):
|
|
raise ApiError("FORBIDDEN", f"Missing permission: {section}.{action}", http_status=403)
|
|
return func(*args, **kwargs)
|
|
return wrapper
|
|
return decorator
|
|
|
|
|
|
def require_superadmin():
|
|
"""Decorator برای بررسی superadmin بودن"""
|
|
def decorator(func: Callable) -> Callable:
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs) -> Any:
|
|
ctx = get_current_user()
|
|
if not ctx.is_superadmin():
|
|
raise ApiError("FORBIDDEN", "Superadmin access required", http_status=403)
|
|
return func(*args, **kwargs)
|
|
return wrapper
|
|
return decorator
|
|
|
|
|
|
def require_business_access(business_id_param: str = "business_id"):
|
|
"""Decorator برای بررسی دسترسی به کسب و کار خاص"""
|
|
def decorator(func: Callable) -> Callable:
|
|
@wraps(func)
|
|
def wrapper(*args, **kwargs) -> Any:
|
|
import logging
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# Find request in args or kwargs
|
|
request = None
|
|
for arg in args:
|
|
if hasattr(arg, 'headers'): # Check if it's a Request object
|
|
request = arg
|
|
break
|
|
|
|
if not request and 'request' in kwargs:
|
|
request = kwargs['request']
|
|
|
|
if not request:
|
|
logger.error("Request not found in function arguments")
|
|
raise ApiError("INTERNAL_ERROR", "Request not found", http_status=500)
|
|
|
|
# Get database session
|
|
from adapters.db.session import get_db
|
|
db = next(get_db())
|
|
ctx = get_current_user(request, db)
|
|
business_id = kwargs.get(business_id_param)
|
|
|
|
logger.info(f"Checking business access for user {ctx.get_user_id()} to business {business_id}")
|
|
logger.info(f"User business_id from context: {ctx.business_id}")
|
|
logger.info(f"User is superadmin: {ctx.is_superadmin()}")
|
|
logger.info(f"User is business owner: {ctx.is_business_owner()}")
|
|
|
|
if business_id and not ctx.can_access_business(business_id):
|
|
logger.warning(f"User {ctx.get_user_id()} does not have access to business {business_id}")
|
|
raise ApiError("FORBIDDEN", f"No access to business {business_id}", http_status=403)
|
|
|
|
logger.info(f"User {ctx.get_user_id()} has access to business {business_id}")
|
|
return func(*args, **kwargs)
|
|
return wrapper
|
|
return decorator
|
|
|
|
|
|
# Decorator های ترکیبی برای استفاده آسان
|
|
def require_sales_write():
|
|
"""دسترسی نوشتن در بخش فروش"""
|
|
return require_any_permission("sales", "write")
|
|
|
|
|
|
def require_sales_delete():
|
|
"""دسترسی حذف در بخش فروش"""
|
|
return require_any_permission("sales", "delete")
|
|
|
|
|
|
def require_sales_approve():
|
|
"""دسترسی تأیید در بخش فروش"""
|
|
return require_any_permission("sales", "approve")
|
|
|
|
|
|
def require_purchases_write():
|
|
"""دسترسی نوشتن در بخش خرید"""
|
|
return require_any_permission("purchases", "write")
|
|
|
|
|
|
def require_accounting_write():
|
|
"""دسترسی نوشتن در بخش حسابداری"""
|
|
return require_any_permission("accounting", "write")
|
|
|
|
|
|
def require_inventory_write():
|
|
"""دسترسی نوشتن در بخش موجودی"""
|
|
return require_any_permission("inventory", "write")
|
|
|
|
|
|
def require_reports_export():
|
|
"""دسترسی صادرات گزارش"""
|
|
return require_any_permission("reports", "export")
|
|
|
|
|
|
def require_settings_manage_users():
|
|
"""دسترسی مدیریت کاربران کسب و کار"""
|
|
return require_any_permission("settings", "manage_users")
|
|
|
|
|
|
def require_user_management():
|
|
"""دسترسی مدیریت کاربران در سطح اپلیکیشن"""
|
|
return require_app_permission("user_management")
|
|
|
|
|
|
def require_business_management():
|
|
"""دسترسی مدیریت کسب و کارها"""
|
|
return require_app_permission("business_management")
|
|
|
|
|
|
def require_system_settings():
|
|
"""دسترسی تنظیمات سیستم"""
|
|
return require_app_permission("system_settings")
|
|
|
|
|
|
def require_permission(permission: str):
|
|
"""Decorator عمومی برای بررسی دسترسی - wrapper برای require_app_permission"""
|
|
return require_app_permission(permission)
|