squash everything before 17.0

This commit is contained in:
Luc De Meyer
2018-08-14 17:04:39 +02:00
parent 6fdeb820b1
commit 8db04d2af9
40 changed files with 4893 additions and 0 deletions

View File

@@ -0,0 +1,6 @@
from . import fintech_ebics_register
from . import account_bank_statement
from . import ebics_config
from . import ebics_file
from . import ebics_file_format
from . import ebics_userid

View File

@@ -0,0 +1,11 @@
# Copyright 2009-2023 Noviat.
# License LGPL-3 or later (http://www.gnu.org/licenses/lgpl).
from odoo import fields, models
class AccountBankStatement(models.Model):
_inherit = "account.bank.statement"
ebics_file_id = fields.Many2one(comodel_name="ebics.file", string="EBICS Data File")
import_format = fields.Char(readonly=True)

View File

@@ -0,0 +1,243 @@
# Copyright 2009-2023 Noviat.
# License LGPL-3 or later (http://www.gnu.org/licenses/lgpl).
import logging
import os
import re
from odoo import _, api, fields, models
from odoo.exceptions import UserError
_logger = logging.getLogger(__name__)
class EbicsConfig(models.Model):
"""
EBICS configuration is stored in a separate object in order to
allow extra security policies on this object.
"""
_name = "ebics.config"
_description = "EBICS Configuration"
_order = "name"
name = fields.Char(
readonly=True,
states={"draft": [("readonly", False)]},
required=True,
)
journal_ids = fields.Many2many(
comodel_name="account.journal",
relation="account_journal_ebics_config_rel",
readonly=True,
states={"draft": [("readonly", False)]},
string="Bank Accounts",
domain="[('type', '=', 'bank')]",
)
ebics_host = fields.Char(
string="EBICS HostID",
required=True,
readonly=True,
states={"draft": [("readonly", False)]},
help="Contact your bank to get the EBICS HostID."
"\nIn France the BIC is usually allocated to the HostID "
"whereas in Germany it tends to be an institute specific string "
"of 8 characters.",
)
ebics_url = fields.Char(
string="EBICS URL",
required=True,
readonly=True,
states={"draft": [("readonly", False)]},
help="Contact your bank to get the EBICS URL.",
)
ebics_version = fields.Selection(
selection=[
("H003", "H003 (2.4)"),
("H004", "H004 (2.5)"),
("H005", "H005 (3.0)"),
],
string="EBICS protocol version",
readonly=True,
states={"draft": [("readonly", False)]},
required=True,
default="H004",
)
ebics_partner = fields.Char(
string="EBICS PartnerID",
required=True,
readonly=True,
states={"draft": [("readonly", False)]},
help="Organizational unit (company or individual) "
"that concludes a contract with the bank. "
"\nIn this contract it will be agreed which order types "
"(file formats) are used, which accounts are concerned, "
"which of the customer's users (subscribers) "
"communicate with the EBICS bank server and the authorisations "
"that these users will possess. "
"\nIt is identified by the PartnerID.",
)
ebics_userid_ids = fields.One2many(
comodel_name="ebics.userid",
inverse_name="ebics_config_id",
string="EBICS UserID",
readonly=True,
states={"draft": [("readonly", False)]},
help="Human users or a technical system that is/are "
"assigned to a customer. "
"\nOn the EBICS bank server it is identified "
"by the combination of UserID and PartnerID. "
"The technical subscriber serves only for the data exchange "
"between customer and financial institution. "
"The human user also can authorise orders.",
)
# We store the EBICS keys in a separate directory in the file system.
# This directory requires special protection to reduce fraude.
ebics_keys = fields.Char(
string="EBICS Keys Root",
required=True,
readonly=True,
states={"draft": [("readonly", False)]},
default=lambda self: self._default_ebics_keys(),
help="Root Directory for storing the EBICS Keys.",
)
ebics_key_version = fields.Selection(
selection=[("A005", "A005 (RSASSA-PKCS1-v1_5)"), ("A006", "A006 (RSASSA-PSS)")],
string="EBICS key version",
default="A006",
readonly=True,
states={"draft": [("readonly", False)]},
help="The key version of the electronic signature.",
)
ebics_key_bitlength = fields.Integer(
string="EBICS key bitlength",
default=2048,
readonly=True,
states={"draft": [("readonly", False)]},
help="The bit length of the generated keys. "
"\nThe value must be between 1536 and 4096.",
)
ebics_file_format_ids = fields.Many2many(
comodel_name="ebics.file.format",
column1="config_id",
column2="format_id",
string="EBICS File Format",
readonly=True,
states={"draft": [("readonly", False)]},
)
state = fields.Selection(
[("draft", "Draft"), ("confirm", "Confirmed")],
default="draft",
required=True,
readonly=True,
)
order_number = fields.Char(
size=4,
readonly=True,
states={"draft": [("readonly", False)]},
help="Specify the number for the next order."
"\nThis number should match the following pattern : "
"[A-Z]{1}[A-Z0-9]{3}",
)
active = fields.Boolean(default=True)
company_ids = fields.Many2many(
comodel_name="res.company",
relation="ebics_config_res_company_rel",
string="Companies",
readonly=True,
help="Companies sharing this EBICS contract.",
)
@api.model
def _default_ebics_keys(self):
return "/".join(["/etc/odoo/ebics_keys", self._cr.dbname])
@api.constrains("ebics_key_bitlength")
def _check_ebics_key_bitlength(self):
for cfg in self:
if cfg.ebics_version == "H005" and cfg.ebics_key_bitlength < 2048:
raise UserError(_("EBICS key bitlength must be >= 2048."))
@api.constrains("order_number")
def _check_order_number(self):
for cfg in self:
nbr = cfg.order_number
ok = True
if nbr:
if len(nbr) != 4:
ok = False
else:
pattern = re.compile("[A-Z]{1}[A-Z0-9]{3}")
if not pattern.match(nbr):
ok = False
if not ok:
raise UserError(
_(
"Order Number should comply with the following pattern:"
"\n[A-Z]{1}[A-Z0-9]{3}"
)
)
def write(self, vals):
"""
Due to the multi-company nature of the EBICS config we
need to adapt the company_ids in the write method.
"""
if "journal_ids" not in vals:
return super().write(vals)
for rec in self:
old_company_ids = rec.journal_ids.mapped("company_id").ids
super(EbicsConfig, rec).write(vals)
new_company_ids = rec.journal_ids.mapped("company_id").ids
updates = []
for cid in new_company_ids:
if cid in old_company_ids:
old_company_ids.remove(cid)
else:
updates += [(4, cid)]
updates += [(3, x) for x in old_company_ids]
super(EbicsConfig, rec).write({"company_ids": updates})
return True
def unlink(self):
for ebics_config in self:
if ebics_config.state == "active":
raise UserError(_("You cannot remove active EBICS configurations."))
return super().unlink()
def set_to_draft(self):
return self.write({"state": "draft"})
def set_to_confirm(self):
return self.write({"state": "confirm"})
def _get_order_number(self):
return self.order_number
def _update_order_number(self, OrderID):
o_list = list(OrderID)
for i, c in enumerate(reversed(o_list), start=1):
if c == "9":
o_list[-i] = "A"
break
if c == "Z":
o_list[-i] = "0"
continue
else:
o_list[-i] = chr(ord(c) + 1)
break
next_order_number = "".join(o_list)
if next_order_number == "ZZZZ":
next_order_number = "A000"
self.order_number = next_order_number
def _check_ebics_keys(self):
dirname = self.ebics_keys or ""
if not os.path.exists(dirname):
raise UserError(
_(
"EBICS Keys Root Directory %s is not available."
"\nPlease contact your system administrator."
)
% dirname
)

View File

@@ -0,0 +1,619 @@
# Copyright 2009-2023 Noviat.
# License LGPL-3 or later (http://www.gnu.org/licenses/lgpl).
import base64
import logging
from copy import deepcopy
from sys import exc_info
from traceback import format_exception
from lxml import etree
from odoo import _, fields, models
from odoo.exceptions import UserError
from odoo.addons.base.models.res_bank import sanitize_account_number
_logger = logging.getLogger(__name__)
DUP_CHECK_FORMATS = ["cfonb120", "camt053"]
class EbicsFile(models.Model):
_name = "ebics.file"
_description = "Object to store EBICS Data Files"
_order = "date desc"
_sql_constraints = [
(
"name_uniq",
"unique (name, format_id)",
"This File has already been down- or uploaded !",
)
]
name = fields.Char(string="Filename")
data = fields.Binary(string="File", readonly=True)
format_id = fields.Many2one(
comodel_name="ebics.file.format", string="EBICS File Formats", readonly=True
)
type = fields.Selection(related="format_id.type", readonly=True)
date_from = fields.Date(
readonly=True, help="'Date From' as entered in the download wizard."
)
date_to = fields.Date(
readonly=True, help="'Date To' as entered in the download wizard."
)
date = fields.Datetime(
required=True, readonly=True, help="File Upload/Download date"
)
bank_statement_ids = fields.One2many(
comodel_name="account.bank.statement",
inverse_name="ebics_file_id",
string="Generated Bank Statements",
readonly=True,
)
state = fields.Selection(
[("draft", "Draft"), ("done", "Done")],
default="draft",
required=True,
readonly=True,
)
user_id = fields.Many2one(
comodel_name="res.users",
string="User",
default=lambda self: self.env.user,
readonly=True,
)
ebics_userid_id = fields.Many2one(
comodel_name="ebics.userid",
string="EBICS UserID",
ondelete="restrict",
readonly=True,
)
note = fields.Text(string="Notes")
note_process = fields.Text(
string="Process Notes",
readonly=True,
)
company_ids = fields.Many2many(
comodel_name="res.company",
string="Companies",
readonly=True,
help="Companies sharing this EBICS file.",
)
def unlink(self):
ff_methods = self._file_format_methods()
for ebics_file in self:
if ebics_file.state == "done":
raise UserError(_("You can only remove EBICS files in state 'Draft'."))
# execute format specific actions
ff = ebics_file.format_id.download_process_method
if ff in ff_methods:
if ff_methods[ff].get("unlink"):
ff_methods[ff]["unlink"]()
# remove bank statements
ebics_file.bank_statement_ids.unlink()
return super().unlink()
def set_to_draft(self):
return self.write({"state": "draft"})
def set_to_done(self):
return self.write({"state": "done"})
def process(self):
self.ensure_one()
self = self.with_context(allowed_company_ids=self.env.user.company_ids.ids)
self.note_process = ""
ff_methods = self._file_format_methods()
ff = self.format_id.download_process_method
if ff in ff_methods:
if ff_methods[ff].get("process"):
res = ff_methods[ff]["process"]()
self.state = "done"
return res
else:
return self._process_undefined_format()
def action_open_bank_statements(self):
self.ensure_one()
action = self.env["ir.actions.act_window"]._for_xml_id(
"account.action_bank_statement_tree"
)
domain = [("id", "in", self.env.context.get("statement_ids"))]
action["domain"] = domain
return action
def button_close(self):
self.ensure_one()
return {"type": "ir.actions.act_window_close"}
def _file_format_methods(self):
"""
Extend this dictionary in order to add support
for extra file formats.
"""
res = {
"cfonb120": {
"process": self._process_cfonb120,
"unlink": self._unlink_cfonb120,
},
"camt.052": {
"process": self._process_camt052,
"unlink": self._unlink_camt052,
},
"camt.053": {
"process": self._process_camt053,
"unlink": self._unlink_camt053,
},
"camt.054": {
"process": self._process_camt054,
"unlink": self._unlink_camt054,
},
"pain.002": {
"process": self._process_pain002,
"unlink": self._unlink_pain002,
},
}
return res
def _check_import_module(self, module, raise_if_not_found=True):
mod = (
self.env["ir.module.module"]
.sudo()
.search([("name", "=like", module), ("state", "=", "installed")])
)
if not mod:
if raise_if_not_found:
raise UserError(
_(
"The module to process the '%(ebics_format)s' format is not installed "
"on your system. "
"\nPlease install module '%(module)s'",
ebics_format=self.format_id.name,
module=module,
)
)
return False
return True
def _lookup_journal(self, res, acc_number, currency_code):
currency = self.env["res.currency"].search(
[("name", "=ilike", currency_code)], limit=1
)
journal = self.env["account.journal"]
if not currency:
message = _("Currency %(cc)s not found.", cc=currency_code)
res["notifications"].append({"type": "error", "message": message})
return (currency, journal)
journals = self.env["account.journal"].search(
[
("type", "=", "bank"),
(
"bank_account_id.sanitized_acc_number",
"ilike",
acc_number,
),
]
)
if not journals:
message = _(
"No financial journal found for Account Number %(nbr)s, "
"Currency %(cc)s",
nbr=acc_number,
cc=currency_code,
)
res["notifications"].append({"type": "error", "message": message})
return (currency, journal)
for jrnl in journals:
journal_currency = jrnl.currency_id or jrnl.company_id.currency_id
if journal_currency != currency:
continue
else:
journal = jrnl
break
if not journal:
message = _(
"No financial journal found for Account Number %(nbr)s, "
"Currency %(cc)s",
nbr=acc_number,
cc=currency_code,
)
res["notifications"].append({"type": "error", "message": message})
return (currency, journal)
def _process_download_result(self, res, file_format=None):
"""
We perform a duplicate statement check after the creation of the bank
statements since we rely on Odoo Enterprise or OCA modules for the
bank statement creation.
From a development standpoint (code creation/maintenance) a check after
creation is the easiest way.
"""
statement_ids = res["statement_ids"]
notifications = res["notifications"]
statements = self.env["account.bank.statement"].sudo().browse(statement_ids)
if statements:
statements.write({"import_format": file_format})
statements = self._statement_duplicate_check(res, statements)
else:
notifications.append(
{
"type": "warning",
"message": _("This file doesn't contain any transaction."),
}
)
st_cnt = len(statements)
warning_cnt = error_cnt = 0
if notifications:
errors = []
warnings = []
for notif in notifications:
if notif["type"] == "error":
error_cnt += 1
parts = [notif[k] for k in notif if k in ("message", "details")]
errors.append("\n".join(parts))
elif notif["type"] == "warning":
warning_cnt += 1
parts = [notif[k] for k in notif if k in ("message", "details")]
warnings.append("\n".join(parts))
self.note_process += _("Process file %(fn)s results:", fn=self.name)
if error_cnt:
self.note_process += "\n\n" + _("Errors") + ":\n"
self.note_process += "\n".join(errors)
self.note_process += "\n\n"
self.note_process += _("Number of errors: %(nr)s", nr=error_cnt)
if warning_cnt:
self.note_process += "\n\n" + _("Warnings") + ":\n"
self.note_process += "\n".join(warnings)
self.note_process += "\n\n"
self.note_process += _("Number of warnings: %(nr)s", nr=warning_cnt)
self.note_process += "\n"
if st_cnt:
self.note_process += "\n\n"
self.note_process += _(
"%(st_cnt)s bank statement%(sp)s been imported: ",
st_cnt=st_cnt,
sp=st_cnt == 1 and _(" has") or _("s have"),
)
self.note_process += "\n"
for statement in statements:
self.note_process += "\n" + _(
"Statement %(st)s dated %(date)s (Company: %(cpy)s)",
st=statement.name,
date=statement.date,
cpy=statement.company_id.name,
)
if statements:
self.sudo().bank_statement_ids = [(4, x) for x in statements.ids]
company_ids = self.sudo().bank_statement_ids.mapped("company_id").ids
self.company_ids = [(6, 0, company_ids)]
ctx = dict(self.env.context, statement_ids=statements.ids)
module = __name__.split("addons.")[1].split(".")[0]
result_view = self.env.ref("%s.ebics_file_view_form_result" % module)
return {
"name": _("Import EBICS File"),
"res_id": self.id,
"view_type": "form",
"view_mode": "form",
"res_model": self._name,
"view_id": result_view.id,
"target": "new",
"context": ctx,
"type": "ir.actions.act_window",
}
def _statement_duplicate_check(self, res, statements):
"""
This check is required for import modules that do not
set the 'unique_import_id' on the statement lines.
E.g. OCA camt import
"""
to_unlink = self.env["account.bank.statement"]
for statement in statements.filtered(
lambda r: r.import_format in DUP_CHECK_FORMATS
):
dup = self.env["account.bank.statement"].search_count(
[
("id", "!=", statement.id),
("name", "=", statement.name),
("company_id", "=", statement.company_id.id),
("date", "=", statement.date),
("import_format", "=", statement.import_format),
]
)
if dup:
message = _(
"Statement %(st_name)s dated %(date)s has already been imported.",
st_name=statement.name,
date=statement.date,
)
res["notifications"].append({"type": "warning", "message": message})
to_unlink += statement
res["statement_ids"] = [
x for x in res["statement_ids"] if x not in to_unlink.ids
]
statements -= to_unlink
to_unlink.unlink()
return statements
def _process_cfonb120(self):
import_module = "account_statement_import_fr_cfonb"
self._check_import_module(import_module)
res = {"statement_ids": [], "notifications": []}
st_datas = self._split_cfonb(res)
self._process_bank_statement_oca(res, st_datas)
return self._process_download_result(res, file_format="cfonb120")
def _unlink_cfonb120(self):
"""
Placeholder for cfonb120 specific actions before removing the
EBICS data file and its related bank statements.
"""
def _split_cfonb(self, res):
"""
Split CFONB file received via EBICS per statement.
Statements without transactions are removed.
"""
datas = []
file_data = base64.b64decode(self.data)
lines = file_data.split(b"\n")
st_lines = b""
transactions = False
for line in lines:
rec_type = line[0:2]
currency_code = line[16:19].decode()
acc_number = line[21:32].decode()
st_lines += line + b"\n"
if rec_type == b"04":
transactions = True
if rec_type == b"07":
if transactions:
currency, journal = self._lookup_journal(
res, acc_number, currency_code
)
if currency and journal:
datas.append(
{
"acc_number": acc_number,
"journal_id": journal.id,
"company_id": journal.company_id.id,
"data": base64.b64encode(st_lines),
}
)
st_lines = b""
transactions = False
return datas
def _process_camt052(self):
import_module = "account_statement_import_camt"
self._check_import_module(import_module)
return self._process_camt053(file_format="camt052")
def _unlink_camt052(self):
"""
Placeholder for camt052 specific actions before removing the
EBICS data file and its related bank statements.
"""
def _process_camt054(self):
import_module = "account_statement_import_camt"
self._check_import_module(import_module)
return self._process_camt053(file_format="camt054")
def _unlink_camt054(self):
"""
Placeholder for camt054 specific actions before removing the
EBICS data file and its related bank statements.
"""
def _process_camt053(self, file_format=None):
"""
The Odoo standard statement import is based on manual selection
of a financial journal before importing the electronic statement file.
An EBICS download may return a single file containing a large number of
statements from different companies/journals.
Hence we need to split the CAMT file into
single statement CAMT files before we can call the logic
implemented by the Odoo OE or Community CAMT parsers.
"""
modules = [
("oca", "account_statement_import_camt"),
("oe", "account_bank_statement_import_camt"),
]
author = False
for entry in modules:
if self._check_import_module(entry[1], raise_if_not_found=False):
author = entry[0]
break
if not author:
raise UserError(
_(
"The module to process the '%(ebics_format)s' format is "
"not installed on your system. "
"\nPlease install one of the following modules: \n%(modules)s.",
ebics_format=self.format_id.name,
modules=", ".join([x[1] for x in modules]),
)
)
res = {"statement_ids": [], "notifications": []}
st_datas = self._split_camt(res)
if author == "oca":
self._process_bank_statement_oca(res, st_datas)
else:
self._process_bank_statement_oe(res, st_datas)
file_format = file_format or "camt053"
return self._process_download_result(res, file_format=file_format)
def _process_bank_statement_oca(self, res, st_datas):
for st_data in st_datas:
try:
with self.env.cr.savepoint():
self._create_bank_statement_oca(res, st_data)
except UserError as e:
res["notifications"].append(
{"type": "error", "message": "".join(e.args)}
)
except Exception:
tb = "".join(format_exception(*exc_info()))
res["notifications"].append({"type": "error", "message": tb})
def _create_bank_statement_oca(self, res, st_data):
wiz = (
self.env["account.statement.import"]
.with_company(st_data["company_id"])
.with_context(active_model="ebics.file")
.create({"statement_filename": self.name})
)
wiz.import_single_file(base64.b64decode(st_data["data"]), res)
def _process_bank_statement_oe(self, res, st_datas):
"""
We execute a cr.commit() after every statement import since we get a
'savepoint does not exist' error when using 'with self.env.cr.savepoint()'.
"""
for st_data in st_datas:
try:
self._create_bank_statement_oe(res, st_data)
self.env.cr.commit() # pylint: disable=E8102
except UserError as e:
msg = "".join(e.args)
msg += "\n"
msg += _(
"Statement for Account Number %(nr)s has not been processed.",
nr=st_data["acc_number"],
)
res["notifications"].append({"type": "error", "message": msg})
except Exception:
tb = "".join(format_exception(*exc_info()))
res["notifications"].append({"type": "error", "message": tb})
def _create_bank_statement_oe(self, res, st_data):
attachment = (
self.env["ir.attachment"]
.with_company(st_data["company_id"])
.create(
{
"name": self.name,
"datas": st_data["data"],
"store_fname": self.name,
}
)
)
journal = (
self.env["account.journal"]
.with_company(st_data["company_id"])
.browse(st_data["journal_id"])
)
act = journal._import_bank_statement(attachment)
for entry in act["domain"]:
if (
isinstance(entry, tuple)
and entry[0] == "statement_id"
and entry[1] == "in"
):
res["statement_ids"].extend(entry[2])
break
notifications = act["context"]["notifications"]
if notifications:
res["notifications"].append(act["context"]["notifications"])
def _unlink_camt053(self):
"""
Placeholder for camt053 specific actions before removing the
EBICS data file and its related bank statements.
"""
def _split_camt(self, res):
"""
Split CAMT file received via EBICS per statement.
Statements without transactions are removed.
"""
datas = []
file_data = base64.b64decode(self.data)
root = etree.fromstring(file_data, parser=etree.XMLParser(recover=True))
if root is None:
message = _("Invalid XML file.")
res["notifications"].append({"type": "error", "message": message})
ns = {k or "ns": v for k, v in root.nsmap.items()}
camt_variant = ns["ns"].split("camt.")[1][:3]
variant_tags = {
"052": "Rpt",
"053": "Stmt",
"054": "Ntfctn",
}
camt_tag = variant_tags[camt_variant]
stmts = root[0].findall("ns:{}".format(camt_tag), ns)
for i, stmt in enumerate(stmts):
acc_number = sanitize_account_number(
stmt.xpath(
"ns:Acct/ns:Id/ns:IBAN/text() | ns:Acct/ns:Id/ns:Othr/ns:Id/text()",
namespaces=ns,
)[0]
)
if not acc_number:
message = _("No bank account number found.")
res["notifications"].append({"type": "error", "message": message})
continue
currency_code = stmt.xpath(
"ns:Acct/ns:Ccy/text() | ns:Bal/ns:Amt/@Ccy", namespaces=ns
)[0]
# some banks (e.g. COMMERZBANK) add the currency as the last 3 digits
# of the bank account number hence we need to remove this since otherwise
# the journal matching logic fails
if acc_number[-3:] == currency_code:
acc_number = acc_number[:-3]
root_new = deepcopy(root)
entries = False
for j, el in enumerate(root_new[0].findall("ns:{}".format(camt_tag), ns)):
if j != i:
el.getparent().remove(el)
else:
entries = el.findall("ns:Ntry", ns)
if not entries:
continue
else:
currency, journal = self._lookup_journal(res, acc_number, currency_code)
if not (currency and journal):
continue
datas.append(
{
"acc_number": acc_number,
"journal_id": journal.id,
"company_id": journal.company_id.id,
"data": base64.b64encode(etree.tostring(root_new)),
}
)
return datas
def _process_pain002(self):
"""
Placeholder for processing pain.002 files.
TODO:
add import logic based upon OCA 'account_payment_return_import'
"""
def _unlink_pain002(self):
"""
Placeholder for pain.002 specific actions before removing the
EBICS data file.
"""
raise NotImplementedError
def _process_undefined_format(self):
raise UserError(
_(
"The current version of the 'account_ebics' module "
"has no support to automatically process EBICS files "
"with format %s."
)
% self.format_id.name
)

View File

@@ -0,0 +1,120 @@
# Copyright 2009-2023 Noviat.
# License LGPL-3 or later (http://www.gnu.org/licenses/lgpl).
from odoo import api, fields, models
class EbicsFileFormat(models.Model):
_name = "ebics.file.format"
_description = "EBICS File Formats"
_order = "type,name,order_type"
ebics_version = fields.Selection(
selection=[
("2", "2"),
("3", "3"),
],
string="EBICS protocol version",
required=True,
default="2",
)
name = fields.Char(
string="Request Type",
help="E.g. camt.xxx.cfonb120.stm, pain.001.001.03.sct.\n"
"Specify camt.052, camt.053, camt.054 for camt "
"Order Types such as C53, Z53, C54, Z54.\n"
"This name has to match the 'Request Type' in your "
"EBICS contract for Order Type 'FDL' or 'FUL'.\n",
)
type = fields.Selection(
selection=[("down", "Download"), ("up", "Upload")], required=True
)
order_type = fields.Char(
required=True,
help="EBICS 3.0: BTD (download) or BTU (upload).\n"
"EBICS 2.0: E.g. C53 (check your EBICS contract). "
"For most banks in France you should use the "
"format neutral Order Types 'FUL' for upload "
"and 'FDL' for download.",
)
download_process_method = fields.Selection(
selection="_selection_download_process_method",
help="Enable processing within Odoo of the downloaded file "
"via the 'Process' button."
"E.g. specify camt.053 to import a camt.053 file and create "
"a bank statement.",
)
# TODO:
# move signature_class parameter so that it can be set per EBICS config
signature_class = fields.Selection(
selection=[("E", "Single signature"), ("T", "Transport signature")],
help="Please doublecheck the security of your Odoo "
"ERP system when using class 'E' to prevent unauthorised "
"users to make supplier payments."
"\nLeave this field empty to use the default "
"defined for your EBICS UserID.",
)
description = fields.Char()
suffix = fields.Char(
help="Specify the filename suffix for this File Format.\nE.g. c53.xml",
)
# EBICS 3.0 BTF
btf_service = fields.Char(
string="BTF Service",
help="BTF Service Name)\n"
"The service code name consisting of 3 alphanumeric characters "
"[A-Z0-9] (e.g. SCT, SDD, STM, EOP)",
)
btf_message = fields.Char(
string="BTF Message Name",
help="BTF Message Name\n"
"The message name consisting of up to 10 alphanumeric characters "
"[a-z0-9.] (eg. pain.001, pain.008, camt.053)",
)
btf_scope = fields.Char(
string="BTF Scope",
help="Scope of service.\n"
"Either an ISO-3166 ALPHA 2 country code or an issuer code "
"of 3 alphanumeric characters [A-Z0-9].",
)
btf_option = fields.Char(
string="BTF Option",
help="The service option code consisting of 3-10 alphanumeric "
"characters [A-Z0-9] (eg. COR, B2B)",
)
btf_container = fields.Char(
string="BTF Container",
help="Type of container consisting of 3 characters [A-Z] (eg. XML, ZIP).",
)
btf_version = fields.Char(
string="BTF Version",
help="Message version consisting of 2 numeric characters [0-9] (eg. 03).",
)
btf_variant = fields.Char(
string="BTF Variant",
help="Message variant consisting of 3 numeric characters [0-9] (eg. 001).",
)
btf_format = fields.Char(
string="BTF Format",
help="Message format consisting of 1-4 alphanumeric characters [A-Z0-9] "
"(eg. XML, JSON, PDF).",
)
@api.model
def _selection_download_process_method(self):
methods = self.env["ebics.file"]._file_format_methods().keys()
return [(x, x) for x in methods]
@api.onchange("type")
def _onchange_type(self):
if self.type == "up":
self.download_process_method = False
def name_get(self):
res = []
for rec in self:
name = rec.ebics_version == "2" and rec.name or rec.btf_message
if rec.description:
name += " - " + rec.description
res.append((rec.id, name))
return res

View File

@@ -0,0 +1,621 @@
# Copyright 2009-2024 Noviat.
# License LGPL-3 or later (http://www.gnu.org/licenses/lgpl).
import base64
import logging
import os
from sys import exc_info
from traceback import format_exception
from urllib.error import URLError
from odoo import _, api, fields, models
from odoo.exceptions import UserError
_logger = logging.getLogger(__name__)
try:
import fintech
from fintech.ebics import (
EbicsBank,
EbicsClient,
EbicsFunctionalError,
EbicsKeyRing,
EbicsTechnicalError,
EbicsUser,
)
fintech.cryptolib = "cryptography"
except ImportError:
_logger.warning("Failed to import fintech")
class EbicsBank(EbicsBank):
def _next_order_id(self, partnerid):
"""
EBICS protocol version H003 requires generation of the OrderID.
The OrderID must be a string between 'A000' and 'ZZZZ' and
unique for each partner id.
"""
return hasattr(self, "_order_number") and self._order_number or "A000"
class EbicsUserID(models.Model):
_name = "ebics.userid"
_description = "EBICS UserID"
_order = "name"
name = fields.Char(
string="EBICS UserID",
required=True,
readonly=True,
states={"draft": [("readonly", False)]},
help="Human users or a technical system that is/are "
"assigned to a customer. "
"\nOn the EBICS bank server it is identified "
"by the combination of UserID and PartnerID. "
"The technical subscriber serves only for the data exchange "
"between customer and financial institution. "
"The human user also can authorise orders.",
)
ebics_config_id = fields.Many2one(
comodel_name="ebics.config",
string="EBICS Configuration",
ondelete="cascade",
required=True,
)
ebics_version = fields.Selection(related="ebics_config_id.ebics_version")
user_ids = fields.Many2many(
comodel_name="res.users",
string="Users",
required=True,
help="Users who are allowed to use this EBICS UserID for "
" bank transactions.",
)
# Currently only a single signature class per user is supported
# Classes A and B are not yet supported.
signature_class = fields.Selection(
selection=[("E", "Single signature"), ("T", "Transport signature")],
required=True,
default="T",
readonly=True,
states={"draft": [("readonly", False)]},
help="Default signature class."
"This default can be overriden for specific "
"EBICS transactions (cf. File Formats).",
)
transaction_rights = fields.Selection(
selection=[
("both", "Download and Upload"),
("down", "Download Only"),
("up", "Upload Only"),
],
string="Allowed Transactions",
default="both",
required=True,
help="Use this parameter to limit the transactions for this User "
"to downloads or uploads.",
)
ebics_keys_fn = fields.Char(compute="_compute_ebics_keys_fn")
ebics_keys_found = fields.Boolean(compute="_compute_ebics_keys_found")
ebics_passphrase = fields.Char(string="EBICS Passphrase")
ebics_passphrase_store = fields.Boolean(
string="Store EBICS Passphrase",
default=True,
help="When you uncheck this option the passphrase to unlock "
"your private key will not be stored in the database. "
"We recommend to use this if you want to upload signed "
"payment orders via EBICS.\nYou will be prompted to enter the "
"passphrase for every EBICS transaction, hence do not uncheck this "
"option on a userid for automated EBICS downloads.",
)
ebics_passphrase_required = fields.Boolean(
compute="_compute_ebics_passphrase_view_modifiers"
)
ebics_passphrase_invisible = fields.Boolean(
compute="_compute_ebics_passphrase_view_modifiers"
)
ebics_passphrase_store_readonly = fields.Boolean(
compute="_compute_ebics_passphrase_view_modifiers"
)
ebics_sig_passphrase = fields.Char(
string="EBICS Signature Passphrase",
help="You can set here a different passphrase for the EBICS "
"signing key. This passphrase will never be stored hence "
"you'll need to specify your passphrase for each transaction that "
"requires a digital signature.",
)
ebics_sig_passphrase_invisible = fields.Boolean(
compute="_compute_ebics_sig_passphrase_invisible"
)
ebics_ini_letter = fields.Binary(
string="EBICS INI Letter",
readonly=True,
help="INI-letter PDF document to be sent to your bank.",
)
ebics_ini_letter_fn = fields.Char(string="INI-letter Filename", readonly=True)
ebics_public_bank_keys = fields.Binary(
string="EBICS Public Bank Keys",
readonly=True,
help="EBICS Public Bank Keys to be checked for consistency.",
)
ebics_public_bank_keys_fn = fields.Char(
string="EBICS Public Bank Keys Filename", readonly=True
)
swift_3skey = fields.Boolean(
string="Enable 3SKey support",
help="Transactions for this user will be signed "
"by means of the SWIFT 3SKey token.",
)
swift_3skey_certificate = fields.Binary(string="3SKey Certficate")
swift_3skey_certificate_fn = fields.Char(string="3SKey Certificate Filename")
# X.509 Distinguished Name attributes used to
# create self-signed X.509 certificates
ebics_key_x509 = fields.Boolean(
string="X509 support",
readonly=True,
states={"draft": [("readonly", False)]},
help="Set this flag in order to work with " "self-signed X.509 certificates",
)
ebics_key_x509_dn_cn = fields.Char(
string="Common Name [CN]",
readonly=True,
states={"draft": [("readonly", False)]},
)
ebics_key_x509_dn_o = fields.Char(
string="Organization Name [O]",
readonly=True,
states={"draft": [("readonly", False)]},
)
ebics_key_x509_dn_ou = fields.Char(
string="Organizational Unit Name [OU]",
readonly=True,
states={"draft": [("readonly", False)]},
)
ebics_key_x509_dn_c = fields.Char(
string="Country Name [C]",
readonly=True,
states={"draft": [("readonly", False)]},
)
ebics_key_x509_dn_st = fields.Char(
string="State Or Province Name [ST]",
readonly=True,
states={"draft": [("readonly", False)]},
)
ebics_key_x509_dn_l = fields.Char(
string="Locality Name [L]",
readonly=True,
states={"draft": [("readonly", False)]},
)
ebics_key_x509_dn_e = fields.Char(
string="Email Address",
readonly=True,
states={"draft": [("readonly", False)]},
)
state = fields.Selection(
[
("draft", "Draft"),
("init", "Initialisation"),
("get_bank_keys", "Get Keys from Bank"),
("to_verify", "Verification"),
("active_keys", "Active Keys"),
],
default="draft",
required=True,
readonly=True,
)
active = fields.Boolean(default=True)
company_ids = fields.Many2many(
comodel_name="res.company",
string="Companies",
required=True,
help="Companies sharing this EBICS contract.",
)
@api.depends("name", "ebics_config_id.ebics_keys")
def _compute_ebics_keys_fn(self):
for rec in self:
keys_dir = rec.ebics_config_id.ebics_keys
rec.ebics_keys_fn = (
rec.name
and keys_dir
and (keys_dir + "/" + rec.name.replace(" ", "_") + "_keys")
)
@api.depends("ebics_keys_fn")
def _compute_ebics_keys_found(self):
for rec in self:
rec.ebics_keys_found = rec.ebics_keys_fn and os.path.isfile(
rec.ebics_keys_fn
)
@api.depends("state", "ebics_passphrase")
def _compute_ebics_passphrase_view_modifiers(self):
for rec in self:
rec.ebics_passphrase_invisible = False
rec.ebics_passphrase_store_readonly = True
if rec.state == "draft":
rec.ebics_passphrase_required = True
rec.ebics_passphrase_store_readonly = False
elif rec.state == "init":
rec.ebics_passphrase_invisible = True
elif rec.state in ("get_bank_keys", "to_verify"):
rec.ebics_passphrase_required = not rec.ebics_passphrase
rec.ebics_passphrase_invisible = rec.ebics_passphrase
else:
rec.ebics_passphrase_required = False
rec.ebics_passphrase_invisible = True
@api.depends("state")
def _compute_ebics_sig_passphrase_invisible(self):
for rec in self:
rec.ebics_sig_passphrase_invisible = True
if fintech.__version_info__ < (7, 3, 1):
continue
if rec.transaction_rights != "down" and rec.state == "draft":
rec.ebics_sig_passphrase_invisible = False
@api.constrains("ebics_key_x509")
def _check_ebics_key_x509(self):
for cfg in self:
if cfg.ebics_version == "H005" and not cfg.ebics_key_x509:
raise UserError(_("X.509 certificates must be used with EBICS 3.0."))
@api.constrains("ebics_passphrase")
def _check_ebics_passphrase(self):
for rec in self:
if rec.ebics_passphrase and len(rec.ebics_passphrase) < 8:
raise UserError(_("The Passphrase must be at least 8 characters long"))
@api.constrains("ebics_sig_passphrase")
def _check_ebics_sig_passphrase(self):
for rec in self:
if rec.ebics_sig_passphrase and len(rec.ebics_sig_passphrase) < 8:
raise UserError(
_("The Signature Passphrase must be at least 8 characters long")
)
@api.onchange("ebics_version")
def _onchange_ebics_version(self):
if self.ebics_version == "H005":
self.ebics_key_x509 = True
@api.onchange("signature_class")
def _onchange_signature_class(self):
if self.signature_class == "T":
self.swift_3skey = False
@api.onchange("ebics_passphrase_store", "ebics_passphrase")
def _onchange_ebics_passphrase_store(self):
if self.ebics_passphrase_store:
if self.ebics_passphrase:
# check passphrase before db store
keyring_params = {
"keys": self.ebics_keys_fn,
"passphrase": self.ebics_passphrase,
}
keyring = EbicsKeyRing(**keyring_params)
try:
# fintech <= 7.4.3 does not have a call to check if a
# passphrase matches with the value stored in the keyfile.
# We get around this limitation as follows:
# Get user keys to check for valid passphrases
# It will raise a ValueError on invalid passphrases
keyring["#USER"]
except ValueError as err: # noqa: F841
raise UserError(_("Passphrase mismatch.")) # noqa: B904
else:
if self.state != "draft":
self.ebics_passphrase = False
@api.onchange("swift_3skey")
def _onchange_swift_3skey(self):
if self.swift_3skey:
self.ebics_key_x509 = True
def set_to_draft(self):
return self.write({"state": "draft"})
def set_to_active_keys(self):
vals = {"state": "active_keys"}
self._update_passphrase_vals(vals)
return self.write(vals)
def set_to_get_bank_keys(self):
self.ensure_one()
if self.ebics_config_id.state != "draft":
raise UserError(
_(
"Set the EBICS Configuation record to 'Draft' "
"before starting the Key Renewal process."
)
)
return self.write({"state": "get_bank_keys"})
def ebics_init_1(self): # noqa: C901
"""
Initialization of bank keys - Step 1:
Create new keys and certificates for this user
"""
self.ensure_one()
if self.state != "draft":
raise UserError(
_("Set state to 'draft' before Bank Key (re)initialisation.")
)
if not self.ebics_passphrase:
raise UserError(_("Set a passphrase."))
if self.swift_3skey and not self.swift_3skey_certificate:
raise UserError(_("3SKey certificate missing."))
ebics_version = self.ebics_config_id.ebics_version
try:
keyring_params = {
"keys": self.ebics_keys_fn,
"passphrase": self.ebics_passphrase,
}
if self.ebics_sig_passphrase:
keyring_params["sig_passphrase"] = self.ebics_sig_passphrase
keyring = EbicsKeyRing(**keyring_params)
bank = EbicsBank(
keyring=keyring,
hostid=self.ebics_config_id.ebics_host,
url=self.ebics_config_id.ebics_url,
)
user = EbicsUser(
keyring=keyring,
partnerid=self.ebics_config_id.ebics_partner,
userid=self.name,
)
except Exception as err:
exctype, value = exc_info()[:2]
error = _("EBICS Initialisation Error:")
error += "\n" + str(exctype) + "\n" + str(value)
raise UserError(error) from err
self.ebics_config_id._check_ebics_keys()
if not os.path.isfile(self.ebics_keys_fn):
try:
# TODO:
# enable import of all type of certicates: A00x, X002, E002
if self.swift_3skey:
kwargs = {
self.ebics_config_id.ebics_key_version: base64.decodebytes(
self.swift_3skey_certificate
),
}
user.import_certificates(**kwargs)
user.create_keys(
keyversion=self.ebics_config_id.ebics_key_version,
bitlength=self.ebics_config_id.ebics_key_bitlength,
)
except Exception as err:
exctype, value = exc_info()[:2]
error = _("EBICS Initialisation Error:")
error += "\n" + str(exctype) + "\n" + str(value)
raise UserError(error) from err
if self.swift_3skey and not self.ebics_key_x509:
raise UserError(
_(
"The current version of this module "
"requires to X509 support when enabling 3SKey"
)
)
if self.ebics_key_x509:
dn_attrs = {
"commonName": self.ebics_key_x509_dn_cn,
"organizationName": self.ebics_key_x509_dn_o,
"organizationalUnitName": self.ebics_key_x509_dn_ou,
"countryName": self.ebics_key_x509_dn_c,
"stateOrProvinceName": self.ebics_key_x509_dn_st,
"localityName": self.ebics_key_x509_dn_l,
"emailAddress": self.ebics_key_x509_dn_e,
}
kwargs = {k: v for k, v in dn_attrs.items() if v}
user.create_certificates(**kwargs)
try:
client = EbicsClient(bank, user, version=ebics_version)
except RuntimeError as err:
e = exc_info()
error = _("EBICS Initialization Error:")
error += "\n"
error += err.args[0]
raise UserError(error) from err
# Send the public electronic signature key to the bank.
ebics_config_bank = self.ebics_config_id.journal_ids[0].bank_id
if not ebics_config_bank:
raise UserError(
_("No bank defined for the financial journal " "of the EBICS Config")
)
try:
supported_versions = client.HEV()
if supported_versions and ebics_version not in supported_versions:
err_msg = _("EBICS version mismatch.") + "\n"
err_msg += _("Versions supported by your bank:")
for k in supported_versions:
err_msg += "\n{}: {} ".format(k, supported_versions[k])
raise UserError(err_msg)
if ebics_version == "H003":
bank._order_number = self.ebics_config_id._get_order_number()
OrderID = client.INI()
_logger.info("%s, EBICS INI command, OrderID=%s", self._name, OrderID)
if ebics_version == "H003":
self.ebics_config_id._update_order_number(OrderID)
except URLError as err:
exctype, value = exc_info()[:2]
tb = "".join(format_exception(*exc_info()))
_logger.error(
"EBICS INI command error\nUserID: %s\n%s",
self.name,
tb,
)
raise UserError(
_(
"urlopen error:\n url '%(url)s' - %(val)s",
url=self.ebics_config_id.ebics_url,
val=str(value),
)
) from err
except EbicsFunctionalError as err:
e = exc_info()
error = _("EBICS Functional Error:")
error += "\n"
error += "{} (code: {})".format(e[1].message, e[1].code)
raise UserError(error) from err
except EbicsTechnicalError as err:
e = exc_info()
error = _("EBICS Technical Error:")
error += "\n"
error += "{} (code: {})".format(e[1].message, e[1].code)
raise UserError(error) from err
# Send the public authentication and encryption keys to the bank.
if ebics_version == "H003":
bank._order_number = self.ebics_config_id._get_order_number()
OrderID = client.HIA()
_logger.info("%s, EBICS HIA command, OrderID=%s", self._name, OrderID)
if ebics_version == "H003":
self.ebics_config_id._update_order_number(OrderID)
# Create an INI-letter which must be printed and sent to the bank.
ebics_config_bank = self.ebics_config_id.journal_ids[0].bank_id
cc = ebics_config_bank.country.code
if cc in ["FR", "DE"]:
lang = cc
else:
lang = self.env.user.lang or self.env["res.lang"].search([])[0].code
lang = lang[:2]
fn_date = fields.Date.today().isoformat()
fn = "_".join([self.ebics_config_id.ebics_host, "ini_letter", fn_date]) + ".pdf"
letter = user.create_ini_letter(bankname=ebics_config_bank.name, lang=lang)
vals = {
"ebics_ini_letter": base64.encodebytes(letter),
"ebics_ini_letter_fn": fn,
"state": "init",
}
self._update_passphrase_vals(vals)
return self.write(vals)
def ebics_init_2(self):
"""
Initialization of bank keys - Step 2:
Activation of the account by the bank.
"""
self.ensure_one()
if self.state != "init":
raise UserError(_("Set state to 'Initialisation'."))
vals = {"state": "get_bank_keys"}
self._update_passphrase_vals(vals)
return self.write(vals)
def ebics_init_3(self):
"""
Initialization of bank keys - Step 3:
After the account has been activated the public bank keys
must be downloaded and checked for consistency.
"""
self.ensure_one()
if self.state != "get_bank_keys":
raise UserError(_("Set state to 'Get Keys from Bank'."))
try:
keyring = EbicsKeyRing(
keys=self.ebics_keys_fn, passphrase=self.ebics_passphrase
)
bank = EbicsBank(
keyring=keyring,
hostid=self.ebics_config_id.ebics_host,
url=self.ebics_config_id.ebics_url,
)
user = EbicsUser(
keyring=keyring,
partnerid=self.ebics_config_id.ebics_partner,
userid=self.name,
)
client = EbicsClient(bank, user, version=self.ebics_config_id.ebics_version)
except Exception as err:
exctype, value = exc_info()[:2]
error = _("EBICS Initialisation Error:")
error += "\n" + str(exctype) + "\n" + str(value)
raise UserError(error) from err
try:
public_bank_keys = client.HPB()
except EbicsFunctionalError as err:
e = exc_info()
error = _("EBICS Functional Error:")
error += "\n"
error += "{} (code: {})".format(e[1].message, e[1].code)
raise UserError(error) from err
except Exception as err:
exctype, value = exc_info()[:2]
error = _("EBICS Initialisation Error:")
error += "\n" + str(exctype) + "\n" + str(value)
raise UserError(error) from err
public_bank_keys = public_bank_keys.encode()
fn_date = fields.Date.today().isoformat()
fn = (
"_".join([self.ebics_config_id.ebics_host, "public_bank_keys", fn_date])
+ ".txt"
)
vals = {
"ebics_public_bank_keys": base64.encodebytes(public_bank_keys),
"ebics_public_bank_keys_fn": fn,
"state": "to_verify",
}
self._update_passphrase_vals(vals)
return self.write(vals)
def ebics_init_4(self):
"""
Initialization of bank keys - Step 2:
Confirm Verification of the public bank keys
and activate the bank keys.
"""
self.ensure_one()
if self.state != "to_verify":
raise UserError(_("Set state to 'Verification'."))
keyring = EbicsKeyRing(
keys=self.ebics_keys_fn, passphrase=self.ebics_passphrase
)
bank = EbicsBank(
keyring=keyring,
hostid=self.ebics_config_id.ebics_host,
url=self.ebics_config_id.ebics_url,
)
bank.activate_keys()
vals = {"state": "active_keys"}
self._update_passphrase_vals(vals)
return self.write(vals)
def change_passphrase(self):
self.ensure_one()
ctx = dict(self.env.context, default_ebics_userid_id=self.id)
module = __name__.split("addons.")[1].split(".")[0]
view = self.env.ref("%s.ebics_change_passphrase_view_form" % module)
return {
"name": _("EBICS keys change passphrase"),
"view_type": "form",
"view_mode": "form",
"res_model": "ebics.change.passphrase",
"view_id": view.id,
"target": "new",
"context": ctx,
"type": "ir.actions.act_window",
}
def _update_passphrase_vals(self, vals):
"""
Remove non-stored passphrases from db after e.g. successfull init_1
"""
if vals["state"] in ("init", "get_bank_keys", "to_verify", "active_keys"):
if not self.ebics_passphrase_store:
vals["ebics_passphrase"] = False
if self.ebics_sig_passphrase:
vals["ebics_sig_passphrase"] = False

View File

@@ -0,0 +1,46 @@
# Copyright 2009-2020 Noviat.
# License LGPL-3 or later (http://www.gnu.org/licenses/lgpl).
import logging
from sys import exc_info
from traceback import format_exception
from odoo.tools import config
_logger = logging.getLogger(__name__)
try:
import fintech
except ImportError:
fintech = None
_logger.warning("Failed to import fintech")
fintech_register_name = config.get("fintech_register_name")
fintech_register_keycode = config.get("fintech_register_keycode")
fintech_register_users = config.get("fintech_register_users")
try:
if fintech:
fintech_register_users = (
fintech_register_users
and [x.strip() for x in fintech_register_users.split(",")]
or None
)
fintech.cryptolib = "cryptography"
fintech.register(
name=fintech_register_name,
keycode=fintech_register_keycode,
users=fintech_register_users,
)
except RuntimeError as e:
if str(e) == "'register' can be called only once":
pass
else:
_logger.error(str(e))
fintech.register()
except Exception:
msg = "fintech.register error"
tb = "".join(format_exception(*exc_info()))
msg += "\n%s" % tb
_logger.error(msg)
fintech.register()