Merge pull request #57 from JakaKokosar/rank-survival-features
owranksurvivalfeatures: add log-rank scorer
JakaKokosar authored Dec 6, 2022
2 parents 23ff454 + 5964b97 commit 35c2a26
Showing 1 changed file with 135 additions and 56 deletions.
191 changes: 135 additions & 56 deletions orangecontrib/survival_analysis/widgets/owranksurvivalfeatures.py
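In short, the scorer this commit adds (log_rank_scorer in the diff below) splits each continuous covariate into two groups at its median, scores the split with lifelines' multivariate_logrank_test, and worker() then appends FDR-corrected p values via fdrcorrection. The following is a minimal standalone sketch of that idea, assuming plain NumPy arrays rather than the widget's Orange Table/TaskState plumbing; the helper name log_rank_scores and the toy inputs are illustrative only and not part of the commit.

# Hedged sketch (not part of the commit): the median-split log-rank scoring
# used by log_rank_scorer/worker, rewritten against plain NumPy arrays.
import numpy as np
from lifelines.statistics import multivariate_logrank_test
from statsmodels.stats.multitest import fdrcorrection


def log_rank_scores(time, event, covariates):
    """time, event: 1-D arrays; covariates: dict of name -> 1-D numeric array."""
    results = []
    for name, values in covariates.items():
        # dichotomize a continuous covariate at its median
        groups = values > np.median(values)
        test = multivariate_logrank_test(time, groups, event)
        results.append((name, test.test_statistic, test.p_value))
    # mirror worker(): the last column holds p values, append their FDR correction
    _, fdr = fdrcorrection([r[-1] for r in results], is_sorted=False)
    return [r + (q,) for r, q in zip(results, fdr)]


# toy usage with made-up data, for illustration only
rng = np.random.default_rng(0)
scores = log_rank_scores(
    time=rng.exponential(10.0, size=40),
    event=rng.integers(0, 2, size=40),
    covariates={'age': rng.normal(60, 10, size=40)},
)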
@@ -3,9 +3,10 @@
import queue
from multiprocessing import cpu_count
from functools import partial
from typing import Any, Optional, List
from typing import Any, Optional

from lifelines import CoxPHFitter
from lifelines.statistics import multivariate_logrank_test
from statsmodels.stats.multitest import fdrcorrection

from AnyQt.QtWidgets import QButtonGroup, QGridLayout, QRadioButton, QAbstractScrollArea
@@ -33,7 +34,14 @@
from orangecontrib.survival_analysis.modeling.cox import to_data_frame


def batch_to_process(queue, time_var, event_var, df):
class ScoreMethods:
multivariate_log_rank = 0
cox_regression = 1

labels = ['Multivariate log-rank test', 'Cox regression']


def __process_batch(queue, time_var, event_var, df):
batch_results = []
cph = CoxPHFitter()

@@ -50,7 +58,6 @@ def batch_to_process(queue, time_var, event_var, df):
batch_results.append(
(
covariate,
cph.log_likelihood_,
ll_ratio_test.test_statistic,
ll_ratio_test.p_value,
)
@@ -59,27 +66,31 @@ def batch_to_process(queue, time_var, event_var, df):
return np.array(batch_results)


def worker(
table: Table, covariates: List, time_var: str, event_var: str, state: TaskState
def cox_regression_scorer_multicore(
data: Table, time_var: str, event_var: str, state: TaskState
):
attr_name_to_variable = {attr.name: attr for attr in data.domain.attributes}
attrs = list(attr_name_to_variable.keys())

# order of the data gets lost with map_async
attr_to_result = {attr.name: None for attr in data.domain.attributes}

with multiprocessing.Manager() as _manager:
_queue = _manager.Queue()
_cpu_count = cpu_count()

df = to_data_frame(table)
if len(covariates) > 50:
batches = (
df[[time_var, event_var] + batch]
for batch in [covariates[i::_cpu_count] for i in range(_cpu_count)]
)
else:
batches = (df[[time_var, event_var] + [cov]] for cov in covariates)
progress_steps = iter(np.linspace(0, 100, len(covariates)))
df = to_data_frame(data)
batches = (
df[[time_var, event_var] + list(batch)]
for batch in [attrs[i::_cpu_count] for i in range(_cpu_count)]
)

progress_steps = iter(np.linspace(0, 100, len(attrs)))

with multiprocessing.Pool(processes=_cpu_count) as pool:
results = pool.map_async(
partial(
batch_to_process,
__process_batch,
_queue,
time_var,
event_var,
@@ -95,13 +106,86 @@ def worker(
break

stacked_result = np.vstack(results.get())
covariate_names = stacked_result[:, 0]
covariate_names = stacked_result[:, 0].tolist()
results = stacked_result[:, 1:].astype(float)
_, pvals_corrected = fdrcorrection(results[:, -1], is_sorted=False)
results = np.hstack(
(results, pvals_corrected.reshape(pvals_corrected.shape[0], -1))
)
return covariate_names, results

# map attr name to results in 'attr_to_result' dict
for attr_name, row_data in zip(covariate_names, results):
attr_to_result[attr_name] = row_data.tolist()

# output sorted data
return [
[attr_name_to_variable[attr_name]] + row_data
for attr_name, row_data in attr_to_result.items()
]


def cox_regression_scorer(data: Table, time_var: str, event_var: str, state: TaskState):
progress_steps = iter(np.linspace(0, 100, len(data.domain.attributes)))

attr_name_to_variable = {attr.name: attr for attr in data.domain.attributes}

df = to_data_frame(data)
cph = CoxPHFitter()
results = []

for attr_name in [col for col in df.columns if col not in (time_var, event_var)]:
# fit cox
model = cph.fit(
df[[time_var, event_var, attr_name]],
duration_col=time_var,
event_col=event_var,
)
# log-likelihood ratio test
ll_ratio_test = model.log_likelihood_ratio_test()
results.append(
[
attr_name_to_variable[attr_name],
ll_ratio_test.test_statistic,
ll_ratio_test.p_value,
]
)
state.set_progress_value(next(progress_steps))
return results


def log_rank_scorer(data: Table, time_var: str, event_var: str, state: TaskState):

time = data.get_column(time_var)
event = data.get_column(event_var)
progress_steps = iter(np.linspace(0, 100, len(data.domain.attributes)))

results = []
for var in list(data.domain.attributes):
column_values = mask = data.get_column(var)
if var.is_continuous:
mask = column_values > np.median(column_values)
log_rank = multivariate_logrank_test(time, mask, event)
results.append([var, log_rank.test_statistic, log_rank.p_value])
state.set_progress_value(next(progress_steps))

return results


def worker(data: Table, score_method, state: TaskState):
time_var, event_var = get_survival_endpoints(data.domain)
_, columns = data.X.shape

if score_method == ScoreMethods.multivariate_log_rank:
scorer = log_rank_scorer
elif score_method == ScoreMethods.cox_regression:
scorer = (
cox_regression_scorer if columns <= 100 else cox_regression_scorer_multicore
)
else:
raise ValueError('Unexpected scorer type')

results = scorer(data, time_var.name, event_var.name, state)

# the last column is p values, calculate FDR
_, pvals_corrected = fdrcorrection([col[-1] for col in results], is_sorted=False)

return [columns + [fdr] for columns, fdr in zip(results, pvals_corrected)]


class OWRankSurvivalFeatures(OWWidget, ConcurrentWidgetMixin):
@@ -118,6 +202,7 @@ class OWRankSurvivalFeatures(OWWidget, ConcurrentWidgetMixin):
selected_attrs = ContextSetting([], schema_only=True)
selection_method = Setting(select_n_best, schema_only=True)
n_selected = Setting(20, schema_only=True)
score_method = Setting(0, schema_only=True)
auto_commit: bool = Setting(True, schema_only=True)

class Inputs:
@@ -131,10 +216,15 @@ def __init__(self):
ConcurrentWidgetMixin.__init__(self)

self.data: Optional[Table] = None
self.attr_name_to_variable: Optional[Table] = None
self.covariates_from_worker_result = None
self.time_var: Optional[str] = None
self.event_var: Optional[str] = None

box = gui.vBox(self.controlArea, 'Score method', margin=0)
gui.comboBox(
box,
self,
'score_method',
items=ScoreMethods.labels,
callback=self.start_worker,
)

gui.rubber(self.controlArea)

@@ -184,9 +274,6 @@ def button(text, buttonid, tool_tip=None):
self.model = PyTableModel(parent=self)
self.table_view = TableView(parent=self)
self.table_view.setModel(self.model)
self.model.setHorizontalHeaderLabels(
['Log-Likelihood', 'Log-Likelihood Ratio', f'{"p".center(13)}', 'FDR']
)
self.table_view.setSizeAdjustPolicy(
QAbstractScrollArea.AdjustToContentsOnFirstShow
)
@@ -200,33 +287,26 @@ def _set_select_manual():

self.mainArea.layout().addWidget(self.table_view)

@property
def covariates(self) -> Optional[List[str]]:
if not self.data:
return
return [attr.name for attr in self.data.domain.attributes]

@Inputs.data
@check_survival_data
def set_data(self, data: Table):
self.closeContext()
self.selected_attrs = []
self.covariates_from_worker_result = []
self.model.clear()
self.model.resetSorting()

if not data:
return

self.data = data
self.attr_name_to_variable = {
attr.name: attr for attr in self.data.domain.attributes
}

self.openContext(data)
time_var, event_var = get_survival_endpoints(self.data.domain)
self.time_var, self.event_var = time_var.name, event_var.name
self.start(worker, self.data, self.covariates, self.time_var, self.event_var)
self.start_worker()

def start_worker(self):
if self.data is None or self.score_method is None:
return

self.start(worker, self.data, self.score_method)

def commit(self):
if not self.selected_attrs:
@@ -239,23 +319,23 @@ def commit(self):
self.Outputs.reduced_data.send(data)

def on_done(self, worker_result):
covariate_names, results = worker_result
self.model.wrap(worker_result)

# wrap everything except covariate names
self.model.wrap(results.tolist())
if self.score_method == ScoreMethods.multivariate_log_rank:
self.model.setHorizontalHeaderLabels(
['', 'Multivariate log-rank test', f'{"p".center(13)}', 'FDR']
)

# this is a temporary solution because the covariate order gets mixed when using multiprocessing
self.covariates_from_worker_result = covariate_names.tolist()
elif self.score_method == ScoreMethods.cox_regression:
self.model.wrap(worker_result)
self.model.setHorizontalHeaderLabels(
['', 'Log-Likelihood Ratio test', f'{"p".center(13)}', 'FDR']
)

# match covariate names to domain variables and set vertical header
self.model.setVerticalHeaderLabels(
[self.attr_name_to_variable[name] for name in covariate_names]
)
self.table_view.resizeColumnsToContents()

# sort by p values
self.table_view.sortByColumn(2, Qt.AscendingOrder)

self.auto_select()

def on_exception(self, ex):
@@ -286,9 +366,10 @@ def auto_select(self):
selection = QItemSelection()
if self.selected_attrs is not None:
attr_indices = [
self.covariates_from_worker_result.index(var.name)
self.data.domain.attributes.index(var)
for var in self.selected_attrs
]

for row in self.model.mapFromSourceRows(attr_indices):
selection.append(
QItemSelectionRange(
@@ -303,9 +384,7 @@ def on_select(self):
selected_rows = self.table_view.selectionModel().selectedRows(0)
row_indices = [i.row() for i in selected_rows]
attr_indices = self.model.mapToSourceRows(row_indices)
self.selected_attrs = [
self.model._headers[Qt.Vertical][row] for row in attr_indices
]
self.selected_attrs = [self.data.domain[idx] for idx in attr_indices]
self.commit()

def sizeHint(self):
