Source code for networkx_temporal.readwrite.reader

import os.path as osp
import zipfile
from io import BufferedReader, BytesIO, StringIO, TextIOWrapper
from typing import Callable, Optional, Union

import networkx as nx

from .readwrite import _get_filepath, _get_filename, _get_format, _get_function
from ..transform import from_snapshots, from_static
from ..typing import TemporalGraph


def read_graph(
    file: Union[str, BufferedReader, BytesIO],
    format: Optional[Union[str, Callable]] = None,
    **kwargs,
) -> TemporalGraph:
    """
    Returns a :class:`~networkx_temporal.classes.TemporalGraph` from file or binary object.

    If ``file`` is a compressed
    `ZipFile <https://docs.python.org/3/library/zipfile.html#zipfile.ZipFile>`__,
    it must contain one or more graphs named ``{name}_{t}.{ext}``, where ``t`` is their
    snapshot index and ``ext`` is the extension format. An archive holding a single
    member is read as a static graph, regardless of its name. Any other input is read
    as a single static graph.
    See: :func:`~networkx_temporal.readwrite.write_graph`.

    .. rubric:: Example

    Reading a temporal graph from a compressed file:

    .. code-block:: python

        >>> import networkx_temporal as tx
        >>>
        >>> TG = tx.read_graph("snapshots.graphml.zip")

    .. seealso::

        The latest `read and write documentation
        <https://networkx.org/documentation/stable/reference/io/index.html>`__
        from NetworkX for a list of supported formats.

    :param object file: Binary file-like object or string containing path to ZIP file.
    :param format: Extension format or callable function to read compressed graphs with.
        If unset, it is inferred from their file extension.
    :param kwargs: Additional arguments to pass to NetworkX reader function.

    :raises FileNotFoundError: If ``file`` is a path to a directory.
    :raises TypeError: If ``file`` is a text-mode file or text buffer, or if the
        resolved format is neither a string nor a callable.
    :raises ValueError: If the ZIP file is empty, if a multi-member ZIP contains a
        name without a trailing ``_{t}`` index, or if the format is missing or
        has no matching reader function.
    """
    def read(file, format, **kwargs):
        # Resolve the reader for a single graph and apply it to `file`.
        path = _get_filepath(file)
        frmt = _get_format(path, format)

        # Validate the format before resolving the reader function, so that the
        # explicit errors below are raised instead of whatever the lookup would do
        # with an invalid value.
        if frmt is None:
            raise ValueError("Missing extension format to read graph in file name or `format`.")

        if not (isinstance(frmt, str) or callable(frmt)):
            raise TypeError(f"Argument `format` expects a string or callable, received: {type(frmt)}.")

        func = _get_function(frmt, "read")

        if func is None:
            raise ValueError(f"Extension '{frmt}' is not supported by NetworkX. \nSupported formats: "
                             f"{[f[5:] for f in dir(nx) if f.startswith('read_')]}")

        return func(file, **kwargs)

    def snapshot_index(member):
        # Text after the last underscore of the member name, extension removed.
        return osp.splitext(member)[0].rsplit("_", 1)[-1]

    path = _get_filepath(file)
    name = _get_filename(path)

    if isinstance(file, str) and osp.isdir(file):
        raise FileNotFoundError("Argument `file` must be a file path or object, not a directory.")

    if isinstance(file, TextIOWrapper):
        # The expected type is `BufferedReader`: the previous message referenced
        # `BufferedWriter`, which is not imported and raised NameError instead.
        raise TypeError(
            f"File must be opened in binary mode, "
            f"received {type(file)} but expected {BufferedReader}."
        )

    if isinstance(file, StringIO):
        raise TypeError(
            f"Buffer must be binary, "
            f"received {type(file)} but expected {BytesIO}."
        )

    if isinstance(file, bytes):
        file = BytesIO(file)

    # `zipfile.is_zipfile` seeks within file-like objects while probing them;
    # remember the current position so a non-ZIP stream is later read from
    # where the caller left it. Paths and non-seekable streams are skipped.
    try:
        start = file.tell()
    except (AttributeError, OSError):
        start = None

    is_zip = zipfile.is_zipfile(file)

    if start is not None:
        file.seek(start)

    if is_zip:
        with zipfile.ZipFile(file, "r") as zf:
            members = zf.namelist()

            if not members:
                raise ValueError("ZIP file is empty.")

            if len(members) == 1:
                with zf.open(members[0]) as f:
                    TG = from_static(read(f, format, **kwargs))

            else:
                # `isdecimal` only accepts characters that `int()` can parse, so a
                # malformed name fails here with a descriptive message.
                if not all(snapshot_index(z).isdecimal() for z in members):
                    raise ValueError(
                        "Snapshots in ZIP file must contain an index 't' in their filename, "
                        "such as '{name}_{t}.{ext}'."
                    )

                # Read snapshots in index order, closing each archive member
                # as soon as its graph has been loaded.
                snapshots = []
                for z in sorted(members, key=lambda x: int(snapshot_index(x))):
                    with zf.open(z) as f:
                        snapshots.append(read(f, format, **kwargs))

                TG = from_snapshots(snapshots)

    else:
        TG = from_static(read(file, format, **kwargs))

    TG.name = name
    return TG