[14.0]add account_ebics_batch

This commit is contained in:
Luc De Meyer 2022-05-07 18:42:03 +02:00
parent ac64eb5a83
commit 503d0821d4
2 changed files with 280 additions and 244 deletions

View File

@ -2,31 +2,31 @@
# License LGPL-3 or later (http://www.gnu.org/licenses/lpgl). # License LGPL-3 or later (http://www.gnu.org/licenses/lpgl).
{ {
'name': 'EBICS banking protocol', "name": "EBICS banking protocol",
'version': '14.0.1.0.2', "version": "14.0.1.0.3",
'license': 'LGPL-3', "license": "LGPL-3",
'author': 'Noviat', "author": "Noviat",
'website': 'www.noviat.com', "website": "www.noviat.com",
'category': 'Accounting & Finance', "category": "Accounting & Finance",
'depends': ['account'], "depends": ["account"],
'data': [ "data": [
'security/ebics_security.xml', "security/ebics_security.xml",
'security/ir.model.access.csv', "security/ir.model.access.csv",
'data/ebics_file_format.xml', "data/ebics_file_format.xml",
'views/ebics_config_views.xml', "views/ebics_config_views.xml",
'views/ebics_file_views.xml', "views/ebics_file_views.xml",
'views/ebics_userid_views.xml', "views/ebics_userid_views.xml",
'views/ebics_file_format_views.xml', "views/ebics_file_format_views.xml",
'wizards/ebics_change_passphrase.xml', "wizards/ebics_change_passphrase.xml",
'wizards/ebics_xfer.xml', "wizards/ebics_xfer.xml",
'views/menu.xml', "views/menu.xml",
], ],
'installable': True, "installable": True,
'application': True, "application": True,
'external_dependencies': { "external_dependencies": {
'python': [ "python": [
'fintech', "fintech",
'cryptography', "cryptography",
] ]
}, },
} }

View File

@ -14,125 +14,146 @@ import os
from sys import exc_info from sys import exc_info
from traceback import format_exception from traceback import format_exception
from odoo import api, fields, models, _ from odoo import _, api, fields, models
from odoo.exceptions import UserError from odoo.exceptions import UserError
_logger = logging.getLogger(__name__) _logger = logging.getLogger(__name__)
try: try:
import fintech import fintech
from fintech.ebics import EbicsKeyRing, EbicsBank, EbicsUser, EbicsClient,\ from fintech.ebics import (
EbicsFunctionalError, EbicsTechnicalError, EbicsVerificationError EbicsBank,
fintech.cryptolib = 'cryptography' EbicsClient,
EbicsFunctionalError,
EbicsKeyRing,
EbicsTechnicalError,
EbicsUser,
EbicsVerificationError,
)
fintech.cryptolib = "cryptography"
except ImportError: except ImportError:
EbicsBank = object EbicsBank = object
_logger.warning('Failed to import fintech') _logger.warning("Failed to import fintech")
class EbicsBank(EbicsBank): class EbicsBank(EbicsBank):
def _next_order_id(self, partnerid): def _next_order_id(self, partnerid):
""" """
EBICS protocol version H003 requires generation of the OrderID. EBICS protocol version H003 requires generation of the OrderID.
The OrderID must be a string between 'A000' and 'ZZZZ' and The OrderID must be a string between 'A000' and 'ZZZZ' and
unique for each partner id. unique for each partner id.
""" """
return hasattr(self, '_order_number') and self._order_number or 'A000' return hasattr(self, "_order_number") and self._order_number or "A000"
class EbicsXfer(models.TransientModel): class EbicsXfer(models.TransientModel):
_name = 'ebics.xfer' _name = "ebics.xfer"
_description = 'EBICS file transfer' _description = "EBICS file transfer"
ebics_config_id = fields.Many2one( ebics_config_id = fields.Many2one(
comodel_name='ebics.config', comodel_name="ebics.config",
string='EBICS Configuration', string="EBICS Configuration",
domain=[('state', '=', 'confirm')], domain=[("state", "=", "confirm")],
default=lambda self: self._default_ebics_config_id()) default=lambda self: self._default_ebics_config_id(),
)
ebics_userid_id = fields.Many2one( ebics_userid_id = fields.Many2one(
comodel_name='ebics.userid', comodel_name="ebics.userid", string="EBICS UserID"
string='EBICS UserID') )
ebics_passphrase = fields.Char( ebics_passphrase = fields.Char(string="EBICS Passphrase")
string='EBICS Passphrase')
date_from = fields.Date() date_from = fields.Date()
date_to = fields.Date() date_to = fields.Date()
upload_data = fields.Binary(string='File to Upload') upload_data = fields.Binary(string="File to Upload")
upload_fname = fields.Char( upload_fname = fields.Char(string="Upload Filename", default="")
string='Upload Filename', default='')
upload_fname_dummy = fields.Char( upload_fname_dummy = fields.Char(
related='upload_fname', string='Upload Filename', readonly=True) related="upload_fname", string="Upload Filename", readonly=True
)
format_id = fields.Many2one( format_id = fields.Many2one(
comodel_name='ebics.file.format', comodel_name="ebics.file.format",
string='EBICS File Format', string="EBICS File Format",
help="Select EBICS File Format to upload/download." help="Select EBICS File Format to upload/download."
"\nLeave blank to download all available files.") "\nLeave blank to download all available files.",
)
allowed_format_ids = fields.Many2many( allowed_format_ids = fields.Many2many(
related='ebics_config_id.ebics_file_format_ids', related="ebics_config_id.ebics_file_format_ids",
string='Allowed EBICS File Formats') string="Allowed EBICS File Formats",
)
order_type = fields.Char( order_type = fields.Char(
related='format_id.order_type', related="format_id.order_type",
string='Order Type', string="Order Type",
help="For most banks in France you should use the " help="For most banks in France you should use the "
"format neutral Order Types 'FUL' for upload " "format neutral Order Types 'FUL' for upload "
"and 'FDL' for download.") "and 'FDL' for download.",
)
test_mode = fields.Boolean( test_mode = fields.Boolean(
string='Test Mode', string="Test Mode",
help="Select this option to test if the syntax of " help="Select this option to test if the syntax of "
"the upload file is correct." "the upload file is correct."
"\nThis option is only available for " "\nThis option is only available for "
"Order Type 'FUL'.") "Order Type 'FUL'.",
note = fields.Text(string='EBICS file transfer Log', readonly=True) )
note = fields.Text(string="EBICS file transfer Log", readonly=True)
@api.model @api.model
def _default_ebics_config_id(self): def _default_ebics_config_id(self):
cfg_mod = self.env['ebics.config'] cfg_mod = self.env["ebics.config"]
cfg = cfg_mod.search( cfg = cfg_mod.search(
[('company_ids', 'in', self.env.user.company_ids.ids), [
('state', '=', 'confirm')]) ("company_ids", "in", self.env.user.company_ids.ids),
("state", "=", "confirm"),
]
)
if cfg and len(cfg) == 1: if cfg and len(cfg) == 1:
return cfg return cfg
else: else:
return cfg_mod return cfg_mod
@api.onchange('ebics_config_id') @api.onchange("ebics_config_id")
def _onchange_ebics_config_id(self): def _onchange_ebics_config_id(self):
ebics_userids = self.ebics_config_id.ebics_userid_ids ebics_userids = self.ebics_config_id.ebics_userid_ids
if self._context.get('ebics_download'): if self._context.get("ebics_download"):
download_formats = self.ebics_config_id.ebics_file_format_ids\ download_formats = self.ebics_config_id.ebics_file_format_ids.filtered(
.filtered(lambda r: r.type == 'down') lambda r: r.type == "down"
)
if len(download_formats) == 1: if len(download_formats) == 1:
self.format_id = download_formats self.format_id = download_formats
if len(ebics_userids) == 1: if len(ebics_userids) == 1:
self.ebics_userid_id = ebics_userids self.ebics_userid_id = ebics_userids
else: else:
transport_users = ebics_userids.filtered( transport_users = ebics_userids.filtered(
lambda r: r.signature_class == 'T') lambda r: r.signature_class == "T"
)
if len(transport_users) == 1: if len(transport_users) == 1:
self.ebics_userid_id = transport_users self.ebics_userid_id = transport_users
else: else:
upload_formats = self.ebics_config_id.ebics_file_format_ids\ upload_formats = self.ebics_config_id.ebics_file_format_ids.filtered(
.filtered(lambda r: r.type == 'up') lambda r: r.type == "up"
)
if len(upload_formats) == 1: if len(upload_formats) == 1:
self.format_id = upload_formats self.format_id = upload_formats
if len(ebics_userids) == 1: if len(ebics_userids) == 1:
self.ebics_userid_id = ebics_userids self.ebics_userid_id = ebics_userids
@api.onchange('upload_data') @api.onchange("upload_data")
def _onchange_upload_data(self): def _onchange_upload_data(self):
self.upload_fname_dummy = self.upload_fname self.upload_fname_dummy = self.upload_fname
self.format_id = False self.format_id = False
self._detect_upload_format() self._detect_upload_format()
if not self.format_id: if not self.format_id:
upload_formats = self.format_id \ upload_formats = (
self.format_id
or self.ebics_config_id.ebics_file_format_ids.filtered( or self.ebics_config_id.ebics_file_format_ids.filtered(
lambda r: r.type == 'up') lambda r: r.type == "up"
)
)
if len(upload_formats) > 1: if len(upload_formats) > 1:
upload_formats = upload_formats.filtered( upload_formats = upload_formats.filtered(
lambda r: self.upload_fname.endswith(r.suffix)) lambda r: self.upload_fname.endswith(r.suffix)
)
if len(upload_formats) == 1: if len(upload_formats) == 1:
self.format_id = upload_formats self.format_id = upload_formats
@api.onchange('format_id') @api.onchange("format_id")
def _onchange_format_id(self): def _onchange_format_id(self):
self.order_type = self.format_id.order_type self.order_type = self.format_id.order_type
@ -141,138 +162,152 @@ class EbicsXfer(models.TransientModel):
ctx = self._context.copy() ctx = self._context.copy()
ebics_file = self._ebics_upload() ebics_file = self._ebics_upload()
if ebics_file: if ebics_file:
ctx['ebics_file_id'] = ebics_file.id ctx["ebics_file_id"] = ebics_file.id
module = __name__.split('addons.')[1].split('.')[0] module = __name__.split("addons.")[1].split(".")[0]
result_view = self.env.ref( result_view = self.env.ref("%s.ebics_xfer_view_form_result" % module)
'%s.ebics_xfer_view_form_result' % module)
return { return {
'name': _('EBICS file transfer result'), "name": _("EBICS file transfer result"),
'res_id': self.id, "res_id": self.id,
'view_type': 'form', "view_type": "form",
'view_mode': 'form', "view_mode": "form",
'res_model': 'ebics.xfer', "res_model": "ebics.xfer",
'view_id': result_view.id, "view_id": result_view.id,
'target': 'new', "target": "new",
'context': ctx, "context": ctx,
'type': 'ir.actions.act_window', "type": "ir.actions.act_window",
} }
def ebics_download(self): def ebics_download(self):
self.ensure_one() self.ensure_one()
self.ebics_config_id._check_ebics_files() self.ebics_config_id._check_ebics_files()
ctx = self._context.copy() ctx = self.env.context.copy()
self.note = '' self.note = ""
err_cnt = 0
client = self._setup_client() client = self._setup_client()
if client: if not client:
err_cnt += 1
self.note += (
_("EBICS client setup failed for connection '%s'")
% self.ebics_config_id.name
)
else:
download_formats = ( download_formats = (
self.format_id self.format_id
or self.ebics_config_id.ebics_file_format_ids.filtered( or self.ebics_config_id.ebics_file_format_ids.filtered(
lambda r: r.type == 'down' lambda r: r.type == "down"
) )
) )
ebics_files = self.env['ebics.file'] ebics_files = self.env["ebics.file"]
date_from = self.date_from and self.date_from.isoformat() or None date_from = self.date_from and self.date_from.isoformat() or None
date_to = self.date_to and self.date_to.isoformat() or None date_to = self.date_to and self.date_to.isoformat() or None
for df in download_formats: for df in download_formats:
try: try:
success = False success = False
if df.order_type == 'FDL': if df.order_type == "FDL":
data = client.FDL(df.name, date_from, date_to) data = client.FDL(df.name, date_from, date_to)
else: else:
params = None params = None
if date_from and date_to: if date_from and date_to:
params = {'DateRange': { params = {
'Start': date_from, "DateRange": {
'End': date_to, "Start": date_from,
}} "End": date_to,
}
}
data = client.download(df.order_type, params=params) data = client.download(df.order_type, params=params)
ebics_files += self._handle_download_data(data, df) ebics_files += self._handle_download_data(data, df)
success = True success = True
except EbicsFunctionalError: except EbicsFunctionalError:
err_cnt += 1
e = exc_info() e = exc_info()
self.note += '\n' self.note += "\n"
self.note += _( self.note += _(
"EBICS Functional Error during download of File Format %s (%s):" "EBICS Functional Error during download of File Format %s (%s):"
) % (df.name, df.order_type) ) % (df.name, df.order_type)
self.note += '\n' self.note += "\n"
self.note += '%s (code: %s)' % (e[1].message, e[1].code) self.note += "{} (code: {})".format(e[1].message, e[1].code)
except EbicsTechnicalError: except EbicsTechnicalError:
err_cnt += 1
e = exc_info() e = exc_info()
self.note += '\n' self.note += "\n"
self.note += _( self.note += _(
"EBICS Technical Error during download of File Format %s (%s):" "EBICS Technical Error during download of File Format %s (%s):"
) % (df.name, df.order_type) ) % (df.name, df.order_type)
self.note += '\n' self.note += "\n"
self.note += '%s (code: %s)' % (e[1].message, e[1].code) self.note += "{} (code: {})".format(e[1].message, e[1].code)
except EbicsVerificationError: except EbicsVerificationError:
self.note += '\n' err_cnt += 1
self.note += "\n"
self.note += _( self.note += _(
"EBICS Verification Error during download of " "EBICS Verification Error during download of "
"File Format %s (%s):" "File Format %s (%s):"
) % (df.name, df.order_type) ) % (df.name, df.order_type)
self.note += '\n' self.note += "\n"
self.note += _("The EBICS response could not be verified.") self.note += _("The EBICS response could not be verified.")
except UserError as e: except UserError as e:
self.note += '\n' self.note += "\n"
self.note += _( self.note += _(
"Warning during download of File Format %s (%s):" "Warning during download of File Format %s (%s):"
) % (df.name, df.order_type) ) % (df.name, df.order_type)
self.note += '\n' self.note += "\n"
self.note += e.name self.note += e.name
except Exception: except Exception:
self.note += '\n' err_cnt += 1
self.note += "\n"
self.note += _( self.note += _(
"Unknown Error during download of File Format %s (%s):" "Unknown Error during download of File Format %s (%s):"
) % (df.name, df.order_type) ) % (df.name, df.order_type)
tb = ''.join(format_exception(*exc_info())) tb = "".join(format_exception(*exc_info()))
self.note += '\n%s' % tb self.note += "\n%s" % tb
else: else:
# mark received data so that it is not included in further # mark received data so that it is not included in further
# downloads # downloads
trans_id = client.last_trans_id trans_id = client.last_trans_id
client.confirm_download(trans_id=trans_id, success=success) client.confirm_download(trans_id=trans_id, success=success)
ctx['ebics_file_ids'] = ebics_files._ids ctx["ebics_file_ids"] = ebics_files.ids
if ebics_files: if ebics_files:
self.note += '\n' self.note += "\n"
for f in ebics_files: for f in ebics_files:
self.note += _( self.note += (
"EBICS File '%s' is available for further processing." _("EBICS File '%s' is available for further processing.")
) % f.name % f.name
self.note += '\n' )
self.note += "\n"
module = __name__.split('addons.')[1].split('.')[0] ctx["err_cnt"] = err_cnt
result_view = self.env.ref( module = __name__.split("addons.")[1].split(".")[0]
'%s.ebics_xfer_view_form_result' % module) result_view = self.env.ref("%s.ebics_xfer_view_form_result" % module)
return { return {
'name': _('EBICS file transfer result'), "name": _("EBICS file transfer result"),
'res_id': self.id, "res_id": self.id,
'view_type': 'form', "view_type": "form",
'view_mode': 'form', "view_mode": "form",
'res_model': 'ebics.xfer', "res_model": "ebics.xfer",
'view_id': result_view.id, "view_id": result_view.id,
'target': 'new', "target": "new",
'context': ctx, "context": ctx,
'type': 'ir.actions.act_window', "type": "ir.actions.act_window",
} }
def button_close(self): def button_close(self):
self.ensure_one() self.ensure_one()
return {'type': 'ir.actions.act_window_close'} return {"type": "ir.actions.act_window_close"}
def view_ebics_file(self): def view_ebics_file(self):
self.ensure_one() self.ensure_one()
module = __name__.split('addons.')[1].split('.')[0] module = __name__.split("addons.")[1].split(".")[0]
act = self.env['ir.actions.act_window']._for_xml_id( act = self.env["ir.actions.act_window"]._for_xml_id(
'{}.ebics_file_action_download'.format(module)) "{}.ebics_file_action_download".format(module)
act['domain'] = [('id', 'in', self._context['ebics_file_ids'])] )
act["domain"] = [("id", "in", self._context["ebics_file_ids"])]
return act return act
def _ebics_upload(self): def _ebics_upload(self):
self.ensure_one() self.ensure_one()
ebics_file = self.env['ebics.file'] ebics_file = self.env["ebics.file"]
self.note = '' self.note = ""
client = self._setup_client() client = self._setup_client()
if client: if client:
upload_data = base64.decodestring(self.upload_data) upload_data = base64.decodestring(self.upload_data)
@ -280,70 +315,69 @@ class EbicsXfer(models.TransientModel):
OrderID = False OrderID = False
try: try:
order_type = self.order_type order_type = self.order_type
if order_type == 'FUL': if order_type == "FUL":
kwargs = {} kwargs = {}
bank = self.ebics_config_id.journal_ids[0].bank_id bank = self.ebics_config_id.journal_ids[0].bank_id
cc = bank.country.code cc = bank.country.code
if cc: if cc:
kwargs['country'] = cc kwargs["country"] = cc
if self.test_mode: if self.test_mode:
kwargs['TEST'] = 'TRUE' kwargs["TEST"] = "TRUE"
OrderID = client.FUL(ef_format.name, upload_data, **kwargs) OrderID = client.FUL(ef_format.name, upload_data, **kwargs)
else: else:
OrderID = client.upload(order_type, upload_data) OrderID = client.upload(order_type, upload_data)
if OrderID: if OrderID:
self.note += '\n' self.note += "\n"
self.note += _( self.note += (
"EBICS File has been uploaded (OrderID %s)." _("EBICS File has been uploaded (OrderID %s).") % OrderID
) % OrderID )
ef_note = _("EBICS OrderID: %s") % OrderID ef_note = _("EBICS OrderID: %s") % OrderID
if self.env.context.get('origin'): if self.env.context.get("origin"):
ef_note += '\n' + _( ef_note += "\n" + _("Origin: %s") % self._context["origin"]
"Origin: %s") % self._context['origin']
suffix = self.format_id.suffix suffix = self.format_id.suffix
fn = self.upload_fname fn = self.upload_fname
if not fn.endswith(suffix): if not fn.endswith(suffix):
fn = '.'.join([fn, suffix]) fn = ".".join([fn, suffix])
ef_vals = { ef_vals = {
'name': self.upload_fname, "name": self.upload_fname,
'data': self.upload_data, "data": self.upload_data,
'date': fields.Datetime.now(), "date": fields.Datetime.now(),
'format_id': self.format_id.id, "format_id": self.format_id.id,
'state': 'done', "state": "done",
'user_id': self._uid, "user_id": self._uid,
'ebics_userid_id': self.ebics_userid_id.id, "ebics_userid_id": self.ebics_userid_id.id,
'note': ef_note, "note": ef_note,
"company_ids": [ "company_ids": [
self.env.context.get("force_company", self.env.company.id) self.env.context.get("force_company", self.env.company.id)
], ],
} }
self._update_ef_vals(ef_vals) self._update_ef_vals(ef_vals)
ebics_file = self.env['ebics.file'].create(ef_vals) ebics_file = self.env["ebics.file"].create(ef_vals)
except EbicsFunctionalError: except EbicsFunctionalError:
e = exc_info() e = exc_info()
self.note += '\n' self.note += "\n"
self.note += _("EBICS Functional Error:") self.note += _("EBICS Functional Error:")
self.note += '\n' self.note += "\n"
self.note += '%s (code: %s)' % (e[1].message, e[1].code) self.note += "{} (code: {})".format(e[1].message, e[1].code)
except EbicsTechnicalError: except EbicsTechnicalError:
e = exc_info() e = exc_info()
self.note += '\n' self.note += "\n"
self.note += _("EBICS Technical Error:") self.note += _("EBICS Technical Error:")
self.note += '\n' self.note += "\n"
self.note += '%s (code: %s)' % (e[1].message, e[1].code) self.note += "{} (code: {})".format(e[1].message, e[1].code)
except EbicsVerificationError: except EbicsVerificationError:
self.note += '\n' self.note += "\n"
self.note += _("EBICS Verification Error:") self.note += _("EBICS Verification Error:")
self.note += '\n' self.note += "\n"
self.note += _("The EBICS response could not be verified.") self.note += _("The EBICS response could not be verified.")
except Exception: except Exception:
self.note += '\n' self.note += "\n"
self.note += _("Unknown Error") self.note += _("Unknown Error")
tb = ''.join(format_exception(*exc_info())) tb = "".join(format_exception(*exc_info()))
self.note += '\n%s' % tb self.note += "\n%s" % tb
if self.ebics_config_id.ebics_version == 'H003': if self.ebics_config_id.ebics_version == "H003":
OrderID = self.ebics_config_id._get_order_number() OrderID = self.ebics_config_id._get_order_number()
self.ebics_config_id.sudo()._update_order_number(OrderID) self.ebics_config_id.sudo()._update_order_number(OrderID)
@ -353,33 +387,35 @@ class EbicsXfer(models.TransientModel):
self.ebics_config_id._check_ebics_keys() self.ebics_config_id._check_ebics_keys()
passphrase = self._get_passphrase() passphrase = self._get_passphrase()
keyring = EbicsKeyRing( keyring = EbicsKeyRing(
keys=self.ebics_userid_id.ebics_keys_fn, keys=self.ebics_userid_id.ebics_keys_fn, passphrase=passphrase
passphrase=passphrase) )
bank = EbicsBank( bank = EbicsBank(
keyring=keyring, keyring=keyring,
hostid=self.ebics_config_id.ebics_host, hostid=self.ebics_config_id.ebics_host,
url=self.ebics_config_id.ebics_url) url=self.ebics_config_id.ebics_url,
if self.ebics_config_id.ebics_version == 'H003': )
if self.ebics_config_id.ebics_version == "H003":
bank._order_number = self.ebics_config_id._get_order_number() bank._order_number = self.ebics_config_id._get_order_number()
user = EbicsUser( user = EbicsUser(
keyring=keyring, keyring=keyring,
partnerid=self.ebics_config_id.ebics_partner, partnerid=self.ebics_config_id.ebics_partner,
userid=self.ebics_userid_id.name) userid=self.ebics_userid_id.name,
signature_class = self.format_id.signature_class \ )
or self.ebics_userid_id.signature_class signature_class = (
if signature_class == 'T': self.format_id.signature_class or self.ebics_userid_id.signature_class
)
if signature_class == "T":
user.manual_approval = True user.manual_approval = True
try: try:
client = EbicsClient( client = EbicsClient(bank, user, version=self.ebics_config_id.ebics_version)
bank, user, version=self.ebics_config_id.ebics_version)
except Exception: except Exception:
self.note += '\n' self.note += "\n"
self.note += _("Unknown Error") self.note += _("Unknown Error")
tb = ''.join(format_exception(*exc_info())) tb = "".join(format_exception(*exc_info()))
self.note += '\n%s' % tb self.note += "\n%s" % tb
client = False client = False
return client return client
@ -390,19 +426,18 @@ class EbicsXfer(models.TransientModel):
if passphrase: if passphrase:
return passphrase return passphrase
module = __name__.split('addons.')[1].split('.')[0] module = __name__.split("addons.")[1].split(".")[0]
passphrase_view = self.env.ref( passphrase_view = self.env.ref("%s.ebics_xfer_view_form_passphrase" % module)
'%s.ebics_xfer_view_form_passphrase' % module)
return { return {
'name': _('EBICS file transfer'), "name": _("EBICS file transfer"),
'res_id': self.id, "res_id": self.id,
'view_type': 'form', "view_type": "form",
'view_mode': 'form', "view_mode": "form",
'res_model': 'ebics.xfer', "res_model": "ebics.xfer",
'view_id': passphrase_view.id, "view_id": passphrase_view.id,
'target': 'new', "target": "new",
'context': self._context, "context": self._context,
'type': 'ir.actions.act_window', "type": "ir.actions.act_window",
} }
def _file_format_methods(self): def _file_format_methods(self):
@ -411,10 +446,10 @@ class EbicsXfer(models.TransientModel):
for extra file formats. for extra file formats.
""" """
res = { res = {
'camt.xxx.cfonb120.stm': self._handle_cfonb120, "camt.xxx.cfonb120.stm": self._handle_cfonb120,
'camt.xxx.cfonb120.stm.rfi': self._handle_cfonb120, "camt.xxx.cfonb120.stm.rfi": self._handle_cfonb120,
'camt.052.001.02.stm': self._handle_camt052, "camt.052.001.02.stm": self._handle_camt052,
'camt.053.001.02.stm': self._handle_camt053, "camt.053.001.02.stm": self._handle_camt053,
} }
return res return res
@ -422,24 +457,24 @@ class EbicsXfer(models.TransientModel):
""" """
Adapt this method to customize the EBICS File values. Adapt this method to customize the EBICS File values.
""" """
if self.format_id and self.format_id.type == 'up': if self.format_id and self.format_id.type == "up":
fn = ef_vals['name'] fn = ef_vals["name"]
dups = self._check_duplicate_ebics_file( dups = self._check_duplicate_ebics_file(fn, self.format_id)
fn, self.format_id)
if dups: if dups:
n = 1 n = 1
fn = '_'.join([fn, str(n)]) fn = "_".join([fn, str(n)])
while self._check_duplicate_ebics_file(fn, self.format_id): while self._check_duplicate_ebics_file(fn, self.format_id):
n += 1 n += 1
fn = '_'.join([fn, str(n)]) fn = "_".join([fn, str(n)])
ef_vals['name'] = fn ef_vals["name"] = fn
def _handle_download_data(self, data, file_format): def _handle_download_data(self, data, file_format):
ebics_files = self.env['ebics.file'] ebics_files = self.env["ebics.file"]
if isinstance(data, dict): if isinstance(data, dict):
for doc in data: for doc in data:
ebics_files += self._create_ebics_file( ebics_files += self._create_ebics_file(
data[doc], file_format, docname=doc) data[doc], file_format, docname=doc
)
else: else:
ebics_files += self._create_ebics_file(data, file_format) ebics_files += self._create_ebics_file(data, file_format)
return ebics_files return ebics_files
@ -457,59 +492,61 @@ class EbicsXfer(models.TransientModel):
file format specific processing. file format specific processing.
""" """
ebics_files_root = self.ebics_config_id.ebics_files ebics_files_root = self.ebics_config_id.ebics_files
tmp_dir = os.path.normpath(ebics_files_root + '/tmp') tmp_dir = os.path.normpath(ebics_files_root + "/tmp")
if not os.path.isdir(tmp_dir): if not os.path.isdir(tmp_dir):
os.makedirs(tmp_dir, mode=0o700) os.makedirs(tmp_dir, mode=0o700)
fn_parts = [self.ebics_config_id.ebics_host, fn_parts = [self.ebics_config_id.ebics_host, self.ebics_config_id.ebics_partner]
self.ebics_config_id.ebics_partner]
if docname: if docname:
fn_parts.append(docname) fn_parts.append(docname)
else: else:
fn_date = self.date_to or fields.Date.today() fn_date = self.date_to or fields.Date.today()
fn_parts.append(fn_date.isoformat()) fn_parts.append(fn_date.isoformat())
base_fn = '_'.join(fn_parts) base_fn = "_".join(fn_parts)
n = 1 n = 1
full_tmp_fn = os.path.normpath(tmp_dir + '/' + base_fn) full_tmp_fn = os.path.normpath(tmp_dir + "/" + base_fn)
while os.path.exists(full_tmp_fn): while os.path.exists(full_tmp_fn):
n += 1 n += 1
tmp_fn = base_fn + '_' + str(n).rjust(3, '0') tmp_fn = base_fn + "_" + str(n).rjust(3, "0")
full_tmp_fn = os.path.normpath(tmp_dir + '/' + tmp_fn) full_tmp_fn = os.path.normpath(tmp_dir + "/" + tmp_fn)
with open(full_tmp_fn, 'wb') as f: with open(full_tmp_fn, "wb") as f:
f.write(data) f.write(data)
ff_methods = self._file_format_methods() ff_methods = self._file_format_methods()
if file_format.name in ff_methods: if file_format.name in ff_methods:
data = ff_methods[file_format.name](data) data = ff_methods[file_format.name](data)
fn = '.'.join([base_fn, file_format.suffix]) fn = ".".join([base_fn, file_format.suffix])
dups = self._check_duplicate_ebics_file(fn, file_format) dups = self._check_duplicate_ebics_file(fn, file_format)
if dups: if dups:
raise UserError(_( raise UserError(
"EBICS File with name '%s' has already been downloaded." _(
"\nPlease check this file and rename in case there is " "EBICS File with name '%s' has already been downloaded."
"no risk on duplicate transactions.") "\nPlease check this file and rename in case there is "
% fn) "no risk on duplicate transactions."
)
% fn
)
data = base64.encodebytes(data) data = base64.encodebytes(data)
ef_vals = { ef_vals = {
'name': fn, "name": fn,
'data': data, "data": data,
'date': fields.Datetime.now(), "date": fields.Datetime.now(),
'date_from': self.date_from, "date_from": self.date_from,
'date_to': self.date_to, "date_to": self.date_to,
'format_id': file_format.id, "format_id": file_format.id,
'user_id': self._uid, "user_id": self._uid,
'ebics_userid_id': self.ebics_userid_id.id, "ebics_userid_id": self.ebics_userid_id.id,
'company_ids': self.ebics_config_id.company_ids.ids, "company_ids": self.ebics_config_id.company_ids.ids,
} }
self._update_ef_vals(ef_vals) self._update_ef_vals(ef_vals)
ebics_file = self.env['ebics.file'].create(ef_vals) ebics_file = self.env["ebics.file"].create(ef_vals)
return ebics_file return ebics_file
def _check_duplicate_ebics_file(self, fn, file_format): def _check_duplicate_ebics_file(self, fn, file_format):
dups = self.env['ebics.file'].search( dups = self.env["ebics.file"].search(
[('name', '=', fn), [("name", "=", fn), ("format_id", "=", file_format.id)]
('format_id', '=', file_format.id)]) )
return dups return dups
def _detect_upload_format(self): def _detect_upload_format(self):
@ -517,31 +554,30 @@ class EbicsXfer(models.TransientModel):
Use this method in order to automatically detect and set the Use this method in order to automatically detect and set the
EBICS upload file format. EBICS upload file format.
""" """
pass
def _update_order_number(self, OrderID): def _update_order_number(self, OrderID):
o_list = list(OrderID) o_list = list(OrderID)
for i, c in enumerate(reversed(o_list), start=1): for i, c in enumerate(reversed(o_list), start=1):
if c == '9': if c == "9":
o_list[-i] = 'A' o_list[-i] = "A"
break break
if c == 'Z': if c == "Z":
continue continue
else: else:
o_list[-i] = chr(ord(c) + 1) o_list[-i] = chr(ord(c) + 1)
break break
next_nr = ''.join(o_list) next_nr = "".join(o_list)
if next_nr == 'ZZZZ': if next_nr == "ZZZZ":
next_nr = 'A000' next_nr = "A000"
self.ebics_config_id.order_number = next_nr self.ebics_config_id.order_number = next_nr
def _insert_line_terminator(self, data_in, line_len): def _insert_line_terminator(self, data_in, line_len):
data_in = data_in.replace(b'\n', b'').replace(b'\r', b'') data_in = data_in.replace(b"\n", b"").replace(b"\r", b"")
data_out = b'' data_out = b""
max_len = len(data_in) max_len = len(data_in)
i = 0 i = 0
while i + line_len <= max_len: while i + line_len <= max_len:
data_out += data_in[i:i + line_len] + b'\n' data_out += data_in[i : i + line_len] + b"\n"
i += line_len i += line_len
return data_out return data_out