# -*- coding: utf-8 -*-
"""Functions for generating group-level networks from individual measurements."""
import bct
import numpy as np
from tqdm import tqdm
from scipy.sparse import csgraph
from sklearn.utils.validation import (check_random_state, check_array,
check_consistent_length)
from . import utils
try:
from numba import njit
use_numba = True
except ImportError:
use_numba = False
[docs]def func_consensus(data, n_boot=1000, ci=95, seed=None):
"""
Calculate thresholded group consensus functional connectivity graph.
This function concatenates all time series in `data` and computes a group
correlation matrix based on this extended time series. It then generates
length `T` bootstrapped samples from the concatenated matrix and estimates
confidence intervals for all correlations. Correlations whose sign is
consistent across bootstraps are retained; inconsistent correlations are
set to zero.
If `n_boot` is set to 0 or None a simple, group-averaged functional
connectivity matrix is estimated, instead.
Parameters
----------
data : (N, T, S) array_like (or a list of S arrays, each shaped as (N, T))
Pre-processed functional time series, where `N` is the number of nodes,
`T` is the number of volumes in the time series, and `S` is the number
of subjects.
n_boot : int, optional
Number of bootstraps for which to generate correlation. Default: 1000
ci : (0, 100) float, optional
Confidence interval for which to assess the reliability of correlations
with bootstraps. Default: 95
seed : int, optional
Random seed. Default: None
Returns
-------
consensus : (N, N) numpy.ndarray
Thresholded, group-level correlation matrix
References
----------
Mišić, B., Betzel, R. F., Nematzadeh, A., Goni, J., Griffa, A., Hagmann,
P., Flammini, A., Ahn, Y.-Y., & Sporns, O. (2015). Cooperative and
competitive spreading dynamics on the human connectome. Neuron, 86(6),
1518-1529.
"""
# check inputs
rs = check_random_state(seed)
if ci > 100 or ci < 0:
raise ValueError("`ci` must be between 0 and 100.")
# group-average functional connectivity matrix desired instead of bootstrap
if n_boot == 0 or n_boot is None:
if isinstance(data, list):
corrs = [np.corrcoef(sub) for sub in data]
else:
corrs = [np.corrcoef(data[..., sub]) for sub in
range(data.shape[-1])]
return np.nanmean(corrs, axis=0)
if isinstance(data, list):
collapsed_data = np.hstack(data)
nsample = int(collapsed_data.shape[-1] / len(data))
else:
collapsed_data = data.reshape((len(data), -1), order='F')
nsample = data.shape[1]
consensus = np.corrcoef(collapsed_data)
# only keep the upper triangle for the bootstraps to save on memory usage
triu_inds = np.triu_indices_from(consensus, k=1)
bootstrapped_corrmat = np.zeros((len(triu_inds[0]), n_boot))
# generate `n_boot` bootstrap correlation matrices by sampling `t` time
# points from the concatenated time series
for boot in range(n_boot):
inds = rs.randint(collapsed_data.shape[-1], size=nsample)
bootstrapped_corrmat[..., boot] = \
np.corrcoef(collapsed_data[:, inds])[triu_inds]
# extract the CIs from the bootstrapped correlation matrices
# we don't need the input anymore so overwrite it
bootstrapped_ci = np.percentile(bootstrapped_corrmat, [100 - ci, ci],
axis=-1, overwrite_input=True)
# remove unreliable (i.e., CI zero-crossing) correlations
# if the signs of the bootstrapped confidence intervals are different
# (i.e., their signs sum to 0), then we want to remove them
# so, take the logical not of the CI (CI = 0 ---> True) and create a mask
# then, set all connections from the consensus array inside the mask to 0
remove_inds = np.logical_not(np.sign(bootstrapped_ci).sum(axis=0))
mask = np.zeros_like(consensus, dtype=bool)
mask[triu_inds] = remove_inds
consensus[mask + mask.T] = 0
return consensus
def _ecdf(data):
"""
Estimate empirical cumulative distribution function of `data`.
Taken directly from StackOverflow. See original answer at
https://stackoverflow.com/questions/33345780.
Parameters
----------
data : array_like
Returns
-------
prob : numpy.ndarray
Cumulative probability
quantiles : numpy.darray
Quantiles
"""
sample = np.atleast_1d(data)
# find the unique values and their corresponding counts
quantiles, counts = np.unique(sample, return_counts=True)
# take the cumulative sum of the counts and divide by the sample size to
# get the cumulative probabilities between 0 and 1
prob = np.cumsum(counts).astype(float) / sample.size
# match MATLAB
prob, quantiles = np.append([0], prob), np.append(quantiles[0], quantiles)
return prob, quantiles
[docs]def struct_consensus(data, distance, hemiid,
conn_num_inter=None,
conn_num_intra=None,
weighted=False):
"""
Calculate distance-dependent group consensus structural connectivity graph.
Takes as input a weighted stack of connectivity matrices with dimensions
(N, N, S) where `N` is the number of nodes and `S` is the number of
matrices or subjects. The matrices must be weighted, and ideally with
continuous weights (e.g. fractional anisotropy rather than streamline
count). The second input is a pairwise distance matrix, where distance(i,j)
is the Euclidean distance between nodes i and j. The final input is an
(N, 1) vector which labels nodes as belonging to the right (`hemiid==0`) or
left (`hemiid=1`) hemisphere (note that these values can be flipped as long
as `hemiid` contains only values of 0 and 1).
This function estimates the average edge length distribution and builds
a group-averaged connectivity matrix that approximates this distribution
with density equal to the mean density across subjects.
The algorithm works as follows:
1. Estimate the cumulative edge length distribution,
2. Divide the distribution into M length bins, one for each edge that will
be added to the group-average matrix, and
3. Within each bin, select the edge that is most consistently expressed
expressed across subjects, breaking ties according to average edge
weight (which is why the input matrix `data` must be weighted).
The algorithm works separately on within/between hemisphere links.
M is the sum of `conn_num_inter` and `conn_num_intra`, if provided.
Otherwise, M is estimated from the data.
Parameters
----------
data : (N, N, S) array_like
Weighted connectivity matrices (i.e., fractional anisotropy), where `N`
is nodes and `S` is subjects
distance : (N, N) array_like
Array where `distance[i, j]` is the Euclidean distance between nodes
`i` and `j`
hemiid : (N, 1) array_like
Hemisphere designation for `N` nodes where a value of 0/1 indicates
node `N_{i}` is in the right/left hemisphere, respectively
conn_num_inter : int, optional
Number of inter-hemispheric connections to include in the consensus
matrix. If `None`, the number of inter-hemispheric connections will be
estimated from the data. Default = `None`.
conn_num_intra : int, optional
Number of intra-hemispheric connections to include in the consensus
matrix. If `None`, the number of intra-hemispheric connections will be
estimated from the data. Default = `None`.
weighted : bool
Flag indicating whether or not to return a weighted consensus map. If
`True`, the consensus will be multiplied by the mean of `data`.
Returns
-------
consensus : (N, N) numpy.ndarray
Binary (default) or mean-weighted group-level connectivity matrix
References
----------
Betzel, R. F., Griffa, A., Hagmann, P., & Mišić, B. (2018). Distance-
dependent consensus thresholds for generating group-representative
structural brain networks. Network Neuroscience, 1-22.
"""
# confirm input shapes are as expected
check_consistent_length(data, distance, hemiid)
try:
hemiid = check_array(hemiid, ensure_2d=True)
except ValueError:
raise ValueError('Provided hemiid must be a 2D array. Reshape your '
'data using array.reshape(-1, 1) and try again.') from None
num_node, _, num_sub = data.shape # info on connectivity matrices
pos_data = data > 0 # location of + values in matrix
pos_data_count = pos_data.sum(axis=2) # num sub with + values at each node
with np.errstate(divide='ignore', invalid='ignore'):
average_weights = data.sum(axis=2) / pos_data_count
# empty array to hold inter/intra hemispheric connections
consensus = np.zeros((num_node, num_node, 2))
for conn_type in range(2): # iterate through inter/intra hemisphere conn
if conn_type == 0: # get inter hemisphere edges
inter_hemi = (hemiid == 0) @ (hemiid == 1).T
keep_conn = np.logical_or(inter_hemi, inter_hemi.T)
else: # get intra hemisphere edges
right_hemi = (hemiid == 0) @ (hemiid == 0).T
left_hemi = (hemiid == 1) @ (hemiid == 1).T
keep_conn = np.logical_or(right_hemi @ right_hemi.T,
left_hemi @ left_hemi.T)
# mask the distance array for only those edges we want to examine
full_dist_conn = distance * keep_conn
upper_dist_conn = np.atleast_3d(np.triu(full_dist_conn))
# generate array of weighted (by distance), positive edges across subs
pos_dist = pos_data * upper_dist_conn
pos_dist = pos_dist[np.nonzero(pos_dist)]
# determine average # of positive edges across subs
# we will use this to bin the edge weights
if conn_type == 0:
if conn_num_inter is None:
avg_conn_num = len(pos_dist) / num_sub
else:
avg_conn_num = conn_num_inter
else:
if conn_num_intra is None:
avg_conn_num = len(pos_dist) / num_sub
else:
avg_conn_num = conn_num_intra
# estimate empirical CDF of weighted, positive edges across subs
cumprob, quantiles = _ecdf(pos_dist)
cumprob = np.round(cumprob * avg_conn_num).astype(int)
# empty array to hold group-average matrix for current connection type
# (i.e., inter/intra hemispheric connections)
group_conn_type = np.zeros((num_node, num_node))
# iterate through bins (for edge weights)
for n in range(1, int(avg_conn_num) + 1):
# get current quantile of interest
curr_quant = quantiles[np.logical_and(cumprob >= (n - 1),
cumprob < n)]
if curr_quant.size == 0:
continue
# find edges in distance connectivity matrix w/i current quantile
mask = np.logical_and(full_dist_conn >= curr_quant.min(),
full_dist_conn <= curr_quant.max())
i, j = np.where(np.triu(mask)) # indices of edges of interest
c = pos_data_count[i, j] # get num sub with + values at edges
w = average_weights[i, j] # get averaged weight of edges
# find locations of edges most commonly represented across subs
indmax = np.argwhere(c == c.max())
# determine index of most frequent edge; break ties with higher
# weighted edge
if indmax.size == 1: # only one edge found
group_conn_type[i[indmax], j[indmax]] = 1
else: # multiple edges found
indmax = indmax[np.argmax(w[indmax])]
group_conn_type[i[indmax], j[indmax]] = 1
consensus[:, :, conn_type] = group_conn_type
# collapse across hemispheric connections types and make symmetrical array
consensus = consensus.sum(axis=2)
consensus = np.logical_or(consensus, consensus.T).astype(int)
if weighted:
consensus = consensus * np.mean(data, axis=2)
return consensus
[docs]def binarize_network(network, retain=10, keep_diag=False):
"""
Keep top `retain` % of connections in `network` and binarizes.
Uses the upper triangle for determining connection percentage, which may
result in disconnected nodes. If this behavior is not desired see
:py:func:`netneurotools.networks.threshold_network`.
Parameters
----------
network : (N, N) array_like
Input graph
retain : [0, 100] float, optional
Percent connections to retain. Default: 10
keep_diag : bool, optional
Whether to keep the diagonal instead of setting it to 0. Default: False
Returns
-------
binarized : (N, N) numpy.ndarray
Binarized, thresholded graph
See Also
--------
netneurotools.networks.threshold_network
"""
if retain < 0 or retain > 100:
raise ValueError('Value provided for `retain` is outside [0, 100]: {}'
.format(retain))
prctile = 100 - retain
triu = utils.get_triu(network)
thresh = np.percentile(triu, prctile, axis=0, keepdims=True)
binarized = np.array(network > thresh, dtype=int)
if not keep_diag:
binarized[np.diag_indices(len(binarized))] = 0
return binarized
[docs]def threshold_network(network, retain=10):
"""
Keep top `retain` % of connections in `network` and binarizes.
Uses a minimum spanning tree to ensure that no nodes are disconnected from
the resulting thresholded graph
Parameters
----------
network : (N, N) array_like
Input graph
retain : [0, 100] float, optional
Percent connections to retain. Default: 10
Returns
-------
thresholded : (N, N) numpy.ndarray
Binarized, thresholded graph
See Also
--------
netneurotools.networks.binarize_network
"""
if retain < 0 or retain > 100:
raise ValueError('Value provided for `retain` must be a percent '
'in range [0, 100]. Provided: {}'.format(retain))
# get number of nodes in graph and invert weights (MINIMUM spanning tree)
nodes = len(network)
graph = np.triu(network * -1)
# find MST and count # of edges in graph
mst = csgraph.minimum_spanning_tree(graph).todense()
mst_edges = np.sum(mst != 0)
# determine # of remaining edges and ensure we're not over the limit
remain = int((retain / 100) * ((nodes * (nodes - 1)) / 2)) - mst_edges
if remain < 0:
raise ValueError('Minimum spanning tree with {} edges exceeds desired '
'connection density of {}% ({} edges). Cannot '
'proceed with graph creation.'
.format(mst_edges, retain, remain + mst_edges))
# zero out edges already in MST and then get indices of next best edges
graph -= mst
inds = utils.get_triu(graph).argsort()[:remain]
inds = tuple(e[inds] for e in np.triu_indices_from(graph, k=1))
# add edges to MST, symmetrize, and convert to binary matrix
mst[inds] = graph[inds]
mst = np.array((mst + mst.T) != 0, dtype=int)
return mst
[docs]def match_length_degree_distribution(W, D, nbins=10, nswap=1000,
replacement=False, weighted=True,
seed=None):
"""
Generate degree- and edge length-preserving surrogate connectomes.
Parameters
----------
W : (N, N) array-like
weighted or binary symmetric connectivity matrix.
D : (N, N) array-like
symmetric distance matrix.
nbins : int
number of distance bins (edge length matrix is performed by swapping
connections in the same bin). Default = 10.
nswap : int
total number of edge swaps to perform. Recommended = nnodes * 20
Default = 1000.
replacement : bool, optional
if True all the edges are available for swapping. Default= False
weighted : bool, optional
Whether to return weighted rewired connectivity matrix. Default = True
seed : float, optional
Random seed. Default = None
Returns
-------
newB : (N, N) array-like
binary rewired matrix
newW: (N, N) array-like
weighted rewired matrix. Returns matrix of zeros if weighted=False.
nr : int
number of successful rewires
Notes
-----
Takes a weighted, symmetric connectivity matrix `data` and Euclidean/fiber
length matrix `distance` and generates a randomized network with:
1. exactly the same degree sequence
2. approximately the same edge length distribution
3. exactly the same edge weight distribution
4. approximately the same weight-length relationship
Reference
---------
Betzel, R. F., Bassett, D. S. (2018) Specificity and robustness of
long-distance connections in weighted, interareal connectomes. PNAS.
"""
rs = check_random_state(seed)
N = len(W)
# divide the distances by lengths
bins = np.linspace(D[D.nonzero()].min(), D[D.nonzero()].max(), nbins + 1)
bins[-1] += 1
L = np.zeros((N, N))
for n in range(nbins):
i, j = np.where(np.logical_and(bins[n] <= D, D < bins[n + 1]))
L[i, j] = n + 1
# binarized connectivity
B = (W != 0).astype(np.int_)
# existing edges (only upper triangular cause it's symmetric)
cn_x, cn_y = np.where(np.triu((B != 0) * B, k=1))
tries = 0
nr = 0
newB = np.copy(B)
while ((len(cn_x) >= 2) & (nr < nswap)):
# choose randomly the edge to be rewired
r = rs.randint(len(cn_x))
n_x, n_y = cn_x[r], cn_y[r]
tries += 1
# options to rewire with
# connected nodes that doesn't involve (n_x, n_y)
index = (cn_x != n_x) & (cn_y != n_y) & (cn_y != n_x) & (cn_x != n_y)
if len(np.where(index)[0]) == 0:
cn_x = np.delete(cn_x, r)
cn_y = np.delete(cn_y, r)
else:
ops1_x, ops1_y = cn_x[index], cn_y[index]
# options that will preserve the distances
# (ops1_x, ops1_y) such that
# L(n_x,n_y) = L(n_x, ops1_x) & L(ops1_x,ops1_y) = L(n_y, ops1_y)
index = (L[n_x, n_y] == L[n_x, ops1_x]) & (
L[ops1_x, ops1_y] == L[n_y, ops1_y])
if len(np.where(index)[0]) == 0:
cn_x = np.delete(cn_x, r)
cn_y = np.delete(cn_y, r)
else:
ops2_x, ops2_y = ops1_x[index], ops1_y[index]
# options of edges that didn't exist before
index = [(newB[min(n_x, ops2_x[i])][max(n_x, ops2_x[i])] == 0)
& (newB[min(n_y, ops2_y[i])][max(n_y,
ops2_y[i])] == 0)
for i in range(len(ops2_x))]
if (len(np.where(index)[0]) == 0):
cn_x = np.delete(cn_x, r)
cn_y = np.delete(cn_y, r)
else:
ops3_x, ops3_y = ops2_x[index], ops2_y[index]
# choose randomly one edge from the final options
r1 = rs.randint(len(ops3_x))
nn_x, nn_y = ops3_x[r1], ops3_y[r1]
# Disconnect the existing edges
newB[n_x, n_y] = 0
newB[nn_x, nn_y] = 0
# Connect the new edges
newB[min(n_x, nn_x), max(n_x, nn_x)] = 1
newB[min(n_y, nn_y), max(n_y, nn_y)] = 1
# one successfull rewire!
nr += 1
# rewire with replacement
if replacement:
cn_x[r], cn_y[r] = min(n_x, nn_x), max(n_x, nn_x)
index = np.where((cn_x == nn_x) & (cn_y == nn_y))[0]
cn_x[index], cn_y[index] = min(
n_y, nn_y), max(n_y, nn_y)
# rewire without replacement
else:
cn_x = np.delete(cn_x, r)
cn_y = np.delete(cn_y, r)
index = np.where((cn_x == nn_x) & (cn_y == nn_y))[0]
cn_x = np.delete(cn_x, index)
cn_y = np.delete(cn_y, index)
if nr < nswap:
print(f"I didn't finish, out of rewirable edges: {len(cn_x)}")
i, j = np.triu_indices(N, k=1)
# Make the connectivity matrix symmetric
newB[j, i] = newB[i, j]
# check the number of edges is preserved
if len(np.where(B != 0)[0]) != len(np.where(newB != 0)[0]):
print(
f"ERROR --- number of edges changed, \
B:{len(np.where(B!=0)[0])}, newB:{len(np.where(newB!=0)[0])}")
# check that the degree of the nodes it's the same
for i in range(N):
if np.sum(B[i]) != np.sum(newB[i]):
print(
f"ERROR --- node {i} changed k by: \
{np.sum(B[i]) - np.sum(newB[i])}")
newW = np.zeros((N, N))
if weighted:
# Reassign the weights
mask = np.triu(B != 0, k=1)
inids = D[mask]
iniws = W[mask]
inids_index = np.argsort(inids)
# Weights from the shortest to largest edges
iniws = iniws[inids_index]
mask = np.triu(newB != 0, k=1)
finds = D[mask]
i, j = np.where(mask)
# Sort the new edges from the shortest to the largest
finds_index = np.argsort(finds)
i_sort = i[finds_index]
j_sort = j[finds_index]
# Assign the initial sorted weights
newW[i_sort, j_sort] = iniws
# Make it symmetrical
newW[j_sort, i_sort] = iniws
return newB, newW, nr
[docs]def randmio_und(W, itr):
"""
Optimized version of randmio_und.
This function randomizes an undirected network, while preserving the
degree distribution. The function does not preserve the strength
distribution in weighted networks.
This function is significantly faster if numba is enabled, because
the main overhead is `np.random.randint`, see `here <https://stackoverflow.com/questions/58124646/why-in-python-is-random-randint-so-much-slower-than-random-random>`_
Parameters
----------
W : (N, N) array-like
Undirected binary/weighted connection matrix
itr : int
rewiring parameter. Each edge is rewired approximately itr times.
Returns
-------
W : (N, N) array-like
Randomized network
eff : int
number of actual rewirings carried out
""" # noqa: E501
W = W.copy()
n = len(W)
i, j = np.where(np.triu(W > 0, 1))
k = len(i)
itr *= k
# maximum number of rewiring attempts per iteration
max_attempts = np.round(n * k / (n * (n - 1)))
# actual number of successful rewirings
eff = 0
for _ in range(int(itr)):
att = 0
while att <= max_attempts: # while not rewired
while True:
e1, e2 = np.random.randint(k), np.random.randint(k)
while e1 == e2:
e2 = np.random.randint(k)
a, b = i[e1], j[e1]
c, d = i[e2], j[e2]
if a != c and a != d and b != c and b != d:
break # all 4 vertices must be different
# flip edge c-d with 50% probability
# to explore all potential rewirings
if np.random.random() > .5:
i[e2], j[e2] = d, c
c, d = d, c
# rewiring condition
# not flipped
# a--b a b
# TO X
# c--d c d
# if flipped
# a--b a--b a b
# TO TO X
# c--d d--c d c
if not (W[a, d] or W[c, b]):
W[a, d] = W[a, b]
W[a, b] = 0
W[d, a] = W[b, a]
W[b, a] = 0
W[c, b] = W[c, d]
W[c, d] = 0
W[b, c] = W[d, c]
W[d, c] = 0
j[e1] = d
j[e2] = b # reassign edge indices
eff += 1
break
att += 1
return W, eff
if use_numba:
randmio_und = njit(randmio_und)
[docs]def strength_preserving_rand_sa(A, rewiring_iter=10,
nstage=100, niter=10000,
temp=1000, frac=0.5,
energy_type='sse', energy_func=None,
R=None, connected=None,
verbose=False, seed=None):
"""
Strength-preserving network randomization using simulated annealing.
Randomize an undirected weighted network, while preserving
the degree and strength sequences using simulated annealing.
This function allows for a flexible choice of energy function.
Parameters
----------
A : (N, N) array-like
Undirected weighted connectivity matrix
rewiring_iter : int, optional
Rewiring parameter. Default = 10.
Each edge is rewired approximately rewiring_iter times.
nstage : int, optional
Number of annealing stages. Default = 100.
niter : int, optional
Number of iterations per stage. Default = 10000.
temp : float, optional
Initial temperature. Default = 1000.
frac : float, optional
Fractional decrease in temperature per stage. Default = 0.5.
energy_type: str, optional
Energy function to minimize. Can be either:
'sse': Sum of squared errors between strength sequence vectors
of the original network and the randomized network
'max': Maximum absolute error
'mae': Mean absolute error
'mse': Mean squared error
'rmse': Root mean squared error
Default = 'sse'.
energy_func: callable, optional
Callable with two positional arguments corresponding to
two strength sequence numpy arrays that returns an energy value.
Overwrites “energy_type”.
See “energy_type” for specifying a predefined energy type instead.
R : (N, N) array-like, optional
Pre-randomized connectivity matrix.
If None, a rewired connectivity matrix is generated using the
Maslov & Sneppen algorithm.
Default = None.
connected: bool, optional
Whether to ensure connectedness of the randomized network.
By default, this is inferred from data.
verbose: bool, optional
Whether to print status to screen at the end of every stage.
Default = False.
seed: float, optional
Random seed. Default = None.
Returns
-------
B : (N, N) array-like
Randomized connectivity matrix
min_energy : float
Minimum energy obtained by annealing
Notes
-----
Uses Maslov & Sneppen rewiring model to produce a
surrogate connectivity matrix, B, with the same
size, density, and degree sequence as A.
The weights are then permuted to optimize the
match between the strength sequences of A and B
using simulated annealing.
This function is adapted from a function written in MATLAB
by Richard Betzel.
References
----------
Misic, B. et al. (2015) Cooperative and Competitive Spreading Dynamics
on the Human Connectome. Neuron.
Milisav, F. et al. (2024) A simulated annealing algorithm for
randomizing weighted networks.
"""
try:
A = np.asarray(A)
except TypeError as err:
msg = ('A must be array_like. Received: {}.'.format(type(A)))
raise TypeError(msg) from err
if frac > 1 or frac <= 0:
msg = ('frac must be between 0 and 1. '
'Received: {}.'.format(frac))
raise ValueError(msg)
rs = check_random_state(seed)
n = A.shape[0]
s = np.sum(A, axis=1) #strengths of A
#Maslov & Sneppen rewiring
if R is None:
#ensuring connectedness if the original network is connected
if connected is None:
connected = False if bct.number_of_components(A) > 1 else True
if connected:
B = bct.randmio_und_connected(A, rewiring_iter, seed=seed)[0]
else:
B = bct.randmio_und(A, rewiring_iter, seed=seed)[0]
else:
B = R.copy()
u, v = np.triu(B, k=1).nonzero() #upper triangle indices
wts = np.triu(B, k=1)[(u, v)] #upper triangle values
m = len(wts)
sb = np.sum(B, axis=1) #strengths of B
if energy_func is not None:
energy = energy_func(s, sb)
elif energy_type == 'sse':
energy = np.sum((s - sb)**2)
elif energy_type == 'max':
energy = np.max(np.abs(s - sb))
elif energy_type == 'mae':
energy = np.mean(np.abs(s - sb))
elif energy_type == 'mse':
energy = np.mean((s - sb)**2)
elif energy_type == 'rmse':
energy = np.sqrt(np.mean((s - sb)**2))
else:
msg = ("energy_type must be one of 'sse', 'max', "
"'mae', 'mse', or 'rmse'. Received: {}.".format(energy_type))
raise ValueError(msg)
energymin = energy
wtsmin = wts.copy()
if verbose:
print('\ninitial energy {:.5f}'.format(energy))
for istage in tqdm(range(nstage), desc='annealing progress'):
naccept = 0
for _ in range(niter):
#permutation
e1 = rs.randint(m)
e2 = rs.randint(m)
a, b = u[e1], v[e1]
c, d = u[e2], v[e2]
sb_prime = sb.copy()
sb_prime[[a, b]] = sb_prime[[a, b]] - wts[e1] + wts[e2]
sb_prime[[c, d]] = sb_prime[[c, d]] + wts[e1] - wts[e2]
if energy_func is not None:
energy_prime = energy_func(sb_prime, s)
elif energy_type == 'sse':
energy_prime = np.sum((sb_prime - s)**2)
elif energy_type == 'max':
energy_prime = np.max(np.abs(sb_prime - s))
elif energy_type == 'mae':
energy_prime = np.mean(np.abs(sb_prime - s))
elif energy_type == 'mse':
energy_prime = np.mean((sb_prime - s)**2)
elif energy_type == 'rmse':
energy_prime = np.sqrt(np.mean((sb_prime - s)**2))
else:
msg = ("energy_type must be one of 'sse', 'max', "
"'mae', 'mse', or 'rmse'. "
"Received: {}.".format(energy_type))
raise ValueError(msg)
#permutation acceptance criterion
if (energy_prime < energy or
rs.rand() < np.exp(-(energy_prime - energy)/temp)):
sb = sb_prime.copy()
wts[[e1, e2]] = wts[[e2, e1]]
energy = energy_prime
if energy < energymin:
energymin = energy
wtsmin = wts.copy()
naccept = naccept + 1
#temperature update
temp = temp*frac
if verbose:
print('\nstage {:d}, temp {:.5f}, best energy {:.5f}, '
'frac of accepted moves {:.3f}'.format(istage, temp,
energymin,
naccept/niter))
B = np.zeros((n, n))
B[(u, v)] = wtsmin
B = B + B.T
return B, energymin
[docs]def strength_preserving_rand_sa_mse_opt(A, rewiring_iter=10,
nstage=100, niter=10000,
temp=1000, frac=0.5,
R=None, connected=None,
verbose=False, seed=None):
"""
Strength-preserving network randomization using simulated annealing.
Randomize an undirected weighted network, while preserving
the degree and strength sequences using simulated annealing.
This function has been optimized for speed but only allows the
mean squared error energy function.
Parameters
----------
A : (N, N) array-like
Undirected weighted connectivity matrix
rewiring_iter : int, optional
Rewiring parameter. Default = 10.
Each edge is rewired approximately rewiring_iter times.
nstage : int, optional
Number of annealing stages. Default = 100.
niter : int, optional
Number of iterations per stage. Default = 10000.
temp : float, optional
Initial temperature. Default = 1000.
frac : float, optional
Fractional decrease in temperature per stage. Default = 0.5.
R : (N, N) array-like, optional
Pre-randomized connectivity matrix.
If None, a rewired connectivity matrix is generated using the
Maslov & Sneppen algorithm.
Default = None.
connected: bool, optional
Whether to ensure connectedness of the randomized network.
By default, this is inferred from data.
verbose: bool, optional
Whether to print status to screen at the end of every stage.
Default = False.
seed: float, optional
Random seed. Default = None.
Returns
-------
B : (N, N) array-like
Randomized connectivity matrix
min_energy : float
Minimum energy obtained by annealing
Notes
-----
Uses Maslov & Sneppen rewiring model to produce a
surrogate connectivity matrix, B, with the same
size, density, and degree sequence as A.
The weights are then permuted to optimize the
match between the strength sequences of A and B
using simulated annealing.
This function is adapted from a function written in MATLAB
by Richard Betzel and was optimized by Vincent Bazinet.
References
----------
Misic, B. et al. (2015) Cooperative and Competitive Spreading Dynamics
on the Human Connectome. Neuron.
Milisav, F. et al. (2024) A simulated annealing algorithm for
randomizing weighted networks.
"""
try:
A = np.asarray(A)
except TypeError as err:
msg = ('A must be array_like. Received: {}.'.format(type(A)))
raise TypeError(msg) from err
if frac > 1 or frac <= 0:
msg = ('frac must be between 0 and 1. '
'Received: {}.'.format(frac))
raise ValueError(msg)
rs = check_random_state(seed)
n = A.shape[0]
s = np.sum(A, axis=1) #strengths of A
#Maslov & Sneppen rewiring
if R is None:
#ensuring connectedness if the original network is connected
if connected is None:
connected = False if bct.number_of_components(A) > 1 else True
if connected:
B = bct.randmio_und_connected(A, rewiring_iter, seed=seed)[0]
else:
B = bct.randmio_und(A, rewiring_iter, seed=seed)[0]
else:
B = R.copy()
u, v = np.triu(B, k=1).nonzero() #upper triangle indices
wts = np.triu(B, k=1)[(u, v)] #upper triangle values
m = len(wts)
sb = np.sum(B, axis=1) #strengths of B
energy = np.mean((s - sb)**2)
energymin = energy
wtsmin = wts.copy()
if verbose:
print('\ninitial energy {:.5f}'.format(energy))
for istage in tqdm(range(nstage), desc='annealing progress'):
naccept = 0
for (e1, e2), prob in zip(rs.randint(m, size=(niter, 2)),
rs.rand(niter)
):
#permutation
a, b, c, d = u[e1], v[e1], u[e2], v[e2]
wts_change = wts[e1] - wts[e2]
delta_energy = (2 * wts_change *
(2 * wts_change +
(s[a] - sb[a]) +
(s[b] - sb[b]) -
(s[c] - sb[c]) -
(s[d] - sb[d])
)
)/n
#permutation acceptance criterion
if (delta_energy < 0 or prob < np.e**(-(delta_energy)/temp)):
sb[[a, b]] -= wts_change
sb[[c, d]] += wts_change
wts[[e1, e2]] = wts[[e2, e1]]
energy = np.mean((sb - s)**2)
if energy < energymin:
energymin = energy
wtsmin = wts.copy()
naccept = naccept + 1
#temperature update
temp = temp*frac
if verbose:
print('\nstage {:d}, temp {:.5f}, best energy {:.5f}, '
'frac of accepted moves {:.3f}'.format(istage, temp,
energymin,
naccept/niter))
B = np.zeros((n, n))
B[(u, v)] = wtsmin
B = B + B.T
return B, energymin
[docs]def strength_preserving_rand_sa_dir(A, rewiring_iter=10,
nstage=100, niter=10000,
temp=1000, frac=0.5,
energy_type='sse', energy_func=None,
connected=True, verbose=False,
seed=None):
"""
Strength-preserving network randomization using simulated annealing.
Randomize a directed weighted network, while preserving
the in- and out-degree and strength sequences using simulated annealing.
Parameters
----------
A : (N, N) array-like
Directed weighted connectivity matrix
rewiring_iter : int, optional
Rewiring parameter. Default = 10.
Each edge is rewired approximately rewiring_iter times.
nstage : int, optional
Number of annealing stages. Default = 100.
niter : int, optional
Number of iterations per stage. Default = 10000.
temp : float, optional
Initial temperature. Default = 1000.
frac : float, optional
Fractional decrease in temperature per stage. Default = 0.5.
energy_type: str, optional
Energy function to minimize. Can be either:
'sse': Sum of squared errors between strength sequence vectors
of the original network and the randomized network
'max': Maximum absolute error
'mae': Mean absolute error
'mse': Mean squared error
'rmse': Root mean squared error
Default = 'sse'.
energy_func: callable, optional
Callable with two positional arguments corresponding to
two strength sequence numpy arrays that returns an energy value.
Overwrites “energy_type”.
See “energy_type” for specifying a predefined energy type instead.
connected: bool, optional
Whether to ensure connectedness of the randomized network.
Default = True.
verbose: bool, optional
Whether to print status to screen at the end of every stage.
Default = False.
seed: float, optional
Random seed. Default = None.
Returns
-------
B : (N, N) array-like
Randomized connectivity matrix
min_energy : float
Minimum energy obtained by annealing
Notes
-----
Uses Maslov & Sneppen rewiring model to produce a
surrogate connectivity matrix, B, with the same
size, density, and in- and out-degree sequences as A.
The weights are then permuted to optimize the
match between the strength sequences of A and B
using simulated annealing.
Both in- and out-strengths are preserved.
This function is adapted from a function written in MATLAB
by Richard Betzel.
References
----------
Misic, B. et al. (2015) Cooperative and Competitive Spreading Dynamics
on the Human Connectome. Neuron.
Rubinov, M. (2016) Constraints and spandrels of interareal connectomes.
Nature Communications.
Milisav, F. et al. (2024) A simulated annealing algorithm for
randomizing weighted networks.
"""
try:
A = np.asarray(A)
except TypeError as err:
msg = ('A must be array_like. Received: {}.'.format(type(A)))
raise TypeError(msg) from err
if frac > 1 or frac <= 0:
msg = ('frac must be between 0 and 1. '
'Received: {}.'.format(frac))
raise ValueError(msg)
rs = check_random_state(seed)
n = A.shape[0]
s_in = np.sum(A, axis=0) #in-strengths of A
s_out = np.sum(A, axis=1) #out-strengths of A
#Maslov & Sneppen rewiring
if connected:
B = bct.randmio_dir_connected(A, rewiring_iter, seed=seed)[0]
else:
B = bct.randmio_dir(A, rewiring_iter, seed=seed)[0]
u, v = B.nonzero() #nonzero indices of B
wts = B[(u, v)] #nonzero values of B
m = len(wts)
sb_in = np.sum(B, axis=0) #in-strengths of B
sb_out = np.sum(B, axis=1) #out-strengths of B
if energy_func is not None:
energy = energy_func(s_in, sb_in) + energy_func(s_out, sb_out)
elif energy_type == 'sse':
energy = np.sum((s_in - sb_in)**2) + np.sum((s_out - sb_out)**2)
elif energy_type == 'max':
energy = np.max(np.abs(s_in - sb_in)) + np.max(np.abs(s_out - sb_out))
elif energy_type == 'mae':
energy= np.mean(np.abs(s_in - sb_in)) + np.mean(np.abs(s_out - sb_out))
elif energy_type == 'mse':
energy = np.mean((s_in - sb_in)**2) + np.mean((s_out - sb_out)**2)
elif energy_type == 'rmse':
energy = (np.sqrt(np.mean((s_in - sb_in)**2)) +
np.sqrt(np.mean((s_out - sb_out)**2)))
else:
msg = ("energy_type must be one of 'sse', 'max', "
"'mae', 'mse', or 'rmse'. Received: {}.".format(energy_type))
raise ValueError(msg)
energymin = energy
wtsmin = wts.copy()
if verbose:
print('\ninitial energy {:.5f}'.format(energy))
for istage in tqdm(range(nstage), desc='annealing progress'):
naccept = 0
for _ in range(niter):
#permutation
e1 = rs.randint(m)
e2 = rs.randint(m)
a, b = u[e1], v[e1]
c, d = u[e2], v[e2]
sb_prime_in = sb_in.copy()
sb_prime_out = sb_out.copy()
sb_prime_in[b] = sb_prime_in[b] - wts[e1] + wts[e2]
sb_prime_out[a] = sb_prime_out[a] - wts[e1] + wts[e2]
sb_prime_in[d] = sb_prime_in[d] - wts[e2] + wts[e1]
sb_prime_out[c] = sb_prime_out[c] - wts[e2] + wts[e1]
if energy_func is not None:
energy_prime = (energy_func(sb_prime_in, s_in) +
energy_func(sb_prime_out, s_out))
elif energy_type == 'sse':
energy_prime = (np.sum((sb_prime_in - s_in)**2) +
np.sum((sb_prime_out - s_out)**2))
elif energy_type == 'max':
energy_prime = (np.max(np.abs(sb_prime_in - s_in)) +
np.max(np.abs(sb_prime_out - s_out)))
elif energy_type == 'mae':
energy_prime = (np.mean(np.abs(sb_prime_in - s_in)) +
np.mean(np.abs(sb_prime_out - s_out)))
elif energy_type == 'mse':
energy_prime = (np.mean((sb_prime_in - s_in)**2) +
np.mean((sb_prime_out - s_out)**2))
elif energy_type == 'rmse':
energy_prime = (np.sqrt(np.mean((sb_prime_in - s_in)**2)) +
np.sqrt(np.mean((sb_prime_out - s_out)**2)))
else:
msg = ("energy_type must be one of 'sse', 'max', "
"'mae', 'mse', or 'rmse'. "
"Received: {}.".format(energy_type))
raise ValueError(msg)
#permutation acceptance criterion
if (energy_prime < energy or
rs.rand() < np.exp(-(energy_prime - energy)/temp)):
sb_in = sb_prime_in.copy()
sb_out = sb_prime_out.copy()
wts[[e1, e2]] = wts[[e2, e1]]
energy = energy_prime
if energy < energymin:
energymin = energy
wtsmin = wts.copy()
naccept = naccept + 1
#temperature update
temp = temp*frac
if verbose:
print('\nstage {:d}, temp {:.5f}, best energy {:.5f}, '
'frac of accepted moves {:.3f}'.format(istage, temp,
energymin,
naccept/niter))
B = np.zeros((n, n))
B[(u, v)] = wtsmin
return B, energymin