Skip to content

BaseDoc

docarray.base_doc.doc.BaseDoc

Bases: BaseModel, IOMixin, UpdateMixin, BaseNode

BaseDoc is the base class for all Documents. This class should be subclassed to create new Document types with a specific schema.

The schema of a Document is defined by the fields of the class.

Example:

from docarray import BaseDoc
from docarray.typing import NdArray, ImageUrl
import numpy as np


class MyDoc(BaseDoc):
    embedding: NdArray[512]
    image: ImageUrl


doc = MyDoc(embedding=np.zeros(512), image='https://example.com/image.jpg')

BaseDoc is a subclass of pydantic.BaseModel and can be used in a similar way.

Source code in docarray/base_doc/doc.py
class BaseDoc(BaseModel, IOMixin, UpdateMixin, BaseNode):
    """
    BaseDoc is the base class for all Documents. This class should be subclassed
    to create new Document types with a specific schema.

    The schema of a Document is defined by the fields of the class.

    Example:
    ```python
    from docarray import BaseDoc
    from docarray.typing import NdArray, ImageUrl
    import numpy as np


    class MyDoc(BaseDoc):
        embedding: NdArray[512]
        image: ImageUrl


    doc = MyDoc(embedding=np.zeros(512), image='https://example.com/image.jpg')
    ```


    BaseDoc is a subclass of [pydantic.BaseModel](
    https://docs.pydantic.dev/usage/models/) and can be used in a similar way.
    """

    # Every document gets a random 32-hex-char id by default, so instances are
    # addressable without the caller supplying one.
    id: Optional[ID] = Field(default_factory=lambda: ID(os.urandom(16).hex()))

    class Config:
        # orjson-based (de)serialization for speed.
        json_loads = orjson.loads
        json_dumps = orjson_dumps_and_decode
        # `DocArrayResponse` is able to handle tensors by itself.
        # Therefore, we stop FastAPI from doing any transformations
        # on tensors by setting an identity function as a custom encoder.
        json_encoders = {AbstractTensor: lambda x: x}

        validate_assignment = True
        # When False, protobuf fields not declared in the schema are dropped
        # during deserialization (see IOMixin.from_protobuf).
        _load_extra_fields_from_protobuf = False

    @classmethod
    def from_view(cls: Type[T], storage_view: 'ColumnStorageView') -> T:
        """Create a Document that is a live view over column storage.

        The returned doc's ``__dict__`` *is* ``storage_view``, so attribute
        reads and writes go straight through to the underlying columns.

        :param storage_view: the column storage backing this view
        :return: a document view over ``storage_view``
        """
        # Bypass pydantic __init__/validation: build a bare instance and splice
        # the storage view in as the attribute dict.
        doc = cls.__new__(cls)
        object.__setattr__(doc, '__dict__', storage_view)
        object.__setattr__(doc, '__fields_set__', set(storage_view.keys()))

        doc._init_private_attributes()
        return doc

    @classmethod
    def _get_field_type(cls, field: str) -> Type:
        """
        Accessing the nested python Class defined in the schema. Could be useful for
        reconstruction of Document in serialization/deserialization
        :param field: name of the field
        :return: the outer type of the field as declared in the schema
        """
        return cls.__fields__[field].outer_type_

    def __str__(self) -> str:
        content: Any = None
        if self.is_view():
            # A view's __dict__ is a ColumnStorageView that rich cannot render
            # directly, so build a plain "Cls(field=value, ...)" string instead.
            attr_str = ", ".join(
                f"{field}={self.__getattr__(field)}" for field in self.__dict__.keys()
            )
            content = f"{self.__class__.__name__}({attr_str})"
        else:
            content = self

        # Render through rich's console and capture the output as plain text.
        with _console.capture() as capture:
            _console.print(content)

        return capture.get().strip()

    def summary(self) -> None:
        """Print non-empty fields and nested structure of this Document object."""
        # Local import: display machinery is only needed when summarizing.
        from docarray.display.document_summary import DocumentSummary

        DocumentSummary(doc=self).summary()

    @classmethod
    def schema_summary(cls) -> None:
        """Print a summary of the Documents schema."""
        from docarray.display.document_summary import DocumentSummary

        DocumentSummary.schema_summary(cls)

    def _ipython_display_(self) -> None:
        """Displays the object in IPython as a summary"""
        self.summary()

    def is_view(self) -> bool:
        """Return True if this doc is a view over column storage
        (created via :meth:`from_view`) rather than owning its data."""
        from docarray.array.doc_vec.column_storage import ColumnStorageView

        return isinstance(self.__dict__, ColumnStorageView)

    def __getattr__(self, item) -> Any:
        # For schema fields, read straight from __dict__ (which may be a
        # ColumnStorageView); everything else goes through normal lookup.
        if item in self.__fields__.keys():
            return self.__dict__[item]
        else:
            return super().__getattribute__(item)

    def __setattr__(self, field, value) -> None:
        if not self.is_view():
            super().__setattr__(field, value)
        else:
            # here we first validate with pydantic
            # Then we apply the value to the remote dict,
            # and we change back the __dict__ value to the remote dict
            dict_ref = self.__dict__
            super().__setattr__(field, value)
            for key, val in self.__dict__.items():
                dict_ref[key] = val
            object.__setattr__(self, '__dict__', dict_ref)

    def __eq__(self, other) -> bool:
        """Field-wise equality. The `id` field is deliberately ignored, and
        tensor fields compare equal when shapes and all values match."""
        if not isinstance(other, BaseDoc):
            return False

        if self.__fields__.keys() != other.__fields__.keys():
            return False

        for field_name in self.__fields__:
            value1 = getattr(self, field_name)
            value2 = getattr(other, field_name)

            if field_name == 'id':
                continue

            if isinstance(value1, AbstractTensor) and isinstance(
                value2, AbstractTensor
            ):
                # Compare tensors through their computation backends: shapes
                # first (cheap), then element-wise values via numpy.
                comp_be1 = value1.get_comp_backend()
                comp_be2 = value2.get_comp_backend()

                if comp_be1.shape(value1) != comp_be2.shape(value2):
                    return False
                if (
                    not (comp_be1.to_numpy(value1) == comp_be2.to_numpy(value2))
                    .all()
                    .item()
                ):
                    return False
            else:
                if value1 != value2:
                    return False
        return True

    def __ne__(self, other) -> bool:
        return not (self == other)

    def _docarray_to_json_compatible(self) -> Dict:
        """
        Convert itself into a json compatible object
        :return: A dictionary of the BaseDoc object
        """
        return self.dict()

    ########################################################################################################################################################
    ### this section is just for documentation purposes will be removed later once
    # https://github.com/mkdocstrings/griffe/issues/138 is fixed ##############
    ########################################################################################################################################################

    def json(
        self,
        *,
        include: Optional[Union['AbstractSetIntStr', 'MappingIntStrAny']] = None,
        exclude: ExcludeType = None,
        by_alias: bool = False,
        skip_defaults: Optional[bool] = None,
        exclude_unset: bool = False,
        exclude_defaults: bool = False,
        exclude_none: bool = False,
        encoder: Optional[Callable[[Any], Any]] = None,
        models_as_dict: bool = True,
        **dumps_kwargs: Any,
    ) -> str:
        """
        Generate a JSON representation of the model, `include` and `exclude`
        arguments as per `dict()`.

        `encoder` is an optional function to supply as `default` to json.dumps(),
        other arguments as per `json.dumps()`.
        """
        # DocList fields are stripped from the pydantic pass and re-attached
        # below, because pydantic does not serialize DocList correctly.
        exclude, original_exclude, doclist_exclude_fields = self._exclude_doclist(
            exclude=exclude
        )

        # this is copy from pydantic code
        if skip_defaults is not None:
            warnings.warn(
                f'{self.__class__.__name__}.json(): "skip_defaults" is deprecated and replaced by "exclude_unset"',
                DeprecationWarning,
            )
            exclude_unset = skip_defaults
        encoder = cast(Callable[[Any], Any], encoder or self.__json_encoder__)

        # We don't directly call `self.dict()`, which does exactly this with `to_dict=True`
        # because we want to be able to keep raw `BaseModel` instances and not as `dict`.
        # This allows users to write custom JSON encoders for given `BaseModel` classes.
        data = dict(
            self._iter(
                to_dict=models_as_dict,
                by_alias=by_alias,
                include=include,
                exclude=exclude,
                exclude_unset=exclude_unset,
                exclude_defaults=exclude_defaults,
                exclude_none=exclude_none,
            )
        )

        # this is the custom part to deal with DocList
        for field in doclist_exclude_fields:
            # we need to do this because pydantic will not recognize DocList correctly
            original_exclude = original_exclude or {}
            if field not in original_exclude:
                data[field] = getattr(
                    self, field
                )  # here we need to keep doclist as doclist otherwise if a user want to have a special json config it will not work

        # this is copy from pydantic code
        if self.__custom_root_type__:
            data = data[ROOT_KEY]
        return self.__config__.json_dumps(data, default=encoder, **dumps_kwargs)

    @no_type_check
    @classmethod
    def parse_raw(
        cls: Type[T],
        b: 'StrBytes',
        *,
        content_type: Optional[str] = None,
        encoding: str = 'utf8',
        proto: Optional['Protocol'] = None,
        allow_pickle: bool = False,
    ) -> T:
        """
        Parse a raw string or bytes into a base doc
        :param b: the raw string or bytes to parse
        :param content_type: content type of ``b``, forwarded to the parent parser
        :param encoding: the encoding to use when parsing a string, defaults to 'utf8'
        :param proto: protocol to use.
        :param allow_pickle: allow pickle protocol
        :return: a document
        """
        return super(BaseDoc, cls).parse_raw(
            b,
            content_type=content_type,
            encoding=encoding,
            proto=proto,
            allow_pickle=allow_pickle,
        )

    def dict(
        self,
        *,
        include: Optional[Union['AbstractSetIntStr', 'MappingIntStrAny']] = None,
        exclude: ExcludeType = None,
        by_alias: bool = False,
        skip_defaults: Optional[bool] = None,
        exclude_unset: bool = False,
        exclude_defaults: bool = False,
        exclude_none: bool = False,
    ) -> 'DictStrAny':
        """
        Generate a dictionary representation of the model, optionally specifying
        which fields to include or exclude.

        """

        # Same DocList special-casing as in `json` above.
        exclude, original_exclude, doclist_exclude_fields = self._exclude_doclist(
            exclude=exclude
        )

        data = super().dict(
            include=include,
            exclude=exclude,
            by_alias=by_alias,
            skip_defaults=skip_defaults,
            exclude_unset=exclude_unset,
            exclude_defaults=exclude_defaults,
            exclude_none=exclude_none,
        )

        for field in doclist_exclude_fields:
            # we need to do this because pydantic will not recognize DocList correctly
            original_exclude = original_exclude or {}
            if field not in original_exclude:
                val = getattr(self, field)
                data[field] = [doc.dict() for doc in val] if val is not None else None

        return data

    def _exclude_doclist(
        self, exclude: ExcludeType
    ) -> Tuple[ExcludeType, ExcludeType, List[str]]:
        """Split DocList fields out of the exclusion specification.

        :param exclude: the caller-supplied exclude set/mapping (may be None)
        :return: tuple of (exclude extended with every DocList field,
            the original caller exclude, the DocList field names)
        """
        doclist_exclude_fields = []
        for field in self.__fields__.keys():
            from docarray import DocList

            type_ = self._get_field_type(field)
            if isinstance(type_, type) and issubclass(type_, DocList):
                doclist_exclude_fields.append(field)

        original_exclude = exclude
        if exclude is None:
            exclude = set(doclist_exclude_fields)
        elif isinstance(exclude, AbstractSet):
            exclude = set([*exclude, *doclist_exclude_fields])
        elif isinstance(exclude, Mapping):
            exclude = dict(**exclude)
            exclude.update({field: ... for field in doclist_exclude_fields})

        return (
            exclude,
            original_exclude,
            doclist_exclude_fields,
        )

    # `to_json` is an alias of `json` (same bound method object).
    to_json = json

dict(*, include=None, exclude=None, by_alias=False, skip_defaults=None, exclude_unset=False, exclude_defaults=False, exclude_none=False)

Generate a dictionary representation of the model, optionally specifying which fields to include or exclude.

Source code in docarray/base_doc/doc.py
def dict(
    self,
    *,
    include: Optional[Union['AbstractSetIntStr', 'MappingIntStrAny']] = None,
    exclude: ExcludeType = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
) -> 'DictStrAny':
    """
    Generate a dictionary representation of the model, optionally specifying
    which fields to include or exclude.

    """

    # DocList fields confuse pydantic's serializer, so they are removed from
    # the pydantic pass and serialized manually afterwards.
    exclude, original_exclude, doclist_fields = self._exclude_doclist(exclude=exclude)

    result = super().dict(
        include=include,
        exclude=exclude,
        by_alias=by_alias,
        skip_defaults=skip_defaults,
        exclude_unset=exclude_unset,
        exclude_defaults=exclude_defaults,
        exclude_none=exclude_none,
    )

    # Re-attach the DocList fields the caller did not explicitly exclude.
    user_exclude = original_exclude or {}
    for name in doclist_fields:
        if name in user_exclude:
            continue
        docs = getattr(self, name)
        result[name] = None if docs is None else [doc.dict() for doc in docs]

    return result

json(*, include=None, exclude=None, by_alias=False, skip_defaults=None, exclude_unset=False, exclude_defaults=False, exclude_none=False, encoder=None, models_as_dict=True, **dumps_kwargs)

Generate a JSON representation of the model, include and exclude arguments as per dict().

encoder is an optional function to supply as default to json.dumps(), other arguments as per json.dumps().

Source code in docarray/base_doc/doc.py
def json(
    self,
    *,
    include: Optional[Union['AbstractSetIntStr', 'MappingIntStrAny']] = None,
    exclude: ExcludeType = None,
    by_alias: bool = False,
    skip_defaults: Optional[bool] = None,
    exclude_unset: bool = False,
    exclude_defaults: bool = False,
    exclude_none: bool = False,
    encoder: Optional[Callable[[Any], Any]] = None,
    models_as_dict: bool = True,
    **dumps_kwargs: Any,
) -> str:
    """
    Generate a JSON representation of the model, `include` and `exclude`
    arguments as per `dict()`.

    `encoder` is an optional function to supply as `default` to json.dumps(),
    other arguments as per `json.dumps()`.
    """
    # DocList fields are excluded from the pydantic serialization pass and
    # re-attached afterwards, because pydantic mishandles DocList.
    exclude, original_exclude, doclist_exclude_fields = self._exclude_doclist(
        exclude=exclude
    )

    # this is copy from pydantic code
    if skip_defaults is not None:
        warnings.warn(
            f'{self.__class__.__name__}.json(): "skip_defaults" is deprecated and replaced by "exclude_unset"',
            DeprecationWarning,
        )
        exclude_unset = skip_defaults
    encoder = cast(Callable[[Any], Any], encoder or self.__json_encoder__)

    # We don't directly call `self.dict()`, which does exactly this with `to_dict=True`
    # because we want to be able to keep raw `BaseModel` instances and not as `dict`.
    # This allows users to write custom JSON encoders for given `BaseModel` classes.
    data = dict(
        self._iter(
            to_dict=models_as_dict,
            by_alias=by_alias,
            include=include,
            exclude=exclude,
            exclude_unset=exclude_unset,
            exclude_defaults=exclude_defaults,
            exclude_none=exclude_none,
        )
    )

    # this is the custom part to deal with DocList
    for field in doclist_exclude_fields:
        # we need to do this because pydantic will not recognize DocList correctly
        original_exclude = original_exclude or {}
        if field not in original_exclude:
            data[field] = getattr(
                self, field
            )  # here we need to keep doclist as doclist otherwise if a user want to have a special json config it will not work

    # this is copy from pydantic code
    if self.__custom_root_type__:
        data = data[ROOT_KEY]
    return self.__config__.json_dumps(data, default=encoder, **dumps_kwargs)

parse_raw(b, *, content_type=None, encoding='utf8', proto=None, allow_pickle=False) classmethod

Parse a raw string or bytes into a base doc

Parameters:

Name Type Description Default
b StrBytes required
content_type str None
encoding str

the encoding to use when parsing a string, defaults to 'utf8'

'utf8'
proto Protocol

protocol to use.

None
allow_pickle bool

allow pickle protocol

False

Returns:

Type Description
T

a document

Source code in docarray/base_doc/doc.py
@no_type_check
@classmethod
def parse_raw(
    cls: Type[T],
    b: 'StrBytes',
    *,
    content_type: Optional[str] = None,
    encoding: str = 'utf8',
    proto: Optional['Protocol'] = None,
    allow_pickle: bool = False,
) -> T:
    """
    Parse a raw string or bytes into a base doc
    :param b: the raw string or bytes to parse
    :param content_type: content type of ``b``, forwarded to the parent parser
    :param encoding: the encoding to use when parsing a string, defaults to 'utf8'
    :param proto: protocol to use.
    :param allow_pickle: allow pickle protocol
    :return: a document
    """
    # Thin wrapper: all the work is delegated to the parent implementation.
    return super(BaseDoc, cls).parse_raw(
        b,
        content_type=content_type,
        encoding=encoding,
        proto=proto,
        allow_pickle=allow_pickle,
    )

schema_summary() classmethod

Print a summary of the Documents schema.

Source code in docarray/base_doc/doc.py
@classmethod
def schema_summary(cls) -> None:
    """Print a summary of the Documents schema."""
    # Imported locally; presumably to avoid an import cycle — TODO confirm.
    from docarray.display.document_summary import DocumentSummary as _Summary

    _Summary.schema_summary(cls)

summary()

Print non-empty fields and nested structure of this Document object.

Source code in docarray/base_doc/doc.py
def summary(self) -> None:
    """Print non-empty fields and nested structure of this Document object."""
    # Imported locally; presumably to avoid an import cycle — TODO confirm.
    from docarray.display.document_summary import DocumentSummary as _Summary

    _Summary(doc=self).summary()

docarray.base_doc.mixins.io.IOMixin

Bases: Iterable[Tuple[str, Any]]

IOMixin to define all the bytes/protobuf/json related part of BaseDoc

Source code in docarray/base_doc/mixins/io.py
class IOMixin(Iterable[Tuple[str, Any]]):
    """
    IOMixin to define all the bytes/protobuf/json related part of BaseDoc
    """

    # Populated by the concrete pydantic model this mixin is combined with.
    __fields__: Dict[str, 'ModelField']

    class Config:
        # When True, protobuf fields absent from the schema are still loaded
        # (see `from_protobuf`).
        _load_extra_fields_from_protobuf: bool

    @classmethod
    @abstractmethod
    def _get_field_type(cls, field: str) -> Type:
        ...

    @classmethod
    def _get_field_type_array(cls, field: str) -> Type:
        """Type used to deserialize array ('doc_array') fields; defaults to
        the plain field type. NOTE(review): subclasses may override — confirm."""
        return cls._get_field_type(field)

    def __bytes__(self) -> bytes:
        return self.to_bytes()

    def to_bytes(
        self, protocol: str = 'protobuf', compress: Optional[str] = None
    ) -> bytes:
        """Serialize itself into bytes.

        For more Pythonic code, please use ``bytes(...)``.

        :param protocol: protocol to use. It can be 'pickle' or 'protobuf'
        :param compress: compression algorithm to use
        :return: the binary serialization in bytes
        """
        import pickle

        if protocol == 'pickle':
            bstr = pickle.dumps(self)
        elif protocol == 'protobuf':
            bstr = self.to_protobuf().SerializePartialToString()
        else:
            raise ValueError(
                f'protocol={protocol} is not supported. Can be only `protobuf` or '
                f'pickle protocols 0-5.'
            )
        return _compress_bytes(bstr, algorithm=compress)

    @classmethod
    def from_bytes(
        cls: Type[T],
        data: bytes,
        protocol: str = 'protobuf',
        compress: Optional[str] = None,
    ) -> T:
        """Build Document object from binary bytes

        :param data: binary bytes
        :param protocol: protocol to use. It can be 'pickle' or 'protobuf'
        :param compress: compress method to use
        :return: a Document object
        """
        bstr = _decompress_bytes(data, algorithm=compress)
        if protocol == 'pickle':
            # NOTE: pickle deserialization executes arbitrary code; only use
            # with trusted data.
            return pickle.loads(bstr)
        elif protocol == 'protobuf':
            from docarray.proto import DocProto

            pb_msg = DocProto()
            pb_msg.ParseFromString(bstr)
            return cls.from_protobuf(pb_msg)
        else:
            raise ValueError(
                f'protocol={protocol} is not supported. Can be only `protobuf` or '
                f'pickle protocols 0-5.'
            )

    def to_base64(
        self, protocol: str = 'protobuf', compress: Optional[str] = None
    ) -> str:
        """Serialize a Document object as a base64 string

        :param protocol: protocol to use. It can be 'pickle' or 'protobuf'
        :param compress: compress method to use
        :return: a base64 encoded string
        """
        return base64.b64encode(self.to_bytes(protocol, compress)).decode('utf-8')

    @classmethod
    def from_base64(
        cls: Type[T],
        data: str,
        protocol: str = 'pickle',
        compress: Optional[str] = None,
    ) -> T:
        """Build Document object from a base64 encoded string

        :param data: a base64 encoded string
        :param protocol: protocol to use. It can be 'pickle' or 'protobuf'
        :param compress: compress method to use
        :return: a Document object
        """
        return cls.from_bytes(base64.b64decode(data), protocol, compress)

    @classmethod
    def from_protobuf(cls: Type[T], pb_msg: 'DocProto') -> T:
        """create a Document from a protobuf message

        :param pb_msg: the proto message of the Document
        :return: a Document initialized with the proto data
        """

        fields: Dict[str, Any] = {}

        for field_name in pb_msg.data:
            if (
                not (cls.Config._load_extra_fields_from_protobuf)
                and field_name not in cls.__fields__.keys()
            ):
                continue  # optimization we don't even load the data if the key does not
                # match any field in the cls or in the mapping

            fields[field_name] = cls._get_content_from_node_proto(
                pb_msg.data[field_name], field_name
            )

        return cls(**fields)

    @classmethod
    def _get_content_from_node_proto(
        cls,
        value: 'NodeProto',
        field_name: Optional[str] = None,
        field_type: Optional[Type] = None,
    ) -> Any:
        """
        load the proto data from a node proto

        :param value: the proto node value
        :param field_name: the name of the field
        :param field_type: explicit type to deserialize into; mutually
            exclusive with ``field_name``
        :return: the loaded field
        """

        if field_name is not None and field_type is not None:
            raise ValueError("field_type and field_name cannot be both passed")

        # Resolve the target type from the field name when only the name is given.
        field_type = field_type or (
            cls._get_field_type(field_name) if field_name else None
        )

        content_type_dict = _PROTO_TYPE_NAME_TO_CLASS

        # Which oneof member of the proto node is populated, and the optional
        # docarray-specific type tag carried alongside it.
        content_key = value.WhichOneof('content')
        docarray_type = (
            value.type if value.WhichOneof('docarray_type') is not None else None
        )

        return_field: Any

        if docarray_type in content_type_dict:
            # A registered docarray type (tensor, url, ...): delegate to it.
            return_field = content_type_dict[docarray_type].from_protobuf(
                getattr(value, content_key)
            )
        elif content_key == 'doc':
            if field_type is None:
                raise ValueError(
                    'field_type cannot be None when trying to deserialize a BaseDoc'
                )
            return_field = field_type.from_protobuf(
                getattr(value, content_key)
            )  # we get to the parent class
        elif content_key == 'doc_array':
            if field_name is None:
                raise ValueError(
                    'field_name cannot be None when trying to deserialize a BaseDoc'
                )
            return_field = cls._get_field_type_array(field_name).from_protobuf(
                getattr(value, content_key)
            )  # we get to the parent class
        elif content_key is None:
            # Empty node: no content was set.
            return_field = None
        elif docarray_type is None:
            # Plain python values and containers (no docarray type tag).
            arg_to_container: Dict[str, Callable] = {
                'list': list,
                'set': set,
                'tuple': tuple,
            }

            if content_key in ['text', 'blob', 'integer', 'float', 'boolean']:
                return_field = getattr(value, content_key)

            elif content_key in arg_to_container.keys():
                # Containers recurse per element, using the field's inner
                # type (pydantic `type_`) when the field is known.
                field_type = (
                    cls.__fields__[field_name].type_
                    if field_name and field_name in cls.__fields__
                    else None
                )
                return_field = arg_to_container[content_key](
                    cls._get_content_from_node_proto(node, field_type=field_type)
                    for node in getattr(value, content_key).data
                )

            elif content_key == 'dict':
                deser_dict: Dict[str, Any] = dict()
                field_type = (
                    cls.__fields__[field_name].type_
                    if field_name and field_name in cls.__fields__
                    else None
                )
                for key_name, node in value.dict.data.items():
                    deser_dict[key_name] = cls._get_content_from_node_proto(
                        node, field_type=field_type
                    )
                return_field = deser_dict
            else:
                raise ValueError(
                    f'key {content_key} is not supported for deserialization'
                )

        else:
            raise ValueError(
                f'type {docarray_type}, with key {content_key} is not supported for'
                f' deserialization'
            )

        return return_field

    def to_protobuf(self: T) -> 'DocProto':
        """Convert Document into a Protobuf message.

        :return: the protobuf message
        """
        from docarray.proto import DocProto

        data = {}
        for field, value in self:
            try:
                data[field] = _type_to_protobuf(value)
            except RecursionError as ex:
                # A self-referencing document recurses forever; replace the
                # stock message with one naming the offending field.
                if len(ex.args) >= 1:
                    ex.args = (
                        (
                            f'Field `{field}` contains cyclic reference in memory. '
                            'Could it be your Document is referring to itself?'
                        ),
                    )
                raise ex
            except Exception as ex:
                # Prefix the field name so the caller knows where it failed.
                if len(ex.args) >= 1:
                    ex.args = (f'Field `{field}` is problematic',) + ex.args
                raise ex

        return DocProto(data=data)

    def _to_node_protobuf(self) -> 'NodeProto':
        """Wrap this Document in a NodeProto so it can be nested inside
        another Document's protobuf representation.

        :return: the nested item protobuf message
        """
        from docarray.proto import NodeProto

        """Convert Document into a NodeProto protobuf message. This function should be
        called when the Document is nest into another Document that need to be
        converted into a protobuf

        :return: the nested item protobuf message
        """
        return NodeProto(doc=self.to_protobuf())

    @classmethod
    def _get_access_paths(cls) -> List[str]:
        """
        Get "__"-separated access paths of all fields, including nested ones.

        :return: list of all access paths
        """
        from docarray import BaseDoc

        paths = []
        for field in cls.__fields__.keys():
            field_type = cls._get_field_type(field)
            # Recurse into nested BaseDoc fields; union-typed fields are
            # treated as leaves.
            if not is_union_type(field_type) and safe_issubclass(field_type, BaseDoc):
                sub_paths = field_type._get_access_paths()
                for path in sub_paths:
                    paths.append(f'{field}__{path}')
            else:
                paths.append(field)
        return paths

from_base64(data, protocol='pickle', compress=None) classmethod

Build Document object from a base64 encoded string

Parameters:

Name Type Description Default
data str

a base64 encoded string

required
protocol str

protocol to use. It can be 'pickle' or 'protobuf'

'pickle'
compress Optional[str]

compress method to use

None

Returns:

Type Description
T

a Document object

Source code in docarray/base_doc/mixins/io.py
@classmethod
def from_base64(
    cls: Type[T],
    data: str,
    protocol: str = 'pickle',
    compress: Optional[str] = None,
) -> T:
    """Build a Document object from a base64 encoded string.

    :param data: a base64 encoded string
    :param protocol: protocol to use. It can be 'pickle' or 'protobuf'
    :param compress: compress method to use
    :return: a Document object
    """
    # Decode to raw bytes, then reuse the bytes deserializer.
    raw = base64.b64decode(data)
    return cls.from_bytes(raw, protocol, compress)

from_bytes(data, protocol='protobuf', compress=None) classmethod

Build Document object from binary bytes

Parameters:

Name Type Description Default
data bytes

binary bytes

required
protocol str

protocol to use. It can be 'pickle' or 'protobuf'

'protobuf'
compress Optional[str]

compress method to use

None

Returns:

Type Description
T

a Document object

Source code in docarray/base_doc/mixins/io.py
@classmethod
def from_bytes(
    cls: Type[T],
    data: bytes,
    protocol: str = 'protobuf',
    compress: Optional[str] = None,
) -> T:
    """Build Document object from binary bytes

    :param data: binary bytes
    :param protocol: protocol to use. It can be 'pickle' or 'protobuf'
    :param compress: compress method to use
    :return: a Document object
    """
    payload = _decompress_bytes(data, algorithm=compress)
    if protocol == 'pickle':
        # NOTE: unpickling executes arbitrary code; only use on trusted data.
        return pickle.loads(payload)
    if protocol == 'protobuf':
        from docarray.proto import DocProto

        message = DocProto()
        message.ParseFromString(payload)
        return cls.from_protobuf(message)
    raise ValueError(
        f'protocol={protocol} is not supported. Can be only `protobuf` or '
        f'pickle protocols 0-5.'
    )

from_protobuf(pb_msg) classmethod

create a Document from a protobuf message

Parameters:

Name Type Description Default
pb_msg DocProto

the proto message of the Document

required

Returns:

Type Description
T

a Document initialized with the proto data

Source code in docarray/base_doc/mixins/io.py
@classmethod
def from_protobuf(cls: Type[T], pb_msg: 'DocProto') -> T:
    """create a Document from a protobuf message

    :param pb_msg: the proto message of the Document
    :return: a Document initialized with the proto data
    """
    load_extra = cls.Config._load_extra_fields_from_protobuf
    known_fields = cls.__fields__.keys()

    # Skip unknown keys entirely (unless extras are enabled) so their data
    # is never even deserialized.
    fields: Dict[str, Any] = {
        name: cls._get_content_from_node_proto(pb_msg.data[name], name)
        for name in pb_msg.data
        if load_extra or name in known_fields
    }

    return cls(**fields)

to_base64(protocol='protobuf', compress=None)

Serialize a Document object as a base64 encoded string

Parameters:

Name Type Description Default
protocol str

protocol to use. It can be 'pickle' or 'protobuf'

'protobuf'
compress Optional[str]

compress method to use

None

Returns:

Type Description
str

a base64 encoded string

Source code in docarray/base_doc/mixins/io.py
def to_base64(
    self, protocol: str = 'protobuf', compress: Optional[str] = None
) -> str:
    """Serialize a Document object as a base64 string.

    :param protocol: protocol to use. It can be 'pickle' or 'protobuf'
    :param compress: compress method to use
    :return: a base64 encoded string
    """
    # Serialize to bytes first, then base64-encode and decode to str.
    raw = self.to_bytes(protocol, compress)
    return base64.b64encode(raw).decode('utf-8')

to_bytes(protocol='protobuf', compress=None)

Serialize itself into bytes.

For more Pythonic code, please use bytes(...).

Parameters:

Name Type Description Default
protocol str

protocol to use. It can be 'pickle' or 'protobuf'

'protobuf'
compress Optional[str]

compression algorithm to use

None

Returns:

Type Description
bytes

the binary serialization in bytes

Source code in docarray/base_doc/mixins/io.py
def to_bytes(
    self, protocol: str = 'protobuf', compress: Optional[str] = None
) -> bytes:
    """Serialize itself into bytes.

    For more Pythonic code, please use ``bytes(...)``.

    :param protocol: protocol to use. It can be 'pickle' or 'protobuf'
    :param compress: compression algorithm to use
    :return: the binary serialization in bytes
    """
    import pickle

    if protocol == 'protobuf':
        payload = self.to_protobuf().SerializePartialToString()
    elif protocol == 'pickle':
        payload = pickle.dumps(self)
    else:
        raise ValueError(
            f'protocol={protocol} is not supported. Can be only `protobuf` or '
            f'pickle protocols 0-5.'
        )
    # Optionally compress the serialized payload before returning it.
    return _compress_bytes(payload, algorithm=compress)

to_protobuf()

Convert Document into a Protobuf message.

Returns:

Type Description
DocProto

the protobuf message

Source code in docarray/base_doc/mixins/io.py
def to_protobuf(self: T) -> 'DocProto':
    """Convert Document into a Protobuf message.

    :return: the protobuf message
    """
    from docarray.proto import DocProto

    serialized = {}
    for name, value in self:
        try:
            serialized[name] = _type_to_protobuf(value)
        except RecursionError as ex:
            # A self-referencing Document recurses forever while serializing;
            # rewrite the error so it points at the offending field.
            if len(ex.args) >= 1:
                ex.args = (
                    (
                        f'Field `{name}` contains cyclic reference in memory. '
                        'Could it be your Document is referring to itself?'
                    ),
                )
            raise ex
        except Exception as ex:
            # Prefix the field name onto any other serialization failure.
            if len(ex.args) >= 1:
                ex.args = (f'Field `{name}` is problematic',) + ex.args
            raise ex

    return DocProto(data=serialized)

docarray.base_doc.mixins.update.UpdateMixin

Source code in docarray/base_doc/mixins/update.py
class UpdateMixin:
    """Mixin that adds in-place ``update`` semantics to Document classes.

    NOTE(review): assumes the host class exposes pydantic-style ``__fields__``
    and implements ``_get_field_type`` — confirm against the concrete Document.
    """

    __fields__: Dict[str, 'ModelField']

    def _get_string_for_regex_filter(self):
        # Text form used when applying regex-based filters to a document.
        return str(self)

    @classmethod
    @abstractmethod
    def _get_field_type(cls, field: str) -> Type['UpdateMixin']:
        # Return the declared (static) type of `field`; supplied by the
        # concrete Document class.
        ...

    def update(self, other: T):
        """
        Updates self with the content of other. Changes are applied to self.
        Updating one Document with another consists of the following:

         - Setting data properties of the second Document to the first Document
         if they are not None
         - Concatenating lists and updating sets
         - Updating recursively Documents and DocLists
         - Updating Dictionaries of the left with the right

        It behaves as an update operation for Dictionaries, except that since
        it is applied to a static schema type, the presence of the field is
        given by the field not having a None value and that DocLists,
        lists and sets are concatenated. It is worth mentioning that Tuples
        are not merged together since they are meant to be immutable,
        so they behave as regular types and the value of `self` is updated
        with the value of `other`.


        ---

        ```python
        from typing import List, Optional

        from docarray import BaseDoc


        class MyDocument(BaseDoc):
            content: str
            title: Optional[str] = None
            tags_: List


        doc1 = MyDocument(
            content='Core content of the document', title='Title', tags_=['python', 'AI']
        )
        doc2 = MyDocument(content='Core content updated', tags_=['docarray'])

        doc1.update(doc2)
        assert doc1.content == 'Core content updated'
        assert doc1.title == 'Title'
        assert doc1.tags_ == ['python', 'AI', 'docarray']
        ```

        ---
        :param other: The Document with which to update the contents of this
        """
        # Exact-class match is required; even a subclass of self's type is
        # rejected, because the field grouping below assumes a single schema.
        if type(self) != type(other):
            raise Exception(
                f'Update operation can only be applied to '
                f'Documents of the same type. '
                f'Trying to update Document of type '
                f'{type(self)} with Document of type '
                f'{type(other)}'
            )
        from collections import namedtuple

        from docarray import DocList
        from docarray.utils.reduce import reduce

        # Declaring namedtuple()
        # Buckets of field names, one bucket per merge strategy applied below.
        _FieldGroups = namedtuple(
            '_FieldGroups',
            [
                'simple_non_empty_fields',
                'list_fields',
                'set_fields',
                'dict_fields',
                'nested_docarray_fields',
                'nested_docs_fields',
            ],
        )

        # NOTE(review): BaseDoc's id field is lowercase 'id'; 'ID' as written
        # matches no field name, so the id is NOT protected from being
        # overwritten — confirm the intended casing.
        FORBIDDEN_FIELDS_TO_UPDATE = ['ID']

        def _group_fields(doc: 'UpdateMixin') -> _FieldGroups:
            # Classify each field of `doc` by its declared type so the caller
            # can pick the right merge strategy per bucket.
            simple_non_empty_fields: List[str] = []
            list_fields: List[str] = []
            set_fields: List[str] = []
            dict_fields: List[str] = []
            nested_docs_fields: List[str] = []
            nested_docarray_fields: List[str] = []

            for field_name, field in doc.__fields__.items():
                if field_name not in FORBIDDEN_FIELDS_TO_UPDATE:
                    field_type = doc._get_field_type(field_name)

                    if isinstance(field_type, type) and issubclass(field_type, DocList):
                        nested_docarray_fields.append(field_name)
                    else:
                        origin = get_origin(field_type)
                        if origin is list:
                            list_fields.append(field_name)
                        elif origin is set:
                            set_fields.append(field_name)
                        elif origin is dict:
                            dict_fields.append(field_name)
                        else:
                            # Anything else counts only when truthy: nested
                            # Documents recurse, plain values overwrite.
                            v = getattr(doc, field_name)
                            if v:
                                if isinstance(v, UpdateMixin):
                                    nested_docs_fields.append(field_name)
                                else:
                                    simple_non_empty_fields.append(field_name)
            return _FieldGroups(
                simple_non_empty_fields,
                list_fields,
                set_fields,
                dict_fields,
                nested_docarray_fields,
                nested_docs_fields,
            )

        doc1_fields = _group_fields(self)
        doc2_fields = _group_fields(other)

        # Simple values: other's non-empty values overwrite self's.
        for field in doc2_fields.simple_non_empty_fields:
            setattr(self, field, getattr(other, field))

        # Nested Documents: merge recursively, then reassign to trigger
        # pydantic validation (validate_assignment).
        for field in set(
            doc1_fields.nested_docs_fields + doc2_fields.nested_docs_fields
        ):
            sub_doc_1: T = getattr(self, field)
            sub_doc_2: T = getattr(other, field)
            sub_doc_1.update(sub_doc_2)
            setattr(self, field, sub_doc_1)

        # Lists: concatenate other's items onto self's.
        for field in set(doc1_fields.list_fields + doc2_fields.list_fields):
            array1 = getattr(self, field)
            array2 = getattr(other, field)
            if array1 is None and array2 is not None:
                setattr(self, field, array2)
            elif array1 is not None and array2 is not None:
                array1.extend(array2)
                setattr(self, field, array1)

        # Sets: union other's items into self's.
        for field in set(doc1_fields.set_fields + doc2_fields.set_fields):
            array1 = getattr(self, field)
            array2 = getattr(other, field)
            if array1 is None and array2 is not None:
                setattr(self, field, array2)
            elif array1 is not None and array2 is not None:
                array1.update(array2)
                setattr(self, field, array1)

        # DocLists: delegate to docarray's reduce to merge the two arrays.
        for field in set(
            doc1_fields.nested_docarray_fields + doc2_fields.nested_docarray_fields
        ):
            array1 = getattr(self, field)
            array2 = getattr(other, field)
            if array1 is None and array2 is not None:
                setattr(self, field, array2)
            elif array1 is not None and array2 is not None:
                array1 = reduce(array1, array2)
                setattr(self, field, array1)

        # Dicts: dict.update semantics, right side wins on key collisions.
        for field in set(doc1_fields.dict_fields + doc2_fields.dict_fields):
            dict1 = getattr(self, field)
            dict2 = getattr(other, field)
            if dict1 is None and dict2 is not None:
                setattr(self, field, dict2)
            elif dict1 is not None and dict2 is not None:
                dict1.update(dict2)
                setattr(self, field, dict1)

update(other)

Updates self with the content of other. Changes are applied to self. Updating one Document with another consists of the following:

  • Setting data properties of the second Document to the first Document if they are not None
  • Concatenating lists and updating sets
  • Updating recursively Documents and DocLists
  • Updating Dictionaries of the left with the right

It behaves as an update operation for Dictionaries, except that since it is applied to a static schema type, the presence of the field is given by the field not having a None value and that DocLists, lists and sets are concatenated. It is worth mentioning that Tuples are not merged together since they are meant to be immutable, so they behave as regular types and the value of self is updated with the value of other.


from typing import List, Optional

from docarray import BaseDoc


class MyDocument(BaseDoc):
    content: str
    title: Optional[str] = None
    tags_: List


doc1 = MyDocument(
    content='Core content of the document', title='Title', tags_=['python', 'AI']
)
doc2 = MyDocument(content='Core content updated', tags_=['docarray'])

doc1.update(doc2)
assert doc1.content == 'Core content updated'
assert doc1.title == 'Title'
assert doc1.tags_ == ['python', 'AI', 'docarray']

Parameters:

Name Type Description Default
other T

The Document with which to update the contents of this

required
Source code in docarray/base_doc/mixins/update.py
def update(self, other: T):
    """
    Updates self with the content of other. Changes are applied to self.
    Updating one Document with another consists of the following:

     - Setting data properties of the second Document to the first Document
     if they are not None
     - Concatenating lists and updating sets
     - Updating recursively Documents and DocLists
     - Updating Dictionaries of the left with the right

    It behaves as an update operation for Dictionaries, except that since
    it is applied to a static schema type, the presence of the field is
    given by the field not having a None value and that DocLists,
    lists and sets are concatenated. It is worth mentioning that Tuples
    are not merged together since they are meant to be immutable,
    so they behave as regular types and the value of `self` is updated
    with the value of `other`.


    ---

    ```python
    from typing import List, Optional

    from docarray import BaseDoc


    class MyDocument(BaseDoc):
        content: str
        title: Optional[str] = None
        tags_: List


    doc1 = MyDocument(
        content='Core content of the document', title='Title', tags_=['python', 'AI']
    )
    doc2 = MyDocument(content='Core content updated', tags_=['docarray'])

    doc1.update(doc2)
    assert doc1.content == 'Core content updated'
    assert doc1.title == 'Title'
    assert doc1.tags_ == ['python', 'AI', 'docarray']
    ```

    ---
    :param other: The Document with which to update the contents of this
    """
    # Require the exact same class on both sides; the per-field merge below
    # assumes both Documents share one schema.
    if type(self) != type(other):
        raise Exception(
            f'Update operation can only be applied to '
            f'Documents of the same type. '
            f'Trying to update Document of type '
            f'{type(self)} with Document of type '
            f'{type(other)}'
        )
    from collections import namedtuple

    from docarray import DocList
    from docarray.utils.reduce import reduce

    # Declaring namedtuple()
    # One bucket of field names per merge strategy used further down.
    _FieldGroups = namedtuple(
        '_FieldGroups',
        [
            'simple_non_empty_fields',
            'list_fields',
            'set_fields',
            'dict_fields',
            'nested_docarray_fields',
            'nested_docs_fields',
        ],
    )

    # NOTE(review): the id field is declared lowercase ('id'), so 'ID' never
    # matches and the id is not actually shielded from updates — verify.
    FORBIDDEN_FIELDS_TO_UPDATE = ['ID']

    def _group_fields(doc: 'UpdateMixin') -> _FieldGroups:
        # Sort every declared field of `doc` into the bucket matching its
        # static type, which decides how it gets merged.
        simple_non_empty_fields: List[str] = []
        list_fields: List[str] = []
        set_fields: List[str] = []
        dict_fields: List[str] = []
        nested_docs_fields: List[str] = []
        nested_docarray_fields: List[str] = []

        for field_name, field in doc.__fields__.items():
            if field_name not in FORBIDDEN_FIELDS_TO_UPDATE:
                field_type = doc._get_field_type(field_name)

                if isinstance(field_type, type) and issubclass(field_type, DocList):
                    nested_docarray_fields.append(field_name)
                else:
                    origin = get_origin(field_type)
                    if origin is list:
                        list_fields.append(field_name)
                    elif origin is set:
                        set_fields.append(field_name)
                    elif origin is dict:
                        dict_fields.append(field_name)
                    else:
                        # Other types only count when truthy: nested Documents
                        # are merged recursively, plain values overwrite.
                        v = getattr(doc, field_name)
                        if v:
                            if isinstance(v, UpdateMixin):
                                nested_docs_fields.append(field_name)
                            else:
                                simple_non_empty_fields.append(field_name)
        return _FieldGroups(
            simple_non_empty_fields,
            list_fields,
            set_fields,
            dict_fields,
            nested_docarray_fields,
            nested_docs_fields,
        )

    doc1_fields = _group_fields(self)
    doc2_fields = _group_fields(other)

    # Plain values: other's non-empty values replace self's.
    for field in doc2_fields.simple_non_empty_fields:
        setattr(self, field, getattr(other, field))

    # Nested Documents: recurse, then reassign so pydantic's
    # validate_assignment hook runs on the merged value.
    for field in set(
        doc1_fields.nested_docs_fields + doc2_fields.nested_docs_fields
    ):
        sub_doc_1: T = getattr(self, field)
        sub_doc_2: T = getattr(other, field)
        sub_doc_1.update(sub_doc_2)
        setattr(self, field, sub_doc_1)

    # Lists: append other's elements after self's.
    for field in set(doc1_fields.list_fields + doc2_fields.list_fields):
        array1 = getattr(self, field)
        array2 = getattr(other, field)
        if array1 is None and array2 is not None:
            setattr(self, field, array2)
        elif array1 is not None and array2 is not None:
            array1.extend(array2)
            setattr(self, field, array1)

    # Sets: take the union of both sides.
    for field in set(doc1_fields.set_fields + doc2_fields.set_fields):
        array1 = getattr(self, field)
        array2 = getattr(other, field)
        if array1 is None and array2 is not None:
            setattr(self, field, array2)
        elif array1 is not None and array2 is not None:
            array1.update(array2)
            setattr(self, field, array1)

    # DocLists: merged through docarray's reduce helper.
    for field in set(
        doc1_fields.nested_docarray_fields + doc2_fields.nested_docarray_fields
    ):
        array1 = getattr(self, field)
        array2 = getattr(other, field)
        if array1 is None and array2 is not None:
            setattr(self, field, array2)
        elif array1 is not None and array2 is not None:
            array1 = reduce(array1, array2)
            setattr(self, field, array1)

    # Dicts: standard dict.update, other's keys win on collision.
    for field in set(doc1_fields.dict_fields + doc2_fields.dict_fields):
        dict1 = getattr(self, field)
        dict2 = getattr(other, field)
        if dict1 is None and dict2 is not None:
            setattr(self, field, dict2)
        elif dict1 is not None and dict2 is not None:
            dict1.update(dict2)
            setattr(self, field, dict1)