"""
Copyright 2023 Man Group Operations Limited
Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
"""
import datetime
from enum import Enum, auto
from typing import Optional, Any, Tuple, Dict, AnyStr, Union, List, Iterable, NamedTuple
from numpy import datetime64
from arcticdb.supported_types import Timestamp
from arcticdb.version_store.processing import QueryBuilder
from arcticdb.version_store._store import NativeVersionStore, VersionedItem, VersionQueryInput
from arcticdb_ext.exceptions import ArcticException
from arcticdb_ext.version_store import DataError
import pandas as pd
import numpy as np
import logging
logger = logging.getLogger(__name__)

# Accepted forms of a version specifier: a version number, a snapshot name, or a point in time.
AsOf = Union[int, str, datetime.datetime]

# Runtime tuple used with isinstance() to decide whether data can be normalised (rather than pickled).
NORMALIZABLE_TYPES = (pd.DataFrame, pd.Series, np.ndarray)

# Typing counterpart of NORMALIZABLE_TYPES; subscripting Union with the tuple expands its members.
NormalizableType = Union[NORMALIZABLE_TYPES]
"""Types that can be normalised into Arctic's internal storage structure.
See Also
--------
Library.write: for more documentation on normalisation.
"""
[docs]
class ArcticInvalidApiUsageException(ArcticException):
    """Raised when a call to the Arctic API is made in a way the API does not support."""
[docs]
class ArcticDuplicateSymbolsInBatchException(ArcticInvalidApiUsageException):
    """Raised when a batch method of this module receives the same symbol more than once."""
[docs]
class ArcticUnsupportedDataTypeException(ArcticInvalidApiUsageException):
    """Raised when a method is handed data of a type it cannot accept."""
[docs]
class SymbolVersion(NamedTuple):
    """A named tuple. A symbol name - version pair.

    Attributes
    ----------
    symbol : str
        Symbol name.
    version : int
        Version of the symbol.
    """

    symbol: str
    version: int

    def __repr__(self):
        # Compact "<symbol>_v<version>" form, e.g. "prices_v3".
        return "{}_v{}".format(self.symbol, self.version)
[docs]
class VersionInfo(NamedTuple):
    """A named tuple. Descriptive information about a particular version of a symbol.

    Attributes
    ----------
    date: datetime.datetime
        Time that the version was written in UTC.
    deleted: bool
        True if the version has been deleted and is only being kept alive via a snapshot.
    snapshots: List[str]
        Snapshots that refer to this version.
    """

    date: datetime.datetime
    deleted: bool
    snapshots: List[str]

    def __repr__(self):
        # Always show the date; the deleted flag and snapshot list only appear when truthy.
        parts = [f"date={self.date}"]
        if self.deleted:
            parts.append("deleted")
        if self.snapshots:
            parts.append(f"snapshots={self.snapshots}")
        return "(" + ", ".join(parts) + ")"
class NameWithDType(NamedTuple):
    """A named tuple. A name and dtype description pair.

    Attributes
    ----------
    name : str
        Name of the column or index.
    dtype : str
        Textual description of its data type.
    """

    name: str
    dtype: str
[docs]
class SymbolDescription(NamedTuple):
    """A named tuple. Descriptive information about the data stored under a particular symbol.

    Attributes
    ----------
    columns: Tuple[NameWithDType, ...]
        Columns stored under the symbol (one entry per column, so the tuple may have any length).
    index : NameWithDType
        Index of the symbol.
    index_type : str {"NA", "index", "multi_index"}
        Whether the index is a simple index or a multi_index. ``NA`` indicates that the stored data does not have an index.
    row_count : int
        Number of rows.
    last_update_time : datetime64
        The time of the last update to the symbol, in UTC.
    date_range : Tuple[datetime.datetime, datetime.datetime]
        The times in UTC that data for this symbol spans. If the data is not timeseries indexed then this value will be
        ``(datetime.datetime(1970, 1, 1), datetime.datetime(1970, 1, 1))``.
    """

    # Variable-length tuple: ``Tuple[X]`` would declare exactly one element.
    columns: Tuple[NameWithDType, ...]
    index: NameWithDType
    index_type: str
    row_count: int
    last_update_time: datetime64
    date_range: Tuple[datetime.datetime, datetime.datetime]
[docs]
class WritePayload:
    """
    WritePayload is designed to enable batching of multiple operations with an API that mirrors the singular
    ``write`` API.

    Construction of ``WritePayload`` objects is only required for batch write operations.

    One instance of ``WritePayload`` refers to one unit that can be written through to ArcticDB.
    """

    def __init__(self, symbol: str, data: Union[Any, NormalizableType], metadata: Any = None):
        """
        Constructor.

        Parameters
        ----------
        symbol : str
            Symbol name. Limited to 255 characters. The following characters are not supported in symbols:
            ``"*", "&", "<", ">"``
        data : Any
            Data to be written. If data is not of NormalizableType then it will be pickled.
        metadata : Any, default=None
            Optional metadata to persist along with the symbol.

        See Also
        --------
        Library.write_pickle: For information on the implications of providing data that needs to be pickled.
        """
        self.symbol = symbol
        self.data = data
        self.metadata = metadata

    def __repr__(self):
        # Use the real class name (this previously reported a stale "WriteArgs" name). ``data`` is shown by
        # identity only, since printing it could be very large.
        return f"{type(self).__name__}(symbol={self.symbol}, data_id={id(self.data)}, metadata={self.metadata})"

    def __iter__(self):
        """Yield ``symbol``, ``data`` and, only when it is not None, ``metadata``.

        This allows unpacking into positional arguments, e.g. ``lib.write(*payload)``. A None ``metadata`` is
        omitted so that the called method's own default applies.
        """
        yield self.symbol
        yield self.data
        if self.metadata is not None:
            yield self.metadata
[docs]
class ReadRequest(NamedTuple):
    """ReadRequest is designed to enable batching of read operations with an API that mirrors the singular ``read`` API.
    Therefore, construction of this object is only required for batch read operations.

    Attributes
    ----------
    symbol : str
        See `read` method.
    as_of: Optional[AsOf], default=None
        See `read` method.
    date_range: Optional[Tuple[Optional[Timestamp], Optional[Timestamp]]], default=None
        See `read` method.
    columns: Optional[List[str]], default=None
        See `read` method.
    query_builder: Optional[QueryBuilder], default=None
        See `read` method.

    See Also
    --------
    Library.read: For documentation on the parameters.
    """

    symbol: str
    as_of: Optional[AsOf] = None
    date_range: Optional[Tuple[Optional[Timestamp], Optional[Timestamp]]] = None
    columns: Optional[List[str]] = None
    query_builder: Optional[QueryBuilder] = None
[docs]
class ReadInfoRequest(NamedTuple):
    """ReadInfoRequest is useful for batch methods like read_metadata_batch and get_description_batch, where we
    only need to specify the symbol and the version information. Therefore, construction of this object is
    only required for these batch operations.

    Attributes
    ----------
    symbol : str
        See `read_metadata` method.
    as_of: Optional[AsOf], default=None
        See `read_metadata` method.

    See Also
    --------
    Library.read_metadata: For documentation on the parameters.
    """

    symbol: str
    as_of: Optional[AsOf] = None
class StagedDataFinalizeMethod(Enum):
    """How staged data is combined with a symbol when it is finalised.

    WRITE
        Collect the staged data and write it to a new timeseries.
    APPEND
        Collect the staged data and append it to the latest version.
    """

    # Explicit values identical to what ``auto()`` assigns to the first two members of a plain Enum.
    WRITE = 1
    APPEND = 2
[docs]
class Library:
"""
The main interface exposing read/write functionality within a given Arctic instance.
Arctic libraries contain named symbols which are the atomic unit of data storage within Arctic. Symbols
contain data that in most cases resembles a DataFrame and are versioned such that all modifying
operations can be tracked and reverted.
Instances of this class provide a number of primitives to write, modify and remove symbols, as well as
also providing methods to manage library snapshots. For more information on snapshots please see the `snapshot`
method.
Arctic libraries support concurrent writes and reads to multiple symbols as well as concurrent reads to a single
symbol. However, concurrent writers to a single symbol are not supported other than for primitives that
explicitly state support for single-symbol concurrent writes.
"""
[docs]
def __init__(self, arctic_instance_description: str, nvs: NativeVersionStore):
    """
    Parameters
    ----------
    arctic_instance_description
        Human readable description of the Arctic instance that owns this library. Informational only.
    nvs
        The native version store that backs this library.
    """
    self.arctic_instance_desc = arctic_instance_description
    self._nvs = nvs
    # Switch on the ``_skip_df_consolidation`` flag of the backing store's DataFrame normalizer.
    df_normalizer = self._nvs._normalizer.df
    df_normalizer._skip_df_consolidation = True
def __repr__(self):
return "Library(%s, path=%s, storage=%s)" % (
self.arctic_instance_desc,
self._nvs._lib_cfg.lib_desc.name,
self._nvs.get_backing_store(),
)
def __getitem__(self, symbol: str) -> VersionedItem:
    """Subscript access: ``lib[symbol]`` reads the latest version, same as ``lib.read(symbol)``."""
    item = self.read(symbol)
    return item
def __contains__(self, symbol: str):
return self.has_symbol(symbol)
[docs]
def write(
    self,
    symbol: str,
    data: NormalizableType,
    metadata: Any = None,
    prune_previous_versions: bool = False,
    staged=False,
    validate_index=True,
) -> VersionedItem:
    """
    Write ``data`` to the specified ``symbol``. If ``symbol`` already exists then a new version will be created to
    reference the newly written data. For more information on versions see the documentation for the `read`
    primitive.

    ``data`` must be of a format that can be normalised into Arctic's internal storage structure. Pandas
    DataFrames, Pandas Series and Numpy NDArrays can all be normalised. Normalised data will be split along both the
    columns and rows into segments. By default, a segment will contain 100,000 rows and 127 columns.

    If this library has ``write_deduplication`` enabled then segments will be deduplicated against storage prior to
    write to reduce required IO operations and storage requirements. Data will be effectively deduplicated for all
    segments up until the first differing row when compared to storage. As a result, modifying the beginning
    of ``data`` with respect to previously written versions may significantly reduce the effectiveness of
    deduplication.

    Note that `write` is not designed for multiple concurrent writers over a single symbol *unless the staged
    keyword argument is set to True*. If ``staged`` is True, written segments will be staged and left in an
    "incomplete" stage, unable to be read until they are finalized. This enables multiple
    writers to a single symbol - all writing staged data at the same time - with one process able to later finalise
    all staged data rendering the data readable by clients. To finalise staged data, see `finalize_staged_data`.

    Note: ArcticDB will use the 0-th level index of the Pandas DataFrame for its on-disk index.

    Any non-`DatetimeIndex` will be converted into an internal `RowCount` index. That is, ArcticDB will assign each
    row a monotonically increasing integer identifier and that will be used for the index.

    Parameters
    ----------
    symbol : str
        Symbol name. Limited to 255 characters. The following characters are not supported in symbols:
        ``"*", "&", "<", ">"``
    data : NormalizableType
        Data to be written. To write non-normalizable data, use `write_pickle`.
    metadata : Any, default=None
        Optional metadata to persist along with the symbol.
    prune_previous_versions : bool, default=False
        Removes previous (non-snapshotted) versions from the database.
    staged : bool, default=False
        Whether to write to a staging area rather than immediately to the library.
        Note that each unit of staged data must a) be datetime indexed and b) not overlap with any other unit of
        staged data. Note that this will create symbols with Dynamic Schema enabled.
    validate_index : bool, default=True
        If True, will verify that the index of `data` supports date range searches and update operations. This in
        effect tests that the data is sorted in ascending order. ArcticDB relies on Pandas to detect if data is
        sorted - you can call DataFrame.index.is_monotonic_increasing on your input DataFrame to see if Pandas
        believes the data to be sorted.

    Returns
    -------
    VersionedItem
        Structure containing metadata and version number of the written symbol in the store.

    Raises
    ------
    ArcticUnsupportedDataTypeException
        If ``data`` is not of NormalizableType.
    UnsortedDataException
        If data is unsorted, when validate_index is set to True.

    Examples
    --------

    >>> df = pd.DataFrame({'column': [5,6,7]})
    >>> lib.write("symbol", df, metadata={'my_dictionary': 'is_great'})
    >>> lib.read("symbol").data
       column
    0       5
    1       6
    2       7

    Staging data for later finalisation (enables concurrent writes):

    >>> df = pd.DataFrame({'column': [5,6,7]}, index=pd.date_range(start='1/1/2000', periods=3))
    >>> lib.write("staged", df, staged=True)  # Multiple staged writes can occur in parallel
    >>> lib.finalize_staged_data("staged", StagedDataFinalizeMethod.WRITE)  # Must be run after all staged writes have completed
    >>> lib.read("staged").data  # Would return error if run before finalization
                column
    2000-01-01       5
    2000-01-02       6
    2000-01-03       7

    WritePayload objects can be unpacked and used as parameters:

    >>> w = WritePayload("symbol", df, metadata={'the': 'metadata'})
    >>> lib.write(*w, staged=True)
    """
    # Reject non-normalizable data up front; pickling must be requested explicitly via write_pickle.
    if not isinstance(data, NORMALIZABLE_TYPES):
        raise ArcticUnsupportedDataTypeException(
            "data is of a type that cannot be normalized. Consider using "
            f"write_pickle instead. type(data)=[{type(data)}]"
        )

    return self._nvs.write(
        symbol=symbol,
        data=data,
        metadata=metadata,
        prune_previous_version=prune_previous_versions,
        pickle_on_failure=False,
        parallel=staged,
        validate_index=validate_index,
    )
[docs]
def write_pickle(
    self, symbol: str, data: Any, metadata: Any = None, prune_previous_versions: bool = False, staged=False
) -> VersionedItem:
    """
    See `write`. This method differs from `write` only in that ``data`` can be of any type that is serialisable via
    the Pickle library. There are significant downsides to storing data in this way:

    - Retrieval can only be done in bulk. Calls to `read` will not support `date_range`, `query_builder` or `columns`.
    - The data cannot be updated or appended to via the update and append methods.
    - Writes cannot be deduplicated in any way.

    Parameters
    ----------
    symbol
        See documentation on `write`.
    data : `Any`
        Data to be written.
    metadata
        See documentation on `write`.
    prune_previous_versions
        See documentation on `write`.
    staged
        See documentation on `write`.

    Returns
    -------
    VersionedItem
        See documentation on `write`.

    Examples
    --------

    >>> lib.write_pickle("symbol", [1,2,3])
    >>> lib.read("symbol").data
    [1, 2, 3]

    See Also
    --------
    write: For more detailed documentation.
    """
    # Identical to `write` except that pickling is permitted when normalisation is not possible.
    write_kwargs = dict(
        symbol=symbol,
        data=data,
        metadata=metadata,
        prune_previous_version=prune_previous_versions,
        pickle_on_failure=True,
        parallel=staged,
    )
    return self._nvs.write(**write_kwargs)
@staticmethod
def _raise_if_duplicate_symbols_in_batch(batch):
    """Raise if any symbol occurs more than once in ``batch``.

    Parameters
    ----------
    batch
        Items exposing a ``symbol`` attribute (e.g. ``WritePayload`` objects).

    Raises
    ------
    ArcticDuplicateSymbolsInBatchException
        If a symbol appears more than once. The message names the offending symbols.
    """
    from collections import Counter

    counts = Counter(p.symbol for p in batch)
    # Counter preserves first-seen order, so duplicates are reported in batch order.
    duplicates = [symbol for symbol, count in counts.items() if count > 1]
    if duplicates:
        raise ArcticDuplicateSymbolsInBatchException(
            f"Symbols must be unique within a batch, but these appeared more than once: {duplicates}"
        )
@staticmethod
def _raise_if_unsupported_type_in_write_batch(payloads):
    """Raise if any payload carries data that cannot be normalised.

    The error message lists at most the first five offending ``(symbol, type)`` pairs, followed by the total
    count when there are more.

    Raises
    ------
    ArcticUnsupportedDataTypeException
        If at least one payload's ``data`` is not an instance of ``NORMALIZABLE_TYPES``.
    """
    offenders = [
        (payload.symbol, type(payload.data))
        for payload in payloads
        if not isinstance(payload.data, NORMALIZABLE_TYPES)
    ]
    if offenders:
        error_message = (
            "payload contains some data of types that cannot be normalized. Consider using "
            f"write_pickle_batch instead. symbols with bad datatypes={offenders[:5]}"
        )
        if len(offenders) > 5:
            error_message += f" (and more)... {len(offenders)} data in total have bad types."
        raise ArcticUnsupportedDataTypeException(error_message)
[docs]
def write_batch(
    self, payloads: List[WritePayload], prune_previous_versions: bool = False, staged=False, validate_index=True
) -> List[VersionedItem]:
    """
    Write a batch of multiple symbols.

    Parameters
    ----------
    payloads : `List[WritePayload]`
        Symbols and their corresponding data. There must not be any duplicate symbols in `payload`.
    prune_previous_versions: `bool`, default=False
        See `write`.
    staged: `bool`, default=False
        See `write`.
    validate_index: bool, default=True
        If True, will verify for each entry in the batch that the index of `data` supports date range searches and
        update operations. This in effect tests that the data is sorted in ascending order. ArcticDB relies on
        Pandas to detect if data is sorted - you can call DataFrame.index.is_monotonic_increasing on your input
        DataFrame to see if Pandas believes the data to be sorted.

    Returns
    -------
    List[VersionedItem]
        Structure containing metadata and version number of the written symbols in the store, in the
        same order as `payload`.

    Raises
    ------
    ArcticDuplicateSymbolsInBatchException
        When duplicate symbols appear in payload.
    ArcticUnsupportedDataTypeException
        If data that is not of NormalizableType appears in any of the payloads.
    UnsortedDataException
        If data is unsorted, when validate_index is set to True.

    See Also
    --------
    write: For more detailed documentation.

    Examples
    --------

    Writing a simple batch:

    >>> df_1 = pd.DataFrame({'column': [1,2,3]})
    >>> df_2 = pd.DataFrame({'column': [4,5,6]})
    >>> payload_1 = WritePayload("symbol_1", df_1, metadata={'the': 'metadata'})
    >>> payload_2 = WritePayload("symbol_2", df_2)
    >>> items = lib.write_batch([payload_1, payload_2])
    >>> lib.read("symbol_1").data
       column
    0       1
    1       2
    2       3
    >>> lib.read("symbol_2").data
       column
    0       4
    1       5
    2       6
    >>> items[0].symbol, items[1].symbol
    ('symbol_1', 'symbol_2')
    """
    # Validate the whole batch before any data is sent to storage.
    self._raise_if_duplicate_symbols_in_batch(payloads)
    self._raise_if_unsupported_type_in_write_batch(payloads)

    return self._nvs.batch_write(
        [p.symbol for p in payloads],
        [p.data for p in payloads],
        [p.metadata for p in payloads],
        prune_previous_version=prune_previous_versions,
        pickle_on_failure=False,
        parallel=staged,
        validate_index=validate_index,
    )
[docs]
def write_pickle_batch(
    self, payloads: List[WritePayload], prune_previous_versions: bool = False, staged=False
) -> List[VersionedItem]:
    """
    Write a batch of multiple symbols, pickling their data if necessary.

    Parameters
    ----------
    payloads : `List[WritePayload]`
        Symbols and their corresponding data. There must not be any duplicate symbols in `payload`.
    prune_previous_versions: `bool`, default=False
        See `write`.
    staged: `bool`, default=False
        See `write`.

    Returns
    -------
    List[VersionedItem]
        Structures containing metadata and version number of the written symbols in the store, in the
        same order as `payload`.

    Raises
    ------
    ArcticDuplicateSymbolsInBatchException
        When duplicate symbols appear in payload.

    See Also
    --------
    write: For more detailed documentation.
    write_pickle: For information on the implications of providing data that needs to be pickled.
    """
    self._raise_if_duplicate_symbols_in_batch(payloads)

    # Split the payloads into the three parallel lists expected by the backing store.
    symbols = [payload.symbol for payload in payloads]
    data_items = [payload.data for payload in payloads]
    metadata_items = [payload.metadata for payload in payloads]

    return self._nvs.batch_write(
        symbols,
        data_items,
        metadata_items,
        prune_previous_version=prune_previous_versions,
        pickle_on_failure=True,
        parallel=staged,
    )
[docs]
def append(
    self,
    symbol: str,
    data: NormalizableType,
    metadata: Any = None,
    prune_previous_versions: bool = False,
    validate_index: bool = True,
) -> Optional[VersionedItem]:
    """
    Appends the given data to the existing, stored data. Append always appends along the index. A new version will
    be created to reference the newly-appended data. Append only accepts data for which the index of the first
    row is equal to or greater than the index of the last row in the existing data.

    Appends containing differing column sets to the existing data are only possible if the library has been
    configured to support dynamic schemas.

    Note that `append` is not designed for multiple concurrent writers over a single symbol.

    Parameters
    ----------
    symbol
        Symbol name.
    data
        Data to be written.
    metadata
        Optional metadata to persist along with the new symbol version. Note that the metadata is
        not combined in any way with the metadata stored in the previous version.
    prune_previous_versions
        Removes previous (non-snapshotted) versions from the database when True.
    validate_index: bool, default=True
        If True, will verify that resulting symbol will support date range searches and update operations. This in
        effect tests that the previous version of the data and `data` are both sorted in ascending order. ArcticDB
        relies on Pandas to detect if data is sorted - you can call DataFrame.index.is_monotonic_increasing on your
        input DataFrame to see if Pandas believes the data to be sorted.

    Returns
    -------
    VersionedItem
        Structure containing metadata and version number of the written symbol in the store.

    Raises
    ------
    UnsortedDataException
        If data is unsorted, when validate_index is set to True.

    Examples
    --------

    >>> df = pd.DataFrame(
    ...    {'column': [1,2,3]},
    ...    index=pd.date_range(start='1/1/2018', end='1/03/2018')
    ... )
    >>> df
                column
    2018-01-01       1
    2018-01-02       2
    2018-01-03       3
    >>> lib.write("symbol", df)
    >>> to_append_df = pd.DataFrame(
    ...    {'column': [4,5,6]},
    ...    index=pd.date_range(start='1/4/2018', end='1/06/2018')
    ... )
    >>> to_append_df
                column
    2018-01-04       4
    2018-01-05       5
    2018-01-06       6
    >>> lib.append("symbol", to_append_df)
    >>> lib.read("symbol").data
                column
    2018-01-01       1
    2018-01-02       2
    2018-01-03       3
    2018-01-04       4
    2018-01-05       5
    2018-01-06       6
    """
    return self._nvs.append(
        symbol=symbol,
        dataframe=data,
        metadata=metadata,
        prune_previous_version=prune_previous_versions,
        validate_index=validate_index,
    )
[docs]
def update(
    self,
    symbol: str,
    data: Union[pd.DataFrame, pd.Series],
    metadata: Any = None,
    upsert: bool = False,
    date_range: Optional[Tuple[Optional[Timestamp], Optional[Timestamp]]] = None,
    prune_previous_versions=False,
) -> VersionedItem:
    """
    Overwrites existing symbol data with the contents of ``data``. The entire range between the first and last index
    entry in ``data`` is replaced in its entirety with the contents of ``data``, adding additional index entries if
    required. `update` only operates over the outermost index level - this means secondary index rows will be
    removed if not contained in ``data``.

    Both the existing symbol version and ``data`` must be timeseries-indexed.

    Note that `update` is not designed for multiple concurrent writers over a single symbol.

    Parameters
    ----------
    symbol
        Symbol name.
    data
        Timeseries indexed data to use for the update.
    metadata
        Metadata to persist along with the new symbol version.
    upsert: bool, default=False
        If True, will write the data even if the symbol does not exist.
    date_range: `Tuple[Optional[Timestamp], Optional[Timestamp]]`, default=None
        If a range is specified, it will delete the stored value within the range and overwrite it with the data in
        ``data``. This allows the user to update with data that might only be a subset of the stored value. Leaving
        any part of the tuple as None leaves that part of the range open ended. Only data within ``date_range`` will
        be modified, even if ``data`` covers a wider date range.
    prune_previous_versions: bool, default=False
        Removes previous (non-snapshotted) versions from the database when True.

    Returns
    -------
    VersionedItem
        Structure containing metadata and version number of the updated symbol in the store.

    Examples
    --------

    >>> df = pd.DataFrame(
    ...    {'column': [1,2,3,4]},
    ...    index=pd.date_range(start='1/1/2018', end='1/4/2018')
    ... )
    >>> df
                column
    2018-01-01       1
    2018-01-02       2
    2018-01-03       3
    2018-01-04       4
    >>> lib.write("symbol", df)
    >>> update_df = pd.DataFrame(
    ...    {'column': [400, 40]},
    ...    index=pd.date_range(start='1/1/2018', end='1/3/2018', freq='2D')
    ... )
    >>> update_df
                column
    2018-01-01     400
    2018-01-03      40
    >>> lib.update("symbol", update_df)
    >>> # Note that 2018-01-02 is gone despite not being in update_df
    >>> lib.read("symbol").data
                column
    2018-01-01     400
    2018-01-03      40
    2018-01-04       4
    """
    return self._nvs.update(
        symbol=symbol,
        data=data,
        metadata=metadata,
        upsert=upsert,
        date_range=date_range,
        prune_previous_version=prune_previous_versions,
    )
[docs]
def finalize_staged_data(
    self, symbol: str, mode: Optional[StagedDataFinalizeMethod] = StagedDataFinalizeMethod.WRITE
):
    """
    Finalise staged data so that it becomes available for reads.

    Parameters
    ----------
    symbol : `str`
        Symbol whose staged data should be finalised.
    mode : `StagedDataFinalizeMethod`, default=StagedDataFinalizeMethod.WRITE
        Finalise mode, either WRITE or APPEND. WRITE collects the staged data and writes it to a new timeseries;
        APPEND collects the staged data and appends it to the latest version.

    See Also
    --------
    write
        Documentation on the ``staged`` parameter explains the concept of staged data in more detail.
    """
    # The backing store takes a boolean append flag: only APPEND sets it; any other mode (including None) does not.
    append_to_latest = mode == StagedDataFinalizeMethod.APPEND
    self._nvs.compact_incomplete(symbol, append_to_latest, False)
[docs]
def sort_and_finalize_staged_data(
    self, symbol: str, mode: Optional[StagedDataFinalizeMethod] = StagedDataFinalizeMethod.WRITE
):
    """
    Sort and then finalise staged data. Unlike `finalize_staged_data`, this supports staged segments whose time
    periods interleave - the end result will be ordered. This requires performing a full sort in memory, so it can
    be time consuming.

    Parameters
    ----------
    symbol : `str`
        Symbol whose staged data should be finalised.
    mode : `StagedDataFinalizeMethod`, default=StagedDataFinalizeMethod.WRITE
        Finalise mode, either WRITE or APPEND. WRITE collects the staged data and writes it to a new timeseries;
        APPEND collects the staged data and appends it to the latest version.

    See Also
    --------
    write
        Documentation on the ``staged`` parameter explains the concept of staged data in more detail.
    """
    append_to_latest = mode == StagedDataFinalizeMethod.APPEND
    version_store = self._nvs.version_store
    version_store.sort_merge(symbol, None, append_to_latest, False)
[docs]
def get_staged_symbols(self) -> List[str]:
    """
    List every symbol that has staged data which has not been finalised yet.

    Returns
    -------
    List[str]
        Symbol names.

    See Also
    --------
    write
        Documentation on the ``staged`` parameter explains the concept of staged data in more detail.
    """
    version_store = self._nvs.version_store
    return version_store.get_incomplete_symbols()
[docs]
def read(
    self,
    symbol: str,
    as_of: Optional[AsOf] = None,
    date_range: Optional[Tuple[Optional[Timestamp], Optional[Timestamp]]] = None,
    columns: Optional[List[str]] = None,
    query_builder: Optional[QueryBuilder] = None,
) -> VersionedItem:
    """
    Read data for the named symbol. Returns a VersionedItem object with a data and metadata element (as passed into
    write).

    Parameters
    ----------
    symbol : str
        Symbol name.
    as_of : AsOf, default=None
        Return the data as it was as of the point in time. ``None`` means that the latest version should be read. The
        various types of this parameter mean:

        - ``int``: specific version number. Negative indexing is supported, with -1 representing the latest version,
          -2 the version before that, etc.
        - ``str``: snapshot name which contains the version
        - ``datetime.datetime`` : the version of the data that existed ``as_of`` the requested point in time
    date_range: Tuple[Optional[Timestamp], Optional[Timestamp]], default=None
        DateRange to restrict read data to.

        Applicable only for time-indexed Pandas dataframes or series. Returns only the
        part of the data that falls within the given range (inclusive). None on either end leaves that part of the
        range open-ended. Hence specifying ``(None, datetime(2025, 1, 1))`` declares that you wish to read all data up
        to and including 20250101.
    columns: List[str], default=None
        Applicable only for Pandas data. Determines which columns to return data for.
    query_builder: Optional[QueryBuilder], default=None
        A QueryBuilder object to apply to the dataframe before it is returned. For more information see the
        documentation for the QueryBuilder class (``from arcticdb import QueryBuilder; help(QueryBuilder)``).

    Returns
    -------
    VersionedItem
        Object that contains a ``.data`` and ``.metadata`` element.

    Examples
    --------

    >>> df = pd.DataFrame({'column': [5,6,7]})
    >>> lib.write("symbol", df, metadata={'my_dictionary': 'is_great'})
    >>> lib.read("symbol").data
       column
    0       5
    1       6
    2       7

    The default read behaviour is also available through subscripting:

    >>> lib["symbol"].data
       column
    0       5
    1       6
    2       7
    """
    return self._nvs.read(
        symbol=symbol, as_of=as_of, date_range=date_range, columns=columns, query_builder=query_builder
    )
[docs]
def read_batch(
    self, symbols: List[Union[str, ReadRequest]], query_builder: Optional[QueryBuilder] = None
) -> List[Union[VersionedItem, DataError]]:
    """
    Reads multiple symbols.

    Parameters
    ----------
    symbols : List[Union[str, ReadRequest]]
        List of symbols to read.
    query_builder: Optional[QueryBuilder], default=None
        A single QueryBuilder to apply to all the dataframes before they are returned. If this argument is passed
        then none of the ``symbols`` may have their own query_builder specified in their request.

    Returns
    -------
    List[Union[VersionedItem, DataError]]
        A list of the read results, whose i-th element corresponds to the i-th element of the ``symbols`` parameter.
        If the specified version does not exist, a DataError object is returned, with symbol, version_request_type,
        version_request_data properties, error_code, error_category, and exception_string properties.

    Raises
    ------
    ArcticInvalidApiUsageException
        If kwarg query_builder and per-symbol query builders both used, or if an element of ``symbols`` is
        neither a ``str`` nor a ``ReadRequest``.

    Examples
    --------

    >>> lib.write("s1", pd.DataFrame())
    >>> lib.write("s2", pd.DataFrame({"col": [1, 2, 3]}))
    >>> lib.write("s2", pd.DataFrame(), prune_previous_versions=False)
    >>> lib.write("s3", pd.DataFrame())
    >>> batch = lib.read_batch(["s1", ReadRequest("s2", as_of=0), "s3", ReadRequest("s2", as_of=1000)])
    >>> batch[0].data.empty
    True
    >>> batch[1].data.empty
    False
    >>> batch[2].data.empty
    True
    >>> batch[3].symbol
    's2'
    >>> from arcticdb import DataError
    >>> isinstance(batch[3], DataError)
    True
    >>> batch[3].version_request_type
    VersionRequestType.SPECIFIC
    >>> batch[3].version_request_data
    1000
    >>> batch[3].error_code
    ErrorCode.E_NO_SUCH_VERSION
    >>> batch[3].error_category
    ErrorCategory.MISSING_DATA

    See Also
    --------
    read
    """
    # Parallel per-symbol argument lists expected by the backing store.
    symbol_strings = []
    as_ofs = []
    date_ranges = []
    columns = []
    query_builders = []

    for s in symbols:
        if isinstance(s, str):
            # A bare symbol name carries no version, range, column or query restrictions.
            symbol_strings.append(s)
            as_ofs.append(None)
            date_ranges.append(None)
            columns.append(None)
            query_builders.append(None)
        elif isinstance(s, ReadRequest):
            if s.query_builder is not None and query_builder is not None:
                raise ArcticInvalidApiUsageException(
                    "kwarg query_builder and per-symbol query builders cannot "
                    f"both be used but {s} had its own query_builder specified."
                )
            symbol_strings.append(s.symbol)
            as_ofs.append(s.as_of)
            date_ranges.append(s.date_range)
            columns.append(s.columns)
            query_builders.append(s.query_builder)
        else:
            raise ArcticInvalidApiUsageException(
                f"Unsupported item in the symbols argument s=[{s}] type(s)=[{type(s)}]. Only [str] and"
                " [ReadRequest] are supported."
            )

    throw_on_missing_version = False
    # Explicit None check: a supplied QueryBuilder must win even if it happens to evaluate as falsy.
    effective_query_builder = query_builder if query_builder is not None else query_builders
    return self._nvs._batch_read_to_versioned_items(
        symbol_strings, as_ofs, date_ranges, columns, effective_query_builder, throw_on_missing_version
    )
[docs]
def snapshot(
    self,
    snapshot_name: str,
    metadata: Any = None,
    skip_symbols: Optional[List[str]] = None,
    versions: Optional[Dict[str, int]] = None,
) -> None:
    """
    Creates a named snapshot of the data within a library.

    By default, the latest version of every symbol that has not been deleted will be contained within the snapshot.
    You can change this behaviour with either ``versions`` (an allow-list) or with ``skip_symbols`` (a deny-list).
    Concurrent writes with prune previous versions set while the snapshot is being taken can potentially lead to
    corruption of the affected symbols in the snapshot.

    The symbols and versions contained within the snapshot will persist regardless of new symbols and versions
    being written to the library afterwards. If a version or symbol referenced in a snapshot is deleted then the
    underlying data will be preserved to ensure the snapshot is still accessible. Only once all referencing
    snapshots have been removed will the underlying data be removed as well.

    At most one of ``skip_symbols`` and ``versions`` may be truthy.

    Parameters
    ----------
    snapshot_name
        Name of the snapshot.
    metadata : Any, default=None
        Optional metadata to persist along with the snapshot.
    skip_symbols : List[str], default=None
        Optional symbols to be excluded from the snapshot.
    versions: Dict[str, int], default=None
        Optional dictionary of versions of symbols to snapshot. For example `versions={"a": 2, "b": 3}` will
        snapshot version 2 of symbol "a" and version 3 of symbol "b".

    Raises
    ------
    InternalException
        If a snapshot already exists with ``snapshot_name``. You must explicitly delete the pre-existing snapshot.
    """
    # The backing store's result is deliberately discarded; this method returns None.
    self._nvs.snapshot(
        snap_name=snapshot_name,
        metadata=metadata,
        skip_symbols=skip_symbols,
        versions=versions,
    )
[docs]
def delete(self, symbol: str, versions: Optional[Union[int, Iterable[int]]] = None):
    """
    Delete all versions of the symbol from the library, unless ``versions`` is specified, in which case only those
    versions are deleted.

    This may not actually delete the underlying data if a snapshot still references the version. See `snapshot` for
    more detail.

    Note that this may require data to be removed from the underlying storage which can be slow.

    If no symbol called ``symbol`` exists then this is a no-op. In particular this method does not raise in this case.

    Parameters
    ----------
    symbol
        Symbol to delete.
    versions
        Version or versions of symbol to delete. If ``None`` then all versions will be deleted. A single version may
        be given as a Python ``int`` or a NumPy integer scalar.
    """
    if versions is None:
        self._nvs.delete(symbol)
        return

    # Wrap a single version number. NumPy integer scalars (e.g. taken from an array) are neither ``int``
    # instances nor iterable, so they are recognised explicitly and converted to a plain ``int``.
    if isinstance(versions, np.integer):
        versions = (int(versions),)
    elif isinstance(versions, int):
        versions = (versions,)

    for v in versions:
        self._nvs.delete_version(symbol, v)
[docs]
def prune_previous_versions(self, symbol):
    """
    Remove every non-snapshotted version of ``symbol`` from the database, keeping only the latest version.

    Parameters
    ----------
    symbol : `str`
        Name of the symbol whose older versions should be pruned.
    """
    version_store = self._nvs
    version_store.prune_previous_versions(symbol)
[docs]
def delete_data_in_range(self, symbol: str, date_range: Tuple[Optional[Timestamp], Optional[Timestamp]]):
    """
    Remove the rows of ``symbol`` that fall inside ``date_range``, writing the result as a new version.

    The existing symbol version must be timeseries-indexed.

    Parameters
    ----------
    symbol
        Symbol name.
    date_range
        The date range in which to delete data. Leaving any part of the tuple as None leaves that part of the range
        open ended.

    Raises
    ------
    ArcticInvalidApiUsageException
        If ``date_range`` is None.

    Examples
    --------
    >>> df = pd.DataFrame({"column": [5, 6, 7, 8]}, index=pd.date_range(start="1/1/2018", end="1/4/2018"))
    >>> lib.write("symbol", df)
    >>> lib.delete_data_in_range("symbol", date_range=(datetime.datetime(2018, 1, 1), datetime.datetime(2018, 1, 2)))
    >>> lib["symbol"].version
    1
    >>> lib["symbol"].data
                column
    2018-01-03       7
    2018-01-04       8
    """
    if date_range is not None:
        self._nvs.delete(symbol, date_range=date_range)
        return
    raise ArcticInvalidApiUsageException("date_range must be given but was None")
[docs]
def delete_snapshot(self, snapshot_name: str) -> None:
    """
    Remove the snapshot called ``snapshot_name`` from the library.

    If this snapshot is the last reference to some symbol data, that data is removed too, so the call may take a
    while.

    Parameters
    ----------
    snapshot_name
        Name of the snapshot to remove.

    Raises
    ------
    Exception
        If no snapshot called ``snapshot_name`` exists.
    """
    store = self._nvs
    outcome = store.delete_snapshot(snapshot_name)
    return outcome
[docs]
def list_symbols(self, snapshot_name: Optional[str] = None) -> List[str]:
    """
    List the symbols held in this library.

    Parameters
    ----------
    snapshot_name
        When given, list the symbols contained in that snapshot. When None, list the symbols that are live in the
        library right now.

    Returns
    -------
    List[str]
        Symbols in the library.
    """
    store = self._nvs
    symbols = store.list_symbols(snapshot=snapshot_name)
    return symbols
[docs]
def has_symbol(self, symbol: str, as_of: Optional[AsOf] = None) -> bool:
    """
    Report whether ``symbol`` exists in this library.

    Parameters
    ----------
    symbol
        Symbol name for the item
    as_of : AsOf, default=None
        Check the library as it was as_of this point in time. See `read` for more documentation. If absent, checks
        the symbols that are live in the library as of the current time.

    Returns
    -------
    bool
        True if the symbol is in the library, False otherwise.

    Examples
    --------
    >>> lib.write("symbol", pd.DataFrame())
    >>> lib.has_symbol("symbol")
    True
    >>> lib.has_symbol("another_symbol")
    False

    The __contains__ operator also checks whether a symbol exists in this library as of now:

    >>> "symbol" in lib
    True
    >>> "another_symbol" in lib
    False
    """
    store = self._nvs
    present = store.has_symbol(symbol, as_of=as_of)
    return present
[docs]
def list_snapshots(self) -> Dict[str, Any]:
    """
    Return every snapshot held in this library.

    Returns
    -------
    Dict[str, Any]
        Mapping from snapshot name to the metadata associated with that snapshot.
    """
    snapshots = self._nvs.list_snapshots()
    return snapshots
[docs]
def list_versions(
    self,
    symbol: Optional[str] = None,
    snapshot: Optional[str] = None,
    latest_only: bool = False,
    skip_snapshots: bool = False,
) -> Dict[SymbolVersion, VersionInfo]:
    """
    Get the versions in this library, filtered by the passed in parameters.

    Parameters
    ----------
    symbol
        Symbol to return versions for. If None returns versions across all symbols in the library.
    snapshot
        Only return the versions contained in the named snapshot.
    latest_only : bool, default=False
        Only include the latest version for each returned symbol.
    skip_snapshots : bool, default=False
        Don't populate version list with snapshot information. Can improve performance significantly if there are
        many snapshots.

    Returns
    -------
    Dict[SymbolVersion, VersionInfo]
        Dictionary describing the version for each symbol-version pair in the library. Since symbol version is a
        (named) tuple you can index into the dictionary simply as shown in the examples below.

    Examples
    --------
    >>> df = pd.DataFrame()
    >>> lib.write("symbol", df, metadata=10)
    >>> lib.write("symbol", df, metadata=11, prune_previous_versions=False)
    >>> lib.snapshot("snapshot")
    >>> lib.write("symbol", df, metadata=12, prune_previous_versions=False)
    >>> lib.delete("symbol", versions=(1, 2))
    >>> versions = lib.list_versions("symbol")
    >>> versions["symbol", 1].deleted
    True
    >>> versions["symbol", 1].snapshots
    ["snapshot"]
    """
    versions = self._nvs.list_versions(
        symbol=symbol,
        snapshot=snapshot,
        latest_only=latest_only,
        iterate_on_failure=False,
        skip_snapshots=skip_snapshots,
    )
    # Re-key the raw per-version dicts by (symbol, version) so callers can index with a plain tuple.
    return {
        SymbolVersion(v["symbol"], v["version"]): VersionInfo(v["date"], v["deleted"], v["snapshots"])
        for v in versions
    }
[docs]
def head(
    self, symbol: str, n: int = 5, as_of: Optional[AsOf] = None, columns: Optional[List[str]] = None
) -> VersionedItem:
    """
    Read the first ``n`` rows of data for the named symbol. If ``n`` is negative, return all rows except the last
    ``abs(n)`` rows.

    Parameters
    ----------
    symbol
        Symbol name.
    n : int, default=5
        Number of rows to select if non-negative, otherwise number of rows to exclude.
    as_of : AsOf, default=None
        See documentation on `read`.
    columns : List[str], default=None
        See documentation on `read`.

    Returns
    -------
    VersionedItem
        Object that contains a .data and .metadata element.
    """
    return self._nvs.head(symbol=symbol, n=n, as_of=as_of, columns=columns)
[docs]
def tail(
    self, symbol: str, n: int = 5, as_of: Optional[AsOf] = None, columns: Optional[List[str]] = None
) -> VersionedItem:
    """
    Read the last ``n`` rows of data for the named symbol. If ``n`` is negative, return all rows except the first
    ``abs(n)`` rows.

    Parameters
    ----------
    symbol
        Symbol name.
    n : int, default=5
        Number of rows to select if non-negative, otherwise number of rows to exclude.
    as_of : AsOf, default=None
        See documentation on `read`.
    columns : List[str], default=None
        See documentation on `read`.

    Returns
    -------
    VersionedItem
        Object that contains a .data and .metadata element.
    """
    # ``as_of`` is annotated as AsOf (matching ``head`` and the docstring); the value is forwarded unchanged.
    return self._nvs.tail(symbol=symbol, n=n, as_of=as_of, columns=columns)
[docs]
def get_description(self, symbol: str, as_of: Optional[AsOf] = None) -> SymbolDescription:
    """
    Return descriptive data for ``symbol``.

    Parameters
    ----------
    symbol
        Symbol name.
    as_of : AsOf, default=None
        See documentation on `read`.

    Returns
    -------
    SymbolDescription
        Named tuple containing the descriptive data.

    See Also
    --------
    SymbolDescription
        For documentation on each field.
    """

    def _attach_utc(bound):
        # NaT bounds are handed back untouched; real timestamps are tagged with the UTC timezone.
        if np.isnat(np.datetime64(bound)):
            return bound
        return bound.replace(tzinfo=datetime.timezone.utc)

    info = self._nvs.get_info(symbol, as_of)
    last_update_time = pd.to_datetime(info["last_update"], utc=True)
    col_names = info["col_names"]
    columns = tuple(
        NameWithDType(col_name, col_dtype) for col_name, col_dtype in zip(col_names["columns"], info["dtype"])
    )
    index = NameWithDType(col_names["index"], col_names["index_dtype"])
    date_range = tuple(_attach_utc(bound) for bound in info["date_range"])
    return SymbolDescription(
        columns=columns,
        index=index,
        row_count=info["rows"],
        last_update_time=last_update_time,
        index_type=info["index_type"],
        date_range=date_range,
    )
[docs]
def get_description_batch(self, symbols: List[Union[str, ReadInfoRequest]]) -> List[SymbolDescription]:
    """
    Return descriptive data for each entry of ``symbols``.

    Parameters
    ----------
    symbols : List[Union[str, ReadInfoRequest]]
        List of symbols to read. Plain strings are described at their latest version; for a ``ReadInfoRequest``
        only its ``symbol`` and ``as_of`` fields are used.

    Returns
    -------
    List[SymbolDescription]
        A list of the descriptive data, whose i-th element corresponds to the i-th element of the ``symbols`` parameter.

    Raises
    ------
    ArcticInvalidApiUsageException
        If an entry of ``symbols`` is neither a ``str`` nor a ``ReadInfoRequest``.

    See Also
    --------
    SymbolDescription
        For documentation on each field.
    """
    symbol_strings = []
    as_ofs = []
    # Validate and split every request up front, before any storage access happens.
    for item in symbols:
        if isinstance(item, str):
            symbol_strings.append(item)
            as_ofs.append(None)
        elif isinstance(item, ReadInfoRequest):
            symbol_strings.append(item.symbol)
            as_ofs.append(item.as_of)
        else:
            raise ArcticInvalidApiUsageException(
                f"Unsupported item in the symbols argument s=[{item}] type(s)=[{type(item)}]. Only [str] and"
                " [ReadInfoRequest] are supported."
            )

    def _attach_utc(bound):
        # NaT bounds are handed back untouched; real timestamps are tagged with the UTC timezone.
        if np.isnat(np.datetime64(bound)):
            return bound
        return bound.replace(tzinfo=datetime.timezone.utc)

    def _describe(info):
        # Convert one raw info dict from the version store into a SymbolDescription.
        last_update_time = pd.to_datetime(info["last_update"], utc=True)
        col_names = info["col_names"]
        columns = tuple(
            NameWithDType(col_name, col_dtype) for col_name, col_dtype in zip(col_names["columns"], info["dtype"])
        )
        index = NameWithDType(col_names["index"], col_names["index_dtype"])
        date_range = tuple(_attach_utc(bound) for bound in info["date_range"])
        return SymbolDescription(
            columns=columns,
            index=index,
            row_count=info["rows"],
            last_update_time=last_update_time,
            index_type=info["index_type"],
            date_range=date_range,
        )

    return [_describe(info) for info in self._nvs.batch_get_info(symbol_strings, as_ofs)]
[docs]
def reload_symbol_list(self):
    """
    Force the symbol list cache to be rebuilt.

    On large libraries or certain S3 implementations this can take a long time, and once started it cannot be
    safely interrupted. If the call is interrupted somehow (exception/process killed), please call this again ASAP.
    """
    inner_store = self._nvs.version_store
    inner_store.reload_symbol_list()
[docs]
def is_symbol_fragmented(self, symbol: str, segment_size: Optional[int] = None) -> bool:
    """
    Check whether compaction would reduce the number of segments by at least the value of the configuration option
    "SymbolDataCompact.SegmentCount" (defaults to 100).

    Parameters
    ----------
    symbol: `str`
        Symbol name.
    segment_size: `int`, default=None
        Target for the maximum number of rows per segment after compaction.
        If not provided, the library option for the segment's maximum row size is used.

    Notes
    -----
    Config map setting - SymbolDataCompact.SegmentCount will be replaced by a library setting
    in the future. This API will allow overriding the setting as well.

    Returns
    -------
    bool
    """
    store = self._nvs
    fragmented = store.is_symbol_fragmented(symbol, segment_size)
    return fragmented
[docs]
def defragment_symbol_data(self, symbol: str, segment_size: Optional[int] = None) -> VersionedItem:
    """
    Compacts fragmented segments by merging row-sliced segments (https://docs.arcticdb.io/technical/on_disk_storage/#data-layer).
    This method calls `is_symbol_fragmented` to determine whether to proceed with the defragmentation operation.

    CAUTION - Please note that a major restriction of this method at present is that any column slicing present on the data will be
    removed in the new version created as a result of this method.
    As a result, if the impacted symbol has more than 127 columns (default value), the performance of selecting individual columns of
    the symbol (by using the `columns` parameter) may be negatively impacted in the defragmented version.
    If your symbol has fewer than 127 columns this caveat does not apply.
    For more information, please see `columns_per_segment` here:

    https://docs.arcticdb.io/api/arcticdb/arcticdb.LibraryOptions

    Parameters
    ----------
    symbol: `str`
        Symbol name.
    segment_size: `int`, default=None
        Target for the maximum number of rows per segment after compaction.
        If not provided, the library option "segment_row_size" is used.
        Note that the number of rows per segment after compaction may exceed the target.
        This is done to achieve the smallest number of segments after compaction. Please refer to the example below
        for further explanation.

    Returns
    -------
    VersionedItem
        Structure containing metadata and version number of the defragmented symbol in the store.

    Raises
    ------
    1002 ErrorCategory.INTERNAL:E_ASSERTION_FAILURE
        If `is_symbol_fragmented` returns false.
    2001 ErrorCategory.NORMALIZATION:E_UNIMPLEMENTED_INPUT_TYPE
        If library option - "bucketize_dynamic" is ON

    Examples
    --------
    >>> lib.write("sym", pd.DataFrame({"A": [0]}, index=[pd.Timestamp(0)]))
    >>> lib.append("sym", pd.DataFrame({"A": [1, 2]}, index=[pd.Timestamp(1), pd.Timestamp(2)]))
    >>> lib.append("sym", pd.DataFrame({"A": [3]}, index=[pd.Timestamp(3)]))
    >>> lib.read_index("sym")
                        start_index                     end_index  version_id stream_id          creation_ts          content_hash  index_type  key_type  start_col  end_col  start_row  end_row
    1970-01-01 00:00:00.000000000 1970-01-01 00:00:00.000000001          20    b'sym'  1678974096622685727   6872717287607530038          84         2          1        2          0        1
    1970-01-01 00:00:00.000000001 1970-01-01 00:00:00.000000003          21    b'sym'  1678974096931527858  12345256156783683504          84         2          1        2          1        3
    1970-01-01 00:00:00.000000003 1970-01-01 00:00:00.000000004          22    b'sym'  1678974096970045987   7952936283266921920          84         2          1        2          3        4
    >>> lib.defragment_symbol_data("sym", 2)
    >>> lib.read_index("sym")  # Returns two segments rather than three as a result of the defragmentation operation
                        start_index                     end_index  version_id stream_id          creation_ts         content_hash  index_type  key_type  start_col  end_col  start_row  end_row
    1970-01-01 00:00:00.000000000 1970-01-01 00:00:00.000000003          23    b'sym'  1678974097067271451  5576804837479525884          84         2          1        2          0        3
    1970-01-01 00:00:00.000000003 1970-01-01 00:00:00.000000004          23    b'sym'  1678974097067427062  7952936283266921920          84         2          1        2          3        4

    Notes
    -----
    Config map setting - SymbolDataCompact.SegmentCount will be replaced by a library setting
    in the future. This API will allow overriding the setting as well.
    """
    return self._nvs.defragment_symbol_data(symbol, segment_size)
@property
def name(self):
    """The name of this library, as reported by the underlying version store."""
    library_name = self._nvs.name()
    return library_name