diff --git a/.env b/.env
index 7389ee4..79bcc74 100644
--- a/.env
+++ b/.env
@@ -2,9 +2,19 @@
DB_HOST=162.19.78.131
DB_USER=excel
DB_PASSWORD='%n#%3Lay1MPa$%kR^5@'
-DB_NAME=Acces
-ADMIN_USER=Michel
-ADMIN_PASS_HASH='$2b$12$Dgv7jNLJuR.3hQminSVE9OP6hCSmW4nISArR3HF5LTPGFK0Zw29N2'
+DB_NAME=Sondes
+DB_PORT=3306
+AUTH_USERS=[{"user":"Michel","pass":"210462"}]
+
+# === Connexion SSH pour visualiseur_logs.py ===
+SSH_HOST=162.19.78.131
+SSH_PORT=22
+SSH_USER=debian
+SSH_KEY_PATH=/home/debian/.ssh/id_ed25519
+SSH_KEY_PASSPHRASE='gaby'
+SSH_LOG_DIR=/home/debian/Gestion_sondes/Logs
+SSH_PASSWORD='lpZwixbBUFtGY'
+SSH_FORCE_PASSWORD=1
# connexion OVH pour les SMS
OVH_APP_KEY=f725d07b2f98a195
diff --git a/app/Logs.py b/app/Logs.py
new file mode 100644
index 0000000..68db89e
--- /dev/null
+++ b/app/Logs.py
@@ -0,0 +1,225 @@
+# Logs.py
+# Visualiseur de logs SSH ultra-simplifié (Streamlit)
+# - Lecture de la config via .env uniquement
+# - Connexion SSH via clé (chemin) ou mot de passe
+# - Liste des fichiers, tail dernier N, recherche, téléchargement
+
+import os
+import time
+import stat
+import paramiko
+import streamlit as st
+from dotenv import load_dotenv
+from typing import Optional, Tuple
+
+# ================
+# CONFIG .env
+# ================
+load_dotenv() # cherche automatiquement un .env dans le dossier courant
+
+VPS_HOST = os.getenv("SSH_HOST", "").strip()
+VPS_PORT = int(os.getenv("SSH_PORT", 22))
+VPS_USER = os.getenv("SSH_USER", "").strip()
+VPS_PASSWORD = os.getenv("SSH_PASSWORD", "")
+VPS_KEY_PATH = os.getenv("SSH_KEY_PATH", "").strip()
+VPS_LOG_DIR = os.getenv("SSH_LOG_DIR", "/home/debian/Gestion_sondes/Logs").rstrip("/")
+
+# ================
+# UTILITAIRES
+# ================
+# --- Connexion strictement au MOT DE PASSE ---
+@st.cache_resource(show_spinner=False)
+def get_ssh_client_password_only() -> paramiko.SSHClient:
+ host = os.getenv("SSH_HOST", "").strip()
+ user = os.getenv("SSH_USER", "").strip()
+ port = int(os.getenv("SSH_PORT", 22))
+ pwd = os.getenv("SSH_PASSWORD", "")
+
+ if not host or not user or not pwd:
+ raise RuntimeError("SSH_HOST / SSH_USER / SSH_PASSWORD manquant(s) dans .env")
+
+ client = paramiko.SSHClient()
+ client.set_missing_host_key_policy(paramiko.AutoAddPolicy())
+
+ # IMPORTANT : on force l’usage du mot de passe, aucune clé n’est chargée
+ try:
+ client.connect(
+ hostname=host, port=port, username=user,
+ password=pwd, timeout=10, allow_agent=False, look_for_keys=False
+ )
+ # vérifie que le transport est bien up
+ t = client.get_transport()
+ if not t or not t.is_active():
+ raise RuntimeError("Transport SSH inactif après connexion.")
+ return client
+ except Exception as e:
+ raise RuntimeError(f"Échec connexion SSH par mot de passe : {e}") from e
+
+# --- Ouverture SFTP sûre ---
+try:
+ client = get_ssh_client_password_only()
+except Exception as e:
+ st.error(str(e))
+ st.stop()
+
+try:
+ sftp = client.open_sftp()
+except Exception as e:
+ st.error(f"Erreur ouverture SFTP : {e}")
+ try: client.close()
+ except: pass
+ st.stop()
+
+def list_remote_files(sftp: paramiko.SFTPClient, directory: str) -> list[Tuple[str, int, float]]:
+ """
+ Retourne [(nom, taille_bytes, mtime_epoch), ...] triés par mtime desc.
+ """
+ items = []
+ try:
+ for entry in sftp.listdir_attr(directory):
+ mode = entry.st_mode
+ if stat.S_ISREG(mode):
+ items.append((entry.filename, entry.st_size, entry.st_mtime))
+ except FileNotFoundError:
+ st.error(f"❌ Dossier introuvable côté serveur : {directory}")
+ return []
+ except Exception as e:
+ st.error(f"❌ Impossible de lister {directory} : {e}")
+ return []
+ items.sort(key=lambda t: t[2], reverse=True)
+ return items
+
+
+def read_remote_text_tail(
+ client: paramiko.SSHClient, path: str, lines: int = 500, grep: str | None = None
+) -> str:
+ """
+ Lit côté serveur les N dernières lignes (tail -n),
+ compatible .gz via zcat si nécessaire, avec grep optionnel (insensible à la casse).
+ """
+ safe_path = path.replace('"', '\\"')
+ if path.endswith(".gz"):
+ base_cmd = f'zcat "{safe_path}"'
+ else:
+ base_cmd = f'tail -n {max(1, lines)} "{safe_path}" && exit 0 || head -n {max(1, lines)} "{safe_path}"'
+
+ if grep:
+ # -i insensible à la casse ; on passe par grep après la commande de base
+ grep_safe = grep.replace('"', '\\"') # échappe juste les guillemets
+ base_cmd = f'{base_cmd} | grep -i "{grep_safe}" || true'
+
+ stdin, stdout, stderr = client.exec_command(base_cmd, timeout=20)
+ out = stdout.read()
+ err = stderr.read()
+ text = (out or b"").decode("utf-8", errors="replace")
+ # Si zcat sans grep et sans tail (gros fichiers), on limite côté client pour éviter d'assommer Streamlit
+ if path.endswith(".gz") and not grep:
+ lines_list = text.splitlines()
+ text = "\n".join(lines_list[-max(1, lines):])
+ if err and not text:
+ # Afficher l'erreur seulement si rien en sortie
+ text = (b"[stderr] " + err).decode("utf-8", errors="replace")
+ return text
+
+
+def download_remote_file(sftp: paramiko.SFTPClient, path: str) -> bytes:
+ """Télécharge le fichier distant (binaire) en mémoire."""
+ with sftp.file(path, "rb") as f:
+ return f.read()
+
+
+# ================
+# UI STREAMLIT
+# ================
+
+st.set_page_config(page_title="Visualiseur de Logs", layout="wide")
+st.title("📜 Visualiseur de logs (SSH)")
+
+with st.expander("Configuration (lecture seule)", expanded=False):
+ st.code(
+ f"HOST={VPS_HOST}\nUSER={VPS_USER}\nPORT={VPS_PORT}\n"
+ f"KEY_PATH={VPS_KEY_PATH or '-'}\nLOG_DIR={VPS_LOG_DIR}",
+ language="bash"
+ )
+
+client = _get_ssh_client()
+if client is None:
+ st.stop()
+
+try:
+ sftp = client.open_sftp()
+except Exception as e:
+ st.error(f"Erreur ouverture SFTP : {e}")
+ try:
+ client.close()
+ except Exception:
+ pass
+ st.stop()
+
+with st.sidebar:
+ st.header("🔎 Options")
+ refresh = st.button("🔄 Rafraîchir la liste")
+ default_tail = st.number_input("Dernières lignes (tail)", min_value=50, max_value=100_000, value=500, step=50)
+ search_term = st.text_input("Recherche (grep -i)", value="", placeholder="ex: ERROR | Timeout | sonde")
+ st.caption("Astuce : la recherche applique un grep insensible à la casse sur le résultat.")
+
+# Rafraîchissement simple : on rejoue listdir si bouton cliqué
+if refresh:
+ time.sleep(0.2)
+
+files = list_remote_files(sftp, VPS_LOG_DIR)
+if not files:
+ sftp.close()
+ client.close()
+ st.stop()
+
+# Tableau simple (nom, taille, date)
+col1, col2 = st.columns([2, 1])
+with col1:
+ names = [f[0] for f in files]
+ choice = st.selectbox("Fichiers disponibles (triés par date desc.)", names, index=0)
+with col2:
+ # Infos du fichier choisi
+ chosen = next((f for f in files if f[0] == choice), None)
+ if chosen:
+ size_kb = chosen[1] / 1024
+ mtime_str = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(chosen[2]))
+ st.metric("Taille (KB)", f"{size_kb:,.0f}".replace(",", " "))
+ st.metric("Modifié le", mtime_str)
+
+full_path = f"{VPS_LOG_DIR}/{choice}"
+
+# Actions
+place1, place2, place3 = st.columns([1, 1, 6])
+with place1:
+ do_show = st.button("👁️ Afficher")
+with place2:
+ do_dl = st.button("⬇️ Télécharger")
+
+# Affichage
+if do_show:
+ with st.spinner("Lecture du fichier distant..."):
+ text = read_remote_text_tail(
+ client,
+ full_path,
+ lines=int(default_tail),
+ grep=(search_term or None)
+ )
+ st.text_area(f"Contenu : {choice}", text, height=600)
+
+# Téléchargement
+if do_dl:
+ try:
+ data = download_remote_file(sftp, full_path)
+ st.download_button(
+ label=f"Télécharger {choice}",
+ data=data,
+ file_name=choice,
+ mime="application/octet-stream"
+ )
+ except Exception as e:
+ st.error(f"❌ Échec du téléchargement : {e}")
+
+# Fermeture propre
+sftp.close()
+client.close()
diff --git a/app/tracker.py b/app/tracker.py
new file mode 100644
index 0000000..f817c6e
--- /dev/null
+++ b/app/tracker.py
@@ -0,0 +1,371 @@
+# tracker.py
+# -------------------------------------------------------------
+# Streamlit — Gestion de la table MySQL Sondes.tracker (avec address_hyphen)
+# -------------------------------------------------------------
+# Schéma attendu :
+# id (PK), address (ROM {0x..}), address_hyphen (28-..-..-..-..-..-..-..),
+# lieu, repere, mise_en_service (DATE), res_bits, date (timestamp)
+# Authentification intégrée via .env (AUTH_USERS JSON)
+# -------------------------------------------------------------
+import re
+import time
+import json
+import hmac
+from typing import Optional
+from datetime import date
+
+import pandas as pd
+import streamlit as st
+import mysql.connector as mysql
+from contextlib import contextmanager
+from dotenv import load_dotenv
+import os
+from pathlib import Path
+
+# 1) .env (avec recherche automatique depuis le fichier courant vers la racine)
+try:
+ from dotenv import load_dotenv, find_dotenv
+ load_dotenv(find_dotenv(usecwd=True), override=False)
+except Exception:
+ pass # si python-dotenv n'est pas installé, on passe (Streamlit peut fournir st.secrets)
+
+# 2) Streamlit secrets (optionnel si tu l’utilises)
+try:
+ import streamlit as st
+ st_secrets_mysql = st.secrets.get("mysql", {})
+except Exception:
+ st_secrets_mysql = {}
+
+def get_db_cfg():
+ # Priorité à st.secrets si dispo, sinon variables d’environnement (.env)
+ cfg = {
+ "host": st_secrets_mysql.get("host") or os.getenv("DB_HOST"),
+ "user": st_secrets_mysql.get("user") or os.getenv("DB_USER"),
+ "password": st_secrets_mysql.get("password") or os.getenv("DB_PASSWORD"),
+ "database": st_secrets_mysql.get("database") or os.getenv("DB_NAME") or os.getenv("DB_DATABASE"),
+ "port": int(st_secrets_mysql.get("port") or os.getenv("DB_PORT") or 3306),
+ "auth_plugin": os.getenv("DB_AUTH_PLUGIN") or None, # optionnel
+ }
+ # Nettoyage: retire les clés None
+ return {k: v for k, v in cfg.items() if v not in (None, "")}
+# ==========================
+# Configuration / Constantes
+# ==========================
+load_dotenv() # lit .env si présent
+
+TABLE_NAME = "tracker"
+COL_ID = "id"
+COL_ADDRESS = "address" # format ROM : {0x28,0xFF,...}
+COL_ADDR_HYPHEN = "address_hyphen" # format hyphen : 28-xx-xx-xx-xx-xx-xx-xx
+COL_LIEU = "lieu"
+COL_REPERE = "repere"
+COL_MES = "mise_en_service" # DATE
+COL_RESBITS = "res_bits"
+COL_DATE = "date"
+
+# Configuration BDD (standardisée sur les variables d'env MYSQL_*)
+DB_CFG = dict(
+ host=os.getenv("DB_HOST"),
+ user=os.getenv("DB_USER"),
+ password=os.getenv("DB_PASS"),
+ database=os.getenv("DB_NAME"),
+ port=int(os.getenv("MYSQL_PORT", "3306")),
+)
+
+# Regex d'une ROM code DS18B20 au format {0x28,0xFF,0xAA,0xBB,0xCC,0xDD,0xEE,0xCRC}
+ROM_REGEX = re.compile(r"^{(?:0x[0-9A-Fa-f]{2},){7}0x[0-9A-Fa-f]{2}}$")
+# Adresse hyphen : 8 octets hexa séparés par des tirets
+HYPHEN_REGEX = re.compile(r"^[0-9A-Fa-f]{2}(?:-[0-9A-Fa-f]{2}){7}$")
+
+# Mapping résolution DS18B20 (bits -> infos)
+RES_MAP = {
+ 9: {"precision": 0.5, "tconv_ms": 94},
+ 10: {"precision": 0.25, "tconv_ms": 188},
+ 11: {"precision": 0.125, "tconv_ms": 375},
+ 12: {"precision": 0.0625,"tconv_ms": 750},
+}
+
+# ==================
+# Authentification via .env (AUTH_USERS)
+# ==================
+AUTH_USERS_RAW = os.getenv("AUTH_USERS", "[]")
+
+def _load_users() -> dict:
+ try:
+ data = json.loads(AUTH_USERS_RAW)
+ return {str(d.get("user", "")).strip(): str(d.get("pass", "")) for d in data if d.get("user") and d.get("pass")}
+ except Exception:
+ return {}
+
+USERS = _load_users()
+
+def _constant_time_equals(a: str, b: str) -> bool:
+ return hmac.compare_digest(a.encode("utf-8"), b.encode("utf-8"))
+
+def verify_credentials(username: str, password: str) -> bool:
+ if not username or not password:
+ return False
+ expected = USERS.get(username.strip())
+ if expected is None:
+ return False
+ return _constant_time_equals(password, expected)
+
+def require_login() -> Optional[str]:
+ if st.session_state.get("auth_ok") and st.session_state.get("auth_user"):
+ return st.session_state.get("auth_user")
+
+ st.markdown("
🔒 Tracker
", unsafe_allow_html=True)
+ _, col2, _ = st.columns([1, 2, 1])
+ with col2:
+ with st.form("login_form", clear_on_submit=False):
+ username = st.text_input("Utilisateur")
+ password = st.text_input("Mot de passe", type="password")
+ ok = st.form_submit_button("Se connecter")
+ if ok:
+ if verify_credentials(username, password):
+ st.session_state["auth_ok"] = True
+ st.session_state["auth_user"] = username.strip()
+ st.success("Connexion réussie.")
+ time.sleep(0.3)
+ st.rerun()
+ else:
+ st.error("Identifiants invalides.")
+ st.stop()
+
+# ==================
+# Accès Base de Données
+# ==================
+@contextmanager
+def get_conn():
+ cfg = get_db_cfg()
+ # Astuce debug si besoin :
+ # print({k: ('***' if k=='password' else v) for k,v in cfg.items()})
+ conn = mysql.connect(**cfg)
+ try:
+ yield conn
+ finally:
+ conn.close()
+
+# -----------------
+# Utilitaires
+# -----------------
+def rom_help() -> str:
+ return (
+ "Format ROM attendu : `{0x28,0xFF,0xAA,0xBB,0xCC,0xDD,0xEE,0x12}` "
+ "(8 octets en hex). Le premier octet est souvent 0x28 pour DS18B20."
+ )
+
+def is_valid_rom(address: str) -> bool:
+ return bool(ROM_REGEX.match(str(address).strip()))
+
+def is_valid_hyphen(address_h: str) -> bool:
+ return bool(HYPHEN_REGEX.match(str(address_h).strip()))
+
+def rom_to_hyphen(rom: str) -> str:
+ hexes = re.findall(r"0x([0-9A-Fa-f]{2})", str(rom))
+ if len(hexes) != 8:
+ return ""
+ return "-".join(h.lower() for h in hexes)
+
+def hyphen_to_rom(h: str) -> str:
+ parts = str(h).strip().split("-")
+ if len(parts) != 8 or not all(re.fullmatch(r"[0-9A-Fa-f]{2}", p) for p in parts):
+ return ""
+ return "{" + ",".join(f"0x{p.upper()}" for p in parts) + "}"
+
+def res_label(bits: int) -> str:
+ info = RES_MAP.get(bits)
+ if not info:
+ return f"{bits} bits (inconnu)"
+ return f"{bits} bits (±{info['precision']}°C, {info['tconv_ms']} ms)"
+
+# -----------------
+# Fonctions SQL
+# -----------------
+def fetch_trackers(where_lieu: str | None = None) -> pd.DataFrame:
+ query = (
+ f"SELECT {COL_ID}, {COL_ADDRESS}, {COL_ADDR_HYPHEN}, {COL_LIEU}, "
+ f"{COL_REPERE}, {COL_MES}, {COL_RESBITS}, {COL_DATE} "
+ f"FROM {TABLE_NAME}"
+ )
+ params = []
+ if where_lieu:
+ query += f" WHERE {COL_LIEU} = %s"
+ params.append(where_lieu)
+ query += f" ORDER BY {COL_LIEU}, {COL_REPERE}, {COL_ADDR_HYPHEN}, {COL_ADDRESS}"
+ with get_conn() as conn:
+ df = pd.read_sql(query, conn, params=params)
+ return df
+
+def insert_tracker(address: str, lieu: str, res_bits: int,
+ repere: str | None = None, mise_en_service: date | None = None,
+ address_hyphen: str | None = None) -> int:
+ addr_rom = (address or "").strip() if address else ""
+ addr_hyp = (address_hyphen or "").strip() if address_hyphen else ""
+ if addr_rom and not addr_hyp:
+ addr_hyp = rom_to_hyphen(addr_rom)
+ if addr_hyp and not addr_rom:
+ addr_rom = hyphen_to_rom(addr_hyp)
+ if not is_valid_rom(addr_rom) or not is_valid_hyphen(addr_hyp):
+ raise ValueError("Adresse invalide (ROM ou hyphen).")
+ sql = f"""
+ INSERT INTO {TABLE_NAME}
+ ({COL_ADDRESS}, {COL_ADDR_HYPHEN}, {COL_LIEU}, {COL_REPERE}, {COL_MES}, {COL_RESBITS})
+ VALUES (%s, %s, %s, %s, %s, %s)
+ """
+ with get_conn() as conn:
+ cur = conn.cursor()
+ cur.execute(sql, (
+ addr_rom,
+ addr_hyp.lower(),
+ lieu,
+ (repere.strip() if repere and str(repere).strip() else None),
+ mise_en_service,
+ res_bits,
+ ))
+ conn.commit()
+ return cur.lastrowid
+
+def update_tracker(row_id: int, address: str, lieu: str, res_bits: int,
+ repere: str | None, mise_en_service: date | None,
+ address_hyphen: str | None = None) -> None:
+ addr_rom = (address or "").strip()
+ addr_hyp = (address_hyphen or "").strip() if address_hyphen else ""
+ if addr_rom and not addr_hyp:
+ addr_hyp = rom_to_hyphen(addr_rom)
+ if addr_hyp and not addr_rom:
+ addr_rom = hyphen_to_rom(addr_hyp)
+ if not is_valid_rom(addr_rom) or not is_valid_hyphen(addr_hyp):
+ raise ValueError("Adresse invalide (ROM ou hyphen).")
+ sql = f"""
+ UPDATE {TABLE_NAME}
+ SET {COL_ADDRESS}=%s, {COL_ADDR_HYPHEN}=%s, {COL_LIEU}=%s, {COL_REPERE}=%s, {COL_MES}=%s, {COL_RESBITS}=%s
+ WHERE {COL_ID}=%s
+ """
+ with get_conn() as conn:
+ cur = conn.cursor()
+ cur.execute(sql, (
+ addr_rom,
+ addr_hyp.lower(),
+ lieu,
+ (repere.strip() if repere and str(repere).strip() else None),
+ mise_en_service,
+ res_bits,
+ row_id,
+ ))
+ conn.commit()
+
+def delete_tracker(row_id: int) -> None:
+ sql = f"DELETE FROM {TABLE_NAME} WHERE {COL_ID}=%s"
+ with get_conn() as conn:
+ cur = conn.cursor()
+ cur.execute(sql, (row_id,))
+ conn.commit()
+
+# ==================
+# Application Streamlit
+# ==================
+st.set_page_config(page_title="Tracker", page_icon="🌡️", layout="wide")
+user = require_login()
+
+st.title("🌡️ Gestion du parc sondes (stock ou installées)")
+with st.expander("Paramètres de connexion (lecture seule)"):
+ st.write({k: ("***" if k in {"password"} else v) for k, v in DB_CFG.items()})
+ st.caption("Configurez ces valeurs via le fichier .env")
+
+st.sidebar.header("Filtres & Actions")
+st.sidebar.caption(f"Connecté en tant que **{user}**")
+
+_all = fetch_trackers()
+lieux = sorted([x for x in _all[COL_LIEU].dropna().unique()]) if not _all.empty else []
+lieu_selected = st.sidebar.selectbox("Filtrer par lieu", options=["(Tous)"] + lieux, index=0)
+
+# Formulaire d'ajout
+st.sidebar.subheader("Ajouter une sonde")
+with st.sidebar.form("add_form", clear_on_submit=True):
+ new_address = st.text_input("Adresse ROM", placeholder="{0x28,0xFF,...}", help=rom_help())
+ preview_h = rom_to_hyphen(new_address) if new_address else ""
+ st.text_input("Adresse hyphen (auto)", value=preview_h, disabled=True)
+ new_lieu = st.text_input("Lieu d'installation")
+ new_repere = st.text_input("Repère (optionnel)")
+ new_mes = st.date_input("Mise en service (optionnel)", value=None, format="YYYY-MM-DD")
+ new_res = st.selectbox("Résolution (bits)", options=[9,10,11,12])
+ submitted = st.form_submit_button("Ajouter")
+ if submitted:
+ if not is_valid_rom(new_address):
+ st.warning("Adresse ROM invalide.")
+ elif not new_lieu.strip():
+ st.warning("Lieu requis.")
+ else:
+ rid = insert_tracker(
+ new_address.strip(),
+ new_lieu.strip(),
+ int(new_res),
+ new_repere,
+ new_mes if isinstance(new_mes, date) else None,
+ address_hyphen=rom_to_hyphen(new_address.strip()),
+ )
+ st.success(f"Sonde ajoutée (id={rid}).")
+ time.sleep(0.6)
+ st.rerun()
+
+st.sidebar.divider()
+if st.sidebar.button("Se déconnecter"):
+ for _k in list(st.session_state.keys()):
+ st.session_state.pop(_k, None)
+ st.success("Déconnecté.")
+ time.sleep(0.3)
+ st.rerun()
+
+# Vue principale
+if lieu_selected != "(Tous)":
+ df = fetch_trackers(where_lieu=lieu_selected)
+else:
+ df = _all.copy()
+
+if df.empty:
+ st.info("Aucune sonde enregistrée.")
+else:
+ df["resolution"] = df[COL_RESBITS].apply(res_label)
+ st.subheader("Enregistrements")
+ edited = st.data_editor(
+ df[[COL_ID, COL_ADDRESS, COL_ADDR_HYPHEN, COL_LIEU, COL_REPERE, COL_MES, COL_RESBITS, "resolution", COL_DATE]],
+ hide_index=True,
+ use_container_width=True,
+ num_rows="dynamic",
+ )
+ removed_ids = set(df[COL_ID]) - set(edited[COL_ID])
+ to_update = []
+ for _, row in edited.iterrows():
+ orig = df.loc[df[COL_ID] == row[COL_ID]].iloc[0]
+ changed = (
+ (row[COL_ADDRESS] != orig[COL_ADDRESS]) or
+ (row[COL_ADDR_HYPHEN] != orig[COL_ADDR_HYPHEN]) or
+ (row[COL_LIEU] != orig[COL_LIEU]) or
+ (row.get(COL_REPERE) or "") != (orig.get(COL_REPERE) or "") or
+ (str(row.get(COL_MES) or "")[:10] != str(orig.get(COL_MES) or "")[:10]) or
+ (int(row[COL_RESBITS]) != int(orig[COL_RESBITS]))
+ )
+ if changed:
+ mes_val = row.get(COL_MES)
+ if pd.isna(mes_val):
+ mes_val = None
+ elif hasattr(mes_val, "date"):
+ mes_val = mes_val.date()
+ to_update.append((int(row[COL_ID]), str(row[COL_ADDRESS]), str(row[COL_ADDR_HYPHEN] or ""),
+ str(row[COL_LIEU]), int(row[COL_RESBITS]),
+ str(row.get(COL_REPERE) or None), mes_val))
+ col1, col2 = st.columns([1,1])
+ if st.button("Enregistrer les modifications"):
+ for rid, addr_rom, addr_hyp, lieu, rbits, repere, mes in to_update:
+ update_tracker(rid, addr_rom, lieu, rbits, repere, mes, address_hyphen=addr_hyp)
+ for rid in removed_ids:
+ delete_tracker(rid)
+ st.success("Modifications enregistrées ✔️")
+ time.sleep(0.6)
+ st.rerun()
+ if st.button("Annuler"):
+ st.rerun()
+
+st.divider()
+st.caption("Astuce : collez une adresse ROM {0x..,...} → la version hyphen est générée automatiquement.")
diff --git a/app/users.py b/app/users.py
index 9401dfc..635a99f 100644
--- a/app/users.py
+++ b/app/users.py
@@ -1,398 +1,271 @@
-# Streamlit app
+# users.py
+# ------------------------------------------------------------
+# Gestion des utilisateurs + droits d'accès par site (DroitsSites)
+# - Streamlit UI
+# - MySQL (mysql-connector-python)
+# - Hash de mot de passe avec bcrypt
+#
+# .env attendu (ou variables d'env) :
+# DB_HOST=...
+# DB_PORT=3306
+# DB_USER=...
+# DB_PASS=...
+# DB_NAME=Acces # <- base "annuaire" qui contient Utilisateurs/DroitsSites/Connexions
+# ------------------------------------------------------------
+
import os
-from datetime import date, datetime
import re
-import bcrypt
-import mysql.connector
-from mysql.connector import pooling, errorcode
-import pandas as pd
+from datetime import date, datetime
+from typing import List, Tuple, Dict, Optional
+
import streamlit as st
-from dotenv import load_dotenv
-import smtplib
-from email.message import EmailMessage
-from email.utils import formatdate
+import mysql.connector
+from mysql.connector.pooling import MySQLConnectionPool
-load_dotenv()
-SMTP_HOST = os.getenv("SMTP_HOST", "ssl0.ovh.net")
-SMTP_PORT = int(os.getenv("SMTP_PORT", "465")) # OVH: 465 (SSL) ou 587 (STARTTLS)
-SMTP_USER = os.getenv("SMTP_USER")
-SMTP_PASS = os.getenv("SMTP_PASS")
-SMTP_FROM = os.getenv("SMTP_FROM", SMTP_USER)
-
-# -----------------------
-# Auth minimale
-# -----------------------
-def require_login():
- st.markdown(
- "",
- unsafe_allow_html=True,
- )
+try:
+ from dotenv import load_dotenv # facultatif mais pratique
load_dotenv()
- admin_user = os.getenv("ADMIN_USER")
- admin_hash = os.getenv("ADMIN_PASS_HASH")
- if not admin_user or not admin_hash:
- st.error("Variables ADMIN_USER et/ou ADMIN_PASS_HASH manquantes dans .env")
- st.stop()
+except Exception:
+ pass
- if "auth_ok" not in st.session_state:
- st.session_state.auth_ok = False
+try:
+ import bcrypt # pip install bcrypt
+except Exception:
+ bcrypt = None
- if not st.session_state.auth_ok:
- col1, col2, col3 = st.columns([1, 2, 1])
- with col2:
- st.header("🔐 Accès restreint")
- u = st.text_input("Utilisateur")
- p = st.text_input("Mot de passe", type="password")
- if st.button("Se connecter", use_container_width=True):
- try:
- ok = (u == admin_user) and bcrypt.checkpw(p.encode(), admin_hash.encode())
- except Exception:
- ok = False
- if ok:
- st.session_state.auth_ok = True
- st.rerun()
- else:
- st.error("Identifiants invalides.")
- st.stop()
-def logout_button():
- st.markdown(
- """
-
- """,
- unsafe_allow_html=True
- )
- st.markdown('', unsafe_allow_html=True)
- if st.button("🚪 Quitter", key="logout", use_container_width=False):
- st.session_state.clear()
- st.success("Déconnexion effectuée.")
- st.rerun()
- st.markdown('
', unsafe_allow_html=True)
+# ---------------------------
+# Config / Connexion MySQL
+# ---------------------------
+DB_HOST = os.getenv("DB_HOST", "localhost")
+DB_PORT = int(os.getenv("DB_PORT", "3306"))
+DB_USER = os.getenv("DB_USER", "root")
+DB_PASS = os.getenv("DB_PASS", "")
+DB_NAME = os.getenv("DB_NAME", "Acces")
+pool: Optional[MySQLConnectionPool] = None
+def init_pool():
+ global pool
+ if pool is None:
+ pool = MySQLConnectionPool(
+ pool_name="users_pool",
+ pool_size=5,
+ host=DB_HOST,
+ port=DB_PORT,
+ user=DB_USER,
+ password=DB_PASS,
+ database=DB_NAME,
+ autocommit=True,
+ charset="utf8mb4",
+ collation="utf8mb4_general_ci",
+ )
-require_login()
-logout_button()
+init_pool()
-# -----------------------
-# Connexion MySQL via pool
-# -----------------------
-@st.cache_resource
-def get_pool():
- load_dotenv()
- host = os.getenv("DB_HOST")
- port = int(os.getenv("MYSQL_PORT", "3306"))
- user = os.getenv("DB_USER")
- pwd = os.getenv("DB_PASSWORD")
- db = os.getenv("DB_NAME")
- missing = [k for k, v in {
- "DB_HOST": host, "MYSQL_PORT": port, "DB_USER": user, "DB_PASSWORD": pwd, "DB_NAME": db
- }.items() if v in (None, "")]
- if missing:
- raise RuntimeError(f"Variables manquantes dans .env : {', '.join(missing)}")
- return pooling.MySQLConnectionPool(
- pool_name="users_pool",
- pool_size=5,
- pool_reset_session=True,
- host=host, port=port, user=user, password=pwd, database=db,
- autocommit=True
- )
-
-pool = get_pool()
-
-# -----------------------
-# Helpers SQL + validations
-# -----------------------
EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$")
-PHONE_RE = re.compile(r"\d{10,14}")
-def normalize_phone(p: str|None) -> str|None:
- if not p:
- return None
- digits = re.sub(r"\D", "", p)
- return digits if PHONE_RE.match(digits) else None
-def to_sql_date(d: date | str | None) -> str | None:
- if d is None:
- return None
- if isinstance(d, str):
- try:
- d = datetime.fromisoformat(d).date()
- except Exception:
- return None
- return d.strftime("%Y-%m-%d")
-
-def hash_password(plain: str, rounds: int = 12) -> str:
- salt = bcrypt.gensalt(rounds=rounds)
+# ---------------------------
+# Utils Sécurité
+# ---------------------------
+def hash_password(plain: str) -> str:
+ if not bcrypt:
+ raise RuntimeError("Le module 'bcrypt' est requis (pip install bcrypt).")
+ salt = bcrypt.gensalt(rounds=12)
return bcrypt.hashpw(plain.encode("utf-8"), salt).decode("utf-8")
-def user_exists(cur, username: str) -> bool:
- cur.execute("SELECT COUNT(*) FROM Utilisateurs WHERE NomUtilisateur=%s", (username,))
- (count,) = cur.fetchone()
- return count > 0
-def find_users_by_email(cnx, email: str):
- cur = cnx.cursor(dictionary=True)
- try:
- cur.execute(
- "SELECT NomUtilisateur, Site FROM Utilisateurs WHERE email=%s ORDER BY NomUtilisateur",
- (email,),
- )
- return cur.fetchall()
- finally:
- cur.close()
-def list_users(cnx, limit: int = 500, include_password=False):
- fields = ["NomUtilisateur", "Nom_complet", "Site", "DateExpiration", "Telephone", "email"]
- if include_password:
- fields.append("MotDePasse")
- sql = f"SELECT {', '.join(fields)} FROM Utilisateurs ORDER BY NomUtilisateur LIMIT %s"
-
- cur = cnx.cursor(dictionary=True)
- try:
- cur.execute(sql, (limit,))
- return cur.fetchall()
- finally:
- cur.close()
-
-def insert_user(cnx, username, full_name, site, password, expires, phone, email, role):
- if not EMAIL_RE.match(email):
- raise ValueError("Email invalide.")
- phone_norm = normalize_phone(phone)
- exp_sql = to_sql_date(expires)
- pwd_hash = hash_password(password)
+# ---------------------------
+# Accès SQL - Helpers
+# ---------------------------
+def list_all_sites(cnx) -> List[Tuple[str, str]]:
+ """Retourne [(dsn, bdd), ...] depuis Connexions."""
cur = cnx.cursor()
try:
- if user_exists(cur, username):
- raise RuntimeError("Nom d'utilisateur déjà existant.")
- cur.execute(
- """
- INSERT INTO Utilisateurs
- (NomUtilisateur, Nom_complet, Site, MotDePasse, MotDePasseHash, DateExpiration, Telephone, email, role)
- VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s)
- """,
- (username, full_name, site, password, pwd_hash, exp_sql, phone_norm, email, role),
- )
- return pwd_hash
+ cur.execute("SELECT DSN, BDD FROM Connexions ORDER BY BDD")
+ return cur.fetchall()
finally:
cur.close()
+def insert_user(cnx, username: str, full_name: str, pwd_hash: str,
+ expires: date, phone: str, email: str, role: str,
+ site_legacy: Optional[str] = None) -> None:
+ """
+ Insère un utilisateur.
+ - 'Site' est obsolète : on le laisse à NULL/'' si la colonne existe encore.
+ """
+ cur = cnx.cursor()
+ try:
+ # Vérifie si la colonne Site existe (schéma en transition)
+ cur.execute("SHOW COLUMNS FROM Utilisateurs LIKE 'Site'")
+ has_site_col = cur.fetchone() is not None
+ if has_site_col:
+ cur.execute("""
+ INSERT INTO Utilisateurs
+ (NomUtilisateur, Nom_complet, MotDePasseHash, DateExpiration,
+ Telephone, email, role )
+ VALUES (%s, %s, %s, %s, %s, %s, %s, %s)
+ """, (username, full_name, pwd_hash, expires, phone, email, role, site_legacy or ""))
+ else:
+ cur.execute("""
+ INSERT INTO Utilisateurs
+ (NomUtilisateur, Nom_complet, MotDePasseHash, DateExpiration,
+ Telephone, email, role)
+ VALUES (%s, %s, %s, %s, %s, %s, %s)
+ """, (username, full_name, pwd_hash, expires, phone, email, role))
+ finally:
+ cur.close()
-def get_user_details(cnx, username: str):
+def update_user_core(cnx, username: str, full_name: str,
+ expires: date, phone: str, email: str, role: str) -> None:
+ cur = cnx.cursor()
+ try:
+ cur.execute("""
+ UPDATE Utilisateurs
+ SET Nom_complet=%s,
+ DateExpiration=%s,
+ Telephone=%s,
+ email=%s,
+ role=%s
+ WHERE NomUtilisateur=%s
+ """, (full_name, expires, phone, email, role, username))
+ finally:
+ cur.close()
+
+def update_user_password(cnx, username: str, pwd_hash: str) -> None:
+ cur = cnx.cursor()
+ try:
+ cur.execute("""
+ UPDATE Utilisateurs
+ SET MotDePasseHash=%s
+ WHERE NomUtilisateur=%s
+ """, (pwd_hash, username))
+ finally:
+ cur.close()
+
+def list_users(cnx) -> List[Dict]:
cur = cnx.cursor(dictionary=True)
try:
- cur.execute(
- """
- SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email
- FROM Utilisateurs
- WHERE NomUtilisateur=%s
- """,
- (username,),
- )
+ cur.execute("""
+ SELECT u.NomUtilisateur, u.Nom_complet, u.role, u.DateExpiration,
+ u.Telephone, u.email
+ FROM Utilisateurs u
+ ORDER BY u.NomUtilisateur
+ """)
+ return cur.fetchall()
+ finally:
+ cur.close()
+
+def get_user(cnx, username: str) -> Optional[Dict]:
+ cur = cnx.cursor(dictionary=True)
+ try:
+ cur.execute("""
+ SELECT u.NomUtilisateur, u.Nom_complet, u.role, u.DateExpiration,
+ u.Telephone, u.email
+ FROM Utilisateurs u
+ WHERE u.NomUtilisateur=%s
+ """, (username,))
return cur.fetchone()
finally:
cur.close()
-def update_field(cnx, username: str, field: str, value):
- allowed = {
- "Nom_complet": "Nom_complet",
- "Site": "Site",
- "DateExpiration": "DateExpiration",
- "Telephone": "Telephone",
- "email": "email",
- }
- if field not in allowed:
- raise ValueError("Champ non autorisé.")
- sql_field = allowed[field]
-
- if field == "email":
- if not EMAIL_RE.match(str(value)):
- raise ValueError("Email invalide.")
- if field == "Telephone":
- value = normalize_phone(str(value)) if value else None
- if field == "DateExpiration":
- value = to_sql_date(value)
-
+def delete_user(cnx, username: str) -> None:
cur = cnx.cursor()
try:
- cur.execute(
- f"UPDATE Utilisateurs SET {sql_field}=%s WHERE NomUtilisateur=%s",
- (value, username),
- )
+ # Nettoie d'abord les droits si contrainte FK
+ cur.execute("DELETE FROM DroitsSites WHERE NomUtilisateur=%s", (username,))
+ cur.execute("DELETE FROM Utilisateurs WHERE NomUtilisateur=%s", (username,))
finally:
cur.close()
-def update_password(cnx, username: str, new_password: str):
- pwd_hash = hash_password(new_password)
+def get_user_sites(cnx, username: str) -> List[str]:
+ """Retourne la liste des DSN autorisés pour l'utilisateur."""
cur = cnx.cursor()
try:
- cur.execute(
- "UPDATE Utilisateurs SET MotDePasse=NULL, MotDePasseHash=%s WHERE NomUtilisateur=%s",
- (pwd_hash, username),
- )
- return pwd_hash
+ cur.execute("""
+ SELECT d.DSN
+ FROM DroitsSites d
+ WHERE d.NomUtilisateur=%s
+ ORDER BY d.DSN
+ """, (username,))
+ return [r[0] for r in cur.fetchall()]
finally:
cur.close()
-def send_mail(to_email: str, subject: str, body_text: str, body_html: str | None = None):
- if not (SMTP_HOST and SMTP_PORT and SMTP_USER and SMTP_PASS and SMTP_FROM):
- raise RuntimeError("Configuration SMTP incomplète (SMTP_HOST/PORT/USER/PASS/FROM).")
- if not to_email:
- raise ValueError("Destinataire vide.")
-
- msg = EmailMessage()
- msg["From"] = SMTP_FROM
- msg["To"] = to_email
- msg["Date"] = formatdate(localtime=True)
- msg["Subject"] = subject
- msg.set_content(body_text)
- if body_html:
- msg.add_alternative(body_html, subtype="html")
-
- # Choix du protocole en fonction du port
- if SMTP_PORT == 465:
- with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, timeout=20) as s:
- s.login(SMTP_USER, SMTP_PASS)
- s.send_message(msg)
- else:
- # 587 (ou autre) : STARTTLS
- with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=20) as s:
- s.ehlo()
- s.starttls() # passe en TLS
- s.ehlo()
- s.login(SMTP_USER, SMTP_PASS)
- s.send_message(msg)
-
- def get_user_email_and_field(cnx, username: str, field: str):
- cols = {"email", "Nom_complet", "Site", "DateExpiration", "Telephone"}
- if field not in cols:
- field = "Nom_complet" # fallback
- with cnx.cursor(dictionary=True) as cur:
- cur.execute(
- f"SELECT email, {field} AS field_value FROM Utilisateurs WHERE NomUtilisateur=%s",
- (username,)
- )
- row = cur.fetchone()
- return (row.get("email") if row else None, row.get("field_value") if row else None)
-
-def get_user_email_and_field(cnx, username: str, field: str):
- cols = {"email", "Nom_complet", "Site", "DateExpiration", "Telephone"}
- if field not in cols:
- field = "Nom_complet" # fallback
-
- cur = cnx.cursor(dictionary=True)
+def grant_sites_to_user(cnx, username: str, dsns: Optional[List[str]], grant_all: bool = False) -> None:
+ """
+ Alimente DroitsSites pour un utilisateur.
+ - grant_all=True : accorde tous les DSN présents dans Connexions
+ - sinon : insère uniquement les DSN de la liste
+ """
+ cur = cnx.cursor()
try:
- cur.execute(
- f"SELECT email, {field} AS field_value FROM Utilisateurs WHERE NomUtilisateur=%s",
- (username,),
- )
- row = cur.fetchone()
- return (row.get("email") if row else None,
- row.get("field_value") if row else None)
- finally:
- cur.close()
-# -----------------------
-# UI
-# -----------------------
-st.set_page_config(page_title="Acces.Utilisateurs", page_icon="👤", layout="wide")
-st.title("Gestion des utilisateurs")
-
-tab_list, tab_create, tab_edit, tab_security = st.tabs(["Liste", "Créer", "Modifier", "Sécurité"])
-
-# -----------------------
-# ONGLET LISTE
-# -----------------------
-with tab_list:
-
- st.subheader("Utilisateurs")
- try:
- cnx = pool.get_connection()
- try:
- include_pw = os.getenv("ADMIN_USER") == st.session_state.get("current_user", os.getenv("ADMIN_USER"))
- rows = list_users(cnx, limit=1000, include_password=include_pw)
- finally:
- cnx.close()
-
- df = pd.DataFrame(rows)
- if not df.empty:
- from datetime import date
- df["DateExpiration"] = pd.to_datetime(df["DateExpiration"], errors="coerce").dt.date
-
- # --- Ajout coloration et statut expiration ---
- ALERTE_JOURS = 30
- exp = pd.to_datetime(df["DateExpiration"], errors="coerce")
- today = pd.Timestamp(date.today())
-
- df["Jours_restant"] = (exp - today).dt.days
-
- def statut_expiration(j):
- if pd.isna(j):
- return "Inconnu"
- j = int(j)
- if j < 0:
- return "Périmé"
- if j <= ALERTE_JOURS:
- return f"Bientôt (≤{ALERTE_JOURS}j)"
- return "OK"
-
- df["Statut"] = df["Jours_restant"].apply(statut_expiration)
-
- c1, c2, c3 = st.columns([1, 1, 3])
- with c1:
- only_bad = st.checkbox("📌 Périmés / bientôt", value=False)
- with c2:
- tri_jours = st.checkbox("🔽 Trier par Jours restants", value=True)
-
- if only_bad:
- df = df[df["Statut"].isin(["Périmé", f"Bientôt (≤{ALERTE_JOURS}j)"])]
-
- if tri_jours and "Jours_restant" in df.columns:
- df = df.sort_values("Jours_restant", na_position="last")
-
- def colorize(row):
- stt = row.get("Statut", "")
- n = len(row)
- if stt == "Périmé":
- return ["background-color:#ffe6e6; color:#b00020;"] * n # rouge clair
- if stt.startswith("Bientôt"):
- return ["background-color:#fff7e6; color:#8a6d3b;"] * n # orange clair
- return [""] * n
-
- styled = df.style.apply(colorize, axis=1)
-
- st.dataframe(
- styled,
- use_container_width=True,
- hide_index=True,
- height=700, # ⇐ affiche ~20 lignes sans scroller
- column_config={
- "DateExpiration": st.column_config.DateColumn("DateExpiration", format="YYYY-MM-DD"),
- "Jours_restant": st.column_config.NumberColumn("Jours restants", help="Négatif = périmé"),
- },
- )
- st.caption(f"{len(df)} utilisateur(s) affiché(s)")
+ if grant_all:
+ cur.execute("""
+ INSERT IGNORE INTO DroitsSites (NomUtilisateur, Site, DSN)
+ SELECT %s, c.BDD, c.DSN FROM Connexions c
+ """, (username,))
else:
- st.info("Aucun utilisateur à afficher.")
- except Exception as e:
- st.warning(f"Impossible de lister les utilisateurs : {e}")
+ if not dsns:
+ return
+ cur.executemany("""
+ INSERT IGNORE INTO DroitsSites (NomUtilisateur, Site, DSN)
+ SELECT %s, c.BDD, c.DSN FROM Connexions c WHERE c.DSN=%s
+ """, [(username, d) for d in dsns])
+ finally:
+ cur.close()
-# -----------------------
-# ONGLET CREER
-# -----------------------
+def replace_user_sites(cnx, username: str, dsns: Optional[List[str]], grant_all: bool = False) -> None:
+ """Remplace complètement les droits de l'utilisateur par la nouvelle sélection."""
+ cur = cnx.cursor()
+ try:
+ cur.execute("DELETE FROM DroitsSites WHERE NomUtilisateur=%s", (username,))
+ finally:
+ cur.close()
+ grant_sites_to_user(cnx, username, dsns, grant_all)
+
+
+# ---------------------------
+# UI Streamlit
+# ---------------------------
+st.set_page_config(page_title="Gestion des utilisateurs", page_icon="👤", layout="centered")
+st.title("👤 Gestion des utilisateurs")
+
+tabs = st.tabs(["Créer", "Modifier", "Lister / Supprimer"])
+
+# --------- Onglet CRÉER ----------
+with tabs[0]:
+ st.subheader("Créer un utilisateur")
-with tab_create:
with st.form("create_user_form", clear_on_submit=False):
- st.subheader("Nouveau compte")
+ c1, c2 = st.columns([1.2, 1.8])
+ username = c1.text_input("NomUtilisateur", placeholder="ex: michel")
+ full_name = c2.text_input("Nom_complet", placeholder="Michel DUPONT")
- c1, c2, c3 = st.columns([1.2, 1.5, 1])
- username = c1.text_input("NomUtilisateur", placeholder="ex: cjaquier")
- full_name = c2.text_input("Nom_complet", placeholder="Clément JAQUIER")
- site = c3.text_input("Site", placeholder="Roissy", value="Roissy")
- role = st.selectbox("Rôle", ["Utilisateur", "Administrateur"], index=0)
+ role = st.selectbox("Rôle", ["utilisateur", "commercial", "administrateur", "admin"], index=0)
+
+ # Sélection des sites
+ labels, label_to_dsn = [], {}
+ try:
+ cnx = pool.get_connection()
+ try:
+ rows = list_all_sites(cnx) # [(dsn,bdd)]
+ finally:
+ cnx.close()
+ labels = [f"{bdd} ({dsn})" for dsn, bdd in rows]
+ label_to_dsn = {lbl: dsn for (dsn, bdd), lbl in zip(rows, labels)}
+ except Exception as e:
+ st.warning(f"Chargement des sites impossible : {e}")
+
+ colL, colR = st.columns([2, 1])
+ with colL:
+ selected_labels = st.multiselect("Sites autorisés", labels, help="Choisis un ou plusieurs sites")
+ with colR:
+ grant_all_sites = st.checkbox("Tous les sites", value=False)
+
+ # Si administrateur → tous les sites
+ if role.lower().startswith("admin"):
+ grant_all_sites = True
c4, c5, c6 = st.columns([1.4, 1, 1])
email = c4.text_input("email", placeholder="prenom.nom@domaine.com")
@@ -400,206 +273,191 @@ with tab_create:
expires = c6.date_input("DateExpiration", value=date.today())
c7, c8 = st.columns(2)
- password = c7.text_input("Mot de passe", type="password")
- password2 = c8.text_input("Confirmer", type="password")
-
- col_cb1, col_cb2 = st.columns([1.2, 1])
- notify_welcome = col_cb2.checkbox("Envoyer un e-mail de bienvenue", value=True,
- help="Enverra l'identifiant, le nom, le site et le mot de passe en clair")
+ password = c7.text_input("Mot de passe", type="password")
+ password2 = c8.text_input("Confirmer", type="password")
submitted = st.form_submit_button("Créer l'utilisateur", use_container_width=True)
if submitted:
- if not username or not full_name or not site or not email:
+ if not username or not full_name or not email:
st.error("Champs requis manquants.")
elif not EMAIL_RE.match(email):
st.error("Format d’e-mail invalide.")
+ elif not password:
+ st.error("Veuillez saisir un mot de passe.")
elif password != password2:
st.error("Les mots de passe ne correspondent pas.")
else:
try:
cnx = pool.get_connection()
try:
- # 🔎 avertir si l'e-mail existe déjà (mais on n'empêche pas)
- dup = find_users_by_email(cnx, email)
- if dup:
- liste = ", ".join(f"{u['NomUtilisateur']}@{u['Site']}" for u in dup)
- st.info(f"Cet e-mail est déjà utilisé par : {liste}")
+ # Hash
+ pwd_hash = hash_password(password)
- pwd_hash = insert_user(
- cnx, username=username, full_name=full_name, site=site,
- password=password, expires=expires, phone=phone, email=email, role=role
+ # Option 'site_legacy' : garde vide (schéma en transition)
+ insert_user(
+ cnx,
+ username=username,
+ full_name=full_name,
+ pwd_hash=pwd_hash,
+ expires=expires,
+ phone=phone,
+ email=email,
+ role=role,
+ site_legacy=None,
)
+
+ # Droits
+ chosen_dsns = [label_to_dsn[lbl] for lbl in selected_labels] if selected_labels else []
+ grant_sites_to_user(cnx, username=username, dsns=chosen_dsns, grant_all=grant_all_sites)
finally:
cnx.close()
- st.success("Utilisateur créé avec succès ✅")
- st.caption("Hash (MotDePasseHash) :")
+ st.success("Utilisateur créé ✅")
+ st.caption("Hash enregistré (MotDePasseHash) :")
st.code(pwd_hash)
-
- # ✉️ Mail de bienvenue (optionnel)
- if notify_welcome:
- try:
- subj = f"[Compte créé] Vos accès — {site}"
- body_txt = (
- "Bonjour,\n\n"
- "Votre compte a été créé.\n\n"
- f"Nom d’utilisateur : {username}\n"
- f"Nom complet : {full_name}\n"
- f"Site : {site}\n"
- f"Mot de passe : {password}\n"
- f"Date d'expiration: {expires.strftime('%Y-%m-%d')}\n\n"
- "Cordialement."
- )
- body_html = f"""
-
- Bonjour,
- Votre compte a été créé.
-
- | Nom d’utilisateur | {username} |
- | Nom complet | {full_name} |
- | Site | {site} |
- | Mot de passe | {password} |
- | Date d'expiration | {expires.strftime('%Y-%m-%d')} |
-
- Cordialement.
-
- """
- send_mail(email, subj, body_txt, body_html)
- st.success(f"✉️ E-mail de bienvenue envoyé à {email}")
- except Exception as e_mail:
- st.warning(f"E-mail non envoyé : {e_mail}")
-
except mysql.connector.Error as db_err:
- if db_err.errno == errorcode.ER_ACCESS_DENIED_ERROR:
- st.error("Identifiants MySQL invalides.")
- else:
- st.error(f"Erreur MySQL : {db_err}")
+ st.error(f"Erreur MySQL : {db_err}")
except Exception as e:
st.error(f"Erreur : {e}")
-# -----------------------
-# ONGLET MODIFIER
-# -----------------------
+# --------- Onglet MODIFIER ----------
+with tabs[1]:
+ st.subheader("Modifier un utilisateur")
-with tab_edit:
- st.subheader("Modifier un utilisateur existant")
+ usernames = []
try:
cnx = pool.get_connection()
try:
- users = list_users(cnx, limit=1000)
+ users = list_users(cnx)
finally:
cnx.close()
usernames = [u["NomUtilisateur"] for u in users]
except Exception as e:
- users = []
- usernames = []
- st.warning(f"Impossible de charger la liste des utilisateurs : {e}")
+ st.warning(f"Chargement des utilisateurs impossible : {e}")
- top_left, top_right = st.columns([1.2, 2])
- with top_left:
- sel_user = st.selectbox("Utilisateur", usernames, placeholder="Choisir un utilisateur")
- field = st.selectbox(
- "Champ à modifier",
- ["Nom_complet", "Site", "DateExpiration", "Telephone", "email"]
- )
-
- with top_right:
- if field == "DateExpiration":
- new_value = st.date_input("Nouvelle valeur", value=date.today())
- else:
- new_value = st.text_input("Nouvelle valeur")
- notify_user = st.checkbox("Notifier l’utilisateur par e-mail", value=True, help="Envoie un e-mail si coché")
- update_btn = st.button("Mettre à jour", disabled=not sel_user, use_container_width=True)
- if update_btn and sel_user:
+ target = st.selectbox("Choisir l'utilisateur", [""] + usernames, index=0)
+ if target:
try:
cnx = pool.get_connection()
try:
- # 1) Lire email + ancienne valeur
- to_email, old_value = get_user_email_and_field(cnx, sel_user, field)
-
- # 2) Appliquer la mise à jour
- update_field(cnx, sel_user, field, new_value)
+ u = get_user(cnx, target)
+ rows = list_all_sites(cnx) # [(dsn,bdd)]
+ current_dsns = set(get_user_sites(cnx, target))
finally:
cnx.close()
- st.success(f"✅ {field} mis à jour pour {sel_user}")
+ if not u:
+ st.error("Utilisateur introuvable.")
+ else:
+ labels = [f"{bdd} ({dsn})" for dsn, bdd in rows]
+ label_to_dsn = {lbl: dsn for (dsn, bdd), lbl in zip(rows, labels)}
+ # pré-sélection
+ preselected = [lbl for lbl in labels if label_to_dsn[lbl] in current_dsns]
- # 3) Notification mail si demandé
- if notify_user:
- try:
- nv = new_value.strftime("%Y-%m-%d") if hasattr(new_value, "strftime") else new_value
- ov = old_value.strftime("%Y-%m-%d") if hasattr(old_value, "strftime") else old_value
+ with st.form("edit_user_form", clear_on_submit=False):
+ c1, c2 = st.columns([1.2, 1.8])
+ full_name = c2.text_input("Nom_complet", value=u["Nom_complet"] or "")
+ role = c1.selectbox("Rôle", ["utilisateur", "commercial", "administrateur", "admin"],
+ index=0 if (u["role"] or "").lower() not in ["commercial", "administrateur", "admin"]
+ else ["utilisateur","commercial","administrateur","admin"].index((u["role"] or "").lower()))
+ c4, c5, c6 = st.columns([1.4, 1, 1])
+ email = c4.text_input("email", value=u["email"] or "")
+ phone = c5.text_input("Téléphone", value=u["Telephone"] or "")
+ exp_val = u["DateExpiration"].date() if isinstance(u["DateExpiration"], datetime) else (u["DateExpiration"] or date.today())
+ expires = c6.date_input("DateExpiration", value=exp_val)
- if not to_email:
- st.info("ℹ️ Aucune notification envoyée : adresse e-mail manquante.")
+ st.markdown("**Droits d’accès aux sites**")
+ colL, colR = st.columns([2, 1])
+ with colL:
+ selected_labels = st.multiselect("Sites autorisés", labels, default=preselected)
+ with colR:
+ grant_all_sites = st.checkbox("Tous les sites", value=False)
+
+ st.divider()
+ cpass1, cpass2 = st.columns(2)
+ new_pass = cpass1.text_input("Nouveau mot de passe (optionnel)", type="password")
+ new_pass2 = cpass2.text_input("Confirmer", type="password")
+
+ cbtn1, cbtn2, cbtn3 = st.columns([1.2,1,1])
+ save_btn = cbtn1.form_submit_button("Enregistrer")
+ reset_btn = cbtn2.form_submit_button("Réinitialiser mot de passe")
+ replace_btn = cbtn3.form_submit_button("Remplacer droits")
+
+ if save_btn:
+ if not EMAIL_RE.match(email):
+ st.error("Format d’e-mail invalide.")
else:
- subject = f"[Compte] Mise à jour de votre information : {field}"
- body_txt = (
- f"Bonjour,\n\n"
- f"Votre information '{field}' vient d’être mise à jour par l’administrateur.\n"
- f"Ancienne valeur : {ov}\n"
- f"Nouvelle valeur : {nv}\n\n"
- f"Si vous n’êtes pas à l’origine de cette demande, répondez à cet e-mail.\n"
- f"Cordialement."
- )
- body_html = f"""
-
- Bonjour,
- Votre information {field} vient d’être mise à jour par l’administrateur.
-
- - Ancienne valeur : {ov}
- - Nouvelle valeur : {nv}
-
- Si vous n’êtes pas à l’origine de cette demande, répondez à cet e-mail.
- Cordialement.
-
- """
- send_mail(to_email, subject, body_txt, body_html)
- st.success(f"✉️ Notification envoyée à {to_email}")
- except Exception as e_mail:
- st.warning(f"Notification non envoyée : {e_mail}")
+ try:
+ cnx = pool.get_connection()
+ try:
+ update_user_core(cnx, target, full_name, expires, phone, email, role)
+ # met à jour droits (ajout complémentaire, sans retirer)
+ chosen_dsns = [label_to_dsn[lbl] for lbl in selected_labels]
+ grant_sites_to_user(cnx, target, chosen_dsns, grant_all_sites)
+ finally:
+ cnx.close()
+ st.success("Modifications enregistrées ✅")
+ except Exception as e:
+ st.error(f"Erreur : {e}")
+
+ if reset_btn:
+ if not new_pass:
+ st.error("Saisis un nouveau mot de passe.")
+ elif new_pass != new_pass2:
+ st.error("Les mots de passe ne correspondent pas.")
+ else:
+ try:
+ cnx = pool.get_connection()
+ try:
+ update_user_password(cnx, target, hash_password(new_pass))
+ finally:
+ cnx.close()
+ st.success("Mot de passe réinitialisé ✅")
+ except Exception as e:
+ st.error(f"Erreur : {e}")
+
+ if replace_btn:
+ try:
+ cnx = pool.get_connection()
+ try:
+ chosen_dsns = [label_to_dsn[lbl] for lbl in selected_labels]
+ replace_user_sites(cnx, target, chosen_dsns, grant_all_sites)
+ finally:
+ cnx.close()
+ st.success("Droits remplacés ✅")
+ except Exception as e:
+ st.error(f"Erreur : {e}")
- except mysql.connector.Error as db_err:
- st.error(f"Erreur MySQL : {db_err}")
except Exception as e:
- st.error(f"Erreur : {e}")
-# -----------------------
-# ONGLET SECURITE
-# -----------------------
+ st.error(f"Erreur de chargement : {e}")
-with tab_security:
- st.subheader("Réinitialiser le mot de passe (bcrypt)")
+# --------- Onglet LISTE / SUPPR ----------
+with tabs[2]:
+ st.subheader("Liste des utilisateurs")
try:
- if 'user_cache_for_pw' not in st.session_state:
+ cnx = pool.get_connection()
+ try:
+ data = list_users(cnx)
+ finally:
+ cnx.close()
+ if not data:
+ st.info("Aucun utilisateur.")
+ else:
+ st.dataframe(data, use_container_width=True, hide_index=True)
+ except Exception as e:
+ st.error(f"Erreur : {e}")
+
+ st.divider()
+ st.subheader("Supprimer un utilisateur")
+ del_user = st.text_input("NomUtilisateur à supprimer", value="")
+ if st.button("Supprimer", type="primary", disabled=(not del_user)):
+ try:
cnx = pool.get_connection()
try:
- st.session_state.user_cache_for_pw = [u["NomUtilisateur"] for u in list_users(cnx, limit=1000)]
+ delete_user(cnx, del_user)
finally:
cnx.close()
- pw_user_list = st.session_state.user_cache_for_pw
- except Exception:
- pw_user_list = []
-
- user_pw = st.selectbox("Utilisateur", pw_user_list, key="pw_user", placeholder="Choisir un utilisateur")
- colp1, colp2 = st.columns(2)
- new_pw = colp1.text_input("Nouveau mot de passe", type="password")
- new_pw2 = colp2.text_input("Confirmer", type="password")
-
- if st.button("Mettre à jour le mot de passe", disabled=not user_pw, use_container_width=True):
- if not new_pw:
- st.error("Mot de passe vide.")
- elif new_pw != new_pw2:
- st.error("Les mots de passe ne correspondent pas.")
- else:
- try:
- cnx = pool.get_connection()
- try:
- h = update_password(cnx, user_pw, new_pw)
- finally:
- cnx.close()
- st.success("Mot de passe mis à jour ✅")
- st.caption("Hash (MotDePasseHash) :")
- st.code(h)
- except Exception as e:
- st.error(f"Erreur : {e}")
\ No newline at end of file
+ st.success(f"Utilisateur '{del_user}' supprimé ✅")
+ except Exception as e:
+ st.error(f"Erreur : {e}")
diff --git a/requirements.txt b/requirements.txt
index 6487b8c..c240005 100644
Binary files a/requirements.txt and b/requirements.txt differ