import os.path as osp
import zipfile
from io import BufferedReader, BytesIO, StringIO, TextIOWrapper
from typing import Callable, Optional, Union
import networkx as nx
from .readwrite import _get_filepath, _get_filename, _get_format, _get_function
from ..transform import from_snapshots, from_static
from ..typing import TemporalGraph
[docs]
def read_graph(
file: Union[str, BufferedReader, BytesIO],
format: Optional[Union[str, Callable]] = None,
**kwargs,
) -> TemporalGraph:
""" Returns a :class:`~networkx_temporal.classes.TemporalGraph` from file or binary object.
If ``file`` is a compressed
`ZipFile <https://docs.python.org/3/library/zipfile.html#zipfile.ZipFile>`__,
it must contain one or more graphs named ``{name}_{t}.{ext}``,
where ``t`` is their snapshot index and ``ext`` is the extension format.
See: :func:`~networkx_temporal.readwrite.write_graph`.
.. rubric:: Example
Reading a temporal graph from a compressed file:
.. code-block:: python
>>> import networkx_temporal as tx
>>>
>>> TG = tx.read_graph("snapshots.graphml.zip")
.. seealso::
The latest `read and write documentation
<https://networkx.org/documentation/stable/reference/io/index.html>`__
from NetworkX for a list of supported formats.
:param object file: Binary file-like object or string containing path to ZIP file.
:param format: Extension format or callable function to read compressed graphs with. If unset,
it is inferred from their file extension.
:param kwargs: Additional arguments to pass to NetworkX reader function.
"""
def read(file, format, **kwargs):
path = _get_filepath(file)
frmt = _get_format(path, format)
func = _get_function(frmt, "read")
if frmt is None:
raise ValueError("Missing extension format to read graph in file name or `format`.")
if not (type(frmt) == str or callable(frmt)):
raise TypeError(f"Argument `format` expects a string or callable, received: {type(frmt)}.")
if func is None:
raise ValueError(f"Extension '{frmt}' is not supported by NetworkX. Supported formats: "
f"{[f[5:] for f in dir(nx) if f.startswith('read_')]}")
return func(file, **kwargs)
path = _get_filepath(file)
name = _get_filename(path)
if type(file) == str and osp.isdir(file):
raise FileNotFoundError("Argument `file` must be a file path or object, not a directory.")
if type(file) == TextIOWrapper:
raise TypeError(
f"File must be opened in binary mode, "
f"received {type(file)} but expected {BufferedWriter}."
)
if type(file) == StringIO:
raise TypeError(
f"Buffer must be binary, "
f"received {type(file)} but expected {BytesIO}."
)
if type(file) == bytes:
file = BytesIO(file)
if zipfile.is_zipfile(file):
with zipfile.ZipFile(file, "r") as zf:
if len(zf.namelist()) == 0:
raise ValueError("ZIP file is empty.")
if len(zf.namelist()) == 1:
with zf.open(zf.namelist()[0]) as f:
TG = from_static(read(f, format, **kwargs))
else:
if not all(osp.splitext(z)[0].rsplit("_", 1)[-1].isdigit() for z in zf.namelist()):
raise ValueError(
"Snapshots in ZIP file must contain an index 't' in their filename, "
"such as '{name}_{t}.{ext}'."
)
TG = from_snapshots([
read(zf.open(z), format, **kwargs)
for z in sorted(zf.namelist(), key=lambda x: int(osp.splitext(x)[0].rsplit("_", 1)[-1]))
])
else:
TG = from_static(read(file, format, **kwargs))
TG.name = name
return TG