diff --git a/Excel/prod/Ratio_prod.xlsm b/Excel/prod/Ratio_prod.xlsm
index 12db6e4..6b4a483 100644
Binary files a/Excel/prod/Ratio_prod.xlsm and b/Excel/prod/Ratio_prod.xlsm differ
diff --git a/Excel/prod/VERSION.txt b/Excel/prod/VERSION.txt
index 4bdd32f..d920ece 100644
--- a/Excel/prod/VERSION.txt
+++ b/Excel/prod/VERSION.txt
@@ -1 +1 @@
-2.40
+2.41
diff --git a/app/creer_user.py b/app/creer_user.py
index eda7f7f..452257f 100644
--- a/app/creer_user.py
+++ b/app/creer_user.py
@@ -1,11 +1,12 @@
-# app_users.py — création + modification de champs (wide + onglets + grille)
+# Streamlit app
import os
-import re
from datetime import date, datetime
+import re
import bcrypt
import mysql.connector
-from mysql.connector import errorcode
+from mysql.connector import pooling, errorcode
+import pandas as pd
import streamlit as st
from dotenv import load_dotenv
@@ -14,108 +15,110 @@ from dotenv import load_dotenv
# -----------------------
def require_login():
st.markdown(
- """
-
- """,
- unsafe_allow_html=True
+ "",
+ unsafe_allow_html=True,
)
load_dotenv()
admin_user = os.getenv("ADMIN_USER")
admin_hash = os.getenv("ADMIN_PASS_HASH")
-
- if not admin_hash:
- st.error("ADMIN_PASS_HASH manquant dans .env")
+ if not admin_user or not admin_hash:
+ st.error("Variables ADMIN_USER et/ou ADMIN_PASS_HASH manquantes dans .env")
st.stop()
if "auth_ok" not in st.session_state:
st.session_state.auth_ok = False
if not st.session_state.auth_ok:
- st.markdown("
🔐 Accès restreint
", unsafe_allow_html=True)
-
- # conteneur centré
- login_left, login_center, login_right = st.columns([1, 2, 1])
- with login_center:
- st.markdown(
- """
-
- """,
- unsafe_allow_html=True
- )
- u = st.text_input("Utilisateur", key="login_user")
- p = st.text_input("Mot de passe", type="password", key="login_pass")
-
+ col1, col2, col3 = st.columns([1, 2, 1])
+ with col2:
+ st.header("🔐 Accès restreint")
+ u = st.text_input("Utilisateur")
+ p = st.text_input("Mot de passe", type="password")
if st.button("Se connecter", use_container_width=True):
- if u == admin_user and bcrypt.checkpw(p.encode(), admin_hash.encode()):
+ try:
+ ok = (u == admin_user) and bcrypt.checkpw(p.encode(), admin_hash.encode())
+ except Exception:
+ ok = False
+ if ok:
st.session_state.auth_ok = True
st.rerun()
else:
- st.error("Identifiants invalides")
-
- st.markdown("
", unsafe_allow_html=True)
-
+ st.error("Identifiants invalides.")
st.stop()
require_login()
# -----------------------
-# Connexion MySQL
+# Connexion MySQL via pool
# -----------------------
@st.cache_resource
-def get_connection():
+def get_pool():
load_dotenv()
host = os.getenv("DB_HOST")
port = int(os.getenv("MYSQL_PORT", "3306"))
user = os.getenv("DB_USER")
pwd = os.getenv("DB_PASSWORD")
db = os.getenv("DB_NAME")
-
missing = [k for k, v in {
- "MYSQL_USER": user, "MYSQL_PASSWORD": pwd, "MYSQL_HOST": host, "MYSQL_PORT": port, "MYSQL_DATABASE": db
+ "DB_HOST": host, "MYSQL_PORT": port, "DB_USER": user, "DB_PASSWORD": pwd, "DB_NAME": db
}.items() if v in (None, "")]
if missing:
raise RuntimeError(f"Variables manquantes dans .env : {', '.join(missing)}")
-
- return mysql.connector.connect(
- host=host, port=port, user=user, password=pwd, database=db, autocommit=True
+ return pooling.MySQLConnectionPool(
+ pool_name="users_pool",
+ pool_size=5,
+ pool_reset_session=True,
+ host=host, port=port, user=user, password=pwd, database=db,
+ autocommit=True
)
+pool = get_pool()
+
# -----------------------
-# Utilitaires
+# Helpers SQL + validations
# -----------------------
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
+PHONE_RE = re.compile(r"\d{10,14}")
-def normalize_phone(phone: str | None) -> str | None:
- if not phone:
+def normalize_phone(p: str|None) -> str|None:
+ if not p:
return None
- return re.sub(r"[^\d+]", "", phone)
+ digits = re.sub(r"\D", "", p)
+ return digits if PHONE_RE.match(digits) else None
-def to_sql_date(d: date | str) -> str:
- if isinstance(d, date):
- return d.strftime("%Y-%m-%d")
- for fmt in ("%Y-%m-%d", "%d/%m/%Y"):
+def to_sql_date(d: date | str | None) -> str | None:
+ if d is None:
+ return None
+ if isinstance(d, str):
try:
- return datetime.strptime(d, fmt).strftime("%Y-%m-%d")
- except ValueError:
- continue
- raise ValueError("Date invalide (attendu: YYYY-MM-DD ou JJ/MM/AAAA)")
+ d = datetime.fromisoformat(d).date()
+ except Exception:
+ return None
+ return d.strftime("%Y-%m-%d")
def hash_password(plain: str, rounds: int = 12) -> str:
salt = bcrypt.gensalt(rounds=rounds)
return bcrypt.hashpw(plain.encode("utf-8"), salt).decode("utf-8")
-def user_exists(cursor, username: str, email: str) -> bool:
- cursor.execute(
+def user_exists(cur, username: str, email: str) -> bool:
+ cur.execute(
"SELECT COUNT(*) FROM Utilisateurs WHERE NomUtilisateur=%s OR email=%s",
(username, email),
)
- (count,) = cursor.fetchone()
+ (count,) = cur.fetchone()
return count > 0
+def list_users(cnx, limit: int = 500):
+ sql = """
+ SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email
+ FROM Utilisateurs
+ ORDER BY NomUtilisateur
+ LIMIT %s
+ """
+ with cnx.cursor(dictionary=True) as cur:
+ cur.execute(sql, (limit,))
+ return cur.fetchall()
+
def insert_user(cnx, username, full_name, site, password, expires, phone, email, store_plain=False):
if not EMAIL_RE.match(email):
raise ValueError("Email invalide.")
@@ -140,32 +143,18 @@ def insert_user(cnx, username, full_name, site, password, expires, phone, email,
"""
INSERT INTO Utilisateurs
(NomUtilisateur, Nom_complet, Site, MotDePasse, MotDePasseHash, DateExpiration, Telephone, email)
- VALUES (%s,%s,%s,%s,%s,%s,%s,%s)
+ VALUES (%s,%s,%s,NULL,%s,%s,%s,%s)
""",
- (username, full_name, site, None, pwd_hash, exp_sql, phone_norm, email),
+ (username, full_name, site, pwd_hash, exp_sql, phone_norm, email),
)
return pwd_hash
-def list_users(cnx, limit=200):
+def get_user_details(cnx, username: str):
with cnx.cursor(dictionary=True) as cur:
cur.execute(
"""
SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email
- FROM Utilisateurs
- ORDER BY NomUtilisateur ASC
- LIMIT %s
- """,
- (limit,),
- )
- return cur.fetchall()
-
-def get_user(cnx, username: str):
- with cnx.cursor(dictionary=True) as cur:
- cur.execute(
- """
- SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email
- FROM Utilisateurs
- WHERE NomUtilisateur=%s
+ FROM Utilisateurs WHERE NomUtilisateur=%s
""",
(username,),
)
@@ -175,8 +164,8 @@ def update_field(cnx, username: str, field: str, value):
allowed = {
"Nom_complet": "Nom_complet",
"Site": "Site",
- "Telephone": "Telephone",
"DateExpiration": "DateExpiration",
+ "Telephone": "Telephone",
"email": "email",
}
if field not in allowed:
@@ -210,81 +199,24 @@ def update_password(cnx, username: str, new_password: str):
# UI
# -----------------------
st.set_page_config(page_title="Acces.Utilisateurs", page_icon="👤", layout="wide")
+st.title("Gestion des utilisateurs")
-# CSS doux
-st.markdown(
- """
-
- """,
- unsafe_allow_html=True
-)
-
-st.title("👤 Acces.Utilisateurs")
-
-# Connexion DB (badge)
-try:
- cnx = get_connection()
- st.caption("Connexion MySQL via `.env` : **OK** ✅")
-except Exception as e:
- st.error(f"Connexion MySQL impossible : {e}")
- st.stop()
-
-# Bouton Sortie / Exit
-hdr_l, hdr_r = st.columns([1, 0.18])
-with hdr_r:
- if st.button("🔒 SORTIE / EXIT", use_container_width=True, help="Se déconnecter et verrouiller l'accès"):
- st.session_state.auth_ok = False
- st.rerun()
-
-# -----------------------
-# Onglets
-# -----------------------
-tab_list, tab_create, tab_edit, tab_security = st.tabs(
- ["📋 Liste", "🆕 Créer", "✏️ Modifier", "🔐 Sécurité"]
-)
+tab_list, tab_create, tab_edit, tab_security = st.tabs(["Liste", "Créer", "Modifier", "Sécurité"])
# --- Onglet Liste ---
with tab_list:
- st.subheader("Utilisateurs récents")
+ st.subheader("Utilisateurs")
try:
- import pandas as pd
- rows = list_users(cnx, limit=1000)
+ cnx = pool.get_connection()
+ try:
+ rows = list_users(cnx, limit=1000)
+ finally:
+ cnx.close()
df = pd.DataFrame(rows)
-
if not df.empty:
df["DateExpiration"] = pd.to_datetime(df["DateExpiration"], errors="coerce").dt.date
-
- from datetime import date as _date
- today = _date.today()
- expired_mask = df["DateExpiration"] <= today
-
- col_a, col_b = st.columns(2)
- col_a.metric("Total utilisateurs", len(df))
- col_b.metric("Comptes périmés", int(expired_mask.sum()))
-
- def style_expired(row):
- if row["DateExpiration"] <= today:
- return ["color: white; background-color: #d32f2f;"] * len(row)
- return [""] * len(row)
- styled = df.style.apply(style_expired, axis=1)
-
- visible_rows = len(df)
- row_px = 36
- header_px = 48
- padding_px = 24
- height = min(1000, header_px + row_px * visible_rows + padding_px)
-
- st.dataframe(
- styled,
- use_container_width=True,
- hide_index=True,
- height=height,
- )
+ st.dataframe(df, use_container_width=True, hide_index=True)
+ st.caption(f"{len(df)} utilisateur(s)")
else:
st.info("Aucun utilisateur à afficher.")
except Exception as e:
@@ -306,47 +238,48 @@ with tab_create:
expires = c6.date_input("DateExpiration", value=date.today())
c7, c8 = st.columns(2)
- password = c7.text_input("Mot de passe (saisie)", type="password")
- confirm = c8.text_input("Confirmer mot de passe", type="password")
+ password = c7.text_input("Mot de passe", type="password")
+ password2 = c8.text_input("Confirmer", type="password")
+ store_plain = st.checkbox("Stocker le mot de passe en clair (déconseillé)", value=False)
- store_plain = st.checkbox("Remplir aussi MotDePasse en clair (déconseillé)", value=False)
- submitted = st.form_submit_button("Créer l’utilisateur", use_container_width=True)
+ submitted = st.form_submit_button("Créer l'utilisateur", use_container_width=True)
- if submitted:
- if not username or not full_name or not site or not email or not password:
- st.error("Merci de remplir tous les champs obligatoires.")
- elif password != confirm:
- st.error("Les mots de passe ne correspondent pas.")
- else:
+ if submitted:
+ if not username or not full_name or not site or not email:
+ st.error("Champs requis manquants.")
+ elif password != password2:
+ st.error("Les mots de passe ne correspondent pas.")
+ else:
+ try:
+ cnx = pool.get_connection()
try:
pwd_hash = insert_user(
- cnx,
- username=username.strip(),
- full_name=full_name.strip(),
- site=site.strip(),
- password=password,
- expires=expires,
- phone=phone.strip() if phone else None,
- email=email.strip(),
- store_plain=store_plain,
+ cnx, username=username, full_name=full_name, site=site,
+ password=password, expires=expires, phone=phone, email=email,
+ store_plain=store_plain
)
- st.success("Utilisateur créé avec succès ✅")
- with st.expander("Voir le hash généré (MotDePasseHash)"):
- st.code(pwd_hash)
- except mysql.connector.Error as db_err:
- if db_err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
- st.error("Identifiants MySQL invalides.")
- else:
- st.error(f"Erreur MySQL : {db_err}")
- except Exception as e:
- st.error(f"Erreur : {e}")
+ finally:
+ cnx.close()
+ st.success("Utilisateur créé avec succès ✅")
+ st.caption("Hash (MotDePasseHash) :")
+ st.code(pwd_hash)
+ except mysql.connector.Error as db_err:
+ if db_err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
+ st.error("Identifiants MySQL invalides.")
+ else:
+ st.error(f"Erreur MySQL : {db_err}")
+ except Exception as e:
+ st.error(f"Erreur : {e}")
# --- Onglet Modifier ---
with tab_edit:
st.subheader("Modifier un utilisateur existant")
-
try:
- users = list_users(cnx, limit=1000)
+ cnx = pool.get_connection()
+ try:
+ users = list_users(cnx, limit=1000)
+ finally:
+ cnx.close()
usernames = [u["NomUtilisateur"] for u in users]
except Exception as e:
users = []
@@ -354,43 +287,27 @@ with tab_edit:
st.warning(f"Impossible de charger la liste des utilisateurs : {e}")
top_left, top_right = st.columns([1.2, 2])
-
with top_left:
sel_user = st.selectbox("Utilisateur", usernames, placeholder="Choisir un utilisateur")
-
field = st.selectbox(
"Champ à modifier",
- ["Nom_complet", "Site", "Telephone", "DateExpiration", "email"]
- )
-
- if field == "DateExpiration":
- new_value = st.date_input("Nouvelle valeur (date)", value=date.today(), key="new_date")
- elif field == "Telephone":
- new_value = st.text_input("Nouvelle valeur (téléphone)", key="new_tel")
- else:
- new_value = st.text_input("Nouvelle valeur", key="new_text")
-
- update_btn = st.button(
- "Mettre à jour le champ",
- type="primary",
- use_container_width=True,
- disabled=not sel_user,
+ ["Nom_complet", "Site", "DateExpiration", "Telephone", "email"]
)
with top_right:
- if sel_user:
- details = get_user(cnx, sel_user)
- if details:
- c1, c2, c3, c4, c5 = st.columns(5, gap="small")
- c1.markdown(f"Nom
{details['Nom_complet'] or ''}
", unsafe_allow_html=True)
- c2.markdown(f"Site
{details['Site'] or ''}
", unsafe_allow_html=True)
- c3.markdown(f"Expire
{details['DateExpiration'] or ''}
", unsafe_allow_html=True)
- c4.markdown(f"Tél
{details['Telephone'] or ''}
", unsafe_allow_html=True)
- c5.markdown(f"Email
{details['email'] or ''}
", unsafe_allow_html=True)
+ if field == "DateExpiration":
+ new_value = st.date_input("Nouvelle valeur", value=date.today())
+ else:
+ new_value = st.text_input("Nouvelle valeur")
+ update_btn = st.button("Mettre à jour", disabled=not sel_user, use_container_width=True)
if update_btn and sel_user:
try:
- update_field(cnx, sel_user, field, new_value)
+ cnx = pool.get_connection()
+ try:
+ update_field(cnx, sel_user, field, new_value)
+ finally:
+ cnx.close()
st.success(f"✅ {field} mis à jour pour {sel_user}")
except mysql.connector.Error as db_err:
st.error(f"Erreur MySQL : {db_err}")
@@ -402,7 +319,11 @@ with tab_security:
st.subheader("Réinitialiser le mot de passe (bcrypt)")
try:
if 'user_cache_for_pw' not in st.session_state:
- st.session_state.user_cache_for_pw = [u["NomUtilisateur"] for u in list_users(cnx, limit=1000)]
+ cnx = pool.get_connection()
+ try:
+ st.session_state.user_cache_for_pw = [u["NomUtilisateur"] for u in list_users(cnx, limit=1000)]
+ finally:
+ cnx.close()
pw_user_list = st.session_state.user_cache_for_pw
except Exception:
pw_user_list = []
@@ -419,7 +340,11 @@ with tab_security:
st.error("Les mots de passe ne correspondent pas.")
else:
try:
- h = update_password(cnx, user_pw, new_pw)
+ cnx = pool.get_connection()
+ try:
+ h = update_password(cnx, user_pw, new_pw)
+ finally:
+ cnx.close()
st.success("Mot de passe mis à jour ✅")
st.caption("Hash (MotDePasseHash) :")
st.code(h)