# Source code for WORC.tools.createfixedsplits

#!/usr/bin/env python

# Copyright 2016-2022 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

import numpy as np
from sklearn.model_selection import train_test_split
import WORC.addexceptions as ae
from WORC.processing.label_processing import load_labels
import pandas as pd
import WORC.processing.label_processing as lp


def _unique_split_id(pid, taken):
    """Return ``pid``, or ``pid`` suffixed with ``_1``, ``_2``, ... if taken.

    ``taken`` is the set of IDs already used in the same split column.
    """
    if pid not in taken:
        return pid

    n = 1
    while f'{pid}_{n}' in taken:
        n += 1
    return f'{pid}_{n}'


def createfixedsplits(label_file=None, label_type=None, patient_IDs=None,
                      stratify=True, test_size=0.2, N_iterations=1,
                      modus='singlelabel', output=None):
    """Create fixed splits for a random-split cross-validation.

    Parameters
    ----------
    label_file : filepath
        CSV file containing the labels of the patients.

    label_type: list of strings
        Labels to extract from the label file, e.g. ['label1'].

    patient_IDs: list of strings
        Names of patients to take into account. If None, all patients
        found in the label file are used.

    stratify: Boolean
        If True, splits are stratified. In this case, you need to provide
        label data. Ignored when modus is 'regression'.

    test_size: float
        Size of the test set per iteration; passed on unchanged to
        sklearn's train_test_split.

    N_iterations: integer
        Number of cross-validation iterations.

    modus: str
        'singlelabel', 'multilabel' or 'regression'. For 'regression', the
        splits are never stratified.

    output: filepath
        CSV filename to save output to. If None, nothing is written.

    Returns
    -------
    df: pandas Dataframe
        Fixed splits created. Contains the columns '<i>_train' and
        '<i>_test' for each iteration i; shorter columns are padded with
        empty strings so all columns have the same length.

    Raises
    ------
    WORCValueError
        If neither patient_IDs nor a label file with label type is given,
        if stratification is requested for given patient_IDs without a
        label file and label type, or if train_test_split cannot create
        the requested split.

    WORCKeyError
        If stratification is requested and modus is not one of
        'singlelabel', 'multilabel' or 'regression'.
    """
    have_labels = label_file is not None and label_type is not None

    # Check whether input is valid and load the label data when needed
    if patient_IDs is None:
        if not have_labels:
            raise ae.WORCValueError('Either a label file and label type or patient_IDs need to be provided!')

        # Read the label file
        label_data = load_labels(label_file, label_type)
        patient_IDs = label_data['patient_IDs']
    elif stratify:
        if not have_labels:
            raise ae.WORCValueError('A label file and label type needs to be provided for stratified splitting!')

        # Extract data for specific patients only
        label_data, _ = lp.findlabeldata(label_file, label_type,
                                         pids=patient_IDs)

    # Create the stratification labels, one per entry of patient_IDs.
    # NOTE(review): this assumes the labels in label_data are ordered like
    # patient_IDs -- confirm against load_labels / findlabeldata.
    stratify_labels = None
    if stratify:
        if modus == 'singlelabel':
            stratify_labels = label_data['label'][0].tolist()
        elif modus == 'multilabel':
            # Label = 0 means no label equals one.
            # Other label numbers refer to the (last) label that is 1.
            labels = label_data['label']
            stratify_labels = []
            for pnum in range(len(labels[0])):
                plabel = 0
                for lnum, slabel in enumerate(labels, start=1):
                    if slabel[pnum] == 1:
                        plabel = lnum
                stratify_labels.append(plabel)
        elif modus != 'regression':
            # Format the message itself, not the exception object
            raise ae.WORCKeyError('{} is not a valid modus!'.format(modus))

    # Split on unique patients, so a patient occurring multiple times never
    # ends up in both train and test. This does not depend on the iteration.
    unique_patient_IDs, unique_indices =\
        np.unique(np.asarray(patient_IDs), return_index=True)
    if stratify_labels:
        unique_stratify = [stratify_labels[i] for i in unique_indices]
    else:
        unique_stratify = None

    pd_dict = dict()
    for i in range(N_iterations):
        print(f'Splitting iteration {i + 1} / {N_iterations}')

        # Create a random seed for the splitting
        random_seed = np.random.randint(5000)

        # Split, throw error when dataset is too small for split ratio's
        try:
            unique_PID_train, _ = train_test_split(
                unique_patient_IDs,
                test_size=test_size,
                random_state=random_seed,
                stratify=unique_stratify)
        except ValueError as err:
            message = str(err) + ' Increase the size of your test set.'
            raise ae.WORCValueError(message) from err

        train_patients = set(np.asarray(unique_PID_train).tolist())

        # Check for all IDs if they are in test or training
        patient_ID_train, patient_ID_test = list(), list()
        seen_train, seen_test = set(), set()
        for pid in patient_IDs:
            if pid in train_patients:
                ids, seen = patient_ID_train, seen_train
            else:
                ids, seen = patient_ID_test, seen_test

            # Make sure we get a unique ID within the column
            unique_id = _unique_split_id(pid, seen)
            ids.append(unique_id)
            seen.add(unique_id)

        pd_dict[str(i) + '_train'] = patient_ID_train
        pd_dict[str(i) + '_test'] = patient_ID_test

    # All columns need the same length for the dataframe. Pad to the longest
    # column overall: test can be longer than train for large test sizes, and
    # with duplicate patient IDs the lengths can differ between iterations.
    n_rows = max((len(column) for column in pd_dict.values()), default=0)
    for column in pd_dict.values():
        column.extend([""] * (n_rows - len(column)))

    # Convert into pandas dataframe for easy use and conversion
    df = pd.DataFrame(pd_dict)

    # Write output if required
    if output is not None:
        print("Writing Output.")
        df.to_csv(output)

    return df
def test():
    """Run createfixedsplits on a hard-coded example set of patients.

    Creates three stratified splits for twenty example patient IDs, using
    the label 'imaginary_label_1' from a label file at a hard-coded local
    path, and saves the result to 'fixedsplits.csv'.
    """
    example_label_file = r'C:\Users\Martijn Starmans\Documents\GitHub\WORCTutorial\Data\Examplefiles\pinfo_HN.csv'
    example_patients = [
        'HN1004', 'HN1077', 'HN1088', 'HN1146', 'HN1159',
        'HN1192', 'HN1259', 'HN1260', 'HN1323', 'HN1331',
        'HN1339', 'HN1342', 'HN1372', 'HN1491', 'HN1501',
        'HN1519', 'HN1524', 'HN1554', 'HN1560', 'HN1748',
    ]

    createfixedsplits(
        label_file=example_label_file,
        patient_IDs=example_patients,
        stratify=True,
        label_type=['imaginary_label_1'],
        N_iterations=3,
        output='fixedsplits.csv',
    )
# Run the example split only when this file is executed as a script.
if __name__ == "__main__":
    test()