fix 'savepoint does not exist' stack trace at statement import

This commit is contained in:
Luc De Meyer 2023-07-30 16:21:11 +02:00
parent 59a85625db
commit d5143617c1
2 changed files with 146 additions and 220 deletions

View File

@ -3,7 +3,7 @@
{
"name": "EBICS banking protocol",
"version": "16.0.1.3.0",
"version": "16.0.1.3.1",
"license": "LGPL-3",
"author": "Noviat",
"website": "https://www.noviat.com",

View File

@ -180,21 +180,7 @@ class EbicsFile(models.Model):
def _process_download_result(self, res):
statement_ids = res["statement_ids"]
notifications = res["notifications"]
sts_data = []
if statement_ids:
self.env.flush_all()
self.env.cr.execute(
"""
SELECT abs.name, abs.date, abs.company_id, rc.name AS company_name
FROM account_bank_statement abs
JOIN res_company rc ON rc.id = abs.company_id
WHERE abs.id in %s
ORDER BY abs.date, rc.id
""",
(tuple(res["statement_ids"]),),
)
sts_data = self.env.cr.dictfetchall()
statements = self.env["account.bank.statement"].sudo().browse(statement_ids)
st_cnt = len(statement_ids)
warning_cnt = error_cnt = 0
if notifications:
@ -224,11 +210,11 @@ class EbicsFile(models.Model):
sp=st_cnt == 1 and _(" has") or _("s have"),
)
self.note_process += "\n"
for st_data in sts_data:
for statement in statements:
self.note_process += ("\n%s, %s (%s)") % (
st_data["date"],
st_data["name"],
st_data["company_name"],
statement.date,
statement.name,
statement.company_id.name,
)
if statement_ids:
self.sudo().bank_statement_ids = [(4, x) for x in statement_ids]
@ -376,7 +362,7 @@ class EbicsFile(models.Model):
EBICS data file and its related bank statements.
"""
def _process_camt053(self): # noqa C901
def _process_camt053(self):
"""
The Odoo standard statement import is based on manual selection
of a financial journal before importing the electronic statement file.
@ -385,8 +371,6 @@ class EbicsFile(models.Model):
Hence we need to split the CAMT file into
single statement CAMT files before we can call the logic
implemented by the Odoo OE or Community CAMT parsers.
TODO: refactor method to enable removal of noqa C901
"""
modules = [
("oca", "account_statement_import_camt"),
@ -408,129 +392,13 @@ class EbicsFile(models.Model):
)
)
res = {"statement_ids": [], "notifications": []}
st_datas = self._split_camt(res)
msg_hdr = _("{} : Import failed for file %(fn)s:\n", fn=self.name)
try:
with self.env.cr.savepoint():
transactions = False
msg_hdr = _("{} : Import failed for file %(fn)s:\n", fn=self.name)
file_data = base64.b64decode(self.data)
root = etree.fromstring(file_data, parser=etree.XMLParser(recover=True))
if root is None:
message = msg_hdr.format(_("Error"))
message += _("Invalid XML file.")
res["notifications"].append({"type": "error", "message": message})
ns = {k or "ns": v for k, v in root.nsmap.items()}
for i, stmt in enumerate(root[0].findall("ns:Stmt", ns), start=1):
msg_hdr = _(
"{} : Import failed for statement number %(index)s, filename %(fn)s:\n",
index=i,
fn=self.name,
)
acc_number = sanitize_account_number(
stmt.xpath(
"ns:Acct/ns:Id/ns:IBAN/text() | ns:Acct/ns:Id/ns:Othr/ns:Id/text()",
namespaces=ns,
)[0]
)
if not acc_number:
message = msg_hdr.format(_("Error"))
message += _("No bank account number found.")
res["notifications"].append(
{"type": "error", "message": message}
)
continue
currency_code = stmt.xpath(
"ns:Acct/ns:Ccy/text() | ns:Bal/ns:Amt/@Ccy", namespaces=ns
)[0]
currency = self.env["res.currency"].search(
[("name", "=ilike", currency_code)], limit=1
)
if not currency:
message = msg_hdr.format(_("Error"))
message += _("Currency %(cc)s not found.", cc=currency_code)
res["notifications"].append(
{"type": "error", "message": message}
)
continue
journal = self.env["account.journal"].search(
[
("type", "=", "bank"),
(
"bank_account_id.sanitized_acc_number",
"ilike",
acc_number,
),
]
)
if not journal:
message = msg_hdr.format(_("Error"))
message += _(
"No financial journal found for Account Number %(nbr)s, "
"Currency %(cc)s",
nbr=acc_number,
cc=currency_code,
)
res["notifications"].append(
{"type": "error", "message": message}
)
continue
journal_currency = (
journal.currency_id or journal.company_id.currency_id
)
if journal_currency != currency:
message = msg_hdr.format(_("Error"))
message += _(
"No financial journal found for Account Number %(nbr)s, "
"Currency %(cc)s",
nbr=acc_number,
cc=currency_code,
)
res["notifications"].append(
{"type": "error", "message": message}
)
continue
root_new = deepcopy(root)
entries = False
for j, el in enumerate(root_new[0].findall("ns:Stmt", ns), start=1):
if j != i:
el.getparent().remove(el)
else:
entries = el.findall("ns:Ntry", ns)
if not entries:
continue
transactions = True
data = base64.b64encode(etree.tostring(root_new))
if author == "oca":
# TODO: implement _process_camt053_oca() once OCA camt is
# released for 16.0
raise NotImplementedError
else:
self.env.company = journal.company_id
attachment = self.env["ir.attachment"].create(
{"name": self.name, "datas": data, "store_fname": self.name}
)
act = journal._import_bank_statement(attachment)
for entry in act["domain"]:
if (
isinstance(entry, tuple)
and entry[0] == "statement_id"
and entry[1] == "in"
):
res["statement_ids"].extend(entry[2])
break
notifications = act["context"]["notifications"]
if notifications:
res["notifications"].append(act["context"]["notifications"])
if not transactions:
message = _(
"Warning:\nNo transactions found in file %(fn)s.", fn=self.name
)
res["notifications"].append({"type": "warning", "message": message})
if author == "oca":
self._process_camt053_oca(res, st_datas)
else:
self._process_camt053_oe(res, st_datas)
except UserError as e:
message = msg_hdr.format(_("Error"))
message += "".join(e.args)
@ -541,87 +409,49 @@ class EbicsFile(models.Model):
message += tb
res["notifications"].append({"type": "error", "message": message})
if author == "oca":
# TODO: implement _process_camt053_oca() once OCA camt is
# released for 16.0
return self._process_camt053_oca()
else:
return self._process_download_result(res)
return self._process_download_result(res)
def _process_camt053_oca(self):
"""
Disable this code while waiting on OCA CAMT parser for 16.0
"""
# pylint: disable=W0101
def _process_camt053_oca(self, res, st_datas):
raise NotImplementedError
wiz_model = "account.statement.import"
wiz_vals = {
"statement_filename": self.name,
"statement_file": self.data,
}
result_action = self.env["ir.actions.act_window"]._for_xml_id(
"account.action_bank_statement_tree"
)
result_action["context"] = safe_eval(result_action["context"])
result = {
"statement_ids": [],
"notifications": [],
}
statement_ids = []
notifications = []
wiz = (
self.env[wiz_model].with_context(active_model="ebics.file").create(wiz_vals)
)
msg_hdr = _(
"{} : Import failed for EBICS File %(fn)s:\n",
fn=wiz.statement_filename,
)
try:
with self.env.cr.savepoint():
file_data = base64.b64decode(self.data)
wiz.import_single_file(file_data, result)
def _process_camt053_oe(self, res, st_datas):
"""
We execute a cr.commit() after every statement import since we get a
'savepoint does not exist' error when using 'with self.env.cr.savepoint()'.
"""
for st_data in st_datas:
self._create_statement_camt053_oe(res, st_data)
self.env.cr.commit() # pylint: disable=E8102
if not result["statement_ids"]:
message = msg_hdr.format(_("Warning"))
message += _(
"You have already imported this file, or this file "
"only contains already imported transactions."
)
notifications += [
{
"type": "warning",
"message": message,
}
]
else:
statement_ids.extend(result["statement_ids"])
notifications.extend(result["notifications"])
except UserError as e:
message = msg_hdr.format(_("Error"))
message += "".join(e.args)
notifications += [
def _create_statement_camt053_oe(self, res, st_data):
attachment = (
self.env["ir.attachment"]
.with_company(st_data["company_id"])
.create(
{
"type": "error",
"message": message,
"name": self.name,
"datas": st_data["data"],
"store_fname": self.name,
}
]
except Exception:
tb = "".join(format_exception(*exc_info()))
message = msg_hdr.format(_("Error"))
message += tb
notifications += [
{
"type": "error",
"message": message,
}
]
result_action["context"]["notifications"] = notifications
result_action["domain"] = [("id", "in", statement_ids)]
return self._process_result_action(result_action)
)
)
journal = (
self.env["account.journal"]
.with_company(st_data["company_id"])
.browse(st_data["journal_id"])
)
act = journal._import_bank_statement(attachment)
for entry in act["domain"]:
if (
isinstance(entry, tuple)
and entry[0] == "statement_id"
and entry[1] == "in"
):
res["statement_ids"].extend(entry[2])
break
notifications = act["context"]["notifications"]
if notifications:
res["notifications"].append(act["context"]["notifications"])
def _unlink_camt053(self):
"""
@ -629,6 +459,102 @@ class EbicsFile(models.Model):
EBICS data file and its related bank statements.
"""
def _split_camt(self, res):
"""
Split CAMT file received via EBICS per statement.
Statements without transactions are removed.
"""
datas = []
msg_hdr = _("{} : Import failed for file %(fn)s:\n", fn=self.name)
file_data = base64.b64decode(self.data)
root = etree.fromstring(file_data, parser=etree.XMLParser(recover=True))
if root is None:
message = msg_hdr.format(_("Error"))
message += _("Invalid XML file.")
res["notifications"].append({"type": "error", "message": message})
ns = {k or "ns": v for k, v in root.nsmap.items()}
for i, stmt in enumerate(root[0].findall("ns:Stmt", ns), start=1):
msg_hdr = _(
"{} : Import failed for statement number %(index)s, filename %(fn)s:\n",
index=i,
fn=self.name,
)
acc_number = sanitize_account_number(
stmt.xpath(
"ns:Acct/ns:Id/ns:IBAN/text() | ns:Acct/ns:Id/ns:Othr/ns:Id/text()",
namespaces=ns,
)[0]
)
if not acc_number:
message = msg_hdr.format(_("Error"))
message += _("No bank account number found.")
res["notifications"].append({"type": "error", "message": message})
continue
currency_code = stmt.xpath(
"ns:Acct/ns:Ccy/text() | ns:Bal/ns:Amt/@Ccy", namespaces=ns
)[0]
currency = self.env["res.currency"].search(
[("name", "=ilike", currency_code)], limit=1
)
if not currency:
message = msg_hdr.format(_("Error"))
message += _("Currency %(cc)s not found.", cc=currency_code)
res["notifications"].append({"type": "error", "message": message})
continue
journal = self.env["account.journal"].search(
[
("type", "=", "bank"),
(
"bank_account_id.sanitized_acc_number",
"ilike",
acc_number,
),
]
)
if not journal:
message = msg_hdr.format(_("Error"))
message += _(
"No financial journal found for Account Number %(nbr)s, "
"Currency %(cc)s",
nbr=acc_number,
cc=currency_code,
)
res["notifications"].append({"type": "error", "message": message})
continue
journal_currency = journal.currency_id or journal.company_id.currency_id
if journal_currency != currency:
message = msg_hdr.format(_("Error"))
message += _(
"No financial journal found for Account Number %(nbr)s, "
"Currency %(cc)s",
nbr=acc_number,
cc=currency_code,
)
res["notifications"].append({"type": "error", "message": message})
continue
root_new = deepcopy(root)
entries = False
for j, el in enumerate(root_new[0].findall("ns:Stmt", ns), start=1):
if j != i:
el.getparent().remove(el)
else:
entries = el.findall("ns:Ntry", ns)
if not entries:
continue
datas.append(
{
"acc_number": acc_number,
"journal_id": journal.id,
"company_id": journal.company_id.id,
"data": base64.b64encode(etree.tostring(root_new)),
}
)
return datas
def _process_pain002(self):
"""
Placeholder for processing pain.002 files.