[14.0] add support for EBICS 3.0

This commit is contained in:
Luc De Meyer
2022-12-18 20:55:23 +01:00
parent 4f9151617c
commit a98ea8ec23
19 changed files with 612 additions and 319 deletions

View File

@@ -1,4 +1,4 @@
# Copyright 2009-2020 Noviat.
# Copyright 2009-2022 Noviat.
# License LGPL-3 or later (http://www.gnu.org/licenses/lpgl).
from odoo import fields, models

View File

@@ -22,13 +22,13 @@ class EbicsConfig(models.Model):
_order = "name"
name = fields.Char(
string="Name",
readonly=True,
states={"draft": [("readonly", False)]},
required=True,
)
journal_ids = fields.Many2many(
comodel_name="account.journal",
relation="account_journal_ebics_config_rel",
readonly=True,
states={"draft": [("readonly", False)]},
string="Bank Accounts",
@@ -52,7 +52,11 @@ class EbicsConfig(models.Model):
help="Contact your bank to get the EBICS URL.",
)
ebics_version = fields.Selection(
selection=[("H003", "H003 (2.4)"), ("H004", "H004 (2.5)")],
selection=[
("H003", "H003 (2.4)"),
("H004", "H004 (2.5)"),
("H005", "H005 (3.0)"),
],
string="EBICS protocol version",
readonly=True,
states={"draft": [("readonly", False)]},
@@ -130,7 +134,6 @@ class EbicsConfig(models.Model):
)
state = fields.Selection(
[("draft", "Draft"), ("confirm", "Confirmed")],
string="State",
default="draft",
required=True,
readonly=True,
@@ -143,11 +146,12 @@ class EbicsConfig(models.Model):
"\nThis number should match the following pattern : "
"[A-Z]{1}[A-Z0-9]{3}",
)
active = fields.Boolean(string="Active", default=True)
active = fields.Boolean(default=True)
company_ids = fields.Many2many(
comodel_name="res.company",
relation="ebics_config_res_company_rel",
string="Companies",
required=True,
readonly=True,
help="Companies sharing this EBICS contract.",
)
@@ -179,9 +183,26 @@ class EbicsConfig(models.Model):
)
)
@api.onchange("journal_ids")
def _onchange_journal_ids(self):
self.company_ids = self.journal_ids.mapped("company_id")
def write(self, vals):
"""
Due to the multi-company nature of the EBICS config we
need to adapt the company_ids in the write method.
"""
if "journal_ids" not in vals:
return super().write(vals)
for rec in self:
old_company_ids = rec.journal_ids.mapped("company_id").ids
super(EbicsConfig, rec).write(vals)
new_company_ids = rec.journal_ids.mapped("company_id").ids
updates = []
for cid in new_company_ids:
if cid in old_company_ids:
old_company_ids.remove(cid)
else:
updates += [(4, cid)]
updates += [(3, x) for x in old_company_ids]
super(EbicsConfig, rec).write({"company_ids": updates})
return True
def unlink(self):
for ebics_config in self:

View File

@@ -3,6 +3,8 @@
import base64
import logging
from sys import exc_info
from traceback import format_exception
from odoo import _, fields, models
from odoo.exceptions import UserError
@@ -46,7 +48,6 @@ class EbicsFile(models.Model):
)
state = fields.Selection(
[("draft", "Draft"), ("done", "Done")],
string="State",
default="draft",
required=True,
readonly=True,
@@ -93,8 +94,7 @@ class EbicsFile(models.Model):
def process(self):
self.ensure_one()
ctx = dict(self.env.context, allowed_company_ids=self.env.user.company_ids.ids)
self = self.with_context(ctx)
self = self.with_context(allowed_company_ids=self.env.user.company_ids.ids)
self.note_process = ""
ff_methods = self._file_format_methods()
ff = self.format_id.download_process_method
@@ -159,33 +159,23 @@ class EbicsFile(models.Model):
if raise_if_not_found:
raise UserError(
_(
"The module to process the '%s' format is not installed "
"on your system. "
"\nPlease install module '%s'"
"The module to process the '%(ebics_format)s' "
"format is not installed on your system. "
"\nPlease install module '%(module)s'",
ebics_format=self.format_id.name,
module=module,
)
% (self.format_id.name, module)
)
return False
return True
def _process_result_action(self, res):
def _process_result_action(self, res_action):
notifications = []
st_line_ids = []
statement_ids = []
sts_data = []
if res.get("type") and res["type"] == "ir.actions.client":
notifications = res["context"].get("notifications", [])
st_line_ids = res["context"].get("statement_line_ids", [])
if notifications:
for notif in notifications:
parts = []
for k in ["type", "message", "details"]:
if notif.get(k):
msg = "{}: {}".format(k, notif[k])
parts.append(msg)
self.note_process += "\n".join(parts)
self.note_process += "\n"
self.note_process += "\n"
if res_action.get("type") and res_action["type"] == "ir.actions.client":
st_line_ids = res_action["context"].get("statement_line_ids", [])
if st_line_ids:
self.flush()
self.env.cr.execute(
@@ -206,10 +196,10 @@ class EbicsFile(models.Model):
)
sts_data = self.env.cr.dictfetchall()
else:
if res.get("res_id"):
st_ids = res["res_id"]
if res_action.get("res_id"):
st_ids = res_action["res_id"]
else:
st_ids = res["domain"][2]
st_ids = res_action["domain"][0][2]
statements = self.env["account.bank.statement"].browse(st_ids)
for statement in statements:
sts_data.append(
@@ -221,7 +211,29 @@ class EbicsFile(models.Model):
}
)
st_cnt = len(sts_data)
warning_cnt = error_cnt = 0
notifications = res_action["context"].get("notifications", [])
if notifications:
for notif in notifications:
if notif["type"] == "error":
error_cnt += 1
elif notif["type"] == "warning":
warning_cnt += 1
parts = [notif[k] for k in notif if k in ("message", "details")]
self.note_process += "\n".join(parts)
self.note_process += "\n\n"
self.note_process += "\n"
if error_cnt:
self.note_process += (
_("Number of errors detected during import: %s: ") % error_cnt
)
self.note_process += "\n"
if warning_cnt:
self.note_process += (
_("Number of watnings detected during import: %s: ") % warning_cnt
)
if st_cnt:
self.note_process += "\n\n"
self.note_process += _("%s bank statements have been imported: ") % st_cnt
self.note_process += "\n"
for st_data in sts_data:
@@ -232,7 +244,9 @@ class EbicsFile(models.Model):
)
statement_ids = [x["statement_id"] for x in sts_data]
if statement_ids:
self.sudo().bank_statement_ids = [(6, 0, statement_ids)]
self.sudo().bank_statement_ids = [(4, x) for x in statement_ids]
company_ids = self.sudo().bank_statement_ids.mapped("company_id").ids
self.company_ids = [(6, 0, company_ids)]
ctx = dict(self.env.context, statement_ids=statement_ids)
module = __name__.split("addons.")[1].split(".")[0]
result_view = self.env.ref("%s.ebics_file_view_form_result" % module)
@@ -279,39 +293,73 @@ class EbicsFile(models.Model):
)
st_lines = b""
transactions = False
result = {
"type": "ir.actions.client",
"tag": "bank_statement_reconciliation_view",
"context": {
"statement_line_ids": [],
"company_ids": self.env.user.company_ids.ids,
"notifications": [],
},
}
wiz_ctx = dict(self.env.context, active_model="ebics.file")
result_action = self.env["ir.actions.act_window"]._for_xml_id(
"account.action_bank_statement_tree"
)
result_action["context"] = safe_eval(result_action["context"])
statement_ids = []
notifications = []
for i, wiz_vals in enumerate(wiz_vals_list, start=1):
wiz = self.env[wiz_model].with_context(wiz_ctx).create(wiz_vals)
res = wiz.import_file_button()
ctx = res.get("context")
if res.get("res_model") == "account.bank.statement.import.journal.creation":
message = _("Error detected while importing statement number %s.\n") % i
message += _("No financial journal found.")
details = _("Bank account number: %s") % ctx.get(
"default_bank_acc_number"
)
result["context"]["notifications"].extend(
[
{
"type": "warning",
"message": message,
"details": details,
}
]
)
continue
result["context"]["statement_line_ids"].extend(ctx["statement_line_ids"])
result["context"]["notifications"].extend(ctx["notifications"])
return self._process_result_action(result)
result = {
"statement_ids": [],
"notifications": [],
}
statement_filename = wiz_vals["statement_filename"]
wiz = (
self.env[wiz_model]
.with_context(active_model="ebics.file")
.create(wiz_vals)
)
try:
with self.env.cr.savepoint():
file_data = base64.b64decode(wiz_vals["statement_file"])
msg_hdr = _(
"{} : Import failed for statement number %(index)s, "
"filename %(fn)s:\n",
index=i,
fn=statement_filename,
)
wiz.import_single_file(file_data, result)
if not result["statement_ids"]:
message = msg_hdr.format(_("Warning"))
message += _(
"You have already imported this file, or this file "
"only contains already imported transactions."
)
notifications += [
{
"type": "warning",
"message": message,
}
]
else:
statement_ids.extend(result["statement_ids"])
notifications.extend(result["notifications"])
except UserError as e:
message = msg_hdr.format(_("Error"))
message += "".join(e.args)
notifications += [
{
"type": "error",
"message": message,
}
]
except Exception:
tb = "".join(format_exception(*exc_info()))
message = msg_hdr.format(_("Error"))
message += tb
notifications += [
{
"type": "error",
"message": message,
}
]
result_action["context"]["notifications"] = notifications
result_action["domain"] = [("id", "in", statement_ids)]
return self._process_result_action(result_action)
@staticmethod
def _unlink_cfonb120(self):
@@ -360,11 +408,12 @@ class EbicsFile(models.Model):
if not found:
raise UserError(
_(
"The module to process the '%s' format is not installed "
"on your system. "
"\nPlease install one of the following modules: \n%s."
"The module to process the '%(ebics_format)s' format is "
"not installed on your system. "
"\nPlease install one of the following modules: \n%(modules)s.",
ebics_format=self.format_id.name,
modules=", ".join([x[1] for x in modules]),
)
% (self.format_id.name, ", ".join([x[1] for x in modules]))
)
if _src == "oca":
self._process_camt053_oca()
@@ -386,25 +435,14 @@ class EbicsFile(models.Model):
"notifications": [],
},
}
wiz_ctx = dict(self.env.context, active_model="ebics.file")
wiz = self.env[wiz_model].with_context(wiz_ctx).create(wiz_vals)
wiz = (
self.env[wiz_model].with_context(active_model="ebics.file").create(wiz_vals)
)
res = wiz.import_file_button()
ctx = res.get("context")
if res.get("res_model") == "account.bank.statement.import.journal.creation":
message = _("Error detected while importing statement %s.\n") % self.name
message += _("No financial journal found.")
details = _("Bank account number: %s") % ctx.get("default_bank_acc_number")
result["context"]["notifications"].extend(
[
{
"type": "warning",
"message": message,
"details": details,
}
]
)
result["context"]["statement_line_ids"].extend(ctx["statement_line_ids"])
result["context"]["notifications"].extend(ctx["notifications"])
result["context"]["statement_line_ids"].extend(
res["context"]["statement_line_ids"]
)
result["context"]["notifications"].extend(res["context"]["notifications"])
return self._process_result_action(result)
def _process_camt053_oe(self):
@@ -418,8 +456,9 @@ class EbicsFile(models.Model):
)
]
}
ctx = dict(self.env.context, active_model="ebics.file")
wiz = self.env[wiz_model].with_context(ctx).create(wiz_vals)
wiz = (
self.env[wiz_model].with_context(active_model="ebics.file").create(wiz_vals)
)
res = wiz.import_file()
if res.get("res_model") == "account.bank.statement.import.journal.creation":
if res.get("context"):

View File

@@ -1,4 +1,4 @@
# Copyright 2009-2020 Noviat.
# Copyright 2009-2022 Noviat.
# License LGPL-3 or later (http://www.gnu.org/licenses/lpgl).
from odoo import api, fields, models
@@ -9,9 +9,17 @@ class EbicsFileFormat(models.Model):
_description = "EBICS File Formats"
_order = "type,name,order_type"
ebics_version = fields.Selection(
selection=[
("2", "2"),
("3", "3"),
],
string="EBICS protocol version",
required=True,
default="2",
)
name = fields.Char(
string="Request Type",
required=True,
help="E.g. camt.xxx.cfonb120.stm, pain.001.001.03.sct.\n"
"Specify camt.052, camt.053, camt.054 for camt "
"Order Types such as C53, Z53, C54, Z54.\n"
@@ -22,9 +30,9 @@ class EbicsFileFormat(models.Model):
selection=[("down", "Download"), ("up", "Upload")], required=True
)
order_type = fields.Char(
string="Order Type",
required=True,
help="E.g. C53 (check your EBICS contract).\n"
help="EBICS 3.0: BTD (download) or BTU (upload).\n"
"EBICS 2.0: E.g. C53 (check your EBICS contract). "
"For most banks in France you should use the "
"format neutral Order Types 'FUL' for upload "
"and 'FDL' for download.",
@@ -40,7 +48,6 @@ class EbicsFileFormat(models.Model):
# move signature_class parameter so that it can be set per EBICS config
signature_class = fields.Selection(
selection=[("E", "Single signature"), ("T", "Transport signature")],
string="Signature Class",
help="Please doublecheck the security of your Odoo "
"ERP system when using class 'E' to prevent unauthorised "
"users to make supplier payments."
@@ -50,7 +57,48 @@ class EbicsFileFormat(models.Model):
description = fields.Char()
suffix = fields.Char(
required=True,
help="Specify the filename suffix for this File Format." "\nE.g. c53.xml",
help="Specify the filename suffix for this File Format.\nE.g. c53.xml",
)
# EBICS 3.0 BTF
btf_service = fields.Char(
string="BTF Service",
help="BTF Service Name)\n"
"The service code name consisting of 3 alphanumeric characters "
"[A-Z0-9] (e.g. SCT, SDD, STM, EOP)",
)
btf_message = fields.Char(
string="BTF Message Name",
help="BTF Message Name\n"
"The message name consisting of up to 10 alphanumeric characters "
"[a-z0-9.] (eg. pain.001, pain.008, camt.053)",
)
btf_scope = fields.Char(
string="BTF Scope",
help="Scope of service.\n"
"Either an ISO-3166 ALPHA 2 country code or an issuer code "
"of 3 alphanumeric characters [A-Z0-9].",
)
btf_option = fields.Char(
string="BTF Option",
help="The service option code consisting of 3-10 alphanumeric "
"characters [A-Z0-9] (eg. COR, B2B)",
)
btf_container = fields.Char(
string="BTF Container",
help="Type of container consisting of 3 characters [A-Z] (eg. XML, ZIP).",
)
btf_version = fields.Char(
string="BTF Version",
help="Message version consisting of 2 numeric characters [0-9] (eg. 03).",
)
btf_variant = fields.Char(
string="BTF Variant",
help="Message variant consisting of 3 numeric characters [0-9] (eg. 001).",
)
btf_format = fields.Char(
string="BTF Format",
help="Message format consisting of 1-4 alphanumeric characters [A-Z0-9] "
"(eg. XML, JSON, PDF).",
)
@api.model
@@ -62,3 +110,10 @@ class EbicsFileFormat(models.Model):
def _onchange_type(self):
if self.type == "up":
self.download_process_method = False
def name_get(self):
res = []
for rec in self:
name = rec.ebics_version == "2" and rec.name or rec.btf_message
res.append((rec.id, name))
return res

View File

@@ -1,4 +1,4 @@
# Copyright 2009-2020 Noviat.
# Copyright 2009-2022 Noviat.
# License LGPL-3 or later (http://www.gnu.org/licenses/lpgl).
import base64
@@ -14,8 +14,8 @@ from odoo.exceptions import UserError
_logger = logging.getLogger(__name__)
# logging.basicConfig(
# level=logging.DEBUG,
# format='[%(asctime)s] %(levelname)s - %(name)s: %(message)s')
# level=logging.DEBUG,
# format='[%(asctime)s] %(levelname)s - %(name)s: %(message)s')
try:
import fintech
@@ -75,7 +75,6 @@ class EbicsUserID(models.Model):
# Classes A and B are not yet supported.
signature_class = fields.Selection(
selection=[("E", "Single signature"), ("T", "Transport signature")],
string="Signature Class",
required=True,
default="T",
readonly=True,
@@ -157,12 +156,11 @@ class EbicsUserID(models.Model):
("to_verify", "Verification"),
("active_keys", "Active Keys"),
],
string="State",
default="draft",
required=True,
readonly=True,
)
active = fields.Boolean(string="Active", default=True)
active = fields.Boolean(default=True)
company_ids = fields.Many2many(
comodel_name="res.company",
string="Companies",
@@ -175,7 +173,9 @@ class EbicsUserID(models.Model):
for rec in self:
keys_dir = rec.ebics_config_id.ebics_keys
rec.ebics_keys_fn = (
rec.name and keys_dir and (keys_dir + "/" + rec.name + "_keys")
rec.name
and keys_dir
and (keys_dir + "/" + rec.name.replace(" ", "_") + "_keys")
)
@api.depends("ebics_keys_fn")
@@ -243,11 +243,11 @@ class EbicsUserID(models.Model):
partnerid=self.ebics_config_id.ebics_partner,
userid=self.name,
)
except Exception:
except Exception as err:
exctype, value = exc_info()[:2]
error = _("EBICS Initialisation Error:")
error += "\n" + str(exctype) + "\n" + str(value)
raise UserError(error)
raise UserError(error) from err
self.ebics_config_id._check_ebics_keys()
if not os.path.isfile(self.ebics_keys_fn):
@@ -256,7 +256,7 @@ class EbicsUserID(models.Model):
# enable import of all type of certicates: A00x, X002, E002
if self.swift_3skey:
kwargs = {
self.ebics_config_id.ebics_key_version: base64.decodestring(
self.ebics_config_id.ebics_key_version: base64.decodebytes(
self.swift_3skey_certificate
),
}
@@ -265,11 +265,11 @@ class EbicsUserID(models.Model):
keyversion=self.ebics_config_id.ebics_key_version,
bitlength=self.ebics_config_id.ebics_key_bitlength,
)
except Exception:
except Exception as err:
exctype, value = exc_info()[:2]
error = _("EBICS Initialisation Error:")
error += "\n" + str(exctype) + "\n" + str(value)
raise UserError(error)
raise UserError(error) from err
if self.swift_3skey and not self.ebics_key_x509:
raise UserError(
@@ -302,7 +302,7 @@ class EbicsUserID(models.Model):
)
try:
supported_versions = client.HEV()
if ebics_version not in supported_versions:
if supported_versions and ebics_version not in supported_versions:
err_msg = _("EBICS version mismatch.") + "\n"
err_msg += _("Versions supported by your bank:")
for k in supported_versions:
@@ -314,7 +314,7 @@ class EbicsUserID(models.Model):
_logger.info("%s, EBICS INI command, OrderID=%s", self._name, OrderID)
if ebics_version == "H003":
self.ebics_config_id._update_order_number(OrderID)
except URLError:
except URLError as err:
exctype, value = exc_info()[:2]
tb = "".join(format_exception(*exc_info()))
_logger.error(
@@ -323,21 +323,24 @@ class EbicsUserID(models.Model):
tb,
)
raise UserError(
_("urlopen error:\n url '%s' - %s")
% (self.ebics_config_id.ebics_url, str(value))
)
except EbicsFunctionalError:
_(
"urlopen error:\n url '%(url)s' - %(val)s",
url=self.ebics_config_id.ebics_url,
val=str(value),
)
) from err
except EbicsFunctionalError as err:
e = exc_info()
error = _("EBICS Functional Error:")
error += "\n"
error += "{} (code: {})".format(e[1].message, e[1].code)
raise UserError(error)
except EbicsTechnicalError:
raise UserError(error) from err
except EbicsTechnicalError as err:
e = exc_info()
error = _("EBICS Technical Error:")
error += "\n"
error += "{} (code: {})".format(e[1].message, e[1].code)
raise UserError(error)
raise UserError(error) from err
# Send the public authentication and encryption keys to the bank.
if ebics_version == "H003":
@@ -411,25 +414,25 @@ class EbicsUserID(models.Model):
userid=self.name,
)
client = EbicsClient(bank, user, version=self.ebics_config_id.ebics_version)
except Exception:
except Exception as err:
exctype, value = exc_info()[:2]
error = _("EBICS Initialisation Error:")
error += "\n" + str(exctype) + "\n" + str(value)
raise UserError(error)
raise UserError(error) from err
try:
public_bank_keys = client.HPB()
except EbicsFunctionalError:
except EbicsFunctionalError as err:
e = exc_info()
error = _("EBICS Functional Error:")
error += "\n"
error += "{} (code: {})".format(e[1].message, e[1].code)
raise UserError(error)
except Exception:
raise UserError(error) from err
except Exception as err:
exctype, value = exc_info()[:2]
error = _("EBICS Initialisation Error:")
error += "\n" + str(exctype) + "\n" + str(value)
raise UserError(error)
raise UserError(error) from err
public_bank_keys = public_bank_keys.encode()
tmp_dir = os.path.normpath(self.ebics_config_id.ebics_files + "/tmp")
@@ -442,7 +445,7 @@ class EbicsUserID(models.Model):
)
self.write(
{
"ebics_public_bank_keys": base64.encodestring(public_bank_keys),
"ebics_public_bank_keys": base64.encodebytes(public_bank_keys),
"ebics_public_bank_keys_fn": fn,
"state": "to_verify",
}