Merge pull request #103 from Noviat/16.0.1.4.0

[IMP] prevent creation of dup statements, removal of unneeded code
This commit is contained in:
Luc De Meyer 2023-08-05 18:09:27 +02:00 committed by GitHub
commit fa968216c4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 61 additions and 64 deletions

View File

@ -3,7 +3,7 @@
{ {
"name": "EBICS banking protocol", "name": "EBICS banking protocol",
"version": "16.0.1.3.2", "version": "16.0.1.4.0",
"license": "LGPL-3", "license": "LGPL-3",
"author": "Noviat", "author": "Noviat",
"website": "https://www.noviat.com", "website": "https://www.noviat.com",

View File

@ -8,3 +8,4 @@ class AccountBankStatement(models.Model):
_inherit = "account.bank.statement" _inherit = "account.bank.statement"
ebics_file_id = fields.Many2one(comodel_name="ebics.file", string="EBICS Data File") ebics_file_id = fields.Many2one(comodel_name="ebics.file", string="EBICS Data File")
import_format = fields.Char(readonly=True)

View File

@ -91,14 +91,6 @@ class EbicsConfig(models.Model):
"between customer and financial institution. " "between customer and financial institution. "
"The human user also can authorise orders.", "The human user also can authorise orders.",
) )
ebics_files = fields.Char(
string="EBICS Files Root",
required=True,
readonly=True,
states={"draft": [("readonly", False)]},
default=lambda self: self._default_ebics_files(),
help="Root Directory for EBICS File Transfer Folders.",
)
# We store the EBICS keys in a separate directory in the file system. # We store the EBICS keys in a separate directory in the file system.
# This directory requires special protection to reduce fraude. # This directory requires special protection to reduce fraude.
ebics_keys = fields.Char( ebics_keys = fields.Char(
@ -253,14 +245,3 @@ class EbicsConfig(models.Model):
) )
% dirname % dirname
) )
def _check_ebics_files(self):
dirname = self.ebics_files or ""
if not os.path.exists(dirname):
raise UserError(
_(
"EBICS Files Root Directory %s is not available."
"\nPlease contact your system administrator."
)
% dirname
)

View File

@ -16,6 +16,8 @@ from odoo.addons.base.models.res_bank import sanitize_account_number
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
DUP_CHECK_FORMATS = ["cfonb120", "camt053"]
class EbicsFile(models.Model): class EbicsFile(models.Model):
_name = "ebics.file" _name = "ebics.file"
@ -224,11 +226,21 @@ class EbicsFile(models.Model):
res["notifications"].append({"type": "error", "message": message}) res["notifications"].append({"type": "error", "message": message})
return (currency, journal) return (currency, journal)
def _process_download_result(self, res): def _process_download_result(self, res, file_format=None):
"""
We perform a duplicate statement check after the creation of the bank
statements since we rely on Odoo Enterprise or OCA modules for the
bank statement creation.
From a development standpoint (code creation/maintenance) a check after
creation is the easiest way.
"""
statement_ids = res["statement_ids"] statement_ids = res["statement_ids"]
notifications = res["notifications"] notifications = res["notifications"]
statements = self.env["account.bank.statement"].sudo().browse(statement_ids) statements = self.env["account.bank.statement"].sudo().browse(statement_ids)
st_cnt = len(statement_ids) if statements:
statements.write({"import_format": file_format})
statements = self._statement_duplicate_check(res, statements)
st_cnt = len(statements)
warning_cnt = error_cnt = 0 warning_cnt = error_cnt = 0
if notifications: if notifications:
errors = [] errors = []
@ -270,11 +282,11 @@ class EbicsFile(models.Model):
date=statement.date, date=statement.date,
cpy=statement.company_id.name, cpy=statement.company_id.name,
) )
if statement_ids: if statements:
self.sudo().bank_statement_ids = [(4, x) for x in statement_ids] self.sudo().bank_statement_ids = [(4, x) for x in statements.ids]
company_ids = self.sudo().bank_statement_ids.mapped("company_id").ids company_ids = self.sudo().bank_statement_ids.mapped("company_id").ids
self.company_ids = [(6, 0, company_ids)] self.company_ids = [(6, 0, company_ids)]
ctx = dict(self.env.context, statement_ids=statement_ids) ctx = dict(self.env.context, statement_ids=statements.ids)
module = __name__.split("addons.")[1].split(".")[0] module = __name__.split("addons.")[1].split(".")[0]
result_view = self.env.ref("%s.ebics_file_view_form_result" % module) result_view = self.env.ref("%s.ebics_file_view_form_result" % module)
return { return {
@ -289,13 +301,47 @@ class EbicsFile(models.Model):
"type": "ir.actions.act_window", "type": "ir.actions.act_window",
} }
def _statement_duplicate_check(self, res, statements):
"""
This check is required for import modules that do not
set the 'unique_import_id' on the statement lines.
E.g. OCA camt import
"""
to_unlink = self.env["account.bank.statement"]
for statement in statements.filtered(
lambda r: r.import_format in DUP_CHECK_FORMATS
):
dup = self.env["account.bank.statement"].search_count(
[
("id", "!=", statement.id),
("name", "=", statement.name),
("company_id", "=", statement.company_id.id),
("date", "=", statement.date),
("import_format", "=", statement.import_format),
]
)
if dup:
message = _(
"Statement %(st_name)s dated %(date)s has already been imported.",
st_name=statement.name,
date=statement.date,
)
res["notifications"].append({"type": "warning", "message": message})
to_unlink += statement
res["statement_ids"] = [
x for x in res["statement_ids"] if x not in to_unlink.ids
]
statements -= to_unlink
to_unlink.unlink()
return statements
def _process_cfonb120(self): def _process_cfonb120(self):
import_module = "account_statement_import_fr_cfonb" import_module = "account_statement_import_fr_cfonb"
self._check_import_module(import_module) self._check_import_module(import_module)
res = {"statement_ids": [], "notifications": []} res = {"statement_ids": [], "notifications": []}
st_datas = self._split_cfonb(res) st_datas = self._split_cfonb(res)
self._process_bank_statement_oca(res, st_datas) self._process_bank_statement_oca(res, st_datas)
return self._process_download_result(res) return self._process_download_result(res, file_format="cfonb120")
def _unlink_cfonb120(self): def _unlink_cfonb120(self):
""" """
@ -341,7 +387,7 @@ class EbicsFile(models.Model):
def _process_camt052(self): def _process_camt052(self):
import_module = "account_statement_import_camt" import_module = "account_statement_import_camt"
self._check_import_module(import_module) self._check_import_module(import_module)
return self._process_camt053(self) return self._process_camt053(file_format="camt052")
def _unlink_camt052(self): def _unlink_camt052(self):
""" """
@ -352,7 +398,7 @@ class EbicsFile(models.Model):
def _process_camt054(self): def _process_camt054(self):
import_module = "account_statement_import_camt" import_module = "account_statement_import_camt"
self._check_import_module(import_module) self._check_import_module(import_module)
return self._process_camt053(self) return self._process_camt053(file_format="camt054")
def _unlink_camt054(self): def _unlink_camt054(self):
""" """
@ -360,7 +406,7 @@ class EbicsFile(models.Model):
EBICS data file and its related bank statements. EBICS data file and its related bank statements.
""" """
def _process_camt053(self): def _process_camt053(self, file_format=None):
""" """
The Odoo standard statement import is based on manual selection The Odoo standard statement import is based on manual selection
of a financial journal before importing the electronic statement file. of a financial journal before importing the electronic statement file.
@ -395,7 +441,8 @@ class EbicsFile(models.Model):
self._process_bank_statement_oca(res, st_datas) self._process_bank_statement_oca(res, st_datas)
else: else:
self._process_bank_statement_oe(res, st_datas) self._process_bank_statement_oe(res, st_datas)
return self._process_download_result(res) file_format = file_format or "camt053"
return self._process_download_result(res, file_format=file_format)
def _process_bank_statement_oca(self, res, st_datas): def _process_bank_statement_oca(self, res, st_datas):
for st_data in st_datas: for st_data in st_datas:

View File

@ -255,7 +255,6 @@ class EbicsUserID(models.Model):
Create new keys and certificates for this user Create new keys and certificates for this user
""" """
self.ensure_one() self.ensure_one()
self.ebics_config_id._check_ebics_files()
if self.state != "draft": if self.state != "draft":
raise UserError( raise UserError(
_("Set state to 'draft' before Bank Key (re)initialisation.") _("Set state to 'draft' before Bank Key (re)initialisation.")
@ -442,7 +441,6 @@ class EbicsUserID(models.Model):
must be downloaded and checked for consistency. must be downloaded and checked for consistency.
""" """
self.ensure_one() self.ensure_one()
self.ebics_config_id._check_ebics_files()
if self.state != "get_bank_keys": if self.state != "get_bank_keys":
raise UserError(_("Set state to 'Get Keys from Bank'.")) raise UserError(_("Set state to 'Get Keys from Bank'."))
try: try:

View File

@ -52,7 +52,6 @@
<field name="ebics_host" /> <field name="ebics_host" />
<field name="ebics_url" /> <field name="ebics_url" />
<field name="ebics_partner" /> <field name="ebics_partner" />
<field name="ebics_files" />
<field name="ebics_keys" /> <field name="ebics_keys" />
</group> </group>
<group name="main-right"> <group name="main-right">

View File

@ -27,7 +27,6 @@ class EbicsAdminOrder(models.TransientModel):
def ebics_admin_order(self): def ebics_admin_order(self):
self.ensure_one() self.ensure_one()
self.ebics_config_id._check_ebics_files()
client = self._setup_client() client = self._setup_client()
if not client: if not client:
self.note += ( self.note += (

View File

@ -10,7 +10,6 @@ logging.basicConfig(
import base64 import base64
import logging import logging
import os
from sys import exc_info from sys import exc_info
from traceback import format_exception from traceback import format_exception
@ -193,7 +192,6 @@ class EbicsXfer(models.TransientModel):
def ebics_download(self): def ebics_download(self):
self.ensure_one() self.ensure_one()
self.ebics_config_id._check_ebics_files()
ctx = self.env.context.copy() ctx = self.env.context.copy()
self.note = "" self.note = ""
err_cnt = 0 err_cnt = 0
@ -532,43 +530,17 @@ class EbicsXfer(models.TransientModel):
return ebics_files return ebics_files
def _create_ebics_file(self, data, file_format, docname=None): def _create_ebics_file(self, data, file_format, docname=None):
"""
Write the data as received over the EBICS connection
to a temporary file so that is is available for
analysis (e.g. in case formats are received that cannot
be handled in the current version of this module).
TODO: add code to clean-up /tmp on a regular basis.
After saving the data received we call the method to perform
file format specific processing.
"""
ebics_files_root = self.ebics_config_id.ebics_files
tmp_dir = os.path.normpath(ebics_files_root + "/tmp")
if not os.path.isdir(tmp_dir):
os.makedirs(tmp_dir, mode=0o700)
fn_parts = [self.ebics_config_id.ebics_host, self.ebics_config_id.ebics_partner] fn_parts = [self.ebics_config_id.ebics_host, self.ebics_config_id.ebics_partner]
if docname: if docname:
fn_parts.append(docname) fn_parts.append(docname)
else: else:
fn_date = self.date_to or fields.Date.today() fn_date = self.date_to or fields.Date.today()
fn_parts.append(fn_date.isoformat()) fn_parts.append(fn_date.isoformat())
base_fn = "_".join(fn_parts) fn = "_".join(fn_parts)
n = 1
full_tmp_fn = os.path.normpath(tmp_dir + "/" + base_fn)
while os.path.exists(full_tmp_fn):
n += 1
tmp_fn = base_fn + "_" + str(n).rjust(3, "0")
full_tmp_fn = os.path.normpath(tmp_dir + "/" + tmp_fn)
with open(full_tmp_fn, "wb") as f:
f.write(data)
ff_methods = self._file_format_methods() ff_methods = self._file_format_methods()
if file_format.name in ff_methods: if file_format.name in ff_methods:
data = ff_methods[file_format.name](data) data = ff_methods[file_format.name](data)
fn = base_fn
suffix = file_format.suffix suffix = file_format.suffix
if suffix and not fn.endswith(suffix): if suffix and not fn.endswith(suffix):
fn = ".".join([fn, suffix]) fn = ".".join([fn, suffix])

View File

@ -53,7 +53,7 @@ class AccountStatementImport(models.TransientModel):
self._set_statement_name(st_vals) self._set_statement_name(st_vals)
if st_vals.get("transactions"): if st_vals.get("transactions"):
transactions = True transactions = True
super()._create_bank_statements(stmts_vals, result) super()._create_bank_statements([st_vals], result)
if result["statement_ids"] == statement_ids: if result["statement_ids"] == statement_ids:
# no statement has been created, this is the case # no statement has been created, this is the case
# when all transactions have been imported already # when all transactions have been imported already