# Copyright (C) 2017 - Today: GRAP (http://www.grap.coop) # @author: Sylvain LE GAL (https://twitter.com/legalsylvain) # License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl.html). import logging from datetime import datetime, timedelta from psycopg2 import ProgrammingError from flectra import SUPERUSER_ID, _, api, fields, models from flectra.exceptions import UserError, ValidationError from flectra.tools import sql, table_columns from flectra.tools.safe_eval import safe_eval _logger = logging.getLogger(__name__) class BiSQLView(models.Model): _name = "bi.sql.view" _description = "BI SQL View" _order = "sequence" _inherit = ["sql.request.mixin"] _sql_prefix = "x_bi_sql_view_" _model_prefix = "x_bi_sql_view." _sql_request_groups_relation = "bi_sql_view_groups_rel" _sql_request_users_relation = "bi_sql_view_users_rel" _STATE_SQL_EDITOR = [ ("model_valid", "SQL View and Model Created"), ("ui_valid", "Views, Action and Menu Created"), ] technical_name = fields.Char( required=True, help="Suffix of the SQL view. SQL full name will be computed and" " prefixed by 'x_bi_sql_view_'. Syntax should follow: " "https://www.postgresql.org/" "docs/current/static/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS", ) view_name = fields.Char( compute="_compute_view_name", readonly=True, store=True, help="Full name of the SQL view", ) model_name = fields.Char( compute="_compute_model_name", readonly=True, store=True, help="Full Qualified Name of the transient model that will" " be created.", ) is_materialized = fields.Boolean( string="Is Materialized View", default=True, ) materialized_text = fields.Char(compute="_compute_materialized_text", store=True) size = fields.Char( string="Database Size", readonly=True, help="Size of the materialized view and its indexes", ) state = fields.Selection(selection_add=_STATE_SQL_EDITOR) view_order = fields.Char( required=True, default="pivot,graph,tree", help="Comma-separated text. Possible values:" ' "graph", "pivot", "tree" or "form"', ) query = fields.Text( help="SQL Request that will be inserted as the view. Take care to :\n" " * set a name for all your selected fields, specially if you use" " SQL function (like EXTRACT, ...);\n" " * Do not use 'SELECT *' or 'SELECT table.*';\n" " * prefix the name of the selectable columns by 'x_';", default="SELECT\n" " my_field as x_my_field\n" "FROM my_table", ) domain_force = fields.Text( string="Extra Rule Definition", default="[]", help="Define here access restriction to data.\n" " Take care to use field name prefixed by 'x_'." " A global 'ir.rule' will be created." " A typical Multi Company rule is for exemple \n" " ['|', ('x_company_id','child_of', [user.company_id.id])," "('x_company_id','=',False)].", ) computed_action_context = fields.Text(compute="_compute_computed_action_context") action_context = fields.Text( default="{}", help="Define here a context that will be used" " by default, when creating the action.", ) bi_sql_view_field_ids = fields.One2many( string="SQL Fields", comodel_name="bi.sql.view.field", inverse_name="bi_sql_view_id", ) model_id = fields.Many2one( string="Flectra Model", comodel_name="ir.model", readonly=True ) # UI related fields # 1. Editable fields, which can be set by the user (optional) before # creating the UI elements @api.model def _default_parent_menu_id(self): return self.env.ref("bi_sql_editor.menu_bi_sql_editor") parent_menu_id = fields.Many2one( string="Parent Flectra Menu", comodel_name="ir.ui.menu", required=True, default=lambda self: self._default_parent_menu_id(), help="By assigning a value to this field before manually creating the " "UI, you're overwriting the parent menu on which the menu related to " "the SQL report will be created.", ) # 2. Readonly fields, non editable by the user form_view_id = fields.Many2one( string="Flectra Form View", comodel_name="ir.ui.view", readonly=True ) tree_view_id = fields.Many2one( string="Flectra Tree View", comodel_name="ir.ui.view", readonly=True ) graph_view_id = fields.Many2one( string="Flectra Graph View", comodel_name="ir.ui.view", readonly=True ) pivot_view_id = fields.Many2one( string="Flectra Pivot View", comodel_name="ir.ui.view", readonly=True ) search_view_id = fields.Many2one( string="Flectra Search View", comodel_name="ir.ui.view", readonly=True ) action_id = fields.Many2one( string="Flectra Action", comodel_name="ir.actions.act_window", readonly=True ) menu_id = fields.Many2one( string="Flectra Menu", comodel_name="ir.ui.menu", readonly=True ) cron_id = fields.Many2one( string="Flectra Cron", comodel_name="ir.cron", readonly=True, help="Cron Task that will refresh the materialized view", ondelete="cascade", ) rule_id = fields.Many2one(string="Flectra Rule", comodel_name="ir.rule", readonly=True) sequence = fields.Integer(string="sequence") # Constrains Section @api.constrains("is_materialized") def _check_index_materialized(self): for rec in self.filtered(lambda x: not x.is_materialized): if rec.bi_sql_view_field_ids.filtered(lambda x: x.is_index): raise UserError( _("You can not create indexes on non materialized views") ) @api.constrains("view_order") def _check_view_order(self): for rec in self: if rec.view_order: for vtype in rec.view_order.split(","): if vtype not in ("graph", "pivot", "tree", "form"): raise UserError( _("Only graph, pivot, tree or form views are supported") ) # Compute Section @api.depends("bi_sql_view_field_ids.graph_type") def _compute_computed_action_context(self): for rec in self: action = { "pivot_measures": [], "pivot_row_groupby": [], "pivot_column_groupby": [], } for field in rec.bi_sql_view_field_ids.filtered( lambda x: x.graph_type == "measure" ): action["pivot_measures"].append(field.name) # If no measure are defined, we display by default the count # of the element, to avoid an empty view if not action["pivot_measures"]: action["pivot_measures"] = ["__count__"] for field in rec.bi_sql_view_field_ids.filtered( lambda x: x.graph_type == "row" ): action["pivot_row_groupby"].append(field.name) for field in rec.bi_sql_view_field_ids.filtered( lambda x: x.graph_type == "col" ): action["pivot_column_groupby"].append(field.name) rec.computed_action_context = str(action) @api.depends("is_materialized") def _compute_materialized_text(self): for sql_view in self: sql_view.materialized_text = ( sql_view.is_materialized and "MATERIALIZED" or "" ) @api.depends("technical_name") def _compute_view_name(self): for sql_view in self: sql_view.view_name = f"{sql_view._sql_prefix}{sql_view.technical_name}" @api.depends("technical_name") def _compute_model_name(self): for sql_view in self: sql_view.model_name = f"{sql_view._model_prefix}{sql_view.technical_name}" # Overload Section def write(self, vals): res = super().write(vals) if vals.get("sequence", False): for rec in self.filtered(lambda x: x.menu_id): rec.menu_id.sequence = rec.sequence return res def unlink(self): if any(view.state not in ("draft", "sql_valid") for view in self): raise UserError( _( "You can only unlink draft views." "If you want to delete them, first set them to draft." ) ) return super().unlink() def copy(self, default=None): self.ensure_one() default = dict(default or {}) if "name" not in default: default["name"] = _("%s (Copy)") % self.name if "technical_name" not in default: default["technical_name"] = f"{self.technical_name}_copy" return super().copy(default=default) # Action Section def button_create_sql_view_and_model(self): for sql_view in self.filtered(lambda x: x.state == "sql_valid"): # Check if many2one fields are correctly bad_fields = sql_view.bi_sql_view_field_ids.filtered( lambda x: x.ttype == "many2one" and not x.many2one_model_id.id ) if bad_fields: raise ValidationError( _("Please set related models on the following fields %s") % ",".join(bad_fields.mapped("name")) ) # Create ORM and access sql_view._create_model_and_fields() sql_view._create_model_access() # Create SQL View and indexes sql_view._create_view() sql_view._create_index() if sql_view.is_materialized: if not sql_view.cron_id: sql_view.cron_id = ( self.env["ir.cron"].create(sql_view._prepare_cron()).id ) else: sql_view.cron_id.active = True sql_view.state = "model_valid" def button_reset_to_model_valid(self): views = self.filtered(lambda x: x.state == "ui_valid") views.mapped("form_view_id").unlink() views.mapped("tree_view_id").unlink() views.mapped("graph_view_id").unlink() views.mapped("pivot_view_id").unlink() views.mapped("search_view_id").unlink() views.mapped("action_id").unlink() views.mapped("menu_id").unlink() return views.write({"state": "model_valid"}) def button_reset_to_sql_valid(self): self.button_reset_to_model_valid() views = self.filtered(lambda x: x.state == "model_valid") for sql_view in views: # Drop SQL View (and indexes by cascade) if sql_view.is_materialized: sql_view._drop_view() if sql_view.cron_id: sql_view.cron_id.active = False # Drop ORM sql_view._drop_model_and_fields() return views.write({"state": "sql_valid"}) def button_set_draft(self): self.button_reset_to_model_valid() self.button_reset_to_sql_valid() return super().button_set_draft() def button_create_ui(self): self.form_view_id = self.env["ir.ui.view"].create(self._prepare_form_view()).id self.tree_view_id = self.env["ir.ui.view"].create(self._prepare_tree_view()).id self.graph_view_id = ( self.env["ir.ui.view"].create(self._prepare_graph_view()).id ) self.pivot_view_id = ( self.env["ir.ui.view"].create(self._prepare_pivot_view()).id ) self.search_view_id = ( self.env["ir.ui.view"].create(self._prepare_search_view()).id ) self.action_id = ( self.env["ir.actions.act_window"].create(self._prepare_action()).id ) self.menu_id = self.env["ir.ui.menu"].create(self._prepare_menu()).id self.write({"state": "ui_valid"}) def button_update_model_access(self): self._drop_model_access() self._create_model_access() self.write({"has_group_changed": False}) def button_refresh_materialized_view(self): self._refresh_materialized_view() def button_open_view(self): return { "type": "ir.actions.act_window", "res_model": self.model_id.model, "search_view_id": self.search_view_id.id, "view_mode": self.action_id.view_mode, } # Prepare Function def _prepare_model(self): self.ensure_one() field_id = [] for field in self.bi_sql_view_field_ids.filtered( lambda x: x.field_description is not False ): field_id.append([0, False, field._prepare_model_field()]) return { "name": self.name, "model": self.model_name, "access_ids": [], "field_id": field_id, } def _prepare_model_access(self): self.ensure_one() res = [] for group in self.group_ids: res.append( { "name": _("%(model_name)s Access %(full_name)s") % {"model_name": self.model_name, "full_name": group.full_name}, "model_id": self.model_id.id, "group_id": group.id, "perm_read": True, "perm_create": False, "perm_write": False, "perm_unlink": False, } ) return res def _prepare_cron(self): now = datetime.now() return { "name": _("Refresh Materialized View %s") % self.view_name, "user_id": SUPERUSER_ID, "model_id": self.env["ir.model"] .search([("model", "=", self._name)], limit=1) .id, "state": "code", "code": "model._refresh_materialized_view_cron(%s)" % self.ids, "numbercall": -1, "interval_number": 1, "interval_type": "days", "nextcall": now + timedelta(days=1), "active": True, } def _prepare_rule(self): self.ensure_one() return { "name": _("Access %s") % self.name, "model_id": self.model_id.id, "domain_force": self.domain_force, "global": True, } def _prepare_form_view(self): self.ensure_one() return { "name": self.name, "type": "form", "model": self.model_id.model, "arch": """""" """
""".format( "".join([x._prepare_form_field() for x in self.bi_sql_view_field_ids]) ), } def _prepare_tree_view(self): self.ensure_one() return { "name": self.name, "type": "tree", "model": self.model_id.model, "arch": """""" """