# app_users.py — create users and edit their fields (no sidebar, .env only)
|
||
import os
|
||
import re
|
||
from datetime import date, datetime
|
||
|
||
import bcrypt
|
||
import mysql.connector
|
||
from mysql.connector import errorcode
|
||
import streamlit as st
|
||
from dotenv import load_dotenv
|
||
|
||
def require_login():
    """Gate the page behind a single admin account configured in ``.env``.

    Reads ``ADMIN_USER`` (default ``"admin"``) and ``ADMIN_PASS_HASH`` (a
    bcrypt hash) from the environment after ``load_dotenv()``.  While
    ``st.session_state.auth_ok`` is false, a login form is rendered and the
    script run is halted with ``st.stop()``, so nothing placed after the call
    to this function runs for an unauthenticated session.

    A missing or malformed ``ADMIN_PASS_HASH`` is reported with ``st.error``
    and also halts the run.
    """
    load_dotenv()
    admin_user = os.getenv("ADMIN_USER", "admin")
    admin_hash = os.getenv("ADMIN_PASS_HASH")

    if not admin_hash:
        st.error("ADMIN_PASS_HASH manquant dans .env")
        st.stop()

    if "auth_ok" not in st.session_state:
        st.session_state.auth_ok = False

    if st.session_state.auth_ok:
        return

    st.title("🔐 Accès restreint")
    u = st.text_input("Utilisateur")
    p = st.text_input("Mot de passe", type="password")
    if st.button("Se connecter"):
        try:
            password_ok = bcrypt.checkpw(p.encode(), admin_hash.encode())
        except ValueError:
            # bcrypt raises ValueError when the stored value is not a valid
            # bcrypt hash; report the configuration problem instead of
            # letting the traceback reach the page.
            st.error("ADMIN_PASS_HASH invalide dans .env")
            st.stop()
        if u == admin_user and password_ok:
            st.session_state.auth_ok = True
            st.rerun()  # rerun so the protected page renders right away
        else:
            st.error("Identifiants invalides")
    # Not authenticated (yet): stop here so the rest of the script is skipped.
    st.stop()


require_login()
|
||
# ======================
# MySQL connection via .env
# ======================
|
||
@st.cache_resource
def get_connection():
    """Open the MySQL connection described by the ``.env`` file.

    Reads ``DB_HOST``, ``DB_USER``, ``DB_PASSWORD`` and ``DB_NAME`` (all
    required, empty counts as missing) and ``MYSQL_PORT`` (optional, defaults
    to 3306).  The function is decorated with ``st.cache_resource``, so
    Streamlit reuses the returned connection instead of calling this again on
    every rerun.

    Returns:
        The object returned by ``mysql.connector.connect`` with
        ``autocommit=True``.

    Raises:
        RuntimeError: if a required variable is missing/empty (the message
            lists the actual variable names) or ``MYSQL_PORT`` is not an
            integer.
        mysql.connector.Error: if the connection attempt itself fails.
    """
    load_dotenv()
    required = {
        "DB_HOST": os.getenv("DB_HOST"),
        "DB_USER": os.getenv("DB_USER"),
        "DB_PASSWORD": os.getenv("DB_PASSWORD"),
        "DB_NAME": os.getenv("DB_NAME"),
    }
    # Report the variable names that are really read, so the message can be
    # acted on directly in the .env file.
    missing = [name for name, value in required.items() if value in (None, "")]
    if missing:
        raise RuntimeError(f"Variables manquantes dans .env : {', '.join(missing)}")

    # An empty MYSQL_PORT falls back to the default instead of failing int("").
    raw_port = os.getenv("MYSQL_PORT") or "3306"
    try:
        port = int(raw_port)
    except ValueError as err:
        raise RuntimeError(f"MYSQL_PORT invalide dans .env : {raw_port!r}") from err

    return mysql.connector.connect(
        host=required["DB_HOST"],
        port=port,
        user=required["DB_USER"],
        password=required["DB_PASSWORD"],
        database=required["DB_NAME"],
        autocommit=True,
    )
|
||
|
||
# ======================
# Utilities
# ======================
|
||
# Loose e-mail shape check: exactly one "@", no whitespace anywhere, and at
# least one "." with characters on both sides in the part after the "@".
# This is a sanity filter, not a full address validator.
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
|
||
|
||
def normalize_phone(phone: str | None) -> str | None:
|
||
if not phone:
|
||
return None
|
||
return re.sub(r"[^\d+]", "", phone)
|
||
|
||
def to_sql_date(d: date | str) -> str:
|
||
if isinstance(d, date):
|
||
return d.strftime("%Y-%m-%d")
|
||
# accepte JJ/MM/AAAA
|
||
for fmt in ("%Y-%m-%d", "%d/%m/%Y"):
|
||
try:
|
||
return datetime.strptime(d, fmt).strftime("%Y-%m-%d")
|
||
except ValueError:
|
||
continue
|
||
raise ValueError("Date invalide (attendu: YYYY-MM-DD ou JJ/MM/AAAA)")
|
||
|
||
def hash_password(plain: str, rounds: int = 12) -> str:
    """Return the bcrypt hash of ``plain`` as text.

    Args:
        plain: The clear-text password; it is UTF-8 encoded before hashing.
        rounds: Value passed to ``bcrypt.gensalt(rounds=...)``.

    Returns:
        The result of ``bcrypt.hashpw`` decoded as UTF-8.
    """
    encoded = plain.encode("utf-8")
    digest = bcrypt.hashpw(encoded, bcrypt.gensalt(rounds=rounds))
    return digest.decode("utf-8")
|
||
|
||
def user_exists(cursor, username: str, email: str) -> bool:
    """Tell whether a row already uses this username or this email.

    Args:
        cursor: An open DB cursor; it is used for one ``execute`` and one
            ``fetchone`` call.
        username: Value compared against ``NomUtilisateur``.
        email: Value compared against ``email``.

    Returns:
        ``True`` when the ``COUNT(*)`` of matching rows is greater than zero.
    """
    query = "SELECT COUNT(*) FROM Utilisateurs WHERE NomUtilisateur=%s OR email=%s"
    cursor.execute(query, (username, email))
    # The query yields a single one-column row holding the count.
    (matches,) = cursor.fetchone()
    return matches > 0
|
||
|
||
def insert_user(cnx, username, full_name, site, password, expires, phone, email, store_plain=False):
    """Insert a new row into ``Utilisateurs`` and return the password hash.

    The email is checked against ``EMAIL_RE``, the phone is cleaned with
    ``normalize_phone``, the expiry is converted with ``to_sql_date`` and the
    password is hashed with ``hash_password`` before anything is written.

    Args:
        cnx: Open database connection providing ``cursor()``.
        username: Stored in ``NomUtilisateur``.
        full_name: Stored in ``Nom_complet``.
        site: Stored in ``Site``.
        password: Clear-text password; its hash goes to ``MotDePasseHash``.
        expires: Date or date string for ``DateExpiration``.
        phone: Free-form phone text or ``None`` for ``Telephone``.
        email: Stored in ``email``.
        store_plain: When true, the clear-text password is ALSO written to
            ``MotDePasse``; otherwise that column receives NULL.

    Returns:
        The bcrypt hash that was stored.

    Raises:
        ValueError: if the email does not match ``EMAIL_RE`` (or the date
            cannot be converted).
        RuntimeError: if ``user_exists`` reports the username or email as
            already present.
    """
    if not EMAIL_RE.match(email):
        raise ValueError("Email invalide.")
    phone_norm = normalize_phone(phone)
    exp_sql = to_sql_date(expires)
    pwd_hash = hash_password(password)

    # SECURITY: with store_plain=True the clear-text password is persisted
    # next to its hash.  Kept for backward compatibility only.
    plain_value = password if store_plain else None

    with cnx.cursor() as cur:
        if user_exists(cur, username, email):
            raise RuntimeError("Nom d'utilisateur ou email déjà existant.")
        # One statement for both modes: only the MotDePasse value differs.
        cur.execute(
            """
            INSERT INTO Utilisateurs
                (NomUtilisateur, Nom_complet, Site, MotDePasse, MotDePasseHash, DateExpiration, Telephone, email)
            VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
            """,
            (username, full_name, site, plain_value, pwd_hash, exp_sql, phone_norm, email),
        )
    return pwd_hash
|
||
|
||
def list_users(cnx, limit=200):
    """Fetch up to ``limit`` users ordered by ``NomUtilisateur`` ascending.

    Args:
        cnx: Open database connection providing ``cursor(dictionary=True)``.
        limit: Bound for the SQL ``LIMIT`` clause.

    Returns:
        Whatever ``fetchall()`` yields on a ``dictionary=True`` cursor for the
        columns NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone
        and email.  Password columns are not selected.
    """
    query = """
        SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email
        FROM Utilisateurs
        ORDER BY NomUtilisateur ASC
        LIMIT %s
    """
    with cnx.cursor(dictionary=True) as cursor:
        cursor.execute(query, (limit,))
        found = cursor.fetchall()
    return found
|
||
|
||
def get_user(cnx, username: str):
    """Fetch the non-password columns of one user.

    Args:
        cnx: Open database connection providing ``cursor(dictionary=True)``.
        username: Value matched against ``NomUtilisateur``.

    Returns:
        The result of ``fetchone()`` on a ``dictionary=True`` cursor for the
        columns NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone
        and email.
    """
    query = """
        SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email
        FROM Utilisateurs
        WHERE NomUtilisateur=%s
    """
    with cnx.cursor(dictionary=True) as cursor:
        cursor.execute(query, (username,))
        record = cursor.fetchone()
    return record
|
||
|
||
def update_field(cnx, username: str, field: str, value):
    """Update one whitelisted column of a user.

    Args:
        cnx: Open database connection providing ``cursor()``.
        username: ``NomUtilisateur`` of the row to update.
        field: One of ``Nom_complet``, ``Site``, ``Telephone``,
            ``DateExpiration`` or ``email``.
        value: New value.  Emails are stripped and checked against
            ``EMAIL_RE``; phones go through ``normalize_phone`` (a falsy
            value stores NULL); dates go through ``to_sql_date``.

    Raises:
        ValueError: if ``field`` is not whitelisted, the email is malformed,
            or the date cannot be converted.
        RuntimeError: if the new email is already used by another user
            (same rule that ``insert_user`` applies on creation).
    """
    allowed = {
        "Nom_complet": "Nom_complet",
        "Site": "Site",
        "Telephone": "Telephone",
        "DateExpiration": "DateExpiration",
        "email": "email",
    }
    if field not in allowed:
        raise ValueError("Champ non autorisé.")
    # The column name comes from the constant table above, never from the
    # caller's string, which is what makes the f-string SQL below safe.
    sql_field = allowed[field]

    # Per-field validation / normalisation
    if field == "email":
        value = str(value).strip()
        if not EMAIL_RE.match(value):
            raise ValueError("Email invalide.")
    elif field == "Telephone":
        value = normalize_phone(str(value)) if value else None
    elif field == "DateExpiration":
        value = to_sql_date(value)

    with cnx.cursor() as cur:
        if field == "email":
            # Creation rejects duplicate emails; enforce the same on update,
            # ignoring the row being edited.
            cur.execute(
                "SELECT COUNT(*) FROM Utilisateurs WHERE email=%s AND NomUtilisateur<>%s",
                (value, username),
            )
            (taken,) = cur.fetchone()
            if taken:
                raise RuntimeError("Email déjà utilisé par un autre utilisateur.")
        cur.execute(
            f"UPDATE Utilisateurs SET {sql_field}=%s WHERE NomUtilisateur=%s",
            (value, username),
        )
|
||
|
||
def update_password(cnx, username: str, new_password: str):
    """Replace a user's password hash and clear the plain-text column.

    Args:
        cnx: Open database connection providing ``cursor()``.
        username: ``NomUtilisateur`` of the row to update.
        new_password: Clear-text password, hashed with ``hash_password``.

    Returns:
        The new hash written to ``MotDePasseHash``.
    """
    new_hash = hash_password(new_password)
    statement = "UPDATE Utilisateurs SET MotDePasse=NULL, MotDePasseHash=%s WHERE NomUtilisateur=%s"
    with cnx.cursor() as cursor:
        cursor.execute(statement, (new_hash, username))
    return new_hash
|
||
|
||
|
||
# ======================
# Streamlit UI
# ======================
|
||
st.set_page_config(page_title="Acces.Utilisateurs", page_icon="👤", layout="centered")
st.title("Acces.Utilisateurs")

# Connection: any failure is shown on the page and halts this run.
try:
    cnx = get_connection()
    # get_connection() is cached, so the same connection object is reused on
    # later reruns and may have been dropped by the server in the meantime.
    # ping(reconnect=True) re-establishes it (or raises, handled below).
    cnx.ping(reconnect=True, attempts=3, delay=1)
    st.caption("Connexion MySQL via `.env` : **OK** ✅")
except Exception as e:
    st.error(f"Connexion MySQL impossible : {e}")
    st.stop()
|
||
|
||
# --- Creation ---
# All inputs live in one form so nothing is processed until the submit
# button is pressed.
with st.form("create_user_form", clear_on_submit=False):
    st.subheader("Nouveau compte")

    col1, col2 = st.columns(2)
    username = col1.text_input("NomUtilisateur", placeholder="ex: cjaquier")
    full_name = col2.text_input("Nom_complet", placeholder="Clément JAQUIER")

    col3, col4 = st.columns(2)
    site = col3.text_input("Site", placeholder="Roissy", value="Roissy")
    email = col4.text_input("email", placeholder="prenom.nom@domaine.com")

    col5, col6 = st.columns(2)
    phone = col5.text_input("Téléphone", placeholder="06 12 12 35 32")
    expires = col6.date_input("DateExpiration", value=date.today())

    col7, col8 = st.columns(2)
    password = col7.text_input("Mot de passe (saisie)", type="password")
    confirm = col8.text_input("Confirmer mot de passe", type="password")

    store_plain = st.checkbox("Remplir aussi MotDePasse en clair (déconseillé)", value=False)
    submitted = st.form_submit_button("Créer l’utilisateur", use_container_width=True)

if submitted:
    # Strip BEFORE the required-field check so whitespace-only input is
    # rejected instead of being inserted as an empty string.
    username_clean = username.strip()
    full_name_clean = full_name.strip()
    site_clean = site.strip()
    email_clean = email.strip()

    if not (username_clean and full_name_clean and site_clean and email_clean and password):
        st.error("Merci de remplir tous les champs obligatoires.")
    elif password != confirm:
        st.error("Les mots de passe ne correspondent pas.")
    else:
        try:
            pwd_hash = insert_user(
                cnx,
                username=username_clean,
                full_name=full_name_clean,
                site=site_clean,
                password=password,
                expires=expires,
                phone=phone.strip() if phone else None,
                email=email_clean,
                store_plain=store_plain,
            )
            st.success("Utilisateur créé avec succès ✅")
            with st.expander("Voir le hash généré (MotDePasseHash)"):
                st.code(pwd_hash)
        except mysql.connector.Error as db_err:
            if db_err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                st.error("Identifiants MySQL invalides.")
            else:
                st.error(f"Erreur MySQL : {db_err}")
        except Exception as e:
            # Validation errors raised by insert_user (ValueError/RuntimeError)
            st.error(f"Erreur : {e}")

st.divider()
|
||
|
||
# --- Edit a single field ---
st.subheader("Modifier un utilisateur existant")

# Load the user list that feeds the selectors below.
try:
    users = list_users(cnx, limit=1000)
    usernames = [u["NomUtilisateur"] for u in users]
except Exception as e:
    users = []
    usernames = []
    st.warning(f"Impossible de charger la liste des utilisateurs : {e}")

colA, colB = st.columns([2, 3])
with colA:
    sel_user = st.selectbox("Utilisateur", usernames, placeholder="Choisir un utilisateur")
with colB:
    if sel_user:
        # A failed lookup only hides the summary; it must not crash the page.
        try:
            details = get_user(cnx, sel_user)
        except Exception as e:
            details = None
            st.warning(f"Impossible de charger l'utilisateur : {e}")
        if details:
            st.caption(f"Actuel — Nom: **{details['Nom_complet']}**, Site: **{details['Site']}**, "
                       f"Expiration: **{details['DateExpiration']}**, Tél: **{details['Telephone']}**, "
                       f"Email: **{details['email']}**")

col1, col2 = st.columns(2)
with col1:
    field = st.selectbox("Champ à modifier", ["Nom_complet", "Site", "Telephone", "DateExpiration", "email"])
with col2:
    # Input widget depends on the chosen field
    if field == "DateExpiration":
        new_value = st.date_input("Nouvelle valeur (date)", value=date.today(), key="new_date")
    elif field == "Telephone":
        new_value = st.text_input("Nouvelle valeur (téléphone)", key="new_tel")
    else:
        new_value = st.text_input("Nouvelle valeur", key="new_text")

btn_update = st.button("Mettre à jour le champ", type="primary", use_container_width=True, disabled=not sel_user)

if btn_update and sel_user:
    if isinstance(new_value, str):
        new_value = new_value.strip()
    # The creation form requires these fields, so an edit may not blank them.
    # Telephone stays clearable (an empty value is stored as NULL).
    if field in ("Nom_complet", "Site", "email") and not new_value:
        st.error("La nouvelle valeur ne peut pas être vide.")
    else:
        try:
            update_field(cnx, sel_user, field, new_value)
            st.success(f"✅ {field} mis à jour pour {sel_user}")
        except mysql.connector.Error as db_err:
            st.error(f"Erreur MySQL : {db_err}")
        except Exception as e:
            st.error(f"Erreur : {e}")
|
||
|
||
# --- Password reset (optional) ---
with st.expander("🔐 Réinitialiser le mot de passe (bcrypt)"):
    user_pw = st.selectbox("Utilisateur", usernames, key="pw_user", placeholder="Choisir un utilisateur")
    colp1, colp2 = st.columns(2)
    new_pw = colp1.text_input("Nouveau mot de passe", type="password")
    new_pw2 = colp2.text_input("Confirmer", type="password")

    # The button is disabled until a user is selected.
    pw_clicked = st.button("Mettre à jour le mot de passe", disabled=not user_pw)
    if pw_clicked:
        # Work out the validation problem first, then act once.
        if not new_pw:
            pw_problem = "Mot de passe vide."
        elif new_pw != new_pw2:
            pw_problem = "Les mots de passe ne correspondent pas."
        else:
            pw_problem = None

        if pw_problem is not None:
            st.error(pw_problem)
        else:
            try:
                h = update_password(cnx, user_pw, new_pw)
                st.success("Mot de passe mis à jour ✅")
                st.caption("Hash (MotDePasseHash) :")
                st.code(h)
            except Exception as e:
                st.error(f"Erreur : {e}")
|
||
|
||
st.divider()
st.subheader("Utilisateurs récents")

# Table of up to 200 users as returned by list_users(); a failure in the
# query or in rendering is downgraded to a warning.
try:
    st.dataframe(
        list_users(cnx, limit=200),
        use_container_width=True,
        hide_index=True,
    )
except Exception as e:
    st.warning(f"Impossible de lister les utilisateurs : {e}")
|