Source code for WORC.plotting.plot_estimator_performance

#!/usr/bin/env python

# Copyright 2016-2021 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import os
import sys
import numpy as np
import pandas as pd
from random import shuffle
from sklearn.base import is_regressor
from collections import OrderedDict
from sklearn.utils import resample
import WORC.addexceptions as ae
from WORC.classification import metrics
import WORC.processing.label_processing as lp
from WORC.plotting.compute_CI import compute_confidence
from WORC.plotting.compute_CI import compute_confidence_bootstrap


def fit_thresholds(thresholds, estimator, label_type, X_train, Y_train,
                   ensemble, ensemble_scoring):
    print('Fitting thresholds on validation set')
    if not hasattr(estimator, 'cv_iter'):
        cv_iter = list(estimator.cv.split(X_train, Y_train))
        estimator.cv_iter = cv_iter

    p_all = estimator.cv_results_['params'][0]

    thresholds_low = list()
    thresholds_high = list()
    n_iter = len(estimator.cv_iter)
    for it, (train, valid) in enumerate(estimator.cv_iter):
        print(f' - iteration {it + 1} / {n_iter}.')
        # NOTE: Explicitly exclude the validation set, else refit and score
        # somehow still seem to use it.
        X_train_temp = [estimator[label_type]['X_train'][i] for i in train]
        Y_train_temp = [estimator[label_type]['Y_train'][i] for i in train]
        train_temp = range(0, len(train))

        # Refit a SearchCV object with the provided parameters
        if ensemble:
            estimator.create_ensemble(X_train_temp, Y_train_temp,
                                      method=ensemble, verbose=False,
                                      scoring=ensemble_scoring)
        else:
            estimator.refit_and_score(X_train_temp, Y_train_temp, p_all,
                                      train_temp, train_temp, verbose=False)

        # Predict and save scores
        X_train_values = [x[0] for x in X_train]  # Throw away labels
        X_train_values_valid = [X_train_values[i] for i in valid]
        Y_valid_score_temp = estimator.predict_proba(X_train_values_valid)

        # Only take the probabilities for the second class
        Y_valid_score_temp = Y_valid_score_temp[:, 1]

        # Select thresholds
        thresholds_low.append(np.percentile(Y_valid_score_temp, thresholds[0]*100.0))
        thresholds_high.append(np.percentile(Y_valid_score_temp, thresholds[1]*100.0))

    thresholds_val = [np.mean(thresholds_low), np.mean(thresholds_high)]
    print(f'Thresholds {thresholds} converted to {thresholds_val}.')
    return thresholds_val
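

# Illustrative sketch (not part of the WORC API; the helper name is
# hypothetical): how fit_thresholds turns the requested quantiles into
# posterior cutoffs on a toy set of validation posteriors.
def _example_threshold_conversion():
    posteriors = np.array([0.05, 0.20, 0.40, 0.60, 0.80, 0.95])
    thresholds = [0.25, 0.75]  # interpreted as quantiles of the posteriors
    low = np.percentile(posteriors, thresholds[0] * 100.0)   # -> 0.25
    high = np.percentile(posteriors, thresholds[1] * 100.0)  # -> 0.75
    # Downstream, only patients with posterior <= low or posterior > high
    # are kept for evaluation; the uncertain middle range is excluded.
    return low, high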


def compute_statistics(y_truth, y_score, y_prediction, modus, regression):
    """Compute statistics on predictions."""
    if modus == 'singlelabel':
        # Compute singlelabel performance metrics; the regression flag
        # determines whether classification or regression metrics are used
        return metrics.performance_singlelabel(y_truth, y_prediction,
                                               y_score, regression)
    elif modus == 'multilabel':
        # Convert class objects to single label per patient
        y_truth_temp = list()
        y_prediction_temp = list()
        for yt, yp in zip(y_truth, y_prediction):
            label = np.where(yt == 1)
            if len(label) > 1:
                raise ae.WORCNotImplementedError('Multiclass classification evaluation is not supported in WORC.')

            y_truth_temp.append(label[0][0])
            label = np.where(yp == 1)
            y_prediction_temp.append(label[0][0])

        y_truth = y_truth_temp
        y_prediction = y_prediction_temp

        # Compute multilabel performance metrics
        predictions_multilabel =\
            metrics.performance_multilabel(y_truth, y_prediction, y_score)

        # Compute all single label performance metrics as well
        n_labels = len(np.unique(y_truth))
        for i_label in range(n_labels):
            y_truth_single = [i == i_label for i in y_truth]
            y_prediction_single = [i == i_label for i in y_prediction]
            y_score_single = y_score[:, i_label]

            predictions_singlelabel_temp =\
                metrics.performance_singlelabel(y_truth_single,
                                                y_prediction_single,
                                                y_score_single,
                                                regression)

            if i_label == 0:
                predictions_singlelabel =\
                    [[i] for i in predictions_singlelabel_temp]
            else:
                for num, metric in enumerate(predictions_singlelabel_temp):
                    predictions_singlelabel[num].append(metric)

        output = list(predictions_multilabel) + predictions_singlelabel
        return output
    else:
        raise ae.WORCKeyError(f'{modus} is not a valid modus!')
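

# Illustrative sketch (hypothetical helper, not part of WORC): the
# multilabel branch of compute_statistics reduces one-hot encoded labels
# to class indices before computing per-label metrics.
def _example_onehot_to_index():
    y_truth = [np.array([0, 1, 0]), np.array([1, 0, 0])]
    # np.where(yt == 1) returns a tuple of index arrays; [0][0] extracts
    # the position of the single active class.
    return [int(np.where(yt == 1)[0][0]) for yt in y_truth]  # -> [1, 0]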


def plot_estimator_performance(prediction, label_data, label_type,
                               crossval_type=None, alpha=0.95,
                               ensemble=None, verbose=True,
                               ensemble_scoring=None, output=None,
                               modus=None, thresholds=None,
                               survival=False, shuffle_estimators=False,
                               bootstrap=None, bootstrap_N=None,
                               overfit_scaler=None, save_memory=True,
                               refit_ensemble=False):
    """Plot the output of a single estimator, e.g. an SVM.

    Parameters
    ----------
    prediction: pandas dataframe or string, mandatory
        output of trainclassifier function, either a pandas dataframe
        or a HDF5 file

    label_data: string, mandatory
        Contains the path referring to a .txt file containing the
        patient label(s) and value(s) to be used for learning. See
        the Github Wiki for the format.

    label_type: string, mandatory
        Name of the label to extract from the label data to test the
        estimator on.

    alpha: float, default 0.95
        Significance of confidence intervals.

    ensemble: False, integer or 'Caruana'
        Determine whether an ensemble will be created. If so, either
        provide an integer to determine how many of the top performing
        classifiers should be in the ensemble, or use the string
        "Caruana" to use smart ensembling based on Caruana et al. 2004.

    verbose: boolean, default True
        Plot intermediate messages.

    ensemble_scoring: string, default None
        Metric to be used for evaluating the ensemble. If None,
        the option set in the prediction object will be used.

    output: string, default 'stats'
        Determine which results are put out. If 'stats', the statistics
        of the estimator will be returned. If 'scores', the scores will
        be returned.

    thresholds: list of integer(s), default None
        If None, use the default threshold of sklearn (0.5) on posteriors
        to convert to a binary prediction. If one integer is provided, use
        that one. If two integers are provided, posterior < thresh[0] = 0,
        posterior > thresh[1] = 1.

    Returns
    ----------
    Depending on the output parameter, the following outputs are returned:

    If output == 'stats':
    stats: dictionary
        Contains the confidence intervals of the performance metrics
        and the number of times each patient was classified correctly
        or incorrectly.

    If output == 'scores':
    y_truths: list
        Contains the true label for each object.

    y_scores: list
        Contains the score (e.g. posterior) for each object.

    y_predictions: list
        Contains the predicted label for each object.

    pids: list
        Contains the patient ID/name for each object.

    """
    # Load the prediction object if it's a hdf5 file
    if type(prediction) is not pd.core.frame.DataFrame:
        if os.path.isfile(prediction):
            prediction = pd.read_hdf(prediction)
        else:
            raise ae.WORCIOError(f'{prediction} is not an existing file!')

    # Select the estimator from the pandas dataframe to use
    keys = prediction.keys()
    if label_type is None:
        label_type = keys[0]

    # Load the label data
    if type(label_data) is not dict:
        if os.path.isfile(label_data):
            if type(label_type) is not list:
                # Singlelabel: convert to list
                label_type = [[label_type]]
            elif len(label_type) == 1:
                label_type = label_type[0].split(', ')
            label_data = lp.load_labels(label_data, label_type)
        else:
            raise ae.WORCValueError(f"Label data {label_data} incorrect: not a dictionary, or file does not exist.")

    n_labels = len(label_type)
    patient_IDs = label_data['patient_IDs']
    labels = label_data['label']

    if type(label_type) is list:
        # FIXME: Multiple label types are not supported yet.
        print('[WORC Warning] Multiple label types not supported yet. Taking first label for plot_estimator_performance.')
        label_type = keys[0]

    # Extract the estimators, features and labels
    regression = is_regressor(prediction[label_type]['classifiers'][0].best_estimator_)
    feature_labels = prediction[label_type]['feature_labels']

    # Get some configuration variables if present in the prediction
    config = prediction[label_type].config
    if ensemble is None:
        ensemble = int(config['Ensemble']['Use'])

    if modus is None:
        modus = config['Labels']['modus']

    if crossval_type is None:
        crossval_type = config['CrossValidation']['Type']

    if bootstrap is None:
        bootstrap = config['Bootstrap']['Use']

    if bootstrap_N is None:
        bootstrap_N = int(config['Bootstrap']['N_iterations'])

    if overfit_scaler is None:
        overfit_scaler = config['Evaluation']['OverfitScaler']

    ensemble_metric = config['Ensemble']['Metric']

    # Create lists for performance measures
    if not regression:
        sensitivity = list()
        specificity = list()
        precision = list()
        npv = list()
        accuracy = list()
        bca = list()
        auc = list()
        f1_score_list = list()

        if modus == 'multilabel':
            acc_av = list()

            # Also add scoring measures for all single label scores
            sensitivity_single = [list() for j in range(n_labels)]
            specificity_single = [list() for j in range(n_labels)]
            precision_single = [list() for j in range(n_labels)]
            npv_single = [list() for j in range(n_labels)]
            accuracy_single = [list() for j in range(n_labels)]
            bca_single = [list() for j in range(n_labels)]
            auc_single = [list() for j in range(n_labels)]
            f1_score_list_single = [list() for j in range(n_labels)]
    else:
        r2score = list()
        MSE = list()
        coefICC = list()
        PearsonC = list()
        PearsonP = list()
        SpearmanC = list()
        SpearmanP = list()

    patient_classification_list = dict()
    percentages_selected = list()

    if bootstrap:
        # To be compatible with flake8, initialize some variables
        y_truth_all = None
        y_prediction_all = None
        y_score_all = None
        test_patient_IDs_or = None

    if output in ['scores', 'decision'] or crossval_type == 'LOO':
        # Keep track of all ground truths and scores
        y_truths = list()
        y_scores = list()
        y_predictions = list()
        pids = list()

    # Extract sample size
    N_1 = float(len(prediction[label_type]['patient_ID_train'][0]))
    N_2 = float(len(prediction[label_type]['patient_ID_test'][0]))

    # Convert tuples to lists if required
    if type(prediction[label_type]['X_test']) is tuple:
        prediction[label_type]['X_test'] = list(prediction[label_type]['X_test'])
        prediction[label_type]['X_train'] = list(prediction[label_type]['X_train'])
        prediction[label_type]['Y_train'] = list(prediction[label_type]['Y_train'])
        prediction[label_type]['Y_test'] = list(prediction[label_type]['Y_test'])
        prediction[label_type]['patient_ID_test'] = list(prediction[label_type]['patient_ID_test'])
        prediction[label_type]['patient_ID_train'] = list(prediction[label_type]['patient_ID_train'])
        prediction[label_type]['classifiers'] = list(prediction[label_type]['classifiers'])

    # Loop over the test sets, which correspond to cross-validation
    # or bootstrapping iterations
    n_iter = len(prediction[label_type]['Y_test'])
    if bootstrap:
        iterobject = range(0, bootstrap_N)
    else:
        iterobject = range(0, n_iter)

    for i in iterobject:
        print("\n")
        if bootstrap:
            print(f"Bootstrap {i + 1} / {bootstrap_N}.")
        else:
            print(f"Cross-validation {i + 1} / {n_iter}.")

        test_indices = list()

        # When bootstrapping, there is only a single train/test set.
        if bootstrap:
            if i == 0:
                X_test_temp_or = prediction[label_type]['X_test'][0]
                X_train_temp = prediction[label_type]['X_train'][0]
                Y_train_temp = prediction[label_type]['Y_train'][0]
                Y_test_temp_or = prediction[label_type]['Y_test'][0]
                test_patient_IDs_or = prediction[label_type]['patient_ID_test'][0]
                train_patient_IDs = prediction[label_type]['patient_ID_train'][0]
                fitted_model = prediction[label_type]['classifiers'][0]

                # Objects required for first iteration
                test_patient_IDs = test_patient_IDs_or[:]
                X_test_temp = X_test_temp_or[:]
                Y_test_temp = Y_test_temp_or[:]
        else:
            X_test_temp = prediction[label_type]['X_test'][i]
            X_train_temp = prediction[label_type]['X_train'][i]
            Y_train_temp = prediction[label_type]['Y_train'][i]
            Y_test_temp = prediction[label_type]['Y_test'][i]
            test_patient_IDs = prediction[label_type]['patient_ID_test'][i]
            train_patient_IDs = prediction[label_type]['patient_ID_train'][i]
            fitted_model = prediction[label_type]['classifiers'][i]

        # Check which patients are in the test set.
        if output == 'stats' and crossval_type != 'LOO':
            for i_ID in test_patient_IDs:
                # Initiate counting how many times a patient is classified correctly
                if i_ID not in patient_classification_list:
                    patient_classification_list[i_ID] = dict()
                    patient_classification_list[i_ID]['N_test'] = 0
                    patient_classification_list[i_ID]['N_correct'] = 0
                    patient_classification_list[i_ID]['N_wrong'] = 0

                patient_classification_list[i_ID]['N_test'] += 1

                # Check if this is exactly the label of the patient within the label file
                if i_ID not in patient_IDs:
                    print(f'[WORC WARNING] Patient {i_ID} is not found in the patient labels, removing underscore.')
                    i_ID = i_ID.split("_")[0]
                    if i_ID not in patient_IDs:
                        print(f'[WORC WARNING] Did not help, excluding patient {i_ID}.')
                        continue

                test_indices.append(np.where(patient_IDs == i_ID)[0][0])

        # Extract ground truth
        y_truth = Y_test_temp

        # If required, shuffle estimators for "Random" ensembling
        if shuffle_estimators:
            # Randomly shuffle the estimators
            print('Shuffling estimators for random ensembling.')
            shuffle(fitted_model.cv_results_['params'])

        # If requested, first let the SearchCV object create an ensemble
        if bootstrap and i > 0:
            # For bootstrapping, only do this at the first iteration
            pass
        elif not fitted_model.ensemble or refit_ensemble:
            # If required, rank according to generalization score instead
            # of mean_validation_score
            if ensemble_metric == 'generalization':
                print('Using generalization score for estimator ranking.')
                indices = fitted_model.cv_results_['rank_generalization_score']
                fitted_model.cv_results_['params'] = [fitted_model.cv_results_['params'][i] for i in indices[::-1]]
            elif ensemble_metric != 'Default':
                raise ae.WORCKeyError(f'Metric {ensemble_metric} is not known: use Default or generalization.')

            # NOTE: Added for backwards compatibility
            if not hasattr(fitted_model, 'cv_iter'):
                cv_iter = list(fitted_model.cv.split(X_train_temp, Y_train_temp))
                fitted_model.cv_iter = cv_iter

            # Create the ensemble
            X_train_temp = [(x, feature_labels) for x in X_train_temp]
            fitted_model.create_ensemble(X_train_temp, Y_train_temp,
                                         method=ensemble, verbose=verbose,
                                         scoring=ensemble_scoring,
                                         overfit_scaler=overfit_scaler)

        # If bootstrap, generate a bootstrapped sample
        if bootstrap and i > 0:
            y_truth, y_prediction, y_score, test_patient_IDs =\
                resample(y_truth_all, y_prediction_all,
                         y_score_all, test_patient_IDs_or)
        else:
            # Create prediction
            y_prediction = fitted_model.predict(X_test_temp)

            if regression:
                y_score = y_prediction
            elif modus == 'multilabel':
                y_score = fitted_model.predict_proba(X_test_temp)
            else:
                y_score = fitted_model.predict_proba(X_test_temp)[:, 1]

            # Create a new binary score based on the thresholds if given
            if thresholds is not None:
                if len(thresholds) == 1:
                    y_prediction = y_score >= thresholds[0]
                elif len(thresholds) == 2:
                    # X_train_temp = [x[0] for x in X_train_temp]
                    y_score_temp = list()
                    y_prediction_temp = list()
                    y_truth_temp = list()
                    test_patient_IDs_temp = list()

                    thresholds_val = fit_thresholds(thresholds, fitted_model,
                                                    label_type, X_train_temp,
                                                    Y_train_temp, ensemble,
                                                    ensemble_scoring)
                    for pnum in range(len(y_score)):
                        if y_score[pnum] <= thresholds_val[0] or y_score[pnum] > thresholds_val[1]:
                            y_score_temp.append(y_score[pnum])
                            y_prediction_temp.append(y_prediction[pnum])
                            y_truth_temp.append(y_truth[pnum])
                            test_patient_IDs_temp.append(test_patient_IDs[pnum])

                    perc = float(len(y_prediction_temp))/float(len(y_prediction))
                    percentages_selected.append(perc)
                    print(f"Selected {len(y_prediction_temp)} from {len(y_prediction)} ({perc}%) patients using two thresholds.")
                    y_score = y_score_temp
                    y_prediction = y_prediction_temp
                    y_truth = y_truth_temp
                    test_patient_IDs = test_patient_IDs_temp
                else:
                    raise ae.WORCValueError(f"Need None, one or two thresholds on the posterior; got {len(thresholds)}.")

        if crossval_type != 'LOO' and type(y_prediction) is np.ndarray:
            if y_prediction.shape[0] == 1:
                # Convert to list for compatibility
                y_prediction = [y_prediction.tolist()]

        # If all scores are NaN, the classifier cannot do probabilities,
        # thus use hard predictions
        if np.sum(np.isnan(y_score)) == len(y_prediction):
            print('[WORC Warning] All scores NaN, replacing with prediction.')
            y_score = y_prediction

        if bootstrap and i == 0:
            # Save objects for re-use
            y_truth_all = y_truth[:]
            y_prediction_all = y_prediction[:]
            y_score_all = y_score[:]

        print("Truth: " + str(y_truth))
        print("Prediction: " + str(y_prediction))
        print("Score: " + str(y_score))

        if output == 'stats' and crossval_type != 'LOO':
            # Add if patient was classified correctly or not to counting
            for i_truth, i_predict, i_test_ID in zip(y_truth, y_prediction, test_patient_IDs):
                if modus == 'multilabel':
                    success = (i_truth == i_predict).all()
                else:
                    success = i_truth == i_predict

                if success:
                    patient_classification_list[i_test_ID]['N_correct'] += 1
                else:
                    patient_classification_list[i_test_ID]['N_wrong'] += 1

        if output in ['decision', 'scores'] or crossval_type == 'LOO':
            # Output the posteriors
            y_scores.append(y_score)
            y_truths.append(y_truth)
            y_predictions.append(y_prediction)
            pids.append(test_patient_IDs)
        elif output == 'stats':
            # Compute statistics
            print('Computing performance statistics.')
            # Compute confusion matrix and use for sensitivity/specificity
            performances = compute_statistics(y_truth, y_score,
                                              y_prediction, modus,
                                              regression)

            # Print AUC to keep you up to date
            if not regression:
                if modus == 'singlelabel':
                    accuracy_temp, bca_temp, sensitivity_temp,\
                        specificity_temp, precision_temp, npv_temp,\
                        f1_score_temp, auc_temp = performances
                else:
                    accuracy_temp, sensitivity_temp,\
                        specificity_temp, precision_temp, npv_temp,\
                        f1_score_temp, auc_temp, acc_av_temp,\
                        accuracy_temp_single,\
                        bca_temp_single, sensitivity_temp_single,\
                        specificity_temp_single, precision_temp_single,\
                        npv_temp_single, f1_score_temp_single,\
                        auc_temp_single = performances

                print('AUC: ' + str(auc_temp))

                # Append performance to lists for all cross validations
                accuracy.append(accuracy_temp)
                if modus == 'singlelabel':
                    bca.append(bca_temp)
                sensitivity.append(sensitivity_temp)
                specificity.append(specificity_temp)
                auc.append(auc_temp)
                f1_score_list.append(f1_score_temp)
                precision.append(precision_temp)
                npv.append(npv_temp)

                if modus == 'multilabel':
                    acc_av.append(acc_av_temp)
                    for j in range(n_labels):
                        accuracy_single[j].append(accuracy_temp_single[j])
                        bca_single[j].append(bca_temp_single[j])
                        sensitivity_single[j].append(sensitivity_temp_single[j])
                        specificity_single[j].append(specificity_temp_single[j])
                        auc_single[j].append(auc_temp_single[j])
                        f1_score_list_single[j].append(f1_score_temp_single[j])
                        precision_single[j].append(precision_temp_single[j])
                        npv_single[j].append(npv_temp_single[j])
            else:
                r2score_temp, MSE_temp, coefICC_temp, PearsonC_temp,\
                    PearsonP_temp, SpearmanC_temp,\
                    SpearmanP_temp = performances
                print('R2 Score: ' + str(r2score_temp))

                r2score.append(r2score_temp)
                MSE.append(MSE_temp)
                coefICC.append(coefICC_temp)
                PearsonC.append(PearsonC_temp)
                PearsonP.append(PearsonP_temp)
                SpearmanC.append(SpearmanC_temp)
                SpearmanP.append(SpearmanP_temp)

        # Delete some objects to save memory in cross-validation
        if not bootstrap and save_memory:
            del fitted_model, X_test_temp, X_train_temp, Y_train_temp
            del Y_test_temp, test_patient_IDs, train_patient_IDs
            prediction[label_type]['X_test'][i] = None
            prediction[label_type]['X_train'][i] = None
            prediction[label_type]['Y_train'][i] = None
            prediction[label_type]['Y_test'][i] = None
            prediction[label_type]['patient_ID_test'][i] = None
            prediction[label_type]['patient_ID_train'][i] = None
            prediction[label_type]['classifiers'][i] = None

    if output in ['scores', 'decision']:
        # Return the scores and true values of all patients
        return y_truths, y_scores, y_predictions, pids
    elif output == 'stats':
        if not regression:
            metric_names_single = ['Accuracy', 'BCA', 'Sensitivity',
                                   'Specificity', 'Precision', 'NPV',
                                   'F1-score', 'AUC']
            if modus == 'singlelabel':
                metric_names = metric_names_single
            elif modus == 'multilabel':
                metric_names_multi = ['Accuracy', 'Sensitivity',
                                      'Specificity', 'Precision', 'NPV',
                                      'F1-score', 'AUC', 'Average Accuracy']
                metric_names = metric_names_multi + metric_names_single
        else:
            # Regression
            metric_names = ['R2-score', 'MSE', 'ICC', 'PearsonC',
                            'PearsonP', 'SpearmanC', 'SpearmanP']

        # Compute statistics
        stats = dict()
        output = dict()
        all_performances = None
        if crossval_type == 'LOO':
            performances = compute_statistics(y_truths, y_scores,
                                              y_predictions, modus,
                                              regression)

            # Put all metrics with their names in the statistics dict
            for k, v in zip(metric_names, performances):
                stats[k] = str(v)

            if thresholds is not None:
                if len(thresholds) == 2:
                    # Compute percentage of patients that was selected
                    stats["Percentage Selected"] = str(percentages_selected[0])

            output['Statistics'] = stats
        else:
            # Compute alpha confidence intervals (CIs)
            # FIXME: multilabel performance per single label not included
            # FIXME: multilabel not working in bootstrap
            # FIXME: bootstrap not done in regression
            all_performances = dict()
            if not regression:
                if bootstrap:
                    # Compute once for the real test set the performance
                    X_test_temp = prediction[label_type]['X_test'][0]
                    y_truth = prediction[label_type]['Y_test'][0]
                    y_prediction = fitted_model.predict(X_test_temp)
                    y_score = fitted_model.predict_proba(X_test_temp)[:, 1]

                    performances_test =\
                        metrics.performance_singlelabel(y_truth,
                                                        y_prediction,
                                                        y_score,
                                                        regression)

                    # Aggregate bootstrapped performances
                    performances_bootstrapped =\
                        [accuracy, bca, sensitivity, specificity, precision,
                         npv, f1_score_list, auc]

                    # Compute confidence intervals for all metrics
                    for p in range(len(metric_names_single)):
                        k = metric_names_single[p] + ' 95%:'
                        perf = performances_bootstrapped[p]
                        perf_test = performances_test[p]
                        stats[k] = f"{perf_test} {str(compute_confidence_bootstrap(perf, perf_test, N_1, alpha))}"
                        all_performances[metric_names_single[p]] = perf
                else:
                    if modus == 'multilabel':
                        # Multiclass
                        # First gather overall performances
                        performances = [accuracy, sensitivity, specificity,
                                        precision, npv, f1_score_list, auc,
                                        acc_av]
                        for name, perf in zip(metric_names_multi, performances):
                            all_performances[name] = perf
                            stats[f"{name} 95%:"] = f"{np.nanmean(perf)} {str(compute_confidence(perf, N_1, N_2, alpha))}"

                        # Performance per label, ordered to match
                        # metric_names_single
                        performances = [accuracy_single, bca_single,
                                        sensitivity_single,
                                        specificity_single,
                                        precision_single, npv_single,
                                        f1_score_list_single, auc_single]
                        for name, perf in zip(metric_names_single, performances):
                            for nlabel, label in enumerate(label_type.split(',')):
                                all_performances[f"{name}_{label}"] = perf[nlabel]
                                stats[f"{name}_{label} 95%:"] = f"{np.nanmean(perf[nlabel])} {str(compute_confidence(perf[nlabel], N_1, N_2, alpha))}"
                    else:
                        # Singleclass
                        performances = [accuracy, bca, sensitivity,
                                        specificity, precision, npv,
                                        f1_score_list, auc]
                        for name, perf in zip(metric_names_single, performances):
                            all_performances[name] = perf
                            stats[f"{name} 95%:"] = f"{np.nanmean(perf)} {str(compute_confidence(perf, N_1, N_2, alpha))}"

                if thresholds is not None:
                    if len(thresholds) == 2:
                        # Compute percentage of patients that was selected
                        stats["Percentage Selected 95%:"] = f"{np.nanmean(percentages_selected)} {str(compute_confidence(percentages_selected, N_1, N_2, alpha))}"

                # Extract statistics on how often patients got classified correctly
                rankings = dict()
                alwaysright = dict()
                alwayswrong = dict()
                percentages = dict()
                timesintestset = dict()
                for i_ID in patient_classification_list:
                    percentage_right = patient_classification_list[i_ID]['N_correct'] / float(patient_classification_list[i_ID]['N_test'])

                    # Check if this is exactly the label of the patient within the label file
                    check_id = i_ID
                    if check_id not in patient_IDs:
                        print(f'[WORC WARNING] Patient {check_id} is not found in the patient labels, removing underscore.')
                        check_id = check_id.split("_")[0]
                        if check_id not in patient_IDs:
                            print(f'[WORC WARNING] Did not help, excluding patient {check_id}.')
                            continue

                    if check_id in patient_IDs:
                        label = labels[0][np.where(check_id == patient_IDs)]
                    else:
                        # Multiple instances of one patient
                        label = labels[0][np.where(check_id.split('_')[0] == patient_IDs)]

                    label = label[0][0]
                    percentages[i_ID] = str(label) + ': ' + str(round(percentage_right, 2) * 100) + '%'
                    if percentage_right == 1.0:
                        alwaysright[i_ID] = label
                        print(f"Always Right: {i_ID}, label {label}.")
                    elif percentage_right == 0:
                        alwayswrong[i_ID] = label
                        print(f"Always Wrong: {i_ID}, label {label}.")

                    timesintestset[i_ID] = patient_classification_list[i_ID]['N_test']

                rankings["Always right"] = alwaysright
                rankings["Always wrong"] = alwayswrong
                rankings['Percentages'] = percentages
                rankings['timesintestset'] = timesintestset

                output['Rankings'] = rankings

            else:
                # Regression
                names = ['R2-score', 'MSE', 'ICC', 'PearsonC',
                         'PearsonP', 'SpearmanC', 'SpearmanP']
                performances = [r2score, MSE, coefICC, PearsonC,
                                PearsonP, SpearmanC, SpearmanP]
                for name, perf in zip(names, performances):
                    all_performances[name] = perf

                stats['R2-score 95%: '] = f"{np.nanmean(r2score)} {str(compute_confidence(r2score, N_1, N_2, alpha))}"
                stats['MSE 95%: '] = f"{np.nanmean(MSE)} {str(compute_confidence(MSE, N_1, N_2, alpha))}"
                stats['ICC 95%: '] = f"{np.nanmean(coefICC)} {str(compute_confidence(coefICC, N_1, N_2, alpha))}"
                stats['PearsonC 95%: '] = f"{np.nanmean(PearsonC)} {str(compute_confidence(PearsonC, N_1, N_2, alpha))}"
                stats['PearsonP 95%: '] = f"{np.nanmean(PearsonP)} {str(compute_confidence(PearsonP, N_1, N_2, alpha))}"
                stats['SpearmanC 95%: '] = f"{np.nanmean(SpearmanC)} {str(compute_confidence(SpearmanC, N_1, N_2, alpha))}"
                stats['SpearmanP 95%: '] = f"{np.nanmean(SpearmanP)} {str(compute_confidence(SpearmanP, N_1, N_2, alpha))}"

        # Print all CIs and add to output
        stats = OrderedDict(sorted(stats.items()))
        for k, v in stats.items():
            print(f"{k} : {v}.")

        output['Statistics'] = stats
        if all_performances is not None:
            output['All_performances'] = all_performances

        return output


def combine_multiple_estimators(predictions, label_data, N_1, N_2,
                                multilabel_type, label_types, ensemble=1,
                                strategy='argmax', alpha=0.95):
    '''
    Combine multiple estimators in a single model.

    Note: the multilabel_type labels should correspond to the ordering in
    label_types. Hence, if multilabel_type = 0, the prediction is
    label_type[0] etc.
    '''
    # Load the multilabel label data
    label_data = lp.load_labels(label_data, multilabel_type)
    patient_IDs = label_data['patient_IDs']
    labels = label_data['label']

    # Initialize some objects
    y_truths = list()
    y_scores = list()
    y_predictions = list()
    pids = list()

    y_truths_train = list()
    y_scores_train = list()
    y_predictions_train = list()
    pids_train = list()

    accuracy = list()
    sensitivity = list()
    specificity = list()
    auc = list()
    f1_score_list = list()
    precision = list()
    npv = list()
    acc_av = list()

    # Extract all the predictions from the estimators
    for prediction, label_type in zip(predictions, label_types):
        y_truth, y_score, y_prediction, pid,\
            y_truth_train, y_score_train, y_prediction_train, pid_train =\
            plot_estimator_performance(prediction, label_data, label_type,
                                       ensemble=ensemble,
                                       output='allscores')
        y_truths.append(y_truth)
        y_scores.append(y_score)
        y_predictions.append(y_prediction)
        pids.append(pid)

        y_truths_train.append(y_truth_train)
        y_scores_train.append(y_score_train)
        y_predictions_train.append(y_prediction_train)
        pids_train.append(pid_train)

    # Combine the predictions
    for i_crossval in range(0, len(y_truths[0])):
        # Extract all values for this cross-validation iteration from all objects
        y_truth = [t[i_crossval] for t in y_truths]
        y_score = [t[i_crossval] for t in y_scores]
        pid = [t[i_crossval] for t in pids]

        if strategy == 'argmax':
            # For each patient, take the maximum posterior
            y_prediction = np.argmax(y_score, axis=0)
            y_score = np.max(y_score, axis=0)
        elif strategy == 'decisiontree':
            # Fit a decision tree on the training set
            # NOTE: not implemented yet
            raise ae.WORCNotImplementedError('Decision tree combining strategy is not implemented yet.')
        else:
            raise ae.WORCValueError(f"{strategy} is not a valid estimation combining strategy! Should be one of [argmax].")

        # Compute multilabel performance metrics
        y_truth = np.argmax(y_truth, axis=0)
        accuracy_temp, sensitivity_temp, specificity_temp, \
            precision_temp, npv_temp, f1_score_temp, auc_temp, acc_av_temp = \
            metrics.performance_multilabel(y_truth, y_prediction, y_score)

        print("Truth: " + str(y_truth))
        print("Prediction: " + str(y_prediction))
        print('AUC: ' + str(auc_temp))

        # Append performance to lists for all cross validations
        accuracy.append(accuracy_temp)
        sensitivity.append(sensitivity_temp)
        specificity.append(specificity_temp)
        auc.append(auc_temp)
        f1_score_list.append(f1_score_temp)
        precision.append(precision_temp)
        npv.append(npv_temp)
        acc_av.append(acc_av_temp)

    # Compute confidence intervals
    stats = dict()
    stats["Accuracy 95%:"] = f"{np.nanmean(accuracy)} {str(compute_confidence(accuracy, N_1, N_2, alpha))}"
    stats["Average Accuracy 95%:"] = f"{np.nanmean(acc_av)} {str(compute_confidence(acc_av, N_1, N_2, alpha))}"
    stats["AUC 95%:"] = f"{np.nanmean(auc)} {str(compute_confidence(auc, N_1, N_2, alpha))}"
    stats["F1-score 95%:"] = f"{np.nanmean(f1_score_list)} {str(compute_confidence(f1_score_list, N_1, N_2, alpha))}"
    stats["Precision 95%:"] = f"{np.nanmean(precision)} {str(compute_confidence(precision, N_1, N_2, alpha))}"
    stats["NPV 95%:"] = f"{np.nanmean(npv)} {str(compute_confidence(npv, N_1, N_2, alpha))}"
    stats["Sensitivity 95%:"] = f"{np.nanmean(sensitivity)} {str(compute_confidence(sensitivity, N_1, N_2, alpha))}"
    stats["Specificity 95%:"] = f"{np.nanmean(specificity)} {str(compute_confidence(specificity, N_1, N_2, alpha))}"

    # Print all CIs
    stats = OrderedDict(sorted(stats.items()))
    for k, v in stats.items():
        print(f"{k} : {v}.")

    return stats
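

# Illustrative sketch of the 'argmax' combining strategy above (hypothetical
# values): per patient, the estimator with the highest posterior wins.
def _example_argmax_combination():
    # Rows: one binary estimator per label; columns: patients.
    y_score = np.array([[0.8, 0.2, 0.4],
                        [0.1, 0.7, 0.5]])
    y_prediction = np.argmax(y_score, axis=0)  # -> array([0, 1, 1])
    y_score_max = np.max(y_score, axis=0)      # -> array([0.8, 0.7, 0.5])
    return y_prediction, y_score_max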


def main():
    if len(sys.argv) == 1:
        print("TODO: Put in an example")
    elif len(sys.argv) != 4:
        raise IOError("This function accepts three arguments!")
    else:
        prediction = sys.argv[1]
        patientinfo = sys.argv[2]
        label_type = sys.argv[3]
        plot_estimator_performance(prediction, patientinfo, label_type)


if __name__ == '__main__':
    main()