Skip to content
Snippets Groups Projects
Commit 52c05dcd authored by Gilares Paul's avatar Gilares Paul
Browse files

Edit 6-score_sigmoid.py

parent 593228de
Branches
No related tags found
No related merge requests found
# Calcul des scores de qualité de l'eau en fonction des différents paramètres.
import pandas as pd
import numpy as np
def normaliser_score_sigmoid(valeur, valeur_max, inverse=False):
"""Normalise une valeur en utilisant une fonction sigmoïde pour lisser la transition."""
if pd.isna(valeur):
return 1
if valeur_max == 0:
......@@ -12,10 +13,8 @@ def normaliser_score_sigmoid(valeur, valeur_max, inverse=False):
return score if inverse else 1 - score
def calculer_scores_eau(fichier_csv, date):
"""
Charge un fichier CSV contenant les valeurs mesurées et max, puis calcule les scores par prélèvement.
"""
df = pd.read_csv(fichier_csv, low_memory=False) # Ajout de low_memory=False pour éviter les warnings
df = pd.read_csv(fichier_csv, low_memory=False)
# Définition des codes paramètres et seuils réglementaires (ARS et/ou OMS)
seuils = {
......@@ -31,7 +30,6 @@ def calculer_scores_eau(fichier_csv, date):
6276.0: 0.5 # Pesticides
}
# Transformation du pH en score spécifique (distance à la plage optimale)
def score_pH(valeur):
if pd.isna(valeur):
return 1
......@@ -41,37 +39,22 @@ def calculer_scores_eau(fichier_csv, date):
return max(0, 1 - min(abs(valeur - min_pH), abs(valeur - max_pH)) / (max_pH - min_pH))
df = df.dropna(subset=["rsana"])
# Vérification que df n'est pas vide après suppression des NaN
if df.empty:
raise ValueError("Le DataFrame est vide après suppression des NaN. Vérifiez les données !")
# Calcul de la moyenne des paramètres pour chaque cdpointsurv
df_moyenne = df.groupby(["cdpointsurv", "cdparametre"])["rsana"].mean().reset_index()
# Pivot pour obtenir une seule ligne par cdpointsurv avec les valeurs moyennes des paramètres
#df_pivot = df_moyenne.pivot(index="cdpointsurv", columns="cdparametre", values="rsana").fillna(0)
df_pivot = df_moyenne.pivot(index="cdpointsurv", columns="cdparametre", values="rsana")
df_moyenne = df.groupby(["cdpointsurv", "cdparametre"])["rsana"].mean().reset_index() # Calcul de la moyenne des paramètres pour chaque cdpointsurv
df_pivot = df_moyenne.pivot(index="cdpointsurv", columns="cdparametre", values="rsana") # Pivot pour garder une seule ligne par cdpointsurv avec les valeurs moyennes des paramètres
print("Nombre de lignes après pivot :", len(df_pivot))
# Compter les lignes avec des valeurs manquantes
nb_lignes_manquantes = df_pivot.isna().any(axis=1).sum()
nb_lignes_manquantes = df_pivot.isna().any(axis=1).sum() # Compter les lignes avec des valeurs manquantes
print(f"Nombre de lignes avec des valeurs manquantes avant remplacement par 0 : {nb_lignes_manquantes}")
# Remplacer les valeurs manquantes par 0
df_pivot = df_pivot.fillna(0)
df_pivot = df_pivot.fillna(0) # Remplacer les valeurs manquantes par 0 : choix arbitraire : si pas de valeur mesurée alors on prend une valeur nulle
print("df_pivot après remplacement des NaN :", df_pivot.shape)
print("df_pivot après suppression des NaN :", df_pivot.shape)
# Calcul des scores pour chaque paramètre
# Calcul des scores
df_pivot["score_pH"] = df_pivot[1302.0].apply(score_pH)
df_pivot["score_Chlore"] = df_pivot[1398.0].apply(lambda x: normaliser_score_sigmoid(x, seuils[1398.0]))
df_pivot["score_Pesticides"] = df_pivot[6276.0].apply(lambda x: normaliser_score_sigmoid(x, seuils[6276.0]))
# Score combiné pour Nitrites & Nitrates (pondération selon leur toxicité)
# Score combiné pour Nitrites & Nitrates
poids_nitrites = 10 # Les nitrites sont environ 10x plus toxiques que les nitrates
poids_nitrates = 1
df_pivot["score_Nitrites_Nitrates"] = df_pivot[[1339.0, 1340.0]].apply(
......@@ -79,7 +62,7 @@ def calculer_scores_eau(fichier_csv, date):
poids_nitrates * normaliser_score_sigmoid(row[1340.0], seuils[1340.0])) /
(poids_nitrites + poids_nitrates), axis=1)
# Score des métaux lourds (pondération selon leur toxicité et seuils réglementaires)
# Score combiné des métaux lourds
poids_metaux = {
1382.0: 3, # Plomb
1386.0: 1, # Nickel
......@@ -91,10 +74,10 @@ def calculer_scores_eau(fichier_csv, date):
lambda row: sum(poids_metaux[param] * normaliser_score_sigmoid(row[param], seuils[param]) for param in poids_metaux) /
sum(poids_metaux.values()), axis=1)
# ✅ Ajout du score global (pondération cohérente des scores)
# Score global
poids_scores = {
"score_pH": 1.5, # Pondération modérément élevée
"score_Chlore": 1, # Pondération modérée
"score_pH": 1.5, # Pondération modérée
"score_Chlore": 1, # Pondération faible
"score_Nitrites_Nitrates": 2, # Pondération élevée
"score_Metaux_Lourds": 3, # Pondération très élevée
"score_Pesticides": 2.5 # Pondération élevée
......@@ -104,27 +87,28 @@ def calculer_scores_eau(fichier_csv, date):
lambda row: sum(poids_scores[score] * row[score] for score in poids_scores) /
sum(poids_scores.values()), axis=1)
# Sélectionner les colonnes d'informations générales et enlever les doublons
colonnes_info = ["cddept_x", "inseecommune", "nomcommune", "cdreseau", "cdpointsurv", "nompointsurv", "coord_x", "coord_y"]
df_info = df[colonnes_info].drop_duplicates(subset=["cdpointsurv"])
# Fusionner les informations descriptives avec les moyennes des paramètres
df_final = df_info.merge(df_pivot, on="cdpointsurv", how="left")
# Enregistrer le fichier CSV
colonnes_info = ["cddept_x", "nomcommune", "inseecommune", "nompointsurv", "cdpointsurv", "coord_x", "coord_y"] # Tri des colonnes réellement utiles finalement pour l'application (les colonnes filtrées avant nous ont servi à garder une visibilité sur les données et ce qu'on faisait)
df_info = df[colonnes_info].drop_duplicates(subset=["cdpointsurv"]) # Suppression des lignes en trop : une seule par point de surveillance
df_joint = df_info.merge(df_pivot, on="cdpointsurv", how="left") # Jointure finale : infos point de surveillance et scores
# Liste finale des colonnes
columns_to_keep = [
"cddept_x", "nomcommune", "inseecommune", "nompointsurv", "cdpointsurv", "coord_x", "coord_y",
"score_pH", "score_Chlore", "score_Pesticides", "score_Nitrites_Nitrates",
"score_Metaux_Lourds", "score_global"
]
df_final = df_joint[columns_to_keep]
df_final.to_csv(f"data/sigmoid_scores_{date}.csv", index=False)
print(f"fichier {date} créé avec succès !")
print(f"Fichier avec scores pour {date} créé")
return df_final
# Lancer le calcul des scores
dates = ["20" + str(i).zfill(2) for i in range(18, 25)]
for date in dates:
print(f"Processing data for {date}")
fichier = f"data/raw/Final{date}.csv" # modifier année
print(f"Traitement pour {date}")
fichier = f"data/raw/Final{date}.csv"
df_scores = calculer_scores_eau(fichier, date)
print(df_scores.head())
\ No newline at end of file
# print(df_scores.head()) # Pour voir à quoi ressemble les premières lignes
\ No newline at end of file
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment