#!/usr/bin/env python

# This file contains functions for evaluating algorithms for the 2020 PhysioNet/
# Computing in Cardiology Challenge. You can run it as follows:
#
#   python evaluate_12ECG_score.py labels outputs scores.csv
#
# where 'labels' is a directory containing files with the labels, 'outputs' is a
# directory containing files with the outputs from your model, and 'scores.csv'
# (optional) is a collection of scores for the algorithm outputs.
#
# Each file of labels or outputs must have the format described on the Challenge
# webpage. The scores for the algorithm outputs include the area under the
# receiver-operating characteristic curve (AUROC), the area under the recall-
# precision curve (AUPRC), accuracy (fraction of correct recordings), macro F-
# measure, and the Challenge metric, which assigns different weights to
# different misclassification errors.

import numpy as np, os, os.path, sys

def evaluate_12ECG_score(label_directory, output_directory, weights_file='weights.csv'):
    """Score a model's outputs against the reference labels.

    Parameters
    ----------
    label_directory : str
        Directory containing the label header files ('.hea').
    output_directory : str
        Directory containing the model output files ('.csv'), one per label file.
    weights_file : str, optional
        Path to the CSV file with the Challenge metric weight matrix
        (default: 'weights.csv' in the current working directory).

    Returns
    -------
    tuple of floats
        (auroc, auprc, accuracy, f_measure, f_beta_measure, g_beta_measure,
        challenge_metric).
    """
    # SNOMED CT code for the normal class and groups of SNOMED CT codes that
    # the Challenge treats as equivalent diagnoses.
    normal_class = '426783006'
    equivalent_classes = [['713427006', '59118001'], ['284470004', '63593006'], ['427172004', '17338001']]

    # Find the label and output files.
    print('Finding label and output files...')
    label_files, output_files = find_challenge_files(label_directory, output_directory)

    # Load the labels and outputs.
    print('Loading labels and outputs...')
    label_classes, labels = load_labels(label_files, normal_class, equivalent_classes)
    output_classes, binary_outputs, scalar_outputs = load_outputs(output_files, normal_class, equivalent_classes)

    # Organize/sort the labels and outputs so that columns line up.
    print('Organizing labels and outputs...')
    classes, labels, binary_outputs, scalar_outputs = organize_labels_outputs(label_classes, output_classes, labels, binary_outputs, scalar_outputs)

    # Load the weights for the Challenge metric.
    print('Loading weights...')
    weights = load_weights(weights_file, classes)

    # Only consider classes that are scored with the Challenge metric, i.e.,
    # classes with at least one nonzero entry in the weight matrix.
    indices = np.any(weights, axis=0) # Find indices of classes in weight matrix.
    classes = [x for i, x in enumerate(classes) if indices[i]]
    labels = labels[:, indices]
    scalar_outputs = scalar_outputs[:, indices]
    binary_outputs = binary_outputs[:, indices]
    weights = weights[np.ix_(indices, indices)]

    # Evaluate the model by comparing the labels and outputs.
    print('Evaluating model...')

    print('- AUROC and AUPRC...')
    auroc, auprc = compute_auc(labels, scalar_outputs)

    print('- Accuracy...')
    accuracy = compute_accuracy(labels, binary_outputs)

    print('- F-measure...')
    f_measure = compute_f_measure(labels, binary_outputs)

    print('- F-beta and G-beta measures...')
    f_beta_measure, g_beta_measure = compute_beta_measures(labels, binary_outputs, beta=2)

    print('- Challenge metric...')
    challenge_metric = compute_challenge_metric(weights, labels, binary_outputs, classes, normal_class)

    print('Done.')

    # Return the results.
    return auroc, auprc, accuracy, f_measure, f_beta_measure, g_beta_measure, challenge_metric

# Check if the input is a number.
def is_number(x):
    """Return True if x can be parsed as a float, False otherwise."""
    try:
        float(x)
        return True
    except ValueError:
        return False

# Find Challenge files.
def find_challenge_files(label_directory, output_directory): label_files = list() output_files = list() for f in sorted(os.listdir(label_directory)): F = os.path.join(label_directory, f) # Full path for label file if os.path.isfile(F) and F.lower().endswith('.hea') and not f.lower().startswith('.'): root, ext = os.path.splitext(f) g = root + '.csv' G = os.path.join(output_directory, g) # Full path for corresponding output file if os.path.isfile(G): label_files.append(F) output_files.append(G) else: raise IOError('Output file {} not found for label file {}.'.format(g, f)) if label_files and output_files: return label_files, output_files else: raise IOError('No label or output files found.') # Load labels from header/label files. def load_labels(label_files, normal_class, equivalent_classes_collection): # The labels should have the following form: # # Dx: label_1, label_2, label_3 # num_recordings = len(label_files) # Load diagnoses. tmp_labels = list() for i in range(num_recordings): with open(label_files[i], 'r') as f: for l in f: if l.startswith('#Dx'): dxs = set(arr.strip() for arr in l.split(': ')[1].split(',')) tmp_labels.append(dxs) # Identify classes. classes = set.union(*map(set, tmp_labels)) if normal_class not in classes: classes.add(normal_class) print('- The normal class {} is not one of the label classes, so it has been automatically added, but please check that you chose the correct normal class.'.format(normal_class)) classes = sorted(classes) num_classes = len(classes) # Use one-hot encoding for labels. labels = np.zeros((num_recordings, num_classes), dtype=np.bool_) for i in range(num_recordings): dxs = tmp_labels[i] for dx in dxs: j = classes.index(dx) labels[i, j] = 1 # For each set of equivalent class, use only one class as the representative class for the set and discard the other classes in the set. # The label for the representative class is positive if any of the labels in the set is positive. 
remove_classes = list() remove_indices = list() for equivalent_classes in equivalent_classes_collection: equivalent_classes = [x for x in equivalent_classes if x in classes] if len(equivalent_classes)>1: representative_class = equivalent_classes[0] other_classes = equivalent_classes[1:] equivalent_indices = [classes.index(x) for x in equivalent_classes] representative_index = equivalent_indices[0] other_indices = equivalent_indices[1:] labels[:, representative_index] = np.any(labels[:, equivalent_indices], axis=1) remove_classes += other_classes remove_indices += other_indices for x in remove_classes: classes.remove(x) labels = np.delete(labels, remove_indices, axis=1) # If the labels are negative for all classes, then change the label for the normal class to positive. normal_index = classes.index(normal_class) for i in range(num_recordings): num_positive_classes = np.sum(labels[i, :]) if num_positive_classes==0: labels[i, normal_index] = 1 return classes, labels # Load outputs from output files. def load_outputs(output_files, normal_class, equivalent_classes_collection): # The outputs should have the following form: # # diagnosis_1, diagnosis_2, diagnosis_3 # 0, 1, 1 # 0.12, 0.34, 0.56 # num_recordings = len(output_files) tmp_labels = list() tmp_binary_outputs = list() tmp_scalar_outputs = list() for i in range(num_recordings): with open(output_files[i], 'r') as f: for j, l in enumerate(f): arrs = [arr.strip() for arr in l.split(',')] if j==1: row = arrs tmp_labels.append(row) elif j==2: row = list() for arr in arrs: number = 1 if arr in ('1', 'True', 'true', 'T', 't') else 0 row.append(number) tmp_binary_outputs.append(row) elif j==3: row = list() for arr in arrs: number = float(arr) if is_number(arr) else 0 row.append(number) tmp_scalar_outputs.append(row) # Identify classes. 
classes = set.union(*map(set, tmp_labels)) if normal_class not in classes: classes.add(normal_class) print('- The normal class {} is not one of the output classes, so it has been automatically added, but please check that you identified the correct normal class.'.format(normal_class)) classes = sorted(classes) num_classes = len(classes) # Use one-hot encoding for binary outputs and the same order for scalar outputs. binary_outputs = np.zeros((num_recordings, num_classes), dtype=np.bool_) scalar_outputs = np.zeros((num_recordings, num_classes), dtype=np.float64) for i in range(num_recordings): dxs = tmp_labels[i] for k, dx in enumerate(dxs): j = classes.index(dx) binary_outputs[i, j] = tmp_binary_outputs[i][k] scalar_outputs[i, j] = tmp_scalar_outputs[i][k] # For each set of equivalent class, use only one class as the representative class for the set and discard the other classes in the set. # The binary output for the representative class is positive if any of the classes in the set is positive. # The scalar output is the mean of the scalar outputs for the classes in the set. 
remove_classes = list() remove_indices = list() for equivalent_classes in equivalent_classes_collection: equivalent_classes = [x for x in equivalent_classes if x in classes] if len(equivalent_classes)>1: representative_class = equivalent_classes[0] other_classes = equivalent_classes[1:] equivalent_indices = [classes.index(x) for x in equivalent_classes] representative_index = equivalent_indices[0] other_indices = equivalent_indices[1:] binary_outputs[:, representative_index] = np.any(binary_outputs[:, equivalent_indices], axis=1) scalar_outputs[:, representative_index] = np.nanmean(scalar_outputs[:, equivalent_indices], axis=1) remove_classes += other_classes remove_indices += other_indices for x in remove_classes: classes.remove(x) binary_outputs = np.delete(binary_outputs, remove_indices, axis=1) scalar_outputs = np.delete(scalar_outputs, remove_indices, axis=1) # If any of the outputs is a NaN, then replace it with a zero. binary_outputs[np.isnan(binary_outputs)] = 0 scalar_outputs[np.isnan(scalar_outputs)] = 0 # If the binary outputs are negative for all classes, then change the binary output for the normal class to positive. normal_index = classes.index(normal_class) for i in range(num_recordings): num_positive_classes = np.sum(binary_outputs[i, :]) if num_positive_classes==0: binary_outputs[i, normal_index] = 1 return classes, binary_outputs, scalar_outputs # Organize labels and outputs. def organize_labels_outputs(label_classes, output_classes, tmp_labels, tmp_binary_outputs, tmp_scalar_outputs): # Include all classes from either the labels or the outputs. classes = sorted(set(label_classes) | set(output_classes)) num_classes = len(classes) # Check that the labels and outputs have the same numbers of recordings. assert(len(tmp_labels)==len(tmp_binary_outputs)==len(tmp_scalar_outputs)) num_recordings = len(tmp_labels) # Rearrange the columns of the labels and the outputs to be consistent with the order of the classes. 
labels = np.zeros((num_recordings, num_classes), dtype=np.bool_) for k, dx in enumerate(label_classes): j = classes.index(dx) labels[:, j] = tmp_labels[:, k] binary_outputs = np.zeros((num_recordings, num_classes), dtype=np.bool_) scalar_outputs = np.zeros((num_recordings, num_classes), dtype=np.float64) for k, dx in enumerate(output_classes): j = classes.index(dx) binary_outputs[:, j] = tmp_binary_outputs[:, k] scalar_outputs[:, j] = tmp_scalar_outputs[:, k] return classes, labels, binary_outputs, scalar_outputs # Load a table with row and column names. def load_table(table_file): # The table should have the following form: # # , a, b, c # a, 1.2, 2.3, 3.4 # b, 4.5, 5.6, 6.7 # c, 7.8, 8.9, 9.0 # table = list() with open(table_file, 'r') as f: for i, l in enumerate(f): arrs = [arr.strip() for arr in l.split(',')] table.append(arrs) # Define the numbers of rows and columns and check for errors. num_rows = len(table)-1 if num_rows<1: raise Exception('The table {} is empty.'.format(table_file)) num_cols = set(len(table[i])-1 for i in range(num_rows)) if len(num_cols)!=1: raise Exception('The table {} has rows with different lengths.'.format(table_file)) num_cols = min(num_cols) if num_cols<1: raise Exception('The table {} is empty.'.format(table_file)) # Find the row and column labels. rows = [table[0][j+1] for j in range(num_rows)] cols = [table[i+1][0] for i in range(num_cols)] # Find the entries of the table. values = np.zeros((num_rows, num_cols)) for i in range(num_rows): for j in range(num_cols): value = table[i+1][j+1] if is_number(value): values[i, j] = float(value) else: values[i, j] = float('nan') return rows, cols, values # Load weights. def load_weights(weight_file, classes): # Load the weight matrix. rows, cols, values = load_table(weight_file) assert(rows == cols) num_rows = len(rows) # Assign the entries of the weight matrix with rows and columns corresponding to the classes. 
num_classes = len(classes) weights = np.zeros((num_classes, num_classes), dtype=np.float64) for i, a in enumerate(rows): if a in classes: k = classes.index(a) for j, b in enumerate(rows): if b in classes: l = classes.index(b) weights[k, l] = values[i, j] return weights # Compute recording-wise accuracy. def compute_accuracy(labels, outputs): num_recordings, num_classes = np.shape(labels) num_correct_recordings = 0 for i in range(num_recordings): if np.all(labels[i, :]==outputs[i, :]): num_correct_recordings += 1 return float(num_correct_recordings) / float(num_recordings) # Compute confusion matrices. def compute_confusion_matrices(labels, outputs, normalize=False): # Compute a binary confusion matrix for each class k: # # [TN_k FN_k] # [FP_k TP_k] # # If the normalize variable is set to true, then normalize the contributions # to the confusion matrix by the number of labels per recording. num_recordings, num_classes = np.shape(labels) if not normalize: A = np.zeros((num_classes, 2, 2)) for i in range(num_recordings): for j in range(num_classes): if labels[i, j]==1 and outputs[i, j]==1: # TP A[j, 1, 1] += 1 elif labels[i, j]==0 and outputs[i, j]==1: # FP A[j, 1, 0] += 1 elif labels[i, j]==1 and outputs[i, j]==0: # FN A[j, 0, 1] += 1 elif labels[i, j]==0 and outputs[i, j]==0: # TN A[j, 0, 0] += 1 else: # This condition should not happen. raise ValueError('Error in computing the confusion matrix.') else: A = np.zeros((num_classes, 2, 2)) for i in range(num_recordings): normalization = float(max(np.sum(labels[i, :]), 1)) for j in range(num_classes): if labels[i, j]==1 and outputs[i, j]==1: # TP A[j, 1, 1] += 1.0/normalization elif labels[i, j]==0 and outputs[i, j]==1: # FP A[j, 1, 0] += 1.0/normalization elif labels[i, j]==1 and outputs[i, j]==0: # FN A[j, 0, 1] += 1.0/normalization elif labels[i, j]==0 and outputs[i, j]==0: # TN A[j, 0, 0] += 1.0/normalization else: # This condition should not happen. 
raise ValueError('Error in computing the confusion matrix.') return A # Compute macro F-measure. def compute_f_measure(labels, outputs): num_recordings, num_classes = np.shape(labels) A = compute_confusion_matrices(labels, outputs) f_measure = np.zeros(num_classes) for k in range(num_classes): tp, fp, fn, tn = A[k, 1, 1], A[k, 1, 0], A[k, 0, 1], A[k, 0, 0] if 2 * tp + fp + fn: f_measure[k] = float(2 * tp) / float(2 * tp + fp + fn) else: f_measure[k] = float('nan') macro_f_measure = np.nanmean(f_measure) return macro_f_measure # Compute F-beta and G-beta measures from the unofficial phase of the Challenge. def compute_beta_measures(labels, outputs, beta): num_recordings, num_classes = np.shape(labels) A = compute_confusion_matrices(labels, outputs, normalize=True) f_beta_measure = np.zeros(num_classes) g_beta_measure = np.zeros(num_classes) for k in range(num_classes): tp, fp, fn, tn = A[k, 1, 1], A[k, 1, 0], A[k, 0, 1], A[k, 0, 0] if (1+beta**2)*tp + fp + beta**2*fn: f_beta_measure[k] = float((1+beta**2)*tp) / float((1+beta**2)*tp + fp + beta**2*fn) else: f_beta_measure[k] = float('nan') if tp + fp + beta*fn: g_beta_measure[k] = float(tp) / float(tp + fp + beta*fn) else: g_beta_measure[k] = float('nan') macro_f_beta_measure = np.nanmean(f_beta_measure) macro_g_beta_measure = np.nanmean(g_beta_measure) return macro_f_beta_measure, macro_g_beta_measure # Compute macro AUROC and macro AUPRC. def compute_auc(labels, outputs): num_recordings, num_classes = np.shape(labels) # Compute and summarize the confusion matrices for each class across at distinct output values. auroc = np.zeros(num_classes) auprc = np.zeros(num_classes) for k in range(num_classes): # We only need to compute TPs, FPs, FNs, and TNs at distinct output values. thresholds = np.unique(outputs[:, k]) thresholds = np.append(thresholds, thresholds[-1]+1) thresholds = thresholds[::-1] num_thresholds = len(thresholds) # Initialize the TPs, FPs, FNs, and TNs. 
tp = np.zeros(num_thresholds) fp = np.zeros(num_thresholds) fn = np.zeros(num_thresholds) tn = np.zeros(num_thresholds) fn[0] = np.sum(labels[:, k]==1) tn[0] = np.sum(labels[:, k]==0) # Find the indices that result in sorted output values. idx = np.argsort(outputs[:, k])[::-1] # Compute the TPs, FPs, FNs, and TNs for class k across thresholds. i = 0 for j in range(1, num_thresholds): # Initialize TPs, FPs, FNs, and TNs using values at previous threshold. tp[j] = tp[j-1] fp[j] = fp[j-1] fn[j] = fn[j-1] tn[j] = tn[j-1] # Update the TPs, FPs, FNs, and TNs at i-th output value. while i < num_recordings and outputs[idx[i], k] >= thresholds[j]: if labels[idx[i], k]: tp[j] += 1 fn[j] -= 1 else: fp[j] += 1 tn[j] -= 1 i += 1 # Summarize the TPs, FPs, FNs, and TNs for class k. tpr = np.zeros(num_thresholds) tnr = np.zeros(num_thresholds) ppv = np.zeros(num_thresholds) npv = np.zeros(num_thresholds) for j in range(num_thresholds): if tp[j] + fn[j]: tpr[j] = float(tp[j]) / float(tp[j] + fn[j]) else: tpr[j] = float('nan') if fp[j] + tn[j]: tnr[j] = float(tn[j]) / float(fp[j] + tn[j]) else: tnr[j] = float('nan') if tp[j] + fp[j]: ppv[j] = float(tp[j]) / float(tp[j] + fp[j]) else: ppv[j] = float('nan') # Compute AUROC as the area under a piecewise linear function with TPR/ # sensitivity (x-axis) and TNR/specificity (y-axis) and AUPRC as the area # under a piecewise constant with TPR/recall (x-axis) and PPV/precision # (y-axis) for class k. for j in range(num_thresholds-1): auroc[k] += 0.5 * (tpr[j+1] - tpr[j]) * (tnr[j+1] + tnr[j]) auprc[k] += (tpr[j+1] - tpr[j]) * ppv[j+1] # Compute macro AUROC and macro AUPRC across classes. macro_auroc = np.nanmean(auroc) macro_auprc = np.nanmean(auprc) return macro_auroc, macro_auprc # Compute modified confusion matrix for multi-class, multi-label tasks. def compute_modified_confusion_matrix(labels, outputs): # Compute a binary multi-class, multi-label confusion matrix, where the rows # are the labels and the columns are the outputs. 
num_recordings, num_classes = np.shape(labels) A = np.zeros((num_classes, num_classes)) # Iterate over all of the recordings. for i in range(num_recordings): # Calculate the number of positive labels and/or outputs. normalization = float(max(np.sum(np.any((labels[i, :], outputs[i, :]), axis=0)), 1)) # Iterate over all of the classes. for j in range(num_classes): # Assign full and/or partial credit for each positive class. if labels[i, j]: for k in range(num_classes): if outputs[i, k]: A[j, k] += 1.0/normalization return A # Compute the evaluation metric for the Challenge. def compute_challenge_metric(weights, labels, outputs, classes, normal_class): num_recordings, num_classes = np.shape(labels) normal_index = classes.index(normal_class) # Compute the observed score. A = compute_modified_confusion_matrix(labels, outputs) observed_score = np.nansum(weights * A) # Compute the score for the model that always chooses the correct label(s). correct_outputs = labels A = compute_modified_confusion_matrix(labels, correct_outputs) correct_score = np.nansum(weights * A) # Compute the score for the model that always chooses the normal class. 
inactive_outputs = np.zeros((num_recordings, num_classes), dtype=np.bool_) inactive_outputs[:, normal_index] = 1 A = compute_modified_confusion_matrix(labels, inactive_outputs) inactive_score = np.nansum(weights * A) if correct_score != inactive_score: normalized_score = float(observed_score - inactive_score) / float(correct_score - inactive_score) else: normalized_score = float('nan') return normalized_score if __name__ == '__main__': #auroc, auprc, accuracy, f_measure, f_beta_measure, g_beta_measure, challenge_metric = evaluate_12ECG_score(sys.argv[1], sys.argv[2]) lbl_dir = '/home/p2017-999/acs_data/processed_data/physionet2020/jonathan/in' out_dir = '/home/p2017-999/acs_data/processed_data/physionet2020/jonathan/out' auroc, auprc, accuracy, f_measure, f_beta_measure, g_beta_measure, challenge_metric = evaluate_12ECG_score(lbl_dir, out_dir) output_string = 'AUROC,AUPRC,Accuracy,F-measure,Fbeta-measure,Gbeta-measure,Challenge metric\n{:.3f},{:.3f},{:.3f},{:.3f},{:.3f},{:.3f},{:.3f}'.format(auroc, auprc, accuracy, f_measure, f_beta_measure, g_beta_measure, challenge_metric) if len(sys.argv) > 3: with open(sys.argv[3], 'w') as f: f.write(output_string) else: print(output_string)