hesabixArc/hesabixAPI/app/services/wallet_service.py

675 lines
24 KiB
Python

from __future__ import annotations
from typing import Optional, Dict, Any, List
from decimal import Decimal
from sqlalchemy.orm import Session
from sqlalchemy import select, and_
from adapters.db.models.wallet import WalletAccount, WalletTransaction, WalletPayout, WalletSetting
from adapters.db.models.bank_account import BankAccount
from adapters.db.models.business import Business
from adapters.db.models.document import Document
from adapters.db.models.document_line import DocumentLine
from adapters.db.models.account import Account
from adapters.db.models.fiscal_year import FiscalYear
from app.core.responses import ApiError
from app.services.system_settings_service import get_wallet_settings
from datetime import datetime, date
def _ensure_wallet_account(db: Session, business_id: int) -> WalletAccount:
obj = db.execute(
select(WalletAccount).where(WalletAccount.business_id == int(business_id))
).scalars().first()
if obj:
return obj
obj = WalletAccount(
business_id=int(business_id),
available_balance=Decimal("0"),
pending_balance=Decimal("0"),
status="active",
)
db.add(obj)
db.flush()
return obj
def _get_wallet_account_for_update(db: Session, business_id: int) -> WalletAccount:
"""
قفل ردیفی روی حساب کیف‌پول برای جلوگیری از رقابت در به‌روزرسانی مانده‌ها
"""
acc = (
db.query(WalletAccount)
.filter(WalletAccount.business_id == int(business_id))
.with_for_update()
.first()
)
if acc:
return acc
# اگر وجود ندارد، ایجاد سپس تلاش مجدد برای قفل
acc = _ensure_wallet_account(db, business_id)
db.flush()
try:
acc = (
db.query(WalletAccount)
.filter(WalletAccount.business_id == int(business_id))
.with_for_update()
.first()
) or acc
except Exception:
pass
return acc
def get_wallet_overview(db: Session, business_id: int) -> Dict[str, Any]:
_ = db.query(Business).filter(Business.id == int(business_id)).first() or None
if _ is None:
raise ApiError("BUSINESS_NOT_FOUND", "کسب‌وکار یافت نشد", http_status=404)
account = _ensure_wallet_account(db, business_id)
settings = get_wallet_settings(db)
return {
"business_id": business_id,
"available_balance": float(account.available_balance or 0),
"pending_balance": float(account.pending_balance or 0),
"status": account.status,
"base_currency_code": settings.get("wallet_base_currency_code"),
"base_currency_id": settings.get("wallet_base_currency_id"),
}
def list_wallet_transactions(
db: Session,
business_id: int,
limit: int = 50,
skip: int = 0,
from_date: Optional[datetime] = None,
to_date: Optional[datetime] = None,
) -> List[Dict[str, Any]]:
q = (
db.query(WalletTransaction)
.filter(WalletTransaction.business_id == int(business_id))
.order_by(WalletTransaction.id.desc())
)
if from_date is not None:
q = q.filter(WalletTransaction.created_at >= from_date)
if to_date is not None:
q = q.filter(WalletTransaction.created_at <= to_date)
items = q.offset(max(0, int(skip))).limit(max(1, min(200, int(limit)))).all()
return [
{
"id": it.id,
"type": it.type,
"status": it.status,
"amount": float(it.amount or 0),
"fee_amount": float(it.fee_amount or 0) if it.fee_amount is not None else None,
"description": it.description,
"external_ref": it.external_ref,
"document_id": it.document_id,
"created_at": it.created_at,
"updated_at": it.updated_at,
}
for it in items
]
def get_wallet_metrics(
db: Session,
business_id: int,
from_date: Optional[datetime] = None,
to_date: Optional[datetime] = None,
) -> Dict[str, Any]:
account = _ensure_wallet_account(db, business_id)
# پایه: مجموع‌ها از WalletTransaction
q = db.query(WalletTransaction).filter(WalletTransaction.business_id == int(business_id))
if from_date is not None:
q = q.filter(WalletTransaction.created_at >= from_date)
if to_date is not None:
q = q.filter(WalletTransaction.created_at <= to_date)
transactions = q.all()
gross_in = Decimal("0")
fees_in = Decimal("0")
gross_out = Decimal("0")
fees_out = Decimal("0")
for tx in transactions:
amt = Decimal(str(tx.amount or 0))
fee = Decimal(str(tx.fee_amount or 0))
t = (tx.type or "").lower()
st = (tx.status or "").lower()
if st not in ("succeeded", "pending", "approved", "processing"): # موفق/در جریان را در گزارش لحاظ می‌کنیم
continue
if t in ("top_up", "customer_payment"):
gross_in += amt
fees_in += fee if fee > 0 else Decimal("0")
elif t in ("payout_settlement", "refund"):
gross_out += amt
fees_out += fee if fee > 0 else Decimal("0")
# سایر انواع در صورت نیاز بعداً اضافه شوند
# همچنین از wallet_payouts برای کارمزدهای تسویه استفاده کنیم
pq = db.query(WalletPayout).filter(WalletPayout.business_id == int(business_id))
if from_date is not None:
pq = pq.filter(WalletPayout.created_at >= from_date)
if to_date is not None:
pq = pq.filter(WalletPayout.created_at <= to_date)
for p in pq.all():
fees_out += Decimal(str(p.fees or 0))
net_in = gross_in - fees_in
net_out = gross_out + fees_out # خروجی خالصی که از کیف‌پول خارج می‌شود
return {
"period": {
"from": from_date,
"to": to_date,
},
"totals": {
"gross_in": float(gross_in),
"fees_in": float(fees_in),
"net_in": float(net_in),
"gross_out": float(gross_out),
"fees_out": float(fees_out),
"net_out": float(net_out),
},
"balances": {
"available": float(account.available_balance or 0),
"pending": float(account.pending_balance or 0),
},
}
def create_payout_request(
db: Session,
business_id: int,
user_id: int,
payload: Dict[str, Any],
) -> Dict[str, Any]:
amount = Decimal(str(payload.get("amount") or 0))
if amount <= 0:
raise ApiError("INVALID_AMOUNT", "مبلغ نامعتبر است", http_status=400)
bank_account_id = payload.get("bank_account_id")
if not bank_account_id:
raise ApiError("BANK_ACCOUNT_REQUIRED", "شناسه حساب بانکی الزامی است", http_status=400)
bank_acc = db.query(BankAccount).filter(BankAccount.id == int(bank_account_id)).first()
if not bank_acc:
raise ApiError("BANK_ACCOUNT_NOT_FOUND", "حساب بانکی یافت نشد", http_status=404)
if not bank_acc.is_active:
raise ApiError("BANK_ACCOUNT_INACTIVE", "حساب بانکی غیرفعال است", http_status=400)
account = _get_wallet_account_for_update(db, business_id)
available = Decimal(str(account.available_balance or 0))
if amount > available:
raise ApiError("INSUFFICIENT_FUNDS", "موجودی کافی نیست", http_status=400)
# قفل مبلغ: کسر از مانده قابل برداشت
account.available_balance = available - amount
db.flush()
payout = WalletPayout(
business_id=int(business_id),
bank_account_id=int(bank_account_id),
gross_amount=amount,
fees=Decimal("0"),
net_amount=amount,
status="requested",
schedule_type=str(payload.get("schedule_type") or "manual"),
external_ref=None,
)
db.add(payout)
db.flush()
# ثبت تراکنش کنترلی
tx = WalletTransaction(
business_id=int(business_id),
type="payout_request",
status="pending",
amount=amount,
fee_amount=Decimal("0"),
description=str(payload.get("description") or "درخواست تسویه"),
external_ref=str(payout.id),
document_id=None,
)
db.add(tx)
db.flush()
return {
"id": payout.id,
"status": payout.status,
"gross_amount": float(payout.gross_amount),
"net_amount": float(payout.net_amount),
"bank_account_id": payout.bank_account_id,
}
def approve_payout_request(db: Session, payout_id: int, approver_user_id: int) -> Dict[str, Any]:
payout = db.query(WalletPayout).filter(WalletPayout.id == int(payout_id)).first()
if not payout:
raise ApiError("PAYOUT_NOT_FOUND", "درخواست تسویه یافت نشد", http_status=404)
if payout.status != "requested":
raise ApiError("INVALID_STATE", "تنها درخواست‌های در وضعیت requested قابل تایید هستند", http_status=400)
payout.status = "approved"
db.flush()
return {"id": payout.id, "status": payout.status}
def cancel_payout_request(db: Session, payout_id: int, canceller_user_id: int) -> Dict[str, Any]:
payout = db.query(WalletPayout).filter(WalletPayout.id == int(payout_id)).first()
if not payout:
raise ApiError("PAYOUT_NOT_FOUND", "درخواست تسویه یافت نشد", http_status=404)
if payout.status not in ("requested", "approved"):
raise ApiError("INVALID_STATE", "فقط درخواست‌های requested/approved قابل لغو هستند", http_status=400)
# بازگردانی مبلغ به مانده قابل برداشت
account = _get_wallet_account_for_update(db, payout.business_id)
account.available_balance = Decimal(str(account.available_balance or 0)) + Decimal(str(payout.gross_amount or 0))
db.flush()
payout.status = "canceled"
db.flush()
return {"id": payout.id, "status": payout.status}
def settle_payout(db: Session, payout_id: int, user_id: int) -> Dict[str, Any]:
payout = db.query(WalletPayout).filter(WalletPayout.id == int(payout_id)).first()
if not payout:
raise ApiError("PAYOUT_NOT_FOUND", "درخواست تسویه یافت نشد", http_status=404)
if payout.status not in ("approved", "processing"):
raise ApiError("INVALID_STATE", "تسویه تنها پس از تایید/در حال پردازش مجاز است", http_status=400)
# ایجاد سند پرداخت برای خالص دریافتی بانک
try:
doc_id = _post_payout_document(
db,
business_id=int(payout.business_id),
user_id=int(user_id),
net_amount=Decimal(str(payout.net_amount or 0)),
fee_amount=Decimal(str(payout.fees or 0)),
)
except Exception:
doc_id = None
payout.status = "settled"
db.flush()
# ثبت تراکنش کیف‌پول برای گزارش‌ها
try:
tx = WalletTransaction(
business_id=int(payout.business_id),
type="payout_settlement",
status="succeeded",
amount=Decimal(str(payout.net_amount or 0)),
fee_amount=Decimal(str(payout.fees or 0)),
description="تسویه کیف‌پول",
document_id=doc_id,
)
db.add(tx)
db.flush()
except Exception:
pass
return {"id": payout.id, "status": payout.status, "document_id": doc_id}
def get_business_wallet_settings(db: Session, business_id: int) -> Dict[str, Any]:
obj = db.query(WalletSetting).filter(WalletSetting.business_id == int(business_id)).first()
if not obj:
return {
"business_id": business_id,
"mode": "manual",
"frequency": None,
"threshold_amount": None,
"min_reserve": None,
"default_bank_account_id": None,
}
return {
"business_id": business_id,
"mode": obj.mode,
"frequency": obj.frequency,
"threshold_amount": float(obj.threshold_amount) if obj.threshold_amount is not None else None,
"min_reserve": float(obj.min_reserve) if obj.min_reserve is not None else None,
"default_bank_account_id": obj.default_bank_account_id,
}
def update_business_wallet_settings(db: Session, business_id: int, payload: Dict[str, Any]) -> Dict[str, Any]:
obj = db.query(WalletSetting).filter(WalletSetting.business_id == int(business_id)).first()
if not obj:
obj = WalletSetting(business_id=int(business_id))
db.add(obj)
mode = str(payload.get("mode") or obj.mode or "manual")
frequency = payload.get("frequency") if payload.get("frequency") in (None, "daily", "weekly") else obj.frequency
def _dec(v):
return Decimal(str(v)) if v is not None and str(v).strip() != "" else None
obj.mode = mode
obj.frequency = frequency
obj.threshold_amount = _dec(payload.get("threshold_amount"))
obj.min_reserve = _dec(payload.get("min_reserve"))
obj.default_bank_account_id = int(payload.get("default_bank_account_id")) if payload.get("default_bank_account_id") else None
db.flush()
return get_business_wallet_settings(db, business_id)
def run_auto_settlement(db: Session, business_id: int, user_id: int) -> Dict[str, Any]:
"""
منطق ساده: اگر mode=auto و (available - min_reserve) >= threshold آنگاه به حساب پیش‌فرض تسویه کن.
"""
settings = get_business_wallet_settings(db, business_id)
if (settings.get("mode") or "manual") != "auto":
return {"executed": False, "reason": "AUTO_MODE_DISABLED"}
threshold = Decimal(str(settings.get("threshold_amount") or 0))
min_reserve = Decimal(str(settings.get("min_reserve") or 0))
default_bank_account_id = settings.get("default_bank_account_id")
if not default_bank_account_id:
return {"executed": False, "reason": "NO_DEFAULT_BANK_ACCOUNT"}
account = _get_wallet_account_for_update(db, business_id)
available = Decimal(str(account.available_balance or 0))
cand = available - min_reserve
if cand <= 0 or cand < threshold:
return {"executed": False, "reason": "THRESHOLD_NOT_MET", "available": float(available)}
# ایجاد payout و تسویه
payload = {
"bank_account_id": int(default_bank_account_id),
"amount": float(cand),
"description": "تسویه خودکار",
}
pr = create_payout_request(db, business_id, user_id, payload)
pa = db.query(WalletPayout).filter(WalletPayout.id == int(pr["id"])).first()
# تایید و تسویه
approve_payout_request(db, pa.id, user_id)
result = settle_payout(db, pa.id, user_id)
return {"executed": True, "payout": result}
def create_top_up_request(db: Session, business_id: int, user_id: int, payload: Dict[str, Any]) -> Dict[str, Any]:
"""
ایجاد درخواست افزایش اعتبار (در انتظار تایید درگاه)
- مانده pending افزایش می‌یابد تا پس از تایید به available منتقل شود
"""
amount = Decimal(str(payload.get("amount") or 0))
if amount <= 0:
raise ApiError("INVALID_AMOUNT", "مبلغ نامعتبر است", http_status=400)
gateway_id = payload.get("gateway_id")
if not gateway_id:
# اجازه می‌دهیم بدون gateway_id نیز ساخته شود، اما برای پرداخت آنلاین لازم است
pass
account = _get_wallet_account_for_update(db, business_id)
account.pending_balance = Decimal(str(account.pending_balance or 0)) + amount
db.flush()
tx = WalletTransaction(
business_id=int(business_id),
type="top_up",
status="pending",
amount=amount,
fee_amount=Decimal("0"),
description=str(payload.get("description") or "افزایش اعتبار"),
external_ref=None,
document_id=None,
)
db.add(tx)
db.flush()
# تولید لینک درگاه پرداخت (در صورت ارسال gateway_id)
payment_url = None
if gateway_id:
try:
from app.services.payment_service import initiate_payment
init_res = initiate_payment(
db=db,
business_id=int(business_id),
tx_id=int(tx.id),
amount=float(amount),
gateway_id=int(gateway_id),
)
payment_url = init_res.payment_url
except Exception as ex:
# اگر ایجاد لینک شکست بخورد، مانده pending به حالت قبل برگردد و تراکنش failed شود
try:
account.pending_balance = Decimal(str(account.pending_balance or 0)) - amount
if account.pending_balance < 0:
account.pending_balance = Decimal("0")
tx.status = "failed"
db.flush()
finally:
import structlog
logger = structlog.get_logger()
logger.warning("gateway_initiate_failed", error=str(ex))
return {"transaction_id": tx.id, "status": tx.status, **({"payment_url": payment_url} if payment_url else {})}
def confirm_top_up(db: Session, tx_id: int, success: bool, external_ref: str | None = None) -> Dict[str, Any]:
"""
تایید/لغو top-up از وبهوک درگاه
- در موفقیت: انتقال از pending به available
- در عدم موفقیت: کاهش از pending
"""
tx = db.query(WalletTransaction).filter(WalletTransaction.id == int(tx_id)).first()
if not tx or tx.type != "top_up":
raise ApiError("TX_NOT_FOUND", "تراکنش افزایش اعتبار یافت نشد", http_status=404)
# Idempotency guard: if already finalized, do nothing
if (tx.status or "").lower() in ("succeeded", "failed"):
tx.external_ref = external_ref or tx.external_ref
db.flush()
return {"transaction_id": tx.id, "status": tx.status}
account = _get_wallet_account_for_update(db, tx.business_id)
if success:
# move pending -> available
gross = Decimal(str(tx.amount or 0))
fee = Decimal(str(tx.fee_amount or 0))
if fee < 0:
fee = Decimal("0")
if fee > gross:
fee = gross
net = gross - fee
# Prevent negative pending due to duplicate webhook/callback
current_pending = Decimal(str(account.pending_balance or 0))
account.pending_balance = current_pending - gross if current_pending >= gross else Decimal("0")
account.available_balance = Decimal(str(account.available_balance or 0)) + net
tx.status = "succeeded"
# create accounting document
try:
doc_id = _post_topup_document(db, tx.business_id, user_id=0, amount=gross, fee_amount=fee)
tx.document_id = int(doc_id)
except Exception:
# اگر سند ایجاد نشد، تراکنش مالی معتبر است اما سند ندارد
pass
else:
# rollback pending
current_pending = Decimal(str(account.pending_balance or 0))
dec_amt = Decimal(str(tx.amount or 0))
account.pending_balance = current_pending - dec_amt if current_pending >= dec_amt else Decimal("0")
tx.status = "failed"
tx.external_ref = external_ref
db.flush()
return {"transaction_id": tx.id, "status": tx.status}
def refund_transaction(db: Session, tx_id: int, amount: Decimal | None = None, reason: str | None = None) -> Dict[str, Any]:
"""
استرداد تراکنش موفق (بازگشت وجه از کیف‌پول)
- کاهش از available به میزان مبلغ استرداد
"""
src = db.query(WalletTransaction).filter(WalletTransaction.id == int(tx_id)).first()
if not src or src.status != "succeeded":
raise ApiError("TX_NOT_REFUNDABLE", "تراکنش موفق برای استرداد پیدا نشد", http_status=400)
refund_amount = Decimal(str(amount if amount is not None else src.amount or 0))
if refund_amount <= 0 or refund_amount > Decimal(str(src.amount or 0)):
raise ApiError("INVALID_REFUND_AMOUNT", "مبلغ استرداد نامعتبر است", http_status=400)
account = _ensure_wallet_account(db, src.business_id)
available = Decimal(str(account.available_balance or 0))
if refund_amount > available:
raise ApiError("INSUFFICIENT_FUNDS", "موجودی کافی برای استرداد نیست", http_status=400)
account.available_balance = float(available - refund_amount)
db.flush()
tx = WalletTransaction(
business_id=int(src.business_id),
type="refund",
status="succeeded",
amount=refund_amount,
description=reason or f"استرداد تراکنش {src.id}",
external_ref=None,
document_id=None,
)
db.add(tx)
db.flush()
return {"refund_transaction_id": tx.id, "status": tx.status}
def _parse_iso_date_only(dt: str | datetime | date) -> date:
try:
if isinstance(dt, date) and not isinstance(dt, datetime):
return dt
if isinstance(dt, datetime):
return dt.date()
return datetime.fromisoformat(str(dt)).date()
except Exception:
return datetime.utcnow().date()
def _get_current_fiscal_year(db: Session, business_id: int) -> FiscalYear:
fy = (
db.query(FiscalYear)
.filter(
and_(
FiscalYear.business_id == int(business_id),
FiscalYear.is_last == True, # noqa: E712
)
)
.first()
)
if not fy:
raise ApiError("FISCAL_YEAR_NOT_FOUND", "سال مالی جاری یافت نشد", http_status=400)
return fy
def _get_fixed_account_by_code(db: Session, account_code: str) -> Account:
acc = db.query(Account).filter(
and_(Account.business_id == None, Account.code == str(account_code)) # noqa: E711
).first()
if not acc:
raise ApiError("ACCOUNT_NOT_FOUND", f"Account with code {account_code} not found", http_status=500)
return acc
def _resolve_wallet_currency_id(db: Session) -> int:
settings = get_wallet_settings(db)
cid = settings.get("wallet_base_currency_id")
if cid:
return int(cid)
# fallback: resolve by code IRR
from adapters.db.models.currency import Currency
cur = db.query(Currency).filter(Currency.code == "IRR").first()
if not cur:
raise ApiError("CURRENCY_NOT_FOUND", "ارز پایه کیف‌پول یافت نشد", http_status=400)
return int(cur.id)
def _create_simple_document(
db: Session,
business_id: int,
user_id: int,
document_type: str, # 'receipt' | 'payment'
currency_id: int,
document_date: date,
description: str | None,
accounting_lines: list[dict],
) -> Document:
fiscal_year = _get_current_fiscal_year(db, business_id)
today = _parse_iso_date_only(document_date)
prefix = f"{'RC' if document_type == 'receipt' else 'PY'}-{today.strftime('%Y%m%d')}"
last_doc = (
db.query(Document)
.filter(
and_(
Document.business_id == business_id,
Document.code.like(f"{prefix}-%"),
)
)
.order_by(Document.code.desc())
.first()
)
if last_doc:
try:
last_num = int(str(last_doc.code).split("-")[-1])
next_num = last_num + 1
except Exception:
next_num = 1
else:
next_num = 1
doc_code = f"{prefix}-{next_num:04d}"
document = Document(
business_id=business_id,
fiscal_year_id=fiscal_year.id,
code=doc_code,
document_type=document_type,
document_date=today,
currency_id=int(currency_id),
created_by_user_id=user_id,
registered_at=datetime.utcnow(),
is_proforma=False,
description=description,
extra_info={"source": "wallet"},
)
db.add(document)
db.flush()
for ln in accounting_lines:
db.add(DocumentLine(
document_id=document.id,
account_id=int(ln["account_id"]),
debit=Decimal(str(ln.get("debit", 0) or 0)),
credit=Decimal(str(ln.get("credit", 0) or 0)),
description=ln.get("description"),
))
db.flush()
return document
def _post_topup_document(db: Session, business_id: int, user_id: int, amount: Decimal, fee_amount: Decimal | None = None, doc_date: date | None = None) -> int:
currency_id = _resolve_wallet_currency_id(db)
wallet_acc = _get_fixed_account_by_code(db, "10204")
bank_acc = _get_fixed_account_by_code(db, "10203")
fee_amt = Decimal(str(fee_amount or 0))
net = amount - fee_amt if amount >= fee_amt else Decimal("0")
lines = [
# Receipt pattern with commission (per existing commission logic):
# Dr 10204 (wallet) = net, Dr 70902 (fee expense) = fee, Cr 10203 (bank) = gross
{"account_id": wallet_acc.id, "debit": net, "credit": 0, "description": "افزایش اعتبار (خالص)"},
]
if fee_amt > 0:
commission_expense = _get_fixed_account_by_code(db, "70902")
lines.append({"account_id": commission_expense.id, "debit": fee_amt, "credit": 0, "description": "کارمزد درگاه"})
lines.append({"account_id": bank_acc.id, "debit": 0, "credit": amount, "description": "واریز از درگاه/بانک (ناخالص)"})
document = _create_simple_document(
db=db,
business_id=business_id,
user_id=user_id,
document_type="receipt",
currency_id=currency_id,
document_date=doc_date or datetime.utcnow().date(),
description="افزایش اعتبار کیف‌پول",
accounting_lines=lines,
)
return int(document.id)
def _post_payout_document(db: Session, business_id: int, user_id: int, net_amount: Decimal, fee_amount: Decimal | None = None, doc_date: date | None = None) -> int:
currency_id = _resolve_wallet_currency_id(db)
wallet_acc = _get_fixed_account_by_code(db, "10204")
bank_acc = _get_fixed_account_by_code(db, "10203")
fee_amt = Decimal(str(fee_amount or 0))
# Per existing commission logic for Payment: Dr bank = fee, Cr 70902 = fee
lines = [
{"account_id": bank_acc.id, "debit": net_amount, "credit": 0, "description": "وصول تسویه کیف‌پول (خالص)"},
{"account_id": wallet_acc.id, "debit": 0, "credit": net_amount, "description": "انتقال از کیف‌پول"},
]
if fee_amt > 0:
commission_expense = _get_fixed_account_by_code(db, "70902")
lines.append({"account_id": bank_acc.id, "debit": fee_amt, "credit": 0, "description": "کارمزد تسویه (الگوی پرداخت)"})
lines.append({"account_id": commission_expense.id, "debit": 0, "credit": fee_amt, "description": "کارمزد خدمات بانکی"})
document = _create_simple_document(
db=db,
business_id=business_id,
user_id=user_id,
document_type="payment",
currency_id=currency_id,
document_date=doc_date or datetime.utcnow().date(),
description="تسویه کیف‌پول به حساب بانکی",
accounting_lines=lines,
)
return int(document.id)