mirror of
https://github.com/brain-tec/account_ebics.git
synced 2025-08-02 16:59:20 +00:00
13 multi bank account support (#15)
* [13.0][IMP]add support for multiple bank accounts and multiple EBICS UserIDs over a single EBICS connection * ebics refactoring fixes * ebics refactoring fixes
This commit is contained in:
@@ -3,3 +3,4 @@ from . import account_bank_statement
|
||||
from . import ebics_config
|
||||
from . import ebics_file
|
||||
from . import ebics_file_format
|
||||
from . import ebics_userid
|
||||
|
@@ -1,4 +1,4 @@
|
||||
# Copyright 2009-2018 Noviat.
|
||||
# Copyright 2009-2020 Noviat.
|
||||
# License LGPL-3 or later (http://www.gnu.org/licenses/lpgl).
|
||||
|
||||
from odoo import fields, models
|
||||
|
@@ -1,22 +1,18 @@
|
||||
# Copyright 2009-2020 Noviat.
|
||||
# License LGPL-3 or later (http://www.gnu.org/licenses/lpgl).
|
||||
|
||||
import base64
|
||||
import logging
|
||||
import re
|
||||
import os
|
||||
from sys import exc_info
|
||||
from urllib.error import URLError
|
||||
|
||||
from odoo import api, fields, models, _
|
||||
from odoo import _, api, fields, models
|
||||
from odoo.exceptions import UserError
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
try:
|
||||
import fintech
|
||||
from fintech.ebics import EbicsKeyRing, EbicsBank, EbicsUser,\
|
||||
EbicsClient, EbicsFunctionalError, EbicsTechnicalError
|
||||
from fintech.ebics import EbicsBank
|
||||
fintech.cryptolib = 'cryptography'
|
||||
except ImportError:
|
||||
EbicsBank = object
|
||||
@@ -38,27 +34,20 @@ class EbicsConfig(models.Model):
|
||||
"""
|
||||
EBICS configuration is stored in a separate object in order to
|
||||
allow extra security policies on this object.
|
||||
|
||||
Remark:
|
||||
This Configuration model implements a simple model of the relationship
|
||||
between users and authorizations and may need to be adapted
|
||||
in next versions of this module to cope with higher complexity .
|
||||
"""
|
||||
_name = 'ebics.config'
|
||||
_description = 'EBICS Configuration'
|
||||
_order = 'name'
|
||||
|
||||
name = fields.Char(string='Name', required=True)
|
||||
company_partner_id = fields.Many2one(
|
||||
comodel_name='res.partner',
|
||||
related='company_id.partner_id',
|
||||
string='Account Holder',
|
||||
readonly=True, store=False)
|
||||
bank_id = fields.Many2one(
|
||||
comodel_name='res.partner.bank',
|
||||
name = fields.Char(
|
||||
string='Name',
|
||||
readonly=True, states={'draft': [('readonly', False)]},
|
||||
string='Bank Account',
|
||||
domain="[('partner_id','=', company_partner_id)]",
|
||||
required=True)
|
||||
journal_ids = fields.Many2many(
|
||||
comodel_name='account.journal',
|
||||
readonly=True, states={'draft': [('readonly', False)]},
|
||||
string='Bank Accounts',
|
||||
domain="[('type', '=', 'bank')]",
|
||||
required=True)
|
||||
ebics_host = fields.Char(
|
||||
string='EBICS HostID', required=True,
|
||||
@@ -88,8 +77,9 @@ class EbicsConfig(models.Model):
|
||||
"communicate with the EBICS bank server and the authorisations "
|
||||
"that these users will possess. "
|
||||
"\nIt is identified by the PartnerID.")
|
||||
ebics_user = fields.Char(
|
||||
string='EBICS UserID', required=True,
|
||||
ebics_userid_ids = fields.One2many(
|
||||
comodel_name='ebics.userid',
|
||||
inverse_name='ebics_config_id',
|
||||
readonly=True, states={'draft': [('readonly', False)]},
|
||||
help="Human users or a technical system that is/are "
|
||||
"assigned to a customer. "
|
||||
@@ -98,35 +88,18 @@ class EbicsConfig(models.Model):
|
||||
"The technical subscriber serves only for the data exchange "
|
||||
"between customer and financial institution. "
|
||||
"The human user also can authorise orders.")
|
||||
# Currently only a singe signature class per user is supported
|
||||
# Classes A and B are not yet supported.
|
||||
signature_class = fields.Selection(
|
||||
selection=[('E', 'Single signature'),
|
||||
('T', 'Transport signature')],
|
||||
string='Signature Class',
|
||||
required=True, default='T',
|
||||
readonly=True, states={'draft': [('readonly', False)]},
|
||||
help="Default signature class."
|
||||
"This default can be overriden for specific "
|
||||
"EBICS transactions (cf. File Formats).")
|
||||
ebics_files = fields.Char(
|
||||
string='EBICS Files Root', required=True,
|
||||
readonly=True, states={'draft': [('readonly', False)]},
|
||||
default=lambda self: self._default_ebics_files(),
|
||||
help="Root Directory for EBICS File Transfer Folders.")
|
||||
|
||||
# We store the EBICS keys in a separate directory in the file system.
|
||||
# This directory requires special protection to reduce fraude.
|
||||
ebics_keys = fields.Char(
|
||||
string='EBICS Keys', required=True,
|
||||
string='EBICS Keys Root', required=True,
|
||||
readonly=True, states={'draft': [('readonly', False)]},
|
||||
default=lambda self: self._default_ebics_keys(),
|
||||
help="File holding the EBICS Keys."
|
||||
"\nSpecify the full path (directory + filename).")
|
||||
ebics_keys_found = fields.Boolean(
|
||||
compute='_compute_ebics_keys_found')
|
||||
ebics_passphrase = fields.Char(
|
||||
string='EBICS Passphrase')
|
||||
help="Root Directory for storing the EBICS Keys.")
|
||||
ebics_key_version = fields.Selection(
|
||||
selection=[('A005', 'A005 (RSASSA-PKCS1-v1_5)'),
|
||||
('A006', 'A006 (RSASSA-PSS)')],
|
||||
@@ -140,51 +113,6 @@ class EbicsConfig(models.Model):
|
||||
readonly=True, states={'draft': [('readonly', False)]},
|
||||
help="The bit length of the generated keys. "
|
||||
"\nThe value must be between 1536 and 4096.")
|
||||
ebics_ini_letter = fields.Binary(
|
||||
string='EBICS INI Letter', readonly=True,
|
||||
help="INI-letter PDF document to be sent to your bank.")
|
||||
ebics_ini_letter_fn = fields.Char(
|
||||
string='INI-letter Filename', readonly=True)
|
||||
ebics_public_bank_keys = fields.Binary(
|
||||
string='EBICS Public Bank Keys', readonly=True,
|
||||
help="EBICS Public Bank Keys to be checked for consistency.")
|
||||
ebics_public_bank_keys_fn = fields.Char(
|
||||
string='EBICS Public Bank Keys Filename', readonly=True)
|
||||
|
||||
# X.509 Distinguished Name attributes used to
|
||||
# create self-signed X.509 certificates
|
||||
ebics_key_x509 = fields.Boolean(
|
||||
string='X509 support',
|
||||
help="Set this flag in order to work with "
|
||||
"self-signed X.509 certificates")
|
||||
ebics_key_x509_dn_cn = fields.Char(
|
||||
string='Common Name [CN]',
|
||||
readonly=True, states={'draft': [('readonly', False)]},
|
||||
)
|
||||
ebics_key_x509_dn_o = fields.Char(
|
||||
string='Organization Name [O]',
|
||||
readonly=True, states={'draft': [('readonly', False)]},
|
||||
)
|
||||
ebics_key_x509_dn_ou = fields.Char(
|
||||
string='Organizational Unit Name [OU]',
|
||||
readonly=True, states={'draft': [('readonly', False)]},
|
||||
)
|
||||
ebics_key_x509_dn_c = fields.Char(
|
||||
string='Country Name [C]',
|
||||
readonly=True, states={'draft': [('readonly', False)]},
|
||||
)
|
||||
ebics_key_x509_dn_st = fields.Char(
|
||||
string='State Or Province Name [ST]',
|
||||
readonly=True, states={'draft': [('readonly', False)]},
|
||||
)
|
||||
ebics_key_x509_dn_l = fields.Char(
|
||||
string='Locality Name [L]',
|
||||
readonly=True, states={'draft': [('readonly', False)]},
|
||||
)
|
||||
ebics_key_x509_dn_e = fields.Char(
|
||||
string='Email Address',
|
||||
readonly=True, states={'draft': [('readonly', False)]},
|
||||
)
|
||||
ebics_file_format_ids = fields.Many2many(
|
||||
comodel_name='ebics.file.format',
|
||||
column1='config_id', column2='format_id',
|
||||
@@ -193,10 +121,7 @@ class EbicsConfig(models.Model):
|
||||
)
|
||||
state = fields.Selection(
|
||||
[('draft', 'Draft'),
|
||||
('init', 'Initialisation'),
|
||||
('get_bank_keys', 'Get Keys from Bank'),
|
||||
('to_verify', 'Verification'),
|
||||
('active', 'Active')],
|
||||
('confirm', 'Confirmed')],
|
||||
string='State',
|
||||
default='draft',
|
||||
required=True, readonly=True)
|
||||
@@ -207,10 +132,11 @@ class EbicsConfig(models.Model):
|
||||
"[A-Z]{1}[A-Z0-9]{3}")
|
||||
active = fields.Boolean(
|
||||
string='Active', default=True)
|
||||
company_id = fields.Many2one(
|
||||
'res.company', string='Company',
|
||||
default=lambda self: self.env.user.company_id,
|
||||
required=True)
|
||||
company_ids = fields.Many2many(
|
||||
comodel_name='res.company',
|
||||
string='Companies',
|
||||
required=True,
|
||||
help="Companies sharing this EBICS contract.")
|
||||
|
||||
@api.model
|
||||
def _default_ebics_files(self):
|
||||
@@ -218,22 +144,7 @@ class EbicsConfig(models.Model):
|
||||
|
||||
@api.model
|
||||
def _default_ebics_keys(self):
|
||||
return '/'.join(['/etc/odoo/ebics_keys',
|
||||
self._cr.dbname,
|
||||
'mykeys'])
|
||||
|
||||
@api.depends('ebics_keys')
|
||||
def _compute_ebics_keys_found(self):
|
||||
for cfg in self:
|
||||
cfg.ebics_keys_found = (
|
||||
cfg.ebics_keys and os.path.isfile(cfg.ebics_keys))
|
||||
|
||||
@api.constrains('ebics_passphrase')
|
||||
def _check_ebics_passphrase(self):
|
||||
for cfg in self:
|
||||
if not cfg.ebics_passphrase or len(cfg.ebics_passphrase) < 8:
|
||||
raise UserError(_(
|
||||
"The passphrase must be at least 8 characters long"))
|
||||
return '/'.join(['/etc/odoo/ebics_keys', self._cr.dbname])
|
||||
|
||||
@api.constrains('order_number')
|
||||
def _check_order_number(self):
|
||||
@@ -252,223 +163,22 @@ class EbicsConfig(models.Model):
|
||||
"Order Number should comply with the following pattern:"
|
||||
"\n[A-Z]{1}[A-Z0-9]{3}"))
|
||||
|
||||
@api.onchange('journal_ids')
|
||||
def _onchange_journal_ids(self):
|
||||
self.company_ids = self.journal_ids.mapped('company_id')
|
||||
|
||||
def unlink(self):
|
||||
for ebics_config in self:
|
||||
if ebics_config.state == 'active':
|
||||
raise UserError(_(
|
||||
"You cannot remove active EBICS congirations."))
|
||||
"You cannot remove active EBICS configurations."))
|
||||
return super(EbicsConfig, self).unlink()
|
||||
|
||||
def set_to_draft(self):
|
||||
return self.write({'state': 'draft'})
|
||||
|
||||
def set_to_get_bank_keys(self):
|
||||
return self.write({'state': 'get_bank_keys'})
|
||||
|
||||
def set_to_active(self):
|
||||
return self.write({'state': 'active'})
|
||||
|
||||
def ebics_init_1(self):
|
||||
"""
|
||||
Initialization of bank keys - Step 1:
|
||||
Create new keys and certificates for this user
|
||||
"""
|
||||
self.ensure_one()
|
||||
self._check_ebics_files()
|
||||
if self.state != 'draft':
|
||||
raise UserError(
|
||||
_("Set state to 'draft' before Bank Key (re)initialisation."))
|
||||
|
||||
if not self.ebics_passphrase:
|
||||
raise UserError(
|
||||
_("Set a passphrase."))
|
||||
|
||||
try:
|
||||
keyring = EbicsKeyRing(
|
||||
keys=self.ebics_keys,
|
||||
passphrase=self.ebics_passphrase)
|
||||
bank = EbicsBank(
|
||||
keyring=keyring, hostid=self.ebics_host, url=self.ebics_url)
|
||||
user = EbicsUser(
|
||||
keyring=keyring, partnerid=self.ebics_partner,
|
||||
userid=self.ebics_user)
|
||||
except Exception:
|
||||
exctype, value = exc_info()[:2]
|
||||
error = _("EBICS Initialisation Error:")
|
||||
error += '\n' + str(exctype) + '\n' + str(value)
|
||||
raise UserError(error)
|
||||
|
||||
self._check_ebics_keys()
|
||||
if not os.path.isfile(self.ebics_keys):
|
||||
try:
|
||||
user.create_keys(
|
||||
keyversion=self.ebics_key_version,
|
||||
bitlength=self.ebics_key_bitlength)
|
||||
except Exception:
|
||||
exctype, value = exc_info()[:2]
|
||||
error = _("EBICS Initialisation Error:")
|
||||
error += '\n' + str(exctype) + '\n' + str(value)
|
||||
raise UserError(error)
|
||||
|
||||
if self.ebics_key_x509:
|
||||
dn_attrs = {
|
||||
'commonName': self.ebics_key_x509_dn_cn,
|
||||
'organizationName': self.ebics_key_x509_dn_o,
|
||||
'organizationalUnitName': self.ebics_key_x509_dn_ou,
|
||||
'countryName': self.ebics_key_x509_dn_c,
|
||||
'stateOrProvinceName': self.ebics_key_x509_dn_st,
|
||||
'localityName': self.ebics_key_x509_dn_l,
|
||||
'emailAddress': self.ebics_key_x509_dn_e,
|
||||
}
|
||||
kwargs = {k: v for k, v in dn_attrs.items() if v}
|
||||
user.create_certificates(**kwargs)
|
||||
|
||||
client = EbicsClient(bank, user, version=self.ebics_version)
|
||||
|
||||
# Send the public electronic signature key to the bank.
|
||||
try:
|
||||
if self.ebics_version == 'H003':
|
||||
bank._order_number = self._get_order_number()
|
||||
OrderID = client.INI()
|
||||
_logger.info(
|
||||
'%s, EBICS INI command, OrderID=%s', self._name, OrderID)
|
||||
if self.ebics_version == 'H003':
|
||||
self._update_order_number(OrderID)
|
||||
except URLError:
|
||||
exctype, value = exc_info()[:2]
|
||||
raise UserError(_(
|
||||
"urlopen error:\n url '%s' - %s")
|
||||
% (self.ebics_url, str(value)))
|
||||
except EbicsFunctionalError:
|
||||
e = exc_info()
|
||||
error = _("EBICS Functional Error:")
|
||||
error += '\n'
|
||||
error += '%s (code: %s)' % (e[1].message, e[1].code)
|
||||
raise UserError(error)
|
||||
except EbicsTechnicalError:
|
||||
e = exc_info()
|
||||
error = _("EBICS Technical Error:")
|
||||
error += '\n'
|
||||
error += '%s (code: %s)' % (e[1].message, e[1].code)
|
||||
raise UserError(error)
|
||||
|
||||
# Send the public authentication and encryption keys to the bank.
|
||||
if self.ebics_version == 'H003':
|
||||
bank._order_number = self._get_order_number()
|
||||
OrderID = client.HIA()
|
||||
_logger.info('%s, EBICS HIA command, OrderID=%s', self._name, OrderID)
|
||||
if self.ebics_version == 'H003':
|
||||
self._update_order_number(OrderID)
|
||||
|
||||
# Create an INI-letter which must be printed and sent to the bank.
|
||||
cc = self.bank_id.bank_id.country.code
|
||||
if cc in ['FR', 'DE']:
|
||||
lang = cc
|
||||
else:
|
||||
lang = self.env.user.lang or \
|
||||
self.env['res.lang'].search([])[0].code
|
||||
lang = lang[:2]
|
||||
tmp_dir = os.path.normpath(self.ebics_files + '/tmp')
|
||||
if not os.path.isdir(tmp_dir):
|
||||
os.makedirs(tmp_dir, mode=0o700)
|
||||
fn_date = fields.Date.today().isoformat()
|
||||
fn = '_'.join([self.ebics_host, 'ini_letter', fn_date]) + '.pdf'
|
||||
full_tmp_fn = os.path.normpath(tmp_dir + '/' + fn)
|
||||
user.create_ini_letter(
|
||||
bankname=self.bank_id.bank_id.name,
|
||||
path=full_tmp_fn,
|
||||
lang=lang)
|
||||
with open(full_tmp_fn, 'rb') as f:
|
||||
letter = f.read()
|
||||
self.write({
|
||||
'ebics_ini_letter': base64.encodestring(letter),
|
||||
'ebics_ini_letter_fn': fn,
|
||||
})
|
||||
|
||||
return self.write({'state': 'init'})
|
||||
|
||||
def ebics_init_2(self):
|
||||
"""
|
||||
Initialization of bank keys - Step 2:
|
||||
Activation of the account by the bank.
|
||||
"""
|
||||
if self.state != 'init':
|
||||
raise UserError(
|
||||
_("Set state to 'Initialisation'."))
|
||||
self.ensure_one()
|
||||
return self.write({'state': 'get_bank_keys'})
|
||||
|
||||
def ebics_init_3(self):
|
||||
"""
|
||||
Initialization of bank keys - Step 3:
|
||||
|
||||
After the account has been activated the public bank keys
|
||||
must be downloaded and checked for consistency.
|
||||
"""
|
||||
self.ensure_one()
|
||||
self._check_ebics_files()
|
||||
if self.state != 'get_bank_keys':
|
||||
raise UserError(
|
||||
_("Set state to 'Get Keys from Bank'."))
|
||||
keyring = EbicsKeyRing(
|
||||
keys=self.ebics_keys, passphrase=self.ebics_passphrase)
|
||||
bank = EbicsBank(
|
||||
keyring=keyring, hostid=self.ebics_host, url=self.ebics_url)
|
||||
user = EbicsUser(
|
||||
keyring=keyring, partnerid=self.ebics_partner,
|
||||
userid=self.ebics_user)
|
||||
client = EbicsClient(
|
||||
bank, user, version=self.ebics_version)
|
||||
|
||||
public_bank_keys = client.HPB()
|
||||
public_bank_keys = public_bank_keys.encode()
|
||||
tmp_dir = os.path.normpath(self.ebics_files + '/tmp')
|
||||
if not os.path.isdir(tmp_dir):
|
||||
os.makedirs(tmp_dir, mode=0o700)
|
||||
fn_date = fields.Date.today().isoformat()
|
||||
fn = '_'.join([self.ebics_host, 'public_bank_keys', fn_date]) + '.txt'
|
||||
self.write({
|
||||
'ebics_public_bank_keys': base64.encodestring(public_bank_keys),
|
||||
'ebics_public_bank_keys_fn': fn,
|
||||
'state': 'to_verify',
|
||||
})
|
||||
|
||||
return True
|
||||
|
||||
def ebics_init_4(self):
|
||||
"""
|
||||
Initialization of bank keys - Step 2:
|
||||
Confirm Verification of the public bank keys
|
||||
and activate the bank keyu.
|
||||
"""
|
||||
self.ensure_one()
|
||||
if self.state != 'to_verify':
|
||||
raise UserError(
|
||||
_("Set state to 'Verification'."))
|
||||
|
||||
keyring = EbicsKeyRing(
|
||||
keys=self.ebics_keys, passphrase=self.ebics_passphrase)
|
||||
bank = EbicsBank(
|
||||
keyring=keyring, hostid=self.ebics_host, url=self.ebics_url)
|
||||
bank.activate_keys()
|
||||
return self.write({'state': 'active'})
|
||||
|
||||
def change_passphrase(self):
|
||||
self.ensure_one()
|
||||
ctx = dict(self._context, default_ebics_config_id=self.id)
|
||||
module = __name__.split('addons.')[1].split('.')[0]
|
||||
view = self.env.ref(
|
||||
'%s.ebics_change_passphrase_view_form' % module)
|
||||
return {
|
||||
'name': _('EBICS keys change passphrase'),
|
||||
'view_type': 'form',
|
||||
'view_mode': 'form',
|
||||
'res_model': 'ebics.change.passphrase',
|
||||
'view_id': view.id,
|
||||
'target': 'new',
|
||||
'context': ctx,
|
||||
'type': 'ir.actions.act_window',
|
||||
}
|
||||
def set_to_confirm(self):
|
||||
return self.write({'state': 'confirm'})
|
||||
|
||||
def _get_order_number(self):
|
||||
return self.order_number
|
||||
@@ -490,18 +200,12 @@ class EbicsConfig(models.Model):
|
||||
self.order_number = next
|
||||
|
||||
def _check_ebics_keys(self):
|
||||
if self.ebics_keys:
|
||||
dirname = os.path.dirname(self.ebics_keys)
|
||||
if not os.path.exists(dirname):
|
||||
raise UserError(_(
|
||||
"EBICS Keys Directory '%s' is not available."
|
||||
"\nPlease contact your system administrator.")
|
||||
% dirname)
|
||||
if os.path.isdir(self.ebics_keys):
|
||||
dirname = self.ebics_keys or ''
|
||||
if not os.path.exists(dirname):
|
||||
raise UserError(_(
|
||||
"Configuration Error.\n"
|
||||
"The 'EBICS Keys' parameter should be a full path "
|
||||
"(directory + filename) not a directory name."))
|
||||
"EBICS Keys Root Directory %s is not available."
|
||||
"\nPlease contact your system administrator.")
|
||||
% dirname)
|
||||
|
||||
def _check_ebics_files(self):
|
||||
dirname = self.ebics_files or ''
|
||||
|
@@ -1,9 +1,10 @@
|
||||
# Copyright 2009-2020 Noviat.
|
||||
# License LGPL-3 or later (http://www.gnu.org/licenses/lpgl).
|
||||
|
||||
import base64
|
||||
import logging
|
||||
|
||||
from odoo import api, fields, models, _
|
||||
from odoo import _, fields, models
|
||||
from odoo.exceptions import UserError
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
@@ -13,6 +14,10 @@ class EbicsFile(models.Model):
|
||||
_name = 'ebics.file'
|
||||
_description = 'Object to store EBICS Data Files'
|
||||
_order = 'date desc'
|
||||
_sql_constraints = [
|
||||
('name_uniq', 'unique (name, format_id)',
|
||||
'This File has already been down- or uploaded !')
|
||||
]
|
||||
|
||||
name = fields.Char(string='Filename')
|
||||
data = fields.Binary(string='File', readonly=True)
|
||||
@@ -46,25 +51,17 @@ class EbicsFile(models.Model):
|
||||
comodel_name='res.users', string='User',
|
||||
default=lambda self: self.env.user,
|
||||
readonly=True)
|
||||
ebics_userid_id = fields.Many2one(
|
||||
comodel_name='ebics.userid',
|
||||
string='EBICS UserID',
|
||||
ondelete='restrict',
|
||||
readonly=True)
|
||||
note = fields.Text(string='Notes')
|
||||
note_process = fields.Text(string='Notes')
|
||||
company_id = fields.Many2one(
|
||||
company_ids = fields.Many2many(
|
||||
comodel_name='res.company',
|
||||
string='Company',
|
||||
default=lambda self: self._default_company_id())
|
||||
|
||||
_sql_constraints = [
|
||||
('name_company_uniq', 'unique (name, company_id, format_id)',
|
||||
'This File has already been imported !')
|
||||
]
|
||||
|
||||
@api.model
|
||||
def _default_company_id(self):
|
||||
"""
|
||||
Adapt this method in case your bank provides transactions
|
||||
of multiple legal entities in a single EBICS File.
|
||||
"""
|
||||
return self.env.user.company_id
|
||||
string='Companies',
|
||||
help="Companies sharing this EBICS file.")
|
||||
|
||||
def unlink(self):
|
||||
ff_methods = self._file_format_methods()
|
||||
@@ -168,11 +165,25 @@ class EbicsFile(models.Model):
|
||||
import_module = 'account_bank_statement_import_fr_cfonb'
|
||||
self._check_import_module(import_module)
|
||||
wiz_model = 'account.bank.statement.import'
|
||||
wiz_vals = {
|
||||
'attachment_ids': [(0, 0, {'name': self.name,
|
||||
'datas': self.data,
|
||||
'store_fname': self.name})]}
|
||||
wiz = self.env[wiz_model].create(wiz_vals)
|
||||
data_file = base64.b64decode(self.data)
|
||||
lines = data_file.split(b'\n')
|
||||
att_vals = []
|
||||
st_lines = b''
|
||||
for line in lines:
|
||||
rec_type = line[0:2]
|
||||
acc_number = line[21:32]
|
||||
st_lines += line + b'\n'
|
||||
if rec_type == b'07':
|
||||
fn = '_'.join([acc_number.decode(), self.name])
|
||||
att_vals.append({
|
||||
'name': fn,
|
||||
'store_fname': fn,
|
||||
'datas': base64.b64encode(st_lines)
|
||||
})
|
||||
st_lines = b''
|
||||
wiz_vals = {'attachment_ids': [(0, 0, x) for x in att_vals]}
|
||||
wiz_ctx = dict(self.env.context, active_model='ebics.file')
|
||||
wiz = self.env[wiz_model].with_context(wiz_ctx).create(wiz_vals)
|
||||
res = wiz.import_file()
|
||||
notifications = []
|
||||
statement_ids = []
|
||||
@@ -224,10 +235,11 @@ class EbicsFile(models.Model):
|
||||
import_module = 'account_bank_statement_import_camt%'
|
||||
self._check_import_module(import_module)
|
||||
wiz_model = 'account.bank.statement.import'
|
||||
|
||||
wiz_vals = {
|
||||
'data_file': self.data,
|
||||
'filename': self.name,
|
||||
}
|
||||
'attachment_ids': [(0, 0, {'name': self.name,
|
||||
'datas': self.data,
|
||||
'store_fname': self.name})]}
|
||||
ctx = dict(self.env.context, active_model='ebics.file')
|
||||
wiz = self.env[wiz_model].with_context(ctx).create(wiz_vals)
|
||||
res = wiz.import_file()
|
||||
|
358
account_ebics/models/ebics_userid.py
Normal file
358
account_ebics/models/ebics_userid.py
Normal file
@@ -0,0 +1,358 @@
|
||||
# Copyright 2009-2020 Noviat.
|
||||
# License LGPL-3 or later (http://www.gnu.org/licenses/lpgl).
|
||||
|
||||
import base64
|
||||
import logging
|
||||
import os
|
||||
from sys import exc_info
|
||||
from urllib.error import URLError
|
||||
|
||||
from odoo import _, api, fields, models
|
||||
from odoo.exceptions import UserError
|
||||
|
||||
_logger = logging.getLogger(__name__)
|
||||
|
||||
try:
|
||||
import fintech
|
||||
from fintech.ebics import EbicsKeyRing, EbicsBank, EbicsUser,\
|
||||
EbicsClient, EbicsFunctionalError, EbicsTechnicalError
|
||||
fintech.cryptolib = 'cryptography'
|
||||
except ImportError:
|
||||
_logger.warning('Failed to import fintech')
|
||||
|
||||
|
||||
class EbicsUserID(models.Model):
|
||||
_name = 'ebics.userid'
|
||||
_description = 'EBICS UserID'
|
||||
_order = 'name'
|
||||
|
||||
name = fields.Char(
|
||||
string='EBICS UserID', required=True,
|
||||
readonly=True, states={'draft': [('readonly', False)]},
|
||||
help="Human users or a technical system that is/are "
|
||||
"assigned to a customer. "
|
||||
"\nOn the EBICS bank server it is identified "
|
||||
"by the combination of UserID and PartnerID. "
|
||||
"The technical subscriber serves only for the data exchange "
|
||||
"between customer and financial institution. "
|
||||
"The human user also can authorise orders.")
|
||||
ebics_config_id = fields.Many2one(
|
||||
comodel_name='ebics.config',
|
||||
string='EBICS Configuration',
|
||||
ondelete='cascade')
|
||||
user_ids = fields.Many2many(
|
||||
comodel_name='res.users',
|
||||
string='Users',
|
||||
required=True,
|
||||
help="Users who are allowed to use this EBICS UserID for "
|
||||
" bank transactions.")
|
||||
# Currently only a singe signature class per user is supported
|
||||
# Classes A and B are not yet supported.
|
||||
signature_class = fields.Selection(
|
||||
selection=[('E', 'Single signature'),
|
||||
('T', 'Transport signature')],
|
||||
string='Signature Class',
|
||||
required=True, default='T',
|
||||
readonly=True, states={'draft': [('readonly', False)]},
|
||||
help="Default signature class."
|
||||
"This default can be overriden for specific "
|
||||
"EBICS transactions (cf. File Formats).")
|
||||
ebics_keys_fn = fields.Char(
|
||||
compute='_compute_ebics_keys_fn')
|
||||
ebics_keys_found = fields.Boolean(
|
||||
compute='_compute_ebics_keys_found')
|
||||
ebics_passphrase = fields.Char(
|
||||
string='EBICS Passphrase')
|
||||
ebics_ini_letter = fields.Binary(
|
||||
string='EBICS INI Letter', readonly=True,
|
||||
help="INI-letter PDF document to be sent to your bank.")
|
||||
ebics_ini_letter_fn = fields.Char(
|
||||
string='INI-letter Filename', readonly=True)
|
||||
ebics_public_bank_keys = fields.Binary(
|
||||
string='EBICS Public Bank Keys', readonly=True,
|
||||
help="EBICS Public Bank Keys to be checked for consistency.")
|
||||
ebics_public_bank_keys_fn = fields.Char(
|
||||
string='EBICS Public Bank Keys Filename', readonly=True)
|
||||
# X.509 Distinguished Name attributes used to
|
||||
# create self-signed X.509 certificates
|
||||
ebics_key_x509 = fields.Boolean(
|
||||
string='X509 support',
|
||||
help="Set this flag in order to work with "
|
||||
"self-signed X.509 certificates")
|
||||
ebics_key_x509_dn_cn = fields.Char(
|
||||
string='Common Name [CN]',
|
||||
readonly=True, states={'draft': [('readonly', False)]},
|
||||
)
|
||||
ebics_key_x509_dn_o = fields.Char(
|
||||
string='Organization Name [O]',
|
||||
readonly=True, states={'draft': [('readonly', False)]},
|
||||
)
|
||||
ebics_key_x509_dn_ou = fields.Char(
|
||||
string='Organizational Unit Name [OU]',
|
||||
readonly=True, states={'draft': [('readonly', False)]},
|
||||
)
|
||||
ebics_key_x509_dn_c = fields.Char(
|
||||
string='Country Name [C]',
|
||||
readonly=True, states={'draft': [('readonly', False)]},
|
||||
)
|
||||
ebics_key_x509_dn_st = fields.Char(
|
||||
string='State Or Province Name [ST]',
|
||||
readonly=True, states={'draft': [('readonly', False)]},
|
||||
)
|
||||
ebics_key_x509_dn_l = fields.Char(
|
||||
string='Locality Name [L]',
|
||||
readonly=True, states={'draft': [('readonly', False)]},
|
||||
)
|
||||
ebics_key_x509_dn_e = fields.Char(
|
||||
string='Email Address',
|
||||
readonly=True, states={'draft': [('readonly', False)]},
|
||||
)
|
||||
state = fields.Selection(
|
||||
[('draft', 'Draft'),
|
||||
('init', 'Initialisation'),
|
||||
('get_bank_keys', 'Get Keys from Bank'),
|
||||
('to_verify', 'Verification'),
|
||||
('active_keys', 'Active Keys')],
|
||||
string='State',
|
||||
default='draft',
|
||||
required=True, readonly=True)
|
||||
active = fields.Boolean(
|
||||
string='Active', default=True)
|
||||
company_ids = fields.Many2many(
|
||||
comodel_name='res.company',
|
||||
string='Companies',
|
||||
required=True,
|
||||
help="Companies sharing this EBICS contract.")
|
||||
|
||||
@api.depends('name')
|
||||
def _compute_ebics_keys_fn(self):
|
||||
for rec in self:
|
||||
keys_dir = rec.ebics_config_id.ebics_keys
|
||||
rec.ebics_keys_fn = (
|
||||
rec.name
|
||||
and keys_dir
|
||||
and os.path.isfile(
|
||||
keys_dir + '/' + rec.name + '_keys'))
|
||||
|
||||
@api.depends('ebics_keys_fn')
|
||||
def _compute_ebics_keys_found(self):
|
||||
for rec in self:
|
||||
rec.ebics_keys_found = (
|
||||
rec.ebics_keys_fn
|
||||
and os.path.isfile(rec.ebics_keys_fn)
|
||||
)
|
||||
|
||||
@api.constrains('ebics_passphrase')
|
||||
def _check_ebics_passphrase(self):
|
||||
for rec in self:
|
||||
if not rec.ebics_passphrase or len(rec.ebics_passphrase) < 8:
|
||||
raise UserError(_(
|
||||
"The passphrase must be at least 8 characters long"))
|
||||
|
||||
def set_to_draft(self):
|
||||
return self.write({'state': 'draft'})
|
||||
|
||||
def set_to_get_bank_keys(self):
|
||||
return self.write({'state': 'get_bank_keys'})
|
||||
|
||||
def ebics_init_1(self):
|
||||
"""
|
||||
Initialization of bank keys - Step 1:
|
||||
Create new keys and certificates for this user
|
||||
"""
|
||||
self.ensure_one()
|
||||
self._check_ebics_files()
|
||||
if self.state != 'draft':
|
||||
raise UserError(
|
||||
_("Set state to 'draft' before Bank Key (re)initialisation."))
|
||||
|
||||
if not self.ebics_passphrase:
|
||||
raise UserError(
|
||||
_("Set a passphrase."))
|
||||
|
||||
try:
|
||||
keyring = EbicsKeyRing(
|
||||
keys=self.ebics_keys,
|
||||
passphrase=self.ebics_passphrase)
|
||||
bank = EbicsBank(
|
||||
keyring=keyring, hostid=self.ebics_host, url=self.ebics_url)
|
||||
user = EbicsUser(
|
||||
keyring=keyring, partnerid=self.ebics_partner,
|
||||
userid=self.name)
|
||||
except Exception:
|
||||
exctype, value = exc_info()[:2]
|
||||
error = _("EBICS Initialisation Error:")
|
||||
error += '\n' + str(exctype) + '\n' + str(value)
|
||||
raise UserError(error)
|
||||
|
||||
self._check_ebics_keys()
|
||||
if not os.path.isfile(self.ebics_keys):
|
||||
try:
|
||||
user.create_keys(
|
||||
keyversion=self.ebics_key_version,
|
||||
bitlength=self.ebics_key_bitlength)
|
||||
except Exception:
|
||||
exctype, value = exc_info()[:2]
|
||||
error = _("EBICS Initialisation Error:")
|
||||
error += '\n' + str(exctype) + '\n' + str(value)
|
||||
raise UserError(error)
|
||||
|
||||
if self.ebics_key_x509:
|
||||
dn_attrs = {
|
||||
'commonName': self.ebics_key_x509_dn_cn,
|
||||
'organizationName': self.ebics_key_x509_dn_o,
|
||||
'organizationalUnitName': self.ebics_key_x509_dn_ou,
|
||||
'countryName': self.ebics_key_x509_dn_c,
|
||||
'stateOrProvinceName': self.ebics_key_x509_dn_st,
|
||||
'localityName': self.ebics_key_x509_dn_l,
|
||||
'emailAddress': self.ebics_key_x509_dn_e,
|
||||
}
|
||||
kwargs = {k: v for k, v in dn_attrs.items() if v}
|
||||
user.create_certificates(**kwargs)
|
||||
|
||||
client = EbicsClient(bank, user, version=self.ebics_version)
|
||||
|
||||
# Send the public electronic signature key to the bank.
|
||||
try:
|
||||
if self.ebics_version == 'H003':
|
||||
bank._order_number = self._get_order_number()
|
||||
OrderID = client.INI()
|
||||
_logger.info(
|
||||
'%s, EBICS INI command, OrderID=%s', self._name, OrderID)
|
||||
if self.ebics_version == 'H003':
|
||||
self._update_order_number(OrderID)
|
||||
except URLError:
|
||||
exctype, value = exc_info()[:2]
|
||||
raise UserError(_(
|
||||
"urlopen error:\n url '%s' - %s")
|
||||
% (self.ebics_url, str(value)))
|
||||
except EbicsFunctionalError:
|
||||
e = exc_info()
|
||||
error = _("EBICS Functional Error:")
|
||||
error += '\n'
|
||||
error += '%s (code: %s)' % (e[1].message, e[1].code)
|
||||
raise UserError(error)
|
||||
except EbicsTechnicalError:
|
||||
e = exc_info()
|
||||
error = _("EBICS Technical Error:")
|
||||
error += '\n'
|
||||
error += '%s (code: %s)' % (e[1].message, e[1].code)
|
||||
raise UserError(error)
|
||||
|
||||
# Send the public authentication and encryption keys to the bank.
|
||||
if self.ebics_version == 'H003':
|
||||
bank._order_number = self._get_order_number()
|
||||
OrderID = client.HIA()
|
||||
_logger.info('%s, EBICS HIA command, OrderID=%s', self._name, OrderID)
|
||||
if self.ebics_version == 'H003':
|
||||
self._update_order_number(OrderID)
|
||||
|
||||
# Create an INI-letter which must be printed and sent to the bank.
|
||||
cc = self.bank_id.bank_id.country.code
|
||||
if cc in ['FR', 'DE']:
|
||||
lang = cc
|
||||
else:
|
||||
lang = self.env.user.lang or \
|
||||
self.env['res.lang'].search([])[0].code
|
||||
lang = lang[:2]
|
||||
tmp_dir = os.path.normpath(self.ebics_files + '/tmp')
|
||||
if not os.path.isdir(tmp_dir):
|
||||
os.makedirs(tmp_dir, mode=0o700)
|
||||
fn_date = fields.Date.today().isoformat()
|
||||
fn = '_'.join([self.ebics_host, 'ini_letter', fn_date]) + '.pdf'
|
||||
full_tmp_fn = os.path.normpath(tmp_dir + '/' + fn)
|
||||
user.create_ini_letter(
|
||||
bankname=self.bank_id.bank_id.name,
|
||||
path=full_tmp_fn,
|
||||
lang=lang)
|
||||
with open(full_tmp_fn, 'rb') as f:
|
||||
letter = f.read()
|
||||
self.write({
|
||||
'ebics_ini_letter': base64.encodestring(letter),
|
||||
'ebics_ini_letter_fn': fn,
|
||||
})
|
||||
|
||||
return self.write({'state': 'init'})
|
||||
|
||||
def ebics_init_2(self):
|
||||
"""
|
||||
Initialization of bank keys - Step 2:
|
||||
Activation of the account by the bank.
|
||||
"""
|
||||
if self.state != 'init':
|
||||
raise UserError(
|
||||
_("Set state to 'Initialisation'."))
|
||||
self.ensure_one()
|
||||
return self.write({'state': 'get_bank_keys'})
|
||||
|
||||
def ebics_init_3(self):
|
||||
"""
|
||||
Initialization of bank keys - Step 3:
|
||||
|
||||
After the account has been activated the public bank keys
|
||||
must be downloaded and checked for consistency.
|
||||
"""
|
||||
self.ensure_one()
|
||||
self._check_ebics_files()
|
||||
if self.state != 'get_bank_keys':
|
||||
raise UserError(
|
||||
_("Set state to 'Get Keys from Bank'."))
|
||||
keyring = EbicsKeyRing(
|
||||
keys=self.ebics_keys, passphrase=self.ebics_passphrase)
|
||||
bank = EbicsBank(
|
||||
keyring=keyring, hostid=self.ebics_host, url=self.ebics_url)
|
||||
user = EbicsUser(
|
||||
keyring=keyring, partnerid=self.ebics_partner,
|
||||
userid=self.name)
|
||||
client = EbicsClient(
|
||||
bank, user, version=self.ebics_version)
|
||||
|
||||
public_bank_keys = client.HPB()
|
||||
public_bank_keys = public_bank_keys.encode()
|
||||
tmp_dir = os.path.normpath(self.ebics_files + '/tmp')
|
||||
if not os.path.isdir(tmp_dir):
|
||||
os.makedirs(tmp_dir, mode=0o700)
|
||||
fn_date = fields.Date.today().isoformat()
|
||||
fn = '_'.join([self.ebics_host, 'public_bank_keys', fn_date]) + '.txt'
|
||||
self.write({
|
||||
'ebics_public_bank_keys': base64.encodestring(public_bank_keys),
|
||||
'ebics_public_bank_keys_fn': fn,
|
||||
'state': 'to_verify',
|
||||
})
|
||||
|
||||
return True
|
||||
|
||||
def ebics_init_4(self):
|
||||
"""
|
||||
Initialization of bank keys - Step 2:
|
||||
Confirm Verification of the public bank keys
|
||||
and activate the bank keyu.
|
||||
"""
|
||||
self.ensure_one()
|
||||
if self.state != 'to_verify':
|
||||
raise UserError(
|
||||
_("Set state to 'Verification'."))
|
||||
|
||||
keyring = EbicsKeyRing(
|
||||
keys=self.ebics_keys, passphrase=self.ebics_passphrase)
|
||||
bank = EbicsBank(
|
||||
keyring=keyring, hostid=self.ebics_host, url=self.ebics_url)
|
||||
bank.activate_keys()
|
||||
return self.write({'state': 'active_keys'})
|
||||
|
||||
def change_passphrase(self):
|
||||
self.ensure_one()
|
||||
ctx = dict(self._context, default_ebics_config_id=self.id)
|
||||
module = __name__.split('addons.')[1].split('.')[0]
|
||||
view = self.env.ref(
|
||||
'%s.ebics_change_passphrase_view_form' % module)
|
||||
return {
|
||||
'name': _('EBICS keys change passphrase'),
|
||||
'view_type': 'form',
|
||||
'view_mode': 'form',
|
||||
'res_model': 'ebics.change.passphrase',
|
||||
'view_id': view.id,
|
||||
'target': 'new',
|
||||
'context': ctx,
|
||||
'type': 'ir.actions.act_window',
|
||||
}
|
Reference in New Issue
Block a user