mirror of
https://gitlab.com/flectra-community/reporting-engine.git
synced 2024-11-22 22:22:05 +00:00
261 lines
8.4 KiB
Python
261 lines
8.4 KiB
Python
# Copyright 2020 Creu Blanca
|
|
# License AGPL-3.0 or later (http://www.gnu.org/licenses/agpl).
|
|
|
|
import ast
|
|
import json
|
|
import re
|
|
from datetime import date, datetime, time
|
|
|
|
from dateutil import relativedelta
|
|
|
|
from flectra import _, api, fields, models
|
|
from flectra.exceptions import ValidationError
|
|
from flectra.tools.float_utils import float_compare
|
|
from flectra.tools.safe_eval import safe_eval
|
|
|
|
from flectra.addons.base.models.ir_cron import _intervalTypes
|
|
|
|
|
|
class KpiKpi(models.Model):
|
|
_name = "kpi.kpi"
|
|
_description = "Kpi Kpi"
|
|
|
|
name = fields.Char(required=True)
|
|
active = fields.Boolean(default=True)
|
|
cron_id = fields.Many2one("ir.cron", readonly=True, copy=False)
|
|
computation_method = fields.Selection(
|
|
[("function", "Function"), ("code", "Code")],
|
|
required=True,
|
|
default="code",
|
|
)
|
|
value = fields.Serialized()
|
|
dashboard_item_ids = fields.One2many("kpi.dashboard.item", inverse_name="kpi_id")
|
|
model_id = fields.Many2one(
|
|
"ir.model",
|
|
)
|
|
function = fields.Char()
|
|
args = fields.Char()
|
|
kwargs = fields.Char()
|
|
widget = fields.Selection(
|
|
[
|
|
("integer", "Integer"),
|
|
("number", "Number"),
|
|
("meter", "Meter"),
|
|
("counter", "Counter"),
|
|
("graph", "Graph"),
|
|
],
|
|
required=True,
|
|
default="number",
|
|
)
|
|
value_last_update = fields.Datetime(readonly=True)
|
|
prefix = fields.Char()
|
|
suffix = fields.Char()
|
|
action_ids = fields.One2many(
|
|
"kpi.kpi.action",
|
|
inverse_name="kpi_id",
|
|
help="Actions that can be opened from the KPI",
|
|
)
|
|
code = fields.Text("Code")
|
|
store_history = fields.Boolean()
|
|
store_history_interval = fields.Selection(
|
|
selection=lambda self: self.env["ir.cron"]._fields["interval_type"].selection,
|
|
)
|
|
store_history_interval_number = fields.Integer()
|
|
compute_on_fly = fields.Boolean()
|
|
history_ids = fields.One2many("kpi.kpi.history", inverse_name="kpi_id")
|
|
computed_value = fields.Serialized(compute="_compute_computed_value")
|
|
computed_date = fields.Datetime(compute="_compute_computed_value")
|
|
|
|
@api.depends("value", "value_last_update", "compute_on_fly")
|
|
def _compute_computed_value(self):
|
|
for record in self:
|
|
if record.compute_on_fly:
|
|
record.computed_value = record._compute_value()
|
|
record.computed_date = fields.Datetime.now()
|
|
else:
|
|
record.computed_value = record.value
|
|
record.computed_date = record.value_last_update
|
|
|
|
def _cron_vals(self):
|
|
return {
|
|
"name": self.name,
|
|
"model_id": self.env.ref("kpi_dashboard.model_kpi_kpi").id,
|
|
"interval_number": 1,
|
|
"interval_type": "hours",
|
|
"state": "code",
|
|
"code": "model.browse(%s).compute()" % self.id,
|
|
"active": True,
|
|
}
|
|
|
|
def compute(self):
|
|
for record in self:
|
|
record._compute()
|
|
return True
|
|
|
|
def _generate_history_vals(self, value):
|
|
return {
|
|
"kpi_id": self.id,
|
|
"value": value,
|
|
"widget": self.widget,
|
|
}
|
|
|
|
def _compute_value(self):
|
|
return getattr(self, "_compute_value_%s" % self.computation_method)()
|
|
|
|
def _compute(self):
|
|
value = self._compute_value()
|
|
self.write({"value": value})
|
|
if self.store_history:
|
|
last = self.env["kpi.kpi.history"].search(
|
|
[("kpi_id", "=", self.id)], limit=1
|
|
)
|
|
if (
|
|
not last
|
|
or not self.store_history_interval
|
|
or last.create_date
|
|
+ _intervalTypes[self.store_history_interval](
|
|
self.store_history_interval_number
|
|
)
|
|
< fields.Datetime.now()
|
|
):
|
|
self.env["kpi.kpi.history"].create(self._generate_history_vals(value))
|
|
notifications = []
|
|
for dashboard_item in self.dashboard_item_ids:
|
|
channel = "kpi_dashboard_%s" % dashboard_item.dashboard_id.id
|
|
notifications.append([channel, dashboard_item._read_dashboard()])
|
|
if notifications:
|
|
self.env["bus.bus"].sendmany(notifications)
|
|
|
|
def _compute_value_function(self):
|
|
obj = self
|
|
if self.model_id:
|
|
obj = self.env[self.model_id.model]
|
|
args = ast.literal_eval(self.args or "[]")
|
|
kwargs = ast.literal_eval(self.kwargs or "{}")
|
|
return getattr(obj, self.function)(*args, **kwargs)
|
|
|
|
def generate_cron(self):
|
|
self.ensure_one()
|
|
self.cron_id = self.env["ir.cron"].create(self._cron_vals())
|
|
|
|
def write(self, vals):
|
|
if "value" in vals:
|
|
vals["value_last_update"] = fields.Datetime.now()
|
|
return super().write(vals)
|
|
|
|
def _get_code_input_dict(self):
|
|
return {
|
|
"self": self,
|
|
"model": self.browse(),
|
|
"datetime": datetime,
|
|
"date": date,
|
|
"time": time,
|
|
"float_compare": float_compare,
|
|
"relativedelta": relativedelta.relativedelta,
|
|
}
|
|
|
|
def _forbidden_code(self):
|
|
return ["commit", "rollback", "getattr", "execute"]
|
|
|
|
def _compute_value_code(self):
|
|
forbidden = self._forbidden_code()
|
|
search_terms = "(" + ("|".join(forbidden)) + ")"
|
|
if re.search(search_terms, (self.code or "").lower()):
|
|
message = ", ".join(forbidden[:-1]) or ""
|
|
if len(message) > 0:
|
|
message += _(" or ")
|
|
message += forbidden[-1]
|
|
raise ValidationError(
|
|
_("The code cannot contain the following terms: %s.") % message
|
|
)
|
|
results = self._get_code_input_dict()
|
|
savepoint = "kpi_formula_%s" % self.id
|
|
# pylint: disable=E8103
|
|
self.env.cr.execute("savepoint %s" % savepoint)
|
|
safe_eval(self.code or "", results, mode="exec", nocopy=True)
|
|
# pylint: disable=E8103
|
|
self.env.cr.execute("rollback to %s" % savepoint)
|
|
return results.get("result", {})
|
|
|
|
def show_value(self):
|
|
self.ensure_one()
|
|
action = self.env.ref("kpi_dashboard.kpi_kpi_act_window")
|
|
result = action.read()[0]
|
|
result.update(
|
|
{
|
|
"res_id": self.id,
|
|
"target": "new",
|
|
"view_mode": "form",
|
|
"views": [
|
|
(self.env.ref("kpi_dashboard.kpi_kpi_widget_form_view").id, "form")
|
|
],
|
|
}
|
|
)
|
|
return result
|
|
|
|
|
|
class KpiKpiAction(models.Model):
|
|
_name = "kpi.kpi.action"
|
|
_description = "KPI action"
|
|
|
|
kpi_id = fields.Many2one("kpi.kpi", required=True, ondelete="cascade")
|
|
action = fields.Reference(
|
|
selection=[
|
|
("ir.actions.report", "ir.actions.report"),
|
|
("ir.actions.act_window", "ir.actions.act_window"),
|
|
("ir.actions.act_url", "ir.actions.act_url"),
|
|
("ir.actions.server", "ir.actions.server"),
|
|
("ir.actions.client", "ir.actions.client"),
|
|
],
|
|
required=True,
|
|
)
|
|
context = fields.Char()
|
|
|
|
def read_dashboard(self):
|
|
result = {}
|
|
for r in self:
|
|
result[r.id] = {
|
|
"id": r.action.id,
|
|
"type": r.action._name,
|
|
"name": r.action.name,
|
|
"context": safe_eval(r.context or "{}"),
|
|
}
|
|
return result
|
|
|
|
|
|
class KpiKpiHistory(models.Model):
|
|
_name = "kpi.kpi.history"
|
|
_description = "KPI history"
|
|
_order = "create_date DESC"
|
|
|
|
kpi_id = fields.Many2one(
|
|
"kpi.kpi", required=True, ondelete="cascade", readonly=True
|
|
)
|
|
value = fields.Serialized(readonly=True)
|
|
raw_value = fields.Char(compute="_compute_raw_value")
|
|
name = fields.Char(related="kpi_id.name")
|
|
widget = fields.Selection(
|
|
selection=lambda self: self.env["kpi.kpi"]._fields["widget"].selection,
|
|
required=True,
|
|
default="number",
|
|
)
|
|
|
|
@api.depends("value")
|
|
def _compute_raw_value(self):
|
|
for record in self:
|
|
record.raw_value = json.dumps(record.value)
|
|
|
|
def show_form(self):
|
|
self.ensure_one()
|
|
action = self.env.ref("kpi_dashboard.kpi_kpi_history_act_window")
|
|
result = action.read()[0]
|
|
result.update(
|
|
{
|
|
"res_id": self.id,
|
|
"target": "new",
|
|
"view_mode": "form",
|
|
"views": [(self.env.context.get("form_id"), "form")],
|
|
}
|
|
)
|
|
return result
|