account_ebics 16.0 : add support for CFONB

This commit is contained in:
Luc De Meyer 2023-08-03 23:42:58 +02:00
parent d2af7e9fb3
commit 6adad2e0c8
3 changed files with 174 additions and 202 deletions

View File

@ -3,7 +3,7 @@
{ {
"name": "EBICS banking protocol", "name": "EBICS banking protocol",
"version": "16.0.1.3.1", "version": "16.0.1.3.2",
"license": "LGPL-3", "license": "LGPL-3",
"author": "Noviat", "author": "Noviat",
"website": "https://www.noviat.com", "website": "https://www.noviat.com",

View File

@ -11,7 +11,6 @@ from lxml import etree
from odoo import _, fields, models from odoo import _, fields, models
from odoo.exceptions import UserError from odoo.exceptions import UserError
from odoo.tools.safe_eval import safe_eval
from odoo.addons.base.models.res_bank import sanitize_account_number from odoo.addons.base.models.res_bank import sanitize_account_number
@ -177,6 +176,54 @@ class EbicsFile(models.Model):
return False return False
return True return True
def _lookup_journal(self, res, acc_number, currency_code):
currency = self.env["res.currency"].search(
[("name", "=ilike", currency_code)], limit=1
)
journal = self.env["account.journal"]
if not currency:
message = _("Currency %(cc)s not found.", cc=currency_code)
res["notifications"].append({"type": "error", "message": message})
return (currency, journal)
journals = self.env["account.journal"].search(
[
("type", "=", "bank"),
(
"bank_account_id.sanitized_acc_number",
"ilike",
acc_number,
),
]
)
if not journals:
message = _(
"No financial journal found for Account Number %(nbr)s, "
"Currency %(cc)s",
nbr=acc_number,
cc=currency_code,
)
res["notifications"].append({"type": "error", "message": message})
return (currency, journal)
for jrnl in journals:
journal_currency = jrnl.currency_id or jrnl.company_id.currency_id
if journal_currency != currency:
continue
else:
journal = jrnl
break
if not journal:
message = _(
"No financial journal found for Account Number %(nbr)s, "
"Currency %(cc)s",
nbr=acc_number,
cc=currency_code,
)
res["notifications"].append({"type": "error", "message": message})
return (currency, journal)
def _process_download_result(self, res): def _process_download_result(self, res):
statement_ids = res["statement_ids"] statement_ids = res["statement_ids"]
notifications = res["notifications"] notifications = res["notifications"]
@ -184,24 +231,30 @@ class EbicsFile(models.Model):
st_cnt = len(statement_ids) st_cnt = len(statement_ids)
warning_cnt = error_cnt = 0 warning_cnt = error_cnt = 0
if notifications: if notifications:
errors = []
warnings = []
for notif in notifications: for notif in notifications:
if notif["type"] == "error": if notif["type"] == "error":
error_cnt += 1 error_cnt += 1
parts = [notif[k] for k in notif if k in ("message", "details")]
errors.append("\n".join(parts))
elif notif["type"] == "warning": elif notif["type"] == "warning":
warning_cnt += 1 warning_cnt += 1
parts = [notif[k] for k in notif if k in ("message", "details")] parts = [notif[k] for k in notif if k in ("message", "details")]
self.note_process += "\n".join(parts) warnings.append("\n".join(parts))
self.note_process += "\n\n"
self.note_process += "\n" self.note_process += _("Process file %(fn)s results:", fn=self.name)
if error_cnt: if error_cnt:
self.note_process += ( self.note_process += "\n\n" + _("Errors") + ":\n"
_("Number of errors detected during import: %s") % error_cnt self.note_process += "\n".join(errors)
) self.note_process += "\n\n"
self.note_process += "\n" self.note_process += _("Number of errors: %(nr)s", nr=error_cnt)
if warning_cnt: if warning_cnt:
self.note_process += ( self.note_process += "\n\n" + _("Warnings") + ":\n"
_("Number of warnings detected during import: %s") % warning_cnt self.note_process += "\n".join(warnings)
) self.note_process += "\n\n"
self.note_process += _("Number of warnings: %(nr)s", nr=warning_cnt)
self.note_process += "\n"
if st_cnt: if st_cnt:
self.note_process += "\n\n" self.note_process += "\n\n"
self.note_process += _( self.note_process += _(
@ -211,10 +264,11 @@ class EbicsFile(models.Model):
) )
self.note_process += "\n" self.note_process += "\n"
for statement in statements: for statement in statements:
self.note_process += ("\n%s, %s (%s)") % ( self.note_process += "\n" + _(
statement.date, "Statement %(st)s dated %(date)s (Company: %(cpy)s)",
statement.name, st=statement.name,
statement.company_id.name, date=statement.date,
cpy=statement.company_id.name,
) )
if statement_ids: if statement_ids:
self.sudo().bank_statement_ids = [(4, x) for x in statement_ids] self.sudo().bank_statement_ids = [(4, x) for x in statement_ids]
@ -236,103 +290,12 @@ class EbicsFile(models.Model):
} }
def _process_cfonb120(self): def _process_cfonb120(self):
"""
Disable this code while waiting on OCA cfonb release for 16.0
"""
# pylint: disable=W0101
raise NotImplementedError
import_module = "account_statement_import_fr_cfonb" import_module = "account_statement_import_fr_cfonb"
self._check_import_module(import_module) self._check_import_module(import_module)
wiz_model = "account.statement.import" res = {"statement_ids": [], "notifications": []}
data_file = base64.b64decode(self.data) st_datas = self._split_cfonb(res)
lines = data_file.split(b"\n") self._process_bank_statement_oca(res, st_datas)
wiz_vals_list = [] return self._process_download_result(res)
st_lines = b""
transactions = False
for line in lines:
rec_type = line[0:2]
acc_number = line[21:32]
st_lines += line + b"\n"
if rec_type == b"04":
transactions = True
if rec_type == b"07":
if transactions:
fn = "_".join([acc_number.decode(), self.name])
wiz_vals_list.append(
{
"statement_filename": fn,
"statement_file": base64.b64encode(st_lines),
}
)
st_lines = b""
transactions = False
result_action = self.env["ir.actions.act_window"]._for_xml_id(
"account.action_bank_statement_tree"
)
result_action["context"] = safe_eval(result_action["context"])
statement_ids = []
notifications = []
for i, wiz_vals in enumerate(wiz_vals_list, start=1):
result = {
"statement_ids": [],
"notifications": [],
}
statement_filename = wiz_vals["statement_filename"]
wiz = (
self.env[wiz_model]
.with_context(active_model="ebics.file")
.create(wiz_vals)
)
try:
with self.env.cr.savepoint():
file_data = base64.b64decode(wiz_vals["statement_file"])
msg_hdr = _(
"{} : Import failed for statement number %(index)s, filename %(fn)s:\n",
index=i,
fn=statement_filename,
)
wiz.import_single_file(file_data, result)
if not result["statement_ids"]:
message = msg_hdr.format(_("Warning"))
message += _(
"You have already imported this file, or this file "
"only contains already imported transactions."
)
notifications += [
{
"type": "warning",
"message": message,
}
]
else:
statement_ids.extend(result["statement_ids"])
notifications.extend(result["notifications"])
except UserError as e:
message = msg_hdr.format(_("Error"))
message += "".join(e.args)
notifications += [
{
"type": "error",
"message": message,
}
]
except Exception:
tb = "".join(format_exception(*exc_info()))
message = msg_hdr.format(_("Error"))
message += tb
notifications += [
{
"type": "error",
"message": message,
}
]
result_action["context"]["notifications"] = notifications
result_action["domain"] = [("id", "in", statement_ids)]
return self._process_result_action(result_action)
def _unlink_cfonb120(self): def _unlink_cfonb120(self):
""" """
@ -340,6 +303,41 @@ class EbicsFile(models.Model):
EBICS data file and its related bank statements. EBICS data file and its related bank statements.
""" """
def _split_cfonb(self, res):
"""
Split CFONB file received via EBICS per statement.
Statements without transactions are removed.
"""
datas = []
file_data = base64.b64decode(self.data)
lines = file_data.split(b"\n")
st_lines = b""
transactions = False
for line in lines:
rec_type = line[0:2]
currency_code = line[16:19].decode()
acc_number = line[21:32].decode()
st_lines += line + b"\n"
if rec_type == b"04":
transactions = True
if rec_type == b"07":
if transactions:
currency, journal = self._lookup_journal(
res, acc_number, currency_code
)
if currency and journal:
datas.append(
{
"acc_number": acc_number,
"journal_id": journal.id,
"company_id": journal.company_id.id,
"data": base64.b64encode(st_lines),
}
)
st_lines = b""
transactions = False
return datas
def _process_camt052(self): def _process_camt052(self):
import_module = "account_statement_import_camt" import_module = "account_statement_import_camt"
self._check_import_module(import_module) self._check_import_module(import_module)
@ -394,28 +392,25 @@ class EbicsFile(models.Model):
res = {"statement_ids": [], "notifications": []} res = {"statement_ids": [], "notifications": []}
st_datas = self._split_camt(res) st_datas = self._split_camt(res)
if author == "oca": if author == "oca":
self._process_camt053_oca(res, st_datas) self._process_bank_statement_oca(res, st_datas)
else: else:
self._process_camt053_oe(res, st_datas) self._process_bank_statement_oe(res, st_datas)
return self._process_download_result(res) return self._process_download_result(res)
def _process_camt053_oca(self, res, st_datas): def _process_bank_statement_oca(self, res, st_datas):
msg_hdr = _("{} : Import failed for file %(fn)s:\n", fn=self.name)
for st_data in st_datas: for st_data in st_datas:
try: try:
with self.env.cr.savepoint(): with self.env.cr.savepoint():
self._create_statement_camt053_oca(res, st_data) self._create_bank_statement_oca(res, st_data)
except UserError as e: except UserError as e:
message = msg_hdr.format(_("Error")) res["notifications"].append(
message += "".join(e.args) {"type": "error", "message": "".join(e.args)}
res["notifications"].append({"type": "error", "message": message}) )
except Exception: except Exception:
tb = "".join(format_exception(*exc_info())) tb = "".join(format_exception(*exc_info()))
message = msg_hdr.format(_("Error")) res["notifications"].append({"type": "error", "message": tb})
message += tb
res["notifications"].append({"type": "error", "message": message})
def _create_statement_camt053_oca(self, res, st_data): def _create_bank_statement_oca(self, res, st_data):
wiz = ( wiz = (
self.env["account.statement.import"] self.env["account.statement.import"]
.with_company(st_data["company_id"]) .with_company(st_data["company_id"])
@ -424,27 +419,28 @@ class EbicsFile(models.Model):
) )
wiz.import_single_file(base64.b64decode(st_data["data"]), res) wiz.import_single_file(base64.b64decode(st_data["data"]), res)
def _process_camt053_oe(self, res, st_datas): def _process_bank_statement_oe(self, res, st_datas):
""" """
We execute a cr.commit() after every statement import since we get a We execute a cr.commit() after every statement import since we get a
'savepoint does not exist' error when using 'with self.env.cr.savepoint()'. 'savepoint does not exist' error when using 'with self.env.cr.savepoint()'.
""" """
msg_hdr = _("{} : Import failed for file %(fn)s:\n", fn=self.name)
for st_data in st_datas: for st_data in st_datas:
try: try:
self._create_statement_camt053_oe(res, st_data) self._create_bank_statement_oe(res, st_data)
self.env.cr.commit() # pylint: disable=E8102 self.env.cr.commit() # pylint: disable=E8102
except UserError as e: except UserError as e:
message = msg_hdr.format(_("Error")) msg = "".join(e.args)
message += "".join(e.args) msg += "\n"
res["notifications"].append({"type": "error", "message": message}) msg += _(
"Statement for Account Number %(nr)s has not been processed.",
nr=st_data["acc_number"],
)
res["notifications"].append({"type": "error", "message": msg})
except Exception: except Exception:
tb = "".join(format_exception(*exc_info())) tb = "".join(format_exception(*exc_info()))
message = msg_hdr.format(_("Error")) res["notifications"].append({"type": "error", "message": tb})
message += tb
res["notifications"].append({"type": "error", "message": message})
def _create_statement_camt053_oe(self, res, st_data): def _create_bank_statement_oe(self, res, st_data):
attachment = ( attachment = (
self.env["ir.attachment"] self.env["ir.attachment"]
.with_company(st_data["company_id"]) .with_company(st_data["company_id"])
@ -486,20 +482,14 @@ class EbicsFile(models.Model):
Statements without transactions are removed. Statements without transactions are removed.
""" """
datas = [] datas = []
msg_hdr = _("{} : Import failed for file %(fn)s:\n", fn=self.name)
file_data = base64.b64decode(self.data) file_data = base64.b64decode(self.data)
root = etree.fromstring(file_data, parser=etree.XMLParser(recover=True)) root = etree.fromstring(file_data, parser=etree.XMLParser(recover=True))
if root is None: if root is None:
message = msg_hdr.format(_("Error")) message = _("Invalid XML file.")
message += _("Invalid XML file.")
res["notifications"].append({"type": "error", "message": message}) res["notifications"].append({"type": "error", "message": message})
ns = {k or "ns": v for k, v in root.nsmap.items()} ns = {k or "ns": v for k, v in root.nsmap.items()}
for i, stmt in enumerate(root[0].findall("ns:Stmt", ns), start=1): stmts = root[0].findall("ns:Stmt", ns)
msg_hdr = _( for i, stmt in enumerate(stmts):
"{} : Import failed for statement number %(index)s, filename %(fn)s:\n",
index=i,
fn=self.name,
)
acc_number = sanitize_account_number( acc_number = sanitize_account_number(
stmt.xpath( stmt.xpath(
"ns:Acct/ns:Id/ns:IBAN/text() | ns:Acct/ns:Id/ns:Othr/ns:Id/text()", "ns:Acct/ns:Id/ns:IBAN/text() | ns:Acct/ns:Id/ns:Othr/ns:Id/text()",
@ -507,72 +497,34 @@ class EbicsFile(models.Model):
)[0] )[0]
) )
if not acc_number: if not acc_number:
message = msg_hdr.format(_("Error")) message = _("No bank account number found.")
message += _("No bank account number found.")
res["notifications"].append({"type": "error", "message": message}) res["notifications"].append({"type": "error", "message": message})
continue continue
currency_code = stmt.xpath( currency_code = stmt.xpath(
"ns:Acct/ns:Ccy/text() | ns:Bal/ns:Amt/@Ccy", namespaces=ns "ns:Acct/ns:Ccy/text() | ns:Bal/ns:Amt/@Ccy", namespaces=ns
)[0] )[0]
currency = self.env["res.currency"].search(
[("name", "=ilike", currency_code)], limit=1
)
if not currency:
message = msg_hdr.format(_("Error"))
message += _("Currency %(cc)s not found.", cc=currency_code)
res["notifications"].append({"type": "error", "message": message})
continue
journal = self.env["account.journal"].search(
[
("type", "=", "bank"),
(
"bank_account_id.sanitized_acc_number",
"ilike",
acc_number,
),
]
)
if not journal:
message = msg_hdr.format(_("Error"))
message += _(
"No financial journal found for Account Number %(nbr)s, "
"Currency %(cc)s",
nbr=acc_number,
cc=currency_code,
)
res["notifications"].append({"type": "error", "message": message})
continue
journal_currency = journal.currency_id or journal.company_id.currency_id
if journal_currency != currency:
message = msg_hdr.format(_("Error"))
message += _(
"No financial journal found for Account Number %(nbr)s, "
"Currency %(cc)s",
nbr=acc_number,
cc=currency_code,
)
res["notifications"].append({"type": "error", "message": message})
continue
root_new = deepcopy(root) root_new = deepcopy(root)
entries = False entries = False
for j, el in enumerate(root_new[0].findall("ns:Stmt", ns), start=1): for j, el in enumerate(root_new[0].findall("ns:Stmt", ns)):
if j != i: if j != i:
el.getparent().remove(el) el.getparent().remove(el)
else: else:
entries = el.findall("ns:Ntry", ns) entries = el.findall("ns:Ntry", ns)
if not entries: if not entries:
continue continue
else:
datas.append( currency, journal = self._lookup_journal(res, acc_number, currency_code)
{ if not (currency and journal):
"acc_number": acc_number, continue
"journal_id": journal.id, datas.append(
"company_id": journal.company_id.id, {
"data": base64.b64encode(etree.tostring(root_new)), "acc_number": acc_number,
} "journal_id": journal.id,
) "company_id": journal.company_id.id,
"data": base64.b64encode(etree.tostring(root_new)),
}
)
return datas return datas

View File

@ -46,15 +46,35 @@ class AccountStatementImport(models.TransientModel):
show days without transactions via the bank statement list view. show days without transactions via the bank statement list view.
""" """
if self.env.context.get("active_model") == "ebics.file": if self.env.context.get("active_model") == "ebics.file":
messages = []
transactions = False transactions = False
for st_vals in stmts_vals: for st_vals in stmts_vals:
statement_ids = result["statement_ids"][:]
self._set_statement_name(st_vals)
if st_vals.get("transactions"): if st_vals.get("transactions"):
transactions = True transactions = True
break super()._create_bank_statements(stmts_vals, result)
if not transactions: if result["statement_ids"] == statement_ids:
message = _("This file doesn't contain any transaction.") # no statement has been created, this is the case
st_line_ids = [] # when all transactions have been imported already
notifications = {"type": "warning", "message": message, "details": ""} messages.append(
return st_line_ids, [notifications] _(
"Statement %(st_name)s dated %(date)s "
"has already been imported.",
st_name=st_vals["name"],
date=st_vals["date"].strftime("%Y-%m-%d"),
)
)
return super()._create_bank_statements(stmts_vals, result) if not transactions:
messages.append(_("This file doesn't contain any transaction."))
if messages:
result["notifications"].append(
{"type": "warning", "message": "\n".join(messages)}
)
return
def _set_statement_name(self, st_vals):
"""
Inherit this method to set your own statement naming policy.
"""