Source code for asdf.stream

from .tags.core import ndarray


class Stream(ndarray.NDArrayType):
    """
    Used to put a streamed array into the tree.

    Parameters
    ----------
    shape : sequence of int
        Shape of the array, not counting the streamed axis. When the
        object is written to the tree, ``"*"`` is prepended to it.
    dtype
        Passed to ``ndarray.numpy_dtype_to_asdf_datatype`` to obtain the
        datatype and byte order stored in the tree.
    strides : optional
        Written to the tree under ``"strides"`` only when not `None`.

    Examples
    --------
    Save a double-precision array with 1024 columns, one row at a time::

         >>> from asdf import AsdfFile, Stream
         >>> import numpy as np
         >>> ff = AsdfFile()
         >>> ff.tree['streamed'] = Stream([1024], np.float64)
         >>> with open('test.asdf', 'wb') as fd:
         ...     ff.write_to(fd)
         ...     for i in range(200):
         ...         nbytes = fd.write(
         ...                      np.array([i] * 1024, np.float64).tobytes())
    """

    name = None
    types = []

    def __init__(self, shape, dtype, strides=None):
        # NOTE(review): the base-class __init__ is deliberately not called
        # here (matches the original) -- confirm NDArrayType does not need it.
        self._shape = shape
        self._datatype, self._byteorder = ndarray.numpy_dtype_to_asdf_datatype(dtype)
        self._strides = strides
        self._array = None

    def _make_array(self):
        """Reset the cached array; a stream never holds array data itself."""
        self._array = None

    @classmethod
    def reserve_blocks(cls, data, ctx):
        """Yield the context's streamed block when ``data`` is a `Stream`."""
        if isinstance(data, Stream):
            yield ctx.blocks.get_streamed_block()

    @classmethod
    def from_tree(cls, data, ctx):
        """Delegate reading to ``ndarray.NDArrayType.from_tree``."""
        return ndarray.NDArrayType.from_tree(data, ctx)

    @classmethod
    def to_tree(cls, data, ctx):
        """
        Build the tree (dict) representation of a `Stream`.

        The returned dict has ``"source"`` set to ``-1`` and a ``"shape"``
        list whose first entry is ``"*"`` followed by the stream's shape.
        ``"strides"`` is included only when the stream defines them.
        """
        # Called for its effect on ``ctx.blocks``; the return value is unused.
        # NOTE(review): presumably this makes sure the streamed block
        # exists before the tree is written -- confirm against the block
        # manager.
        ctx.blocks.get_streamed_block()

        result = {
            "source": -1,
            # Unpack rather than concatenate with ``+`` so that a tuple (or
            # any other iterable) shape works, not just a list.
            "shape": ["*", *data._shape],
            "datatype": data._datatype,
            "byteorder": data._byteorder,
        }
        if data._strides is not None:
            result["strides"] = data._strides
        return result

    def __repr__(self):
        return f"Stream({self._shape}, {self._datatype}, strides={self._strides})"

    def __str__(self):
        return repr(self)