Source code for WORC.plotting.plot_ranked_scores

#!/usr/bin/env python

# Copyright 2016-2021 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import argparse
import pandas as pd
from WORC.plotting.plot_estimator_performance import plot_estimator_performance
import WORC.processing.label_processing as lp
import glob
import numpy as np
import csv
import os
from WORC.plotting import plot_images as pi
import SimpleITK as sitk
from WORC.addexceptions import WORCKeyError
import zipfile
from sys import platform


# NOTE: Need to add thresholds of plot_ranked_images to arguments


def main():
    parser = argparse.ArgumentParser(description='Plot distances to hyperplane')
    parser.add_argument('-estimator', '--estimator', metavar='est',
                        nargs='+', dest='est', type=str, required=True,
                        help='Path to HDF5 file containing a fitted estimator.')
    parser.add_argument('-pinfo', '--pinfo', metavar='pinfo',
                        nargs='+', dest='pinfo', type=str, required=True,
                        help='Patient Info File (txt)')
    parser.add_argument('-images', '--images', metavar='images',
                        nargs='+', dest='ims', type=str, required=True,
                        help='Images of patients (ITK Image files)')
    parser.add_argument('-segmentations', '--segmentations',
                        metavar='segmentations', nargs='+', dest='segs',
                        type=str, required=True,
                        help='Segmentations of patients (ITK Image files)')
    parser.add_argument('-ensemble_method', '--ensemble_method',
                        metavar='ensemble_method', nargs='+',
                        dest='ens_method', type=str, required=True,
                        help='Method to be used for ensembling (string)')
    parser.add_argument('-ensemble_size', '--ensemble_size',
                        metavar='ensemble_size', nargs='+', dest='ens_size',
                        type=str, required=True,
                        help='If ensembling method is top_N, size to be used (int)')
    parser.add_argument('-label_type', '--label_type', metavar='label_type',
                        nargs='+', dest='label_type', type=str, required=True,
                        help='Label name that is predicted by estimator (string)')
    parser.add_argument('-scores', '--scores', metavar='scores',
                        nargs='+', dest='scores', type=str, required=True,
                        help='Type of scoring: percentages or posteriors (string)')
    parser.add_argument('-output_csv', '--output_csv', metavar='output_csv',
                        nargs='+', dest='output_csv', type=str, required=True,
                        help='Output file for scores (CSV).')
    parser.add_argument('-output_zip', '--output_zip', metavar='output_zip',
                        nargs='+', dest='output_zip', type=str, required=True,
                        help='Output file for images (zip).')
    args = parser.parse_args()

    # Convert list inputs that should be single arguments to plain values:
    # nargs='+' always yields lists, even for single-valued arguments
    pinfo = args.pinfo
    if isinstance(pinfo, list):
        pinfo = ''.join(pinfo)

    estimator = args.est
    if isinstance(estimator, list):
        estimator = ''.join(estimator)

    ensemble_method = args.ens_method
    if isinstance(ensemble_method, list):
        ensemble_method = ''.join(ensemble_method)

    ensemble_size = args.ens_size
    if isinstance(ensemble_size, list):
        ensemble_size = int(ensemble_size[0])

    label_type = args.label_type
    if isinstance(label_type, list):
        label_type = ''.join(label_type)

    scores = args.scores
    if isinstance(scores, list):
        scores = ''.join(scores)

    output_csv = args.output_csv
    if isinstance(output_csv, list):
        output_csv = ''.join(output_csv)

    output_zip = args.output_zip
    if isinstance(output_zip, list):
        output_zip = ''.join(output_zip)

    plot_ranked_scores(estimator=estimator,
                       pinfo=pinfo,
                       label_type=label_type,
                       scores=scores,
                       images=args.ims,
                       segmentations=args.segs,
                       ensemble_method=ensemble_method,
                       ensemble_size=ensemble_size,
                       output_csv=output_csv,
                       output_zip=output_zip)
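# A minimal sketch of how this script might be invoked from the command
# line; all file paths and the label name below are hypothetical examples,
# not files shipped with WORC:
#
#   python plot_ranked_scores.py \
#       --estimator /tmp/estimator_all_0.hdf5 \
#       --pinfo /tmp/pinfo.txt \
#       --images /tmp/p1_image.nii.gz /tmp/p2_image.nii.gz \
#       --segmentations /tmp/p1_seg.nii.gz /tmp/p2_seg.nii.gz \
#       --ensemble_method top_N \
#       --ensemble_size 50 \
#       --label_type imaginary_label_1 \
#       --scores percentages \
#       --output_csv /tmp/ranked_scores.csv \
#       --output_zip /tmp/ranked_images.zip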
def flatten_object(input):
    """Flatten various objects to a 1D list."""
    out = []
    for x in input:
        if isinstance(x, (list, pd.core.series.Series, np.ndarray)):
            # Recurse into nested sequences
            out += flatten_object(x)
        else:
            out.append(x)

    return out
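# Illustrative example (not part of the original module): nested lists,
# pandas Series and numpy arrays are flattened depth-first into one list,
# e.g. flatten_object([1, [2, [3]], np.array([4, 5])]) yields the values
# [1, 2, 3, 4, 5].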
def plot_ranked_percentages(estimator, pinfo, label_type=None,
                            ensemble_method='top_N', ensemble_size=100,
                            output_csv=None):
    # Read the inputs
    prediction = pd.read_hdf(estimator)

    # Determine the predicted score per patient
    print('Determining score per patient.')
    stats =\
        plot_estimator_performance(prediction, pinfo, [label_type],
                                   alpha=0.95,
                                   ensemble_method=ensemble_method,
                                   ensemble_size=ensemble_size,
                                   output='stats')
    percentages = stats['Rankings']['Percentages']
    ranking = np.argsort(list(percentages.values()))
    ranked_percentages_temp = [list(percentages.values())[r] for r in ranking]
    ranked_PIDs = [list(percentages.keys())[r] for r in ranking]

    # Entries are formatted as '<true label>: <percentage>%', so split
    # each entry into the ground truth and the percentage
    ranked_percentages = list()
    ranked_truths = list()
    for r in ranked_percentages_temp:
        sep = r.index(':')
        ranked_truths.append(float(r[0:sep]))
        ranked_percentages.append(float(r[sep + 2:-1]))

    # Write output to csv
    if output_csv is not None:
        print("Writing output scores to CSV.")
        header = ['PatientID', 'TrueLabel', 'Percentage']
        with open(output_csv, 'w') as csv_file:
            writer = csv.writer(csv_file)
            writer.writerow(header)

            for pid, truth, perc in zip(ranked_PIDs, ranked_truths,
                                        ranked_percentages):
                towrite = [str(pid), str(truth), str(perc)]
                writer.writerow(towrite)

    return ranked_percentages, ranked_truths, ranked_PIDs
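# A minimal usage sketch (hypothetical paths and label name): rank
# patients by the percentage of cross-validation iterations in which
# they were classified correctly, and dump the ranking to CSV:
#
#   percs, truths, pids = plot_ranked_percentages(
#       estimator='/tmp/estimator_all_0.hdf5',
#       pinfo='/tmp/pinfo.txt',
#       label_type='imaginary_label_1',
#       output_csv='/tmp/ranked_percentages.csv')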
def plot_ranked_images(pinfo, label_type, images, segmentations,
                       ranked_truths, ranked_scores, ranked_PIDs,
                       output_zip=None, output_itk=None, zoomfactor=4,
                       scores='percentages'):
    # Match the images to the label data
    print('Matching image and segmentation data to labels.')
    label_data, images =\
        lp.findlabeldata(pinfo, [label_type], images, objects=images)
    _, segmentations =\
        lp.findlabeldata(pinfo, [label_type], segmentations,
                         objects=segmentations)
    PIDs_images = label_data['patient_IDs'].tolist()
    PIDs_images = [i.lower() for i in PIDs_images]
    del label_data

    # Order the images and segmentations in the scores ordering
    ordering = list()
    for pid in ranked_PIDs:
        if pid.lower() in PIDs_images:
            # Look up the lowercased PID, as PIDs_images was lowercased above
            ordering.append(PIDs_images.index(pid.lower()))
        else:
            print(f'[WORC Warning] Patient {pid} not in images list!')

    PIDs_images = [PIDs_images[i] for i in ordering]
    images = [images[i] for i in ordering]
    segmentations = [segmentations[i] for i in ordering]

    # Print the middle segmented slice from each patient based on ranking
    print('Print the middle segmented slice from each patient based on ranking.')
    if output_zip is not None:
        zipf = zipfile.ZipFile(output_zip, 'w', zipfile.ZIP_DEFLATED,
                               allowZip64=True)

    if output_itk is not None:
        # Determine spacing factor
        print("Determining spacing factor.")
        spacings_x = list()
        spacings_y = list()
        for idx, im in enumerate(images):
            print(f'Processing patient {idx + 1} / {len(images)}: '
                  f'{PIDs_images[idx]}.')
            im = sitk.ReadImage(im)
            spacings_x.append(im.GetSpacing()[0])
            spacings_y.append(im.GetSpacing()[1])

        # NOTE: Used in future feature
        resample = [min(spacings_x), min(spacings_y)]

    for idx in range(0, len(images)):
        print(f'Processing patient {idx + 1} / {len(images)}: '
              f'{PIDs_images[idx]}.')
        im = sitk.ReadImage(images[idx])
        seg = sitk.ReadImage(segmentations[idx])
        pid = PIDs_images[idx]
        score = ranked_scores[idx]
        if scores == 'percentages':
            score = abs(int(score))

        fname = str(score) + '_' + pid + '_TrueLabel_' +\
            str(ranked_truths[idx]) + '_slice.png'
        if platform.startswith('win'):
            # Windows has issues with sorting based on '-', so replace it
            # with 'min'. Note: use startswith('win') rather than
            # 'win' in platform, as the latter also matches 'darwin'.
            if float(ranked_scores[idx]) < 0:
                fname = 'min' + fname[1:]

        if output_zip is not None:
            output_name = os.path.join(os.path.dirname(output_zip), fname)
            output_name_zoom = os.path.join(os.path.dirname(output_zip),
                                            'zoom' + str(zoomfactor) + '_' + fname)
        else:
            output_name = None
            output_name_zoom = None

        imslice, maskslice = pi.slicer(image=im, mask=seg,
                                       output_name=output_name,
                                       output_name_zoom=output_name_zoom)

        if output_zip is not None:
            # Print PNGs and combine in ZIP
            zipf.write(output_name, fname)
            zipf.write(output_name_zoom, 'zoom_' + fname)
            os.remove(output_name)
            os.remove(output_name_zoom)

        if output_itk is not None:
            # Combine slices in 3D image
            print('WIP')

        del im, seg, imslice, maskslice

    if output_zip is not None:
        # Close the archive so the central directory is written
        zipf.close()
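# Sketch of a direct call (normally done through plot_ranked_scores);
# all paths and IDs below are hypothetical, and the ranked_* inputs must
# already be sorted by score:
#
#   plot_ranked_images(pinfo='/tmp/pinfo.txt',
#                      label_type='imaginary_label_1',
#                      images=['/tmp/p1_image.nii.gz', '/tmp/p2_image.nii.gz'],
#                      segmentations=['/tmp/p1_seg.nii.gz', '/tmp/p2_seg.nii.gz'],
#                      ranked_truths=[0.0, 1.0],
#                      ranked_scores=[-80, 95],
#                      ranked_PIDs=['p1', 'p2'],
#                      output_zip='/tmp/ranked_images.zip')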
def plot_ranked_posteriors(estimator, pinfo, label_type=None,
                           ensemble_method='top_N', ensemble_size=100,
                           output_csv=None):
    # Read the inputs
    prediction = pd.read_hdf(estimator)

    if label_type is None:
        # Assume we want to have the first key
        label_type = prediction.keys()[0]

    # Determine the predicted score per patient
    print('Determining posterior per patient.')
    y_truths, y_scores, y_predictions, PIDs_scores =\
        plot_estimator_performance(prediction, pinfo, [label_type],
                                   alpha=0.95,
                                   ensemble_method=ensemble_method,
                                   ensemble_size=ensemble_size,
                                   output='scores')

    # Extract all scores for each patient
    print('Aggregating scores per patient over all crossval iterations.')

    def aggregate_scores(y_truths_in, y_scores_in, PIDs_scores_in):
        # Use fresh dictionaries per call, so that in the multiclass case
        # the scores of one label do not leak into those of the next
        scores = dict()
        truths = dict()
        y_truths_flat = flatten_object(y_truths_in)
        y_scores_flat = flatten_object(y_scores_in)
        PIDs_scores_flat = flatten_object(PIDs_scores_in)
        for yt, ys, pid in zip(y_truths_flat, y_scores_flat,
                               PIDs_scores_flat):
            if pid not in scores.keys():
                # No scores yet for patient, create list
                scores[pid] = list()
                truths[pid] = yt
            scores[pid].append(ys)

        # Take the mean for each patient and rank them
        scores_means = {pid: np.mean(scores[pid]) for pid in scores.keys()}

        # Rank according to mean scores
        ranking = np.argsort(list(scores_means.values()))
        ranked_PIDs = [list(scores_means.keys())[r] for r in ranking]

        ranked_mean_scores = [scores_means[r] for r in ranked_PIDs]
        ranked_scores = [scores[r] for r in ranked_PIDs]
        ranked_truths = [truths[r] for r in ranked_PIDs]
        return ranked_PIDs, ranked_truths, ranked_mean_scores, ranked_scores

    # Gather ground truth for each pid
    pid_truths = dict()
    for y, p in zip(y_truths, PIDs_scores):
        for k, v in zip(p, y):
            pid_truths[k] = v

    if len(label_type.split(',')) != 1:
        # Multiclass
        ranked_PIDs = dict()
        ranked_truths = dict()
        ranked_mean_scores = dict()
        ranked_scores = dict()

        total_scores = list()
        means = list()
        for lnum, label in enumerate(label_type.split(',')):
            # Select only values for this label
            y_truths_thislabel = np.asarray(y_truths)[:, :, lnum]
            y_scores_thislabel = np.asarray(y_scores)[:, :, lnum]

            # Rank the patients and scores
            ranked_PIDs_label, ranked_truths_label, ranked_mean_scores_label,\
                ranked_scores_label =\
                aggregate_scores(y_truths_thislabel, y_scores_thislabel,
                                 PIDs_scores)

            ranked_PIDs[label] = ranked_PIDs_label
            ranked_truths[label] = ranked_truths_label
            ranked_mean_scores[label] = ranked_mean_scores_label
            ranked_scores[label] = ranked_scores_label

            means.append(f"Mean_{label}")
            total_scores.extend(
                [f"Score_{label}_{i}" for i in
                 range(max([len(score) for score in ranked_scores_label]))])

        # Write output to csv
        unique_pids = list(set(flatten_object(PIDs_scores)))

        # FIXME: bug in scores, so only give the means
        if output_csv is not None:
            print("Writing output scores to CSV.")
            header = ['PatientID', 'TrueLabel', 'Predicted'] + means  # + total_scores
            with open(output_csv, 'w') as csv_file:
                writer = csv.writer(csv_file)
                writer.writerow(header)

                for pid in unique_pids:
                    pid_means = list()
                    pid_truth = pid_truths[pid]
                    pid_scores = list()
                    for lnum, label in enumerate(label_type.split(',')):
                        p_index = ranked_PIDs[label].index(pid)
                        pid_means.append(ranked_mean_scores[label][p_index])
                        pid_scores.extend(ranked_scores[label][p_index])

                    towrite = [pid, str(pid_truth), np.argmax(pid_means)] +\
                        pid_means  # + pid_scores
                    writer.writerow(towrite)
    else:
        # Single label
        ranked_PIDs, ranked_truths, ranked_mean_scores, ranked_scores =\
            aggregate_scores(y_truths, y_scores, PIDs_scores)

        # Write output to csv
        maxlen = max([len(s) for s in ranked_scores])
        if output_csv is not None:
            print("Writing output scores to CSV.")
            header = ['PatientID', 'TrueLabel', 'Probability']
            for i in range(0, maxlen):
                header.append('Score' + str(i + 1))

            with open(output_csv, 'w') as csv_file:
                writer = csv.writer(csv_file)
                writer.writerow(header)

                for pid, truth, smean, pid_scores in zip(ranked_PIDs,
                                                         ranked_truths,
                                                         ranked_mean_scores,
                                                         ranked_scores):
                    towrite = [str(pid), str(truth), str(smean)]
                    for s in pid_scores:
                        towrite.append(str(s))
                    writer.writerow(towrite)

    return ranked_mean_scores, ranked_truths, ranked_PIDs
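# Usage sketch (hypothetical paths and label name): rank patients by
# their mean posterior over all cross-validation iterations and write
# the per-iteration scores to CSV:
#
#   mean_scores, truths, pids = plot_ranked_posteriors(
#       estimator='/tmp/estimator_all_0.hdf5',
#       pinfo='/tmp/pinfo.txt',
#       label_type='imaginary_label_1',
#       ensemble_method='top_N',
#       ensemble_size=100,
#       output_csv='/tmp/ranked_posteriors.csv')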
def plot_ranked_scores(estimator, pinfo, label_type, scores='percentages',
                       images=[], segmentations=[],
                       ensemble_method='top_N', ensemble_size=100,
                       output_csv=None, output_zip=None, output_itk=None):
    '''
    Rank the patients according to their average score. The score can
    either be the average posterior or the percentage of times the
    patient was classified correctly in the cross-validations.
    Additionally, the middle slice of each patient is plotted and saved
    according to the ranking.

    Parameters
    ----------
    estimator: filepath, mandatory
        Path pointing to the .hdf5 file which is the output of the
        trainclassifier function.

    pinfo: filepath, mandatory
        Path pointing to the .txt file which contains the patient label
        information.

    label_type: string, default None
        The name of the label predicted by the estimator. If None,
        the first label from the prediction file will be used.

    scores: string, default percentages
        Type of scoring to be used. Either 'posteriors' or 'percentages'.

    images: list, optional
        List containing the filepaths to the ITK image files of the
        patients.

    segmentations: list, optional
        List containing the filepaths to the ITK segmentation files of
        the patients.

    ensemble_method: string, optional
        Method to be used for ensembling.

    ensemble_size: int, optional
        If the top_N method is used, the number of workflows to be
        included in the ensemble.

    output_csv: filepath, optional
        If given, the scores will be written to this csv file.

    output_zip: filepath, optional
        If given, the images will be plotted and the pngs saved to this
        zip file.

    output_itk: filepath, optional
        WIP

    '''
    prediction = pd.read_hdf(estimator)
    if label_type is None:
        # Assume we want to have the first key
        label_type = prediction.keys()[0]

    if scores == 'posteriors':
        ranked_scores, ranked_truths, ranked_PIDs =\
            plot_ranked_posteriors(estimator=estimator,
                                   pinfo=pinfo,
                                   label_type=label_type,
                                   ensemble_method=ensemble_method,
                                   ensemble_size=ensemble_size,
                                   output_csv=output_csv)
    elif scores == 'percentages':
        if prediction[prediction.keys()[0]].config['CrossValidation']['Type'] == 'LOO':
            print('Cannot rank percentages for LOO, returning dummies.')
            ranked_scores = ranked_truths = ranked_PIDs = []
            if output_csv is not None:
                with open(output_csv, 'w') as csv_file:
                    writer = csv.writer(csv_file)
                    writer.writerow(['LOO: Cannot rank percentages.'])
        else:
            ranked_scores, ranked_truths, ranked_PIDs =\
                plot_ranked_percentages(estimator=estimator,
                                        pinfo=pinfo,
                                        label_type=label_type,
                                        ensemble_method=ensemble_method,
                                        ensemble_size=ensemble_size,
                                        output_csv=output_csv)
    else:
        message = ('{} is not a valid scoring method!').format(str(scores))
        raise WORCKeyError(message)

    if output_zip is not None or output_itk is not None:
        # FIXME: check for multilabel by checking type
        if isinstance(ranked_scores, list):
            if images:
                # Rerank the scores split per ground truth class:
                # negative for class 0, positive for class 1
                ranked_scores_temp = list()
                for l, p in zip(ranked_truths, ranked_scores):
                    if l == 0:
                        ranked_scores_temp.append(-p)
                    else:
                        ranked_scores_temp.append(p)

                ranked_scores = ranked_scores_temp
                ranking = np.argsort(ranked_scores)
                ranked_scores = [ranked_scores[r] for r in ranking]
                ranked_truths = [ranked_truths[r] for r in ranking]
                ranked_PIDs = [ranked_PIDs[r] for r in ranking]

                # Convert to lower to later on overcome matching errors
                ranked_PIDs = [i.lower() for i in ranked_PIDs]

                # Plot the ranked images
                plot_ranked_images(pinfo=pinfo,
                                   label_type=label_type,
                                   images=images,
                                   segmentations=segmentations,
                                   ranked_truths=ranked_truths,
                                   ranked_scores=ranked_scores,
                                   ranked_PIDs=ranked_PIDs,
                                   output_zip=output_zip,
                                   output_itk=output_itk,
                                   scores=scores)
            else:
                # No images given: make a dummy zip so downstream tools
                # still find an output file
                if output_zip is not None:
                    zipfile.ZipFile(output_zip, 'w', zipfile.ZIP_DEFLATED,
                                    allowZip64=True)
        else:
            # Multilabel: image plotting not supported, make a dummy zip
            if output_zip is not None:
                zipfile.ZipFile(output_zip, 'w', zipfile.ZIP_DEFLATED,
                                allowZip64=True)
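# End-to-end usage sketch (hypothetical paths and label name), equivalent
# to what main() does after parsing the command line:
#
#   plot_ranked_scores(estimator='/tmp/estimator_all_0.hdf5',
#                      pinfo='/tmp/pinfo.txt',
#                      label_type='imaginary_label_1',
#                      scores='posteriors',
#                      images=['/tmp/p1_image.nii.gz'],
#                      segmentations=['/tmp/p1_seg.nii.gz'],
#                      output_csv='/tmp/ranked_posteriors.csv',
#                      output_zip='/tmp/ranked_images.zip')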
if __name__ == '__main__':
    main()