#!/usr/bin/env python
# Copyright 2016-2020 Biomedical Imaging Group Rotterdam, Departments of
# Medical Informatics and Radiology, Erasmus MC, Rotterdam, The Netherlands
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import division
from sklearn.metrics import accuracy_score, balanced_accuracy_score
from sklearn.metrics import roc_auc_score
from sklearn.metrics import confusion_matrix
from sklearn.metrics import f1_score
import numpy as np
from sklearn import metrics
from scipy.stats import pearsonr, spearmanr
from sklearn.metrics import make_scorer, average_precision_score
from sklearn.metrics import check_scoring as check_scoring_sklearn
from scipy.linalg import pinv
from imblearn.metrics import geometric_mean_score


def pairwise_auc(y_truth, y_score, class_i, class_j):
    # Filter out the probabilities for class_i and class_j
    y_score = [est[class_i] for ref, est in zip(y_truth, y_score)
               if ref in (class_i, class_j)]
    y_truth = [ref for ref in y_truth if ref in (class_i, class_j)]

    # Sort the y_truth by the estimated probabilities
    sorted_y_truth = [y for x, y in sorted(zip(y_score, y_truth),
                                           key=lambda p: p[0])]

    # Calculate the sum of ranks for class_i
    sum_rank = 0
    for index, element in enumerate(sorted_y_truth):
        if element == class_i:
            sum_rank += index + 1
    sum_rank = float(sum_rank)

    # Get the counts for class_i and class_j
    n_class_i = float(y_truth.count(class_i))
    n_class_j = float(y_truth.count(class_j))

    # If a class is empty, the AUC is 0.0
    if n_class_i == 0 or n_class_j == 0:
        return 0.0

    # Calculate the pairwise AUC
    return (sum_rank - (0.5 * n_class_i * (n_class_i + 1))) / (n_class_i * n_class_j)
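

# Hedged usage sketch (not part of the original module): pairwise_auc on
# hypothetical toy data. Class labels are assumed to be integers that index
# the columns of the per-sample probability vectors in y_score.
def _example_pairwise_auc():
    y_truth = [0, 0, 1, 1, 2]
    y_score = [[0.9, 0.05, 0.05],
               [0.6, 0.30, 0.10],
               [0.2, 0.70, 0.10],
               [0.1, 0.80, 0.10],
               [0.1, 0.10, 0.80]]
    # Samples of class 2 are ignored when comparing class 0 against class 1
    return pairwise_auc(y_truth, y_score, class_i=0, class_j=1)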


def multi_class_auc(y_truth, y_score):
    classes = np.unique(y_truth)

    # if any(t == 0.0 for t in np.sum(y_score, axis=1)):
    #     raise ValueError('No AUC is calculated, output probabilities are missing')

    # Average the pairwise AUCs over all unordered class pairs
    pairwise_auc_list = [0.5 * (pairwise_auc(y_truth, y_score, i, j) +
                                pairwise_auc(y_truth, y_score, j, i))
                         for i in classes for j in classes if i < j]

    c = len(classes)
    return (2.0 * sum(pairwise_auc_list)) / (c * (c - 1))
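

# Hedged usage sketch (not part of the original module): multi-class AUC on
# hypothetical probabilities for three classes; the columns of y_score are
# assumed to correspond to the class labels 0, 1 and 2.
def _example_multi_class_auc():
    y_truth = [0, 0, 1, 1, 2, 2]
    y_score = [[0.8, 0.1, 0.1],
               [0.7, 0.2, 0.1],
               [0.2, 0.6, 0.2],
               [0.1, 0.8, 0.1],
               [0.1, 0.2, 0.7],
               [0.2, 0.1, 0.7]]
    # Averages the pairwise AUCs over the class pairs (0, 1), (0, 2) and (1, 2)
    return multi_class_auc(y_truth, y_score)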


def multi_class_auc_score(y_truth, y_score):
    # Note: the arguments are unused; this returns a scorer object wrapping
    # multi_class_auc rather than a score value.
    return make_scorer(multi_class_auc, needs_proba=True)


def check_scoring(estimator, scoring=None, allow_none=False):
    '''
    Surrogate for sklearn's check_scoring to enable use of some other
    scoring metrics.
    '''
    if scoring == 'average_precision_weighted':
        scorer = make_scorer(average_precision_score, average='weighted',
                             needs_proba=True)
    elif scoring == 'gmean':
        # geometric_mean_score operates on predicted labels, so pass the
        # function itself (not a call) and do not request probabilities
        scorer = make_scorer(geometric_mean_score)
    else:
        scorer = check_scoring_sklearn(estimator, scoring=scoring,
                                       allow_none=allow_none)
    return scorer
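

# Hedged usage sketch (not part of the original module): resolving the custom
# 'gmean' scorer and applying it to a fitted classifier. The classifier and
# toy data below are illustrative assumptions.
def _example_check_scoring():
    from sklearn.ensemble import RandomForestClassifier
    X = [[0, 0], [1, 1], [0, 1], [1, 0], [0, 0], [1, 1]]
    y = [0, 1, 0, 1, 0, 1]
    clf = RandomForestClassifier(n_estimators=10, random_state=0).fit(X, y)
    scorer = check_scoring(clf, scoring='gmean')
    # The scorer follows the sklearn convention: scorer(estimator, X, y)
    return scorer(clf, X, y)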


def check_multimetric_scoring(estimator, scoring=None):
    """Wrapper around sklearn function to enable more scoring options.

    Check the scoring parameter in cases when multiple metrics are allowed.

    Parameters
    ----------
    estimator : sklearn estimator instance
        The estimator for which the scoring will be applied.

    scoring : string, callable, list/tuple, dict or None, default: None
        A single string (see :ref:`scoring_parameter`) or a callable
        (see :ref:`scoring`) to evaluate the predictions on the test set.

        For evaluating multiple metrics, either give a list of (unique)
        strings or a dict with names as keys and callables as values.

        NOTE that when using custom scorers, each scorer should return a
        single value. Metric functions returning a list/array of values can
        be wrapped into multiple scorers that return one value each.

        See :ref:`multimetric_grid_search` for an example.

        If None the estimator's score method is used.
        The return value in that case will be ``{'score': <default_scorer>}``.
        If the estimator's score method is not available, a ``TypeError``
        is raised.

    Returns
    -------
    scorers_dict : dict
        A dict mapping each scorer name to its validated scorer.

    is_multimetric : bool
        True if scorer is a list/tuple or dict of callables
        False if scorer is None/str/callable
    """
    if callable(scoring) or scoring is None or isinstance(scoring, str):
        scorers = {"score": check_scoring(estimator, scoring=scoring)}
        return scorers, False
    else:
        err_msg_generic = ("scoring should either be a single string or "
                           "callable for single metric evaluation or a "
                           "list/tuple of strings or a dict of scorer name "
                           "mapped to the callable for multiple metric "
                           "evaluation. Got %s of type %s"
                           % (repr(scoring), type(scoring)))

        if isinstance(scoring, (list, tuple, set)):
            err_msg = ("The list/tuple elements must be unique "
                       "strings of predefined scorers. ")
            invalid = False
            try:
                keys = set(scoring)
            except TypeError:
                invalid = True
            if invalid:
                raise ValueError(err_msg)

            if len(keys) != len(scoring):
                raise ValueError(err_msg + "Duplicate elements were found in"
                                 " the given list. %r" % repr(scoring))
            elif len(keys) > 0:
                if not all(isinstance(k, str) for k in keys):
                    if any(callable(k) for k in keys):
                        raise ValueError(err_msg +
                                         "One or more of the elements were "
                                         "callables. Use a dict of score name "
                                         "mapped to the scorer callable. "
                                         "Got %r" % repr(scoring))
                    else:
                        raise ValueError(err_msg +
                                         "Non-string types were found in "
                                         "the given list. Got %r"
                                         % repr(scoring))
                scorers = {scorer: check_scoring(estimator, scoring=scorer)
                           for scorer in scoring}
            else:
                raise ValueError(err_msg +
                                 "Empty list was given. %r" % repr(scoring))

        elif isinstance(scoring, dict):
            keys = set(scoring)
            if not all(isinstance(k, str) for k in keys):
                raise ValueError("Non-string types were found in the keys of "
                                 "the given dict. scoring=%r" % repr(scoring))
            if len(keys) == 0:
                raise ValueError("An empty dict was passed. %r"
                                 % repr(scoring))
            scorers = {key: check_scoring(estimator, scoring=scorer)
                       for key, scorer in scoring.items()}
        else:
            raise ValueError(err_msg_generic)
        return scorers, True
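

# Hedged usage sketch (not part of the original module): resolving a list of
# scorer names, mixing standard sklearn scorers with the custom
# 'average_precision_weighted' option handled by check_scoring above.
def _example_check_multimetric_scoring():
    from sklearn.linear_model import LogisticRegression
    estimator = LogisticRegression()
    scorers, is_multimetric = check_multimetric_scoring(
        estimator, scoring=['accuracy', 'f1_weighted',
                            'average_precision_weighted'])
    # scorers maps each name to a callable scorer; is_multimetric is True here
    return sorted(scorers.keys()), is_multimetric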


def ICC(M, ICCtype='inter'):
    '''
    Input:
        M is a matrix of observations. Rows: patients, columns: observers.
        ICCtype: ICC type, currently "inter" or "intra".
    '''
    n, k = M.shape

    SStotal = np.var(M, ddof=1) * (n*k - 1)
    MSR = np.var(np.mean(M, 1), ddof=1) * k
    MSW = np.sum(np.var(M, 1, ddof=1)) / n
    MSC = np.var(np.mean(M, 0), ddof=1) * n
    MSE = (SStotal - MSR * (n - 1) - MSC * (k - 1)) / ((n - 1) * (k - 1))

    if ICCtype == 'intra':
        r = (MSR - MSW) / (MSR + (k - 1) * MSW)
    elif ICCtype == 'inter':
        r = (MSR - MSE) / (MSR + (k - 1) * MSE + k * (MSC - MSE) / n)
    else:
        raise ValueError('No valid ICC type given.')

    return r
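

# Hedged usage sketch (not part of the original module): ICC for a small
# hypothetical ratings matrix with 5 patients (rows) scored by 3 observers
# (columns).
def _example_ICC():
    M = np.array([[9.0, 2.0, 5.0],
                  [6.0, 1.0, 3.0],
                  [8.0, 4.0, 6.0],
                  [7.0, 1.0, 2.0],
                  [10.0, 5.0, 6.0]])
    return ICC(M, ICCtype='inter'), ICC(M, ICCtype='intra')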


def ICC_anova(Y, ICCtype='inter', more=False):
    '''
    Adapted from Nipype with a slight alteration to distinguish inter and
    intra. The data Y are entered as a 'table', i.e. subjects are in rows
    and repeated measures in columns.

    One-sample repeated measures ANOVA: Y = XB + E with X = [Factor / Subjects]
    '''
    [nb_subjects, nb_conditions] = Y.shape
    dfc = nb_conditions - 1
    dfe = (nb_subjects - 1) * dfc
    dfr = nb_subjects - 1

    # Compute the repeated measure effect
    # ------------------------------------

    # Sum Square Total
    mean_Y = np.mean(Y)
    SST = ((Y - mean_Y) ** 2).sum()

    # Create the design matrix for the different levels
    x = np.kron(np.eye(nb_conditions), np.ones((nb_subjects, 1)))  # sessions
    x0 = np.tile(np.eye(nb_subjects), (nb_conditions, 1))  # subjects
    X = np.hstack([x, x0])

    # Sum Square Error
    predicted_Y = np.dot(np.dot(np.dot(X, pinv(np.dot(X.T, X))), X.T),
                         Y.flatten('F'))
    residuals = Y.flatten('F') - predicted_Y
    SSE = (residuals ** 2).sum()
    residuals.shape = Y.shape
    MSE = SSE / dfe

    # Sum Square session effect - between columns/sessions
    SSC = ((np.mean(Y, 0) - mean_Y) ** 2).sum() * nb_subjects
    MSC = SSC / dfc / nb_subjects
    session_effect_F = MSC / MSE

    # Sum Square subject effect - between rows/subjects
    SSR = SST - SSC - SSE
    MSR = SSR / dfr

    # ICC(3,1) = (mean square subject - mean square error) /
    #            (mean square subject + (k-1)*mean square error)
    if ICCtype == 'intra':
        ICC = (MSR - MSE) / (MSR + dfc * MSE)
    elif ICCtype == 'inter':
        ICC = (MSR - MSE) / (MSR + dfc * MSE +
                             nb_conditions * (MSC - MSE) / nb_subjects)
    else:
        raise ValueError('No valid ICC type given.')

    e_var = MSE  # variance of error
    r_var = (MSR - MSE) / nb_conditions  # variance between subjects

    if more:
        return ICC, r_var, e_var, session_effect_F, dfc, dfe
    else:
        return ICC
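

# Hedged usage sketch (not part of the original module): the ANOVA-based ICC
# on the same kind of hypothetical ratings matrix, requesting the extra ANOVA
# quantities via more=True.
def _example_ICC_anova():
    Y = np.array([[9.0, 2.0, 5.0],
                  [6.0, 1.0, 3.0],
                  [8.0, 4.0, 6.0],
                  [7.0, 1.0, 2.0],
                  [10.0, 5.0, 6.0]])
    icc, r_var, e_var, session_effect_F, dfc, dfe = ICC_anova(Y,
                                                              ICCtype='inter',
                                                              more=True)
    return icc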