import numpy as np
import pandas as pd
from typing import Union
# My local modules
from ..database.ma_db import load_output, load_samples, check_db_consistency
from ..utils.sumstats import recreate_qoi_functions, safe_call_qoi_function
def _reshape_sa_expanded_data(expanded_data: pd.DataFrame, qoi_columns: list) -> pd.DataFrame:
"""Reshape expanded sensitivity analysis data for pivot table analysis.
This function transforms time-series data from long format (multiple rows per sample)
to wide format (columns for each time point) to facilitate statistical analysis.
Args:
expanded_data (pd.DataFrame): DataFrame containing expanded simulation data
with SampleID, ReplicateID, time, and QoI columns.
qoi_columns (list): List of quantity of interest column names to reshape.
Returns:
pd.DataFrame: Reshaped DataFrame with multi-level columns where each QoI and time point becomes a separate column indexed by SampleID and ReplicateID.
Example:
>>> data = pd.DataFrame({
... 'SampleID': [0, 0, 1, 1],
... 'ReplicateID': [0, 0, 0, 0],
... 'time': [0, 1, 0, 1],
... 'cell_count': [100, 150, 80, 120]
... })
>>> reshaped = reshape_sa_expanded_data(data, ['cell_count'])
"""
try:
# Ensure QoI columns are numeric
for qoi in qoi_columns:
expanded_data[qoi] = pd.to_numeric(expanded_data[qoi], errors='coerce')
# Create a unique time_id for each time step
expanded_data['time_id'] = expanded_data.groupby(['SampleID', 'ReplicateID']).cumcount()
# Pivot the DataFrame to create columns for each QoI and time_id
reshaped_data = expanded_data.pivot_table(
index=['SampleID', 'ReplicateID'],
columns='time_id',
values=qoi_columns + ['time']
)
# Flatten the multi-index columns
reshaped_data.columns = [
f"{col[0]}_{int(col[1])}" if col[0] != 'time' else f"time_{int(col[1])}"
for col in reshaped_data.columns
]
reshaped_data.reset_index(inplace=True)
return reshaped_data
except Exception as e:
raise ValueError(f"Error reshaping expanded data: {e}")
[docs]
def mcds_list_to_qoi_df_for_sa(recreated_qoi_funcs, all_sample_ids, chunk_size, db_file, verbose=False) -> pd.DataFrame:
"""Convert a list of MCDS objects to a DataFrame of quantities of interest for sensitivity analysis.
This function processes a list of MCDS simulation results, extracting relevant
quantities of interest (QoIs) at each time point and organizing them into a
structured DataFrame suitable for sensitivity analysis.
Args:
recreated_qoi_funcs (dict): Dictionary of QoI functions where keys are QoI names
and values are callable functions.
all_sample_ids (list): List of all sample IDs to process.
chunk_size (int): Number of samples to process in each chunk to manage memory usage.
db_file (str): Path to the database file containing simulation output.
Returns:
pd.DataFrame: DataFrame with calculated QoI values indexed by SampleID and ReplicateID, with columns for each QoI - columns combined with time points.
"""
df_qois = pd.DataFrame()
# Process samples in chunks to avoid memory issues
for i in range(0, len(all_sample_ids), chunk_size):
chunk_sample_ids = all_sample_ids[i:i + chunk_size]
# Load only this chunk of data
df_output = load_output(db_file, sample_ids=chunk_sample_ids, load_data=True)
for SampleID in df_output['SampleID'].unique():
df_sample = df_output[df_output['SampleID'] == SampleID]
for ReplicateID in df_sample['ReplicateID'].unique():
mcds_ts_list = df_sample[df_sample['ReplicateID'] == ReplicateID]['Data'].values[0]
if verbose:
print(f"SampleID: {SampleID}, ReplicateID: {ReplicateID} - mcds_ts_list: {mcds_ts_list}")
data = {'SampleID': SampleID, 'ReplicateID': ReplicateID}
for id_time, mcds in enumerate(mcds_ts_list):
data[f"time_{id_time}"] = mcds.get_time()
try:
for qoi_name, qoi_func in recreated_qoi_funcs.items():
function_result = safe_call_qoi_function(qoi_func, mcds=mcds, list_mcds=mcds_ts_list)
if function_result is not None:
data[f"{qoi_name}_{id_time}"] = function_result
if len(data) == 2: continue # Skip if no QoI data was computed
except Exception as e:
raise RuntimeError(f"Error calculating QoIs for SampleID: {SampleID}, ReplicateID: {ReplicateID} - QoI: {qoi_name}_{id_time}: {e}")
# Store the data in a DataFrame
df_qoi_replicate = pd.DataFrame({key: [value] for key, value in data.items()})
df_qois = pd.concat([df_qois, df_qoi_replicate], ignore_index=True)
# Explicitely free memory
del mcds_ts_list
del df_sample
del df_output
# Reset index of the final DataFrame
df_qois = df_qois.reset_index(drop=True)
return df_qois
[docs]
def mcds_list_to_qoi_df_long(recreated_qoi_funcs, all_sample_ids, chunk_size, db_file, verbose=False) -> pd.DataFrame:
"""Convert a list of MCDS objects to a DataFrame of quantities of interest in long format.
This function processes a list of MCDS simulation results, extracting relevant
quantities of interest (QoIs) at each time point and organizing them into a long
structured DataFrame.
Args:
recreated_qoi_funcs (dict): Dictionary of QoI functions where keys are QoI names
and values are callable functions.
all_sample_ids (list): List of all sample IDs to process.
chunk_size (int): Number of samples to process in each chunk to manage memory usage.
db_file (str): Path to the database file containing simulation output.
Returns:
pd.DataFrame: DataFrame with calculated QoI values indexed by SampleID and ReplicateID, with columns for each QoI - columns combined with time points.
"""
# Process samples in chunks to avoid memory issues
ls_column = ['SampleID', 'time', 'ReplicateID']
llo_data = []
# Track keys per QoI to dynamically add new columns as encountered
qoi_keys = {}
for i in range(0, len(all_sample_ids), chunk_size):
chunk_sample_ids = all_sample_ids[i:i + chunk_size]
# Load only this chunk of data
df_output = load_output(db_file, sample_ids=chunk_sample_ids, load_data=True)
# Fetch mcds from sample replicate
for s_sample in sorted(df_output['SampleID'].unique()):
df_sample = df_output[df_output['SampleID'] == s_sample]
for s_replicate in sorted(df_sample['ReplicateID'].unique()):
l_mcds = df_sample[df_sample['ReplicateID'] == s_replicate]['Data'].values[0]
for mcds in l_mcds:
lo_data = [s_sample, mcds.get_time(), s_replicate]
try:
for s_qoi_name, o_qoi_func in sorted(recreated_qoi_funcs.items()):
if verbose:
print(f"processing sample replicate qoi: {s_sample} {s_replicate} {s_qoi_name} ...")
# Execute qoi function on mcds
o_result = safe_call_qoi_function(o_qoi_func, mcds=mcds, list_mcds=l_mcds)
# Save results from multi qoi function (dict or Series)
if type(o_result) in {dict, pd.Series}:
items = sorted(o_result.items())
# Add new keys to tracking and columns if not seen before
if s_qoi_name not in qoi_keys:
qoi_keys[s_qoi_name] = []
for s_key, o_value in items:
if s_key not in qoi_keys[s_qoi_name]:
qoi_keys[s_qoi_name].append(s_key)
ls_column.append(f'{s_qoi_name}_{s_key}')
lo_data.append(o_value)
# Pad with NaN if result is empty but keys were previously seen
if len(items) == 0 and s_qoi_name in qoi_keys:
for _ in qoi_keys[s_qoi_name]:
lo_data.append(np.nan)
# Save result from single qoi function
else:
if s_qoi_name not in ls_column:
ls_column.append(s_qoi_name)
lo_data.append(o_result)
# Error handling
except Exception as e:
raise RuntimeError(f"Error calculating QoIs for SampleID: {s_sample}, ReplicateID: {s_replicate} - QoI: {s_qoi_name}: {e}")
# Save row
llo_data.append(lo_data)
# Explicitely free memory
del l_mcds
del df_sample
del df_output
# Pad all rows to match final column count
final_col_count = len(ls_column)
llo_data = [row + [np.nan] * (final_col_count - len(row)) for row in llo_data]
# Generate data frame
df_qois = pd.DataFrame(llo_data, columns=ls_column)
df_qois = df_qois.sort_values(['SampleID','time','ReplicateID'], ignore_index=True)
return df_qois
[docs]
def mcds_list_to_qoi_df_for_calib(recreated_qoi_funcs, all_sample_ids, chunk_size, db_file, verbose=False) -> pd.DataFrame:
"""Convert a list of MCDS objects to a DataFrame of quantities of interest for calibration.
This function processes a list of MCDS simulation results, extracting relevant
quantities of interest (QoIs) and organizing them into a structured DataFrame
suitable for calibration tasks.
Args:
recreated_qoi_funcs (dict): Dictionary of QoI functions where keys are QoI names
and values are callable functions.
all_sample_ids (list): List of all sample IDs to process.
chunk_size (int): Number of samples to process in each chunk to manage memory usage.
db_file (str): Path to the database file containing simulation output.
Returns:
pd.DataFrame: DataFrame with calculated QoI values indexed by SampleID and ReplicateID, with columns for each QoI - columns is not combined with time points.
"""
df_qois = pd.DataFrame()
# Process samples in chunks to avoid memory issues
for i in range(0, len(all_sample_ids), chunk_size):
chunk_sample_ids = all_sample_ids[i:i + chunk_size]
# Load only this chunk of data
df_output = load_output(db_file, sample_ids=chunk_sample_ids, load_data=True)
for SampleID in df_output['SampleID'].unique():
df_sample = df_output[df_output['SampleID'] == SampleID]
for ReplicateID in df_sample['ReplicateID'].unique():
mcds_ts_list = df_sample[df_sample['ReplicateID'] == ReplicateID]['Data'].values[0]
for id_time, mcds in enumerate(mcds_ts_list):
data = {'SampleID': SampleID, 'ReplicateID': ReplicateID, 'time': mcds.get_time()}
try:
for qoi_name, qoi_func in recreated_qoi_funcs.items():
function_result = safe_call_qoi_function(qoi_func, mcds=mcds, list_mcds=mcds_ts_list)
if verbose:
print(f"processing sample replicate qoi: {SampleID} {ReplicateID} {qoi_name} ...")
if function_result is not None:
data[qoi_name] = function_result
if len(data) == 3: continue # Skip if no QoI data was computed
except Exception as e:
raise RuntimeError(f"Error calculating QoIs for SampleID: {SampleID}, ReplicateID: {ReplicateID} - QoI: {qoi_name}: {e}")
# Store the data in a DataFrame
df_qoi_replicate = pd.DataFrame({key: [value] for key, value in data.items()})
df_qois = pd.concat([df_qois, df_qoi_replicate], ignore_index=True)
# Explicitely free memory
del mcds_ts_list
del df_sample
del df_output
# Reset index of the final DataFrame
df_qois = df_qois.reset_index(drop=True)
return df_qois
[docs]
def get_qoi_from_db_file(db_file:str, qoi_names:list) -> pd.DataFrame:
"""Extract quantities of interest (QoIs) from a database file containing simulation results.
Returns a long-format DataFrame with one row per (SampleID, time, ReplicateID),
consistent with the raw-MCDS storage path. Old Mode-A databases (Data column
contains precomputed QoI DataFrames) are handled transparently.
Args:
db_file (str): Path to the SQLite database containing simulation results.
qoi_names (list): List of QoI names to extract.
Returns:
pd.DataFrame: Long-format DataFrame with columns [SampleID, time, ReplicateID, <qoi_names>],
sorted by (SampleID, time, ReplicateID).
"""
df_qois_data = load_output(db_file, load_data=True)
# Flatten stored QoI DataFrames into a single long-format DataFrame
expanded_data = pd.concat(
[
pd.DataFrame(data).assign(SampleID=SampleID, ReplicateID=ReplicateID)
for (SampleID, ReplicateID), group in df_qois_data.groupby(['SampleID', 'ReplicateID'])
for data in group['Data']
],
ignore_index=True
)
keep_cols = ['SampleID', 'time', 'ReplicateID'] + [q for q in qoi_names if q in expanded_data.columns]
return expanded_data[keep_cols].sort_values(['SampleID', 'time', 'ReplicateID'], ignore_index=True)
[docs]
def calculate_qoi_from_db_file(db_file:str, qoi_functions:dict, qoi_def:dict={}, chunk_size:int=10, mode='long', verbose=False) -> pd.DataFrame:
"""Calculate quantities of interest from sensitivity analysis database results.
This function loads simulation results from a database in chunks and applies QoI
functions to extract meaningful metrics from the time-series data. Processing in
chunks helps avoid excessive memory usage for large databases.
Args:
db_file (str): Path to the SQLite database containing simulation results.
qoi_functions (dict): Dictionary of QoI functions where keys are QoI names
and values are lambda functions or string representations.
qoi_def (dict): first-class object, that can be used in qoi_functions
lambda string, mapped to their name.
e.g. for a function definition, if the function definition is:
def my_func():
print('hello world!')
return 0
then the qoi_def dict would look like this:
{'my_func': my_func}
chunk_size (int, optional): Number of samples to process at a time. Default is 10.
Adjust based on available memory and data size.
mode: Specify the form of the result dataframe. Possible modes are
sa, calib, and long. The default is long.
Returns:
pd.DataFrame: DataFrame with calculated QoI values indexed by SampleID and ReplicateID, with columns for each QoI.
Example:
>>> qoi_funcs = {
... 'live_cells': lambda df: len(df[df['dead'] == False]),
... 'dead_cells': lambda df: len(df[df['dead'] == True])
... }
>>> qoi_df = calculate_qoi_from_db_file('study.db', qoi_funcs, chunk_size=20)
"""
# Load sample IDs to determine what to process
dic_samples = load_samples(db_file)
all_sample_ids = sorted(dic_samples.keys())
# Recreate QoI functions from their string representations
recreated_qoi_funcs = recreate_qoi_functions(qoi_functions=qoi_functions, qoi_def=qoi_def)
if mode == 'sa':
df_qois = mcds_list_to_qoi_df_for_sa(
recreated_qoi_funcs=recreated_qoi_funcs,
all_sample_ids=all_sample_ids,
chunk_size=chunk_size,
db_file=db_file,
verbose=verbose,
)
elif mode == 'calib':
df_qois = mcds_list_to_qoi_df_for_calib(
recreated_qoi_funcs=recreated_qoi_funcs,
all_sample_ids=all_sample_ids,
chunk_size=chunk_size,
db_file=db_file,
verbose=verbose,
)
elif mode == 'long':
df_qois = mcds_list_to_qoi_df_long(
recreated_qoi_funcs=recreated_qoi_funcs,
all_sample_ids=all_sample_ids,
chunk_size=chunk_size,
db_file=db_file,
verbose=verbose,
)
else:
raise ValueError(f"Unknown mode '{mode}'. Supported modes are 'sa' and 'calib'.")
return df_qois
[docs]
def get_mean_std_qois(df_qois: pd.DataFrame, filter_columns: list = []) -> pd.DataFrame:
"""Calculate the mean and standard deviation of quantities of interest (QoIs) across replicates.
This function computes the mean and standard deviation QoI values for each parameter sample across all replicates, providing a central tendency and variability measure for the QoI estimates.
Args:
df_qois (pd.DataFrame): DataFrame containing QoI values with SampleID, ReplicateID, and QoI columns.
filter_columns (list, optional): List of columns to include in the calculation. If None, all numeric columns are used.
Returns: tuple: A tuple containing:
pd.DataFrame: DataFrame containing the mean QoI values for each SampleID, indexed by SampleID and ReplicateID.
pd.DataFrame: DataFrame containing the standard deviation QoI values for each SampleID, indexed by SampleID and ReplicateID.
Note:
The mean QoI values are calculated by averaging the QoI estimates across all replicates for each parameter sample. This provides a central tendency measure for the QoI estimates, which can be used for sensitivity analysis and comparison between different parameter samples.
"""
df_grouped = df_qois.groupby(['SampleID']+filter_columns)
df_mean = df_grouped.mean(numeric_only=True) # ignores NaN values
df_mean.drop(columns=['ReplicateID'], inplace=True)
df_std = df_grouped.std(numeric_only=True) # ignores NaN values
df_std.drop(columns=['ReplicateID'], inplace=True)
return df_mean, df_std
[docs]
def get_relative_mcse_qois(df_mean: pd.DataFrame, df_std: pd.DataFrame, num_replicates: int, time_columns: list) -> pd.DataFrame:
"""Calculate the relative Monte Carlo Standard Error (MCSE) for QoI estimates.
This function computes the relative MCSE for each QoI across replicates, providing
insight into the uncertainty of the QoI estimates due to finite sampling in the simulations.
Args:
df_mean (pd.DataFrame): DataFrame containing the mean QoI values for each SampleID, indexed by SampleID and ReplicateID.
df_std (pd.DataFrame): DataFrame containing the standard deviation QoI values for each SampleID, indexed by SampleID and ReplicateID.
num_replicates (int): The number of replicates used to calculate the mean and standard deviation of the QoIs.
time_columns (list): List of column names corresponding to time points in the DataFrame, which should not be included in the MCSE calculation.
Returns:
pd.DataFrame: DataFrame containing the relative MCSE values for each QoI, indexed by SampleID and ReplicateID.
Note:
Relative Monte Carlo Standard Error (MCSE) is calculated as the standard deviation
of the QoI across replicates divided by the square root of the number of replicates, and then normalized by the mean QoI value to express it as a percentage. This provides insight into the uncertainty of the QoI estimates due to finite sampling in the simulations.
- < 1% (Excellent): This is the gold standard. If your relative MCSE is under 1%, your mean estimate is highly stable and precise. You can confidently use this metric for sensitivity analysis or publication.
- 1% to 5% (Good / Acceptable): For stochastic biological simulations like PhysiCell, getting under 5% is generally considered reliable and practical.
- 5% to 10% (Caution): You can use these metrics to observe broad trends, but small differences between parameters might just be noise. You likely need to run more replicates.
- > 10% (Unreliable): The metric is too noisy. If you are running 50+ replicates and still have a relative MCSE > 10%, that specific QoI (Quantity of Interest) is likely a poor choice, or your biological system is fundamentally chaotic in that aspect.
"""
df_relative_mcse = df_std/np.sqrt(num_replicates)
# Small epsilon to avoid division by zero
epsilon = max(0.01 * np.nanmedian(df_mean.abs().to_numpy().flatten()), 1e-12)
# Relative MCSE
df_relative_mcse = df_relative_mcse.div(df_mean + epsilon)
# Replace the columns relative to time as the real value from df_mean
df_relative_mcse[time_columns] = df_mean[time_columns]
return df_relative_mcse
[docs]
def get_summary_statistics_qois(df_qois: pd.DataFrame) -> tuple:
"""Calculate summary statistics (mean, standard deviation, and relative MCSE) of quantities of interest.
Args:
df_qois (pd.DataFrame): DataFrame containing QoI values with SampleID and ReplicateID columns.
Returns: tuple: A tuple containing:
pd.DataFrame: DataFrame with statistical summaries (mean) of QoIs grouped by SampleID, with columns for each QoI statistic.
pd.DataFrame: DataFrame with standard deviation of QoIs grouped by SampleID, with columns for each QoI statistic.
pd.DataFrame: DataFrame with relative Monte Carlo Standard Error (MCSE) of QoIs grouped by SampleID, with columns for each QoI statistic.
"""
time_columns = sorted([col for col in df_qois.columns if col.startswith("time_")])
if not time_columns:
if 'time' in df_qois.columns:
filter_columns = ['time']
else:
raise ValueError("No time columns found in the DataFrame.")
else:
filter_columns = []
df_mean, df_std = get_mean_std_qois(df_qois, filter_columns=filter_columns)
df_relative_mcse = get_relative_mcse_qois(df_mean, df_std, num_replicates = df_qois['ReplicateID'].nunique(), time_columns=time_columns)
return df_mean, df_std, df_relative_mcse
[docs]
def calculate_qoi_statistics(db_file_path: str, qoi_funcs: dict, df_qois_data: pd.DataFrame = None, ignore_db_consistency: bool = False, qoi_def: dict = {}, chunk_size: int = 10) -> tuple:
"""Calculate statistical summaries (mean and relative MCSE) of quantities of interest across replicates.
This function computes mean and relative Monte Carlo Standard Error (MCSE) of QoI values across
simulation replicates for each parameter sample, enabling uncertainty quantification.
Args:
db_file_path (str): Path to the database file for context.
qoi_funcs (dict): Dictionary of QoI functions where keys are QoI names and
values are lambda functions or None.
df_qois_data (pd.DataFrame): DataFrame containing QoI values with SampleID,
ReplicateID, and QoI columns. Default is None, in which case the function will attempt to load QoI data from the database.
ignore_db_consistency (bool): If True, bypasses the database consistency check.
qoi_def (dict): first-class object, that can be used in qoi_funcs lambda string, mapped to their name.
e.g. for a function definition, if the function definition is:
def my_func():
print('hello world!')
return 0
then the qoi_def dict would look like this:
{'my_func': my_func}
chunk_size (int, optional): Number of samples to process at a time when loading from the database. Default is 10. Adjust based on available memory and data size.
Returns:
tuple: A tuple containing:
- df_mean (pd.DataFrame): DataFrame with statistical summaries (mean) of QoIs grouped by SampleID, with columns for each QoI statistic.
- df_std (pd.DataFrame): DataFrame with standard deviation of QoIs grouped by SampleID, with columns for each QoI statistic.
- df_relative_mcse (pd.DataFrame): DataFrame with relative Monte Carlo Standard Error (MCSE) of QoIs grouped by SampleID, with columns for each QoI statistic.
Note:
Relative Monte Carlo Standard Error (MCSE) is calculated as the standard deviation
of the QoI across replicates divided by the square root of the number of replicates, and then normalized by the mean QoI value to express it as a percentage. This provides insight into the uncertainty of the QoI estimates due to finite sampling in the simulations.
- < 1% (Excellent): This is the gold standard. If your relative MCSE is under 1%, your mean estimate is highly stable and precise. You can confidently use this metric for sensitivity analysis or publication.
- 1% to 5% (Good / Acceptable): For stochastic biological simulations like PhysiCell, getting under 5% is generally considered reliable and practical.
- 5% to 10% (Caution): You can use these metrics to observe broad trends, but small differences between parameters might just be noise. You likely need to run more replicates.
- > 10% (Unreliable): The metric is too noisy. If you are running 50+ replicates and still have a relative MCSE > 10%, that specific QoI (Quantity of Interest) is likely a poor choice, or your biological system is fundamentally chaotic in that aspect.
Raises:
ValueError: If no QoI functions are defined or data format is invalid.
Example:
>>> qoi_funcs = {
... 'live_cells': lambda df: len(df[df['dead'] == False]),
... 'dead_cells': lambda df: len(df[df['dead'] == True])
... }
>>> df_mean, df_std, df_mcse = calculate_qoi_statistics(qoi_data, qoi_funcs, 'study.db')
"""
if df_qois_data is None:
print("No QoI data provided, calculating QoIs from the database...")
try:
df_qois_data = load_output(db_file_path, load_data=False)
except Exception as e:
raise ValueError(f"Error loading output data from database: {e}")
# Check db consistency
if not check_db_consistency(db_file_path):
if ignore_db_consistency:
print("Warning: Database consistency check failed, but proceeding as per user request.")
else:
raise ValueError("Database consistency check failed. Please verify the database integrity.")
# Check if 'Data' column in df_qois_data is a DataFrame - Case of db generated by custom summary function
if ('Data' in df_qois_data.columns):
qoi_columns = list(qoi_funcs.keys())
if not qoi_columns:
raise ValueError("Error: No QoI functions defined.")
if isinstance(df_qois_data['Data'].iloc[0], pd.DataFrame):
print("Extracting QoIs from DataFrame...")
try:
df_qois = get_qoi_from_db_file(db_file_path, qoi_columns)
except Exception as e:
raise ValueError(f"Error extracting QoIs from DataFrame: {e}")
# Check if 'Data' column in df_qois_data is a series of mcds list - Case of db generated by generic summary function with NO QoI functions
elif isinstance(df_qois_data['Data'].iloc[0], list):
print("Calculating QoIs from mcds list...")
try:
df_qois = calculate_qoi_from_db_file(db_file_path, qoi_funcs, qoi_def=qoi_def, chunk_size=chunk_size)
except Exception as e:
raise ValueError(f"Error calculating QoIs from mcds list: {e}")
if df_qois.empty:
raise ValueError("df_qois is empty, unable to generate QoIs from the database.")
else:
raise ValueError("Error: Data element is neither Dataframe nor List.")
# If QoIs are already in the database and 'Data' column is not present
else:
print("Extracting QoIs from existing DataFrame...")
df_qois = df_qois_data
# Take the mean and MCSE among the replicates and sort the samples
try:
df_mean, df_std, df_relative_mcse = get_summary_statistics_qois(df_qois)
except Exception as e:
raise ValueError(f"Error taking the mean, std, and MCSE among replicates: {e}")
return df_mean, df_std, df_relative_mcse
[docs]
def apply_pca_to_qois(df_mean: pd.DataFrame, latent_dim: int = 3, seed: int = None):
"""Apply PCA to the selected QoIs.
This function uses PCA (scikit-learn) as a linear dimensionality reduction technique on the QoI matrix (samples x features).
Args:
df_mean (pd.DataFrame): DataFrame with QoI mean values indexed by SampleID.
latent_dim (int): Dimension of the latent encoding.
seed (int): Random seed for reproducibility (used if PCA requires it). If None, no seed is set.
Returns:
dict: {
'method': 'pca',
'encoder_output': np.ndarray (n_samples x latent_dim),
'reconstruction': np.ndarray (n_samples x n_features),
'model': PCA object,
'scaler': StandardScaler object
}
"""
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import PCA
scaler = StandardScaler()
X_scaled = scaler.fit_transform(df_mean.to_numpy(dtype=float))
n_samples, n_features = X_scaled.shape
pca = PCA(n_components=min(latent_dim, n_features), random_state=seed)
enc = pca.fit_transform(X_scaled)
recon = pca.inverse_transform(enc)
return {'method': 'pca', 'encoder_output': enc, 'reconstruction': recon, 'model': pca, 'scaler': scaler}
[docs]
def apply_autoencoder_to_qois(df_mean: pd.DataFrame, latent_dim: int = 2, epochs: int = 200, batch_size: int = 32, verbose: bool = False, seed: int = None):
"""Apply an autoencoder to the selected QoIs.
This function attempts to use PyTorch to train a small autoencoder on the
QoI matrix (samples x features). If a seed is provided, results will be reproducible.
Args:
df_mean (pd.DataFrame): DataFrame with QoI mean values indexed by SampleID.
latent_dim (int): Dimension of the latent encoding.
epochs (int): Training epochs for the PyTorch autoencoder.
batch_size (int): Batch size for training.
verbose (bool): Verbosity flag.
seed (int): Random seed for reproducibility. If None, no seed is set.
Returns:
dict: {
'method': 'torch',
'encoder_output': np.ndarray (n_samples x latent_dim),
'reconstruction': np.ndarray (n_samples x n_features),
'model': trained model object
'scaler': StandardScaler object
}
"""
import numpy as _np
import random
from sklearn.preprocessing import StandardScaler
# Set seeds for reproducibility
if seed is not None:
random.seed(seed)
_np.random.seed(seed)
X = df_mean.sort_index().to_numpy(dtype=float)
scaler = StandardScaler()
X_scaled = scaler.fit_transform(X)
n_samples, n_features = X_scaled.shape
# Try PyTorch first
try:
import torch
import torch.nn as nn
from torch.utils.data import TensorDataset, DataLoader
# Set torch seed
if seed is not None:
torch.manual_seed(seed)
torch.cuda.manual_seed_all(seed)
if verbose:
print("Using PyTorch autoencoder")
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
class AE(nn.Module):
def __init__(self, n_in, n_latent):
super().__init__()
self.encoder = nn.Sequential(
nn.Linear(n_in, max(16, n_in//2)),
nn.ReLU(),
nn.Linear(max(16, n_in//2), n_latent),
)
self.decoder = nn.Sequential(
nn.Linear(n_latent, max(16, n_in//2)),
nn.ReLU(),
nn.Linear(max(16, n_in//2), n_in),
)
def forward(self, x):
z = self.encoder(x)
xrec = self.decoder(z)
return xrec, z
model = AE(n_features, latent_dim).to(device)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)
loss_fn = nn.MSELoss()
tensor_X = torch.tensor(X_scaled, dtype=torch.float32)
dataset = TensorDataset(tensor_X)
loader = DataLoader(dataset, batch_size=min(batch_size, n_samples), shuffle=False, generator=torch.Generator().manual_seed(seed) if seed is not None else None)
model.train()
for ep in range(epochs):
epoch_loss = 0.0
for (batch,) in loader:
batch = batch.to(device)
optimizer.zero_grad()
xrec, _ = model(batch)
loss = loss_fn(xrec, batch)
loss.backward()
optimizer.step()
epoch_loss += float(loss.detach().cpu().item()) * batch.size(0)
if verbose and (ep % max(1, epochs//10) == 0 or ep == epochs-1):
print(f"AE epoch {ep+1}/{epochs} loss={epoch_loss / n_samples:.6g}")
# encode and reconstruct
model.eval()
with torch.no_grad():
X_t = tensor_X.to(device)
xrec_t, z_t = model(X_t)
recon = xrec_t.cpu().numpy()
enc = z_t.cpu().numpy()
return {'method': 'torch', 'encoder_output': enc, 'reconstruction': recon, 'model': model, 'scaler': scaler}
except Exception as e:
if verbose:
print(f"PyTorch autoencoder unavailable or failed ({e}).")
[docs]
def regression_accuracy_parameters(df_parameters: pd.DataFrame, encoder_output: np.ndarray) -> pd.DataFrame:
"""Calculate regression accuracy of parameters from the latent encoding.
This function trains a regression model (e.g., Random Forest) to predict each parameter from the latent encoding and evaluates the R² score for each parameter.
Args:
df_parameters (pd.DataFrame): DataFrame containing parameter values indexed by SampleID.
encoder_output (np.ndarray): Latent encoding of the QoI data (n_samples x latent_dim).
Returns:
pd.DataFrame: DataFrame containing R² scores for each parameter, indexed by parameter name.
- R² ≈ 1.0: Perfect fit. The model explains all the variance in the parameter values from the latent encoding.
- R² < 0.5: Poor fit. The model does not capture the relationship between the latent encoding and the parameter values well.
- R² ≈ 0: No fit. The model does not explain any of the variance in the parameter values from the latent encoding, indicating no relationship.
- R² < 0: Worse than no fit. The model performs worse than simply predicting the mean parameter value for all samples, suggesting a very poor relationship between the latent encoding and the parameter values.
"""
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import cross_val_score
r2_results = {}
for param in df_parameters.columns:
y = df_parameters[param].to_numpy()
model = RandomForestRegressor(n_estimators=100, random_state=42)
model.fit(encoder_output, y)
# Use cross_val_score to get the 'True' R2
# This prevents the 0.82 from being an overfitted illusion
scores = cross_val_score(model, encoder_output, y, cv=5, scoring='r2')
r2_results[param] = scores.mean()
return pd.DataFrame.from_dict(r2_results, orient='index', columns=['R2_CV'])
[docs]
def align_params_to_qois(df_params: pd.DataFrame, df_qois: pd.DataFrame) -> pd.DataFrame:
"""Align parameter DataFrame to match the multi-index structure of QoI DataFrame.
If df_qois has a multi-index (SampleID, time) but df_params only has SampleID,
this function replicates parameter rows for each time point so that both DataFrames
can be joined on (SampleID, time).
String/categorical columns in df_params are automatically encoded to numeric values
using LabelEncoder to ensure compatibility with scikit-learn regression models.
Args:
df_params (pd.DataFrame): Parameter DataFrame indexed by SampleID.
df_qois (pd.DataFrame): QoI DataFrame, possibly with multi-index (SampleID, time).
Returns:
pd.DataFrame: Parameter DataFrame aligned to match df_qois index structure,
with categorical columns encoded as numeric.
"""
from sklearn.preprocessing import LabelEncoder
# If df_qois has a multi-index with 'time', expand df_params accordingly
if hasattr(df_qois.index, 'names') and 'time' in df_qois.index.names:
# Extract unique (SampleID, time) pairs from df_qois
qois_index = df_qois.index
# Create aligned params by replicating rows for each time point
expanded_data = []
new_index_tuples = []
for sample_id, time_val in qois_index:
if sample_id in df_params.index:
# Replicate the parameter row for this (SampleID, time) pair
expanded_data.append(df_params.loc[sample_id].values)
new_index_tuples.append((sample_id, time_val))
# Create new DataFrame with aligned rows
df_params_aligned = pd.DataFrame(expanded_data, columns=df_params.columns)
df_params_aligned.index = pd.MultiIndex.from_tuples(new_index_tuples, names=['SampleID', 'time'])
df_params_aligned = df_params_aligned.sort_index()
else:
# No time index in df_qois, return df_params as-is
df_params_aligned = df_params.copy()
# Encode categorical columns to numeric for regression compatibility
for col in df_params_aligned.columns:
if df_params_aligned[col].dtype == 'object': # String/categorical column
le = LabelEncoder()
df_params_aligned[col] = le.fit_transform(df_params_aligned[col].astype(str))
return df_params_aligned
[docs]
def find_optimal_qoi_set(df_qois: pd.DataFrame, df_params: pd.DataFrame) -> list:
"""
Use Recursive Feature Elimination with Cross-Validation (RFECV) to find the optimal set of QoIs that best predict the parameters.
This function trains a regression model (e.g., Random Forest) to predict parameters from the QoIs and uses RFECV to identify the most important QoIs for accurate parameter prediction. The optimal set of QoIs is determined based on the R² score obtained through cross-validation.
Args:
df_qois (pd.DataFrame): DataFrame containing QoI mean values indexed by SampleID and time.
df_params (pd.DataFrame): DataFrame containing parameter values indexed by SampleID and time.
Returns:
list: List of optimal QoI names that best predict the parameters based on RFECV analysis.
"""
from sklearn.ensemble import RandomForestRegressor
from sklearn.feature_selection import RFECV
from sklearn.model_selection import KFold
from sklearn.metrics import make_scorer, r2_score
# 1. Define the base model
base_estimator = RandomForestRegressor(n_estimators=100, random_state=42)
# 2. Define a custom scoring function that focuses on the worst-predicted parameter (the bottleneck)
def min_r2_scorer(y_true, y_pred):
# Calculate R2 for each parameter individually
scores = r2_score(y_true, y_pred, multioutput='raw_values')
# Return the worst score (the bottleneck)
return np.min(scores)
# 3. Set up RFECV with the custom scorer and cross-validation strategy
selector = RFECV(
estimator=base_estimator,
step=1,
cv=KFold(5),
scoring=make_scorer(min_r2_scorer), # Focus on the hardest-to-identify parameter
min_features_to_select=df_params.shape[1],
n_jobs=-1
)
# 4. Fit the selector to the data
selector.fit(df_qois.sort_index().to_numpy(), df_params.sort_index().to_numpy())
return selector
def _regression_accuracy_with_weights(df_parameters: pd.DataFrame, encoder_output: np.ndarray, mcse_weights: np.ndarray = None) -> pd.DataFrame:
"""
Calculate regression accuracy of parameters from latent encoding with optional MCSE-based weighting.
Uses sample_weight inversely proportional to MCSE to give more weight to stable features.
Args:
df_parameters (pd.DataFrame): DataFrame containing parameter values indexed by SampleID.
encoder_output (np.ndarray): Latent encoding (n_samples x latent_dim).
mcse_weights (np.ndarray): Per-sample weights based on information-to-noise ratio. If None, uniform weights.
Returns:
pd.DataFrame: DataFrame containing weighted R² scores for each parameter.
"""
from sklearn.ensemble import RandomForestRegressor
from sklearn.model_selection import cross_val_score
r2_results = {}
for param in df_parameters.columns:
y = df_parameters[param].to_numpy()
model = RandomForestRegressor(n_estimators=100, random_state=42)
if mcse_weights is not None:
# Use sample_weight to favor stable features (passed via fit_params)
model.fit(encoder_output, y, sample_weight=mcse_weights)
scores = cross_val_score(model, encoder_output, y, cv=5, scoring='r2', fit_params={'sample_weight': mcse_weights})
else:
model.fit(encoder_output, y)
scores = cross_val_score(model, encoder_output, y, cv=5, scoring='r2')
r2_results[param] = scores.mean()
return pd.DataFrame.from_dict(r2_results, orient='index', columns=['R2_CV'])
[docs]
def recursive_feature_elimination(df_qois_mean, df_qois_mcse, df_params, autoencoder_params={}, mcse_threshold=0.10, correlation_threshold=0.95, verbose=False):
"""
Perform smart feature elimination using balanced Autoencoder-first approach.
**Philosophy (Hybrid):** Removes only pathological features, preserves useful noise:
1. Remove EXTREME outliers (MCSE > 10% by default, configurable)
2. Remove highly correlated redundancy (>95%, configurable)
3. Apply Autoencoder on cleaned feature set
4. Use RFECV on latent space to find optimal encoding
5. Validate with Synthetic Recovery Test
This avoids "noise = non-informative" while still cleaning truly problematic features.
Args:
df_qois_mean (pd.DataFrame): DataFrame containing mean QoI values indexed by SampleID and time.
df_qois_mcse (pd.DataFrame): DataFrame containing relative MCSE values for QoIs indexed by SampleID and time.
df_params (pd.DataFrame): DataFrame containing parameter values indexed by SampleID.
autoencoder_params (dict): Parameters for autoencoder (latent_dim, epochs, batch_size, seed).
mcse_threshold (float): Threshold for removing EXTREME noise (default 0.10 = 10%). Only removes pathological features.
correlation_threshold (float): Threshold for removing redundant correlations (default 0.95 = 95%). Only removes near-duplicates.
verbose (bool): Print detailed pipeline stages.
Returns:
dict: {
'removed_extreme_noise': list of features removed as outliers,
'removed_redundant': list of highly correlated features removed,
'cleaned_qois': list of QoIs kept for autoencoder,
'correlation_matrix': full pairwise correlation matrix,
'full_autoencoder_results': autoencoder results on cleaned set,
'full_regression_r2': R² from full set,
'final_qois': QoI names selected by RFECV,
'reduced_autoencoder_results': autoencoder on final selected QoIs,
'reduced_regression_r2': R² from reduced set,
'synthetic_recovery_test': Full vs Reduced R² comparison,
}
"""
import warnings
from sklearn.ensemble import RandomForestRegressor
from sklearn.feature_selection import RFECV
from sklearn.model_selection import KFold, cross_val_score
from sklearn.metrics import make_scorer, r2_score
#####################################################
# Stage 0: Prepare Data
#####################################################
df_qois_work = df_qois_mean.copy()
# Identify and fill trajectory QoIs (those with NaN in MCSE)
trajectory_qois = [col for col in df_qois_mcse.columns if df_qois_mcse[col].isna().any()]
if trajectory_qois:
last_per_sample = df_qois_work[trajectory_qois].groupby(level='SampleID').transform('last')
df_qois_work[trajectory_qois] = df_qois_work[trajectory_qois].fillna(last_per_sample)
if verbose:
print(f"✓ Trajectory QoIs filled (NaN→last value per sample): {trajectory_qois}")
# Data quality check: Remove rows with any NaN or inf values
initial_rows = len(df_qois_work)
nan_mask = df_qois_work.isna().any(axis=1) | np.isinf(df_qois_work).any(axis=1)
if nan_mask.any():
removed_rows = nan_mask.sum()
df_qois_work = df_qois_work[~nan_mask]
if verbose:
print(f"⚠ Data Quality: Removed {removed_rows}/{initial_rows} rows with NaN/inf values")
if len(df_qois_work) == 0:
raise ValueError("No valid QoI data remaining after removing NaN/inf values. Check data source.")
# Align parameters
df_params_aligned = align_params_to_qois(df_params, df_qois_work)
# Ensure params and QoIs have matching indices
common_idx = df_qois_work.index.intersection(df_params_aligned.index)
if len(common_idx) < len(df_qois_work):
df_qois_work = df_qois_work.loc[common_idx]
df_params_aligned = df_params_aligned.loc[common_idx]
if verbose:
print(f"⚠ Index alignment: Kept {len(common_idx)} matching (SampleID, time) pairs")
#####################################################
# Stage 1: Remove EXTREME Outliers Only (Light Filter)
#####################################################
if verbose:
print(f"\n[STAGE 1] Pathological Noise Removal (MCSE > {mcse_threshold*100}%)")
removed_extreme_noise = [col for col in df_qois_mcse.columns if df_qois_mcse[col].quantile(0.9) > mcse_threshold]
df_qois_work = df_qois_work.drop(columns=removed_extreme_noise, errors='ignore')
if verbose:
if removed_extreme_noise:
print(f" Removed {len(removed_extreme_noise)}/{len(df_qois_mcse.columns)} pathological features: {removed_extreme_noise}")
else:
print(f" No features exceed extreme noise threshold (all MCSE ≤ {mcse_threshold*100}%)")
#####################################################
# Stage 2: Remove Redundant Correlations (Hard Threshold)
#####################################################
if verbose:
print(f"\n[STAGE 2] Redundancy Removal (Correlation > {correlation_threshold*100}%)")
corr_matrix = df_qois_work.corr().abs()
removed_redundant = []
# Greedy strategy: for each correlated pair, keep the one with lower median MCSE
visited = set()
for col in corr_matrix.columns:
if col in visited:
continue
corr_peers = corr_matrix.index[corr_matrix[col] > correlation_threshold].tolist()
if len(corr_peers) > 1:
# Keep the one with best MCSE
best_feature = min(corr_peers, key=lambda x: df_qois_mcse[x].median())
for peer in corr_peers:
if peer != best_feature and peer not in removed_redundant:
removed_redundant.append(peer)
visited.add(peer)
visited.add(best_feature)
df_qois_work = df_qois_work.drop(columns=removed_redundant, errors='ignore')
if verbose:
if removed_redundant:
print(f" Removed {len(removed_redundant)}/{len(corr_matrix.columns)} redundant features: {removed_redundant}")
else:
print(f" No near-duplicate features found (correlation ≤ {correlation_threshold*100}%)")
#####################################################
# Stage 3: Autoencoder on CLEANED Feature Set
#####################################################
if verbose:
print(f"\n[STAGE 3] Autoencoder on Cleaned Features ({len(df_qois_work.columns)} QoIs)")
dict_ae_full = apply_autoencoder_to_qois(
df_qois_work,
latent_dim=autoencoder_params.get('latent_dim', 2),
epochs=autoencoder_params.get('epochs', 200),
batch_size=autoencoder_params.get('batch_size', 32),
seed=autoencoder_params.get('seed', 42),
verbose=verbose
)
# Regression on full autoencoder output
df_r2_full = _regression_accuracy_with_weights(
df_params_aligned,
dict_ae_full['encoder_output'],
mcse_weights=None # Uniform weights for full set
)
if verbose:
print(f" Full AE → Params R² (mean): {df_r2_full['R2_CV'].mean():.4f}")
#####################################################
# Stage 4: RFECV on Latent Space to Find Optimal QoI Set
#####################################################
if verbose:
print(f"\n[STAGE 4] RFECV-based Feature Selection on Latent Encoding")
# Define scoring: bottleneck (minimum R² of hardest parameter)
def min_r2_scorer(y_true, y_pred):
if y_true.ndim == 1:
return r2_score(y_true, y_pred)
else:
return np.min([r2_score(y_true[:, i], y_pred[:, i]) for i in range(y_true.shape[1])])
# RFECV just to identify if we can remove more from the cleaned set
base_rf = RandomForestRegressor(n_estimators=100, random_state=42)
selector = RFECV(
base_rf,
step=1,
cv=KFold(5),
scoring=make_scorer(min_r2_scorer),
min_features_to_select=max(1, len(df_qois_work.columns) // 2),
n_jobs=-1
)
# Fit RFECV on cleaned QoI data
selector.fit(df_qois_work.to_numpy(), df_params_aligned.to_numpy())
selected_qoi_mask = selector.support_
final_qois = df_qois_work.columns[selected_qoi_mask].tolist()
if verbose:
removed_by_rfecv = df_qois_work.columns[~selected_qoi_mask].tolist()
print(f" RFECV selected {len(final_qois)}/{len(df_qois_work.columns)} QoIs")
if removed_by_rfecv:
print(f" Removed by RFECV: {removed_by_rfecv}")
#####################################################
# Stage 5: Synthetic Recovery Test
#####################################################
if verbose:
print(f"\n[STAGE 5] Synthetic Recovery Test (Full vs Reduced)")
# Train on reduced set
dict_ae_reduced = apply_autoencoder_to_qois(
df_qois_work[final_qois],
latent_dim=autoencoder_params.get('latent_dim', 2),
epochs=autoencoder_params.get('epochs', 200),
batch_size=autoencoder_params.get('batch_size', 32),
seed=autoencoder_params.get('seed', 42),
verbose=False
)
df_r2_reduced = _regression_accuracy_with_weights(
df_params_aligned,
dict_ae_reduced['encoder_output'],
mcse_weights=None
)
# Compare
recovery_test = pd.DataFrame({
'Full_R2': df_r2_full['R2_CV'],
'Reduced_R2': df_r2_reduced['R2_CV'],
'R2_Drop': df_r2_full['R2_CV'] - df_r2_reduced['R2_CV']
})
if verbose:
print(f" Full Set Mean R²: {df_r2_full['R2_CV'].mean():.4f}")
print(f" Reduced Set Mean R²: {df_r2_reduced['R2_CV'].mean():.4f}")
print(f" Mean R² Drop: {recovery_test['R2_Drop'].mean():.4f}")
acceptable_drops = recovery_test[recovery_test['R2_Drop'] <= 0.05]
critical_drops = recovery_test[recovery_test['R2_Drop'] > 0.1]
if len(acceptable_drops) == len(recovery_test):
print(f" ✓ All parameters recovered well (R² drop ≤ 0.05)")
elif len(critical_drops) > 0:
print(f" ⚠ Critical R² drops (>0.1) for: {critical_drops.index.tolist()}")
#####################################################
# Stage 6: Final Output
#####################################################
return {
'removed_extreme_noise': removed_extreme_noise,
'removed_redundant': removed_redundant,
'cleaned_qois': list(df_qois_work.columns),
'cleaned_index': df_qois_work.index, # Index of rows kept after NaN/inf removal
'correlation_matrix': corr_matrix,
# Full set (on cleaned features)
'full_autoencoder_results': dict_ae_full,
'full_regression_r2': df_r2_full,
# Reduced set (post-RFECV)
'final_qois': final_qois,
'reduced_autoencoder_results': dict_ae_reduced,
'reduced_regression_r2': df_r2_reduced,
# Validation
'synthetic_recovery_test': recovery_test,
}