From 6adad2e0c855ce50d70e061e19aaebd53231a4e0 Mon Sep 17 00:00:00 2001 From: Luc De Meyer Date: Thu, 3 Aug 2023 23:42:58 +0200 Subject: [PATCH] account_ebics 16.0 : add support for CFONB --- account_ebics/__manifest__.py | 2 +- account_ebics/models/ebics_file.py | 340 ++++++++---------- .../wizards/account_statement_import.py | 34 +- 3 files changed, 174 insertions(+), 202 deletions(-) diff --git a/account_ebics/__manifest__.py b/account_ebics/__manifest__.py index 9b32d3f..5080e83 100644 --- a/account_ebics/__manifest__.py +++ b/account_ebics/__manifest__.py @@ -3,7 +3,7 @@ { "name": "EBICS banking protocol", - "version": "16.0.1.3.1", + "version": "16.0.1.3.2", "license": "LGPL-3", "author": "Noviat", "website": "https://www.noviat.com", diff --git a/account_ebics/models/ebics_file.py b/account_ebics/models/ebics_file.py index 63f4353..20ac47e 100644 --- a/account_ebics/models/ebics_file.py +++ b/account_ebics/models/ebics_file.py @@ -11,7 +11,6 @@ from lxml import etree from odoo import _, fields, models from odoo.exceptions import UserError -from odoo.tools.safe_eval import safe_eval from odoo.addons.base.models.res_bank import sanitize_account_number @@ -177,6 +176,54 @@ class EbicsFile(models.Model): return False return True + def _lookup_journal(self, res, acc_number, currency_code): + currency = self.env["res.currency"].search( + [("name", "=ilike", currency_code)], limit=1 + ) + journal = self.env["account.journal"] + if not currency: + message = _("Currency %(cc)s not found.", cc=currency_code) + res["notifications"].append({"type": "error", "message": message}) + return (currency, journal) + + journals = self.env["account.journal"].search( + [ + ("type", "=", "bank"), + ( + "bank_account_id.sanitized_acc_number", + "ilike", + acc_number, + ), + ] + ) + if not journals: + message = _( + "No financial journal found for Account Number %(nbr)s, " + "Currency %(cc)s", + nbr=acc_number, + cc=currency_code, + ) + res["notifications"].append({"type": "error", "message": message}) + return (currency, journal) + + for jrnl in journals: + journal_currency = jrnl.currency_id or jrnl.company_id.currency_id + if journal_currency != currency: + continue + else: + journal = jrnl + break + + if not journal: + message = _( + "No financial journal found for Account Number %(nbr)s, " + "Currency %(cc)s", + nbr=acc_number, + cc=currency_code, + ) + res["notifications"].append({"type": "error", "message": message}) + return (currency, journal) + def _process_download_result(self, res): statement_ids = res["statement_ids"] notifications = res["notifications"] @@ -184,24 +231,30 @@ class EbicsFile(models.Model): st_cnt = len(statement_ids) warning_cnt = error_cnt = 0 if notifications: + errors = [] + warnings = [] for notif in notifications: if notif["type"] == "error": error_cnt += 1 + parts = [notif[k] for k in notif if k in ("message", "details")] + errors.append("\n".join(parts)) elif notif["type"] == "warning": warning_cnt += 1 - parts = [notif[k] for k in notif if k in ("message", "details")] - self.note_process += "\n".join(parts) - self.note_process += "\n\n" - self.note_process += "\n" + parts = [notif[k] for k in notif if k in ("message", "details")] + warnings.append("\n".join(parts)) + + self.note_process += _("Process file %(fn)s results:", fn=self.name) if error_cnt: - self.note_process += ( - _("Number of errors detected during import: %s") % error_cnt - ) - self.note_process += "\n" + self.note_process += "\n\n" + _("Errors") + ":\n" + self.note_process += "\n".join(errors) + self.note_process += "\n\n" + self.note_process += _("Number of errors: %(nr)s", nr=error_cnt) if warning_cnt: - self.note_process += ( - _("Number of warnings detected during import: %s") % warning_cnt - ) + self.note_process += "\n\n" + _("Warnings") + ":\n" + self.note_process += "\n".join(warnings) + self.note_process += "\n\n" + self.note_process += _("Number of warnings: %(nr)s", nr=warning_cnt) + self.note_process += "\n" if st_cnt: self.note_process += "\n\n" self.note_process += _( @@ -211,10 +264,11 @@ class EbicsFile(models.Model): ) self.note_process += "\n" for statement in statements: - self.note_process += ("\n%s, %s (%s)") % ( - statement.date, - statement.name, - statement.company_id.name, + self.note_process += "\n" + _( + "Statement %(st)s dated %(date)s (Company: %(cpy)s)", + st=statement.name, + date=statement.date, + cpy=statement.company_id.name, ) if statement_ids: self.sudo().bank_statement_ids = [(4, x) for x in statement_ids] @@ -236,103 +290,12 @@ class EbicsFile(models.Model): } def _process_cfonb120(self): - """ - Disable this code while waiting on OCA cfonb release for 16.0 - """ - # pylint: disable=W0101 - raise NotImplementedError - import_module = "account_statement_import_fr_cfonb" self._check_import_module(import_module) - wiz_model = "account.statement.import" - data_file = base64.b64decode(self.data) - lines = data_file.split(b"\n") - wiz_vals_list = [] - st_lines = b"" - transactions = False - for line in lines: - rec_type = line[0:2] - acc_number = line[21:32] - st_lines += line + b"\n" - if rec_type == b"04": - transactions = True - if rec_type == b"07": - if transactions: - fn = "_".join([acc_number.decode(), self.name]) - wiz_vals_list.append( - { - "statement_filename": fn, - "statement_file": base64.b64encode(st_lines), - } - ) - st_lines = b"" - transactions = False - result_action = self.env["ir.actions.act_window"]._for_xml_id( - "account.action_bank_statement_tree" - ) - result_action["context"] = safe_eval(result_action["context"]) - statement_ids = [] - notifications = [] - for i, wiz_vals in enumerate(wiz_vals_list, start=1): - result = { - "statement_ids": [], - "notifications": [], - } - statement_filename = wiz_vals["statement_filename"] - wiz = ( - self.env[wiz_model] - .with_context(active_model="ebics.file") - .create(wiz_vals) - ) - try: - with self.env.cr.savepoint(): - file_data = base64.b64decode(wiz_vals["statement_file"]) - msg_hdr = _( - "{} : Import failed for statement number %(index)s, filename %(fn)s:\n", - index=i, - fn=statement_filename, - ) - wiz.import_single_file(file_data, result) - if not result["statement_ids"]: - message = msg_hdr.format(_("Warning")) - message += _( - "You have already imported this file, or this file " - "only contains already imported transactions." - ) - notifications += [ - { - "type": "warning", - "message": message, - } - ] - else: - statement_ids.extend(result["statement_ids"]) - notifications.extend(result["notifications"]) - - except UserError as e: - message = msg_hdr.format(_("Error")) - message += "".join(e.args) - notifications += [ - { - "type": "error", - "message": message, - } - ] - - except Exception: - tb = "".join(format_exception(*exc_info())) - message = msg_hdr.format(_("Error")) - message += tb - notifications += [ - { - "type": "error", - "message": message, - } - ] - - result_action["context"]["notifications"] = notifications - result_action["domain"] = [("id", "in", statement_ids)] - return self._process_result_action(result_action) + res = {"statement_ids": [], "notifications": []} + st_datas = self._split_cfonb(res) + self._process_bank_statement_oca(res, st_datas) + return self._process_download_result(res) def _unlink_cfonb120(self): """ @@ -340,6 +303,41 @@ class EbicsFile(models.Model): EBICS data file and its related bank statements. """ + def _split_cfonb(self, res): + """ + Split CFONB file received via EBICS per statement. + Statements without transactions are removed. + """ + datas = [] + file_data = base64.b64decode(self.data) + lines = file_data.split(b"\n") + st_lines = b"" + transactions = False + for line in lines: + rec_type = line[0:2] + currency_code = line[16:19].decode() + acc_number = line[21:32].decode() + st_lines += line + b"\n" + if rec_type == b"04": + transactions = True + if rec_type == b"07": + if transactions: + currency, journal = self._lookup_journal( + res, acc_number, currency_code + ) + if currency and journal: + datas.append( + { + "acc_number": acc_number, + "journal_id": journal.id, + "company_id": journal.company_id.id, + "data": base64.b64encode(st_lines), + } + ) + st_lines = b"" + transactions = False + return datas + def _process_camt052(self): import_module = "account_statement_import_camt" self._check_import_module(import_module) @@ -394,28 +392,25 @@ class EbicsFile(models.Model): res = {"statement_ids": [], "notifications": []} st_datas = self._split_camt(res) if author == "oca": - self._process_camt053_oca(res, st_datas) + self._process_bank_statement_oca(res, st_datas) else: - self._process_camt053_oe(res, st_datas) + self._process_bank_statement_oe(res, st_datas) return self._process_download_result(res) - def _process_camt053_oca(self, res, st_datas): - msg_hdr = _("{} : Import failed for file %(fn)s:\n", fn=self.name) + def _process_bank_statement_oca(self, res, st_datas): for st_data in st_datas: try: with self.env.cr.savepoint(): - self._create_statement_camt053_oca(res, st_data) + self._create_bank_statement_oca(res, st_data) except UserError as e: - message = msg_hdr.format(_("Error")) - message += "".join(e.args) - res["notifications"].append({"type": "error", "message": message}) + res["notifications"].append( + {"type": "error", "message": "".join(e.args)} + ) except Exception: tb = "".join(format_exception(*exc_info())) - message = msg_hdr.format(_("Error")) - message += tb - res["notifications"].append({"type": "error", "message": message}) + res["notifications"].append({"type": "error", "message": tb}) - def _create_statement_camt053_oca(self, res, st_data): + def _create_bank_statement_oca(self, res, st_data): wiz = ( self.env["account.statement.import"] .with_company(st_data["company_id"]) @@ -424,27 +419,28 @@ class EbicsFile(models.Model): ) wiz.import_single_file(base64.b64decode(st_data["data"]), res) - def _process_camt053_oe(self, res, st_datas): + def _process_bank_statement_oe(self, res, st_datas): """ We execute a cr.commit() after every statement import since we get a 'savepoint does not exist' error when using 'with self.env.cr.savepoint()'. """ - msg_hdr = _("{} : Import failed for file %(fn)s:\n", fn=self.name) for st_data in st_datas: try: - self._create_statement_camt053_oe(res, st_data) + self._create_bank_statement_oe(res, st_data) self.env.cr.commit() # pylint: disable=E8102 except UserError as e: - message = msg_hdr.format(_("Error")) - message += "".join(e.args) - res["notifications"].append({"type": "error", "message": message}) + msg = "".join(e.args) + msg += "\n" + msg += _( + "Statement for Account Number %(nr)s has not been processed.", + nr=st_data["acc_number"], + ) + res["notifications"].append({"type": "error", "message": msg}) except Exception: tb = "".join(format_exception(*exc_info())) - message = msg_hdr.format(_("Error")) - message += tb - res["notifications"].append({"type": "error", "message": message}) + res["notifications"].append({"type": "error", "message": tb}) - def _create_statement_camt053_oe(self, res, st_data): + def _create_bank_statement_oe(self, res, st_data): attachment = ( self.env["ir.attachment"] .with_company(st_data["company_id"]) @@ -486,20 +482,14 @@ class EbicsFile(models.Model): Statements without transactions are removed. """ datas = [] - msg_hdr = _("{} : Import failed for file %(fn)s:\n", fn=self.name) file_data = base64.b64decode(self.data) root = etree.fromstring(file_data, parser=etree.XMLParser(recover=True)) if root is None: - message = msg_hdr.format(_("Error")) - message += _("Invalid XML file.") + message = _("Invalid XML file.") res["notifications"].append({"type": "error", "message": message}) ns = {k or "ns": v for k, v in root.nsmap.items()} - for i, stmt in enumerate(root[0].findall("ns:Stmt", ns), start=1): - msg_hdr = _( - "{} : Import failed for statement number %(index)s, filename %(fn)s:\n", - index=i, - fn=self.name, - ) + stmts = root[0].findall("ns:Stmt", ns) + for i, stmt in enumerate(stmts): acc_number = sanitize_account_number( stmt.xpath( "ns:Acct/ns:Id/ns:IBAN/text() | ns:Acct/ns:Id/ns:Othr/ns:Id/text()", @@ -507,72 +497,34 @@ class EbicsFile(models.Model): )[0] ) if not acc_number: - message = msg_hdr.format(_("Error")) - message += _("No bank account number found.") + message = _("No bank account number found.") res["notifications"].append({"type": "error", "message": message}) continue currency_code = stmt.xpath( "ns:Acct/ns:Ccy/text() | ns:Bal/ns:Amt/@Ccy", namespaces=ns )[0] - currency = self.env["res.currency"].search( - [("name", "=ilike", currency_code)], limit=1 - ) - if not currency: - message = msg_hdr.format(_("Error")) - message += _("Currency %(cc)s not found.", cc=currency_code) - res["notifications"].append({"type": "error", "message": message}) - continue - journal = self.env["account.journal"].search( - [ - ("type", "=", "bank"), - ( - "bank_account_id.sanitized_acc_number", - "ilike", - acc_number, - ), - ] - ) - if not journal: - message = msg_hdr.format(_("Error")) - message += _( - "No financial journal found for Account Number %(nbr)s, " - "Currency %(cc)s", - nbr=acc_number, - cc=currency_code, - ) - res["notifications"].append({"type": "error", "message": message}) - continue - - journal_currency = journal.currency_id or journal.company_id.currency_id - if journal_currency != currency: - message = msg_hdr.format(_("Error")) - message += _( - "No financial journal found for Account Number %(nbr)s, " - "Currency %(cc)s", - nbr=acc_number, - cc=currency_code, - ) - res["notifications"].append({"type": "error", "message": message}) - continue root_new = deepcopy(root) entries = False - for j, el in enumerate(root_new[0].findall("ns:Stmt", ns), start=1): + for j, el in enumerate(root_new[0].findall("ns:Stmt", ns)): if j != i: el.getparent().remove(el) else: entries = el.findall("ns:Ntry", ns) if not entries: continue - - datas.append( - { - "acc_number": acc_number, - "journal_id": journal.id, - "company_id": journal.company_id.id, - "data": base64.b64encode(etree.tostring(root_new)), - } - ) + else: + currency, journal = self._lookup_journal(res, acc_number, currency_code) + if not (currency and journal): + continue + datas.append( + { + "acc_number": acc_number, + "journal_id": journal.id, + "company_id": journal.company_id.id, + "data": base64.b64encode(etree.tostring(root_new)), + } + ) return datas diff --git a/account_ebics_oca_statement_import/wizards/account_statement_import.py b/account_ebics_oca_statement_import/wizards/account_statement_import.py index f4eb9e5..9615723 100644 --- a/account_ebics_oca_statement_import/wizards/account_statement_import.py +++ b/account_ebics_oca_statement_import/wizards/account_statement_import.py @@ -46,15 +46,35 @@ class AccountStatementImport(models.TransientModel): show days without transactions via the bank statement list view. """ if self.env.context.get("active_model") == "ebics.file": + messages = [] transactions = False for st_vals in stmts_vals: + statement_ids = result["statement_ids"][:] + self._set_statement_name(st_vals) if st_vals.get("transactions"): transactions = True - break - if not transactions: - message = _("This file doesn't contain any transaction.") - st_line_ids = [] - notifications = {"type": "warning", "message": message, "details": ""} - return st_line_ids, [notifications] + super()._create_bank_statements(stmts_vals, result) + if result["statement_ids"] == statement_ids: + # no statement has been created, this is the case + # when all transactions have been imported already + messages.append( + _( + "Statement %(st_name)s dated %(date)s " + "has already been imported.", + st_name=st_vals["name"], + date=st_vals["date"].strftime("%Y-%m-%d"), + ) + ) - return super()._create_bank_statements(stmts_vals, result) + if not transactions: + messages.append(_("This file doesn't contain any transaction.")) + if messages: + result["notifications"].append( + {"type": "warning", "message": "\n".join(messages)} + ) + return + + def _set_statement_name(self, st_vals): + """ + Inherit this method to set your own statement naming policy. + """