account-closing/account_cutoff_base/models/account_cutoff.py

435 lines
15 KiB
Python
Raw Normal View History

2024-05-03 08:29:54 +00:00
# Copyright 2013-2021 Akretion (http://www.akretion.com/)
# @author: Alexis de Lattre <alexis.delattre@akretion.com>
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import json
from collections import defaultdict
from dateutil.relativedelta import relativedelta
from flectra import _, api, fields, models
from flectra.exceptions import UserError
from flectra.tools import date_utils, float_is_zero
from flectra.tools.misc import format_date
class AccountCutoff(models.Model):
_name = "account.cutoff"
_rec_name = "cutoff_date"
_order = "cutoff_date desc"
_inherit = ["mail.thread", "mail.activity.mixin"]
_check_company_auto = True
_description = "Account Cut-off"
@api.depends("line_ids", "line_ids.cutoff_amount")
def _compute_total_cutoff(self):
rg_res = self.env["account.cutoff.line"].read_group(
[("parent_id", "in", self.ids)],
["parent_id", "cutoff_amount"],
["parent_id"],
)
mapped_data = {x["parent_id"][0]: x["cutoff_amount"] for x in rg_res}
for cutoff in self:
cutoff.total_cutoff_amount = mapped_data.get(cutoff.id, 0)
@property
def cutoff_type_label_map(self):
return {
"accrued_expense": _("Accrued Expense"),
"accrued_revenue": _("Accrued Revenue"),
"prepaid_revenue": _("Prepaid Revenue"),
"prepaid_expense": _("Prepaid Expense"),
}
@api.model
def _default_move_ref(self):
cutoff_type = self.env.context.get("default_cutoff_type")
ref = self.cutoff_type_label_map.get(cutoff_type, "")
return ref
@api.model
def _default_cutoff_date(self):
today = fields.Date.context_today(self)
company = self.env.company
date_from, date_to = date_utils.get_fiscal_year(
today,
day=company.fiscalyear_last_day,
month=int(company.fiscalyear_last_month),
)
if date_from:
return date_from - relativedelta(days=1)
else:
return False
def _selection_cutoff_type(self):
# generate cutoff types from mapping
return list(self.cutoff_type_label_map.items())
@api.model
def _default_cutoff_account_id(self):
cutoff_type = self.env.context.get("default_cutoff_type")
company = self.env.company
if cutoff_type == "accrued_expense":
account_id = company.default_accrued_expense_account_id.id or False
elif cutoff_type == "accrued_revenue":
account_id = company.default_accrued_revenue_account_id.id or False
elif cutoff_type == "prepaid_revenue":
account_id = company.default_prepaid_revenue_account_id.id or False
elif cutoff_type == "prepaid_expense":
account_id = company.default_prepaid_expense_account_id.id or False
else:
account_id = False
return account_id
cutoff_date = fields.Date(
string="Cut-off Date",
copy=False,
tracking=True,
default=lambda self: self._default_cutoff_date(),
)
cutoff_type = fields.Selection(
selection="_selection_cutoff_type",
string="Type",
required=True,
)
source_move_state = fields.Selection(
[("posted", "Posted Entries"), ("draft_posted", "Draft and Posted Entries")],
string="Source Entries",
required=True,
default="posted",
tracking=True,
)
move_id = fields.Many2one(
"account.move",
string="Cut-off Journal Entry",
readonly=True,
copy=False,
check_company=True,
)
move_ref = fields.Char(
string="Reference of the Cut-off Journal Entry",
default=lambda self: self._default_move_ref(),
)
move_partner = fields.Boolean(
string="Partner on Journal Items",
default=lambda self: self.env.company.default_cutoff_move_partner,
tracking=True,
)
cutoff_account_id = fields.Many2one(
comodel_name="account.account",
string="Cut-off Account",
domain="[('deprecated', '=', False), ('company_id', '=', company_id)]",
default=lambda self: self._default_cutoff_account_id(),
check_company=True,
tracking=True,
)
cutoff_journal_id = fields.Many2one(
comodel_name="account.journal",
string="Cut-off Account Journal",
default=lambda self: self.env.company.default_cutoff_journal_id,
domain="[('company_id', '=', company_id)]",
check_company=True,
tracking=True,
)
total_cutoff_amount = fields.Monetary(
compute="_compute_total_cutoff",
string="Total Cut-off Amount",
currency_field="company_currency_id",
tracking=True,
)
company_id = fields.Many2one(
"res.company",
string="Company",
required=True,
default=lambda self: self.env.company,
)
company_currency_id = fields.Many2one(
related="company_id.currency_id", string="Company Currency"
)
line_ids = fields.One2many(
comodel_name="account.cutoff.line",
inverse_name="parent_id",
string="Cut-off Lines",
)
state = fields.Selection(
selection=[("draft", "Draft"), ("done", "Done")],
index=True,
readonly=True,
tracking=True,
default="draft",
copy=False,
help="State of the cutoff. When the Journal Entry is created, "
"the state is set to 'Done' and the fields become read-only.",
)
_sql_constraints = [
(
"date_type_company_uniq",
"unique(cutoff_date, company_id, cutoff_type)",
_("A cutoff of the same type already exists with this cut-off date !"),
)
]
def _compute_display_name(self):
type2label = self.cutoff_type_label_map
for rec in self:
name = type2label.get(rec.cutoff_type, "")
if rec.cutoff_date:
name = f"({name}, {format_date(self.env, rec.cutoff_date)})"
rec.display_name = name or f"#{rec.id}"
def back2draft(self):
self.ensure_one()
if self.move_id:
self.move_id.unlink()
self.write({"state": "draft"})
def _get_merge_keys(self):
"""Return merge criteria for provision lines
The returned list must contain valid field names
for account.move.line. Provision lines with the
same values for these fields will be merged.
The list must at least contain account_id.
"""
return ["partner_id", "account_id", "analytic_distribution"]
def _prepare_move(self, to_provision):
self.ensure_one()
movelines_to_create = []
amount_total = 0
ref = self.move_ref
merge_keys = self._get_merge_keys()
for merge_values, amount in to_provision.items():
amount = self.company_currency_id.round(amount)
vals = {
"debit": amount < 0 and amount * -1 or 0,
"credit": amount >= 0 and amount or 0,
"tax_ids": False, # neutralize defaut tax of account
}
for k, v in zip(merge_keys, merge_values, strict=True):
value = v
if k == "analytic_distribution" and isinstance(v, str):
value = json.loads(value)
vals[k] = value
movelines_to_create.append((0, 0, vals))
amount_total += amount
# add counter-part
counterpart_amount = self.company_currency_id.round(amount_total * -1)
movelines_to_create.append(
(
0,
0,
{
"account_id": self.cutoff_account_id.id,
"debit": counterpart_amount < 0 and counterpart_amount * -1 or 0,
"credit": counterpart_amount >= 0 and counterpart_amount or 0,
},
)
)
res = {
"company_id": self.company_id.id,
"journal_id": self.cutoff_journal_id.id,
"date": self.cutoff_date,
"ref": ref,
"line_ids": movelines_to_create,
}
return res
def _prepare_provision_line(self, cutoff_line):
"""Convert a cutoff line to elements of a move line.
The returned dictionary must at least contain 'account_id'
and 'amount' (< 0 means debit).
If you override this, the added fields must also be
added in an override of _get_merge_keys.
"""
partner_id = cutoff_line.partner_id.id or False
return {
"partner_id": self.move_partner and partner_id or False,
"account_id": cutoff_line.cutoff_account_id.id,
"analytic_distribution": cutoff_line.analytic_distribution,
"amount": cutoff_line.cutoff_amount,
}
def _prepare_provision_tax_line(self, cutoff_tax_line):
"""Convert a cutoff tax line to elements of a move line.
See _prepare_provision_line for more info.
"""
return {
"partner_id": False,
"account_id": cutoff_tax_line.cutoff_account_id.id,
"analytic_distribution": cutoff_tax_line.analytic_distribution,
"amount": cutoff_tax_line.cutoff_amount,
}
def _merge_provision_lines(self, provision_lines):
"""Merge provision line.
Returns a dictionary {key, amount} where key is
a tuple containing the values of the properties in _get_merge_keys()
"""
to_provision = defaultdict(float)
merge_keys = self._get_merge_keys()
for provision_line in provision_lines:
key = tuple(
isinstance(provision_line.get(key), dict)
and json.dumps(provision_line.get(key))
or provision_line.get(key)
for key in merge_keys
)
to_provision[key] += provision_line["amount"]
return to_provision
def create_move(self):
self.ensure_one()
move_obj = self.env["account.move"]
if self.move_id:
raise UserError(
_(
"The Cut-off Journal Entry already exists. You should "
"delete it before running this function."
)
)
if not self.line_ids:
raise UserError(
_(
"There are no lines on this Cut-off, so we can't create "
"a Journal Entry."
)
)
provision_lines = []
for line in self.line_ids:
provision_lines.append(self._prepare_provision_line(line))
for tax_line in line.tax_line_ids:
provision_lines.append(self._prepare_provision_tax_line(tax_line))
to_provision = self._merge_provision_lines(provision_lines)
vals = self._prepare_move(to_provision)
move = move_obj.create(vals)
if self.company_id.post_cutoff_move:
move._post(soft=False)
self.write({"move_id": move.id, "state": "done"})
self.message_post(body=_("Journal entry generated"))
action = self.env.ref("account.action_move_journal_line").sudo().read()[0]
action.update(
{
"view_mode": "form,tree",
"res_id": move.id,
"view_id": False,
"views": False,
}
)
return action
def get_lines(self):
"""This method is designed to be inherited in other modules"""
self.ensure_one()
assert self.state != "done"
# I test self.state == 'draft' below because other modules
# (e.g. account_cutoff_start_end_dates) add additional states
# and don't require self.cutoff_date
if self.state == "draft" and not self.cutoff_date:
raise UserError(_("Cutoff date is not set."))
# Delete existing lines
self.line_ids.unlink()
self.message_post(body=_("Cut-off lines re-generated"))
def unlink(self):
for rec in self:
if rec.state == "done":
raise UserError(
_("You cannot delete cutoff records that are in done state.")
)
return super().unlink()
def button_line_tree(self):
action = self.env["ir.actions.actions"]._for_xml_id(
"account_cutoff_base.account_cutoff_line_action"
)
action.update(
{
"domain": [("parent_id", "=", self.id)],
"views": False,
}
)
return action
def _get_mapping_dict(self):
"""return a dict with:
key = ID of account,
value = ID of cutoff_account"""
self.ensure_one()
mappings = self.env["account.cutoff.mapping"].search(
[
("company_id", "=", self.company_id.id),
("cutoff_type", "in", ("all", self.cutoff_type)),
]
)
mapping = {}
for item in mappings:
mapping[item.account_id.id] = item.cutoff_account_id.id
return mapping
def _prepare_tax_lines(self, tax_compute_all_res, currency):
res = []
ato = self.env["account.tax"]
company_currency = self.company_id.currency_id
cur_rprec = company_currency.rounding
for tax_line in tax_compute_all_res["taxes"]:
tax = ato.browse(tax_line["id"])
if float_is_zero(tax_line["amount"], precision_rounding=cur_rprec):
continue
tax_accrual_account = False
tax_account_field_label = ""
if self.cutoff_type == "accrued_expense":
tax_accrual_account = (
tax.account_accrued_expense_id
or self.company_id.default_accrued_expense_tax_account_id
)
tax_account_field_label = _("Accrued Expense Tax Account")
elif self.cutoff_type == "accrued_revenue":
tax_accrual_account = (
tax.account_accrued_revenue_id
or self.company_id.default_accrued_revenue_tax_account_id
)
tax_account_field_label = _("Accrued Revenue Tax Account")
if not tax_accrual_account:
raise UserError(
_(
"Missing '%(tax_account_field_label)s'. You must configure it "
"on the tax '%(tax_display_name)s' or on the accounting "
"configuration page of the company '%(company)s'.",
tax_account_field_label=tax_account_field_label,
tax_display_name=tax.display_name,
company=self.company_id.display_name,
)
)
tax_amount = currency.round(tax_line["amount"])
tax_accrual_amount = currency._convert(
tax_amount, company_currency, self.company_id, self.cutoff_date
)
res.append(
(
0,
0,
{
"tax_id": tax_line["id"],
"base": tax_line["base"], # in currency
"amount": tax_amount, # in currency
"sequence": tax_line["sequence"],
"cutoff_account_id": tax_accrual_account.id,
"cutoff_amount": tax_accrual_amount, # in company currency
},
)
)
return res