From 953b60eb70e9c6c7756098b2a986a21f3df222c9 Mon Sep 17 00:00:00 2001 From: Luc De Meyer Date: Sat, 7 May 2022 18:42:03 +0200 Subject: [PATCH] [14.0]add account_ebics_batch --- account_ebics/__manifest__.py | 52 +-- account_ebics/wizards/ebics_xfer.py | 472 +++++++++++++++------------- 2 files changed, 280 insertions(+), 244 deletions(-) diff --git a/account_ebics/__manifest__.py b/account_ebics/__manifest__.py index b3daf19..2f0ad2f 100644 --- a/account_ebics/__manifest__.py +++ b/account_ebics/__manifest__.py @@ -2,31 +2,31 @@ # License LGPL-3 or later (http://www.gnu.org/licenses/lpgl). { - 'name': 'EBICS banking protocol', - 'version': '14.0.1.0.2', - 'license': 'LGPL-3', - 'author': 'Noviat', - 'website': 'www.noviat.com', - 'category': 'Accounting & Finance', - 'depends': ['account'], - 'data': [ - 'security/ebics_security.xml', - 'security/ir.model.access.csv', - 'data/ebics_file_format.xml', - 'views/ebics_config_views.xml', - 'views/ebics_file_views.xml', - 'views/ebics_userid_views.xml', - 'views/ebics_file_format_views.xml', - 'wizards/ebics_change_passphrase.xml', - 'wizards/ebics_xfer.xml', - 'views/menu.xml', + "name": "EBICS banking protocol", + "version": "14.0.1.0.3", + "license": "LGPL-3", + "author": "Noviat", + "website": "www.noviat.com", + "category": "Accounting & Finance", + "depends": ["account"], + "data": [ + "security/ebics_security.xml", + "security/ir.model.access.csv", + "data/ebics_file_format.xml", + "views/ebics_config_views.xml", + "views/ebics_file_views.xml", + "views/ebics_userid_views.xml", + "views/ebics_file_format_views.xml", + "wizards/ebics_change_passphrase.xml", + "wizards/ebics_xfer.xml", + "views/menu.xml", ], - 'installable': True, - 'application': True, - 'external_dependencies': { - 'python': [ - 'fintech', - 'cryptography', - ] - }, + "installable": True, + "application": True, + "external_dependencies": { + "python": [ + "fintech", + "cryptography", + ] + }, } diff --git a/account_ebics/wizards/ebics_xfer.py b/account_ebics/wizards/ebics_xfer.py index 0ed09f0..5493056 100644 --- a/account_ebics/wizards/ebics_xfer.py +++ b/account_ebics/wizards/ebics_xfer.py @@ -14,125 +14,146 @@ import os from sys import exc_info from traceback import format_exception -from odoo import api, fields, models, _ +from odoo import _, api, fields, models from odoo.exceptions import UserError _logger = logging.getLogger(__name__) try: import fintech - from fintech.ebics import EbicsKeyRing, EbicsBank, EbicsUser, EbicsClient,\ - EbicsFunctionalError, EbicsTechnicalError, EbicsVerificationError - fintech.cryptolib = 'cryptography' + from fintech.ebics import ( + EbicsBank, + EbicsClient, + EbicsFunctionalError, + EbicsKeyRing, + EbicsTechnicalError, + EbicsUser, + EbicsVerificationError, + ) + + fintech.cryptolib = "cryptography" except ImportError: EbicsBank = object - _logger.warning('Failed to import fintech') + _logger.warning("Failed to import fintech") class EbicsBank(EbicsBank): - def _next_order_id(self, partnerid): """ EBICS protocol version H003 requires generation of the OrderID. The OrderID must be a string between 'A000' and 'ZZZZ' and unique for each partner id. """ - return hasattr(self, '_order_number') and self._order_number or 'A000' + return hasattr(self, "_order_number") and self._order_number or "A000" class EbicsXfer(models.TransientModel): - _name = 'ebics.xfer' - _description = 'EBICS file transfer' + _name = "ebics.xfer" + _description = "EBICS file transfer" ebics_config_id = fields.Many2one( - comodel_name='ebics.config', - string='EBICS Configuration', - domain=[('state', '=', 'confirm')], - default=lambda self: self._default_ebics_config_id()) + comodel_name="ebics.config", + string="EBICS Configuration", + domain=[("state", "=", "confirm")], + default=lambda self: self._default_ebics_config_id(), + ) ebics_userid_id = fields.Many2one( - comodel_name='ebics.userid', - string='EBICS UserID') - ebics_passphrase = fields.Char( - string='EBICS Passphrase') + comodel_name="ebics.userid", string="EBICS UserID" + ) + ebics_passphrase = fields.Char(string="EBICS Passphrase") date_from = fields.Date() date_to = fields.Date() - upload_data = fields.Binary(string='File to Upload') - upload_fname = fields.Char( - string='Upload Filename', default='') + upload_data = fields.Binary(string="File to Upload") + upload_fname = fields.Char(string="Upload Filename", default="") upload_fname_dummy = fields.Char( - related='upload_fname', string='Upload Filename', readonly=True) + related="upload_fname", string="Upload Filename", readonly=True + ) format_id = fields.Many2one( - comodel_name='ebics.file.format', - string='EBICS File Format', + comodel_name="ebics.file.format", + string="EBICS File Format", help="Select EBICS File Format to upload/download." - "\nLeave blank to download all available files.") + "\nLeave blank to download all available files.", + ) allowed_format_ids = fields.Many2many( - related='ebics_config_id.ebics_file_format_ids', - string='Allowed EBICS File Formats') + related="ebics_config_id.ebics_file_format_ids", + string="Allowed EBICS File Formats", + ) order_type = fields.Char( - related='format_id.order_type', - string='Order Type', + related="format_id.order_type", + string="Order Type", help="For most banks in France you should use the " - "format neutral Order Types 'FUL' for upload " - "and 'FDL' for download.") + "format neutral Order Types 'FUL' for upload " + "and 'FDL' for download.", + ) test_mode = fields.Boolean( - string='Test Mode', + string="Test Mode", help="Select this option to test if the syntax of " - "the upload file is correct." - "\nThis option is only available for " - "Order Type 'FUL'.") - note = fields.Text(string='EBICS file transfer Log', readonly=True) + "the upload file is correct." + "\nThis option is only available for " + "Order Type 'FUL'.", + ) + note = fields.Text(string="EBICS file transfer Log", readonly=True) @api.model def _default_ebics_config_id(self): - cfg_mod = self.env['ebics.config'] + cfg_mod = self.env["ebics.config"] cfg = cfg_mod.search( - [('company_ids', 'in', self.env.user.company_ids.ids), - ('state', '=', 'confirm')]) + [ + ("company_ids", "in", self.env.user.company_ids.ids), + ("state", "=", "confirm"), + ] + ) if cfg and len(cfg) == 1: return cfg else: return cfg_mod - @api.onchange('ebics_config_id') + @api.onchange("ebics_config_id") def _onchange_ebics_config_id(self): ebics_userids = self.ebics_config_id.ebics_userid_ids - if self._context.get('ebics_download'): - download_formats = self.ebics_config_id.ebics_file_format_ids\ - .filtered(lambda r: r.type == 'down') + if self._context.get("ebics_download"): + download_formats = self.ebics_config_id.ebics_file_format_ids.filtered( + lambda r: r.type == "down" + ) if len(download_formats) == 1: self.format_id = download_formats if len(ebics_userids) == 1: self.ebics_userid_id = ebics_userids else: transport_users = ebics_userids.filtered( - lambda r: r.signature_class == 'T') + lambda r: r.signature_class == "T" + ) if len(transport_users) == 1: self.ebics_userid_id = transport_users else: - upload_formats = self.ebics_config_id.ebics_file_format_ids\ - .filtered(lambda r: r.type == 'up') + upload_formats = self.ebics_config_id.ebics_file_format_ids.filtered( + lambda r: r.type == "up" + ) if len(upload_formats) == 1: self.format_id = upload_formats if len(ebics_userids) == 1: self.ebics_userid_id = ebics_userids - @api.onchange('upload_data') + @api.onchange("upload_data") def _onchange_upload_data(self): self.upload_fname_dummy = self.upload_fname self.format_id = False self._detect_upload_format() if not self.format_id: - upload_formats = self.format_id \ + upload_formats = ( + self.format_id or self.ebics_config_id.ebics_file_format_ids.filtered( - lambda r: r.type == 'up') + lambda r: r.type == "up" + ) + ) if len(upload_formats) > 1: upload_formats = upload_formats.filtered( - lambda r: self.upload_fname.endswith(r.suffix)) + lambda r: self.upload_fname.endswith(r.suffix) + ) if len(upload_formats) == 1: self.format_id = upload_formats - @api.onchange('format_id') + @api.onchange("format_id") def _onchange_format_id(self): self.order_type = self.format_id.order_type @@ -141,138 +162,152 @@ class EbicsXfer(models.TransientModel): ctx = self._context.copy() ebics_file = self._ebics_upload() if ebics_file: - ctx['ebics_file_id'] = ebics_file.id - module = __name__.split('addons.')[1].split('.')[0] - result_view = self.env.ref( - '%s.ebics_xfer_view_form_result' % module) + ctx["ebics_file_id"] = ebics_file.id + module = __name__.split("addons.")[1].split(".")[0] + result_view = self.env.ref("%s.ebics_xfer_view_form_result" % module) return { - 'name': _('EBICS file transfer result'), - 'res_id': self.id, - 'view_type': 'form', - 'view_mode': 'form', - 'res_model': 'ebics.xfer', - 'view_id': result_view.id, - 'target': 'new', - 'context': ctx, - 'type': 'ir.actions.act_window', + "name": _("EBICS file transfer result"), + "res_id": self.id, + "view_type": "form", + "view_mode": "form", + "res_model": "ebics.xfer", + "view_id": result_view.id, + "target": "new", + "context": ctx, + "type": "ir.actions.act_window", } def ebics_download(self): self.ensure_one() self.ebics_config_id._check_ebics_files() - ctx = self._context.copy() - self.note = '' + ctx = self.env.context.copy() + self.note = "" + err_cnt = 0 client = self._setup_client() - if client: + if not client: + err_cnt += 1 + self.note += ( + _("EBICS client setup failed for connection '%s'") + % self.ebics_config_id.name + ) + else: download_formats = ( self.format_id or self.ebics_config_id.ebics_file_format_ids.filtered( - lambda r: r.type == 'down' + lambda r: r.type == "down" ) ) - ebics_files = self.env['ebics.file'] + ebics_files = self.env["ebics.file"] date_from = self.date_from and self.date_from.isoformat() or None date_to = self.date_to and self.date_to.isoformat() or None for df in download_formats: try: success = False - if df.order_type == 'FDL': + if df.order_type == "FDL": data = client.FDL(df.name, date_from, date_to) else: params = None if date_from and date_to: - params = {'DateRange': { - 'Start': date_from, - 'End': date_to, - }} + params = { + "DateRange": { + "Start": date_from, + "End": date_to, + } + } data = client.download(df.order_type, params=params) ebics_files += self._handle_download_data(data, df) success = True except EbicsFunctionalError: + err_cnt += 1 e = exc_info() - self.note += '\n' + self.note += "\n" self.note += _( "EBICS Functional Error during download of File Format %s (%s):" ) % (df.name, df.order_type) - self.note += '\n' - self.note += '%s (code: %s)' % (e[1].message, e[1].code) + self.note += "\n" + self.note += "{} (code: {})".format(e[1].message, e[1].code) except EbicsTechnicalError: + err_cnt += 1 e = exc_info() - self.note += '\n' + self.note += "\n" self.note += _( "EBICS Technical Error during download of File Format %s (%s):" ) % (df.name, df.order_type) - self.note += '\n' - self.note += '%s (code: %s)' % (e[1].message, e[1].code) + self.note += "\n" + self.note += "{} (code: {})".format(e[1].message, e[1].code) except EbicsVerificationError: - self.note += '\n' + err_cnt += 1 + self.note += "\n" self.note += _( "EBICS Verification Error during download of " "File Format %s (%s):" ) % (df.name, df.order_type) - self.note += '\n' + self.note += "\n" self.note += _("The EBICS response could not be verified.") except UserError as e: - self.note += '\n' + self.note += "\n" self.note += _( "Warning during download of File Format %s (%s):" ) % (df.name, df.order_type) - self.note += '\n' + self.note += "\n" self.note += e.name except Exception: - self.note += '\n' + err_cnt += 1 + self.note += "\n" self.note += _( "Unknown Error during download of File Format %s (%s):" ) % (df.name, df.order_type) - tb = ''.join(format_exception(*exc_info())) - self.note += '\n%s' % tb + tb = "".join(format_exception(*exc_info())) + self.note += "\n%s" % tb else: # mark received data so that it is not included in further # downloads trans_id = client.last_trans_id client.confirm_download(trans_id=trans_id, success=success) - ctx['ebics_file_ids'] = ebics_files._ids + ctx["ebics_file_ids"] = ebics_files.ids if ebics_files: - self.note += '\n' + self.note += "\n" for f in ebics_files: - self.note += _( - "EBICS File '%s' is available for further processing." - ) % f.name - self.note += '\n' + self.note += ( + _("EBICS File '%s' is available for further processing.") + % f.name + ) + self.note += "\n" - module = __name__.split('addons.')[1].split('.')[0] - result_view = self.env.ref( - '%s.ebics_xfer_view_form_result' % module) + ctx["err_cnt"] = err_cnt + module = __name__.split("addons.")[1].split(".")[0] + result_view = self.env.ref("%s.ebics_xfer_view_form_result" % module) return { - 'name': _('EBICS file transfer result'), - 'res_id': self.id, - 'view_type': 'form', - 'view_mode': 'form', - 'res_model': 'ebics.xfer', - 'view_id': result_view.id, - 'target': 'new', - 'context': ctx, - 'type': 'ir.actions.act_window', + "name": _("EBICS file transfer result"), + "res_id": self.id, + "view_type": "form", + "view_mode": "form", + "res_model": "ebics.xfer", + "view_id": result_view.id, + "target": "new", + "context": ctx, + "type": "ir.actions.act_window", } def button_close(self): self.ensure_one() - return {'type': 'ir.actions.act_window_close'} + return {"type": "ir.actions.act_window_close"} def view_ebics_file(self): self.ensure_one() - module = __name__.split('addons.')[1].split('.')[0] - act = self.env['ir.actions.act_window']._for_xml_id( - '{}.ebics_file_action_download'.format(module)) - act['domain'] = [('id', 'in', self._context['ebics_file_ids'])] + module = __name__.split("addons.")[1].split(".")[0] + act = self.env["ir.actions.act_window"]._for_xml_id( + "{}.ebics_file_action_download".format(module) + ) + act["domain"] = [("id", "in", self._context["ebics_file_ids"])] return act def _ebics_upload(self): self.ensure_one() - ebics_file = self.env['ebics.file'] - self.note = '' + ebics_file = self.env["ebics.file"] + self.note = "" client = self._setup_client() if client: upload_data = base64.decodestring(self.upload_data) @@ -280,70 +315,69 @@ class EbicsXfer(models.TransientModel): OrderID = False try: order_type = self.order_type - if order_type == 'FUL': + if order_type == "FUL": kwargs = {} bank = self.ebics_config_id.journal_ids[0].bank_id cc = bank.country.code if cc: - kwargs['country'] = cc + kwargs["country"] = cc if self.test_mode: - kwargs['TEST'] = 'TRUE' + kwargs["TEST"] = "TRUE" OrderID = client.FUL(ef_format.name, upload_data, **kwargs) else: OrderID = client.upload(order_type, upload_data) if OrderID: - self.note += '\n' - self.note += _( - "EBICS File has been uploaded (OrderID %s)." - ) % OrderID + self.note += "\n" + self.note += ( + _("EBICS File has been uploaded (OrderID %s).") % OrderID + ) ef_note = _("EBICS OrderID: %s") % OrderID - if self.env.context.get('origin'): - ef_note += '\n' + _( - "Origin: %s") % self._context['origin'] + if self.env.context.get("origin"): + ef_note += "\n" + _("Origin: %s") % self._context["origin"] suffix = self.format_id.suffix fn = self.upload_fname if not fn.endswith(suffix): - fn = '.'.join([fn, suffix]) + fn = ".".join([fn, suffix]) ef_vals = { - 'name': self.upload_fname, - 'data': self.upload_data, - 'date': fields.Datetime.now(), - 'format_id': self.format_id.id, - 'state': 'done', - 'user_id': self._uid, - 'ebics_userid_id': self.ebics_userid_id.id, - 'note': ef_note, + "name": self.upload_fname, + "data": self.upload_data, + "date": fields.Datetime.now(), + "format_id": self.format_id.id, + "state": "done", + "user_id": self._uid, + "ebics_userid_id": self.ebics_userid_id.id, + "note": ef_note, "company_ids": [ self.env.context.get("force_company", self.env.company.id) ], } self._update_ef_vals(ef_vals) - ebics_file = self.env['ebics.file'].create(ef_vals) + ebics_file = self.env["ebics.file"].create(ef_vals) except EbicsFunctionalError: e = exc_info() - self.note += '\n' + self.note += "\n" self.note += _("EBICS Functional Error:") - self.note += '\n' - self.note += '%s (code: %s)' % (e[1].message, e[1].code) + self.note += "\n" + self.note += "{} (code: {})".format(e[1].message, e[1].code) except EbicsTechnicalError: e = exc_info() - self.note += '\n' + self.note += "\n" self.note += _("EBICS Technical Error:") - self.note += '\n' - self.note += '%s (code: %s)' % (e[1].message, e[1].code) + self.note += "\n" + self.note += "{} (code: {})".format(e[1].message, e[1].code) except EbicsVerificationError: - self.note += '\n' + self.note += "\n" self.note += _("EBICS Verification Error:") - self.note += '\n' + self.note += "\n" self.note += _("The EBICS response could not be verified.") except Exception: - self.note += '\n' + self.note += "\n" self.note += _("Unknown Error") - tb = ''.join(format_exception(*exc_info())) - self.note += '\n%s' % tb + tb = "".join(format_exception(*exc_info())) + self.note += "\n%s" % tb - if self.ebics_config_id.ebics_version == 'H003': + if self.ebics_config_id.ebics_version == "H003": OrderID = self.ebics_config_id._get_order_number() self.ebics_config_id.sudo()._update_order_number(OrderID) @@ -353,33 +387,35 @@ class EbicsXfer(models.TransientModel): self.ebics_config_id._check_ebics_keys() passphrase = self._get_passphrase() keyring = EbicsKeyRing( - keys=self.ebics_userid_id.ebics_keys_fn, - passphrase=passphrase) + keys=self.ebics_userid_id.ebics_keys_fn, passphrase=passphrase + ) bank = EbicsBank( keyring=keyring, hostid=self.ebics_config_id.ebics_host, - url=self.ebics_config_id.ebics_url) - if self.ebics_config_id.ebics_version == 'H003': + url=self.ebics_config_id.ebics_url, + ) + if self.ebics_config_id.ebics_version == "H003": bank._order_number = self.ebics_config_id._get_order_number() user = EbicsUser( keyring=keyring, partnerid=self.ebics_config_id.ebics_partner, - userid=self.ebics_userid_id.name) - signature_class = self.format_id.signature_class \ - or self.ebics_userid_id.signature_class - if signature_class == 'T': + userid=self.ebics_userid_id.name, + ) + signature_class = ( + self.format_id.signature_class or self.ebics_userid_id.signature_class + ) + if signature_class == "T": user.manual_approval = True try: - client = EbicsClient( - bank, user, version=self.ebics_config_id.ebics_version) + client = EbicsClient(bank, user, version=self.ebics_config_id.ebics_version) except Exception: - self.note += '\n' + self.note += "\n" self.note += _("Unknown Error") - tb = ''.join(format_exception(*exc_info())) - self.note += '\n%s' % tb + tb = "".join(format_exception(*exc_info())) + self.note += "\n%s" % tb client = False return client @@ -390,19 +426,18 @@ class EbicsXfer(models.TransientModel): if passphrase: return passphrase - module = __name__.split('addons.')[1].split('.')[0] - passphrase_view = self.env.ref( - '%s.ebics_xfer_view_form_passphrase' % module) + module = __name__.split("addons.")[1].split(".")[0] + passphrase_view = self.env.ref("%s.ebics_xfer_view_form_passphrase" % module) return { - 'name': _('EBICS file transfer'), - 'res_id': self.id, - 'view_type': 'form', - 'view_mode': 'form', - 'res_model': 'ebics.xfer', - 'view_id': passphrase_view.id, - 'target': 'new', - 'context': self._context, - 'type': 'ir.actions.act_window', + "name": _("EBICS file transfer"), + "res_id": self.id, + "view_type": "form", + "view_mode": "form", + "res_model": "ebics.xfer", + "view_id": passphrase_view.id, + "target": "new", + "context": self._context, + "type": "ir.actions.act_window", } def _file_format_methods(self): @@ -411,10 +446,10 @@ class EbicsXfer(models.TransientModel): for extra file formats. """ res = { - 'camt.xxx.cfonb120.stm': self._handle_cfonb120, - 'camt.xxx.cfonb120.stm.rfi': self._handle_cfonb120, - 'camt.052.001.02.stm': self._handle_camt052, - 'camt.053.001.02.stm': self._handle_camt053, + "camt.xxx.cfonb120.stm": self._handle_cfonb120, + "camt.xxx.cfonb120.stm.rfi": self._handle_cfonb120, + "camt.052.001.02.stm": self._handle_camt052, + "camt.053.001.02.stm": self._handle_camt053, } return res @@ -422,24 +457,24 @@ class EbicsXfer(models.TransientModel): """ Adapt this method to customize the EBICS File values. """ - if self.format_id and self.format_id.type == 'up': - fn = ef_vals['name'] - dups = self._check_duplicate_ebics_file( - fn, self.format_id) + if self.format_id and self.format_id.type == "up": + fn = ef_vals["name"] + dups = self._check_duplicate_ebics_file(fn, self.format_id) if dups: n = 1 - fn = '_'.join([fn, str(n)]) + fn = "_".join([fn, str(n)]) while self._check_duplicate_ebics_file(fn, self.format_id): n += 1 - fn = '_'.join([fn, str(n)]) - ef_vals['name'] = fn + fn = "_".join([fn, str(n)]) + ef_vals["name"] = fn def _handle_download_data(self, data, file_format): - ebics_files = self.env['ebics.file'] + ebics_files = self.env["ebics.file"] if isinstance(data, dict): for doc in data: ebics_files += self._create_ebics_file( - data[doc], file_format, docname=doc) + data[doc], file_format, docname=doc + ) else: ebics_files += self._create_ebics_file(data, file_format) return ebics_files @@ -457,59 +492,61 @@ class EbicsXfer(models.TransientModel): file format specific processing. """ ebics_files_root = self.ebics_config_id.ebics_files - tmp_dir = os.path.normpath(ebics_files_root + '/tmp') + tmp_dir = os.path.normpath(ebics_files_root + "/tmp") if not os.path.isdir(tmp_dir): os.makedirs(tmp_dir, mode=0o700) - fn_parts = [self.ebics_config_id.ebics_host, - self.ebics_config_id.ebics_partner] + fn_parts = [self.ebics_config_id.ebics_host, self.ebics_config_id.ebics_partner] if docname: fn_parts.append(docname) else: fn_date = self.date_to or fields.Date.today() fn_parts.append(fn_date.isoformat()) - base_fn = '_'.join(fn_parts) + base_fn = "_".join(fn_parts) n = 1 - full_tmp_fn = os.path.normpath(tmp_dir + '/' + base_fn) + full_tmp_fn = os.path.normpath(tmp_dir + "/" + base_fn) while os.path.exists(full_tmp_fn): n += 1 - tmp_fn = base_fn + '_' + str(n).rjust(3, '0') - full_tmp_fn = os.path.normpath(tmp_dir + '/' + tmp_fn) + tmp_fn = base_fn + "_" + str(n).rjust(3, "0") + full_tmp_fn = os.path.normpath(tmp_dir + "/" + tmp_fn) - with open(full_tmp_fn, 'wb') as f: + with open(full_tmp_fn, "wb") as f: f.write(data) ff_methods = self._file_format_methods() if file_format.name in ff_methods: data = ff_methods[file_format.name](data) - fn = '.'.join([base_fn, file_format.suffix]) + fn = ".".join([base_fn, file_format.suffix]) dups = self._check_duplicate_ebics_file(fn, file_format) if dups: - raise UserError(_( - "EBICS File with name '%s' has already been downloaded." - "\nPlease check this file and rename in case there is " - "no risk on duplicate transactions.") - % fn) + raise UserError( + _( + "EBICS File with name '%s' has already been downloaded." + "\nPlease check this file and rename in case there is " + "no risk on duplicate transactions." + ) + % fn + ) data = base64.encodebytes(data) ef_vals = { - 'name': fn, - 'data': data, - 'date': fields.Datetime.now(), - 'date_from': self.date_from, - 'date_to': self.date_to, - 'format_id': file_format.id, - 'user_id': self._uid, - 'ebics_userid_id': self.ebics_userid_id.id, - 'company_ids': self.ebics_config_id.company_ids.ids, + "name": fn, + "data": data, + "date": fields.Datetime.now(), + "date_from": self.date_from, + "date_to": self.date_to, + "format_id": file_format.id, + "user_id": self._uid, + "ebics_userid_id": self.ebics_userid_id.id, + "company_ids": self.ebics_config_id.company_ids.ids, } self._update_ef_vals(ef_vals) - ebics_file = self.env['ebics.file'].create(ef_vals) + ebics_file = self.env["ebics.file"].create(ef_vals) return ebics_file def _check_duplicate_ebics_file(self, fn, file_format): - dups = self.env['ebics.file'].search( - [('name', '=', fn), - ('format_id', '=', file_format.id)]) + dups = self.env["ebics.file"].search( + [("name", "=", fn), ("format_id", "=", file_format.id)] + ) return dups def _detect_upload_format(self): @@ -517,31 +554,30 @@ class EbicsXfer(models.TransientModel): Use this method in order to automatically detect and set the EBICS upload file format. """ - pass def _update_order_number(self, OrderID): o_list = list(OrderID) for i, c in enumerate(reversed(o_list), start=1): - if c == '9': - o_list[-i] = 'A' + if c == "9": + o_list[-i] = "A" break - if c == 'Z': + if c == "Z": continue else: o_list[-i] = chr(ord(c) + 1) break - next_nr = ''.join(o_list) - if next_nr == 'ZZZZ': - next_nr = 'A000' + next_nr = "".join(o_list) + if next_nr == "ZZZZ": + next_nr = "A000" self.ebics_config_id.order_number = next_nr def _insert_line_terminator(self, data_in, line_len): - data_in = data_in.replace(b'\n', b'').replace(b'\r', b'') - data_out = b'' + data_in = data_in.replace(b"\n", b"").replace(b"\r", b"") + data_out = b"" max_len = len(data_in) i = 0 while i + line_len <= max_len: - data_out += data_in[i:i + line_len] + b'\n' + data_out += data_in[i : i + line_len] + b"\n" i += line_len return data_out