import pcdl
import numpy as np
import scipy
import pandas as pd
from shutil import rmtree
from typing import Union
import re
import inspect, ast
import textwrap
[docs]
def summ_func_FinalPopLiveDead(outputPath:str,summaryFile:Union[str,None], dic_params:dict, SampleID:int, ReplicateID:int) -> Union[pd.DataFrame,None]:
"""
Final population of live and dead cells
Args:
outputPath: str -> Path to the PhysiCell output directory.
summaryFile: Union[str, None] -> File to store the summary (optional).
dic_params: dict -> Dictionary of simulation parameters.
SampleID: int -> Unique identifier for the sample.
ReplicateID: int -> Unique identifier for the replicate.
Returns:
pd.DataFrame or None -> DataFrame with the computed QoIs or None if saved to a file.
"""
# read the last file
mcds = pcdl.TimeStep('final.xml',outputPath, microenv=False, graph=False, settingxml=None, verbose=False)
# dataframe of cells
df_cell = mcds.get_cell_df()
# population stats live and dead cells
live_cells = len(df_cell[ (df_cell['dead'] == False) ] )
dead_cells = len(df_cell[ (df_cell['dead'] == True) ] )
# dataframe structure
data = {'time': mcds.get_time(), 'sampleID': SampleID, 'replicateID': ReplicateID, 'live_cells': live_cells, 'dead_cells': dead_cells, 'run_time_sec': mcds.get_runtime()}
data_conc = {**data,**dic_params} # concatenate output data and parameters
df = pd.DataFrame([data_conc])
# remove replicate output folder
rmtree( outputPath )
if (summaryFile):
df.to_csv(summaryFile, sep='\t', encoding='utf-8')
return None
else: return df
# Population over time of live and dead cells
[docs]
def summ_func_TimeSeriesPopLiveDead(outputPath:str,summaryFile:Union[str,None], dic_params:dict, SampleID:int, ReplicateID:int) -> Union[pd.DataFrame,None]:
"""
Population over time of live and dead cells
Args:
outputPath: str -> Path to the PhysiCell output directory.
summaryFile: Union[str, None] -> File to store the summary (optional).
dic_params: dict -> Dictionary of simulation parameters.
SampleID: int -> Unique identifier for the sample.
ReplicateID: int -> Unique identifier for the replicate.
Returns:
pd.DataFrame or None -> DataFrame with the computed QoIs or None if saved to a file.
"""
mcds_ts = pcdl.TimeSeries(outputPath, microenv=False, graph=False, settingxml=None, verbose=False)
for mcds in mcds_ts.get_mcds_list():
df_cell = mcds.get_cell_df()
live_cells = len(df_cell[ (df_cell['dead'] == False) ] )
dead_cells = len(df_cell[ (df_cell['dead'] == True) ] )
data = {'time': mcds.get_time(), 'sampleID': SampleID, 'replicateID': ReplicateID, 'live_cells': live_cells, 'dead_cells': dead_cells, 'run_time_sec': mcds.get_runtime()}
data_conc = {**data,**dic_params} # concatenate output data and parameters
if ( mcds.get_time() == 0 ): df = pd.DataFrame([data_conc]) # create the dataframe
else: df.loc[len(df)] = data_conc # append the dictionary to the dataframe
# remove replicate output folder
rmtree( outputPath )
if (summaryFile):
df.to_csv(summaryFile, sep='\t', encoding='utf-8')
return None
else: return df
def _check_functions_need_microenv(qoi_funcs:dict) -> bool:
"""
Check if any of the QoI functions require microenvironment data (df_subs).
Args:
qoi_funcs: dict -> Dictionary of QoI functions
Returns:
bool: True if any function needs microenvironment data
"""
# If no functions provided, return True (default to loading microenvironment)
if not qoi_funcs:
return True
# If any function needs microenvironment data, return True
for func in qoi_funcs.values():
# Check if any parameter name indicates substrate/concentration data
param_lower = func.__param_name__.lower()
if 'subs' in param_lower or 'conc' in param_lower or 'micro' in param_lower:
return True
# If none need it,
return False
[docs]
def safe_call_qoi_function(func: callable, mcds:Union[pcdl.TimeStep,None]=None, list_mcds:Union[list,None]=None ):
"""
Safely call a QoI function with the appropriate dataframe based on parameter inspection.
Args:
func: The QoI function to call
mcds: pcdl.TimeStep or None -> The mcds object for single snapshot
list_mcds: list of pcdl.TimeStep or None -> The mcds time series object for multiple snapshots
Returns:
Result of the QoI function
"""
# Require stored metadata for dispatch.
param_name = getattr(func, '__param_name__', None)
# Reject non-wrapped callables (no __param_name__ attribute).
if param_name is None:
raise ValueError(
"QoI function is missing required '__param_name__' metadata. "
"Wrap callables with _create_wrapper_for_qoi_function or recreate_qoi_functions first."
)
# Handle wrapped/string-defined QoI functions.
if param_name in ['df_cell', 'df'] and mcds is not None: # Function expects cell dataframe
return func(mcds.get_cell_df())
elif param_name in ['df_subs'] and mcds is not None: # Function expects substrate dataframe
return func(mcds.get_conc_df())
elif param_name in ['mcds'] and mcds is not None: # Function expects the mcds object
return func(mcds)
elif param_name in ['mcds_ts'] and list_mcds is not None: # Function expects the mcds time series object
if mcds == list_mcds[-1]: # Ensure we only compute once per time series (last mcds passed)
return func(list_mcds)
else:
return None # Skip computation for other snapshots
raise ValueError(
f"Could not call QoI function with param_name='{param_name}', "
f"mcds provided={mcds is not None}, list_mcds provided={list_mcds is not None}."
)
def _create_wrapper_for_qoi_function(func: callable, param_name: str, qoi_name: str) -> callable:
"""
Create a wrapper function for a QoI function that preserves parameter inspection capability.
Args:
func: The original QoI function to wrap
param_name: The name of the parameter that the function expects (e.g., 'df_cell', 'df_subs', 'mcds', 'mcds_ts')
qoi_name: The name of the QoI function (used for setting the wrapper's __name__)
"""
# Create a wrapper function that calls the pre-compiled lambda
def wrapper(*args, **kwargs):
return func(*args, **kwargs)
# Store the original parameter name as an attribute for inspection
wrapper.__param_name__ = param_name
wrapper.__name__ = qoi_name
return wrapper
def _extract_lambda_source(func):
source = textwrap.dedent(inspect.getsource(func))
lambda_index = source.find("lambda")
if lambda_index == -1:
raise ValueError("No lambda expression found")
candidate = source[lambda_index:].strip()
# Try progressively shorter suffixes until the lambda expression parses.
# This handles trailing commas and surrounding syntax from dict literals,
# calls, or other enclosing expressions.
for end in range(len(candidate), 0, -1):
snippet = candidate[:end].rstrip()
if not snippet:
continue
try:
tree = ast.parse(f"({snippet})", mode="eval")
except SyntaxError:
continue
for node in ast.walk(tree):
if isinstance(node, ast.Lambda):
extracted = ast.get_source_segment(f"({snippet})", node)
if extracted:
return extracted
raise ValueError("No lambda expression found")
def _convert_qoi_function_to_string(qoi_func: callable, qoi_name:str):
"""
Convert a callable QoI function to its source code string representation.
Args:
qoi_func: The QoI function to convert (must be a callable)
qoi_name: The name of the QoI function (used for error messages)
"""
# Check if the qoi_func is a callable function converted to string
if callable(qoi_func):
return _extract_lambda_source(qoi_func)
else:
raise ValueError(f"QoI function for '{qoi_name}' is not callable.")
def _create_named_function_from_string(qoi_name: str, func_str: str, qoi_def:dict={}) -> callable:
"""
Dynamically creates a named function from a string.
Args:
qoi_name: The name of the function to be created.
func_str: The string representation of the function.
qoi_def (dict): first-class object, that can be used in qoi_functions
lambda string, mapped to their name.
e.g. for a function definition, if the function definition is:
def my_func():
print('hello world!')
return 0
then the qoi_def dict would look like this:
{'my_func': my_func}
Returns:
The created function with preserved parameter inspection capability.
"""
# Extract arg name from lambda (e.g., "lambda df_subs:" -> "df_subs")
arg_match = re.search(r'lambda\s+(\w+):', func_str)
if arg_match:
arg_name = arg_match.group(1)
else:
arg_name = 'mcds'
# Create a restricted namespace with necessary imports and no built-in functions.
namespace = {'pd': pd, 'np': np, 'len': len, 'sum': sum, 'map': map, '__builtins__': {}}
# Add locally defined QoI helper callables so lambda strings can reference them.
namespace.update(qoi_def)
namespace.update({
name: obj
for name, obj in globals().items()
if callable(obj) and name.startswith('qoi_func')
})
# Evaluate the lambda function once at creation time
try:
compiled_func = eval(func_str, namespace)
except Exception as e:
raise ValueError(f"Error evaluating QoI function string '{func_str}': {e}")
# Create a wrapper function that calls the pre-compiled lambda
wrapper = _create_wrapper_for_qoi_function(func=compiled_func, param_name=arg_name, qoi_name=qoi_name)
return wrapper
[docs]
def recreate_qoi_functions(qoi_functions:dict, qoi_def:dict={}) -> dict:
"""
Recreate QoI functions from their string representations.
Args:
qoi_functions: Dictionary of QoI functions (keys as names, values as strings).
Pass a mapping like the following:
qoi_functions = {
"mean_substrate": "lambda df_subs: df_subs['substrate'].mean()",
"live_cell_count": "lambda df_cell: len(df_cell[df_cell['dead'] == False])",
"max_volume": "lambda df: df['total_volume'].max()",
"mean_radial_distance": "lambda df: df[['position_x', 'position_y', 'position_z']].apply(lambda row: ((row['position_x']**2 + row['position_y']**2 + row['position_z']**2)**0.5), axis=1).mean()"
}
qoi_def (dict): first-class object, that can be used in qoi_functions
lambda string, mapped to their name.
e.g. for a function definition, if the function definition is:
def my_func():
print('hello world!')
return 0
then the qoi_def dict would look like this:
{'my_func': my_func}
Returns:
Dictionary of recreated QoI functions (keys as names, values as callables)
"""
recreated_qoi_funcs = {}
for qoi_name, func_str in qoi_functions.items():
try:
if callable(func_str):
signature = inspect.signature(func_str)
param_name = list(dict(signature.parameters).keys())[0]
recreated_qoi_funcs[qoi_name] = _create_wrapper_for_qoi_function(func=func_str, param_name=param_name, qoi_name=qoi_name)
else:
recreated_qoi_funcs[qoi_name] = _create_named_function_from_string(qoi_name=qoi_name, func_str=func_str, qoi_def=qoi_def)
except Exception as e:
raise ValueError(f"Error recreating QoI function '{qoi_name}': {e}")
return recreated_qoi_funcs
[docs]
def summary_function(outputPath:str, summaryFile:Union[str, None], dic_params:dict, SampleID:int, ReplicateID:int, qoi_functions:dict, RemoveFolder:bool=True, drop_columns:Union[list, None]=None,) -> Union[pd.DataFrame, None]:
"""
Generic summary function for creating custom QoIs (Quantities of Interest) based on df_cell elements.
Args:
outputPath: str -> Path to the PhysiCell output directory.
summaryFile: Union[str, None] -> File to store the summary (optional).
dic_params: dict -> Dictionary of simulation parameters.
SampleID: int -> Unique identifier for the sample.
ReplicateID: int -> Unique identifier for the replicate.
qoi_functions: dict -> Dictionary of QoI functions with keys as QoI names and values as functions/lambdas.
RemoveFolder: bool -> Whether to remove the output folder after processing.
drop_columns: list -> List of columns to drop from the DataFrame.
Returns:
pd.DataFrame or None -> DataFrame with the computed QoIs or None if saved to a file.
"""
try:
# Check if any function needs microenvironment data
needs_microenv = _check_functions_need_microenv(qoi_functions)
# Load the time series
mcds_ts = pcdl.TimeSeries(outputPath, microenv=needs_microenv, graph=False, settingxml=None, verbose=False)
list_mcds=mcds_ts.get_mcds_list()
# All data is stored as a list of mcds
if qoi_functions is None:
# Optional: Remove replicate output folder
if (RemoveFolder): rmtree(outputPath)
# Entire list of mcds is returned if drop_columns is empty
if not drop_columns:
return list_mcds
else:
df_list = []
for mcds in list_mcds:
df_cell = mcds.get_cell_df()
df_cell.drop(columns=drop_columns, inplace=True, errors='ignore')
df_list.append(df_cell)
return df_list
else:
df_list = []
for mcds in list_mcds:
try:
qoi_data = {}
for name, func in qoi_functions.items():
func_result = safe_call_qoi_function(func, mcds=mcds, list_mcds=list_mcds)
if func_result is not None:
qoi_data[name] = func_result
if not qoi_data: continue # Skip if no QoI data was computed
# print(f"Computed QoIs at time {mcds.get_time()}: {qoi_data}")
except Exception as e:
raise RuntimeError(f"Error computing QoIs in summary_function: {e}")
data = {
'time': mcds.get_time(),
'sampleID': SampleID,
'replicateID': ReplicateID,
**qoi_data
}
data_conc = {**data, **dic_params}
df_list.append(data_conc)
df = pd.DataFrame(df_list)
except FileNotFoundError as e:
raise RuntimeError(f"Error: Required file not found in {outputPath}. {e}")
except Exception as e:
raise RuntimeError(f"An error occurred while processing QoIs: {e}")
# Optional: Remove replicate output folder
if (RemoveFolder): rmtree(outputPath)
# Save to file or return DataFrame
if summaryFile:
df.to_csv(summaryFile, sep='\t', encoding='utf-8')
return None
else:
return df
[docs]
def qoi_func_radial_density_summary(df, center=[0,0,0]):
"""
Extract summary statistics from radial distribution of cells
Args:
df: DataFrame with columns ['position_x', 'position_y', 'position_z']
center: List or array-like of length 3 representing the center point for radial distance calculation (default is [0,0,0])
"""
cell_centers = df[['position_x', 'position_y', 'position_z']].values
dist_from_center = np.linalg.norm(cell_centers - np.array(center), axis=1)
return {
"center_of_mass": np.average(dist_from_center),
"median": np.median(dist_from_center),
"spread": np.std(dist_from_center),
"iqr": np.percentile(dist_from_center, 75) - np.percentile(dist_from_center, 25),
"skewness": scipy.stats.skew(dist_from_center),
"kurtosis": scipy.stats.kurtosis(dist_from_center),
}
[docs]
def qoi_func_persistent_homology(df:pd.DataFrame, Plot=False) -> tuple:
"""
Compute persistent homology vectorization using muspan.
(source: https://docs.muspan.co.uk/latest/_collections/topology/Topology%203%20-%20persistence%20vectorisation.html)
Args:
df_cells: DataFrame -> DataFrame containing cell data with 'position_x', 'position_y', and 'cell_type' columns.
Plot: bool -> Whether to plot the persistence diagram (optional).
Returns:
tuple -> (pd.Series with vectorized persistent homology features, figure or None)
"""
import matplotlib.pyplot as plt
try:
import muspan
except ImportError:
raise ImportError("muspan library is required for computing persistent homology. Please install it via 'pip install muspan'.")
# Extract cell positions
points = df[['position_x', 'position_y']].to_numpy()
labels = df['cell_type'].to_numpy()
# Create a muspan domain and add points
domain = muspan.domain('Position Data')
domain.add_points(points, 'Cell Positions')
domain.add_labels('Celltype', labels)
# Query to select cells of types 'A', 'B', ... in each domain
q_cell_types = muspan.query.query(domain, ('label', 'Celltype'), 'in', df['cell_type'].unique().tolist())
# Compute Vietoris-Rips filtrations
feature_persistence = muspan.topology.vietoris_rips_filtration(domain, population=q_cell_types, max_dimension=1)
# Plot persistence diagram (optional)
figure = None
if Plot:
fig, axes = plt.subplots(figsize=(15, 6), nrows=1, ncols=2)
# Plot domain with cell types
muspan.visualise.visualise(domain, 'Celltype', ax=axes[0], add_cbar=False, marker_size=2.5, objects_to_plot=q_cell_types)
# Plot persistence diagram
muspan.visualise.persistence_diagram(feature_persistence, ax=axes[1])
figure = [fig, axes]
# Vectorise the persistence homology diagram for the domain using statistical method
vectorised_ph,name_of_features = muspan.topology.vectorise_persistence(feature_persistence, method='statistics')
return pd.Series(vectorised_ph, index=name_of_features), figure
[docs]
def qoi_func_relational_ph( df: pd.DataFrame, landmark_type: str, witness_type: str, max_dim: int = 1, mode: str = "distance", ax = None) -> tuple:
"""
Relational Persistent Homology using Dowker/Witness idea.
A→B (A as vertices, B as witnesses) describes how B are arranged around A geometry.
Args:
df : DataFrame with columns ['position_x','position_y','cell_type']
landmark_type : cell type to use as landmarks (A)
witness_type : cell type to use as witnesses (B)
max_dim : PH dimension (0 or 1)
mode: 'distance' or 'count'
ax : matplotlib axis for plotting (optional)
Returns:
tuple -> (pd.Series with vectorized persistent homology features (summary statistics), raw persistence diagram (list of (dimension, (birth, death)) pairs)
"""
from scipy.spatial.distance import cdist
from scipy.spatial import Delaunay
import gudhi, muspan
import matplotlib.pyplot as plt
# Extract A = Landmarks, B = Witnesses
A_points = df[df["cell_type"] == landmark_type][["position_x","position_y"]].to_numpy()
B_points = df[df["cell_type"] == witness_type][["position_x","position_y"]].to_numpy()
if len(A_points) == 0 or len(B_points) == 0:
print("Warning: Both landmark_type and witness_type must be present.")
# Return empty Series without pre-defined index - will be filled with NaN by the caller
return pd.Series(dtype=float), None
# Pairwise distances B×A
D = cdist(B_points, A_points)
# Build candidate simplex list from landmarks
if len(A_points) < 3:
print("Warning: At least 3 landmark points are required for relational PH.")
# Return empty Series - will be filled with NaN by the caller
return pd.Series(dtype=float), None
# Use Delaunay to match Python repo behavior
tri = Delaunay(A_points)
# vertices = all points
vertices = list(range(len(A_points)))
# edges from Delaunay
edges = set()
faces = set()
for simplex in tri.simplices:
simplex = list(simplex)
# all edges
for i in range(3):
for j in range(i+1, 3):
edges.add(tuple(sorted([simplex[i], simplex[j]])))
# triangle
faces.add(tuple(sorted(simplex)))
edges = list(edges)
faces = list(faces)
# Compute filtration values
def dowker_distance(simplex):
"""Distance-based Dowker: min_w max_a d(a,w)."""
return np.min(np.max(D[:, simplex], axis=1))
def dowker_count(simplex):
"""Count-based Dowker: number of witnesses covering all simplex vertices."""
return -np.sum(np.all(D[:, simplex] <= np.max(D[:, simplex], axis=0), axis=1))
# Compute filtration values based on the distance
fil_func = dowker_distance if mode == "distance" else dowker_count
f_vertex = np.array([fil_func([i]) for i in vertices])
f_edge = np.array([fil_func(list(e)) for e in edges])
if max_dim >= 2:
f_face = np.array([fil_func(list(f)) for f in faces])
# Build GUDHI SimplexTree
st = gudhi.SimplexTree()
# vertices
for i, fv in enumerate(f_vertex):
st.insert([i], filtration=float(fv))
# edges
for (e, fv) in zip(edges, f_edge):
st.insert(list(e), filtration=float(fv))
# faces
if max_dim >= 2:
for (f, fv) in zip(faces, f_face):
st.insert(list(f), filtration=float(fv))
st.initialize_filtration()
# Compute persistence
diag = st.persistence()
# Convert GUDHI persistence output to muspan-style dict {'dgms': [array_dim0, array_dim1, ...]}
dgms = []
for d in range(max_dim + 1):
intervals = [pair for dim, pair in diag if dim == d]
if len(intervals) == 0:
dgms.append(np.zeros((0, 2)))
else:
dgms.append(np.array(intervals))
feature_persistence = {'dgms': dgms}
# Vectorize diagram using muspan's statistics. Some degenerate diagrams can
# produce empty finite arrays in muspan/numpy percentile calls.
try:
vec, names = muspan.topology.vectorise_persistence(feature_persistence, method="statistics")
vec = pd.Series(vec, index=names)
except (IndexError, ValueError) as e:
print(
f"Warning: Could not vectorize relational PH for "
f"{landmark_type}->{witness_type}: {e}"
)
# Return empty Series - will be filled with NaN by the caller
vec = pd.Series(dtype=float)
# Plot
if ax is not None:
axes = gudhi.plot_persistence_diagram(diag, axes=ax)
return vec, diag