# Source: Inventaire-gestion/app/creer_user.py
# Last modified: 2025-11-08 08:36:41 +01:00 (353 lines, 12 KiB, Python)

# Streamlit app
import os
from datetime import date, datetime
import re
import bcrypt
import mysql.connector
from mysql.connector import pooling, errorcode
import pandas as pd
import streamlit as st
from dotenv import load_dotenv
# -----------------------
# Auth minimale
# -----------------------
def require_login():
    """Gate the page behind a single admin login read from the environment.

    Loads ``.env`` and reads ``ADMIN_USER`` and ``ADMIN_PASS_HASH`` (a bcrypt
    hash). If either is missing, an error is shown and the run is stopped.
    While ``st.session_state.auth_ok`` is false, a login form is rendered and
    the script run is halted with ``st.stop()``, so nothing after the call to
    this function executes for an unauthenticated session.
    """
    import hmac  # local import: only needed for the constant-time comparison

    st.markdown(
        "<style>div.block-container{padding-top:2rem;}</style>",
        unsafe_allow_html=True,
    )
    load_dotenv()
    admin_user = os.getenv("ADMIN_USER")
    admin_hash = os.getenv("ADMIN_PASS_HASH")
    if not admin_user or not admin_hash:
        st.error("Variables ADMIN_USER et/ou ADMIN_PASS_HASH manquantes dans .env")
        st.stop()

    if "auth_ok" not in st.session_state:
        st.session_state.auth_ok = False
    if st.session_state.auth_ok:
        return

    _, center, _ = st.columns([1, 2, 1])
    with center:
        st.header("🔐 Accès restreint")
        u = st.text_input("Utilisateur")
        p = st.text_input("Mot de passe", type="password")
        if st.button("Se connecter", use_container_width=True):
            try:
                # Evaluate BOTH checks unconditionally: short-circuiting on the
                # username would skip bcrypt and let response time reveal
                # whether the username is valid.
                user_ok = hmac.compare_digest(
                    u.encode("utf-8"), admin_user.encode("utf-8")
                )
                pass_ok = bcrypt.checkpw(p.encode(), admin_hash.encode())
                ok = user_ok and pass_ok
            except ValueError:
                # bcrypt rejects a malformed hash with ValueError; treat it as
                # a failed login rather than crashing the page.
                ok = False
            if ok:
                st.session_state.auth_ok = True
                st.rerun()
            else:
                st.error("Identifiants invalides.")
    # Not authenticated (yet): do not run the rest of the script.
    st.stop()
# Gate the whole page: require_login() calls st.stop() until the session is authenticated.
require_login()
# -----------------------
# Connexion MySQL via pool
# -----------------------
@st.cache_resource
def get_pool():
    """Build the MySQL connection pool from environment variables.

    Reads ``DB_HOST``, ``DB_USER``, ``DB_PASSWORD``, ``DB_NAME`` and the
    optional ``MYSQL_PORT`` (default 3306) after loading ``.env``. The result
    is cached by ``st.cache_resource``.

    Returns:
        A ``MySQLConnectionPool`` named ``users_pool`` with 5 connections and
        autocommit enabled.

    Raises:
        RuntimeError: if a required variable is missing or empty, or if
            ``MYSQL_PORT`` is not an integer.
    """
    load_dotenv()
    host = os.getenv("DB_HOST")
    user = os.getenv("DB_USER")
    pwd = os.getenv("DB_PASSWORD")
    db = os.getenv("DB_NAME")

    # An unset OR empty MYSQL_PORT falls back to the default instead of
    # crashing in int("").
    raw_port = os.getenv("MYSQL_PORT") or "3306"
    try:
        port = int(raw_port)
    except ValueError as err:
        raise RuntimeError(
            f"MYSQL_PORT invalide dans .env : {raw_port!r}"
        ) from err

    required = {"DB_HOST": host, "DB_USER": user, "DB_PASSWORD": pwd, "DB_NAME": db}
    missing = [name for name, val in required.items() if val in (None, "")]
    if missing:
        raise RuntimeError(f"Variables manquantes dans .env : {', '.join(missing)}")

    return pooling.MySQLConnectionPool(
        pool_name="users_pool",
        pool_size=5,
        pool_reset_session=True,
        host=host,
        port=port,
        user=user,
        password=pwd,
        database=db,
        autocommit=True,
    )
# Module-level pool handle used by the tabs below; get_pool() is wrapped in st.cache_resource.
pool = get_pool()
# -----------------------
# Helpers SQL + validations
# -----------------------
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
PHONE_RE = re.compile(r"\d{10,14}")
def normalize_phone(p: str|None) -> str|None:
if not p:
return None
digits = re.sub(r"\D", "", p)
return digits if PHONE_RE.match(digits) else None
def to_sql_date(d: date | str | None) -> str | None:
if d is None:
return None
if isinstance(d, str):
try:
d = datetime.fromisoformat(d).date()
except Exception:
return None
return d.strftime("%Y-%m-%d")
def hash_password(plain: str, rounds: int = 12) -> str:
    """Return the bcrypt hash of ``plain`` as text.

    Args:
        plain: the clear-text password; it is UTF-8 encoded before hashing.
        rounds: bcrypt cost factor passed to ``bcrypt.gensalt``.
    """
    cost_salt = bcrypt.gensalt(rounds=rounds)
    secret = plain.encode("utf-8")
    hashed = bcrypt.hashpw(secret, cost_salt)
    return hashed.decode("utf-8")
def user_exists(cur, username: str, email: str) -> bool:
    """Tell whether a row of ``Utilisateurs`` already uses this username or email.

    Args:
        cur: an open cursor; the query is executed on it.
        username: value compared against ``NomUtilisateur``.
        email: value compared against ``email``.
    """
    query = "SELECT COUNT(*) FROM Utilisateurs WHERE NomUtilisateur=%s OR email=%s"
    params = (username, email)
    cur.execute(query, params)
    # COUNT(*) yields exactly one single-column row.
    (matches,) = cur.fetchone()
    return matches > 0
def list_users(cnx, limit: int = 500):
    """Fetch up to ``limit`` users, ordered by username, as a list of dicts.

    Each dict has the keys NomUtilisateur, Nom_complet, Site, DateExpiration,
    Telephone and email. Password columns are not selected.
    """
    query = """
        SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email
        FROM Utilisateurs
        ORDER BY NomUtilisateur
        LIMIT %s
    """
    params = (limit,)
    with cnx.cursor(dictionary=True) as cursor:
        cursor.execute(query, params)
        users = cursor.fetchall()
    return users
def insert_user(cnx, username, full_name, site, password, expires, phone, email, store_plain=False):
    """Validate the input, hash the password and insert a row in ``Utilisateurs``.

    Args:
        cnx: an open connection; a cursor is created on it.
        username: stored in ``NomUtilisateur``.
        full_name: stored in ``Nom_complet``.
        site: stored in ``Site``.
        password: clear-text password; its bcrypt hash goes to ``MotDePasseHash``.
        expires: date or ISO string, converted with ``to_sql_date``.
        phone: normalized with ``normalize_phone``; stored as NULL when that
            helper rejects it.
        email: must match ``EMAIL_RE``.
        store_plain: when true, the clear-text password is ALSO written to
            ``MotDePasse``; otherwise that column is NULL.

    Returns:
        The bcrypt hash that was stored.

    Raises:
        ValueError: invalid email or empty password.
        RuntimeError: the username or the email is already present.
    """
    if not EMAIL_RE.match(email):
        raise ValueError("Email invalide.")
    if not password:
        # bcrypt happily hashes "", which would create a blank-password account.
        raise ValueError("Mot de passe vide.")

    phone_norm = normalize_phone(phone)
    exp_sql = to_sql_date(expires)

    # SECURITY: store_plain persists the clear-text password. Kept because it
    # is part of this function's interface, but it defeats the hashing.
    plain_column = password if store_plain else None

    with cnx.cursor() as cur:
        # NOTE(review): check-then-insert is not atomic; a UNIQUE constraint on
        # the table (if there is one — confirm the schema) is the real guard.
        if user_exists(cur, username, email):
            raise RuntimeError("Nom d'utilisateur ou email déjà existant.")
        # Hash only once the cheap checks have passed; bcrypt is slow by design.
        pwd_hash = hash_password(password)
        cur.execute(
            """
            INSERT INTO Utilisateurs
            (NomUtilisateur, Nom_complet, Site, MotDePasse, MotDePasseHash, DateExpiration, Telephone, email)
            VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
            """,
            (username, full_name, site, plain_column, pwd_hash, exp_sql, phone_norm, email),
        )
    return pwd_hash
def get_user_details(cnx, username: str):
    """Return one user's non-password columns as a dict, or None if not found.

    The dict has the keys NomUtilisateur, Nom_complet, Site, DateExpiration,
    Telephone and email.
    """
    query = """
        SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email
        FROM Utilisateurs WHERE NomUtilisateur=%s
    """
    with cnx.cursor(dictionary=True) as cursor:
        cursor.execute(query, (username,))
        record = cursor.fetchone()
    return record
def update_field(cnx, username: str, field: str, value):
    """Update a single whitelisted column of one user.

    Args:
        cnx: an open connection; a cursor is created on it.
        username: ``NomUtilisateur`` of the row to update.
        field: one of Nom_complet, Site, DateExpiration, Telephone, email.
        value: the new value. An empty/None Telephone or DateExpiration
            clears the column (NULL).

    Raises:
        ValueError: unknown field, invalid email, or a non-empty phone/date
            that cannot be normalized.
    """
    allowed = frozenset(
        {"Nom_complet", "Site", "DateExpiration", "Telephone", "email"}
    )
    if field not in allowed:
        raise ValueError("Champ non autorisé.")

    if field == "email":
        if not EMAIL_RE.match(str(value)):
            raise ValueError("Email invalide.")
    elif field == "Telephone":
        if value:
            normalized = normalize_phone(str(value))
            if normalized is None:
                # Reject instead of silently erasing the stored number on a typo.
                raise ValueError("Téléphone invalide.")
            value = normalized
        else:
            value = None
    elif field == "DateExpiration":
        if value in (None, ""):
            value = None
        else:
            converted = to_sql_date(value)
            if converted is None:
                # Same reasoning: an unparseable date must not become NULL.
                raise ValueError("Date invalide.")
            value = converted

    with cnx.cursor() as cur:
        # Interpolating the column name is safe only because `field` was
        # checked against the whitelist above; values stay parameterized.
        cur.execute(
            f"UPDATE Utilisateurs SET {field}=%s WHERE NomUtilisateur=%s",
            (value, username),
        )
def update_password(cnx, username: str, new_password: str):
    """Replace a user's password hash and clear the clear-text column.

    Sets ``MotDePasseHash`` to the bcrypt hash of ``new_password`` and
    ``MotDePasse`` to NULL for the given ``NomUtilisateur``.

    Returns:
        The new bcrypt hash.
    """
    new_hash = hash_password(new_password)
    statement = "UPDATE Utilisateurs SET MotDePasse=NULL, MotDePasseHash=%s WHERE NomUtilisateur=%s"
    with cnx.cursor() as cursor:
        cursor.execute(statement, (new_hash, username))
    return new_hash
# -----------------------
# UI
# -----------------------
# Page chrome and the four tabs that the rest of the script fills in.
# NOTE(review): older Streamlit releases require set_page_config() to be the
# first Streamlit command of the run, and require_login() has already called
# st.markdown() by this point — confirm against the pinned Streamlit version.
st.set_page_config(
    page_title="Acces.Utilisateurs",
    page_icon="👤",
    layout="wide",
)
st.title("Gestion des utilisateurs")
tab_list, tab_create, tab_edit, tab_security = st.tabs(
    ["Liste", "Créer", "Modifier", "Sécurité"]
)
# --- Onglet Liste ---
with tab_list:
    # Read-only table of up to 1000 users.
    st.subheader("Utilisateurs")
    try:
        cnx = pool.get_connection()
        try:
            rows = list_users(cnx, limit=1000)
        finally:
            # Always release the connection, even if the query failed.
            cnx.close()
        df = pd.DataFrame(rows)
        if df.empty:
            st.info("Aucun utilisateur à afficher.")
        else:
            # Unparseable dates become NaT instead of raising.
            parsed = pd.to_datetime(df["DateExpiration"], errors="coerce")
            df["DateExpiration"] = parsed.dt.date
            st.dataframe(df, use_container_width=True, hide_index=True)
            st.caption(f"{len(df)} utilisateur(s)")
    except Exception as e:
        st.warning(f"Impossible de lister les utilisateurs : {e}")
# --- Onglet Créer ---
with tab_create:
    # All inputs live in one form and are handled when the submit button
    # is pressed.
    with st.form("create_user_form", clear_on_submit=False):
        st.subheader("Nouveau compte")
        c1, c2, c3 = st.columns([1.2, 1.5, 1])
        username = c1.text_input("NomUtilisateur", placeholder="ex: cjaquier")
        full_name = c2.text_input("Nom_complet", placeholder="Clément JAQUIER")
        site = c3.text_input("Site", placeholder="Roissy", value="Roissy")
        c4, c5, c6 = st.columns([1.4, 1, 1])
        email = c4.text_input("email", placeholder="prenom.nom@domaine.com")
        phone = c5.text_input("Téléphone", placeholder="06 12 12 35 32")
        expires = c6.date_input("DateExpiration", value=date.today())
        c7, c8 = st.columns(2)
        password = c7.text_input("Mot de passe", type="password")
        password2 = c8.text_input("Confirmer", type="password")
        store_plain = st.checkbox("Stocker le mot de passe en clair (déconseillé)", value=False)
        submitted = st.form_submit_button("Créer l'utilisateur", use_container_width=True)

    if submitted:
        if not username or not full_name or not site or not email:
            st.error("Champs requis manquants.")
        elif not password:
            # Two empty password boxes "match"; without this check an account
            # with a blank password would be created.
            st.error("Mot de passe vide.")
        elif password != password2:
            st.error("Les mots de passe ne correspondent pas.")
        else:
            try:
                cnx = pool.get_connection()
                try:
                    pwd_hash = insert_user(
                        cnx, username=username, full_name=full_name, site=site,
                        password=password, expires=expires, phone=phone, email=email,
                        store_plain=store_plain,
                    )
                finally:
                    # Always release the connection, even if the insert failed.
                    cnx.close()
                st.success("Utilisateur créé avec succès ✅")
                st.caption("Hash (MotDePasseHash) :")
                st.code(pwd_hash)
            except mysql.connector.Error as db_err:
                if db_err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
                    st.error("Identifiants MySQL invalides.")
                else:
                    st.error(f"Erreur MySQL : {db_err}")
            except Exception as e:
                # Validation errors raised by insert_user end up here.
                st.error(f"Erreur : {e}")
# --- Onglet Modifier ---
with tab_edit:
    # Pick a user and one column, type the new value, then apply it.
    st.subheader("Modifier un utilisateur existant")
    try:
        cnx = pool.get_connection()
        try:
            users = list_users(cnx, limit=1000)
        finally:
            cnx.close()
        usernames = [row["NomUtilisateur"] for row in users]
    except Exception as e:
        users, usernames = [], []
        st.warning(f"Impossible de charger la liste des utilisateurs : {e}")

    editable_fields = ["Nom_complet", "Site", "DateExpiration", "Telephone", "email"]
    top_left, top_right = st.columns([1.2, 2])
    with top_left:
        sel_user = st.selectbox("Utilisateur", usernames, placeholder="Choisir un utilisateur")
        field = st.selectbox("Champ à modifier", editable_fields)
    with top_right:
        # The input widget depends on the column: a date picker for the
        # expiration date, free text for everything else.
        if field != "DateExpiration":
            new_value = st.text_input("Nouvelle valeur")
        else:
            new_value = st.date_input("Nouvelle valeur", value=date.today())

    update_btn = st.button("Mettre à jour", disabled=not sel_user, use_container_width=True)
    if update_btn and sel_user:
        try:
            cnx = pool.get_connection()
            try:
                update_field(cnx, sel_user, field, new_value)
            finally:
                cnx.close()
            st.success(f"{field} mis à jour pour {sel_user}")
        except mysql.connector.Error as db_err:
            st.error(f"Erreur MySQL : {db_err}")
        except Exception as e:
            st.error(f"Erreur : {e}")
# --- Onglet Sécurité ---
with tab_security:
    # Reset one user's password; the clear-text column is cleared by
    # update_password().
    st.subheader("Réinitialiser le mot de passe (bcrypt)")
    try:
        # Reload on every run: caching this list in session_state for the
        # whole session hid users created after the first render.
        cnx = pool.get_connection()
        try:
            pw_user_list = [u["NomUtilisateur"] for u in list_users(cnx, limit=1000)]
        finally:
            cnx.close()
        # Still mirrored in session_state under the original key, in case
        # other code reads it.
        st.session_state.user_cache_for_pw = pw_user_list
    except Exception as e:
        pw_user_list = []
        # Say why the list is empty instead of failing silently.
        st.warning(f"Impossible de charger la liste des utilisateurs : {e}")

    user_pw = st.selectbox("Utilisateur", pw_user_list, key="pw_user", placeholder="Choisir un utilisateur")
    colp1, colp2 = st.columns(2)
    new_pw = colp1.text_input("Nouveau mot de passe", type="password")
    new_pw2 = colp2.text_input("Confirmer", type="password")
    if st.button("Mettre à jour le mot de passe", disabled=not user_pw, use_container_width=True):
        if not new_pw:
            st.error("Mot de passe vide.")
        elif new_pw != new_pw2:
            st.error("Les mots de passe ne correspondent pas.")
        else:
            try:
                cnx = pool.get_connection()
                try:
                    h = update_password(cnx, user_pw, new_pw)
                finally:
                    cnx.close()
                st.success("Mot de passe mis à jour ✅")
                st.caption("Hash (MotDePasseHash) :")
                st.code(h)
            except Exception as e:
                st.error(f"Erreur : {e}")