reporting-engine/kpi/models/kpi.py

227 lines
6.5 KiB
Python
Raw Permalink Normal View History

2024-12-03 13:17:35 +00:00
# Copyright 2012 - Now Savoir-faire Linux <https://www.savoirfairelinux.com/>
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
import logging
import re
from datetime import datetime
from dateutil.relativedelta import relativedelta
from flectra import api, fields, models
from flectra.tools import DEFAULT_SERVER_DATETIME_FORMAT as DATETIME_FORMAT
from flectra.tools.safe_eval import safe_eval
_logger = logging.getLogger(__name__)
def is_one_value(result):
# check if sql query returns only one value
if isinstance(result, dict) and "value" in result.dictfetchone():
return True
elif isinstance(result, list) and "value" in result[0]:
return True
else:
return False
RE_SELECT_QUERY = re.compile(
".*("
+ "|".join(
(
"INSERT",
"UPDATE",
"DELETE",
"CREATE",
"ALTER",
"DROP",
"GRANT",
"REVOKE",
"INDEX",
)
)
+ ")"
)
def is_sql_or_ddl_statement(query):
"""Check if sql query is a SELECT statement"""
return not RE_SELECT_QUERY.match(query.upper())
class KPI(models.Model):
"""Key Performance Indicators."""
_name = "kpi"
_description = "Key Performance Indicator"
name = fields.Char(required=True)
description = fields.Text()
category_id = fields.Many2one(
"kpi.category",
"Category",
required=True,
)
threshold_id = fields.Many2one(
"kpi.threshold",
"Threshold",
required=True,
)
periodicity = fields.Integer(default=1)
periodicity_uom = fields.Selection(
[
("minute", "Minute"),
("hour", "Hour"),
("day", "Day"),
("week", "Week"),
("month", "Month"),
],
"Periodicity UoM",
required=True,
default="day",
)
next_execution_date = fields.Datetime(
"Next execution date",
)
value = fields.Float(
compute="_compute_display_last_kpi_value",
)
color = fields.Text(
compute="_compute_display_last_kpi_value",
)
last_execution = fields.Datetime(
"Last execution",
compute="_compute_display_last_kpi_value",
)
kpi_type = fields.Selection(
[
("python", "Python"),
("local", "SQL - Local DB"),
("external", "SQL - External DB"),
],
"KPI Computation Type",
)
dbsource_id = fields.Many2one(
"base.external.dbsource",
"External DB Source",
)
kpi_code = fields.Text(
"KPI Code",
help=(
"SQL code must return the result as 'value' " "(i.e. 'SELECT 5 AS value')."
),
)
history_ids = fields.One2many(
"kpi.history",
"kpi_id",
"History",
)
active = fields.Boolean(
help=(
"Only active KPIs will be updated by the scheduler based on"
" the periodicity configuration."
),
default=True,
)
company_id = fields.Many2one(
"res.company", "Company", default=lambda self: self.env.company
)
def _compute_display_last_kpi_value(self):
history_obj = self.env["kpi.history"]
for obj in self:
history_ids = history_obj.search([("kpi_id", "=", obj.id)])
if history_ids:
his = obj.history_ids[0]
obj.value = his.value
obj.color = his.color
obj.last_execution = his.date
else:
obj.value = 0
obj.color = "#FFFFFF"
obj.last_execution = False
def _get_kpi_value(self):
self.ensure_one()
kpi_value = 0
if self.kpi_code:
if self.kpi_type == "local" and is_sql_or_ddl_statement(self.kpi_code):
self.env.cr.execute(self.kpi_code)
dic = self.env.cr.dictfetchall()
if is_one_value(dic):
kpi_value = dic[0]["value"]
elif (
self.kpi_type == "external"
and self.dbsource_id.id
and is_sql_or_ddl_statement(self.kpi_code)
):
dbsrc_obj = self.dbsource_id
res = dbsrc_obj.execute(self.kpi_code)
if is_one_value(res):
kpi_value = res[0]["value"]
elif self.kpi_type == "python":
kpi_value = safe_eval(self.kpi_code, {"self": self})
if isinstance(kpi_value, dict):
res = kpi_value
else:
threshold_obj = self.threshold_id
res = {
"value": kpi_value,
"color": threshold_obj.get_color(kpi_value),
}
res.update({"kpi_id": self.id})
return res
def compute_kpi_value(self):
for obj in self:
history_vals = obj._get_kpi_value()
history_obj = self.env["kpi.history"]
history_obj.sudo().create(history_vals)
return True
def update_next_execution_date(self):
for obj in self:
if obj.periodicity_uom == "hour":
delta = relativedelta(hours=obj.periodicity)
elif obj.periodicity_uom == "minute":
delta = relativedelta(minutes=obj.periodicity)
elif obj.periodicity_uom == "day":
delta = relativedelta(days=obj.periodicity)
elif obj.periodicity_uom == "week":
delta = relativedelta(weeks=obj.periodicity)
elif obj.periodicity_uom == "month":
delta = relativedelta(months=obj.periodicity)
else:
delta = relativedelta()
new_date = datetime.now() + delta
obj.next_execution_date = new_date.strftime(DATETIME_FORMAT)
return True
# Method called by the scheduler
@api.model
def update_kpi_value(self):
filters = [
"&",
"|",
("active", "=", True),
("next_execution_date", "<=", datetime.now().strftime(DATETIME_FORMAT)),
("next_execution_date", "=", False),
]
if "filters" in self.env.context:
filters.extend(self.env.context["filters"])
obj_ids = self.search(filters)
res = None
try:
for obj in obj_ids:
obj.compute_kpi_value()
obj.update_next_execution_date()
except Exception:
_logger.exception("Failed updating KPI values")
return res