odoo_account_ebics/account_ebics/wizards/ebics_xfer.py

568 lines
21 KiB
Python
Raw Normal View History

2021-01-06 20:22:11 +00:00
# Copyright 2009-2021 Noviat.
2019-12-22 09:54:00 +00:00
# License LGPL-3 or later (http://www.gnu.org/licenses/lgpl).
2018-08-14 15:04:39 +00:00
"""
import logging
logging.basicConfig(
level=logging.DEBUG,
format='[%(asctime)s] %(levelname)s - %(name)s: %(message)s')
"""
import base64
import logging
import os
from sys import exc_info
from traceback import format_exception
from odoo import api, fields, models, _
from odoo.exceptions import UserError
_logger = logging.getLogger(__name__)
try:
import fintech
from fintech.ebics import EbicsKeyRing, EbicsBank, EbicsUser, EbicsClient,\
EbicsFunctionalError, EbicsTechnicalError, EbicsVerificationError
fintech.cryptolib = 'cryptography'
except ImportError:
EbicsBank = object
_logger.warning('Failed to import fintech')
class EbicsBank(EbicsBank):
    """
    EbicsBank variant whose OrderID is supplied by the caller through
    the ``_order_number`` attribute.
    """

    def _next_order_id(self, partnerid):
        """
        EBICS protocol version H003 requires generation of the OrderID.
        The OrderID must be a string between 'A000' and 'ZZZZ' and
        unique for each partner id.

        Return the value stored in ``_order_number`` when that attribute
        exists and is non-empty, 'A000' otherwise.
        """
        order_number = getattr(self, '_order_number', None)
        if order_number:
            return order_number
        return 'A000'
class EbicsXfer(models.TransientModel):
    """Transient wizard model used for EBICS file uploads and downloads."""
    _name = 'ebics.xfer'
    _description = 'EBICS file transfer'

    # Only confirmed configurations can be selected.
    ebics_config_id = fields.Many2one(
        string='EBICS Configuration',
        comodel_name='ebics.config',
        default=lambda self: self._default_ebics_config_id(),
        domain=[('state', '=', 'confirm')],
    )
    ebics_userid_id = fields.Many2one(
        string='EBICS UserID',
        comodel_name='ebics.userid',
    )
    ebics_passphrase = fields.Char(string='EBICS Passphrase')

    # Optional date range.
    date_from = fields.Date()
    date_to = fields.Date()

    # Upload payload and its filename; the "dummy" field is a read-only
    # related copy of the filename.
    upload_data = fields.Binary(string='File to Upload')
    upload_fname = fields.Char(default='', string='Upload Filename')
    upload_fname_dummy = fields.Char(
        string='Upload Filename',
        related='upload_fname',
        readonly=True,
    )

    format_id = fields.Many2one(
        string='EBICS File Format',
        comodel_name='ebics.file.format',
        help=(
            "Select EBICS File Format to upload/download.\n"
            "Leave blank to download all available files."
        ),
    )
    allowed_format_ids = fields.Many2many(
        string='Allowed EBICS File Formats',
        related='ebics_config_id.ebics_file_format_ids',
    )
    order_type = fields.Char(
        string='Order Type',
        related='format_id.order_type',
        help=(
            "For most banks in France you should use the format neutral "
            "Order Types 'FUL' for upload and 'FDL' for download."
        ),
    )
    test_mode = fields.Boolean(
        string='Test Mode',
        help=(
            "Select this option to test if the syntax of the upload file "
            "is correct.\nThis option is only available for Order Type 'FUL'."
        ),
    )

    # Log text; the transfer methods append their messages here.
    note = fields.Text(readonly=True, string='EBICS file transfer Log')
@api.model
def _default_ebics_config_id(self):
    """
    Return the confirmed EBICS configuration linked to one of the
    current user's companies when exactly one such configuration
    exists, otherwise an empty 'ebics.config' recordset.
    """
    config_model = self.env['ebics.config']
    domain = [
        ('company_ids', 'in', self.env.user.company_ids.ids),
        ('state', '=', 'confirm'),
    ]
    confirmed = config_model.search(domain)
    return confirmed if len(confirmed) == 1 else config_model
@api.onchange('ebics_config_id')
def _onchange_ebics_config_id(self):
    """
    Preselect the file format and the EBICS UserID when the chosen
    configuration leaves only one candidate.

    The 'ebics_download' context key selects between download ('down')
    and upload ('up') file formats. For downloads with several UserIDs,
    the single UserID with signature class 'T' is selected, if any.
    """
    config = self.ebics_config_id
    users = config.ebics_userid_ids
    downloading = bool(self._context.get('ebics_download'))
    direction = 'down' if downloading else 'up'

    candidates = config.ebics_file_format_ids.filtered(
        lambda r: r.type == direction)
    if len(candidates) == 1:
        self.format_id = candidates

    if len(users) == 1:
        self.ebics_userid_id = users
    elif downloading:
        transport_users = users.filtered(
            lambda r: r.signature_class == 'T')
        if len(transport_users) == 1:
            self.ebics_userid_id = transport_users
2018-08-14 15:04:39 +00:00
@api.onchange('upload_data')
def _onchange_upload_data(self):
    """
    Reset and re-detect the upload file format when the file changes.

    The format is first determined by `_detect_upload_format`. When that
    leaves `format_id` empty, the 'up' formats of the configuration are
    narrowed down by filename suffix and the format is set when exactly
    one candidate remains.
    """
    self.upload_fname_dummy = self.upload_fname
    self.format_id = False
    self._detect_upload_format()
    if self.format_id:
        return
    upload_formats = self.ebics_config_id.ebics_file_format_ids.filtered(
        lambda r: r.type == 'up')
    if len(upload_formats) > 1:
        # The filename is unset (False) when the file has been cleared
        # and a format may have no suffix: guard both so that the
        # onchange does not crash on str.endswith().
        fname = self.upload_fname or ''
        upload_formats = upload_formats.filtered(
            lambda r: bool(r.suffix) and fname.endswith(r.suffix))
    if len(upload_formats) == 1:
        self.format_id = upload_formats
2018-08-14 15:04:39 +00:00
@api.onchange('format_id')
def _onchange_format_id(self):
    """Copy the order type of the selected file format onto the wizard."""
    selected_format = self.format_id
    self.order_type = selected_format.order_type
def ebics_upload(self):
    """
    Upload the selected file and return the window action that shows
    the transfer result form.

    The id of the created EBICS File, if any, is passed to the result
    view through the 'ebics_file_id' context key.
    """
    self.ensure_one()
    action_context = self._context.copy()
    uploaded_file = self._ebics_upload()
    if uploaded_file:
        action_context['ebics_file_id'] = uploaded_file.id
    # Name of the addon this module lives in, taken from the import path.
    addon = __name__.split('addons.')[1].split('.')[0]
    view_xmlid = '%s.ebics_xfer_view_form_result' % addon
    result_view = self.env.ref(view_xmlid)
    action = {
        'type': 'ir.actions.act_window',
        'name': _('EBICS file transfer result'),
        'res_model': 'ebics.xfer',
        'res_id': self.id,
        'view_type': 'form',
        'view_mode': 'form',
        'view_id': result_view.id,
        'target': 'new',
        'context': action_context,
    }
    return action
def ebics_download(self):
    """
    Download files for the selected format (or for every 'down' format
    of the configuration when none is selected) and return the window
    action that shows the transfer result form.

    Errors raised while handling one file format are appended to the
    `note` log and do not stop the remaining formats. A download is
    confirmed to the bank only when its data has been handled without
    error. The ids of the created EBICS Files are passed to the result
    view through the 'ebics_file_ids' context key.
    """
    self.ensure_one()
    self.ebics_config_id._check_ebics_files()
    action_context = self._context.copy()
    self.note = ''
    client = self._setup_client()
    if client:
        file_formats = self.format_id
        if not file_formats:
            file_formats = \
                self.ebics_config_id.ebics_file_format_ids.filtered(
                    lambda r: r.type == 'down')
        ebics_files = self.env['ebics.file']
        date_from = self.date_from.isoformat() if self.date_from else None
        date_to = self.date_to.isoformat() if self.date_to else None
        for file_format in file_formats:
            try:
                success = False
                if file_format.order_type == 'FDL':
                    data = client.FDL(file_format.name, date_from, date_to)
                else:
                    # The date range is only sent when both ends are set.
                    params = None
                    if date_from and date_to:
                        date_range = {'Start': date_from, 'End': date_to}
                        params = {'DateRange': date_range}
                    data = client.download(
                        file_format.order_type, params=params)
                ebics_files += self._handle_download_data(
                    data, file_format)
                success = True
            except EbicsFunctionalError as err:
                header = _(
                    "EBICS Functional Error during download of File Format %s (%s):"
                ) % (file_format.name, file_format.order_type)
                detail = '%s (code: %s)' % (err.message, err.code)
                self.note += '\n' + header + '\n' + detail
            except EbicsTechnicalError as err:
                header = _(
                    "EBICS Technical Error during download of File Format %s (%s):"
                ) % (file_format.name, file_format.order_type)
                detail = '%s (code: %s)' % (err.message, err.code)
                self.note += '\n' + header + '\n' + detail
            except EbicsVerificationError:
                header = _(
                    "EBICS Verification Error during download of "
                    "File Format %s (%s):"
                ) % (file_format.name, file_format.order_type)
                detail = _("The EBICS response could not be verified.")
                self.note += '\n' + header + '\n' + detail
            except UserError as err:
                header = _(
                    "Warning during download of File Format %s (%s):"
                ) % (file_format.name, file_format.order_type)
                self.note += '\n' + header + '\n' + err.name
            except Exception:
                header = _(
                    "Unknown Error during download of File Format %s (%s):"
                ) % (file_format.name, file_format.order_type)
                tb = ''.join(format_exception(*exc_info()))
                self.note += '\n' + header + '\n%s' % tb
            else:
                # mark received data so that it is not included in further
                # downloads
                trans_id = client.last_trans_id
                client.confirm_download(trans_id=trans_id, success=success)

        action_context['ebics_file_ids'] = ebics_files._ids
        if ebics_files:
            available = ''.join(
                _("EBICS File '%s' is available for further processing."
                  ) % ebics_file.name + '\n'
                for ebics_file in ebics_files)
            self.note += '\n' + available

    # Name of the addon this module lives in, taken from the import path.
    addon = __name__.split('addons.')[1].split('.')[0]
    result_view = self.env.ref('%s.ebics_xfer_view_form_result' % addon)
    action = {
        'type': 'ir.actions.act_window',
        'name': _('EBICS file transfer result'),
        'res_model': 'ebics.xfer',
        'res_id': self.id,
        'view_type': 'form',
        'view_mode': 'form',
        'view_id': result_view.id,
        'target': 'new',
        'context': action_context,
    }
    return action
def button_close(self):
    """Return the action that closes the wizard window."""
    self.ensure_one()
    close_action = {'type': 'ir.actions.act_window_close'}
    return close_action
def view_ebics_file(self):
    """
    Return the 'ebics_file_action_download' window action restricted to
    the EBICS Files whose ids are stored under the 'ebics_file_ids'
    context key.
    """
    self.ensure_one()
    # Name of the addon this module lives in, taken from the import path.
    addon = __name__.split('addons.')[1].split('.')[0]
    action_xmlid = '%s.ebics_file_action_download' % addon
    action = self.env['ir.actions.act_window']._for_xml_id(action_xmlid)
    file_ids = self._context['ebics_file_ids']
    action['domain'] = [('id', 'in', file_ids)]
    return action
def _ebics_upload(self):
    """
    Upload `upload_data` over EBICS and record the result.

    Order type 'FUL' is sent with `client.FUL` (adding the country code
    of the bank of the first journal and, in test mode, the TEST flag);
    any other order type is sent with `client.upload`. When the bank
    returns an OrderID, an 'ebics.file' record in state 'done' is
    created. Errors are appended to the `note` log instead of being
    raised.

    :return: the created 'ebics.file' record, or an empty recordset
        when nothing has been created.
    """
    self.ensure_one()
    ebics_file = self.env['ebics.file']
    self.note = ''
    client = self._setup_client()
    if not client:
        return ebics_file

    # base64.decodestring() has been removed in Python 3.9,
    # decodebytes() is its direct replacement.
    upload_data = base64.decodebytes(self.upload_data)
    ef_format = self.format_id
    OrderID = False
    try:
        order_type = self.order_type
        if order_type == 'FUL':
            kwargs = {}
            bank = self.ebics_config_id.journal_ids[0].bank_id
            cc = bank.country.code
            if cc:
                kwargs['country'] = cc
            if self.test_mode:
                kwargs['TEST'] = 'TRUE'
            OrderID = client.FUL(ef_format.name, upload_data, **kwargs)
        else:
            OrderID = client.upload(order_type, upload_data)
        if OrderID:
            self.note += '\n'
            self.note += _(
                "EBICS File has been uploaded (OrderID %s)."
            ) % OrderID
            ef_note = _("EBICS OrderID: %s") % OrderID
            origin = self.env.context.get('origin')
            if origin:
                ef_note += '\n' + _("Origin: %s") % origin
            # Name the EBICS File after the uploaded file, completed
            # with the format suffix when it is missing. An unset
            # filename or suffix must not raise here: the bank has
            # already accepted the upload at this point.
            suffix = self.format_id.suffix
            fn = self.upload_fname or ''
            if suffix and not fn.endswith(suffix):
                fn = '.'.join([fn, suffix])
            ef_vals = {
                'name': fn,
                'data': self.upload_data,
                'date': fields.Datetime.now(),
                'format_id': self.format_id.id,
                'state': 'done',
                'user_id': self._uid,
                'ebics_userid_id': self.ebics_userid_id.id,
                'note': ef_note,
                'company_ids': [
                    self.env.context.get(
                        'force_company', self.env.company.id)
                ],
            }
            self._update_ef_vals(ef_vals)
            ebics_file = self.env['ebics.file'].create(ef_vals)
    except EbicsFunctionalError as err:
        self.note += '\n'
        self.note += _("EBICS Functional Error:")
        self.note += '\n'
        self.note += '%s (code: %s)' % (err.message, err.code)
    except EbicsTechnicalError as err:
        self.note += '\n'
        self.note += _("EBICS Technical Error:")
        self.note += '\n'
        self.note += '%s (code: %s)' % (err.message, err.code)
    except EbicsVerificationError:
        self.note += '\n'
        self.note += _("EBICS Verification Error:")
        self.note += '\n'
        self.note += _("The EBICS response could not be verified.")
    except Exception:
        self.note += '\n'
        self.note += _("Unknown Error")
        tb = ''.join(format_exception(*exc_info()))
        self.note += '\n%s' % tb

    # With EBICS H003 the OrderID is generated on our side, hence the
    # stored order number is advanced after a successful upload.
    if self.ebics_config_id.ebics_version == 'H003' and OrderID:
        self.ebics_config_id._update_order_number(OrderID)
    return ebics_file
def _setup_client(self):
    """
    Build the EbicsClient for the selected configuration and UserID.

    :return: the EbicsClient, or False when its creation failed (the
        traceback is then appended to the `note` log).
    """
    config = self.ebics_config_id
    config._check_ebics_keys()
    # NOTE(review): _get_passphrase() returns a window action dict when
    # no passphrase is available; that value is passed on unchanged.
    passphrase = self._get_passphrase()
    ebics_user = self.ebics_userid_id
    keyring = EbicsKeyRing(
        keys=ebics_user.ebics_keys_fn, passphrase=passphrase)

    bank = EbicsBank(
        keyring=keyring, hostid=config.ebics_host, url=config.ebics_url)
    if config.ebics_version == 'H003':
        # Consumed by EbicsBank._next_order_id.
        bank._order_number = config._get_order_number()

    user = EbicsUser(
        keyring=keyring,
        partnerid=config.ebics_partner,
        userid=ebics_user.name)
    # The signature class of the file format takes precedence over the
    # one of the UserID.
    signature_class = self.format_id.signature_class
    if not signature_class:
        signature_class = ebics_user.signature_class
    if signature_class == 'T':
        user.manual_approval = True

    try:
        return EbicsClient(bank, user, version=config.ebics_version)
    except Exception:
        tb = ''.join(format_exception(*exc_info()))
        self.note += '\n'
        self.note += _("Unknown Error")
        self.note += '\n%s' % tb
        return False
def _get_passphrase(self):
2020-07-14 21:18:32 +00:00
passphrase = self.ebics_userid_id.ebics_passphrase
2018-08-14 15:04:39 +00:00
if passphrase:
return passphrase
module = __name__.split('addons.')[1].split('.')[0]
passphrase_view = self.env.ref(
'%s.ebics_xfer_view_form_passphrase' % module)
return {
'name': _('EBICS file transfer'),
'res_id': self.id,
'view_type': 'form',
'view_mode': 'form',
'res_model': 'ebics.xfer',
'view_id': passphrase_view.id,
'target': 'new',
'context': self._context,
'type': 'ir.actions.act_window',
}
def _file_format_methods(self):
"""
Extend this dictionary in order to add support
for extra file formats.
"""
res = {
'camt.xxx.cfonb120.stm': self._handle_cfonb120,
2019-12-11 18:57:00 +00:00
'camt.052.001.02.stm': self._handle_camt052,
2018-08-14 15:04:39 +00:00
'camt.053.001.02.stm': self._handle_camt053,
}
return res
def _update_ef_vals(self, ef_vals):
"""
Adapt this method to customize the EBICS File values.
"""
if self.format_id and self.format_id.type == 'up':
fn = ef_vals['name']
dups = self._check_duplicate_ebics_file(
fn, self.format_id)
if dups:
n = 1
fn = '_'.join([fn, str(n)])
while self._check_duplicate_ebics_file(fn, self.format_id):
n += 1
fn = '_'.join([fn, str(n)])
ef_vals['name'] = fn
def _handle_download_data(self, data, file_format):
ebics_files = self.env['ebics.file']
if isinstance(data, dict):
for doc in data:
ebics_files += self._create_ebics_file(
data[doc], file_format, docname=doc)
else:
ebics_files += self._create_ebics_file(data, file_format)
return ebics_files
def _create_ebics_file(self, data, file_format, docname=None):
    """
    Store downloaded data and create the matching EBICS File.

    The data is first written, exactly as received over the EBICS
    connection, to a file in the 'tmp' subdirectory of the EBICS files
    root, so that it is available for analysis (e.g. in case formats
    are received that cannot be handled in the current version of this
    module).
    TODO: add code to clean-up /tmp on a regular basis.

    After saving the data received, the file format specific processing
    method is called and the 'ebics.file' record is created.

    :raise UserError: when an EBICS File with the same name and format
        already exists.
    """
    config = self.ebics_config_id
    tmp_dir = os.path.normpath(config.ebics_files + '/tmp')
    if not os.path.isdir(tmp_dir):
        os.makedirs(tmp_dir, mode=0o700)

    # <host>_<partner>_<docname or date>
    if docname:
        last_part = docname
    else:
        last_part = (self.date_to or fields.Date.today()).isoformat()
    base_fn = '_'.join(
        [config.ebics_host, config.ebics_partner, last_part])

    # Never overwrite an earlier temporary file: append a 3-digit
    # counter, starting at 002, until the name is free.
    tmp_path = os.path.normpath(tmp_dir + '/' + base_fn)
    seq = 1
    while os.path.exists(tmp_path):
        seq += 1
        tmp_path = os.path.normpath(
            '%s/%s_%03d' % (tmp_dir, base_fn, seq))
    with open(tmp_path, 'wb') as tmp_file:
        tmp_file.write(data)

    handlers = self._file_format_methods()
    format_name = file_format.name
    if format_name in handlers:
        data = handlers[format_name](data)

    fn = '.'.join([base_fn, file_format.suffix])
    if self._check_duplicate_ebics_file(fn, file_format):
        raise UserError(_(
            "EBICS File with name '%s' has already been downloaded."
            "\nPlease check this file and rename in case there is "
            "no risk on duplicate transactions.")
            % fn)

    ef_vals = {
        'name': fn,
        'data': base64.encodebytes(data),
        'date': fields.Datetime.now(),
        'date_from': self.date_from,
        'date_to': self.date_to,
        'format_id': file_format.id,
        'user_id': self._uid,
        'ebics_userid_id': self.ebics_userid_id.id,
        'company_ids': config.company_ids.ids,
    }
    self._update_ef_vals(ef_vals)
    return self.env['ebics.file'].create(ef_vals)
def _check_duplicate_ebics_file(self, fn, file_format):
dups = self.env['ebics.file'].search(
[('name', '=', fn),
('format_id', '=', file_format.id)])
2018-08-14 15:04:39 +00:00
return dups
def _detect_upload_format(self):
"""
Use this method in order to automatically detect and set the
EBICS upload file format.
"""
pass
def _update_order_number(self, OrderID):
o_list = list(OrderID)
for i, c in enumerate(reversed(o_list), start=1):
if c == '9':
o_list[-i] = 'A'
break
if c == 'Z':
continue
else:
o_list[-i] = chr(ord(c) + 1)
break
next_nr = ''.join(o_list)
if next_nr == 'ZZZZ':
next_nr = 'A000'
self.ebics_config_id.order_number = next_nr
2018-08-14 15:04:39 +00:00
def _insert_line_terminator(self, data_in, line_len):
2020-07-24 17:10:03 +00:00
data_in = data_in.replace(b'\n', b'').replace(b'\r', b'')
2018-08-14 15:04:39 +00:00
data_out = b''
max_len = len(data_in)
2018-08-14 15:04:39 +00:00
i = 0
while i + line_len <= max_len:
2018-08-14 15:04:39 +00:00
data_out += data_in[i:i + line_len] + b'\n'
i += line_len
return data_out
def _handle_cfonb120(self, data_in):
return self._insert_line_terminator(data_in, 120)
def _handle_cfonb240(self, data_in):
return self._insert_line_terminator(data_in, 240)
2019-12-11 18:57:00 +00:00
def _handle_camt052(self, data_in):
"""
Use this method if you need to fix camt files received
from your bank before passing them to the
Odoo Community CAMT parser.
Remark: Odoo Enterprise doesn't support camt.052.
"""
return data_in
2018-08-14 15:04:39 +00:00
def _handle_camt053(self, data_in):
"""
Use this method if you need to fix camt files received
from your bank before passing them to the
Odoo Enterprise or Community CAMT parser.
"""
return data_in