Source code for netneurotools.freesurfer

# -*- coding: utf-8 -*-
"""Functions for working with FreeSurfer data and parcellations."""

import os
import os.path as op
import warnings

from nibabel.freesurfer import read_annot, read_geometry
import numpy as np
from scipy import sparse
try:  # scipy >= 1.8.0
    from scipy.ndimage._measurements import _stats, labeled_comprehension
except ImportError:  # scipy < 1.8.0
    from scipy.ndimage.measurements import _stats, labeled_comprehension
from scipy.spatial.distance import cdist

from .datasets import fetch_fsaverage
from .stats import gen_spinsamples
from .surface import make_surf_graph
from .utils import check_fs_subjid, run

FSIGNORE = [
    'unknown', 'corpuscallosum', 'Background+FreeSurfer_Defined_Medial_Wall'
]


[docs]def apply_prob_atlas(subject_id, gcs, hemi, *, orig='white', annot=None, ctab=None, subjects_dir=None, use_cache=True, quiet=False): """ Create an annotation file for `subject_id` by applying atlas in `gcs`. Runs subprocess calling FreeSurfer's "mris_ca_label" function; as such, FreeSurfer must be installed and accesible on the local system path. Parameters ---------- subject_id : str FreeSurfer subject ID gcs : str Filepath to .gcs file containing classifier array hemi : {'lh', 'rh'} Hemisphere corresponding to `gcs` file orig : str, optional Original surface to which to apply classifer. Default: 'white' annot : str, optional Path to output annotation file to generate. If set to None, the name is created from the provided `hemi` and `gcs`. If provided as a relative path, it is assumed to stem from `subjects_dir`/`subject_id`. Default: None ctab : str, optional Path to colortable corresponding to `gcs`. Default: None subjects_dir : str, optional Path to FreeSurfer subject directory. If not set, will inherit from the environmental variable $SUBJECTS_DIR. Default: None use_cache : bool, optional Whether to check for existence of `annot` in directory specified by `{subjects_dir}/{subject_id}/label' and use that, if it exists. If False, will create a new annot file. Default: True quiet : bool, optional Whether to restrict status messages. Default: False Returns ------- annot : str Path to generated annotation file """ cmd = 'mris_ca_label {opts}{subject_id} {hemi} {hemi}.sphere.reg ' \ '{gcs} {annot}' if hemi not in ['rh', 'lh']: raise ValueError('Provided hemisphere designation `hemi` must be one ' 'of \'rh\' or \'lh\'. Provided: {}'.format(hemi)) if not op.isfile(gcs): raise ValueError('Cannot find specified `gcs` file {}.'.format(gcs)) subject_id, subjects_dir = check_fs_subjid(subject_id, subjects_dir) # add all the options together, as specified opts = '' if ctab is not None and op.isfile(ctab): opts += '-t {} '.format(ctab) if orig is not None: opts += '-orig {} '.format(orig) if subjects_dir is not None: opts += '-sdir {} '.format(subjects_dir) else: subjects_dir = os.environ['SUBJECTS_DIR'] # generate output filename if annot is None: base = '{}.{}.annot'.format(hemi, gcs[:-4]) annot = op.join(subjects_dir, subject_id, 'label', base) else: # if not a full path, assume relative from subjects_dir/subject_id if not annot.startswith(op.abspath(os.sep)): annot = op.join(subjects_dir, subject_id, annot) # if annotation file doesn't exist or we explicitly want to make a new one if not op.isfile(annot) or not use_cache: run(cmd.format(opts=opts, subject_id=subject_id, hemi=hemi, gcs=gcs, annot=annot), quiet=quiet) return annot
def _decode_list(vals): """List decoder.""" return [val.decode() if hasattr(val, 'decode') else val for val in vals]
[docs]def find_parcel_centroids(*, lhannot, rhannot, method='surface', version='fsaverage', surf='sphere', drop=None): """ Return vertex coords corresponding to centroids of parcels in annotations. Note that using any other `surf` besides the default of 'sphere' may result in centroids that are not directly within the parcels themselves due to sulcal folding patterns. Parameters ---------- {lh,rh}annot : str Path to .annot file containing labels of parcels on the {left,right} hemisphere. These must be specified as keyword arguments to avoid accidental order switching. method : {'average', 'surface', 'geodesic'}, optional Method for calculation of parcel centroid. See Notes for more information. Default: 'surface' version : str, optional Specifies which version of `fsaverage` provided annotation files correspond to. Must be one of {'fsaverage', 'fsaverage3', 'fsaverage4', 'fsaverage5', 'fsaverage6'}. Default: 'fsaverage' surf : str, optional Specifies which surface projection of fsaverage to use for finding parcel centroids. Default: 'sphere' drop : list, optional Specifies regions in {lh,rh}annot for which the parcel centroid should not be calculated. If not specified, centroids for parcels defined in `netneurotools.freesurfer.FSIGNORE` are not calculated. Default: None Returns ------- centroids : (N, 3) numpy.ndarray xyz coordinates of vertices closest to the centroid of each parcel defined in `lhannot` and `rhannot` hemiid : (N,) numpy.ndarray Array denoting hemisphere designation of coordinates in `centroids`, where `hemiid=0` denotes the left and `hemiid=1` the right hemisphere Notes ----- The following methods can be used for finding parcel centroids: 1. ``method='average'`` Uses the arithmetic mean of the coordinates for the vertices in each parcel. Note that in this case the calculated centroids will not act actually fall on the surface of `surf`. 2. ``method='surface'`` Calculates the 'average' coordinates and then finds the closest vertex on `surf`, where closest is defined as the vertex with the minimum Euclidean distance. 3. ``method='geodesic'`` Uses the coordinates of the vertex with the minimum average geodesic distance to all other vertices in the parcel. Note that this is slightly more time-consuming than the other two methods, especially for high-resolution meshes. """ methods = ['average', 'surface', 'geodesic'] if method not in methods: raise ValueError('Provided method for centroid calculation {} is ' 'invalid. Must be one of {}'.format(methods, methods)) if drop is None: drop = FSIGNORE drop = _decode_list(drop) surfaces = fetch_fsaverage(version)[surf] centroids, hemiid = [], [] for n, (annot, surf) in enumerate(zip([lhannot, rhannot], surfaces)): vertices, faces = read_geometry(surf) labels, ctab, names = read_annot(annot) names = _decode_list(names) for lab in np.unique(labels): if names[lab] in drop: continue if method in ['average', 'surface']: roi = np.atleast_2d(vertices[labels == lab].mean(axis=0)) if method == 'surface': # find closest vertex on the sphere roi = vertices[np.argmin(cdist(vertices, roi), axis=0)[0]] elif method == 'geodesic': inds, = np.where(labels == lab) roi = _geodesic_parcel_centroid(vertices, faces, inds) centroids.append(roi) hemiid.append(n) return np.vstack(centroids), np.asarray(hemiid)
def _geodesic_parcel_centroid(vertices, faces, inds): """ Calculate parcel centroids based on surface distance. Parameters ---------- vertices : (N, 3) Coordinates of vertices defining surface faces : (F, 3) Triangular faces defining surface inds : (R,) Indices of `vertices` that belong to parcel Returns ------- roi : (3,) numpy.ndarray Vertex corresponding to centroid of parcel """ mask = np.ones(len(vertices), dtype=bool) mask[inds] = False mat = make_surf_graph(vertices, faces, mask=mask) paths = sparse.csgraph.dijkstra(mat, directed=False, indices=inds)[:, inds] # the selected vertex is the one with the minimum average shortest path # to the other vertices in the parcel roi = vertices[inds[paths.mean(axis=1).argmin()]] return roi
[docs]def parcels_to_vertices(data, *, lhannot, rhannot, drop=None): """ Project parcellated `data` to vertices defined in annotation files. Assigns np.nan to all ROIs in `drop` Parameters ---------- data : (N,) numpy.ndarray Parcellated data to be projected to vertices. Parcels should be ordered by [left, right] hemisphere; ordering within hemisphere should correspond to the provided annotation files. {lh,rh}annot : str Path to .annot file containing labels of parcels on the {left,right} hemisphere. These must be specified as keyword arguments to avoid accidental order switching. drop : list, optional Specifies regions in {lh,rh}annot that are not present in `data`. NaNs will be inserted in place of the these regions in the returned data. If not specified, parcels defined in `netneurotools.freesurfer.FSIGNORE` are assumed to not be present. Default: None Returns ------- projected : numpy.ndarray Vertex-level data """ if drop is None: drop = FSIGNORE drop = _decode_list(drop) data = np.vstack(data).astype(float) # check this so we're not unduly surprised by anything... n_vert = expected = 0 for a in [lhannot, rhannot]: vn, _, names = read_annot(a) n_vert += len(vn) names = _decode_list(names) expected += len(names) - len(set(drop) & set(names)) if expected != len(data): raise ValueError('Number of parcels in provided annotation files ' 'differs from size of parcellated data array.\n' ' EXPECTED: {} parcels\n' ' RECEIVED: {} parcels' .format(expected, len(data))) projected = np.zeros((n_vert, data.shape[-1]), dtype=data.dtype) start = end = n_vert = 0 for annot in [lhannot, rhannot]: # read files and update end index for `data` labels, ctab, names = read_annot(annot) names = _decode_list(names) todrop = set(names) & set(drop) end += len(names) - len(todrop) # unknown and corpuscallosum # get indices of unknown and corpuscallosum and insert NaN values inds = sorted([names.index(f) for f in todrop]) inds = [f - n for n, f in enumerate(inds)] currdata = np.insert(data[start:end], inds, np.nan, axis=0) # project to vertices and store projected[n_vert:n_vert + len(labels), :] = currdata[labels] start = end n_vert += len(labels) return np.squeeze(projected)
[docs]def vertices_to_parcels(data, *, lhannot, rhannot, drop=None): """ Reduce vertex-level `data` to parcels defined in annotation files. Takes average of vertices within each parcel, excluding np.nan values (i.e., np.nanmean). Assigns np.nan to parcels for which all vertices are np.nan. Parameters ---------- data : (N,) numpy.ndarray Vertex-level data to be reduced to parcels {lh,rh}annot : str Path to .annot file containing labels to parcels on the {left,right} hemisphere drop : list, optional Specifies regions in {lh,rh}annot that should be removed from the parcellated version of `data`. If not specified, vertices corresponding to parcels defined in `netneurotools.freesurfer.FSIGNORE` will be removed. Default: None Returns ------- reduced : numpy.ndarray Parcellated `data`, without regions specified in `drop` """ if drop is None: drop = FSIGNORE drop = _decode_list(drop) data = np.vstack(data) n_parc = expected = 0 for a in [lhannot, rhannot]: vn, _, names = read_annot(a) expected += len(vn) names = _decode_list(names) n_parc += len(names) - len(set(drop) & set(names)) if expected != len(data): raise ValueError('Number of vertices in provided annotation files ' 'differs from size of vertex-level data array.\n' ' EXPECTED: {} vertices\n' ' RECEIVED: {} vertices' .format(expected, len(data))) reduced = np.zeros((n_parc, data.shape[-1]), dtype=data.dtype) start = end = n_parc = 0 for annot in [lhannot, rhannot]: # read files and update end index for `data` labels, ctab, names = read_annot(annot) names = _decode_list(names) indices = np.unique(labels) end += len(labels) for idx in range(data.shape[-1]): # get average of vertex-level data within parcels # set all NaN values to 0 before calling `_stats` because we are # returning sums, so the 0 values won't impact the sums (if we left # the NaNs then all parcels with even one NaN entry would be NaN) currdata = np.squeeze(data[start:end, idx]) isna = np.isnan(currdata) counts, sums = _stats(np.nan_to_num(currdata), labels, indices) # however, we do need to account for the NaN values in the counts # so that our means are similar to what we'd get from e.g., # np.nanmean here, our "sums" are the counts of NaN values in our # parcels _, nacounts = _stats(isna, labels, indices) counts = (np.asanyarray(counts, dtype=float) - np.asanyarray(nacounts, dtype=float)) with np.errstate(divide='ignore', invalid='ignore'): currdata = sums / counts # get indices of unkown and corpuscallosum and delete from parcels inds = sorted([names.index(f) for f in set(drop) & set(names)]) currdata = np.delete(currdata, inds) # store parcellated data reduced[n_parc:n_parc + len(names) - len(inds), idx] = currdata start = end n_parc += len(names) - len(inds) return np.squeeze(reduced)
def _get_fsaverage_coords(version='fsaverage', surface='sphere'): """ Get vertex coordinates for specified `surface` of fsaverage `version`. Parameters ---------- version : str, optional One of {'fsaverage', 'fsaverage3', 'fsaverage4', 'fsaverage5', 'fsaverage6'}. Default: 'fsaverage' surface : str, optional Surface for which to return vertex coordinates. Default: 'sphere' Returns ------- coords : (N, 3) numpy.ndarray xyz coordinates of vertices for {left,right} hemisphere hemiid : (N,) numpy.ndarray Array denoting hemisphere designation of entries in `coords`, where `hemiid=0` denotes the left and `hemiid=1` the right hemisphere """ # get coordinates and hemisphere designation for spin generation lhsphere, rhsphere = fetch_fsaverage(version)[surface] coords, hemi = [], [] for n, sphere in enumerate([lhsphere, rhsphere]): coords.append(read_geometry(sphere)[0]) hemi.append(np.ones(len(coords[-1])) * n) return np.vstack(coords), np.hstack(hemi) def _get_fsaverage_spins(version='fsaverage', spins=None, n_rotate=1000, **kwargs): """ Generate spatial permutation resamples for fsaverage `version`. If `spins` are provided then performs checks to confirm they are valid Parameters ---------- version : str, optional Specifies which version of `fsaverage` for which to generate spins. Must be one of {'fsaverage', 'fsaverage3', 'fsaverage4', 'fsaverage5', 'fsaverage6'}. Default: 'fsaverage' spins : array_like, optional Pre-computed spins to use instead of generating them on the fly. If not provided will use other provided parameters to create them. Default: None n_rotate : int, optional Number of rotations to generate. Default: 1000 return_cost : bool, optional Whether to return cost array (specified as Euclidean distance) for each coordinate for each rotation. Currently this option is not supported if pre-computed `spins` are provided. Default: True kwargs : key-value pairs Keyword arguments passed to `netneurotools.stats.gen_spinsamples` Returns ------- spins : (N, S) numpy.ndarray Resampling array """ if spins is None: coords, hemiid = _get_fsaverage_coords(version, 'sphere') spins = gen_spinsamples(coords, hemiid, n_rotate=n_rotate, **kwargs) if kwargs.get('return_cost'): return spins spins = np.asarray(spins, dtype='int32') if spins.shape[-1] != n_rotate: warnings.warn('Shape of provided `spins` array does not match ' 'number of rotations requested with `n_rotate`. ' 'Ignoring specified `n_rotate` parameter and using ' 'all provided `spins`.', stacklevel=2) n_rotate = spins.shape[-1] return spins, None
[docs]def spin_data(data, *, lhannot, rhannot, version='fsaverage', n_rotate=1000, spins=None, drop=None, verbose=False, **kwargs): """ Project parcellated `data` to surface, rotates, and re-parcellates. Projection to the surface uses `{lh,rh}annot` files. Rotation uses vertex coordinates from the specified fsaverage `version` and relies on :func:`netneurotools.stats.gen_spinsamples`. Re-parcellated data will not be exactly identical to original values due to re-averaging process. Parcels subsumed by regions in `drop` will be listed as NaN. Parameters ---------- data : (N,) numpy.ndarray Parcellated data to be rotated. Parcels should be ordered by [left, right] hemisphere; ordering within hemisphere should correspond to the provided `{lh,rh}annot` annotation files. {lh,rh}annot : str Path to .annot file containing labels to parcels on the {left,right} hemisphere version : str, optional Specifies which version of `fsaverage` provided annotation files correspond to. Must be one of {'fsaverage', 'fsaverage3', 'fsaverage4', 'fsaverage5', 'fsaverage6'}. Default: 'fsaverage' n_rotate : int, optional Number of rotations to generate. Default: 1000 spins : array_like, optional Pre-computed spins to use instead of generating them on the fly. If not provided will use other provided parameters to create them. Default: None drop : list, optional Specifies regions in {lh,rh}annot that are not present in `data`. NaNs will be inserted in place of the these regions in the returned data. If not specified, parcels defined in `netneurotools.freesurfer.FSIGNORE` are assumed to not be present. Default: None verbose : bool, optional Whether to print occasional status messages. Default: False kwargs : key-value pairs Keyword arguments passed to `netneurotools.stats.gen_spinsamples` Returns ------- rotated : (N, `n_rotate`) numpy.ndarray Rotated `data cost : (N, `n_rotate`,) numpy.ndarray Cost (specified as Euclidean distance) of re-assigning each coordinate for every rotation in `spinsamples`. Only provided if `return_cost` is True. """ if drop is None: drop = FSIGNORE # get coordinates and hemisphere designation for spin generation vertices = parcels_to_vertices(data, lhannot=lhannot, rhannot=rhannot, drop=drop) # get spins + cost (if requested) spins, cost = _get_fsaverage_spins(version=version, spins=spins, n_rotate=n_rotate, verbose=verbose, **kwargs) if len(vertices) != len(spins): raise ValueError('Provided annotation files have a different ' 'number of vertices than the specified fsaverage ' 'surface.\n ANNOTATION: {} vertices\n ' 'FSAVERAGE: {} vertices' .format(len(vertices), len(spins))) spun = np.zeros(data.shape + (n_rotate,)) for n in range(n_rotate): if verbose: msg = f'Reducing vertices to parcels: {n:>5}/{n_rotate}' print(msg, end='\b' * len(msg), flush=True) spun[..., n] = vertices_to_parcels(vertices[spins[:, n]], lhannot=lhannot, rhannot=rhannot, drop=drop) if verbose: print(' ' * len(msg) + '\b' * len(msg), end='', flush=True) if kwargs.get('return_cost'): return spun, cost return spun
[docs]def spin_parcels(*, lhannot, rhannot, version='fsaverage', n_rotate=1000, spins=None, drop=None, verbose=False, **kwargs): """ Rotate parcels in `{lh,rh}annot` and re-assigns based on maximum overlap. Vertex labels are rotated with :func:`netneurotools.stats.gen_spinsamples` and a new label is assigned to each *parcel* based on the region maximally overlapping with its boundaries. Parameters ---------- {lh,rh}annot : str Path to .annot file containing labels to parcels on the {left,right} hemisphere version : str, optional Specifies which version of `fsaverage` provided annotation files correspond to. Must be one of {'fsaverage', 'fsaverage3', 'fsaverage4', 'fsaverage5', 'fsaverage6'}. Default: 'fsaverage' n_rotate : int, optional Number of rotations to generate. Default: 1000 spins : array_like, optional Pre-computed spins to use instead of generating them on the fly. If not provided will use other provided parameters to create them. Default: None drop : list, optional Specifies regions in {lh,rh}annot that are not present in `data`. NaNs will be inserted in place of the these regions in the returned data. If not specified, parcels defined in `netneurotools.freesurfer.FSIGNORE` are assumed to not be present. Default: None seed : {int, np.random.RandomState instance, None}, optional Seed for random number generation. Default: None verbose : bool, optional Whether to print occasional status messages. Default: False return_cost : bool, optional Whether to return cost array (specified as Euclidean distance) for each coordinate for each rotation. Default: True kwargs : key-value pairs Keyword arguments passed to `netneurotools.stats.gen_spinsamples` Returns ------- spinsamples : (N, `n_rotate`) numpy.ndarray Resampling matrix to use in permuting data parcellated with labels from {lh,rh}annot, where `N` is the number of parcels. Indices of -1 indicate that the parcel was completely encompassed by regions in `drop` and should be ignored. cost : (N, `n_rotate`,) numpy.ndarray Cost (specified as Euclidean distance) of re-assigning each coordinate for every rotation in `spinsamples`. Only provided if `return_cost` is True. """ def overlap(vals): """Return most common non-negative value in `vals`; -1 if all neg.""" vals = np.asarray(vals) vals, counts = np.unique(vals[vals > 0], return_counts=True) try: return vals[counts.argmax()] except ValueError: return -1 if drop is None: drop = FSIGNORE drop = _decode_list(drop) # get vertex-level labels (set drop labels to - values) vertices, end = [], 0 for n, annot in enumerate([lhannot, rhannot]): labels, ctab, names = read_annot(annot) names = _decode_list(names) todrop = set(names) & set(drop) inds = [names.index(f) - n for n, f in enumerate(todrop)] labs = np.arange(len(names) - len(inds)) + (end - (len(inds) * n)) insert = np.arange(-1, -(len(inds) + 1), -1) vertices.append(np.insert(labs, inds, insert)[labels]) end += len(names) vertices = np.hstack(vertices) labels = np.unique(vertices) mask = labels > -1 # get spins + cost (if requested) spins, cost = _get_fsaverage_spins(version=version, spins=spins, n_rotate=n_rotate, verbose=verbose, **kwargs) if len(vertices) != len(spins): raise ValueError('Provided annotation files have a different ' 'number of vertices than the specified fsaverage ' 'surface.\n ANNOTATION: {} vertices\n ' 'FSAVERAGE: {} vertices' .format(len(vertices), len(spins))) # spin and assign regions based on max overlap regions = np.zeros((len(labels[mask]), n_rotate), dtype='int32') for n in range(n_rotate): if verbose: msg = f'Calculating parcel overlap: {n:>5}/{n_rotate}' print(msg, end='\b' * len(msg), flush=True) regions[:, n] = labeled_comprehension(vertices[spins[:, n]], vertices, labels, overlap, int, -1)[mask] if kwargs.get('return_cost'): return regions, cost return regions