# Streamlit app
import hmac
import os
import re
import smtplib
import ssl
from datetime import date, datetime
from email.message import EmailMessage
from email.utils import formatdate

import bcrypt
import mysql.connector
import pandas as pd
import streamlit as st
from dotenv import load_dotenv
from mysql.connector import errorcode, pooling

load_dotenv()

SMTP_HOST = os.getenv("SMTP_HOST", "ssl0.ovh.net")
SMTP_PORT = int(os.getenv("SMTP_PORT", "465"))  # OVH: 465 (SSL) or 587 (STARTTLS)
SMTP_USER = os.getenv("SMTP_USER")
SMTP_PASS = os.getenv("SMTP_PASS")
SMTP_FROM = os.getenv("SMTP_FROM", SMTP_USER)


# -----------------------
# Minimal auth
# -----------------------
def require_login():
    """Block the rest of the script until the administrator has logged in.

    Reads ADMIN_USER and ADMIN_PASS_HASH (a bcrypt hash) from the environment.
    While ``st.session_state.auth_ok`` is false, a login form is rendered and
    the script run is stopped with ``st.stop()``; on a successful login the
    flag is set and the script is rerun.
    """
    # NOTE(review): the markup passed here is empty in this view of the file;
    # presumably page CSS belongs here - confirm against the original.
    st.markdown(
        "",
        unsafe_allow_html=True,
    )

    load_dotenv()
    admin_user = os.getenv("ADMIN_USER")
    admin_hash = os.getenv("ADMIN_PASS_HASH")
    if not admin_user or not admin_hash:
        st.error("Variables ADMIN_USER et/ou ADMIN_PASS_HASH manquantes dans .env")
        st.stop()

    if "auth_ok" not in st.session_state:
        st.session_state.auth_ok = False

    if st.session_state.auth_ok:
        return

    _, col2, _ = st.columns([1, 2, 1])
    with col2:
        st.header("🔐 Accès restreint")
        u = st.text_input("Utilisateur")
        p = st.text_input("Mot de passe", type="password")
        if st.button("Se connecter", use_container_width=True):
            # Evaluate both checks unconditionally and compare the username
            # in constant time, so timing does not reveal which part failed.
            user_ok = hmac.compare_digest(u.encode("utf-8"), admin_user.encode("utf-8"))
            try:
                pass_ok = bcrypt.checkpw(p.encode("utf-8"), admin_hash.encode("utf-8"))
            except (ValueError, TypeError):
                # A malformed hash or unusable password counts as a failed login.
                pass_ok = False
            if user_ok and pass_ok:
                st.session_state.auth_ok = True
                st.rerun()
            else:
                st.error("Identifiants invalides.")
    # Not authenticated: nothing below the login form may run.
    st.stop()


def logout_button():
    """Render the "Quitter" button; clicking it clears the session and reruns."""
    # NOTE(review): the three markdown payloads below are blank in this view of
    # the file; presumably they carried CSS / wrapper markup - confirm.
    st.markdown(
        """ """,
        unsafe_allow_html=True,
    )
    st.markdown("\n", unsafe_allow_html=True)
    if st.button("🚪 Quitter", key="logout", use_container_width=False):
        st.session_state.clear()
        st.success("Déconnexion effectuée.")
        st.rerun()
    st.markdown("\n", unsafe_allow_html=True)


require_login()
logout_button()


# -----------------------
# MySQL connection via pool
# -----------------------
@st.cache_resource
def get_pool():
    """Create the MySQL connection pool (cached by Streamlit as a resource).

    Reads DB_HOST, MYSQL_PORT (default 3306), DB_USER, DB_PASSWORD and DB_NAME
    from the environment.

    Raises:
        RuntimeError: if a required variable is missing or MYSQL_PORT is not
            an integer.
    """
    load_dotenv()
    host = os.getenv("DB_HOST")
    user = os.getenv("DB_USER")
    pwd = os.getenv("DB_PASSWORD")
    db = os.getenv("DB_NAME")

    # An unset or empty MYSQL_PORT falls back to the default port.
    raw_port = os.getenv("MYSQL_PORT") or "3306"
    try:
        port = int(raw_port)
    except ValueError as err:
        raise RuntimeError(f"MYSQL_PORT invalide : {raw_port!r}") from err

    required = {
        "DB_HOST": host,
        "DB_USER": user,
        "DB_PASSWORD": pwd,
        "DB_NAME": db,
    }
    missing = [name for name, val in required.items() if val in (None, "")]
    if missing:
        raise RuntimeError(f"Variables manquantes dans .env : {', '.join(missing)}")

    return pooling.MySQLConnectionPool(
        pool_name="users_pool",
        pool_size=5,
        pool_reset_session=True,
        host=host,
        port=port,
        user=user,
        password=pwd,
        database=db,
        autocommit=True,
    )


pool = get_pool()

# -----------------------
# SQL helpers + validation
# -----------------------
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
PHONE_RE = re.compile(r"\d{10,14}")


def normalize_phone(p: str | None) -> str | None:
    """Strip every non-digit from *p*; return the digits, or None if invalid.

    A number is valid when it consists of exactly 10 to 14 digits.
    """
    if not p:
        return None
    digits = re.sub(r"\D", "", p)
    # fullmatch: ``match`` only anchors the start and would accept 15+ digits.
    return digits if PHONE_RE.fullmatch(digits) else None


def to_sql_date(d: date | str | None) -> str | None:
    """Format a date (or ISO-8601 string) as ``YYYY-MM-DD``.

    Returns None when *d* is None or a string that cannot be parsed.
    """
    if d is None:
        return None
    if isinstance(d, str):
        try:
            d = datetime.fromisoformat(d).date()
        except ValueError:
            return None
    return d.strftime("%Y-%m-%d")


def hash_password(plain: str, rounds: int = 12) -> str:
    """Return the bcrypt hash of *plain* as a UTF-8 string."""
    salt = bcrypt.gensalt(rounds=rounds)
    return bcrypt.hashpw(plain.encode("utf-8"), salt).decode("utf-8")


def user_exists(cur, username: str, email: str) -> bool:
    """Return True if a row already uses *username* or *email*."""
    cur.execute(
        "SELECT COUNT(*) FROM Utilisateurs WHERE NomUtilisateur=%s OR email=%s",
        (username, email),
    )
    (count,) = cur.fetchone()
    return count > 0


def list_users(cnx, limit: int = 500, include_password=False):
    """Return up to *limit* users as dicts, ordered by NomUtilisateur.

    When *include_password* is true the MotDePasse column is selected too.
    """
    fields = [
        "NomUtilisateur",
        "Nom_complet",
        "Site",
        "DateExpiration",
        "Telephone",
        "email",
    ]
    if include_password:
        fields.append("MotDePasse")
    # Column names come from the fixed list above, never from user input.
    sql = f"SELECT {', '.join(fields)} FROM Utilisateurs ORDER BY NomUtilisateur LIMIT %s"
    with cnx.cursor(dictionary=True) as cur:
        cur.execute(sql, (limit,))
        return cur.fetchall()


def insert_user(cnx, username, full_name, site, password, expires, phone, email, store_plain=False):
    """Insert a new user and return the bcrypt hash that was stored.

    The clear-text password is written to MotDePasse only when *store_plain*
    is true; otherwise that column is set to NULL.

    Raises:
        ValueError: invalid email, empty password, or a non-empty phone number
            that does not normalize to 10-14 digits.
        RuntimeError: the username or email is already taken.
    """
    if not EMAIL_RE.fullmatch(email or ""):
        raise ValueError("Email invalide.")
    if not password:
        raise ValueError("Mot de passe vide.")

    phone_norm = normalize_phone(phone)
    # Reject a malformed number instead of silently storing NULL.
    if phone and phone.strip() and phone_norm is None:
        raise ValueError("Téléphone invalide.")

    exp_sql = to_sql_date(expires)

    with cnx.cursor() as cur:
        if user_exists(cur, username, email):
            raise RuntimeError("Nom d'utilisateur ou email déjà existant.")
        # Hash only once the cheap checks have passed; bcrypt is slow by design.
        pwd_hash = hash_password(password)
        cur.execute(
            """
            INSERT INTO Utilisateurs
                (NomUtilisateur, Nom_complet, Site, MotDePasse, MotDePasseHash,
                 DateExpiration, Telephone, email)
            VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
            """,
            (
                username,
                full_name,
                site,
                password if store_plain else None,  # None is sent as SQL NULL
                pwd_hash,
                exp_sql,
                phone_norm,
                email,
            ),
        )
    return pwd_hash


def get_user_details(cnx, username: str):
    """Return the profile columns of *username* as a dict, or None if absent."""
    with cnx.cursor(dictionary=True) as cur:
        cur.execute(
            """
            SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email
            FROM Utilisateurs
            WHERE NomUtilisateur=%s
            """,
            (username,),
        )
        return cur.fetchone()


def update_field(cnx, username: str, field: str, value):
    """Update one whitelisted column of *username*.

    Allowed fields: Nom_complet, Site, DateExpiration, Telephone, email.
    An empty Telephone or DateExpiration clears the column (NULL).

    Raises:
        ValueError: unknown field, invalid email, or a non-empty phone/date
            that cannot be normalized.
    """
    allowed = {"Nom_complet", "Site", "DateExpiration", "Telephone", "email"}
    if field not in allowed:
        raise ValueError("Champ non autorisé.")

    if field == "email":
        if not EMAIL_RE.fullmatch(str(value)):
            raise ValueError("Email invalide.")
    elif field == "Telephone":
        if value:
            normalized = normalize_phone(str(value))
            if normalized is None:
                # Do not wipe the stored number because of a typo.
                raise ValueError("Téléphone invalide.")
            value = normalized
        else:
            value = None
    elif field == "DateExpiration":
        converted = to_sql_date(value)
        if value not in (None, "") and converted is None:
            raise ValueError("Date invalide.")
        value = converted

    with cnx.cursor() as cur:
        # ``field`` was validated against the whitelist above, so
        # interpolating it into the statement is safe.
        cur.execute(
            f"UPDATE Utilisateurs SET {field}=%s WHERE NomUtilisateur=%s",
            (value, username),
        )


def update_password(cnx, username: str, new_password: str):
    """Store a new bcrypt hash for *username*, set MotDePasse to NULL, return the hash.

    Raises:
        ValueError: if *new_password* is empty.
    """
    if not new_password:
        raise ValueError("Mot de passe vide.")
    pwd_hash = hash_password(new_password)
    with cnx.cursor() as cur:
        cur.execute(
            "UPDATE Utilisateurs SET MotDePasse=NULL, MotDePasseHash=%s WHERE NomUtilisateur=%s",
            (pwd_hash, username),
        )
    return pwd_hash


def send_mail(to_email: str, subject: str, body_text: str, body_html: str | None = None):
    """Send an e-mail through the SMTP server configured in the environment.

    Port 465 uses implicit TLS (``SMTP_SSL``); any other port connects in
    clear and upgrades with STARTTLS. *body_html*, when given, is attached as
    an HTML alternative to *body_text*.

    Raises:
        RuntimeError: the SMTP configuration is incomplete.
        ValueError: *to_email* is empty.
    """
    if not (SMTP_HOST and SMTP_PORT and SMTP_USER and SMTP_PASS and SMTP_FROM):
        raise RuntimeError("Configuration SMTP incomplète (SMTP_HOST/PORT/USER/PASS/FROM).")
    if not to_email:
        raise ValueError("Destinataire vide.")

    msg = EmailMessage()
    msg["From"] = SMTP_FROM
    msg["To"] = to_email
    msg["Date"] = formatdate(localtime=True)
    msg["Subject"] = subject
    msg.set_content(body_text)
    if body_html:
        msg.add_alternative(body_html, subtype="html")

    # Explicit default context: certificate and hostname verification on.
    tls_context = ssl.create_default_context()

    # Pick the protocol from the port.
    if SMTP_PORT == 465:
        with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, timeout=20, context=tls_context) as s:
            s.login(SMTP_USER, SMTP_PASS)
            s.send_message(msg)
    else:
        # 587 (or other): STARTTLS
        with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=20) as s:
            s.ehlo()
            s.starttls(context=tls_context)  # switch to TLS
            s.ehlo()
            s.login(SMTP_USER, SMTP_PASS)
            s.send_message(msg)


def get_user_email_and_field(cnx, username: str, field: str):
    """Return ``(email, current value of field)`` for *username*.

    An unknown *field* falls back to ``Nom_complet``. Both items are None when
    the user does not exist.
    """
    cols = {"email", "Nom_complet", "Site", "DateExpiration", "Telephone"}
    if field not in cols:
        field = "Nom_complet"  # fallback
    with cnx.cursor(dictionary=True) as cur:
        # ``field`` is restricted to the whitelist above before interpolation.
        cur.execute(
            f"SELECT email, {field} AS field_value FROM Utilisateurs WHERE NomUtilisateur=%s",
            (username,),
        )
        row = cur.fetchone()
    if not row:
        return (None, None)
    return (row.get("email"), row.get("field_value"))


# -----------------------
# UI
# -----------------------
from html import escape  # local to the UI section: used for the HTML mail body

# NOTE(review): Streamlit releases that require set_page_config to be the first
# Streamlit command would reject this call, because the login gate earlier in
# the file already renders widgets - confirm against the deployed version.
st.set_page_config(page_title="Acces.Utilisateurs", page_icon="👤", layout="wide")
st.title("Gestion des utilisateurs")

tab_list, tab_create, tab_edit, tab_security = st.tabs(["Liste", "Créer", "Modifier", "Sécurité"])

# -----------------------
# LIST TAB
# -----------------------
with tab_list:
    st.subheader("Utilisateurs")
    try:
        cnx = pool.get_connection()
        try:
            # NOTE(review): "current_user" is never written to session_state in
            # the visible code, so this comparison is always true and the
            # MotDePasse column is always fetched and displayed - confirm intent.
            include_pw = os.getenv("ADMIN_USER") == st.session_state.get(
                "current_user", os.getenv("ADMIN_USER")
            )
            rows = list_users(cnx, limit=1000, include_password=include_pw)
        finally:
            cnx.close()

        df = pd.DataFrame(rows)
        if not df.empty:
            # --- Expiry status and row colouring ---
            ALERTE_JOURS = 30
            LABEL_EXPIRED = "Périmé"
            LABEL_SOON = f"Bientôt (≤{ALERTE_JOURS}j)"

            # Parse once; keep plain dates for display and midnight timestamps
            # for the day arithmetic.
            exp = pd.to_datetime(df["DateExpiration"], errors="coerce")
            df["DateExpiration"] = exp.dt.date
            today = pd.Timestamp(date.today())
            df["Jours_restant"] = (exp.dt.normalize() - today).dt.days

            def statut_expiration(j):
                """Map a remaining-days count to its status label."""
                if pd.isna(j):
                    return "Inconnu"
                j = int(j)
                if j < 0:
                    return LABEL_EXPIRED
                if j <= ALERTE_JOURS:
                    return LABEL_SOON
                return "OK"

            df["Statut"] = df["Jours_restant"].apply(statut_expiration)

            c1, c2, _ = st.columns([1, 1, 3])
            with c1:
                only_bad = st.checkbox("📌 Périmés / bientôt", value=False)
            with c2:
                tri_jours = st.checkbox("🔽 Trier par Jours restants", value=True)

            if only_bad:
                df = df[df["Statut"].isin([LABEL_EXPIRED, LABEL_SOON])]
            if tri_jours and "Jours_restant" in df.columns:
                df = df.sort_values("Jours_restant", na_position="last")

            def colorize(row):
                """Return one CSS string per cell, based on the row's status."""
                stt = row.get("Statut", "")
                n = len(row)
                if stt == LABEL_EXPIRED:
                    return ["background-color:#ffe6e6; color:#b00020;"] * n  # light red
                if stt.startswith("Bientôt"):
                    return ["background-color:#fff7e6; color:#8a6d3b;"] * n  # light orange
                return [""] * n

            styled = df.style.apply(colorize, axis=1)
            st.dataframe(
                styled,
                use_container_width=True,
                hide_index=True,
                column_config={
                    "DateExpiration": st.column_config.DateColumn("DateExpiration", format="YYYY-MM-DD"),
                    "Jours_restant": st.column_config.NumberColumn("Jours restants", help="Négatif = périmé"),
                },
            )
            st.caption(f"{len(df)} utilisateur(s) affiché(s)")
        else:
            st.info("Aucun utilisateur à afficher.")
    except Exception as e:
        st.warning(f"Impossible de lister les utilisateurs : {e}")

# --- Create tab ---
with tab_create:
    with st.form("create_user_form", clear_on_submit=False):
        st.subheader("Nouveau compte")
        c1, c2, c3 = st.columns([1.2, 1.5, 1])
        username = c1.text_input("NomUtilisateur", placeholder="ex: cjaquier")
        full_name = c2.text_input("Nom_complet", placeholder="Clément JAQUIER")
        site = c3.text_input("Site", placeholder="Roissy", value="Roissy")

        c4, c5, c6 = st.columns([1.4, 1, 1])
        email = c4.text_input("email", placeholder="prenom.nom@domaine.com")
        phone = c5.text_input("Téléphone", placeholder="06 12 12 35 32")
        expires = c6.date_input("DateExpiration", value=date.today())

        c7, c8 = st.columns(2)
        password = c7.text_input("Mot de passe", type="password")
        password2 = c8.text_input("Confirmer", type="password")

        store_plain = st.checkbox("Stocker le mot de passe en clair (déconseillé)", value=False)
        submitted = st.form_submit_button("Créer l'utilisateur", use_container_width=True)

    if submitted:
        if not username or not full_name or not site or not email:
            st.error("Champs requis manquants.")
        elif not password:
            # Same rule as the password-reset tab: never create an account
            # whose password is the empty string.
            st.error("Mot de passe vide.")
        elif password != password2:
            st.error("Les mots de passe ne correspondent pas.")
        else:
            try:
                cnx = pool.get_connection()
                try:
                    pwd_hash = insert_user(
                        cnx,
                        username=username,
                        full_name=full_name,
                        site=site,
                        password=password,
                        expires=expires,
                        phone=phone,
                        email=email,
                        store_plain=store_plain,
                    )
                finally:
                    cnx.close()
                st.success("Utilisateur créé avec succès ✅")
                st.caption("Hash (MotDePasseHash) :")
                st.code(pwd_hash)
            except mysql.connector.Error as db_err:
                if db_err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                    st.error("Identifiants MySQL invalides.")
                else:
                    st.error(f"Erreur MySQL : {db_err}")
            except Exception as e:
                st.error(f"Erreur : {e}")

# --- Edit tab ---
with tab_edit:
    st.subheader("Modifier un utilisateur existant")
    try:
        cnx = pool.get_connection()
        try:
            users = list_users(cnx, limit=1000)
        finally:
            cnx.close()
        usernames = [u["NomUtilisateur"] for u in users]
    except Exception as e:
        users = []
        usernames = []
        st.warning(f"Impossible de charger la liste des utilisateurs : {e}")

    top_left, top_right = st.columns([1.2, 2])
    with top_left:
        sel_user = st.selectbox("Utilisateur", usernames, placeholder="Choisir un utilisateur")
        field = st.selectbox(
            "Champ à modifier",
            ["Nom_complet", "Site", "DateExpiration", "Telephone", "email"],
        )
    with top_right:
        if field == "DateExpiration":
            new_value = st.date_input("Nouvelle valeur", value=date.today())
        else:
            new_value = st.text_input("Nouvelle valeur")
        notify_user = st.checkbox(
            "Notifier l’utilisateur par e-mail",
            value=True,
            help="Envoie un e-mail si coché",
        )

    update_btn = st.button("Mettre à jour", disabled=not sel_user, use_container_width=True)

    if update_btn and sel_user:
        try:
            cnx = pool.get_connection()
            try:
                # 1) Read the e-mail address and the previous value. This runs
                #    before the update, so an e-mail change is notified to the
                #    old address.
                to_email, old_value = get_user_email_and_field(cnx, sel_user, field)
                # 2) Apply the update
                update_field(cnx, sel_user, field, new_value)
            finally:
                cnx.close()
            st.success(f"✅ {field} mis à jour pour {sel_user}")

            # 3) Mail notification if requested; a mail failure must not mask
            #    the successful update, hence the separate try block.
            if notify_user:
                try:
                    nv = new_value.strftime("%Y-%m-%d") if hasattr(new_value, "strftime") else new_value
                    ov = old_value.strftime("%Y-%m-%d") if hasattr(old_value, "strftime") else old_value
                    if not to_email:
                        st.info("ℹ Aucune notification envoyée : adresse e-mail manquante.")
                    else:
                        subject = f"[Compte] Mise à jour de votre information : {field}"
                        body_txt = (
                            f"Bonjour,\n\n"
                            f"Votre information '{field}' vient d’être mise à jour par l’administrateur.\n"
                            f"Ancienne valeur : {ov}\n"
                            f"Nouvelle valeur : {nv}\n\n"
                            f"Si vous n’êtes pas à l’origine de cette demande, répondez à cet e-mail.\n"
                            f"Cordialement."
                        )
                        # HTML alternative mirrors the text part; the values are
                        # escaped because they come from the form / database.
                        body_html = (
                            "<p>Bonjour,</p>"
                            f"<p>Votre information <strong>{escape(str(field))}</strong> "
                            "vient d’être mise à jour par l’administrateur.</p>"
                            f"<p>Ancienne valeur : {escape(str(ov))}<br>"
                            f"Nouvelle valeur : {escape(str(nv))}</p>"
                            "<p>Si vous n’êtes pas à l’origine de cette demande, "
                            "répondez à cet e-mail.</p>"
                            "<p>Cordialement.</p>"
                        )
                        send_mail(to_email, subject, body_txt, body_html)
                        st.success(f"✉ Notification envoyée à {to_email}")
                except Exception as e_mail:
                    st.warning(f"Notification non envoyée : {e_mail}")
        except mysql.connector.Error as db_err:
            st.error(f"Erreur MySQL : {db_err}")
        except Exception as e:
            st.error(f"Erreur : {e}")

# --- Security tab ---
with tab_security:
    st.subheader("Réinitialiser le mot de passe (bcrypt)")

    # Reuse the list loaded for the edit tab on this run. The previous
    # session_state cache was never refreshed, so accounts created after the
    # first render did not show up here until logout.
    pw_user_list = usernames

    user_pw = st.selectbox("Utilisateur", pw_user_list, key="pw_user", placeholder="Choisir un utilisateur")
    colp1, colp2 = st.columns(2)
    new_pw = colp1.text_input("Nouveau mot de passe", type="password")
    new_pw2 = colp2.text_input("Confirmer", type="password")

    if st.button("Mettre à jour le mot de passe", disabled=not user_pw, use_container_width=True):
        if not new_pw:
            st.error("Mot de passe vide.")
        elif new_pw != new_pw2:
            st.error("Les mots de passe ne correspondent pas.")
        else:
            try:
                cnx = pool.get_connection()
                try:
                    h = update_password(cnx, user_pw, new_pw)
                finally:
                    cnx.close()
                st.success("Mot de passe mis à jour ✅")
                st.caption("Hash (MotDePasseHash) :")
                st.code(h)
            except Exception as e:
                st.error(f"Erreur : {e}")