diff --git a/.env b/.env
index 79bcc74..7389ee4 100644
--- a/.env
+++ b/.env
@@ -2,19 +2,9 @@
DB_HOST=162.19.78.131
DB_USER=excel
DB_PASSWORD='%n#%3Lay1MPa$%kR^5@'
-DB_NAME=Sondes
-DB_PORT=3306
-AUTH_USERS=[{"user":"Michel","pass":"210462"}]
-
-# === Connexion SSH pour visualiseur_logs.py ===
-SSH_HOST=162.19.78.131
-SSH_PORT=22
-SSH_USER=debian
-SSH_KEY_PATH=/home/debian/.ssh/id_ed25519
-SSH_KEY_PASSPHRASE='gaby'
-SSH_LOG_DIR=/home/debian/Gestion_sondes/Logs
-SSH_PASSWORD='lpZwixbBUFtGY'
-SSH_FORCE_PASSWORD=1
+DB_NAME=Acces
+ADMIN_USER=Michel
+ADMIN_PASS_HASH='$2b$12$Dgv7jNLJuR.3hQminSVE9OP6hCSmW4nISArR3HF5LTPGFK0Zw29N2'
# connexion OVH pour les SMS
OVH_APP_KEY=f725d07b2f98a195
diff --git a/app/Test_Mysql.py b/app/Test_Mysql.py
index e83a7a7..45559f5 100644
--- a/app/Test_Mysql.py
+++ b/app/Test_Mysql.py
@@ -1,17 +1,11 @@
-import os, mysql.connector
-from dotenv import load_dotenv
-from pathlib import Path
-
-# charge le .env **avec chemin absolu**
-load_dotenv(Path(__file__).resolve().parent.joinpath(".env"))
-
-conn = mysql.connector.connect(
+import mysql.connector, os
+cnx = mysql.connector.connect(
host=os.getenv("DB_HOST"),
+ port=int(os.getenv("DB_PORT", "3306")),
user=os.getenv("DB_USER"),
- password=os.getenv("DB_PASSWORD"),
+ password=os.getenv("DB_PASS"),
database=os.getenv("DB_NAME"),
- connection_timeout=5,
)
-conn.ping(reconnect=True, attempts=3, delay=1)
-print("CONNECTED" if conn.is_connected() else "KO")
-conn.close()
+print("OK, connecté !")
+cnx.close()
+
diff --git a/app/users.py b/app/users.py
index 635a99f..9401dfc 100644
--- a/app/users.py
+++ b/app/users.py
@@ -1,271 +1,398 @@
-# users.py
-# ------------------------------------------------------------
-# Gestion des utilisateurs + droits d'accès par site (DroitsSites)
-# - Streamlit UI
-# - MySQL (mysql-connector-python)
-# - Hash de mot de passe avec bcrypt
-#
-# .env attendu (ou variables d'env) :
-# DB_HOST=...
-# DB_PORT=3306
-# DB_USER=...
-# DB_PASS=...
-# DB_NAME=Acces # <- base "annuaire" qui contient Utilisateurs/DroitsSites/Connexions
-# ------------------------------------------------------------
-
+# Streamlit app
import os
-import re
from datetime import date, datetime
-from typing import List, Tuple, Dict, Optional
-
-import streamlit as st
+import re
+import bcrypt
import mysql.connector
-from mysql.connector.pooling import MySQLConnectionPool
+from mysql.connector import pooling, errorcode
+import pandas as pd
+import streamlit as st
+from dotenv import load_dotenv
+import smtplib
+from email.message import EmailMessage
+from email.utils import formatdate
-try:
- from dotenv import load_dotenv # facultatif mais pratique
+load_dotenv()
+SMTP_HOST = os.getenv("SMTP_HOST", "ssl0.ovh.net")
+SMTP_PORT = int(os.getenv("SMTP_PORT", "465")) # OVH: 465 (SSL) ou 587 (STARTTLS)
+SMTP_USER = os.getenv("SMTP_USER")
+SMTP_PASS = os.getenv("SMTP_PASS")
+SMTP_FROM = os.getenv("SMTP_FROM", SMTP_USER)
+
+# -----------------------
+# Auth minimale
+# -----------------------
+def require_login():
+ st.markdown(
+ "",
+ unsafe_allow_html=True,
+ )
load_dotenv()
-except Exception:
- pass
+ admin_user = os.getenv("ADMIN_USER")
+ admin_hash = os.getenv("ADMIN_PASS_HASH")
+ if not admin_user or not admin_hash:
+ st.error("Variables ADMIN_USER et/ou ADMIN_PASS_HASH manquantes dans .env")
+ st.stop()
-try:
- import bcrypt # pip install bcrypt
-except Exception:
- bcrypt = None
+ if "auth_ok" not in st.session_state:
+ st.session_state.auth_ok = False
-# ---------------------------
-# Config / Connexion MySQL
-# ---------------------------
-DB_HOST = os.getenv("DB_HOST", "localhost")
-DB_PORT = int(os.getenv("DB_PORT", "3306"))
-DB_USER = os.getenv("DB_USER", "root")
-DB_PASS = os.getenv("DB_PASS", "")
-DB_NAME = os.getenv("DB_NAME", "Acces")
+ if not st.session_state.auth_ok:
+ col1, col2, col3 = st.columns([1, 2, 1])
+ with col2:
+ st.header("🔐 Accès restreint")
+ u = st.text_input("Utilisateur")
+ p = st.text_input("Mot de passe", type="password")
+ if st.button("Se connecter", use_container_width=True):
+ try:
+ ok = (u == admin_user) and bcrypt.checkpw(p.encode(), admin_hash.encode())
+ except Exception:
+ ok = False
+ if ok:
+ st.session_state.auth_ok = True
+ st.rerun()
+ else:
+ st.error("Identifiants invalides.")
+ st.stop()
+def logout_button():
+ st.markdown(
+ """
+
+ """,
+ unsafe_allow_html=True
+ )
+ st.markdown('
', unsafe_allow_html=True)
+ if st.button("🚪 Quitter", key="logout", use_container_width=False):
+ st.session_state.clear()
+ st.success("Déconnexion effectuée.")
+ st.rerun()
+ st.markdown('
', unsafe_allow_html=True)
-pool: Optional[MySQLConnectionPool] = None
-def init_pool():
- global pool
- if pool is None:
- pool = MySQLConnectionPool(
- pool_name="users_pool",
- pool_size=5,
- host=DB_HOST,
- port=DB_PORT,
- user=DB_USER,
- password=DB_PASS,
- database=DB_NAME,
- autocommit=True,
- charset="utf8mb4",
- collation="utf8mb4_general_ci",
- )
-init_pool()
+require_login()
+logout_button()
+# -----------------------
+# Connexion MySQL via pool
+# -----------------------
+@st.cache_resource
+def get_pool():
+ load_dotenv()
+ host = os.getenv("DB_HOST")
+ port = int(os.getenv("MYSQL_PORT", "3306"))
+ user = os.getenv("DB_USER")
+ pwd = os.getenv("DB_PASSWORD")
+ db = os.getenv("DB_NAME")
+ missing = [k for k, v in {
+ "DB_HOST": host, "MYSQL_PORT": port, "DB_USER": user, "DB_PASSWORD": pwd, "DB_NAME": db
+ }.items() if v in (None, "")]
+ if missing:
+ raise RuntimeError(f"Variables manquantes dans .env : {', '.join(missing)}")
+ return pooling.MySQLConnectionPool(
+ pool_name="users_pool",
+ pool_size=5,
+ pool_reset_session=True,
+ host=host, port=port, user=user, password=pwd, database=db,
+ autocommit=True
+ )
+
+pool = get_pool()
+
+# -----------------------
+# Helpers SQL + validations
+# -----------------------
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
+PHONE_RE = re.compile(r"\d{10,14}")
+def normalize_phone(p: str|None) -> str|None:
+ if not p:
+ return None
+ digits = re.sub(r"\D", "", p)
+ return digits if PHONE_RE.match(digits) else None
-# ---------------------------
-# Utils Sécurité
-# ---------------------------
-def hash_password(plain: str) -> str:
- if not bcrypt:
- raise RuntimeError("Le module 'bcrypt' est requis (pip install bcrypt).")
- salt = bcrypt.gensalt(rounds=12)
+def to_sql_date(d: date | str | None) -> str | None:
+ if d is None:
+ return None
+ if isinstance(d, str):
+ try:
+ d = datetime.fromisoformat(d).date()
+ except Exception:
+ return None
+ return d.strftime("%Y-%m-%d")
+
+def hash_password(plain: str, rounds: int = 12) -> str:
+ salt = bcrypt.gensalt(rounds=rounds)
return bcrypt.hashpw(plain.encode("utf-8"), salt).decode("utf-8")
-
-# ---------------------------
-# Accès SQL - Helpers
-# ---------------------------
-def list_all_sites(cnx) -> List[Tuple[str, str]]:
- """Retourne [(dsn, bdd), ...] depuis Connexions."""
- cur = cnx.cursor()
+def user_exists(cur, username: str) -> bool:
+ cur.execute("SELECT COUNT(*) FROM Utilisateurs WHERE NomUtilisateur=%s", (username,))
+ (count,) = cur.fetchone()
+ return count > 0
+def find_users_by_email(cnx, email: str):
+ cur = cnx.cursor(dictionary=True)
try:
- cur.execute("SELECT DSN, BDD FROM Connexions ORDER BY BDD")
+ cur.execute(
+ "SELECT NomUtilisateur, Site FROM Utilisateurs WHERE email=%s ORDER BY NomUtilisateur",
+ (email,),
+ )
+ return cur.fetchall()
+ finally:
+ cur.close()
+def list_users(cnx, limit: int = 500, include_password=False):
+ fields = ["NomUtilisateur", "Nom_complet", "Site", "DateExpiration", "Telephone", "email"]
+ if include_password:
+ fields.append("MotDePasse")
+ sql = f"SELECT {', '.join(fields)} FROM Utilisateurs ORDER BY NomUtilisateur LIMIT %s"
+
+ cur = cnx.cursor(dictionary=True)
+ try:
+ cur.execute(sql, (limit,))
return cur.fetchall()
finally:
cur.close()
-def insert_user(cnx, username: str, full_name: str, pwd_hash: str,
- expires: date, phone: str, email: str, role: str,
- site_legacy: Optional[str] = None) -> None:
- """
- Insère un utilisateur.
- - 'Site' est obsolète : on le laisse à NULL/'' si la colonne existe encore.
- """
+def insert_user(cnx, username, full_name, site, password, expires, phone, email, role):
+ if not EMAIL_RE.match(email):
+ raise ValueError("Email invalide.")
+ phone_norm = normalize_phone(phone)
+ exp_sql = to_sql_date(expires)
+ pwd_hash = hash_password(password)
+
cur = cnx.cursor()
try:
- # Vérifie si la colonne Site existe (schéma en transition)
- cur.execute("SHOW COLUMNS FROM Utilisateurs LIKE 'Site'")
- has_site_col = cur.fetchone() is not None
-
- if has_site_col:
- cur.execute("""
- INSERT INTO Utilisateurs
- (NomUtilisateur, Nom_complet, MotDePasseHash, DateExpiration,
- Telephone, email, role )
- VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
- """, (username, full_name, pwd_hash, expires, phone, email, role, site_legacy or ""))
- else:
- cur.execute("""
- INSERT INTO Utilisateurs
- (NomUtilisateur, Nom_complet, MotDePasseHash, DateExpiration,
- Telephone, email, role)
- VALUES (%s, %s, %s, %s, %s, %s, %s)
- """, (username, full_name, pwd_hash, expires, phone, email, role))
+ if user_exists(cur, username):
+ raise RuntimeError("Nom d'utilisateur déjà existant.")
+ cur.execute(
+ """
+ INSERT INTO Utilisateurs
+ (NomUtilisateur, Nom_complet, Site, MotDePasse, MotDePasseHash, DateExpiration, Telephone, email, role)
+ VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
+ """,
+ (username, full_name, site, password, pwd_hash, exp_sql, phone_norm, email, role),
+ )
+ return pwd_hash
finally:
cur.close()
-def update_user_core(cnx, username: str, full_name: str,
- expires: date, phone: str, email: str, role: str) -> None:
- cur = cnx.cursor()
- try:
- cur.execute("""
- UPDATE Utilisateurs
- SET Nom_complet=%s,
- DateExpiration=%s,
- Telephone=%s,
- email=%s,
- role=%s
- WHERE NomUtilisateur=%s
- """, (full_name, expires, phone, email, role, username))
- finally:
- cur.close()
-def update_user_password(cnx, username: str, pwd_hash: str) -> None:
- cur = cnx.cursor()
- try:
- cur.execute("""
- UPDATE Utilisateurs
- SET MotDePasseHash=%s
- WHERE NomUtilisateur=%s
- """, (pwd_hash, username))
- finally:
- cur.close()
-def list_users(cnx) -> List[Dict]:
+def get_user_details(cnx, username: str):
cur = cnx.cursor(dictionary=True)
try:
- cur.execute("""
- SELECT u.NomUtilisateur, u.Nom_complet, u.role, u.DateExpiration,
- u.Telephone, u.email
- FROM Utilisateurs u
- ORDER BY u.NomUtilisateur
- """)
- return cur.fetchall()
- finally:
- cur.close()
-
-def get_user(cnx, username: str) -> Optional[Dict]:
- cur = cnx.cursor(dictionary=True)
- try:
- cur.execute("""
- SELECT u.NomUtilisateur, u.Nom_complet, u.role, u.DateExpiration,
- u.Telephone, u.email
- FROM Utilisateurs u
- WHERE u.NomUtilisateur=%s
- """, (username,))
+ cur.execute(
+ """
+ SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email
+ FROM Utilisateurs
+ WHERE NomUtilisateur=%s
+ """,
+ (username,),
+ )
return cur.fetchone()
finally:
cur.close()
-def delete_user(cnx, username: str) -> None:
+def update_field(cnx, username: str, field: str, value):
+ allowed = {
+ "Nom_complet": "Nom_complet",
+ "Site": "Site",
+ "DateExpiration": "DateExpiration",
+ "Telephone": "Telephone",
+ "email": "email",
+ }
+ if field not in allowed:
+ raise ValueError("Champ non autorisé.")
+ sql_field = allowed[field]
+
+ if field == "email":
+ if not EMAIL_RE.match(str(value)):
+ raise ValueError("Email invalide.")
+ if field == "Telephone":
+ value = normalize_phone(str(value)) if value else None
+ if field == "DateExpiration":
+ value = to_sql_date(value)
+
cur = cnx.cursor()
try:
- # Nettoie d'abord les droits si contrainte FK
- cur.execute("DELETE FROM DroitsSites WHERE NomUtilisateur=%s", (username,))
- cur.execute("DELETE FROM Utilisateurs WHERE NomUtilisateur=%s", (username,))
+ cur.execute(
+ f"UPDATE Utilisateurs SET {sql_field}=%s WHERE NomUtilisateur=%s",
+ (value, username),
+ )
finally:
cur.close()
-def get_user_sites(cnx, username: str) -> List[str]:
- """Retourne la liste des DSN autorisés pour l'utilisateur."""
+def update_password(cnx, username: str, new_password: str):
+ pwd_hash = hash_password(new_password)
cur = cnx.cursor()
try:
- cur.execute("""
- SELECT d.DSN
- FROM DroitsSites d
- WHERE d.NomUtilisateur=%s
- ORDER BY d.DSN
- """, (username,))
- return [r[0] for r in cur.fetchall()]
+ cur.execute(
+ "UPDATE Utilisateurs SET MotDePasse=NULL, MotDePasseHash=%s WHERE NomUtilisateur=%s",
+ (pwd_hash, username),
+ )
+ return pwd_hash
finally:
cur.close()
-def grant_sites_to_user(cnx, username: str, dsns: Optional[List[str]], grant_all: bool = False) -> None:
- """
- Alimente DroitsSites pour un utilisateur.
- - grant_all=True : accorde tous les DSN présents dans Connexions
- - sinon : insère uniquement les DSN de la liste
- """
- cur = cnx.cursor()
+def send_mail(to_email: str, subject: str, body_text: str, body_html: str | None = None):
+ if not (SMTP_HOST and SMTP_PORT and SMTP_USER and SMTP_PASS and SMTP_FROM):
+ raise RuntimeError("Configuration SMTP incomplète (SMTP_HOST/PORT/USER/PASS/FROM).")
+ if not to_email:
+ raise ValueError("Destinataire vide.")
+
+ msg = EmailMessage()
+ msg["From"] = SMTP_FROM
+ msg["To"] = to_email
+ msg["Date"] = formatdate(localtime=True)
+ msg["Subject"] = subject
+ msg.set_content(body_text)
+ if body_html:
+ msg.add_alternative(body_html, subtype="html")
+
+ # Choix du protocole en fonction du port
+ if SMTP_PORT == 465:
+ with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, timeout=20) as s:
+ s.login(SMTP_USER, SMTP_PASS)
+ s.send_message(msg)
+ else:
+ # 587 (ou autre) : STARTTLS
+ with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=20) as s:
+ s.ehlo()
+ s.starttls() # passe en TLS
+ s.ehlo()
+ s.login(SMTP_USER, SMTP_PASS)
+ s.send_message(msg)
+
+ def get_user_email_and_field(cnx, username: str, field: str):
+ cols = {"email", "Nom_complet", "Site", "DateExpiration", "Telephone"}
+ if field not in cols:
+ field = "Nom_complet" # fallback
+ with cnx.cursor(dictionary=True) as cur:
+ cur.execute(
+ f"SELECT email, {field} AS field_value FROM Utilisateurs WHERE NomUtilisateur=%s",
+ (username,)
+ )
+ row = cur.fetchone()
+ return (row.get("email") if row else None, row.get("field_value") if row else None)
+
+def get_user_email_and_field(cnx, username: str, field: str):
+ cols = {"email", "Nom_complet", "Site", "DateExpiration", "Telephone"}
+ if field not in cols:
+ field = "Nom_complet" # fallback
+
+ cur = cnx.cursor(dictionary=True)
try:
- if grant_all:
- cur.execute("""
- INSERT IGNORE INTO DroitsSites (NomUtilisateur, Site, DSN)
- SELECT %s, c.BDD, c.DSN FROM Connexions c
- """, (username,))
- else:
- if not dsns:
- return
- cur.executemany("""
- INSERT IGNORE INTO DroitsSites (NomUtilisateur, Site, DSN)
- SELECT %s, c.BDD, c.DSN FROM Connexions c WHERE c.DSN=%s
- """, [(username, d) for d in dsns])
+ cur.execute(
+ f"SELECT email, {field} AS field_value FROM Utilisateurs WHERE NomUtilisateur=%s",
+ (username,),
+ )
+ row = cur.fetchone()
+ return (row.get("email") if row else None,
+ row.get("field_value") if row else None)
finally:
cur.close()
+# -----------------------
+# UI
+# -----------------------
+st.set_page_config(page_title="Acces.Utilisateurs", page_icon="👤", layout="wide")
+st.title("Gestion des utilisateurs")
-def replace_user_sites(cnx, username: str, dsns: Optional[List[str]], grant_all: bool = False) -> None:
- """Remplace complètement les droits de l'utilisateur par la nouvelle sélection."""
- cur = cnx.cursor()
+tab_list, tab_create, tab_edit, tab_security = st.tabs(["Liste", "Créer", "Modifier", "Sécurité"])
+
+# -----------------------
+# ONGLET LISTE
+# -----------------------
+with tab_list:
+
+ st.subheader("Utilisateurs")
try:
- cur.execute("DELETE FROM DroitsSites WHERE NomUtilisateur=%s", (username,))
- finally:
- cur.close()
- grant_sites_to_user(cnx, username, dsns, grant_all)
-
-
-# ---------------------------
-# UI Streamlit
-# ---------------------------
-st.set_page_config(page_title="Gestion des utilisateurs", page_icon="👤", layout="centered")
-st.title("👤 Gestion des utilisateurs")
-
-tabs = st.tabs(["Créer", "Modifier", "Lister / Supprimer"])
-
-# --------- Onglet CRÉER ----------
-with tabs[0]:
- st.subheader("Créer un utilisateur")
-
- with st.form("create_user_form", clear_on_submit=False):
- c1, c2 = st.columns([1.2, 1.8])
- username = c1.text_input("NomUtilisateur", placeholder="ex: michel")
- full_name = c2.text_input("Nom_complet", placeholder="Michel DUPONT")
-
- role = st.selectbox("Rôle", ["utilisateur", "commercial", "administrateur", "admin"], index=0)
-
- # Sélection des sites
- labels, label_to_dsn = [], {}
+ cnx = pool.get_connection()
try:
- cnx = pool.get_connection()
- try:
- rows = list_all_sites(cnx) # [(dsn,bdd)]
- finally:
- cnx.close()
- labels = [f"{bdd} ({dsn})" for dsn, bdd in rows]
- label_to_dsn = {lbl: dsn for (dsn, bdd), lbl in zip(rows, labels)}
- except Exception as e:
- st.warning(f"Chargement des sites impossible : {e}")
+ include_pw = os.getenv("ADMIN_USER") == st.session_state.get("current_user", os.getenv("ADMIN_USER"))
+ rows = list_users(cnx, limit=1000, include_password=include_pw)
+ finally:
+ cnx.close()
- colL, colR = st.columns([2, 1])
- with colL:
- selected_labels = st.multiselect("Sites autorisés", labels, help="Choisis un ou plusieurs sites")
- with colR:
- grant_all_sites = st.checkbox("Tous les sites", value=False)
+ df = pd.DataFrame(rows)
+ if not df.empty:
+ from datetime import date
+ df["DateExpiration"] = pd.to_datetime(df["DateExpiration"], errors="coerce").dt.date
- # Si administrateur → tous les sites
- if role.lower().startswith("admin"):
- grant_all_sites = True
+ # --- Ajout coloration et statut expiration ---
+ ALERTE_JOURS = 30
+ exp = pd.to_datetime(df["DateExpiration"], errors="coerce")
+ today = pd.Timestamp(date.today())
+
+ df["Jours_restant"] = (exp - today).dt.days
+
+ def statut_expiration(j):
+ if pd.isna(j):
+ return "Inconnu"
+ j = int(j)
+ if j < 0:
+ return "Périmé"
+ if j <= ALERTE_JOURS:
+ return f"Bientôt (≤{ALERTE_JOURS}j)"
+ return "OK"
+
+ df["Statut"] = df["Jours_restant"].apply(statut_expiration)
+
+ c1, c2, c3 = st.columns([1, 1, 3])
+ with c1:
+ only_bad = st.checkbox("📌 Périmés / bientôt", value=False)
+ with c2:
+ tri_jours = st.checkbox("🔽 Trier par Jours restants", value=True)
+
+ if only_bad:
+ df = df[df["Statut"].isin(["Périmé", f"Bientôt (≤{ALERTE_JOURS}j)"])]
+
+ if tri_jours and "Jours_restant" in df.columns:
+ df = df.sort_values("Jours_restant", na_position="last")
+
+ def colorize(row):
+ stt = row.get("Statut", "")
+ n = len(row)
+ if stt == "Périmé":
+ return ["background-color:#ffe6e6; color:#b00020;"] * n # rouge clair
+ if stt.startswith("Bientôt"):
+ return ["background-color:#fff7e6; color:#8a6d3b;"] * n # orange clair
+ return [""] * n
+
+ styled = df.style.apply(colorize, axis=1)
+
+ st.dataframe(
+ styled,
+ use_container_width=True,
+ hide_index=True,
+ height=700, # ⇐ affiche ~20 lignes sans scroller
+ column_config={
+ "DateExpiration": st.column_config.DateColumn("DateExpiration", format="YYYY-MM-DD"),
+ "Jours_restant": st.column_config.NumberColumn("Jours restants", help="Négatif = périmé"),
+ },
+ )
+ st.caption(f"{len(df)} utilisateur(s) affiché(s)")
+ else:
+ st.info("Aucun utilisateur à afficher.")
+ except Exception as e:
+ st.warning(f"Impossible de lister les utilisateurs : {e}")
+
+# -----------------------
+# ONGLET CREER
+# -----------------------
+
+with tab_create:
+ with st.form("create_user_form", clear_on_submit=False):
+ st.subheader("Nouveau compte")
+
+ c1, c2, c3 = st.columns([1.2, 1.5, 1])
+ username = c1.text_input("NomUtilisateur", placeholder="ex: cjaquier")
+ full_name = c2.text_input("Nom_complet", placeholder="Clément JAQUIER")
+ site = c3.text_input("Site", placeholder="Roissy", value="Roissy")
+ role = st.selectbox("Rôle", ["Utilisateur", "Administrateur"], index=0)
c4, c5, c6 = st.columns([1.4, 1, 1])
email = c4.text_input("email", placeholder="prenom.nom@domaine.com")
@@ -273,191 +400,206 @@ with tabs[0]:
expires = c6.date_input("DateExpiration", value=date.today())
c7, c8 = st.columns(2)
- password = c7.text_input("Mot de passe", type="password")
- password2 = c8.text_input("Confirmer", type="password")
+ password = c7.text_input("Mot de passe", type="password")
+ password2 = c8.text_input("Confirmer", type="password")
+
+ col_cb1, col_cb2 = st.columns([1.2, 1])
+ notify_welcome = col_cb2.checkbox("Envoyer un e-mail de bienvenue", value=True,
+ help="Enverra l'identifiant, le nom, le site et le mot de passe en clair")
submitted = st.form_submit_button("Créer l'utilisateur", use_container_width=True)
if submitted:
- if not username or not full_name or not email:
+ if not username or not full_name or not site or not email:
st.error("Champs requis manquants.")
elif not EMAIL_RE.match(email):
st.error("Format d’e-mail invalide.")
- elif not password:
- st.error("Veuillez saisir un mot de passe.")
elif password != password2:
st.error("Les mots de passe ne correspondent pas.")
else:
try:
cnx = pool.get_connection()
try:
- # Hash
- pwd_hash = hash_password(password)
+ # 🔎 avertir si l'e-mail existe déjà (mais on n'empêche pas)
+ dup = find_users_by_email(cnx, email)
+ if dup:
+ liste = ", ".join(f"{u['NomUtilisateur']}@{u['Site']}" for u in dup)
+ st.info(f"Cet e-mail est déjà utilisé par : {liste}")
- # Option 'site_legacy' : garde vide (schéma en transition)
- insert_user(
- cnx,
- username=username,
- full_name=full_name,
- pwd_hash=pwd_hash,
- expires=expires,
- phone=phone,
- email=email,
- role=role,
- site_legacy=None,
+ pwd_hash = insert_user(
+ cnx, username=username, full_name=full_name, site=site,
+ password=password, expires=expires, phone=phone, email=email, role=role
)
-
- # Droits
- chosen_dsns = [label_to_dsn[lbl] for lbl in selected_labels] if selected_labels else []
- grant_sites_to_user(cnx, username=username, dsns=chosen_dsns, grant_all=grant_all_sites)
finally:
cnx.close()
- st.success("Utilisateur créé ✅")
- st.caption("Hash enregistré (MotDePasseHash) :")
+ st.success("Utilisateur créé avec succès ✅")
+ st.caption("Hash (MotDePasseHash) :")
st.code(pwd_hash)
+
+ # ✉️ Mail de bienvenue (optionnel)
+ if notify_welcome:
+ try:
+ subj = f"[Compte créé] Vos accès — {site}"
+ body_txt = (
+ "Bonjour,\n\n"
+ "Votre compte a été créé.\n\n"
+ f"Nom d’utilisateur : {username}\n"
+ f"Nom complet : {full_name}\n"
+ f"Site : {site}\n"
+ f"Mot de passe : {password}\n"
+ f"Date d'expiration: {expires.strftime('%Y-%m-%d')}\n\n"
+ "Cordialement."
+ )
+ body_html = f"""
+
+ Bonjour,
+ Votre compte a été créé.
+
+ | Nom d’utilisateur | {username} |
+ | Nom complet | {full_name} |
+ | Site | {site} |
+ | Mot de passe | {password} |
+ | Date d'expiration | {expires.strftime('%Y-%m-%d')} |
+
+ Cordialement.
+
+ """
+ send_mail(email, subj, body_txt, body_html)
+ st.success(f"✉️ E-mail de bienvenue envoyé à {email}")
+ except Exception as e_mail:
+ st.warning(f"E-mail non envoyé : {e_mail}")
+
except mysql.connector.Error as db_err:
- st.error(f"Erreur MySQL : {db_err}")
+ if db_err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
+ st.error("Identifiants MySQL invalides.")
+ else:
+ st.error(f"Erreur MySQL : {db_err}")
except Exception as e:
st.error(f"Erreur : {e}")
-# --------- Onglet MODIFIER ----------
-with tabs[1]:
- st.subheader("Modifier un utilisateur")
+# -----------------------
+# ONGLET MODIFIER
+# -----------------------
- usernames = []
+with tab_edit:
+ st.subheader("Modifier un utilisateur existant")
try:
cnx = pool.get_connection()
try:
- users = list_users(cnx)
+ users = list_users(cnx, limit=1000)
finally:
cnx.close()
usernames = [u["NomUtilisateur"] for u in users]
except Exception as e:
- st.warning(f"Chargement des utilisateurs impossible : {e}")
+ users = []
+ usernames = []
+ st.warning(f"Impossible de charger la liste des utilisateurs : {e}")
- target = st.selectbox("Choisir l'utilisateur", [""] + usernames, index=0)
- if target:
- try:
- cnx = pool.get_connection()
- try:
- u = get_user(cnx, target)
- rows = list_all_sites(cnx) # [(dsn,bdd)]
- current_dsns = set(get_user_sites(cnx, target))
- finally:
- cnx.close()
+ top_left, top_right = st.columns([1.2, 2])
+ with top_left:
+ sel_user = st.selectbox("Utilisateur", usernames, placeholder="Choisir un utilisateur")
+ field = st.selectbox(
+ "Champ à modifier",
+ ["Nom_complet", "Site", "DateExpiration", "Telephone", "email"]
+ )
- if not u:
- st.error("Utilisateur introuvable.")
- else:
- labels = [f"{bdd} ({dsn})" for dsn, bdd in rows]
- label_to_dsn = {lbl: dsn for (dsn, bdd), lbl in zip(rows, labels)}
- # pré-sélection
- preselected = [lbl for lbl in labels if label_to_dsn[lbl] in current_dsns]
-
- with st.form("edit_user_form", clear_on_submit=False):
- c1, c2 = st.columns([1.2, 1.8])
- full_name = c2.text_input("Nom_complet", value=u["Nom_complet"] or "")
- role = c1.selectbox("Rôle", ["utilisateur", "commercial", "administrateur", "admin"],
- index=0 if (u["role"] or "").lower() not in ["commercial", "administrateur", "admin"]
- else ["utilisateur","commercial","administrateur","admin"].index((u["role"] or "").lower()))
- c4, c5, c6 = st.columns([1.4, 1, 1])
- email = c4.text_input("email", value=u["email"] or "")
- phone = c5.text_input("Téléphone", value=u["Telephone"] or "")
- exp_val = u["DateExpiration"].date() if isinstance(u["DateExpiration"], datetime) else (u["DateExpiration"] or date.today())
- expires = c6.date_input("DateExpiration", value=exp_val)
-
- st.markdown("**Droits d’accès aux sites**")
- colL, colR = st.columns([2, 1])
- with colL:
- selected_labels = st.multiselect("Sites autorisés", labels, default=preselected)
- with colR:
- grant_all_sites = st.checkbox("Tous les sites", value=False)
-
- st.divider()
- cpass1, cpass2 = st.columns(2)
- new_pass = cpass1.text_input("Nouveau mot de passe (optionnel)", type="password")
- new_pass2 = cpass2.text_input("Confirmer", type="password")
-
- cbtn1, cbtn2, cbtn3 = st.columns([1.2,1,1])
- save_btn = cbtn1.form_submit_button("Enregistrer")
- reset_btn = cbtn2.form_submit_button("Réinitialiser mot de passe")
- replace_btn = cbtn3.form_submit_button("Remplacer droits")
-
- if save_btn:
- if not EMAIL_RE.match(email):
- st.error("Format d’e-mail invalide.")
- else:
- try:
- cnx = pool.get_connection()
- try:
- update_user_core(cnx, target, full_name, expires, phone, email, role)
- # met à jour droits (ajout complémentaire, sans retirer)
- chosen_dsns = [label_to_dsn[lbl] for lbl in selected_labels]
- grant_sites_to_user(cnx, target, chosen_dsns, grant_all_sites)
- finally:
- cnx.close()
- st.success("Modifications enregistrées ✅")
- except Exception as e:
- st.error(f"Erreur : {e}")
-
- if reset_btn:
- if not new_pass:
- st.error("Saisis un nouveau mot de passe.")
- elif new_pass != new_pass2:
- st.error("Les mots de passe ne correspondent pas.")
- else:
- try:
- cnx = pool.get_connection()
- try:
- update_user_password(cnx, target, hash_password(new_pass))
- finally:
- cnx.close()
- st.success("Mot de passe réinitialisé ✅")
- except Exception as e:
- st.error(f"Erreur : {e}")
-
- if replace_btn:
- try:
- cnx = pool.get_connection()
- try:
- chosen_dsns = [label_to_dsn[lbl] for lbl in selected_labels]
- replace_user_sites(cnx, target, chosen_dsns, grant_all_sites)
- finally:
- cnx.close()
- st.success("Droits remplacés ✅")
- except Exception as e:
- st.error(f"Erreur : {e}")
-
- except Exception as e:
- st.error(f"Erreur de chargement : {e}")
-
-# --------- Onglet LISTE / SUPPR ----------
-with tabs[2]:
- st.subheader("Liste des utilisateurs")
- try:
- cnx = pool.get_connection()
- try:
- data = list_users(cnx)
- finally:
- cnx.close()
- if not data:
- st.info("Aucun utilisateur.")
+ with top_right:
+ if field == "DateExpiration":
+ new_value = st.date_input("Nouvelle valeur", value=date.today())
else:
- st.dataframe(data, use_container_width=True, hide_index=True)
- except Exception as e:
- st.error(f"Erreur : {e}")
-
- st.divider()
- st.subheader("Supprimer un utilisateur")
- del_user = st.text_input("NomUtilisateur à supprimer", value="")
- if st.button("Supprimer", type="primary", disabled=(not del_user)):
+ new_value = st.text_input("Nouvelle valeur")
+ notify_user = st.checkbox("Notifier l’utilisateur par e-mail", value=True, help="Envoie un e-mail si coché")
+ update_btn = st.button("Mettre à jour", disabled=not sel_user, use_container_width=True)
+ if update_btn and sel_user:
try:
cnx = pool.get_connection()
try:
- delete_user(cnx, del_user)
+ # 1) Lire email + ancienne valeur
+ to_email, old_value = get_user_email_and_field(cnx, sel_user, field)
+
+ # 2) Appliquer la mise à jour
+ update_field(cnx, sel_user, field, new_value)
finally:
cnx.close()
- st.success(f"Utilisateur '{del_user}' supprimé ✅")
+
+ st.success(f"✅ {field} mis à jour pour {sel_user}")
+
+ # 3) Notification mail si demandé
+ if notify_user:
+ try:
+ nv = new_value.strftime("%Y-%m-%d") if hasattr(new_value, "strftime") else new_value
+ ov = old_value.strftime("%Y-%m-%d") if hasattr(old_value, "strftime") else old_value
+
+ if not to_email:
+ st.info("ℹ️ Aucune notification envoyée : adresse e-mail manquante.")
+ else:
+ subject = f"[Compte] Mise à jour de votre information : {field}"
+ body_txt = (
+ f"Bonjour,\n\n"
+ f"Votre information '{field}' vient d’être mise à jour par l’administrateur.\n"
+ f"Ancienne valeur : {ov}\n"
+ f"Nouvelle valeur : {nv}\n\n"
+ f"Si vous n’êtes pas à l’origine de cette demande, répondez à cet e-mail.\n"
+ f"Cordialement."
+ )
+ body_html = f"""
+
+ Bonjour,
+ Votre information {field} vient d’être mise à jour par l’administrateur.
+
+ - Ancienne valeur : {ov}
+ - Nouvelle valeur : {nv}
+
+ Si vous n’êtes pas à l’origine de cette demande, répondez à cet e-mail.
+ Cordialement.
+
+ """
+ send_mail(to_email, subject, body_txt, body_html)
+ st.success(f"✉️ Notification envoyée à {to_email}")
+ except Exception as e_mail:
+ st.warning(f"Notification non envoyée : {e_mail}")
+
+ except mysql.connector.Error as db_err:
+ st.error(f"Erreur MySQL : {db_err}")
except Exception as e:
st.error(f"Erreur : {e}")
+# -----------------------
+# ONGLET SECURITE
+# -----------------------
+
+with tab_security:
+ st.subheader("Réinitialiser le mot de passe (bcrypt)")
+ try:
+ if 'user_cache_for_pw' not in st.session_state:
+ cnx = pool.get_connection()
+ try:
+ st.session_state.user_cache_for_pw = [u["NomUtilisateur"] for u in list_users(cnx, limit=1000)]
+ finally:
+ cnx.close()
+ pw_user_list = st.session_state.user_cache_for_pw
+ except Exception:
+ pw_user_list = []
+
+ user_pw = st.selectbox("Utilisateur", pw_user_list, key="pw_user", placeholder="Choisir un utilisateur")
+ colp1, colp2 = st.columns(2)
+ new_pw = colp1.text_input("Nouveau mot de passe", type="password")
+ new_pw2 = colp2.text_input("Confirmer", type="password")
+
+ if st.button("Mettre à jour le mot de passe", disabled=not user_pw, use_container_width=True):
+ if not new_pw:
+ st.error("Mot de passe vide.")
+ elif new_pw != new_pw2:
+ st.error("Les mots de passe ne correspondent pas.")
+ else:
+ try:
+ cnx = pool.get_connection()
+ try:
+ h = update_password(cnx, user_pw, new_pw)
+ finally:
+ cnx.close()
+ st.success("Mot de passe mis à jour ✅")
+ st.caption("Hash (MotDePasseHash) :")
+ st.code(h)
+ except Exception as e:
+ st.error(f"Erreur : {e}")
\ No newline at end of file