# Copyright 2012 - Now Savoir-faire Linux
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).

import logging
import re
from datetime import datetime

from dateutil.relativedelta import relativedelta

from flectra import api, fields, models
from flectra.tools import DEFAULT_SERVER_DATETIME_FORMAT as DATETIME_FORMAT
from flectra.tools.safe_eval import safe_eval

_logger = logging.getLogger(__name__)

# Maps a ``periodicity_uom`` selection key to the ``relativedelta`` keyword
# argument that expresses the same unit.
_PERIODICITY_DELTA_KWARG = {
    "minute": "minutes",
    "hour": "hours",
    "day": "days",
    "week": "weeks",
    "month": "months",
}


def is_one_value(result):
    """Return True when ``result`` is a row sequence whose first row has "value".

    :param result: rows returned by a query, e.g. the list of dicts produced
        by ``cr.dictfetchall()``.
    :return: True if ``result`` is a non-empty list/tuple and ``"value"`` is
        found in its first row, False otherwise.

    An empty result set (query returning no rows) yields False instead of
    raising ``IndexError``.  A bare ``dict`` also yields False: callers read
    the value through ``result[0]["value"]``, which is only meaningful for a
    sequence of rows.
    """
    if isinstance(result, (list, tuple)) and result:
        return "value" in result[0]
    return False


# Matches any text containing a data-modifying or DDL keyword as a whole word.
# ``re.DOTALL`` lets ``.*`` cross line breaks so a keyword on a later line of
# a multi-line statement is still detected; the ``\b`` anchors keep identifiers
# that merely contain a keyword (``create_date``, ``updated_at``) from being
# rejected.
RE_SELECT_QUERY = re.compile(
    r".*\b("
    + "|".join(
        (
            "INSERT",
            "UPDATE",
            "DELETE",
            "CREATE",
            "ALTER",
            "DROP",
            "GRANT",
            "REVOKE",
            "INDEX",
        )
    )
    + r")\b",
    re.DOTALL,
)


def is_sql_or_ddl_statement(query):
    """Return True when ``query`` contains no data-modifying or DDL keyword.

    Despite its name, a True result means the query looks read-only (none of
    INSERT/UPDATE/DELETE/CREATE/ALTER/DROP/GRANT/REVOKE/INDEX appears as a
    whole word) and may be executed by the KPI engine.

    :param query: SQL text to inspect; the check is case-insensitive.
    :return: bool
    """
    return not RE_SELECT_QUERY.match(query.upper())


class KPI(models.Model):
    """Key Performance Indicators."""

    _name = "kpi"
    _description = "Key Performance Indicator"

    name = fields.Char(required=True)
    description = fields.Text()
    category_id = fields.Many2one(
        "kpi.category",
        "Category",
        required=True,
    )
    threshold_id = fields.Many2one(
        "kpi.threshold",
        "Threshold",
        required=True,
    )
    periodicity = fields.Integer(default=1)

    periodicity_uom = fields.Selection(
        [
            ("minute", "Minute"),
            ("hour", "Hour"),
            ("day", "Day"),
            ("week", "Week"),
            ("month", "Month"),
        ],
        "Periodicity UoM",
        required=True,
        default="day",
    )

    next_execution_date = fields.Datetime(
        "Next execution date",
    )
    value = fields.Float(
        compute="_compute_display_last_kpi_value",
    )
    color = fields.Text(
        compute="_compute_display_last_kpi_value",
    )
    last_execution = fields.Datetime(
        "Last execution",
        compute="_compute_display_last_kpi_value",
    )
    kpi_type = fields.Selection(
        [
            ("python", "Python"),
            ("local", "SQL - Local DB"),
            ("external", "SQL - External DB"),
        ],
        "KPI Computation Type",
    )
    dbsource_id = fields.Many2one(
        "base.external.dbsource",
        "External DB Source",
    )
    kpi_code = fields.Text(
        "KPI Code",
        help=(
            "SQL code must return the result as 'value' "
            "(i.e. 'SELECT 5 AS value')."
        ),
    )
    history_ids = fields.One2many(
        "kpi.history",
        "kpi_id",
        "History",
    )
    active = fields.Boolean(
        help=(
            "Only active KPIs will be updated by the scheduler based on"
            " the periodicity configuration."
        ),
        default=True,
    )
    company_id = fields.Many2one(
        "res.company", "Company", default=lambda self: self.env.company
    )

    def _compute_display_last_kpi_value(self):
        """Fill ``value``, ``color`` and ``last_execution`` from history.

        The first ``kpi.history`` record found for each KPI is used; when a
        KPI has no history the fields fall back to 0, white and False.
        NOTE(review): "first" relies on the default ordering of
        ``kpi.history`` putting the most recent entry first - confirm.
        """
        history_obj = self.env["kpi.history"]
        for obj in self:
            # One record is enough: fetch only the first history entry and
            # read from that same record.
            last_history = history_obj.search([("kpi_id", "=", obj.id)], limit=1)
            if last_history:
                obj.value = last_history.value
                obj.color = last_history.color
                obj.last_execution = last_history.date
            else:
                obj.value = 0
                obj.color = "#FFFFFF"
                obj.last_execution = False

    def _get_kpi_value(self):
        """Evaluate this KPI and return the values for a new history entry.

        Depending on ``kpi_type`` the code in ``kpi_code`` is run as SQL on
        the local database, as SQL on the external source ``dbsource_id``, or
        evaluated as a Python expression through ``safe_eval`` (with ``self``
        available).  SQL is only executed when ``is_sql_or_ddl_statement``
        accepts it; otherwise, or when no usable row comes back, the value
        stays 0.

        :return: dict with at least ``value``, ``color`` and ``kpi_id``.  When
            the Python expression itself evaluates to a dict, that dict is
            returned with ``kpi_id`` added.
        """
        self.ensure_one()
        kpi_value = 0
        if self.kpi_code:
            # NOTE(review): kpi_code is executed / evaluated as stored on the
            # record; write access to this field must stay restricted to
            # trusted users.
            if self.kpi_type == "local" and is_sql_or_ddl_statement(self.kpi_code):
                self.env.cr.execute(self.kpi_code)
                rows = self.env.cr.dictfetchall()
                if is_one_value(rows):
                    kpi_value = rows[0]["value"]
            elif (
                self.kpi_type == "external"
                and self.dbsource_id.id
                and is_sql_or_ddl_statement(self.kpi_code)
            ):
                rows = self.dbsource_id.execute(self.kpi_code)
                if is_one_value(rows):
                    kpi_value = rows[0]["value"]
            elif self.kpi_type == "python":
                kpi_value = safe_eval(self.kpi_code, {"self": self})
        if isinstance(kpi_value, dict):
            res = kpi_value
        else:
            res = {
                "value": kpi_value,
                "color": self.threshold_id.get_color(kpi_value),
            }
        res.update({"kpi_id": self.id})
        return res

    def compute_kpi_value(self):
        """Compute every KPI in ``self`` and store the result in history.

        :return: True
        """
        # History entries are created with sudo(), as in the original code.
        history_obj = self.env["kpi.history"].sudo()
        for obj in self:
            history_obj.create(obj._get_kpi_value())
        return True

    def update_next_execution_date(self):
        """Set ``next_execution_date`` to now plus the configured periodicity.

        An unknown ``periodicity_uom`` results in a zero offset.

        :return: True
        """
        for obj in self:
            delta_kwarg = _PERIODICITY_DELTA_KWARG.get(obj.periodicity_uom)
            if delta_kwarg:
                delta = relativedelta(**{delta_kwarg: obj.periodicity})
            else:
                delta = relativedelta()
            new_date = datetime.now() + delta
            obj.next_execution_date = new_date.strftime(DATETIME_FORMAT)
        return True

    # Method called by the scheduler
    @api.model
    def update_kpi_value(self):
        """Recompute every active KPI that is due and reschedule it.

        A KPI is due when its ``next_execution_date`` is unset or not in the
        future.  Extra domain terms may be supplied through the ``filters``
        context key.  Each KPI is processed on its own: a failure is logged
        and does not prevent the remaining KPIs from being updated.

        :return: None
        """
        now = datetime.now().strftime(DATETIME_FORMAT)
        # Prefix notation: active AND (due OR never scheduled).
        filters = [
            "&",
            ("active", "=", True),
            "|",
            ("next_execution_date", "<=", now),
            ("next_execution_date", "=", False),
        ]
        if "filters" in self.env.context:
            filters.extend(self.env.context["filters"])
        for obj in self.search(filters):
            try:
                # The savepoint confines a failing KPI (e.g. invalid SQL) to
                # its own sub-transaction so the cursor stays usable.
                with self.env.cr.savepoint():
                    obj.compute_kpi_value()
                    obj.update_next_execution_date()
            except Exception:
                _logger.exception("Failed updating KPI values")
        return None