Elasticsearch Document Index

DocArray comes with two Document Indexes for Elasticsearch:

  • ElasticDocIndex, based on Elasticsearch v8
  • ElasticV7DocIndex, based on Elasticsearch v7.10

Should you use ES v7 or v8?

Elasticsearch v8 is the current version of ES and offers native vector search (ANN) support, alongside text and range search.

Elasticsearch v7.10 can store vectors, but it does not support native ANN vector search; it offers only exhaustive (i.e. slow) vector search, alongside text and range search.

Some users prefer ES v7.10 because it is the last version available under the Apache 2.0 license; ES v8 is distributed under a different license.

Installation

To use ElasticDocIndex, you need to install the following dependencies:

pip install elasticsearch==8.6.2
pip install elastic-transport

To use ElasticV7DocIndex, you need to install the following dependencies:

pip install elasticsearch==7.10.1
pip install elastic-transport

The following example is based on ElasticDocIndex, but will also work for ElasticV7DocIndex.

Basic usage

This snippet demonstrates the basic usage of ElasticDocIndex. It defines a document schema with a title and an embedding, creates ten dummy documents with random embeddings, initializes an instance of ElasticDocIndex to index these documents, and performs a vector similarity search to retrieve the ten most similar documents to a given query vector.

from docarray import BaseDoc, DocList
from docarray.index import ElasticDocIndex  # or ElasticV7DocIndex
from docarray.typing import NdArray
import numpy as np

# Define the document schema.
class MyDoc(BaseDoc):
    title: str 
    embedding: NdArray[128]

# Create dummy documents.
docs = DocList[MyDoc](MyDoc(title=f'title #{i}', embedding=np.random.rand(128)) for i in range(10))

# Initialize a new ElasticDocIndex instance and add the documents to the index.
doc_index = ElasticDocIndex[MyDoc](index_name='my_index')
doc_index.index(docs)

# Perform a vector search.
query = np.ones(128)
retrieved_docs, scores = doc_index.find(query, search_field='embedding', limit=10)

Initialize

You can use docker-compose to create a local Elasticsearch service with the following docker-compose.yml.

version: "3.3"
services:
  elastic:
    image: docker.elastic.co/elasticsearch/elasticsearch:8.6.2
    environment:
      - xpack.security.enabled=false
      - discovery.type=single-node
      - ES_JAVA_OPTS=-Xmx1024m
    ports:
      - "9200:9200"
    networks:
      - elastic

networks:
  elastic:
    name: elastic

Run the following command in the folder of the above docker-compose.yml to start the service:

docker-compose up

Schema definition

To construct an index, you first need to define a schema in the form of a Document.

There are a number of configurations you can pack into your schema:

  • Every field in your schema will become one column in the database
  • For vector fields, such as NdArray, TorchTensor, or TensorFlowTensor, you need to specify a dimensionality to be able to perform vector search
  • You can override the default column type for every field by passing any ES field data type to field_name: Type = Field(col_type=...). You can see an example of this in the section on keyword filters.

Additionally, you can pass a hosts argument to the __init__() method to connect to an ES instance. By default, it is http://localhost:9200.

import numpy as np
from pydantic import Field

from docarray import BaseDoc
from docarray.index import ElasticDocIndex
from docarray.typing import NdArray


class SimpleDoc(BaseDoc):
    # specify tensor field with dimensionality 128
    tensor: NdArray[128]
    # alternative and equivalent definition:
    # tensor: NdArray = Field(dims=128)


doc_index = ElasticDocIndex[SimpleDoc](hosts='http://localhost:9200')

Using a predefined document as schema

DocArray offers a number of predefined documents, like ImageDoc and TextDoc. If you try to use these directly as a schema for a Document Index, you will get unexpected behavior: Depending on the backend, an exception will be raised, or no vector index for ANN lookup will be built.

The reason for this is that predefined documents don't hold information about the dimensionality of their .embedding field. But this is crucial information for any vector database to work properly!

You can work around this problem by subclassing the predefined document and adding the dimensionality information:

from docarray.documents import TextDoc
from docarray.typing import NdArray
from docarray.index import ElasticDocIndex


class MyDoc(TextDoc):
    embedding: NdArray[128]


db = ElasticDocIndex[MyDoc](index_name='test_db')

Alternatively, you can specify the dimensionality through a Field() annotation:

from docarray.documents import TextDoc
from docarray.typing import AnyTensor
from docarray.index import ElasticDocIndex
from pydantic import Field


class MyDoc(TextDoc):
    embedding: AnyTensor = Field(dim=128)


db = ElasticDocIndex[MyDoc](index_name='test_db3')

Once you have defined the schema of your Document Index in this way, the data that you index can be either the predefined Document type or your custom Document type.

The next section goes into more detail about data indexing, but note that if you have some TextDocs, ImageDocs etc. that you want to index, you don't need to cast them to MyDoc:

import numpy as np

from docarray import DocList

# data of type TextDoc
data = DocList[TextDoc](
    [
        TextDoc(text='hello world', embedding=np.random.rand(128)),
        TextDoc(text='hello world', embedding=np.random.rand(128)),
        TextDoc(text='hello world', embedding=np.random.rand(128)),
    ]
)

# you can index this into Document Index of type MyDoc
db.index(data)

Index

Now that you have a Document Index, you can add data to it, using the index() method. The .num_docs() method returns the total number of documents in the index.

from docarray import DocList

# create some random data
docs = DocList[SimpleDoc]([SimpleDoc(tensor=np.ones(128)) for _ in range(64)])

doc_index.index(docs)

print(f'number of docs in the index: {doc_index.num_docs()}')

As you can see, DocList[SimpleDoc] and ElasticDocIndex[SimpleDoc] both have SimpleDoc as a parameter. This means that they share the same schema, and in general, both the Document Index and the data that you want to store need to have compatible schemas.

When are two schemas compatible?

The schemas of your Document Index and data need to be compatible with each other.

Let's say A is the schema of your Document Index and B is the schema of your data. There are a few rules that determine if schema A is compatible with schema B. If any of the following are true, then A and B are compatible:

  • A and B are the same class
  • A and B have the same field names and field types
  • A and B have the same field names, and, for every field, the type of B is a subclass of the type of A

In particular, this means that you can easily index predefined documents into a Document Index.
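
To make this concrete, here is a minimal sketch of the second rule, where two distinct classes share the same field names and types (VecDoc and CompatibleDoc are illustrative names, not part of DocArray):

import numpy as np

from docarray import BaseDoc, DocList
from docarray.index import ElasticDocIndex
from docarray.typing import NdArray


class VecDoc(BaseDoc):
    embedding: NdArray[128]


# same field names and field types as VecDoc, so the schemas are compatible
class CompatibleDoc(BaseDoc):
    embedding: NdArray[128]


index = ElasticDocIndex[VecDoc](index_name='compat_demo')

# documents of the compatible schema can be indexed directly
index.index(DocList[CompatibleDoc]([CompatibleDoc(embedding=np.random.rand(128))]))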

Vector search

Now that you have indexed your data, you can perform vector similarity search using the find() method.

You can use find() with a document of the type SimpleDoc to find similar documents within the Document Index:

You can use the limit argument to configure how many documents to return.

Note

ElasticV7DocIndex uses Elasticsearch v7.10.1, which does not support approximate nearest neighbour algorithms such as HNSW. This can lead to poor performance when the search involves many vectors. ElasticDocIndex does not have this limitation.

# create a query document
query = SimpleDoc(tensor=np.ones(128))

# find similar documents
matches, scores = doc_index.find(query, search_field='tensor', limit=5)

print(f'{matches=}')
print(f'{matches.tensor=}')
print(f'{scores=}')

You can also pass a raw vector as the query:

# create a query vector
query = np.random.rand(128)

# find similar documents
matches, scores = doc_index.find(query, search_field='tensor', limit=5)

print(f'{matches=}')
print(f'{matches.tensor=}')
print(f'{scores=}')

To perform a vector search, you need to specify a search_field. This is the field that serves as the basis of comparison between your query and the documents in the Document Index.

In this example you only have one field (tensor) that is a vector, so you can trivially choose that one. In general, you could have multiple fields of type NdArray or TorchTensor or TensorFlowTensor, and you can choose which one to use for the search.

The find() method returns a named tuple containing the closest matching documents and their associated similarity scores.

When searching on the subindex level, you can use the find_subindex() method, which returns a named tuple containing the subindex documents, similarity scores and their associated root documents.

How these scores are calculated depends on the backend, and can usually be configured.
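
With ElasticDocIndex, for example, you can pick the scoring function per vector field when defining the schema. A minimal sketch ('cosine' is one of the similarities supported by ES dense_vector fields, alongside 'l2_norm' and 'dot_product'):

from pydantic import Field

from docarray import BaseDoc
from docarray.typing import NdArray


class CosineDoc(BaseDoc):
    # score matches by cosine similarity instead of the default
    tensor: NdArray[128] = Field(similarity='cosine')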

You can also search for multiple documents at once, in a batch, using the find_batched() method.

# create some query Documents
queries = DocList[SimpleDoc](
    SimpleDoc(tensor=np.random.rand(128)) for i in range(3)
)

# find similar documents
matches, scores = doc_index.find_batched(queries, search_field='tensor', limit=5)

print(f'{matches=}')
print(f'{matches[0].tensor=}')
print(f'{scores=}')

You can also pass raw vectors as queries, one query per row:

# create some query vectors
query = np.random.rand(3, 128)

# find similar documents
matches, scores = doc_index.find_batched(query, search_field='tensor', limit=5)

print(f'{matches=}')
print(f'{matches[0].tensor=}')
print(f'{scores=}')

The find_batched() method returns a named tuple containing a list of DocLists, one for each query, containing the closest matching documents and their similarity scores.

Filter

You can filter your documents by using the filter() or filter_batched() method with a corresponding filter query. The query must follow the Elasticsearch Query DSL, which consists of leaf and compound query clauses.

Using this, you can perform keyword filters, geolocation filters and range filters.

Keyword filter

To filter documents in your index by keyword, you can use Field(col_type='keyword') to enable keyword search for given fields:

class NewsDoc(BaseDoc):
    text: str
    category: str = Field(col_type='keyword')  # enable keyword filtering


doc_index = ElasticDocIndex[NewsDoc]()
index_docs = [
    NewsDoc(id='0', text='this is a news for sport', category='sport'),
    NewsDoc(id='1', text='this is a news for finance', category='finance'),
    NewsDoc(id='2', text='this is another news for sport', category='sport'),
]
doc_index.index(index_docs)

# search with filter
query_filter = {'terms': {'category': ['sport']}}
docs = doc_index.filter(query_filter)

Geolocation filter

To filter documents in your index by geolocation, you can use Field(col_type='geo_point') on a given field:

class NewsDoc(BaseDoc):
    text: str
    location: dict = Field(col_type='geo_point')  # enable geolocation filtering


doc_index = ElasticDocIndex[NewsDoc]()
index_docs = [
    NewsDoc(text='this is from Berlin', location={'lon': 13.24, 'lat': 52.31}),
    NewsDoc(text='this is from Beijing', location={'lon': 116.22, 'lat': 39.55}),
    NewsDoc(text='this is from San Jose', location={'lon': -121.89, 'lat': 37.34}),
]
doc_index.index(index_docs)

# filter the eastern hemisphere
query = {
    'bool': {
        'filter': {
            'geo_bounding_box': {
                'location': {
                    'top_left': {'lon': 0, 'lat': 90},
                    'bottom_right': {'lon': 180, 'lat': 0},
                }
            }
        }
    }
}

docs = doc_index.filter(query)

Range filter

You can have range field types in your document schema and set Field(col_type='integer_range') (or 'date_range', etc.) to filter documents based on the range of the field.

class NewsDoc(BaseDoc):
    time_frame: dict = Field(
        col_type='date_range', format='yyyy-MM-dd'
    )  # enable range filtering


doc_index = ElasticDocIndex[NewsDoc]()
index_docs = [
    NewsDoc(time_frame={'gte': '2023-01-01', 'lt': '2023-02-01'}),
    NewsDoc(time_frame={'gte': '2023-02-01', 'lt': '2023-03-01'}),
    NewsDoc(time_frame={'gte': '2023-03-01', 'lt': '2023-04-01'}),
]
doc_index.index(index_docs)

query = {
    'bool': {
        'filter': {
            'range': {
                'time_frame': {
                    'gte': '2023-02-05',
                    'lt': '2023-02-10',
                    'relation': 'contains',
                }
            }
        }
    }
}

docs = doc_index.filter(query)

Text search

In addition to vector similarity search, the Document Index interface offers methods for text search: text_search(), as well as its batched version text_search_batched().

As in "pure" Elasticsearch, you can use text search directly on the field of type str:

class NewsDoc(BaseDoc):
    text: str


doc_index = ElasticDocIndex[NewsDoc]()
index_docs = [
    NewsDoc(id='0', text='this is a news for sport'),
    NewsDoc(id='1', text='this is a news for finance'),
    NewsDoc(id='2', text='this is another news for sport'),
]
doc_index.index(index_docs)
query = 'finance'

# search with text
docs, scores = doc_index.text_search(query, search_field='text')

Hybrid search

Document Index supports atomic operations for vector similarity search, text search and filter search.

To combine these operations into a single, hybrid search query, you can use the query builder that is accessible through build_query():

For example, you can build a hybrid search query that performs range filtering, vector search and text search:

class MyDoc(BaseDoc):
    tens: NdArray[10] = Field(similarity='l2_norm')
    num: int
    text: str


doc_index = ElasticDocIndex[MyDoc]()
index_docs = [
    MyDoc(id=f'{i}', tens=np.ones(10) * i, num=int(i / 2), text=f'text {int(i/2)}')
    for i in range(10)
]
doc_index.index(index_docs)

q = (
    doc_index.build_query()
    .filter({'range': {'num': {'lte': 3}}})
    .find(index_docs[-1], search_field='tens')
    .text_search('0', search_field='text')
    .build()
)
docs, _ = doc_index.execute_query(q)

You can also manually build a valid ES query and directly pass it to the execute_query() method.
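
For example, the following sketch passes a raw knn query against the MyDoc index from above (assuming ES v8's knn query syntax; the parameter values are illustrative):

# a raw Elasticsearch v8 knn query, reusing the MyDoc index from above
query = {
    'knn': {
        'field': 'tens',
        'query_vector': np.ones(10).tolist(),
        'k': 5,
        'num_candidates': 100,
        'filter': {'range': {'num': {'lte': 3}}},
    },
}
docs, scores = doc_index.execute_query(query)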

Access documents

To access a document, you need to specify its id. You can also pass a list of ids to access multiple documents.

# access a single Doc
doc_index[index_docs[1].id]

# access multiple Docs
doc_index[index_docs[2].id, index_docs[3].id]

Delete documents

To delete documents, use the built-in function del with the id of the documents that you want to delete. You can also pass a list of ids to delete multiple documents.

# delete a single Doc
del doc_index[index_docs[1].id]

# delete multiple Docs
del doc_index[index_docs[2].id, index_docs[3].id]

Configuration

DBConfig

The following configs can be set in DBConfig:

  • hosts: hostname of the Elasticsearch server. Default: http://localhost:9200
  • es_config: other ES configuration options, passed as a Dict to the Elasticsearch client constructor (e.g. cloud_id, api_key). Default: None
  • index_name: name of the Elasticsearch index object. Default: None; data will be stored in an index named after the Document type used as schema
  • index_settings: other index settings, passed as a Dict when creating the index. Default: dict
  • index_mappings: other index mappings, passed as a Dict when creating the index. Default: dict
  • default_column_config: the default configuration for every column type. Default: dict

You can pass any of the above as keyword arguments to the __init__() method or pass an entire configuration object.
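
For example, as keyword arguments (a sketch; the index_settings values shown are ordinary ES index settings chosen for illustration, not DocArray defaults):

doc_index = ElasticDocIndex[SimpleDoc](
    hosts='http://localhost:9200',
    index_name='my_index',
    index_settings={'number_of_shards': 1, 'number_of_replicas': 0},
)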

default_column_config holds the default configuration for every column type. Since there are many column types in Elasticsearch, you can also change the column config when defining the schema:

class SimpleDoc(BaseDoc):
    tensor: NdArray[128] = Field(similarity='l2_norm', m=32, num_candidates=5000)


doc_index = ElasticDocIndex[SimpleDoc](index_name='my_index_1')

RuntimeConfig

The RuntimeConfig dataclass of ElasticDocIndex consists of chunk_size. You can change chunk_size for batch operations:

doc_index = ElasticDocIndex[SimpleDoc](index_name='my_index_2')
doc_index.configure(ElasticDocIndex.RuntimeConfig(chunk_size=1000))

You can pass the above as keyword arguments to the configure() method or pass an entire configuration object.
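
For example, the keyword-argument form, equivalent to the RuntimeConfig call above:

doc_index.configure(chunk_size=1000)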

Persistence

You can hook into a database index that was persisted during a previous session by specifying the index_name and hosts:

doc_index = ElasticDocIndex[MyDoc](
    hosts='http://localhost:9200', index_name='previously_stored'
)
doc_index.index(index_docs)

doc_index2 = ElasticDocIndex[MyDoc](
    hosts='http://localhost:9200', index_name='previously_stored'
)

print(f'number of docs in the persisted index: {doc_index2.num_docs()}')

Nested data

The examples above primarily operate on a flat schema, where each field corresponds to a simple type such as str or NdArray. However, it is also possible to represent and store nested documents in a Document Index, including scenarios where a document contains a DocList of other documents.
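
As a minimal sketch of what such a schema can look like (BookDoc and ChapterDoc are illustrative names; see the Nested Data section for the full picture):

import numpy as np

from docarray import BaseDoc, DocList
from docarray.index import ElasticDocIndex
from docarray.typing import NdArray


class ChapterDoc(BaseDoc):
    text: str
    embedding: NdArray[32]


class BookDoc(BaseDoc):
    title: str
    chapters: DocList[ChapterDoc]  # a nested DocList of documents


book_index = ElasticDocIndex[BookDoc](index_name='books')
book_index.index(
    DocList[BookDoc](
        [
            BookDoc(
                title='an illustrated guide',
                chapters=DocList[ChapterDoc](
                    ChapterDoc(text=f'chapter {j}', embedding=np.random.rand(32))
                    for j in range(3)
                ),
            )
        ]
    )
)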

Go to the Nested Data section to learn more.