[15.0]cfonb processing - set savepoint per statement

This commit is contained in:
Luc De Meyer 2022-12-10 21:34:09 +01:00
parent 510a244ced
commit 598c25ce67
2 changed files with 96 additions and 65 deletions

View File

@ -3,7 +3,7 @@
{ {
"name": "EBICS banking protocol", "name": "EBICS banking protocol",
"version": "15.0.1.0.0", "version": "15.0.1.0.1",
"license": "LGPL-3", "license": "LGPL-3",
"author": "Noviat", "author": "Noviat",
"website": "https://www.noviat.com", "website": "https://www.noviat.com",

View File

@ -3,6 +3,8 @@
import base64 import base64
import logging import logging
from sys import exc_info
from traceback import format_exception
from odoo import _, fields, models from odoo import _, fields, models
from odoo.exceptions import UserError from odoo.exceptions import UserError
@ -167,24 +169,13 @@ class EbicsFile(models.Model):
return False return False
return True return True
def _process_result_action(self, res): def _process_result_action(self, res_action):
notifications = [] notifications = []
st_line_ids = [] st_line_ids = []
statement_ids = [] statement_ids = []
sts_data = [] sts_data = []
if res.get("type") and res["type"] == "ir.actions.client": if res_action.get("type") and res_action["type"] == "ir.actions.client":
notifications = res["context"].get("notifications", []) st_line_ids = res_action["context"].get("statement_line_ids", [])
st_line_ids = res["context"].get("statement_line_ids", [])
if notifications:
for notif in notifications:
parts = []
for k in ["type", "message", "details"]:
if notif.get(k):
msg = "{}: {}".format(k, notif[k])
parts.append(msg)
self.note_process += "\n".join(parts)
self.note_process += "\n"
self.note_process += "\n"
if st_line_ids: if st_line_ids:
self.flush() self.flush()
self.env.cr.execute( self.env.cr.execute(
@ -205,10 +196,10 @@ class EbicsFile(models.Model):
) )
sts_data = self.env.cr.dictfetchall() sts_data = self.env.cr.dictfetchall()
else: else:
if res.get("res_id"): if res_action.get("res_id"):
st_ids = res["res_id"] st_ids = res_action["res_id"]
else: else:
st_ids = res["domain"][2] st_ids = res_action["domain"][0][2]
statements = self.env["account.bank.statement"].browse(st_ids) statements = self.env["account.bank.statement"].browse(st_ids)
for statement in statements: for statement in statements:
sts_data.append( sts_data.append(
@ -220,7 +211,29 @@ class EbicsFile(models.Model):
} }
) )
st_cnt = len(sts_data) st_cnt = len(sts_data)
warning_cnt = error_cnt = 0
notifications = res_action["context"].get("notifications", [])
if notifications:
for notif in notifications:
if notif["type"] == "error":
error_cnt += 1
elif notif["type"] == "warning":
warning_cnt += 1
parts = [notif[k] for k in notif if k in ("message", "details")]
self.note_process += "\n".join(parts)
self.note_process += "\n\n"
self.note_process += "\n"
if error_cnt:
self.note_process += (
_("Number of errors detected during import: %s: ") % error_cnt
)
self.note_process += "\n"
if warning_cnt:
self.note_process += (
_("Number of watnings detected during import: %s: ") % warning_cnt
)
if st_cnt: if st_cnt:
self.note_process += "\n\n"
self.note_process += _("%s bank statements have been imported: ") % st_cnt self.note_process += _("%s bank statements have been imported: ") % st_cnt
self.note_process += "\n" self.note_process += "\n"
for st_data in sts_data: for st_data in sts_data:
@ -231,7 +244,7 @@ class EbicsFile(models.Model):
) )
statement_ids = [x["statement_id"] for x in sts_data] statement_ids = [x["statement_id"] for x in sts_data]
if statement_ids: if statement_ids:
self.sudo().bank_statement_ids = [(6, 0, statement_ids)] self.sudo().bank_statement_ids = [(4, x) for x in statement_ids]
ctx = dict(self.env.context, statement_ids=statement_ids) ctx = dict(self.env.context, statement_ids=statement_ids)
module = __name__.split("addons.")[1].split(".")[0] module = __name__.split("addons.")[1].split(".")[0]
result_view = self.env.ref("%s.ebics_file_view_form_result" % module) result_view = self.env.ref("%s.ebics_file_view_form_result" % module)
@ -278,42 +291,72 @@ class EbicsFile(models.Model):
) )
st_lines = b"" st_lines = b""
transactions = False transactions = False
result = { result_action = self.env["ir.actions.act_window"]._for_xml_id(
"type": "ir.actions.client", "account.action_bank_statement_tree"
"tag": "bank_statement_reconciliation_view", )
"context": { result_action["context"] = safe_eval(result_action["context"])
"statement_line_ids": [], statement_ids = []
"company_ids": self.env.user.company_ids.ids, notifications = []
"notifications": [],
},
}
for i, wiz_vals in enumerate(wiz_vals_list, start=1): for i, wiz_vals in enumerate(wiz_vals_list, start=1):
result = {
"statement_ids": [],
"notifications": [],
}
statement_filename = wiz_vals["statement_filename"]
wiz = ( wiz = (
self.env[wiz_model] self.env[wiz_model]
.with_context(active_model="ebics.file") .with_context(active_model="ebics.file")
.create(wiz_vals) .create(wiz_vals)
) )
res = wiz.import_file_button() try:
ctx = res.get("context") with self.env.cr.savepoint():
if res.get("res_model") == "account.bank.statement.import.journal.creation": file_data = base64.b64decode(wiz_vals["statement_file"])
message = _("Error detected while importing statement number %s.\n") % i msg_hdr = _(
message += _("No financial journal found.") "{} : Import failed for statement number %(index)s, filename %(fn)s:\n",
details = _("Bank account number: %s") % ctx.get( index=i,
"default_bank_acc_number" fn=statement_filename,
) )
result["context"]["notifications"].extend( wiz.import_single_file(file_data, result)
[ if not result["statement_ids"]:
message = msg_hdr.format(_("Warning"))
message += _(
"You have already imported this file, or this file "
"only contains already imported transactions."
)
notifications += [
{ {
"type": "warning", "type": "warning",
"message": message, "message": message,
"details": details,
} }
] ]
) else:
continue statement_ids.extend(result["statement_ids"])
result["context"]["statement_line_ids"].extend(ctx["statement_line_ids"]) notifications.extend(result["notifications"])
result["context"]["notifications"].extend(ctx["notifications"])
return self._process_result_action(result) except UserError as e:
message = msg_hdr.format(_("Error"))
message += "".join(e.args)
notifications += [
{
"type": "error",
"message": message,
}
]
except Exception:
tb = "".join(format_exception(*exc_info()))
message = msg_hdr.format(_("Error"))
message += tb
notifications += [
{
"type": "error",
"message": message,
}
]
result_action["context"]["notifications"] = notifications
result_action["domain"] = [("id", "in", statement_ids)]
return self._process_result_action(result_action)
@staticmethod @staticmethod
def _unlink_cfonb120(self): def _unlink_cfonb120(self):
@ -393,22 +436,10 @@ class EbicsFile(models.Model):
self.env[wiz_model].with_context(active_model="ebics.file").create(wiz_vals) self.env[wiz_model].with_context(active_model="ebics.file").create(wiz_vals)
) )
res = wiz.import_file_button() res = wiz.import_file_button()
ctx = res.get("context") result["context"]["statement_line_ids"].extend(
if res.get("res_model") == "account.bank.statement.import.journal.creation": res["context"]["statement_line_ids"]
message = _("Error detected while importing statement %s.\n") % self.name
message += _("No financial journal found.")
details = _("Bank account number: %s") % ctx.get("default_bank_acc_number")
result["context"]["notifications"].extend(
[
{
"type": "warning",
"message": message,
"details": details,
}
]
) )
result["context"]["statement_line_ids"].extend(ctx["statement_line_ids"]) result["context"]["notifications"].extend(res["context"]["notifications"])
result["context"]["notifications"].extend(ctx["notifications"])
return self._process_result_action(result) return self._process_result_action(result)
def _process_camt053_oe(self): def _process_camt053_oe(self):