# app_users.py — création + modification de champs (wide + onglets + grille) import os import re from datetime import date, datetime import bcrypt import mysql.connector from mysql.connector import errorcode import streamlit as st from dotenv import load_dotenv # ----------------------- # Auth minimale # ----------------------- def require_login(): st.markdown( """ """, unsafe_allow_html=True ) load_dotenv() admin_user = os.getenv("ADMIN_USER", "admin") admin_hash = os.getenv("ADMIN_PASS_HASH") if not admin_hash: st.error("ADMIN_PASS_HASH manquant dans .env") st.stop() if "auth_ok" not in st.session_state: st.session_state.auth_ok = False if not st.session_state.auth_ok: st.markdown("

🔐 Accès restreint

", unsafe_allow_html=True) # conteneur centré login_left, login_center, login_right = st.columns([1, 2, 1]) with login_center: st.markdown( """
""", unsafe_allow_html=True ) u = st.text_input("Utilisateur", key="login_user") p = st.text_input("Mot de passe", type="password", key="login_pass") if st.button("Se connecter", use_container_width=True): if u == admin_user and bcrypt.checkpw(p.encode(), admin_hash.encode()): st.session_state.auth_ok = True st.rerun() else: st.error("Identifiants invalides") st.markdown("
", unsafe_allow_html=True) st.stop() require_login() # ----------------------- # Connexion MySQL # ----------------------- @st.cache_resource def get_connection(): load_dotenv() host = os.getenv("DB_HOST") port = int(os.getenv("MYSQL_PORT", "3306")) user = os.getenv("DB_USER") pwd = os.getenv("DB_PASSWORD") db = os.getenv("DB_NAME") missing = [k for k, v in { "MYSQL_USER": user, "MYSQL_PASSWORD": pwd, "MYSQL_HOST": host, "MYSQL_PORT": port, "MYSQL_DATABASE": db }.items() if v in (None, "")] if missing: raise RuntimeError(f"Variables manquantes dans .env : {', '.join(missing)}") return mysql.connector.connect( host=host, port=port, user=user, password=pwd, database=db, autocommit=True ) # ----------------------- # Utilitaires # ----------------------- EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") def normalize_phone(phone: str | None) -> str | None: if not phone: return None return re.sub(r"[^\d+]", "", phone) def to_sql_date(d: date | str) -> str: if isinstance(d, date): return d.strftime("%Y-%m-%d") for fmt in ("%Y-%m-%d", "%d/%m/%Y"): try: return datetime.strptime(d, fmt).strftime("%Y-%m-%d") except ValueError: continue raise ValueError("Date invalide (attendu: YYYY-MM-DD ou JJ/MM/AAAA)") def hash_password(plain: str, rounds: int = 12) -> str: salt = bcrypt.gensalt(rounds=rounds) return bcrypt.hashpw(plain.encode("utf-8"), salt).decode("utf-8") def user_exists(cursor, username: str, email: str) -> bool: cursor.execute( "SELECT COUNT(*) FROM Utilisateurs WHERE NomUtilisateur=%s OR email=%s", (username, email), ) (count,) = cursor.fetchone() return count > 0 def insert_user(cnx, username, full_name, site, password, expires, phone, email, store_plain=False): if not EMAIL_RE.match(email): raise ValueError("Email invalide.") phone_norm = normalize_phone(phone) exp_sql = to_sql_date(expires) pwd_hash = hash_password(password) with cnx.cursor() as cur: if user_exists(cur, username, email): raise RuntimeError("Nom d'utilisateur ou email déjà existant.") if store_plain: cur.execute( """ INSERT INTO Utilisateurs (NomUtilisateur, Nom_complet, Site, MotDePasse, MotDePasseHash, DateExpiration, Telephone, email) VALUES (%s,%s,%s,%s,%s,%s,%s,%s) """, (username, full_name, site, password, pwd_hash, exp_sql, phone_norm, email), ) else: cur.execute( """ INSERT INTO Utilisateurs (NomUtilisateur, Nom_complet, Site, MotDePasse, MotDePasseHash, DateExpiration, Telephone, email) VALUES (%s,%s,%s,%s,%s,%s,%s,%s) """, (username, full_name, site, None, pwd_hash, exp_sql, phone_norm, email), ) return pwd_hash def list_users(cnx, limit=200): with cnx.cursor(dictionary=True) as cur: cur.execute( """ SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email FROM Utilisateurs ORDER BY NomUtilisateur ASC LIMIT %s """, (limit,), ) return cur.fetchall() def get_user(cnx, username: str): with cnx.cursor(dictionary=True) as cur: cur.execute( """ SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email FROM Utilisateurs WHERE NomUtilisateur=%s """, (username,), ) return cur.fetchone() def update_field(cnx, username: str, field: str, value): allowed = { "Nom_complet": "Nom_complet", "Site": "Site", "Telephone": "Telephone", "DateExpiration": "DateExpiration", "email": "email", } if field not in allowed: raise ValueError("Champ non autorisé.") sql_field = allowed[field] if field == "email": if not EMAIL_RE.match(str(value)): raise ValueError("Email invalide.") if field == "Telephone": value = normalize_phone(str(value)) if value else None if field == "DateExpiration": value = to_sql_date(value) with cnx.cursor() as cur: cur.execute( f"UPDATE Utilisateurs SET {sql_field}=%s WHERE NomUtilisateur=%s", (value, username), ) def update_password(cnx, username: str, new_password: str): pwd_hash = hash_password(new_password) with cnx.cursor() as cur: cur.execute( "UPDATE Utilisateurs SET MotDePasse=NULL, MotDePasseHash=%s WHERE NomUtilisateur=%s", (pwd_hash, username), ) return pwd_hash # ----------------------- # UI # ----------------------- st.set_page_config(page_title="Acces.Utilisateurs", page_icon="👤", layout="wide") # CSS doux st.markdown( """ """, unsafe_allow_html=True ) st.title("👤 Acces.Utilisateurs") # Connexion DB (badge) try: cnx = get_connection() st.caption("Connexion MySQL via `.env` : **OK** ✅") except Exception as e: st.error(f"Connexion MySQL impossible : {e}") st.stop() # Bouton Sortie / Exit hdr_l, hdr_r = st.columns([1, 0.18]) with hdr_r: if st.button("🔒 SORTIE / EXIT", use_container_width=True, help="Se déconnecter et verrouiller l'accès"): st.session_state.auth_ok = False st.rerun() # ----------------------- # Onglets # ----------------------- tab_list, tab_create, tab_edit, tab_security = st.tabs( ["📋 Liste", "🆕 Créer", "✏️ Modifier", "🔐 Sécurité"] ) # --- Onglet Liste --- with tab_list: st.subheader("Utilisateurs récents") try: import pandas as pd rows = list_users(cnx, limit=1000) df = pd.DataFrame(rows) if not df.empty: df["DateExpiration"] = pd.to_datetime(df["DateExpiration"], errors="coerce").dt.date from datetime import date as _date today = _date.today() expired_mask = df["DateExpiration"] <= today col_a, col_b = st.columns(2) col_a.metric("Total utilisateurs", len(df)) col_b.metric("Comptes périmés", int(expired_mask.sum())) def style_expired(row): if row["DateExpiration"] <= today: return ["color: white; background-color: #d32f2f;"] * len(row) return [""] * len(row) styled = df.style.apply(style_expired, axis=1) visible_rows = len(df) row_px = 36 header_px = 48 padding_px = 24 height = min(1000, header_px + row_px * visible_rows + padding_px) st.dataframe( styled, use_container_width=True, hide_index=True, height=height, ) else: st.info("Aucun utilisateur à afficher.") except Exception as e: st.warning(f"Impossible de lister les utilisateurs : {e}") # --- Onglet Créer --- with tab_create: with st.form("create_user_form", clear_on_submit=False): st.subheader("Nouveau compte") c1, c2, c3 = st.columns([1.2, 1.5, 1]) username = c1.text_input("NomUtilisateur", placeholder="ex: cjaquier") full_name = c2.text_input("Nom_complet", placeholder="Clément JAQUIER") site = c3.text_input("Site", placeholder="Roissy", value="Roissy") c4, c5, c6 = st.columns([1.4, 1, 1]) email = c4.text_input("email", placeholder="prenom.nom@domaine.com") phone = c5.text_input("Téléphone", placeholder="06 12 12 35 32") expires = c6.date_input("DateExpiration", value=date.today()) c7, c8 = st.columns(2) password = c7.text_input("Mot de passe (saisie)", type="password") confirm = c8.text_input("Confirmer mot de passe", type="password") store_plain = st.checkbox("Remplir aussi MotDePasse en clair (déconseillé)", value=False) submitted = st.form_submit_button("Créer l’utilisateur", use_container_width=True) if submitted: if not username or not full_name or not site or not email or not password: st.error("Merci de remplir tous les champs obligatoires.") elif password != confirm: st.error("Les mots de passe ne correspondent pas.") else: try: pwd_hash = insert_user( cnx, username=username.strip(), full_name=full_name.strip(), site=site.strip(), password=password, expires=expires, phone=phone.strip() if phone else None, email=email.strip(), store_plain=store_plain, ) st.success("Utilisateur créé avec succès ✅") with st.expander("Voir le hash généré (MotDePasseHash)"): st.code(pwd_hash) except mysql.connector.Error as db_err: if db_err.errno == errorcode.ER_ACCESS_DENIED_ERROR: st.error("Identifiants MySQL invalides.") else: st.error(f"Erreur MySQL : {db_err}") except Exception as e: st.error(f"Erreur : {e}") # --- Onglet Modifier --- with tab_edit: st.subheader("Modifier un utilisateur existant") try: users = list_users(cnx, limit=1000) usernames = [u["NomUtilisateur"] for u in users] except Exception as e: users = [] usernames = [] st.warning(f"Impossible de charger la liste des utilisateurs : {e}") top_left, top_right = st.columns([1.2, 2]) with top_left: sel_user = st.selectbox("Utilisateur", usernames, placeholder="Choisir un utilisateur") field = st.selectbox( "Champ à modifier", ["Nom_complet", "Site", "Telephone", "DateExpiration", "email"] ) if field == "DateExpiration": new_value = st.date_input("Nouvelle valeur (date)", value=date.today(), key="new_date") elif field == "Telephone": new_value = st.text_input("Nouvelle valeur (téléphone)", key="new_tel") else: new_value = st.text_input("Nouvelle valeur", key="new_text") update_btn = st.button( "Mettre à jour le champ", type="primary", use_container_width=True, disabled=not sel_user, ) with top_right: if sel_user: details = get_user(cnx, sel_user) if details: c1, c2, c3, c4, c5 = st.columns(5, gap="small") c1.markdown(f"
Nom
{details['Nom_complet'] or ''}
", unsafe_allow_html=True) c2.markdown(f"
Site
{details['Site'] or ''}
", unsafe_allow_html=True) c3.markdown(f"
Expire
{details['DateExpiration'] or ''}
", unsafe_allow_html=True) c4.markdown(f"
Tél
{details['Telephone'] or ''}
", unsafe_allow_html=True) c5.markdown(f"
Email
{details['email'] or ''}
", unsafe_allow_html=True) if update_btn and sel_user: try: update_field(cnx, sel_user, field, new_value) st.success(f"✅ {field} mis à jour pour {sel_user}") except mysql.connector.Error as db_err: st.error(f"Erreur MySQL : {db_err}") except Exception as e: st.error(f"Erreur : {e}") # --- Onglet Sécurité --- with tab_security: st.subheader("Réinitialiser le mot de passe (bcrypt)") try: if 'user_cache_for_pw' not in st.session_state: st.session_state.user_cache_for_pw = [u["NomUtilisateur"] for u in list_users(cnx, limit=1000)] pw_user_list = st.session_state.user_cache_for_pw except Exception: pw_user_list = [] user_pw = st.selectbox("Utilisateur", pw_user_list, key="pw_user", placeholder="Choisir un utilisateur") colp1, colp2 = st.columns(2) new_pw = colp1.text_input("Nouveau mot de passe", type="password") new_pw2 = colp2.text_input("Confirmer", type="password") if st.button("Mettre à jour le mot de passe", disabled=not user_pw, use_container_width=True): if not new_pw: st.error("Mot de passe vide.") elif new_pw != new_pw2: st.error("Les mots de passe ne correspondent pas.") else: try: h = update_password(cnx, user_pw, new_pw) st.success("Mot de passe mis à jour ✅") st.caption("Hash (MotDePasseHash) :") st.code(h) except Exception as e: st.error(f"Erreur : {e}")