Source code for hdireg.HDIreg.transformix
#Image transformation using elastix/transformix
#Joshua Hess
#Import modules
import numpy as np
import sys
import time
import os
import re
import nibabel as nib
from pathlib import Path
import pandas as pd
import tempfile
from skimage.transform import resize
import shutil
#Import external modules
from hdiutils.HDIimport import hdi_reader
from hdiutils.HDIexport import hdi_exporter
[docs]def GetCropCoords(coords_csv, correction = 80):
"""
Read csv file containing coordinates for rectangular crop through
FIJI and return the coordinates as a list.
Parameters
----------
coords_csv: string.
Path to csv file containing coordinates.
correction: integer (Default: 80)
Extra pixels to take on each side of the crop contour for buffer.
Returns
-------
list with min row, max row, min col, max col coordinates of bounding box.
"""
#Ensure that the csv is pathlib object
coords_csv = Path(coords_csv)
#Read the file
file_cont = pd.read_csv(coords_csv)
#Get the XY coordinates from the dataframe
col = list(file_cont['X']-1)
row = list(file_cont['Y']-1)
#Get min and max for rows
min_row = min(row)
max_row = max(row)
#Get column min and max
min_col = min(col)
max_col = max(col)
#Check to see if we are adding overlap to our ROIs
if correction != None:
#Get the bounding box
min_row = (min_row-(correction))
max_row = (max_row+(correction))
min_col = (min_col-(correction))
max_col = (max_col+(correction))
#Return the contour of the rectangular crop
return [min_row,max_row,min_col,max_col]
[docs]def CropROI(coords,full_img,target_size = None):
"""Extract region of interest from full image array.
Parameters
----------
coords: list
Returned from ``GetCropCoords``
full_img: array
Contains the image to be cropped from.
target_size: tuple of type integer (sizex,sizey; Default: None)
Resize image using bilinear interpolation before exporting.
Returns
-------
nifti_ROI: object of class nibabel.Nifti1Image
Returns nibabel nifti object that is the cropped region to be exported.
"""
#Get region of interest crop from the full array using coorinates
tmp_ROI = full_img[int(coords[2]):int(coords[3]),int(coords[0]):int(coords[1])]
#Get the size of the imc image
#multiple = int(imc_im.shape[1]/tmp_ROI.shape[0])
#Use the rounded multiple to resize our ROI for image registration
#HiRes_size = (tmp_ROI.shape[1]*multiple,tmp_ROI.shape[0]*multiple)
#HiRes_size = (imc_im.shape[0],imc_im.shape[1])
if target_size is not None:
#Remember that cv2 does the axis in the opposite order as numpy
# note here that we are no longer using cv2 -- this is skimage resizing...
tmp_ROI = resize(tmp_ROI,target_size)
#Create a nifti object
nifti_ROI = nib.Nifti1Image(np.rot90(tmp_ROI.T,1), affine=np.eye(4))
#Return image
return nifti_ROI
[docs]def ExtractROIcrop(coords_csv, full_img, target_size = None, correction = 80):
"""
Combine ``CropROI`` and ``GetCropCoords`` to return
a nifti object available for export.
Parameters
----------
coords_csv: string.
Path to csv file containing coordinates.
full_img: array
Contains the image to be cropped from.
target_size: tuple of type integer (sizex,sizey; Default: None)
Resize image using bilinear interpolation before exporting.
correction: integer (Default: 80)
Extra pixels to take on each side of the crop contour for buffer.
Returns
-------
crop_im: object of class nibabel.Nifti1Image
Returns nibabel nifti object that is the cropped region to be exported.
"""
#Extract the coordinates from the csv file
coords = GetCropCoords(coords_csv, correction)
#Extract the nifti cropped region
crop_im = CropROI(coords,full_img,target_size)
#Return the cropped image -- nifti formatted
return crop_im
#Define base component transformix
[docs]def RunTransformix(command):
"""Run the transfomix transformation. You must be able to call transformix
from your command shell to use this. You must also have your transformation
parameter files set before running (see transformix parameter files).
Parameters
----------
command: string
Sent to the system for transformix running (see elastix command line implementation).
Returns
-------
command: string
Same string as above.
"""
#Print command
print(str(command))
#Print transformix update
#print('Running transformix...')
#Send the command to the shell
os.system(command)
#Print update
#print('Finished')
#Return values
return command
[docs]def CreateCompositeTransforms(tps, out_dir):
"""Apply a series of transformations to an image using transformix. Prior
results show that composing transforms in elastix do not yield the same
result as appying each transformation in series. This will take more time
but will give the expected result. A new copy of transform parameters will
be created and exported to the out_dir
Parameters
----------
tps: list (length number of transformation parameter files)
Path to elastix image registration transform parameter files (in order of application).
out_dir: string
Path to output directory for transform parameters and output results.
Returns
-------
transform_calls: list
List of transform parameters to insert into the transformix command "--tp".
init_trans_list: list
List of initial transformations in the tranform parameter files. These
are part of the actual transform parameter .txt files.
"""
#Ensure the input tps are pathlib objects
tps = [Path(t) for t in tps]
#Ensure the out directory is a pathlib object
out_dir = Path(out_dir)
#Create a list of new filenames that will be exported
new_tps = [Path(os.path.join(str(out_dir),"TransformParameters_comp."+str(i)+".txt")) for i in range(len(tps))]
#Create a list of items to store for no initial transforms -- these are going
init_trans_list = []
#Create a lsit of files that transformix will call on
transform_calls = []
#Create a dictionary that stores old tps and new tps
tp_dict = {}
#Iterate through tps and add to dictionary
for t in range(len(tps)):
#Access the old tp name and new, add to the dictionary
tp_dict.update({tps[t]:new_tps[t]})
#Iterate through all other files and change inital transforms
for t in range(0,len(tps[1:])+1):
#Extract and modify the first file in the list of transform parameters
with open(tps[t], 'r') as f:
#Read the file
filedata = f.read()
#Search for the initial transform parameter
for l in list(filedata.split("\n")):
#Check if inital transform string is in the line
if "InitialTransformParametersFileName" in l:
#Extract the inital transform parameter string
init_trans = l.split(" ")[1].strip(")").strip('"')
#Update the list of init trans
init_trans_list.append(init_trans)
#Now iterate through the init trans files and add true or false for whether they are essential (called in transformix)
for t in range(0,len(init_trans_list[1:])):
#Extract the init trans file and the one above -- skip last one -- always used!
if init_trans_list[t+1] == 'NoInitialTransform':
#Update the list at t to be true
transform_calls.append(new_tps[t])
#Add the last transform parameter to the transform call by default
transform_calls.append(new_tps[len(new_tps)-1])
#Iterate through all other files and change inital transforms
for t in range(0,len(tps[1:])+1):
#Extract and modify the first file in the list of transform parameters
with open(tps[t], 'r') as f:
#Read the file
filedata = f.read()
#Search for the initial transform parameter
for l in list(filedata.split("\n")):
#Check if inital transform string is in the line
if "InitialTransformParametersFileName" in l:
#Extract the inital transform parameter string
init_trans = l.split(" ")[1].strip(")").strip('"')
#Check to ensure that the initial transform is no initial transform
if init_trans != 'NoInitialTransform':
#Get the corresponding new file name from the old
#n_tp = tp_dict[Path(init_trans)]
n_tp = new_tps[t-1]
#Then replace the initial transform in the file with the new tp
filedata = filedata.replace(init_trans, str(n_tp))
#Write out the new data to the new transform parameter filename
with open(new_tps[t], 'w') as file:
#Write the new file
file.write(filedata)
#Remove the filedata for now
filedata = None
#Return the list of new parameter files
return transform_calls,init_trans_list
[docs]def MultiTransformix(in_im, out_dir, tps):
"""Applies multiple transform files in sequence to an image.
Parameters
----------
in_im: string
Input image path.
tps: list (length number of transformation parameter files)
Path to elastix image registration transform parameter files (in order of application).
out_dir: string
Path to output directory for transform parameters and output results.
"""
#Ensure the input image is pathlib object
in_im = Path(in_im)
#Ensure the input tps are pathlib objects
tps = [Path(t) for t in tps]
#Ensure the out directory is a pathlib object
out_dir = Path(out_dir)
#Create transformix command
command = "transformix"
#Run the CreateCompositeTransforms(tps, out_dir) function
trans_calls,init_trans_list = CreateCompositeTransforms(tps, out_dir)
print("Created transform parameters length:" + str(len(trans_calls)))
#Create temporary directory in the out_dir
with tempfile.TemporaryDirectory(dir=out_dir) as nestdirname:
#Run the first transformation
tmp_command = command +' -out ' + str(nestdirname)
tmp_command = tmp_command + ' -tp ' + str(trans_calls[0])
tmp_command = tmp_command + ' -in ' + str(in_im)
#Iterate through each transform parameter file and run transformix
RunTransformix(tmp_command)
#rint("Getting first results")
#Get a result name for the output of transformix (assumes nifti for now)
res_name = Path(os.path.join(str(nestdirname),"result"+in_im.suffix))
#print("Got first results")
#Check to see if the list is larger than 2 (if no then only take the last parameter file)
if len(trans_calls) > 1:
#Print update
#print("Number of transform calls:" + str(len(trans_calls)))
#Now iterate through all the other transform parameter files and run transformix
for t in range(1,len(trans_calls)):
#print("Running transform call:" + str(t))
#Create the temporary transformix command
tmp_command = command
#Check to see if this is the last iteration
if t == (len(trans_calls)-1):
#Add the result name
tmp_command = tmp_command + ' -in ' + str(res_name)
#Make the output directory the final out_dir
tmp_command = tmp_command +' -out ' + str(out_dir)
#Update result name
res_name = Path(os.path.join(str(out_dir),"result"+in_im.suffix))
#Otherwise, leave it as the tmp directory
else:
#Add the result name to the command
tmp_command = tmp_command + ' -in ' + str(res_name)
#Run transformixs
tmp_command = tmp_command +' -out ' + str(nestdirname)
#Get a result name for the output of transformix (assumes nifti for now)
res_name = Path(os.path.join(str(nestdirname),"result"+in_im.suffix))
#Add the transform parameters
tmp_command = tmp_command + ' -tp ' + str(trans_calls[t])
#Iterate through each transform parameter file and run transformix
RunTransformix(tmp_command)
else:
#Just change the results to the output directory
new_name = Path(os.path.join(str(out_dir),"result"+in_im.suffix))
#Get the resulting image to rename (so we don't overwrite results)
res_name.rename(new_name)
#Set back the res name
res_name = new_name
#Return the result name
return res_name
#Create class structure for transformix implementation
[docs]class Transformix():
"""Python implementation of Transformix with automated image pre-processing.
Parameters
----------
in_im: string
Input image path.
out_dir: string
Path to output directory for transform parameters and output results.
tps: list (length number of transformation parameter files)
Path to elastix image registration transform parameter files (in order of application).
target_size: tuple of type integer (sizex,sizey; Default: None)
Resize image using bilinear interpolation before exporting.
pad: tuple of type integer (padx,pady; Default: None)
Indicates height and length padding to add to the image before exporting.
trim: tuple of type integer (trimx,trimy; Default: None)
Indicates height and length trimming to remove from the image before exporting.
crops: dictionary (Default: None)
Dictionary of dictionaries containing {crop_name(s): {coords_csv: , target_size: , correction: , tp: , fixed_pad: }}.
All arguments for each cropped region (i.e., region of interest) are performed
only on the given crop.
* coords_csv: string.
Path to csv file containing coordinates.
* target_size: tuple of type integer (sizex,sizey; Default: None)
Resize image using bilinear interpolation before exporting.
* correction: integer
Extra pixels to take on each side of the crop contour for buffer.
* tp: list (length number of transformation parameter files)
Path to elastix image registration transform parameter files
(in order of application) for this crop.
* fixed_pad: integer
Indicates trim to remove from the cropped image before exporting. This should be the
same value as the amount of padding applied to
the cropped region's corresponding fixed image that was used for
registration.
"""
def __init__(self, in_im, out_dir, tps, target_size = None, pad = None, trim = None, crops = None, out_ext = ".nii"):
#Create pathlib objects and set class parameters
self.in_im = Path(in_im)
self.in_channels = []
self.out_channels = []
self.multichannel = None
self.out_dir = Path(out_dir)
self.tps = [Path(t) for t in tps]
self.command = "transformix"
self.intermediate = False
self.out_ext = out_ext
self.target_size = target_size
self.pad = pad
self.trim = trim
self.crops = crops
# Check for input list or none
if self.target_size!=None:
# convert it to tuple from list (command line parser)
self.target_size = tuple(self.target_size)
# Check for input list or none
if self.pad!=None:
# convert it to tuple from list (command line parser)
self.pad = tuple(self.pad)
#Load images
niiIn = hdi_reader.HDIreader(
path_to_data=in_im,
path_to_markers=None,
flatten=False,
subsample=None,
mask=None,
save_mem=False
)
# update file information
self.baseName = niiIn.hdi.data.filename.stem.replace(".ome","")
self.ext = "".join(niiIn.hdi.data.filename.suffixes)
# Check for input extension or none
if self.out_ext==None:
# convert it to be whatever extension the input image contains as default
self.out_ext = self.ext
# make sure that the export type is not .imzML for now...
if self.out_ext==".imzML":
# change to nifti
self.out_ext=".nii"
# check for number of channels in the image by accessing the class
if niiIn.hdi.data.num_channels >= 2:
# Update multichannel class option
self.multichannel = True
# otherwise this is a single channel image
else:
# multichannel is false
self.multichannel = False
#Check to see if there is single channel input (grayscale)
if not self.multichannel:
# run single channel registration on array
self._singlechannelTransformixArray(niiIn)
# Otherwise there is multichannel input. here we run multichannel registration
# and all operations for preprocessing are performed on a per slice basis to
# save disk space (dont have to export the full z stack at once). For now
# all processing steps are tailored for nifti formats. In the future this
# should easily be changed to allow for any data format
else:
# print update
print('Detected multichannel input')
# Check to see if cropping the resulting image
if self.crops==None:
# check for raster data
if niiIn.hdi.data.hdi_type == 'raster':
# raster transformix
self._multichannelTransformixRaster(niiIn)
# check for image data type
else:
# run multichannel transformix with the array type
self._multichannelTransformixArray(niiIn)
# !!!!!!!!!!!!!!!
# Cropping is true
# for now this is not supported in the nextflow version of miaaim.
# TODO: add this to nextflow. This can be rin python and strings
# together multiple transformations from images and crops within
# those images so that very large arrays do not have to be fully
# exported (e.g. for MSI data that contains thousands of channels
# and low resolution on full tissues)
else:
#Read the image
niiIn = niiIn.get_fdata()
#Create a temporary outer directory
with tempfile.TemporaryDirectory(dir=self.out_dir) as tmphomedir:
#Create a dictionary to store the temporary folders in
tmpfolds = {}
#Create a dictionary to store list of channels in for each roi
roi_channels = {}
#Iterate through each crop and create a temporary directory for it
for roi, pars in crops.items():
#Create a temporary directory and reserve the roi name from the crops input
tmpfolds.update({roi: tempfile.mkdtemp(dir = self.out_dir)})
#Update the channels dictionary to be empty lists
roi_channels.update({roi:[]})
#Print update on array shape
print('Image has ' +str(niiIn.shape[2])+' channels')
#Iterate through each channel in the data
for i in range(niiIn.shape[2]):
#Print update
print('Working on slice '+str(i))
#print("Multichannel crop is true")
#print('Shape of NiiIn is' +str(niiIn.shape[2]))
#Update the list of names for image channels
#self.in_channels.append(im_name)
#create a temporary directory using the context manager for channel-wise images
with tempfile.TemporaryDirectory(dir=tmphomedir) as tmpdirname:
#Create a name for a temporary image
im_name = Path(os.path.join(tmpdirname,self.in_im.stem+str(i)+self.in_im.suffix))
#Check to see if the path exists
if not im_name.is_file():
#Check to see if there is a target size for the image
if target_size != None:
#print("Got the resize")
#Create a nifti image from this slice
nii_im = nib.Nifti1Image(resize(niiIn[:,:,i],target_size), affine=np.eye(4))
#print(" resized")
#No resize
else:
#Leave the image unchanged
nii_im = nib.Nifti1Image(niiIn[:,:,i], affine=np.eye(4))
#Save the nifti image
nib.save(nii_im,str(im_name))
#print("saved "+ str(im_name))
#Remove the nifti slice to clear memory
nii_im = None
#add transform -- check for list size
if len(self.tps) > 1:
#print('GOt the multi tp input')
#Run the composition function for transformix
res_name = MultiTransformix(in_im = im_name, out_dir = tmpdirname, tps = self.tps)
else:
#Create a temporary command to be sent to the shell
tmp_command = self.command + ' -in ' + str(im_name) + ' -out ' + str(tmpdirname)
#Add full tissue transform paramaeters
tmp_command = tmp_command + ' -tp ' + str(self.tps[0])
#Send the command to the shell
RunTransformix(tmp_command)
#Get a temporary result name for the output of transformix (assumes nifti for now)
res_name = Path(os.path.join(tmpdirname,"result"+self.in_im.suffix))
#Load the resulting transformed image
niiResult = nib.load(str(res_name)).get_fdata()
#Iterate through each of the crops
for roi, pars in crops.items():
#Create a temporary directory to store the
with tempfile.TemporaryDirectory(dir=tmpdirname) as roitmp:
#Create a temporary name
tmproiname = Path(os.path.join(roitmp,str(roi)+str(i)+self.in_im.suffix))
#Extract a temporary crop for this region
tmpcrop = ExtractROIcrop(coords_csv = pars['coords_csv'],full_img = niiResult, target_size = pars['target_size'], correction = pars['correction'])
#Save this temporary crop to the roi temp directory
nib.save(tmpcrop,str(tmproiname))
#add transform -- check for list size
if len(pars['tps']) > 1:
#Run the composition function for transformix
roires_name = MultiTransformix(in_im = tmproiname, out_dir = roitmp, tps = pars['tps'])
else:
#Create a temporary command to be sent to the shell -- output directory in the tmp folder created previously
roi_command = self.command + ' -in ' + str(tmproiname) + ' -out ' + str(roitmp)
#Add these transform parameters for this roi
roi_command = roi_command + ' -tp ' + str(Path(pars['tp']))
#Send the command to the shell
RunTransformix(roi_command)
#Get a temporary result name for the output of transformix (assumes nifti for now)
roires_name = Path(os.path.join(roitmp,"result"+self.in_im.suffix))
#Create a new name for the roi result
roinew_name = Path(os.path.join(tmpfolds[roi],str(roi)+'_result'+str(i)+self.in_im.suffix))
#Rename the produced file :)
roires_name.rename(roinew_name)
#Update the list of output channel names
roi_channels[str(roi)].append(str(roinew_name))
#Clear the nifti objects
tmpcrop = None
#Clear the nifti full tissue result
niiResult = None
#Access each temporary roi results folder and concatenate the results
for roi, pars in crops.items():
#Concatenate the output channels into a single result file in the output directory
roi_results = nib.concat_images([str(i) for i in roi_channels[str(roi)]])
#Check to see if there is fixed image padding
if pars['fixed_pad'] != None:
#Get the value of the padding
pads = pars['fixed_pad']
#Extract only the needed region
roi_results = roi_results.get_fdata()[pads:-pads,pads:-pads,:]
#create a filename for the full nifti results
roi_name = Path(os.path.join(str(self.out_dir),str(roi)+"_result"+self.out_ext))
# use the exported from hdiutils
hdi_exporter.HDIexporter(full_result.get_fdata().transpose(1,0,2),roi_name)
#Remove roi results from memory
roi_results = None
#Now delete the temporary folder stored for the single channel ROI results
shutil.rmtree(tmpfolds[roi], ignore_errors=True)
# remove the temporary image if there was a nifti-1 intermediate
if self.intermediate:
# remove using pathlib
self.in_im.unlink()
#Print update
print("Finished")
#Return the command
#return self.command
def _singlechannelTransformixArray(self, niiIn):
"""Helper function for single channel transformix call on array data.
Parameters
----------
niiIn: class HDIreader
Input image that is read initially with the hdiutils python package.
The details of the image are stored inside of this class object,
and the ``Transformix`` class will access those to automatically
process the image.
"""
# here we will get the extension of the image and will convert it to the nift-1
# format if it is not already in that format. While users can supply their own
# nifti formatted image to the pipeline, this ensures that other file formats
# can be used, although, it creates additionally overhead
# here we supply all preprocessing commands that were used to preprocess or morph
# the array size of the input image through the hdiprep workflow. Transformix
# must be run on images with the same size as the elastix registration
if ((self.out_ext!=".nii") or (self.target_size!=None) or (self.pad!=None)):
# get the shape of the image
shp = len(niiIn.hdi.data.image_shape)
# create new name for the temporary image
tmp_nm = os.path.join(out_dir, next(tempfile._get_candidate_names())+".nii")
# export nifti intermediate
print('Creating nifti-1 intermediate for registration')
# check for padding
if self.pad!=None:
# pad the single-channel
niiIn.hdi.data.image = np.pad(niiIn.hdi.data.image,[(self.pad[0], self.pad[0]), (self.pad[1], self.pad[1])],mode='constant')
# check for image resizing
if (self.target_size != None) and (self.crops==None):
# transform the image
niiIn.hdi.data.image = resize(niiIn.hdi.data.image,self.target_size)
# Create nifti oject -- transpose axes because of the transformation!
nii_im = nib.Nifti1Image(niiIn.hdi.data.image.T, affine=np.eye(4))
#Save the nifti image
nib.save(nii_im,str(tmp_nm))
# remove the nifit memory
nii_im = None
# update the image name
print('Using nifti-1 intermediate for registration')
# update the input image
self.in_im = Path(tmp_nm)
# update the intermediate flag
self.intermediate = True
#Remove loaded image to clear memory
niiIn = None
#Print update
print('Detected single channel input images...')
#Update the fixed channels
self.in_channels.append(self.in_im)
#add transform -- check for list size
if len(self.tps) > 1:
#Run the composition function for transformix
res_name = MultiTransformix(in_im = self.in_im, out_dir = self.out_dir, tps = self.tps)
#Otherwise only use the first transform parameter
else:
#Updatethe command with the single channel path alone
self.command = self.command + ' -in ' + str(self.in_im)
#use the first transform parameter file
self.command = self.command + ' -tp ' + str(self.tps[0])
#Update the command with the output directory
self.command = self.command + ' -out ' + str(self.out_dir)
#Run single channel transformix without temporary directories
RunTransformix(self.command)
#Get a result name for the output of transformix (assumes nifti for now)
res_name = Path(os.path.join(self.out_dir,"result"+self.in_im.suffix))
#Create a new name
new_name = Path(os.path.join(self.out_dir,self.baseName+'_result'+self.out_ext))
# check if the output format needs to be switched -- set by the user
if (self.out_ext!=".nii") or (self.trim!=None):
# use HDIreader for now to parse image and exporter to export
niiIn = hdi_reader.HDIreader(
path_to_data=self.in_im,
path_to_markers=None,
flatten=False,
subsample=None,
mask=None,
save_mem=False
)
# check the trim
if self.trim!=None:
# trim the image borders
niiIn.hdi.data.image = niiIn.hdi.data.image[self.trim:-self.trim,self.trim:-self.trim]
# export new data
hdi_exporter.HDIexporter(niiIn.hdi.data.image,new_name)
else:
# simply rename the file that is already in the nifti format
res_name.rename(new_name)
def _multichannelTransformixArray(self, niiIn):
"""Helper function for running multichannel transformix on array data.
Parameters
----------
niiIn: class HDIreader
Input image that is read initially with the hdiutils python package.
The details of the image are stored inside of this class object,
and the ``Transformix`` class will access those to automatically
process the image.
"""
# create a temporary directory using the context manager for channel-wise images
with tempfile.TemporaryDirectory(dir=self.out_dir) as tmpdirname:
# Print update
print('Created temporary directory', tmpdirname)
# Iterate through the channels
for i in range(niiIn.hdi.data.num_channels):
# Print update
print('Working on slice '+str(i))
# Create a name for a temporary image
im_name = Path(os.path.join(tmpdirname,self.in_im.stem+str(i)+".nii"))
# Update the list of names for image channels
self.in_channels.append(im_name)
# set a temporary channel to work with throughout the data prep stage
slice_in = niiIn.hdi.data.image[:,:,i]
# Check to see if the path exists
if not im_name.is_file():
# check for padding
if self.pad!=None:
# pad the single-channel
slice_in = np.pad(slice_in,[(self.pad[0], self.pad[0]), (self.pad[1], self.pad[1])],mode='constant')
# Check to see if there is a target size for the image
if self.target_size!=None:
# Resize the image
slice_in = resize(slice_in,self.target_size)
# Create a nifti image from this slice
nii_im = nib.Nifti1Image(slice_in.T, affine=np.eye(4))
# remove memory
slice_in = None
#Save the nifti image
nib.save(nii_im,str(im_name))
#Remove the nifti slice to clear memory
nii_im = None
#add transform -- check for list size
if len(self.tps) > 1:
#Run the composition function for transformix
res_name = MultiTransformix(in_im = im_name, out_dir = tmpdirname, tps = self.tps)
else:
#Create a temporary command to be sent to the shell
tmp_command = self.command + ' -in ' + str(im_name) + ' -out ' + str(tmpdirname)
#Add full tissue transform paramaeters
tmp_command = tmp_command + ' -tp ' + str(self.tps[0])
#Send the command to the shell
RunTransformix(tmp_command)
#Get a temporary result name for the output of transformix (assumes nifti for now)
res_name = Path(os.path.join(tmpdirname,"result"+".nii"))
# remove the temporary image if there was a nifti-1 intermediate
im_name.unlink()
#Create a new name
new_name = Path(os.path.join(tmpdirname,self.in_im.stem+str(i)+'_result'+".nii"))
#Get the resulting image to rename (so we don't overwrite results)
res_name.rename(new_name)
#Update the list of output channel names
self.out_channels.append(new_name)
#Remove loaded image to clear memory
niiIn = None
#Concatenate the output channels into a single result file in the output directory
full_result = nib.concat_images([str(i) for i in self.out_channels])
#create a filename for the full nifti results
full_name = Path(os.path.join(self.out_dir,self.baseName+"_result"+self.out_ext))
# check if the output format needs to be switched -- set by the user
if (self.out_ext!=".nii") or (self.trim!=None):
# check the trim
if self.trim!=None:
# trim the image borders
full_result = full_result.get_fdata()[self.trim:-self.trim,self.trim:-self.trim,:]
# export new data
hdi_exporter.HDIexporter(full_result.transpose(1,0,2),full_name)
else:
# export the non trimmed image
hdi_exporter.HDIexporter(full_result.get_fdata().transpose(1,0,2),full_name)
else:
# export new data using the aggregated nifti objects
# doesnt need to be formally read in because it is memory
# mapped to the full_result object
hdi_exporter.HDIexporter(full_result.get_fdata().transpose(1,0,2),full_name)
def _multichannelTransformixRaster(self, niiIn):
"""Helper function for running multichannel transformix on raster data.
Parameters
----------
niiIn: class HDIreader
Input image that is read initially with the hdiutils python package.
The details of the image are stored inside of this class object,
and the ``Transformix`` class will access those to automatically
process the image.
"""
# create a temporary directory using the context manager for channel-wise images
with tempfile.TemporaryDirectory(dir=self.out_dir) as tmpdirname:
# Print update
print('Created temporary directory', tmpdirname)
# Iterate through the channels
for i in range(niiIn.hdi.data.num_channels):
# Print update
print('Working on slice '+str(i))
# Create a name for a temporary image
im_name = Path(os.path.join(tmpdirname,self.in_im.stem+str(i)+".nii"))
# Update the list of names for image channels
self.in_channels.append(im_name)
# set a temporary channel to work with throughout the data prep stage
slice_in = niiIn.hdi.CreateSingleChannelArray(i)
# Check to see if the path exists
if not im_name.is_file():
# check for padding
if self.pad!=None:
# pad the single-channel
slice_in = np.pad(slice_in,[(self.pad[0], self.pad[0]), (self.pad[1], self.pad[1])],mode='constant')
# Check to see if there is a target size for the image
if self.target_size!=None:
# Resize the image
slice_in = resize(slice_in,self.target_size)
# Create a nifti image from this slice
nii_im = nib.Nifti1Image(slice_in.T, affine=np.eye(4))
# remove memory
slice_in = None
#Save the nifti image
nib.save(nii_im,str(im_name))
#Remove the nifti slice to clear memory
nii_im = None
#add transform -- check for list size
if len(self.tps) > 1:
#Run the composition function for transformix
res_name = MultiTransformix(in_im = im_name, out_dir = tmpdirname, tps = self.tps)
else:
#Create a temporary command to be sent to the shell
tmp_command = self.command + ' -in ' + str(im_name) + ' -out ' + str(tmpdirname)
#Add full tissue transform paramaeters
tmp_command = tmp_command + ' -tp ' + str(self.tps[0])
#Send the command to the shell
RunTransformix(tmp_command)
#Get a temporary result name for the output of transformix (assumes nifti for now)
res_name = Path(os.path.join(tmpdirname,"result"+".nii"))
# remove the temporary image if there was a nifti-1 intermediate
im_name.unlink()
#Create a new name
new_name = Path(os.path.join(tmpdirname,self.in_im.stem+str(i)+'_result'+".nii"))
#Get the resulting image to rename (so we don't overwrite results)
res_name.rename(new_name)
#Update the list of output channel names
self.out_channels.append(new_name)
#Remove loaded image to clear memory
niiIn = None
#Concatenate the output channels into a single result file in the output directory
full_result = nib.concat_images([str(i) for i in self.out_channels])
#create a filename for the full nifti results
full_name = Path(os.path.join(self.out_dir,self.baseName+"_result"+self.out_ext))
# check if the output format needs to be switched -- set by the user
if (self.out_ext!=".nii") or (self.trim!=None):
# check the trim
if self.trim!=None:
# trim the image borders
full_result = full_result.get_fdata()[self.trim:-self.trim,self.trim:-self.trim,:]
# export new data
hdi_exporter.HDIexporter(full_result.transpose(1,0,2),full_name)
else:
# export the non trimmed image
hdi_exporter.HDIexporter(full_result.get_fdata().transpose(1,0,2),full_name)
else:
# export new data using the aggregated nifti objects
# doesnt need to be formally read in because it is memory
# mapped to the full_result object
hdi_exporter.HDIexporter(full_result.get_fdata().transpose(1,0,2),full_name)
#