From 8e94eda6bfb26d18840c97b34702d341f2882c59 Mon Sep 17 00:00:00 2001 From: Michel Date: Tue, 19 Aug 2025 12:14:35 +0200 Subject: [PATCH] Gestions des users --- bcrypt_check.py | 26 ---- creer_user.py | 324 ++++++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 324 insertions(+), 26 deletions(-) delete mode 100644 bcrypt_check.py create mode 100644 creer_user.py diff --git a/bcrypt_check.py b/bcrypt_check.py deleted file mode 100644 index 896c317..0000000 --- a/bcrypt_check.py +++ /dev/null @@ -1,26 +0,0 @@ -#!/usr/bin/env python3 -import sys, argparse, bcrypt - -def main(): - ap = argparse.ArgumentParser() - ap.add_argument("--hash", required=True) - ap.add_argument("--password-stdin", action="store_true") - args = ap.parse_args() - - if not args.password_stdin: - print("ERR") - return 2 - - password = sys.stdin.read().rstrip("\r\n") - try: - ok = bcrypt.checkpw(password.encode("utf-8"), - args.hash.encode("ascii")) - except Exception: - print("ERR") - return 2 - - print("OK" if ok else "ERR") - return 0 if ok else 1 - -if __name__ == "__main__": - sys.exit(main()) diff --git a/creer_user.py b/creer_user.py new file mode 100644 index 0000000..375105b --- /dev/null +++ b/creer_user.py @@ -0,0 +1,324 @@ +# app_users.py — création + modification de champs (sans sidebar, .env only) +import os +import re +from datetime import date, datetime + +import bcrypt +import mysql.connector +from mysql.connector import errorcode +import streamlit as st +from dotenv import load_dotenv + +def require_login(): + load_dotenv() + admin_user = os.getenv("ADMIN_USER", "admin") + admin_hash = os.getenv("ADMIN_PASS_HASH") + + if not admin_hash: + st.error("ADMIN_PASS_HASH manquant dans .env") + st.stop() + + if "auth_ok" not in st.session_state: + st.session_state.auth_ok = False + + if not st.session_state.auth_ok: + st.title("🔐 Accès restreint") + u = st.text_input("Utilisateur") + p = st.text_input("Mot de passe", type="password") + if st.button("Se connecter"): + if u == admin_user and bcrypt.checkpw(p.encode(), admin_hash.encode()): + st.session_state.auth_ok = True + st.rerun() # ✅ nouvelle fonction + else: + st.error("Identifiants invalides") + st.stop() + +require_login() +# ====================== +# Connexion MySQL via .env +# ====================== +@st.cache_resource +def get_connection(): + load_dotenv() + host = os.getenv("DB_HOST") + port = int(os.getenv("MYSQL_PORT", "3306")) + user = os.getenv("DB_USER") + pwd = os.getenv("DB_PASSWORD") + db = os.getenv("DB_NAME") + + missing = [k for k, v in { + "MYSQL_USER": user, "MYSQL_PASSWORD": pwd, "MYSQL_HOST": host, "MYSQL_PORT": port, "MYSQL_DATABASE": db + }.items() if v in (None, "")] + if missing: + raise RuntimeError(f"Variables manquantes dans .env : {', '.join(missing)}") + + return mysql.connector.connect( + host=host, port=port, user=user, password=pwd, database=db, autocommit=True + ) + +# ====================== +# Utilitaires +# ====================== +EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") + +def normalize_phone(phone: str | None) -> str | None: + if not phone: + return None + return re.sub(r"[^\d+]", "", phone) + +def to_sql_date(d: date | str) -> str: + if isinstance(d, date): + return d.strftime("%Y-%m-%d") + # accepte JJ/MM/AAAA + for fmt in ("%Y-%m-%d", "%d/%m/%Y"): + try: + return datetime.strptime(d, fmt).strftime("%Y-%m-%d") + except ValueError: + continue + raise ValueError("Date invalide (attendu: YYYY-MM-DD ou JJ/MM/AAAA)") + +def hash_password(plain: str, rounds: int = 12) -> str: + salt = bcrypt.gensalt(rounds=rounds) + return bcrypt.hashpw(plain.encode("utf-8"), salt).decode("utf-8") + +def user_exists(cursor, username: str, email: str) -> bool: + cursor.execute( + "SELECT COUNT(*) FROM Utilisateurs WHERE NomUtilisateur=%s OR email=%s", + (username, email), + ) + (count,) = cursor.fetchone() + return count > 0 + +def insert_user(cnx, username, full_name, site, password, expires, phone, email, store_plain=False): + if not EMAIL_RE.match(email): + raise ValueError("Email invalide.") + phone_norm = normalize_phone(phone) + exp_sql = to_sql_date(expires) + pwd_hash = hash_password(password) + + with cnx.cursor() as cur: + if user_exists(cur, username, email): + raise RuntimeError("Nom d'utilisateur ou email déjà existant.") + if store_plain: + cur.execute( + """ + INSERT INTO Utilisateurs + (NomUtilisateur, Nom_complet, Site, MotDePasse, MotDePasseHash, DateExpiration, Telephone, email) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s) + """, + (username, full_name, site, password, pwd_hash, exp_sql, phone_norm, email), + ) + else: + cur.execute( + """ + INSERT INTO Utilisateurs + (NomUtilisateur, Nom_complet, Site, MotDePasse, MotDePasseHash, DateExpiration, Telephone, email) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s) + """, + (username, full_name, site, None, pwd_hash, exp_sql, phone_norm, email), + ) + return pwd_hash + +def list_users(cnx, limit=200): + with cnx.cursor(dictionary=True) as cur: + cur.execute( + """ + SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email + FROM Utilisateurs + ORDER BY NomUtilisateur ASC + LIMIT %s + """, + (limit,), + ) + return cur.fetchall() + +def get_user(cnx, username: str): + with cnx.cursor(dictionary=True) as cur: + cur.execute( + """ + SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email + FROM Utilisateurs + WHERE NomUtilisateur=%s + """, + (username,), + ) + return cur.fetchone() + +def update_field(cnx, username: str, field: str, value): + allowed = { + "Nom_complet": "Nom_complet", + "Site": "Site", + "Telephone": "Telephone", + "DateExpiration": "DateExpiration", + "email": "email", + } + if field not in allowed: + raise ValueError("Champ non autorisé.") + sql_field = allowed[field] + + # validations + if field == "email": + if not EMAIL_RE.match(str(value)): + raise ValueError("Email invalide.") + if field == "Telephone": + value = normalize_phone(str(value)) if value else None + if field == "DateExpiration": + value = to_sql_date(value) + + with cnx.cursor() as cur: + cur.execute( + f"UPDATE Utilisateurs SET {sql_field}=%s WHERE NomUtilisateur=%s", + (value, username), + ) + +def update_password(cnx, username: str, new_password: str): + pwd_hash = hash_password(new_password) + with cnx.cursor() as cur: + cur.execute( + "UPDATE Utilisateurs SET MotDePasse=NULL, MotDePasseHash=%s WHERE NomUtilisateur=%s", + (pwd_hash, username), + ) + return pwd_hash + + +# ====================== +# UI Streamlit +# ====================== +st.set_page_config(page_title="Acces.Utilisateurs", page_icon="👤", layout="centered") +st.title("Acces.Utilisateurs") + +# Connexion +try: + cnx = get_connection() + st.caption("Connexion MySQL via `.env` : **OK** ✅") +except Exception as e: + st.error(f"Connexion MySQL impossible : {e}") + st.stop() + +# --- Création --- +with st.form("create_user_form", clear_on_submit=False): + st.subheader("Nouveau compte") + + col1, col2 = st.columns(2) + username = col1.text_input("NomUtilisateur", placeholder="ex: cjaquier") + full_name = col2.text_input("Nom_complet", placeholder="Clément JAQUIER") + + col3, col4 = st.columns(2) + site = col3.text_input("Site", placeholder="Roissy", value="Roissy") + email = col4.text_input("email", placeholder="prenom.nom@domaine.com") + + col5, col6 = st.columns(2) + phone = col5.text_input("Téléphone", placeholder="06 12 12 35 32") + expires = col6.date_input("DateExpiration", value=date.today()) + + col7, col8 = st.columns(2) + password = col7.text_input("Mot de passe (saisie)", type="password") + confirm = col8.text_input("Confirmer mot de passe", type="password") + + store_plain = st.checkbox("Remplir aussi MotDePasse en clair (déconseillé)", value=False) + submitted = st.form_submit_button("Créer l’utilisateur", use_container_width=True) + + if submitted: + if not username or not full_name or not site or not email or not password: + st.error("Merci de remplir tous les champs obligatoires.") + elif password != confirm: + st.error("Les mots de passe ne correspondent pas.") + else: + try: + pwd_hash = insert_user( + cnx, + username=username.strip(), + full_name=full_name.strip(), + site=site.strip(), + password=password, + expires=expires, + phone=phone.strip() if phone else None, + email=email.strip(), + store_plain=store_plain, + ) + st.success("Utilisateur créé avec succès ✅") + with st.expander("Voir le hash généré (MotDePasseHash)"): + st.code(pwd_hash) + except mysql.connector.Error as db_err: + if db_err.errno == errorcode.ER_ACCESS_DENIED_ERROR: + st.error("Identifiants MySQL invalides.") + else: + st.error(f"Erreur MySQL : {db_err}") + except Exception as e: + st.error(f"Erreur : {e}") + +st.divider() + +# --- Modification d'un champ --- +st.subheader("Modifier un utilisateur existant") + +# chargement liste +try: + users = list_users(cnx, limit=1000) + usernames = [u["NomUtilisateur"] for u in users] +except Exception as e: + users = [] + usernames = [] + st.warning(f"Impossible de charger la liste des utilisateurs : {e}") + +colA, colB = st.columns([2, 3]) +with colA: + sel_user = st.selectbox("Utilisateur", usernames, placeholder="Choisir un utilisateur") +with colB: + if sel_user: + details = get_user(cnx, sel_user) + if details: + st.caption(f"Actuel — Nom: **{details['Nom_complet']}**, Site: **{details['Site']}**, " + f"Expiration: **{details['DateExpiration']}**, Tél: **{details['Telephone']}**, " + f"Email: **{details['email']}**") + +col1, col2 = st.columns(2) +with col1: + field = st.selectbox("Champ à modifier", ["Nom_complet", "Site", "Telephone", "DateExpiration", "email"]) +with col2: + # widget adapté au champ + if field == "DateExpiration": + new_value = st.date_input("Nouvelle valeur (date)", value=date.today(), key="new_date") + elif field == "Telephone": + new_value = st.text_input("Nouvelle valeur (téléphone)", key="new_tel") + else: + new_value = st.text_input("Nouvelle valeur", key="new_text") + +btn_update = st.button("Mettre à jour le champ", type="primary", use_container_width=True, disabled=not sel_user) + +if btn_update and sel_user: + try: + update_field(cnx, sel_user, field, new_value) + st.success(f"✅ {field} mis à jour pour {sel_user}") + except mysql.connector.Error as db_err: + st.error(f"Erreur MySQL : {db_err}") + except Exception as e: + st.error(f"Erreur : {e}") + +# --- Réinitialisation mot de passe (optionnel) --- +with st.expander("🔐 Réinitialiser le mot de passe (bcrypt)"): + user_pw = st.selectbox("Utilisateur", usernames, key="pw_user", placeholder="Choisir un utilisateur") + colp1, colp2 = st.columns(2) + new_pw = colp1.text_input("Nouveau mot de passe", type="password") + new_pw2 = colp2.text_input("Confirmer", type="password") + if st.button("Mettre à jour le mot de passe", disabled=not user_pw): + if not new_pw: + st.error("Mot de passe vide.") + elif new_pw != new_pw2: + st.error("Les mots de passe ne correspondent pas.") + else: + try: + h = update_password(cnx, user_pw, new_pw) + st.success("Mot de passe mis à jour ✅") + st.caption("Hash (MotDePasseHash) :") + st.code(h) + except Exception as e: + st.error(f"Erreur : {e}") + +st.divider() +st.subheader("Utilisateurs récents") +try: + rows = list_users(cnx, limit=200) + st.dataframe(rows, use_container_width=True, hide_index=True) +except Exception as e: + st.warning(f"Impossible de lister les utilisateurs : {e}")