odoo_account_ebics/account_ebics/models/ebics_file.py

296 lines
10 KiB
Python

# -*- coding: utf-8 -*-
# Copyright 2009-2020 Noviat.
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import base64
import logging
from openerp import api, fields, models, _
from openerp.exceptions import Warning as UserError
_logger = logging.getLogger(__name__)
class EbicsFile(models.Model):
_name = 'ebics.file'
_description = 'Object to store EBICS Data Files'
_order = 'date desc'
name = fields.Char(string='Filename')
data = fields.Binary(string='File', readonly=True)
format_id = fields.Many2one(
comodel_name='ebics.file.format',
string='EBICS File Formats',
readonly=True)
type = fields.Selection(
related='format_id.type',
readonly=True)
date_from = fields.Date(
readonly=True,
help="'Date From' as entered in the download wizard.")
date_to = fields.Date(
readonly=True,
help="'Date To' as entered in the download wizard.")
date = fields.Datetime(
required=True, readonly=True,
help='File Upload/Download date')
bank_statement_ids = fields.One2many(
comodel_name='account.bank.statement',
inverse_name='ebics_file_id',
string='Generated Bank Statements', readonly=True)
state = fields.Selection(
[('draft', 'Draft'),
('done', 'Done')],
string='State',
default='draft',
required=True, readonly=True)
user_id = fields.Many2one(
comodel_name='res.users', string='User',
default=lambda self: self.env.user,
readonly=True)
note = fields.Text(string='Notes')
note_process = fields.Text(string='Notes')
company_id = fields.Many2one(
comodel_name='res.company',
string='Company',
default=lambda self: self._default_company_id())
_sql_constraints = [
('name_company_uniq', 'unique (name, company_id, format_id)',
'This File has already been imported !')
]
@api.model
def _default_company_id(self):
"""
Adapt this method in case your bank provides transactions
of multiple legal entities in a single EBICS File.
"""
return self.env.user.company_id
@api.multi
def unlink(self):
ff_methods = self._file_format_methods()
for ebics_file in self:
if ebics_file.state == 'done':
raise UserError(_(
"You can only remove EBICS files in state 'Draft'."))
# execute format specific actions
ff = ebics_file.format_id.name
if ff in ff_methods:
if ff_methods[ff].get('unlink'):
ff_methods[ff]['unlink'](ebics_file)
# remove bank statements
ebics_file.bank_statement_ids.unlink()
return super(EbicsFile, self).unlink()
@api.multi
def set_to_draft(self):
return self.write({'state': 'draft'})
@api.multi
def set_to_done(self):
return self.write({'state': 'done'})
@api.multi
def process(self):
self.ensure_one()
self.note_process = ''
ff_methods = self._file_format_methods()
ff = self.format_id.name
if ff in ff_methods:
if ff_methods[ff].get('process'):
res = ff_methods[ff]['process'](self)
self.state = 'done'
return res
else:
return self._process_undefined_format()
@api.multi
def action_open_bank_statements(self):
self.ensure_one()
action = self.env['ir.actions.act_window'].for_xml_id(
'account', 'action_bank_statement_tree')
domain = eval(action.get('domain') or '[]')
domain += [('id', 'in', self._context.get('statement_ids'))]
action.update({'domain': domain})
return action
@api.multi
def button_close(self):
self.ensure_one()
return {'type': 'ir.actions.act_window_close'}
def _file_format_methods(self):
"""
Extend this dictionary in order to add support
for extra file formats.
"""
res = {
'camt.xxx.cfonb120.stm':
{'process': self._process_cfonb120,
'unlink': self._unlink_cfonb120},
'camt.053.001.02.stm':
{'process': self._process_camt053,
'unlink': self._unlink_camt053},
}
return res
def _check_import_module(self, module):
mod = self.env['ir.module.module'].search(
[('name', '=', module),
('state', '=', 'installed')])
if not mod:
raise UserError(_(
"The module to process the '%s' format is not installed "
"on your system. "
"\nPlease install module '%s'")
% (self.format_id.name, module))
def _process_result_action(self, ctx):
module = __name__.split('addons.')[1].split('.')[0]
result_view = self.env.ref('%s.ebics_file_view_form_result' % module)
return {
'name': _('Import EBICS File'),
'res_id': self.id,
'view_type': 'form',
'view_mode': 'form',
'res_model': self._name,
'view_id': result_view.id,
'target': 'new',
'context': ctx,
'type': 'ir.actions.act_window',
}
@staticmethod
def _process_cfonb120(self):
import_module = 'account_bank_statement_import_fr_cfonb'
self._check_import_module(import_module)
wiz_model = 'account.bank.statement.import'
data_file = base64.b64decode(self.data)
lines = data_file.split('\n')
data_files = []
st_lines = ''
transactions = False
for line in lines:
rec_type = line[0:2]
acc_number = line[21:32]
st_lines += line + '\n'
if rec_type == '04':
transactions = True
if rec_type == '07':
if transactions:
data_files.append({
'acc_number': acc_number,
'data_file': base64.b64encode(st_lines),
})
st_lines = ''
transactions = False
result = {
'type': 'ir.actions.client',
'tag': 'bank_statement_reconciliation_view',
'context': {'statement_ids': [],
'notifications': []},
}
for i, data_file in enumerate(data_files, start=1):
acc_number = data_file['acc_number']
if not self.env[wiz_model]._find_bank_account_id(acc_number):
message = _(
"Error detected while importing statement number %s.\n"
) % i
message += _("No financial journal found.")
details = _(
'Bank account number: %s'
) % acc_number
result['context']['notifications'].extend([{
'type': 'warning',
'message': message,
'details': details,
}])
continue
wiz_vals = {'data_file': data_file['data_file']}
wiz = self.env[wiz_model].create(wiz_vals)
res = wiz.import_file()
ctx = res.get('context')
result['context']['statement_ids'].extend(
ctx['statement_ids'])
result['context']['notifications'].extend(
ctx['notifications'])
if result.get('context'):
notifications = result['context'].get('notifications', [])
statement_ids = result['context'].get('statement_ids', [])
if notifications:
for notif in notifications:
parts = []
for k in ['type', 'message', 'details']:
if notif.get(k):
msg = '%s: %s' % (k, notif[k])
parts.append(msg)
self.note_process += '\n'.join(parts)
self.note_process += '\n'
self.note_process += '\n'
self.note_process += _(
"Number of Bank Statements: %s"
) % len(statement_ids)
if statement_ids:
self.bank_statement_ids = [(6, 0, statement_ids)]
return self._process_result_action(
dict(self.env.context, statement_ids=statement_ids))
@staticmethod
def _unlink_cfonb120(self):
"""
Placeholder for cfonb120 specific actions before removing the
EBICS data file and its related bank statements.
"""
pass
@staticmethod
def _process_camt053(self):
import_module = 'account_bank_statement_import_camt'
self._check_import_module(import_module)
wiz_model = 'account.bank.statement.import'
wiz_vals = {
'data_file': self.data,
# 'filename': self.name, # V10 field
}
wiz = self.env[wiz_model].create(wiz_vals)
res = wiz.import_file()
notifications = []
statement_ids = []
if res.get('context'):
notifications = res['context'].get('notifications', [])
statement_ids = res['context'].get('statement_ids', [])
if notifications:
for notif in notifications:
parts = []
for k in ['type', 'message', 'details']:
if notif.get(k):
msg = '%s: %s' % (k, notif[k])
parts.append(msg)
self.note_process += '\n'.join(parts)
self.note_process += '\n'
self.note_process += '\n'
self.note_process += _(
"Number of Bank Statements: %s"
) % len(statement_ids)
if statement_ids:
self.bank_statement_ids = [(6, 0, statement_ids)]
ctx = dict(self._context, statement_ids=statement_ids)
return self._process_result_action(ctx)
@staticmethod
def _unlink_camt053(self):
"""
Placeholder for camt053 specific actions before removing the
EBICS data file and its related bank statements.
"""
pass
def _process_undefined_format(self):
raise UserError(_(
"The current version of the 'account_ebics' module "
"has no support to automatically process EBICS files "
"with format %s."
) % self.format_id.name)