odoo_account_ebics/account_ebics/models/ebics_file.py

655 lines
24 KiB
Python
Raw Normal View History

# Copyright 2009-2023 Noviat.
2023-05-28 15:01:06 +00:00
# License LGPL-3 or later (http://www.gnu.org/licenses/lgpl).
2018-08-14 15:04:39 +00:00
import base64
2018-08-14 15:04:39 +00:00
import logging
from copy import deepcopy
from sys import exc_info
from traceback import format_exception
2018-08-14 15:04:39 +00:00
from lxml import etree
from odoo import _, fields, models
2018-08-14 15:04:39 +00:00
from odoo.exceptions import UserError
2022-05-10 19:40:54 +00:00
from odoo.tools.safe_eval import safe_eval
2018-08-14 15:04:39 +00:00
from odoo.addons.base.models.res_bank import sanitize_account_number
2018-08-14 15:04:39 +00:00
_logger = logging.getLogger(__name__)
class EbicsFile(models.Model):
    """File that has been downloaded or uploaded via EBICS."""

    _name = "ebics.file"
    _description = "Object to store EBICS Data Files"
    _order = "date desc"
    _sql_constraints = [
        (
            "name_uniq",
            "unique (name, format_id)",
            "This File has already been down- or uploaded !",
        )
    ]

    # File identification and content
    name = fields.Char(string="Filename")
    data = fields.Binary(readonly=True, string="File")
    format_id = fields.Many2one(
        comodel_name="ebics.file.format",
        readonly=True,
        string="EBICS File Formats",
    )
    type = fields.Selection(readonly=True, related="format_id.type")

    # Dates
    date_from = fields.Date(
        help="'Date From' as entered in the download wizard.",
        readonly=True,
    )
    date_to = fields.Date(
        help="'Date To' as entered in the download wizard.",
        readonly=True,
    )
    date = fields.Datetime(
        help="File Upload/Download date",
        readonly=True,
        required=True,
    )

    # Processing result
    bank_statement_ids = fields.One2many(
        comodel_name="account.bank.statement",
        inverse_name="ebics_file_id",
        readonly=True,
        string="Generated Bank Statements",
    )
    state = fields.Selection(
        selection=[("draft", "Draft"), ("done", "Done")],
        default="draft",
        readonly=True,
        required=True,
    )

    # Ownership
    user_id = fields.Many2one(
        comodel_name="res.users",
        default=lambda self: self.env.user,
        readonly=True,
        string="User",
    )
    ebics_userid_id = fields.Many2one(
        comodel_name="ebics.userid",
        ondelete="restrict",
        readonly=True,
        string="EBICS UserID",
    )

    # Notes: 'note' is editable, 'note_process' is filled while processing
    note = fields.Text(string="Notes")
    note_process = fields.Text(readonly=True, string="Process Notes")
    company_ids = fields.Many2many(
        comodel_name="res.company",
        help="Companies sharing this EBICS file.",
        readonly=True,
        string="Companies",
    )
def unlink(self):
    """
    Remove draft EBICS files together with their bank statements.

    For every record, the format specific 'unlink' hook registered in
    _file_format_methods() is executed first (if any), then the bank
    statements linked to the file are removed.

    :raises UserError: when one of the records is in state 'done'.
    :return: result of the parent unlink().
    """
    for ebics_file in self:
        if ebics_file.state == "done":
            raise UserError(_("You can only remove EBICS files in state 'Draft'."))
        # execute format specific actions
        # The hook is looked up on the record itself so that it is bound
        # to the single file being removed, not to the whole recordset.
        ff = ebics_file.format_id.download_process_method
        unlink_hook = ebics_file._file_format_methods().get(ff, {}).get("unlink")
        if unlink_hook:
            unlink_hook()
        # remove bank statements
        ebics_file.bank_statement_ids.unlink()
    return super().unlink()
def set_to_draft(self):
    """
    Put the EBICS file(s) back in state 'draft'.

    :return: result of write().
    """
    draft_vals = {"state": "draft"}
    return self.write(draft_vals)
def set_to_done(self):
    """
    Put the EBICS file(s) in state 'done'.

    :return: result of write().
    """
    done_vals = {"state": "done"}
    return self.write(done_vals)
def process(self):
    """
    Process the file with the handler registered for its format.

    The handler is the 'process' entry found in _file_format_methods()
    for the 'download_process_method' of the file format. The process
    notes are emptied before the handler runs and the file is set to
    state 'done' once the handler has returned.

    A format without 'process' handler (unknown format, or a format that
    only registers other hooks) is reported via
    _process_undefined_format() instead of being silently ignored.

    :return: whatever the format handler returns.
    """
    self.ensure_one()
    self = self.with_context(allowed_company_ids=self.env.user.company_ids.ids)
    self.note_process = ""
    ff = self.format_id.download_process_method
    process_hook = self._file_format_methods().get(ff, {}).get("process")
    if not process_hook:
        return self._process_undefined_format()
    res = process_hook()
    self.state = "done"
    return res
def action_open_bank_statements(self):
    """
    Open the bank statements listed in context key 'statement_ids'.

    :return: action 'account.action_bank_statement_tree' with a domain
        restricted to those statements.
    """
    self.ensure_one()
    act_window = self.env["ir.actions.act_window"]
    action = act_window._for_xml_id("account.action_bank_statement_tree")
    statement_ids = self.env.context.get("statement_ids")
    action["domain"] = [("id", "in", statement_ids)]
    return action
def button_close(self):
    """
    Close the current window.

    :return: 'ir.actions.act_window_close' action.
    """
    self.ensure_one()
    close_action = dict(type="ir.actions.act_window_close")
    return close_action
def _file_format_methods(self):
"""
Extend this dictionary in order to add support
for extra file formats.
"""
res = {
2022-05-10 19:40:54 +00:00
"cfonb120": {
"process": self._process_cfonb120,
"unlink": self._unlink_cfonb120,
},
"camt.052": {
"process": self._process_camt052,
"unlink": self._unlink_camt052,
},
"camt.053": {
"process": self._process_camt053,
"unlink": self._unlink_camt053,
},
"camt.054": {
"process": self._process_camt054,
"unlink": self._unlink_camt054,
},
"pain.002": {
"process": self._process_pain002,
"unlink": self._unlink_pain002,
},
2018-08-14 15:04:39 +00:00
}
return res
2021-01-30 18:16:52 +00:00
def _check_import_module(self, module, raise_if_not_found=True):
2022-05-10 19:40:54 +00:00
mod = (
self.env["ir.module.module"]
.sudo()
.search([("name", "=like", module), ("state", "=", "installed")])
)
2018-08-14 15:04:39 +00:00
if not mod:
2021-01-30 18:16:52 +00:00
if raise_if_not_found:
2022-05-10 19:40:54 +00:00
raise UserError(
_(
2022-10-27 21:48:16 +00:00
"The module to process the '%(ebics_format)s' format is not installed "
2022-05-10 19:40:54 +00:00
"on your system. "
2022-10-27 21:48:16 +00:00
"\nPlease install module '%(module)s'",
ebics_format=self.format_id.name,
module=module,
2022-05-10 19:40:54 +00:00
)
)
2021-01-30 18:16:52 +00:00
return False
return True
2018-08-14 15:04:39 +00:00
def _process_download_result(self, res):
    """
    Store the import result on the file and return the result wizard.

    The notification texts, the number of errors/warnings and the list
    of imported bank statements are appended to the process notes.
    The imported statements are linked to the file and their companies
    are stored in 'company_ids'.

    :param res: dictionary with keys 'statement_ids' (ids of the imported
        bank statements) and 'notifications' (dictionaries with a 'type'
        and a 'message' and/or 'details' text).
    :return: window action opening view 'ebics_file_view_form_result' on
        this record, with the statement ids in the context.
    """
    statement_ids = res["statement_ids"]
    notifications = res["notifications"]

    sts_data = []
    if statement_ids:
        # Plain SQL is used to read the statements, flush pending changes.
        self.env.flush_all()
        cr = self.env.cr
        cr.execute(
            """
            SELECT abs.name, abs.date, abs.company_id, rc.name AS company_name
            FROM account_bank_statement abs
            JOIN res_company rc ON rc.id = abs.company_id
            WHERE abs.id in %s
            ORDER BY abs.date, rc.id
            """,
            (tuple(statement_ids),),
        )
        sts_data = cr.dictfetchall()
    st_cnt = len(statement_ids)

    # Text fragments, appended to the process notes in a single write.
    log = []
    error_cnt = warning_cnt = 0
    if notifications:
        for notif in notifications:
            notif_type = notif["type"]
            if notif_type == "error":
                error_cnt += 1
            elif notif_type == "warning":
                warning_cnt += 1
            log.append(
                "\n".join(
                    notif[key] for key in notif if key in ("message", "details")
                )
            )
            log.append("\n\n")
        log.append("\n")
        if error_cnt:
            log.append(_("Number of errors detected during import: %s") % error_cnt)
            log.append("\n")
        if warning_cnt:
            log.append(
                _("Number of warnings detected during import: %s") % warning_cnt
            )
    if st_cnt:
        log.append("\n\n")
        sp = st_cnt == 1 and _(" has") or _("s have")
        log.append(
            _(
                "%(st_cnt)s bank statement%(sp)s been imported: ",
                st_cnt=st_cnt,
                sp=sp,
            )
        )
        log.append("\n")
        log.extend(
            ("\n%s, %s (%s)") % (row["date"], row["name"], row["company_name"])
            for row in sts_data
        )
    if log:
        self.note_process += "".join(log)

    if statement_ids:
        self.sudo().bank_statement_ids = [(4, st_id) for st_id in statement_ids]
        company_ids = self.sudo().bank_statement_ids.mapped("company_id").ids
        self.company_ids = [(6, 0, company_ids)]

    # Name of the addon that contains this file, taken from the module path.
    module = __name__.split("addons.")[1].split(".")[0]
    result_view = self.env.ref("%s.ebics_file_view_form_result" % module)
    return {
        "type": "ir.actions.act_window",
        "name": _("Import EBICS File"),
        "res_model": self._name,
        "res_id": self.id,
        "view_type": "form",
        "view_mode": "form",
        "view_id": result_view.id,
        "target": "new",
        "context": dict(self.env.context, statement_ids=statement_ids),
    }
def _process_cfonb120(self):
"""
Disable this code while waiting on OCA cfonb release for 16.0
"""
# pylint: disable=W0101
raise NotImplementedError
2022-05-10 19:40:54 +00:00
import_module = "account_statement_import_fr_cfonb"
2018-08-14 15:04:39 +00:00
self._check_import_module(import_module)
2022-05-10 19:40:54 +00:00
wiz_model = "account.statement.import"
data_file = base64.b64decode(self.data)
2022-05-10 19:40:54 +00:00
lines = data_file.split(b"\n")
2020-12-14 17:32:49 +00:00
wiz_vals_list = []
2022-05-10 19:40:54 +00:00
st_lines = b""
2020-07-16 21:32:20 +00:00
transactions = False
for line in lines:
rec_type = line[0:2]
acc_number = line[21:32]
2022-05-10 19:40:54 +00:00
st_lines += line + b"\n"
if rec_type == b"04":
2020-07-16 21:32:20 +00:00
transactions = True
2022-05-10 19:40:54 +00:00
if rec_type == b"07":
2020-07-16 21:32:20 +00:00
if transactions:
2022-05-10 19:40:54 +00:00
fn = "_".join([acc_number.decode(), self.name])
wiz_vals_list.append(
{
"statement_filename": fn,
"statement_file": base64.b64encode(st_lines),
}
)
st_lines = b""
2020-07-16 21:32:20 +00:00
transactions = False
result_action = self.env["ir.actions.act_window"]._for_xml_id(
"account.action_bank_statement_tree"
)
result_action["context"] = safe_eval(result_action["context"])
statement_ids = []
notifications = []
2020-12-14 17:32:49 +00:00
for i, wiz_vals in enumerate(wiz_vals_list, start=1):
result = {
"statement_ids": [],
"notifications": [],
}
statement_filename = wiz_vals["statement_filename"]
2022-10-27 21:48:16 +00:00
wiz = (
self.env[wiz_model]
.with_context(active_model="ebics.file")
.create(wiz_vals)
)
try:
with self.env.cr.savepoint():
file_data = base64.b64decode(wiz_vals["statement_file"])
msg_hdr = _(
"{} : Import failed for statement number %(index)s, filename %(fn)s:\n",
index=i,
fn=statement_filename,
)
wiz.import_single_file(file_data, result)
if not result["statement_ids"]:
message = msg_hdr.format(_("Warning"))
message += _(
"You have already imported this file, or this file "
"only contains already imported transactions."
)
notifications += [
{
"type": "warning",
"message": message,
}
]
else:
statement_ids.extend(result["statement_ids"])
notifications.extend(result["notifications"])
except UserError as e:
message = msg_hdr.format(_("Error"))
message += "".join(e.args)
notifications += [
{
"type": "error",
"message": message,
}
]
except Exception:
tb = "".join(format_exception(*exc_info()))
message = msg_hdr.format(_("Error"))
message += tb
notifications += [
{
"type": "error",
"message": message,
}
]
result_action["context"]["notifications"] = notifications
result_action["domain"] = [("id", "in", statement_ids)]
return self._process_result_action(result_action)
2018-08-14 15:04:39 +00:00
def _unlink_cfonb120(self):
"""
Placeholder for cfonb120 specific actions before removing the
EBICS data file and its related bank statements.
"""
2019-12-18 20:23:15 +00:00
def _process_camt052(self):
2022-05-10 19:40:54 +00:00
import_module = "account_statement_import_camt"
2019-12-18 20:23:15 +00:00
self._check_import_module(import_module)
return self._process_camt053(self)
def _unlink_camt052(self):
"""
Placeholder for camt052 specific actions before removing the
EBICS data file and its related bank statements.
"""
def _process_camt054(self):
2022-05-10 19:40:54 +00:00
import_module = "account_statement_import_camt"
self._check_import_module(import_module)
return self._process_camt053(self)
def _unlink_camt054(self):
"""
Placeholder for camt054 specific actions before removing the
EBICS data file and its related bank statements.
"""
def _process_camt053(self):  # noqa C901
    """
    Import the bank statements of a CAMT file.

    The Odoo standard statement import is based on manual selection
    of a financial journal before importing the electronic statement file.
    An EBICS download may return a single file containing a large number of
    statements from different companies/journals.
    Hence we need to split the CAMT file into
    single statement CAMT files before we can call the logic
    implemented by the Odoo OE or Community CAMT parsers.

    The import runs within a savepoint. Problems are not raised but are
    collected as notifications that end up in the process notes.

    :raises UserError: when no supported CAMT import module is installed.
    :return: result of _process_download_result(), or of
        _process_camt053_oca() when the OCA parser is installed.

    TODO: refactor method to enable removal of noqa C901
    """
    modules = [
        ("oca", "account_statement_import_camt"),
        ("oe", "account_bank_statement_import_camt"),
    ]
    author = False
    for entry in modules:
        if self._check_import_module(entry[1], raise_if_not_found=False):
            author = entry[0]
            break
    if not author:
        raise UserError(
            _(
                "The module to process the '%(ebics_format)s' format is "
                "not installed on your system. "
                "\nPlease install one of the following modules: \n%(modules)s.",
                ebics_format=self.format_id.name,
                modules=", ".join([x[1] for x in modules]),
            )
        )
    if author == "oca":
        # TODO: implement _process_camt053_oca() once OCA camt is
        # released for 16.0
        return self._process_camt053_oca()

    res = {"statement_ids": [], "notifications": []}
    # Defined before the 'try' so that the except clauses can rely on it.
    msg_hdr = _("{} : Import failed for file %(fn)s:\n", fn=self.name)
    try:
        with self.env.cr.savepoint():
            transactions = False
            file_data = base64.b64decode(self.data)
            root = etree.fromstring(file_data, parser=etree.XMLParser(recover=True))
            if root is None or not len(root):
                # Reported by the UserError clause below.
                raise UserError(_("Invalid XML file."))
            ns = {k or "ns": v for k, v in root.nsmap.items()}
            for i, stmt in enumerate(root[0].findall("ns:Stmt", ns), start=1):
                msg_hdr = _(
                    "{} : Import failed for statement number %(index)s, filename %(fn)s:\n",
                    index=i,
                    fn=self.name,
                )
                acc_numbers = stmt.xpath(
                    "ns:Acct/ns:Id/ns:IBAN/text() | ns:Acct/ns:Id/ns:Othr/ns:Id/text()",
                    namespaces=ns,
                )
                acc_number = acc_numbers and sanitize_account_number(acc_numbers[0])
                if not acc_number:
                    message = msg_hdr.format(_("Error"))
                    message += _("No bank account number found.")
                    res["notifications"].append({"type": "error", "message": message})
                    continue
                currency_codes = stmt.xpath(
                    "ns:Acct/ns:Ccy/text() | ns:Bal/ns:Amt/@Ccy", namespaces=ns
                )
                if not currency_codes:
                    message = msg_hdr.format(_("Error"))
                    message += _("No currency found.")
                    res["notifications"].append({"type": "error", "message": message})
                    continue
                currency_code = currency_codes[0]
                currency = self.env["res.currency"].search(
                    [("name", "=ilike", currency_code)], limit=1
                )
                if not currency:
                    message = msg_hdr.format(_("Error"))
                    message += _("Currency %(cc)s not found.", cc=currency_code)
                    res["notifications"].append({"type": "error", "message": message})
                    continue
                journal = self.env["account.journal"].search(
                    [
                        ("type", "=", "bank"),
                        (
                            "bank_account_id.sanitized_acc_number",
                            "ilike",
                            acc_number,
                        ),
                    ]
                )
                # A journal without currency uses its company currency.
                if (
                    not journal
                    or (journal.currency_id or journal.company_id.currency_id)
                    != currency
                ):
                    message = msg_hdr.format(_("Error"))
                    message += _(
                        "No financial journal found for Account Number %(nbr)s, "
                        "Currency %(cc)s",
                        nbr=acc_number,
                        cc=currency_code,
                    )
                    res["notifications"].append({"type": "error", "message": message})
                    continue

                # Build a copy of the file holding statement number i only.
                root_new = deepcopy(root)
                entries = False
                for j, el in enumerate(root_new[0].findall("ns:Stmt", ns), start=1):
                    if j != i:
                        el.getparent().remove(el)
                    else:
                        entries = el.findall("ns:Ntry", ns)
                if not entries:
                    continue

                transactions = True
                data = base64.b64encode(etree.tostring(root_new))
                self.env.company = journal.company_id
                attachment = self.env["ir.attachment"].create(
                    {"name": self.name, "datas": data, "store_fname": self.name}
                )
                act = journal._import_bank_statement(attachment)
                for leaf in act["domain"]:
                    if (
                        isinstance(leaf, tuple)
                        and leaf[0] == "statement_id"
                        and leaf[1] == "in"
                    ):
                        res["statement_ids"].extend(leaf[2])
                        break
                notifications = act["context"]["notifications"]
                if notifications:
                    # 'notifications' is a list: each entry must be added
                    # individually, _process_download_result() reads them
                    # one by one.
                    res["notifications"].extend(notifications)

            if not transactions:
                message = _(
                    "Warning:\nNo transactions found in file %(fn)s.", fn=self.name
                )
                res["notifications"].append({"type": "warning", "message": message})

    except UserError as e:
        # The savepoint has been rolled back: statements imported earlier
        # from this file do not exist anymore.
        res["statement_ids"] = []
        message = msg_hdr.format(_("Error"))
        message += "".join(e.args)
        res["notifications"].append({"type": "error", "message": message})

    except Exception:
        # Same remark as above on the rolled back statements.
        res["statement_ids"] = []
        tb = "".join(format_exception(*exc_info()))
        message = msg_hdr.format(_("Error"))
        message += tb
        res["notifications"].append({"type": "error", "message": message})

    return self._process_download_result(res)
def _process_camt053_oca(self):
"""
Disable this code while waiting on OCA CAMT parser for 16.0
"""
# pylint: disable=W0101
raise NotImplementedError
2022-05-10 19:40:54 +00:00
wiz_model = "account.statement.import"
2021-01-30 18:16:52 +00:00
wiz_vals = {
2022-05-10 19:40:54 +00:00
"statement_filename": self.name,
"statement_file": self.data,
2021-01-30 18:16:52 +00:00
}
result_action = self.env["ir.actions.act_window"]._for_xml_id(
"account.action_bank_statement_tree"
)
result_action["context"] = safe_eval(result_action["context"])
2021-01-30 18:16:52 +00:00
result = {
"statement_ids": [],
"notifications": [],
2021-01-30 18:16:52 +00:00
}
statement_ids = []
notifications = []
2022-10-27 21:48:16 +00:00
wiz = (
self.env[wiz_model].with_context(active_model="ebics.file").create(wiz_vals)
)
msg_hdr = _(
"{} : Import failed for EBICS File %(fn)s:\n",
fn=wiz.statement_filename,
)
try:
with self.env.cr.savepoint():
file_data = base64.b64decode(self.data)
wiz.import_single_file(file_data, result)
if not result["statement_ids"]:
message = msg_hdr.format(_("Warning"))
message += _(
"You have already imported this file, or this file "
"only contains already imported transactions."
)
notifications += [
{
"type": "warning",
"message": message,
}
]
else:
statement_ids.extend(result["statement_ids"])
notifications.extend(result["notifications"])
except UserError as e:
message = msg_hdr.format(_("Error"))
message += "".join(e.args)
notifications += [
{
"type": "error",
"message": message,
}
]
except Exception:
tb = "".join(format_exception(*exc_info()))
message = msg_hdr.format(_("Error"))
message += tb
notifications += [
{
"type": "error",
"message": message,
}
]
result_action["context"]["notifications"] = notifications
result_action["domain"] = [("id", "in", statement_ids)]
return self._process_result_action(result_action)
2021-01-30 18:16:52 +00:00
2018-08-14 15:04:39 +00:00
def _unlink_camt053(self):
"""
Placeholder for camt053 specific actions before removing the
EBICS data file and its related bank statements.
"""
def _process_pain002(self):
"""
Placeholder for processing pain.002 files.
TODO:
add import logic based upon OCA 'account_payment_return_import'
"""
def _unlink_pain002(self):
"""
Placeholder for pain.002 specific actions before removing the
EBICS data file.
"""
raise NotImplementedError
2018-08-14 15:04:39 +00:00
def _process_undefined_format(self):
    """
    Report that the format of this file cannot be processed.

    :raises UserError: always, with the name of the file format.
    """
    message = _(
        "The current version of the 'account_ebics' module "
        "has no support to automatically process EBICS files "
        "with format %s."
    )
    raise UserError(message % self.format_id.name)