Remise en état app

This commit is contained in:
2025-11-13 01:21:03 +01:00
parent 400ca9d4d1
commit 1fbe7494d1
3 changed files with 516 additions and 390 deletions

16
.env
View File

@@ -2,19 +2,9 @@
DB_HOST=162.19.78.131 DB_HOST=162.19.78.131
DB_USER=excel DB_USER=excel
DB_PASSWORD='%n#%3Lay1MPa$%kR^5@' DB_PASSWORD='%n#%3Lay1MPa$%kR^5@'
DB_NAME=Sondes DB_NAME=Acces
DB_PORT=3306 ADMIN_USER=Michel
AUTH_USERS=[{"user":"Michel","pass":"210462"}] ADMIN_PASS_HASH='$2b$12$Dgv7jNLJuR.3hQminSVE9OP6hCSmW4nISArR3HF5LTPGFK0Zw29N2'
# === Connexion SSH pour visualiseur_logs.py ===
SSH_HOST=162.19.78.131
SSH_PORT=22
SSH_USER=debian
SSH_KEY_PATH=/home/debian/.ssh/id_ed25519
SSH_KEY_PASSPHRASE='gaby'
SSH_LOG_DIR=/home/debian/Gestion_sondes/Logs
SSH_PASSWORD='lpZwixbBUFtGY'
SSH_FORCE_PASSWORD=1
# connexion OVH pour les SMS # connexion OVH pour les SMS
OVH_APP_KEY=f725d07b2f98a195 OVH_APP_KEY=f725d07b2f98a195

View File

@@ -1,17 +1,11 @@
import os, mysql.connector import mysql.connector, os
from dotenv import load_dotenv cnx = mysql.connector.connect(
from pathlib import Path
# charge le .env **avec chemin absolu**
load_dotenv(Path(__file__).resolve().parent.joinpath(".env"))
conn = mysql.connector.connect(
host=os.getenv("DB_HOST"), host=os.getenv("DB_HOST"),
port=int(os.getenv("DB_PORT", "3306")),
user=os.getenv("DB_USER"), user=os.getenv("DB_USER"),
password=os.getenv("DB_PASSWORD"), password=os.getenv("DB_PASS"),
database=os.getenv("DB_NAME"), database=os.getenv("DB_NAME"),
connection_timeout=5,
) )
conn.ping(reconnect=True, attempts=3, delay=1) print("OK, connecté !")
print("CONNECTED" if conn.is_connected() else "KO") cnx.close()
conn.close()

View File

@@ -1,271 +1,398 @@
# users.py # Streamlit app
# ------------------------------------------------------------
# Gestion des utilisateurs + droits d'accès par site (DroitsSites)
# - Streamlit UI
# - MySQL (mysql-connector-python)
# - Hash de mot de passe avec bcrypt
#
# .env attendu (ou variables d'env) :
# DB_HOST=...
# DB_PORT=3306
# DB_USER=...
# DB_PASS=...
# DB_NAME=Acces # <- base "annuaire" qui contient Utilisateurs/DroitsSites/Connexions
# ------------------------------------------------------------
import os import os
import re
from datetime import date, datetime from datetime import date, datetime
from typing import List, Tuple, Dict, Optional import re
import bcrypt
import streamlit as st
import mysql.connector import mysql.connector
from mysql.connector.pooling import MySQLConnectionPool from mysql.connector import pooling, errorcode
import pandas as pd
import streamlit as st
from dotenv import load_dotenv
import smtplib
from email.message import EmailMessage
from email.utils import formatdate
try: load_dotenv()
from dotenv import load_dotenv # facultatif mais pratique SMTP_HOST = os.getenv("SMTP_HOST", "ssl0.ovh.net")
SMTP_PORT = int(os.getenv("SMTP_PORT", "465")) # OVH: 465 (SSL) ou 587 (STARTTLS)
SMTP_USER = os.getenv("SMTP_USER")
SMTP_PASS = os.getenv("SMTP_PASS")
SMTP_FROM = os.getenv("SMTP_FROM", SMTP_USER)
# -----------------------
# Auth minimale
# -----------------------
def require_login():
st.markdown(
"<style>div.block-container{padding-top:2rem;}</style>",
unsafe_allow_html=True,
)
load_dotenv() load_dotenv()
except Exception: admin_user = os.getenv("ADMIN_USER")
pass admin_hash = os.getenv("ADMIN_PASS_HASH")
if not admin_user or not admin_hash:
st.error("Variables ADMIN_USER et/ou ADMIN_PASS_HASH manquantes dans .env")
st.stop()
try: if "auth_ok" not in st.session_state:
import bcrypt # pip install bcrypt st.session_state.auth_ok = False
except Exception:
bcrypt = None
# --------------------------- if not st.session_state.auth_ok:
# Config / Connexion MySQL col1, col2, col3 = st.columns([1, 2, 1])
# --------------------------- with col2:
DB_HOST = os.getenv("DB_HOST", "localhost") st.header("🔐 Accès restreint")
DB_PORT = int(os.getenv("DB_PORT", "3306")) u = st.text_input("Utilisateur")
DB_USER = os.getenv("DB_USER", "root") p = st.text_input("Mot de passe", type="password")
DB_PASS = os.getenv("DB_PASS", "") if st.button("Se connecter", use_container_width=True):
DB_NAME = os.getenv("DB_NAME", "Acces") try:
ok = (u == admin_user) and bcrypt.checkpw(p.encode(), admin_hash.encode())
except Exception:
ok = False
if ok:
st.session_state.auth_ok = True
st.rerun()
else:
st.error("Identifiants invalides.")
st.stop()
def logout_button():
st.markdown(
"""
<style>
div[data-testid="stToolbar"] {visibility: hidden;}
div[data-testid="stDecoration"] {display: none;}
div[data-testid="stStatusWidget"] {display: none;}
div.block-container {padding-top: 1rem;}
.logout-container {text-align:right; margin-top:-3rem;}
</style>
""",
unsafe_allow_html=True
)
st.markdown('<div class="logout-container">', unsafe_allow_html=True)
if st.button("🚪 Quitter", key="logout", use_container_width=False):
st.session_state.clear()
st.success("Déconnexion effectuée.")
st.rerun()
st.markdown('</div>', unsafe_allow_html=True)
pool: Optional[MySQLConnectionPool] = None
def init_pool():
global pool
if pool is None:
pool = MySQLConnectionPool(
pool_name="users_pool",
pool_size=5,
host=DB_HOST,
port=DB_PORT,
user=DB_USER,
password=DB_PASS,
database=DB_NAME,
autocommit=True,
charset="utf8mb4",
collation="utf8mb4_general_ci",
)
init_pool() require_login()
logout_button()
# -----------------------
# Connexion MySQL via pool
# -----------------------
@st.cache_resource
def get_pool():
load_dotenv()
host = os.getenv("DB_HOST")
port = int(os.getenv("MYSQL_PORT", "3306"))
user = os.getenv("DB_USER")
pwd = os.getenv("DB_PASSWORD")
db = os.getenv("DB_NAME")
missing = [k for k, v in {
"DB_HOST": host, "MYSQL_PORT": port, "DB_USER": user, "DB_PASSWORD": pwd, "DB_NAME": db
}.items() if v in (None, "")]
if missing:
raise RuntimeError(f"Variables manquantes dans .env : {', '.join(missing)}")
return pooling.MySQLConnectionPool(
pool_name="users_pool",
pool_size=5,
pool_reset_session=True,
host=host, port=port, user=user, password=pwd, database=db,
autocommit=True
)
pool = get_pool()
# -----------------------
# Helpers SQL + validations
# -----------------------
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
PHONE_RE = re.compile(r"\d{10,14}")
def normalize_phone(p: str|None) -> str|None:
if not p:
return None
digits = re.sub(r"\D", "", p)
return digits if PHONE_RE.match(digits) else None
# --------------------------- def to_sql_date(d: date | str | None) -> str | None:
# Utils Sécurité if d is None:
# --------------------------- return None
def hash_password(plain: str) -> str: if isinstance(d, str):
if not bcrypt: try:
raise RuntimeError("Le module 'bcrypt' est requis (pip install bcrypt).") d = datetime.fromisoformat(d).date()
salt = bcrypt.gensalt(rounds=12) except Exception:
return None
return d.strftime("%Y-%m-%d")
def hash_password(plain: str, rounds: int = 12) -> str:
salt = bcrypt.gensalt(rounds=rounds)
return bcrypt.hashpw(plain.encode("utf-8"), salt).decode("utf-8") return bcrypt.hashpw(plain.encode("utf-8"), salt).decode("utf-8")
def user_exists(cur, username: str) -> bool:
# --------------------------- cur.execute("SELECT COUNT(*) FROM Utilisateurs WHERE NomUtilisateur=%s", (username,))
# Accès SQL - Helpers (count,) = cur.fetchone()
# --------------------------- return count > 0
def list_all_sites(cnx) -> List[Tuple[str, str]]: def find_users_by_email(cnx, email: str):
"""Retourne [(dsn, bdd), ...] depuis Connexions.""" cur = cnx.cursor(dictionary=True)
cur = cnx.cursor()
try: try:
cur.execute("SELECT DSN, BDD FROM Connexions ORDER BY BDD") cur.execute(
"SELECT NomUtilisateur, Site FROM Utilisateurs WHERE email=%s ORDER BY NomUtilisateur",
(email,),
)
return cur.fetchall()
finally:
cur.close()
def list_users(cnx, limit: int = 500, include_password=False):
fields = ["NomUtilisateur", "Nom_complet", "Site", "DateExpiration", "Telephone", "email"]
if include_password:
fields.append("MotDePasse")
sql = f"SELECT {', '.join(fields)} FROM Utilisateurs ORDER BY NomUtilisateur LIMIT %s"
cur = cnx.cursor(dictionary=True)
try:
cur.execute(sql, (limit,))
return cur.fetchall() return cur.fetchall()
finally: finally:
cur.close() cur.close()
def insert_user(cnx, username: str, full_name: str, pwd_hash: str, def insert_user(cnx, username, full_name, site, password, expires, phone, email, role):
expires: date, phone: str, email: str, role: str, if not EMAIL_RE.match(email):
site_legacy: Optional[str] = None) -> None: raise ValueError("Email invalide.")
""" phone_norm = normalize_phone(phone)
Insère un utilisateur. exp_sql = to_sql_date(expires)
- 'Site' est obsolète : on le laisse à NULL/'' si la colonne existe encore. pwd_hash = hash_password(password)
"""
cur = cnx.cursor() cur = cnx.cursor()
try: try:
# Vérifie si la colonne Site existe (schéma en transition) if user_exists(cur, username):
cur.execute("SHOW COLUMNS FROM Utilisateurs LIKE 'Site'") raise RuntimeError("Nom d'utilisateur déjà existant.")
has_site_col = cur.fetchone() is not None cur.execute(
"""
if has_site_col: INSERT INTO Utilisateurs
cur.execute(""" (NomUtilisateur, Nom_complet, Site, MotDePasse, MotDePasseHash, DateExpiration, Telephone, email, role)
INSERT INTO Utilisateurs VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
(NomUtilisateur, Nom_complet, MotDePasseHash, DateExpiration, """,
Telephone, email, role ) (username, full_name, site, password, pwd_hash, exp_sql, phone_norm, email, role),
VALUES (%s, %s, %s, %s, %s, %s, %s, %s) )
""", (username, full_name, pwd_hash, expires, phone, email, role, site_legacy or "")) return pwd_hash
else:
cur.execute("""
INSERT INTO Utilisateurs
(NomUtilisateur, Nom_complet, MotDePasseHash, DateExpiration,
Telephone, email, role)
VALUES (%s, %s, %s, %s, %s, %s, %s)
""", (username, full_name, pwd_hash, expires, phone, email, role))
finally: finally:
cur.close() cur.close()
def update_user_core(cnx, username: str, full_name: str,
expires: date, phone: str, email: str, role: str) -> None:
cur = cnx.cursor()
try:
cur.execute("""
UPDATE Utilisateurs
SET Nom_complet=%s,
DateExpiration=%s,
Telephone=%s,
email=%s,
role=%s
WHERE NomUtilisateur=%s
""", (full_name, expires, phone, email, role, username))
finally:
cur.close()
def update_user_password(cnx, username: str, pwd_hash: str) -> None:
cur = cnx.cursor()
try:
cur.execute("""
UPDATE Utilisateurs
SET MotDePasseHash=%s
WHERE NomUtilisateur=%s
""", (pwd_hash, username))
finally:
cur.close()
def list_users(cnx) -> List[Dict]: def get_user_details(cnx, username: str):
cur = cnx.cursor(dictionary=True) cur = cnx.cursor(dictionary=True)
try: try:
cur.execute(""" cur.execute(
SELECT u.NomUtilisateur, u.Nom_complet, u.role, u.DateExpiration, """
u.Telephone, u.email SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email
FROM Utilisateurs u FROM Utilisateurs
ORDER BY u.NomUtilisateur WHERE NomUtilisateur=%s
""") """,
return cur.fetchall() (username,),
finally: )
cur.close()
def get_user(cnx, username: str) -> Optional[Dict]:
cur = cnx.cursor(dictionary=True)
try:
cur.execute("""
SELECT u.NomUtilisateur, u.Nom_complet, u.role, u.DateExpiration,
u.Telephone, u.email
FROM Utilisateurs u
WHERE u.NomUtilisateur=%s
""", (username,))
return cur.fetchone() return cur.fetchone()
finally: finally:
cur.close() cur.close()
def delete_user(cnx, username: str) -> None: def update_field(cnx, username: str, field: str, value):
allowed = {
"Nom_complet": "Nom_complet",
"Site": "Site",
"DateExpiration": "DateExpiration",
"Telephone": "Telephone",
"email": "email",
}
if field not in allowed:
raise ValueError("Champ non autorisé.")
sql_field = allowed[field]
if field == "email":
if not EMAIL_RE.match(str(value)):
raise ValueError("Email invalide.")
if field == "Telephone":
value = normalize_phone(str(value)) if value else None
if field == "DateExpiration":
value = to_sql_date(value)
cur = cnx.cursor() cur = cnx.cursor()
try: try:
# Nettoie d'abord les droits si contrainte FK cur.execute(
cur.execute("DELETE FROM DroitsSites WHERE NomUtilisateur=%s", (username,)) f"UPDATE Utilisateurs SET {sql_field}=%s WHERE NomUtilisateur=%s",
cur.execute("DELETE FROM Utilisateurs WHERE NomUtilisateur=%s", (username,)) (value, username),
)
finally: finally:
cur.close() cur.close()
def get_user_sites(cnx, username: str) -> List[str]: def update_password(cnx, username: str, new_password: str):
"""Retourne la liste des DSN autorisés pour l'utilisateur.""" pwd_hash = hash_password(new_password)
cur = cnx.cursor() cur = cnx.cursor()
try: try:
cur.execute(""" cur.execute(
SELECT d.DSN "UPDATE Utilisateurs SET MotDePasse=NULL, MotDePasseHash=%s WHERE NomUtilisateur=%s",
FROM DroitsSites d (pwd_hash, username),
WHERE d.NomUtilisateur=%s )
ORDER BY d.DSN return pwd_hash
""", (username,))
return [r[0] for r in cur.fetchall()]
finally: finally:
cur.close() cur.close()
def grant_sites_to_user(cnx, username: str, dsns: Optional[List[str]], grant_all: bool = False) -> None: def send_mail(to_email: str, subject: str, body_text: str, body_html: str | None = None):
""" if not (SMTP_HOST and SMTP_PORT and SMTP_USER and SMTP_PASS and SMTP_FROM):
Alimente DroitsSites pour un utilisateur. raise RuntimeError("Configuration SMTP incomplète (SMTP_HOST/PORT/USER/PASS/FROM).")
- grant_all=True : accorde tous les DSN présents dans Connexions if not to_email:
- sinon : insère uniquement les DSN de la liste raise ValueError("Destinataire vide.")
"""
cur = cnx.cursor() msg = EmailMessage()
msg["From"] = SMTP_FROM
msg["To"] = to_email
msg["Date"] = formatdate(localtime=True)
msg["Subject"] = subject
msg.set_content(body_text)
if body_html:
msg.add_alternative(body_html, subtype="html")
# Choix du protocole en fonction du port
if SMTP_PORT == 465:
with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, timeout=20) as s:
s.login(SMTP_USER, SMTP_PASS)
s.send_message(msg)
else:
# 587 (ou autre) : STARTTLS
with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=20) as s:
s.ehlo()
s.starttls() # passe en TLS
s.ehlo()
s.login(SMTP_USER, SMTP_PASS)
s.send_message(msg)
def get_user_email_and_field(cnx, username: str, field: str):
cols = {"email", "Nom_complet", "Site", "DateExpiration", "Telephone"}
if field not in cols:
field = "Nom_complet" # fallback
with cnx.cursor(dictionary=True) as cur:
cur.execute(
f"SELECT email, {field} AS field_value FROM Utilisateurs WHERE NomUtilisateur=%s",
(username,)
)
row = cur.fetchone()
return (row.get("email") if row else None, row.get("field_value") if row else None)
def get_user_email_and_field(cnx, username: str, field: str):
cols = {"email", "Nom_complet", "Site", "DateExpiration", "Telephone"}
if field not in cols:
field = "Nom_complet" # fallback
cur = cnx.cursor(dictionary=True)
try: try:
if grant_all: cur.execute(
cur.execute(""" f"SELECT email, {field} AS field_value FROM Utilisateurs WHERE NomUtilisateur=%s",
INSERT IGNORE INTO DroitsSites (NomUtilisateur, Site, DSN) (username,),
SELECT %s, c.BDD, c.DSN FROM Connexions c )
""", (username,)) row = cur.fetchone()
else: return (row.get("email") if row else None,
if not dsns: row.get("field_value") if row else None)
return
cur.executemany("""
INSERT IGNORE INTO DroitsSites (NomUtilisateur, Site, DSN)
SELECT %s, c.BDD, c.DSN FROM Connexions c WHERE c.DSN=%s
""", [(username, d) for d in dsns])
finally: finally:
cur.close() cur.close()
# -----------------------
# UI
# -----------------------
st.set_page_config(page_title="Acces.Utilisateurs", page_icon="👤", layout="wide")
st.title("Gestion des utilisateurs")
def replace_user_sites(cnx, username: str, dsns: Optional[List[str]], grant_all: bool = False) -> None: tab_list, tab_create, tab_edit, tab_security = st.tabs(["Liste", "Créer", "Modifier", "Sécurité"])
"""Remplace complètement les droits de l'utilisateur par la nouvelle sélection."""
cur = cnx.cursor() # -----------------------
# ONGLET LISTE
# -----------------------
with tab_list:
st.subheader("Utilisateurs")
try: try:
cur.execute("DELETE FROM DroitsSites WHERE NomUtilisateur=%s", (username,)) cnx = pool.get_connection()
finally:
cur.close()
grant_sites_to_user(cnx, username, dsns, grant_all)
# ---------------------------
# UI Streamlit
# ---------------------------
st.set_page_config(page_title="Gestion des utilisateurs", page_icon="👤", layout="centered")
st.title("👤 Gestion des utilisateurs")
tabs = st.tabs(["Créer", "Modifier", "Lister / Supprimer"])
# --------- Onglet CRÉER ----------
with tabs[0]:
st.subheader("Créer un utilisateur")
with st.form("create_user_form", clear_on_submit=False):
c1, c2 = st.columns([1.2, 1.8])
username = c1.text_input("NomUtilisateur", placeholder="ex: michel")
full_name = c2.text_input("Nom_complet", placeholder="Michel DUPONT")
role = st.selectbox("Rôle", ["utilisateur", "commercial", "administrateur", "admin"], index=0)
# Sélection des sites
labels, label_to_dsn = [], {}
try: try:
cnx = pool.get_connection() include_pw = os.getenv("ADMIN_USER") == st.session_state.get("current_user", os.getenv("ADMIN_USER"))
try: rows = list_users(cnx, limit=1000, include_password=include_pw)
rows = list_all_sites(cnx) # [(dsn,bdd)] finally:
finally: cnx.close()
cnx.close()
labels = [f"{bdd} ({dsn})" for dsn, bdd in rows]
label_to_dsn = {lbl: dsn for (dsn, bdd), lbl in zip(rows, labels)}
except Exception as e:
st.warning(f"Chargement des sites impossible : {e}")
colL, colR = st.columns([2, 1]) df = pd.DataFrame(rows)
with colL: if not df.empty:
selected_labels = st.multiselect("Sites autorisés", labels, help="Choisis un ou plusieurs sites") from datetime import date
with colR: df["DateExpiration"] = pd.to_datetime(df["DateExpiration"], errors="coerce").dt.date
grant_all_sites = st.checkbox("Tous les sites", value=False)
# Si administrateur → tous les sites # --- Ajout coloration et statut expiration ---
if role.lower().startswith("admin"): ALERTE_JOURS = 30
grant_all_sites = True exp = pd.to_datetime(df["DateExpiration"], errors="coerce")
today = pd.Timestamp(date.today())
df["Jours_restant"] = (exp - today).dt.days
def statut_expiration(j):
if pd.isna(j):
return "Inconnu"
j = int(j)
if j < 0:
return "Périmé"
if j <= ALERTE_JOURS:
return f"Bientôt (≤{ALERTE_JOURS}j)"
return "OK"
df["Statut"] = df["Jours_restant"].apply(statut_expiration)
c1, c2, c3 = st.columns([1, 1, 3])
with c1:
only_bad = st.checkbox("📌 Périmés / bientôt", value=False)
with c2:
tri_jours = st.checkbox("🔽 Trier par Jours restants", value=True)
if only_bad:
df = df[df["Statut"].isin(["Périmé", f"Bientôt (≤{ALERTE_JOURS}j)"])]
if tri_jours and "Jours_restant" in df.columns:
df = df.sort_values("Jours_restant", na_position="last")
def colorize(row):
stt = row.get("Statut", "")
n = len(row)
if stt == "Périmé":
return ["background-color:#ffe6e6; color:#b00020;"] * n # rouge clair
if stt.startswith("Bientôt"):
return ["background-color:#fff7e6; color:#8a6d3b;"] * n # orange clair
return [""] * n
styled = df.style.apply(colorize, axis=1)
st.dataframe(
styled,
use_container_width=True,
hide_index=True,
height=700, # ⇐ affiche ~20 lignes sans scroller
column_config={
"DateExpiration": st.column_config.DateColumn("DateExpiration", format="YYYY-MM-DD"),
"Jours_restant": st.column_config.NumberColumn("Jours restants", help="Négatif = périmé"),
},
)
st.caption(f"{len(df)} utilisateur(s) affiché(s)")
else:
st.info("Aucun utilisateur à afficher.")
except Exception as e:
st.warning(f"Impossible de lister les utilisateurs : {e}")
# -----------------------
# ONGLET CREER
# -----------------------
with tab_create:
with st.form("create_user_form", clear_on_submit=False):
st.subheader("Nouveau compte")
c1, c2, c3 = st.columns([1.2, 1.5, 1])
username = c1.text_input("NomUtilisateur", placeholder="ex: cjaquier")
full_name = c2.text_input("Nom_complet", placeholder="Clément JAQUIER")
site = c3.text_input("Site", placeholder="Roissy", value="Roissy")
role = st.selectbox("Rôle", ["Utilisateur", "Administrateur"], index=0)
c4, c5, c6 = st.columns([1.4, 1, 1]) c4, c5, c6 = st.columns([1.4, 1, 1])
email = c4.text_input("email", placeholder="prenom.nom@domaine.com") email = c4.text_input("email", placeholder="prenom.nom@domaine.com")
@@ -273,191 +400,206 @@ with tabs[0]:
expires = c6.date_input("DateExpiration", value=date.today()) expires = c6.date_input("DateExpiration", value=date.today())
c7, c8 = st.columns(2) c7, c8 = st.columns(2)
password = c7.text_input("Mot de passe", type="password") password = c7.text_input("Mot de passe", type="password")
password2 = c8.text_input("Confirmer", type="password") password2 = c8.text_input("Confirmer", type="password")
col_cb1, col_cb2 = st.columns([1.2, 1])
notify_welcome = col_cb2.checkbox("Envoyer un e-mail de bienvenue", value=True,
help="Enverra l'identifiant, le nom, le site et le mot de passe en clair")
submitted = st.form_submit_button("Créer l'utilisateur", use_container_width=True) submitted = st.form_submit_button("Créer l'utilisateur", use_container_width=True)
if submitted: if submitted:
if not username or not full_name or not email: if not username or not full_name or not site or not email:
st.error("Champs requis manquants.") st.error("Champs requis manquants.")
elif not EMAIL_RE.match(email): elif not EMAIL_RE.match(email):
st.error("Format de-mail invalide.") st.error("Format de-mail invalide.")
elif not password:
st.error("Veuillez saisir un mot de passe.")
elif password != password2: elif password != password2:
st.error("Les mots de passe ne correspondent pas.") st.error("Les mots de passe ne correspondent pas.")
else: else:
try: try:
cnx = pool.get_connection() cnx = pool.get_connection()
try: try:
# Hash # 🔎 avertir si l'e-mail existe déjà (mais on n'empêche pas)
pwd_hash = hash_password(password) dup = find_users_by_email(cnx, email)
if dup:
liste = ", ".join(f"{u['NomUtilisateur']}@{u['Site']}" for u in dup)
st.info(f"Cet e-mail est déjà utilisé par : {liste}")
# Option 'site_legacy' : garde vide (schéma en transition) pwd_hash = insert_user(
insert_user( cnx, username=username, full_name=full_name, site=site,
cnx, password=password, expires=expires, phone=phone, email=email, role=role
username=username,
full_name=full_name,
pwd_hash=pwd_hash,
expires=expires,
phone=phone,
email=email,
role=role,
site_legacy=None,
) )
# Droits
chosen_dsns = [label_to_dsn[lbl] for lbl in selected_labels] if selected_labels else []
grant_sites_to_user(cnx, username=username, dsns=chosen_dsns, grant_all=grant_all_sites)
finally: finally:
cnx.close() cnx.close()
st.success("Utilisateur créé ✅") st.success("Utilisateur créé avec succès")
st.caption("Hash enregistré (MotDePasseHash) :") st.caption("Hash (MotDePasseHash) :")
st.code(pwd_hash) st.code(pwd_hash)
# ✉️ Mail de bienvenue (optionnel)
if notify_welcome:
try:
subj = f"[Compte créé] Vos accès — {site}"
body_txt = (
"Bonjour,\n\n"
"Votre compte a été créé.\n\n"
f"Nom dutilisateur : {username}\n"
f"Nom complet : {full_name}\n"
f"Site : {site}\n"
f"Mot de passe : {password}\n"
f"Date d'expiration: {expires.strftime('%Y-%m-%d')}\n\n"
"Cordialement."
)
body_html = f"""
<html><body>
<p>Bonjour,</p>
<p>Votre compte a été créé.</p>
<table cellpadding="6" cellspacing="0" border="0" style="border-collapse:collapse">
<tr><td><b>Nom dutilisateur</b></td><td>{username}</td></tr>
<tr><td><b>Nom complet</b></td><td>{full_name}</td></tr>
<tr><td><b>Site</b></td><td>{site}</td></tr>
<tr><td><b>Mot de passe</b></td><td style="font-family:monospace">{password}</td></tr>
<tr><td><b>Date d'expiration</b></td><td>{expires.strftime('%Y-%m-%d')}</td></tr>
</table>
<p>Cordialement.</p>
</body></html>
"""
send_mail(email, subj, body_txt, body_html)
st.success(f"✉️ E-mail de bienvenue envoyé à {email}")
except Exception as e_mail:
st.warning(f"E-mail non envoyé : {e_mail}")
except mysql.connector.Error as db_err: except mysql.connector.Error as db_err:
st.error(f"Erreur MySQL : {db_err}") if db_err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
st.error("Identifiants MySQL invalides.")
else:
st.error(f"Erreur MySQL : {db_err}")
except Exception as e: except Exception as e:
st.error(f"Erreur : {e}") st.error(f"Erreur : {e}")
# --------- Onglet MODIFIER ---------- # -----------------------
with tabs[1]: # ONGLET MODIFIER
st.subheader("Modifier un utilisateur") # -----------------------
usernames = [] with tab_edit:
st.subheader("Modifier un utilisateur existant")
try: try:
cnx = pool.get_connection() cnx = pool.get_connection()
try: try:
users = list_users(cnx) users = list_users(cnx, limit=1000)
finally: finally:
cnx.close() cnx.close()
usernames = [u["NomUtilisateur"] for u in users] usernames = [u["NomUtilisateur"] for u in users]
except Exception as e: except Exception as e:
st.warning(f"Chargement des utilisateurs impossible : {e}") users = []
usernames = []
st.warning(f"Impossible de charger la liste des utilisateurs : {e}")
target = st.selectbox("Choisir l'utilisateur", [""] + usernames, index=0) top_left, top_right = st.columns([1.2, 2])
if target: with top_left:
try: sel_user = st.selectbox("Utilisateur", usernames, placeholder="Choisir un utilisateur")
cnx = pool.get_connection() field = st.selectbox(
try: "Champ à modifier",
u = get_user(cnx, target) ["Nom_complet", "Site", "DateExpiration", "Telephone", "email"]
rows = list_all_sites(cnx) # [(dsn,bdd)] )
current_dsns = set(get_user_sites(cnx, target))
finally:
cnx.close()
if not u: with top_right:
st.error("Utilisateur introuvable.") if field == "DateExpiration":
else: new_value = st.date_input("Nouvelle valeur", value=date.today())
labels = [f"{bdd} ({dsn})" for dsn, bdd in rows]
label_to_dsn = {lbl: dsn for (dsn, bdd), lbl in zip(rows, labels)}
# pré-sélection
preselected = [lbl for lbl in labels if label_to_dsn[lbl] in current_dsns]
with st.form("edit_user_form", clear_on_submit=False):
c1, c2 = st.columns([1.2, 1.8])
full_name = c2.text_input("Nom_complet", value=u["Nom_complet"] or "")
role = c1.selectbox("Rôle", ["utilisateur", "commercial", "administrateur", "admin"],
index=0 if (u["role"] or "").lower() not in ["commercial", "administrateur", "admin"]
else ["utilisateur","commercial","administrateur","admin"].index((u["role"] or "").lower()))
c4, c5, c6 = st.columns([1.4, 1, 1])
email = c4.text_input("email", value=u["email"] or "")
phone = c5.text_input("Téléphone", value=u["Telephone"] or "")
exp_val = u["DateExpiration"].date() if isinstance(u["DateExpiration"], datetime) else (u["DateExpiration"] or date.today())
expires = c6.date_input("DateExpiration", value=exp_val)
st.markdown("**Droits daccès aux sites**")
colL, colR = st.columns([2, 1])
with colL:
selected_labels = st.multiselect("Sites autorisés", labels, default=preselected)
with colR:
grant_all_sites = st.checkbox("Tous les sites", value=False)
st.divider()
cpass1, cpass2 = st.columns(2)
new_pass = cpass1.text_input("Nouveau mot de passe (optionnel)", type="password")
new_pass2 = cpass2.text_input("Confirmer", type="password")
cbtn1, cbtn2, cbtn3 = st.columns([1.2,1,1])
save_btn = cbtn1.form_submit_button("Enregistrer")
reset_btn = cbtn2.form_submit_button("Réinitialiser mot de passe")
replace_btn = cbtn3.form_submit_button("Remplacer droits")
if save_btn:
if not EMAIL_RE.match(email):
st.error("Format de-mail invalide.")
else:
try:
cnx = pool.get_connection()
try:
update_user_core(cnx, target, full_name, expires, phone, email, role)
# met à jour droits (ajout complémentaire, sans retirer)
chosen_dsns = [label_to_dsn[lbl] for lbl in selected_labels]
grant_sites_to_user(cnx, target, chosen_dsns, grant_all_sites)
finally:
cnx.close()
st.success("Modifications enregistrées ✅")
except Exception as e:
st.error(f"Erreur : {e}")
if reset_btn:
if not new_pass:
st.error("Saisis un nouveau mot de passe.")
elif new_pass != new_pass2:
st.error("Les mots de passe ne correspondent pas.")
else:
try:
cnx = pool.get_connection()
try:
update_user_password(cnx, target, hash_password(new_pass))
finally:
cnx.close()
st.success("Mot de passe réinitialisé ✅")
except Exception as e:
st.error(f"Erreur : {e}")
if replace_btn:
try:
cnx = pool.get_connection()
try:
chosen_dsns = [label_to_dsn[lbl] for lbl in selected_labels]
replace_user_sites(cnx, target, chosen_dsns, grant_all_sites)
finally:
cnx.close()
st.success("Droits remplacés ✅")
except Exception as e:
st.error(f"Erreur : {e}")
except Exception as e:
st.error(f"Erreur de chargement : {e}")
# --------- Onglet LISTE / SUPPR ----------
with tabs[2]:
st.subheader("Liste des utilisateurs")
try:
cnx = pool.get_connection()
try:
data = list_users(cnx)
finally:
cnx.close()
if not data:
st.info("Aucun utilisateur.")
else: else:
st.dataframe(data, use_container_width=True, hide_index=True) new_value = st.text_input("Nouvelle valeur")
except Exception as e: notify_user = st.checkbox("Notifier lutilisateur par e-mail", value=True, help="Envoie un e-mail si coché")
st.error(f"Erreur : {e}") update_btn = st.button("Mettre à jour", disabled=not sel_user, use_container_width=True)
if update_btn and sel_user:
st.divider()
st.subheader("Supprimer un utilisateur")
del_user = st.text_input("NomUtilisateur à supprimer", value="")
if st.button("Supprimer", type="primary", disabled=(not del_user)):
try: try:
cnx = pool.get_connection() cnx = pool.get_connection()
try: try:
delete_user(cnx, del_user) # 1) Lire email + ancienne valeur
to_email, old_value = get_user_email_and_field(cnx, sel_user, field)
# 2) Appliquer la mise à jour
update_field(cnx, sel_user, field, new_value)
finally: finally:
cnx.close() cnx.close()
st.success(f"Utilisateur '{del_user}' supprimé ✅")
st.success(f"{field} mis à jour pour {sel_user}")
# 3) Notification mail si demandé
if notify_user:
try:
nv = new_value.strftime("%Y-%m-%d") if hasattr(new_value, "strftime") else new_value
ov = old_value.strftime("%Y-%m-%d") if hasattr(old_value, "strftime") else old_value
if not to_email:
st.info(" Aucune notification envoyée : adresse e-mail manquante.")
else:
subject = f"[Compte] Mise à jour de votre information : {field}"
body_txt = (
f"Bonjour,\n\n"
f"Votre information '{field}' vient dêtre mise à jour par ladministrateur.\n"
f"Ancienne valeur : {ov}\n"
f"Nouvelle valeur : {nv}\n\n"
f"Si vous nêtes pas à lorigine de cette demande, répondez à cet e-mail.\n"
f"Cordialement."
)
body_html = f"""
<html><body>
<p>Bonjour,</p>
<p>Votre information <b>{field}</b> vient dêtre mise à jour par ladministrateur.</p>
<ul>
<li><b>Ancienne valeur :</b> {ov}</li>
<li><b>Nouvelle valeur :</b> {nv}</li>
</ul>
<p>Si vous nêtes pas à lorigine de cette demande, répondez à cet e-mail.</p>
<p>Cordialement.</p>
</body></html>
"""
send_mail(to_email, subject, body_txt, body_html)
st.success(f"✉️ Notification envoyée à {to_email}")
except Exception as e_mail:
st.warning(f"Notification non envoyée : {e_mail}")
except mysql.connector.Error as db_err:
st.error(f"Erreur MySQL : {db_err}")
except Exception as e: except Exception as e:
st.error(f"Erreur : {e}") st.error(f"Erreur : {e}")
# -----------------------
# ONGLET SECURITE
# -----------------------
with tab_security:
st.subheader("Réinitialiser le mot de passe (bcrypt)")
try:
if 'user_cache_for_pw' not in st.session_state:
cnx = pool.get_connection()
try:
st.session_state.user_cache_for_pw = [u["NomUtilisateur"] for u in list_users(cnx, limit=1000)]
finally:
cnx.close()
pw_user_list = st.session_state.user_cache_for_pw
except Exception:
pw_user_list = []
user_pw = st.selectbox("Utilisateur", pw_user_list, key="pw_user", placeholder="Choisir un utilisateur")
colp1, colp2 = st.columns(2)
new_pw = colp1.text_input("Nouveau mot de passe", type="password")
new_pw2 = colp2.text_input("Confirmer", type="password")
if st.button("Mettre à jour le mot de passe", disabled=not user_pw, use_container_width=True):
if not new_pw:
st.error("Mot de passe vide.")
elif new_pw != new_pw2:
st.error("Les mots de passe ne correspondent pas.")
else:
try:
cnx = pool.get_connection()
try:
h = update_password(cnx, user_pw, new_pw)
finally:
cnx.close()
st.success("Mot de passe mis à jour ✅")
st.caption("Hash (MotDePasseHash) :")
st.code(h)
except Exception as e:
st.error(f"Erreur : {e}")