Files
Gestion_sondes/app/domo91.py
2026-05-22 13:36:00 +02:00

1162 lines
46 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# -*- coding: utf-8 -*-
import os
import random
import traceback
from datetime import datetime, date, time
from contextlib import closing
import bcrypt
import matplotlib.dates as mdates
import matplotlib.pyplot as plt
import mysql.connector
import pandas as pd
import numpy as np
pd.set_option("future.no_silent_downcasting", True)
import streamlit as st
from PIL import Image
from dotenv import find_dotenv, load_dotenv
from fpdf import FPDF
from streamlit_autorefresh import st_autorefresh
# =========================================================
# Config de page
# =========================================================
st.set_page_config(page_title="Domo91 - Surveillance", layout="wide")
BASE_DIR = os.path.dirname(os.path.abspath(__file__))
logo_path = os.path.join(BASE_DIR, "assets", "Logo.png")
logo = Image.open(logo_path)
st.sidebar.image(logo, use_container_width=True)
st.title("📊 Domo91 - Surveillance des sondes")
st.write("Bienvenue sur lapplication de supervision.")
# =========================================================
# ENV & DB
# =========================================================
env_file = find_dotenv(usecwd=True)
if env_file:
load_dotenv(env_file)
db_config = {
"host": os.getenv("DB_HOST"),
"user": os.getenv("DB_USER"),
"password": os.getenv("DB_PASS"),
"database": os.getenv("DB_NAME"),
"autocommit": False,
"consume_results": True,
}
# Roissy n'existe pas actuellement => on garde Saclay / Meudon
SITES_AUTORISES = {"Saclay", "Meudon"} # anti-injection sur noms de tables
SITES_LISTE = sorted(SITES_AUTORISES)
def get_connection():
return mysql.connector.connect(**db_config)
def assert_site_ok(site_name: str):
if site_name not in SITES_AUTORISES:
raise ValueError(f"Site invalide: {site_name}")
def get_chambres_froides_actives(conn, site: str):
"""Retourne la liste des chambres froides actives (Etat=ON) pour un site, avec leur Temp_Max."""
assert_site_ok(site)
with closing(conn.cursor(dictionary=True)) as cur:
cur.execute(
"""
SELECT Sonde, Temp_Max
FROM Sondes.Chambres_froides
WHERE Lieu = %s
AND UPPER(Etat) = 'ON'
ORDER BY Sonde
""",
(site,),
)
return cur.fetchall()
# =========================================================
# Session state
# =========================================================
for key, default in {
"authenticated": False,
"role": None,
"site_autorise": None,
"onglet_actif": "Accueil",
"selected_date": date.today(),
"selected_site": "Saclay",
"selected_periode": "Toute la journée",
}.items():
st.session_state.setdefault(key, default)
# =========================================================
# Sécurité mots de passe
# =========================================================
def hash_password(plain_password: str) -> str:
return bcrypt.hashpw(plain_password.encode("utf-8"), bcrypt.gensalt()).decode("utf-8")
def verifier_password(input_password: str, hash_en_base: str) -> bool:
return bcrypt.checkpw(input_password.encode("utf-8"), hash_en_base.encode("utf-8"))
# =========================================================
# Gyro: lecture + badge
# =========================================================
def fetch_gyro(site_name: str):
assert_site_ok(site_name)
q = """
SELECT Etat, `Date`
FROM Sondes.v_gyro_last
WHERE Lieu = %s \
AND Sonde = 'Gyro'
ORDER BY `Date` DESC
LIMIT 1 \
"""
with closing(get_connection()) as cnx, closing(cnx.cursor(dictionary=True)) as cur:
cur.execute(q, (site_name,)) # <-- FIX
row = cur.fetchone()
if not row:
return None, None
etat = (row.get("Etat") or "").strip().upper()
ts = row.get("Date")
return etat, ts
def render_gyro_badge(site_name: str, stale_after_min: int = 10):
"""Affiche un voyant Gyro (vert/rouge/orange) + fraîcheur des données."""
etat, ts = fetch_gyro(site_name)
if etat in ("ON", "1"):
color, label = "#ef4444", "GYRO ON"
elif etat in ("OFF", "0"):
color, label = "#22c55e", "GYRO OFF"
elif etat in ("ALERTE", "ALARM", "ALARMED"):
color, label = "#f59e0b", "GYRO ALERTE"
else:
color, label = "#9E9E9E", "GYRO INCONNU"
stale = True
age_txt = ""
if ts is not None:
try:
from datetime import datetime as _dt
now = _dt.now(ts.tzinfo) if hasattr(ts, "tzinfo") and ts.tzinfo else _dt.now()
mins = int((now - ts).total_seconds() // 60)
stale = mins >= stale_after_min
age_txt = f"MAJ: {ts:%d/%m/%Y %H:%M} ({mins} min)"
except Exception:
pass
border = "4px dashed" if stale else "4px solid"
opacity = "0.6" if stale else "1"
st.markdown(f"""
<div style="display:flex;align-items:center;gap:12px;
padding:10px 14px;border:{border} {color};
border-radius:16px;background:rgba(0,0,0,0.03);">
<div style="width:18px;height:18px;border-radius:50%;
background:{color};opacity:{opacity};
box-shadow:0 0 14px {color};"></div>
<div style="font-weight:600;font-size:1rem;color:{color};">{label}</div>
<div style="margin-left:auto;font-size:0.85rem;color:#666;">
{age_txt}{' — données anciennes' if stale else ''}
</div>
</div>
""", unsafe_allow_html=True)
# =========================================================
# Bootstrap schéma : Journal_Erreurs
# =========================================================
def ensure_schema():
ddl_generated = """
CREATE TABLE IF NOT EXISTS Journal_Erreurs \
( \
Id INT AUTO_INCREMENT PRIMARY KEY, \
Site VARCHAR(50) NOT NULL, \
Sonde VARCHAR(100) NOT NULL, \
DateJour DATE NOT NULL, \
Type VARCHAR(30) NOT NULL, \
Source_Id INT NULL, \
Source_Id_norm INT GENERATED ALWAYS AS (COALESCE(Source_Id, 0)) STORED, \
Resume TEXT NOT NULL, \
Statut VARCHAR(20) NOT NULL DEFAULT 'Nouveau', \
Priorite TINYINT NOT NULL DEFAULT 3, \
Assignation VARCHAR(100) NULL, \
Commentaire TEXT NULL, \
Tag VARCHAR(50) NULL, \
CreatedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP, \
UpdatedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, \
UNIQUE KEY uk_site_sonde_date_type (Site, Sonde, DateJour, Type, Source_Id_norm), \
KEY idx_journal_date_site (DateJour, Site), \
KEY idx_journal_statut (Statut)
) ENGINE = InnoDB \
DEFAULT CHARSET = utf8mb4 \
COLLATE = utf8mb4_unicode_ci; \
"""
ddl_fallback = """
CREATE TABLE IF NOT EXISTS Journal_Erreurs \
( \
Id INT AUTO_INCREMENT PRIMARY KEY, \
Site VARCHAR(50) NOT NULL, \
Sonde VARCHAR(100) NOT NULL, \
DateJour DATE NOT NULL, \
Type VARCHAR(30) NOT NULL, \
Source_Id INT NULL, \
Source_Id_norm INT NULL DEFAULT 0, \
Resume TEXT NOT NULL, \
Statut VARCHAR(20) NOT NULL DEFAULT 'Nouveau', \
Priorite TINYINT NOT NULL DEFAULT 3, \
Assignation VARCHAR(100) NULL, \
Commentaire TEXT NULL, \
Tag VARCHAR(50) NULL, \
CreatedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP, \
UpdatedAt TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP
) ENGINE = InnoDB \
DEFAULT CHARSET = utf8mb4 \
COLLATE = utf8mb4_unicode_ci; \
"""
idxs_fallback = [
"CREATE UNIQUE INDEX IF NOT EXISTS uk_site_sonde_date_type ON Sondes.Journal_Erreurs (Site, Sonde, DateJour, Type, Source_Id_norm)",
"CREATE INDEX IF NOT EXISTS idx_journal_date_site ON Sondes.Journal_Erreurs (DateJour, Site)",
"CREATE INDEX IF NOT EXISTS idx_journal_statut ON Sondes.Journal_Erreurs (Statut)",
]
triggers_fallback = [
"""
CREATE TRIGGER trg_je_bi
BEFORE INSERT
ON Sondes.Journal_Erreurs
FOR EACH ROW
BEGIN
SET NEW.Source_Id_norm = IFNULL(NEW.Source_Id, 0);
END
""",
"""
CREATE TRIGGER trg_je_bu
BEFORE UPDATE
ON Sondes.Journal_Erreurs
FOR EACH ROW
BEGIN
SET NEW.Source_Id_norm = IFNULL(NEW.Source_Id, 0);
END
""",
]
try:
with closing(get_connection()) as cnx, closing(cnx.cursor()) as cur:
cur.execute(ddl_generated)
cnx.commit()
except Exception:
with closing(get_connection()) as cnx, closing(cnx.cursor()) as cur:
cur.execute(ddl_fallback)
for q in idxs_fallback:
try:
cur.execute(q)
except Exception:
pass
for name in ("trg_je_bi", "trg_je_bu"):
try:
cur.execute(f"DROP TRIGGER IF EXISTS {name}")
except Exception:
pass
for q in triggers_fallback:
try:
cur.execute(q)
except Exception:
pass
cnx.commit()
try:
ensure_schema()
except Exception as e:
st.warning(f"Init schéma Journal_Erreurs : {e}")
# =========================================================
# Connexion utilisateur
# =========================================================
if not st.session_state.get("authenticated", False):
login = st.sidebar.text_input("Nom d'utilisateur")
password = st.sidebar.text_input("Mot de passe", type="password")
# --- Bouton connexion ---
clicked = st.sidebar.button("Se connecter")
# --- QR code SOUS le bouton ---
st.sidebar.markdown("---")
st.sidebar.caption("Accès rapide depuis un smartphone :")
qr_path = os.path.join(BASE_DIR, "assets", "QR_Domo91FR.png")
if os.path.exists(qr_path):
st.sidebar.image(qr_path, width=180)
else:
st.sidebar.error(f"QR code introuvable : {qr_path}")
# --- Traitement de la connexion ---
if clicked:
try:
with closing(get_connection()) as conn, closing(conn.cursor(dictionary=True)) as cursor:
cursor.execute(
"""
SELECT NomUtilisateur, role, MotDePasseHash, Site, DateExpiration
FROM Acces.Utilisateurs
WHERE NomUtilisateur = %s
LIMIT 1
""",
(login,),
)
result = cursor.fetchone()
if not result:
st.sidebar.error("Identifiants invalides")
elif result["DateExpiration"] and result["DateExpiration"] < date.today():
st.sidebar.error("⛔ Accès expiré.")
elif not verifier_password(password, result["MotDePasseHash"]):
st.sidebar.error("Identifiants invalides")
else:
st.session_state.update({
"authenticated": True,
"role": result["role"],
"site_autorise": result["Site"],
"onglet_actif": "Accueil",
})
now_str = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
cursor.execute(
"""
INSERT INTO Sondes.Connexion_Log (Utilisateur, Lieu, Date_Connexion)
VALUES (%s, %s, %s)
""",
(result["NomUtilisateur"], result["Site"], now_str),
)
conn.commit()
st.rerun()
except Exception as e:
st.sidebar.error(f"Erreur connexion : {e}")
st.sidebar.text(traceback.format_exc())
else:
st.sidebar.success(f"Connecté ({st.session_state.get('role')})")
if st.sidebar.button("🔓 Déconnexion"):
for key in ["authenticated", "role", "site_autorise"]:
st.session_state[key] = False if key == "authenticated" else None
st.session_state["onglet_actif"] = "Accueil"
st.rerun()
# =========================================================
# PDF
# =========================================================
# =========================================================
# PDF
# =========================================================
def generer_pdf(site_name: str, date_str: str, periode: str):
"""
Génère un PDF de relevés + alertes pour un site et une date.
- site_name : nom de table (Saclay/Meudon)
- date_str : YYYY-MM-DD
- periode : libellé de tranche horaire
"""
assert_site_ok(site_name)
st.info(f"Génération du rapport PDF pour {site_name} à la date {date_str} ({periode})")
plages = {
"Toute la journée": (time(0, 0), time(23, 59)),
"Matin (6h-12h)": (time(6, 0), time(12, 0)),
"Après-midi (12h-18h)": (time(12, 0), time(18, 0)),
"Nuit (18h-6h)": (time(18, 0), time(6, 0)),
}
try:
with closing(get_connection()) as conn, closing(conn.cursor(dictionary=True)) as cur:
# Mesures
cur.execute(
f"SELECT Sonde, Date, Temperature FROM `{site_name}` WHERE DATE(Date) = %s ORDER BY Sonde, Date",
(date_str,),
)
rows = cur.fetchall()
df = pd.DataFrame(rows)
if df.empty:
st.warning("Aucune donnée ce jour.")
return
df["Heure"] = pd.to_datetime(df["Date"]).dt.strftime("%H:%M")
df["Heure_obj"] = pd.to_datetime(df["Date"]).dt.time
heure_debut, heure_fin = plages.get(periode, (time(0, 0), time(23, 59)))
if heure_debut < heure_fin:
df = df[(df["Heure_obj"] >= heure_debut) & (df["Heure_obj"] <= heure_fin)]
else:
df = df[(df["Heure_obj"] >= heure_debut) | (df["Heure_obj"] <= heure_fin)]
releves = {}
for sonde in sorted(df["Sonde"].unique()):
df_sonde = df[df["Sonde"] == sonde]
releves[sonde] = list(zip(df_sonde["Heure"], df_sonde["Temperature"]))
# Alertes du jour
table_alertes = f"Alertes_{site_name}"
cur.execute(
f"SELECT Sonde, Debut_defaut, Etat FROM `{table_alertes}` WHERE DATE(Debut_defaut) = %s",
(date_str,),
)
alertes = cur.fetchall()
class RapportPDF(FPDF):
def header(self):
self.set_font("Arial", "B", 14)
self.cell(0, 10, "Rapport de surveillance des sondes", ln=1, align="C")
self.set_font("Arial", "", 12)
self.cell(0, 8, f"Site : {getattr(self, 'site_name', '')}", ln=1, align="C")
self.cell(0, 8, f"Date : {date_str}", ln=1, align="C")
self.cell(0, 8, f"Période : {getattr(self, 'periode', '')}", ln=1, align="C")
self.ln(4)
def releves_section(self, data):
self.set_font("Arial", "B", 12)
self.cell(0, 8, "Relevés de température", ln=1)
for sonde, mesures in data.items():
self.set_font("Arial", "B", 11)
self.cell(0, 7, f"Sonde : {sonde}", ln=1)
col1 = mesures[::2]
col2 = mesures[1::2]
self.set_font("Arial", "B", 10)
self.cell(40, 6, "Heure", border=1)
self.cell(30, 6, "Temp (°C)", border=1)
self.cell(20, 6, "", border=0)
self.cell(40, 6, "Heure", border=1)
self.cell(30, 6, "Temp (°C)", border=1)
self.ln()
self.set_font("Arial", "", 10)
for i in range(max(len(col1), len(col2))):
if i < len(col1):
h1, t1 = col1[i]
self.cell(40, 6, h1, border=1)
self.cell(30, 6, f"{t1:.2f}", border=1)
else:
self.cell(70, 6, "", border=0)
self.cell(20, 6, "", border=0)
if i < len(col2):
h2, t2 = col2[i]
self.cell(40, 6, h2, border=1)
self.cell(30, 6, f"{t2:.2f}", border=1)
else:
self.cell(70, 6, "", border=0)
self.ln()
self.ln(3)
def alertes_section(self, data):
self.set_font("Arial", "B", 12)
self.cell(0, 8, "Alertes enregistrées", ln=1)
self.set_font("Arial", "", 10)
if not data:
self.cell(0, 6, "Aucune alerte ce jour.", ln=1)
else:
for a in data:
self.cell(0, 6, f"{a['Sonde']} - {a['Debut_defaut']} - {a['Etat']}", ln=1)
pdf = RapportPDF()
pdf.site_name = site_name
pdf.periode = periode
pdf.add_page()
pdf.releves_section(releves)
pdf.alertes_section(alertes)
output_dir = os.path.join(BASE_DIR, "PDF")
os.makedirs(output_dir, exist_ok=True)
file_name = f"rapport_{site_name}_{date_str}.pdf"
output_path = os.path.join(output_dir, file_name)
pdf.output(output_path)
with open(output_path, "rb") as f:
st.download_button(
label="📥 Télécharger le rapport PDF",
data=f,
file_name=file_name,
mime="application/pdf",
)
except Exception as err:
st.error(f"Erreur lors de la génération du PDF : {err}")
st.text(traceback.format_exc())
# =========================================================
# Journal erreurs (SQL)
# =========================================================
def _get_site_courant():
role = st.session_state.get("role")
if role != "superviseur":
return st.session_state.get("site_autorise")
return st.session_state.get("selected_site", "Saclay")
def load_alertes(site: str, jour: date):
assert_site_ok(site)
table_alertes = f"Alertes_{site}"
q = f"""
SELECT
a.Id AS Source_Id,
%s AS Site,
a.Sonde AS Sonde,
DATE(a.Debut_defaut) AS DateJour,
'Alerte' AS Type,
CONCAT('Alerte ', a.Etat, ' depuis ', DATE_FORMAT(a.Debut_defaut,'%H:%i')) AS Resume,
a.Etat AS Etat
FROM `{table_alertes}` a
WHERE DATE(a.Debut_defaut) = %s
OR (a.Etat <> 'Acquitté' AND DATE(a.Debut_defaut) <= %s);
"""
with closing(get_connection()) as cnx, closing(cnx.cursor(dictionary=True)) as cur:
cur.execute(q, (site, jour, jour))
rows = cur.fetchall()
cols = ["Source_Id", "Site", "Sonde", "DateJour", "Type", "Resume", "Etat"]
return pd.DataFrame(rows, columns=cols) if rows else pd.DataFrame(columns=cols)
def load_anomalies_auto(site_name: str, jour: date):
assert_site_ok(site_name)
table_mesures = site_name
gap_threshold_min = 20
jump_deg = 10
min_phys, max_phys = -60, 120
q = f"""
WITH J AS (
SELECT Sonde, `Date`, Temperature
FROM `{table_mesures}`
WHERE DATE(`Date`) = %s
),
L AS (
SELECT Sonde,
`Date`,
Temperature,
LAG(`Date`) OVER (PARTITION BY Sonde ORDER BY `Date`) AS prev_date,
LAG(Temperature) OVER (PARTITION BY Sonde ORDER BY `Date`) AS prev_temp
FROM J
),
gaps AS (
SELECT Sonde,
COUNT(*) AS nb_gaps,
MAX(TIMESTAMPDIFF(MINUTE, prev_date, `Date`)) AS max_gap_min
FROM L
WHERE prev_date IS NOT NULL
AND TIMESTAMPDIFF(MINUTE, prev_date, `Date`) >= {gap_threshold_min}
GROUP BY Sonde
),
jumps AS (
SELECT Sonde,
COUNT(*) AS nb_jumps,
MAX(ABS(Temperature - prev_temp)) AS max_jump
FROM L
WHERE prev_temp IS NOT NULL
AND ABS(Temperature - prev_temp) > {jump_deg}
GROUP BY Sonde
),
bounds AS (
SELECT Sonde,
MIN(Temperature) AS tmin,
MAX(Temperature) AS tmax
FROM J
GROUP BY Sonde
HAVING MIN(Temperature) < {min_phys} OR MAX(Temperature) > {max_phys}
)
SELECT NULL AS Source_Id, %s AS Site, g.Sonde AS Sonde, %s AS DateJour, 'Auto' AS Type,
CONCAT('Trous de mesures : ', nb_gaps, ' fois (max ', max_gap_min, ' min)') AS Resume
FROM gaps g
UNION ALL
SELECT NULL, %s, j.Sonde, %s, 'Auto',
CONCAT('Sauts de température suspects : ', nb_jumps, ' (max ', ROUND(max_jump,1), '°C)')
FROM jumps j
UNION ALL
SELECT NULL, %s, b.Sonde, %s, 'Auto',
CONCAT('Valeurs hors bornes physiques (min=', ROUND(tmin,1), '°C, max=', ROUND(tmax,1), '°C)')
FROM bounds b
ORDER BY Sonde;
"""
params = (jour, site_name, jour, site_name, jour, site_name, jour)
with closing(get_connection()) as cnx, closing(cnx.cursor(dictionary=True)) as cur:
cur.execute(q, params)
rows = cur.fetchall()
cols = ["Source_Id", "Site", "Sonde", "DateJour", "Type", "Resume"]
return pd.DataFrame(rows, columns=cols) if rows else pd.DataFrame(columns=cols)
def load_journal_existants(site: str, jour: date):
assert_site_ok(site)
q = """
SELECT Id, \
Site, \
Sonde, \
DateJour, \
Type, \
Source_Id, \
Resume,
Statut, \
Priorite, \
Assignation, \
Commentaire, \
Tag
FROM Sondes.Journal_Erreurs
WHERE Site = %s \
AND DateJour = %s; \
"""
with closing(get_connection()) as cnx, closing(cnx.cursor(dictionary=True)) as cur:
cur.execute(q, (site, jour))
rows = cur.fetchall()
cols = ["Id", "Site", "Sonde", "DateJour", "Type", "Source_Id", "Resume",
"Statut", "Priorite", "Assignation", "Commentaire", "Tag"]
return pd.DataFrame(rows, columns=cols) if rows else pd.DataFrame(columns=cols)
def upsert_journal(rows: list[dict]):
if not rows:
return
q_insert = """
INSERT INTO Sondes.Journal_Erreurs
(Site, Sonde, DateJour, Type, Source_Id, Resume, Statut, Priorite, Assignation, Commentaire, Tag)
VALUES (%(Site)s, %(Sonde)s, %(DateJour)s, %(Type)s, %(Source_Id)s, %(Resume)s,
%(Statut)s, %(Priorite)s, %(Assignation)s, %(Commentaire)s, %(Tag)s)
ON DUPLICATE KEY UPDATE Resume=VALUES(Resume), \
Statut=VALUES(Statut), \
Priorite=VALUES(Priorite), \
Assignation=VALUES(Assignation), \
Commentaire=VALUES(Commentaire), \
Tag=VALUES(Tag), \
UpdatedAt=CURRENT_TIMESTAMP; \
"""
with closing(get_connection()) as cnx, closing(cnx.cursor()) as cur:
cur.executemany(q_insert, rows)
cnx.commit()
# =========================================================
# Page Journal erreurs
# =========================================================
def page_journal_erreurs():
st.header("📝 Journal des erreurs")
site = _get_site_courant()
if not site:
st.warning("Veuillez dabord choisir un site sur la page daccueil.")
return
if site not in SITES_AUTORISES:
st.error("Site invalide.")
return
jour = st.date_input("Date de vision", value=st.session_state.get("selected_date", date.today()))
st.caption(f"Site : {site} — Date : {jour.strftime('%d/%m/%Y')}")
df_alertes = load_alertes(site, jour)
df_auto = load_anomalies_auto(site, jour)
df_saved = load_journal_existants(site, jour)
base = pd.concat([df_auto, df_alertes], ignore_index=True)
if base.empty and df_saved.empty:
st.info("Aucune anomalie détectée ni alerte pour cette date.")
return
def _source_norm(x):
return 0 if pd.isna(x) else int(x)
# base : garantir Source_Id + Source_Id_norm
if "Source_Id" not in base.columns:
base["Source_Id"] = pd.NA
base["Source_Id_norm"] = base["Source_Id"].apply(_source_norm)
# df_saved : garantir colonnes et Key, même si vide
if df_saved is None or df_saved.empty:
df_saved = pd.DataFrame(columns=["Key", "Statut", "Priorite", "Assignation", "Commentaire", "Tag"])
else:
if "Source_Id" not in df_saved.columns:
df_saved["Source_Id"] = pd.NA
df_saved["Source_Id_norm"] = df_saved["Source_Id"].apply(_source_norm)
df_saved["Key"] = (
df_saved["Site"].astype(str) + "|" +
df_saved["Sonde"].astype(str) + "|" +
df_saved["DateJour"].astype(str) + "|" +
df_saved["Type"].astype(str) + "|" +
df_saved["Source_Id_norm"].astype(str)
)
for c in ["Statut", "Priorite", "Assignation", "Commentaire", "Tag"]:
if c not in df_saved.columns:
df_saved[c] = pd.NA
# base Key (toujours)
base["Key"] = (
base["Site"].astype(str) + "|" +
base["Sonde"].astype(str) + "|" +
base["DateJour"].astype(str) + "|" +
base["Type"].astype(str) + "|" +
base["Source_Id_norm"].astype(str)
)
df = base.merge(
df_saved[["Key", "Statut", "Priorite", "Assignation", "Commentaire", "Tag"]],
on="Key",
how="left",
)
# Defaults & types
df["Statut"] = df["Statut"].fillna("Nouveau").astype("string")
df["Priorite"] = pd.to_numeric(df["Priorite"], errors="coerce").fillna(3).astype("Int64")
for c in ["Assignation", "Commentaire", "Tag"]:
df[c] = df[c].astype("string").fillna("")
for c in ["Sonde", "Type", "Resume"]:
df[c] = df[c].astype("string").fillna("")
st.subheader("Synthèse (éditable)")
view_cols = ["Key", "Sonde", "Type", "Resume", "Statut", "Priorite", "Assignation", "Tag", "Commentaire"]
disabled_cols = ["Key", "Sonde", "Type", "Resume"]
editable = st.data_editor(
df[view_cols].copy(),
disabled=disabled_cols,
use_container_width=True,
hide_index=True,
column_config={
"Key": st.column_config.TextColumn("Key", disabled=True),
"Statut": st.column_config.SelectboxColumn(
"Statut", options=["Nouveau", "En cours", "Planifié", "Clos"], help="État de la tâche"
),
"Priorite": st.column_config.NumberColumn(
"Priorité", min_value=1, max_value=3, step=1, format="%d",
help="1=Haute, 2=Moyenne, 3=Basse"
),
"Assignation": st.column_config.TextColumn("Assignation", help="Technicien / ticket"),
"Tag": st.column_config.TextColumn("Tag", help="Intermittent, Réseau, Capteur, etc."),
"Commentaire": st.column_config.TextColumn(
"Commentaire", help="Notes (SHIFT+Entrée pour un saut de ligne)", max_chars=2000
),
}
)
def _none_if_empty(x):
if x is None:
return None
if isinstance(x, float) and pd.isna(x):
return None
if isinstance(x, str) and x.strip() == "":
return None
return x
if st.session_state.get("role") == "superviseur":
if st.button("💾 Enregistrer les modifications"):
df_keys = df[["Key", "Site", "DateJour", "Source_Id"]].copy()
df_to_save = editable.merge(df_keys, on="Key", how="left")
payload = []
for _, r in df_to_save.iterrows():
payload.append({
"Site": r["Site"],
"Sonde": r["Sonde"],
"DateJour": r["DateJour"],
"Type": r["Type"],
"Source_Id": int(r["Source_Id"]) if pd.notna(r.get("Source_Id")) else None,
"Resume": r["Resume"],
"Statut": _none_if_empty(r.get("Statut")) or "Nouveau",
"Priorite": int(r.get("Priorite", 3)) if pd.notna(r.get("Priorite", 3)) else 3,
"Assignation": _none_if_empty(r.get("Assignation")),
"Commentaire": _none_if_empty(r.get("Commentaire")),
"Tag": _none_if_empty(r.get("Tag")),
})
upsert_journal(payload)
st.success("Journal mis à jour.")
else:
st.info("Lecture seule (réservé aux superviseurs pour lédition).")
# =========================================================
# Affichage ALERTES non acquittées (global)
# =========================================================
if st.session_state.get("authenticated"):
try:
role = st.session_state.get("role")
site_selectionne = (
st.session_state.get("site_autorise")
if role != "superviseur"
else st.session_state.get("selected_site", "Saclay")
)
if site_selectionne:
assert_site_ok(site_selectionne)
table_alertes = f"Alertes_{site_selectionne}"
with closing(get_connection()) as conn, closing(conn.cursor(dictionary=True)) as cursor:
cursor.execute(
f"SELECT Sonde, Debut_defaut, Etat "
f"FROM `{table_alertes}` "
f"WHERE Etat != 'Acquitté' "
f"ORDER BY Debut_defaut DESC"
)
alertes = cursor.fetchall()
if alertes:
st.subheader("🚨 Alertes non acquittées")
st.dataframe(pd.DataFrame(alertes), use_container_width=True)
else:
st.success("✅ Aucune alerte en cours.")
else:
st.info("Connectez-vous et choisissez un site pour afficher les alertes.")
except Exception as e:
st.error(f"Erreur lors de la récupération des alertes : {e}")
st.text(traceback.format_exc())
else:
pass
# =========================================================
# Navigation + Pages
# =========================================================
if st.session_state.get("authenticated"):
# Onglets selon rôle
if st.session_state.get("role") != "superviseur":
onglets = ["Accueil", "Statistiques"]
else:
onglets = ["Accueil", "Statistiques", "Traffic", "Journal erreurs"]
# Normaliser l'onglet actif
if st.session_state.get("onglet_actif") not in onglets:
st.session_state["onglet_actif"] = onglets[0]
# Menu (créé pour tous)
onglet_selectionne = st.sidebar.radio(
"📁 Navigation",
onglets,
index=onglets.index(st.session_state["onglet_actif"]),
)
st.session_state["onglet_actif"] = onglet_selectionne
# Contexte commun
site_actuel = (
st.session_state.get("site_autorise")
if st.session_state.get("role") != "superviseur"
else st.session_state.get("selected_site", "Saclay")
)
date_selectionnee = st.session_state.get("selected_date", date.today())
# ------------------ Accueil ------------------
rows_mesures = []
if onglet_selectionne == "Accueil":
try:
# Site imposé ou sélection admin
if st.session_state.get("role") == "superviseur":
if site_actuel not in SITES_LISTE:
site_actuel = SITES_LISTE[0]
site_actuel = st.selectbox(
"📍 Choisissez un site :",
SITES_LISTE,
index=SITES_LISTE.index(site_actuel) if site_actuel in SITES_LISTE else 0
)
st.session_state["selected_site"] = site_actuel
else:
st.info(f"Site imposé")
#st.info(f"Site imposé : {site_actuel}")
#assert_site_ok(site_actuel)
# Voyant Gyro
st.subheader(f"🚨 Statut Gyro — {site_actuel}")
try:
st_autorefresh(interval=30_000, key="gyro_autorefresh")
except Exception:
pass
render_gyro_badge(site_actuel)
# Date
date_selectionnee = st.date_input("📅 Date du relevé", value=date_selectionnee)
st.session_state["selected_date"] = date_selectionnee
df_sonde = pd.DataFrame()
seuil_temp = 10.0
sonde_choisie = None
# 1) Charger config (sondes ON + seuils) puis mesures du jour
with closing(get_connection()) as conn:
cfg_on = get_chambres_froides_actives(conn, site_actuel)
sondes_on = [r["Sonde"] for r in cfg_on]
seuils_on = {r["Sonde"]: float(r["Temp_Max"]) for r in cfg_on}
if not sondes_on:
st.warning("Aucune sonde active (Etat=ON) dans Chambres_froides pour ce site.")
st.stop()
with closing(conn.cursor(dictionary=True)) as cur_mesures:
placeholders = ", ".join(["%s"] * len(sondes_on))
q = f"""
SELECT Sonde, Date, Temperature
FROM `{site_actuel}`
WHERE DATE(Date) = %s
AND Sonde IN ({placeholders})
ORDER BY Sonde, Date DESC
"""
params = [date_selectionnee.strftime("%Y-%m-%d")] + sondes_on
cur_mesures.execute(q, params)
rows_mesures = cur_mesures.fetchall()
if rows_mesures:
df = pd.DataFrame(rows_mesures)
df["Date"] = pd.to_datetime(df["Date"])
sonde_choisie = st.selectbox("🧪 Choisissez une sonde :", sondes_on)
df_sonde = df[df["Sonde"] == sonde_choisie].copy()
df_sonde["Heure"] = df_sonde["Date"].dt.hour
tranche = st.radio(
"🕒 Tranche horaire :",
["Toute la journée", "Matin (6h-12h)", "Après-midi (12h-18h)", "Nuit (18h-6h)"],
)
st.session_state["selected_periode"] = tranche
if st.button("🧾 Générer le PDF du jour"):
generer_pdf(
site_actuel,
date_selectionnee.strftime("%Y-%m-%d"),
st.session_state.get("selected_periode", "Toute la journée"),
)
# Filtre tranche
if tranche == "Matin (6h-12h)":
df_sonde = df_sonde[(df_sonde["Heure"] >= 6) & (df_sonde["Heure"] < 12)]
elif tranche == "Après-midi (12h-18h)":
df_sonde = df_sonde[(df_sonde["Heure"] >= 12) & (df_sonde["Heure"] < 18)]
elif tranche == "Nuit (18h-6h)":
df_sonde = df_sonde[(df_sonde["Heure"] >= 18) | (df_sonde["Heure"] < 6)]
seuil_temp = seuils_on.get(sonde_choisie, 10.0)
if not df_sonde.empty:
st.subheader("📊 Tableau des relevés")
def surlignage_temp(val):
try:
return "color: red; font-weight: bold" if float(val) > seuil_temp else ""
except (ValueError, TypeError):
return ""
styled_df = df_sonde.style.map(surlignage_temp, subset=["Temperature"])
st.dataframe(styled_df, use_container_width=True)
st.subheader("📈 Évolution de la température")
fig, ax = plt.subplots(figsize=(10, 4))
ax.plot(df_sonde["Date"], df_sonde["Temperature"], marker="o")
ax.axhline(seuil_temp, linestyle="--", label=f"Seuil {seuil_temp}°C")
ax.set_xlabel("Heure")
ax.set_ylabel("Température (°C)")
ax.set_title(f"{sonde_choisie} - {date_selectionnee.strftime('%d/%m/%Y')}")
ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M"))
ax.legend()
st.pyplot(fig)
else:
st.info("Aucun relevé dans la tranche horaire sélectionnée.")
else:
st.info("Aucun relevé pour cette date.")
except Exception as e:
st.error(f"Erreur : {e}")
st.text(traceback.format_exc())
# ------------------ Statistiques ------------------
elif onglet_selectionne == "Statistiques":
st.markdown("## 📈 Statistiques de température")
site = (
st.session_state.get("site_autorise")
if st.session_state.get("role") != "superviseur"
else st.session_state.get("selected_site", "Saclay")
)
assert_site_ok(site)
date_val = st.session_state.get("selected_date", date.today())
try:
with closing(get_connection()) as conn, closing(conn.cursor(dictionary=True)) as cursor:
cursor.execute(
f"SELECT * FROM `{site}` WHERE DATE(Date) = %s ORDER BY Sonde, Date",
(date_val.strftime("%Y-%m-%d"),),
)
rows = cursor.fetchall()
df = pd.DataFrame(rows)
if df.empty:
st.info("Aucune donnée pour cette date.")
else:
df["Date"] = pd.to_datetime(df["Date"])
sondes = sorted(df["Sonde"].unique())
sonde = st.selectbox("Choisir une sonde :", sondes, key="selectbox_stats")
df_sonde = df[df["Sonde"] == sonde]
# Récupérer Temp_Max pour cette sonde (si définie)
seuil = None
try:
with closing(get_connection()) as conn_thr, closing(conn_thr.cursor(dictionary=True)) as cur_thr:
cur_thr.execute(
"SELECT Temp_Max FROM Sondes.Chambres_froides WHERE Lieu = %s AND Sonde = %s",
(site, sonde),
)
row_thr = cur_thr.fetchone()
if row_thr:
seuil = float(row_thr["Temp_Max"])
except Exception as e:
st.warning(f"Impossible de récupérer le seuil pour {sonde} : {e}")
st.subheader("Évolution journalière")
fig, ax = plt.subplots(figsize=(10, 4))
dates = df_sonde["Date"]
temp = df_sonde["Temperature"]
if seuil is None:
# Pas de seuil trouvé : tracé classique
ax.plot(dates, temp, marker="o")
else:
# On sépare les valeurs normales et excessives
temp_ok = temp.where(temp <= seuil, np.nan)
temp_high = temp.where(temp > seuil, np.nan)
# Courbe normale
ax.plot(dates, temp_ok, marker="o")
# Partie excessive en rouge
ax.plot(dates, temp_high, marker="o", color="red")
# Ligne horizontale du seuil (optionnel mais très parlant)
ax.axhline(seuil, linestyle="--", linewidth=1)
ax.text(dates.iloc[0], seuil, f" Seuil {seuil}°C", va="bottom")
ax.set_title(f"{sonde} - {date_val.strftime('%d/%m/%Y')}")
ax.set_xlabel("Heure")
ax.set_ylabel("Température (°C)")
ax.xaxis.set_major_formatter(mdates.DateFormatter("%H:%M"))
st.pyplot(fig)
except Exception as e:
st.error(f"Erreur chargement statistiques : {e}")
st.text(traceback.format_exc())
# Admin Chambres froides
if st.session_state.get("role") == "superviseur":
with st.expander("🛠️ Gestion des chambres froides (administrateur)", expanded=True):
if st.button("🔄 Actualiser la liste"):
st.session_state["refresh_admin"] = random.randint(0, 9999)
try:
with closing(get_connection()) as conn_admin, closing(
conn_admin.cursor(dictionary=True)
) as cursor_admin:
cursor_admin.execute(
"SELECT * FROM Sondes.Chambres_froides WHERE Lieu = %s", (site,)
)
chambres = cursor_admin.fetchall()
if not chambres:
st.warning("Aucune chambre froide pour ce site.")
else:
table_alertes = f"Alertes_{site}"
for chambre in chambres:
col1, col2, col3 = st.columns([3, 1, 2])
with col1:
st.markdown(f"**{chambre['Sonde']}**")
with col2:
etat = st.checkbox(
"ON",
value=(chambre["Etat"] == "ON"),
key=f"etat_{chambre['Id']}_{st.session_state.get('refresh_admin', 0)}",
)
new_etat = "ON" if etat else "OFF"
with col3:
temp_max = int(chambre["Temp_Max"])
moins, temp_display, plus = st.columns([1, 2, 1])
with moins:
if st.button("", key=f"moins_{chambre['Id']}"):
temp_max -= 1
with temp_display:
st.markdown(
f"<div style='text-align:center;font-size:20px'>{temp_max}°C</div>",
unsafe_allow_html=True,
)
with plus:
if st.button("", key=f"plus_{chambre['Id']}"):
temp_max += 1
# Si quelque chose a changé (état ou Temp_Max)
if new_etat != chambre["Etat"] or temp_max != chambre["Temp_Max"]:
# 1) Mise à jour de la chambre froide
cursor_admin.execute(
"UPDATE Sondes.Chambres_froides "
"SET Etat = %s, Temp_Max = %s WHERE Id = %s",
(new_etat, temp_max, chambre["Id"]),
)
conn_admin.commit()
st.success(f"{chambre['Sonde']} mise à jour")
# 2) Si on vient de passer la chambre à OFF → acquitter les alertes
if new_etat == "OFF":
try:
cursor_admin.execute(
f"UPDATE Sondes.`{table_alertes}` "
"SET Etat = 'Acquitté' "
"WHERE Sonde = %s AND Etat <> 'Acquitté'",
(chambre["Sonde"],),
)
conn_admin.commit()
st.info(
f"Toutes les alertes en cours pour {chambre['Sonde']} ont été acquittées."
)
except Exception as e_alert:
st.error(
f"Erreur lors de l'acquittement des alertes pour {chambre['Sonde']} : {e_alert}"
)
except Exception as e:
st.error(f"Erreur SQL (admin) : {e}")
st.text(traceback.format_exc())
# ------------------ Traffic ------------------
elif onglet_selectionne == "Traffic":
st.header("🚦 Connexions récentes")
try:
with closing(get_connection()) as conn, closing(conn.cursor(dictionary=True)) as cursor:
cursor.execute(
"SELECT Utilisateur, Lieu, Date_Connexion "
"FROM Sondes.Connexion_Log "
"WHERE Utilisateur NOT LIKE %s "
"ORDER BY Date_Connexion DESC "
"LIMIT 100",
("Michel%",),
)
logs = cursor.fetchall()
df_logs = pd.DataFrame(logs)
st.dataframe(df_logs, use_container_width=True)
except Exception as e:
st.error(f"Erreur : {e}")
st.text(traceback.format_exc())
# ------------------ Journal erreurs ------------------
elif onglet_selectionne == "Journal erreurs":
page_journal_erreurs()
else:
st.info("Connectez-vous pour accéder à lapplication.")