DocList

docarray.array.doc_list.doc_list.DocList

Bases: ListAdvancedIndexing[T_doc], PushPullMixin, IOMixinDocList, AnyDocArray[T_doc]

DocList is a container of Documents.

A DocList is a list of Documents of any schema. However, many DocList features are only available if these Documents are homogeneous and follow the same schema. To specify this schema, use the DocList[MyDocument] syntax, where MyDocument is a Document class (i.e. a schema). This creates a DocList that can only contain Documents of type MyDocument.

from docarray import BaseDoc, DocList
from docarray.typing import NdArray, ImageUrl
from typing import Optional


class Image(BaseDoc):
    tensor: Optional[NdArray[100]] = None
    url: ImageUrl


docs = DocList[Image](
    Image(url='http://url.com/foo.png') for _ in range(10)
)  # noqa: E510


# If your DocList is homogeneous (i.e. follows the same schema), you can access
# fields at the DocList level (for example `docs.tensor` or `docs.url`).

print(docs.url)
# [ImageUrl('http://url.com/foo.png', host_type='domain'), ...]


# You can also set fields, with `docs.tensor = np.random.random([10, 100])`:


import numpy as np

docs.tensor = np.random.random([10, 100])

print(docs.tensor)
# [NdArray([0.11299577, 0.47206767, 0.481723  , 0.34754724, 0.15016037,
#          0.88861321, 0.88317666, 0.93845579, 0.60486676, ... ]), ...]


# You can index into a DocList like a numpy array or torch tensor:

docs[0]  # index by position
docs[0:5:2]  # index by slice
docs[[0, 2, 3]]  # index by list of indices
docs[True, False, True, True, ...]  # index by boolean mask


# You can delete items from a DocList like a Python List

del docs[0]  # remove first element from DocList
del docs[0:5]  # remove elements 0 to 4 from DocList

Note

If the DocList is homogeneous and its schema contains a nested BaseDoc (i.e., a BaseDoc inside a BaseDoc) where the nested Document is Optional, calling docs.nested_doc will return a List of the nested BaseDoc instead of a DocList. This is because the nested field could be None, and None cannot be stored in a DocList.
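
For example, here is a minimal sketch of this behavior (`Inner` and `Outer` are illustrative names, not part of the API):

```python
from typing import Optional

from docarray import BaseDoc, DocList


class Inner(BaseDoc):
    text: str


class Outer(BaseDoc):
    inner: Optional[Inner] = None


docs = DocList[Outer](Outer() for _ in range(3))

# `inner` is Optional, so the column comes back as a plain list
# (it may contain None), not as a DocList:
print(docs.inner)  # [None, None, None]
```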

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| docs | Optional[Iterable[T_doc]] | iterable of Document | None |
Source code in docarray/array/doc_list/doc_list.py
class DocList(
    ListAdvancedIndexing[T_doc],
    PushPullMixin,
    IOMixinDocList,
    AnyDocArray[T_doc],
):
    """
     DocList is a container of Documents.

    A DocList is a list of Documents of any schema. However, many
    DocList features are only available if these Documents are
    homogeneous and follow the same schema. To specify this schema you can use
    the `DocList[MyDocument]` syntax where MyDocument is a Document class
    (i.e. schema). This creates a DocList that can only contain Documents of
    the type `MyDocument`.


    ```python
    from docarray import BaseDoc, DocList
    from docarray.typing import NdArray, ImageUrl
    from typing import Optional


    class Image(BaseDoc):
        tensor: Optional[NdArray[100]] = None
        url: ImageUrl


    docs = DocList[Image](
        Image(url='http://url.com/foo.png') for _ in range(10)
    )  # noqa: E510


    # If your DocList is homogeneous (i.e. follows the same schema), you can access
    # fields at the DocList level (for example `docs.tensor` or `docs.url`).

    print(docs.url)
    # [ImageUrl('http://url.com/foo.png', host_type='domain'), ...]


    # You can also set fields, with `docs.tensor = np.random.random([10, 100])`:


    import numpy as np

    docs.tensor = np.random.random([10, 100])

    print(docs.tensor)
    # [NdArray([0.11299577, 0.47206767, 0.481723  , 0.34754724, 0.15016037,
    #          0.88861321, 0.88317666, 0.93845579, 0.60486676, ... ]), ...]


    # You can index into a DocList like a numpy array or torch tensor:

    docs[0]  # index by position
    docs[0:5:2]  # index by slice
    docs[[0, 2, 3]]  # index by list of indices
    docs[True, False, True, True, ...]  # index by boolean mask


    # You can delete items from a DocList like a Python List

    del docs[0]  # remove first element from DocList
    del docs[0:5]  # remove elements 0 to 4 from DocList
    ```

    !!! note
        If the DocList is homogeneous and its schema contains a nested BaseDoc
        (i.e., a BaseDoc inside a BaseDoc) where the nested Document is `Optional`, calling
        `docs.nested_doc` will return a List of the nested BaseDoc instead of a DocList.
        This is because the nested field could be None and therefore could not fit into
        a DocList.

    :param docs: iterable of Document

    """

    doc_type: Type[BaseDocWithoutId] = AnyDoc

    def __init__(
        self,
        docs: Optional[Iterable[T_doc]] = None,
        validate_input_docs: bool = True,
    ):
        if validate_input_docs:
            docs = self._validate_docs(docs) if docs else []
        else:
            docs = docs if docs else []
        super().__init__(docs)

    @classmethod
    def construct(
        cls: Type[T],
        docs: Sequence[T_doc],
    ) -> T:
        """
        Create a `DocList` without validating any data. The data must come from a
        trusted source.
        :param docs: a Sequence (list) of Document with the same schema
        :return: a `DocList` object
        """
        return cls(docs, False)

    def __eq__(self, other: Any) -> bool:
        if self.__len__() != other.__len__():
            return False
        for doc_self, doc_other in zip(self, other):
            if doc_self != doc_other:
                return False
        return True

    def _validate_docs(self, docs: Iterable[T_doc]) -> Iterable[T_doc]:
        """
        Validate if an Iterable of Document are compatible with this `DocList`
        """
        for doc in docs:
            yield self._validate_one_doc(doc)

    def _validate_one_doc(self, doc: T_doc) -> T_doc:
        """Validate if a Document is compatible with this `DocList`"""
        if not safe_issubclass(self.doc_type, AnyDoc) and not isinstance(
            doc, self.doc_type
        ):
            raise ValueError(f'{doc} is not a {self.doc_type}')
        return doc

    def __bytes__(self) -> bytes:
        with io.BytesIO() as bf:
            self._write_bytes(bf=bf)
            return bf.getvalue()

    def append(self, doc: T_doc):
        """
        Append a Document to the `DocList`. The Document must be from the same class
        as the `.doc_type` of this `DocList` otherwise it will fail.
        :param doc: A Document
        """
        return super().append(self._validate_one_doc(doc))

    def extend(self, docs: Iterable[T_doc]):
        """
        Extend a `DocList` with an Iterable of Document. The Documents must be from
        the same class as the `.doc_type` of this `DocList` otherwise it will
        fail.
        :param docs: Iterable of Documents
        """
        it: Iterable[T_doc] = list()
        if self is docs:
            # see https://github.com/docarray/docarray/issues/1489
            it = list(docs)
        else:
            it = self._validate_docs(docs)

        return super().extend(it)

    def insert(self, i: SupportsIndex, doc: T_doc):
        """
        Insert a Document to the `DocList`. The Document must be from the same
        class as the doc_type of this `DocList` otherwise it will fail.
        :param i: index to insert
        :param doc: A Document
        """
        super().insert(i, self._validate_one_doc(doc))

    def _get_data_column(
        self: T,
        field: str,
    ) -> Union[MutableSequence, T, 'TorchTensor', 'NdArray']:
        """Return all v  @classmethod
          def __class_getitem__(cls, item: Union[Type[BaseDoc], TypeVar, str]):alues of the fields from all docs this doc_list contains
        @classmethod
          def __class_getitem__(cls, item: Union[Type[BaseDoc], TypeVar, str]):
              :param field: name of the fields to extract
              :return: Returns a list of the field value for each document
              in the doc_list like container
        """
        field_type = self.__class__.doc_type._get_field_annotation(field)
        field_info = self.__class__.doc_type._docarray_fields()[field]
        is_field_required = (
            field_info.is_required() if is_pydantic_v2 else field_info.required
        )

        if (
            not is_union_type(field_type)
            and is_field_required
            and isinstance(field_type, type)
            and safe_issubclass(field_type, BaseDocWithoutId)
        ):
            # calling __class_getitem__ ourselves is a hack otherwise mypy complain
            # most likely a bug in mypy though
            # bug reported here https://github.com/python/mypy/issues/14111
            return DocList.__class_getitem__(field_type)(
                (getattr(doc, field) for doc in self),
            )
        else:
            return [getattr(doc, field) for doc in self]

    def _set_data_column(
        self: T,
        field: str,
        values: Union[List, T, 'AbstractTensor'],
    ):
        """Set all Documents in this `DocList` using the passed values

        :param field: name of the fields to set
        :param values: the values to set at the `DocList` level
        """

        for doc, value in zip(self, values):
            setattr(doc, field, value)

    def to_doc_vec(
        self,
        tensor_type: Type['AbstractTensor'] = NdArray,
    ) -> 'DocVec':
        """
        Convert the `DocList` into a `DocVec`. `Self` cannot be used
        afterward
        :param tensor_type: Tensor Class used to wrap the doc_vec tensors. This is useful
        if the BaseDoc has some undefined tensor type like AnyTensor or Union of NdArray and TorchTensor
        :return: A `DocVec` of the same document type as self
        """
        from docarray.array.doc_vec.doc_vec import DocVec

        return DocVec.__class_getitem__(self.doc_type)(self, tensor_type=tensor_type)

    @classmethod
    def _docarray_validate(
        cls: Type[T],
        value: Union[T, Iterable[BaseDocWithoutId]],
    ):
        from docarray.array.doc_vec.doc_vec import DocVec

        if isinstance(value, cls):
            return value
        elif isinstance(value, DocVec):
            if (
                safe_issubclass(value.doc_type, cls.doc_type)
                or value.doc_type == cls.doc_type
            ):
                return cast(T, value.to_doc_list())
            else:
                raise ValueError(
                    f'DocList[{value.doc_type}] is not compatible with {cls}'
                )
        elif isinstance(value, cls):
            return cls(value)
        elif isinstance(value, Iterable):
            docs = []
            for doc in value:
                docs.append(parse_obj_as(cls.doc_type, doc))
            return cls(docs)
        else:
            raise TypeError(f'Expecting an Iterable of {cls.doc_type}')

    def traverse_flat(
        self: 'DocList',
        access_path: str,
    ) -> List[Any]:
        nodes = list(AnyDocArray._traverse(node=self, access_path=access_path))
        flattened = AnyDocArray._flatten_one_level(nodes)

        return flattened

    @classmethod
    def from_protobuf(cls: Type[T], pb_msg: 'DocListProto') -> T:
        """create a Document from a protobuf message
        :param pb_msg: The protobuf message from where to construct the `DocList`
        """
        return super().from_protobuf(pb_msg)

    @classmethod
    def _get_proto_class(cls: Type[T]):
        from docarray.proto import DocListProto

        return DocListProto

    @overload
    def __getitem__(self, item: SupportsIndex) -> T_doc:
        ...

    @overload
    def __getitem__(self: T, item: IndexIterType) -> T:
        ...

    def __getitem__(self, item):
        return super().__getitem__(item)

    @classmethod
    def __class_getitem__(cls, item: Union[Type[BaseDocWithoutId], TypeVar, str]):
        if cls.doc_type != AnyDoc:
            raise TypeError(f'{cls} object is not subscriptable')

        if isinstance(item, type) and safe_issubclass(item, BaseDocWithoutId):
            return AnyDocArray.__class_getitem__.__func__(cls, item)  # type: ignore
        if (
            isinstance(item, object)
            and not is_typevar(item)
            and not isinstance(item, str)
            and item is not Any
        ):
            raise TypeError('Expecting a type, got object instead')

        return super().__class_getitem__(item)

    def __repr__(self):
        return AnyDocArray.__repr__(self)  # type: ignore

    if is_pydantic_v2:

        @classmethod
        def __get_pydantic_core_schema__(
            cls, _source_type: Any, _handler: GetCoreSchemaHandler
        ) -> core_schema.CoreSchema:
            return core_schema.general_plain_validator_function(
                cls.validate,
            )

append(doc)

Append a Document to the DocList. The Document must be from the same class as the .doc_type of this DocList, otherwise it will fail.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| doc | T_doc | A Document | required |
Source code in docarray/array/doc_list/doc_list.py
def append(self, doc: T_doc):
    """
    Append a Document to the `DocList`. The Document must be from the same class
    as the `.doc_type` of this `DocList` otherwise it will fail.
    :param doc: A Document
    """
    return super().append(self._validate_one_doc(doc))

construct(docs) classmethod

Create a DocList without validating any data. The data must come from a trusted source.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| docs | Sequence[T_doc] | a Sequence (list) of Documents with the same schema | required |

Returns:

| Type | Description |
|------|-------------|
| T | a DocList object |

Source code in docarray/array/doc_list/doc_list.py
@classmethod
def construct(
    cls: Type[T],
    docs: Sequence[T_doc],
) -> T:
    """
    Create a `DocList` without validating any data. The data must come from a
    trusted source.
    :param docs: a Sequence (list) of Document with the same schema
    :return: a `DocList` object
    """
    return cls(docs, False)
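
A usage sketch (the `MyDoc` schema is illustrative). Since `construct` skips the per-document validation that the normal constructor performs, it should only be fed data that is already known to match the schema:

```python
from docarray import BaseDoc, DocList


class MyDoc(BaseDoc):
    text: str


trusted = [MyDoc(text='hello'), MyDoc(text='world')]

# no per-document isinstance check is performed here:
docs = DocList[MyDoc].construct(trusted)
```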

extend(docs)

Extend a DocList with an Iterable of Documents. The Documents must be from the same class as the .doc_type of this DocList, otherwise it will fail.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| docs | Iterable[T_doc] | Iterable of Documents | required |
Source code in docarray/array/doc_list/doc_list.py
def extend(self, docs: Iterable[T_doc]):
    """
    Extend a `DocList` with an Iterable of Document. The Documents must be from
    the same class as the `.doc_type` of this `DocList` otherwise it will
    fail.
    :param docs: Iterable of Documents
    """
    it: Iterable[T_doc] = list()
    if self is docs:
        # see https://github.com/docarray/docarray/issues/1489
        it = list(docs)
    else:
        it = self._validate_docs(docs)

    return super().extend(it)
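
A short sketch of `append` and `extend` together (the `MyDoc` schema is illustrative); both validate that incoming Documents match `.doc_type`:

```python
from docarray import BaseDoc, DocList


class MyDoc(BaseDoc):
    text: str


docs = DocList[MyDoc]()
docs.append(MyDoc(text='a'))  # a single Document, validated on insert
docs.extend(MyDoc(text=t) for t in ['b', 'c'])  # any Iterable of Documents

# a Document of a different schema is rejected:
# docs.append(BaseDoc())  # raises ValueError
```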

from_base64(data, protocol='protobuf-array', compress=None, show_progress=False) classmethod

Deserialize base64 strings into a DocList.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| data | str | Base64 string to deserialize | required |
| protocol | ProtocolType | protocol that was used to serialize | 'protobuf-array' |
| compress | Optional[str] | compression algorithm that was used to serialize: lz4, bz2, lzma, zlib or gzip | None |
| show_progress | bool | show progress bar, only works when protocol is pickle or protobuf | False |

Returns:

| Type | Description |
|------|-------------|
| T | the deserialized DocList |

Source code in docarray/array/doc_list/io.py
@classmethod
def from_base64(
    cls: Type[T],
    data: str,
    protocol: ProtocolType = 'protobuf-array',
    compress: Optional[str] = None,
    show_progress: bool = False,
) -> T:
    """Deserialize base64 strings into a `DocList`.

    :param data: Base64 string to deserialize
    :param protocol: protocol that was used to serialize
    :param compress: compress algorithm that was used to serialize between `lz4`, `bz2`, `lzma`, `zlib`, `gzip`
    :param show_progress: show progress bar, only works when protocol is `pickle` or `protobuf`
    :return: the deserialized `DocList`
    """
    return cls._load_binary_all(
        file_ctx=nullcontext(base64.b64decode(data)),
        protocol=protocol,
        compress=compress,
        show_progress=show_progress,
    )
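
A round-trip sketch with the default protocol (the `MyDoc` schema is illustrative):

```python
from docarray import BaseDoc, DocList


class MyDoc(BaseDoc):
    text: str


docs = DocList[MyDoc]([MyDoc(text='hello')])

b64 = docs.to_base64()  # defaults: protocol='protobuf-array', no compression
restored = DocList[MyDoc].from_base64(b64)

assert restored[0].text == 'hello'
```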

from_bytes(data, protocol='protobuf-array', compress=None, show_progress=False) classmethod

Deserialize bytes into a DocList.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| data | bytes | Bytes from which to deserialize | required |
| protocol | ProtocolType | protocol that was used to serialize | 'protobuf-array' |
| compress | Optional[str] | compression algorithm that was used to serialize: lz4, bz2, lzma, zlib or gzip | None |
| show_progress | bool | show progress bar, only works when protocol is pickle or protobuf | False |

Returns:

| Type | Description |
|------|-------------|
| T | the deserialized DocList |

Source code in docarray/array/doc_list/io.py
@classmethod
def from_bytes(
    cls: Type[T],
    data: bytes,
    protocol: ProtocolType = 'protobuf-array',
    compress: Optional[str] = None,
    show_progress: bool = False,
) -> T:
    """Deserialize bytes into a `DocList`.

    :param data: Bytes from which to deserialize
    :param protocol: protocol that was used to serialize
    :param compress: compression algorithm that was used to serialize between `lz4`, `bz2`, `lzma`, `zlib`, `gzip`
    :param show_progress: show progress bar, only works when protocol is `pickle` or `protobuf`
    :return: the deserialized `DocList`
    """
    return cls._load_binary_all(
        file_ctx=nullcontext(data),
        protocol=protocol,
        compress=compress,
        show_progress=show_progress,
    )
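
The byte-level equivalent under the same defaults (the `MyDoc` schema is illustrative); `bytes(docs)` is the Pythonic shorthand for `to_bytes()` with default settings:

```python
from docarray import BaseDoc, DocList


class MyDoc(BaseDoc):
    text: str


docs = DocList[MyDoc]([MyDoc(text='hello')])

data = docs.to_bytes()  # or simply: bytes(docs)
restored = DocList[MyDoc].from_bytes(data)

assert restored[0].text == 'hello'
```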

from_csv(file_path, encoding='utf-8', dialect='excel') classmethod

Load a DocList from a csv file following the schema defined in the .doc_type attribute. Every row of the csv file will be mapped to one document in the doc_list. The column names (defined in the first row) have to match the field names of the Document type. For nested fields use "__"-separated access paths, such as 'image__url'.

List-like fields (including field of type DocList) are not supported.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| file_path | str | path to the csv file to load the DocList from | required |
| encoding | str | encoding used to read the csv file | 'utf-8' |
| dialect | Union[str, Dialect] | defines the separator and how to handle whitespace etc.; can be a csv.Dialect instance or one of 'excel' (comma separated values), 'excel-tab' (tab separated values), 'unix' (csv files generated on UNIX systems) | 'excel' |

Returns:

| Type | Description |
|------|-------------|
| T | DocList object |

Source code in docarray/array/doc_list/io.py
@classmethod
def from_csv(
    cls: Type['T'],
    file_path: str,
    encoding: str = 'utf-8',
    dialect: Union[str, csv.Dialect] = 'excel',
) -> 'T':
    """
    Load a DocList from a csv file following the schema defined in the
    [`.doc_type`][docarray.DocList] attribute.
    Every row of the csv file will be mapped to one document in the doc_list.
    The column names (defined in the first row) have to match the field names
    of the Document type.
    For nested fields use "__"-separated access paths, such as `'image__url'`.

    List-like fields (including field of type DocList) are not supported.

    :param file_path: path to csv file to load DocList from.
    :param encoding: encoding used to read the csv file. Defaults to 'utf-8'.
    :param dialect: defines separator and how to handle whitespaces etc.
        Can be a [`csv.Dialect`](https://docs.python.org/3/library/csv.html#csv.Dialect)
        instance or one string of:
        `'excel'` (for comma separated values),
        `'excel-tab'` (for tab separated values),
        `'unix'` (for csv file generated on UNIX systems).

    :return: `DocList` object
    """
    if cls.doc_type == AnyDoc or cls.doc_type == BaseDoc:
        raise TypeError(
            'There is no document schema defined. '
            f'Please specify the {cls}\'s Document type using `{cls}[MyDoc]`.'
        )

    if file_path.startswith('http'):
        import urllib.request

        with urllib.request.urlopen(file_path) as f:
            file = StringIO(f.read().decode(encoding))
            return cls._from_csv_file(file, dialect)
    else:
        with open(file_path, 'r', encoding=encoding) as fp:
            return cls._from_csv_file(fp, dialect)
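
A csv round-trip sketch (the `Person` schema and file path are illustrative); the file written by `to_csv` contains one column per access path of the schema, which `from_csv` maps back onto the documents:

```python
from docarray import BaseDoc, DocList


class Person(BaseDoc):
    name: str
    age: int


docs = DocList[Person]([Person(name='Maria', age=30)])
docs.to_csv('people.csv')  # illustrative path

restored = DocList[Person].from_csv('people.csv')
assert restored[0].name == 'Maria'
```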

from_dataframe(df) classmethod

Load a DocList from a pandas.DataFrame following the schema defined in the .doc_type attribute. Every row of the dataframe will be mapped to one Document in the doc_list. The column names of the dataframe have to match the field names of the Document type. For nested fields use "__"-separated access paths as column names, such as 'image__url'.

List-like fields (including field of type DocList) are not supported.


import pandas as pd

from docarray import BaseDoc, DocList


class Person(BaseDoc):
    name: str
    follower: int


df = pd.DataFrame(
    data=[['Maria', 12345], ['Jake', 54321]], columns=['name', 'follower']
)

docs = DocList[Person].from_dataframe(df)

assert docs.name == ['Maria', 'Jake']
assert docs.follower == [12345, 54321]

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| df | DataFrame | pandas.DataFrame to extract the Documents' information from | required |

Returns:

| Type | Description |
|------|-------------|
| T | DocList where each Document contains the information of one corresponding row of the pandas.DataFrame |

Source code in docarray/array/doc_list/io.py
@classmethod
def from_dataframe(cls: Type['T'], df: 'pd.DataFrame') -> 'T':
    """
    Load a `DocList` from a `pandas.DataFrame` following the schema
    defined in the [`.doc_type`][docarray.DocList] attribute.
    Every row of the dataframe will be mapped to one Document in the doc_list.
    The column names of the dataframe have to match the field names of the
    Document type.
    For nested fields use "__"-separated access paths as column names,
    such as `'image__url'`.

    List-like fields (including field of type DocList) are not supported.

    ---

    ```python
    import pandas as pd

    from docarray import BaseDoc, DocList


    class Person(BaseDoc):
        name: str
        follower: int


    df = pd.DataFrame(
        data=[['Maria', 12345], ['Jake', 54321]], columns=['name', 'follower']
    )

    docs = DocList[Person].from_dataframe(df)

    assert docs.name == ['Maria', 'Jake']
    assert docs.follower == [12345, 54321]
    ```

    ---

    :param df: `pandas.DataFrame` to extract Document's information from
    :return: `DocList` where each Document contains the information of one
        corresponding row of the `pandas.DataFrame`.
    """
    from docarray import DocList

    if cls.doc_type == AnyDoc or cls.doc_type == BaseDoc:
        raise TypeError(
            'There is no document schema defined. '
            f'Please specify the {cls}\'s Document type using `{cls}[MyDoc]`.'
        )

    doc_type = cls.doc_type
    docs = DocList.__class_getitem__(doc_type)()
    field_names = df.columns.tolist()

    if field_names is None or len(field_names) == 0:
        raise TypeError("No field names are given.")

    valid_paths = _all_access_paths_valid(
        doc_type=doc_type, access_paths=field_names
    )
    if not all(valid_paths):
        raise ValueError(
            f'Column names do not match the schema of the DocList\'s '
            f'document type ({cls.doc_type.__name__}): '
            f'{list(compress(field_names, [not v for v in valid_paths]))}'
        )

    for row in df.itertuples():
        access_path2val = row._asdict()
        access_path2val.pop('index', None)
        doc_dict = _access_path_dict_to_nested_dict(access_path2val)
        docs.append(doc_type.parse_obj(doc_dict))

    return docs

from_json(file) classmethod

Deserialize JSON strings or bytes into a DocList.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| file | Union[str, bytes, bytearray] | JSON object from which to deserialize a DocList | required |

Returns:

| Type | Description |
|------|-------------|
| T | the deserialized DocList |

Source code in docarray/array/doc_list/io.py
@classmethod
def from_json(
    cls: Type[T],
    file: Union[str, bytes, bytearray],
) -> T:
    """Deserialize JSON strings or bytes into a `DocList`.

    :param file: JSON object from where to deserialize a `DocList`
    :return: the deserialized `DocList`
    """
    json_docs = orjson.loads(file)
    return cls([cls.doc_type(**v) for v in json_docs])
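
A JSON round-trip sketch (the `MyDoc` schema is illustrative):

```python
from docarray import BaseDoc, DocList


class MyDoc(BaseDoc):
    text: str


docs = DocList[MyDoc]([MyDoc(text='hello')])

json_str = docs.to_json()
restored = DocList[MyDoc].from_json(json_str)

assert restored[0].text == 'hello'
```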

from_protobuf(pb_msg) classmethod

Create a DocList from a protobuf message.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| pb_msg | DocListProto | The protobuf message from which to construct the DocList | required |
Source code in docarray/array/doc_list/doc_list.py
@classmethod
def from_protobuf(cls: Type[T], pb_msg: 'DocListProto') -> T:
    """create a Document from a protobuf message
    :param pb_msg: The protobuf message from where to construct the `DocList`
    """
    return super().from_protobuf(pb_msg)

get_pushpull_backend(protocol) classmethod

Get the backend for the given protocol.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| protocol | PUSH_PULL_PROTOCOL | the protocol to use, e.g. 'file', 's3' | required |

Returns:

| Type | Description |
|------|-------------|
| Type[AbstractDocStore] | the backend class |

Source code in docarray/array/doc_list/pushpull.py
@classmethod
def get_pushpull_backend(
    cls: Type[SelfPushPullMixin], protocol: PUSH_PULL_PROTOCOL
) -> Type['AbstractDocStore']:
    """
    Get the backend for the given protocol.

    :param protocol: the protocol to use, e.g. 'file', 's3'
    :return: the backend class
    """
    if protocol in cls.__backends__:
        return cls.__backends__[protocol]

    if protocol == 'file':
        from docarray.store.file import FileDocStore

        cls.__backends__[protocol] = FileDocStore
        logging.debug('Loaded Local Filesystem backend')
    elif protocol == 's3':
        from docarray.store.s3 import S3DocStore

        cls.__backends__[protocol] = S3DocStore
        logging.debug('Loaded S3 backend')
    else:
        raise NotImplementedError(f'protocol {protocol} not supported')

    return cls.__backends__[protocol]

insert(i, doc)

Insert a Document into the DocList. The Document must be from the same class as the doc_type of this DocList, otherwise it will fail.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| i | SupportsIndex | index at which to insert | required |
| doc | T_doc | A Document | required |
Source code in docarray/array/doc_list/doc_list.py
def insert(self, i: SupportsIndex, doc: T_doc):
    """
    Insert a Document to the `DocList`. The Document must be from the same
    class as the doc_type of this `DocList` otherwise it will fail.
    :param i: index to insert
    :param doc: A Document
    """
    super().insert(i, self._validate_one_doc(doc))

load_binary(file, protocol='protobuf-array', compress=None, show_progress=False, streaming=False) classmethod

Load DocList elements from a compressed binary file.

If the protocol is pickle, the Documents are streamed from disk to save memory usage.

Note

If file is str it can specify protocol and compress as file extensions. This functionality assumes file=file_name.$protocol.$compress where $protocol and $compress refer to a string interpolation of the respective protocol and compress methods. For example if file=my_docarray.protobuf.lz4 then the binary data will be loaded assuming protocol=protobuf and compress=lz4.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| file | Union[str, bytes, Path, BufferedReader, _LazyRequestReader] | File or filename or serialized bytes where the data is stored | required |
| protocol | ProtocolType | protocol to use; can be 'pickle-array', 'protobuf-array', 'pickle' or 'protobuf' | 'protobuf-array' |
| compress | Optional[str] | compression algorithm to use: lz4, bz2, lzma, zlib or gzip | None |
| show_progress | bool | show progress bar, only works when protocol is pickle or protobuf | False |
| streaming | bool | if True, returns a generator over Document objects | False |

Returns:

| Type | Description |
|------|-------------|
| Union[T, Generator[T_doc, None, None]] | a DocList object |

Source code in docarray/array/doc_list/io.py
@classmethod
def load_binary(
    cls: Type[T],
    file: Union[str, bytes, pathlib.Path, io.BufferedReader, _LazyRequestReader],
    protocol: ProtocolType = 'protobuf-array',
    compress: Optional[str] = None,
    show_progress: bool = False,
    streaming: bool = False,
) -> Union[T, Generator['T_doc', None, None]]:
    """Load doc_list elements from a compressed binary file.

    In case protocol is pickle the `Documents` are streamed from disk to save memory usage

    !!! note
        If `file` is `str` it can specify `protocol` and `compress` as file extensions.
        This functionality assumes `file=file_name.$protocol.$compress` where `$protocol` and `$compress` refer to a
        string interpolation of the respective `protocol` and `compress` methods.
        For example if `file=my_docarray.protobuf.lz4` then the binary data will be loaded assuming `protocol=protobuf`
        and `compress=lz4`.

    :param file: File or filename or serialized bytes where the data is stored.
    :param protocol: protocol to use. It can be 'pickle-array', 'protobuf-array', 'pickle' or 'protobuf'
    :param compress: compress algorithm to use between `lz4`, `bz2`, `lzma`, `zlib`, `gzip`
    :param show_progress: show progress bar, only works when protocol is `pickle` or `protobuf`
    :param streaming: if `True` returns a generator over `Document` objects.

    :return: a `DocList` object

    """
    file_ctx, load_protocol, load_compress = cls._get_file_context(
        file, protocol, compress
    )
    if streaming:
        if load_protocol not in SINGLE_PROTOCOLS:
            raise ValueError(
                f'`streaming` is only available when using {" or ".join(map(lambda x: f"`{x}`", SINGLE_PROTOCOLS))} as protocol, '
                f'got {load_protocol}'
            )
        else:
            return cls._load_binary_stream(
                file_ctx,
                protocol=load_protocol,
                compress=load_compress,
                show_progress=show_progress,
            )
    else:
        return cls._load_binary_all(
            file_ctx, load_protocol, load_compress, show_progress
        )
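
A save/load sketch (schema and file name are illustrative). With a `.protobuf` extension the protocol is inferred from the file name, and since `protobuf` is a per-document protocol, the same file can also be read back as a stream:

```python
from docarray import BaseDoc, DocList


class MyDoc(BaseDoc):
    text: str


docs = DocList[MyDoc]([MyDoc(text=str(i)) for i in range(3)])

# protocol inferred as 'protobuf' from the file extension:
docs.save_binary('docs.protobuf')

# load everything at once ...
restored = DocList[MyDoc].load_binary('docs.protobuf')

# ... or stream document by document to save memory:
for doc in DocList[MyDoc].load_binary('docs.protobuf', streaming=True):
    print(doc.text)
```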

pull(url, show_progress=False, local_cache=True) classmethod

Pull a DocList from the specified url.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| url | str | url specifying the protocol and save name of the DocList; should be of the form protocol://namespace/name, e.g. s3://bucket/path/to/namespace/name, file:///path/to/folder/name | required |
| show_progress | bool | if true, display a progress bar | False |
| local_cache | bool | store the downloaded DocList in a local folder | True |

Returns:

| Type | Description |
|------|-------------|
| DocList | a DocList object |

Source code in docarray/array/doc_list/pushpull.py
@classmethod
def pull(
    cls: Type[SelfPushPullMixin],
    url: str,
    show_progress: bool = False,
    local_cache: bool = True,
) -> 'DocList':
    """Pull a `DocList` from the specified url.

    :param url: url specifying the protocol and save name of the `DocList`. Should be of the form ``protocol://namespace/name``. e.g. ``s3://bucket/path/to/namespace/name``, ``file:///path/to/folder/name``
    :param show_progress: if true, display a progress bar.
    :param local_cache: store the downloaded `DocList` to local folder
    :return: a `DocList` object
    """
    from docarray.base_doc import AnyDoc

    if cls.doc_type == AnyDoc:
        raise TypeError(
            'There is no document schema defined. '
            'Please specify the `DocList`\'s Document type using `DocList[MyDoc]`.'
        )

    logging.info(f'Pulling {url}')
    protocol, name = cls.resolve_url(url)
    return cls.get_pushpull_backend(protocol).pull(
        cls, name, show_progress, local_cache  # type: ignore
    )

pull_stream(url, show_progress=False, local_cache=False) classmethod

Pull a stream of Documents from the specified url.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| url | str | url specifying the protocol and save name of the DocList; should be of the form protocol://namespace/name, e.g. s3://bucket/path/to/namespace/name, file:///path/to/folder/name | required |
| show_progress | bool | if true, display a progress bar | False |
| local_cache | bool | store the downloaded DocList in a local folder | False |

Returns:

| Type | Description |
|------|-------------|
| Iterator[BaseDoc] | Iterator of Documents |

Source code in docarray/array/doc_list/pushpull.py
@classmethod
def pull_stream(
    cls: Type[SelfPushPullMixin],
    url: str,
    show_progress: bool = False,
    local_cache: bool = False,
) -> Iterator['BaseDoc']:
    """Pull a stream of Documents from the specified url.

    :param url: url specifying the protocol and save name of the `DocList`. Should be of the form ``protocol://namespace/name``. e.g. ``s3://bucket/path/to/namespace/name``, ``file:///path/to/folder/name``
    :param show_progress: if true, display a progress bar.
    :param local_cache: store the downloaded `DocList` to local folder
    :return: Iterator of Documents
    """
    from docarray.base_doc import AnyDoc

    if cls.doc_type == AnyDoc:
        raise TypeError(
            'There is no document schema defined. '
            'Please specify the `DocList`\'s Document type using `DocList[MyDoc]`.'
        )

    logging.info(f'Pulling Document stream from {url}')
    protocol, name = cls.resolve_url(url)
    return cls.get_pushpull_backend(protocol).pull_stream(
        cls, name, show_progress, local_cache  # type: ignore
    )

push(url, show_progress=False, **kwargs)

Push this DocList object to the specified url.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| url | str | url specifying the protocol and save name of the DocList; should be of the form protocol://namespace/name, e.g. s3://bucket/path/to/namespace/name, file:///path/to/folder/name | required |
| show_progress | bool | if true, a progress bar will be displayed | False |
Source code in docarray/array/doc_list/pushpull.py
def push(
    self,
    url: str,
    show_progress: bool = False,
    **kwargs,
) -> Dict:
    """Push this `DocList` object to the specified url.

    :param url: url specifying the protocol and save name of the `DocList`. Should be of the form ``protocol://namespace/name``. e.g. ``s3://bucket/path/to/namespace/name``, ``file:///path/to/folder/name``
    :param show_progress: If true, a progress bar will be displayed.
    """
    logging.info(f'Pushing {len(self)} docs to {url}')
    protocol, name = self.__class__.resolve_url(url)
    return self.__class__.get_pushpull_backend(protocol).push(
        self, name, show_progress  # type: ignore
    )
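
A push/pull sketch using the local `file` backend (the path is illustrative):

```python
from docarray import BaseDoc, DocList


class MyDoc(BaseDoc):
    text: str


docs = DocList[MyDoc]([MyDoc(text='hello')])

# 'file' resolves to the local filesystem backend:
docs.push('file:///tmp/my_docs')
restored = DocList[MyDoc].pull('file:///tmp/my_docs')
```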

push_stream(docs, url, show_progress=False) classmethod

Push a stream of documents to the specified url.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| docs | Iterator[BaseDoc] | a stream of documents | required |
| url | str | url specifying the protocol and save name of the DocList; should be of the form protocol://namespace/name, e.g. s3://bucket/path/to/namespace/name, file:///path/to/folder/name | required |
| show_progress | bool | if true, a progress bar will be displayed | False |
Source code in docarray/array/doc_list/pushpull.py
@classmethod
def push_stream(
    cls: Type[SelfPushPullMixin],
    docs: Iterator['BaseDoc'],
    url: str,
    show_progress: bool = False,
) -> Dict:
    """Push a stream of documents to the specified url.

    :param docs: a stream of documents
    :param url: url specifying the protocol and save name of the `DocList`. Should be of the form ``protocol://namespace/name``. e.g. ``s3://bucket/path/to/namespace/name``, ``file:///path/to/folder/name``
    :param show_progress: If true, a progress bar will be displayed.
    """
    logging.info(f'Pushing stream to {url}')
    protocol, name = cls.resolve_url(url)
    return cls.get_pushpull_backend(protocol).push_stream(docs, name, show_progress)

resolve_url(url) staticmethod

Resolve the URL to the correct protocol and name.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| url | str | url to resolve | required |
Source code in docarray/array/doc_list/pushpull.py
@staticmethod
def resolve_url(url: str) -> Tuple[PUSH_PULL_PROTOCOL, str]:
    """Resolve the URL to the correct protocol and name.
    :param url: url to resolve
    """
    protocol, name = url.split('://', 2)
    if protocol in SUPPORTED_PUSH_PULL_PROTOCOLS:
        protocol = cast(PUSH_PULL_PROTOCOL, protocol)
        return protocol, name
    else:
        raise ValueError(f'Unsupported protocol {protocol}')
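
For instance:

```python
from docarray import DocList

protocol, name = DocList.resolve_url('s3://bucket/path/to/namespace/name')
assert protocol == 's3'
assert name == 'bucket/path/to/namespace/name'
```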

save_binary(file, protocol='protobuf-array', compress=None, show_progress=False)

Save DocList into a binary file.

The protocol determines how the DocList is saved. With pickle-array or protobuf-array, the whole DocList is serialized and compressed in one piece using pickle or protobuf. With protobuf or pickle as protocol, each Document in the DocList is stored individually, which makes the data available for streaming.

Note

If file is str it can specify protocol and compress as file extensions. This functionality assumes file=file_name.$protocol.$compress where $protocol and $compress refer to a string interpolation of the respective protocol and compress methods. For example if file=my_docarray.protobuf.lz4 then the binary data will be created using protocol=protobuf and compress=lz4.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| file | Union[str, Path] | File or filename to which the data is saved | required |
| protocol | ProtocolType | protocol to use; can be 'pickle-array', 'protobuf-array', 'pickle' or 'protobuf' | 'protobuf-array' |
| compress | Optional[str] | compression algorithm to use: lz4, bz2, lzma, zlib or gzip | None |
| show_progress | bool | show progress bar, only works when protocol is pickle or protobuf | False |
Source code in docarray/array/doc_list/io.py
def save_binary(
    self,
    file: Union[str, pathlib.Path],
    protocol: ProtocolType = 'protobuf-array',
    compress: Optional[str] = None,
    show_progress: bool = False,
) -> None:
    """Save DocList into a binary file.

    The protocol determines how the `DocList` is saved.
    If using `pickle-array` or `protobuf-array`, the whole DocList is stored
    and compressed in one piece using `pickle` or `protobuf`.
    When using `protobuf` or `pickle` as protocol, each Document in the DocList
    is stored individually, which makes the data available for streaming.

    !!! note
        If `file` is `str` it can specify `protocol` and `compress` as file extensions.
        This functionality assumes `file=file_name.$protocol.$compress` where `$protocol` and `$compress` refer to a
        string interpolation of the respective `protocol` and `compress` methods.
        For example if `file=my_docarray.protobuf.lz4` then the binary data will be created using `protocol=protobuf`
        and `compress=lz4`.

    :param file: File or filename to which the data is saved.
    :param protocol: protocol to use. It can be 'pickle-array', 'protobuf-array', 'pickle' or 'protobuf'
    :param compress: compress algorithm to use between `lz4`, `bz2`, `lzma`, `zlib`, `gzip`
    :param show_progress: show progress bar, only works when protocol is `pickle` or `protobuf`
    """
    if isinstance(file, io.BufferedWriter):
        file_ctx = nullcontext(file)
    else:
        _protocol, _compress = _protocol_and_compress_from_file_path(file)

        if _protocol is not None:
            protocol = _protocol
        if _compress is not None:
            compress = _compress

        file_ctx = open(file, 'wb')

    self.to_bytes(
        protocol=protocol,
        compress=compress,
        file_ctx=file_ctx,
        show_progress=show_progress,
    )

summary()

Print a summary of this DocList object and a summary of the schema of its Document type.

Source code in docarray/array/any_array.py
def summary(self):
    """
    Print a summary of this [`DocList`][docarray.array.doc_list.doc_list.DocList] object and a summary of the schema of its
    Document type.
    """
    DocArraySummary(self).summary()

to_base64(protocol='protobuf-array', compress=None, show_progress=False)

Serialize itself into base64 encoded string.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| protocol | ProtocolType | protocol to use; can be 'pickle-array', 'protobuf-array', 'pickle' or 'protobuf' | 'protobuf-array' |
| compress | Optional[str] | compression algorithm to use: lz4, bz2, lzma, zlib or gzip | None |
| show_progress | bool | show progress bar, only works when protocol is pickle or protobuf | False |

Returns:

| Type | Description |
|------|-------------|
| str | the base64 encoded string of the serialization |

Source code in docarray/array/doc_list/io.py
def to_base64(
    self,
    protocol: ProtocolType = 'protobuf-array',
    compress: Optional[str] = None,
    show_progress: bool = False,
) -> str:
    """Serialize itself into base64 encoded string.

    :param protocol: protocol to use. It can be 'pickle-array', 'protobuf-array', 'pickle' or 'protobuf'
    :param compress: compress algorithm to use between `lz4`, `bz2`, `lzma`, `zlib`, `gzip`
    :param show_progress: show progress bar, only works when protocol is `pickle` or `protobuf`
    :return: the base64 encoded string of the serialization
    """
    with io.BytesIO() as bf:
        self._write_bytes(
            bf=bf,
            compress=compress,
            protocol=protocol,
            show_progress=show_progress,
        )
        return base64.b64encode(bf.getvalue()).decode('utf-8')

to_bytes(protocol='protobuf-array', compress=None, file_ctx=None, show_progress=False)

Serialize itself into bytes.

For more Pythonic code, please use bytes(...).

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| protocol | ProtocolType | protocol to use; can be 'pickle-array', 'protobuf-array', 'pickle' or 'protobuf' | 'protobuf-array' |
| compress | Optional[str] | compression algorithm to use: lz4, bz2, lzma, zlib or gzip | None |
| file_ctx | Optional[BinaryIO] | File or filename or serialized bytes where the data is stored | None |
| show_progress | bool | show progress bar, only works when protocol is pickle or protobuf | False |

Returns:

| Type | Description |
|------|-------------|
| Optional[bytes] | the binary serialization in bytes, or None if file_ctx is passed to store the data |

Source code in docarray/array/doc_list/io.py
def to_bytes(
    self,
    protocol: ProtocolType = 'protobuf-array',
    compress: Optional[str] = None,
    file_ctx: Optional[BinaryIO] = None,
    show_progress: bool = False,
) -> Optional[bytes]:
    """Serialize itself into `bytes`.

    For more Pythonic code, please use ``bytes(...)``.

    :param protocol: protocol to use. It can be 'pickle-array', 'protobuf-array', 'pickle' or 'protobuf'
    :param compress: compress algorithm to use between : `lz4`, `bz2`, `lzma`, `zlib`, `gzip`
    :param file_ctx: File or filename or serialized bytes where the data is stored.
    :param show_progress: show progress bar, only works when protocol is `pickle` or `protobuf`
    :return: the binary serialization in bytes or None if file_ctx is passed where to store
    """

    with file_ctx or io.BytesIO() as bf:
        self._write_bytes(
            bf=bf,
            protocol=protocol,
            compress=compress,
            show_progress=show_progress,
        )
        if isinstance(bf, io.BytesIO):
            return bf.getvalue()

    return None

to_csv(file_path, dialect='excel')

Save a DocList to a csv file. The field names will be stored in the first row. Each row corresponds to the information of one Document. Columns for nested fields will be named after the "__"-separated access paths, such as 'image__url' for image.url.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| file_path | str | path to a csv file | required |
| dialect | Union[str, Dialect] | defines the separator and how to handle whitespace etc.; can be a csv.Dialect instance or one of 'excel' (comma separated values), 'excel-tab' (tab separated values), 'unix' (csv files generated on UNIX systems) | 'excel' |
Source code in docarray/array/doc_list/io.py
def to_csv(
    self, file_path: str, dialect: Union[str, csv.Dialect] = 'excel'
) -> None:
    """
    Save a `DocList` to a csv file.
    The field names will be stored in the first row. Each row corresponds to the
    information of one Document.
    Columns for nested fields will be named after the "__"-separated access paths,
    such as `'image__url'` for `image.url`.

    :param file_path: path to a csv file.
    :param dialect: defines separator and how to handle whitespaces etc.
        Can be a [`csv.Dialect`](https://docs.python.org/3/library/csv.html#csv.Dialect)
        instance or one string of:
        `'excel'` (for comma separated values),
        `'excel-tab'` (for tab separated values),
        `'unix'` (for csv file generated on UNIX systems).

    """
    if self.doc_type == AnyDoc or self.doc_type == BaseDoc:
        raise TypeError(
            f'{type(self)} must be homogeneous to be converted to a csv.'
            'There is no document schema defined. '
            f'Please specify the {type(self)}\'s Document type using `{type(self)}[MyDoc]`.'
        )
    fields = self.doc_type._get_access_paths()

    with open(file_path, 'w') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=fields, dialect=dialect)
        writer.writeheader()

        for doc in self:
            doc_dict = _dict_to_access_paths(doc.dict())
            writer.writerow(doc_dict)

to_dataframe()

Save a DocList to a pandas.DataFrame. The field names will be stored as column names. Each row of the dataframe corresponds to the information of one Document. Columns for nested fields will be named after the "__"-separated access paths, such as 'image__url' for image.url.

Returns:

| Type | Description |
|------|-------------|
| DataFrame | pandas.DataFrame |

Source code in docarray/array/doc_list/io.py
def to_dataframe(self) -> 'pd.DataFrame':
    """
    Save a DocList to a `pandas.DataFrame`.
    The field names will be stored as column names. Each row of the dataframe corresponds
    to the information of one Document.
    Columns for nested fields will be named after the "__"-separated access paths,
    such as `'image__url'` for `image.url`.

    :return: `pandas.DataFrame`
    """
    if TYPE_CHECKING:
        import pandas as pd
    else:
        pd = import_library('pandas', raise_error=True)

    if self.doc_type == AnyDoc:
        raise TypeError(
            'DocList must be homogeneous to be converted to a DataFrame.'
            'There is no document schema defined. '
            'Please specify the DocList\'s Document type using `DocList[MyDoc]`.'
        )

    fields = self.doc_type._get_access_paths()
    df = pd.DataFrame(columns=fields)

    for doc in self:
        doc_dict = _dict_to_access_paths(doc.dict())
        doc_dict = {k: [v] for k, v in doc_dict.items()}
        df = pd.concat([df, pd.DataFrame.from_dict(doc_dict)], ignore_index=True)

    return df

to_doc_vec(tensor_type=NdArray)

Convert the DocList into a DocVec. Self cannot be used afterward

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| tensor_type | Type[AbstractTensor] | Tensor class used to wrap the doc_vec tensors; useful if the BaseDoc has an undefined tensor type such as AnyTensor or a Union of NdArray and TorchTensor | NdArray |

Returns:

| Type | Description |
|------|-------------|
| DocVec | A DocVec of the same document type as self |

Source code in docarray/array/doc_list/doc_list.py
def to_doc_vec(
    self,
    tensor_type: Type['AbstractTensor'] = NdArray,
) -> 'DocVec':
    """
    Convert the `DocList` into a `DocVec`. `Self` cannot be used
    afterward
    :param tensor_type: Tensor Class used to wrap the doc_vec tensors. This is useful
    if the BaseDoc has some undefined tensor type like AnyTensor or Union of NdArray and TorchTensor
    :return: A `DocVec` of the same document type as self
    """
    from docarray.array.doc_vec.doc_vec import DocVec

    return DocVec.__class_getitem__(self.doc_type)(self, tensor_type=tensor_type)
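
A conversion sketch (the schema is illustrative): after the call, the tensors of all documents are stacked into a single column, and the original `DocList` must no longer be used:

```python
import numpy as np

from docarray import BaseDoc, DocList
from docarray.typing import NdArray


class MyDoc(BaseDoc):
    tensor: NdArray[10]


docs = DocList[MyDoc]([MyDoc(tensor=np.zeros(10)) for _ in range(5)])

doc_vec = docs.to_doc_vec()
print(doc_vec.tensor.shape)  # (5, 10): one stacked column
```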

to_json()

Convert the object into a JSON string. Can be loaded via .from_json.

Returns:

| Type | Description |
|------|-------------|
| str | JSON serialization of the DocList |

Source code in docarray/array/doc_list/io.py
def to_json(self) -> str:
    """Convert the object into JSON bytes. Can be loaded via `.from_json`.
    :return: JSON serialization of `DocList`
    """
    return orjson_dumps(self).decode('UTF-8')

to_protobuf()

Convert DocList into a Protobuf message

Source code in docarray/array/doc_list/io.py
def to_protobuf(self) -> 'DocListProto':
    """Convert `DocList` into a Protobuf message"""
    from docarray.proto import DocListProto

    da_proto = DocListProto()
    for doc in self:
        da_proto.docs.append(doc.to_protobuf())

    return da_proto

docarray.array.doc_list.io.IOMixinDocList

Bases: Iterable[T_doc]

Source code in docarray/array/doc_list/io.py
class IOMixinDocList(Iterable[T_doc]):
    doc_type: Type[T_doc]

    @abstractmethod
    def __len__(self):
        ...

    @abstractmethod
    def __init__(
        self,
        docs: Optional[Iterable[BaseDoc]] = None,
    ):
        ...

    @classmethod
    def from_protobuf(cls: Type[T], pb_msg: 'DocListProto') -> T:
        """create a Document from a protobuf message
        :param pb_msg: The protobuf message from where to construct the DocList
        """
        return cls(cls.doc_type.from_protobuf(doc_proto) for doc_proto in pb_msg.docs)

    def to_protobuf(self) -> 'DocListProto':
        """Convert `DocList` into a Protobuf message"""
        from docarray.proto import DocListProto

        da_proto = DocListProto()
        for doc in self:
            da_proto.docs.append(doc.to_protobuf())

        return da_proto

    @classmethod
    def from_bytes(
        cls: Type[T],
        data: bytes,
        protocol: ProtocolType = 'protobuf-array',
        compress: Optional[str] = None,
        show_progress: bool = False,
    ) -> T:
        """Deserialize bytes into a `DocList`.

        :param data: Bytes from which to deserialize
        :param protocol: protocol that was used to serialize
        :param compress: compression algorithm that was used to serialize between `lz4`, `bz2`, `lzma`, `zlib`, `gzip`
        :param show_progress: show progress bar, only works when protocol is `pickle` or `protobuf`
        :return: the deserialized `DocList`
        """
        return cls._load_binary_all(
            file_ctx=nullcontext(data),
            protocol=protocol,
            compress=compress,
            show_progress=show_progress,
        )

    def _write_bytes(
        self,
        bf: BinaryIO,
        protocol: ProtocolType = 'protobuf-array',
        compress: Optional[str] = None,
        show_progress: bool = False,
    ) -> None:
        if protocol in ARRAY_PROTOCOLS:
            compress_ctx = _get_compress_ctx(compress)
        else:
            # delegate the compression to per-doc compression
            compress_ctx = None

        fc: ContextManager
        if compress_ctx is None:
            # if compress do not support streaming then postpone the compress
            # into the for-loop
            f, fc = bf, nullcontext()
        else:
            f = compress_ctx(bf)
            fc = f
            compress = None

        with fc:
            if protocol == 'protobuf-array':
                f.write(self.to_protobuf().SerializePartialToString())
            elif protocol == 'pickle-array':
                f.write(pickle.dumps(self))
            elif protocol == 'json-array':
                f.write(self.to_json().encode())
            elif protocol in SINGLE_PROTOCOLS:
                f.write(
                    b''.join(
                        self._to_binary_stream(
                            protocol=protocol,
                            compress=compress,
                            show_progress=show_progress,
                        )
                    )
                )
            else:
                raise ValueError(
                    f'protocol={protocol} is not supported. Can be only {ALLOWED_PROTOCOLS}.'
                )

    def _to_binary_stream(
        self,
        protocol: ProtocolType = 'protobuf',
        compress: Optional[str] = None,
        show_progress: bool = False,
    ) -> Iterator[bytes]:
        from rich import filesize

        if show_progress:
            from docarray.utils._internal.progress_bar import _get_progressbar

            pbar, t = _get_progressbar(
                'Serializing', disable=not show_progress, total=len(self)
            )
        else:
            from contextlib import nullcontext

            pbar = nullcontext()

        yield self._stream_header

        with pbar:
            if show_progress:
                _total_size = 0
                pbar.start_task(t)
            for doc in self:
                doc_bytes = doc.to_bytes(protocol=protocol, compress=compress)
                len_doc_as_bytes = len(doc_bytes).to_bytes(4, 'big', signed=False)
                all_bytes = len_doc_as_bytes + doc_bytes

                yield all_bytes

                if show_progress:
                    _total_size += len(all_bytes)
                    pbar.update(
                        t,
                        advance=1,
                        total_size=str(filesize.decimal(_total_size)),
                    )

    def to_bytes(
        self,
        protocol: ProtocolType = 'protobuf-array',
        compress: Optional[str] = None,
        file_ctx: Optional[BinaryIO] = None,
        show_progress: bool = False,
    ) -> Optional[bytes]:
        """Serialize itself into `bytes`.

        For more Pythonic code, please use ``bytes(...)``.

        :param protocol: protocol to use. It can be 'pickle-array', 'protobuf-array', 'pickle' or 'protobuf'
        :param compress: compress algorithm to use between `lz4`, `bz2`, `lzma`, `zlib`, `gzip`
        :param file_ctx: File or filename or serialized bytes where the data is stored.
        :param show_progress: show progress bar, only works when protocol is `pickle` or `protobuf`
        :return: the binary serialization in bytes, or `None` if `file_ctx` is provided (the data is written there instead)
        """

        with file_ctx or io.BytesIO() as bf:
            self._write_bytes(
                bf=bf,
                protocol=protocol,
                compress=compress,
                show_progress=show_progress,
            )
            if isinstance(bf, io.BytesIO):
                return bf.getvalue()

        return None

    @classmethod
    def from_base64(
        cls: Type[T],
        data: str,
        protocol: ProtocolType = 'protobuf-array',
        compress: Optional[str] = None,
        show_progress: bool = False,
    ) -> T:
        """Deserialize base64 strings into a `DocList`.

        :param data: Base64 string to deserialize
        :param protocol: protocol that was used to serialize
        :param compress: compress algorithm that was used to serialize between `lz4`, `bz2`, `lzma`, `zlib`, `gzip`
        :param show_progress: show progress bar, only works when protocol is `pickle` or `protobuf`
        :return: the deserialized `DocList`
        """
        return cls._load_binary_all(
            file_ctx=nullcontext(base64.b64decode(data)),
            protocol=protocol,
            compress=compress,
            show_progress=show_progress,
        )

    def to_base64(
        self,
        protocol: ProtocolType = 'protobuf-array',
        compress: Optional[str] = None,
        show_progress: bool = False,
    ) -> str:
        """Serialize itself into base64 encoded string.

        :param protocol: protocol to use. It can be 'pickle-array', 'protobuf-array', 'pickle' or 'protobuf'
        :param compress: compress algorithm to use between `lz4`, `bz2`, `lzma`, `zlib`, `gzip`
        :param show_progress: show progress bar, only works when protocol is `pickle` or `protobuf`
        :return: the base64 encoded string
        """
        with io.BytesIO() as bf:
            self._write_bytes(
                bf=bf,
                compress=compress,
                protocol=protocol,
                show_progress=show_progress,
            )
            return base64.b64encode(bf.getvalue()).decode('utf-8')

    @classmethod
    def from_json(
        cls: Type[T],
        file: Union[str, bytes, bytearray],
    ) -> T:
        """Deserialize JSON strings or bytes into a `DocList`.

        :param file: JSON object from where to deserialize a `DocList`
        :return: the deserialized `DocList`
        """
        json_docs = orjson.loads(file)
        return cls([cls.doc_type(**v) for v in json_docs])

    def to_json(self) -> str:
        """Convert the object into JSON bytes. Can be loaded via `.from_json`.
        :return: JSON serialization of `DocList`
        """
        return orjson_dumps(self).decode('UTF-8')

    @classmethod
    def from_csv(
        cls: Type['T'],
        file_path: str,
        encoding: str = 'utf-8',
        dialect: Union[str, csv.Dialect] = 'excel',
    ) -> 'T':
        """
        Load a DocList from a csv file following the schema defined in the
        [`.doc_type`][docarray.DocList] attribute.
        Every row of the csv file will be mapped to one document in the doc_list.
        The column names (defined in the first row) have to match the field names
        of the Document type.
        For nested fields use "__"-separated access paths, such as `'image__url'`.

        List-like fields (including fields of type DocList) are not supported.

        :param file_path: path to csv file to load DocList from.
        :param encoding: encoding used to read the csv file. Defaults to 'utf-8'.
        :param dialect: defines separator and how to handle whitespaces etc.
            Can be a [`csv.Dialect`](https://docs.python.org/3/library/csv.html#csv.Dialect)
            instance or one string of:
            `'excel'` (for comma separated values),
            `'excel-tab'` (for tab separated values),
            `'unix'` (for csv file generated on UNIX systems).

        :return: `DocList` object
        """
        if cls.doc_type == AnyDoc or cls.doc_type == BaseDoc:
            raise TypeError(
                'There is no document schema defined. '
                f'Please specify the {cls}\'s Document type using `{cls}[MyDoc]`.'
            )

        if file_path.startswith('http'):
            import urllib.request

            with urllib.request.urlopen(file_path) as f:
                file = StringIO(f.read().decode(encoding))
                return cls._from_csv_file(file, dialect)
        else:
            with open(file_path, 'r', encoding=encoding) as fp:
                return cls._from_csv_file(fp, dialect)

    @classmethod
    def _from_csv_file(
        cls: Type['T'],
        file: Union[StringIO, TextIOWrapper],
        dialect: Union[str, csv.Dialect],
    ) -> 'T':
        rows = csv.DictReader(file, dialect=dialect)

        doc_type = cls.doc_type
        docs = []

        field_names: List[str] = (
            [] if rows.fieldnames is None else [str(f) for f in rows.fieldnames]
        )
        if field_names is None or len(field_names) == 0:
            raise TypeError("No field names are given.")

        valid_paths = _all_access_paths_valid(
            doc_type=doc_type, access_paths=field_names
        )
        if not all(valid_paths):
            raise ValueError(
                f'Column names do not match the schema of the DocList\'s '
                f'document type ({cls.doc_type.__name__}): '
                f'{list(compress(field_names, [not v for v in valid_paths]))}'
            )

        for access_path2val in rows:
            doc_dict: Dict[Any, Any] = _access_path_dict_to_nested_dict(access_path2val)
            docs.append(doc_type.parse_obj(doc_dict))

        return cls(docs)

    def to_csv(
        self, file_path: str, dialect: Union[str, csv.Dialect] = 'excel'
    ) -> None:
        """
        Save a `DocList` to a csv file.
        The field names will be stored in the first row. Each row corresponds to the
        information of one Document.
        Columns for nested fields will be named after the "__"-separated access paths,
        such as `'image__url'` for `image.url`.

        :param file_path: path to a csv file.
        :param dialect: defines separator and how to handle whitespaces etc.
            Can be a [`csv.Dialect`](https://docs.python.org/3/library/csv.html#csv.Dialect)
            instance or one string of:
            `'excel'` (for comma separated values),
            `'excel-tab'` (for tab separated values),
            `'unix'` (for csv file generated on UNIX systems).

        """
        if self.doc_type == AnyDoc or self.doc_type == BaseDoc:
            raise TypeError(
                f'{type(self)} must be homogeneous to be converted to a csv. '
                'There is no document schema defined. '
                f'Please specify the {type(self)}\'s Document type using `{type(self)}[MyDoc]`.'
            )
        fields = self.doc_type._get_access_paths()

        with open(file_path, 'w') as csv_file:
            writer = csv.DictWriter(csv_file, fieldnames=fields, dialect=dialect)
            writer.writeheader()

            for doc in self:
                doc_dict = _dict_to_access_paths(doc.dict())
                writer.writerow(doc_dict)

    @classmethod
    def from_dataframe(cls: Type['T'], df: 'pd.DataFrame') -> 'T':
        """
        Load a `DocList` from a `pandas.DataFrame` following the schema
        defined in the [`.doc_type`][docarray.DocList] attribute.
        Every row of the dataframe will be mapped to one Document in the doc_list.
        The column names of the dataframe have to match the field names of the
        Document type.
        For nested fields use "__"-separated access paths as column names,
        such as `'image__url'`.

        List-like fields (including fields of type DocList) are not supported.

        ---

        ```python
        import pandas as pd

        from docarray import BaseDoc, DocList


        class Person(BaseDoc):
            name: str
            follower: int


        df = pd.DataFrame(
            data=[['Maria', 12345], ['Jake', 54321]], columns=['name', 'follower']
        )

        docs = DocList[Person].from_dataframe(df)

        assert docs.name == ['Maria', 'Jake']
        assert docs.follower == [12345, 54321]
        ```

        ---

        :param df: `pandas.DataFrame` to extract Document's information from
        :return: `DocList` where each Document contains the information of one
            corresponding row of the `pandas.DataFrame`.
        """
        from docarray import DocList

        if cls.doc_type == AnyDoc or cls.doc_type == BaseDoc:
            raise TypeError(
                'There is no document schema defined. '
                f'Please specify the {cls}\'s Document type using `{cls}[MyDoc]`.'
            )

        doc_type = cls.doc_type
        docs = DocList.__class_getitem__(doc_type)()
        field_names = df.columns.tolist()

        if field_names is None or len(field_names) == 0:
            raise TypeError("No field names are given.")

        valid_paths = _all_access_paths_valid(
            doc_type=doc_type, access_paths=field_names
        )
        if not all(valid_paths):
            raise ValueError(
                f'Column names do not match the schema of the DocList\'s '
                f'document type ({cls.doc_type.__name__}): '
                f'{list(compress(field_names, [not v for v in valid_paths]))}'
            )

        for row in df.itertuples():
            access_path2val = row._asdict()
            access_path2val.pop('index', None)
            doc_dict = _access_path_dict_to_nested_dict(access_path2val)
            docs.append(doc_type.parse_obj(doc_dict))

        return docs

    def to_dataframe(self) -> 'pd.DataFrame':
        """
        Save a DocList to a `pandas.DataFrame`.
        The field names will be stored as column names. Each row of the dataframe corresponds
        to the information of one Document.
        Columns for nested fields will be named after the "__"-separated access paths,
        such as `'image__url'` for `image.url`.

        :return: `pandas.DataFrame`
        """
        if TYPE_CHECKING:
            import pandas as pd
        else:
            pd = import_library('pandas', raise_error=True)

        if self.doc_type == AnyDoc:
            raise TypeError(
                'DocList must be homogeneous to be converted to a DataFrame. '
                'There is no document schema defined. '
                'Please specify the DocList\'s Document type using `DocList[MyDoc]`.'
            )

        fields = self.doc_type._get_access_paths()
        df = pd.DataFrame(columns=fields)

        for doc in self:
            doc_dict = _dict_to_access_paths(doc.dict())
            doc_dict = {k: [v] for k, v in doc_dict.items()}
            df = pd.concat([df, pd.DataFrame.from_dict(doc_dict)], ignore_index=True)

        return df

    # Methods to load from/to files in different formats
    @property
    def _stream_header(self) -> bytes:
        # Binary format for streaming case

        # V2 DocList streaming serialization format
        # | 1 byte | 8 bytes | 4 bytes | variable(DocArray >=0.30) | 4 bytes | variable(DocArray >=0.30) ...

        # 1 byte (uint8)
        version_byte = b'\x02'
        # 8 bytes (uint64)
        num_docs_as_bytes = len(self).to_bytes(8, 'big', signed=False)
        return version_byte + num_docs_as_bytes

    @classmethod
    @abstractmethod
    def _get_proto_class(cls: Type[T]):
        ...

    @classmethod
    def _load_binary_all(
        cls: Type[T],
        file_ctx: Union[ContextManager[io.BufferedReader], ContextManager[bytes]],
        protocol: Optional[ProtocolType],
        compress: Optional[str],
        show_progress: bool,
        tensor_type: Optional[Type['AbstractTensor']] = None,
    ):
        """Read a `DocList` object from a binary file
        :param protocol: protocol to use. It can be 'pickle-array', 'protobuf-array', 'pickle' or 'protobuf'
        :param compress: compress algorithm to use between `lz4`, `bz2`, `lzma`, `zlib`, `gzip`
        :param show_progress: show progress bar, only works when protocol is `pickle` or `protobuf`
        :param tensor_type: only relevant for DocVec; tensor_type of the DocVec
        :return: a `DocList`
        """
        with file_ctx as fp:
            if isinstance(fp, bytes):
                d = fp
            else:
                d = fp.read()

        if protocol is not None and protocol in (
            'pickle-array',
            'protobuf-array',
            'json-array',
        ):
            if _get_compress_ctx(algorithm=compress) is not None:
                d = _decompress_bytes(d, algorithm=compress)
                compress = None

        if protocol is not None and protocol == 'protobuf-array':
            proto = cls._get_proto_class()()
            proto.ParseFromString(d)

            if tensor_type is not None:
                cls_ = cast('IOMixinDocVec', cls)
                return cls_.from_protobuf(proto, tensor_type=tensor_type)
            else:
                return cls.from_protobuf(proto)
        elif protocol is not None and protocol == 'pickle-array':
            return pickle.loads(d)

        elif protocol is not None and protocol == 'json-array':
            if tensor_type is not None:
                cls_ = cast('IOMixinDocVec', cls)
                return cls_.from_json(d, tensor_type=tensor_type)
            else:
                return cls.from_json(d)

        # Binary format for streaming case
        else:
            from rich import filesize

            from docarray.utils._internal.progress_bar import _get_progressbar

            # 1 byte (uint8)
            version_num = int.from_bytes(d[0:1], 'big', signed=False)
            if version_num != 2:
                raise ValueError(
                    f'Unsupported version number {version_num} in binary format, expected 2'
                )

            # 8 bytes (uint64)
            num_docs = int.from_bytes(d[1:9], 'big', signed=False)

            pbar, t = _get_progressbar(
                'Deserializing', disable=not show_progress, total=num_docs
            )

            # skip the 9 header bytes: version (1 byte) + num_docs (8 bytes)
            start_pos = 9
            docs = []
            with pbar:
                _total_size = 0
                pbar.start_task(t)

                for _ in range(num_docs):
                    # 4 bytes (uint32)
                    len_current_doc_in_bytes = int.from_bytes(
                        d[start_pos : start_pos + 4], 'big', signed=False
                    )
                    start_doc_pos = start_pos + 4
                    end_doc_pos = start_doc_pos + len_current_doc_in_bytes
                    start_pos = end_doc_pos

                    # variable length bytes doc
                    load_protocol: ProtocolType = protocol or cast(
                        ProtocolType, 'protobuf'
                    )
                    doc = cls.doc_type.from_bytes(
                        d[start_doc_pos:end_doc_pos],
                        protocol=load_protocol,
                        compress=compress,
                    )
                    docs.append(doc)
                    _total_size += len_current_doc_in_bytes
                    pbar.update(
                        t, advance=1, total_size=str(filesize.decimal(_total_size))
                    )
            if tensor_type is not None:
                cls__ = cast(Type['DocVec'], cls)
                # mypy doesn't realize that cls_ is callable
                return cls__(docs, tensor_type=tensor_type)  # type: ignore
            return cls(docs)

    @classmethod
    def _load_binary_stream(
        cls: Type[T],
        file_ctx: ContextManager[io.BufferedReader],
        protocol: ProtocolType = 'protobuf',
        compress: Optional[str] = None,
        show_progress: bool = False,
    ) -> Generator['T_doc', None, None]:
        """Yield `Document` objects from a binary file

        :param protocol: protocol to use. It can be 'pickle' or 'protobuf'
        :param compress: compress algorithm to use between `lz4`, `bz2`, `lzma`, `zlib`, `gzip`
        :param show_progress: show progress bar, only works when protocol is `pickle` or `protobuf`
        :return: a generator of `Document` objects
        """

        from rich import filesize

        with file_ctx as f:
            version_numdocs_lendoc0 = f.read(9)
            # 1 byte (uint8)
            version_num = int.from_bytes(
                version_numdocs_lendoc0[0:1], 'big', signed=False
            )
            if version_num != 2:
                raise ValueError(
                    f'Unsupported version number {version_num} in binary format, expected 2'
                )

            # 8 bytes (uint64)
            num_docs = int.from_bytes(version_numdocs_lendoc0[1:9], 'big', signed=False)

            if show_progress:
                from docarray.utils._internal.progress_bar import _get_progressbar

                pbar, t = _get_progressbar(
                    'Deserializing', disable=not show_progress, total=num_docs
                )
            else:
                from contextlib import nullcontext

                pbar = nullcontext()

            with pbar:
                if show_progress:
                    _total_size = 0
                    pbar.start_task(t)
                for _ in range(num_docs):
                    # 4 bytes (uint32)
                    len_current_doc_in_bytes = int.from_bytes(
                        f.read(4), 'big', signed=False
                    )
                    load_protocol: ProtocolType = protocol
                    yield cls.doc_type.from_bytes(
                        f.read(len_current_doc_in_bytes),
                        protocol=load_protocol,
                        compress=compress,
                    )
                    if show_progress:
                        _total_size += len_current_doc_in_bytes
                        pbar.update(
                            t, advance=1, total_size=str(filesize.decimal(_total_size))
                        )

    @staticmethod
    def _get_file_context(
        file: Union[str, bytes, pathlib.Path, io.BufferedReader, _LazyRequestReader],
        protocol: ProtocolType,
        compress: Optional[str] = None,
    ) -> Tuple[
        Union[nullcontext, io.BufferedReader], Optional[ProtocolType], Optional[str]
    ]:
        load_protocol: Optional[ProtocolType] = protocol
        load_compress: Optional[str] = compress
        file_ctx: Union[nullcontext, io.BufferedReader]
        if isinstance(file, (io.BufferedReader, _LazyRequestReader, bytes)):
            file_ctx = nullcontext(file)
        # by checking path existence we allow file to be of type Path, LocalPath, PurePath and str
        elif isinstance(file, (str, pathlib.Path)) and os.path.exists(file):
            load_protocol, load_compress = _protocol_and_compress_from_file_path(
                file, protocol, compress
            )
            file_ctx = open(file, 'rb')
        else:
            raise FileNotFoundError(f'cannot find file {file}')
        return file_ctx, load_protocol, load_compress

    @classmethod
    def load_binary(
        cls: Type[T],
        file: Union[str, bytes, pathlib.Path, io.BufferedReader, _LazyRequestReader],
        protocol: ProtocolType = 'protobuf-array',
        compress: Optional[str] = None,
        show_progress: bool = False,
        streaming: bool = False,
    ) -> Union[T, Generator['T_doc', None, None]]:
        """Load doc_list elements from a compressed binary file.

        When the protocol is `pickle` or `protobuf`, the `Documents` can be streamed from disk to save memory usage.

        !!! note
            If `file` is `str` it can specify `protocol` and `compress` as file extensions.
            This functionality assumes `file=file_name.$protocol.$compress` where `$protocol` and `$compress` refer to a
            string interpolation of the respective `protocol` and `compress` methods.
            For example if `file=my_docarray.protobuf.lz4` then the binary data will be loaded assuming `protocol=protobuf`
            and `compress=lz4`.

        :param file: File or filename or serialized bytes where the data is stored.
        :param protocol: protocol to use. It can be 'pickle-array', 'protobuf-array', 'pickle' or 'protobuf'
        :param compress: compress algorithm to use between `lz4`, `bz2`, `lzma`, `zlib`, `gzip`
        :param show_progress: show progress bar, only works when protocol is `pickle` or `protobuf`
        :param streaming: if `True` returns a generator over `Document` objects.

        :return: a `DocList` object

        """
        file_ctx, load_protocol, load_compress = cls._get_file_context(
            file, protocol, compress
        )
        if streaming:
            if load_protocol not in SINGLE_PROTOCOLS:
                raise ValueError(
                    f'`streaming` is only available when using {" or ".join(map(lambda x: f"`{x}`", SINGLE_PROTOCOLS))} as protocol, '
                    f'got {load_protocol}'
                )
            else:
                return cls._load_binary_stream(
                    file_ctx,
                    protocol=load_protocol,
                    compress=load_compress,
                    show_progress=show_progress,
                )
        else:
            return cls._load_binary_all(
                file_ctx, load_protocol, load_compress, show_progress
            )

    def save_binary(
        self,
        file: Union[str, pathlib.Path],
        protocol: ProtocolType = 'protobuf-array',
        compress: Optional[str] = None,
        show_progress: bool = False,
    ) -> None:
        """Save DocList into a binary file.

        The protocol determines how the DocList is saved.
        With `pickle-array` or `protobuf-array`, the DocList is stored and
        compressed as a whole using `pickle` or `protobuf`.
        With `protobuf` or `pickle`, each Document in the DocList is stored
        individually, which makes the file available for streaming.

        !!! note
            If `file` is `str` it can specify `protocol` and `compress` as file extensions.
            This functionality assumes `file=file_name.$protocol.$compress` where `$protocol` and `$compress` refer to a
            string interpolation of the respective `protocol` and `compress` methods.
            For example if `file=my_docarray.protobuf.lz4` then the binary data will be created using `protocol=protobuf`
            and `compress=lz4`.

        :param file: File or filename to which the data is saved.
        :param protocol: protocol to use. It can be 'pickle-array', 'protobuf-array', 'pickle' or 'protobuf'
        :param compress: compress algorithm to use between `lz4`, `bz2`, `lzma`, `zlib`, `gzip`
        :param show_progress: show progress bar, only works when protocol is `pickle` or `protobuf`
        """
        if isinstance(file, io.BufferedWriter):
            file_ctx = nullcontext(file)
        else:
            _protocol, _compress = _protocol_and_compress_from_file_path(file)

            if _protocol is not None:
                protocol = _protocol
            if _compress is not None:
                compress = _compress

            file_ctx = open(file, 'wb')

        self.to_bytes(
            protocol=protocol,
            compress=compress,
            file_ctx=file_ctx,
            show_progress=show_progress,
        )

from_base64(data, protocol='protobuf-array', compress=None, show_progress=False) classmethod

Deserialize base64 strings into a DocList.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `data` | `str` | Base64 string to deserialize | *required* |
| `protocol` | `ProtocolType` | protocol that was used to serialize | `'protobuf-array'` |
| `compress` | `Optional[str]` | compress algorithm that was used to serialize between `lz4`, `bz2`, `lzma`, `zlib`, `gzip` | `None` |
| `show_progress` | `bool` | show progress bar, only works when protocol is `pickle` or `protobuf` | `False` |

Returns:

| Type | Description |
|------|-------------|
| `T` | the deserialized `DocList` |

Source code in docarray/array/doc_list/io.py
@classmethod
def from_base64(
    cls: Type[T],
    data: str,
    protocol: ProtocolType = 'protobuf-array',
    compress: Optional[str] = None,
    show_progress: bool = False,
) -> T:
    """Deserialize base64 strings into a `DocList`.

    :param data: Base64 string to deserialize
    :param protocol: protocol that was used to serialize
    :param compress: compress algorithm that was used to serialize between `lz4`, `bz2`, `lzma`, `zlib`, `gzip`
    :param show_progress: show progress bar, only works when protocol is `pickle` or `protobuf`
    :return: the deserialized `DocList`
    """
    return cls._load_binary_all(
        file_ctx=nullcontext(base64.b64decode(data)),
        protocol=protocol,
        compress=compress,
        show_progress=show_progress,
    )
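
A minimal round-trip sketch; the `MyDoc` schema is illustrative and not part of the API. `protocol` and `compress` must match on both sides, here the defaults:

```python
from docarray import BaseDoc, DocList


class MyDoc(BaseDoc):
    text: str


docs = DocList[MyDoc]([MyDoc(text=f'hello {i}') for i in range(3)])

# serialize to a base64 string, then restore with matching settings
encoded = docs.to_base64()
decoded = DocList[MyDoc].from_base64(encoded)

assert [d.text for d in decoded] == [d.text for d in docs]
```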

from_bytes(data, protocol='protobuf-array', compress=None, show_progress=False) classmethod

Deserialize bytes into a DocList.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `data` | `bytes` | Bytes from which to deserialize | *required* |
| `protocol` | `ProtocolType` | protocol that was used to serialize | `'protobuf-array'` |
| `compress` | `Optional[str]` | compression algorithm that was used to serialize between `lz4`, `bz2`, `lzma`, `zlib`, `gzip` | `None` |
| `show_progress` | `bool` | show progress bar, only works when protocol is `pickle` or `protobuf` | `False` |

Returns:

| Type | Description |
|------|-------------|
| `T` | the deserialized `DocList` |

Source code in docarray/array/doc_list/io.py
@classmethod
def from_bytes(
    cls: Type[T],
    data: bytes,
    protocol: ProtocolType = 'protobuf-array',
    compress: Optional[str] = None,
    show_progress: bool = False,
) -> T:
    """Deserialize bytes into a `DocList`.

    :param data: Bytes from which to deserialize
    :param protocol: protocol that was used to serialize
    :param compress: compression algorithm that was used to serialize between `lz4`, `bz2`, `lzma`, `zlib`, `gzip`
    :param show_progress: show progress bar, only works when protocol is `pickle` or `protobuf`
    :return: the deserialized `DocList`
    """
    return cls._load_binary_all(
        file_ctx=nullcontext(data),
        protocol=protocol,
        compress=compress,
        show_progress=show_progress,
    )
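
The same pattern works for raw bytes; a sketch with the default `protocol='protobuf-array'` and no compression (`MyDoc` is again an illustrative schema):

```python
from docarray import BaseDoc, DocList


class MyDoc(BaseDoc):
    text: str


docs = DocList[MyDoc]([MyDoc(text='hello'), MyDoc(text='world')])

data = docs.to_bytes()  # bytes suitable for a file, socket or cache
restored = DocList[MyDoc].from_bytes(data)

assert [d.text for d in restored] == ['hello', 'world']
```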

from_csv(file_path, encoding='utf-8', dialect='excel') classmethod

Load a DocList from a csv file following the schema defined in the .doc_type attribute. Every row of the csv file will be mapped to one document in the doc_list. The column names (defined in the first row) have to match the field names of the Document type. For nested fields use "__"-separated access paths, such as 'image__url'.

List-like fields (including fields of type DocList) are not supported.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `file_path` | `str` | path to csv file to load DocList from. | *required* |
| `encoding` | `str` | encoding used to read the csv file. Defaults to 'utf-8'. | `'utf-8'` |
| `dialect` | `Union[str, Dialect]` | defines separator and how to handle whitespaces etc. Can be a `csv.Dialect` instance or one string of: `'excel'` (for comma separated values), `'excel-tab'` (for tab separated values), `'unix'` (for csv file generated on UNIX systems). | `'excel'` |

Returns:

| Type | Description |
|------|-------------|
| `T` | `DocList` object |

Source code in docarray/array/doc_list/io.py
@classmethod
def from_csv(
    cls: Type['T'],
    file_path: str,
    encoding: str = 'utf-8',
    dialect: Union[str, csv.Dialect] = 'excel',
) -> 'T':
    """
    Load a DocList from a csv file following the schema defined in the
    [`.doc_type`][docarray.DocList] attribute.
    Every row of the csv file will be mapped to one document in the doc_list.
    The column names (defined in the first row) have to match the field names
    of the Document type.
    For nested fields use "__"-separated access paths, such as `'image__url'`.

    List-like fields (including fields of type DocList) are not supported.

    :param file_path: path to csv file to load DocList from.
    :param encoding: encoding used to read the csv file. Defaults to 'utf-8'.
    :param dialect: defines separator and how to handle whitespaces etc.
        Can be a [`csv.Dialect`](https://docs.python.org/3/library/csv.html#csv.Dialect)
        instance or one string of:
        `'excel'` (for comma separated values),
        `'excel-tab'` (for tab separated values),
        `'unix'` (for csv file generated on UNIX systems).

    :return: `DocList` object
    """
    if cls.doc_type == AnyDoc or cls.doc_type == BaseDoc:
        raise TypeError(
            'There is no document schema defined. '
            f'Please specify the {cls}\'s Document type using `{cls}[MyDoc]`.'
        )

    if file_path.startswith('http'):
        import urllib.request

        with urllib.request.urlopen(file_path) as f:
            file = StringIO(f.read().decode(encoding))
            return cls._from_csv_file(file, dialect)
    else:
        with open(file_path, 'r', encoding=encoding) as fp:
            return cls._from_csv_file(fp, dialect)
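
A round-trip sketch with a flat, illustrative `Person` schema; `to_csv` writes the header row of access paths that `from_csv` then validates against the schema:

```python
import tempfile

from docarray import BaseDoc, DocList


class Person(BaseDoc):
    name: str
    age: int


docs = DocList[Person]([Person(name='Maria', age=30), Person(name='Jake', age=25)])

with tempfile.TemporaryDirectory() as tmp:
    path = f'{tmp}/people.csv'
    docs.to_csv(path)
    loaded = DocList[Person].from_csv(path)

assert loaded.name == ['Maria', 'Jake']
```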

from_dataframe(df) classmethod

Load a DocList from a pandas.DataFrame following the schema defined in the .doc_type attribute. Every row of the dataframe will be mapped to one Document in the doc_list. The column names of the dataframe have to match the field names of the Document type. For nested fields use "__"-separated access paths as column names, such as 'image__url'.

List-like fields (including fields of type DocList) are not supported.


import pandas as pd

from docarray import BaseDoc, DocList


class Person(BaseDoc):
    name: str
    follower: int


df = pd.DataFrame(
    data=[['Maria', 12345], ['Jake', 54321]], columns=['name', 'follower']
)

docs = DocList[Person].from_dataframe(df)

assert docs.name == ['Maria', 'Jake']
assert docs.follower == [12345, 54321]

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `df` | `DataFrame` | `pandas.DataFrame` to extract Document's information from | *required* |

Returns:

| Type | Description |
|------|-------------|
| `T` | `DocList` where each Document contains the information of one corresponding row of the `pandas.DataFrame`. |

Source code in docarray/array/doc_list/io.py
@classmethod
def from_dataframe(cls: Type['T'], df: 'pd.DataFrame') -> 'T':
    """
    Load a `DocList` from a `pandas.DataFrame` following the schema
    defined in the [`.doc_type`][docarray.DocList] attribute.
    Every row of the dataframe will be mapped to one Document in the doc_list.
    The column names of the dataframe have to match the field names of the
    Document type.
    For nested fields use "__"-separated access paths as column names,
    such as `'image__url'`.

    List-like fields (including fields of type DocList) are not supported.

    ---

    ```python
    import pandas as pd

    from docarray import BaseDoc, DocList


    class Person(BaseDoc):
        name: str
        follower: int


    df = pd.DataFrame(
        data=[['Maria', 12345], ['Jake', 54321]], columns=['name', 'follower']
    )

    docs = DocList[Person].from_dataframe(df)

    assert docs.name == ['Maria', 'Jake']
    assert docs.follower == [12345, 54321]
    ```

    ---

    :param df: `pandas.DataFrame` to extract Document's information from
    :return: `DocList` where each Document contains the information of one
        corresponding row of the `pandas.DataFrame`.
    """
    from docarray import DocList

    if cls.doc_type == AnyDoc or cls.doc_type == BaseDoc:
        raise TypeError(
            'There is no document schema defined. '
            f'Please specify the {cls}\'s Document type using `{cls}[MyDoc]`.'
        )

    doc_type = cls.doc_type
    docs = DocList.__class_getitem__(doc_type)()
    field_names = df.columns.tolist()

    if field_names is None or len(field_names) == 0:
        raise TypeError("No field names are given.")

    valid_paths = _all_access_paths_valid(
        doc_type=doc_type, access_paths=field_names
    )
    if not all(valid_paths):
        raise ValueError(
            f'Column names do not match the schema of the DocList\'s '
            f'document type ({cls.doc_type.__name__}): '
            f'{list(compress(field_names, [not v for v in valid_paths]))}'
        )

    for row in df.itertuples():
        access_path2val = row._asdict()
        access_path2val.pop('index', None)
        doc_dict = _access_path_dict_to_nested_dict(access_path2val)
        docs.append(doc_type.parse_obj(doc_dict))

    return docs

from_json(file) classmethod

Deserialize JSON strings or bytes into a DocList.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `file` | `Union[str, bytes, bytearray]` | JSON object from where to deserialize a `DocList` | *required* |

Returns:

| Type | Description |
|------|-------------|
| `T` | the deserialized `DocList` |

Source code in docarray/array/doc_list/io.py
@classmethod
def from_json(
    cls: Type[T],
    file: Union[str, bytes, bytearray],
) -> T:
    """Deserialize JSON strings or bytes into a `DocList`.

    :param file: JSON object from where to deserialize a `DocList`
    :return: the deserialized `DocList`
    """
    json_docs = orjson.loads(file)
    return cls([cls.doc_type(**v) for v in json_docs])
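
A JSON round-trip sketch (illustrative `MyDoc` schema); `to_json` produces the string that `from_json` consumes:

```python
from docarray import BaseDoc, DocList


class MyDoc(BaseDoc):
    text: str


docs = DocList[MyDoc]([MyDoc(text='hello'), MyDoc(text='world')])

json_str = docs.to_json()
restored = DocList[MyDoc].from_json(json_str)

assert [d.text for d in restored] == ['hello', 'world']
```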

from_protobuf(pb_msg) classmethod

Create a `DocList` from a protobuf message.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `pb_msg` | `DocListProto` | The protobuf message from where to construct the `DocList` | *required* |
Source code in docarray/array/doc_list/io.py
@classmethod
def from_protobuf(cls: Type[T], pb_msg: 'DocListProto') -> T:
    """create a Document from a protobuf message
    :param pb_msg: The protobuf message from where to construct the DocList
    """
    return cls(cls.doc_type.from_protobuf(doc_proto) for doc_proto in pb_msg.docs)
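
A protobuf round-trip sketch, assuming the `protobuf` dependency is installed (`MyDoc` is illustrative):

```python
from docarray import BaseDoc, DocList


class MyDoc(BaseDoc):
    text: str


docs = DocList[MyDoc]([MyDoc(text='hello')])

proto = docs.to_protobuf()  # a DocListProto message
restored = DocList[MyDoc].from_protobuf(proto)

assert restored[0].text == 'hello'
```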

load_binary(file, protocol='protobuf-array', compress=None, show_progress=False, streaming=False) classmethod

Load doc_list elements from a compressed binary file.

When the protocol is `pickle` or `protobuf`, the Documents can be streamed from disk to save memory usage.

Note

If file is str it can specify protocol and compress as file extensions. This functionality assumes file=file_name.$protocol.$compress where $protocol and $compress refer to a string interpolation of the respective protocol and compress methods. For example if file=my_docarray.protobuf.lz4 then the binary data will be loaded assuming protocol=protobuf and compress=lz4.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `file` | `Union[str, bytes, Path, BufferedReader, _LazyRequestReader]` | File or filename or serialized bytes where the data is stored. | *required* |
| `protocol` | `ProtocolType` | protocol to use. It can be 'pickle-array', 'protobuf-array', 'pickle' or 'protobuf' | `'protobuf-array'` |
| `compress` | `Optional[str]` | compress algorithm to use between `lz4`, `bz2`, `lzma`, `zlib`, `gzip` | `None` |
| `show_progress` | `bool` | show progress bar, only works when protocol is `pickle` or `protobuf` | `False` |
| `streaming` | `bool` | if `True` returns a generator over `Document` objects. | `False` |

Returns:

| Type | Description |
|------|-------------|
| `Union[T, Generator[T_doc, None, None]]` | a `DocList` object |

Source code in docarray/array/doc_list/io.py
@classmethod
def load_binary(
    cls: Type[T],
    file: Union[str, bytes, pathlib.Path, io.BufferedReader, _LazyRequestReader],
    protocol: ProtocolType = 'protobuf-array',
    compress: Optional[str] = None,
    show_progress: bool = False,
    streaming: bool = False,
) -> Union[T, Generator['T_doc', None, None]]:
    """Load doc_list elements from a compressed binary file.

    When the protocol is `pickle` or `protobuf`, the `Documents` can be streamed from disk to save memory usage.

    !!! note
        If `file` is `str` it can specify `protocol` and `compress` as file extensions.
        This functionality assumes `file=file_name.$protocol.$compress` where `$protocol` and `$compress` refer to a
        string interpolation of the respective `protocol` and `compress` methods.
        For example if `file=my_docarray.protobuf.lz4` then the binary data will be loaded assuming `protocol=protobuf`
        and `compress=lz4`.

    :param file: File or filename or serialized bytes where the data is stored.
    :param protocol: protocol to use. It can be 'pickle-array', 'protobuf-array', 'pickle' or 'protobuf'
    :param compress: compress algorithm to use between `lz4`, `bz2`, `lzma`, `zlib`, `gzip`
    :param show_progress: show progress bar, only works when protocol is `pickle` or `protobuf`
    :param streaming: if `True` returns a generator over `Document` objects.

    :return: a `DocList` object

    """
    file_ctx, load_protocol, load_compress = cls._get_file_context(
        file, protocol, compress
    )
    if streaming:
        if load_protocol not in SINGLE_PROTOCOLS:
            raise ValueError(
                f'`streaming` is only available when using {" or ".join(map(lambda x: f"`{x}`", SINGLE_PROTOCOLS))} as protocol, '
                f'got {load_protocol}'
            )
        else:
            return cls._load_binary_stream(
                file_ctx,
                protocol=load_protocol,
                compress=load_compress,
                show_progress=show_progress,
            )
    else:
        return cls._load_binary_all(
            file_ctx, load_protocol, load_compress, show_progress
        )
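
A sketch of both loading modes, assuming a single `.protobuf` extension is enough to infer `protocol='protobuf'` as described in the note above (`MyDoc` and the temporary path are illustrative):

```python
import tempfile

from docarray import BaseDoc, DocList


class MyDoc(BaseDoc):
    text: str


docs = DocList[MyDoc]([MyDoc(text=f'doc {i}') for i in range(5)])

with tempfile.TemporaryDirectory() as tmp:
    path = f'{tmp}/docs.protobuf'  # extension selects protocol='protobuf'
    docs.save_binary(path)

    # load everything at once into a DocList
    loaded = DocList[MyDoc].load_binary(path)

    # or stream Document by Document (only for 'pickle'/'protobuf')
    for doc in DocList[MyDoc].load_binary(path, streaming=True):
        print(doc.text)
```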

save_binary(file, protocol='protobuf-array', compress=None, show_progress=False)

Save DocList into a binary file.

The protocol determines how the `DocList` is saved. With `pickle-array` or `protobuf-array`, the `DocList` is stored and compressed as a whole using `pickle` or `protobuf`. With `protobuf` or `pickle`, each Document in the `DocList` is stored individually, which makes the file available for streaming.

Note

If `file` is `str` it can specify `protocol` and `compress` as file extensions. This functionality assumes `file=file_name.$protocol.$compress` where `$protocol` and `$compress` refer to a string interpolation of the respective `protocol` and `compress` methods. For example if `file=my_docarray.protobuf.lz4` then the binary data will be created using `protocol=protobuf` and `compress=lz4`.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `file` | `Union[str, Path]` | File or filename to which the data is saved. | *required* |
| `protocol` | `ProtocolType` | protocol to use. It can be 'pickle-array', 'protobuf-array', 'pickle' or 'protobuf' | `'protobuf-array'` |
| `compress` | `Optional[str]` | compress algorithm to use between `lz4`, `bz2`, `lzma`, `zlib`, `gzip` | `None` |
| `show_progress` | `bool` | show progress bar, only works when protocol is `pickle` or `protobuf` | `False` |
Source code in docarray/array/doc_list/io.py
def save_binary(
    self,
    file: Union[str, pathlib.Path],
    protocol: ProtocolType = 'protobuf-array',
    compress: Optional[str] = None,
    show_progress: bool = False,
) -> None:
    """Save DocList into a binary file.

    The protocol determines how the DocList is saved.
    With `pickle-array` or `protobuf-array`, the DocList is stored and
    compressed as a whole using `pickle` or `protobuf`.
    With `protobuf` or `pickle`, each Document in the DocList is stored
    individually, which makes the file available for streaming.

    !!! note
        If `file` is `str` it can specify `protocol` and `compress` as file extensions.
        This functionality assumes `file=file_name.$protocol.$compress` where `$protocol` and `$compress` refer to a
        string interpolation of the respective `protocol` and `compress` methods.
        For example if `file=my_docarray.protobuf.lz4` then the binary data will be created using `protocol=protobuf`
        and `compress=lz4`.

    :param file: File or filename to which the data is saved.
    :param protocol: protocol to use. It can be 'pickle-array', 'protobuf-array', 'pickle' or 'protobuf'
    :param compress: compress algorithm to use between `lz4`, `bz2`, `lzma`, `zlib`, `gzip`
    :param show_progress: show progress bar, only works when protocol is `pickle` or `protobuf`
    """
    if isinstance(file, io.BufferedWriter):
        file_ctx = nullcontext(file)
    else:
        _protocol, _compress = _protocol_and_compress_from_file_path(file)

        if _protocol is not None:
            protocol = _protocol
        if _compress is not None:
            compress = _compress

        file_ctx = open(file, 'wb')

    self.to_bytes(
        protocol=protocol,
        compress=compress,
        file_ctx=file_ctx,
        show_progress=show_progress,
    )
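
A sketch of the extension-based shortcut from the note above; the file name is illustrative, and `compress='lz4'` assumes the optional `lz4` package is installed:

```python
from docarray import BaseDoc, DocList


class MyDoc(BaseDoc):
    text: str


docs = DocList[MyDoc]([MyDoc(text='hello') for _ in range(3)])

# the extensions select protocol='protobuf' and compress='lz4',
# so each Document is stored individually and can be streamed back
docs.save_binary('my_docs.protobuf.lz4')
```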

to_base64(protocol='protobuf-array', compress=None, show_progress=False)

Serialize itself into base64 encoded string.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `protocol` | `ProtocolType` | protocol to use. It can be 'pickle-array', 'protobuf-array', 'pickle' or 'protobuf' | `'protobuf-array'` |
| `compress` | `Optional[str]` | compress algorithm to use between `lz4`, `bz2`, `lzma`, `zlib`, `gzip` | `None` |
| `show_progress` | `bool` | show progress bar, only works when protocol is `pickle` or `protobuf` | `False` |

Returns:

| Type | Description |
|------|-------------|
| `str` | the base64 encoded string |

Source code in docarray/array/doc_list/io.py
def to_base64(
    self,
    protocol: ProtocolType = 'protobuf-array',
    compress: Optional[str] = None,
    show_progress: bool = False,
) -> str:
    """Serialize itself into base64 encoded string.

    :param protocol: protocol to use. It can be 'pickle-array', 'protobuf-array', 'pickle' or 'protobuf'
    :param compress: compress algorithm to use between `lz4`, `bz2`, `lzma`, `zlib`, `gzip`
    :param show_progress: show progress bar, only works when protocol is `pickle` or `protobuf`
    :return: the base64 encoded string
    """
    with io.BytesIO() as bf:
        self._write_bytes(
            bf=bf,
            compress=compress,
            protocol=protocol,
            show_progress=show_progress,
        )
        return base64.b64encode(bf.getvalue()).decode('utf-8')

to_bytes(protocol='protobuf-array', compress=None, file_ctx=None, show_progress=False)

Serialize itself into bytes.

For more Pythonic code, please use bytes(...).

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `protocol` | `ProtocolType` | protocol to use. It can be 'pickle-array', 'protobuf-array', 'pickle' or 'protobuf' | `'protobuf-array'` |
| `compress` | `Optional[str]` | compress algorithm to use between `lz4`, `bz2`, `lzma`, `zlib`, `gzip` | `None` |
| `file_ctx` | `Optional[BinaryIO]` | File or filename or serialized bytes where the data is stored. | `None` |
| `show_progress` | `bool` | show progress bar, only works when protocol is `pickle` or `protobuf` | `False` |

Returns:

| Type | Description |
|------|-------------|
| `Optional[bytes]` | the binary serialization in bytes, or `None` if `file_ctx` is provided (the data is written there instead) |

Source code in docarray/array/doc_list/io.py
def to_bytes(
    self,
    protocol: ProtocolType = 'protobuf-array',
    compress: Optional[str] = None,
    file_ctx: Optional[BinaryIO] = None,
    show_progress: bool = False,
) -> Optional[bytes]:
    """Serialize itself into `bytes`.

    For more Pythonic code, please use ``bytes(...)``.

    :param protocol: protocol to use. It can be 'pickle-array', 'protobuf-array', 'pickle' or 'protobuf'
    :param compress: compress algorithm to use between `lz4`, `bz2`, `lzma`, `zlib`, `gzip`
    :param file_ctx: File or filename or serialized bytes where the data is stored.
    :param show_progress: show progress bar, only works when protocol is `pickle` or `protobuf`
    :return: the binary serialization in bytes, or `None` if `file_ctx` is provided (the data is written there instead)
    """

    with file_ctx or io.BytesIO() as bf:
        self._write_bytes(
            bf=bf,
            protocol=protocol,
            compress=compress,
            show_progress=show_progress,
        )
        if isinstance(bf, io.BytesIO):
            return bf.getvalue()

    return None
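
A sketch of the `bytes(...)` shorthand mentioned in the docstring, assuming it delegates to `to_bytes()` with its default arguments:

```python
from docarray import BaseDoc, DocList


class MyDoc(BaseDoc):
    text: str


docs = DocList[MyDoc]([MyDoc(text='hello')])

# both expressions should produce the same serialization
assert bytes(docs) == docs.to_bytes()
```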

to_csv(file_path, dialect='excel')

Save a DocList to a csv file. The field names will be stored in the first row. Each row corresponds to the information of one Document. Columns for nested fields will be named after the "__"-separated access paths, such as 'image__url' for image.url.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `file_path` | `str` | path to a csv file. | *required* |
| `dialect` | `Union[str, Dialect]` | defines separator and how to handle whitespaces etc. Can be a `csv.Dialect` instance or one string of: `'excel'` (for comma separated values), `'excel-tab'` (for tab separated values), `'unix'` (for csv file generated on UNIX systems). | `'excel'` |
Source code in docarray/array/doc_list/io.py
def to_csv(
    self, file_path: str, dialect: Union[str, csv.Dialect] = 'excel'
) -> None:
    """
    Save a `DocList` to a csv file.
    The field names will be stored in the first row. Each row corresponds to the
    information of one Document.
    Columns for nested fields will be named after the "__"-separated access paths,
    such as `'image__url'` for `image.url`.

    :param file_path: path to a csv file.
    :param dialect: defines separator and how to handle whitespaces etc.
        Can be a [`csv.Dialect`](https://docs.python.org/3/library/csv.html#csv.Dialect)
        instance or one string of:
        `'excel'` (for comma separated values),
        `'excel-tab'` (for tab separated values),
        `'unix'` (for csv file generated on UNIX systems).

    """
    if self.doc_type == AnyDoc or self.doc_type == BaseDoc:
        raise TypeError(
            f'{type(self)} must be homogeneous to be converted to a csv. '
            'There is no document schema defined. '
            f'Please specify the {type(self)}\'s Document type using `{type(self)}[MyDoc]`.'
        )
    fields = self.doc_type._get_access_paths()

    with open(file_path, 'w') as csv_file:
        writer = csv.DictWriter(csv_file, fieldnames=fields, dialect=dialect)
        writer.writeheader()

        for doc in self:
            doc_dict = _dict_to_access_paths(doc.dict())
            writer.writerow(doc_dict)
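
A sketch of how nested fields map to "__"-separated columns; `Inner`/`Outer` and the temporary path are illustrative, and the exact column set depends on the schema (including the implicit `id` fields):

```python
import tempfile

from docarray import BaseDoc, DocList


class Inner(BaseDoc):
    url: str


class Outer(BaseDoc):
    title: str
    image: Inner


docs = DocList[Outer](
    [Outer(title='hello', image=Inner(url='http://example.com/a.png'))]
)

with tempfile.TemporaryDirectory() as tmp:
    path = f'{tmp}/docs.csv'
    docs.to_csv(path)  # header contains e.g. 'title' and 'image__url'
    loaded = DocList[Outer].from_csv(path)

assert loaded[0].image.url == 'http://example.com/a.png'
```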

to_dataframe()

Save a DocList to a pandas.DataFrame. The field names will be stored as column names. Each row of the dataframe corresponds to the information of one Document. Columns for nested fields will be named after the "__"-separated access paths, such as 'image__url' for image.url.

Returns:

| Type | Description |
|------|-------------|
| `DataFrame` | `pandas.DataFrame` |

Source code in docarray/array/doc_list/io.py
def to_dataframe(self) -> 'pd.DataFrame':
    """
    Save a DocList to a `pandas.DataFrame`.
    The field names will be stored as column names. Each row of the dataframe corresponds
    to the information of one Document.
    Columns for nested fields will be named after the "__"-separated access paths,
    such as `'image__url'` for `image.url`.

    :return: `pandas.DataFrame`
    """
    if TYPE_CHECKING:
        import pandas as pd
    else:
        pd = import_library('pandas', raise_error=True)

    if self.doc_type == AnyDoc:
        raise TypeError(
            'DocList must be homogeneous to be converted to a DataFrame. '
            'There is no document schema defined. '
            'Please specify the DocList\'s Document type using `DocList[MyDoc]`.'
        )

    fields = self.doc_type._get_access_paths()
    df = pd.DataFrame(columns=fields)

    for doc in self:
        doc_dict = _dict_to_access_paths(doc.dict())
        doc_dict = {k: [v] for k, v in doc_dict.items()}
        df = pd.concat([df, pd.DataFrame.from_dict(doc_dict)], ignore_index=True)

    return df
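
A sketch reusing the `Person` schema from the `from_dataframe` example above (requires `pandas` to be installed):

```python
from docarray import BaseDoc, DocList


class Person(BaseDoc):
    name: str
    follower: int


docs = DocList[Person](
    [Person(name='Maria', follower=12345), Person(name='Jake', follower=54321)]
)

df = docs.to_dataframe()
# one row per Document, one column per access path ('id', 'name', 'follower')
print(df[['name', 'follower']])
```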

to_json()

Convert the object into a JSON string. Can be loaded via `.from_json`.

Returns:

| Type | Description |
|------|-------------|
| `str` | JSON serialization of `DocList` |

Source code in docarray/array/doc_list/io.py
def to_json(self) -> str:
    """Convert the object into JSON bytes. Can be loaded via `.from_json`.
    :return: JSON serialization of `DocList`
    """
    return orjson_dumps(self).decode('UTF-8')

to_protobuf()

Convert DocList into a Protobuf message

Source code in docarray/array/doc_list/io.py
def to_protobuf(self) -> 'DocListProto':
    """Convert `DocList` into a Protobuf message"""
    from docarray.proto import DocListProto

    da_proto = DocListProto()
    for doc in self:
        da_proto.docs.append(doc.to_protobuf())

    return da_proto
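
The returned `DocListProto` is a standard protobuf message; a sketch that serializes it to wire format, assuming the `protobuf` dependency is installed (`MyDoc` is illustrative):

```python
from docarray import BaseDoc, DocList


class MyDoc(BaseDoc):
    text: str


docs = DocList[MyDoc]([MyDoc(text='hello')])

proto = docs.to_protobuf()
wire_bytes = proto.SerializeToString()  # standard protobuf message API
print(len(wire_bytes))
```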

docarray.array.doc_list.pushpull.PushPullMixin

Bases: Iterable['BaseDoc']

Mixin class for push/pull functionality.

Source code in docarray/array/doc_list/pushpull.py
class PushPullMixin(Iterable['BaseDoc']):
    """Mixin class for push/pull functionality."""

    __backends__: Dict[str, Type['AbstractDocStore']] = {}
    doc_type: Type['BaseDoc']

    @abstractmethod
    def __len__(self) -> int:
        ...

    @staticmethod
    def resolve_url(url: str) -> Tuple[PUSH_PULL_PROTOCOL, str]:
        """Resolve the URL to the correct protocol and name.
        :param url: url to resolve
        """
        protocol, name = url.split('://', 2)
        if protocol in SUPPORTED_PUSH_PULL_PROTOCOLS:
            protocol = cast(PUSH_PULL_PROTOCOL, protocol)
            return protocol, name
        else:
            raise ValueError(f'Unsupported protocol {protocol}')

    @classmethod
    def get_pushpull_backend(
        cls: Type[SelfPushPullMixin], protocol: PUSH_PULL_PROTOCOL
    ) -> Type['AbstractDocStore']:
        """
        Get the backend for the given protocol.

        :param protocol: the protocol to use, e.g. 'file', 's3'
        :return: the backend class
        """
        if protocol in cls.__backends__:
            return cls.__backends__[protocol]

        if protocol == 'file':
            from docarray.store.file import FileDocStore

            cls.__backends__[protocol] = FileDocStore
            logging.debug('Loaded Local Filesystem backend')
        elif protocol == 's3':
            from docarray.store.s3 import S3DocStore

            cls.__backends__[protocol] = S3DocStore
            logging.debug('Loaded S3 backend')
        else:
            raise NotImplementedError(f'protocol {protocol} not supported')

        return cls.__backends__[protocol]

    def push(
        self,
        url: str,
        show_progress: bool = False,
        **kwargs,
    ) -> Dict:
        """Push this `DocList` object to the specified url.

        :param url: url specifying the protocol and save name of the `DocList`. Should be of the form ``protocol://namespace/name``. e.g. ``s3://bucket/path/to/namespace/name``, ``file:///path/to/folder/name``
        :param show_progress: If true, a progress bar will be displayed.
        """
        logging.info(f'Pushing {len(self)} docs to {url}')
        protocol, name = self.__class__.resolve_url(url)
        return self.__class__.get_pushpull_backend(protocol).push(
            self, name, show_progress  # type: ignore
        )

    @classmethod
    def push_stream(
        cls: Type[SelfPushPullMixin],
        docs: Iterator['BaseDoc'],
        url: str,
        show_progress: bool = False,
    ) -> Dict:
        """Push a stream of documents to the specified url.

        :param docs: a stream of documents
        :param url: url specifying the protocol and save name of the `DocList`. Should be of the form ``protocol://namespace/name``. e.g. ``s3://bucket/path/to/namespace/name``, ``file:///path/to/folder/name``
        :param show_progress: If true, a progress bar will be displayed.
        """
        logging.info(f'Pushing stream to {url}')
        protocol, name = cls.resolve_url(url)
        return cls.get_pushpull_backend(protocol).push_stream(docs, name, show_progress)

    @classmethod
    def pull(
        cls: Type[SelfPushPullMixin],
        url: str,
        show_progress: bool = False,
        local_cache: bool = True,
    ) -> 'DocList':
        """Pull a `DocList` from the specified url.

        :param url: url specifying the protocol and save name of the `DocList`. Should be of the form ``protocol://namespace/name``. e.g. ``s3://bucket/path/to/namespace/name``, ``file:///path/to/folder/name``
        :param show_progress: if true, display a progress bar.
        :param local_cache: store the downloaded `DocList` to local folder
        :return: a `DocList` object
        """
        from docarray.base_doc import AnyDoc

        if cls.doc_type == AnyDoc:
            raise TypeError(
                'There is no document schema defined. '
                'Please specify the `DocList`\'s Document type using `DocList[MyDoc]`.'
            )

        logging.info(f'Pulling {url}')
        protocol, name = cls.resolve_url(url)
        return cls.get_pushpull_backend(protocol).pull(
            cls, name, show_progress, local_cache  # type: ignore
        )

    @classmethod
    def pull_stream(
        cls: Type[SelfPushPullMixin],
        url: str,
        show_progress: bool = False,
        local_cache: bool = False,
    ) -> Iterator['BaseDoc']:
        """Pull a stream of Documents from the specified url.

        :param url: url specifying the protocol and save name of the `DocList`. Should be of the form ``protocol://namespace/name``. e.g. ``s3://bucket/path/to/namespace/name``, ``file:///path/to/folder/name``
        :param show_progress: if true, display a progress bar.
        :param local_cache: store the downloaded `DocList` to local folder
        :return: Iterator of Documents
        """
        from docarray.base_doc import AnyDoc

        if cls.doc_type == AnyDoc:
            raise TypeError(
                'There is no document schema defined. '
                'Please specify the `DocList`\'s Document type using `DocList[MyDoc]`.'
            )

        logging.info(f'Pulling Document stream from {url}')
        protocol, name = cls.resolve_url(url)
        return cls.get_pushpull_backend(protocol).pull_stream(
            cls, name, show_progress, local_cache  # type: ignore
        )

get_pushpull_backend(protocol) classmethod

Get the backend for the given protocol.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `protocol` | `PUSH_PULL_PROTOCOL` | the protocol to use, e.g. 'file', 's3' | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Type[AbstractDocStore]` | the backend class |

Source code in docarray/array/doc_list/pushpull.py
@classmethod
def get_pushpull_backend(
    cls: Type[SelfPushPullMixin], protocol: PUSH_PULL_PROTOCOL
) -> Type['AbstractDocStore']:
    """
    Get the backend for the given protocol.

    :param protocol: the protocol to use, e.g. 'file', 's3'
    :return: the backend class
    """
    if protocol in cls.__backends__:
        return cls.__backends__[protocol]

    if protocol == 'file':
        from docarray.store.file import FileDocStore

        cls.__backends__[protocol] = FileDocStore
        logging.debug('Loaded Local Filesystem backend')
    elif protocol == 's3':
        from docarray.store.s3 import S3DocStore

        cls.__backends__[protocol] = S3DocStore
        logging.debug('Loaded S3 backend')
    else:
        raise NotImplementedError(f'protocol {protocol} not supported')

    return cls.__backends__[protocol]

pull(url, show_progress=False, local_cache=True) classmethod

Pull a DocList from the specified url.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `url` | `str` | url specifying the protocol and save name of the `DocList`. Should be of the form `protocol://namespace/name`, e.g. `s3://bucket/path/to/namespace/name`, `file:///path/to/folder/name` | *required* |
| `show_progress` | `bool` | if true, display a progress bar. | `False` |
| `local_cache` | `bool` | store the downloaded `DocList` to local folder | `True` |

Returns:

| Type | Description |
|------|-------------|
| `DocList` | a `DocList` object |

Source code in docarray/array/doc_list/pushpull.py
@classmethod
def pull(
    cls: Type[SelfPushPullMixin],
    url: str,
    show_progress: bool = False,
    local_cache: bool = True,
) -> 'DocList':
    """Pull a `DocList` from the specified url.

    :param url: url specifying the protocol and save name of the `DocList`. Should be of the form ``protocol://namespace/name``. e.g. ``s3://bucket/path/to/namespace/name``, ``file:///path/to/folder/name``
    :param show_progress: if true, display a progress bar.
    :param local_cache: store the downloaded `DocList` to local folder
    :return: a `DocList` object
    """
    from docarray.base_doc import AnyDoc

    if cls.doc_type == AnyDoc:
        raise TypeError(
            'There is no document schema defined. '
            'Please specify the `DocList`\'s Document type using `DocList[MyDoc]`.'
        )

    logging.info(f'Pulling {url}')
    protocol, name = cls.resolve_url(url)
    return cls.get_pushpull_backend(protocol).pull(
        cls, name, show_progress, local_cache  # type: ignore
    )
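
A minimal round-trip sketch using the local file backend; the `/tmp/my_docs` save name and the `MyDoc` schema are illustrative assumptions, not part of the API:

```python
from docarray import BaseDoc, DocList


class MyDoc(BaseDoc):
    text: str


docs = DocList[MyDoc]([MyDoc(text=f'doc {i}') for i in range(3)])
docs.push('file:///tmp/my_docs')  # illustrative save name

# pull requires a parametrized DocList: a plain DocList (AnyDoc schema)
# raises the TypeError shown in the source above.
loaded = DocList[MyDoc].pull('file:///tmp/my_docs', local_cache=True)
assert len(loaded) == 3
```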

pull_stream(url, show_progress=False, local_cache=False) classmethod

Pull a stream of Documents from the specified url.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `url` | `str` | url specifying the protocol and save name of the `DocList`. Should be of the form `protocol://namespace/name`, e.g. `s3://bucket/path/to/namespace/name`, `file:///path/to/folder/name` | *required* |
| `show_progress` | `bool` | if true, display a progress bar | `False` |
| `local_cache` | `bool` | if true, store the downloaded `DocList` in a local cache folder | `False` |

Returns:

| Type | Description |
| --- | --- |
| `Iterator[BaseDoc]` | Iterator of Documents |

Source code in docarray/array/doc_list/pushpull.py
@classmethod
def pull_stream(
    cls: Type[SelfPushPullMixin],
    url: str,
    show_progress: bool = False,
    local_cache: bool = False,
) -> Iterator['BaseDoc']:
    """Pull a stream of Documents from the specified url.

    :param url: url specifying the protocol and save name of the `DocList`. Should be of the form ``protocol://namespace/name``, e.g. ``s3://bucket/path/to/namespace/name``, ``file:///path/to/folder/name``
    :param show_progress: if true, display a progress bar.
    :param local_cache: if true, store the downloaded `DocList` in a local cache folder
    :return: Iterator of Documents
    """
    from docarray.base_doc import AnyDoc

    if cls.doc_type == AnyDoc:
        raise TypeError(
            'There is no document schema defined. '
            'Please specify the `DocList`\'s Document type using `DocList[MyDoc]`.'
        )

    logging.info(f'Pulling Document stream from {url}')
    protocol, name = cls.resolve_url(url)
    return cls.get_pushpull_backend(protocol).pull_stream(
        cls, name, show_progress, local_cache  # type: ignore
    )
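
A sketch of consuming the stream, assuming a `DocList` was previously pushed under the illustrative name `file:///tmp/my_docs` (as in the `pull` example above):

```python
from docarray import BaseDoc, DocList


class MyDoc(BaseDoc):
    text: str


# Documents are yielded one at a time, so the full DocList never has to
# fit in memory at once; note that local_cache defaults to False here.
for doc in DocList[MyDoc].pull_stream('file:///tmp/my_docs'):
    print(doc.text)
```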

push(url, show_progress=False, **kwargs)

Push this DocList object to the specified url.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `url` | `str` | url specifying the protocol and save name of the `DocList`. Should be of the form `protocol://namespace/name`, e.g. `s3://bucket/path/to/namespace/name`, `file:///path/to/folder/name` | *required* |
| `show_progress` | `bool` | If true, a progress bar will be displayed. | `False` |
Source code in docarray/array/doc_list/pushpull.py
def push(
    self,
    url: str,
    show_progress: bool = False,
    **kwargs,
) -> Dict:
    """Push this `DocList` object to the specified url.

    :param url: url specifying the protocol and save name of the `DocList`. Should be of the form ``protocol://namespace/name``, e.g. ``s3://bucket/path/to/namespace/name``, ``file:///path/to/folder/name``
    :param show_progress: If true, a progress bar will be displayed.
    """
    logging.info(f'Pushing {len(self)} docs to {url}')
    protocol, name = self.__class__.resolve_url(url)
    return self.__class__.get_pushpull_backend(protocol).push(
        self, name, show_progress  # type: ignore
    )
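
For example (a sketch only; the save name is made up), pushing with the file backend serializes the docs under the given name and, per the signature, returns a metadata `Dict`:

```python
from docarray import BaseDoc, DocList


class MyDoc(BaseDoc):
    text: str


docs = DocList[MyDoc]([MyDoc(text='hello'), MyDoc(text='world')])

# The part before '://' selects the backend; the part after is the save name.
meta = docs.push('file:///tmp/pushed_docs', show_progress=True)
```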

push_stream(docs, url, show_progress=False) classmethod

Push a stream of documents to the specified url.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `docs` | `Iterator[BaseDoc]` | a stream of documents | *required* |
| `url` | `str` | url specifying the protocol and save name of the `DocList`. Should be of the form `protocol://namespace/name`, e.g. `s3://bucket/path/to/namespace/name`, `file:///path/to/folder/name` | *required* |
| `show_progress` | `bool` | If true, a progress bar will be displayed. | `False` |
Source code in docarray/array/doc_list/pushpull.py
@classmethod
def push_stream(
    cls: Type[SelfPushPullMixin],
    docs: Iterator['BaseDoc'],
    url: str,
    show_progress: bool = False,
) -> Dict:
    """Push a stream of documents to the specified url.

    :param docs: a stream of documents
    :param url: url specifying the protocol and save name of the `DocList`. Should be of the form ``protocol://namespace/name``, e.g. ``s3://bucket/path/to/namespace/name``, ``file:///path/to/folder/name``
    :param show_progress: If true, a progress bar will be displayed.
    """
    logging.info(f'Pushing stream to {url}')
    protocol, name = cls.resolve_url(url)
    return cls.get_pushpull_backend(protocol).push_stream(docs, name, show_progress)
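
A minimal sketch of a streaming upload, assuming the file backend and an illustrative generator; the documents are consumed lazily, so they are never held in memory as a full list:

```python
from docarray import BaseDoc, DocList


class MyDoc(BaseDoc):
    text: str


def doc_gen():
    # Yield documents one by one; push_stream consumes the iterator as it uploads.
    for i in range(1000):
        yield MyDoc(text=f'doc {i}')


DocList[MyDoc].push_stream(doc_gen(), 'file:///tmp/streamed_docs')
```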

resolve_url(url) staticmethod

Resolve the URL to the correct protocol and name.

Parameters:

| Name | Type | Description | Default |
| --- | --- | --- | --- |
| `url` | `str` | url to resolve | *required* |
Source code in docarray/array/doc_list/pushpull.py
@staticmethod
def resolve_url(url: str) -> Tuple[PUSH_PULL_PROTOCOL, str]:
    """Resolve the URL to the correct protocol and name.
    :param url: url to resolve
    :return: a tuple of (protocol, name)
    """
    # Split only on the first '://' so the name part may itself contain '://'.
    protocol, name = url.split('://', 1)
    if protocol in SUPPORTED_PUSH_PULL_PROTOCOLS:
        protocol = cast(PUSH_PULL_PROTOCOL, protocol)
        return protocol, name
    else:
        raise ValueError(f'Unsupported protocol {protocol}')
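
To make the split concrete, a short sketch (the URLs are illustrative). Since `resolve_url` is a staticmethod, it can be called on the class directly:

```python
from docarray import DocList

print(DocList.resolve_url('s3://bucket/path/to/namespace/name'))
# ('s3', 'bucket/path/to/namespace/name')
print(DocList.resolve_url('file:///tmp/my_docs'))
# ('file', '/tmp/my_docs')

# Any protocol outside SUPPORTED_PUSH_PULL_PROTOCOLS raises ValueError:
# DocList.resolve_url('ftp://host/name')  # -> ValueError: Unsupported protocol ftp
```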