diff --git a/app/Test_Mysql.py b/app/Test_Mysql.py deleted file mode 100644 index e83a7a7..0000000 --- a/app/Test_Mysql.py +++ /dev/null @@ -1,17 +0,0 @@ -import os, mysql.connector -from dotenv import load_dotenv -from pathlib import Path - -# charge le .env **avec chemin absolu** -load_dotenv(Path(__file__).resolve().parent.joinpath(".env")) - -conn = mysql.connector.connect( - host=os.getenv("DB_HOST"), - user=os.getenv("DB_USER"), - password=os.getenv("DB_PASSWORD"), - database=os.getenv("DB_NAME"), - connection_timeout=5, -) -conn.ping(reconnect=True, attempts=3, delay=1) -print("CONNECTED" if conn.is_connected() else "KO") -conn.close() diff --git a/app/creer_user.py b/app/creer_user.py deleted file mode 100644 index 9220b1b..0000000 --- a/app/creer_user.py +++ /dev/null @@ -1,596 +0,0 @@ -# Streamlit app -import os -from datetime import date, datetime -import re -import bcrypt -import mysql.connector -from mysql.connector import pooling, errorcode -import pandas as pd -import streamlit as st -from dotenv import load_dotenv -import smtplib -from email.message import EmailMessage -from email.utils import formatdate - -load_dotenv() -SMTP_HOST = os.getenv("SMTP_HOST", "ssl0.ovh.net") -SMTP_PORT = int(os.getenv("SMTP_PORT", "465")) # OVH: 465 (SSL) ou 587 (STARTTLS) -SMTP_USER = os.getenv("SMTP_USER") -SMTP_PASS = os.getenv("SMTP_PASS") -SMTP_FROM = os.getenv("SMTP_FROM", SMTP_USER) - -# ----------------------- -# Auth minimale -# ----------------------- -def require_login(): - st.markdown( - "", - unsafe_allow_html=True, - ) - load_dotenv() - admin_user = os.getenv("ADMIN_USER") - admin_hash = os.getenv("ADMIN_PASS_HASH") - if not admin_user or not admin_hash: - st.error("Variables ADMIN_USER et/ou ADMIN_PASS_HASH manquantes dans .env") - st.stop() - - if "auth_ok" not in st.session_state: - st.session_state.auth_ok = False - - if not st.session_state.auth_ok: - col1, col2, col3 = st.columns([1, 2, 1]) - with col2: - st.header("🔐 Accùs restreint") - u = st.text_input("Utilisateur") - p = st.text_input("Mot de passe", type="password") - if st.button("Se connecter", use_container_width=True): - try: - ok = (u == admin_user) and bcrypt.checkpw(p.encode(), admin_hash.encode()) - except Exception: - ok = False - if ok: - st.session_state.auth_ok = True - st.rerun() - else: - st.error("Identifiants invalides.") - st.stop() -def logout_button(): - st.markdown( - """ - - """, - unsafe_allow_html=True - ) - st.markdown('
', unsafe_allow_html=True) - if st.button("đŸšȘ Quitter", key="logout", use_container_width=False): - st.session_state.clear() - st.success("DĂ©connexion effectuĂ©e.") - st.rerun() - st.markdown('
', unsafe_allow_html=True) - - -require_login() -logout_button() - -# ----------------------- -# Connexion MySQL via pool -# ----------------------- -@st.cache_resource -def get_pool(): - load_dotenv() - host = os.getenv("DB_HOST") - port = int(os.getenv("MYSQL_PORT", "3306")) - user = os.getenv("DB_USER") - pwd = os.getenv("DB_PASSWORD") - db = os.getenv("DB_NAME") - missing = [k for k, v in { - "DB_HOST": host, "MYSQL_PORT": port, "DB_USER": user, "DB_PASSWORD": pwd, "DB_NAME": db - }.items() if v in (None, "")] - if missing: - raise RuntimeError(f"Variables manquantes dans .env : {', '.join(missing)}") - return pooling.MySQLConnectionPool( - pool_name="users_pool", - pool_size=5, - pool_reset_session=True, - host=host, port=port, user=user, password=pwd, database=db, - autocommit=True - ) - -pool = get_pool() - -# ----------------------- -# Helpers SQL + validations -# ----------------------- -EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") -PHONE_RE = re.compile(r"\d{10,14}") - -def normalize_phone(p: str|None) -> str|None: - if not p: - return None - digits = re.sub(r"\D", "", p) - return digits if PHONE_RE.match(digits) else None - -def to_sql_date(d: date | str | None) -> str | None: - if d is None: - return None - if isinstance(d, str): - try: - d = datetime.fromisoformat(d).date() - except Exception: - return None - return d.strftime("%Y-%m-%d") - -def hash_password(plain: str, rounds: int = 12) -> str: - salt = bcrypt.gensalt(rounds=rounds) - return bcrypt.hashpw(plain.encode("utf-8"), salt).decode("utf-8") - -def user_exists(cur, username: str) -> bool: - cur.execute("SELECT COUNT(*) FROM Utilisateurs WHERE NomUtilisateur=%s", (username,)) - (count,) = cur.fetchone() - return count > 0 -def find_users_by_email(cnx, email: str): - with cnx.cursor(dictionary=True) as cur: - cur.execute( - "SELECT NomUtilisateur, Site FROM Utilisateurs WHERE email=%s ORDER BY NomUtilisateur", - (email,), - ) - return cur.fetchall() -def list_users(cnx, limit: int = 500, include_password=False): - fields = [ - "NomUtilisateur", "Nom_complet", "Site", "DateExpiration", - "Telephone", "email" - ] - if include_password: - fields.append("MotDePasse") - - sql = f"SELECT {', '.join(fields)} FROM Utilisateurs ORDER BY NomUtilisateur LIMIT %s" - with cnx.cursor(dictionary=True) as cur: - cur.execute(sql, (limit,)) - return cur.fetchall() - -def insert_user(cnx, username, full_name, site, password, expires, phone, email, store_plain=False): - if not EMAIL_RE.match(email): - raise ValueError("Email invalide.") - phone_norm = normalize_phone(phone) - exp_sql = to_sql_date(expires) - pwd_hash = hash_password(password) - - with cnx.cursor() as cur: - # ✅ vĂ©rif uniquement sur NomUtilisateur - if user_exists(cur, username): - raise RuntimeError("Nom d'utilisateur dĂ©jĂ  existant.") - - if store_plain: - cur.execute( - """ - INSERT INTO Utilisateurs - (NomUtilisateur, Nom_complet, Site, MotDePasse, MotDePasseHash, DateExpiration, Telephone, email) - VALUES (%s,%s,%s,%s,%s,%s,%s,%s) - """, - (username, full_name, site, password, pwd_hash, exp_sql, phone_norm, email), - ) - else: - cur.execute( - """ - INSERT INTO Utilisateurs - (NomUtilisateur, Nom_complet, Site, MotDePasse, MotDePasseHash, DateExpiration, Telephone, email) - VALUES (%s,%s,%s,NULL,%s,%s,%s,%s) - """, - (username, full_name, site, pwd_hash, exp_sql, phone_norm, email), - ) - return pwd_hash - - -def get_user_details(cnx, username: str): - with cnx.cursor(dictionary=True) as cur: - cur.execute( - """ - SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email - FROM Utilisateurs WHERE NomUtilisateur=%s - """, - (username,), - ) - return cur.fetchone() - -def update_field(cnx, username: str, field: str, value): - allowed = { - "Nom_complet": "Nom_complet", - "Site": "Site", - "DateExpiration": "DateExpiration", - "Telephone": "Telephone", - "email": "email", - } - if field not in allowed: - raise ValueError("Champ non autorisĂ©.") - sql_field = allowed[field] - - if field == "email": - if not EMAIL_RE.match(str(value)): - raise ValueError("Email invalide.") - if field == "Telephone": - value = normalize_phone(str(value)) if value else None - if field == "DateExpiration": - value = to_sql_date(value) - - with cnx.cursor() as cur: - cur.execute( - f"UPDATE Utilisateurs SET {sql_field}=%s WHERE NomUtilisateur=%s", - (value, username), - ) - -def update_password(cnx, username: str, new_password: str): - pwd_hash = hash_password(new_password) - with cnx.cursor() as cur: - cur.execute( - "UPDATE Utilisateurs SET MotDePasse=NULL, MotDePasseHash=%s WHERE NomUtilisateur=%s", - (pwd_hash, username), - ) - return pwd_hash - -def send_mail(to_email: str, subject: str, body_text: str, body_html: str | None = None): - if not (SMTP_HOST and SMTP_PORT and SMTP_USER and SMTP_PASS and SMTP_FROM): - raise RuntimeError("Configuration SMTP incomplĂšte (SMTP_HOST/PORT/USER/PASS/FROM).") - if not to_email: - raise ValueError("Destinataire vide.") - - msg = EmailMessage() - msg["From"] = SMTP_FROM - msg["To"] = to_email - msg["Date"] = formatdate(localtime=True) - msg["Subject"] = subject - msg.set_content(body_text) - if body_html: - msg.add_alternative(body_html, subtype="html") - - # Choix du protocole en fonction du port - if SMTP_PORT == 465: - with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, timeout=20) as s: - s.login(SMTP_USER, SMTP_PASS) - s.send_message(msg) - else: - # 587 (ou autre) : STARTTLS - with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=20) as s: - s.ehlo() - s.starttls() # passe en TLS - s.ehlo() - s.login(SMTP_USER, SMTP_PASS) - s.send_message(msg) - - def get_user_email_and_field(cnx, username: str, field: str): - cols = {"email", "Nom_complet", "Site", "DateExpiration", "Telephone"} - if field not in cols: - field = "Nom_complet" # fallback - with cnx.cursor(dictionary=True) as cur: - cur.execute( - f"SELECT email, {field} AS field_value FROM Utilisateurs WHERE NomUtilisateur=%s", - (username,) - ) - row = cur.fetchone() - return (row.get("email") if row else None, row.get("field_value") if row else None) - -def get_user_email_and_field(cnx, username: str, field: str): - cols = {"email", "Nom_complet", "Site", "DateExpiration", "Telephone"} - if field not in cols: - field = "Nom_complet" # fallback - with cnx.cursor(dictionary=True) as cur: - cur.execute( - f"SELECT email, {field} AS field_value FROM Utilisateurs WHERE NomUtilisateur=%s", - (username,) - ) - row = cur.fetchone() - return (row.get("email") if row else None, row.get("field_value") if row else None) -# ----------------------- -# UI -# ----------------------- -st.set_page_config(page_title="Acces.Utilisateurs", page_icon="đŸ‘€", layout="wide") -st.title("Gestion des utilisateurs") - -tab_list, tab_create, tab_edit, tab_security = st.tabs(["Liste", "CrĂ©er", "Modifier", "SĂ©curitĂ©"]) - -# ----------------------- -# ONGLET LISTE -# ----------------------- -with tab_list: - st.subheader("Utilisateurs") - try: - cnx = pool.get_connection() - try: - include_pw = os.getenv("ADMIN_USER") == st.session_state.get("current_user", os.getenv("ADMIN_USER")) - rows = list_users(cnx, limit=1000, include_password=include_pw) - finally: - cnx.close() - - df = pd.DataFrame(rows) - if not df.empty: - from datetime import date - df["DateExpiration"] = pd.to_datetime(df["DateExpiration"], errors="coerce").dt.date - - # --- Ajout coloration et statut expiration --- - ALERTE_JOURS = 30 - exp = pd.to_datetime(df["DateExpiration"], errors="coerce") - today = pd.Timestamp(date.today()) - - df["Jours_restant"] = (exp - today).dt.days - - def statut_expiration(j): - if pd.isna(j): - return "Inconnu" - j = int(j) - if j < 0: - return "PĂ©rimĂ©" - if j <= ALERTE_JOURS: - return f"BientĂŽt (≀{ALERTE_JOURS}j)" - return "OK" - - df["Statut"] = df["Jours_restant"].apply(statut_expiration) - - c1, c2, c3 = st.columns([1, 1, 3]) - with c1: - only_bad = st.checkbox("📌 PĂ©rimĂ©s / bientĂŽt", value=False) - with c2: - tri_jours = st.checkbox("đŸ”œ Trier par Jours restants", value=True) - - if only_bad: - df = df[df["Statut"].isin(["PĂ©rimĂ©", f"BientĂŽt (≀{ALERTE_JOURS}j)"])] - - if tri_jours and "Jours_restant" in df.columns: - df = df.sort_values("Jours_restant", na_position="last") - - def colorize(row): - stt = row.get("Statut", "") - n = len(row) - if stt == "PĂ©rimĂ©": - return ["background-color:#ffe6e6; color:#b00020;"] * n # rouge clair - if stt.startswith("BientĂŽt"): - return ["background-color:#fff7e6; color:#8a6d3b;"] * n # orange clair - return [""] * n - - styled = df.style.apply(colorize, axis=1) - - st.dataframe( - styled, - use_container_width=True, - hide_index=True, - column_config={ - "DateExpiration": st.column_config.DateColumn("DateExpiration", format="YYYY-MM-DD"), - "Jours_restant": st.column_config.NumberColumn("Jours restants", help="NĂ©gatif = pĂ©rimĂ©"), - }, - ) - st.caption(f"{len(df)} utilisateur(s) affichĂ©(s)") - else: - st.info("Aucun utilisateur Ă  afficher.") - except Exception as e: - st.warning(f"Impossible de lister les utilisateurs : {e}") - -# ----------------------- -# ONGLET CREER -# ----------------------- - -with tab_create: - with st.form("create_user_form", clear_on_submit=False): - st.subheader("Nouveau compte") - - c1, c2, c3 = st.columns([1.2, 1.5, 1]) - username = c1.text_input("NomUtilisateur", placeholder="ex: cjaquier") - full_name = c2.text_input("Nom_complet", placeholder="ClĂ©ment JAQUIER") - site = c3.text_input("Site", placeholder="Roissy", value="Roissy") - - c4, c5, c6 = st.columns([1.4, 1, 1]) - email = c4.text_input("email", placeholder="prenom.nom@domaine.com") - phone = c5.text_input("TĂ©lĂ©phone", placeholder="06 12 12 35 32") - expires = c6.date_input("DateExpiration", value=date.today()) - - c7, c8 = st.columns(2) - password = c7.text_input("Mot de passe", type="password") - password2 = c8.text_input("Confirmer", type="password") - - col_cb1, col_cb2 = st.columns([1.2, 1]) - # si tu stockes encore le mot de passe en clair dans la table - store_plain = col_cb1.checkbox("Stocker le mot de passe en clair (dĂ©conseillĂ©)", value=False) - # ✅ nouvelle case pour l'e-mail de bienvenue Ă  la crĂ©ation - notify_welcome = col_cb2.checkbox("Envoyer un e-mail de bienvenue", value=True, - help="Enverra l'identifiant, le nom, le site et le mot de passe en clair") - - submitted = st.form_submit_button("CrĂ©er l'utilisateur", use_container_width=True) - - if submitted: - if not username or not full_name or not site or not email: - st.error("Champs requis manquants.") - elif not EMAIL_RE.match(email): - st.error("Format d’e-mail invalide.") - elif password != password2: - st.error("Les mots de passe ne correspondent pas.") - else: - try: - cnx = pool.get_connection() - try: - # 🔎 avertir si l'e-mail existe dĂ©jĂ  (mais on n'empĂȘche pas) - dup = find_users_by_email(cnx, email) - if dup: - liste = ", ".join(f"{u['NomUtilisateur']}@{u['Site']}" for u in dup) - st.info(f"Cet e-mail est dĂ©jĂ  utilisĂ© par : {liste}") - - pwd_hash = insert_user( - cnx, username=username, full_name=full_name, site=site, - password=password, expires=expires, phone=phone, email=email, - store_plain=store_plain - ) - finally: - cnx.close() - - st.success("Utilisateur créé avec succĂšs ✅") - st.caption("Hash (MotDePasseHash) :") - st.code(pwd_hash) - - # ✉ Mail de bienvenue (optionnel) - if notify_welcome: - try: - subj = f"[Compte créé] Vos accĂšs — {site}" - body_txt = ( - "Bonjour,\n\n" - "Votre compte a Ă©tĂ© créé.\n\n" - f"Nom d’utilisateur : {username}\n" - f"Nom complet : {full_name}\n" - f"Site : {site}\n" - f"Mot de passe : {password}\n" - f"Date d'expiration: {expires.strftime('%Y-%m-%d')}\n\n" - "Cordialement." - ) - body_html = f""" - -

Bonjour,

-

Votre compte a été créé.

- - - - - - -
Nom d’utilisateur{username}
Nom complet{full_name}
Site{site}
Mot de passe{password}
Date d'expiration{expires.strftime('%Y-%m-%d')}
-

Cordialement.

- - """ - send_mail(email, subj, body_txt, body_html) - st.success(f"✉ E-mail de bienvenue envoyĂ© Ă  {email}") - except Exception as e_mail: - st.warning(f"E-mail non envoyĂ© : {e_mail}") - - except mysql.connector.Error as db_err: - if db_err.errno == errorcode.ER_ACCESS_DENIED_ERROR: - st.error("Identifiants MySQL invalides.") - else: - st.error(f"Erreur MySQL : {db_err}") - except Exception as e: - st.error(f"Erreur : {e}") - -# ----------------------- -# ONGLET MODIFIER -# ----------------------- - -with tab_edit: - st.subheader("Modifier un utilisateur existant") - try: - cnx = pool.get_connection() - try: - users = list_users(cnx, limit=1000) - finally: - cnx.close() - usernames = [u["NomUtilisateur"] for u in users] - except Exception as e: - users = [] - usernames = [] - st.warning(f"Impossible de charger la liste des utilisateurs : {e}") - - top_left, top_right = st.columns([1.2, 2]) - with top_left: - sel_user = st.selectbox("Utilisateur", usernames, placeholder="Choisir un utilisateur") - field = st.selectbox( - "Champ Ă  modifier", - ["Nom_complet", "Site", "DateExpiration", "Telephone", "email"] - ) - - with top_right: - if field == "DateExpiration": - new_value = st.date_input("Nouvelle valeur", value=date.today()) - else: - new_value = st.text_input("Nouvelle valeur") - notify_user = st.checkbox("Notifier l’utilisateur par e-mail", value=True, help="Envoie un e-mail si cochĂ©") - update_btn = st.button("Mettre Ă  jour", disabled=not sel_user, use_container_width=True) - if update_btn and sel_user: - try: - cnx = pool.get_connection() - try: - # 1) Lire email + ancienne valeur - to_email, old_value = get_user_email_and_field(cnx, sel_user, field) - - # 2) Appliquer la mise Ă  jour - update_field(cnx, sel_user, field, new_value) - finally: - cnx.close() - - st.success(f"✅ {field} mis Ă  jour pour {sel_user}") - - # 3) Notification mail si demandĂ© - if notify_user: - try: - nv = new_value.strftime("%Y-%m-%d") if hasattr(new_value, "strftime") else new_value - ov = old_value.strftime("%Y-%m-%d") if hasattr(old_value, "strftime") else old_value - - if not to_email: - st.info("â„č Aucune notification envoyĂ©e : adresse e-mail manquante.") - else: - subject = f"[Compte] Mise Ă  jour de votre information : {field}" - body_txt = ( - f"Bonjour,\n\n" - f"Votre information '{field}' vient d’ĂȘtre mise Ă  jour par l’administrateur.\n" - f"Ancienne valeur : {ov}\n" - f"Nouvelle valeur : {nv}\n\n" - f"Si vous n’ĂȘtes pas Ă  l’origine de cette demande, rĂ©pondez Ă  cet e-mail.\n" - f"Cordialement." - ) - body_html = f""" - -

Bonjour,

-

Votre information {field} vient d’ĂȘtre mise Ă  jour par l’administrateur.

- -

Si vous n’ĂȘtes pas Ă  l’origine de cette demande, rĂ©pondez Ă  cet e-mail.

-

Cordialement.

- - """ - send_mail(to_email, subject, body_txt, body_html) - st.success(f"✉ Notification envoyĂ©e Ă  {to_email}") - except Exception as e_mail: - st.warning(f"Notification non envoyĂ©e : {e_mail}") - - except mysql.connector.Error as db_err: - st.error(f"Erreur MySQL : {db_err}") - except Exception as e: - st.error(f"Erreur : {e}") -# ----------------------- -# ONGLET SECURITE -# ----------------------- - -with tab_security: - st.subheader("RĂ©initialiser le mot de passe (bcrypt)") - try: - if 'user_cache_for_pw' not in st.session_state: - cnx = pool.get_connection() - try: - st.session_state.user_cache_for_pw = [u["NomUtilisateur"] for u in list_users(cnx, limit=1000)] - finally: - cnx.close() - pw_user_list = st.session_state.user_cache_for_pw - except Exception: - pw_user_list = [] - - user_pw = st.selectbox("Utilisateur", pw_user_list, key="pw_user", placeholder="Choisir un utilisateur") - colp1, colp2 = st.columns(2) - new_pw = colp1.text_input("Nouveau mot de passe", type="password") - new_pw2 = colp2.text_input("Confirmer", type="password") - - if st.button("Mettre Ă  jour le mot de passe", disabled=not user_pw, use_container_width=True): - if not new_pw: - st.error("Mot de passe vide.") - elif new_pw != new_pw2: - st.error("Les mots de passe ne correspondent pas.") - else: - try: - cnx = pool.get_connection() - try: - h = update_password(cnx, user_pw, new_pw) - finally: - cnx.close() - st.success("Mot de passe mis Ă  jour ✅") - st.caption("Hash (MotDePasseHash) :") - st.code(h) - except Exception as e: - st.error(f"Erreur : {e}") diff --git a/app/generate_admin_hash.py b/app/generate_admin_hash.py deleted file mode 100644 index b5834ee..0000000 --- a/app/generate_admin_hash.py +++ /dev/null @@ -1,6 +0,0 @@ -import bcrypt - -password = input("Entre le mot de passe admin Ă  hacher : ") -hashed = bcrypt.hashpw(password.encode(), bcrypt.gensalt(rounds=12)) -print("\nHash gĂ©nĂ©rĂ© :") -print(hashed.decode()) \ No newline at end of file