# app_users.py — create users and edit their fields (no sidebar, .env only)
import os
import re
from datetime import date, datetime

import bcrypt
import mysql.connector
from mysql.connector import errorcode
import streamlit as st
from dotenv import load_dotenv

# Page configuration is issued before any other Streamlit call: Streamlit
# releases that require set_page_config to be the first command raise an
# exception otherwise, and the login screen below already renders widgets.
st.set_page_config(page_title="Acces.Utilisateurs", page_icon="👤", layout="centered")


def require_login():
    """Gate the app behind a single admin account defined in ``.env``.

    Reads ``ADMIN_USER`` (default ``"admin"``) and ``ADMIN_PASS_HASH`` (a
    bcrypt hash). Until the session is authenticated, only the login form is
    rendered and the script run is halted with ``st.stop()``.
    """
    load_dotenv()
    admin_user = os.getenv("ADMIN_USER", "admin")
    admin_hash = os.getenv("ADMIN_PASS_HASH")
    if not admin_hash:
        st.error("ADMIN_PASS_HASH manquant dans .env")
        st.stop()

    if "auth_ok" not in st.session_state:
        st.session_state.auth_ok = False
    if st.session_state.auth_ok:
        return

    st.title("🔐 Accès restreint")
    u = st.text_input("Utilisateur")
    p = st.text_input("Mot de passe", type="password")
    if st.button("Se connecter"):
        # The hash is always checked (not only when the username matches) so
        # that both outcomes go through the same code path. bcrypt raises
        # ValueError when it cannot process its inputs (e.g. a malformed
        # hash); report that instead of crashing the page.
        try:
            password_ok = bcrypt.checkpw(p.encode(), admin_hash.encode())
        except ValueError:
            st.error("Vérification impossible (ADMIN_PASS_HASH mal formé ?)")
        else:
            if u == admin_user and password_ok:
                st.session_state.auth_ok = True
                st.rerun()
            else:
                st.error("Identifiants invalides")
    # Nothing below the login form may run for an unauthenticated session.
    st.stop()


require_login()


# ======================
# MySQL connection via .env
# ======================
@st.cache_resource
def get_connection():
    """Open the MySQL connection described by the ``.env`` file.

    Reads ``DB_HOST``, ``DB_USER``, ``DB_PASSWORD``, ``DB_NAME`` and
    ``MYSQL_PORT`` (default 3306). The result is cached by Streamlit via
    ``st.cache_resource``.

    Raises:
        RuntimeError: a required variable is missing/empty, or the port is
            not an integer.
        mysql.connector.Error: the server refused the connection.
    """
    load_dotenv()
    host = os.getenv("DB_HOST")
    user = os.getenv("DB_USER")
    pwd = os.getenv("DB_PASSWORD")
    db = os.getenv("DB_NAME")
    # An empty MYSQL_PORT falls back to the default, like an unset one.
    port_raw = os.getenv("MYSQL_PORT") or "3306"

    # Report the variable names that are actually read above, so the message
    # tells the operator exactly what to add to .env.
    required = {
        "DB_USER": user,
        "DB_PASSWORD": pwd,
        "DB_HOST": host,
        "DB_NAME": db,
    }
    missing = [name for name, val in required.items() if val in (None, "")]
    if missing:
        raise RuntimeError(f"Variables manquantes dans .env : {', '.join(missing)}")

    try:
        port = int(port_raw)
    except ValueError as err:
        raise RuntimeError(f"MYSQL_PORT invalide dans .env : {port_raw!r}") from err

    return mysql.connector.connect(
        host=host,
        port=port,
        user=user,
        password=pwd,
        database=db,
        autocommit=True,
    )


# ======================
# Helpers
# ======================
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")


def normalize_phone(phone: str | None) -> str | None:
    """Return *phone* reduced to digits and ``+``, or ``None`` when empty."""
    if not phone:
        return None
    return re.sub(r"[^\d+]", "", phone)


def to_sql_date(d: date | str) -> str:
    """Convert a date object or date string to ``YYYY-MM-DD``.

    Strings are accepted as ``YYYY-MM-DD`` or ``DD/MM/YYYY``; surrounding
    whitespace is ignored.

    Raises:
        ValueError: the string matches neither accepted format.
    """
    if isinstance(d, date):
        return d.strftime("%Y-%m-%d")
    text = d.strip()
    for fmt in ("%Y-%m-%d", "%d/%m/%Y"):
        try:
            return datetime.strptime(text, fmt).strftime("%Y-%m-%d")
        except ValueError:
            continue
    raise ValueError("Date invalide (attendu: YYYY-MM-DD ou JJ/MM/AAAA)")


def hash_password(plain: str, rounds: int = 12) -> str:
    """Return the bcrypt hash of *plain* as text, using *rounds* cost."""
    salt = bcrypt.gensalt(rounds=rounds)
    return bcrypt.hashpw(plain.encode("utf-8"), salt).decode("utf-8")


def user_exists(cursor, username: str, email: str) -> bool:
    """Tell whether a row already uses *username* or *email*."""
    cursor.execute(
        "SELECT COUNT(*) FROM Utilisateurs WHERE NomUtilisateur=%s OR email=%s",
        (username, email),
    )
    (count,) = cursor.fetchone()
    return count > 0


def insert_user(cnx, username, full_name, site, password, expires, phone, email,
                store_plain=False):
    """Insert a new row into ``Utilisateurs`` and return the bcrypt hash.

    The ``MotDePasse`` column receives the clear-text password only when
    *store_plain* is true; otherwise it is set to NULL.

    Raises:
        ValueError: invalid email or expiration date.
        RuntimeError: the username or email is already taken.
    """
    if not EMAIL_RE.match(email):
        raise ValueError("Email invalide.")
    phone_norm = normalize_phone(phone)
    exp_sql = to_sql_date(expires)
    pwd_hash = hash_password(password)
    # Single statement for both modes: only the MotDePasse value differs.
    plain_value = password if store_plain else None

    with cnx.cursor() as cur:
        if user_exists(cur, username, email):
            raise RuntimeError("Nom d'utilisateur ou email déjà existant.")
        cur.execute(
            """
            INSERT INTO Utilisateurs
                (NomUtilisateur, Nom_complet, Site, MotDePasse, MotDePasseHash,
                 DateExpiration, Telephone, email)
            VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
            """,
            (username, full_name, site, plain_value, pwd_hash, exp_sql,
             phone_norm, email),
        )
    return pwd_hash


def list_users(cnx, limit=200):
    """Return up to *limit* users as dicts, ordered by ``NomUtilisateur``."""
    with cnx.cursor(dictionary=True) as cur:
        cur.execute(
            """
            SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email
            FROM Utilisateurs
            ORDER BY NomUtilisateur ASC
            LIMIT %s
            """,
            (limit,),
        )
        return cur.fetchall()


def get_user(cnx, username: str):
    """Return the row for *username* as a dict, or ``None`` if not found."""
    with cnx.cursor(dictionary=True) as cur:
        cur.execute(
            """
            SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email
            FROM Utilisateurs
            WHERE NomUtilisateur=%s
            """,
            (username,),
        )
        return cur.fetchone()


def update_field(cnx, username: str, field: str, value):
    """Update one whitelisted column of the user *username*.

    Raises:
        ValueError: *field* is not editable, or *value* fails validation
            (empty mandatory field, invalid email, invalid date).
    """
    # Whitelist mapping: the column name is interpolated into the SQL text,
    # so it must never come straight from user input.
    allowed = {
        "Nom_complet": "Nom_complet",
        "Site": "Site",
        "Telephone": "Telephone",
        "DateExpiration": "DateExpiration",
        "email": "email",
    }
    if field not in allowed:
        raise ValueError("Champ non autorisé.")
    sql_field = allowed[field]

    if isinstance(value, str):
        value = value.strip()

    # Validations
    if field in ("Nom_complet", "Site") and not value:
        # These fields are mandatory at creation time; keep updates consistent.
        raise ValueError(f"{field} ne peut pas être vide.")
    if field == "email" and not EMAIL_RE.match(str(value)):
        raise ValueError("Email invalide.")
    if field == "Telephone":
        value = normalize_phone(str(value)) if value else None
    if field == "DateExpiration":
        value = to_sql_date(value)

    with cnx.cursor() as cur:
        cur.execute(
            f"UPDATE Utilisateurs SET {sql_field}=%s WHERE NomUtilisateur=%s",
            (value, username),
        )


def update_password(cnx, username: str, new_password: str):
    """Store a new bcrypt hash for *username*, clear ``MotDePasse``, return the hash."""
    pwd_hash = hash_password(new_password)
    with cnx.cursor() as cur:
        cur.execute(
            "UPDATE Utilisateurs SET MotDePasse=NULL, MotDePasseHash=%s WHERE NomUtilisateur=%s",
            (pwd_hash, username),
        )
    return pwd_hash


# ======================
# Streamlit UI
# ======================
st.title("Acces.Utilisateurs")

# Connection
try:
    cnx = get_connection()
    # The connection object is cached and may have been dropped by the server
    # since it was opened; ping re-establishes it when needed.
    cnx.ping(reconnect=True, attempts=3, delay=1)
    st.caption("Connexion MySQL via `.env` : **OK** ✅")
except Exception as e:
    st.error(f"Connexion MySQL impossible : {e}")
    st.stop()

# --- Creation ---
with st.form("create_user_form", clear_on_submit=False):
    st.subheader("Nouveau compte")
    col1, col2 = st.columns(2)
    username = col1.text_input("NomUtilisateur", placeholder="ex: cjaquier")
    full_name = col2.text_input("Nom_complet", placeholder="Clément JAQUIER")

    col3, col4 = st.columns(2)
    site = col3.text_input("Site", placeholder="Roissy", value="Roissy")
    email = col4.text_input("email", placeholder="prenom.nom@domaine.com")

    col5, col6 = st.columns(2)
    phone = col5.text_input("Téléphone", placeholder="06 12 12 35 32")
    expires = col6.date_input("DateExpiration", value=date.today())

    col7, col8 = st.columns(2)
    password = col7.text_input("Mot de passe (saisie)", type="password")
    confirm = col8.text_input("Confirmer mot de passe", type="password")

    store_plain = st.checkbox("Remplir aussi MotDePasse en clair (déconseillé)", value=False)
    submitted = st.form_submit_button("Créer l’utilisateur", use_container_width=True)

if submitted:
    # Strip before validating so whitespace-only input counts as empty.
    username_clean = username.strip()
    full_name_clean = full_name.strip()
    site_clean = site.strip()
    email_clean = email.strip()
    phone_clean = phone.strip() if phone else None

    if not (username_clean and full_name_clean and site_clean and email_clean and password):
        st.error("Merci de remplir tous les champs obligatoires.")
    elif password != confirm:
        st.error("Les mots de passe ne correspondent pas.")
    else:
        try:
            pwd_hash = insert_user(
                cnx,
                username=username_clean,
                full_name=full_name_clean,
                site=site_clean,
                password=password,
                expires=expires,
                phone=phone_clean or None,
                email=email_clean,
                store_plain=store_plain,
            )
            st.success("Utilisateur créé avec succès ✅")
            with st.expander("Voir le hash généré (MotDePasseHash)"):
                st.code(pwd_hash)
        except mysql.connector.Error as db_err:
            if db_err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                st.error("Identifiants MySQL invalides.")
            else:
                st.error(f"Erreur MySQL : {db_err}")
        except Exception as e:
            st.error(f"Erreur : {e}")

st.divider()

# --- Editing a single field ---
st.subheader("Modifier un utilisateur existant")

# Load the user list
try:
    users = list_users(cnx, limit=1000)
    usernames = [u["NomUtilisateur"] for u in users]
except Exception as e:
    users = []
    usernames = []
    st.warning(f"Impossible de charger la liste des utilisateurs : {e}")

colA, colB = st.columns([2, 3])
with colA:
    sel_user = st.selectbox("Utilisateur", usernames, placeholder="Choisir un utilisateur")
with colB:
    if sel_user:
        # A failed lookup only hides the summary; it must not crash the page.
        try:
            details = get_user(cnx, sel_user)
        except mysql.connector.Error as db_err:
            details = None
            st.warning(f"Erreur MySQL : {db_err}")
        if details:
            st.caption(
                f"Actuel — Nom: **{details['Nom_complet']}**, Site: **{details['Site']}**, "
                f"Expiration: **{details['DateExpiration']}**, Tél: **{details['Telephone']}**, "
                f"Email: **{details['email']}**"
            )

col1, col2 = st.columns(2)
with col1:
    field = st.selectbox(
        "Champ à modifier",
        ["Nom_complet", "Site", "Telephone", "DateExpiration", "email"],
    )
with col2:
    # Input widget adapted to the selected field
    if field == "DateExpiration":
        new_value = st.date_input("Nouvelle valeur (date)", value=date.today(), key="new_date")
    elif field == "Telephone":
        new_value = st.text_input("Nouvelle valeur (téléphone)", key="new_tel")
    else:
        new_value = st.text_input("Nouvelle valeur", key="new_text")

btn_update = st.button(
    "Mettre à jour le champ",
    type="primary",
    use_container_width=True,
    disabled=not sel_user,
)
if btn_update and sel_user:
    try:
        update_field(cnx, sel_user, field, new_value)
        st.success(f"✅ {field} mis à jour pour {sel_user}")
    except mysql.connector.Error as db_err:
        st.error(f"Erreur MySQL : {db_err}")
    except Exception as e:
        st.error(f"Erreur : {e}")

# --- Password reset (optional) ---
with st.expander("🔐 Réinitialiser le mot de passe (bcrypt)"):
    user_pw = st.selectbox(
        "Utilisateur", usernames, key="pw_user", placeholder="Choisir un utilisateur"
    )
    colp1, colp2 = st.columns(2)
    new_pw = colp1.text_input("Nouveau mot de passe", type="password")
    new_pw2 = colp2.text_input("Confirmer", type="password")
    if st.button("Mettre à jour le mot de passe", disabled=not user_pw):
        if not new_pw:
            st.error("Mot de passe vide.")
        elif new_pw != new_pw2:
            st.error("Les mots de passe ne correspondent pas.")
        else:
            try:
                h = update_password(cnx, user_pw, new_pw)
                st.success("Mot de passe mis à jour ✅")
                st.caption("Hash (MotDePasseHash) :")
                st.code(h)
            except Exception as e:
                st.error(f"Erreur : {e}")

st.divider()
st.subheader("Utilisateurs récents")
try:
    rows = list_users(cnx, limit=200)
    st.dataframe(rows, use_container_width=True, hide_index=True)
except Exception as e:
    st.warning(f"Impossible de lister les utilisateurs : {e}")