mirror of
https://github.com/brain-tec/account_ebics.git
synced 2024-11-23 20:52:04 +00:00
629 lines
23 KiB
Python
629 lines
23 KiB
Python
# Copyright 2009-2022 Noviat.
|
|
# License LGPL-3 or later (http://www.gnu.org/licenses/lpgl).
|
|
|
|
"""
|
|
import logging
|
|
logging.basicConfig(
|
|
level=logging.DEBUG,
|
|
format='[%(asctime)s] %(levelname)s - %(name)s: %(message)s')
|
|
"""
|
|
|
|
import base64
|
|
import logging
|
|
import os
|
|
from sys import exc_info
|
|
from traceback import format_exception
|
|
|
|
from odoo import _, api, fields, models
|
|
from odoo.exceptions import UserError
|
|
|
|
_logger = logging.getLogger(__name__)
|
|
|
|
try:
|
|
import fintech
|
|
from fintech.ebics import (
|
|
BusinessTransactionFormat,
|
|
EbicsBank,
|
|
EbicsClient,
|
|
EbicsFunctionalError,
|
|
EbicsKeyRing,
|
|
EbicsTechnicalError,
|
|
EbicsUser,
|
|
EbicsVerificationError,
|
|
)
|
|
|
|
fintech.cryptolib = "cryptography"
|
|
except ImportError:
|
|
EbicsBank = object
|
|
_logger.warning("Failed to import fintech")
|
|
|
|
|
|
class EbicsBank(EbicsBank):
|
|
def _next_order_id(self, partnerid):
|
|
"""
|
|
EBICS protocol version H003 requires generation of the OrderID.
|
|
The OrderID must be a string between 'A000' and 'ZZZZ' and
|
|
unique for each partner id.
|
|
"""
|
|
return hasattr(self, "_order_number") and self._order_number or "A000"
|
|
|
|
|
|
class EbicsXfer(models.TransientModel):
|
|
_name = "ebics.xfer"
|
|
_description = "EBICS file transfer"
|
|
|
|
ebics_config_id = fields.Many2one(
|
|
comodel_name="ebics.config",
|
|
string="EBICS Configuration",
|
|
domain=[("state", "=", "confirm")],
|
|
default=lambda self: self._default_ebics_config_id(),
|
|
)
|
|
ebics_userid_id = fields.Many2one(
|
|
comodel_name="ebics.userid", string="EBICS UserID"
|
|
)
|
|
ebics_passphrase = fields.Char(string="EBICS Passphrase")
|
|
date_from = fields.Date()
|
|
date_to = fields.Date()
|
|
upload_data = fields.Binary(string="File to Upload")
|
|
upload_fname = fields.Char(string="Upload Filename", default="")
|
|
upload_fname_dummy = fields.Char(
|
|
related="upload_fname", string="Upload Filename", readonly=True
|
|
)
|
|
format_id = fields.Many2one(
|
|
comodel_name="ebics.file.format",
|
|
string="EBICS File Format",
|
|
help="Select EBICS File Format to upload/download."
|
|
"\nLeave blank to download all available files.",
|
|
)
|
|
allowed_format_ids = fields.Many2many(
|
|
related="ebics_config_id.ebics_file_format_ids",
|
|
string="Allowed EBICS File Formats",
|
|
)
|
|
order_type = fields.Char(
|
|
related="format_id.order_type",
|
|
string="Order Type",
|
|
)
|
|
test_mode = fields.Boolean(
|
|
help="Select this option to test if the syntax of "
|
|
"the upload file is correct."
|
|
"\nThis option is only available for "
|
|
"Order Type 'FUL'.",
|
|
)
|
|
note = fields.Text(string="EBICS file transfer Log", readonly=True)
|
|
|
|
@api.model
|
|
def _default_ebics_config_id(self):
|
|
cfg_mod = self.env["ebics.config"]
|
|
cfg = cfg_mod.search(
|
|
[
|
|
("company_ids", "in", self.env.user.company_ids.ids),
|
|
("state", "=", "confirm"),
|
|
]
|
|
)
|
|
if cfg and len(cfg) == 1:
|
|
return cfg
|
|
else:
|
|
return cfg_mod
|
|
|
|
@api.onchange("ebics_config_id")
|
|
def _onchange_ebics_config_id(self):
|
|
ebics_userids = self.ebics_config_id.ebics_userid_ids
|
|
if self._context.get("ebics_download"):
|
|
download_formats = self.ebics_config_id.ebics_file_format_ids.filtered(
|
|
lambda r: r.type == "down"
|
|
)
|
|
if len(download_formats) == 1:
|
|
self.format_id = download_formats
|
|
if len(ebics_userids) == 1:
|
|
self.ebics_userid_id = ebics_userids
|
|
else:
|
|
transport_users = ebics_userids.filtered(
|
|
lambda r: r.signature_class == "T"
|
|
)
|
|
if len(transport_users) == 1:
|
|
self.ebics_userid_id = transport_users
|
|
else:
|
|
upload_formats = self.ebics_config_id.ebics_file_format_ids.filtered(
|
|
lambda r: r.type == "up"
|
|
)
|
|
if len(upload_formats) == 1:
|
|
self.format_id = upload_formats
|
|
if len(ebics_userids) == 1:
|
|
self.ebics_userid_id = ebics_userids
|
|
|
|
@api.onchange("upload_data")
|
|
def _onchange_upload_data(self):
|
|
self.upload_fname_dummy = self.upload_fname
|
|
self.format_id = False
|
|
self._detect_upload_format()
|
|
if not self.format_id:
|
|
upload_formats = (
|
|
self.format_id
|
|
or self.ebics_config_id.ebics_file_format_ids.filtered(
|
|
lambda r: r.type == "up"
|
|
)
|
|
)
|
|
if len(upload_formats) > 1:
|
|
upload_formats = upload_formats.filtered(
|
|
lambda r: self.upload_fname.endswith(r.suffix)
|
|
)
|
|
if len(upload_formats) == 1:
|
|
self.format_id = upload_formats
|
|
|
|
@api.onchange("format_id")
|
|
def _onchange_format_id(self):
|
|
self.order_type = self.format_id.order_type
|
|
|
|
def ebics_upload(self):
|
|
self.ensure_one()
|
|
ctx = self._context.copy()
|
|
ebics_file = self._ebics_upload()
|
|
if ebics_file:
|
|
ctx["ebics_file_id"] = ebics_file.id
|
|
module = __name__.split("addons.")[1].split(".")[0]
|
|
result_view = self.env.ref("%s.ebics_xfer_view_form_result" % module)
|
|
return {
|
|
"name": _("EBICS file transfer result"),
|
|
"res_id": self.id,
|
|
"view_type": "form",
|
|
"view_mode": "form",
|
|
"res_model": "ebics.xfer",
|
|
"view_id": result_view.id,
|
|
"target": "new",
|
|
"context": ctx,
|
|
"type": "ir.actions.act_window",
|
|
}
|
|
|
|
def ebics_download(self):
|
|
self.ensure_one()
|
|
self.ebics_config_id._check_ebics_files()
|
|
ctx = self.env.context.copy()
|
|
self.note = ""
|
|
err_cnt = 0
|
|
client = self._setup_client()
|
|
if not client:
|
|
err_cnt += 1
|
|
self.note += (
|
|
_("EBICS client setup failed for connection '%s'")
|
|
% self.ebics_config_id.name
|
|
)
|
|
else:
|
|
download_formats = (
|
|
self.format_id
|
|
or self.ebics_config_id.ebics_file_format_ids.filtered(
|
|
lambda r: r.type == "down"
|
|
)
|
|
)
|
|
ebics_files = self.env["ebics.file"]
|
|
date_from = self.date_from and self.date_from.isoformat() or None
|
|
date_to = self.date_to and self.date_to.isoformat() or None
|
|
for df in download_formats:
|
|
try:
|
|
success = False
|
|
if df.order_type == "BTD":
|
|
btf = BusinessTransactionFormat(
|
|
df.btf_service,
|
|
df.btf_message,
|
|
scope=df.btf_scope or None,
|
|
option=df.btf_option or None,
|
|
container=df.btf_container or None,
|
|
version=df.btf_version or None,
|
|
variant=df.btf_variant or None,
|
|
format=df.btf_format or None,
|
|
)
|
|
data = client.BTD(btf, start=date_from, end=date_to)
|
|
elif df.order_type == "FDL":
|
|
data = client.FDL(df.name, date_from, date_to)
|
|
else:
|
|
params = None
|
|
if date_from and date_to:
|
|
params = {
|
|
"DateRange": {
|
|
"Start": date_from,
|
|
"End": date_to,
|
|
}
|
|
}
|
|
data = client.download(df.order_type, params=params)
|
|
ebics_files += self._handle_download_data(data, df)
|
|
success = True
|
|
except EbicsFunctionalError:
|
|
err_cnt += 1
|
|
e = exc_info()
|
|
self.note += "\n"
|
|
self.note += _(
|
|
"EBICS Functional Error during download of "
|
|
"File Format %(name)s (%(order_type)s):",
|
|
name=df.name,
|
|
order_type=df.order_type,
|
|
)
|
|
self.note += "\n"
|
|
self.note += "{} (code: {})".format(e[1].message, e[1].code)
|
|
except EbicsTechnicalError:
|
|
err_cnt += 1
|
|
e = exc_info()
|
|
self.note += "\n"
|
|
self.note += _(
|
|
"EBICS Technical Error during download of "
|
|
"File Format %(name)s (%(order_type)s):",
|
|
name=df.name,
|
|
order_type=df.order_type,
|
|
)
|
|
self.note += "\n"
|
|
self.note += "{} (code: {})".format(e[1].message, e[1].code)
|
|
except EbicsVerificationError:
|
|
err_cnt += 1
|
|
self.note += "\n"
|
|
self.note += _(
|
|
"EBICS Verification Error during download of "
|
|
"File Format %(name)s (%(order_type)s):",
|
|
name=df.name,
|
|
order_type=df.order_type,
|
|
)
|
|
self.note += "\n"
|
|
self.note += _("The EBICS response could not be verified.")
|
|
except UserError as e:
|
|
self.note += "\n"
|
|
self.note += _(
|
|
"Warning during download of "
|
|
"File Format %(name)s (%(order_type)s):",
|
|
name=df.name,
|
|
order_type=df.order_type,
|
|
)
|
|
self.note += "\n"
|
|
self.note += e.name
|
|
except Exception:
|
|
err_cnt += 1
|
|
self.note += "\n"
|
|
self.note += _(
|
|
"Unknown Error during download of "
|
|
"File Format %(name)s (%(order_type)s):",
|
|
name=df.name,
|
|
order_type=df.order_type,
|
|
)
|
|
tb = "".join(format_exception(*exc_info()))
|
|
self.note += "\n%s" % tb
|
|
else:
|
|
# mark received data so that it is not included in further
|
|
# downloads
|
|
trans_id = client.last_trans_id
|
|
client.confirm_download(trans_id=trans_id, success=success)
|
|
|
|
ctx["ebics_file_ids"] = ebics_files.ids
|
|
|
|
if ebics_files:
|
|
self.note += "\n"
|
|
for f in ebics_files:
|
|
self.note += (
|
|
_("EBICS File '%s' is available for further processing.")
|
|
% f.name
|
|
)
|
|
self.note += "\n"
|
|
|
|
ctx["err_cnt"] = err_cnt
|
|
module = __name__.split("addons.")[1].split(".")[0]
|
|
result_view = self.env.ref("%s.ebics_xfer_view_form_result" % module)
|
|
return {
|
|
"name": _("EBICS file transfer result"),
|
|
"res_id": self.id,
|
|
"view_type": "form",
|
|
"view_mode": "form",
|
|
"res_model": "ebics.xfer",
|
|
"view_id": result_view.id,
|
|
"target": "new",
|
|
"context": ctx,
|
|
"type": "ir.actions.act_window",
|
|
}
|
|
|
|
def button_close(self):
|
|
self.ensure_one()
|
|
return {"type": "ir.actions.act_window_close"}
|
|
|
|
def view_ebics_file(self):
|
|
self.ensure_one()
|
|
module = __name__.split("addons.")[1].split(".")[0]
|
|
act = self.env["ir.actions.act_window"]._for_xml_id(
|
|
"{}.ebics_file_action_download".format(module)
|
|
)
|
|
act["domain"] = [("id", "in", self._context["ebics_file_ids"])]
|
|
return act
|
|
|
|
def _ebics_upload(self):
|
|
self.ensure_one()
|
|
ebics_file = self.env["ebics.file"]
|
|
self.note = ""
|
|
client = self._setup_client()
|
|
if client:
|
|
upload_data = base64.decodebytes(self.upload_data)
|
|
ef_format = self.format_id
|
|
OrderID = False
|
|
try:
|
|
order_type = self.order_type
|
|
if order_type == "FUL":
|
|
kwargs = {}
|
|
bank = self.ebics_config_id.journal_ids[0].bank_id
|
|
cc = bank.country.code
|
|
if cc:
|
|
kwargs["country"] = cc
|
|
if self.test_mode:
|
|
kwargs["TEST"] = "TRUE"
|
|
OrderID = client.FUL(ef_format.name, upload_data, **kwargs)
|
|
else:
|
|
OrderID = client.upload(order_type, upload_data)
|
|
if OrderID:
|
|
self.note += "\n"
|
|
self.note += (
|
|
_("EBICS File has been uploaded (OrderID %s).") % OrderID
|
|
)
|
|
ef_note = _("EBICS OrderID: %s") % OrderID
|
|
if self.env.context.get("origin"):
|
|
ef_note += "\n" + _("Origin: %s") % self._context["origin"]
|
|
suffix = self.format_id.suffix
|
|
fn = self.upload_fname
|
|
if not fn.endswith(suffix):
|
|
fn = ".".join([fn, suffix])
|
|
ef_vals = {
|
|
"name": self.upload_fname,
|
|
"data": self.upload_data,
|
|
"date": fields.Datetime.now(),
|
|
"format_id": self.format_id.id,
|
|
"state": "done",
|
|
"user_id": self._uid,
|
|
"ebics_userid_id": self.ebics_userid_id.id,
|
|
"note": ef_note,
|
|
"company_ids": [
|
|
self.env.context.get("force_company", self.env.company.id)
|
|
],
|
|
}
|
|
self._update_ef_vals(ef_vals)
|
|
ebics_file = self.env["ebics.file"].create(ef_vals)
|
|
|
|
except EbicsFunctionalError:
|
|
e = exc_info()
|
|
self.note += "\n"
|
|
self.note += _("EBICS Functional Error:")
|
|
self.note += "\n"
|
|
self.note += "{} (code: {})".format(e[1].message, e[1].code)
|
|
except EbicsTechnicalError:
|
|
e = exc_info()
|
|
self.note += "\n"
|
|
self.note += _("EBICS Technical Error:")
|
|
self.note += "\n"
|
|
self.note += "{} (code: {})".format(e[1].message, e[1].code)
|
|
except EbicsVerificationError:
|
|
self.note += "\n"
|
|
self.note += _("EBICS Verification Error:")
|
|
self.note += "\n"
|
|
self.note += _("The EBICS response could not be verified.")
|
|
except Exception:
|
|
self.note += "\n"
|
|
self.note += _("Unknown Error")
|
|
tb = "".join(format_exception(*exc_info()))
|
|
self.note += "\n%s" % tb
|
|
|
|
if self.ebics_config_id.ebics_version == "H003":
|
|
OrderID = self.ebics_config_id._get_order_number()
|
|
self.ebics_config_id.sudo()._update_order_number(OrderID)
|
|
|
|
return ebics_file
|
|
|
|
def _setup_client(self):
|
|
self.ebics_config_id._check_ebics_keys()
|
|
passphrase = self._get_passphrase()
|
|
keyring = EbicsKeyRing(
|
|
keys=self.ebics_userid_id.ebics_keys_fn, passphrase=passphrase
|
|
)
|
|
|
|
bank = EbicsBank(
|
|
keyring=keyring,
|
|
hostid=self.ebics_config_id.ebics_host,
|
|
url=self.ebics_config_id.ebics_url,
|
|
)
|
|
if self.ebics_config_id.ebics_version == "H003":
|
|
bank._order_number = self.ebics_config_id._get_order_number()
|
|
|
|
user = EbicsUser(
|
|
keyring=keyring,
|
|
partnerid=self.ebics_config_id.ebics_partner,
|
|
userid=self.ebics_userid_id.name,
|
|
)
|
|
signature_class = (
|
|
self.format_id.signature_class or self.ebics_userid_id.signature_class
|
|
)
|
|
if signature_class == "T":
|
|
user.manual_approval = True
|
|
|
|
try:
|
|
client = EbicsClient(bank, user, version=self.ebics_config_id.ebics_version)
|
|
except Exception:
|
|
self.note += "\n"
|
|
self.note += _("Unknown Error")
|
|
tb = "".join(format_exception(*exc_info()))
|
|
self.note += "\n%s" % tb
|
|
client = False
|
|
|
|
return client
|
|
|
|
def _get_passphrase(self):
|
|
passphrase = self.ebics_userid_id.ebics_passphrase
|
|
|
|
if passphrase:
|
|
return passphrase
|
|
|
|
module = __name__.split("addons.")[1].split(".")[0]
|
|
passphrase_view = self.env.ref("%s.ebics_xfer_view_form_passphrase" % module)
|
|
return {
|
|
"name": _("EBICS file transfer"),
|
|
"res_id": self.id,
|
|
"view_type": "form",
|
|
"view_mode": "form",
|
|
"res_model": "ebics.xfer",
|
|
"view_id": passphrase_view.id,
|
|
"target": "new",
|
|
"context": self._context,
|
|
"type": "ir.actions.act_window",
|
|
}
|
|
|
|
def _file_format_methods(self):
|
|
"""
|
|
Extend this dictionary in order to add support
|
|
for extra file formats.
|
|
"""
|
|
res = {
|
|
"camt.xxx.cfonb120.stm": self._handle_cfonb120,
|
|
"camt.xxx.cfonb120.stm.rfi": self._handle_cfonb120,
|
|
"camt.052.001.02.stm": self._handle_camt052,
|
|
"camt.053.001.02.stm": self._handle_camt053,
|
|
}
|
|
return res
|
|
|
|
def _update_ef_vals(self, ef_vals):
|
|
"""
|
|
Adapt this method to customize the EBICS File values.
|
|
"""
|
|
if self.format_id and self.format_id.type == "up":
|
|
fn = ef_vals["name"]
|
|
dups = self._check_duplicate_ebics_file(fn, self.format_id)
|
|
if dups:
|
|
n = 1
|
|
fn = "_".join([fn, str(n)])
|
|
while self._check_duplicate_ebics_file(fn, self.format_id):
|
|
n += 1
|
|
fn = "_".join([fn, str(n)])
|
|
ef_vals["name"] = fn
|
|
|
|
def _handle_download_data(self, data, file_format):
|
|
ebics_files = self.env["ebics.file"]
|
|
if isinstance(data, dict):
|
|
for doc in data:
|
|
ebics_files += self._create_ebics_file(
|
|
data[doc], file_format, docname=doc
|
|
)
|
|
else:
|
|
ebics_files += self._create_ebics_file(data, file_format)
|
|
return ebics_files
|
|
|
|
def _create_ebics_file(self, data, file_format, docname=None):
|
|
"""
|
|
Write the data as received over the EBICS connection
|
|
to a temporary file so that is is available for
|
|
analysis (e.g. in case formats are received that cannot
|
|
be handled in the current version of this module).
|
|
|
|
TODO: add code to clean-up /tmp on a regular basis.
|
|
|
|
After saving the data received we call the method to perform
|
|
file format specific processing.
|
|
"""
|
|
ebics_files_root = self.ebics_config_id.ebics_files
|
|
tmp_dir = os.path.normpath(ebics_files_root + "/tmp")
|
|
if not os.path.isdir(tmp_dir):
|
|
os.makedirs(tmp_dir, mode=0o700)
|
|
fn_parts = [self.ebics_config_id.ebics_host, self.ebics_config_id.ebics_partner]
|
|
if docname:
|
|
fn_parts.append(docname)
|
|
else:
|
|
fn_date = self.date_to or fields.Date.today()
|
|
fn_parts.append(fn_date.isoformat())
|
|
base_fn = "_".join(fn_parts)
|
|
n = 1
|
|
full_tmp_fn = os.path.normpath(tmp_dir + "/" + base_fn)
|
|
while os.path.exists(full_tmp_fn):
|
|
n += 1
|
|
tmp_fn = base_fn + "_" + str(n).rjust(3, "0")
|
|
full_tmp_fn = os.path.normpath(tmp_dir + "/" + tmp_fn)
|
|
|
|
with open(full_tmp_fn, "wb") as f:
|
|
f.write(data)
|
|
|
|
ff_methods = self._file_format_methods()
|
|
if file_format.name in ff_methods:
|
|
data = ff_methods[file_format.name](data)
|
|
|
|
fn = ".".join([base_fn, file_format.suffix])
|
|
dups = self._check_duplicate_ebics_file(fn, file_format)
|
|
if dups:
|
|
raise UserError(
|
|
_(
|
|
"EBICS File with name '%s' has already been downloaded."
|
|
"\nPlease check this file and rename in case there is "
|
|
"no risk on duplicate transactions."
|
|
)
|
|
% fn
|
|
)
|
|
data = base64.encodebytes(data)
|
|
ef_vals = {
|
|
"name": fn,
|
|
"data": data,
|
|
"date": fields.Datetime.now(),
|
|
"date_from": self.date_from,
|
|
"date_to": self.date_to,
|
|
"format_id": file_format.id,
|
|
"user_id": self._uid,
|
|
"ebics_userid_id": self.ebics_userid_id.id,
|
|
"company_ids": self.ebics_config_id.company_ids.ids,
|
|
}
|
|
self._update_ef_vals(ef_vals)
|
|
ebics_file = self.env["ebics.file"].create(ef_vals)
|
|
return ebics_file
|
|
|
|
def _check_duplicate_ebics_file(self, fn, file_format):
|
|
dups = self.env["ebics.file"].search(
|
|
[("name", "=", fn), ("format_id", "=", file_format.id)]
|
|
)
|
|
return dups
|
|
|
|
def _detect_upload_format(self):
|
|
"""
|
|
Use this method in order to automatically detect and set the
|
|
EBICS upload file format.
|
|
"""
|
|
|
|
def _update_order_number(self, OrderID):
|
|
o_list = list(OrderID)
|
|
for i, c in enumerate(reversed(o_list), start=1):
|
|
if c == "9":
|
|
o_list[-i] = "A"
|
|
break
|
|
if c == "Z":
|
|
continue
|
|
else:
|
|
o_list[-i] = chr(ord(c) + 1)
|
|
break
|
|
next_nr = "".join(o_list)
|
|
if next_nr == "ZZZZ":
|
|
next_nr = "A000"
|
|
self.ebics_config_id.order_number = next_nr
|
|
|
|
def _insert_line_terminator(self, data_in, line_len):
|
|
data_in = data_in.replace(b"\n", b"").replace(b"\r", b"")
|
|
data_out = b""
|
|
max_len = len(data_in)
|
|
i = 0
|
|
while i + line_len <= max_len:
|
|
data_out += data_in[i : i + line_len] + b"\n"
|
|
i += line_len
|
|
return data_out
|
|
|
|
def _handle_cfonb120(self, data_in):
|
|
return self._insert_line_terminator(data_in, 120)
|
|
|
|
def _handle_cfonb240(self, data_in):
|
|
return self._insert_line_terminator(data_in, 240)
|
|
|
|
def _handle_camt052(self, data_in):
|
|
"""
|
|
Use this method if you need to fix camt files received
|
|
from your bank before passing them to the
|
|
Odoo Community CAMT parser.
|
|
Remark: Odoo Enterprise doesn't support camt.052.
|
|
"""
|
|
return data_in
|
|
|
|
def _handle_camt053(self, data_in):
|
|
"""
|
|
Use this method if you need to fix camt files received
|
|
from your bank before passing them to the
|
|
Odoo Enterprise or Community CAMT parser.
|
|
"""
|
|
return data_in
|