Skip to content

FileDocStore

docarray.store.file.FileDocStore

Bases: AbstractDocStore

Class to push and pull DocList on-disk.

Source code in docarray/store/file.py
class FileDocStore(AbstractDocStore):
    """Class to push and pull [`DocList`][docarray.DocList] on-disk.

    Every `DocList` is stored as a single ``<name>.docs`` file holding a
    gzip-compressed protobuf stream.
    """

    @staticmethod
    def _abs_filepath(name: str) -> Path:
        """Resolve a name to an absolute path.

        :param name: If it is not a path, the cache directory is prepended.
            If it is a path, it is resolved to an absolute path.
        :return: Path
        """
        if not name.startswith(('/', '~', '.')):
            # A bare name lives inside the cache directory.
            name = str(_get_cache_path() / name)
        # `expanduser` handles `~`, `~/...` and `~user/...`; slicing the first
        # two characters off (as done previously) mangled the `~user` form.
        return Path(name).expanduser().resolve()

    @classmethod
    def list(
        cls: Type[SelfFileDocStore], namespace: str, show_table: bool
    ) -> List[str]:
        """List all [`DocList`s][docarray.DocList] in a directory.

        :param namespace: The directory to list.
        :param show_table: If True, print a table of the files in the directory.
        :return: A list of the names of the `DocLists` in the directory.
        :raises FileNotFoundError: If the resolved directory does not exist.
        """
        namespace_dir = cls._abs_filepath(namespace)
        if not namespace_dir.exists():
            raise FileNotFoundError(f'Directory {namespace} does not exist')
        da_files = [*namespace_dir.glob('*.docs')]

        if show_table:
            from datetime import datetime

            from rich import box, filesize
            from rich.console import Console
            from rich.table import Table

            table = Table(
                title=f'You have {len(da_files)} DocLists in file://{namespace_dir}',
                box=box.SIMPLE,
                highlight=True,
            )
            table.add_column('Name')
            table.add_column('Last Modified', justify='center')
            table.add_column('Size')

            for da_file in da_files:
                # Stat once per file. `st_mtime` is the content modification
                # time; `st_ctime` is the metadata-change time on Unix and the
                # creation time on Windows, so it does not match the column.
                file_stat = da_file.stat()
                table.add_row(
                    da_file.stem,
                    str(datetime.fromtimestamp(int(file_stat.st_mtime))),
                    str(filesize.decimal(file_stat.st_size)),
                )

            Console().print(table)

        return [dafile.stem for dafile in da_files]

    @classmethod
    def delete(
        cls: Type[SelfFileDocStore], name: str, missing_ok: bool = False
    ) -> bool:
        """Delete a [`DocList`][docarray.DocList] from the local filesystem.

        :param name: The name of the `DocList` to delete.
        :param missing_ok: If True, do not raise an exception if the file does not exist. Defaults to False.
        :return: True if the file was deleted, False if it did not exist.
        :raises FileNotFoundError: If the file does not exist and `missing_ok` is False.
        """
        target = cls._abs_filepath(name).with_suffix('.docs')
        try:
            target.unlink()
        except FileNotFoundError:
            if missing_ok:
                return False
            raise
        return True

    @classmethod
    def push(
        cls: Type[SelfFileDocStore],
        docs: 'DocList',
        name: str,
        show_progress: bool,
    ) -> Dict:
        """Push this [`DocList`][docarray.DocList] object to the specified file path.

        :param docs: The `DocList` to push.
        :param name: The file path to push to.
        :param show_progress: If true, a progress bar will be displayed.
        :return: Whatever `push_stream` returns (an empty dict).
        """
        return cls.push_stream(iter(docs), name, show_progress)

    @classmethod
    def push_stream(
        cls: Type[SelfFileDocStore],
        docs: Iterator['BaseDoc'],
        name: str,
        show_progress: bool = False,
    ) -> Dict:
        """Push a stream of documents to the specified file path.

        The data is first written to ``<name>.docs.tmp`` and only moved to
        ``<name>.docs`` once the whole stream has been written.

        :param docs: a stream of documents
        :param name: The file path to push to.
        :param show_progress: If true, a progress bar will be displayed.
        :return: An empty dict.
        :raises ConcurrentPushException: If the temporary file already exists.
        """
        source = _to_binary_stream(
            docs, protocol='protobuf', compress='gzip', show_progress=show_progress
        )
        tmp_path = cls._abs_filepath(name).with_suffix('.docs.tmp')
        final_path = tmp_path.with_suffix('')  # drops only the trailing '.tmp'

        # Exclusive-create mode makes "check that it does not exist" and
        # "create it" a single step, so two pushes cannot both pass the check.
        try:
            tmp_file = open(tmp_path, 'xb')
        except FileExistsError as exc:
            raise ConcurrentPushException(f'File {tmp_path} already exists.') from exc

        try:
            with tmp_file:
                for chunk in source:
                    tmp_file.write(chunk)
            # `replace` overwrites an existing target on every platform,
            # whereas `rename` raises on Windows when the target exists.
            tmp_path.replace(final_path)
        except BaseException:
            # Do not leave a stale temp file behind: it would make every
            # later push to this name fail with ConcurrentPushException.
            try:
                tmp_path.unlink()
            except FileNotFoundError:
                pass
            raise
        return {}

    @classmethod
    def pull(
        cls: Type[SelfFileDocStore],
        docs_cls: Type['DocList'],
        name: str,
        show_progress: bool,
        local_cache: bool,
    ) -> 'DocList':
        """Pull a [`DocList`][docarray.DocList] from the specified file path.

        :param docs_cls: The `DocList` class to instantiate with the pulled documents.
        :param name: The file path to pull from.
        :param show_progress: if true, display a progress bar.
        :param local_cache: Not used by the ``file`` protocol; forwarded to `pull_stream`.
        :return: a `DocList` object
        """

        return docs_cls(
            cls.pull_stream(
                docs_cls, name, show_progress=show_progress, local_cache=local_cache
            )
        )

    @classmethod
    def pull_stream(
        cls: Type[SelfFileDocStore],
        docs_cls: Type['DocList'],
        name: str,
        show_progress: bool,
        local_cache: bool,
    ) -> Iterator['BaseDoc']:
        """Pull a stream of Documents from the specified file.

        :param docs_cls: The `DocList` class whose `doc_type` is used to decode documents.
        :param name: The file path to pull from.
        :param show_progress: if true, display a progress bar.
        :param local_cache: Not used by the ``file`` protocol.
        :return: Iterator of Documents
        """

        if local_cache:
            logging.warning('local_cache is not supported for "file" protocol')

        path = cls._abs_filepath(name).with_suffix('.docs')
        # Opened eagerly so a missing file is reported at call time.
        source = open(path, 'rb')
        try:
            decoded = _from_binary_stream(
                docs_cls.doc_type,
                source,
                protocol='protobuf',
                compress='gzip',
                show_progress=show_progress,
            )
        except BaseException:
            source.close()
            raise

        def _iter_and_close() -> Iterator['BaseDoc']:
            # Close the file once the stream is exhausted, fails, or the
            # iterator is closed; previously the handle was never closed.
            with source:
                yield from decoded

        return _iter_and_close()

delete(name, missing_ok=False) classmethod

Delete a DocList from the local filesystem.

Parameters:

Name Type Description Default
name str

The name of the DocList to delete.

required
missing_ok bool

If True, do not raise an exception if the file does not exist. Defaults to False.

False

Returns:

Type Description
bool

True if the file was deleted, False if it did not exist.

Source code in docarray/store/file.py
@classmethod
def delete(
    cls: Type[SelfFileDocStore], name: str, missing_ok: bool = False
) -> bool:
    """Remove the ``.docs`` file of a [`DocList`][docarray.DocList] from disk.

    :param name: The name of the `DocList` to delete.
    :param missing_ok: When True, a missing file is reported through the
        return value instead of an exception. Defaults to False.
    :return: True if the file was deleted, False if it did not exist.
    :raises FileNotFoundError: If the file is missing and `missing_ok` is False.
    """
    target = cls._abs_filepath(name).with_suffix('.docs')
    try:
        target.unlink()
    except FileNotFoundError:
        if missing_ok:
            return False
        raise
    return True

list(namespace, show_table) classmethod

List all DocLists in a directory.

Parameters:

Name Type Description Default
namespace str

The directory to list.

required
show_table bool

If True, print a table of the files in the directory.

required

Returns:

Type Description
List[str]

A list of the names of the DocLists in the directory.

Source code in docarray/store/file.py
@classmethod
def list(
    cls: Type[SelfFileDocStore], namespace: str, show_table: bool
) -> List[str]:
    """List all [`DocList`s][docarray.DocList] in a directory.

    :param namespace: The directory to list.
    :param show_table: If True, print a table of the files in the directory.
    :return: A list of the names of the `DocLists` in the directory.
    :raises FileNotFoundError: If the resolved directory does not exist.
    """
    namespace_dir = cls._abs_filepath(namespace)
    if not namespace_dir.exists():
        raise FileNotFoundError(f'Directory {namespace} does not exist')
    da_files = [*namespace_dir.glob('*.docs')]

    if show_table:
        from datetime import datetime

        from rich import box, filesize
        from rich.console import Console
        from rich.table import Table

        table = Table(
            title=f'You have {len(da_files)} DocLists in file://{namespace_dir}',
            box=box.SIMPLE,
            highlight=True,
        )
        table.add_column('Name')
        table.add_column('Last Modified', justify='center')
        table.add_column('Size')

        for da_file in da_files:
            # Stat once per file. `st_mtime` is the content modification
            # time; `st_ctime` is the metadata-change time on Unix and the
            # creation time on Windows, so it does not match the column.
            file_stat = da_file.stat()
            table.add_row(
                da_file.stem,
                str(datetime.fromtimestamp(int(file_stat.st_mtime))),
                str(filesize.decimal(file_stat.st_size)),
            )

        Console().print(table)

    return [dafile.stem for dafile in da_files]

pull(docs_cls, name, show_progress, local_cache) classmethod

Pull a DocList from the specified file path.

Parameters:

Name Type Description Default
name str

The file path to pull from.

required
show_progress bool

if true, display a progress bar.

required
local_cache bool

Not used by the file protocol; the value is forwarded to pull_stream, which logs a warning when it is true.

required

Returns:

Type Description
DocList

a DocList object

Source code in docarray/store/file.py
@classmethod
def pull(
    cls: Type[SelfFileDocStore],
    docs_cls: Type['DocList'],
    name: str,
    show_progress: bool,
    local_cache: bool,
) -> 'DocList':
    """Load a [`DocList`][docarray.DocList] from a file on disk.

    :param docs_cls: The `DocList` class to instantiate with the pulled documents.
    :param name: The file path to pull from.
    :param show_progress: if true, display a progress bar.
    :param local_cache: forwarded unchanged to `pull_stream`.
    :return: a `DocList` object
    """
    doc_stream = cls.pull_stream(
        docs_cls, name, show_progress=show_progress, local_cache=local_cache
    )
    return docs_cls(doc_stream)

pull_stream(docs_cls, name, show_progress, local_cache) classmethod

Pull a stream of Documents from the specified file.

Parameters:

Name Type Description Default
name str

The file path to pull from.

required
show_progress bool

if true, display a progress bar.

required
local_cache bool

Not used by the file protocol.

required

Returns:

Type Description
Iterator[BaseDoc]

Iterator of Documents

Source code in docarray/store/file.py
@classmethod
def pull_stream(
    cls: Type[SelfFileDocStore],
    docs_cls: Type['DocList'],
    name: str,
    show_progress: bool,
    local_cache: bool,
) -> Iterator['BaseDoc']:
    """Pull a stream of Documents from the specified file.

    :param docs_cls: The `DocList` class whose `doc_type` is used to decode documents.
    :param name: The file path to pull from.
    :param show_progress: if true, display a progress bar.
    :param local_cache: Not used by the ``file`` protocol.
    :return: Iterator of Documents
    """

    if local_cache:
        logging.warning('local_cache is not supported for "file" protocol')

    path = cls._abs_filepath(name).with_suffix('.docs')
    # Opened eagerly so a missing file is reported at call time.
    source = open(path, 'rb')
    try:
        decoded = _from_binary_stream(
            docs_cls.doc_type,
            source,
            protocol='protobuf',
            compress='gzip',
            show_progress=show_progress,
        )
    except BaseException:
        source.close()
        raise

    def _iter_and_close() -> Iterator['BaseDoc']:
        # Close the file once the stream is exhausted, fails, or the
        # iterator is closed; previously the handle was never closed.
        with source:
            yield from decoded

    return _iter_and_close()

push(docs, name, show_progress) classmethod

Push this DocList object to the specified file path.

Parameters:

Name Type Description Default
docs DocList

The DocList to push.

required
name str

The file path to push to.

required
show_progress bool

If true, a progress bar will be displayed.

required
Source code in docarray/store/file.py
@classmethod
def push(
    cls: Type[SelfFileDocStore],
    docs: 'DocList',
    name: str,
    show_progress: bool,
) -> Dict:
    """Write a [`DocList`][docarray.DocList] to the specified file path.

    Thin wrapper that hands an iterator over `docs` to `push_stream`.

    :param docs: The `DocList` to push.
    :param name: The file path to push to.
    :param show_progress: If true, a progress bar will be displayed.
    :return: Whatever `push_stream` returns.
    """
    doc_iterator = iter(docs)
    return cls.push_stream(doc_iterator, name, show_progress)

push_stream(docs, name, show_progress=False) classmethod

Push a stream of documents to the specified file path.

Parameters:

Name Type Description Default
docs Iterator[BaseDoc]

a stream of documents

required
name str

The file path to push to.

required
show_progress bool

If true, a progress bar will be displayed.

False
Source code in docarray/store/file.py
@classmethod
def push_stream(
    cls: Type[SelfFileDocStore],
    docs: Iterator['BaseDoc'],
    name: str,
    show_progress: bool = False,
) -> Dict:
    """Push a stream of documents to the specified file path.

    The data is first written to ``<name>.docs.tmp`` and only moved to
    ``<name>.docs`` once the whole stream has been written.

    :param docs: a stream of documents
    :param name: The file path to push to.
    :param show_progress: If true, a progress bar will be displayed.
    :return: An empty dict.
    :raises ConcurrentPushException: If the temporary file already exists.
    """
    source = _to_binary_stream(
        docs, protocol='protobuf', compress='gzip', show_progress=show_progress
    )
    tmp_path = cls._abs_filepath(name).with_suffix('.docs.tmp')
    final_path = tmp_path.with_suffix('')  # drops only the trailing '.tmp'

    # Exclusive-create mode makes "check that it does not exist" and
    # "create it" a single step, so two pushes cannot both pass the check.
    try:
        tmp_file = open(tmp_path, 'xb')
    except FileExistsError as exc:
        raise ConcurrentPushException(f'File {tmp_path} already exists.') from exc

    try:
        with tmp_file:
            for chunk in source:
                tmp_file.write(chunk)
        # `replace` overwrites an existing target on every platform,
        # whereas `rename` raises on Windows when the target exists.
        tmp_path.replace(final_path)
    except BaseException:
        # Do not leave a stale temp file behind: it would make every
        # later push to this name fail with ConcurrentPushException.
        try:
            tmp_path.unlink()
        except FileNotFoundError:
            pass
        raise
    return {}