from typing import List, Optional, Union
import numpy as np
[docs]
def generate_block_matrix(
k: int,
p: Optional[float] = None,
q: Optional[float] = None,
) -> np.ndarray:
""" Generates a block matrix.
A square matrix of size :math:`k \\times k` is generated where diagonal elements of the block
matrix represent the probabilities of edges between nodes in the same community, while
off-diagonal elements represent the probabilities of edges between nodes in different
communities, i.e.,
.. math::
\\mathbf{B} = \\begin{pmatrix}
b_{11} & b_{12} & \\cdots & b_{1k} \\\\
b_{21} & b_{22} & \\cdots & b_{2k} \\\\
\\vdots & \\vdots & \\ddots & \\vdots \\\\
b_{k1} & b_{k2} & \\cdots & b_{kk}
\\end{pmatrix},\\,
b_{ij} =
\\begin{cases}
p & \\text{if} \\, i = j \\\\
q & \\text{otherwise}.
\\end{cases}
In this function, parameters :math:`p` and :math:`q` are used to set the diagonal and
off-diagonal elements, respectively. If both are unset, probabilities are uniformly
distributed, i.e., :math:`p = q = 1 / k`. If only one is unset, the other is set to
:math:`p = 1 - q \\, (k - 1)` or :math:`q = (1 - p) / (k - 1)`, respectively.
:param k: Number of communities in the network.
:param p: Edge probability among nodes in the same community.
:param q: Edge probability among nodes in different communities.
"""
if not (type(k) == int and k > 1):
raise ValueError("Communities `k` must be an integer larger than 1.")
if type(p) == float and not 0 <= p <= 1:
raise ValueError("Diagonal `p` must be a float or list of floats between 0 and 1.")
if type(q) == float and not 0 <= q <= 1:
raise ValueError("Off-diagonal `q` must be a float or list of floats between 0 and 1.")
if type(p) == list:
if len(p) != k:
raise ValueError("If `p` is a list, its length must match number of communities `k`.")
p = np.array(p)
if type(q) == list:
if len(q) != k:
raise ValueError("If `q` is a list, its length must match number of communities `k`.")
q = np.array(q)
if p is None and q is None:
p = q = 1 / k
elif p is None:
p = 1 - (q * (k - 1))
elif q is None:
q = (1 - p) / (k - 1)
B = np.eye(k, k) * p + (np.ones((k, k)) - np.eye(k, k)) * q
return B
[docs]
def generate_degree_vector(
nodes: Union[int, list],
min_degree: Optional[int] = None,
max_degree: Optional[int] = None,
alpha: Optional[float] = None,
phi: Optional[float] = None,
shuffle: Optional[bool] = True,
seed: Optional[int] = None,
) -> np.ndarray:
""" Generates a node degree vector.
The degree vector is a list of integers with the degrees of each node in the graph, which may
be provided to the :func:`~networkx_temporal.generators.dynamic_sbm` generator to create
graphs with expected degree distributions.
By default, if ``alpha`` and ``phi`` are unset, degrees are generated by sampling from a uniform
distribution between :math:`d_{min}` and :math:`d_{max}`, that is,
:math:`\\mathbf{d} = \\big[ d_{1}, d_{2}, \\cdots, d_{n} \\, | \\, d_{min} \\leq d \\leq d_{max} \\big]`.
.. rubric:: Gaussian distribution
If ``phi`` is set, the degree vector is sampled from a Gaussian (normal) distribution:
.. math::
\\mathbf{d} = \\big[
d_{1}, \\cdots, d_{n} \\,| \\, d \\sim
\\mathcal{N} \\big( \\mu = \\frac{d_{min} + d_{max}}{2}, \\,
\\sigma = \\frac{d_{max} - d_{min}}{\\varphi} \\big), \\,
d_{min} \\leq d \\leq d_{max}
\\big],
where :math:`\\varphi \\gt 0` is a standard deviation factor that controls how it is spread
around the mean :math:`\\mu`.
.. rubric:: Zipf distribution
If ``alpha`` is set, the degree vector is sampled from a Zipf (power-law)
distribution:
.. math::
\\mathbf{d} = \\big[
d_{1}, \\cdots, d_{n} \\, | \\,
p(d) = \\frac{d^{-(\\alpha + 1)}}{\\sum_{d_{min}}^{d_{max}} d^{-(\\alpha + 1)}}, \\,
d_{min} \\leq d \\leq d_{max}
\\big],\\\\
where :math:`\\alpha \\gt 1` is the exponent of the power-law distribution that controls how
heavy-tailed it is.
.. rubric:: Example
Comparison of degree distributions generated with this function:
.. code-block:: python
>>> import matplotlib.pyplot as plt
>>> import networkx_temporal as tx
>>>
>>> d1 = tx.generators.generate_degree_vector(100, max_degree=20, seed=0)
>>> d2 = tx.generators.generate_degree_vector(100, max_degree=20, phi=6, seed=0)
>>> d3 = tx.generators.generate_degree_vector(100, max_degree=20, alpha=2, seed=0)
>>>
>>> fig = plt.figure(figsize=(6, 4))
>>> plt.hist(d1, bins=20, alpha=0.5, label='Uniform distribution')
>>> plt.hist(d2, bins=20, alpha=0.5, label='Gaussian (normal) $\\phi=6$')
>>> plt.hist(d3, bins=20, alpha=0.5, label='Zipf (power-law) $\\alpha=2$')
>>> plt.title("Node degree distributions ($n=100, deg_{max}=20$)")
>>> plt.grid("#dddddd", linestyle='--', linewidth=0.5)
>>> plt.legend()
>>> plt.show()
.. image:: ../../assets/figure/generators/generate_degree_vector.png
:align: center
:param nodes: An ``int`` with the number of nodes in the network
or a ``list`` with the number of nodes in each community (their sizes).
:param min_degree: Minimum node degree. Defaults to ``1``.
:param max_degree: Maximum node degree. Defaults to ``nodes - 1``.
:param alpha: Exponent of the power-law degree distribution.
:param phi: Standard deviation factor of the normal distribution.
:param shuffle: If ``True``, vector elements are shuffled. Default.
:param seed: Random seed for reproducibility. Optional.
"""
if type(nodes) == int:
nodes = [nodes]
if type(nodes) != list:
raise ValueError("Argument `nodes` must be a positive integer or a list of integers.")
if min_degree is not None and min_degree < 0:
raise ValueError("Argument `min_degree` must be a non-negative value.")
if max_degree is not None and max_degree < (min_degree or 0):
raise ValueError("Argument `max_degree` must be a non-negative value >= `min_degree`.")
if alpha is not None and phi is not None:
raise ValueError("Arguments `alpha` and `phi` are mutually exclusive.")
if alpha is not None and alpha <= 0:
raise ValueError("Argument `alpha` must be a positive integer or float.")
if shuffle is not None and type(shuffle) != bool:
raise ValueError("Argument `shuffle` must be a boolean.")
if seed is not None and not (type(seed) == int):
raise ValueError("Argument `seed` must be an integer.")
degree_vector = np.zeros(sum(nodes), dtype=int)
d_min = 1 if min_degree is None else min_degree
d_max = (sum(nodes) - 1) if max_degree is None else max_degree
indx = 0
for i, num in enumerate(nodes):
if seed is not None:
np.random.seed(seed + i)
if alpha:
# Sample from a power-law (Zipf) distribution.
d = np.random.zipf(alpha, num)
d = np.clip(d, d_min, d_max)
d = np.array(d, dtype=int)
elif phi:
# Sample from a normal distribution.
mean = (d_min + d_max) / 2
std_dev = (d_max - d_min) / phi
d = np.random.normal(mean, std_dev, num)
d = np.clip(d, d_min, d_max).astype(int)
else:
# Sample from a uniform distribution.
d = np.array([np.random.randint(d_min, d_max + 1) for _ in range(num)])
# Degree vectors do not return sorted.
if shuffle is False:
d = np.sort(d)
degree_vector[indx:indx+num] = d
indx += num
return degree_vector
[docs]
def generate_transition_matrix(
k: int,
eta: Optional[Union[float, List[float]]] = None,
uniform_all: Optional[bool] = False,
) -> np.ndarray:
""" Generates a transition matrix.
The transition matrix is a square and symmetrical matrix of size :math:`k \\times k` that
describes the probabilities of nodes transitioning communities. The probability of a node
remaining in its community is given by :math:`\\eta` = ``eta``, and that of transitioning
to another community by :math:`1 - \\eta`, i.e.,
.. math::
\\boldsymbol{\\tau} =
\\begin{pmatrix}
\\tau_{11} & \\tau_{12} & \\cdots & \\tau_{1k} \\\\
\\tau_{21} & \\tau_{22} & \\cdots & \\tau_{2k} \\\\
\\vdots & \\vdots & \\ddots & \\vdots \\\\
\\tau_{k1} & \\tau_{k2} & \\cdots & \\tau_{kk}
\\end{pmatrix},\\,
\\tau_{ij} =
\\begin{cases}
\\eta & \\text{if} \\, i = j \\\\
\\frac{1 - \\eta}{k - 1} & \\text{otherwise},
\\end{cases}
where
:math:`\\boldsymbol{\\tau}` is the transition matrix,
:math:`\\eta \\in [0, 1]` is a parameter,
and :math:`k` is the number of communities,
so larger values of :math:`\\eta` increase the probability of nodes remaining in their current
community.
If ``eta`` is unset, it defaults to :math:`\\eta = 1/k`. A list of probabilities may also be
provided to define non-uniform transition probabilities for each community, e.g.,
``eta = [0.8, 0.6, 0.9]`` for ``k=3``.
.. note::
If ``uniform_all=True``, uniform-at-random probabilities follow the original paper [1]_
and include the node's current community in the distribution, i.e.,
:math:`\\tau_{ij} = (1 - \\eta)/k + \\eta \\, \\delta(i,j)`,
so nodes always have a non-zero probability of remaining in their current community
even if :math:`\\eta = 0`.
.. rubric:: Example
.. code-block:: python
>>> import networkx_temporal as tx
>>>
>>> # Generate a transition matrix for 3 communities with varying stability rates:
>>> tau = tx.generators.generate_transition_matrix(k=3, eta=[.5, .6, .8])
>>> print(tau)
array([[0.5, 0.25, 0.25],
[0.2, 0.6 , 0.2 ],
[0.1, 0.1, 0.8 ]])
:param k: Number of communities in the network.
:param eta: Probability of nodes remaining in their current community. Accepts a float
or a list of floats of size :math:`k`.
:param uniform_all: If ``True``, nodes remain in their current communities with
an added probability :math:`(1 - \\eta)/k`. Default is ``False``.
"""
if eta is None:
eta = 1 / k
eta = np.array(eta)
off_diagonal = (1 - eta) / (k - 1)
if uniform_all:
uniform = (1 - eta) / k
eta += uniform
off_diagonal -= uniform / (k-1)
eta = np.array(eta)
tau = generate_block_matrix(k, p=eta, q=off_diagonal)
if eta.ndim > 0:
tau = tau.T
return tau
[docs]
def transition_node_memberships(
communities: Union[List[int], List[List[float]]],
transition_matrix: List[float],
seed: Optional[int] = None,
) -> np.ndarray:
""" Simulates node-level community transitions.
The function simulates the transition of nodes between communities at time :math:`t+1`
based on a :math:`k \\times k` transition matrix :math:`\\boldsymbol{\\tau}` = ``tau``, where
diagonals describe the probability of a node remaining in its current community and
off-diagonals those of nodes switching to other communities.
Accepts either a vector (hard clustering) or a matrix (soft clustering) as ``communities``.
If a matrix, transition probabilities are computed as the dot product between the
community membership strengths and the transition matrix. Returns new community assignments
from transitions.
.. rubric:: Example
Transition of mixed node memberships defined by a community matrix (soft clustering):
.. code-block:: python
>>> import networkx_temporal as tx
>>>
>>> transition_matrix = [
>>> [ 1, 0, 0], # Nodes in the third community always remain in them.
>>> [.2, .2, .6], # Nodes in the second community mostly transition out.
>>> [.5, .5, 0], # Nodes in the third community always transition out.
>>> ]
>>> community_matrix = [
>>> [ 1, 0, 0], # Node in the first community.
>>> [ 0, 1, 0], # Node in the second community.
>>> [ 0, 0, 1], # Node in the third community.
>>> [.9, .1, 0], # Node mostly in the first community.
>>> [.2, .6, .2], # Node mostly in the second community.
>>> [.5, 0, .5], # Node equally in the first and third communities.
>>> ]
>>>
>>> new_memberships = tx.transition_node_memberships(
>>> community_matrix,
>>> transition_matrix=transition_matrix,
>>> seed=0,
>>> )
>>> print(new_memberships)
[0 2 1 0 1 0]
Transition of node memberships defined by a community vector (hard clustering):
.. code-block:: python
>>> # Total of 3 communities with 2 nodes each.
>>> community_vector = [0, 0, 1, 1, 2, 2]
>>>
>>> new_memberships = tx.transition_node_memberships(
>>> community_vector,
>>> transition_matrix=transition_matrix,
>>> seed=0,
>>> )
>>> print(new_memberships)
[0 0 2 2 0 1]
:param communities: Community assignments of nodes, either as a vector (hard clustering)
or a matrix (soft clustering).
:param transition_matrix: The transition probability matrix.
:param seed: Random seed for reproducibility. Optional.
"""
communities = np.array(communities)
transition_matrix = np.array(transition_matrix)
transition_matrix /= transition_matrix.sum(axis=1, keepdims=True)
num_vertices = len(communities)
num_clusters = len(transition_matrix)
set_clusters = list(range(num_clusters))
if transition_matrix.shape != (num_clusters, num_clusters):
raise ValueError(
"Transition matrix size must match the number of communities "
f"({num_clusters} x {num_clusters})."
)
if communities.ndim == 1 and max(communities)+1 > num_clusters:
raise ValueError(
"If `communities` is a vector, its maximum value must not "
f"exceed the number of communities ({num_clusters})."
)
if communities.ndim == 2 and communities.shape[1] != num_clusters:
raise ValueError(
"If `communities` is a matrix, its number of columns must match "
f"the number of communities ({num_clusters})."
)
if seed is not None:
np.random.seed(seed)
# Transition community vector (hard clustering).
if communities.ndim == 1:
return np.array([
np.random.choice(set_clusters, p=transition_matrix[communities[i]])
for i in range(num_vertices)
])
# Transition community matrix (soft clustering).
p = communities @ transition_matrix
p /= p.sum(axis=1, keepdims=True)
return np.array([
np.random.choice(set_clusters, p=p[i])
for i in range(num_vertices)
])