Files
Inventaire-gestion/app/creer_user.py

428 lines
15 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# app_users.py — création + modification de champs (wide + onglets + grille)
import os
import re
from datetime import date, datetime
import bcrypt
import mysql.connector
from mysql.connector import errorcode
import streamlit as st
from dotenv import load_dotenv
# -----------------------
# Auth minimale
# -----------------------
def require_login():
st.markdown(
"""
<style>
body {
background-color: #f2f2f2;
}
</style>
""",
unsafe_allow_html=True
)
load_dotenv()
admin_user = os.getenv("ADMIN_USER", "admin")
admin_hash = os.getenv("ADMIN_PASS_HASH")
if not admin_hash:
st.error("ADMIN_PASS_HASH manquant dans .env")
st.stop()
if "auth_ok" not in st.session_state:
st.session_state.auth_ok = False
if not st.session_state.auth_ok:
st.markdown("<h2 style='text-align:center;'>🔐 Accès restreint</h2>", unsafe_allow_html=True)
# conteneur centré
login_left, login_center, login_right = st.columns([1, 2, 1])
with login_center:
st.markdown(
"""
<div style='border:1px solid #ddd; border-radius:12px; padding:2rem; background:#fafafa;'>
""",
unsafe_allow_html=True
)
u = st.text_input("Utilisateur", key="login_user")
p = st.text_input("Mot de passe", type="password", key="login_pass")
if st.button("Se connecter", use_container_width=True):
if u == admin_user and bcrypt.checkpw(p.encode(), admin_hash.encode()):
st.session_state.auth_ok = True
st.rerun()
else:
st.error("Identifiants invalides")
st.markdown("</div>", unsafe_allow_html=True)
st.stop()
require_login()
# -----------------------
# Connexion MySQL
# -----------------------
@st.cache_resource
def get_connection():
load_dotenv()
host = os.getenv("DB_HOST")
port = int(os.getenv("MYSQL_PORT", "3306"))
user = os.getenv("DB_USER")
pwd = os.getenv("DB_PASSWORD")
db = os.getenv("DB_NAME")
missing = [k for k, v in {
"MYSQL_USER": user, "MYSQL_PASSWORD": pwd, "MYSQL_HOST": host, "MYSQL_PORT": port, "MYSQL_DATABASE": db
}.items() if v in (None, "")]
if missing:
raise RuntimeError(f"Variables manquantes dans .env : {', '.join(missing)}")
return mysql.connector.connect(
host=host, port=port, user=user, password=pwd, database=db, autocommit=True
)
# -----------------------
# Utilitaires
# -----------------------
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
def normalize_phone(phone: str | None) -> str | None:
if not phone:
return None
return re.sub(r"[^\d+]", "", phone)
def to_sql_date(d: date | str) -> str:
if isinstance(d, date):
return d.strftime("%Y-%m-%d")
for fmt in ("%Y-%m-%d", "%d/%m/%Y"):
try:
return datetime.strptime(d, fmt).strftime("%Y-%m-%d")
except ValueError:
continue
raise ValueError("Date invalide (attendu: YYYY-MM-DD ou JJ/MM/AAAA)")
def hash_password(plain: str, rounds: int = 12) -> str:
salt = bcrypt.gensalt(rounds=rounds)
return bcrypt.hashpw(plain.encode("utf-8"), salt).decode("utf-8")
def user_exists(cursor, username: str, email: str) -> bool:
cursor.execute(
"SELECT COUNT(*) FROM Utilisateurs WHERE NomUtilisateur=%s OR email=%s",
(username, email),
)
(count,) = cursor.fetchone()
return count > 0
def insert_user(cnx, username, full_name, site, password, expires, phone, email, store_plain=False):
if not EMAIL_RE.match(email):
raise ValueError("Email invalide.")
phone_norm = normalize_phone(phone)
exp_sql = to_sql_date(expires)
pwd_hash = hash_password(password)
with cnx.cursor() as cur:
if user_exists(cur, username, email):
raise RuntimeError("Nom d'utilisateur ou email déjà existant.")
if store_plain:
cur.execute(
"""
INSERT INTO Utilisateurs
(NomUtilisateur, Nom_complet, Site, MotDePasse, MotDePasseHash, DateExpiration, Telephone, email)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
""",
(username, full_name, site, password, pwd_hash, exp_sql, phone_norm, email),
)
else:
cur.execute(
"""
INSERT INTO Utilisateurs
(NomUtilisateur, Nom_complet, Site, MotDePasse, MotDePasseHash, DateExpiration, Telephone, email)
VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
""",
(username, full_name, site, None, pwd_hash, exp_sql, phone_norm, email),
)
return pwd_hash
def list_users(cnx, limit=200):
with cnx.cursor(dictionary=True) as cur:
cur.execute(
"""
SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email
FROM Utilisateurs
ORDER BY NomUtilisateur ASC
LIMIT %s
""",
(limit,),
)
return cur.fetchall()
def get_user(cnx, username: str):
with cnx.cursor(dictionary=True) as cur:
cur.execute(
"""
SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email
FROM Utilisateurs
WHERE NomUtilisateur=%s
""",
(username,),
)
return cur.fetchone()
def update_field(cnx, username: str, field: str, value):
allowed = {
"Nom_complet": "Nom_complet",
"Site": "Site",
"Telephone": "Telephone",
"DateExpiration": "DateExpiration",
"email": "email",
}
if field not in allowed:
raise ValueError("Champ non autorisé.")
sql_field = allowed[field]
if field == "email":
if not EMAIL_RE.match(str(value)):
raise ValueError("Email invalide.")
if field == "Telephone":
value = normalize_phone(str(value)) if value else None
if field == "DateExpiration":
value = to_sql_date(value)
with cnx.cursor() as cur:
cur.execute(
f"UPDATE Utilisateurs SET {sql_field}=%s WHERE NomUtilisateur=%s",
(value, username),
)
def update_password(cnx, username: str, new_password: str):
pwd_hash = hash_password(new_password)
with cnx.cursor() as cur:
cur.execute(
"UPDATE Utilisateurs SET MotDePasse=NULL, MotDePasseHash=%s WHERE NomUtilisateur=%s",
(pwd_hash, username),
)
return pwd_hash
# -----------------------
# UI
# -----------------------
st.set_page_config(page_title="Acces.Utilisateurs", page_icon="👤", layout="wide")
# CSS doux
st.markdown(
"""
<style>
.block-container {padding-top: 1.25rem; padding-bottom: 2rem; max-width: 1400px;}
button[kind="primary"], .stButton>button {height: 2.6rem;}
.stForm {border: 1px solid #eee; padding: 1rem; border-radius: 12px;}
.soft-card {border:1px solid #eee; padding:0.75rem 1rem; border-radius:12px; background:#fafafa;}
</style>
""",
unsafe_allow_html=True
)
st.title("👤 Acces.Utilisateurs")
# Connexion DB (badge)
try:
cnx = get_connection()
st.caption("Connexion MySQL via `.env` : **OK** ✅")
except Exception as e:
st.error(f"Connexion MySQL impossible : {e}")
st.stop()
# Bouton Sortie / Exit
hdr_l, hdr_r = st.columns([1, 0.18])
with hdr_r:
if st.button("🔒 SORTIE / EXIT", use_container_width=True, help="Se déconnecter et verrouiller l'accès"):
st.session_state.auth_ok = False
st.rerun()
# -----------------------
# Onglets
# -----------------------
tab_list, tab_create, tab_edit, tab_security = st.tabs(
["📋 Liste", "🆕 Créer", "✏️ Modifier", "🔐 Sécurité"]
)
# --- Onglet Liste ---
with tab_list:
st.subheader("Utilisateurs récents")
try:
import pandas as pd
rows = list_users(cnx, limit=1000)
df = pd.DataFrame(rows)
if not df.empty:
df["DateExpiration"] = pd.to_datetime(df["DateExpiration"], errors="coerce").dt.date
from datetime import date as _date
today = _date.today()
expired_mask = df["DateExpiration"] <= today
col_a, col_b = st.columns(2)
col_a.metric("Total utilisateurs", len(df))
col_b.metric("Comptes périmés", int(expired_mask.sum()))
def style_expired(row):
if row["DateExpiration"] <= today:
return ["color: white; background-color: #d32f2f;"] * len(row)
return [""] * len(row)
styled = df.style.apply(style_expired, axis=1)
visible_rows = len(df)
row_px = 36
header_px = 48
padding_px = 24
height = min(1000, header_px + row_px * visible_rows + padding_px)
st.dataframe(
styled,
use_container_width=True,
hide_index=True,
height=height,
)
else:
st.info("Aucun utilisateur à afficher.")
except Exception as e:
st.warning(f"Impossible de lister les utilisateurs : {e}")
# --- Onglet Créer ---
with tab_create:
with st.form("create_user_form", clear_on_submit=False):
st.subheader("Nouveau compte")
c1, c2, c3 = st.columns([1.2, 1.5, 1])
username = c1.text_input("NomUtilisateur", placeholder="ex: cjaquier")
full_name = c2.text_input("Nom_complet", placeholder="Clément JAQUIER")
site = c3.text_input("Site", placeholder="Roissy", value="Roissy")
c4, c5, c6 = st.columns([1.4, 1, 1])
email = c4.text_input("email", placeholder="prenom.nom@domaine.com")
phone = c5.text_input("Téléphone", placeholder="06 12 12 35 32")
expires = c6.date_input("DateExpiration", value=date.today())
c7, c8 = st.columns(2)
password = c7.text_input("Mot de passe (saisie)", type="password")
confirm = c8.text_input("Confirmer mot de passe", type="password")
store_plain = st.checkbox("Remplir aussi MotDePasse en clair (déconseillé)", value=False)
submitted = st.form_submit_button("Créer lutilisateur", use_container_width=True)
if submitted:
if not username or not full_name or not site or not email or not password:
st.error("Merci de remplir tous les champs obligatoires.")
elif password != confirm:
st.error("Les mots de passe ne correspondent pas.")
else:
try:
pwd_hash = insert_user(
cnx,
username=username.strip(),
full_name=full_name.strip(),
site=site.strip(),
password=password,
expires=expires,
phone=phone.strip() if phone else None,
email=email.strip(),
store_plain=store_plain,
)
st.success("Utilisateur créé avec succès ✅")
with st.expander("Voir le hash généré (MotDePasseHash)"):
st.code(pwd_hash)
except mysql.connector.Error as db_err:
if db_err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
st.error("Identifiants MySQL invalides.")
else:
st.error(f"Erreur MySQL : {db_err}")
except Exception as e:
st.error(f"Erreur : {e}")
# --- Onglet Modifier ---
with tab_edit:
st.subheader("Modifier un utilisateur existant")
try:
users = list_users(cnx, limit=1000)
usernames = [u["NomUtilisateur"] for u in users]
except Exception as e:
users = []
usernames = []
st.warning(f"Impossible de charger la liste des utilisateurs : {e}")
top_left, top_right = st.columns([1.2, 2])
with top_left:
sel_user = st.selectbox("Utilisateur", usernames, placeholder="Choisir un utilisateur")
field = st.selectbox(
"Champ à modifier",
["Nom_complet", "Site", "Telephone", "DateExpiration", "email"]
)
if field == "DateExpiration":
new_value = st.date_input("Nouvelle valeur (date)", value=date.today(), key="new_date")
elif field == "Telephone":
new_value = st.text_input("Nouvelle valeur (téléphone)", key="new_tel")
else:
new_value = st.text_input("Nouvelle valeur", key="new_text")
update_btn = st.button(
"Mettre à jour le champ",
type="primary",
use_container_width=True,
disabled=not sel_user,
)
with top_right:
if sel_user:
details = get_user(cnx, sel_user)
if details:
c1, c2, c3, c4, c5 = st.columns(5, gap="small")
c1.markdown(f"<div class='soft-card'><b>Nom</b><br>{details['Nom_complet'] or ''}</div>", unsafe_allow_html=True)
c2.markdown(f"<div class='soft-card'><b>Site</b><br>{details['Site'] or ''}</div>", unsafe_allow_html=True)
c3.markdown(f"<div class='soft-card'><b>Expire</b><br>{details['DateExpiration'] or ''}</div>", unsafe_allow_html=True)
c4.markdown(f"<div class='soft-card'><b>Tél</b><br>{details['Telephone'] or ''}</div>", unsafe_allow_html=True)
c5.markdown(f"<div class='soft-card'><b>Email</b><br>{details['email'] or ''}</div>", unsafe_allow_html=True)
if update_btn and sel_user:
try:
update_field(cnx, sel_user, field, new_value)
st.success(f"{field} mis à jour pour {sel_user}")
except mysql.connector.Error as db_err:
st.error(f"Erreur MySQL : {db_err}")
except Exception as e:
st.error(f"Erreur : {e}")
# --- Onglet Sécurité ---
with tab_security:
st.subheader("Réinitialiser le mot de passe (bcrypt)")
try:
if 'user_cache_for_pw' not in st.session_state:
st.session_state.user_cache_for_pw = [u["NomUtilisateur"] for u in list_users(cnx, limit=1000)]
pw_user_list = st.session_state.user_cache_for_pw
except Exception:
pw_user_list = []
user_pw = st.selectbox("Utilisateur", pw_user_list, key="pw_user", placeholder="Choisir un utilisateur")
colp1, colp2 = st.columns(2)
new_pw = colp1.text_input("Nouveau mot de passe", type="password")
new_pw2 = colp2.text_input("Confirmer", type="password")
if st.button("Mettre à jour le mot de passe", disabled=not user_pw, use_container_width=True):
if not new_pw:
st.error("Mot de passe vide.")
elif new_pw != new_pw2:
st.error("Les mots de passe ne correspondent pas.")
else:
try:
h = update_password(cnx, user_pw, new_pw)
st.success("Mot de passe mis à jour ✅")
st.caption("Hash (MotDePasseHash) :")
st.code(h)
except Exception as e:
st.error(f"Erreur : {e}")