Files
Outils/app/users.py
2025-11-12 13:25:26 +01:00

464 lines
17 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# users.py
# ------------------------------------------------------------
# Gestion des utilisateurs + droits d'accès par site (DroitsSites)
# - Streamlit UI
# - MySQL (mysql-connector-python)
# - Hash de mot de passe avec bcrypt
#
# .env attendu (ou variables d'env) :
# DB_HOST=...
# DB_PORT=3306
# DB_USER=...
# DB_PASS=...
# DB_NAME=Acces # <- base "annuaire" qui contient Utilisateurs/DroitsSites/Connexions
# ------------------------------------------------------------
import os
import re
from datetime import date, datetime
from typing import List, Tuple, Dict, Optional
import streamlit as st
import mysql.connector
from mysql.connector.pooling import MySQLConnectionPool
try:
from dotenv import load_dotenv # facultatif mais pratique
load_dotenv()
except Exception:
pass
try:
import bcrypt # pip install bcrypt
except Exception:
bcrypt = None
# ---------------------------
# Config / Connexion MySQL
# ---------------------------
# Connection settings come from the environment (optionally populated from a
# .env file by load_dotenv above); the second argument is the fallback used
# when the variable is not set.
DB_HOST = os.environ.get("DB_HOST", "localhost")
DB_PORT = int(os.environ.get("DB_PORT", "3306"))
DB_USER = os.environ.get("DB_USER", "root")
DB_PASS = os.environ.get("DB_PASS", "")
DB_NAME = os.environ.get("DB_NAME", "Acces")

# Module-wide connection pool; remains None until init_pool() assigns it.
pool: Optional[MySQLConnectionPool] = None
def init_pool():
    """Create the module-level MySQL connection pool if it does not exist yet.

    A call made while ``pool`` is already set returns immediately without
    touching it. The pool is configured from the DB_* module constants, with
    autocommit enabled and a utf8mb4 charset/collation.
    """
    global pool
    if pool is not None:
        return

    settings = dict(
        pool_name="users_pool",
        pool_size=5,
        host=DB_HOST,
        port=DB_PORT,
        user=DB_USER,
        password=DB_PASS,
        database=DB_NAME,
        autocommit=True,
        charset="utf8mb4",
        collation="utf8mb4_general_ci",
    )
    pool = MySQLConnectionPool(**settings)


# Build the pool as soon as the script is loaded so the UI code can use it.
init_pool()
# Loose e-mail shape check: exactly one "@", no whitespace anywhere, and at
# least one "." in the part after the "@". It does not validate the address
# beyond that shape.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
# ---------------------------
# Utils Sécurité
# ---------------------------
def hash_password(plain: str) -> str:
    """Return a bcrypt hash of *plain* as a UTF-8 string.

    A fresh salt with a cost factor of 12 rounds is generated on every call.

    Raises:
        RuntimeError: if the optional ``bcrypt`` module could not be imported.
    """
    if not bcrypt:
        raise RuntimeError("Le module 'bcrypt' est requis (pip install bcrypt).")

    fresh_salt = bcrypt.gensalt(rounds=12)
    hashed_bytes = bcrypt.hashpw(plain.encode("utf-8"), fresh_salt)
    return hashed_bytes.decode("utf-8")
# ---------------------------
# Accès SQL - Helpers
# ---------------------------
def list_all_sites(cnx) -> List[Tuple[str, str]]:
    """Return every ``(DSN, BDD)`` row of the Connexions table, ordered by BDD.

    The cursor opened on *cnx* is always closed, even if the query fails.
    """
    query = "SELECT DSN, BDD FROM Connexions ORDER BY BDD"
    cursor = cnx.cursor()
    try:
        cursor.execute(query)
        sites = cursor.fetchall()
    finally:
        cursor.close()
    return sites
def insert_user(cnx, username: str, full_name: str, pwd_hash: str,
                expires: date, phone: str, email: str, role: str,
                site_legacy: Optional[str] = None) -> None:
    """Insert one row into the Utilisateurs table.

    The schema is in transition: the legacy ``Site`` column may or may not
    still exist. Its presence is probed with ``SHOW COLUMNS`` and, when it
    exists, it is filled with *site_legacy* (or an empty string).

    Args:
        cnx: open MySQL connection used to create the cursor.
        username: value for NomUtilisateur.
        full_name: value for Nom_complet.
        pwd_hash: already-hashed password stored in MotDePasseHash.
        expires: value for DateExpiration.
        phone: value for Telephone.
        email: value for email.
        role: value for role.
        site_legacy: optional value for the obsolete Site column.
    """
    columns = [
        "NomUtilisateur", "Nom_complet", "MotDePasseHash", "DateExpiration",
        "Telephone", "email", "role",
    ]
    values = [username, full_name, pwd_hash, expires, phone, email, role]

    cur = cnx.cursor()
    try:
        # Check whether the Site column still exists (schema in transition).
        cur.execute("SHOW COLUMNS FROM Utilisateurs LIKE 'Site'")
        has_site_col = cur.fetchone() is not None
        if has_site_col:
            # The column list and the value list must grow together; the
            # previous version added the value but not the column name, so
            # the statement had 7 columns for 8 values.
            columns.append("Site")
            values.append(site_legacy or "")

        # Column names are fixed constants (never user input), so building
        # the statement text from them is safe; all values stay parameterized.
        placeholders = ", ".join(["%s"] * len(columns))
        statement = (
            f"INSERT INTO Utilisateurs ({', '.join(columns)}) "
            f"VALUES ({placeholders})"
        )
        cur.execute(statement, tuple(values))
    finally:
        cur.close()
def update_user_core(cnx, username: str, full_name: str,
                     expires: date, phone: str, email: str, role: str) -> None:
    """Update the profile fields of the user identified by *username*.

    Only Nom_complet, DateExpiration, Telephone, email and role are written;
    the password hash is left untouched. The cursor is always closed.
    """
    statement = """
        UPDATE Utilisateurs
        SET Nom_complet=%s,
            DateExpiration=%s,
            Telephone=%s,
            email=%s,
            role=%s
        WHERE NomUtilisateur=%s
    """
    # The WHERE key goes last, matching the placeholder order above.
    values = (full_name, expires, phone, email, role, username)

    cursor = cnx.cursor()
    try:
        cursor.execute(statement, values)
    finally:
        cursor.close()
def update_user_password(cnx, username: str, pwd_hash: str) -> None:
    """Store *pwd_hash* as the MotDePasseHash of the user *username*.

    The value is written as given; hashing is the caller's responsibility.
    The cursor is always closed.
    """
    statement = """
        UPDATE Utilisateurs
        SET MotDePasseHash=%s
        WHERE NomUtilisateur=%s
    """
    cursor = cnx.cursor()
    try:
        cursor.execute(statement, (pwd_hash, username))
    finally:
        cursor.close()
def list_users(cnx) -> List[Dict]:
    """Return all users as dictionaries, ordered by NomUtilisateur.

    Each row carries NomUtilisateur, Nom_complet, role, DateExpiration,
    Telephone and email; the password hash is not selected.
    """
    query = """
        SELECT u.NomUtilisateur, u.Nom_complet, u.role, u.DateExpiration,
               u.Telephone, u.email
        FROM Utilisateurs u
        ORDER BY u.NomUtilisateur
    """
    # dictionary=True asks the connector for rows keyed by column name.
    cursor = cnx.cursor(dictionary=True)
    try:
        cursor.execute(query)
        users = cursor.fetchall()
    finally:
        cursor.close()
    return users
def get_user(cnx, username: str) -> Optional[Dict]:
    """Return the row of user *username* as a dictionary.

    The result is whatever ``fetchone()`` yields for the lookup, i.e. ``None``
    when no row matches. The password hash is not selected.
    """
    query = """
        SELECT u.NomUtilisateur, u.Nom_complet, u.role, u.DateExpiration,
               u.Telephone, u.email
        FROM Utilisateurs u
        WHERE u.NomUtilisateur=%s
    """
    cursor = cnx.cursor(dictionary=True)
    try:
        cursor.execute(query, (username,))
        record = cursor.fetchone()
    finally:
        cursor.close()
    return record
def delete_user(cnx, username: str) -> None:
    """Delete the user *username* together with its site rights.

    The DroitsSites rows are removed before the Utilisateurs row, in case a
    foreign-key constraint links the two tables.
    """
    # Order matters: rights first, then the user row itself.
    statements = (
        "DELETE FROM DroitsSites WHERE NomUtilisateur=%s",
        "DELETE FROM Utilisateurs WHERE NomUtilisateur=%s",
    )
    cursor = cnx.cursor()
    try:
        for statement in statements:
            cursor.execute(statement, (username,))
    finally:
        cursor.close()
def get_user_sites(cnx, username: str) -> List[str]:
    """Return the DSN values granted to *username* in DroitsSites, sorted by DSN."""
    query = """
        SELECT d.DSN
        FROM DroitsSites d
        WHERE d.NomUtilisateur=%s
        ORDER BY d.DSN
    """
    cursor = cnx.cursor()
    try:
        cursor.execute(query, (username,))
        fetched = cursor.fetchall()
        # Each row holds the single selected column; keep only that value.
        granted = [row[0] for row in fetched]
    finally:
        cursor.close()
    return granted
def grant_sites_to_user(cnx, username: str, dsns: Optional[List[str]], grant_all: bool = False) -> None:
    """Add rows to DroitsSites for *username*.

    - ``grant_all=True``: one row is inserted for every entry of Connexions
      and *dsns* is ignored.
    - otherwise: one row is inserted per DSN in *dsns* that exists in
      Connexions; an empty or ``None`` list inserts nothing.

    ``INSERT IGNORE`` is used, and nothing is deleted here, so the call only
    ever adds rights.
    """
    insert_every_site = """
        INSERT IGNORE INTO DroitsSites (NomUtilisateur, Site, DSN)
        SELECT %s, c.BDD, c.DSN FROM Connexions c
    """
    insert_one_site = """
        INSERT IGNORE INTO DroitsSites (NomUtilisateur, Site, DSN)
        SELECT %s, c.BDD, c.DSN FROM Connexions c WHERE c.DSN=%s
    """
    cursor = cnx.cursor()
    try:
        if grant_all:
            cursor.execute(insert_every_site, (username,))
        elif dsns:
            pairs = [(username, dsn) for dsn in dsns]
            cursor.executemany(insert_one_site, pairs)
    finally:
        cursor.close()
def replace_user_sites(cnx, username: str, dsns: Optional[List[str]], grant_all: bool = False) -> None:
    """Replace all of the user's site rights with the new selection.

    Every DroitsSites row of *username* is deleted first, then the selection
    is inserted through ``grant_sites_to_user``. The two steps run as separate
    statements on *cnx*.
    """
    wipe_rights = "DELETE FROM DroitsSites WHERE NomUtilisateur=%s"
    cursor = cnx.cursor()
    try:
        cursor.execute(wipe_rights, (username,))
    finally:
        cursor.close()

    grant_sites_to_user(cnx, username, dsns, grant_all)
# ---------------------------
# UI Streamlit
# ---------------------------
# Page-level configuration; this is the first Streamlit call made by the script.
# NOTE(review): Streamlit expects set_page_config to run before any other
# Streamlit command — keep it first if this section is reordered.
st.set_page_config(page_title="Gestion des utilisateurs", page_icon="👤", layout="centered")
st.title("👤 Gestion des utilisateurs")
# One tab per workflow: tabs[0] = create, tabs[1] = edit, tabs[2] = list / delete.
tabs = st.tabs(["Créer", "Modifier", "Lister / Supprimer"])
# --------- Onglet CRÉER ----------
with tabs[0]:
    st.subheader("Créer un utilisateur")
    with st.form("create_user_form", clear_on_submit=False):
        c1, c2 = st.columns([1.2, 1.8])
        username = c1.text_input("NomUtilisateur", placeholder="ex: michel")
        full_name = c2.text_input("Nom_complet", placeholder="Michel DUPONT")
        role = st.selectbox("Rôle", ["utilisateur", "commercial", "administrateur", "admin"], index=0)

        # Site selection: each label shown in the widget maps back to its DSN.
        labels, label_to_dsn = [], {}
        try:
            cnx = pool.get_connection()
            try:
                rows = list_all_sites(cnx)  # [(dsn, bdd), ...]
            finally:
                cnx.close()
            labels = [f"{bdd} ({dsn})" for dsn, bdd in rows]
            label_to_dsn = {lbl: dsn for (dsn, bdd), lbl in zip(rows, labels)}
        except Exception as e:
            # The form stays usable without sites; only the multiselect is empty.
            st.warning(f"Chargement des sites impossible : {e}")

        colL, colR = st.columns([2, 1])
        with colL:
            selected_labels = st.multiselect("Sites autorisés", labels, help="Choisis un ou plusieurs sites")
        with colR:
            grant_all_sites = st.checkbox("Tous les sites", value=False)
        # Any role starting with "admin" is granted every site.
        if role.lower().startswith("admin"):
            grant_all_sites = True

        c4, c5, c6 = st.columns([1.4, 1, 1])
        email = c4.text_input("email", placeholder="prenom.nom@domaine.com")
        phone = c5.text_input("Téléphone", placeholder="06 12 12 35 32")
        expires = c6.date_input("DateExpiration", value=date.today())

        c7, c8 = st.columns(2)
        password = c7.text_input("Mot de passe", type="password")
        password2 = c8.text_input("Confirmer", type="password")

        submitted = st.form_submit_button("Créer l'utilisateur", use_container_width=True)

    if submitted:
        # Trim stray whitespace so "bob " and "bob" cannot become two users.
        username = username.strip()
        full_name = full_name.strip()
        email = email.strip()

        if not username or not full_name or not email:
            st.error("Champs requis manquants.")
        elif not EMAIL_RE.match(email):
            st.error("Format d'e-mail invalide.")
        elif not password:
            st.error("Veuillez saisir un mot de passe.")
        elif password != password2:
            st.error("Les mots de passe ne correspondent pas.")
        else:
            try:
                # Hash before borrowing a pooled connection: hashing is the
                # slow step and may fail if bcrypt is missing.
                pwd_hash = hash_password(password)
                cnx = pool.get_connection()
                try:
                    # site_legacy stays empty (schema in transition).
                    insert_user(
                        cnx,
                        username=username,
                        full_name=full_name,
                        pwd_hash=pwd_hash,
                        expires=expires,
                        phone=phone,
                        email=email,
                        role=role,
                        site_legacy=None,
                    )
                    # Site rights for the new user.
                    chosen_dsns = [label_to_dsn[lbl] for lbl in selected_labels]
                    grant_sites_to_user(cnx, username=username, dsns=chosen_dsns, grant_all=grant_all_sites)
                finally:
                    cnx.close()
                # The password hash is deliberately not echoed back to the
                # page: it is a credential and has no use in the UI.
                st.success("Utilisateur créé ✅")
            except mysql.connector.Error as db_err:
                st.error(f"Erreur MySQL : {db_err}")
            except Exception as e:
                st.error(f"Erreur : {e}")
# --------- Onglet MODIFIER ----------
with tabs[1]:
    st.subheader("Modifier un utilisateur")

    # Load the usernames offered in the selector; on failure the list is empty.
    usernames = []
    try:
        cnx = pool.get_connection()
        try:
            users = list_users(cnx)
        finally:
            cnx.close()
        usernames = [u["NomUtilisateur"] for u in users]
    except Exception as e:
        st.warning(f"Chargement des utilisateurs impossible : {e}")

    # The leading empty entry means "nothing selected yet".
    target = st.selectbox("Choisir l'utilisateur", [""] + usernames, index=0)

    if target:
        try:
            cnx = pool.get_connection()
            try:
                u = get_user(cnx, target)
                rows = list_all_sites(cnx)  # [(dsn, bdd), ...]
                current_dsns = set(get_user_sites(cnx, target))
            finally:
                cnx.close()

            if not u:
                st.error("Utilisateur introuvable.")
            else:
                labels = [f"{bdd} ({dsn})" for dsn, bdd in rows]
                label_to_dsn = {lbl: dsn for (dsn, bdd), lbl in zip(rows, labels)}
                # Pre-select the sites the user already has.
                preselected = [lbl for lbl in labels if label_to_dsn[lbl] in current_dsns]

                # Unknown or empty stored roles fall back to the first option.
                role_options = ["utilisateur", "commercial", "administrateur", "admin"]
                current_role = (u["role"] or "").lower()
                role_index = role_options.index(current_role) if current_role in role_options else 0

                with st.form("edit_user_form", clear_on_submit=False):
                    c1, c2 = st.columns([1.2, 1.8])
                    full_name = c2.text_input("Nom_complet", value=u["Nom_complet"] or "")
                    role = c1.selectbox("Rôle", role_options, index=role_index)

                    c4, c5, c6 = st.columns([1.4, 1, 1])
                    email = c4.text_input("email", value=u["email"] or "")
                    phone = c5.text_input("Téléphone", value=u["Telephone"] or "")
                    # datetime is tested first because it is a subclass of
                    # date; a missing expiry defaults to today.
                    stored_expiry = u["DateExpiration"]
                    if isinstance(stored_expiry, datetime):
                        exp_val = stored_expiry.date()
                    else:
                        exp_val = stored_expiry or date.today()
                    expires = c6.date_input("DateExpiration", value=exp_val)

                    st.markdown("**Droits d'accès aux sites**")
                    colL, colR = st.columns([2, 1])
                    with colL:
                        selected_labels = st.multiselect("Sites autorisés", labels, default=preselected)
                    with colR:
                        grant_all_sites = st.checkbox("Tous les sites", value=False)

                    st.divider()
                    cpass1, cpass2 = st.columns(2)
                    new_pass = cpass1.text_input("Nouveau mot de passe (optionnel)", type="password")
                    new_pass2 = cpass2.text_input("Confirmer", type="password")

                    # Three submit buttons share the form; each triggers one action below.
                    cbtn1, cbtn2, cbtn3 = st.columns([1.2, 1, 1])
                    save_btn = cbtn1.form_submit_button("Enregistrer")
                    reset_btn = cbtn2.form_submit_button("Réinitialiser mot de passe")
                    replace_btn = cbtn3.form_submit_button("Remplacer droits")

                if save_btn:
                    if not EMAIL_RE.match(email):
                        st.error("Format d'e-mail invalide.")
                    else:
                        try:
                            cnx = pool.get_connection()
                            try:
                                update_user_core(cnx, target, full_name, expires, phone, email, role)
                                # Rights are only added here, never removed;
                                # "Remplacer droits" is the action that removes.
                                chosen_dsns = [label_to_dsn[lbl] for lbl in selected_labels]
                                grant_sites_to_user(cnx, target, chosen_dsns, grant_all_sites)
                            finally:
                                cnx.close()
                            st.success("Modifications enregistrées ✅")
                        except Exception as e:
                            st.error(f"Erreur : {e}")

                if reset_btn:
                    if not new_pass:
                        st.error("Saisis un nouveau mot de passe.")
                    elif new_pass != new_pass2:
                        st.error("Les mots de passe ne correspondent pas.")
                    else:
                        try:
                            cnx = pool.get_connection()
                            try:
                                update_user_password(cnx, target, hash_password(new_pass))
                            finally:
                                cnx.close()
                            st.success("Mot de passe réinitialisé ✅")
                        except Exception as e:
                            st.error(f"Erreur : {e}")

                if replace_btn:
                    try:
                        cnx = pool.get_connection()
                        try:
                            chosen_dsns = [label_to_dsn[lbl] for lbl in selected_labels]
                            replace_user_sites(cnx, target, chosen_dsns, grant_all_sites)
                        finally:
                            cnx.close()
                        st.success("Droits remplacés ✅")
                    except Exception as e:
                        st.error(f"Erreur : {e}")
        except Exception as e:
            st.error(f"Erreur de chargement : {e}")
# --------- Onglet LISTE / SUPPR ----------
with tabs[2]:
    st.subheader("Liste des utilisateurs")
    try:
        cnx = pool.get_connection()
        try:
            data = list_users(cnx)
        finally:
            cnx.close()
        if not data:
            st.info("Aucun utilisateur.")
        else:
            st.dataframe(data, use_container_width=True, hide_index=True)
    except Exception as e:
        st.error(f"Erreur : {e}")

    st.divider()
    st.subheader("Supprimer un utilisateur")
    # Trim the typed name so surrounding spaces cannot cause a silent miss.
    del_user = st.text_input("NomUtilisateur à supprimer", value="").strip()
    if st.button("Supprimer", type="primary", disabled=(not del_user)):
        try:
            cnx = pool.get_connection()
            try:
                # Look the user up first: the DELETE statements succeed even
                # when they match no row, so success could otherwise be
                # reported for a user that never existed.
                existed = get_user(cnx, del_user) is not None
                # Still run the delete so leftover rights rows are cleaned up.
                delete_user(cnx, del_user)
            finally:
                cnx.close()
            if existed:
                st.success(f"Utilisateur '{del_user}' supprimé ✅")
            else:
                st.error("Utilisateur introuvable.")
        except Exception as e:
            st.error(f"Erreur : {e}")