hesabixArc/hesabixAPI/app/services/person_service.py
2025-10-27 18:47:45 +00:00

753 lines
30 KiB
Python

from typing import List, Optional, Dict, Any
import json
from sqlalchemy.exc import IntegrityError
from app.core.responses import ApiError
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_, func
from adapters.db.models.person import Person, PersonBankAccount, PersonType
from adapters.db.models.document import Document
from adapters.db.models.document_line import DocumentLine
from adapters.api.v1.schema_models.person import (
PersonCreateRequest, PersonUpdateRequest, PersonBankAccountCreateRequest
)
from app.core.responses import success_response
def create_person(db: Session, business_id: int, person_data: PersonCreateRequest) -> Dict[str, Any]:
"""ایجاد شخص جدید"""
# محاسبه/اعتبارسنجی کد یکتا
code: Optional[int] = getattr(person_data, 'code', None)
if code is not None:
exists = db.query(Person).filter(
and_(Person.business_id == business_id, Person.code == code)
).first()
if exists:
raise ApiError("DUPLICATE_PERSON_CODE", "کد شخص تکراری است", http_status=400)
else:
# تولید خودکار کد: بیشینه فعلی + 1 (نسبت به همان کسب و کار)
max_code = db.query(func.max(Person.code)).filter(Person.business_id == business_id).scalar()
code = (max_code or 0) + 1
# آماده‌سازی person_types (چندانتخابی) و سازگاری person_type تکی
types_list: List[str] = []
if getattr(person_data, 'person_types', None):
types_list = [t.value if hasattr(t, 'value') else str(t) for t in person_data.person_types] # type: ignore[attr-defined]
elif getattr(person_data, 'person_type', None):
t = person_data.person_type
types_list = [t.value if hasattr(t, 'value') else str(t)]
# نوع تکی برای استفاده‌های بعدی (قبل از هر استفاده تعریف شود)
incoming_single_type = getattr(person_data, 'person_type', None)
# اعتبارسنجی سهام برای سهامدار
is_shareholder = False
if types_list:
is_shareholder = 'سهامدار' in types_list
if not is_shareholder and incoming_single_type is not None:
try:
is_shareholder = (getattr(incoming_single_type, 'value', str(incoming_single_type)) == 'سهامدار')
except Exception:
is_shareholder = False
if is_shareholder:
sc_val = getattr(person_data, 'share_count', None)
if sc_val is None or not isinstance(sc_val, int) or sc_val <= 0:
raise ApiError("INVALID_SHARE_COUNT", "برای سهامدار، تعداد سهام الزامی و باید بزرگتر از صفر باشد", http_status=400)
# ایجاد شخص
# نگاشت person_type دریافتی از اسکیما به Enum مدل
mapped_single_type = None
if incoming_single_type is not None:
try:
# incoming_single_type.value مقدار فارسی مانند "سهامدار"
mapped_single_type = PersonType(getattr(incoming_single_type, 'value', str(incoming_single_type)))
except Exception:
mapped_single_type = None
person = Person(
business_id=business_id,
code=code,
alias_name=person_data.alias_name,
first_name=person_data.first_name,
last_name=person_data.last_name,
# ذخیره مقدار Enum با مقدار فارسی (values_callable در مدل مقادیر فارسی را می‌نویسد)
person_types=json.dumps(types_list, ensure_ascii=False) if types_list else None,
company_name=person_data.company_name,
payment_id=person_data.payment_id,
national_id=person_data.national_id,
registration_number=person_data.registration_number,
economic_id=person_data.economic_id,
country=person_data.country,
province=person_data.province,
city=person_data.city,
address=person_data.address,
postal_code=person_data.postal_code,
phone=person_data.phone,
mobile=person_data.mobile,
fax=person_data.fax,
email=person_data.email,
website=person_data.website,
share_count=getattr(person_data, 'share_count', None),
commission_sale_percent=getattr(person_data, 'commission_sale_percent', None),
commission_sales_return_percent=getattr(person_data, 'commission_sales_return_percent', None),
commission_sales_amount=getattr(person_data, 'commission_sales_amount', None),
commission_sales_return_amount=getattr(person_data, 'commission_sales_return_amount', None),
commission_exclude_discounts=bool(getattr(person_data, 'commission_exclude_discounts', False)),
commission_exclude_additions_deductions=bool(getattr(person_data, 'commission_exclude_additions_deductions', False)),
commission_post_in_invoice_document=bool(getattr(person_data, 'commission_post_in_invoice_document', False)),
)
db.add(person)
db.flush() # برای دریافت ID
# ایجاد حساب‌های بانکی
if person_data.bank_accounts:
for bank_account_data in person_data.bank_accounts:
bank_account = PersonBankAccount(
person_id=person.id,
bank_name=bank_account_data.bank_name,
account_number=bank_account_data.account_number,
card_number=bank_account_data.card_number,
sheba_number=bank_account_data.sheba_number,
)
db.add(bank_account)
try:
db.commit()
except IntegrityError:
db.rollback()
raise ApiError("DUPLICATE_PERSON_CODE", "کد شخص تکراری است", http_status=400)
db.refresh(person)
return success_response(
message="شخص با موفقیت ایجاد شد",
data=_person_to_dict(person)
)
def get_person_by_id(db: Session, person_id: int, business_id: int) -> Optional[Dict[str, Any]]:
"""دریافت شخص بر اساس شناسه"""
person = db.query(Person).filter(
and_(Person.id == person_id, Person.business_id == business_id)
).first()
if not person:
return None
return _person_to_dict(person)
def get_persons_by_business(
db: Session,
business_id: int,
query_info: Dict[str, Any],
fiscal_year_id: Optional[int] = None
) -> Dict[str, Any]:
"""دریافت لیست اشخاص با جستجو و فیلتر"""
query = db.query(Person).filter(Person.business_id == business_id)
# بررسی نیاز به محاسبه تراز قبل از pagination
# (برای فیلتر یا مرتب‌سازی بر اساس تراز/وضعیت)
needs_balance_before_pagination = False
sort_by = query_info.get('sort_by', 'created_at')
if sort_by in ['balance', 'status']:
needs_balance_before_pagination = True
# بررسی فیلترها برای balance و status
if query_info.get('filters'):
for filter_item in query_info['filters']:
if isinstance(filter_item, dict):
field = filter_item.get('property')
else:
field = getattr(filter_item, 'property', None)
if field in ['balance', 'status']:
needs_balance_before_pagination = True
break
# اعمال جستجو
if query_info.get('search') and query_info.get('search_fields'):
search_term = f"%{query_info['search']}%"
search_conditions = []
for field in query_info['search_fields']:
if field == 'code':
# تبدیل به رشته برای جستجو مانند LIKE
try:
code_int = int(query_info['search']) # type: ignore[arg-type]
search_conditions.append(Person.code == code_int)
except Exception:
pass
if field == 'alias_name':
search_conditions.append(Person.alias_name.ilike(search_term))
elif field == 'first_name':
search_conditions.append(Person.first_name.ilike(search_term))
elif field == 'last_name':
search_conditions.append(Person.last_name.ilike(search_term))
elif field == 'company_name':
search_conditions.append(Person.company_name.ilike(search_term))
elif field == 'mobile':
search_conditions.append(Person.mobile.ilike(search_term))
elif field == 'email':
search_conditions.append(Person.email.ilike(search_term))
elif field == 'national_id':
search_conditions.append(Person.national_id.ilike(search_term))
if search_conditions:
query = query.filter(or_(*search_conditions))
# اعمال فیلترها
if query_info.get('filters'):
for filter_item in query_info['filters']:
# پشتیبانی از هر دو حالت: دیکشنری یا شیء Pydantic
if isinstance(filter_item, dict):
field = filter_item.get('property')
operator = filter_item.get('operator')
value = filter_item.get('value')
else:
field = getattr(filter_item, 'property', None)
operator = getattr(filter_item, 'operator', None)
value = getattr(filter_item, 'value', None)
if not field or not operator:
continue
# کد
if field == 'code':
if operator == '=':
query = query.filter(Person.code == value)
elif operator == 'in' and isinstance(value, list):
query = query.filter(Person.code.in_(value))
continue
# انواع شخص چندانتخابی (رشته JSON)
if field == 'person_types':
if operator == '=' and isinstance(value, str):
query = query.filter(Person.person_types.ilike(f'%"{value}"%'))
elif operator == 'in' and isinstance(value, list):
sub_filters = [Person.person_types.ilike(f'%"{v}"%') for v in value]
if sub_filters:
query = query.filter(or_(*sub_filters))
continue
# فیلترهای متنی عمومی (حمایت از عملگرهای contains/startsWith/endsWith)
def apply_text_filter(column):
nonlocal query
if operator == '=':
query = query.filter(column == value)
elif operator == 'like' or operator == '*':
query = query.filter(column.ilike(f"%{value}%"))
elif operator == '*?': # starts with
query = query.filter(column.ilike(f"{value}%"))
elif operator == '?*': # ends with
query = query.filter(column.ilike(f"%{value}"))
if field == 'country':
apply_text_filter(Person.country)
continue
if field == 'province':
apply_text_filter(Person.province)
continue
if field == 'alias_name':
apply_text_filter(Person.alias_name)
continue
if field == 'first_name':
apply_text_filter(Person.first_name)
continue
if field == 'last_name':
apply_text_filter(Person.last_name)
continue
if field == 'company_name':
apply_text_filter(Person.company_name)
continue
if field == 'mobile':
apply_text_filter(Person.mobile)
continue
if field == 'email':
apply_text_filter(Person.email)
continue
if field == 'national_id':
apply_text_filter(Person.national_id)
continue
if field == 'registration_number':
apply_text_filter(Person.registration_number)
continue
if field == 'economic_id':
apply_text_filter(Person.economic_id)
continue
if field == 'city':
apply_text_filter(Person.city)
continue
if field == 'address':
apply_text_filter(Person.address)
continue
# شمارش کل رکوردها
total = query.count()
# اعمال مرتب‌سازی (فقط برای فیلدهای دیتابیس)
sort_desc = query_info.get('sort_desc', True)
if sort_by not in ['balance', 'status']:
# مرتب‌سازی در دیتابیس
if sort_by == 'code':
query = query.order_by(Person.code.desc() if sort_desc else Person.code.asc())
elif sort_by == 'alias_name':
query = query.order_by(Person.alias_name.desc() if sort_desc else Person.alias_name.asc())
elif sort_by == 'first_name':
query = query.order_by(Person.first_name.desc() if sort_desc else Person.first_name.asc())
elif sort_by == 'last_name':
query = query.order_by(Person.last_name.desc() if sort_desc else Person.last_name.asc())
elif sort_by == 'created_at':
query = query.order_by(Person.created_at.desc() if sort_desc else Person.created_at.asc())
elif sort_by == 'updated_at':
query = query.order_by(Person.updated_at.desc() if sort_desc else Person.updated_at.asc())
else:
query = query.order_by(Person.created_at.desc())
skip = query_info.get('skip', 0)
take = query_info.get('take', 20)
# اگر نیاز به محاسبه تراز قبل از pagination است
if needs_balance_before_pagination:
# دریافت همه persons
all_persons = query.all()
# تبدیل به دیکشنری و محاسبه تراز
all_items = []
person_ids = [p.id for p in all_persons]
balances = calculate_persons_balances_bulk(db, person_ids, fiscal_year_id)
for person in all_persons:
item = _person_to_dict(person)
balance, status = balances.get(person.id, (0.0, "بدون تراکنش"))
item['balance'] = balance
item['status'] = status
all_items.append(item)
# اعمال فیلتر balance و status
if query_info.get('filters'):
for filter_item in query_info['filters']:
if isinstance(filter_item, dict):
field = filter_item.get('property')
operator = filter_item.get('operator')
value = filter_item.get('value')
else:
field = getattr(filter_item, 'property', None)
operator = getattr(filter_item, 'operator', None)
value = getattr(filter_item, 'value', None)
if field == 'balance':
if operator == '=':
all_items = [item for item in all_items if item['balance'] == value]
elif operator == '>':
all_items = [item for item in all_items if item['balance'] > value]
elif operator == '>=':
all_items = [item for item in all_items if item['balance'] >= value]
elif operator == '<':
all_items = [item for item in all_items if item['balance'] < value]
elif operator == '<=':
all_items = [item for item in all_items if item['balance'] <= value]
elif field == 'status':
if operator == '=' and isinstance(value, str):
all_items = [item for item in all_items if item['status'] == value]
elif operator == 'in' and isinstance(value, list):
all_items = [item for item in all_items if item['status'] in value]
# مرتب‌سازی
if sort_by == 'balance':
all_items.sort(key=lambda x: x['balance'], reverse=sort_desc)
elif sort_by == 'status':
all_items.sort(key=lambda x: x['status'], reverse=sort_desc)
# محاسبه total بعد از فیلتر
total = len(all_items)
# اعمال pagination
items = all_items[skip:skip + take]
else:
# روش معمولی: ابتدا pagination، سپس محاسبه تراز
persons = query.offset(skip).limit(take).all()
# تبدیل به دیکشنری
items = [_person_to_dict(person) for person in persons]
# محاسبه تراز برای persons فعلی
person_ids = [p.id for p in persons]
balances = calculate_persons_balances_bulk(db, person_ids, fiscal_year_id)
for item in items:
person_id = item['id']
balance, status = balances.get(person_id, (0.0, "بدون تراکنش"))
item['balance'] = balance
item['status'] = status
# محاسبه اطلاعات صفحه‌بندی
total_pages = (total + take - 1) // take if take > 0 else 0
current_page = (skip // take) + 1 if take > 0 else 1
pagination = {
'total': total,
'page': current_page,
'per_page': take,
'total_pages': total_pages,
'has_next': current_page < total_pages,
'has_prev': current_page > 1
}
return {
'items': items,
'pagination': pagination,
'query_info': query_info
}
def update_person(
db: Session,
person_id: int,
business_id: int,
person_data: PersonUpdateRequest
) -> Optional[Dict[str, Any]]:
"""ویرایش شخص"""
person = db.query(Person).filter(
and_(Person.id == person_id, Person.business_id == business_id)
).first()
if not person:
return None
# به‌روزرسانی فیلدها
update_data = person_data.dict(exclude_unset=True)
# مدیریت کد یکتا
if 'code' in update_data and update_data['code'] is not None:
desired_code = update_data['code']
exists = db.query(Person).filter(
and_(Person.business_id == business_id, Person.code == desired_code, Person.id != person_id)
).first()
if exists:
raise ValueError("کد شخص تکراری است")
person.code = desired_code
# مدیریت انواع شخص چندگانه
types_list: Optional[List[str]] = None
if 'person_types' in update_data and update_data['person_types'] is not None:
incoming = update_data['person_types'] or []
types_list = [t.value if hasattr(t, 'value') else str(t) for t in incoming]
person.person_types = json.dumps(types_list, ensure_ascii=False) if types_list else None
# همگام کردن person_type تکی برای سازگاری
# person_type handling removed - only person_types is used now
# اگر شخص سهامدار شد، share_count معتبر باشد
resulting_types: List[str] = []
if person.person_types:
try:
tmp = json.loads(person.person_types)
if isinstance(tmp, list):
resulting_types = [str(x) for x in tmp]
except Exception:
resulting_types = []
if 'سهامدار' in resulting_types:
sc_val2 = update_data.get('share_count', person.share_count)
if sc_val2 is None or (isinstance(sc_val2, int) and sc_val2 <= 0):
raise ApiError("INVALID_SHARE_COUNT", "برای سهامدار، تعداد سهام الزامی و باید بزرگتر از صفر باشد", http_status=400)
# سایر فیلدها
for field in list(update_data.keys()):
if field in {'code', 'person_types'}:
continue
setattr(person, field, update_data[field])
db.commit()
db.refresh(person)
return success_response(
message="شخص با موفقیت ویرایش شد",
data=_person_to_dict(person)
)
def delete_person(db: Session, person_id: int, business_id: int) -> bool:
"""حذف شخص"""
person = db.query(Person).filter(
and_(Person.id == person_id, Person.business_id == business_id)
).first()
if not person:
return False
db.delete(person)
db.commit()
return True
def get_person_summary(db: Session, business_id: int) -> Dict[str, Any]:
"""دریافت خلاصه اشخاص"""
# تعداد کل اشخاص
total_persons = db.query(Person).filter(Person.business_id == business_id).count()
# حذف مفهوم فعال/غیرفعال
active_persons = 0
inactive_persons = total_persons
# تعداد بر اساس نوع
by_type = {}
for person_type in PersonType:
count = db.query(Person).filter(
and_(Person.business_id == business_id, Person.person_types.ilike(f'%"{person_type.value}"%'))
).count()
by_type[person_type.value] = count
return {
'total_persons': total_persons,
'by_type': by_type,
'active_persons': active_persons,
'inactive_persons': inactive_persons
}
def _person_to_dict(person: Person) -> Dict[str, Any]:
"""تبدیل مدل Person به دیکشنری"""
# Parse person_types JSON to list
types_list: List[str] = []
if person.person_types:
try:
types = json.loads(person.person_types)
if isinstance(types, list):
types_list = [str(x) for x in types]
except Exception:
types_list = []
return {
'id': person.id,
'business_id': person.business_id,
'code': person.code,
'alias_name': person.alias_name,
'first_name': person.first_name,
'last_name': person.last_name,
'person_types': types_list,
'company_name': person.company_name,
'payment_id': person.payment_id,
'share_count': person.share_count,
'commission_sale_percent': float(person.commission_sale_percent) if getattr(person, 'commission_sale_percent', None) is not None else None,
'commission_sales_return_percent': float(person.commission_sales_return_percent) if getattr(person, 'commission_sales_return_percent', None) is not None else None,
'commission_sales_amount': float(person.commission_sales_amount) if getattr(person, 'commission_sales_amount', None) is not None else None,
'commission_sales_return_amount': float(person.commission_sales_return_amount) if getattr(person, 'commission_sales_return_amount', None) is not None else None,
'commission_exclude_discounts': bool(person.commission_exclude_discounts),
'commission_exclude_additions_deductions': bool(person.commission_exclude_additions_deductions),
'commission_post_in_invoice_document': bool(person.commission_post_in_invoice_document),
'national_id': person.national_id,
'registration_number': person.registration_number,
'economic_id': person.economic_id,
'country': person.country,
'province': person.province,
'city': person.city,
'address': person.address,
'postal_code': person.postal_code,
'phone': person.phone,
'mobile': person.mobile,
'fax': person.fax,
'email': person.email,
'website': person.website,
'created_at': person.created_at.isoformat(),
'updated_at': person.updated_at.isoformat(),
'bank_accounts': [
{
'id': ba.id,
'person_id': ba.person_id,
'bank_name': ba.bank_name,
'account_number': ba.account_number,
'card_number': ba.card_number,
'sheba_number': ba.sheba_number,
'created_at': ba.created_at.isoformat(),
'updated_at': ba.updated_at.isoformat(),
}
for ba in person.bank_accounts
]
}
def search_persons(db: Session, business_id: int, search_query: Optional[str] = None,
page: int = 1, limit: int = 20) -> List[Person]:
"""جست‌وجو در اشخاص"""
query = db.query(Person).filter(Person.business_id == business_id)
if search_query:
# جست‌وجو در نام، نام خانوادگی، نام مستعار، کد، تلفن و ایمیل
search_filter = or_(
Person.alias_name.ilike(f"%{search_query}%"),
Person.first_name.ilike(f"%{search_query}%"),
Person.last_name.ilike(f"%{search_query}%"),
Person.company_name.ilike(f"%{search_query}%"),
Person.phone.ilike(f"%{search_query}%"),
Person.mobile.ilike(f"%{search_query}%"),
Person.email.ilike(f"%{search_query}%"),
Person.code == int(search_query) if search_query.isdigit() else False
)
query = query.filter(search_filter)
# مرتب‌سازی بر اساس نام مستعار
query = query.order_by(Person.alias_name)
# صفحه‌بندی
offset = (page - 1) * limit
return query.offset(offset).limit(limit).all()
def count_persons(db: Session, business_id: int, search_query: Optional[str] = None) -> int:
"""شمارش تعداد اشخاص"""
query = db.query(Person).filter(Person.business_id == business_id)
if search_query:
# جست‌وجو در نام، نام خانوادگی، نام مستعار، کد، تلفن و ایمیل
search_filter = or_(
Person.alias_name.ilike(f"%{search_query}%"),
Person.first_name.ilike(f"%{search_query}%"),
Person.last_name.ilike(f"%{search_query}%"),
Person.company_name.ilike(f"%{search_query}%"),
Person.phone.ilike(f"%{search_query}%"),
Person.mobile.ilike(f"%{search_query}%"),
Person.email.ilike(f"%{search_query}%"),
Person.code == int(search_query) if search_query.isdigit() else False
)
query = query.filter(search_filter)
return query.count()
def calculate_person_balance(
db: Session,
person_id: int,
fiscal_year_id: Optional[int] = None
) -> tuple[float, str]:
"""
محاسبه تراز و وضعیت مالی یک شخص
Args:
db: نشست پایگاه داده
person_id: شناسه شخص
fiscal_year_id: شناسه سال مالی (اختیاری)
Returns:
tuple: (تراز, وضعیت)
- تراز: credit - debit
- وضعیت: "بستانکار" | "بدهکار" | "بالانس" | "بدون تراکنش"
"""
# Query برای محاسبه مجموع بستانکار و بدهکار
query = db.query(
func.coalesce(func.sum(DocumentLine.credit), 0).label('total_credit'),
func.coalesce(func.sum(DocumentLine.debit), 0).label('total_debit')
).join(
Document, DocumentLine.document_id == Document.id
).filter(
DocumentLine.person_id == person_id,
Document.is_proforma == False # فقط اسناد قطعی
)
# اعمال فیلتر سال مالی
if fiscal_year_id:
query = query.filter(Document.fiscal_year_id == fiscal_year_id)
result = query.first()
if result is None:
return 0.0, "بدون تراکنش"
total_credit = float(result.total_credit or 0)
total_debit = float(result.total_debit or 0)
# محاسبه تراز: بستانکار - بدهکار
balance = total_credit - total_debit
# تعیین وضعیت
if total_credit == 0 and total_debit == 0:
status = "بدون تراکنش"
elif balance > 0:
status = "بستانکار"
elif balance < 0:
status = "بدهکار"
else: # balance == 0
status = "بالانس"
return balance, status
def calculate_persons_balances_bulk(
db: Session,
person_ids: List[int],
fiscal_year_id: Optional[int] = None
) -> Dict[int, tuple[float, str]]:
"""
محاسبه تراز و وضعیت چندین شخص به صورت دسته‌جمعی
Args:
db: نشست پایگاه داده
person_ids: لیست شناسه‌های اشخاص
fiscal_year_id: شناسه سال مالی (اختیاری)
Returns:
dict: {person_id: (balance, status)}
"""
if not person_ids:
return {}
# Query برای محاسبه مجموع بستانکار و بدهکار برای هر شخص
query = db.query(
DocumentLine.person_id,
func.coalesce(func.sum(DocumentLine.credit), 0).label('total_credit'),
func.coalesce(func.sum(DocumentLine.debit), 0).label('total_debit')
).join(
Document, DocumentLine.document_id == Document.id
).filter(
DocumentLine.person_id.in_(person_ids),
Document.is_proforma == False # فقط اسناد قطعی
)
# اعمال فیلتر سال مالی
if fiscal_year_id:
query = query.filter(Document.fiscal_year_id == fiscal_year_id)
# Group by person_id
query = query.group_by(DocumentLine.person_id)
results = query.all()
# ساخت دیکشنری نتایج
balances: Dict[int, tuple[float, str]] = {}
# ابتدا همه را به "بدون تراکنش" تنظیم می‌کنیم
for person_id in person_ids:
balances[person_id] = (0.0, "بدون تراکنش")
# سپس نتایج واقعی را اعمال می‌کنیم
for result in results:
person_id = result.person_id
total_credit = float(result.total_credit or 0)
total_debit = float(result.total_debit or 0)
# محاسبه تراز
balance = total_credit - total_debit
# تعیین وضعیت
if balance > 0:
status = "بستانکار"
elif balance < 0:
status = "بدهکار"
else: # balance == 0
status = "بالانس"
balances[person_id] = (balance, status)
return balances