Files
Fichiers_perso/Societe_Generale.py
2025-04-19 11:55:46 +02:00

107 lines
3.7 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
# Insertion de données da l'app SG.
import streamlit as st
import mysql.connector
from dotenv import load_dotenv
import os
from datetime import datetime
load_dotenv()
st.set_page_config(page_title="Insertion Mysql", layout="centered")
st.title("🗓️ Insertion dans une base MySQL")
# Charger les identifiants
DB_HOST = os.getenv("DB_HOST")
DB_USER = os.getenv("DB_USER")
DB_PASSWORD = os.getenv("DB_PASSWORD")
# Connexion sans DB initiale pour lister les bases
@st.cache_data
def get_databases():
conn = mysql.connector.connect(
host=DB_HOST,
user=DB_USER,
password=DB_PASSWORD
)
cursor = conn.cursor()
cursor.execute("SHOW DATABASES")
bases = [db[0] for db in cursor.fetchall()]
conn.close()
return bases
# Connexion avec DB sélectionnée
def connect_to_db(db_name):
return mysql.connector.connect(
host=DB_HOST,
user=DB_USER,
password=DB_PASSWORD,
database=db_name
)
# --- Interface principale ---
base = st.selectbox("Choisir une base de données", get_databases())
if base:
conn = connect_to_db(base)
cursor = conn.cursor()
cursor.execute("SHOW TABLES")
tables = [table[0] for table in cursor.fetchall()]
table = st.selectbox("Choisir une table", tables)
if table:
cursor.execute(f"DESCRIBE {table}")
colonnes = cursor.fetchall()
# Détection clé primaire
cursor.execute(f"SHOW INDEX FROM {table} WHERE Key_name = 'PRIMARY'")
pk = cursor.fetchone()
primary_key = pk[4] if pk else None
champ_date_candidates = [col[0] for col in colonnes if "date" in col[1].lower()]
champ_date = st.selectbox("Champ de date", champ_date_candidates)
nb_mois = st.number_input("Nombre d'insertions mensuelles", 1, 36, 6)
date_depart = st.date_input("Date de départ des insertions", value=datetime.today().date())
st.subheader("Champs à insérer (hors clé primaire et champ de date)")
champs_choisis = []
champs_editables = [col for col in colonnes if col[0] != primary_key and col[0] != champ_date]
for col in champs_editables:
if st.checkbox(f"{col[0]} ({col[1]})", value=True):
champs_choisis.append(col[0])
with st.form("formulaire_insertion"):
st.markdown("**Valeurs fixes pour les champs cochés**")
valeurs_fixes = {}
for nom in champs_choisis:
val = st.text_input(nom)
valeurs_fixes[nom] = val if val != "" else None
submit = st.form_submit_button("Insérer")
try:
cursor = conn.cursor()
insert_count = 0
for i in range(nb_mois):
year = date_depart.year + (date_depart.month + i - 1) // 12
month = (date_depart.month + i - 1) % 12 + 1
day = min(date_depart.day, 28)
date_cible = datetime(year, month, day).date()
champs = [champ_date] + list(valeurs_fixes.keys())
valeurs = [date_cible] + list(valeurs_fixes.values())
sql = f"INSERT INTO {table} ({', '.join(champs)}) VALUES ({', '.join(['%s'] * len(valeurs))})"
cursor.execute(sql, valeurs)
insert_count += 1
conn.commit()
st.success(f"{insert_count} lignes insérées avec succès dans `{table}`.")
except Exception as e:
st.error(f"❌ Erreur lors de linsertion : {e}")
conn.commit()
st.success(f"{insert_count} lignes insérées avec succès dans `{table}`.")
st.success(f"{insert_count} lignes insérées avec succès.")