# Copyright 2009-2022 Noviat. # License LGPL-3 or later (http://www.gnu.org/licenses/lpgl). import base64 import logging from odoo import _, fields, models from odoo.exceptions import UserError from odoo.tools.safe_eval import safe_eval _logger = logging.getLogger(__name__) class EbicsFile(models.Model): _name = "ebics.file" _description = "Object to store EBICS Data Files" _order = "date desc" _sql_constraints = [ ( "name_uniq", "unique (name, format_id)", "This File has already been down- or uploaded !", ) ] name = fields.Char(string="Filename") data = fields.Binary(string="File", readonly=True) format_id = fields.Many2one( comodel_name="ebics.file.format", string="EBICS File Formats", readonly=True ) type = fields.Selection(related="format_id.type", readonly=True) date_from = fields.Date( readonly=True, help="'Date From' as entered in the download wizard." ) date_to = fields.Date( readonly=True, help="'Date To' as entered in the download wizard." ) date = fields.Datetime( required=True, readonly=True, help="File Upload/Download date" ) bank_statement_ids = fields.One2many( comodel_name="account.bank.statement", inverse_name="ebics_file_id", string="Generated Bank Statements", readonly=True, ) state = fields.Selection( [("draft", "Draft"), ("done", "Done")], string="State", default="draft", required=True, readonly=True, ) user_id = fields.Many2one( comodel_name="res.users", string="User", default=lambda self: self.env.user, readonly=True, ) ebics_userid_id = fields.Many2one( comodel_name="ebics.userid", string="EBICS UserID", ondelete="restrict", readonly=True, ) note = fields.Text(string="Notes") note_process = fields.Text(string="Notes") company_ids = fields.Many2many( comodel_name="res.company", string="Companies", help="Companies sharing this EBICS file.", ) def unlink(self): ff_methods = self._file_format_methods() for ebics_file in self: if ebics_file.state == "done": raise UserError(_("You can only remove EBICS files in state 'Draft'.")) # execute format specific actions ff = ebics_file.format_id.download_process_method if ff in ff_methods: if ff_methods[ff].get("unlink"): ff_methods[ff]["unlink"](ebics_file) # remove bank statements ebics_file.bank_statement_ids.unlink() return super(EbicsFile, self).unlink() def set_to_draft(self): return self.write({"state": "draft"}) def set_to_done(self): return self.write({"state": "done"}) def process(self): self.ensure_one() ctx = dict(self.env.context, allowed_company_ids=self.env.user.company_ids.ids) self = self.with_context(ctx) self.note_process = "" ff_methods = self._file_format_methods() ff = self.format_id.download_process_method if ff in ff_methods: if ff_methods[ff].get("process"): res = ff_methods[ff]["process"](self) self.state = "done" return res else: return self._process_undefined_format() def action_open_bank_statements(self): self.ensure_one() action = self.env["ir.actions.act_window"]._for_xml_id( "account.action_bank_statement_tree" ) domain = safe_eval(action.get("domain") or "[]") domain += [("id", "in", self._context.get("statement_ids"))] action.update({"domain": domain}) return action def button_close(self): self.ensure_one() return {"type": "ir.actions.act_window_close"} def _file_format_methods(self): """ Extend this dictionary in order to add support for extra file formats. """ res = { "cfonb120": { "process": self._process_cfonb120, "unlink": self._unlink_cfonb120, }, "camt.052": { "process": self._process_camt052, "unlink": self._unlink_camt052, }, "camt.053": { "process": self._process_camt053, "unlink": self._unlink_camt053, }, "camt.054": { "process": self._process_camt054, "unlink": self._unlink_camt054, }, "pain.002": { "process": self._process_pain002, "unlink": self._unlink_pain002, }, } return res def _check_import_module(self, module, raise_if_not_found=True): mod = ( self.env["ir.module.module"] .sudo() .search([("name", "=like", module), ("state", "=", "installed")]) ) if not mod: if raise_if_not_found: raise UserError( _( "The module to process the '%s' format is not installed " "on your system. " "\nPlease install module '%s'" ) % (self.format_id.name, module) ) return False return True def _process_result_action(self, res): notifications = [] st_line_ids = [] statement_ids = [] if res.get("context"): notifications = res["context"].get("notifications", []) st_line_ids = res["context"].get("statement_line_ids", []) if notifications: for notif in notifications: parts = [] for k in ["type", "message", "details"]: if notif.get(k): msg = "{}: {}".format(k, notif[k]) parts.append(msg) self.note_process += "\n".join(parts) self.note_process += "\n" self.note_process += "\n" if st_line_ids: self.flush() self.env.cr.execute( """ SELECT DISTINCT absl.statement_id, abs.name, abs.date, abs.company_id, rc.name AS company_name FROM account_bank_statement_line absl INNER JOIN account_bank_statement abs ON abs.id = absl.statement_id INNER JOIN res_company rc ON rc.id = abs.company_id WHERE absl.id IN %s ORDER BY date, company_id """, (tuple(st_line_ids),), ) sts_data = self.env.cr.dictfetchall() else: sts_data = [] st_cnt = len(sts_data) if st_cnt: self.note_process += _("%s bank statements have been imported: ") % st_cnt self.note_process += "\n" for st_data in sts_data: self.note_process += ("\n%s, %s (%s)") % ( st_data["date"], st_data["name"], st_data["company_name"], ) statement_ids = [x["statement_id"] for x in sts_data] if statement_ids: self.sudo().bank_statement_ids = [(6, 0, statement_ids)] ctx = dict(self.env.context, statement_ids=statement_ids) module = __name__.split("addons.")[1].split(".")[0] result_view = self.env.ref("%s.ebics_file_view_form_result" % module) return { "name": _("Import EBICS File"), "res_id": self.id, "view_type": "form", "view_mode": "form", "res_model": self._name, "view_id": result_view.id, "target": "new", "context": ctx, "type": "ir.actions.act_window", } @staticmethod def _process_cfonb120(self): """ We do not support the standard _journal_creation_wizard since a single cfonb120 file may contain statements from different legal entities. """ import_module = "account_statement_import_fr_cfonb" self._check_import_module(import_module) wiz_model = "account.statement.import" data_file = base64.b64decode(self.data) lines = data_file.split(b"\n") wiz_vals_list = [] st_lines = b"" transactions = False for line in lines: rec_type = line[0:2] acc_number = line[21:32] st_lines += line + b"\n" if rec_type == b"04": transactions = True if rec_type == b"07": if transactions: fn = "_".join([acc_number.decode(), self.name]) wiz_vals_list.append( { "statement_filename": fn, "statement_file": base64.b64encode(st_lines), } ) st_lines = b"" transactions = False result = { "type": "ir.actions.client", "tag": "bank_statement_reconciliation_view", "context": { "statement_line_ids": [], "company_ids": self.env.user.company_ids.ids, "notifications": [], }, } wiz_ctx = dict(self.env.context, active_model="ebics.file") for i, wiz_vals in enumerate(wiz_vals_list, start=1): wiz = self.env[wiz_model].with_context(wiz_ctx).create(wiz_vals) res = wiz.import_file_button() ctx = res.get("context") if res.get("res_model") == "account.bank.statement.import.journal.creation": message = _("Error detected while importing statement number %s.\n") % i message += _("No financial journal found.") details = _("Bank account number: %s") % ctx.get( "default_bank_acc_number" ) result["context"]["notifications"].extend( [ { "type": "warning", "message": message, "details": details, } ] ) continue result["context"]["statement_line_ids"].extend(ctx["statement_line_ids"]) result["context"]["notifications"].extend(ctx["notifications"]) return self._process_result_action(result) @staticmethod def _unlink_cfonb120(self): """ Placeholder for cfonb120 specific actions before removing the EBICS data file and its related bank statements. """ @staticmethod def _process_camt052(self): import_module = "account_statement_import_camt" self._check_import_module(import_module) return self._process_camt053(self) @staticmethod def _unlink_camt052(self): """ Placeholder for camt052 specific actions before removing the EBICS data file and its related bank statements. """ @staticmethod def _process_camt054(self): import_module = "account_statement_import_camt" self._check_import_module(import_module) return self._process_camt053(self) @staticmethod def _unlink_camt054(self): """ Placeholder for camt054 specific actions before removing the EBICS data file and its related bank statements. """ @staticmethod def _process_camt053(self): modules = [ ("oca", "account_statement_import_camt"), ("oe", "account_bank_statement_import_camt"), ] found = False for _src, mod in modules: if self._check_import_module(mod, raise_if_not_found=False): found = True break if not found: raise UserError( _( "The module to process the '%s' format is not installed " "on your system. " "\nPlease install one of the following modules: \n%s." ) % (self.format_id.name, ", ".join([x[1] for x in modules])) ) if _src == "oca": self._process_camt053_oca() else: self._process_camt053_oe() def _process_camt053_oca(self): wiz_model = "account.statement.import" wiz_vals = { "statement_filename": self.name, "statement_file": self.data, } result = { "type": "ir.actions.client", "tag": "bank_statement_reconciliation_view", "context": { "statement_line_ids": [], "company_ids": self.env.user.company_ids.ids, "notifications": [], }, } wiz_ctx = dict(self.env.context, active_model="ebics.file") wiz = self.env[wiz_model].with_context(wiz_ctx).create(wiz_vals) res = wiz.import_file_button() ctx = res.get("context") if res.get("res_model") == "account.bank.statement.import.journal.creation": message = _("Error detected while importing statement %s.\n") % self.name message += _("No financial journal found.") details = _("Bank account number: %s") % ctx.get("default_bank_acc_number") result["context"]["notifications"].extend( [ { "type": "warning", "message": message, "details": details, } ] ) result["context"]["statement_line_ids"].extend(ctx["statement_line_ids"]) result["context"]["notifications"].extend(ctx["notifications"]) return self._process_result_action(result) def _process_camt053_oe(self): wiz_model = "account.bank.statement.import" wiz_vals = { "attachment_ids": [ ( 0, 0, {"name": self.name, "datas": self.data, "store_fname": self.name}, ) ] } ctx = dict(self.env.context, active_model="ebics.file") wiz = self.env[wiz_model].with_context(ctx).create(wiz_vals) res = wiz.import_file() if res.get("res_model") == "account.bank.statement.import.journal.creation": if res.get("context"): bank_account = res["context"].get("default_bank_acc_number") raise UserError( _("No financial journal found for Company Bank Account %s") % bank_account ) return self._process_result_action(res) @staticmethod def _unlink_camt053(self): """ Placeholder for camt053 specific actions before removing the EBICS data file and its related bank statements. """ @staticmethod def _process_pain002(self): """ Placeholder for processing pain.002 files. TODO: add import logic based upon OCA 'account_payment_return_import' """ @staticmethod def _unlink_pain002(self): """ Placeholder for pain.002 specific actions before removing the EBICS data file. """ raise NotImplementedError def _process_undefined_format(self): raise UserError( _( "The current version of the 'account_ebics' module " "has no support to automatically process EBICS files " "with format %s." ) % self.format_id.name )