#!/usr/bin/env python
# Copyright 2016-2021 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import sys
import numpy as np
import pandas as pd
from random import shuffle
from sklearn.base import is_regressor
from collections import OrderedDict
from sklearn.utils import resample
import WORC.addexceptions as ae
from WORC.classification import metrics
import WORC.processing.label_processing as lp
from WORC.plotting.compute_CI import compute_confidence
from WORC.plotting.compute_CI import compute_confidence_bootstrap


def fit_thresholds(thresholds, estimator, label_type, X_train, Y_train,
                   ensemble_method, ensemble_size, ensemble_scoring):
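    """Fit lower and upper posterior cut-offs as percentiles on the validation sets."""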
print('Fitting thresholds on validation set')
if not hasattr(estimator, 'cv_iter'):
cv_iter = list(estimator.cv.split(X_train, Y_train))
estimator.cv_iter = cv_iter
p_all = estimator.cv_results_['params'][0]
thresholds_low = list()
thresholds_high = list()
for it, (train, valid) in enumerate(estimator.cv_iter):
        print(f' - iteration {it + 1} / {len(estimator.cv_iter)}.')
        # NOTE: Explicitly exclude the validation set, else refit and score
        # somehow still seem to use it.
        X_train_temp = [X_train[i] for i in train]
        Y_train_temp = [Y_train[i] for i in train]
train_temp = range(0, len(train))
# Refit a SearchCV object with the provided parameters
if ensemble_method is not None:
estimator.create_ensemble(X_train_temp, Y_train_temp,
method=ensemble_method, size=ensemble_size,
verbose=False, scoring=ensemble_scoring)
else:
estimator.refit_and_score(X_train_temp, Y_train_temp, p_all,
train_temp, train_temp,
verbose=False)
# Predict and save scores
X_train_values = [x[0] for x in X_train] # Throw away labels
X_train_values_valid = [X_train_values[i] for i in valid]
Y_valid_score_temp = estimator.predict_proba(X_train_values_valid)
# Only take the probabilities for the second class
Y_valid_score_temp = Y_valid_score_temp[:, 1]
# Select thresholds
thresholds_low.append(np.percentile(Y_valid_score_temp, thresholds[0]*100.0))
thresholds_high.append(np.percentile(Y_valid_score_temp, thresholds[1]*100.0))
thresholds_val = [np.mean(thresholds_low), np.mean(thresholds_high)]
print(f'Thresholds {thresholds} converted to {thresholds_val}.')
return thresholds_val
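

# Illustrative sketch (not part of the pipeline itself): the thresholds passed
# to fit_thresholds are percentile fractions, which are converted per
# validation fold into posterior cut-offs and then averaged. With hypothetical
# fold posteriors, the idea is roughly:
#
#     valid_posteriors = np.array([0.10, 0.30, 0.60, 0.90])
#     low = np.percentile(valid_posteriors, 0.25 * 100.0)   # thresholds[0] = 0.25
#     high = np.percentile(valid_posteriors, 0.75 * 100.0)  # thresholds[1] = 0.75
#
# Patients with a posterior below the (averaged) lower or above the upper
# cut-off are later kept as confident predictions; the rest are excluded.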


def compute_statistics(y_truth, y_score, y_prediction, modus, regression):
"""Compute statistics on predictions."""
    if modus == 'singlelabel':
        # Compute singlelabel performance metrics: the same call handles
        # both classification and regression
        return metrics.performance_singlelabel(y_truth,
                                               y_prediction,
                                               y_score,
                                               regression)
elif modus == 'multilabel':
# Convert class objects to single label per patient
y_truth_temp = list()
y_prediction_temp = list()
for yt, yp in zip(y_truth, y_prediction):
            label = np.where(yt == 1)
            if len(label[0]) > 1:
                raise ae.WORCNotImplementedError('Multiclass classification evaluation is not supported in WORC.')
y_truth_temp.append(label[0][0])
label = np.where(yp == 1)
y_prediction_temp.append(label[0][0])
y_truth = y_truth_temp
y_prediction = y_prediction_temp
# Compute multilabel performance metrics
predictions_multilabel =\
metrics.performance_multilabel(y_truth,
y_prediction,
y_score)
# Compute all single label performance metrics as well
n_labels = len(np.unique(y_truth))
for i_label in range(n_labels):
y_truth_single = [i == i_label for i in y_truth]
y_prediction_single = [i == i_label for i in y_prediction]
y_score_single = y_score[:, i_label]
predictions_singlelabel_temp =\
metrics.performance_singlelabel(y_truth_single,
y_prediction_single,
y_score_single,
regression)
if i_label == 0:
predictions_singlelabel =\
[[i] for i in predictions_singlelabel_temp]
else:
for num, metric in enumerate(predictions_singlelabel_temp):
predictions_singlelabel[num].append(metric)
output = list(predictions_multilabel) + predictions_singlelabel
return output
else:
        raise ae.WORCKeyError(f'{modus} is not a valid modus!')
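

# Sketch of the multilabel conversion performed in compute_statistics
# (hypothetical values): a one-hot encoded row such as
#
#     yt = np.array([0, 1, 0])
#
# is mapped to the class index np.where(yt == 1)[0][0] == 1, after which the
# overall multilabel metrics and the per-label binary metrics are computed on
# plain class indices.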


def plot_estimator_performance(prediction, label_data, label_type,
                               crossval_type=None, alpha=0.95,
                               ensemble_method='top_N',
                               ensemble_size=100, verbose=True,
                               ensemble_scoring=None,
                               output=None, modus=None,
                               thresholds=None, survival=False,
                               shuffle_estimators=False,
                               bootstrap=None, bootstrap_N=None,
                               overfit_scaler=None,
                               save_memory=True,
                               refit_ensemble=False):
"""Plot the output of a single estimator, e.g. a SVM.
Parameters
----------
prediction: pandas dataframe or string, mandatory
output of trainclassifier function, either a pandas dataframe
or a HDF5 file
label_data: string, mandatory
Contains the path referring to a .txt file containing the
patient label(s) and value(s) to be used for learning. See
the Github Wiki for the format.
label_type: string, mandatory
Name of the label to extract from the label data to test the
estimator on.
alpha: float, default 0.95
Significance of confidence intervals.
ensemble_method: string, default 'top_N'
Determine which method to use for creating the ensemble.
Choices: top_N or Caruana
    ensemble_size: int, default 100
        Determine the size of the ensemble. Only relevant for top_N.
    verbose: boolean, default True
        Print intermediate messages.
ensemble_scoring: string, default None
Metric to be used for evaluating the ensemble. If None,
the option set in the prediction object will be used.
    output: string, default None
        Determine which results are returned. If 'stats', the statistics of
        the estimator are returned. If 'scores' (or 'decision'), the raw
        ground truths, scores and predictions are returned.
    thresholds: list of float(s), default None
        If None, use the default sklearn threshold (0.5) on the posteriors to
        convert them to a binary prediction. If a single value is given, it is
        used directly as the posterior threshold. If two values are given,
        they are interpreted as percentile fractions on the validation
        posteriors; only patients with a posterior below the fitted lower or
        above the fitted upper cut-off are kept in the evaluation.
    Returns
    -------
    Depending on the output parameter, the following outputs are returned:

    If output == 'stats':
        stats: dictionary
            Contains the confidence intervals of the performance metrics
            and the number of times each patient was classified correctly
            or incorrectly.

    If output == 'scores':
        y_truths: list
            Contains the true label for each object.
        y_scores: list
            Contains the score (e.g. posterior) for each object.
        y_predictions: list
            Contains the predicted label for each object.
        pids: list
            Contains the patient ID/name for each object.
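
    Example
    -------
    A minimal usage sketch; the file names below are hypothetical:

        stats = plot_estimator_performance('classification_0.hdf5',
                                           'pinfo.txt',
                                           ['label_A'],
                                           output='stats')
        print(stats['Statistics'])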
"""
# Load the prediction object if it's a hdf5 file
if type(prediction) is not pd.core.frame.DataFrame:
if os.path.isfile(prediction):
prediction = pd.read_hdf(prediction)
else:
raise ae.WORCIOError(('{} is not an existing file!').format(str(prediction)))
# Select the estimator from the pandas dataframe to use
keys = prediction.keys()
if label_type is None:
label_type = keys[0]
# Load the label data
if type(label_data) is not dict:
if os.path.isfile(label_data):
if type(label_type) is not list:
# Singlelabel: convert to list
label_type = [[label_type]]
elif len(label_type) == 1:
label_type = label_type[0].split(', ')
label_data = lp.load_labels(label_data, label_type)
else:
raise ae.WORCValueError(f"Label data {label_data} incorrect: not a dictionary, or file does not exist.")
n_labels = len(label_type)
patient_IDs = label_data['patient_IDs']
labels = label_data['label']
if type(label_type) is list:
# FIXME: Support for multiple label types not supported yet.
print('[WORC Warning] Support for multiple label types not supported yet. Taking first label for plot_estimator_performance.')
label_type = keys[0]
# Extract the estimators, features and labels
regression = is_regressor(prediction[label_type]['classifiers'][0].best_estimator_)
feature_labels = prediction[label_type]['feature_labels']
# Get some configuration variables if present in the prediction
config = prediction[label_type].config
if modus is None:
modus = config['Labels']['modus']
if crossval_type is None:
crossval_type = config['CrossValidation']['Type']
if bootstrap is None:
bootstrap = config['Bootstrap']['Use']
if bootstrap_N is None:
bootstrap_N = int(config['Bootstrap']['N_iterations'])
if overfit_scaler is None:
overfit_scaler = config['Evaluation']['OverfitScaler']
ensemble_metric = config['Ensemble']['Metric']
# Create lists for performance measures
if not regression:
sensitivity = list()
specificity = list()
precision = list()
npv = list()
accuracy = list()
bca = list()
auc = list()
f1_score_list = list()
val_score = list()
if modus == 'multilabel':
acc_av = list()
# Also add scoring measures for all single label scores
sensitivity_single = [list() for j in range(n_labels)]
specificity_single = [list() for j in range(n_labels)]
precision_single = [list() for j in range(n_labels)]
npv_single = [list() for j in range(n_labels)]
accuracy_single = [list() for j in range(n_labels)]
bca_single = [list() for j in range(n_labels)]
auc_single = [list() for j in range(n_labels)]
f1_score_list_single = [list() for j in range(n_labels)]
else:
r2score = list()
MSE = list()
coefICC = list()
PearsonC = list()
PearsonP = list()
SpearmanC = list()
SpearmanP = list()
patient_classification_list = dict()
percentages_selected = list()
if bootstrap:
# To be compatible with flake8, initialize some variables
y_truth_all = None
y_prediction_all = None
y_score_all = None
test_patient_IDs_or = None
if output in ['scores', 'decision'] or crossval_type == 'LOO':
        # Keep track of all ground truths and scores
y_truths = list()
y_scores = list()
y_predictions = list()
pids = list()
# Extract sample size
N_1 = float(len(prediction[label_type]['patient_ID_train'][0]))
N_2 = float(len(prediction[label_type]['patient_ID_test'][0]))
# Convert tuples to lists if required
if type(prediction[label_type]['X_test']) is tuple:
prediction[label_type]['X_test'] = list(prediction[label_type]['X_test'])
prediction[label_type]['X_train'] = list(prediction[label_type]['X_train'])
prediction[label_type]['Y_train'] = list(prediction[label_type]['Y_train'])
prediction[label_type]['Y_test'] = list(prediction[label_type]['Y_test'])
prediction[label_type]['patient_ID_test'] = list(prediction[label_type]['patient_ID_test'])
prediction[label_type]['patient_ID_train'] = list(prediction[label_type]['patient_ID_train'])
prediction[label_type]['classifiers'] = list(prediction[label_type]['classifiers'])
# Loop over the test sets, which correspond to cross-validation
# or bootstrapping iterations
n_iter = len(prediction[label_type]['Y_test'])
if bootstrap:
iterobject = range(0, bootstrap_N)
else:
iterobject = range(0, n_iter)
for i in iterobject:
print("\n")
if bootstrap:
print(f"Bootstrap {i + 1} / {bootstrap_N}.")
else:
print(f"Cross-validation {i + 1} / {n_iter}.")
test_indices = list()
# When bootstrapping, there is only a single train/test set.
if bootstrap:
if i == 0:
X_test_temp_or = prediction[label_type]['X_test'][0]
X_train_temp = prediction[label_type]['X_train'][0]
Y_train_temp = prediction[label_type]['Y_train'][0]
Y_test_temp_or = prediction[label_type]['Y_test'][0]
test_patient_IDs_or = prediction[label_type]['patient_ID_test'][0]
train_patient_IDs = prediction[label_type]['patient_ID_train'][0]
fitted_model = prediction[label_type]['classifiers'][0]
# Objects required for first iteration
test_patient_IDs = test_patient_IDs_or[:]
X_test_temp = X_test_temp_or[:]
Y_test_temp = Y_test_temp_or[:]
else:
X_test_temp = prediction[label_type]['X_test'][i]
X_train_temp = prediction[label_type]['X_train'][i]
Y_train_temp = prediction[label_type]['Y_train'][i]
Y_test_temp = prediction[label_type]['Y_test'][i]
test_patient_IDs = prediction[label_type]['patient_ID_test'][i]
train_patient_IDs = prediction[label_type]['patient_ID_train'][i]
fitted_model = prediction[label_type]['classifiers'][i]
# Check which patients are in the test set.
if output == 'stats' and crossval_type != 'LOO':
for i_ID in test_patient_IDs:
# Initiate counting how many times a patient is classified correctly
if i_ID not in patient_classification_list:
patient_classification_list[i_ID] = dict()
patient_classification_list[i_ID]['N_test'] = 0
patient_classification_list[i_ID]['N_correct'] = 0
patient_classification_list[i_ID]['N_wrong'] = 0
patient_classification_list[i_ID]['N_test'] += 1
# Check if this is exactly the label of the patient within the label file
if i_ID not in patient_IDs:
                    print(f'[WORC WARNING] Patient {i_ID} was not found in the patient labels, removing underscore.')
i_ID = i_ID.split("_")[0]
if i_ID not in patient_IDs:
print(f'[WORC WARNING] Did not help, excluding patient {i_ID}.')
continue
test_indices.append(np.where(patient_IDs == i_ID)[0][0])
# Extract ground truth
y_truth = Y_test_temp
# If required, shuffle estimators for "Random" ensembling
if shuffle_estimators:
# Randomly shuffle the estimators
print('Shuffling estimators for random ensembling.')
shuffle(fitted_model.cv_results_['params'])
# If requested, first let the SearchCV object create an ensemble
if bootstrap and i > 0:
# For bootstrapping, only do this at the first iteration
pass
elif not fitted_model.ensemble or refit_ensemble:
# If required, rank according to generalization score instead of mean_validation_score
if ensemble_metric == 'generalization':
print('Using generalization score for estimator ranking.')
indices = fitted_model.cv_results_['rank_generalization_score']
fitted_model.cv_results_['params'] = [fitted_model.cv_results_['params'][i] for i in indices[::-1]]
elif ensemble_metric != 'Default':
raise ae.WORCKeyError(f'Metric {ensemble_metric} is not known: use Default or generalization.')
            # NOTE: Added for backwards compatibility
if not hasattr(fitted_model, 'cv_iter'):
cv_iter = list(fitted_model.cv.split(X_train_temp, Y_train_temp))
fitted_model.cv_iter = cv_iter
# Create the ensemble
X_train_temp = [(x, feature_labels) for x in X_train_temp]
fitted_model.create_ensemble(X_train_temp, Y_train_temp,
method=ensemble_method, size=ensemble_size,
verbose=verbose, scoring=ensemble_scoring,
overfit_scaler=overfit_scaler)
# If bootstrap, generate a bootstrapped sample
if bootstrap and i > 0:
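            # NOTE: sklearn.utils.resample draws a bootstrap sample of the same
            # length, with replacement, keeping the pairing between truth,
            # prediction, score and patient ID intact.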
y_truth, y_prediction, y_score, test_patient_IDs =\
resample(y_truth_all, y_prediction_all,
y_score_all, test_patient_IDs_or)
else:
# Create prediction
y_prediction = fitted_model.predict(X_test_temp)
if regression:
y_score = y_prediction
elif modus == 'multilabel':
y_score = fitted_model.predict_proba(X_test_temp)
else:
y_score = fitted_model.predict_proba(X_test_temp)[:, 1]
# Create a new binary score based on the thresholds if given
if thresholds is not None:
if len(thresholds) == 1:
y_prediction = y_score >= thresholds[0]
elif len(thresholds) == 2:
# X_train_temp = [x[0] for x in X_train_temp]
y_score_temp = list()
y_prediction_temp = list()
y_truth_temp = list()
test_patient_IDs_temp = list()
thresholds_val = fit_thresholds(thresholds, fitted_model,
label_type, X_train_temp,
Y_train_temp, ensemble_method,
ensemble_size,
ensemble_scoring)
for pnum in range(len(y_score)):
if y_score[pnum] <= thresholds_val[0] or y_score[pnum] > thresholds_val[1]:
y_score_temp.append(y_score[pnum])
y_prediction_temp.append(y_prediction[pnum])
y_truth_temp.append(y_truth[pnum])
test_patient_IDs_temp.append(test_patient_IDs[pnum])
                    perc = float(len(y_prediction_temp)) / float(len(y_prediction))
                    percentages_selected.append(perc)
                    print(f"Selected {len(y_prediction_temp)} from {len(y_prediction)} ({100.0 * perc:.1f}%) patients using two thresholds.")
y_score = y_score_temp
y_prediction = y_prediction_temp
y_truth = y_truth_temp
test_patient_IDs = test_patient_IDs_temp
else:
raise ae.WORCValueError(f"Need None, one or two thresholds on the posterior; got {len(thresholds)}.")
if crossval_type != 'LOO' and type(y_prediction) is np.ndarray:
            if y_prediction.ndim == 0 or y_prediction.shape[0] == 1:
                # Convert to list for compatibility
y_prediction = [y_prediction.tolist()]
# If all scores are NaN, the classifier cannot do probabilities, thus
# use hard predictions
if np.sum(np.isnan(y_score)) == len(y_prediction):
print('[WORC Warning] All scores NaN, replacing with prediction.')
y_score = y_prediction
if bootstrap and i == 0:
# Save objects for re-use
y_truth_all = y_truth[:]
y_prediction_all = y_prediction[:]
y_score_all = y_score[:]
print("Truth: " + str(y_truth))
print("Prediction: " + str(y_prediction))
print("Score: " + str(y_score))
if output == 'stats' and crossval_type != 'LOO':
# Add if patient was classified correctly or not to counting
for i_truth, i_predict, i_test_ID in zip(y_truth, y_prediction, test_patient_IDs):
if modus == 'multilabel':
success = (i_truth == i_predict).all()
else:
success = i_truth == i_predict
if success:
patient_classification_list[i_test_ID]['N_correct'] += 1
else:
patient_classification_list[i_test_ID]['N_wrong'] += 1
if output in ['decision', 'scores'] or crossval_type == 'LOO':
# Output the posteriors
y_scores.append(y_score)
y_truths.append(y_truth)
y_predictions.append(y_prediction)
pids.append(test_patient_IDs)
elif output == 'stats':
# Compute statistics
print('Computing performance statistics.')
# Compute confusion matrix and use for sensitivity/specificity
performances = compute_statistics(y_truth, y_score, y_prediction,
modus, regression)
# Print AUC to keep you up to date
if not regression:
if modus == 'singlelabel':
accuracy_temp, bca_temp, sensitivity_temp,\
specificity_temp, precision_temp, npv_temp,\
f1_score_temp, auc_temp = performances
else:
accuracy_temp, sensitivity_temp,\
specificity_temp, precision_temp, npv_temp,\
f1_score_temp, auc_temp, acc_av_temp,\
accuracy_temp_single,\
bca_temp_single, sensitivity_temp_single,\
specificity_temp_single, precision_temp_single,\
npv_temp_single, f1_score_temp_single,\
auc_temp_single = performances
print('AUC: ' + str(auc_temp))
# Append performance to lists for all cross validations
accuracy.append(accuracy_temp)
if modus == 'singlelabel':
bca.append(bca_temp)
sensitivity.append(sensitivity_temp)
specificity.append(specificity_temp)
auc.append(auc_temp)
f1_score_list.append(f1_score_temp)
precision.append(precision_temp)
npv.append(npv_temp)
val_score.append(fitted_model.ensemble_validation_score)
if modus == 'multilabel':
acc_av.append(acc_av_temp)
for j in range(n_labels):
accuracy_single[j].append(accuracy_temp_single[j])
bca_single[j].append(bca_temp_single[j])
sensitivity_single[j].append(sensitivity_temp_single[j])
specificity_single[j].append(specificity_temp_single[j])
auc_single[j].append(auc_temp_single[j])
f1_score_list_single[j].append(f1_score_temp_single[j])
precision_single[j].append(precision_temp_single[j])
npv_single[j].append(npv_temp_single[j])
else:
r2score_temp, MSE_temp, coefICC_temp, PearsonC_temp,\
PearsonP_temp, SpearmanC_temp,\
SpearmanP_temp = performances
print('R2 Score: ' + str(r2score_temp))
r2score.append(r2score_temp)
MSE.append(MSE_temp)
coefICC.append(coefICC_temp)
PearsonC.append(PearsonC_temp)
PearsonP.append(PearsonP_temp)
SpearmanC.append(SpearmanC_temp)
SpearmanP.append(SpearmanP_temp)
        # Delete some objects to save memory in cross-validation
if not bootstrap and save_memory:
del fitted_model, X_test_temp, X_train_temp, Y_train_temp
del Y_test_temp, test_patient_IDs, train_patient_IDs
prediction[label_type]['X_test'][i] = None
prediction[label_type]['X_train'][i] = None
prediction[label_type]['Y_train'][i] = None
prediction[label_type]['Y_test'][i] = None
prediction[label_type]['patient_ID_test'][i] = None
prediction[label_type]['patient_ID_train'][i] = None
prediction[label_type]['classifiers'][i] = None
if output in ['scores', 'decision']:
# Return the scores and true values of all patients
return y_truths, y_scores, y_predictions, pids
elif output == 'stats':
if not regression:
metric_names_single = ['Accuracy', 'BCA', 'Sensitivity',
'Specificity', 'Precision', 'NPV',
'F1-score', 'AUC']
if modus == 'singlelabel':
metric_names = metric_names_single
elif modus == 'multilabel':
metric_names_multi = ['Accuracy', 'Sensitivity',
'Specificity', 'Precision', 'NPV',
'F1-score', 'AUC',
'Average Accuracy']
metric_names = metric_names_multi + metric_names_single
else:
# Regression
metric_names = ['R2-score', 'MSE', 'ICC', 'PearsonC',
'PearsonP', 'SpearmanC', 'SpearmanP']
# Compute statistics
stats = dict()
output = dict()
all_performances = None
if crossval_type == 'LOO':
performances = compute_statistics(y_truths, y_scores,
y_predictions,
modus, regression)
# Put all metrics with their names in the statistics dict
for k, v in zip(metric_names, performances):
stats[k] = str(v)
if thresholds is not None:
if len(thresholds) == 2:
# Compute percentage of patients that was selected
stats["Percentage Selected"] = str(percentages_selected[0])
output['Statistics'] = stats
else:
# Compute alpha confidence intervals (CIs)
# FIXME: multilabel performance per single label not included
# FIXME: multilabel not working in bootstrap
# FIXME: bootstrap not done in regression
all_performances = dict()
if not regression:
if bootstrap:
# Compute once for the real test set the performance
X_test_temp = prediction[label_type]['X_test'][0]
y_truth = prediction[label_type]['Y_test'][0]
y_prediction = fitted_model.predict(X_test_temp)
y_score = fitted_model.predict_proba(X_test_temp)[:, 1]
performances_test =\
metrics.performance_singlelabel(y_truth,
y_prediction,
y_score,
regression)
# Aggregate bootstrapped performances
performances_bootstrapped =\
[accuracy, bca, sensitivity, specificity, precision,
npv, f1_score_list, auc]
# Compute confidence intervals for all metrics
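                    # NOTE: each reported interval combines the point estimate
                    # on the original test set with the spread of the
                    # bootstrapped performances (compute_confidence_bootstrap).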
for p in range(len(metric_names_single)):
k = metric_names_single[p] + ' 95%:'
perf = performances_bootstrapped[p]
perf_test = performances_test[p]
stats[k] = f"{perf_test} {str(compute_confidence_bootstrap(perf, perf_test, N_1, alpha))}"
all_performances[metric_names_single[p]] = perf
else:
# From SMAC
# names = ['Accuracy', 'BCA', 'AUC', 'F1-score', 'Precision',
# 'NPV', 'Sensitivity', 'Specificity', 'Validation-score']
# performances = [accuracy, bca, auc, f1_score_list,
# precision, npv, sensitivity, specificity, val_score]
# for name, perf in zip(names, performances):
# all_performances[name] = perf
#
# stats["Accuracy 95%:"] = f"{np.nanmean(accuracy)} {str(compute_confidence(accuracy, N_1, N_2, alpha))}"
# stats["BCA 95%:"] = f"{np.nanmean(bca)} {str(compute_confidence(bca, N_1, N_2, alpha))}"
# stats["AUC 95%:"] = f"{np.nanmean(auc)} {str(compute_confidence(auc, N_1, N_2, alpha))}"
# stats["F1-score 95%:"] = f"{np.nanmean(f1_score_list)} {str(compute_confidence(f1_score_list, N_1, N_2, alpha))}"
# stats["Precision 95%:"] = f"{np.nanmean(precision)} {str(compute_confidence(precision, N_1, N_2, alpha))}"
# stats["NPV 95%:"] = f"{np.nanmean(npv)} {str(compute_confidence(npv, N_1, N_2, alpha))}"
# stats["Sensitivity 95%:"] = f"{np.nanmean(sensitivity)} {str(compute_confidence(sensitivity, N_1, N_2, alpha))}"
# stats["Specificity 95%:"] = f"{np.nanmean(specificity)} {str(compute_confidence(specificity, N_1, N_2, alpha))}"
# stats["Validation 95%:"] = f"{np.nanmean(val_score)} {str(compute_confidence(val_score, N_1, N_2, alpha))}"
if modus == 'multilabel':
# Multiclass
# First gather overall performances
performances = [accuracy, sensitivity, specificity,
precision, npv, f1_score_list, auc,
acc_av]
for name, perf in zip(metric_names_multi, performances):
all_performances[name] = perf
stats[f"{name} 95%:"] = f"{np.nanmean(perf)} {str(compute_confidence(perf, N_1, N_2, alpha))}"
# Performance per label
                        performances = [accuracy_single, bca_single,
                                        sensitivity_single, specificity_single,
                                        precision_single, npv_single,
                                        f1_score_list_single, auc_single]
for name, perf in zip(metric_names_single, performances):
for nlabel, label in enumerate(label_type.split(',')):
all_performances[f"{name}_{label}"] = perf[nlabel]
stats[f"{name}_{label} 95%:"] = f"{np.nanmean(perf[nlabel])} {str(compute_confidence(perf[nlabel], N_1, N_2, alpha))}"
else:
# Singleclass
performances = [accuracy, bca, sensitivity, specificity,
precision, npv, f1_score_list, auc]
for name, perf in zip(metric_names_single, performances):
all_performances[name] = perf
try:
CI = compute_confidence(perf, N_1, N_2, alpha)
except ZeroDivisionError:
# CI not defined, give nan
CI = [np.nan, np.nan]
stats[f"{name} 95%:"] = f"{np.nanmean(perf)} {str(CI)}"
if thresholds is not None:
if len(thresholds) == 2:
# Compute percentage of patients that was selected
stats["Percentage Selected 95%:"] = f"{np.nanmean(percentages_selected)} {str(compute_confidence(percentages_selected, N_1, N_2, alpha))}"
# Extract statistics on how often patients got classified correctly
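                # For example, a patient that appears in 10 test folds and is
                # classified correctly in 8 of them gets a percentage of 80%
                # (hypothetical numbers).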
rankings = dict()
alwaysright = dict()
alwayswrong = dict()
percentages = dict()
timesintestset = dict()
for i_ID in patient_classification_list:
percentage_right = patient_classification_list[i_ID]['N_correct'] / float(patient_classification_list[i_ID]['N_test'])
# Check if this is exactly the label of the patient within the label file
check_id = i_ID
if check_id not in patient_IDs:
                        print(f'[WORC WARNING] Patient {check_id} was not found in the patient labels, removing underscore.')
check_id = check_id.split("_")[0]
if check_id not in patient_IDs:
print(f'[WORC WARNING] Did not help, excluding patient {check_id}.')
continue
if check_id in patient_IDs:
label = labels[0][np.where(check_id == patient_IDs)]
else:
# Multiple instance of one patient
label = labels[0][np.where(check_id.split('_')[0] == patient_IDs)]
label = label[0][0]
                    percentages[i_ID] = str(label) + ': ' + str(round(percentage_right * 100, 1)) + '%'
if percentage_right == 1.0:
alwaysright[i_ID] = label
print(f"Always Right: {i_ID}, label {label}.")
elif percentage_right == 0:
alwayswrong[i_ID] = label
print(f"Always Wrong: {i_ID}, label {label}.")
timesintestset[i_ID] = patient_classification_list[i_ID]['N_test']
rankings["Always right"] = alwaysright
rankings["Always wrong"] = alwayswrong
rankings['Percentages'] = percentages
rankings['timesintestset'] = timesintestset
output['Rankings'] = rankings
else:
# Regression
                names = ['R2-score', 'MSE', 'ICC', 'PearsonC', 'PearsonP',
                         'SpearmanC', 'SpearmanP']
performances = [r2score, MSE, coefICC, PearsonC,
PearsonP, SpearmanC, SpearmanP]
for name, perf in zip(names, performances):
all_performances[name] = perf
stats['R2-score 95%: '] = f"{np.nanmean(r2score)} {str(compute_confidence(r2score, N_1, N_2, alpha))}"
stats['MSE 95%: '] = f"{np.nanmean(MSE)} {str(compute_confidence(MSE, N_1, N_2, alpha))}"
stats['ICC 95%: '] = f"{np.nanmean(coefICC)} {str(compute_confidence(coefICC, N_1, N_2, alpha))}"
stats['PearsonC 95%: '] = f"{np.nanmean(PearsonC)} {str(compute_confidence(PearsonC, N_1, N_2, alpha))}"
stats['PearsonP 95%: '] = f"{np.nanmean(PearsonP)} {str(compute_confidence(PearsonP, N_1, N_2, alpha))}"
stats['SpearmanC 95%: '] = f"{np.nanmean(SpearmanC)} {str(compute_confidence(SpearmanC, N_1, N_2, alpha))}"
stats['SpearmanP 95%: '] = f"{np.nanmean(SpearmanP)} {str(compute_confidence(SpearmanP, N_1, N_2, alpha))}"
# Print all CI's and add to output
stats = OrderedDict(sorted(stats.items()))
for k, v in stats.items():
print(f"{k} : {v}.")
output['Statistics'] = stats
if all_performances is not None:
output['All_performances'] = all_performances
return output
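

# Illustrative sketch (hypothetical keys and values) of the dictionary
# returned by plot_estimator_performance when output == 'stats'; depending on
# the settings, it roughly looks like:
#
#     {'Statistics': {'AUC 95%:': '<mean> <CI>',
#                     'Accuracy 95%:': '<mean> <CI>', ...},
#      'Rankings': {'Always right': {...}, 'Always wrong': {...},
#                   'Percentages': {...}, 'timesintestset': {...}},
#      'All_performances': {'AUC': [...], 'Accuracy': [...], ...}}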


def combine_multiple_estimators(predictions, label_data, N_1, N_2,
                                multilabel_type, label_types,
                                ensemble=1, strategy='argmax', alpha=0.95):
'''
Combine multiple estimators in a single model.
Note: the multilabel_type labels should correspond to the ordering in label_types.
Hence, if multilabel_type = 0, the prediction is label_type[0] etc.
'''
# Load the multilabel label data
label_data = lp.load_labels(label_data, multilabel_type)
patient_IDs = label_data['patient_IDs']
labels = label_data['label']
# Initialize some objects
y_truths = list()
y_scores = list()
y_predictions = list()
pids = list()
y_truths_train = list()
y_scores_train = list()
y_predictions_train = list()
pids_train = list()
accuracy = list()
sensitivity = list()
specificity = list()
auc = list()
f1_score_list = list()
precision = list()
npv = list()
acc_av = list()
# Extract all the predictions from the estimators
for prediction, label_type in zip(predictions, label_types):
        # NOTE: only the test-set outputs are used for combining below
        y_truth, y_score, y_prediction, pid =\
            plot_estimator_performance(prediction, label_data, label_type,
                                       ensemble_size=ensemble,
                                       output='scores')
        y_truths.append(y_truth)
        y_scores.append(y_score)
        y_predictions.append(y_prediction)
        pids.append(pid)
# Combine the predictions
for i_crossval in range(0, len(y_truths[0])):
# Extract all values for this cross validation iteration from all objects
y_truth = [t[i_crossval] for t in y_truths]
y_score = [t[i_crossval] for t in y_scores]
pid = [t[i_crossval] for t in pids]
if strategy == 'argmax':
# For each patient, take the maximum posterior
y_prediction = np.argmax(y_score, axis=0)
y_score = np.max(y_score, axis=0)
        elif strategy == 'decisiontree':
            # Fitting a decision tree on the training set is not implemented
            raise ae.WORCNotImplementedError('Decision tree combination strategy is not implemented yet.')
else:
raise ae.WORCValueError(f"{strategy} is not a valid estimation combining strategy! Should be one of [argmax].")
# Compute multilabel performance metrics
y_truth = np.argmax(y_truth, axis=0)
accuracy_temp, sensitivity_temp, specificity_temp, \
precision_temp, npv_temp, f1_score_temp, auc_temp, acc_av_temp = \
metrics.performance_multilabel(y_truth,
y_prediction,
y_score)
print("Truth: " + str(y_truth))
print("Prediction: " + str(y_prediction))
print('AUC: ' + str(auc_temp))
# Append performance to lists for all cross validations
accuracy.append(accuracy_temp)
sensitivity.append(sensitivity_temp)
specificity.append(specificity_temp)
auc.append(auc_temp)
f1_score_list.append(f1_score_temp)
precision.append(precision_temp)
npv.append(npv_temp)
acc_av.append(acc_av_temp)
# Compute confidence intervals
stats = dict()
stats["Accuracy 95%:"] = f"{np.nanmean(accuracy)} {str(compute_confidence(accuracy, N_1, N_2, alpha))}"
stats["Average Accuracy 95%:"] = f"{np.nanmean(acc_av)} {str(compute_confidence(accuracy, N_1, N_2, alpha))}"
stats["AUC 95%:"] = f"{np.nanmean(auc)} {str(compute_confidence(auc, N_1, N_2, alpha))}"
stats["F1-score 95%:"] = f"{np.nanmean(f1_score_list)} {str(compute_confidence(f1_score_list, N_1, N_2, alpha))}"
stats["Precision 95%:"] = f"{np.nanmean(precision)} {str(compute_confidence(precision, N_1, N_2, alpha))}"
stats["NPV 95%:"] = f"{np.nanmean(npv)} {str(compute_confidence(npv, N_1, N_2, alpha))}"
stats["Sensitivity 95%: "] = f"{np.nanmean(sensitivity)} {str(compute_confidence(sensitivity, N_1, N_2, alpha))}"
stats["Specificity 95%:"] = f"{np.nanmean(specificity)} {str(compute_confidence(specificity, N_1, N_2, alpha))}"
# Print all CI's
stats = OrderedDict(sorted(stats.items()))
for k, v in stats.items():
print(f"{k} : {v}.")
return stats


def main():
if len(sys.argv) == 1:
print("TODO: Put in an example")
elif len(sys.argv) != 4:
raise IOError("This function accepts three arguments!")
else:
prediction = sys.argv[1]
patientinfo = sys.argv[2]
label_type = sys.argv[3]
plot_estimator_performance(prediction, patientinfo, label_type)


if __name__ == '__main__':
    main()