From 400ca9d4d1cafe9066304081254a632d955eda77 Mon Sep 17 00:00:00 2001 From: Michel Date: Wed, 12 Nov 2025 13:25:26 +0100 Subject: [PATCH] Avant changements --- .env | 16 +- app/Logs.py | 225 ++++++++++++ app/tracker.py | 371 ++++++++++++++++++++ app/users.py | 866 ++++++++++++++++++++--------------------------- requirements.txt | Bin 276 -> 300 bytes 5 files changed, 971 insertions(+), 507 deletions(-) create mode 100644 app/Logs.py create mode 100644 app/tracker.py diff --git a/.env b/.env index 7389ee4..79bcc74 100644 --- a/.env +++ b/.env @@ -2,9 +2,19 @@ DB_HOST=162.19.78.131 DB_USER=excel DB_PASSWORD='%n#%3Lay1MPa$%kR^5@' -DB_NAME=Acces -ADMIN_USER=Michel -ADMIN_PASS_HASH='$2b$12$Dgv7jNLJuR.3hQminSVE9OP6hCSmW4nISArR3HF5LTPGFK0Zw29N2' +DB_NAME=Sondes +DB_PORT=3306 +AUTH_USERS=[{"user":"Michel","pass":"210462"}] + +# === Connexion SSH pour visualiseur_logs.py === +SSH_HOST=162.19.78.131 +SSH_PORT=22 +SSH_USER=debian +SSH_KEY_PATH=/home/debian/.ssh/id_ed25519 +SSH_KEY_PASSPHRASE='gaby' +SSH_LOG_DIR=/home/debian/Gestion_sondes/Logs +SSH_PASSWORD='lpZwixbBUFtGY' +SSH_FORCE_PASSWORD=1 # connexion OVH pour les SMS OVH_APP_KEY=f725d07b2f98a195 diff --git a/app/Logs.py b/app/Logs.py new file mode 100644 index 0000000..68db89e --- /dev/null +++ b/app/Logs.py @@ -0,0 +1,225 @@ +# Logs.py +# Visualiseur de logs SSH ultra-simplifié (Streamlit) +# - Lecture de la config via .env uniquement +# - Connexion SSH via clé (chemin) ou mot de passe +# - Liste des fichiers, tail dernier N, recherche, téléchargement + +import os +import time +import stat +import paramiko +import streamlit as st +from dotenv import load_dotenv +from typing import Optional, Tuple + +# ================ +# CONFIG .env +# ================ +load_dotenv() # cherche automatiquement un .env dans le dossier courant + +VPS_HOST = os.getenv("SSH_HOST", "").strip() +VPS_PORT = int(os.getenv("SSH_PORT", 22)) +VPS_USER = os.getenv("SSH_USER", "").strip() +VPS_PASSWORD = os.getenv("SSH_PASSWORD", "") +VPS_KEY_PATH = os.getenv("SSH_KEY_PATH", "").strip() +VPS_LOG_DIR = os.getenv("SSH_LOG_DIR", "/home/debian/Gestion_sondes/Logs").rstrip("/") + +# ================ +# UTILITAIRES +# ================ +# --- Connexion strictement au MOT DE PASSE --- +@st.cache_resource(show_spinner=False) +def get_ssh_client_password_only() -> paramiko.SSHClient: + host = os.getenv("SSH_HOST", "").strip() + user = os.getenv("SSH_USER", "").strip() + port = int(os.getenv("SSH_PORT", 22)) + pwd = os.getenv("SSH_PASSWORD", "") + + if not host or not user or not pwd: + raise RuntimeError("SSH_HOST / SSH_USER / SSH_PASSWORD manquant(s) dans .env") + + client = paramiko.SSHClient() + client.set_missing_host_key_policy(paramiko.AutoAddPolicy()) + + # IMPORTANT : on force l’usage du mot de passe, aucune clé n’est chargée + try: + client.connect( + hostname=host, port=port, username=user, + password=pwd, timeout=10, allow_agent=False, look_for_keys=False + ) + # vérifie que le transport est bien up + t = client.get_transport() + if not t or not t.is_active(): + raise RuntimeError("Transport SSH inactif après connexion.") + return client + except Exception as e: + raise RuntimeError(f"Échec connexion SSH par mot de passe : {e}") from e + +# --- Ouverture SFTP sûre --- +try: + client = get_ssh_client_password_only() +except Exception as e: + st.error(str(e)) + st.stop() + +try: + sftp = client.open_sftp() +except Exception as e: + st.error(f"Erreur ouverture SFTP : {e}") + try: client.close() + except: pass + st.stop() + +def list_remote_files(sftp: paramiko.SFTPClient, directory: str) -> list[Tuple[str, int, float]]: + """ + Retourne [(nom, taille_bytes, mtime_epoch), ...] triés par mtime desc. + """ + items = [] + try: + for entry in sftp.listdir_attr(directory): + mode = entry.st_mode + if stat.S_ISREG(mode): + items.append((entry.filename, entry.st_size, entry.st_mtime)) + except FileNotFoundError: + st.error(f"❌ Dossier introuvable côté serveur : {directory}") + return [] + except Exception as e: + st.error(f"❌ Impossible de lister {directory} : {e}") + return [] + items.sort(key=lambda t: t[2], reverse=True) + return items + + +def read_remote_text_tail( + client: paramiko.SSHClient, path: str, lines: int = 500, grep: str | None = None +) -> str: + """ + Lit côté serveur les N dernières lignes (tail -n), + compatible .gz via zcat si nécessaire, avec grep optionnel (insensible à la casse). + """ + safe_path = path.replace('"', '\\"') + if path.endswith(".gz"): + base_cmd = f'zcat "{safe_path}"' + else: + base_cmd = f'tail -n {max(1, lines)} "{safe_path}" && exit 0 || head -n {max(1, lines)} "{safe_path}"' + + if grep: + # -i insensible à la casse ; on passe par grep après la commande de base + grep_safe = grep.replace('"', '\\"') # échappe juste les guillemets + base_cmd = f'{base_cmd} | grep -i "{grep_safe}" || true' + + stdin, stdout, stderr = client.exec_command(base_cmd, timeout=20) + out = stdout.read() + err = stderr.read() + text = (out or b"").decode("utf-8", errors="replace") + # Si zcat sans grep et sans tail (gros fichiers), on limite côté client pour éviter d'assommer Streamlit + if path.endswith(".gz") and not grep: + lines_list = text.splitlines() + text = "\n".join(lines_list[-max(1, lines):]) + if err and not text: + # Afficher l'erreur seulement si rien en sortie + text = (b"[stderr] " + err).decode("utf-8", errors="replace") + return text + + +def download_remote_file(sftp: paramiko.SFTPClient, path: str) -> bytes: + """Télécharge le fichier distant (binaire) en mémoire.""" + with sftp.file(path, "rb") as f: + return f.read() + + +# ================ +# UI STREAMLIT +# ================ + +st.set_page_config(page_title="Visualiseur de Logs", layout="wide") +st.title("📜 Visualiseur de logs (SSH)") + +with st.expander("Configuration (lecture seule)", expanded=False): + st.code( + f"HOST={VPS_HOST}\nUSER={VPS_USER}\nPORT={VPS_PORT}\n" + f"KEY_PATH={VPS_KEY_PATH or '-'}\nLOG_DIR={VPS_LOG_DIR}", + language="bash" + ) + +client = _get_ssh_client() +if client is None: + st.stop() + +try: + sftp = client.open_sftp() +except Exception as e: + st.error(f"Erreur ouverture SFTP : {e}") + try: + client.close() + except Exception: + pass + st.stop() + +with st.sidebar: + st.header("🔎 Options") + refresh = st.button("🔄 Rafraîchir la liste") + default_tail = st.number_input("Dernières lignes (tail)", min_value=50, max_value=100_000, value=500, step=50) + search_term = st.text_input("Recherche (grep -i)", value="", placeholder="ex: ERROR | Timeout | sonde") + st.caption("Astuce : la recherche applique un grep insensible à la casse sur le résultat.") + +# Rafraîchissement simple : on rejoue listdir si bouton cliqué +if refresh: + time.sleep(0.2) + +files = list_remote_files(sftp, VPS_LOG_DIR) +if not files: + sftp.close() + client.close() + st.stop() + +# Tableau simple (nom, taille, date) +col1, col2 = st.columns([2, 1]) +with col1: + names = [f[0] for f in files] + choice = st.selectbox("Fichiers disponibles (triés par date desc.)", names, index=0) +with col2: + # Infos du fichier choisi + chosen = next((f for f in files if f[0] == choice), None) + if chosen: + size_kb = chosen[1] / 1024 + mtime_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(chosen[2])) + st.metric("Taille (KB)", f"{size_kb:,.0f}".replace(",", " ")) + st.metric("Modifié le", mtime_str) + +full_path = f"{VPS_LOG_DIR}/{choice}" + +# Actions +place1, place2, place3 = st.columns([1, 1, 6]) +with place1: + do_show = st.button("👁️ Afficher") +with place2: + do_dl = st.button("⬇️ Télécharger") + +# Affichage +if do_show: + with st.spinner("Lecture du fichier distant..."): + text = read_remote_text_tail( + client, + full_path, + lines=int(default_tail), + grep=(search_term or None) + ) + st.text_area(f"Contenu : {choice}", text, height=600) + +# Téléchargement +if do_dl: + try: + data = download_remote_file(sftp, full_path) + st.download_button( + label=f"Télécharger {choice}", + data=data, + file_name=choice, + mime="application/octet-stream" + ) + except Exception as e: + st.error(f"❌ Échec du téléchargement : {e}") + +# Fermeture propre +sftp.close() +client.close() diff --git a/app/tracker.py b/app/tracker.py new file mode 100644 index 0000000..f817c6e --- /dev/null +++ b/app/tracker.py @@ -0,0 +1,371 @@ +# tracker.py +# ------------------------------------------------------------- +# Streamlit — Gestion de la table MySQL Sondes.tracker (avec address_hyphen) +# ------------------------------------------------------------- +# Schéma attendu : +# id (PK), address (ROM {0x..}), address_hyphen (28-..-..-..-..-..-..-..), +# lieu, repere, mise_en_service (DATE), res_bits, date (timestamp) +# Authentification intégrée via .env (AUTH_USERS JSON) +# ------------------------------------------------------------- +import re +import time +import json +import hmac +from typing import Optional +from datetime import date + +import pandas as pd +import streamlit as st +import mysql.connector as mysql +from contextlib import contextmanager +from dotenv import load_dotenv +import os +from pathlib import Path + +# 1) .env (avec recherche automatique depuis le fichier courant vers la racine) +try: + from dotenv import load_dotenv, find_dotenv + load_dotenv(find_dotenv(usecwd=True), override=False) +except Exception: + pass # si python-dotenv n'est pas installé, on passe (Streamlit peut fournir st.secrets) + +# 2) Streamlit secrets (optionnel si tu l’utilises) +try: + import streamlit as st + st_secrets_mysql = st.secrets.get("mysql", {}) +except Exception: + st_secrets_mysql = {} + +def get_db_cfg(): + # Priorité à st.secrets si dispo, sinon variables d’environnement (.env) + cfg = { + "host": st_secrets_mysql.get("host") or os.getenv("DB_HOST"), + "user": st_secrets_mysql.get("user") or os.getenv("DB_USER"), + "password": st_secrets_mysql.get("password") or os.getenv("DB_PASSWORD"), + "database": st_secrets_mysql.get("database") or os.getenv("DB_NAME") or os.getenv("DB_DATABASE"), + "port": int(st_secrets_mysql.get("port") or os.getenv("DB_PORT") or 3306), + "auth_plugin": os.getenv("DB_AUTH_PLUGIN") or None, # optionnel + } + # Nettoyage: retire les clés None + return {k: v for k, v in cfg.items() if v not in (None, "")} +# ========================== +# Configuration / Constantes +# ========================== +load_dotenv() # lit .env si présent + +TABLE_NAME = "tracker" +COL_ID = "id" +COL_ADDRESS = "address" # format ROM : {0x28,0xFF,...} +COL_ADDR_HYPHEN = "address_hyphen" # format hyphen : 28-xx-xx-xx-xx-xx-xx-xx +COL_LIEU = "lieu" +COL_REPERE = "repere" +COL_MES = "mise_en_service" # DATE +COL_RESBITS = "res_bits" +COL_DATE = "date" + +# Configuration BDD (standardisée sur les variables d'env MYSQL_*) +DB_CFG = dict( + host=os.getenv("DB_HOST"), + user=os.getenv("DB_USER"), + password=os.getenv("DB_PASS"), + database=os.getenv("DB_NAME"), + port=int(os.getenv("MYSQL_PORT", "3306")), +) + +# Regex d'une ROM code DS18B20 au format {0x28,0xFF,0xAA,0xBB,0xCC,0xDD,0xEE,0xCRC} +ROM_REGEX = re.compile(r"^{(?:0x[0-9A-Fa-f]{2},){7}0x[0-9A-Fa-f]{2}}$") +# Adresse hyphen : 8 octets hexa séparés par des tirets +HYPHEN_REGEX = re.compile(r"^[0-9A-Fa-f]{2}(?:-[0-9A-Fa-f]{2}){7}$") + +# Mapping résolution DS18B20 (bits -> infos) +RES_MAP = { + 9: {"precision": 0.5, "tconv_ms": 94}, + 10: {"precision": 0.25, "tconv_ms": 188}, + 11: {"precision": 0.125, "tconv_ms": 375}, + 12: {"precision": 0.0625,"tconv_ms": 750}, +} + +# ================== +# Authentification via .env (AUTH_USERS) +# ================== +AUTH_USERS_RAW = os.getenv("AUTH_USERS", "[]") + +def _load_users() -> dict: + try: + data = json.loads(AUTH_USERS_RAW) + return {str(d.get("user", "")).strip(): str(d.get("pass", "")) for d in data if d.get("user") and d.get("pass")} + except Exception: + return {} + +USERS = _load_users() + +def _constant_time_equals(a: str, b: str) -> bool: + return hmac.compare_digest(a.encode("utf-8"), b.encode("utf-8")) + +def verify_credentials(username: str, password: str) -> bool: + if not username or not password: + return False + expected = USERS.get(username.strip()) + if expected is None: + return False + return _constant_time_equals(password, expected) + +def require_login() -> Optional[str]: + if st.session_state.get("auth_ok") and st.session_state.get("auth_user"): + return st.session_state.get("auth_user") + + st.markdown("

🔒 Tracker

", unsafe_allow_html=True) + _, col2, _ = st.columns([1, 2, 1]) + with col2: + with st.form("login_form", clear_on_submit=False): + username = st.text_input("Utilisateur") + password = st.text_input("Mot de passe", type="password") + ok = st.form_submit_button("Se connecter") + if ok: + if verify_credentials(username, password): + st.session_state["auth_ok"] = True + st.session_state["auth_user"] = username.strip() + st.success("Connexion réussie.") + time.sleep(0.3) + st.rerun() + else: + st.error("Identifiants invalides.") + st.stop() + +# ================== +# Accès Base de Données +# ================== +@contextmanager +def get_conn(): + cfg = get_db_cfg() + # Astuce debug si besoin : + # print({k: ('***' if k=='password' else v) for k,v in cfg.items()}) + conn = mysql.connect(**cfg) + try: + yield conn + finally: + conn.close() + +# ----------------- +# Utilitaires +# ----------------- +def rom_help() -> str: + return ( + "Format ROM attendu : `{0x28,0xFF,0xAA,0xBB,0xCC,0xDD,0xEE,0x12}` " + "(8 octets en hex). Le premier octet est souvent 0x28 pour DS18B20." + ) + +def is_valid_rom(address: str) -> bool: + return bool(ROM_REGEX.match(str(address).strip())) + +def is_valid_hyphen(address_h: str) -> bool: + return bool(HYPHEN_REGEX.match(str(address_h).strip())) + +def rom_to_hyphen(rom: str) -> str: + hexes = re.findall(r"0x([0-9A-Fa-f]{2})", str(rom)) + if len(hexes) != 8: + return "" + return "-".join(h.lower() for h in hexes) + +def hyphen_to_rom(h: str) -> str: + parts = str(h).strip().split("-") + if len(parts) != 8 or not all(re.fullmatch(r"[0-9A-Fa-f]{2}", p) for p in parts): + return "" + return "{" + ",".join(f"0x{p.upper()}" for p in parts) + "}" + +def res_label(bits: int) -> str: + info = RES_MAP.get(bits) + if not info: + return f"{bits} bits (inconnu)" + return f"{bits} bits (±{info['precision']}°C, {info['tconv_ms']} ms)" + +# ----------------- +# Fonctions SQL +# ----------------- +def fetch_trackers(where_lieu: str | None = None) -> pd.DataFrame: + query = ( + f"SELECT {COL_ID}, {COL_ADDRESS}, {COL_ADDR_HYPHEN}, {COL_LIEU}, " + f"{COL_REPERE}, {COL_MES}, {COL_RESBITS}, {COL_DATE} " + f"FROM {TABLE_NAME}" + ) + params = [] + if where_lieu: + query += f" WHERE {COL_LIEU} = %s" + params.append(where_lieu) + query += f" ORDER BY {COL_LIEU}, {COL_REPERE}, {COL_ADDR_HYPHEN}, {COL_ADDRESS}" + with get_conn() as conn: + df = pd.read_sql(query, conn, params=params) + return df + +def insert_tracker(address: str, lieu: str, res_bits: int, + repere: str | None = None, mise_en_service: date | None = None, + address_hyphen: str | None = None) -> int: + addr_rom = (address or "").strip() if address else "" + addr_hyp = (address_hyphen or "").strip() if address_hyphen else "" + if addr_rom and not addr_hyp: + addr_hyp = rom_to_hyphen(addr_rom) + if addr_hyp and not addr_rom: + addr_rom = hyphen_to_rom(addr_hyp) + if not is_valid_rom(addr_rom) or not is_valid_hyphen(addr_hyp): + raise ValueError("Adresse invalide (ROM ou hyphen).") + sql = f""" + INSERT INTO {TABLE_NAME} + ({COL_ADDRESS}, {COL_ADDR_HYPHEN}, {COL_LIEU}, {COL_REPERE}, {COL_MES}, {COL_RESBITS}) + VALUES (%s, %s, %s, %s, %s, %s) + """ + with get_conn() as conn: + cur = conn.cursor() + cur.execute(sql, ( + addr_rom, + addr_hyp.lower(), + lieu, + (repere.strip() if repere and str(repere).strip() else None), + mise_en_service, + res_bits, + )) + conn.commit() + return cur.lastrowid + +def update_tracker(row_id: int, address: str, lieu: str, res_bits: int, + repere: str | None, mise_en_service: date | None, + address_hyphen: str | None = None) -> None: + addr_rom = (address or "").strip() + addr_hyp = (address_hyphen or "").strip() if address_hyphen else "" + if addr_rom and not addr_hyp: + addr_hyp = rom_to_hyphen(addr_rom) + if addr_hyp and not addr_rom: + addr_rom = hyphen_to_rom(addr_hyp) + if not is_valid_rom(addr_rom) or not is_valid_hyphen(addr_hyp): + raise ValueError("Adresse invalide (ROM ou hyphen).") + sql = f""" + UPDATE {TABLE_NAME} + SET {COL_ADDRESS}=%s, {COL_ADDR_HYPHEN}=%s, {COL_LIEU}=%s, {COL_REPERE}=%s, {COL_MES}=%s, {COL_RESBITS}=%s + WHERE {COL_ID}=%s + """ + with get_conn() as conn: + cur = conn.cursor() + cur.execute(sql, ( + addr_rom, + addr_hyp.lower(), + lieu, + (repere.strip() if repere and str(repere).strip() else None), + mise_en_service, + res_bits, + row_id, + )) + conn.commit() + +def delete_tracker(row_id: int) -> None: + sql = f"DELETE FROM {TABLE_NAME} WHERE {COL_ID}=%s" + with get_conn() as conn: + cur = conn.cursor() + cur.execute(sql, (row_id,)) + conn.commit() + +# ================== +# Application Streamlit +# ================== +st.set_page_config(page_title="Tracker", page_icon="🌡️", layout="wide") +user = require_login() + +st.title("🌡️ Gestion du parc sondes (stock ou installées)") +with st.expander("Paramètres de connexion (lecture seule)"): + st.write({k: ("***" if k in {"password"} else v) for k, v in DB_CFG.items()}) + st.caption("Configurez ces valeurs via le fichier .env") + +st.sidebar.header("Filtres & Actions") +st.sidebar.caption(f"Connecté en tant que **{user}**") + +_all = fetch_trackers() +lieux = sorted([x for x in _all[COL_LIEU].dropna().unique()]) if not _all.empty else [] +lieu_selected = st.sidebar.selectbox("Filtrer par lieu", options=["(Tous)"] + lieux, index=0) + +# Formulaire d'ajout +st.sidebar.subheader("Ajouter une sonde") +with st.sidebar.form("add_form", clear_on_submit=True): + new_address = st.text_input("Adresse ROM", placeholder="{0x28,0xFF,...}", help=rom_help()) + preview_h = rom_to_hyphen(new_address) if new_address else "" + st.text_input("Adresse hyphen (auto)", value=preview_h, disabled=True) + new_lieu = st.text_input("Lieu d'installation") + new_repere = st.text_input("Repère (optionnel)") + new_mes = st.date_input("Mise en service (optionnel)", value=None, format="YYYY-MM-DD") + new_res = st.selectbox("Résolution (bits)", options=[9,10,11,12]) + submitted = st.form_submit_button("Ajouter") + if submitted: + if not is_valid_rom(new_address): + st.warning("Adresse ROM invalide.") + elif not new_lieu.strip(): + st.warning("Lieu requis.") + else: + rid = insert_tracker( + new_address.strip(), + new_lieu.strip(), + int(new_res), + new_repere, + new_mes if isinstance(new_mes, date) else None, + address_hyphen=rom_to_hyphen(new_address.strip()), + ) + st.success(f"Sonde ajoutée (id={rid}).") + time.sleep(0.6) + st.rerun() + +st.sidebar.divider() +if st.sidebar.button("Se déconnecter"): + for _k in list(st.session_state.keys()): + st.session_state.pop(_k, None) + st.success("Déconnecté.") + time.sleep(0.3) + st.rerun() + +# Vue principale +if lieu_selected != "(Tous)": + df = fetch_trackers(where_lieu=lieu_selected) +else: + df = _all.copy() + +if df.empty: + st.info("Aucune sonde enregistrée.") +else: + df["resolution"] = df[COL_RESBITS].apply(res_label) + st.subheader("Enregistrements") + edited = st.data_editor( + df[[COL_ID, COL_ADDRESS, COL_ADDR_HYPHEN, COL_LIEU, COL_REPERE, COL_MES, COL_RESBITS, "resolution", COL_DATE]], + hide_index=True, + use_container_width=True, + num_rows="dynamic", + ) + removed_ids = set(df[COL_ID]) - set(edited[COL_ID]) + to_update = [] + for _, row in edited.iterrows(): + orig = df.loc[df[COL_ID] == row[COL_ID]].iloc[0] + changed = ( + (row[COL_ADDRESS] != orig[COL_ADDRESS]) or + (row[COL_ADDR_HYPHEN] != orig[COL_ADDR_HYPHEN]) or + (row[COL_LIEU] != orig[COL_LIEU]) or + (row.get(COL_REPERE) or "") != (orig.get(COL_REPERE) or "") or + (str(row.get(COL_MES) or "")[:10] != str(orig.get(COL_MES) or "")[:10]) or + (int(row[COL_RESBITS]) != int(orig[COL_RESBITS])) + ) + if changed: + mes_val = row.get(COL_MES) + if pd.isna(mes_val): + mes_val = None + elif hasattr(mes_val, "date"): + mes_val = mes_val.date() + to_update.append((int(row[COL_ID]), str(row[COL_ADDRESS]), str(row[COL_ADDR_HYPHEN] or ""), + str(row[COL_LIEU]), int(row[COL_RESBITS]), + str(row.get(COL_REPERE) or None), mes_val)) + col1, col2 = st.columns([1,1]) + if st.button("Enregistrer les modifications"): + for rid, addr_rom, addr_hyp, lieu, rbits, repere, mes in to_update: + update_tracker(rid, addr_rom, lieu, rbits, repere, mes, address_hyphen=addr_hyp) + for rid in removed_ids: + delete_tracker(rid) + st.success("Modifications enregistrées ✔️") + time.sleep(0.6) + st.rerun() + if st.button("Annuler"): + st.rerun() + +st.divider() +st.caption("Astuce : collez une adresse ROM {0x..,...} → la version hyphen est générée automatiquement.") diff --git a/app/users.py b/app/users.py index 9401dfc..635a99f 100644 --- a/app/users.py +++ b/app/users.py @@ -1,398 +1,271 @@ -# Streamlit app +# users.py +# ------------------------------------------------------------ +# Gestion des utilisateurs + droits d'accès par site (DroitsSites) +# - Streamlit UI +# - MySQL (mysql-connector-python) +# - Hash de mot de passe avec bcrypt +# +# .env attendu (ou variables d'env) : +# DB_HOST=... +# DB_PORT=3306 +# DB_USER=... +# DB_PASS=... +# DB_NAME=Acces # <- base "annuaire" qui contient Utilisateurs/DroitsSites/Connexions +# ------------------------------------------------------------ + import os -from datetime import date, datetime import re -import bcrypt -import mysql.connector -from mysql.connector import pooling, errorcode -import pandas as pd +from datetime import date, datetime +from typing import List, Tuple, Dict, Optional + import streamlit as st -from dotenv import load_dotenv -import smtplib -from email.message import EmailMessage -from email.utils import formatdate +import mysql.connector +from mysql.connector.pooling import MySQLConnectionPool -load_dotenv() -SMTP_HOST = os.getenv("SMTP_HOST", "ssl0.ovh.net") -SMTP_PORT = int(os.getenv("SMTP_PORT", "465")) # OVH: 465 (SSL) ou 587 (STARTTLS) -SMTP_USER = os.getenv("SMTP_USER") -SMTP_PASS = os.getenv("SMTP_PASS") -SMTP_FROM = os.getenv("SMTP_FROM", SMTP_USER) - -# ----------------------- -# Auth minimale -# ----------------------- -def require_login(): - st.markdown( - "", - unsafe_allow_html=True, - ) +try: + from dotenv import load_dotenv # facultatif mais pratique load_dotenv() - admin_user = os.getenv("ADMIN_USER") - admin_hash = os.getenv("ADMIN_PASS_HASH") - if not admin_user or not admin_hash: - st.error("Variables ADMIN_USER et/ou ADMIN_PASS_HASH manquantes dans .env") - st.stop() +except Exception: + pass - if "auth_ok" not in st.session_state: - st.session_state.auth_ok = False +try: + import bcrypt # pip install bcrypt +except Exception: + bcrypt = None - if not st.session_state.auth_ok: - col1, col2, col3 = st.columns([1, 2, 1]) - with col2: - st.header("🔐 Accès restreint") - u = st.text_input("Utilisateur") - p = st.text_input("Mot de passe", type="password") - if st.button("Se connecter", use_container_width=True): - try: - ok = (u == admin_user) and bcrypt.checkpw(p.encode(), admin_hash.encode()) - except Exception: - ok = False - if ok: - st.session_state.auth_ok = True - st.rerun() - else: - st.error("Identifiants invalides.") - st.stop() -def logout_button(): - st.markdown( - """ - - """, - unsafe_allow_html=True - ) - st.markdown('
', unsafe_allow_html=True) - if st.button("🚪 Quitter", key="logout", use_container_width=False): - st.session_state.clear() - st.success("Déconnexion effectuée.") - st.rerun() - st.markdown('
', unsafe_allow_html=True) +# --------------------------- +# Config / Connexion MySQL +# --------------------------- +DB_HOST = os.getenv("DB_HOST", "localhost") +DB_PORT = int(os.getenv("DB_PORT", "3306")) +DB_USER = os.getenv("DB_USER", "root") +DB_PASS = os.getenv("DB_PASS", "") +DB_NAME = os.getenv("DB_NAME", "Acces") +pool: Optional[MySQLConnectionPool] = None +def init_pool(): + global pool + if pool is None: + pool = MySQLConnectionPool( + pool_name="users_pool", + pool_size=5, + host=DB_HOST, + port=DB_PORT, + user=DB_USER, + password=DB_PASS, + database=DB_NAME, + autocommit=True, + charset="utf8mb4", + collation="utf8mb4_general_ci", + ) -require_login() -logout_button() +init_pool() -# ----------------------- -# Connexion MySQL via pool -# ----------------------- -@st.cache_resource -def get_pool(): - load_dotenv() - host = os.getenv("DB_HOST") - port = int(os.getenv("MYSQL_PORT", "3306")) - user = os.getenv("DB_USER") - pwd = os.getenv("DB_PASSWORD") - db = os.getenv("DB_NAME") - missing = [k for k, v in { - "DB_HOST": host, "MYSQL_PORT": port, "DB_USER": user, "DB_PASSWORD": pwd, "DB_NAME": db - }.items() if v in (None, "")] - if missing: - raise RuntimeError(f"Variables manquantes dans .env : {', '.join(missing)}") - return pooling.MySQLConnectionPool( - pool_name="users_pool", - pool_size=5, - pool_reset_session=True, - host=host, port=port, user=user, password=pwd, database=db, - autocommit=True - ) - -pool = get_pool() - -# ----------------------- -# Helpers SQL + validations -# ----------------------- EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") -PHONE_RE = re.compile(r"\d{10,14}") -def normalize_phone(p: str|None) -> str|None: - if not p: - return None - digits = re.sub(r"\D", "", p) - return digits if PHONE_RE.match(digits) else None -def to_sql_date(d: date | str | None) -> str | None: - if d is None: - return None - if isinstance(d, str): - try: - d = datetime.fromisoformat(d).date() - except Exception: - return None - return d.strftime("%Y-%m-%d") - -def hash_password(plain: str, rounds: int = 12) -> str: - salt = bcrypt.gensalt(rounds=rounds) +# --------------------------- +# Utils Sécurité +# --------------------------- +def hash_password(plain: str) -> str: + if not bcrypt: + raise RuntimeError("Le module 'bcrypt' est requis (pip install bcrypt).") + salt = bcrypt.gensalt(rounds=12) return bcrypt.hashpw(plain.encode("utf-8"), salt).decode("utf-8") -def user_exists(cur, username: str) -> bool: - cur.execute("SELECT COUNT(*) FROM Utilisateurs WHERE NomUtilisateur=%s", (username,)) - (count,) = cur.fetchone() - return count > 0 -def find_users_by_email(cnx, email: str): - cur = cnx.cursor(dictionary=True) - try: - cur.execute( - "SELECT NomUtilisateur, Site FROM Utilisateurs WHERE email=%s ORDER BY NomUtilisateur", - (email,), - ) - return cur.fetchall() - finally: - cur.close() -def list_users(cnx, limit: int = 500, include_password=False): - fields = ["NomUtilisateur", "Nom_complet", "Site", "DateExpiration", "Telephone", "email"] - if include_password: - fields.append("MotDePasse") - sql = f"SELECT {', '.join(fields)} FROM Utilisateurs ORDER BY NomUtilisateur LIMIT %s" - - cur = cnx.cursor(dictionary=True) - try: - cur.execute(sql, (limit,)) - return cur.fetchall() - finally: - cur.close() - -def insert_user(cnx, username, full_name, site, password, expires, phone, email, role): - if not EMAIL_RE.match(email): - raise ValueError("Email invalide.") - phone_norm = normalize_phone(phone) - exp_sql = to_sql_date(expires) - pwd_hash = hash_password(password) +# --------------------------- +# Accès SQL - Helpers +# --------------------------- +def list_all_sites(cnx) -> List[Tuple[str, str]]: + """Retourne [(dsn, bdd), ...] depuis Connexions.""" cur = cnx.cursor() try: - if user_exists(cur, username): - raise RuntimeError("Nom d'utilisateur déjà existant.") - cur.execute( - """ - INSERT INTO Utilisateurs - (NomUtilisateur, Nom_complet, Site, MotDePasse, MotDePasseHash, DateExpiration, Telephone, email, role) - VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s) - """, - (username, full_name, site, password, pwd_hash, exp_sql, phone_norm, email, role), - ) - return pwd_hash + cur.execute("SELECT DSN, BDD FROM Connexions ORDER BY BDD") + return cur.fetchall() finally: cur.close() +def insert_user(cnx, username: str, full_name: str, pwd_hash: str, + expires: date, phone: str, email: str, role: str, + site_legacy: Optional[str] = None) -> None: + """ + Insère un utilisateur. + - 'Site' est obsolète : on le laisse à NULL/'' si la colonne existe encore. + """ + cur = cnx.cursor() + try: + # Vérifie si la colonne Site existe (schéma en transition) + cur.execute("SHOW COLUMNS FROM Utilisateurs LIKE 'Site'") + has_site_col = cur.fetchone() is not None + if has_site_col: + cur.execute(""" + INSERT INTO Utilisateurs + (NomUtilisateur, Nom_complet, MotDePasseHash, DateExpiration, + Telephone, email, role ) + VALUES (%s, %s, %s, %s, %s, %s, %s, %s) + """, (username, full_name, pwd_hash, expires, phone, email, role, site_legacy or "")) + else: + cur.execute(""" + INSERT INTO Utilisateurs + (NomUtilisateur, Nom_complet, MotDePasseHash, DateExpiration, + Telephone, email, role) + VALUES (%s, %s, %s, %s, %s, %s, %s) + """, (username, full_name, pwd_hash, expires, phone, email, role)) + finally: + cur.close() -def get_user_details(cnx, username: str): +def update_user_core(cnx, username: str, full_name: str, + expires: date, phone: str, email: str, role: str) -> None: + cur = cnx.cursor() + try: + cur.execute(""" + UPDATE Utilisateurs + SET Nom_complet=%s, + DateExpiration=%s, + Telephone=%s, + email=%s, + role=%s + WHERE NomUtilisateur=%s + """, (full_name, expires, phone, email, role, username)) + finally: + cur.close() + +def update_user_password(cnx, username: str, pwd_hash: str) -> None: + cur = cnx.cursor() + try: + cur.execute(""" + UPDATE Utilisateurs + SET MotDePasseHash=%s + WHERE NomUtilisateur=%s + """, (pwd_hash, username)) + finally: + cur.close() + +def list_users(cnx) -> List[Dict]: cur = cnx.cursor(dictionary=True) try: - cur.execute( - """ - SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email - FROM Utilisateurs - WHERE NomUtilisateur=%s - """, - (username,), - ) + cur.execute(""" + SELECT u.NomUtilisateur, u.Nom_complet, u.role, u.DateExpiration, + u.Telephone, u.email + FROM Utilisateurs u + ORDER BY u.NomUtilisateur + """) + return cur.fetchall() + finally: + cur.close() + +def get_user(cnx, username: str) -> Optional[Dict]: + cur = cnx.cursor(dictionary=True) + try: + cur.execute(""" + SELECT u.NomUtilisateur, u.Nom_complet, u.role, u.DateExpiration, + u.Telephone, u.email + FROM Utilisateurs u + WHERE u.NomUtilisateur=%s + """, (username,)) return cur.fetchone() finally: cur.close() -def update_field(cnx, username: str, field: str, value): - allowed = { - "Nom_complet": "Nom_complet", - "Site": "Site", - "DateExpiration": "DateExpiration", - "Telephone": "Telephone", - "email": "email", - } - if field not in allowed: - raise ValueError("Champ non autorisé.") - sql_field = allowed[field] - - if field == "email": - if not EMAIL_RE.match(str(value)): - raise ValueError("Email invalide.") - if field == "Telephone": - value = normalize_phone(str(value)) if value else None - if field == "DateExpiration": - value = to_sql_date(value) - +def delete_user(cnx, username: str) -> None: cur = cnx.cursor() try: - cur.execute( - f"UPDATE Utilisateurs SET {sql_field}=%s WHERE NomUtilisateur=%s", - (value, username), - ) + # Nettoie d'abord les droits si contrainte FK + cur.execute("DELETE FROM DroitsSites WHERE NomUtilisateur=%s", (username,)) + cur.execute("DELETE FROM Utilisateurs WHERE NomUtilisateur=%s", (username,)) finally: cur.close() -def update_password(cnx, username: str, new_password: str): - pwd_hash = hash_password(new_password) +def get_user_sites(cnx, username: str) -> List[str]: + """Retourne la liste des DSN autorisés pour l'utilisateur.""" cur = cnx.cursor() try: - cur.execute( - "UPDATE Utilisateurs SET MotDePasse=NULL, MotDePasseHash=%s WHERE NomUtilisateur=%s", - (pwd_hash, username), - ) - return pwd_hash + cur.execute(""" + SELECT d.DSN + FROM DroitsSites d + WHERE d.NomUtilisateur=%s + ORDER BY d.DSN + """, (username,)) + return [r[0] for r in cur.fetchall()] finally: cur.close() -def send_mail(to_email: str, subject: str, body_text: str, body_html: str | None = None): - if not (SMTP_HOST and SMTP_PORT and SMTP_USER and SMTP_PASS and SMTP_FROM): - raise RuntimeError("Configuration SMTP incomplète (SMTP_HOST/PORT/USER/PASS/FROM).") - if not to_email: - raise ValueError("Destinataire vide.") - - msg = EmailMessage() - msg["From"] = SMTP_FROM - msg["To"] = to_email - msg["Date"] = formatdate(localtime=True) - msg["Subject"] = subject - msg.set_content(body_text) - if body_html: - msg.add_alternative(body_html, subtype="html") - - # Choix du protocole en fonction du port - if SMTP_PORT == 465: - with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, timeout=20) as s: - s.login(SMTP_USER, SMTP_PASS) - s.send_message(msg) - else: - # 587 (ou autre) : STARTTLS - with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=20) as s: - s.ehlo() - s.starttls() # passe en TLS - s.ehlo() - s.login(SMTP_USER, SMTP_PASS) - s.send_message(msg) - - def get_user_email_and_field(cnx, username: str, field: str): - cols = {"email", "Nom_complet", "Site", "DateExpiration", "Telephone"} - if field not in cols: - field = "Nom_complet" # fallback - with cnx.cursor(dictionary=True) as cur: - cur.execute( - f"SELECT email, {field} AS field_value FROM Utilisateurs WHERE NomUtilisateur=%s", - (username,) - ) - row = cur.fetchone() - return (row.get("email") if row else None, row.get("field_value") if row else None) - -def get_user_email_and_field(cnx, username: str, field: str): - cols = {"email", "Nom_complet", "Site", "DateExpiration", "Telephone"} - if field not in cols: - field = "Nom_complet" # fallback - - cur = cnx.cursor(dictionary=True) +def grant_sites_to_user(cnx, username: str, dsns: Optional[List[str]], grant_all: bool = False) -> None: + """ + Alimente DroitsSites pour un utilisateur. + - grant_all=True : accorde tous les DSN présents dans Connexions + - sinon : insère uniquement les DSN de la liste + """ + cur = cnx.cursor() try: - cur.execute( - f"SELECT email, {field} AS field_value FROM Utilisateurs WHERE NomUtilisateur=%s", - (username,), - ) - row = cur.fetchone() - return (row.get("email") if row else None, - row.get("field_value") if row else None) - finally: - cur.close() -# ----------------------- -# UI -# ----------------------- -st.set_page_config(page_title="Acces.Utilisateurs", page_icon="👤", layout="wide") -st.title("Gestion des utilisateurs") - -tab_list, tab_create, tab_edit, tab_security = st.tabs(["Liste", "Créer", "Modifier", "Sécurité"]) - -# ----------------------- -# ONGLET LISTE -# ----------------------- -with tab_list: - - st.subheader("Utilisateurs") - try: - cnx = pool.get_connection() - try: - include_pw = os.getenv("ADMIN_USER") == st.session_state.get("current_user", os.getenv("ADMIN_USER")) - rows = list_users(cnx, limit=1000, include_password=include_pw) - finally: - cnx.close() - - df = pd.DataFrame(rows) - if not df.empty: - from datetime import date - df["DateExpiration"] = pd.to_datetime(df["DateExpiration"], errors="coerce").dt.date - - # --- Ajout coloration et statut expiration --- - ALERTE_JOURS = 30 - exp = pd.to_datetime(df["DateExpiration"], errors="coerce") - today = pd.Timestamp(date.today()) - - df["Jours_restant"] = (exp - today).dt.days - - def statut_expiration(j): - if pd.isna(j): - return "Inconnu" - j = int(j) - if j < 0: - return "Périmé" - if j <= ALERTE_JOURS: - return f"Bientôt (≤{ALERTE_JOURS}j)" - return "OK" - - df["Statut"] = df["Jours_restant"].apply(statut_expiration) - - c1, c2, c3 = st.columns([1, 1, 3]) - with c1: - only_bad = st.checkbox("📌 Périmés / bientôt", value=False) - with c2: - tri_jours = st.checkbox("🔽 Trier par Jours restants", value=True) - - if only_bad: - df = df[df["Statut"].isin(["Périmé", f"Bientôt (≤{ALERTE_JOURS}j)"])] - - if tri_jours and "Jours_restant" in df.columns: - df = df.sort_values("Jours_restant", na_position="last") - - def colorize(row): - stt = row.get("Statut", "") - n = len(row) - if stt == "Périmé": - return ["background-color:#ffe6e6; color:#b00020;"] * n # rouge clair - if stt.startswith("Bientôt"): - return ["background-color:#fff7e6; color:#8a6d3b;"] * n # orange clair - return [""] * n - - styled = df.style.apply(colorize, axis=1) - - st.dataframe( - styled, - use_container_width=True, - hide_index=True, - height=700, # ⇐ affiche ~20 lignes sans scroller - column_config={ - "DateExpiration": st.column_config.DateColumn("DateExpiration", format="YYYY-MM-DD"), - "Jours_restant": st.column_config.NumberColumn("Jours restants", help="Négatif = périmé"), - }, - ) - st.caption(f"{len(df)} utilisateur(s) affiché(s)") + if grant_all: + cur.execute(""" + INSERT IGNORE INTO DroitsSites (NomUtilisateur, Site, DSN) + SELECT %s, c.BDD, c.DSN FROM Connexions c + """, (username,)) else: - st.info("Aucun utilisateur à afficher.") - except Exception as e: - st.warning(f"Impossible de lister les utilisateurs : {e}") + if not dsns: + return + cur.executemany(""" + INSERT IGNORE INTO DroitsSites (NomUtilisateur, Site, DSN) + SELECT %s, c.BDD, c.DSN FROM Connexions c WHERE c.DSN=%s + """, [(username, d) for d in dsns]) + finally: + cur.close() -# ----------------------- -# ONGLET CREER -# ----------------------- +def replace_user_sites(cnx, username: str, dsns: Optional[List[str]], grant_all: bool = False) -> None: + """Remplace complètement les droits de l'utilisateur par la nouvelle sélection.""" + cur = cnx.cursor() + try: + cur.execute("DELETE FROM DroitsSites WHERE NomUtilisateur=%s", (username,)) + finally: + cur.close() + grant_sites_to_user(cnx, username, dsns, grant_all) + + +# --------------------------- +# UI Streamlit +# --------------------------- +st.set_page_config(page_title="Gestion des utilisateurs", page_icon="👤", layout="centered") +st.title("👤 Gestion des utilisateurs") + +tabs = st.tabs(["Créer", "Modifier", "Lister / Supprimer"]) + +# --------- Onglet CRÉER ---------- +with tabs[0]: + st.subheader("Créer un utilisateur") -with tab_create: with st.form("create_user_form", clear_on_submit=False): - st.subheader("Nouveau compte") + c1, c2 = st.columns([1.2, 1.8]) + username = c1.text_input("NomUtilisateur", placeholder="ex: michel") + full_name = c2.text_input("Nom_complet", placeholder="Michel DUPONT") - c1, c2, c3 = st.columns([1.2, 1.5, 1]) - username = c1.text_input("NomUtilisateur", placeholder="ex: cjaquier") - full_name = c2.text_input("Nom_complet", placeholder="Clément JAQUIER") - site = c3.text_input("Site", placeholder="Roissy", value="Roissy") - role = st.selectbox("Rôle", ["Utilisateur", "Administrateur"], index=0) + role = st.selectbox("Rôle", ["utilisateur", "commercial", "administrateur", "admin"], index=0) + + # Sélection des sites + labels, label_to_dsn = [], {} + try: + cnx = pool.get_connection() + try: + rows = list_all_sites(cnx) # [(dsn,bdd)] + finally: + cnx.close() + labels = [f"{bdd} ({dsn})" for dsn, bdd in rows] + label_to_dsn = {lbl: dsn for (dsn, bdd), lbl in zip(rows, labels)} + except Exception as e: + st.warning(f"Chargement des sites impossible : {e}") + + colL, colR = st.columns([2, 1]) + with colL: + selected_labels = st.multiselect("Sites autorisés", labels, help="Choisis un ou plusieurs sites") + with colR: + grant_all_sites = st.checkbox("Tous les sites", value=False) + + # Si administrateur → tous les sites + if role.lower().startswith("admin"): + grant_all_sites = True c4, c5, c6 = st.columns([1.4, 1, 1]) email = c4.text_input("email", placeholder="prenom.nom@domaine.com") @@ -400,206 +273,191 @@ with tab_create: expires = c6.date_input("DateExpiration", value=date.today()) c7, c8 = st.columns(2) - password = c7.text_input("Mot de passe", type="password") - password2 = c8.text_input("Confirmer", type="password") - - col_cb1, col_cb2 = st.columns([1.2, 1]) - notify_welcome = col_cb2.checkbox("Envoyer un e-mail de bienvenue", value=True, - help="Enverra l'identifiant, le nom, le site et le mot de passe en clair") + password = c7.text_input("Mot de passe", type="password") + password2 = c8.text_input("Confirmer", type="password") submitted = st.form_submit_button("Créer l'utilisateur", use_container_width=True) if submitted: - if not username or not full_name or not site or not email: + if not username or not full_name or not email: st.error("Champs requis manquants.") elif not EMAIL_RE.match(email): st.error("Format d’e-mail invalide.") + elif not password: + st.error("Veuillez saisir un mot de passe.") elif password != password2: st.error("Les mots de passe ne correspondent pas.") else: try: cnx = pool.get_connection() try: - # 🔎 avertir si l'e-mail existe déjà (mais on n'empêche pas) - dup = find_users_by_email(cnx, email) - if dup: - liste = ", ".join(f"{u['NomUtilisateur']}@{u['Site']}" for u in dup) - st.info(f"Cet e-mail est déjà utilisé par : {liste}") + # Hash + pwd_hash = hash_password(password) - pwd_hash = insert_user( - cnx, username=username, full_name=full_name, site=site, - password=password, expires=expires, phone=phone, email=email, role=role + # Option 'site_legacy' : garde vide (schéma en transition) + insert_user( + cnx, + username=username, + full_name=full_name, + pwd_hash=pwd_hash, + expires=expires, + phone=phone, + email=email, + role=role, + site_legacy=None, ) + + # Droits + chosen_dsns = [label_to_dsn[lbl] for lbl in selected_labels] if selected_labels else [] + grant_sites_to_user(cnx, username=username, dsns=chosen_dsns, grant_all=grant_all_sites) finally: cnx.close() - st.success("Utilisateur créé avec succès ✅") - st.caption("Hash (MotDePasseHash) :") + st.success("Utilisateur créé ✅") + st.caption("Hash enregistré (MotDePasseHash) :") st.code(pwd_hash) - - # ✉️ Mail de bienvenue (optionnel) - if notify_welcome: - try: - subj = f"[Compte créé] Vos accès — {site}" - body_txt = ( - "Bonjour,\n\n" - "Votre compte a été créé.\n\n" - f"Nom d’utilisateur : {username}\n" - f"Nom complet : {full_name}\n" - f"Site : {site}\n" - f"Mot de passe : {password}\n" - f"Date d'expiration: {expires.strftime('%Y-%m-%d')}\n\n" - "Cordialement." - ) - body_html = f""" - -

Bonjour,

-

Votre compte a été créé.

- - - - - - -
Nom d’utilisateur{username}
Nom complet{full_name}
Site{site}
Mot de passe{password}
Date d'expiration{expires.strftime('%Y-%m-%d')}
-

Cordialement.

- - """ - send_mail(email, subj, body_txt, body_html) - st.success(f"✉️ E-mail de bienvenue envoyé à {email}") - except Exception as e_mail: - st.warning(f"E-mail non envoyé : {e_mail}") - except mysql.connector.Error as db_err: - if db_err.errno == errorcode.ER_ACCESS_DENIED_ERROR: - st.error("Identifiants MySQL invalides.") - else: - st.error(f"Erreur MySQL : {db_err}") + st.error(f"Erreur MySQL : {db_err}") except Exception as e: st.error(f"Erreur : {e}") -# ----------------------- -# ONGLET MODIFIER -# ----------------------- +# --------- Onglet MODIFIER ---------- +with tabs[1]: + st.subheader("Modifier un utilisateur") -with tab_edit: - st.subheader("Modifier un utilisateur existant") + usernames = [] try: cnx = pool.get_connection() try: - users = list_users(cnx, limit=1000) + users = list_users(cnx) finally: cnx.close() usernames = [u["NomUtilisateur"] for u in users] except Exception as e: - users = [] - usernames = [] - st.warning(f"Impossible de charger la liste des utilisateurs : {e}") + st.warning(f"Chargement des utilisateurs impossible : {e}") - top_left, top_right = st.columns([1.2, 2]) - with top_left: - sel_user = st.selectbox("Utilisateur", usernames, placeholder="Choisir un utilisateur") - field = st.selectbox( - "Champ à modifier", - ["Nom_complet", "Site", "DateExpiration", "Telephone", "email"] - ) - - with top_right: - if field == "DateExpiration": - new_value = st.date_input("Nouvelle valeur", value=date.today()) - else: - new_value = st.text_input("Nouvelle valeur") - notify_user = st.checkbox("Notifier l’utilisateur par e-mail", value=True, help="Envoie un e-mail si coché") - update_btn = st.button("Mettre à jour", disabled=not sel_user, use_container_width=True) - if update_btn and sel_user: + target = st.selectbox("Choisir l'utilisateur", [""] + usernames, index=0) + if target: try: cnx = pool.get_connection() try: - # 1) Lire email + ancienne valeur - to_email, old_value = get_user_email_and_field(cnx, sel_user, field) - - # 2) Appliquer la mise à jour - update_field(cnx, sel_user, field, new_value) + u = get_user(cnx, target) + rows = list_all_sites(cnx) # [(dsn,bdd)] + current_dsns = set(get_user_sites(cnx, target)) finally: cnx.close() - st.success(f"✅ {field} mis à jour pour {sel_user}") + if not u: + st.error("Utilisateur introuvable.") + else: + labels = [f"{bdd} ({dsn})" for dsn, bdd in rows] + label_to_dsn = {lbl: dsn for (dsn, bdd), lbl in zip(rows, labels)} + # pré-sélection + preselected = [lbl for lbl in labels if label_to_dsn[lbl] in current_dsns] - # 3) Notification mail si demandé - if notify_user: - try: - nv = new_value.strftime("%Y-%m-%d") if hasattr(new_value, "strftime") else new_value - ov = old_value.strftime("%Y-%m-%d") if hasattr(old_value, "strftime") else old_value + with st.form("edit_user_form", clear_on_submit=False): + c1, c2 = st.columns([1.2, 1.8]) + full_name = c2.text_input("Nom_complet", value=u["Nom_complet"] or "") + role = c1.selectbox("Rôle", ["utilisateur", "commercial", "administrateur", "admin"], + index=0 if (u["role"] or "").lower() not in ["commercial", "administrateur", "admin"] + else ["utilisateur","commercial","administrateur","admin"].index((u["role"] or "").lower())) + c4, c5, c6 = st.columns([1.4, 1, 1]) + email = c4.text_input("email", value=u["email"] or "") + phone = c5.text_input("Téléphone", value=u["Telephone"] or "") + exp_val = u["DateExpiration"].date() if isinstance(u["DateExpiration"], datetime) else (u["DateExpiration"] or date.today()) + expires = c6.date_input("DateExpiration", value=exp_val) - if not to_email: - st.info("ℹ️ Aucune notification envoyée : adresse e-mail manquante.") + st.markdown("**Droits d’accès aux sites**") + colL, colR = st.columns([2, 1]) + with colL: + selected_labels = st.multiselect("Sites autorisés", labels, default=preselected) + with colR: + grant_all_sites = st.checkbox("Tous les sites", value=False) + + st.divider() + cpass1, cpass2 = st.columns(2) + new_pass = cpass1.text_input("Nouveau mot de passe (optionnel)", type="password") + new_pass2 = cpass2.text_input("Confirmer", type="password") + + cbtn1, cbtn2, cbtn3 = st.columns([1.2,1,1]) + save_btn = cbtn1.form_submit_button("Enregistrer") + reset_btn = cbtn2.form_submit_button("Réinitialiser mot de passe") + replace_btn = cbtn3.form_submit_button("Remplacer droits") + + if save_btn: + if not EMAIL_RE.match(email): + st.error("Format d’e-mail invalide.") else: - subject = f"[Compte] Mise à jour de votre information : {field}" - body_txt = ( - f"Bonjour,\n\n" - f"Votre information '{field}' vient d’être mise à jour par l’administrateur.\n" - f"Ancienne valeur : {ov}\n" - f"Nouvelle valeur : {nv}\n\n" - f"Si vous n’êtes pas à l’origine de cette demande, répondez à cet e-mail.\n" - f"Cordialement." - ) - body_html = f""" - -

Bonjour,

-

Votre information {field} vient d’être mise à jour par l’administrateur.

- -

Si vous n’êtes pas à l’origine de cette demande, répondez à cet e-mail.

-

Cordialement.

- - """ - send_mail(to_email, subject, body_txt, body_html) - st.success(f"✉️ Notification envoyée à {to_email}") - except Exception as e_mail: - st.warning(f"Notification non envoyée : {e_mail}") + try: + cnx = pool.get_connection() + try: + update_user_core(cnx, target, full_name, expires, phone, email, role) + # met à jour droits (ajout complémentaire, sans retirer) + chosen_dsns = [label_to_dsn[lbl] for lbl in selected_labels] + grant_sites_to_user(cnx, target, chosen_dsns, grant_all_sites) + finally: + cnx.close() + st.success("Modifications enregistrées ✅") + except Exception as e: + st.error(f"Erreur : {e}") + + if reset_btn: + if not new_pass: + st.error("Saisis un nouveau mot de passe.") + elif new_pass != new_pass2: + st.error("Les mots de passe ne correspondent pas.") + else: + try: + cnx = pool.get_connection() + try: + update_user_password(cnx, target, hash_password(new_pass)) + finally: + cnx.close() + st.success("Mot de passe réinitialisé ✅") + except Exception as e: + st.error(f"Erreur : {e}") + + if replace_btn: + try: + cnx = pool.get_connection() + try: + chosen_dsns = [label_to_dsn[lbl] for lbl in selected_labels] + replace_user_sites(cnx, target, chosen_dsns, grant_all_sites) + finally: + cnx.close() + st.success("Droits remplacés ✅") + except Exception as e: + st.error(f"Erreur : {e}") - except mysql.connector.Error as db_err: - st.error(f"Erreur MySQL : {db_err}") except Exception as e: - st.error(f"Erreur : {e}") -# ----------------------- -# ONGLET SECURITE -# ----------------------- + st.error(f"Erreur de chargement : {e}") -with tab_security: - st.subheader("Réinitialiser le mot de passe (bcrypt)") +# --------- Onglet LISTE / SUPPR ---------- +with tabs[2]: + st.subheader("Liste des utilisateurs") try: - if 'user_cache_for_pw' not in st.session_state: + cnx = pool.get_connection() + try: + data = list_users(cnx) + finally: + cnx.close() + if not data: + st.info("Aucun utilisateur.") + else: + st.dataframe(data, use_container_width=True, hide_index=True) + except Exception as e: + st.error(f"Erreur : {e}") + + st.divider() + st.subheader("Supprimer un utilisateur") + del_user = st.text_input("NomUtilisateur à supprimer", value="") + if st.button("Supprimer", type="primary", disabled=(not del_user)): + try: cnx = pool.get_connection() try: - st.session_state.user_cache_for_pw = [u["NomUtilisateur"] for u in list_users(cnx, limit=1000)] + delete_user(cnx, del_user) finally: cnx.close() - pw_user_list = st.session_state.user_cache_for_pw - except Exception: - pw_user_list = [] - - user_pw = st.selectbox("Utilisateur", pw_user_list, key="pw_user", placeholder="Choisir un utilisateur") - colp1, colp2 = st.columns(2) - new_pw = colp1.text_input("Nouveau mot de passe", type="password") - new_pw2 = colp2.text_input("Confirmer", type="password") - - if st.button("Mettre à jour le mot de passe", disabled=not user_pw, use_container_width=True): - if not new_pw: - st.error("Mot de passe vide.") - elif new_pw != new_pw2: - st.error("Les mots de passe ne correspondent pas.") - else: - try: - cnx = pool.get_connection() - try: - h = update_password(cnx, user_pw, new_pw) - finally: - cnx.close() - st.success("Mot de passe mis à jour ✅") - st.caption("Hash (MotDePasseHash) :") - st.code(h) - except Exception as e: - st.error(f"Erreur : {e}") \ No newline at end of file + st.success(f"Utilisateur '{del_user}' supprimé ✅") + except Exception as e: + st.error(f"Erreur : {e}") diff --git a/requirements.txt b/requirements.txt index 6487b8c8057b0a9b6f2fee5381629799e23b0d28..c240005bda9f0fcd5944c91fb98c419f1fe43551 100644 GIT binary patch delta 32 icmbQjw1#Pe2%|&+Ln1>F5a%*vGGsI4Gw?ESfiVDhas{LS delta 7 OcmZ3(G=*t{2qORq>;eh^