import os
import sys
import time
from glob import glob
import json
import qt
import slicer
from slicer.ScriptedLoadableModule import *
from DICOMLib import DICOMUtils
from utils.config import module_path, test_data_path
#
# RANOTest
#
[docs]
class RANOTest(ScriptedLoadableModuleTest):
"""
This is the test class for the RANO module.
It is used to test the RANO module and its functionality.
Upon clicking the "Reload and Test" button in the module (only shown when the module is loaded in developer mode),
this classes runTest() method is called.
"""
[docs]
def setUp(self):
""" Do whatever is needed to reset the state - typically a scene clear will be enough.
"""
slicer.mrmlScene.Clear()
[docs]
def runTest(self):
"""Run as few or as many tests as needed here.
"""
self.setUp()
slicer.mrmlScene.Clear()
# check which tests were specified in the command line
tests_to_run = []
for arg in sys.argv:
if arg.startswith("test_"):
tests_to_run.append(arg)
if len(tests_to_run) > 0: # if tests were specified in the command line
for test in tests_to_run:
if hasattr(self, test):
print(f"Running test {test}")
# run the test
try:
getattr(self, test)()
except Exception as e:
print(f"Test {test} failed with error: {e}")
sys.exit(1)
else:
print(f"Test {test} not found")
else: # if no tests were specified run all tests
self.test_RANO_dicom_KCL()
#self.test_RANO_dicom_CPTAC()
self.test_RANO_nifti_MU()
if any(['.py' in arg for arg in sys.argv]):
print('Tests executed from command line. Quitting Slicer...')
slicer.app.quit()
[docs]
def test_RANO_dicom_KCL(self):
slicer.mrmlScene.Clear()
self.delayDisplay("Starting the test")
'''
define the test cases
'''
base_path = os.path.join(test_data_path, 'KCL')
patients = sorted(glob(os.path.join(base_path, "Patient_*")))
kcl_dcm_timepoint_pairs = [((patient, f"TimePoint_001"), (patient, f"TimePoint_002"))
for patient in patients]
cases_of_paths_2tp_times_4channels = [] # n cases, 2 timepoints, 4 channels
for test_case_idx in range(len(kcl_dcm_timepoint_pairs)):
patient = kcl_dcm_timepoint_pairs[test_case_idx][0][0]
timepoint_t1 = kcl_dcm_timepoint_pairs[test_case_idx][0][1]
timepoint_t2 = kcl_dcm_timepoint_pairs[test_case_idx][1][1]
p_t1c_tp1 = os.path.join(base_path, patient, timepoint_t1, "t1c")
p_t1n_tp1 = os.path.join(base_path, patient, timepoint_t1, "t1n")
p_t2f_tp1 = os.path.join(base_path, patient, timepoint_t1, "t2f")
p_t2w_tp1 = os.path.join(base_path, patient, timepoint_t1, "t2w")
p_t1c_tp2 = os.path.join(base_path, patient, timepoint_t2, "t1c")
p_t1n_tp2 = os.path.join(base_path, patient, timepoint_t2, "t1n")
p_t2f_tp2 = os.path.join(base_path, patient, timepoint_t2, "t2f")
p_t2w_tp2 = os.path.join(base_path, patient, timepoint_t2, "t2w")
# check if all files exist
for p in [p_t1c_tp1, p_t1n_tp1, p_t2f_tp1, p_t2w_tp1, p_t1c_tp2, p_t1n_tp2, p_t2f_tp2, p_t2w_tp2]:
if not os.path.isdir(p):
raise NotADirectoryError(f"The directory {p} does not exist...")
cases_of_paths_2tp_times_4channels.append([[p_t1c_tp1, p_t1n_tp1, p_t2f_tp1, p_t2w_tp1],
[p_t1c_tp2, p_t1n_tp2, p_t2f_tp2, p_t2w_tp2]])
assert(len(cases_of_paths_2tp_times_4channels) > 0), f"No test cases found for KCL patients"
'''
run test for each test case
'''
print(f"Running {len(cases_of_paths_2tp_times_4channels)} test cases")
for test_case_idx, curr_paths in enumerate(cases_of_paths_2tp_times_4channels):
# print(f"Paths t1: {curr_paths[0]}")
# print(f"Paths t2: {curr_paths[1]}")
paths_t1 = curr_paths[0]
paths_t2 = curr_paths[1]
# clear the scene
slicer.mrmlScene.Clear()
# load dicoms into slicer using DICOMutils
def dcm_dir_to_node(dcm_dir):
with DICOMUtils.TemporaryDICOMDatabase() as db:
DICOMUtils.importDicom(dcm_dir, db)
patientUIDs = db.patients()
loadedNodeIDs = []
for patientUID in patientUIDs:
loadedNodeIDs.extend(DICOMUtils.loadPatientByUID(patientUID))
if not len(loadedNodeIDs) == 1:
print(f"Expected 1 loaded node from importing DICOM files in {dcm_dir}"
f" but got {len(loadedNodeIDs)}: {loadedNodeIDs}")
loadedNode = slicer.mrmlScene.GetNodeByID(loadedNodeIDs[0])
return loadedNode
inputVolumes = [dcm_dir_to_node(p) for p in paths_t1]
inputVolumes_t2 = [dcm_dir_to_node(p) for p in paths_t2]
self.delayDisplay('Loaded test data set')
self.test_pipeline(inputVolumes,
inputVolumes_t2,
do_affinereg=True,
input_is_bet=False,
method2Dmeas="RANO",
seg_model_key="t1c, t1n, t2f, t2w: task4001",
automatic_segmentation=True,
line_placement=True,
report_creation=True)
[docs]
def test_RANO_dicom_CPTAC(self):
slicer.mrmlScene.Clear()
self.delayDisplay("Starting the test")
'''
define the test cases
'''
base_path = os.path.join(test_data_path, 'CPTAC-GBM')
patients = ["C3L-00016"]
cases_of_paths_2tp_times_4channels = [] # n cases, 2 timepoints, 4 channels
for patient in patients:
if patient == "C3L-00016":
timepoint1 = "11-15-1999-NA-MR BRAIN WOW CONTRAST-47088"
timepoint2 = "11-17-1999-NA-MR BRAIN WOW CONTRAST-15329"
p_t1c_tp1 = os.path.join(base_path, patient, timepoint1, "13.000000-AX T1C-70939")
p_t1n_tp1 = os.path.join(base_path, patient, timepoint1, "9.000000-AX T1-06604")
p_t2f_tp1 = os.path.join(base_path, patient, timepoint1, "7.000000-Ax Flair irFSE H-84835")
p_t2w_tp1 = os.path.join(base_path, patient, timepoint1, "10.000000-Prop T2 TRF-43669")
p_t1c_tp2 = os.path.join(base_path, patient, timepoint2, "12.000000-AX T1C-87798")
p_t1n_tp2 = os.path.join(base_path, patient, timepoint2, "5.000000-AX T1-46920")
p_t2f_tp2 = os.path.join(base_path, patient, timepoint2, "6.000000-Ax Flair irFSE H-49772")
p_t2w_tp2 = os.path.join(base_path, patient, timepoint2, "8.000000-Prop T2 TRF-56708")
else:
raise ValueError(f"No paths defined for patient {patient}")
# check if all directories exist
for p in [p_t1c_tp1, p_t1n_tp1, p_t2f_tp1, p_t2w_tp1, p_t1c_tp2, p_t1n_tp2, p_t2f_tp2, p_t2w_tp2]:
if not os.path.isdir(p):
raise NotADirectoryError(f"The directory {p} does not exist...")
cases_of_paths_2tp_times_4channels.append([[p_t1c_tp1, p_t1n_tp1, p_t2f_tp1, p_t2w_tp1],
[p_t1c_tp2, p_t1n_tp2, p_t2f_tp2, p_t2w_tp2]])
assert(len(cases_of_paths_2tp_times_4channels) > 0), f"No test cases found for CPTAC-GBM patients"
'''
run test for each test case
'''
print(f"Running {len(cases_of_paths_2tp_times_4channels)} test cases")
for test_case_idx, curr_paths in enumerate(cases_of_paths_2tp_times_4channels):
# print(f"Paths t1: {curr_paths[0]}")
# print(f"Paths t2: {curr_paths[1]}")
paths_t1 = curr_paths[0]
paths_t2 = curr_paths[1]
# clear the scene
slicer.mrmlScene.Clear()
# load dicoms into slicer using DICOMutils
def dcm_dir_to_node(dcm_dir):
with DICOMUtils.TemporaryDICOMDatabase() as db:
DICOMUtils.importDicom(dcm_dir, db)
patientUIDs = db.patients()
loadedNodeIDs = []
for patientUID in patientUIDs:
loadedNodeIDs.extend(DICOMUtils.loadPatientByUID(patientUID))
if not len(loadedNodeIDs) == 1:
print(f"Expected 1 loaded node from importing DICOM files in {dcm_dir}"
f" but got {len(loadedNodeIDs)}: {loadedNodeIDs}")
loadedNode = slicer.mrmlScene.GetNodeByID(loadedNodeIDs[0])
return loadedNode
inputVolumes = [dcm_dir_to_node(p) for p in paths_t1]
inputVolumes_t2 = [dcm_dir_to_node(p) for p in paths_t2]
self.delayDisplay('Loaded test data set')
self.test_pipeline(inputVolumes,
inputVolumes_t2,
do_affinereg=True,
input_is_bet=False,
method2Dmeas="RANO_open3D",
seg_model_key="t1c, t1n, t2f, t2w: task4001",
automatic_segmentation=True,
line_placement=True,
report_creation=True)
[docs]
def test_RANO_nifti_MU(self):
slicer.mrmlScene.Clear()
self.delayDisplay("Starting the test")
'''
define the test cases
'''
# to store all the paths for all test cases
cases_of_paths_2tp_times_4channels = [] # n cases, 2 timepoints, 4 channels
base_path = os.path.join(test_data_path, 'MU-Glioma-Post')
patient_dirs = sorted(glob(os.path.join(base_path, "PatientID_*")))
patients = [os.path.basename(p) for p in patient_dirs]
for patient, patient_dir in zip(patients, patient_dirs):
timepoints = [os.path.basename(p) for p in sorted(glob(os.path.join(patient_dir, "Timepoint_*")))]
timepoint_1_dir = os.path.join(patient_dir, timepoints[0])
timepoint_2_dir = os.path.join(patient_dir, timepoints[1])
p_t1c_tp1 = os.path.join(timepoint_1_dir, patient + f"_{timepoints[0]}_brain_t1c.nii.gz")
p_t1n_tp1 = os.path.join(timepoint_1_dir, patient + f"_{timepoints[0]}_brain_t1n.nii.gz")
p_t2f_tp1 = os.path.join(timepoint_1_dir, patient + f"_{timepoints[0]}_brain_t2f.nii.gz")
p_t2w_tp1 = os.path.join(timepoint_1_dir, patient + f"_{timepoints[0]}_brain_t2w.nii.gz")
p_t1c_tp2 = os.path.join(timepoint_2_dir, patient + f"_{timepoints[1]}_brain_t1c.nii.gz")
p_t1n_tp2 = os.path.join(timepoint_2_dir, patient + f"_{timepoints[1]}_brain_t1n.nii.gz")
p_t2f_tp2 = os.path.join(timepoint_2_dir, patient + f"_{timepoints[1]}_brain_t2f.nii.gz")
p_t2w_tp2 = os.path.join(timepoint_2_dir, patient + f"_{timepoints[1]}_brain_t2w.nii.gz")
# check if all files exist
for p in [p_t1c_tp1, p_t1n_tp1, p_t2f_tp1, p_t2w_tp1, p_t1c_tp2, p_t1n_tp2, p_t2f_tp2, p_t2w_tp2]:
if not os.path.isfile(p):
raise FileNotFoundError(f"The file {p} does not exist...")
cases_of_paths_2tp_times_4channels.append([[p_t1c_tp1, p_t1n_tp1, p_t2f_tp1, p_t2w_tp1],
[p_t1c_tp2, p_t1n_tp2, p_t2f_tp2, p_t2w_tp2]])
assert(len(cases_of_paths_2tp_times_4channels) > 0), f"No test cases found for MU-Glioma-Post patients"
'''
run test for each test case
'''
print(f"Running {len(cases_of_paths_2tp_times_4channels)} test cases")
for test_case_idx, curr_paths in enumerate(cases_of_paths_2tp_times_4channels):
# print(f"Paths t1: {curr_paths[0]}")
# print(f"Paths t2: {curr_paths[1]}")
paths_t1 = curr_paths[0]
paths_t2 = curr_paths[1]
# clear the scene
slicer.mrmlScene.Clear()
# load volumes into slicer
inputVolumes = [slicer.util.loadVolume(p) for p in paths_t1]
inputVolumes_t2 = [slicer.util.loadVolume(p) for p in paths_t2]
self.delayDisplay('Loaded test data set')
self.test_pipeline(inputVolumes,
inputVolumes_t2,
do_affinereg=False,
input_is_bet=True,
method2Dmeas="RANO",
seg_model_key="t1c, t1n, t2f, t2w: task4001",
automatic_segmentation=True,
line_placement=True,
report_creation=True)
[docs]
def test_pipeline(self,
inputVolumes,
inputVolumes_t2,
do_affinereg,
input_is_bet,
method2Dmeas,
seg_model_key,
automatic_segmentation,
line_placement,
report_creation,):
# set the UI
# print(f"input volumes = {inputVolumes}")
# print(f"input volumes t2 = {inputVolumes_t2}")
# set the input volumes
slicer.modules.RANOWidget.ui.inputSelector_channel1_t1.setCurrentNode(inputVolumes[0])
slicer.modules.RANOWidget.ui.inputSelector_channel2_t1.setCurrentNode(inputVolumes[1])
slicer.modules.RANOWidget.ui.inputSelector_channel3_t1.setCurrentNode(inputVolumes[2])
slicer.modules.RANOWidget.ui.inputSelector_channel4_t1.setCurrentNode(inputVolumes[3])
slicer.modules.RANOWidget.ui.inputSelector_channel1_t2.setCurrentNode(inputVolumes_t2[0])
slicer.modules.RANOWidget.ui.inputSelector_channel2_t2.setCurrentNode(inputVolumes_t2[1])
slicer.modules.RANOWidget.ui.inputSelector_channel3_t2.setCurrentNode(inputVolumes_t2[2])
slicer.modules.RANOWidget.ui.inputSelector_channel4_t2.setCurrentNode(inputVolumes_t2[3])
# set the output segmentation
outputSegmentation = slicer.mrmlScene.AddNewNodeByClass("vtkMRMLSegmentationNode")
outputSegmentation.SetName("outputSegmentation_t1")
slicer.modules.RANOWidget.ui.outputSelector.setCurrentNode(outputSegmentation)
outputSegmentation_t2 = slicer.mrmlScene.AddNewNodeByClass("vtkMRMLSegmentationNode")
outputSegmentation_t2.SetName("outputSegmentation_t2")
slicer.modules.RANOWidget.ui.outputSelector_t2.setCurrentNode(outputSegmentation_t2)
# set the segmentation model
model_info = json.loads(slicer.modules.RANOWidget._parameterNode.GetParameter("ModelInfo"))
model_index = list(model_info.keys()).index(seg_model_key)
slicer.modules.RANOWidget.ui.modelComboBox.setCurrentIndex(model_index)
slicer.modules.RANOWidget.ui.modelComboBox_t2.setCurrentIndex(model_index)
# set the affine registration and input is bet checkboxes
slicer.modules.RANOWidget.ui.affineregCheckBox.checked = do_affinereg
slicer.modules.RANOWidget.ui.inputisbetCheckBox.checked = input_is_bet
slicer.modules.RANOWidget.ui.affineregCheckBox_t2.checked = do_affinereg
slicer.modules.RANOWidget.ui.inputisbetCheckBox_t2.checked = input_is_bet
if automatic_segmentation:
# press the run segmentation button
print("Starting the automatic segmentation")
cliNode, cliNode_t2 = slicer.modules.RANOWidget.onCalcSegmentationsButton()
# block execution here until all segments have been loaded into the segmentation nodes
# check if segmentations have been loaded
parameterNode = slicer.modules.RANOWidget._parameterNode
while (parameterNode.GetParameter("segmentation_loaded_timepoint1") != "true"
or parameterNode.GetParameter("segmentation_loaded_timepoint2") != "true"):
slicer.app.processEvents() # Keep the GUI responsive and run the events that create the segmentation after the CLIs have finished
time.sleep(0.1)
# print("waiting for the segmentation to be loaded")
else:
print("Skipping the automatic segmentation")
if line_placement:
print("Starting the automatic 2D measurements")
# select the segment in the segment selector
slicer.modules.RANOWidget.ui.SegmentSelectorWidget.setCurrentNode(outputSegmentation)
segmentID = "3" # Enhancing tumor
predicted_segmentIDs = slicer.modules.RANOWidget.ui.SegmentSelectorWidget.currentNode().GetSegmentation().GetSegmentIDs()
# select the first segment in the segment selector
if segmentID in predicted_segmentIDs:
slicer.modules.RANOWidget.ui.SegmentSelectorWidget.setCurrentSegmentID(segmentID)
else:
slicer.modules.RANOWidget.ui.SegmentSelectorWidget.setCurrentSegmentID(None)
# set the method
method_idx = slicer.modules.RANOWidget.ui.method2DmeasComboBox.findText(method2Dmeas)
slicer.modules.RANOWidget.ui.method2DmeasComboBox.setCurrentIndex(method_idx)
# press the calc 2D button
slicer.modules.RANOWidget.onCalc2DButton()
else:
print("Skipping the automatic 2D measurements")
if report_creation:
print("Starting the report creation")
def auto_click_yes():
widgets = qt.QApplication.topLevelWidgets()
for w in widgets:
if isinstance(w, qt.QMessageBox):
yes_button = w.button(qt.QMessageBox.Yes)
yes_button.click()
qt.QTimer.singleShot(0, auto_click_yes)
# press the calc report button
slicer.modules.RANOWidget.onCreateReportButton()
else:
print("Skipping the report creation")
self.delayDisplay('Test passed')