Répartion du fichier users

This commit is contained in:
2025-11-19 00:39:26 +01:00
parent 0c1489ea6f
commit 788785d982
6 changed files with 82 additions and 594 deletions

View File

@@ -1,225 +1,67 @@
# Logs.py
# Visualiseur de logs SSH ultra-simplifié (Streamlit)
# - Lecture de la config via .env uniquement
# - Connexion SSH via clé (chemin) ou mot de passe
# - Liste des fichiers, tail dernier N, recherche, téléchargement
import os
import time
import stat
import paramiko
import streamlit as st
from dotenv import load_dotenv
from typing import Optional, Tuple
# ================
# CONFIG .env
# ================
load_dotenv() # cherche automatiquement un .env dans le dossier courant
# Dossier des logs
LOG_DIR = "/home/debian/Gestion_sondes/Logs"
VPS_HOST = os.getenv("SSH_HOST", "").strip()
VPS_PORT = int(os.getenv("SSH_PORT", 22))
VPS_USER = os.getenv("SSH_USER", "").strip()
VPS_PASSWORD = os.getenv("SSH_PASSWORD", "")
VPS_KEY_PATH = os.getenv("SSH_KEY_PATH", "").strip()
VPS_LOG_DIR = os.getenv("SSH_LOG_DIR", "/home/debian/Gestion_sondes/Logs").rstrip("/")
st.title("Gestion des logs - Gestion_sondes")
# ================
# UTILITAIRES
# ================
# --- Connexion strictement au MOT DE PASSE ---
@st.cache_resource(show_spinner=False)
def get_ssh_client_password_only() -> paramiko.SSHClient:
host = os.getenv("SSH_HOST", "").strip()
user = os.getenv("SSH_USER", "").strip()
port = int(os.getenv("SSH_PORT", 22))
pwd = os.getenv("SSH_PASSWORD", "")
if not host or not user or not pwd:
raise RuntimeError("SSH_HOST / SSH_USER / SSH_PASSWORD manquant(s) dans .env")
client = paramiko.SSHClient()
client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
# IMPORTANT : on force lusage du mot de passe, aucune clé nest chargée
try:
client.connect(
hostname=host, port=port, username=user,
password=pwd, timeout=10, allow_agent=False, look_for_keys=False
)
# vérifie que le transport est bien up
t = client.get_transport()
if not t or not t.is_active():
raise RuntimeError("Transport SSH inactif après connexion.")
return client
except Exception as e:
raise RuntimeError(f"Échec connexion SSH par mot de passe : {e}") from e
# --- Ouverture SFTP sûre ---
try:
client = get_ssh_client_password_only()
except Exception as e:
st.error(str(e))
# --- Vérification du dossier ---
if not os.path.isdir(LOG_DIR):
st.error(f"Le dossier {LOG_DIR} n'existe pas.")
st.stop()
try:
sftp = client.open_sftp()
except Exception as e:
st.error(f"Erreur ouverture SFTP : {e}")
try: client.close()
except: pass
st.stop()
# Liste des fichiers
files = sorted([f for f in os.listdir(LOG_DIR) if os.path.isfile(os.path.join(LOG_DIR, f))])
def list_remote_files(sftp: paramiko.SFTPClient, directory: str) -> list[Tuple[str, int, float]]:
"""
Retourne [(nom, taille_bytes, mtime_epoch), ...] triés par mtime desc.
"""
items = []
try:
for entry in sftp.listdir_attr(directory):
mode = entry.st_mode
if stat.S_ISREG(mode):
items.append((entry.filename, entry.st_size, entry.st_mtime))
except FileNotFoundError:
st.error(f"❌ Dossier introuvable côté serveur : {directory}")
return []
except Exception as e:
st.error(f"❌ Impossible de lister {directory} : {e}")
return []
items.sort(key=lambda t: t[2], reverse=True)
return items
def read_remote_text_tail(
client: paramiko.SSHClient, path: str, lines: int = 500, grep: str | None = None
) -> str:
"""
Lit côté serveur les N dernières lignes (tail -n),
compatible .gz via zcat si nécessaire, avec grep optionnel (insensible à la casse).
"""
safe_path = path.replace('"', '\\"')
if path.endswith(".gz"):
base_cmd = f'zcat "{safe_path}"'
else:
base_cmd = f'tail -n {max(1, lines)} "{safe_path}" && exit 0 || head -n {max(1, lines)} "{safe_path}"'
if grep:
# -i insensible à la casse ; on passe par grep après la commande de base
grep_safe = grep.replace('"', '\\"') # échappe juste les guillemets
base_cmd = f'{base_cmd} | grep -i "{grep_safe}" || true'
stdin, stdout, stderr = client.exec_command(base_cmd, timeout=20)
out = stdout.read()
err = stderr.read()
text = (out or b"").decode("utf-8", errors="replace")
# Si zcat sans grep et sans tail (gros fichiers), on limite côté client pour éviter d'assommer Streamlit
if path.endswith(".gz") and not grep:
lines_list = text.splitlines()
text = "\n".join(lines_list[-max(1, lines):])
if err and not text:
# Afficher l'erreur seulement si rien en sortie
text = (b"[stderr] " + err).decode("utf-8", errors="replace")
return text
def download_remote_file(sftp: paramiko.SFTPClient, path: str) -> bytes:
"""Télécharge le fichier distant (binaire) en mémoire."""
with sftp.file(path, "rb") as f:
return f.read()
# ================
# UI STREAMLIT
# ================
st.set_page_config(page_title="Visualiseur de Logs", layout="wide")
st.title("📜 Visualiseur de logs (SSH)")
with st.expander("Configuration (lecture seule)", expanded=False):
st.code(
f"HOST={VPS_HOST}\nUSER={VPS_USER}\nPORT={VPS_PORT}\n"
f"KEY_PATH={VPS_KEY_PATH or '-'}\nLOG_DIR={VPS_LOG_DIR}",
language="bash"
)
client = _get_ssh_client()
if client is None:
st.stop()
try:
sftp = client.open_sftp()
except Exception as e:
st.error(f"Erreur ouverture SFTP : {e}")
try:
client.close()
except Exception:
pass
st.stop()
with st.sidebar:
st.header("🔎 Options")
refresh = st.button("🔄 Rafraîchir la liste")
default_tail = st.number_input("Dernières lignes (tail)", min_value=50, max_value=100_000, value=500, step=50)
search_term = st.text_input("Recherche (grep -i)", value="", placeholder="ex: ERROR | Timeout | sonde")
st.caption("Astuce : la recherche applique un grep insensible à la casse sur le résultat.")
# Rafraîchissement simple : on rejoue listdir si bouton cliqué
if refresh:
time.sleep(0.2)
files = list_remote_files(sftp, VPS_LOG_DIR)
if not files:
sftp.close()
client.close()
st.warning("Aucun fichier de log trouvé.")
st.stop()
# Tableau simple (nom, taille, date)
col1, col2 = st.columns([2, 1])
with col1:
names = [f[0] for f in files]
choice = st.selectbox("Fichiers disponibles (triés par date desc.)", names, index=0)
with col2:
# Infos du fichier choisi
chosen = next((f for f in files if f[0] == choice), None)
if chosen:
size_kb = chosen[1] / 1024
mtime_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(chosen[2]))
st.metric("Taille (KB)", f"{size_kb:,.0f}".replace(",", " "))
st.metric("Modifié le", mtime_str)
# Sélection du fichier
selected_file = st.selectbox("Choisissez un fichier log :", files)
full_path = f"{VPS_LOG_DIR}/{choice}"
file_path = os.path.join(LOG_DIR, selected_file)
# Actions
place1, place2, place3 = st.columns([1, 1, 6])
with place1:
do_show = st.button("👁️ Afficher")
with place2:
do_dl = st.button("⬇️ Télécharger")
st.subheader(f"Contenu de : {selected_file}")
# Affichage
if do_show:
with st.spinner("Lecture du fichier distant..."):
text = read_remote_text_tail(
client,
full_path,
lines=int(default_tail),
grep=(search_term or None)
)
st.text_area(f"Contenu : {choice}", text, height=600)
# Téléchargement
if do_dl:
# Lecture du contenu
def read_file():
try:
data = download_remote_file(sftp, full_path)
st.download_button(
label=f"Télécharger {choice}",
data=data,
file_name=choice,
mime="application/octet-stream"
)
except Exception as e:
st.error(f"❌ Échec du téléchargement : {e}")
with open(file_path, "r") as f:
return f.read()
except Exception as err:
return f"Erreur lors de la lecture : {err}"
# Fermeture propre
sftp.close()
client.close()
content = read_file()
st.text_area("", content, height=400)
# --- Boutons daction ---
col1, col2, col3 = st.columns(3)
# Rafraîchir
with col1:
if st.button("🔄 Rafraîchir"):
st.rerun()
# Vider
with col2:
if st.button("🧹 Vider le fichier"):
try:
with open(file_path, "w") as f:
f.write("")
st.success(f"Le fichier '{selected_file}' a été vidé.")
st.rerun()
except Exception as e:
st.error(f"Erreur : {e}")
# Supprimer
with col3:
if st.button("🗑️ Supprimer le fichier"):
try:
os.remove(file_path)
st.success(f"Le fichier '{selected_file}' a été supprimé.")
st.rerun()
except Exception as e:
st.error(f"Erreur : {e}")

View File

@@ -1,371 +0,0 @@
# tracker.py
# -------------------------------------------------------------
# Streamlit — Gestion de la table MySQL Sondes.tracker (avec address_hyphen)
# -------------------------------------------------------------
# Schéma attendu :
# id (PK), address (ROM {0x..}), address_hyphen (28-..-..-..-..-..-..-..),
# lieu, repere, mise_en_service (DATE), res_bits, date (timestamp)
# Authentification intégrée via .env (AUTH_USERS JSON)
# -------------------------------------------------------------
import re
import time
import json
import hmac
from typing import Optional
from datetime import date
import pandas as pd
import streamlit as st
import mysql.connector as mysql
from contextlib import contextmanager
from dotenv import load_dotenv
import os
from pathlib import Path
# 1) .env (avec recherche automatique depuis le fichier courant vers la racine)
try:
from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv(usecwd=True), override=False)
except Exception:
pass # si python-dotenv n'est pas installé, on passe (Streamlit peut fournir st.secrets)
# 2) Streamlit secrets (optionnel si tu lutilises)
try:
import streamlit as st
st_secrets_mysql = st.secrets.get("mysql", {})
except Exception:
st_secrets_mysql = {}
def get_db_cfg():
# Priorité à st.secrets si dispo, sinon variables denvironnement (.env)
cfg = {
"host": st_secrets_mysql.get("host") or os.getenv("DB_HOST"),
"user": st_secrets_mysql.get("user") or os.getenv("DB_USER"),
"password": st_secrets_mysql.get("password") or os.getenv("DB_PASSWORD"),
"database": st_secrets_mysql.get("database") or os.getenv("DB_NAME") or os.getenv("DB_DATABASE"),
"port": int(st_secrets_mysql.get("port") or os.getenv("DB_PORT") or 3306),
"auth_plugin": os.getenv("DB_AUTH_PLUGIN") or None, # optionnel
}
# Nettoyage: retire les clés None
return {k: v for k, v in cfg.items() if v not in (None, "")}
# ==========================
# Configuration / Constantes
# ==========================
load_dotenv() # lit .env si présent
TABLE_NAME = "tracker"
COL_ID = "id"
COL_ADDRESS = "address" # format ROM : {0x28,0xFF,...}
COL_ADDR_HYPHEN = "address_hyphen" # format hyphen : 28-xx-xx-xx-xx-xx-xx-xx
COL_LIEU = "lieu"
COL_REPERE = "repere"
COL_MES = "mise_en_service" # DATE
COL_RESBITS = "res_bits"
COL_DATE = "date"
# Configuration BDD (standardisée sur les variables d'env MYSQL_*)
DB_CFG = dict(
host=os.getenv("DB_HOST"),
user=os.getenv("DB_USER"),
password=os.getenv("DB_PASS"),
database=os.getenv("DB_NAME"),
port=int(os.getenv("MYSQL_PORT", "3306")),
)
# Regex d'une ROM code DS18B20 au format {0x28,0xFF,0xAA,0xBB,0xCC,0xDD,0xEE,0xCRC}
ROM_REGEX = re.compile(r"^{(?:0x[0-9A-Fa-f]{2},){7}0x[0-9A-Fa-f]{2}}$")
# Adresse hyphen : 8 octets hexa séparés par des tirets
HYPHEN_REGEX = re.compile(r"^[0-9A-Fa-f]{2}(?:-[0-9A-Fa-f]{2}){7}$")
# Mapping résolution DS18B20 (bits -> infos)
RES_MAP = {
9: {"precision": 0.5, "tconv_ms": 94},
10: {"precision": 0.25, "tconv_ms": 188},
11: {"precision": 0.125, "tconv_ms": 375},
12: {"precision": 0.0625,"tconv_ms": 750},
}
# ==================
# Authentification via .env (AUTH_USERS)
# ==================
AUTH_USERS_RAW = os.getenv("AUTH_USERS", "[]")
def _load_users() -> dict:
try:
data = json.loads(AUTH_USERS_RAW)
return {str(d.get("user", "")).strip(): str(d.get("pass", "")) for d in data if d.get("user") and d.get("pass")}
except Exception:
return {}
USERS = _load_users()
def _constant_time_equals(a: str, b: str) -> bool:
return hmac.compare_digest(a.encode("utf-8"), b.encode("utf-8"))
def verify_credentials(username: str, password: str) -> bool:
if not username or not password:
return False
expected = USERS.get(username.strip())
if expected is None:
return False
return _constant_time_equals(password, expected)
def require_login() -> Optional[str]:
if st.session_state.get("auth_ok") and st.session_state.get("auth_user"):
return st.session_state.get("auth_user")
st.markdown("<h2 style='text-align:center;'>🔒 Tracker</h2>", unsafe_allow_html=True)
_, col2, _ = st.columns([1, 2, 1])
with col2:
with st.form("login_form", clear_on_submit=False):
username = st.text_input("Utilisateur")
password = st.text_input("Mot de passe", type="password")
ok = st.form_submit_button("Se connecter")
if ok:
if verify_credentials(username, password):
st.session_state["auth_ok"] = True
st.session_state["auth_user"] = username.strip()
st.success("Connexion réussie.")
time.sleep(0.3)
st.rerun()
else:
st.error("Identifiants invalides.")
st.stop()
# ==================
# Accès Base de Données
# ==================
@contextmanager
def get_conn():
cfg = get_db_cfg()
# Astuce debug si besoin :
# print({k: ('***' if k=='password' else v) for k,v in cfg.items()})
conn = mysql.connect(**cfg)
try:
yield conn
finally:
conn.close()
# -----------------
# Utilitaires
# -----------------
def rom_help() -> str:
return (
"Format ROM attendu : `{0x28,0xFF,0xAA,0xBB,0xCC,0xDD,0xEE,0x12}` "
"(8 octets en hex). Le premier octet est souvent 0x28 pour DS18B20."
)
def is_valid_rom(address: str) -> bool:
return bool(ROM_REGEX.match(str(address).strip()))
def is_valid_hyphen(address_h: str) -> bool:
return bool(HYPHEN_REGEX.match(str(address_h).strip()))
def rom_to_hyphen(rom: str) -> str:
hexes = re.findall(r"0x([0-9A-Fa-f]{2})", str(rom))
if len(hexes) != 8:
return ""
return "-".join(h.lower() for h in hexes)
def hyphen_to_rom(h: str) -> str:
parts = str(h).strip().split("-")
if len(parts) != 8 or not all(re.fullmatch(r"[0-9A-Fa-f]{2}", p) for p in parts):
return ""
return "{" + ",".join(f"0x{p.upper()}" for p in parts) + "}"
def res_label(bits: int) -> str:
info = RES_MAP.get(bits)
if not info:
return f"{bits} bits (inconnu)"
return f"{bits} bits (±{info['precision']}°C, {info['tconv_ms']} ms)"
# -----------------
# Fonctions SQL
# -----------------
def fetch_trackers(where_lieu: str | None = None) -> pd.DataFrame:
query = (
f"SELECT {COL_ID}, {COL_ADDRESS}, {COL_ADDR_HYPHEN}, {COL_LIEU}, "
f"{COL_REPERE}, {COL_MES}, {COL_RESBITS}, {COL_DATE} "
f"FROM {TABLE_NAME}"
)
params = []
if where_lieu:
query += f" WHERE {COL_LIEU} = %s"
params.append(where_lieu)
query += f" ORDER BY {COL_LIEU}, {COL_REPERE}, {COL_ADDR_HYPHEN}, {COL_ADDRESS}"
with get_conn() as conn:
df = pd.read_sql(query, conn, params=params)
return df
def insert_tracker(address: str, lieu: str, res_bits: int,
repere: str | None = None, mise_en_service: date | None = None,
address_hyphen: str | None = None) -> int:
addr_rom = (address or "").strip() if address else ""
addr_hyp = (address_hyphen or "").strip() if address_hyphen else ""
if addr_rom and not addr_hyp:
addr_hyp = rom_to_hyphen(addr_rom)
if addr_hyp and not addr_rom:
addr_rom = hyphen_to_rom(addr_hyp)
if not is_valid_rom(addr_rom) or not is_valid_hyphen(addr_hyp):
raise ValueError("Adresse invalide (ROM ou hyphen).")
sql = f"""
INSERT INTO {TABLE_NAME}
({COL_ADDRESS}, {COL_ADDR_HYPHEN}, {COL_LIEU}, {COL_REPERE}, {COL_MES}, {COL_RESBITS})
VALUES (%s, %s, %s, %s, %s, %s)
"""
with get_conn() as conn:
cur = conn.cursor()
cur.execute(sql, (
addr_rom,
addr_hyp.lower(),
lieu,
(repere.strip() if repere and str(repere).strip() else None),
mise_en_service,
res_bits,
))
conn.commit()
return cur.lastrowid
def update_tracker(row_id: int, address: str, lieu: str, res_bits: int,
repere: str | None, mise_en_service: date | None,
address_hyphen: str | None = None) -> None:
addr_rom = (address or "").strip()
addr_hyp = (address_hyphen or "").strip() if address_hyphen else ""
if addr_rom and not addr_hyp:
addr_hyp = rom_to_hyphen(addr_rom)
if addr_hyp and not addr_rom:
addr_rom = hyphen_to_rom(addr_hyp)
if not is_valid_rom(addr_rom) or not is_valid_hyphen(addr_hyp):
raise ValueError("Adresse invalide (ROM ou hyphen).")
sql = f"""
UPDATE {TABLE_NAME}
SET {COL_ADDRESS}=%s, {COL_ADDR_HYPHEN}=%s, {COL_LIEU}=%s, {COL_REPERE}=%s, {COL_MES}=%s, {COL_RESBITS}=%s
WHERE {COL_ID}=%s
"""
with get_conn() as conn:
cur = conn.cursor()
cur.execute(sql, (
addr_rom,
addr_hyp.lower(),
lieu,
(repere.strip() if repere and str(repere).strip() else None),
mise_en_service,
res_bits,
row_id,
))
conn.commit()
def delete_tracker(row_id: int) -> None:
sql = f"DELETE FROM {TABLE_NAME} WHERE {COL_ID}=%s"
with get_conn() as conn:
cur = conn.cursor()
cur.execute(sql, (row_id,))
conn.commit()
# ==================
# Application Streamlit
# ==================
st.set_page_config(page_title="Tracker", page_icon="🌡️", layout="wide")
user = require_login()
st.title("🌡️ Gestion du parc sondes (stock ou installées)")
with st.expander("Paramètres de connexion (lecture seule)"):
st.write({k: ("***" if k in {"password"} else v) for k, v in DB_CFG.items()})
st.caption("Configurez ces valeurs via le fichier .env")
st.sidebar.header("Filtres & Actions")
st.sidebar.caption(f"Connecté en tant que **{user}**")
_all = fetch_trackers()
lieux = sorted([x for x in _all[COL_LIEU].dropna().unique()]) if not _all.empty else []
lieu_selected = st.sidebar.selectbox("Filtrer par lieu", options=["(Tous)"] + lieux, index=0)
# Formulaire d'ajout
st.sidebar.subheader("Ajouter une sonde")
with st.sidebar.form("add_form", clear_on_submit=True):
new_address = st.text_input("Adresse ROM", placeholder="{0x28,0xFF,...}", help=rom_help())
preview_h = rom_to_hyphen(new_address) if new_address else ""
st.text_input("Adresse hyphen (auto)", value=preview_h, disabled=True)
new_lieu = st.text_input("Lieu d'installation")
new_repere = st.text_input("Repère (optionnel)")
new_mes = st.date_input("Mise en service (optionnel)", value=None, format="YYYY-MM-DD")
new_res = st.selectbox("Résolution (bits)", options=[9,10,11,12])
submitted = st.form_submit_button("Ajouter")
if submitted:
if not is_valid_rom(new_address):
st.warning("Adresse ROM invalide.")
elif not new_lieu.strip():
st.warning("Lieu requis.")
else:
rid = insert_tracker(
new_address.strip(),
new_lieu.strip(),
int(new_res),
new_repere,
new_mes if isinstance(new_mes, date) else None,
address_hyphen=rom_to_hyphen(new_address.strip()),
)
st.success(f"Sonde ajoutée (id={rid}).")
time.sleep(0.6)
st.rerun()
st.sidebar.divider()
if st.sidebar.button("Se déconnecter"):
for _k in list(st.session_state.keys()):
st.session_state.pop(_k, None)
st.success("Déconnecté.")
time.sleep(0.3)
st.rerun()
# Vue principale
if lieu_selected != "(Tous)":
df = fetch_trackers(where_lieu=lieu_selected)
else:
df = _all.copy()
if df.empty:
st.info("Aucune sonde enregistrée.")
else:
df["resolution"] = df[COL_RESBITS].apply(res_label)
st.subheader("Enregistrements")
edited = st.data_editor(
df[[COL_ID, COL_ADDRESS, COL_ADDR_HYPHEN, COL_LIEU, COL_REPERE, COL_MES, COL_RESBITS, "resolution", COL_DATE]],
hide_index=True,
use_container_width=True,
num_rows="dynamic",
)
removed_ids = set(df[COL_ID]) - set(edited[COL_ID])
to_update = []
for _, row in edited.iterrows():
orig = df.loc[df[COL_ID] == row[COL_ID]].iloc[0]
changed = (
(row[COL_ADDRESS] != orig[COL_ADDRESS]) or
(row[COL_ADDR_HYPHEN] != orig[COL_ADDR_HYPHEN]) or
(row[COL_LIEU] != orig[COL_LIEU]) or
(row.get(COL_REPERE) or "") != (orig.get(COL_REPERE) or "") or
(str(row.get(COL_MES) or "")[:10] != str(orig.get(COL_MES) or "")[:10]) or
(int(row[COL_RESBITS]) != int(orig[COL_RESBITS]))
)
if changed:
mes_val = row.get(COL_MES)
if pd.isna(mes_val):
mes_val = None
elif hasattr(mes_val, "date"):
mes_val = mes_val.date()
to_update.append((int(row[COL_ID]), str(row[COL_ADDRESS]), str(row[COL_ADDR_HYPHEN] or ""),
str(row[COL_LIEU]), int(row[COL_RESBITS]),
str(row.get(COL_REPERE) or None), mes_val))
col1, col2 = st.columns([1,1])
if st.button("Enregistrer les modifications"):
for rid, addr_rom, addr_hyp, lieu, rbits, repere, mes in to_update:
update_tracker(rid, addr_rom, lieu, rbits, repere, mes, address_hyphen=addr_hyp)
for rid in removed_ids:
delete_tracker(rid)
st.success("Modifications enregistrées ✔️")
time.sleep(0.6)
st.rerun()
if st.button("Annuler"):
st.rerun()
st.divider()
st.caption("Astuce : collez une adresse ROM {0x..,...} → la version hyphen est générée automatiquement.")

View File

@@ -85,25 +85,33 @@ logout_button()
def get_pool():
load_dotenv()
host = os.getenv("DB_HOST")
port = int(os.getenv("MYSQL_PORT", "3306"))
user = os.getenv("DB_USER")
pwd = os.getenv("DB_PASSWORD")
db = os.getenv("DB_NAME")
port = int(os.getenv("MYSQL_PORT", "3306")) # ✅ valeur par défaut 3306
user = os.getenv("DB_USER2")
pwd = os.getenv("DB_PASSWORD2")
db = os.getenv("DB_NAME2")
# ✅ contrôle des variables indispensables
missing = [k for k, v in {
"DB_HOST": host, "MYSQL_PORT": port, "DB_USER": user, "DB_PASSWORD": pwd, "DB_NAME": db
"DB_HOST": host,
"DB_USER2": user,
"DB_PASSWORD2": pwd,
"DB_NAME2": db,
}.items() if v in (None, "")]
if missing:
raise RuntimeError(f"Variables manquantes dans .env : {', '.join(missing)}")
return pooling.MySQLConnectionPool(
pool_name="users_pool",
pool_size=5,
pool_reset_session=True,
host=host, port=port, user=user, password=pwd, database=db,
autocommit=True
host=host,
port=port,
user=user,
password=pwd,
database=db,
autocommit=True,
)
pool = get_pool()
# -----------------------
# Helpers SQL + validations
# -----------------------
@@ -300,7 +308,11 @@ def get_user_email_and_field(cnx, username: str, field: str):
# -----------------------
st.set_page_config(page_title="Acces.Utilisateurs", page_icon="👤", layout="wide")
st.title("Gestion des utilisateurs")
try:
pool = get_pool()
except Exception as e:
st.error(f"❌ Impossible d'initialiser le pool MySQL : {e}")
st.stop()
tab_list, tab_create, tab_edit, tab_security = st.tabs(["Liste", "Créer", "Modifier", "Sécurité"])
# -----------------------