#!/usr/bin/env python
# Copyright 2020 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import os
import subprocess
import scipy.io as sio
import WORC.IOparser.file_io as wio
import WORC.IOparser.config_io_combat as cio
import numpy as np
import random
import pandas as pd
from WORC.addexceptions import WORCValueError, WORCKeyError
import tempfile
from sys import platform
from WORC.featureprocessing.VarianceThreshold import selfeat_variance
from sklearn.preprocessing import StandardScaler
from neuroCombat import neuroCombat
import matplotlib
matplotlib.use('agg')
import matplotlib.pyplot as plt
from WORC.featureprocessing.Imputer import Imputer
[docs]def ComBat(features_train_in, labels_train, config, features_train_out,
features_test_in=None, labels_test=None, features_test_out=None,
VarianceThreshold=True, scaler=False, logarithmic=False):
"""
Apply ComBat feature harmonization.
Based on: https://github.com/Jfortin1/ComBatHarmonization
"""
# Load the config
print('############################################################')
print('# Initializing ComBat. #')
print('############################################################\n')
config = cio.load_config(config)
excluded_features = config['ComBat']['excluded_features']
# If mod, than also load moderating labels
if config['ComBat']['mod'][0] == '[]':
label_names = config['ComBat']['batch']
else:
label_names = config['ComBat']['batch'] + config['ComBat']['mod']
# Load the features for both training and testing, match with batch and mod parameters
label_data_train, image_features_train =\
wio.load_features(features_train_in, patientinfo=labels_train,
label_type=label_names)
feature_labels = image_features_train[0][1]
image_features_train = [i[0] for i in image_features_train]
label_data_train['patient_IDs'] = list(label_data_train['patient_IDs'])
# Exclude features
if excluded_features:
print(f'\t Excluding features containing: {excluded_features}')
# Determine indices of excluded features
included_feature_indices = []
excluded_feature_indices = []
for fnum, i in enumerate(feature_labels):
if not any(e in i for e in excluded_features):
included_feature_indices.append(fnum)
else:
excluded_feature_indices.append(fnum)
# Actually exclude the features
image_features_train_combat = [np.asarray(i)[included_feature_indices].tolist() for i in image_features_train]
feature_labels_combat = np.asarray(feature_labels)[included_feature_indices].tolist()
image_features_train_noncombat = [np.asarray(i)[excluded_feature_indices].tolist() for i in image_features_train]
feature_labels_noncombat = np.asarray(feature_labels)[excluded_feature_indices].tolist()
else:
image_features_train_combat = image_features_train
feature_labels_combat = feature_labels.tolist()
image_features_train_noncombat = []
feature_labels_noncombat = []
# Detect NaNs, otherwise first feature imputation is required
if any(np.isnan(a) for a in np.asarray(image_features_train_combat).flatten()):
print('\t [WARNING] NaNs detected, applying median imputation')
imputer = Imputer(missing_values=np.nan, strategy='median')
imputer.fit(image_features_train_combat)
image_features_train_combat = imputer.transform(image_features_train_combat)
else:
imputer = None
# Apply a scaler to the features
if scaler:
print('\t Fitting scaler on dataset.')
scaler = StandardScaler().fit(image_features_train_combat)
image_features_train_combat = scaler.transform(image_features_train_combat)
# Remove features with a constant value
if VarianceThreshold:
print(f'\t Applying variance threshold on dataset.')
image_features_train_combat, feature_labels_combat, VarSel =\
selfeat_variance(image_features_train_combat, np.asarray([feature_labels_combat]))
feature_labels_combat = feature_labels_combat[0].tolist()
if features_test_in:
label_data_test, image_features_test =\
wio.load_features(features_test_in, patientinfo=labels_test,
label_type=label_names)
image_features_test = [i[0] for i in image_features_test]
label_data_test['patient_IDs'] = list(label_data_test['patient_IDs'])
if excluded_features:
image_features_test_combat = [np.asarray(i)[included_feature_indices].tolist() for i in image_features_test]
image_features_test_noncombat = [np.asarray(i)[excluded_feature_indices].tolist() for i in image_features_test]
else:
image_features_test_combat = image_features_test
image_features_test_noncombat = []
# Apply imputation if required
if imputer is not None:
image_features_test_combat = imputer.transform(image_features_test_combat)
# Apply a scaler to the features
if scaler:
image_features_test_combat = scaler.transform(image_features_test_combat)
# Remove features with a constant value
if VarianceThreshold:
image_features_test_combat = VarSel.transform(image_features_test_combat)
all_features = image_features_train_combat.tolist() + image_features_test_combat.tolist()
all_labels = list()
for i in range(label_data_train['label'].shape[0]):
all_labels.append(label_data_train['label'][i, :, 0].tolist() + label_data_test['label'][i, :, 0].tolist())
all_labels = np.asarray(all_labels)
else:
all_features = image_features_train_combat.tolist()
all_labels = label_data_train['label']
# Convert data to a single array
all_features_matrix = np.asarray(all_features)
all_labels = np.squeeze(all_labels)
# Apply logarithm if required
if logarithmic:
print('\t Taking log10 of features before applying ComBat.')
all_features_matrix = np.log10(all_features_matrix)
# Convert all_labels to dictionary
if len(all_labels.shape) == 1:
# No mod variables
all_labels = {label_data_train['label_name'][0]: all_labels}
else:
all_labels = {k: v for k, v in zip(label_data_train['label_name'], all_labels)}
# Split labels in batch and moderation labels
bat = config['ComBat']['batch']
mod = config['ComBat']['mod']
print(f'\t Using batch variable {bat}, mod variables {mod}.')
batch = [all_labels[l] for l in all_labels.keys() if l in config['ComBat']['batch']]
batch = batch[0]
if config['ComBat']['mod'][0] == '[]':
mod = None
else:
mod = [all_labels[l] for l in all_labels.keys() if l in config['ComBat']['mod']]
# Set parameters for output files
parameters = {'batch': config['ComBat']['batch'],
'mod': config['ComBat']['mod'],
'par': config['ComBat']['par']}
name = 'Image features: ComBat corrected'
panda_labels = ['parameters',
'patient',
'feature_values',
'feature_labels']
feature_labels = feature_labels_combat + feature_labels_noncombat
# Convert all inputs to arrays with right shape
all_features_matrix = np.transpose(all_features_matrix)
if mod is not None:
mod = np.transpose(np.asarray(mod))
# Patients identified with batch -1.0 should be skipped
skipname = 'Image features: ComBat skipped'
ntrain = len(image_features_train_combat)
ndel = 0
print(features_test_out)
for bnum, b in enumerate(batch):
bnum -= ndel
if b == -1.0:
if bnum < ntrain - ndel:
# Training patient
print('train')
pid = label_data_train['patient_IDs'][bnum]
out = features_train_out[bnum]
# Combine ComBat and non-ComBat features
feature_values_temp = list(all_features_matrix[:, bnum]) + list(image_features_train_noncombat[bnum])
# Delete patient for later processing
del label_data_train['patient_IDs'][bnum]
del image_features_train_noncombat[bnum]
del features_train_out[bnum]
image_features_train_combat = np.delete(image_features_train_combat, bnum, 0)
else:
# Test patient
print('test')
pid = label_data_test['patient_IDs'][bnum - ntrain]
out = features_test_out[bnum - ntrain]
# Combine ComBat and non-ComBat features
feature_values_temp = list(all_features_matrix[:, bnum]) + list(image_features_test_noncombat[bnum - ntrain])
# Delete patient for later processing
del label_data_test['patient_IDs'][bnum - ntrain]
del image_features_test_noncombat[bnum - ntrain]
del features_test_out[bnum - ntrain]
image_features_test_combat = np.delete(image_features_test_combat, bnum - ntrain, 0)
# Delete some other variables for later processing
all_features_matrix = np.delete(all_features_matrix, bnum, 1)
if mod is not None:
mod = np.delete(mod, bnum, 0)
batch = np.delete(batch, bnum, 0)
# Notify user
print(f'[WARNING] Skipping patient {pid} as batch variable is -1.0.')
# Sort based on feature label
feature_labels_temp, feature_values_temp =\
zip(*sorted(zip(feature_labels, feature_values_temp)))
# Convert to pandas Series and save as hdf5
panda_data = pd.Series([parameters, pid, feature_values_temp,
feature_labels_temp],
index=panda_labels,
name=skipname
)
print(f'\t Saving image features to: {out}.')
panda_data.to_hdf(out, 'image_features')
ndel += 1
print(features_test_out)
# Run ComBat in Matlab
if config['ComBat']['language'] == 'matlab':
print('\t Executing ComBat through Matlab')
data_harmonized = ComBatMatlab(dat=all_features_matrix,
batch=batch,
command=config['ComBat']['matlab'],
mod=mod,
par=config['ComBat']['par'],
per_feature=config['ComBat']['per_feature'])
elif config['ComBat']['language'] == 'python':
print('\t Executing ComBat through neuroComBat in Python')
data_harmonized = ComBatPython(dat=all_features_matrix,
batch=batch,
mod=mod,
eb=config['ComBat']['eb'],
par=config['ComBat']['par'],
per_feature=config['ComBat']['per_feature'])
else:
raise WORCKeyError(f"Language {config['ComBat']['language']} unknown.")
# Convert values back if logarithm was used
if logarithmic:
data_harmonized = 10 ** data_harmonized
# Convert again to train hdf5 files
feature_values_train_combat = [data_harmonized[:, i] for i in range(len(image_features_train_combat))]
for fnum, i_feat in enumerate(feature_values_train_combat):
# Combine ComBat and non-ComBat features
feature_values_temp = i_feat.tolist() + image_features_train_noncombat[fnum]
# Sort based on feature label
feature_labels_temp, feature_values_temp =\
zip(*sorted(zip(feature_labels, feature_values_temp)))
# Convert to pandas Series and save as hdf5
pid = label_data_train['patient_IDs'][fnum]
panda_data = pd.Series([parameters, pid, feature_values_temp,
feature_labels_temp],
index=panda_labels,
name=name
)
print(f'Saving image features to: {features_train_out[fnum]}.')
panda_data.to_hdf(features_train_out[fnum], 'image_features')
# Repeat for testing if required
if features_test_in:
print(len(image_features_test_combat))
print(data_harmonized.shape[1])
feature_values_test_combat = [data_harmonized[:, i] for i in range(data_harmonized.shape[1] - len(image_features_test_combat), data_harmonized.shape[1])]
for fnum, i_feat in enumerate(feature_values_test_combat):
print(fnum)
# Combine ComBat and non-ComBat features
feature_values_temp = i_feat.tolist() + image_features_test_noncombat[fnum]
# Sort based on feature label
feature_labels_temp, feature_values_temp =\
zip(*sorted(zip(feature_labels, feature_values_temp)))
# Convert to pandas Series and save as hdf5
pid = label_data_test['patient_IDs'][fnum]
panda_data = pd.Series([parameters, pid, feature_values_temp,
feature_labels_temp],
index=panda_labels,
name=name
)
print(f'Saving image features to: {features_test_out[fnum]}.')
panda_data.to_hdf(features_test_out[fnum], 'image_features')
[docs]def ComBatPython(dat, batch, mod=None, par=1,
eb=1, per_feature=False, plotting=False):
"""
Run the ComBat Function python script.
par = 0 is non-parametric.
"""
# convert inputs to neuroCombat format.
covars = dict()
categorical_cols = list()
covars['batch'] = batch
if mod is not None:
for i_mod in range(mod.shape[1]):
label = f'mod_{i_mod}'
covars[label] = [m for m in mod[:, i_mod]]
categorical_cols.append(label)
covars = pd.DataFrame(covars)
batch_col = 'batch'
if par == 0:
parametric = False
elif par == 1:
parametric = True
else:
raise WORCValueError(f'Par should be 0 or 1, now {par}.')
if eb == 0:
eb = False
elif eb == 1:
eb = True
else:
raise WORCValueError(f'eb should be 0 or 1, now {eb}.')
if per_feature == 0:
per_feature = False
elif per_feature == 1:
per_feature = True
else:
raise WORCValueError(f'per_feature should be 0 or 1, now {per_feature}.')
# execute ComBat
if not per_feature:
data_harmonized = neuroCombat(dat=dat, covars=covars, batch_col=batch_col,
categorical_cols=categorical_cols,
eb=eb, parametric=parametric)
elif per_feature:
print('\t Executing ComBat per feature.')
data_harmonized = np.zeros(dat.shape)
# Shape: (features, samples)
for i in range(dat.shape[0]):
if eb:
# Copy feature + random noise
random_feature = np.random.rand(dat[i, :].shape[0])
feat_temp = np.asarray([dat[i, :], dat[i, :] + random_feature])
else:
# Just use the single feature
feat_temp = np.asarray([dat[i, :]])
feat_temp = neuroCombat(dat=feat_temp, covars=covars,
batch_col=batch_col,
categorical_cols=categorical_cols,
eb=eb, parametric=parametric)
data_harmonized[i, :] = feat_temp[0, :]
if plotting:
feat1 = dat[i, :]
feat1_harm = data_harmonized[i, :]
print(len(feat1))
feat1_b1 = [f for f, b in zip(feat1, batch[0]) if b == 1.0]
feat1_b2 = [f for f, b in zip(feat1, batch[0]) if b == 2.0]
print(len(feat1_b1))
print(len(feat1_b2))
feat1_harm_b1 = [f for f, b in zip(feat1_harm, batch[0]) if b == 1.0]
feat1_harm_b2 = [f for f, b in zip(feat1_harm, batch[0]) if b == 2.0]
plt.figure()
ax = plt.subplot(2, 1, 1)
ax.scatter(np.ones((len(feat1_b1))), feat1_b1, color='red')
ax.scatter(np.ones((len(feat1_b2))) + 1, feat1_b2, color='blue')
plt.title('Before Combat')
ax = plt.subplot(2, 1, 2)
ax.scatter(np.ones((len(feat1_b1))), feat1_harm_b1, color='red')
ax.scatter(np.ones((len(feat1_b2))) + 1, feat1_harm_b2, color='blue')
plt.title('After Combat')
plt.show()
else:
raise WORCValueError(f'per_feature should be False or True, now {per_feature}.')
return data_harmonized
[docs]def Synthetictest(n_patients=50, n_features=10, par=1, eb=1,
per_feature=False, difscale=False, logarithmic=False,
oddpatient=True, oddfeat=True, samefeat=True):
"""Test for ComBat with Synthetic data."""
features = np.zeros((n_features, n_patients))
batch = list()
# First batch: Gaussian with loc 0, scale 1
for i in range(0, int(n_patients/2)):
feat_temp = [np.random.normal(loc=0.0, scale=1.0) for i in range(n_features)]
if i == 1 and oddpatient:
feat_temp = [np.random.normal(loc=10.0, scale=1.0) for i in range(n_features)]
elif oddfeat:
feat_temp = [np.random.normal(loc=0.0, scale=1.0) for i in range(n_features - 1)] + [np.random.normal(loc=10000.0, scale=1.0)]
if samefeat:
feat_temp[-1] = 1
features[:, i] = feat_temp
batch.append(1)
# Get directions for features
directions = list()
for i in range(n_features):
direction = random.random()
if direction > 0.5:
directions.append(1.0)
else:
directions.append(-1.0)
# First batch: Gaussian with loc 5, scale 1
for i in range(int(n_patients/2), n_patients):
feat_temp = [np.random.normal(loc=direction*5.0, scale=1.0) for i in range(n_features)]
if oddfeat:
feat_temp = [np.random.normal(loc=5.0, scale=1.0) for i in range(n_features - 1)] + [np.random.normal(loc=10000.0, scale=1.0)]
if difscale:
feat_temp = [f + 1000 for f in feat_temp]
feat_temp = np.multiply(feat_temp, directions)
if samefeat:
feat_temp[-1] = 1
features[:, i] = feat_temp
batch.append(2)
# Create mod var
mod = [[np.random.randint(30, 100) for i in range(n_patients)]]
# Apply ComBat
batch = np.asarray([batch])
mod = np.transpose(np.asarray(mod))
if logarithmic:
minfeat = np.min(features)
features = np.log10(features + np.abs(minfeat) + 1E-100)
data_harmonized = ComBatPython(dat=features, batch=batch, mod=mod, par=par,
eb=eb, per_feature=per_feature)
if logarithmic:
data_harmonized = 10 ** data_harmonized - np.abs(minfeat)
for i in range(n_features):
f = plt.figure()
ax = plt.subplot(2, 1, 1)
ax.scatter(np.ones((int(n_patients/2))), features[i, 0:int(n_patients/2)], color='red')
ax.scatter(np.ones((n_patients - int(n_patients/2))) + 1, features[i, int(n_patients/2):], color='blue')
plt.title('Before Combat')
ax = plt.subplot(2, 1, 2)
ax.scatter(np.ones((int(n_patients/2))), data_harmonized[i, 0:int(n_patients/2)], color='red')
ax.scatter(np.ones((n_patients - int(n_patients/2))) + 1, data_harmonized[i, int(n_patients/2):], color='blue')
plt.title('After Combat')
plt.show()
f.savefig(f'combat_par{par}_eb{eb}_perfeat{per_feature}_feat{i}.png')
# Logarithmic: not useful, as we have negative numbers, and (almost) zeros.
# so combat gives unuseful results.
# Same feature twice with eb and par: nans
[docs]def ComBatMatlab(dat, batch, command, mod=None, par=1, per_feature='true'):
"""
Run the ComBat Function Matlab script.
par = 0 is non-parametric.
"""
# Mod: default argument is empty list
if mod is None:
mod = []
# TODO: Add check whether matlab executable is found
# Save the features in a .mat MatLab Compatible format
# NOTE: Should change this_folder to a proper temporary directory
this_folder = os.path.dirname(os.path.realpath(__file__))
tempdir = tempfile.gettempdir()
tempfile_in = os.path.join(tempdir, 'combat_input.mat')
tempfile_out = os.path.join(tempdir, 'combat_output.mat')
ComBatFolder = os.path.join(os.path.dirname(this_folder),
'external',
'ComBatHarmonization',
'Matlab',
'scripts')
dict = {'output': tempfile_out,
'ComBatFolder': ComBatFolder,
'datvar': dat,
'batchvar': batch,
'modvar': mod,
'parvar': par,
'per_feature': per_feature
}
sio.savemat(tempfile_in, dict)
# Make sure there is no tempfile out from the previous run
if os.path.exists(tempfile_out):
os.remove(tempfile_out)
# Run ComBat
currentdir = os.getcwd()
if platform == "linux" or platform == "linux2":
commandseparator = ' ; '
elif platform == "win32":
commandseparator = ' & '
# BIGR Cluster: /cm/shared/apps/matlab/R2015b/bin/matlab
regcommand = ('cd "' + this_folder + '"' + commandseparator +
'"' + command + '" -nodesktop -nosplash -nojvm -r "combatmatlab(' + "'" + str(tempfile_in) + "'" + ')"' +
commandseparator +
'cd "' + currentdir + '"')
print(f'Executing ComBat in Matlab through command: {regcommand}.')
proc = subprocess.Popen(regcommand,
shell=True,
stdin=subprocess.PIPE,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
)
proc.wait()
stdout_value, stderr_value = proc.communicate()
# BUG: Waiting does not work, just wait for output to arrive, either with
# the actual output or an error message
succes = False
while succes is False:
if os.path.exists(tempfile_out):
try:
mat_dict = sio.loadmat(tempfile_out)
try:
data_harmonized = mat_dict['data_harmonized']
succes = True
except KeyError:
try:
message = mat_dict['message']
raise WORCValueError(f'Error in Matlab ComBat execution: {message}.')
except KeyError:
pass
except (sio.matlab.miobase.MatReadError, ValueError):
pass
# Check if expected output file exists
if not os.path.exists(tempfile_out):
raise WORCValueError(f'Error in Matlab ComBat execution: command: {regcommand}, stdout: {stdout_value}, stderr: {stderr_value}')
# Read the output from ComBat
mat_dict = sio.loadmat(tempfile_out)
data_harmonized = mat_dict['data_harmonized']
data_harmonized = np.transpose(data_harmonized)
# Remove temporary files
os.remove(tempfile_out)
os.remove(tempfile_in)
return data_harmonized