VideoTensor

docarray.typing.tensor.video.video_ndarray

VideoNdArray

Bases: NdArray, VideoTensorMixin

Subclass of NdArray, to represent a video tensor. Adds video-specific features to the tensor.


from typing import Optional

import numpy as np
from pydantic import parse_obj_as

from docarray import BaseDoc
from docarray.typing import VideoNdArray, VideoUrl


class MyVideoDoc(BaseDoc):
    title: str
    url: Optional[VideoUrl] = None
    video_tensor: Optional[VideoNdArray] = None


doc_1 = MyVideoDoc(
    title='my_first_video_doc',
    video_tensor=np.random.random((100, 224, 224, 3)),
)

doc_2 = MyVideoDoc(
    title='my_second_video_doc',
    url='https://github.com/docarray/docarray/blob/main/tests/toydata/mov_bbb.mp4?raw=true',
)

doc_2.video_tensor = parse_obj_as(VideoNdArray, doc_2.url.load().video)
# doc_2.video_tensor.save(file_path='/tmp/file_2.mp4')

Source code in docarray/typing/tensor/video/video_ndarray.py
@_register_proto(proto_type_name='video_ndarray')
class VideoNdArray(NdArray, VideoTensorMixin):
    """
    Subclass of [`NdArray`][docarray.typing.NdArray], to represent a video tensor.
    Adds video-specific features to the tensor.

    ---

    ```python
    from typing import Optional

    import numpy as np
    from pydantic import parse_obj_as

    from docarray import BaseDoc
    from docarray.typing import VideoNdArray, VideoUrl


    class MyVideoDoc(BaseDoc):
        title: str
        url: Optional[VideoUrl] = None
        video_tensor: Optional[VideoNdArray] = None


    doc_1 = MyVideoDoc(
        title='my_first_video_doc',
        video_tensor=np.random.random((100, 224, 224, 3)),
    )

    doc_2 = MyVideoDoc(
        title='my_second_video_doc',
        url='https://github.com/docarray/docarray/blob/main/tests/toydata/mov_bbb.mp4?raw=true',
    )

    doc_2.video_tensor = parse_obj_as(VideoNdArray, doc_2.url.load().video)
    # doc_2.video_tensor.save(file_path='/tmp/file_2.mp4')
    ```

    ---
    """

    @classmethod
    def _docarray_validate(
        cls: Type[T],
        value: Union[T, np.ndarray, List[Any], Tuple[Any], Any],
    ) -> T:
        tensor = super()._docarray_validate(value=value)
        return cls.validate_shape(value=tensor)

__docarray_validate_getitem__(item) classmethod

This method validates the input to AbstractTensor.__class_getitem__.

It is called at "class creation time", i.e. when a class is created with syntax of the form AnyTensor[shape].

The default implementation tries to cast any item to a tuple of ints. A subclass can override this method to implement custom validation logic.

The output of this is eventually passed to AbstractTensor.__docarray_validate_shape__ as its shape argument.

Raises `TypeError` if the input `item` does not pass validation (i.e. cannot be cast to a tuple of ints).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `item` | `Any` | The item to validate, passed to `__class_getitem__` (`Tensor[item]`). | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Tuple[int]` | The validated item, i.e. the target shape of this tensor. |

Source code in docarray/typing/tensor/abstract_tensor.py
@classmethod
def __docarray_validate_getitem__(cls, item: Any) -> Tuple[int]:
    """This method validates the input to `AbstractTensor.__class_getitem__`.

    It is called at "class creation time",
    i.e. when a class is created with syntax of the form AnyTensor[shape].

    The default implementation tries to cast any `item` to a tuple of ints.
    A subclass can override this method to implement custom validation logic.

    The output of this is eventually passed to
    [`AbstractTensor.__docarray_validate_shape__`]
    [docarray.typing.tensor.abstract_tensor.AbstractTensor.__docarray_validate_shape__]
    as its `shape` argument.

    Raises `ValueError` if the input `item` does not pass validation.

    :param item: The item to validate, passed to `__class_getitem__` (`Tensor[item]`).
    :return: The validated item == the target shape of this tensor.
    """
    if isinstance(item, int):
        item = (item,)
    try:
        item = tuple(item)
    except TypeError:
        raise TypeError(f'{item} is not a valid tensor shape.')
    return item
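
As a minimal sketch of the syntax this enables, assuming the parametrized `AnyTensor[shape]` form works for `VideoNdArray` just as it does for plain `NdArray`:

```python
from docarray.typing import VideoNdArray

# `VideoNdArray[100, 224, 224, 3]` invokes __class_getitem__, which passes
# (100, 224, 224, 3) through __docarray_validate_getitem__; a single int
# such as VideoNdArray[100] would first be wrapped into the tuple (100,).
VideoClip = VideoNdArray[100, 224, 224, 3]
```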

__docarray_validate_shape__(t, shape) classmethod

Every tensor has to implement this method in order to enable syntax of the form AnyTensor[shape]. It is called when a tensor is assigned to a field of this type, i.e. when a tensor is passed to a Document field of type AnyTensor[shape].

The intended behaviour is as follows:

  • If the shape of t is equal to shape, return t.
  • If the shape of t is not equal to shape, but can be reshaped to shape, return t reshaped to shape.
  • If the shape of t is not equal to shape and cannot be reshaped to shape, raise a ValueError.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `t` | `T` | The tensor to validate. | *required* |
| `shape` | `Tuple[Union[int, str], ...]` | The shape to validate against. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `T` | The validated tensor. |

Source code in docarray/typing/tensor/abstract_tensor.py
@classmethod
def __docarray_validate_shape__(cls, t: T, shape: Tuple[Union[int, str], ...]) -> T:
    """Every tensor has to implement this method in order to
    enable syntax of the form AnyTensor[shape].
    It is called when a tensor is assigned to a field of this type.
    i.e. when a tensor is passed to a Document field of type AnyTensor[shape].

    The intended behaviour is as follows:

    - If the shape of `t` is equal to `shape`, return `t`.
    - If the shape of `t` is not equal to `shape`,
        but can be reshaped to `shape`, return `t` reshaped to `shape`.
    - If the shape of `t` is not equal to `shape`
        and cannot be reshaped to `shape`, raise a ValueError.

    :param t: The tensor to validate.
    :param shape: The shape to validate against.
    :return: The validated tensor.
    """
    comp_be = t.get_comp_backend()
    tshape = comp_be.shape(t)
    if tshape == shape:
        return t
    elif any(isinstance(dim, str) or dim == Ellipsis for dim in shape):
        ellipsis_occurrences = [
            pos for pos, dim in enumerate(shape) if dim == Ellipsis
        ]
        if ellipsis_occurrences:
            if len(ellipsis_occurrences) > 1:
                raise ValueError(
                    f'Cannot use Ellipsis (...) more than once for the shape {shape}'
                )
            ellipsis_pos = ellipsis_occurrences[0]
            # Calculate how many dimensions to add. Should be at least 1.
            dimensions_needed = max(len(tshape) - len(shape) + 1, 1)
            shape = (
                shape[:ellipsis_pos]
                + tuple(
                    f'__dim_var_{index}__' for index in range(dimensions_needed)
                )
                + shape[ellipsis_pos + 1 :]
            )

        if len(tshape) != len(shape):
            raise ValueError(
                f'Tensor shape mismatch. Expected {shape}, got {tshape}'
            )
        known_dims: Dict[str, int] = {}
        for tdim, dim in zip(tshape, shape):
            if isinstance(dim, int) and tdim != dim:
                raise ValueError(
                    f'Tensor shape mismatch. Expected {shape}, got {tshape}'
                )
            elif isinstance(dim, str):
                if dim in known_dims and known_dims[dim] != tdim:
                    raise ValueError(
                        f'Tensor shape mismatch. Expected {shape}, got {tshape}'
                    )
                else:
                    known_dims[dim] = tdim
        else:
            return t
    else:
        shape = cast(Tuple[int], shape)
        warnings.warn(
            f'Tensor shape mismatch. Reshaping tensor '
            f'of shape {tshape} to shape {shape}'
        )
        try:
            value = cls._docarray_from_native(comp_be.reshape(t, shape))
            return cast(T, value)
        except RuntimeError:
            raise ValueError(
                f'Cannot reshape tensor of shape {tshape} to shape {shape}'
            )
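
The three rules above can be seen in action with plain `NdArray` (a minimal sketch; it assumes `parse_obj_as` resolves parametrized tensor types, as in the other examples on this page):

```python
import numpy as np
from pydantic import parse_obj_as

from docarray.typing import NdArray

# exact match: the tensor is returned unchanged
t1 = parse_obj_as(NdArray[3, 224, 224], np.zeros((3, 224, 224)))

# symbolic dims: 'x' must bind to the same size wherever it appears,
# so (224, 224) passes but (224, 256) would raise a ValueError
t2 = parse_obj_as(NdArray['x', 'x'], np.zeros((224, 224)))

# shape mismatch that is reshapable: emits a warning and reshapes
t3 = parse_obj_as(NdArray[224 * 224], np.zeros((224, 224)))
```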

__getitem__(item) abstractmethod

Get a slice of this tensor.

Source code in docarray/typing/tensor/abstract_tensor.py
@abc.abstractmethod
def __getitem__(self: T, item) -> T:
    """Get a slice of this tensor."""
    ...

__iter__() abstractmethod

Iterate over the elements of this tensor.

Source code in docarray/typing/tensor/abstract_tensor.py
@abc.abstractmethod
def __iter__(self):
    """Iterate over the elements of this tensor."""
    ...

__setitem__(index, value) abstractmethod

Set a slice of this tensor.

Source code in docarray/typing/tensor/abstract_tensor.py
@abc.abstractmethod
def __setitem__(self, index, value):
    """Set a slice of this tensor."""
    ...

display(audio=None)

Display video data from tensor in notebook.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `audio` | `Optional[AudioTensor]` | sound to play with video tensor | `None` |

Source code in docarray/typing/tensor/video/video_tensor_mixin.py
def display(self, audio: Optional[AudioTensor] = None) -> None:
    """
    Display video data from tensor in notebook.

    :param audio: sound to play with video tensor
    """
    if is_notebook():
        from IPython.display import Video, display

        b = self.to_bytes(audio_tensor=audio)
        display(Video(data=b, embed=True, mimetype='video/mp4'))
    else:
        warnings.warn('Display of video is only possible in a notebook.')
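
A minimal usage sketch (encoding goes through `to_bytes`, so PyAV is assumed to be installed when this runs inside a notebook; elsewhere it only warns):

```python
import numpy as np
from pydantic import parse_obj_as

from docarray.typing import VideoNdArray

vid = parse_obj_as(VideoNdArray, np.random.randint(0, 256, size=(10, 64, 64, 3)))
vid.display()  # embeds the frames as an mp4 in a notebook
```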

from_protobuf(pb_msg) classmethod

Read an ndarray from a protobuf message.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `pb_msg` | `NdArrayProto` | the protobuf message to read from | *required* |

Returns:

| Type | Description |
| --- | --- |
| `T` | a numpy array |

Source code in docarray/typing/tensor/ndarray.py
@classmethod
def from_protobuf(cls: Type[T], pb_msg: 'NdArrayProto') -> 'T':
    """
    Read ndarray from a proto msg
    :param pb_msg:
    :return: a numpy array
    """
    source = pb_msg.dense
    if source.buffer:
        x = np.frombuffer(bytearray(source.buffer), dtype=source.dtype)
        return cls._docarray_from_native(x.reshape(source.shape))
    elif len(source.shape) > 0:
        return cls._docarray_from_native(np.zeros(source.shape))
    else:
        raise ValueError(f'proto message {pb_msg} cannot be cast to a NdArray')

get_comp_backend() staticmethod

Return the computational backend of the tensor

Source code in docarray/typing/tensor/ndarray.py
@staticmethod
def get_comp_backend() -> 'NumpyCompBackend':
    """Return the computational backend of the tensor"""
    from docarray.computation.numpy_backend import NumpyCompBackend

    return NumpyCompBackend()

save(file_path, audio_tensor=None, video_frame_rate=24, video_codec='h264', audio_frame_rate=48000, audio_codec='aac', audio_format='fltp')

Save video tensor to a .mp4 file.


import numpy as np

from docarray import BaseDoc
from docarray.typing.tensor.audio.audio_tensor import AudioTensor
from docarray.typing.tensor.video.video_tensor import VideoTensor


class MyDoc(BaseDoc):
    video_tensor: VideoTensor
    audio_tensor: AudioTensor


doc = MyDoc(
    video_tensor=np.random.randint(low=0, high=256, size=(10, 200, 300, 3)),
    audio_tensor=np.random.randn(100, 1, 1024).astype("float32"),
)

doc.video_tensor.save(
    file_path="/tmp/mp_.mp4",
    audio_tensor=doc.audio_tensor,
    audio_format="flt",
)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `file_path` | `Union[str, BytesIO]` | path to a .mp4 file. If file is a string, open the file by that name, otherwise treat it as a file-like object. | *required* |
| `audio_tensor` | `Optional[AudioTensor]` | AudioTensor containing the video's soundtrack. | `None` |
| `video_frame_rate` | `int` | video frames per second. | `24` |
| `video_codec` | `str` | the name of a video decoder/encoder. | `'h264'` |
| `audio_frame_rate` | `int` | audio frames per second. | `48000` |
| `audio_codec` | `str` | the name of an audio decoder/encoder. | `'aac'` |
| `audio_format` | `str` | the name of one of the audio formats supported by PyAV, such as 'flt', 'fltp', 's16' or 's16p'. | `'fltp'` |

Source code in docarray/typing/tensor/video/video_tensor_mixin.py
def save(
    self: 'T',
    file_path: Union[str, BytesIO],
    audio_tensor: Optional[AudioTensor] = None,
    video_frame_rate: int = 24,
    video_codec: str = 'h264',
    audio_frame_rate: int = 48000,
    audio_codec: str = 'aac',
    audio_format: str = 'fltp',
) -> None:
    """
    Save video tensor to a .mp4 file.

    ---

    ```python
    import numpy as np

    from docarray import BaseDoc
    from docarray.typing.tensor.audio.audio_tensor import AudioTensor
    from docarray.typing.tensor.video.video_tensor import VideoTensor


    class MyDoc(BaseDoc):
        video_tensor: VideoTensor
        audio_tensor: AudioTensor


    doc = MyDoc(
        video_tensor=np.random.randint(low=0, high=256, size=(10, 200, 300, 3)),
        audio_tensor=np.random.randn(100, 1, 1024).astype("float32"),
    )

    doc.video_tensor.save(
        file_path="/tmp/mp_.mp4",
        audio_tensor=doc.audio_tensor,
        audio_format="flt",
    )
    ```

    ---
    :param file_path: path to a .mp4 file. If file is a string, open the file by
        that name, otherwise treat it as a file-like object.
    :param audio_tensor: AudioTensor containing the video's soundtrack.
    :param video_frame_rate: video frames per second.
    :param video_codec: the name of a video decoder/encoder.
    :param audio_frame_rate: audio frames per second.
    :param audio_codec: the name of an audio decoder/encoder.
    :param audio_format: the name of one of the audio formats supported by PyAV,
        such as 'flt', 'fltp', 's16' or 's16p'.
    """
    if TYPE_CHECKING:
        import av
    else:
        av = import_library('av', raise_error=True)

    np_tensor = self.get_comp_backend().to_numpy(array=self)
    video_tensor = np_tensor.astype('uint8')

    if isinstance(file_path, str):
        format = file_path.split('.')[-1]
    else:
        format = 'mp4'

    with av.open(file_path, mode='w', format=format) as container:
        if video_tensor.ndim == 3:
            video_tensor = np.expand_dims(video_tensor, axis=0)

        stream_video = container.add_stream(video_codec, rate=video_frame_rate)
        stream_video.height = video_tensor.shape[-3]
        stream_video.width = video_tensor.shape[-2]

        if audio_tensor is not None:
            stream_audio = container.add_stream(audio_codec)
            audio_np = audio_tensor.get_comp_backend().to_numpy(array=audio_tensor)
            audio_layout = 'stereo' if audio_np.shape[-2] == 2 else 'mono'

            for i, audio in enumerate(audio_np):
                frame = av.AudioFrame.from_ndarray(
                    array=audio, format=audio_format, layout=audio_layout
                )
                frame.rate = audio_frame_rate
                frame.pts = audio.shape[-1] * i
                for packet in stream_audio.encode(frame):
                    container.mux(packet)

            for packet in stream_audio.encode(None):
                container.mux(packet)

        for vid in video_tensor:
            frame = av.VideoFrame.from_ndarray(vid, format='rgb24')
            for packet in stream_video.encode(frame):
                container.mux(packet)

        for packet in stream_video.encode(None):
            container.mux(packet)

to_bytes(audio_tensor=None, video_frame_rate=24, video_codec='h264', audio_frame_rate=48000, audio_codec='aac', audio_format='fltp')

Convert video tensor to VideoBytes.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `audio_tensor` | `Optional[AudioTensor]` | AudioTensor containing the video's soundtrack. | `None` |
| `video_frame_rate` | `int` | video frames per second. | `24` |
| `video_codec` | `str` | the name of a video decoder/encoder. | `'h264'` |
| `audio_frame_rate` | `int` | audio frames per second. | `48000` |
| `audio_codec` | `str` | the name of an audio decoder/encoder. | `'aac'` |
| `audio_format` | `str` | the name of one of the audio formats supported by PyAV, such as 'flt', 'fltp', 's16' or 's16p'. | `'fltp'` |

Returns:

| Type | Description |
| --- | --- |
| `VideoBytes` | a `VideoBytes` object |

Source code in docarray/typing/tensor/video/video_tensor_mixin.py
def to_bytes(
    self: 'T',
    audio_tensor: Optional[AudioTensor] = None,
    video_frame_rate: int = 24,
    video_codec: str = 'h264',
    audio_frame_rate: int = 48000,
    audio_codec: str = 'aac',
    audio_format: str = 'fltp',
) -> 'VideoBytes':
    """
    Convert video tensor to [`VideoBytes`][docarray.typing.VideoBytes].

    :param audio_tensor: AudioTensor containing the video's soundtrack.
    :param video_frame_rate: video frames per second.
    :param video_codec: the name of a video decoder/encoder.
    :param audio_frame_rate: audio frames per second.
    :param audio_codec: the name of an audio decoder/encoder.
    :param audio_format: the name of one of the audio formats supported by PyAV,
        such as 'flt', 'fltp', 's16' or 's16p'.

    :return: a VideoBytes object
    """
    from docarray.typing.bytes.video_bytes import VideoBytes

    bytes = BytesIO()
    self.save(
        file_path=bytes,
        audio_tensor=audio_tensor,
        video_frame_rate=video_frame_rate,
        video_codec=video_codec,
        audio_frame_rate=audio_frame_rate,
        audio_codec=audio_codec,
        audio_format=audio_format,
    )
    return VideoBytes(bytes.getvalue())
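
A minimal usage sketch (assuming the optional PyAV dependency `av` is installed, since `to_bytes` encodes through `save`):

```python
import numpy as np
from pydantic import parse_obj_as

from docarray.typing import VideoNdArray

video = parse_obj_as(
    VideoNdArray, np.random.randint(0, 256, size=(10, 64, 64, 3))
)
video_bytes = video.to_bytes(video_frame_rate=24)  # mp4-encoded VideoBytes
```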

to_protobuf()

Transform self into an `NdArrayProto` protobuf message.

Source code in docarray/typing/tensor/ndarray.py
def to_protobuf(self) -> 'NdArrayProto':
    """
    Transform self into a NdArrayProto protobuf message
    """
    from docarray.proto import NdArrayProto

    nd_proto = NdArrayProto()

    nd_proto.dense.buffer = self.tobytes()
    nd_proto.dense.ClearField('shape')
    nd_proto.dense.shape.extend(list(self.shape))
    nd_proto.dense.dtype = self.dtype.str

    return nd_proto
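
A hedged round-trip sketch combining `to_protobuf` with `from_protobuf` above (it assumes docarray's protobuf dependency is installed so that `NdArrayProto` is available):

```python
import numpy as np
from pydantic import parse_obj_as

from docarray.typing import NdArray

t1 = parse_obj_as(NdArray, np.arange(6).reshape(2, 3))
pb = t1.to_protobuf()           # dense buffer + shape + dtype
t2 = NdArray.from_protobuf(pb)  # reconstructs an equal array
assert (t1 == t2).all()
```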

unwrap()

Return the original ndarray without any memory copy.

The original view remains intact and is still a Document `NdArray`, but the returned object is a pure `np.ndarray`; both objects share the same underlying memory.


from docarray.typing import NdArray
import numpy as np
from pydantic import parse_obj_as

t1 = parse_obj_as(NdArray, np.zeros((3, 224, 224)))
t2 = t1.unwrap()
# here t2 is a pure np.ndarray but t1 is still a Docarray NdArray
# But both share the same underlying memory

Returns:

| Type | Description |
| --- | --- |
| `ndarray` | a `numpy.ndarray` |

Source code in docarray/typing/tensor/ndarray.py
def unwrap(self) -> np.ndarray:
    """
    Return the original ndarray without any memory copy.

    The original view rest intact and is still a Document `NdArray`
    but the return object is a pure `np.ndarray` but both object share
    the same memory layout.

    ---

    ```python
    from docarray.typing import NdArray
    import numpy as np
    from pydantic import parse_obj_as

    t1 = parse_obj_as(NdArray, np.zeros((3, 224, 224)))
    t2 = t1.unwrap()
    # here t2 is a pure np.ndarray but t1 is still a Docarray NdArray
    # But both share the same underlying memory
    ```

    ---

    :return: a `numpy.ndarray`
    """
    return self.view(np.ndarray)

docarray.typing.tensor.video.video_tensor_mixin

VideoTensorMixin

Bases: AbstractTensor, ABC

Source code in docarray/typing/tensor/video/video_tensor_mixin.py
class VideoTensorMixin(AbstractTensor, abc.ABC):
    @classmethod
    def validate_shape(cls: Type['T'], value: 'T') -> 'T':
        comp_be = cls.get_comp_backend()
        shape = comp_be.shape(value)  # type: ignore
        if comp_be.n_dim(value) not in [3, 4] or shape[-1] != 3:  # type: ignore
            raise ValueError(
                f'Expects tensor with 3 or 4 dimensions and the last dimension equal '
                f'to 3, but received {shape}.'
            )
        else:
            return value

    def save(
        self: 'T',
        file_path: Union[str, BytesIO],
        audio_tensor: Optional[AudioTensor] = None,
        video_frame_rate: int = 24,
        video_codec: str = 'h264',
        audio_frame_rate: int = 48000,
        audio_codec: str = 'aac',
        audio_format: str = 'fltp',
    ) -> None:
        """
        Save video tensor to a .mp4 file.

        ---

        ```python
        import numpy as np

        from docarray import BaseDoc
        from docarray.typing.tensor.audio.audio_tensor import AudioTensor
        from docarray.typing.tensor.video.video_tensor import VideoTensor


        class MyDoc(BaseDoc):
            video_tensor: VideoTensor
            audio_tensor: AudioTensor


        doc = MyDoc(
            video_tensor=np.random.randint(low=0, high=256, size=(10, 200, 300, 3)),
            audio_tensor=np.random.randn(100, 1, 1024).astype("float32"),
        )

        doc.video_tensor.save(
            file_path="/tmp/mp_.mp4",
            audio_tensor=doc.audio_tensor,
            audio_format="flt",
        )
        ```

        ---
        :param file_path: path to a .mp4 file. If file is a string, open the file by
            that name, otherwise treat it as a file-like object.
        :param audio_tensor: AudioTensor containing the video's soundtrack.
        :param video_frame_rate: video frames per second.
        :param video_codec: the name of a video decoder/encoder.
        :param audio_frame_rate: audio frames per second.
        :param audio_codec: the name of an audio decoder/encoder.
        :param audio_format: the name of one of the audio formats supported by PyAV,
            such as 'flt', 'fltp', 's16' or 's16p'.
        """
        if TYPE_CHECKING:
            import av
        else:
            av = import_library('av', raise_error=True)

        np_tensor = self.get_comp_backend().to_numpy(array=self)
        video_tensor = np_tensor.astype('uint8')

        if isinstance(file_path, str):
            format = file_path.split('.')[-1]
        else:
            format = 'mp4'

        with av.open(file_path, mode='w', format=format) as container:
            if video_tensor.ndim == 3:
                video_tensor = np.expand_dims(video_tensor, axis=0)

            stream_video = container.add_stream(video_codec, rate=video_frame_rate)
            stream_video.height = video_tensor.shape[-3]
            stream_video.width = video_tensor.shape[-2]

            if audio_tensor is not None:
                stream_audio = container.add_stream(audio_codec)
                audio_np = audio_tensor.get_comp_backend().to_numpy(array=audio_tensor)
                audio_layout = 'stereo' if audio_np.shape[-2] == 2 else 'mono'

                for i, audio in enumerate(audio_np):
                    frame = av.AudioFrame.from_ndarray(
                        array=audio, format=audio_format, layout=audio_layout
                    )
                    frame.rate = audio_frame_rate
                    frame.pts = audio.shape[-1] * i
                    for packet in stream_audio.encode(frame):
                        container.mux(packet)

                for packet in stream_audio.encode(None):
                    container.mux(packet)

            for vid in video_tensor:
                frame = av.VideoFrame.from_ndarray(vid, format='rgb24')
                for packet in stream_video.encode(frame):
                    container.mux(packet)

            for packet in stream_video.encode(None):
                container.mux(packet)

    def to_bytes(
        self: 'T',
        audio_tensor: Optional[AudioTensor] = None,
        video_frame_rate: int = 24,
        video_codec: str = 'h264',
        audio_frame_rate: int = 48000,
        audio_codec: str = 'aac',
        audio_format: str = 'fltp',
    ) -> 'VideoBytes':
        """
        Convert video tensor to [`VideoBytes`][docarray.typing.VideoBytes].

        :param audio_tensor: AudioTensor containing the video's soundtrack.
        :param video_frame_rate: video frames per second.
        :param video_codec: the name of a video decoder/encoder.
        :param audio_frame_rate: audio frames per second.
        :param audio_codec: the name of an audio decoder/encoder.
        :param audio_format: the name of one of the audio formats supported by PyAV,
            such as 'flt', 'fltp', 's16' or 's16p'.

        :return: a VideoBytes object
        """
        from docarray.typing.bytes.video_bytes import VideoBytes

        bytes = BytesIO()
        self.save(
            file_path=bytes,
            audio_tensor=audio_tensor,
            video_frame_rate=video_frame_rate,
            video_codec=video_codec,
            audio_frame_rate=audio_frame_rate,
            audio_codec=audio_codec,
            audio_format=audio_format,
        )
        return VideoBytes(bytes.getvalue())

    def display(self, audio: Optional[AudioTensor] = None) -> None:
        """
        Display video data from tensor in notebook.

        :param audio: sound to play with video tensor
        """
        if is_notebook():
            from IPython.display import Video, display

            b = self.to_bytes(audio_tensor=audio)
            display(Video(data=b, embed=True, mimetype='video/mp4'))
        else:
            warnings.warn('Display of video is only possible in a notebook.')
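
`validate_shape` at the top of this mixin is what enforces the video layout: 3 or 4 dimensions with a trailing channel dimension of size 3. A minimal sketch of the resulting behavior through `VideoNdArray`:

```python
import numpy as np
from pydantic import parse_obj_as

from docarray.typing import VideoNdArray

# (frames, height, width, channels) with channels == 3: accepted
vid = parse_obj_as(VideoNdArray, np.zeros((10, 64, 64, 3)))

# a trailing dimension other than 3 raises a ValueError, e.g.:
# parse_obj_as(VideoNdArray, np.zeros((10, 64, 64, 4)))
```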

__docarray_validate_getitem__(item) classmethod

This method validates the input to AbstractTensor.__class_getitem__.

It is called at "class creation time", i.e. when a class is created with syntax of the form AnyTensor[shape].

The default implementation tries to cast any item to a tuple of ints. A subclass can override this method to implement custom validation logic.

The output of this is eventually passed to AbstractTensor.__docarray_validate_shape__ as its shape argument.

Raises `TypeError` if the input `item` does not pass validation (i.e. cannot be cast to a tuple of ints).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `item` | `Any` | The item to validate, passed to `__class_getitem__` (`Tensor[item]`). | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Tuple[int]` | The validated item, i.e. the target shape of this tensor. |

Source code in docarray/typing/tensor/abstract_tensor.py
@classmethod
def __docarray_validate_getitem__(cls, item: Any) -> Tuple[int]:
    """This method validates the input to `AbstractTensor.__class_getitem__`.

    It is called at "class creation time",
    i.e. when a class is created with syntax of the form AnyTensor[shape].

    The default implementation tries to cast any `item` to a tuple of ints.
    A subclass can override this method to implement custom validation logic.

    The output of this is eventually passed to
    [`AbstractTensor.__docarray_validate_shape__`]
    [docarray.typing.tensor.abstract_tensor.AbstractTensor.__docarray_validate_shape__]
    as its `shape` argument.

    Raises `ValueError` if the input `item` does not pass validation.

    :param item: The item to validate, passed to `__class_getitem__` (`Tensor[item]`).
    :return: The validated item == the target shape of this tensor.
    """
    if isinstance(item, int):
        item = (item,)
    try:
        item = tuple(item)
    except TypeError:
        raise TypeError(f'{item} is not a valid tensor shape.')
    return item

__docarray_validate_shape__(t, shape) classmethod

Every tensor has to implement this method in order to enable syntax of the form AnyTensor[shape]. It is called when a tensor is assigned to a field of this type, i.e. when a tensor is passed to a Document field of type AnyTensor[shape].

The intended behaviour is as follows:

  • If the shape of t is equal to shape, return t.
  • If the shape of t is not equal to shape, but can be reshaped to shape, return t reshaped to shape.
  • If the shape of t is not equal to shape and cannot be reshaped to shape, raise a ValueError.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `t` | `T` | The tensor to validate. | *required* |
| `shape` | `Tuple[Union[int, str], ...]` | The shape to validate against. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `T` | The validated tensor. |

Source code in docarray/typing/tensor/abstract_tensor.py
@classmethod
def __docarray_validate_shape__(cls, t: T, shape: Tuple[Union[int, str], ...]) -> T:
    """Every tensor has to implement this method in order to
    enable syntax of the form AnyTensor[shape].
    It is called when a tensor is assigned to a field of this type.
    i.e. when a tensor is passed to a Document field of type AnyTensor[shape].

    The intended behaviour is as follows:

    - If the shape of `t` is equal to `shape`, return `t`.
    - If the shape of `t` is not equal to `shape`,
        but can be reshaped to `shape`, return `t` reshaped to `shape`.
    - If the shape of `t` is not equal to `shape`
        and cannot be reshaped to `shape`, raise a ValueError.

    :param t: The tensor to validate.
    :param shape: The shape to validate against.
    :return: The validated tensor.
    """
    comp_be = t.get_comp_backend()
    tshape = comp_be.shape(t)
    if tshape == shape:
        return t
    elif any(isinstance(dim, str) or dim == Ellipsis for dim in shape):
        ellipsis_occurrences = [
            pos for pos, dim in enumerate(shape) if dim == Ellipsis
        ]
        if ellipsis_occurrences:
            if len(ellipsis_occurrences) > 1:
                raise ValueError(
                    f'Cannot use Ellipsis (...) more than once for the shape {shape}'
                )
            ellipsis_pos = ellipsis_occurrences[0]
            # Calculate how many dimensions to add. Should be at least 1.
            dimensions_needed = max(len(tshape) - len(shape) + 1, 1)
            shape = (
                shape[:ellipsis_pos]
                + tuple(
                    f'__dim_var_{index}__' for index in range(dimensions_needed)
                )
                + shape[ellipsis_pos + 1 :]
            )

        if len(tshape) != len(shape):
            raise ValueError(
                f'Tensor shape mismatch. Expected {shape}, got {tshape}'
            )
        known_dims: Dict[str, int] = {}
        for tdim, dim in zip(tshape, shape):
            if isinstance(dim, int) and tdim != dim:
                raise ValueError(
                    f'Tensor shape mismatch. Expected {shape}, got {tshape}'
                )
            elif isinstance(dim, str):
                if dim in known_dims and known_dims[dim] != tdim:
                    raise ValueError(
                        f'Tensor shape mismatch. Expected {shape}, got {tshape}'
                    )
                else:
                    known_dims[dim] = tdim
        else:
            return t
    else:
        shape = cast(Tuple[int], shape)
        warnings.warn(
            f'Tensor shape mismatch. Reshaping tensor '
            f'of shape {tshape} to shape {shape}'
        )
        try:
            value = cls._docarray_from_native(comp_be.reshape(t, shape))
            return cast(T, value)
        except RuntimeError:
            raise ValueError(
                f'Cannot reshape tensor of shape {tshape} to shape {shape}'
            )

__getitem__(item) abstractmethod

Get a slice of this tensor.

Source code in docarray/typing/tensor/abstract_tensor.py
@abc.abstractmethod
def __getitem__(self: T, item) -> T:
    """Get a slice of this tensor."""
    ...

__iter__() abstractmethod

Iterate over the elements of this tensor.

Source code in docarray/typing/tensor/abstract_tensor.py
@abc.abstractmethod
def __iter__(self):
    """Iterate over the elements of this tensor."""
    ...

__setitem__(index, value) abstractmethod

Set a slice of this tensor.

Source code in docarray/typing/tensor/abstract_tensor.py
@abc.abstractmethod
def __setitem__(self, index, value):
    """Set a slice of this tensor."""
    ...

display(audio=None)

Display video data from tensor in notebook.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `audio` | `Optional[AudioTensor]` | sound to play with video tensor | `None` |

Source code in docarray/typing/tensor/video/video_tensor_mixin.py
def display(self, audio: Optional[AudioTensor] = None) -> None:
    """
    Display video data from tensor in notebook.

    :param audio: sound to play with video tensor
    """
    if is_notebook():
        from IPython.display import Video, display

        b = self.to_bytes(audio_tensor=audio)
        display(Video(data=b, embed=True, mimetype='video/mp4'))
    else:
        warnings.warn('Display of video is only possible in a notebook.')

get_comp_backend() abstractmethod staticmethod

The computational backend compatible with this tensor type.

Source code in docarray/typing/tensor/abstract_tensor.py
@staticmethod
@abc.abstractmethod
def get_comp_backend() -> AbstractComputationalBackend:
    """The computational backend compatible with this tensor type."""
    ...

save(file_path, audio_tensor=None, video_frame_rate=24, video_codec='h264', audio_frame_rate=48000, audio_codec='aac', audio_format='fltp')

Save video tensor to a .mp4 file.


import numpy as np

from docarray import BaseDoc
from docarray.typing.tensor.audio.audio_tensor import AudioTensor
from docarray.typing.tensor.video.video_tensor import VideoTensor


class MyDoc(BaseDoc):
    video_tensor: VideoTensor
    audio_tensor: AudioTensor


doc = MyDoc(
    video_tensor=np.random.randint(low=0, high=256, size=(10, 200, 300, 3)),
    audio_tensor=np.random.randn(100, 1, 1024).astype("float32"),
)

doc.video_tensor.save(
    file_path="/tmp/mp_.mp4",
    audio_tensor=doc.audio_tensor,
    audio_format="flt",
)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `file_path` | `Union[str, BytesIO]` | path to a .mp4 file. If file is a string, open the file by that name, otherwise treat it as a file-like object. | *required* |
| `audio_tensor` | `Optional[AudioTensor]` | AudioTensor containing the video's soundtrack. | `None` |
| `video_frame_rate` | `int` | video frames per second. | `24` |
| `video_codec` | `str` | the name of a video decoder/encoder. | `'h264'` |
| `audio_frame_rate` | `int` | audio frames per second. | `48000` |
| `audio_codec` | `str` | the name of an audio decoder/encoder. | `'aac'` |
| `audio_format` | `str` | the name of one of the audio formats supported by PyAV, such as 'flt', 'fltp', 's16' or 's16p'. | `'fltp'` |

Source code in docarray/typing/tensor/video/video_tensor_mixin.py
def save(
    self: 'T',
    file_path: Union[str, BytesIO],
    audio_tensor: Optional[AudioTensor] = None,
    video_frame_rate: int = 24,
    video_codec: str = 'h264',
    audio_frame_rate: int = 48000,
    audio_codec: str = 'aac',
    audio_format: str = 'fltp',
) -> None:
    """
    Save video tensor to a .mp4 file.

    ---

    ```python
    import numpy as np

    from docarray import BaseDoc
    from docarray.typing.tensor.audio.audio_tensor import AudioTensor
    from docarray.typing.tensor.video.video_tensor import VideoTensor


    class MyDoc(BaseDoc):
        video_tensor: VideoTensor
        audio_tensor: AudioTensor


    doc = MyDoc(
        video_tensor=np.random.randint(low=0, high=256, size=(10, 200, 300, 3)),
        audio_tensor=np.random.randn(100, 1, 1024).astype("float32"),
    )

    doc.video_tensor.save(
        file_path="/tmp/mp_.mp4",
        audio_tensor=doc.audio_tensor,
        audio_format="flt",
    )
    ```

    ---
    :param file_path: path to a .mp4 file. If file is a string, open the file by
        that name, otherwise treat it as a file-like object.
    :param audio_tensor: AudioTensor containing the video's soundtrack.
    :param video_frame_rate: video frames per second.
    :param video_codec: the name of a video decoder/encoder.
    :param audio_frame_rate: audio frames per second.
    :param audio_codec: the name of an audio decoder/encoder.
    :param audio_format: the name of one of the audio formats supported by PyAV,
        such as 'flt', 'fltp', 's16' or 's16p'.
    """
    if TYPE_CHECKING:
        import av
    else:
        av = import_library('av', raise_error=True)

    np_tensor = self.get_comp_backend().to_numpy(array=self)
    video_tensor = np_tensor.astype('uint8')

    if isinstance(file_path, str):
        format = file_path.split('.')[-1]
    else:
        format = 'mp4'

    with av.open(file_path, mode='w', format=format) as container:
        if video_tensor.ndim == 3:
            video_tensor = np.expand_dims(video_tensor, axis=0)

        stream_video = container.add_stream(video_codec, rate=video_frame_rate)
        stream_video.height = video_tensor.shape[-3]
        stream_video.width = video_tensor.shape[-2]

        if audio_tensor is not None:
            stream_audio = container.add_stream(audio_codec)
            audio_np = audio_tensor.get_comp_backend().to_numpy(array=audio_tensor)
            audio_layout = 'stereo' if audio_np.shape[-2] == 2 else 'mono'

            for i, audio in enumerate(audio_np):
                frame = av.AudioFrame.from_ndarray(
                    array=audio, format=audio_format, layout=audio_layout
                )
                frame.rate = audio_frame_rate
                frame.pts = audio.shape[-1] * i
                for packet in stream_audio.encode(frame):
                    container.mux(packet)

            for packet in stream_audio.encode(None):
                container.mux(packet)

        for vid in video_tensor:
            frame = av.VideoFrame.from_ndarray(vid, format='rgb24')
            for packet in stream_video.encode(frame):
                container.mux(packet)

        for packet in stream_video.encode(None):
            container.mux(packet)

to_bytes(audio_tensor=None, video_frame_rate=24, video_codec='h264', audio_frame_rate=48000, audio_codec='aac', audio_format='fltp')

Convert video tensor to VideoBytes.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `audio_tensor` | `Optional[AudioTensor]` | AudioTensor containing the video's soundtrack. | `None` |
| `video_frame_rate` | `int` | video frames per second. | `24` |
| `video_codec` | `str` | the name of a video decoder/encoder. | `'h264'` |
| `audio_frame_rate` | `int` | audio frames per second. | `48000` |
| `audio_codec` | `str` | the name of an audio decoder/encoder. | `'aac'` |
| `audio_format` | `str` | the name of one of the audio formats supported by PyAV, such as 'flt', 'fltp', 's16' or 's16p'. | `'fltp'` |

Returns:

| Type | Description |
| --- | --- |
| `VideoBytes` | a `VideoBytes` object |

Source code in docarray/typing/tensor/video/video_tensor_mixin.py
def to_bytes(
    self: 'T',
    audio_tensor: Optional[AudioTensor] = None,
    video_frame_rate: int = 24,
    video_codec: str = 'h264',
    audio_frame_rate: int = 48000,
    audio_codec: str = 'aac',
    audio_format: str = 'fltp',
) -> 'VideoBytes':
    """
    Convert video tensor to [`VideoBytes`][docarray.typing.VideoBytes].

    :param audio_tensor: AudioTensor containing the video's soundtrack.
    :param video_frame_rate: video frames per second.
    :param video_codec: the name of a video decoder/encoder.
    :param audio_frame_rate: audio frames per second.
    :param audio_codec: the name of an audio decoder/encoder.
    :param audio_format: the name of one of the audio formats supported by PyAV,
        such as 'flt', 'fltp', 's16' or 's16p'.

    :return: a VideoBytes object
    """
    from docarray.typing.bytes.video_bytes import VideoBytes

    bytes = BytesIO()
    self.save(
        file_path=bytes,
        audio_tensor=audio_tensor,
        video_frame_rate=video_frame_rate,
        video_codec=video_codec,
        audio_frame_rate=audio_frame_rate,
        audio_codec=audio_codec,
        audio_format=audio_format,
    )
    return VideoBytes(bytes.getvalue())

to_protobuf() abstractmethod

Convert `self` into a Protobuf message

Source code in docarray/typing/tensor/abstract_tensor.py
@abc.abstractmethod
def to_protobuf(self) -> 'NdArrayProto':
    """Convert DocList into a Protobuf message"""
    ...

unwrap()

Return the native tensor object that this tensor wraps.

Source code in docarray/typing/tensor/abstract_tensor.py
def unwrap(self):
    """Return the native tensor object that this DocList tensor wraps."""

docarray.typing.tensor.video.video_tensorflow_tensor

VideoTensorFlowTensor

Bases: TensorFlowTensor, VideoTensorMixin

Subclass of TensorFlowTensor, to represent a video tensor. Adds video-specific features to the tensor.


from typing import Optional

import tensorflow as tf

from docarray import BaseDoc
from docarray.typing import VideoTensorFlowTensor, VideoUrl


class MyVideoDoc(BaseDoc):
    title: str
    url: Optional[VideoUrl]
    video_tensor: Optional[VideoTensorFlowTensor]


doc_1 = MyVideoDoc(
    title='my_first_video_doc',
    video_tensor=tf.random.normal((100, 224, 224, 3)),
)
# doc_1.video_tensor.save(file_path='file_1.mp4')

doc_2 = MyVideoDoc(
    title='my_second_video_doc',
    url='https://github.com/docarray/docarray/blob/main/tests/toydata/mov_bbb.mp4?raw=true',
)

doc_2.video_tensor = doc_2.url.load().video
# doc_2.video_tensor.save(file_path='file_2.mp4')

Source code in docarray/typing/tensor/video/video_tensorflow_tensor.py
@_register_proto(proto_type_name='video_tensorflow_tensor')
class VideoTensorFlowTensor(
    TensorFlowTensor, VideoTensorMixin, metaclass=metaTensorFlow
):
    """
    Subclass of [`TensorFlowTensor`][docarray.typing.TensorFlowTensor],
    to represent a video tensor. Adds video-specific features to the tensor.

    ---

    ```python
    from typing import Optional

    import tensorflow as tf

    from docarray import BaseDoc
    from docarray.typing import VideoTensorFlowTensor, VideoUrl


    class MyVideoDoc(BaseDoc):
        title: str
        url: Optional[VideoUrl]
        video_tensor: Optional[VideoTensorFlowTensor]


    doc_1 = MyVideoDoc(
        title='my_first_video_doc',
        video_tensor=tf.random.normal((100, 224, 224, 3)),
    )
    # doc_1.video_tensor.save(file_path='file_1.mp4')

    doc_2 = MyVideoDoc(
        title='my_second_video_doc',
        url='https://github.com/docarray/docarray/blob/main/tests/toydata/mov_bbb.mp4?raw=true',
    )

    doc_2.video_tensor = doc_2.url.load().video
    # doc_2.video_tensor.save(file_path='file_2.mp4')
    ```

    ---
    """

    @classmethod
    def _docarray_validate(
        cls: Type[T],
        value: Union[T, np.ndarray, List[Any], Tuple[Any], Any],
    ) -> T:
        tensor = super()._docarray_validate(value=value)
        return cls.validate_shape(value=tensor)

__docarray_validate_getitem__(item) classmethod

This method validates the input to AbstractTensor.__class_getitem__.

It is called at "class creation time", i.e. when a class is created with syntax of the form AnyTensor[shape].

The default implementation tries to cast any item to a tuple of ints. A subclass can override this method to implement custom validation logic.

The output of this is eventually passed to AbstractTensor.__docarray_validate_shape__ as its shape argument.

Raises `TypeError` if the input `item` does not pass validation (i.e. cannot be cast to a tuple of ints).

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `item` | `Any` | The item to validate, passed to `__class_getitem__` (`Tensor[item]`). | *required* |

Returns:

| Type | Description |
| --- | --- |
| `Tuple[int]` | The validated item, i.e. the target shape of this tensor. |

Source code in docarray/typing/tensor/abstract_tensor.py
@classmethod
def __docarray_validate_getitem__(cls, item: Any) -> Tuple[int]:
    """This method validates the input to `AbstractTensor.__class_getitem__`.

    It is called at "class creation time",
    i.e. when a class is created with syntax of the form AnyTensor[shape].

    The default implementation tries to cast any `item` to a tuple of ints.
    A subclass can override this method to implement custom validation logic.

    The output of this is eventually passed to
    [`AbstractTensor.__docarray_validate_shape__`]
    [docarray.typing.tensor.abstract_tensor.AbstractTensor.__docarray_validate_shape__]
    as its `shape` argument.

    Raises `ValueError` if the input `item` does not pass validation.

    :param item: The item to validate, passed to `__class_getitem__` (`Tensor[item]`).
    :return: The validated item == the target shape of this tensor.
    """
    if isinstance(item, int):
        item = (item,)
    try:
        item = tuple(item)
    except TypeError:
        raise TypeError(f'{item} is not a valid tensor shape.')
    return item

__docarray_validate_shape__(t, shape) classmethod

Every tensor has to implement this method in order to enable syntax of the form AnyTensor[shape]. It is called when a tensor is assigned to a field of this type, i.e. when a tensor is passed to a Document field of type AnyTensor[shape].

The intended behaviour is as follows:

  • If the shape of t is equal to shape, return t.
  • If the shape of t is not equal to shape, but can be reshaped to shape, return t reshaped to shape.
  • If the shape of t is not equal to shape and cannot be reshaped to shape, raise a ValueError.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `t` | `T` | The tensor to validate. | *required* |
| `shape` | `Tuple[Union[int, str], ...]` | The shape to validate against. | *required* |

Returns:

| Type | Description |
| --- | --- |
| `T` | The validated tensor. |

Source code in docarray/typing/tensor/abstract_tensor.py
@classmethod
def __docarray_validate_shape__(cls, t: T, shape: Tuple[Union[int, str], ...]) -> T:
    """Every tensor has to implement this method in order to
    enable syntax of the form AnyTensor[shape].
    It is called when a tensor is assigned to a field of this type.
    i.e. when a tensor is passed to a Document field of type AnyTensor[shape].

    The intended behaviour is as follows:

    - If the shape of `t` is equal to `shape`, return `t`.
    - If the shape of `t` is not equal to `shape`,
        but can be reshaped to `shape`, return `t` reshaped to `shape`.
    - If the shape of `t` is not equal to `shape`
        and cannot be reshaped to `shape`, raise a ValueError.

    :param t: The tensor to validate.
    :param shape: The shape to validate against.
    :return: The validated tensor.
    """
    comp_be = t.get_comp_backend()
    tshape = comp_be.shape(t)
    if tshape == shape:
        return t
    elif any(isinstance(dim, str) or dim == Ellipsis for dim in shape):
        ellipsis_occurrences = [
            pos for pos, dim in enumerate(shape) if dim == Ellipsis
        ]
        if ellipsis_occurrences:
            if len(ellipsis_occurrences) > 1:
                raise ValueError(
                    f'Cannot use Ellipsis (...) more than once for the shape {shape}'
                )
            ellipsis_pos = ellipsis_occurrences[0]
            # Calculate how many dimensions to add. Should be at least 1.
            dimensions_needed = max(len(tshape) - len(shape) + 1, 1)
            shape = (
                shape[:ellipsis_pos]
                + tuple(
                    f'__dim_var_{index}__' for index in range(dimensions_needed)
                )
                + shape[ellipsis_pos + 1 :]
            )

        if len(tshape) != len(shape):
            raise ValueError(
                f'Tensor shape mismatch. Expected {shape}, got {tshape}'
            )
        known_dims: Dict[str, int] = {}
        for tdim, dim in zip(tshape, shape):
            if isinstance(dim, int) and tdim != dim:
                raise ValueError(
                    f'Tensor shape mismatch. Expected {shape}, got {tshape}'
                )
            elif isinstance(dim, str):
                if dim in known_dims and known_dims[dim] != tdim:
                    raise ValueError(
                        f'Tensor shape mismatch. Expected {shape}, got {tshape}'
                    )
                else:
                    known_dims[dim] = tdim
        else:
            return t
    else:
        shape = cast(Tuple[int], shape)
        warnings.warn(
            f'Tensor shape mismatch. Reshaping tensor '
            f'of shape {tshape} to shape {shape}'
        )
        try:
            value = cls._docarray_from_native(comp_be.reshape(t, shape))
            return cast(T, value)
        except RuntimeError:
            raise ValueError(
                f'Cannot reshape tensor of shape {tshape} to shape {shape}'
            )

__iter__()

Iterate over the elements of this tensor's tf.Tensor.

Source code in docarray/typing/tensor/tensorflow_tensor.py
def __iter__(self):
    """Iterate over the elements of this tensor's `tf.Tensor`."""
    for i in range(len(self)):
        yield self[i]

__setitem__(index, value)

Set a slice of this tensor's tf.Tensor

Source code in docarray/typing/tensor/tensorflow_tensor.py
def __setitem__(self, index, value):
    """Set a slice of this tensor's `tf.Tensor`"""
    t = self.unwrap()
    value = tf.cast(value, dtype=t.dtype)
    var = tf.Variable(t)
    var[index].assign(value)
    self.tensor = tf.constant(var)

display(audio=None)

Display video data from tensor in notebook.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `audio` | `Optional[AudioTensor]` | sound to play with video tensor | `None` |

Source code in docarray/typing/tensor/video/video_tensor_mixin.py
def display(self, audio: Optional[AudioTensor] = None) -> None:
    """
    Display video data from tensor in notebook.

    :param audio: sound to play with video tensor
    """
    if is_notebook():
        from IPython.display import Video, display

        b = self.to_bytes(audio_tensor=audio)
        display(Video(data=b, embed=True, mimetype='video/mp4'))
    else:
        warnings.warn('Display of video is only possible in a notebook.')

from_ndarray(value) classmethod

Create a TensorFlowTensor from a numpy array.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `value` | `ndarray` | the numpy array | *required* |

Returns:

| Type | Description |
| --- | --- |
| `T` | a `TensorFlowTensor` |

Source code in docarray/typing/tensor/tensorflow_tensor.py
@classmethod
def from_ndarray(cls: Type[T], value: np.ndarray) -> T:
    """Create a `TensorFlowTensor` from a numpy array.

    :param value: the numpy array
    :return: a `TensorFlowTensor`
    """
    return cls._docarray_from_native(tf.convert_to_tensor(value))
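
For example (a minimal sketch, assuming TensorFlow is installed):

```python
import numpy as np

from docarray.typing import TensorFlowTensor

t = TensorFlowTensor.from_ndarray(np.zeros((3, 224, 224)))
# t wraps a tf.Tensor; t.unwrap() returns the underlying tf.Tensor
```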

from_protobuf(pb_msg) classmethod

Read an ndarray from a protobuf message.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `pb_msg` | `NdArrayProto` | the protobuf message to read from | *required* |

Returns:

| Type | Description |
| --- | --- |
| `T` | a `TensorFlowTensor` |

Source code in docarray/typing/tensor/tensorflow_tensor.py
@classmethod
def from_protobuf(cls: Type[T], pb_msg: 'NdArrayProto') -> 'T':
    """
    Read ndarray from a proto msg.
    :param pb_msg:
    :return: a `TensorFlowTensor`
    """
    source = pb_msg.dense
    if source.buffer:
        x = np.frombuffer(bytearray(source.buffer), dtype=source.dtype)
        return cls.from_ndarray(x.reshape(source.shape))
    elif len(source.shape) > 0:
        return cls.from_ndarray(np.zeros(source.shape))
    else:
        raise ValueError(
            f'Proto message {pb_msg} cannot be cast to a TensorFlowTensor.'
        )

get_comp_backend() staticmethod

Return the computational backend of the tensor

Source code in docarray/typing/tensor/tensorflow_tensor.py
@staticmethod
def get_comp_backend() -> 'TensorFlowCompBackend':
    """Return the computational backend of the tensor"""
    from docarray.computation.tensorflow_backend import TensorFlowCompBackend

    return TensorFlowCompBackend()

save(file_path, audio_tensor=None, video_frame_rate=24, video_codec='h264', audio_frame_rate=48000, audio_codec='aac', audio_format='fltp')

Save video tensor to a .mp4 file.


import numpy as np

from docarray import BaseDoc
from docarray.typing.tensor.audio.audio_tensor import AudioTensor
from docarray.typing.tensor.video.video_tensor import VideoTensor


class MyDoc(BaseDoc):
    video_tensor: VideoTensor
    audio_tensor: AudioTensor


doc = MyDoc(
    video_tensor=np.random.randint(low=0, high=256, size=(10, 200, 300, 3)),
    audio_tensor=np.random.randn(100, 1, 1024).astype("float32"),
)

doc.video_tensor.save(
    file_path="/tmp/mp_.mp4",
    audio_tensor=doc.audio_tensor,
    audio_format="flt",
)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `file_path` | `Union[str, BytesIO]` | path to a .mp4 file. If file is a string, open the file by that name, otherwise treat it as a file-like object. | *required* |
| `audio_tensor` | `Optional[AudioTensor]` | AudioTensor containing the video's soundtrack. | `None` |
| `video_frame_rate` | `int` | video frames per second. | `24` |
| `video_codec` | `str` | the name of a video decoder/encoder. | `'h264'` |
| `audio_frame_rate` | `int` | audio frames per second. | `48000` |
| `audio_codec` | `str` | the name of an audio decoder/encoder. | `'aac'` |
| `audio_format` | `str` | the name of one of the audio formats supported by PyAV, such as 'flt', 'fltp', 's16' or 's16p'. | `'fltp'` |

Source code in docarray/typing/tensor/video/video_tensor_mixin.py
def save(
    self: 'T',
    file_path: Union[str, BytesIO],
    audio_tensor: Optional[AudioTensor] = None,
    video_frame_rate: int = 24,
    video_codec: str = 'h264',
    audio_frame_rate: int = 48000,
    audio_codec: str = 'aac',
    audio_format: str = 'fltp',
) -> None:
    """
    Save video tensor to a .mp4 file.

    ---

    ```python
    import numpy as np

    from docarray import BaseDoc
    from docarray.typing.tensor.audio.audio_tensor import AudioTensor
    from docarray.typing.tensor.video.video_tensor import VideoTensor


    class MyDoc(BaseDoc):
        video_tensor: VideoTensor
        audio_tensor: AudioTensor


    doc = MyDoc(
        video_tensor=np.random.randint(low=0, high=256, size=(10, 200, 300, 3)),
        audio_tensor=np.random.randn(100, 1, 1024).astype("float32"),
    )

    doc.video_tensor.save(
        file_path="/tmp/mp_.mp4",
        audio_tensor=doc.audio_tensor,
        audio_format="flt",
    )
    ```

    ---
    :param file_path: path to a .mp4 file. If file is a string, open the file by
        that name, otherwise treat it as a file-like object.
    :param audio_tensor: AudioTensor containing the video's soundtrack.
    :param video_frame_rate: video frames per second.
    :param video_codec: the name of a video decoder/encoder.
    :param audio_frame_rate: audio frames per second.
    :param audio_codec: the name of an audio decoder/encoder.
    :param audio_format: the name of one of the audio formats supported by PyAV,
        such as 'flt', 'fltp', 's16' or 's16p'.
    """
    if TYPE_CHECKING:
        import av
    else:
        av = import_library('av', raise_error=True)

    np_tensor = self.get_comp_backend().to_numpy(array=self)
    video_tensor = np_tensor.astype('uint8')

    if isinstance(file_path, str):
        format = file_path.split('.')[-1]
    else:
        format = 'mp4'

    with av.open(file_path, mode='w', format=format) as container:
        if video_tensor.ndim == 3:
            video_tensor = np.expand_dims(video_tensor, axis=0)

        stream_video = container.add_stream(video_codec, rate=video_frame_rate)
        stream_video.height = video_tensor.shape[-3]
        stream_video.width = video_tensor.shape[-2]

        if audio_tensor is not None:
            stream_audio = container.add_stream(audio_codec)
            audio_np = audio_tensor.get_comp_backend().to_numpy(array=audio_tensor)
            audio_layout = 'stereo' if audio_np.shape[-2] == 2 else 'mono'

            for i, audio in enumerate(audio_np):
                frame = av.AudioFrame.from_ndarray(
                    array=audio, format=audio_format, layout=audio_layout
                )
                frame.rate = audio_frame_rate
                frame.pts = audio.shape[-1] * i
                for packet in stream_audio.encode(frame):
                    container.mux(packet)

            for packet in stream_audio.encode(None):
                container.mux(packet)

        for vid in video_tensor:
            frame = av.VideoFrame.from_ndarray(vid, format='rgb24')
            for packet in stream_video.encode(frame):
                container.mux(packet)

        for packet in stream_video.encode(None):
            container.mux(packet)

to_bytes(audio_tensor=None, video_frame_rate=24, video_codec='h264', audio_frame_rate=48000, audio_codec='aac', audio_format='fltp')

Convert video tensor to VideoBytes.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `audio_tensor` | `Optional[AudioTensor]` | AudioTensor containing the video's soundtrack. | `None` |
| `video_frame_rate` | `int` | video frames per second. | `24` |
| `video_codec` | `str` | the name of a video decoder/encoder. | `'h264'` |
| `audio_frame_rate` | `int` | audio frames per second. | `48000` |
| `audio_codec` | `str` | the name of an audio decoder/encoder. | `'aac'` |
| `audio_format` | `str` | the name of one of the audio formats supported by PyAV, such as 'flt', 'fltp', 's16' or 's16p'. | `'fltp'` |

Returns:

| Type | Description |
| --- | --- |
| `VideoBytes` | a `VideoBytes` object |

Source code in docarray/typing/tensor/video/video_tensor_mixin.py
def to_bytes(
    self: 'T',
    audio_tensor: Optional[AudioTensor] = None,
    video_frame_rate: int = 24,
    video_codec: str = 'h264',
    audio_frame_rate: int = 48000,
    audio_codec: str = 'aac',
    audio_format: str = 'fltp',
) -> 'VideoBytes':
    """
    Convert video tensor to [`VideoBytes`][docarray.typing.VideoBytes].

    :param audio_tensor: AudioTensor containing the video's soundtrack.
    :param video_frame_rate: video frames per second.
    :param video_codec: the name of a video decoder/encoder.
    :param audio_frame_rate: audio frames per second.
    :param audio_codec: the name of an audio decoder/encoder.
    :param audio_format: the name of one of the audio formats supported by PyAV,
        such as 'flt', 'fltp', 's16' or 's16p'.

    :return: a VideoBytes object
    """
    from docarray.typing.bytes.video_bytes import VideoBytes

    bytes = BytesIO()
    self.save(
        file_path=bytes,
        audio_tensor=audio_tensor,
        video_frame_rate=video_frame_rate,
        video_codec=video_codec,
        audio_frame_rate=audio_frame_rate,
        audio_codec=audio_codec,
        audio_format=audio_format,
    )
    return VideoBytes(bytes.getvalue())
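
For illustration, a short sketch of the call (assuming PyAV is installed; `VideoNdArray` stands in for any video tensor backend, and `VideoBytes` is a bytes subclass, as docarray's bytes types are):

```python
import numpy as np
from pydantic import parse_obj_as

from docarray.typing import VideoNdArray

video = parse_obj_as(
    VideoNdArray, np.random.randint(0, 256, size=(10, 64, 64, 3))
)
video_bytes = video.to_bytes()  # equivalent to save() into a BytesIO buffer
assert isinstance(video_bytes, bytes)  # usable anywhere plain bytes are
```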

to_protobuf()

Transform self into an NdArrayProto protobuf message.

Source code in docarray/typing/tensor/tensorflow_tensor.py
def to_protobuf(self) -> 'NdArrayProto':
    """
    Transform self into an NdArrayProto protobuf message.
    """
    from docarray.proto import NdArrayProto

    nd_proto = NdArrayProto()

    value_np = self.tensor.numpy()
    nd_proto.dense.buffer = value_np.tobytes()
    nd_proto.dense.ClearField('shape')
    nd_proto.dense.shape.extend(list(value_np.shape))
    nd_proto.dense.dtype = value_np.dtype.str

    return nd_proto
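
A hedged sketch of serializing a tensor through protobuf (assuming tensorflow and the protobuf dependency are installed, and that `TensorFlowTensor` exposes a `from_protobuf` counterpart, as the torch tensor documented below does):

```python
import tensorflow as tf
from pydantic import parse_obj_as

from docarray.typing import TensorFlowTensor

t = parse_obj_as(TensorFlowTensor, tf.zeros((2, 3)))
proto = t.to_protobuf()  # dense buffer + shape + dtype
t_restored = TensorFlowTensor.from_protobuf(proto)  # round-trip back
```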

unwrap()

Return the original tf.Tensor without any memory copy.

The original view remains intact and is still a Document TensorFlowTensor, while the returned object is a pure tf.Tensor; both objects share the same memory layout.


from docarray.typing import TensorFlowTensor
import tensorflow as tf

t1 = TensorFlowTensor.validate(tf.zeros((3, 224, 224)), None, None)
# here t1 is a docarray TensorFlowTensor
t2 = t1.unwrap()
# here t2 is a pure tf.Tensor but t1 is still a Docarray TensorFlowTensor

Returns:

Type Description
Tensor

a tf.Tensor

Source code in docarray/typing/tensor/tensorflow_tensor.py
def unwrap(self) -> tf.Tensor:
    """
    Return the original `tf.Tensor` without any memory copy.

    The original view rest intact and is still a Document `TensorFlowTensor`
    but the return object is a pure `tf.Tensor` but both object share
    the same memory layout.

    ---

    ```python
    from docarray.typing import TensorFlowTensor
    import tensorflow as tf

    t1 = TensorFlowTensor.validate(tf.zeros((3, 224, 224)), None, None)
    # here t1 is a docarray TensorFlowTensor
    t2 = t1.unwrap()
    # here t2 is a pure tf.Tensor but t1 is still a Docarray TensorFlowTensor
    ```

    ---
    :return: a `tf.Tensor`
    """
    return self.tensor

docarray.typing.tensor.video.video_torch_tensor

VideoTorchTensor

Bases: TorchTensor, VideoTensorMixin

Subclass of TorchTensor, to represent a video tensor. Adds video-specific features to the tensor.


from typing import Optional

import torch

from docarray import BaseDoc
from docarray.typing import VideoTorchTensor, VideoUrl


class MyVideoDoc(BaseDoc):
    title: str
    url: Optional[VideoUrl] = None
    video_tensor: Optional[VideoTorchTensor] = None


doc_1 = MyVideoDoc(
    title='my_first_video_doc',
    video_tensor=torch.randn(size=(100, 224, 224, 3)),
)
# doc_1.video_tensor.save(file_path='file_1.mp4')

doc_2 = MyVideoDoc(
    title='my_second_video_doc',
    url='https://github.com/docarray/docarray/blob/main/tests/toydata/mov_bbb.mp4?raw=true',
)

doc_2.video_tensor = doc_2.url.load().video
# doc_2.video_tensor.save(file_path='file_2.mp4')

Source code in docarray/typing/tensor/video/video_torch_tensor.py
@_register_proto(proto_type_name='video_torch_tensor')
class VideoTorchTensor(TorchTensor, VideoTensorMixin, metaclass=metaTorchAndNode):
    """
    Subclass of [`TorchTensor`][docarray.typing.TorchTensor], to represent a video tensor.
    Adds video-specific features to the tensor.

    ---

    ```python
    from typing import Optional

    import torch

    from docarray import BaseDoc
    from docarray.typing import VideoTorchTensor, VideoUrl


    class MyVideoDoc(BaseDoc):
        title: str
        url: Optional[VideoUrl] = None
        video_tensor: Optional[VideoTorchTensor] = None


    doc_1 = MyVideoDoc(
        title='my_first_video_doc',
        video_tensor=torch.randn(size=(100, 224, 224, 3)),
    )
    # doc_1.video_tensor.save(file_path='file_1.mp4')

    doc_2 = MyVideoDoc(
        title='my_second_video_doc',
        url='https://github.com/docarray/docarray/blob/main/tests/toydata/mov_bbb.mp4?raw=true',
    )

    doc_2.video_tensor = doc_2.url.load().video
    # doc_2.video_tensor.save(file_path='file_2.mp4')
    ```

    ---

    """

    @classmethod
    def _docarray_validate(
        cls: Type[T],
        value: Union[T, np.ndarray, List[Any], Tuple[Any], Any],
    ) -> T:
        tensor = super()._docarray_validate(value=value)
        return cls.validate_shape(value=tensor)
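
The extra `validate_shape` step is what distinguishes the video subclass from a plain `TorchTensor`. A minimal sketch (assuming, as the mixin's name suggests, that a trailing 3-channel axis is required):

```python
import torch
from pydantic import parse_obj_as

from docarray.typing import VideoTorchTensor

# frames x height x width x channels, with channels == 3
ok = parse_obj_as(VideoTorchTensor, torch.zeros(10, 64, 64, 3))

try:
    parse_obj_as(VideoTorchTensor, torch.zeros(10, 64, 64, 4))
except ValueError:
    pass  # rejected by validate_shape: last dimension must be 3
```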

__deepcopy__(memo)

Custom implementation of deepcopy for TorchTensor to avoid storage sharing issues.

Source code in docarray/typing/tensor/torch_tensor.py
def __deepcopy__(self, memo):
    """
    Custom implementation of deepcopy for TorchTensor to avoid storage sharing issues.
    """
    # Create a new tensor with the same data and properties
    new_tensor = self.clone()
    # Set the class to the custom TorchTensor class
    new_tensor.__class__ = self.__class__
    return new_tensor
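
A quick sketch of the behaviour this enables (assuming torch is installed):

```python
from copy import deepcopy

import torch
from pydantic import parse_obj_as

from docarray.typing import TorchTensor

t = parse_obj_as(TorchTensor, torch.zeros(3))
t_copy = deepcopy(t)  # routed through __deepcopy__, so storage is cloned

t_copy[0] = 1.0
assert float(t[0]) == 0.0  # the original tensor is unaffected
```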

__docarray_validate_getitem__(item) classmethod

This method validates the input to AbstractTensor.__class_getitem__.

It is called at "class creation time", i.e. when a class is created with syntax of the form AnyTensor[shape].

The default implementation tries to cast any item to a tuple of ints. A subclass can override this method to implement custom validation logic.

The output of this is eventually passed to AbstractTensor.__docarray_validate_shape__ as its shape argument.

Raises ValueError if the input item does not pass validation.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| item | Any | The item to validate, passed to __class_getitem__ (Tensor[item]). | required |

Returns:

| Type | Description |
| --- | --- |
| Tuple[int] | The validated item == the target shape of this tensor. |

Source code in docarray/typing/tensor/abstract_tensor.py
@classmethod
def __docarray_validate_getitem__(cls, item: Any) -> Tuple[int]:
    """This method validates the input to `AbstractTensor.__class_getitem__`.

    It is called at "class creation time",
    i.e. when a class is created with syntax of the form AnyTensor[shape].

    The default implementation tries to cast any `item` to a tuple of ints.
    A subclass can override this method to implement custom validation logic.

    The output of this is eventually passed to
    [`AbstractTensor.__docarray_validate_shape__`]
    [docarray.typing.tensor.abstract_tensor.AbstractTensor.__docarray_validate_shape__]
    as its `shape` argument.

    Raises `ValueError` if the input `item` does not pass validation.

    :param item: The item to validate, passed to `__class_getitem__` (`Tensor[item]`).
    :return: The validated item == the target shape of this tensor.
    """
    if isinstance(item, int):
        item = (item,)
    try:
        item = tuple(item)
    except TypeError:
        raise TypeError(f'{item} is not a valid tensor shape.')
    return item
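
For illustration, the two accepted item forms (these parametrized classes are created, not instantiated, here):

```python
from docarray.typing import TorchTensor

# a tuple of ints is kept as the target shape
VideoShaped = TorchTensor[100, 224, 224, 3]

# a bare int is wrapped into a one-element tuple: (128,)
Embedding = TorchTensor[128]
```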

__docarray_validate_shape__(t, shape) classmethod

Every tensor has to implement this method in order to enable syntax of the form AnyTensor[shape]. It is called when a tensor is assigned to a field of this type, i.e. when a tensor is passed to a Document field of type AnyTensor[shape].

The intended behaviour is as follows:

  • If the shape of t is equal to shape, return t.
  • If the shape of t is not equal to shape, but can be reshaped to shape, return t reshaped to shape.
  • If the shape of t is not equal to shape and cannot be reshaped to shape, raise a ValueError.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| t | T | The tensor to validate. | required |
| shape | Tuple[Union[int, str], ...] | The shape to validate against. | required |

Returns:

| Type | Description |
| --- | --- |
| T | The validated tensor. |

Source code in docarray/typing/tensor/abstract_tensor.py
@classmethod
def __docarray_validate_shape__(cls, t: T, shape: Tuple[Union[int, str], ...]) -> T:
    """Every tensor has to implement this method in order to
    enable syntax of the form AnyTensor[shape].
    It is called when a tensor is assigned to a field of this type.
    i.e. when a tensor is passed to a Document field of type AnyTensor[shape].

    The intended behaviour is as follows:

    - If the shape of `t` is equal to `shape`, return `t`.
    - If the shape of `t` is not equal to `shape`,
        but can be reshaped to `shape`, return `t` reshaped to `shape`.
    - If the shape of `t` is not equal to `shape`
        and cannot be reshaped to `shape`, raise a ValueError.

    :param t: The tensor to validate.
    :param shape: The shape to validate against.
    :return: The validated tensor.
    """
    comp_be = t.get_comp_backend()
    tshape = comp_be.shape(t)
    if tshape == shape:
        return t
    elif any(isinstance(dim, str) or dim == Ellipsis for dim in shape):
        ellipsis_occurrences = [
            pos for pos, dim in enumerate(shape) if dim == Ellipsis
        ]
        if ellipsis_occurrences:
            if len(ellipsis_occurrences) > 1:
                raise ValueError(
                    f'Cannot use Ellipsis (...) more than once for the shape {shape}'
                )
            ellipsis_pos = ellipsis_occurrences[0]
            # Calculate how many dimensions to add. Should be at least 1.
            dimensions_needed = max(len(tshape) - len(shape) + 1, 1)
            shape = (
                shape[:ellipsis_pos]
                + tuple(
                    f'__dim_var_{index}__' for index in range(dimensions_needed)
                )
                + shape[ellipsis_pos + 1 :]
            )

        if len(tshape) != len(shape):
            raise ValueError(
                f'Tensor shape mismatch. Expected {shape}, got {tshape}'
            )
        known_dims: Dict[str, int] = {}
        for tdim, dim in zip(tshape, shape):
            if isinstance(dim, int) and tdim != dim:
                raise ValueError(
                    f'Tensor shape mismatch. Expected {shape}, got {tshape}'
                )
            elif isinstance(dim, str):
                if dim in known_dims and known_dims[dim] != tdim:
                    raise ValueError(
                        f'Tensor shape mismatch. Expected {shape}, got {tshape}'
                    )
                else:
                    known_dims[dim] = tdim
        else:
            return t
    else:
        shape = cast(Tuple[int], shape)
        warnings.warn(
            f'Tensor shape mismatch. Reshaping tensor '
            f'of shape {tshape} to shape {shape}'
        )
        try:
            value = cls._docarray_from_native(comp_be.reshape(t, shape))
            return cast(T, value)
        except RuntimeError:
            raise ValueError(
                f'Cannot reshape tensor of shape {tshape} to shape {shape}'
            )
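
A short sketch of the three shape-spec flavours handled above (assuming torch is installed; parse_obj_as triggers the validation):

```python
import torch
from pydantic import parse_obj_as

from docarray.typing import TorchTensor

# exact dims: shapes must match (or be reshapable, with a warning)
Chw = TorchTensor[3, 224, 224]
img = parse_obj_as(Chw, torch.zeros(3, 224, 224))

# string dims act as variables: both 'x' occurrences must bind to the same size
Square = TorchTensor[3, 'x', 'x']
sq = parse_obj_as(Square, torch.zeros(3, 128, 128))

# a single Ellipsis absorbs any number of dimensions
Batched = TorchTensor[..., 3]
b = parse_obj_as(Batched, torch.zeros(8, 224, 224, 3))
```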

__getitem__(item) abstractmethod

Get a slice of this tensor.

Source code in docarray/typing/tensor/abstract_tensor.py
@abc.abstractmethod
def __getitem__(self: T, item) -> T:
    """Get a slice of this tensor."""
    ...

__iter__() abstractmethod

Iterate over the elements of this tensor.

Source code in docarray/typing/tensor/abstract_tensor.py
@abc.abstractmethod
def __iter__(self):
    """Iterate over the elements of this tensor."""
    ...

__setitem__(index, value) abstractmethod

Set a slice of this tensor.

Source code in docarray/typing/tensor/abstract_tensor.py
@abc.abstractmethod
def __setitem__(self, index, value):
    """Set a slice of this tensor."""
    ...

display(audio=None)

Display video data from tensor in notebook.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| audio | Optional[AudioTensor] | sound to play with video tensor | None |
Source code in docarray/typing/tensor/video/video_tensor_mixin.py
def display(self, audio: Optional[AudioTensor] = None) -> None:
    """
    Display video data from tensor in notebook.

    :param audio: sound to play with video tensor
    """
    if is_notebook():
        from IPython.display import Video, display

        b = self.to_bytes(audio_tensor=audio)
        display(Video(data=b, embed=True, mimetype='video/mp4'))
    else:
        warnings.warn('Display of video is only possible in a notebook.')

from_ndarray(value) classmethod

Create a TorchTensor from a numpy array

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| value | ndarray | the numpy array | required |

Returns:

| Type | Description |
| --- | --- |
| T | a TorchTensor |

Source code in docarray/typing/tensor/torch_tensor.py
@classmethod
def from_ndarray(cls: Type[T], value: np.ndarray) -> T:
    """Create a `TorchTensor` from a numpy array

    :param value: the numpy array
    :return: a `TorchTensor`
    """
    return cls._docarray_from_native(torch.from_numpy(value))
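
A one-liner sketch (note that torch.from_numpy shares memory with the source array):

```python
import numpy as np

from docarray.typing import TorchTensor

arr = np.arange(6, dtype=np.float32).reshape(2, 3)
t = TorchTensor.from_ndarray(arr)  # zero-copy: backed by the same memory as arr
```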

from_protobuf(pb_msg) classmethod

Read an ndarray from a protobuf message.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| pb_msg | NdArrayProto | | required |

Returns:

| Type | Description |
| --- | --- |
| T | a TorchTensor |

Source code in docarray/typing/tensor/torch_tensor.py
@classmethod
def from_protobuf(cls: Type[T], pb_msg: 'NdArrayProto') -> 'T':
    """
    Read ndarray from a proto msg
    :param pb_msg:
    :return: a `TorchTensor`
    """
    source = pb_msg.dense
    if source.buffer:
        x = np.frombuffer(bytearray(source.buffer), dtype=source.dtype)
        return cls.from_ndarray(x.reshape(source.shape))
    elif len(source.shape) > 0:
        return cls.from_ndarray(np.zeros(source.shape))
    else:
        raise ValueError(f'proto message {pb_msg} cannot be cast to a TorchTensor')

get_comp_backend() staticmethod

Return the computational backend of the tensor

Source code in docarray/typing/tensor/torch_tensor.py
@staticmethod
def get_comp_backend() -> 'TorchCompBackend':
    """Return the computational backend of the tensor"""
    from docarray.computation.torch_backend import TorchCompBackend

    return TorchCompBackend()

new_empty(*args, **kwargs)

This method enables the deepcopy of TorchTensor by returning another instance of this subclass. If this function is not implemented, deepcopy will raise a RuntimeError from Torch.

Source code in docarray/typing/tensor/torch_tensor.py
def new_empty(self, *args, **kwargs):
    """
    This method enables the deepcopy of `TorchTensor` by returning another instance of this subclass.
    If this function is not implemented, the deepcopy will throw an RuntimeError from Torch.
    """
    return self.__class__(*args, **kwargs)

save(file_path, audio_tensor=None, video_frame_rate=24, video_codec='h264', audio_frame_rate=48000, audio_codec='aac', audio_format='fltp')

Save video tensor to a .mp4 file.


import numpy as np

from docarray import BaseDoc
from docarray.typing.tensor.audio.audio_tensor import AudioTensor
from docarray.typing.tensor.video.video_tensor import VideoTensor


class MyDoc(BaseDoc):
    video_tensor: VideoTensor
    audio_tensor: AudioTensor


doc = MyDoc(
    video_tensor=np.random.randint(low=0, high=256, size=(10, 200, 300, 3)),
    audio_tensor=np.random.randn(100, 1, 1024).astype("float32"),
)

doc.video_tensor.save(
    file_path="/tmp/mp_.mp4",
    audio_tensor=doc.audio_tensor,
    audio_format="flt",
)

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| file_path | Union[str, BytesIO] | path to a .mp4 file. If file is a string, open the file by that name, otherwise treat it as a file-like object. | required |
| audio_tensor | Optional[AudioTensor] | AudioTensor containing the video's soundtrack. | None |
| video_frame_rate | int | video frames per second. | 24 |
| video_codec | str | the name of a video decoder/encoder. | 'h264' |
| audio_frame_rate | int | audio frames per second. | 48000 |
| audio_codec | str | the name of an audio decoder/encoder. | 'aac' |
| audio_format | str | the name of one of the audio formats supported by PyAV, such as 'flt', 'fltp', 's16' or 's16p'. | 'fltp' |
Source code in docarray/typing/tensor/video/video_tensor_mixin.py
def save(
    self: 'T',
    file_path: Union[str, BytesIO],
    audio_tensor: Optional[AudioTensor] = None,
    video_frame_rate: int = 24,
    video_codec: str = 'h264',
    audio_frame_rate: int = 48000,
    audio_codec: str = 'aac',
    audio_format: str = 'fltp',
) -> None:
    """
    Save video tensor to a .mp4 file.

    ---

    ```python
    import numpy as np

    from docarray import BaseDoc
    from docarray.typing.tensor.audio.audio_tensor import AudioTensor
    from docarray.typing.tensor.video.video_tensor import VideoTensor


    class MyDoc(BaseDoc):
        video_tensor: VideoTensor
        audio_tensor: AudioTensor


    doc = MyDoc(
        video_tensor=np.random.randint(low=0, high=256, size=(10, 200, 300, 3)),
        audio_tensor=np.random.randn(100, 1, 1024).astype("float32"),
    )

    doc.video_tensor.save(
        file_path="/tmp/mp_.mp4",
        audio_tensor=doc.audio_tensor,
        audio_format="flt",
    )
    ```

    ---
    :param file_path: path to a .mp4 file. If file is a string, open the file by
        that name, otherwise treat it as a file-like object.
    :param audio_tensor: AudioTensor containing the video's soundtrack.
    :param video_frame_rate: video frames per second.
    :param video_codec: the name of a video decoder/encoder.
    :param audio_frame_rate: audio frames per second.
    :param audio_codec: the name of an audio decoder/encoder.
    :param audio_format: the name of one of the audio formats supported by PyAV,
        such as 'flt', 'fltp', 's16' or 's16p'.
    """
    if TYPE_CHECKING:
        import av
    else:
        av = import_library('av', raise_error=True)

    np_tensor = self.get_comp_backend().to_numpy(array=self)
    video_tensor = np_tensor.astype('uint8')

    if isinstance(file_path, str):
        format = file_path.split('.')[-1]
    else:
        format = 'mp4'

    with av.open(file_path, mode='w', format=format) as container:
        if video_tensor.ndim == 3:
            video_tensor = np.expand_dims(video_tensor, axis=0)

        stream_video = container.add_stream(video_codec, rate=video_frame_rate)
        stream_video.height = video_tensor.shape[-3]
        stream_video.width = video_tensor.shape[-2]

        if audio_tensor is not None:
            stream_audio = container.add_stream(audio_codec)
            audio_np = audio_tensor.get_comp_backend().to_numpy(array=audio_tensor)
            audio_layout = 'stereo' if audio_np.shape[-2] == 2 else 'mono'

            for i, audio in enumerate(audio_np):
                frame = av.AudioFrame.from_ndarray(
                    array=audio, format=audio_format, layout=audio_layout
                )
                frame.rate = audio_frame_rate
                frame.pts = audio.shape[-1] * i
                for packet in stream_audio.encode(frame):
                    container.mux(packet)

            for packet in stream_audio.encode(None):
                container.mux(packet)

        for vid in video_tensor:
            frame = av.VideoFrame.from_ndarray(vid, format='rgb24')
            for packet in stream_video.encode(frame):
                container.mux(packet)

        for packet in stream_video.encode(None):
            container.mux(packet)

to_bytes(audio_tensor=None, video_frame_rate=24, video_codec='h264', audio_frame_rate=48000, audio_codec='aac', audio_format='fltp')

Convert video tensor to VideoBytes.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| audio_tensor | Optional[AudioTensor] | AudioTensor containing the video's soundtrack. | None |
| video_frame_rate | int | video frames per second. | 24 |
| video_codec | str | the name of a video decoder/encoder. | 'h264' |
| audio_frame_rate | int | audio frames per second. | 48000 |
| audio_codec | str | the name of an audio decoder/encoder. | 'aac' |
| audio_format | str | the name of one of the audio formats supported by PyAV, such as 'flt', 'fltp', 's16' or 's16p'. | 'fltp' |

Returns:

| Type | Description |
| --- | --- |
| VideoBytes | a VideoBytes object |

Source code in docarray/typing/tensor/video/video_tensor_mixin.py
def to_bytes(
    self: 'T',
    audio_tensor: Optional[AudioTensor] = None,
    video_frame_rate: int = 24,
    video_codec: str = 'h264',
    audio_frame_rate: int = 48000,
    audio_codec: str = 'aac',
    audio_format: str = 'fltp',
) -> 'VideoBytes':
    """
    Convert video tensor to [`VideoBytes`][docarray.typing.VideoBytes].

    :param audio_tensor: AudioTensor containing the video's soundtrack.
    :param video_frame_rate: video frames per second.
    :param video_codec: the name of a video decoder/encoder.
    :param audio_frame_rate: audio frames per second.
    :param audio_codec: the name of an audio decoder/encoder.
    :param audio_format: the name of one of the audio formats supported by PyAV,
        such as 'flt', 'fltp', 's16' or 's16p'.

    :return: a VideoBytes object
    """
    from docarray.typing.bytes.video_bytes import VideoBytes

    bytes = BytesIO()
    self.save(
        file_path=bytes,
        audio_tensor=audio_tensor,
        video_frame_rate=video_frame_rate,
        video_codec=video_codec,
        audio_frame_rate=audio_frame_rate,
        audio_codec=audio_codec,
        audio_format=audio_format,
    )
    return VideoBytes(bytes.getvalue())

to_protobuf()

Transform self into an NdArrayProto protobuf message.

Source code in docarray/typing/tensor/torch_tensor.py
def to_protobuf(self) -> 'NdArrayProto':
    """
    Transform self into a `NdArrayProto` protobuf message
    """
    from docarray.proto import NdArrayProto

    nd_proto = NdArrayProto()

    value_np = self.detach().cpu().numpy()
    nd_proto.dense.buffer = value_np.tobytes()
    nd_proto.dense.ClearField('shape')
    nd_proto.dense.shape.extend(list(value_np.shape))
    nd_proto.dense.dtype = value_np.dtype.str

    return nd_proto
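
Together with from_protobuf above, this gives a lossless round trip. A minimal sketch (assuming torch and the protobuf dependency are installed):

```python
import torch
from pydantic import parse_obj_as

from docarray.typing import TorchTensor

t = parse_obj_as(TorchTensor, torch.arange(6, dtype=torch.float32).reshape(2, 3))
proto = t.to_protobuf()  # detach -> cpu -> numpy -> dense proto
t_back = TorchTensor.from_protobuf(proto)
assert torch.equal(t, t_back)  # values and shape survive the round trip
```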

unwrap()

Return the original torch.Tensor without any memory copy.

The original view remains intact and is still a Document TorchTensor, while the returned object is a pure torch.Tensor; both objects share the same memory layout.


from docarray.typing import TorchTensor
import torch
from pydantic import parse_obj_as


t = parse_obj_as(TorchTensor, torch.zeros(3, 224, 224))
# here t is a docarray TorchTensor
t2 = t.unwrap()
# here t2 is a pure torch.Tensor but t is still a docarray TorchTensor
# But both share the same underlying memory

Returns:

Type Description
Tensor

a torch.Tensor

Source code in docarray/typing/tensor/torch_tensor.py
def unwrap(self) -> torch.Tensor:
    """
    Return the original `torch.Tensor` without any memory copy.

    The original view rest intact and is still a Document `TorchTensor`
    but the return object is a pure `torch.Tensor` but both object share
    the same memory layout.

    ---

    ```python
    from docarray.typing import TorchTensor
    import torch
    from pydantic import parse_obj_as


    t = parse_obj_as(TorchTensor, torch.zeros(3, 224, 224))
    # here t is a docarray TorchTensor
    t2 = t.unwrap()
    # here t2 is a pure torch.Tensor but t is still a docarray TorchTensor
    # But both share the same underlying memory
    ```

    ---

    :return: a `torch.Tensor`
    """
    value = copy(self)  # as unintuitive as it sounds, this
    # does not do any relevant memory copying, just shallow
    # reference to the torch data
    value.__class__ = torch.Tensor  # type: ignore
    return value
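
Because no memory is copied, writes through either handle are visible in the other, as the docstring's shared-memory claim implies. A quick check (assuming torch is installed):

```python
import torch
from pydantic import parse_obj_as

from docarray.typing import TorchTensor

t = parse_obj_as(TorchTensor, torch.zeros(3))
raw = t.unwrap()

raw[0] = 1.0               # mutate through the plain torch.Tensor...
assert float(t[0]) == 1.0  # ...and the docarray view sees the change
```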