diff --git a/.env b/.env index 2ad1ef7..cfb3bd0 100644 --- a/.env +++ b/.env @@ -1,5 +1,5 @@ # connexion mysql -DB_HOST=192.168.1.100 +DB_HOST=162.19.78.131 DB_USER=sondes DB_PASS=TX.)-U1!zq5Axdk4 DB_NAME=Sondes @@ -10,9 +10,17 @@ DB_NAME2=Acces AUTH_USERS=[{"user":"Michel","pass":"210462"}] +# --- Auth admin de l’app users --- +ADMIN_USER=Michel +DB_USER3=excel +DB_PASS3='%n#%3Lay1MPa$%kR^5@' +DB_NAME3=Acces +ADMIN_PASSWORD=Gabrielle +ADMIN_PASS_HASH='$2b$12$Dgv7jNLJuR.3hQminSVE9OP6hCSmW4nISArR3HF5LTPGFK0Zw29N2' + # MQTT -MQTT_HOST=192.168.1.100 -MQTT_USER=Sondes +MQTT_HOST=162.19.78.131 +MQTT_USER=sondes MQTT_PASS=3J@bjYP0 MQTT_PORT=1883 diff --git a/.idea/Gestion sondes.iml b/.idea/Gestion sondes.iml index f2c5810..5431a55 100644 --- a/.idea/Gestion sondes.iml +++ b/.idea/Gestion sondes.iml @@ -3,6 +3,7 @@ + diff --git a/Outils/asset/logo.png b/Outils/asset/logo.png new file mode 100644 index 0000000..6aa2d63 Binary files /dev/null and b/Outils/asset/logo.png differ diff --git a/Outils/users.py b/Outils/users.py new file mode 100644 index 0000000..0ef93bd --- /dev/null +++ b/Outils/users.py @@ -0,0 +1,637 @@ +# Streamlit app +import os +from datetime import date, datetime +import re +import bcrypt +import mysql.connector +from mysql.connector import pooling, errorcode +import pandas as pd +import streamlit as st +from dotenv import load_dotenv +import smtplib +from email.message import EmailMessage +from email.utils import formatdate +from PIL import Image +from pathlib import Path +import streamlit as st + +st.set_page_config( + page_title="Acces.Utilisateurs", + page_icon="👤", + layout="wide", + initial_sidebar_state="collapsed" +) + + +BASE_DIR = Path(__file__).resolve().parent +LOGO_PATH = BASE_DIR / "asset" / "Logo.png" + +if LOGO_PATH.is_file(): + logo = Image.open(LOGO_PATH) + + c1, c2, c3 = st.columns([1, 2, 1]) + with c2: + st.image(logo, width=160) +else: + st.warning(f"Logo non trouvé : {LOGO_PATH}") + +load_dotenv() +SMTP_HOST = os.getenv("SMTP_HOST", "ssl0.ovh.net") +SMTP_PORT = int(os.getenv("SMTP_PORT", "465")) # OVH: 465 (SSL) ou 587 (STARTTLS) +SMTP_USER = os.getenv("SMTP_USER") +SMTP_PASS = os.getenv("SMTP_PASS") +SMTP_FROM = os.getenv("SMTP_FROM", SMTP_USER) + +# ----------------------- +# Auth minimale +# ----------------------- +def require_login(): + st.markdown( + "", + unsafe_allow_html=True, + ) + admin_user = os.getenv("ADMIN_USER") + admin_hash = os.getenv("ADMIN_PASS_HASH") + if not admin_user or not admin_hash: + st.error("Variables ADMIN_USER et/ou ADMIN_PASS_HASH manquantes dans .env") + st.stop() + + if "auth_ok" not in st.session_state: + st.session_state.auth_ok = False + + if not st.session_state.auth_ok: + col1, col2, col3 = st.columns([1, 2, 1]) + with col2: + st.header("🔐 Accès restreint") + u = st.text_input("Utilisateur") + p = st.text_input("Mot de passe", type="password") + if st.button("Se connecter", width="stretch"): + try: + ok = (u == admin_user) and bcrypt.checkpw(p.encode(), admin_hash.encode()) + except Exception: + ok = False + if ok: + st.session_state.auth_ok = True + st.rerun() + else: + st.error("Identifiants invalides.") + st.stop() +def logout_button(): + st.markdown( + """ + + """, + unsafe_allow_html=True + ) + st.markdown('
', unsafe_allow_html=True) + if st.button("🚪 Quitter", width="content"): + st.session_state.clear() + st.success("Déconnexion effectuée.") + st.rerun() + st.markdown('
', unsafe_allow_html=True) + + +require_login() +logout_button() + +# ----------------------- +# Connexion MySQL via pool +# ----------------------- +@st.cache_resource +def get_pool(): + host = os.getenv("DB_HOST") + port = int(os.getenv("MYSQL_PORT", "3306")) # ✅ valeur par défaut 3306 + user = os.getenv("DB_USER2") + pwd = os.getenv("DB_PASS2") + db = os.getenv("DB_NAME2") + + # ✅ contrôle des variables indispensables + missing = [k for k, v in { + "DB_HOST": host, + "DB_USER3": user, + "DB_PASS3": pwd, + "DB_NAME3": db, + }.items() if v in (None, "")] + if missing: + raise RuntimeError(f"Variables manquantes dans .env : {', '.join(missing)}") + + return pooling.MySQLConnectionPool( + pool_name="users_pool", + pool_size=5, + pool_reset_session=True, + host=host, + port=port, + user=user, + password=pwd, + database=db, + autocommit=True, + ) + +# ----------------------- +# Helpers SQL + validations +# ----------------------- +EMAIL_RE = re.compile(r"^[^@\s]+@[^@\s]+\.[^@\s]+$") +PHONE_RE = re.compile(r"\d{10,14}") + +def normalize_phone(p: str|None) -> str|None: + if not p: + return None + digits = re.sub(r"\D", "", p) + return digits if PHONE_RE.match(digits) else None + +def to_sql_date(d: date | str | None) -> str | None: + if d is None: + return None + if isinstance(d, str): + try: + d = datetime.fromisoformat(d).date() + except Exception: + return None + return d.strftime("%Y-%m-%d") + +def hash_password(plain: str, rounds: int = 12) -> str: + salt = bcrypt.gensalt(rounds=rounds) + return bcrypt.hashpw(plain.encode("utf-8"), salt).decode("utf-8") + +def user_exists(cur, username: str) -> bool: + cur.execute("SELECT COUNT(*) FROM Acces.Utilisateurs WHERE NomUtilisateur=%s", (username,)) + (count,) = cur.fetchone() + return count > 0 +def find_users_by_email(cnx, email: str): + cur = cnx.cursor(dictionary=True) + try: + cur.execute( + "SELECT NomUtilisateur, Site FROM Acces.Utilisateurs WHERE email=%s ORDER BY NomUtilisateur", + (email,), + ) + return cur.fetchall() + finally: + cur.close() +def list_users(cnx, limit: int = 500, include_password=False): + fields = ["NomUtilisateur", "Nom_complet", "Site", "DateExpiration", "Telephone", "email"] + if include_password: + fields.append("MotDePasse") + sql = f"SELECT {', '.join(fields)} FROM Utilisateurs ORDER BY NomUtilisateur LIMIT %s" + + cur = cnx.cursor(dictionary=True) + try: + cur.execute(sql, (limit,)) + return cur.fetchall() + finally: + cur.close() + +def insert_user(cnx, username, full_name, site, password, expires, phone, email, role): + if not EMAIL_RE.match(email): + raise ValueError("Email invalide.") + phone_norm = normalize_phone(phone) + exp_sql = to_sql_date(expires) + pwd_hash = hash_password(password) + + cur = cnx.cursor() + try: + if user_exists(cur, username): + raise RuntimeError("Nom d'utilisateur déjà existant.") + cur.execute( + """ + INSERT INTO Acces.Utilisateurs + (NomUtilisateur, Nom_complet, Site, MotDePasse, MotDePasseHash, DateExpiration, Telephone, email, role) + VALUES (%s,%s,%s,%s,%s,%s,%s,%s,%s) + """, + (username, full_name, site, password, pwd_hash, exp_sql, phone_norm, email, role), + ) + return pwd_hash + finally: + cur.close() + + + +def get_user_details(cnx, username: str): + cur = cnx.cursor(dictionary=True) + try: + cur.execute( + """ + SELECT NomUtilisateur, Nom_complet, Site, DateExpiration, Telephone, email + FROM Acces.Utilisateurs + WHERE NomUtilisateur=%s + """, + (username,), + ) + return cur.fetchone() + finally: + cur.close() + +def update_field(cnx, username: str, field: str, value): + allowed = { + "Nom_complet": "Nom_complet", + "Site": "Site", + "DateExpiration": "DateExpiration", + "Telephone": "Telephone", + "email": "email", + } + if field not in allowed: + raise ValueError("Champ non autorisé.") + sql_field = allowed[field] + + if field == "email": + if not EMAIL_RE.match(str(value)): + raise ValueError("Email invalide.") + if field == "Telephone": + value = normalize_phone(str(value)) if value else None + if field == "DateExpiration": + value = to_sql_date(value) + + cur = cnx.cursor() + try: + cur.execute( + f"UPDATE Utilisateurs SET {sql_field}=%s WHERE NomUtilisateur=%s", + (value, username), + ) + finally: + cur.close() + +def update_password(cnx, username: str, new_password: str): + pwd_hash = hash_password(new_password) + cur = cnx.cursor() + try: + cur.execute( + "UPDATE Acces.Utilisateurs SET MotDePasse=NULL, MotDePasseHash=%s WHERE NomUtilisateur=%s", + (pwd_hash, username), + ) + return pwd_hash + finally: + cur.close() + +def send_mail(to_email: str, subject: str, body_text: str, body_html: str | None = None): + if not (SMTP_HOST and SMTP_PORT and SMTP_USER and SMTP_PASS and SMTP_FROM): + raise RuntimeError("Configuration SMTP incomplète (SMTP_HOST/PORT/USER/PASS/FROM).") + if not to_email: + raise ValueError("Destinataire vide.") + + msg = EmailMessage() + msg["From"] = SMTP_FROM + msg["To"] = to_email + msg["Date"] = formatdate(localtime=True) + msg["Subject"] = subject + msg.set_content(body_text) + if body_html: + msg.add_alternative(body_html, subtype="html") + + # Choix du protocole en fonction du port + if SMTP_PORT == 465: + with smtplib.SMTP_SSL(SMTP_HOST, SMTP_PORT, timeout=20) as s: + s.login(SMTP_USER, SMTP_PASS) + s.send_message(msg) + else: + # 587 (ou autre) : STARTTLS + with smtplib.SMTP(SMTP_HOST, SMTP_PORT, timeout=20) as s: + s.ehlo() + s.starttls() # passe en TLS + s.ehlo() + s.login(SMTP_USER, SMTP_PASS) + s.send_message(msg) + + def get_user_email_and_field(cnx, username: str, field: str): + cols = {"email", "Nom_complet", "Site", "DateExpiration", "Telephone"} + if field not in cols: + field = "Nom_complet" # fallback + with cnx.cursor(dictionary=True) as cur: + cur.execute( + f"SELECT email, {field} AS field_value FROM Utilisateurs WHERE NomUtilisateur=%s", + (username,) + ) + row = cur.fetchone() + return (row.get("email") if row else None, row.get("field_value") if row else None) + +def get_user_email_and_field(cnx, username: str, field: str): + cols = {"email", "Nom_complet", "Site", "DateExpiration", "Telephone"} + if field not in cols: + field = "Nom_complet" # fallback + + cur = cnx.cursor(dictionary=True) + try: + cur.execute( + f"SELECT email, {field} AS field_value FROM Utilisateurs WHERE NomUtilisateur=%s", + (username,), + ) + row = cur.fetchone() + return (row.get("email") if row else None, + row.get("field_value") if row else None) + finally: + cur.close() +# ----------------------- +# UI +# ----------------------- +st.title("Gestion des utilisateurs") +try: + pool = get_pool() +except Exception as e: + st.error(f"❌ Impossible d'initialiser le pool MySQL : {e}") + st.stop() +tab_list, tab_create, tab_edit, tab_security = st.tabs(["Liste", "Créer", "Modifier", "Sécurité"]) + +# ----------------------- +# ONGLET LISTE +# ----------------------- +with tab_list: + + st.subheader("Utilisateurs") + try: + cnx = pool.get_connection() + try: + include_pw = os.getenv("ADMIN_USER") == st.session_state.get("current_user", os.getenv("ADMIN_USER")) + rows = list_users(cnx, limit=1000, include_password=include_pw) + finally: + cnx.close() + + df = pd.DataFrame(rows) + if not df.empty: + from datetime import date + df["DateExpiration"] = pd.to_datetime(df["DateExpiration"], errors="coerce").dt.date + + # --- Ajout coloration et statut expiration --- + ALERTE_JOURS = 30 + exp = pd.to_datetime(df["DateExpiration"], errors="coerce") + today = pd.Timestamp(date.today()) + + df["Jours_restant"] = (exp - today).dt.days + + def statut_expiration(j): + if pd.isna(j): + return "Inconnu" + j = int(j) + if j < 0: + return "Périmé" + if j <= ALERTE_JOURS: + return f"Bientôt (≤{ALERTE_JOURS}j)" + return "OK" + + df["Statut"] = df["Jours_restant"].apply(statut_expiration) + + c1, c2, c3 = st.columns([1, 1, 3]) + with c1: + only_bad = st.checkbox("📌 Périmés / bientôt", value=False) + with c2: + tri_jours = st.checkbox("🔽 Trier par Jours restants", value=True) + + if only_bad: + df = df[df["Statut"].isin(["Périmé", f"Bientôt (≤{ALERTE_JOURS}j)"])] + + if tri_jours and "Jours_restant" in df.columns: + df = df.sort_values("Jours_restant", na_position="last") + + def colorize(row): + stt = row.get("Statut", "") + n = len(row) + if stt == "Périmé": + return ["background-color:#ffe6e6; color:#b00020;"] * n # rouge clair + if stt.startswith("Bientôt"): + return ["background-color:#fff7e6; color:#8a6d3b;"] * n # orange clair + return [""] * n + + styled = df.style.apply(colorize, axis=1) + + st.dataframe( + styled, + use_container_width=True, + hide_index=True, + height=700, # ⇐ affiche ~20 lignes sans scroller + column_config={ + "DateExpiration": st.column_config.DateColumn("DateExpiration", format="YYYY-MM-DD"), + "Jours_restant": st.column_config.NumberColumn("Jours restants", help="Négatif = périmé"), + }, + ) + st.caption(f"{len(df)} utilisateur(s) affiché(s)") + else: + st.info("Aucun utilisateur à afficher.") + except Exception as e: + st.warning(f"Impossible de lister les utilisateurs : {e}") + +# ----------------------- +# ONGLET CREER +# ----------------------- + +with tab_create: + with st.form("create_user_form", clear_on_submit=False): + st.subheader("Nouveau compte") + + c1, c2, c3 = st.columns([1.2, 1.5, 1]) + username = c1.text_input("NomUtilisateur", placeholder="ex: cjaquier") + full_name = c2.text_input("Nom_complet", placeholder="Clément JAQUIER") + site = c3.text_input("Site", placeholder="Roissy", value="Roissy") + role = st.selectbox("Rôle", ["Utilisateur", "Administrateur"], index=0) + + c4, c5, c6 = st.columns([1.4, 1, 1]) + email = c4.text_input("email", placeholder="prenom.nom@domaine.com") + phone = c5.text_input("Téléphone", placeholder="06 12 12 35 32") + expires = c6.date_input("DateExpiration", value=date.today()) + + c7, c8 = st.columns(2) + password = c7.text_input("Mot de passe", type="password") + password2 = c8.text_input("Confirmer", type="password") + + col_cb1, col_cb2 = st.columns([1.2, 1]) + notify_welcome = col_cb2.checkbox("Envoyer un e-mail de bienvenue", value=True, + help="Enverra l'identifiant, le nom, le site et le mot de passe en clair") + + submitted = st.form_submit_button("Créer l'utilisateur", use_container_width=True) + + if submitted: + if not username or not full_name or not site or not email: + st.error("Champs requis manquants.") + elif not EMAIL_RE.match(email): + st.error("Format d’e-mail invalide.") + elif password != password2: + st.error("Les mots de passe ne correspondent pas.") + else: + try: + cnx = pool.get_connection() + try: + # 🔎 avertir si l'e-mail existe déjà (mais on n'empêche pas) + dup = find_users_by_email(cnx, email) + if dup: + liste = ", ".join(f"{u['NomUtilisateur']}@{u['Site']}" for u in dup) + st.info(f"Cet e-mail est déjà utilisé par : {liste}") + + pwd_hash = insert_user( + cnx, username=username, full_name=full_name, site=site, + password=password, expires=expires, phone=phone, email=email, role=role + ) + finally: + cnx.close() + + st.success("Utilisateur créé avec succès ✅") + st.caption("Hash (MotDePasseHash) :") + st.code(pwd_hash) + + # ✉️ Mail de bienvenue (optionnel) + if notify_welcome: + try: + subj = f"[Compte créé] Vos accès — {site}" + body_txt = ( + "Bonjour,\n\n" + "Votre compte a été créé.\n\n" + f"Nom d’utilisateur : {username}\n" + f"Nom complet : {full_name}\n" + f"Site : {site}\n" + f"Mot de passe : {password}\n" + f"Date d'expiration: {expires.strftime('%Y-%m-%d')}\n\n" + "Cordialement." + ) + body_html = f""" + +

Bonjour,

+

Votre compte a été créé.

+ + + + + + +
Nom d’utilisateur{username}
Nom complet{full_name}
Site{site}
Mot de passe{password}
Date d'expiration{expires.strftime('%Y-%m-%d')}
+

Cordialement.

+ + """ + send_mail(email, subj, body_txt, body_html) + st.success(f"✉️ E-mail de bienvenue envoyé à {email}") + except Exception as e_mail: + st.warning(f"E-mail non envoyé : {e_mail}") + + except mysql.connector.Error as db_err: + if db_err.errno == errorcode.ER_ACCESS_DENIED_ERROR: + st.error("Identifiants MySQL invalides.") + else: + st.error(f"Erreur MySQL : {db_err}") + except Exception as e: + st.error(f"Erreur : {e}") + +# ----------------------- +# ONGLET MODIFIER +# ----------------------- + +with tab_edit: + st.subheader("Modifier un utilisateur existant") + try: + cnx = pool.get_connection() + try: + users = list_users(cnx, limit=1000) + finally: + cnx.close() + usernames = [u["NomUtilisateur"] for u in users] + except Exception as e: + users = [] + usernames = [] + st.warning(f"Impossible de charger la liste des utilisateurs : {e}") + + top_left, top_right = st.columns([1.2, 2]) + with top_left: + sel_user = st.selectbox("Utilisateur", usernames, placeholder="Choisir un utilisateur") + field = st.selectbox( + "Champ à modifier", + ["Nom_complet", "Site", "DateExpiration", "Telephone", "email"] + ) + + with top_right: + if field == "DateExpiration": + new_value = st.date_input("Nouvelle valeur", value=date.today()) + else: + new_value = st.text_input("Nouvelle valeur") + notify_user = st.checkbox("Notifier l’utilisateur par e-mail", value=True, help="Envoie un e-mail si coché") + update_btn = st.button("Mettre à jour", disabled=not sel_user, use_container_width=True) + if update_btn and sel_user: + try: + cnx = pool.get_connection() + try: + # 1) Lire email + ancienne valeur + to_email, old_value = get_user_email_and_field(cnx, sel_user, field) + + # 2) Appliquer la mise à jour + update_field(cnx, sel_user, field, new_value) + finally: + cnx.close() + + st.success(f"✅ {field} mis à jour pour {sel_user}") + + # 3) Notification mail si demandé + if notify_user: + try: + nv = new_value.strftime("%Y-%m-%d") if hasattr(new_value, "strftime") else new_value + ov = old_value.strftime("%Y-%m-%d") if hasattr(old_value, "strftime") else old_value + + if not to_email: + st.info("ℹ️ Aucune notification envoyée : adresse e-mail manquante.") + else: + subject = f"[Compte] Mise à jour de votre information : {field}" + body_txt = ( + f"Bonjour,\n\n" + f"Votre information '{field}' vient d’être mise à jour par l’administrateur.\n" + f"Ancienne valeur : {ov}\n" + f"Nouvelle valeur : {nv}\n\n" + f"Si vous n’êtes pas à l’origine de cette demande, répondez à cet e-mail.\n" + f"Cordialement." + ) + body_html = f""" + +

Bonjour,

+

Votre information {field} vient d’être mise à jour par l’administrateur.

+ +

Si vous n’êtes pas à l’origine de cette demande, répondez à cet e-mail.

+

Cordialement.

+ + """ + send_mail(to_email, subject, body_txt, body_html) + st.success(f"✉️ Notification envoyée à {to_email}") + except Exception as e_mail: + st.warning(f"Notification non envoyée : {e_mail}") + + except mysql.connector.Error as db_err: + st.error(f"Erreur MySQL : {db_err}") + except Exception as e: + st.error(f"Erreur : {e}") +# ----------------------- +# ONGLET SECURITE +# ----------------------- + +with tab_security: + st.subheader("Réinitialiser le mot de passe (bcrypt)") + try: + if 'user_cache_for_pw' not in st.session_state: + cnx = pool.get_connection() + try: + st.session_state.user_cache_for_pw = [u["NomUtilisateur"] for u in list_users(cnx, limit=1000)] + finally: + cnx.close() + pw_user_list = st.session_state.user_cache_for_pw + except Exception: + pw_user_list = [] + + user_pw = st.selectbox("Utilisateur", pw_user_list, key="pw_user", placeholder="Choisir un utilisateur") + colp1, colp2 = st.columns(2) + new_pw = colp1.text_input("Nouveau mot de passe", type="password") + new_pw2 = colp2.text_input("Confirmer", type="password") + + if st.button("Mettre à jour le mot de passe", disabled=not user_pw, use_container_width=True): + if not new_pw: + st.error("Mot de passe vide.") + elif new_pw != new_pw2: + st.error("Les mots de passe ne correspondent pas.") + else: + try: + cnx = pool.get_connection() + try: + h = update_password(cnx, user_pw, new_pw) + finally: + cnx.close() + st.success("Mot de passe mis à jour ✅") + st.caption("Hash (MotDePasseHash) :") + st.code(h) + except Exception as e: + st.error(f"Erreur : {e}") \ No newline at end of file diff --git a/app/gyro_control.py b/app/Gyrophare.py similarity index 100% rename from app/gyro_control.py rename to app/Gyrophare.py diff --git a/scripts/backup_Vpsl.sh b/scripts/backup_mysql.sh similarity index 95% rename from scripts/backup_Vpsl.sh rename to scripts/backup_mysql.sh index 1b53f17..f141f45 100644 --- a/scripts/backup_Vpsl.sh +++ b/scripts/backup_mysql.sh @@ -49,7 +49,7 @@ fi # 2) Dump MySQL echo "🔷 Dump MySQL…" if [[ -f "$MYSQL_DEFAULTS" ]]; then - DUMP="mysqldump --defaults-file=$MYSQL_DEFAULTS --all-databases --single-transaction --quick --lock-tables=false --routines --events --triggers" + DUMP="mysqldump --defaults-extra-file=$MYSQL_DEFAULTS --all-databases --single-transaction --quick --lock-tables=false --routines --events --triggers" else DUMP="mysqldump --all-databases --single-transaction --quick --lock-tables=false --routines --events --triggers" fi