ElasticV7DocIndex

docarray.index.backends.elasticv7.ElasticV7DocIndex

Bases: ElasticDocIndex

Source code in docarray/index/backends/elasticv7.py
class ElasticV7DocIndex(ElasticDocIndex):
    _index_vector_params: Optional[Tuple[str]] = ('dims',)
    _index_vector_options: Optional[Tuple[str]] = None

    def __init__(self, db_config=None, **kwargs):
        """Initialize ElasticV7DocIndex"""
        from elasticsearch import __version__ as __es__version__

        if __es__version__[0] > 7:
            raise ImportError(
                'ElasticV7DocIndex requires the elasticsearch library to be version 7.10.1'
            )

        super().__init__(db_config, **kwargs)

    ###############################################
    # Inner classes for query builder and configs #
    ###############################################

    class QueryBuilder(ElasticDocIndex.QueryBuilder):
        def build(self, *args, **kwargs) -> Any:
            """Build the elastic search v7 query object."""
            if (
                'script_score' in self._query['query']
                and 'bool' in self._query['query']
                and len(self._query['query']['bool']) > 0
            ):
                self._query['query']['script_score']['query'] = {}
                self._query['query']['script_score']['query']['bool'] = self._query[
                    'query'
                ]['bool']
                del self._query['query']['bool']

            return self._query

        def find(
            self,
            query: Union[AnyTensor, BaseDoc],
            search_field: str = 'embedding',
            limit: int = 10,
            num_candidates: Optional[int] = None,
        ):
            """
            Find k-nearest neighbors of the query.

            :param query: query vector for KNN/ANN search. Has single axis.
            :param search_field: name of the field to search on
            :param limit: maximum number of documents to return per query
            :return: self
            """
            if num_candidates:
                warnings.warn('`num_candidates` is not supported in ElasticV7DocIndex')

            if isinstance(query, BaseDoc):
                query_vec = BaseDocIndex._get_values_by_column([query], search_field)[0]
            else:
                query_vec = query
            query_vec_np = BaseDocIndex._to_numpy(self._outer_instance, query_vec)
            self._query['size'] = limit
            self._query['query'][
                'script_score'
            ] = self._outer_instance._form_search_body(
                query_vec_np, limit, search_field
            )[
                'query'
            ][
                'script_score'
            ]

            return self

    @dataclass
    class DBConfig(ElasticDocIndex.DBConfig):
        """Dataclass that contains all "static" configurations of ElasticDocIndex."""

        hosts: Union[str, List[str], None] = 'http://localhost:9200'  # type: ignore

        def dense_vector_config(self):
            return {'dims': 128}

    @dataclass
    class RuntimeConfig(ElasticDocIndex.RuntimeConfig):
        """Dataclass that contains all "dynamic" configurations of ElasticDocIndex."""

        pass

    ###############################################
    # Implementation of abstract methods          #
    ###############################################

    def execute_query(self, query: Dict[str, Any], *args, **kwargs) -> Any:
        """
        Execute a query on the ElasticDocIndex.

        Can take two kinds of inputs:

        1. A native query of the underlying database. This is meant as a passthrough so that you
        can enjoy any functionality that is not available through the Document index API.
        2. The output of this Document index' `QueryBuilder.build()` method.

        :param query: the query to execute
        :return: the result of the query
        """
        if args or kwargs:
            raise ValueError(
                f'args and kwargs not supported for `execute_query` on {type(self)}'
            )

        resp = self._client.search(index=self.index_name, body=query)
        docs, scores = self._format_response(resp)

        return _FindResult(documents=docs, scores=parse_obj_as(NdArray, scores))

    ###############################################
    # Helpers                                     #
    ###############################################

    def _form_search_body(self, query: np.ndarray, limit: int, search_field: str = '') -> Dict[str, Any]:  # type: ignore
        body = {
            'size': limit,
            'query': {
                'script_score': {
                    'query': {'match_all': {}},
                    'script': {
                        'source': f'cosineSimilarity(params.query_vector, \'{search_field}\') + 1.0',
                        'params': {'query_vector': query},
                    },
                }
            },
        }
        return body

    ###############################################
    # API Wrappers                                #
    ###############################################

    def _client_put_mapping(self, mappings: Dict[str, Any]):
        self._client.indices.put_mapping(index=self.index_name, body=mappings)

    def _client_create(self, mappings: Dict[str, Any]):
        body = {'mappings': mappings}
        self._client.indices.create(index=self.index_name, body=body)

    def _client_put_settings(self, settings: Dict[str, Any]):
        self._client.indices.put_settings(index=self.index_name, body=settings)

    def _client_mget(self, ids: Sequence[str]):
        return self._client.mget(index=self.index_name, body={'ids': ids})

    def _client_search(self, **kwargs):
        return self._client.search(index=self.index_name, body=kwargs)

    def _client_msearch(self, request: List[Dict[str, Any]]):
        return self._client.msearch(index=self.index_name, body=request)
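
A minimal usage sketch, assuming a local Elasticsearch 7.10.1 instance reachable at the default `http://localhost:9200`; the schema, index name, and data below are illustrative.

```python
import numpy as np

from docarray import BaseDoc, DocList
from docarray.index import ElasticV7DocIndex
from docarray.typing import NdArray


class MyDoc(BaseDoc):
    title: str
    embedding: NdArray[128]


# DBConfig fields (e.g. index_name) can be passed as keyword arguments
doc_index = ElasticV7DocIndex[MyDoc](index_name='my_index')

# index some documents
docs = DocList[MyDoc](
    MyDoc(title=f'document {i}', embedding=np.random.rand(128)) for i in range(32)
)
doc_index.index(docs)

# vector search on the `embedding` field
matches, scores = doc_index.find(np.random.rand(128), search_field='embedding', limit=5)
```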

DBConfig dataclass

Bases: DBConfig

Dataclass that contains all "static" configurations of ElasticDocIndex.

Source code in docarray/index/backends/elasticv7.py
@dataclass
class DBConfig(ElasticDocIndex.DBConfig):
    """Dataclass that contains all "static" configurations of ElasticDocIndex."""

    hosts: Union[str, List[str], None] = 'http://localhost:9200'  # type: ignore

    def dense_vector_config(self):
        return {'dims': 128}
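
The DBConfig fields can be passed directly as keyword arguments when constructing the index. A brief sketch, assuming an Elasticsearch 7 server at the given address (the host URL and index name are placeholders):

```python
doc_index = ElasticV7DocIndex[MyDoc](
    hosts='http://my-es-host:9200',
    index_name='my_index',
)
```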

QueryBuilder

Bases: QueryBuilder

Source code in docarray/index/backends/elasticv7.py
class QueryBuilder(ElasticDocIndex.QueryBuilder):
    def build(self, *args, **kwargs) -> Any:
        """Build the elastic search v7 query object."""
        if (
            'script_score' in self._query['query']
            and 'bool' in self._query['query']
            and len(self._query['query']['bool']) > 0
        ):
            self._query['query']['script_score']['query'] = {}
            self._query['query']['script_score']['query']['bool'] = self._query[
                'query'
            ]['bool']
            del self._query['query']['bool']

        return self._query

    def find(
        self,
        query: Union[AnyTensor, BaseDoc],
        search_field: str = 'embedding',
        limit: int = 10,
        num_candidates: Optional[int] = None,
    ):
        """
        Find k-nearest neighbors of the query.

        :param query: query vector for KNN/ANN search. Has single axis.
        :param search_field: name of the field to search on
        :param limit: maximum number of documents to return per query
        :return: self
        """
        if num_candidates:
            warnings.warn('`num_candidates` is not supported in ElasticV7DocIndex')

        if isinstance(query, BaseDoc):
            query_vec = BaseDocIndex._get_values_by_column([query], search_field)[0]
        else:
            query_vec = query
        query_vec_np = BaseDocIndex._to_numpy(self._outer_instance, query_vec)
        self._query['size'] = limit
        self._query['query'][
            'script_score'
        ] = self._outer_instance._form_search_body(
            query_vec_np, limit, search_field
        )[
            'query'
        ][
            'script_score'
        ]

        return self
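
A sketch of chaining vector search with a filter through the query builder and executing the result; the `price` field and the filter clause are illustrative assumptions about the schema, not part of the example schema above:

```python
q = (
    doc_index.build_query()
    .find(query=np.random.rand(128), search_field='embedding', limit=5)
    .filter({'range': {'price': {'lte': 10}}})
    .build()
)
results = doc_index.execute_query(q)
```

`text_search(...)` calls can be chained in the same way before calling `build()`.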

build(*args, **kwargs)

Build the elastic search v7 query object.

Source code in docarray/index/backends/elasticv7.py
def build(self, *args, **kwargs) -> Any:
    """Build the elastic search v7 query object."""
    if (
        'script_score' in self._query['query']
        and 'bool' in self._query['query']
        and len(self._query['query']['bool']) > 0
    ):
        self._query['query']['script_score']['query'] = {}
        self._query['query']['script_score']['query']['bool'] = self._query[
            'query'
        ]['bool']
        del self._query['query']['bool']

    return self._query

filter(query, limit=10)

Find documents in the index based on a filter query

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `query` | `Dict[str, Any]` | the query to execute | *required* |
| `limit` | `int` | maximum number of documents to return | `10` |

Returns:

| Type | Description |
|------|-------------|
| | self |

Source code in docarray/index/backends/elastic.py
def filter(self, query: Dict[str, Any], limit: int = 10):
    """Find documents in the index based on a filter query

    :param query: the query to execute
    :param limit: maximum number of documents to return
    :return: self
    """
    self._outer_instance._logger.debug('Executing filter query')

    self._query['size'] = limit
    self._query['query']['bool']['filter'].append(query)
    return self

find(query, search_field='embedding', limit=10, num_candidates=None)

Find k-nearest neighbors of the query.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `query` | `Union[AnyTensor, BaseDoc]` | query vector for KNN/ANN search. Has single axis. | *required* |
| `search_field` | `str` | name of the field to search on | `'embedding'` |
| `limit` | `int` | maximum number of documents to return per query | `10` |

Returns:

| Type | Description |
|------|-------------|
| | self |

Source code in docarray/index/backends/elasticv7.py
def find(
    self,
    query: Union[AnyTensor, BaseDoc],
    search_field: str = 'embedding',
    limit: int = 10,
    num_candidates: Optional[int] = None,
):
    """
    Find k-nearest neighbors of the query.

    :param query: query vector for KNN/ANN search. Has single axis.
    :param search_field: name of the field to search on
    :param limit: maximum number of documents to return per query
    :return: self
    """
    if num_candidates:
        warnings.warn('`num_candidates` is not supported in ElasticV7DocIndex')

    if isinstance(query, BaseDoc):
        query_vec = BaseDocIndex._get_values_by_column([query], search_field)[0]
    else:
        query_vec = query
    query_vec_np = BaseDocIndex._to_numpy(self._outer_instance, query_vec)
    self._query['size'] = limit
    self._query['query'][
        'script_score'
    ] = self._outer_instance._form_search_body(
        query_vec_np, limit, search_field
    )[
        'query'
    ][
        'script_score'
    ]

    return self

text_search(query, search_field='text', limit=10)

Find documents in the index based on a text search query

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `query` | `str` | The text to search for | *required* |
| `search_field` | `str` | name of the field to search on | `'text'` |
| `limit` | `int` | maximum number of documents to find | `10` |

Returns:

| Type | Description |
|------|-------------|
| | self |

Source code in docarray/index/backends/elastic.py
def text_search(self, query: str, search_field: str = 'text', limit: int = 10):
    """Find documents in the index based on a text search query

    :param query: The text to search for
    :param search_field: name of the field to search on
    :param limit: maximum number of documents to find
    :return: self
    """
    self._outer_instance._logger.debug('Executing text search query')

    self._outer_instance._validate_search_field(search_field)
    self._query['size'] = limit
    self._query['query']['bool']['must'].append(
        {'match': {search_field: query}}
    )
    return self

RuntimeConfig dataclass

Bases: RuntimeConfig

Dataclass that contains all "dynamic" configurations of ElasticDocIndex.

Source code in docarray/index/backends/elasticv7.py
@dataclass
class RuntimeConfig(ElasticDocIndex.RuntimeConfig):
    """Dataclass that contains all "dynamic" configurations of ElasticDocIndex."""

    pass

__contains__(item)

Checks if a given document exists in the index.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `item` | `BaseDoc` | The document to check. It must be an instance of BaseDoc or its subclass. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if the document exists in the index, False otherwise. |

Source code in docarray/index/abstract.py
def __contains__(self, item: BaseDoc) -> bool:
    """
    Checks if a given document exists in the index.

    :param item: The document to check.
        It must be an instance of BaseDoc or its subclass.
    :return: True if the document exists in the index, False otherwise.
    """
    if safe_issubclass(type(item), BaseDoc):
        return self._doc_exists(str(item.id))
    else:
        raise TypeError(
            f"item must be an instance of BaseDoc or its subclass, not '{type(item).__name__}'"
        )
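
A short sketch of the membership check, reusing `doc_index` and `MyDoc` from the examples above; containment is decided by the document's `id`:

```python
doc = MyDoc(title='hello', embedding=np.random.rand(128))
doc_index.index(doc)

print(doc in doc_index)  # True, this id has been indexed
print(MyDoc(title='other', embedding=np.random.rand(128)) in doc_index)  # False
```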

__delitem__(key)

Delete one or multiple Documents from the index, by id. If no document is found, a KeyError is raised.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `key` | `Union[str, Sequence[str]]` | id or ids to delete from the Document index | *required* |
Source code in docarray/index/abstract.py
def __delitem__(self, key: Union[str, Sequence[str]]):
    """Delete one or multiple Documents from the index, by `id`.
    If no document is found, a KeyError is raised.

    :param key: id or ids to delete from the Document index
    """
    self._logger.info(f'Deleting documents with id(s) {key} from the index')
    if isinstance(key, str):
        key = [key]

    # delete nested data
    for field_name, type_, _ in self._flatten_schema(
        cast(Type[BaseDoc], self._schema)
    ):
        if safe_issubclass(type_, AnyDocArray):
            for doc_id in key:
                nested_docs_id = self._subindices[field_name]._filter_by_parent_id(
                    doc_id
                )
                if nested_docs_id:
                    del self._subindices[field_name][nested_docs_id]
    # delete data
    self._del_items(key)

__getitem__(key)

Get one or multiple Documents from the index, by id. If no document is found, a KeyError is raised.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `key` | `Union[str, Sequence[str]]` | id or ids to get from the Document index | *required* |
Source code in docarray/index/abstract.py
def __getitem__(
    self, key: Union[str, Sequence[str]]
) -> Union[TSchema, DocList[TSchema]]:
    """Get one or multiple Documents into the index, by `id`.
    If no document is found, a KeyError is raised.

    :param key: id or ids to get from the Document index
    """
    # normalize input
    if isinstance(key, str):
        return_singleton = True
        key = [key]
    else:
        return_singleton = False

    # retrieve data
    doc_sequence = self._get_items(key)

    # check data
    if len(doc_sequence) == 0:
        raise KeyError(f'No document with id {key} found')

    # retrieve nested data
    for field_name, type_, _ in self._flatten_schema(
        cast(Type[BaseDoc], self._schema)
    ):
        if safe_issubclass(type_, AnyDocArray) and isinstance(
            doc_sequence[0], Dict
        ):
            for doc in doc_sequence:
                self._get_subindex_doclist(doc, field_name)  # type: ignore

    # cast output
    if isinstance(doc_sequence, DocList):
        out_docs: DocList[TSchema] = doc_sequence
    elif isinstance(doc_sequence[0], Dict):
        out_docs = self._dict_list_to_docarray(doc_sequence)  # type: ignore
    else:
        docs_cls = DocList.__class_getitem__(cast(Type[BaseDoc], self._schema))
        out_docs = docs_cls(doc_sequence)

    return out_docs[0] if return_singleton else out_docs
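
A sketch of id-based access and deletion, assuming `docs` from the earlier indexing example; unknown ids raise a `KeyError`:

```python
retrieved = doc_index[docs[0].id]               # single id -> single document
subset = doc_index[[d.id for d in docs[:3]]]    # sequence of ids -> DocList

del doc_index[docs[0].id]                       # delete one document
del doc_index[[d.id for d in docs[1:3]]]        # delete several at once
```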

__init__(db_config=None, **kwargs)

Initialize ElasticV7DocIndex

Source code in docarray/index/backends/elasticv7.py
def __init__(self, db_config=None, **kwargs):
    """Initialize ElasticV7DocIndex"""
    from elasticsearch import __version__ as __es__version__

    if __es__version__[0] > 7:
        raise ImportError(
            'ElasticV7DocIndex requires the elasticsearch library to be version 7.10.1'
        )

    super().__init__(db_config, **kwargs)

build_query(**kwargs)

Build a query for ElasticDocIndex.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `kwargs` | | parameters to forward to QueryBuilder initialization | `{}` |

Returns:

| Type | Description |
|------|-------------|
| `QueryBuilder` | QueryBuilder object |

Source code in docarray/index/backends/elastic.py
def build_query(self, **kwargs) -> QueryBuilder:
    """
    Build a query for ElasticDocIndex.
    :param kwargs: parameters to forward to QueryBuilder initialization
    :return: QueryBuilder object
    """
    return self.QueryBuilder(self, **kwargs)

configure(runtime_config=None, **kwargs)

Configure the DocumentIndex. You can either pass a config object as `runtime_config` or pass individual config parameters as keyword arguments. If a configuration object is passed, it will replace the current configuration. If keyword arguments are passed, they will update the current configuration.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `runtime_config` | | the configuration to apply | `None` |
| `kwargs` | | individual configuration parameters | `{}` |
Source code in docarray/index/abstract.py
def configure(self, runtime_config=None, **kwargs):
    """
    Configure the DocumentIndex.
    You can either pass a config object to `config` or pass individual config
    parameters as keyword arguments.
    If a configuration object is passed, it will replace the current configuration.
    If keyword arguments are passed, they will update the current configuration.

    :param runtime_config: the configuration to apply
    :param kwargs: individual configuration parameters
    """
    if runtime_config is None:
        self._runtime_config = replace(self._runtime_config, **kwargs)
    else:
        if not isinstance(runtime_config, self.RuntimeConfig):
            raise ValueError(f'runtime_config must be of type {self.RuntimeConfig}')
        self._runtime_config = runtime_config

execute_query(query, *args, **kwargs)

Execute a query on the ElasticDocIndex.

Can take two kinds of inputs:

  1. A native query of the underlying database. This is meant as a passthrough so that you can enjoy any functionality that is not available through the Document index API.
  2. The output of this Document index's `QueryBuilder.build()` method.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `query` | `Dict[str, Any]` | the query to execute | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Any` | the result of the query |

Source code in docarray/index/backends/elasticv7.py
def execute_query(self, query: Dict[str, Any], *args, **kwargs) -> Any:
    """
    Execute a query on the ElasticDocIndex.

    Can take two kinds of inputs:

    1. A native query of the underlying database. This is meant as a passthrough so that you
    can enjoy any functionality that is not available through the Document index API.
    2. The output of this Document index' `QueryBuilder.build()` method.

    :param query: the query to execute
    :return: the result of the query
    """
    if args or kwargs:
        raise ValueError(
            f'args and kwargs not supported for `execute_query` on {type(self)}'
        )

    resp = self._client.search(index=self.index_name, body=query)
    docs, scores = self._format_response(resp)

    return _FindResult(documents=docs, scores=parse_obj_as(NdArray, scores))
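
A sketch of the passthrough case, sending a native Elasticsearch 7 query body; the `match` clause on the `title` field is illustrative:

```python
raw_query = {
    'query': {
        'bool': {
            'must': [{'match': {'title': 'document'}}],
        }
    }
}
docs_found, scores = doc_index.execute_query(raw_query)
```

The output of `build_query()...build()` can be passed in the same way, as shown in the QueryBuilder section above.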

filter(filter_query, limit=10, **kwargs)

Find documents in the index based on a filter query

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `filter_query` | `Any` | the DB specific filter query to execute | *required* |
| `limit` | `int` | maximum number of documents to return | `10` |

Returns:

| Type | Description |
|------|-------------|
| `DocList` | a DocList containing the documents that match the filter query |

Source code in docarray/index/abstract.py
def filter(
    self,
    filter_query: Any,
    limit: int = 10,
    **kwargs,
) -> DocList:
    """Find documents in the index based on a filter query

    :param filter_query: the DB specific filter query to execute
    :param limit: maximum number of documents to return
    :return: a DocList containing the documents that match the filter query
    """
    self._logger.debug(f'Executing `filter` for the query {filter_query}')
    docs = self._filter(filter_query, limit=limit, **kwargs)

    if isinstance(docs, List) and not isinstance(docs, DocList):
        docs = self._dict_list_to_docarray(docs)

    return docs
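
A sketch of a filter call; the clause follows Elasticsearch's query DSL, and `year` is a hypothetical integer field used only for illustration:

```python
docs_found = doc_index.filter({'range': {'year': {'gte': 2020}}}, limit=5)
```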

filter_batched(filter_queries, limit=10, **kwargs)

Find documents in the index based on multiple filter queries.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `filter_queries` | `Any` | the DB specific filter queries to execute | *required* |
| `limit` | `int` | maximum number of documents to return per query | `10` |

Returns:

| Type | Description |
|------|-------------|
| `List[DocList]` | a list of DocLists, one per filter query, containing the matching documents |

Source code in docarray/index/abstract.py
def filter_batched(
    self,
    filter_queries: Any,
    limit: int = 10,
    **kwargs,
) -> List[DocList]:
    """Find documents in the index based on multiple filter queries.

    :param filter_queries: the DB specific filter query to execute
    :param limit: maximum number of documents to return
    :return: a DocList containing the documents that match the filter query
    """
    self._logger.debug(
        f'Executing `filter_batched` for the queries {filter_queries}'
    )
    da_list = self._filter_batched(filter_queries, limit=limit, **kwargs)

    if len(da_list) > 0 and isinstance(da_list[0], List):
        da_list = [self._dict_list_to_docarray(docs) for docs in da_list]

    return da_list  # type: ignore

filter_subindex(filter_query, subindex, limit=10, **kwargs)

Find documents in subindex level based on a filter query

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `filter_query` | `Any` | the DB specific filter query to execute | *required* |
| `subindex` | `str` | name of the subindex to search on | *required* |
| `limit` | `int` | maximum number of documents to return | `10` |

Returns:

| Type | Description |
|------|-------------|
| `DocList` | a DocList containing the subindex level documents that match the filter query |

Source code in docarray/index/abstract.py
def filter_subindex(
    self,
    filter_query: Any,
    subindex: str,
    limit: int = 10,
    **kwargs,
) -> DocList:
    """Find documents in subindex level based on a filter query

    :param filter_query: the DB specific filter query to execute
    :param subindex: name of the subindex to search on
    :param limit: maximum number of documents to return
    :return: a DocList containing the subindex level documents that match the filter query
    """
    self._logger.debug(
        f'Executing `filter` for the query {filter_query} in subindex {subindex}'
    )
    if '__' in subindex:
        fields = subindex.split('__')
        return self._subindices[fields[0]].filter_subindex(
            filter_query, '__'.join(fields[1:]), limit=limit, **kwargs
        )
    else:
        return self._subindices[subindex].filter(
            filter_query, limit=limit, **kwargs
        )

find(query, search_field='', limit=10, **kwargs)

Find documents in the index using nearest neighbor search.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `query` | `Union[AnyTensor, BaseDoc]` | query vector for KNN/ANN search. Can be either a tensor-like (np.array, torch.Tensor, etc.) with a single axis, or a Document | *required* |
| `search_field` | `str` | name of the field to search on. Documents in the index are retrieved based on the similarity of this field to the query. | `''` |
| `limit` | `int` | maximum number of documents to return | `10` |

Returns:

| Type | Description |
|------|-------------|
| `FindResult` | a named tuple containing `documents` and `scores` |

Source code in docarray/index/abstract.py
def find(
    self,
    query: Union[AnyTensor, BaseDoc],
    search_field: str = '',
    limit: int = 10,
    **kwargs,
) -> FindResult:
    """Find documents in the index using nearest neighbor search.

    :param query: query vector for KNN/ANN search.
        Can be either a tensor-like (np.array, torch.Tensor, etc.)
        with a single axis, or a Document
    :param search_field: name of the field to search on.
        Documents in the index are retrieved based on this similarity
        of this field to the query.
    :param limit: maximum number of documents to return
    :return: a named tuple containing `documents` and `scores`
    """
    self._logger.debug(f'Executing `find` for search field {search_field}')

    self._validate_search_field(search_field)
    if isinstance(query, BaseDoc):
        query_vec = self._get_values_by_column([query], search_field)[0]
    else:
        query_vec = query
    query_vec_np = self._to_numpy(query_vec)
    docs, scores = self._find(
        query_vec_np, search_field=search_field, limit=limit, **kwargs
    )

    if isinstance(docs, List) and not isinstance(docs, DocList):
        docs = self._dict_list_to_docarray(docs)

    return FindResult(documents=docs, scores=scores)
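
A sketch of nearest-neighbor search with a raw numpy vector; the vector dimension must match the `embedding` field of the schema (128 in the examples above):

```python
query_vec = np.random.rand(128)
matches, scores = doc_index.find(query_vec, search_field='embedding', limit=10)
for doc, score in zip(matches, scores):
    print(doc.title, float(score))
```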

find_batched(queries, search_field='', limit=10, **kwargs)

Find documents in the index using nearest neighbor search.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `queries` | `Union[AnyTensor, DocList]` | query vectors for KNN/ANN search. Can be either a tensor-like (np.array, torch.Tensor, etc.) or a DocList. If a tensor-like is passed, it should have shape (batch_size, vector_dim) | *required* |
| `search_field` | `str` | name of the field to search on. Documents in the index are retrieved based on the similarity of this field to the query. | `''` |
| `limit` | `int` | maximum number of documents to return per query | `10` |

Returns:

| Type | Description |
|------|-------------|
| `FindResultBatched` | a named tuple containing `documents` and `scores` |

Source code in docarray/index/abstract.py
def find_batched(
    self,
    queries: Union[AnyTensor, DocList],
    search_field: str = '',
    limit: int = 10,
    **kwargs,
) -> FindResultBatched:
    """Find documents in the index using nearest neighbor search.

    :param queries: query vector for KNN/ANN search.
        Can be either a tensor-like (np.array, torch.Tensor, etc.) with a,
        or a DocList.
        If a tensor-like is passed, it should have shape (batch_size, vector_dim)
    :param search_field: name of the field to search on.
        Documents in the index are retrieved based on this similarity
        of this field to the query.
    :param limit: maximum number of documents to return per query
    :return: a named tuple containing `documents` and `scores`
    """
    self._logger.debug(f'Executing `find_batched` for search field {search_field}')

    if search_field:
        if '__' in search_field:
            fields = search_field.split('__')
            if safe_issubclass(self._schema._get_field_annotation(fields[0]), AnyDocArray):  # type: ignore
                return self._subindices[fields[0]].find_batched(
                    queries,
                    search_field='__'.join(fields[1:]),
                    limit=limit,
                    **kwargs,
                )

    self._validate_search_field(search_field)
    if isinstance(queries, Sequence):
        query_vec_list = self._get_values_by_column(queries, search_field)
        query_vec_np = np.stack(
            tuple(self._to_numpy(query_vec) for query_vec in query_vec_list)
        )
    else:
        query_vec_np = self._to_numpy(queries)

    da_list, scores = self._find_batched(
        query_vec_np, search_field=search_field, limit=limit, **kwargs
    )
    if (
        len(da_list) > 0
        and isinstance(da_list[0], List)
        and not isinstance(da_list[0], DocList)
    ):
        da_list = [self._dict_list_to_docarray(docs) for docs in da_list]

    return FindResultBatched(documents=da_list, scores=scores)  # type: ignore
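
A sketch of batched search with a `(batch_size, vector_dim)` array; one result set is returned per query vector:

```python
queries = np.random.rand(4, 128)
batched = doc_index.find_batched(queries, search_field='embedding', limit=10)
for per_query_docs, per_query_scores in zip(batched.documents, batched.scores):
    print(len(per_query_docs), per_query_scores[0])
```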

find_subindex(query, subindex='', search_field='', limit=10, **kwargs)

Find documents in subindex level.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `query` | `Union[AnyTensor, BaseDoc]` | query vector for KNN/ANN search. Can be either a tensor-like (np.array, torch.Tensor, etc.) with a single axis, or a Document | *required* |
| `subindex` | `str` | name of the subindex to search on | `''` |
| `search_field` | `str` | name of the field to search on | `''` |
| `limit` | `int` | maximum number of documents to return | `10` |

Returns:

| Type | Description |
|------|-------------|
| `SubindexFindResult` | a named tuple containing root docs, subindex docs and scores |

Source code in docarray/index/abstract.py
def find_subindex(
    self,
    query: Union[AnyTensor, BaseDoc],
    subindex: str = '',
    search_field: str = '',
    limit: int = 10,
    **kwargs,
) -> SubindexFindResult:
    """Find documents in subindex level.

    :param query: query vector for KNN/ANN search.
        Can be either a tensor-like (np.array, torch.Tensor, etc.)
        with a single axis, or a Document
    :param subindex: name of the subindex to search on
    :param search_field: name of the field to search on
    :param limit: maximum number of documents to return
    :return: a named tuple containing root docs, subindex docs and scores
    """
    self._logger.debug(f'Executing `find_subindex` for search field {search_field}')

    sub_docs, scores = self._find_subdocs(
        query, subindex=subindex, search_field=search_field, limit=limit, **kwargs
    )

    fields = subindex.split('__')
    root_ids = [
        self._get_root_doc_id(doc.id, fields[0], '__'.join(fields[1:]))
        for doc in sub_docs
    ]
    root_docs = DocList[self._schema]()  # type: ignore
    for id in root_ids:
        root_docs.append(self[id])

    return SubindexFindResult(
        root_documents=root_docs, sub_documents=sub_docs, scores=scores  # type: ignore
    )

index(docs, **kwargs)

index Documents into the index.

Note

Passing a sequence of Documents that is not a DocList (such as a List of Docs) comes at a performance penalty. This is because the Index needs to check compatibility between itself and the data. With a DocList as input this is a single check; for other inputs compatibility needs to be checked for every Document individually.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `docs` | `Union[BaseDoc, Sequence[BaseDoc]]` | Documents to index. | *required* |
Source code in docarray/index/abstract.py
def index(self, docs: Union[BaseDoc, Sequence[BaseDoc]], **kwargs):
    """index Documents into the index.

    !!! note
        Passing a sequence of Documents that is not a DocList
        (such as a List of Docs) comes at a performance penalty.
        This is because the Index needs to check compatibility between itself and
        the data. With a DocList as input this is a single check; for other inputs
        compatibility needs to be checked for every Document individually.

    :param docs: Documents to index.
    """
    n_docs = 1 if isinstance(docs, BaseDoc) else len(docs)
    self._logger.debug(f'Indexing {n_docs} documents')
    docs_validated = self._validate_docs(docs)
    self._update_subindex_data(docs_validated)
    data_by_columns = self._get_col_value_dict(docs_validated)
    self._index(data_by_columns, **kwargs)
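
A sketch of indexing a DocList in a single call; as the note above explains, a DocList is cheaper to validate than a plain list of documents:

```python
more_docs = DocList[MyDoc](
    MyDoc(title=f'extra {i}', embedding=np.random.rand(128)) for i in range(100)
)
doc_index.index(more_docs)
print(doc_index.num_docs())
```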

num_docs()

Get the number of documents.

Source code in docarray/index/backends/elastic.py
def num_docs(self) -> int:
    """
    Get the number of documents.
    """
    self._logger.debug('Getting the number of documents in the index')
    return self._client.count(index=self.index_name)['count']

python_type_to_db_type(python_type)

Map python type to database type. Takes any python type and returns the corresponding database column type.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `python_type` | `Type` | a python type. | *required* |

Returns:

| Type | Description |
|------|-------------|
| `Any` | the corresponding database column type, or None if `python_type` is not supported. |

Source code in docarray/index/backends/elastic.py
def python_type_to_db_type(self, python_type: Type) -> Any:
    """Map python type to database type.
    Takes any python type and returns the corresponding database column type.

    :param python_type: a python type.
    :return: the corresponding database column type,
        or None if ``python_type`` is not supported.
    """
    self._logger.debug(f'Mapping Python type {python_type} to database type')

    for allowed_type in ELASTIC_PY_VEC_TYPES:
        if safe_issubclass(python_type, allowed_type):
            self._logger.info(
                f'Mapped Python type {python_type} to database type "dense_vector"'
            )
            return 'dense_vector'

    elastic_py_types = {
        docarray.typing.ID: 'keyword',
        docarray.typing.AnyUrl: 'keyword',
        bool: 'boolean',
        int: 'integer',
        float: 'float',
        str: 'text',
        bytes: 'binary',
        dict: 'object',
    }

    for type in elastic_py_types.keys():
        if safe_issubclass(python_type, type):
            self._logger.info(
                f'Mapped Python type {python_type} to database type "{elastic_py_types[type]}"'
            )
            return elastic_py_types[type]

    err_msg = f'Unsupported column type for {type(self)}: {python_type}'
    self._logger.error(err_msg)
    raise ValueError(err_msg)
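
A small sketch of the mapping; the expected outputs in the comments are assumptions based on the type table in the source above:

```python
from docarray.typing import NdArray

print(doc_index.python_type_to_db_type(str))      # expected: 'text'
print(doc_index.python_type_to_db_type(NdArray))  # expected: 'dense_vector'
```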

subindex_contains(item)

Checks if a given BaseDoc item is contained in the index or any of its subindices.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `item` | `BaseDoc` | the given BaseDoc | *required* |

Returns:

| Type | Description |
|------|-------------|
| `bool` | True if the given BaseDoc is contained in the index or any of its subindices, False otherwise |

Source code in docarray/index/abstract.py
def subindex_contains(self, item: BaseDoc) -> bool:
    """Checks if a given BaseDoc item is contained in the index or any of its subindices.

    :param item: the given BaseDoc
    :return: if the given BaseDoc item is contained in the index/subindices
    """
    if self._is_index_empty:
        return False

    if safe_issubclass(type(item), BaseDoc):
        return self.__contains__(item) or any(
            index.subindex_contains(item) for index in self._subindices.values()
        )
    else:
        raise TypeError(
            f"item must be an instance of BaseDoc or its subclass, not '{type(item).__name__}'"
        )

text_search(query, search_field='', limit=10, **kwargs)

Find documents in the index based on a text search query.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `query` | `Union[str, BaseDoc]` | The text to search for | *required* |
| `search_field` | `str` | name of the field to search on | `''` |
| `limit` | `int` | maximum number of documents to return | `10` |

Returns:

| Type | Description |
|------|-------------|
| `FindResult` | a named tuple containing `documents` and `scores` |

Source code in docarray/index/abstract.py
def text_search(
    self,
    query: Union[str, BaseDoc],
    search_field: str = '',
    limit: int = 10,
    **kwargs,
) -> FindResult:
    """Find documents in the index based on a text search query.

    :param query: The text to search for
    :param search_field: name of the field to search on
    :param limit: maximum number of documents to return
    :return: a named tuple containing `documents` and `scores`
    """
    self._logger.debug(f'Executing `text_search` for search field {search_field}')
    self._validate_search_field(search_field)
    if isinstance(query, BaseDoc):
        query_text = self._get_values_by_column([query], search_field)[0]
    else:
        query_text = query
    docs, scores = self._text_search(
        query_text, search_field=search_field, limit=limit, **kwargs
    )

    if isinstance(docs, List) and not isinstance(docs, DocList):
        docs = self._dict_list_to_docarray(docs)

    return FindResult(documents=docs, scores=scores)
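
A sketch of a full-text search; `title` is the text field from the illustrative schema above:

```python
docs_found, scores = doc_index.text_search('hello world', search_field='title', limit=10)
```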

text_search_batched(queries, search_field='', limit=10, **kwargs)

Find documents in the index based on a text search query.

Parameters:

| Name | Type | Description | Default |
|------|------|-------------|---------|
| `queries` | `Union[Sequence[str], Sequence[BaseDoc]]` | The texts to search for | *required* |
| `search_field` | `str` | name of the field to search on | `''` |
| `limit` | `int` | maximum number of documents to return | `10` |

Returns:

| Type | Description |
|------|-------------|
| `FindResultBatched` | a named tuple containing `documents` and `scores` |

Source code in docarray/index/abstract.py
def text_search_batched(
    self,
    queries: Union[Sequence[str], Sequence[BaseDoc]],
    search_field: str = '',
    limit: int = 10,
    **kwargs,
) -> FindResultBatched:
    """Find documents in the index based on a text search query.

    :param queries: The texts to search for
    :param search_field: name of the field to search on
    :param limit: maximum number of documents to return
    :return: a named tuple containing `documents` and `scores`
    """
    self._logger.debug(
        f'Executing `text_search_batched` for search field {search_field}'
    )
    self._validate_search_field(search_field)
    if isinstance(queries[0], BaseDoc):
        query_docs: Sequence[BaseDoc] = cast(Sequence[BaseDoc], queries)
        query_texts: Sequence[str] = self._get_values_by_column(
            query_docs, search_field
        )
    else:
        query_texts = cast(Sequence[str], queries)
    da_list, scores = self._text_search_batched(
        query_texts, search_field=search_field, limit=limit, **kwargs
    )

    if len(da_list) > 0 and isinstance(da_list[0], List):
        docs = [self._dict_list_to_docarray(docs) for docs in da_list]
        return FindResultBatched(documents=docs, scores=scores)

    da_list_ = cast(List[DocList], da_list)
    return FindResultBatched(documents=da_list_, scores=scores)
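
A sketch of batched full-text search; one result set is returned per query string, and `title` is again the illustrative text field:

```python
batched = doc_index.text_search_batched(
    ['hello', 'world'], search_field='title', limit=10
)
for per_query_docs, per_query_scores in zip(batched.documents, batched.scores):
    print(len(per_query_docs))
```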