"""
Copyright 2023 Man Group Operations Limited
Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.
As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
"""
import copy
import datetime
from math import inf
import numpy as np
import pandas as pd
from abc import ABC, abstractmethod
from arcticdb.exceptions import ArcticNativeException, UserInputException
from arcticdb.supported_types import time_types as supported_time_types
from arcticdb_ext.version_store import ExecutionContextOptimisation as _Optimisation
from arcticdb_ext.version_store import ExecutionContext as _ExecutionContext
from arcticdb_ext.version_store import ExpressionName as _ExpressionName
from arcticdb_ext.version_store import ColumnName as _ColumnName
from arcticdb_ext.version_store import ValueName as _ValueName
from arcticdb_ext.version_store import ValueSetName as _ValueSetName
from arcticdb_ext.version_store import Value as _Value
from arcticdb_ext.version_store import ValueSet as _ValueSet
from arcticdb_ext.version_store import (
ValueBool,
ValueUint8,
ValueUint16,
ValueUint32,
ValueUint64,
ValueInt8,
ValueInt16,
ValueInt32,
ValueInt64,
ValueFloat32,
ValueFloat64,
)
from arcticdb_ext.version_store import ExpressionNode as _ExpressionNode
from arcticdb_ext.version_store import OperationType as _OperationType
from arcticdb_ext.version_store import ClauseBuilder as _ClauseBuilder
COLUMN = "COLUMN"
class ExpressionNode:
# Required so that comparisons like:
    # np.int64(0) < <ExpressionNode object>
# work as we want
__array_priority__ = 100
def __init__(self):
self.left = self.right = self.operator = None
self.name = None
@classmethod
def compose(cls, left, operator, right):
output = cls()
output.left = left
output.operator = operator
output.right = right
return output
@classmethod
def column_ref(cls, left):
return cls.compose(left, COLUMN, None)
    def _apply(self, right, operator):
        # Snapshot this node as the left operand and return a fresh node, so the
        # original expression is not mutated by chained operations
        left = ExpressionNode.compose(self.left, self.operator, self.right)
        return ExpressionNode.compose(left, operator, right)
    def _rapply(self, left, operator):
        # Mirror of _apply for reflected operators: this node becomes the right operand
        right = ExpressionNode.compose(self.left, self.operator, self.right)
        return ExpressionNode.compose(left, operator, right)
def __abs__(self):
return ExpressionNode.compose(self, _OperationType.ABS, None)
def __neg__(self):
return ExpressionNode.compose(self, _OperationType.NEG, None)
def __invert__(self):
return ExpressionNode.compose(self, _OperationType.NOT, None)
def __add__(self, right):
return self._apply(right, _OperationType.ADD)
def __sub__(self, right):
return self._apply(right, _OperationType.SUB)
def __mul__(self, right):
return self._apply(right, _OperationType.MUL)
def __truediv__(self, right):
return self._apply(right, _OperationType.DIV)
def __eq__(self, right):
if is_supported_sequence(right):
return self.isin(right)
else:
return self._apply(right, _OperationType.EQ)
def __ne__(self, right):
if is_supported_sequence(right):
return self.isnotin(right)
else:
return self._apply(right, _OperationType.NE)
def __lt__(self, right):
return self._apply(right, _OperationType.LT)
def __le__(self, right):
return self._apply(right, _OperationType.LE)
def __gt__(self, right):
return self._apply(right, _OperationType.GT)
def __ge__(self, right):
return self._apply(right, _OperationType.GE)
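    # Combining with Python bool literals is folded eagerly rather than deferred:
    # e.g. `expr & True` simplifies to `expr` and `expr & False` to False, so no
    # AND node is built for constant operands. The same applies to | and ^ below.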
def __and__(self, right):
if right is True:
return self
elif right is False:
return False
else:
return self._apply(right, _OperationType.AND)
def __or__(self, right):
if right is True:
return True
elif right is False:
return self
else:
return self._apply(right, _OperationType.OR)
def __xor__(self, right):
if right is True:
return ~self
elif right is False:
return self
else:
return self._apply(right, _OperationType.XOR)
def __radd__(self, left):
return self._rapply(left, _OperationType.ADD)
def __rsub__(self, left):
return self._rapply(left, _OperationType.SUB)
def __rmul__(self, left):
return self._rapply(left, _OperationType.MUL)
def __rtruediv__(self, left):
return self._rapply(left, _OperationType.DIV)
def __rand__(self, left):
if left is True:
return self
elif left is False:
return False
else:
return self._rapply(left, _OperationType.AND)
def __ror__(self, left):
if left is True:
return True
elif left is False:
return self
else:
return self._rapply(left, _OperationType.OR)
def __rxor__(self, left):
if left is True:
return ~self
elif left is False:
return self
else:
return self._rapply(left, _OperationType.XOR)
def isin(self, *args):
value_list = value_list_from_args(*args)
return self._apply(value_list, _OperationType.ISIN)
def isnotin(self, *args):
value_list = value_list_from_args(*args)
return self._apply(value_list, _OperationType.ISNOTIN)
def __str__(self):
return self.get_name()
def get_name(self):
if not self.name:
if self.operator == COLUMN:
self.name = 'Column["{}"]'.format(self.left)
elif self.operator in [_OperationType.ABS, _OperationType.NEG, _OperationType.NOT]:
self.name = "{}({})".format(self.operator.name, self.left)
else:
if isinstance(self.left, ExpressionNode):
left = str(self.left)
else:
left = to_string(self.left)
if isinstance(self.right, ExpressionNode):
right = str(self.right)
else:
right = to_string(self.right)
self.name = "({} {} {})".format(left, self.operator.name, right)
return self.name
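# Illustrative composition (the operator tokens come from _OperationType names):
#   node = ExpressionNode.column_ref("a") < 5
#   str(node)  # e.g. '(Column["a"] LT Num(5))'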
def is_supported_sequence(obj):
return isinstance(obj, (list, set, frozenset, tuple, np.ndarray))
def value_list_from_args(*args):
if len(args) == 1 and is_supported_sequence(args[0]):
collection = args[0]
else:
collection = args
    array_list = []
    value_set = set()
    contains_integer = False
    if len(collection) > 0:
        for value in collection:
            # Deduplicate while preserving first-seen order
            if value not in value_set:
                value_set.add(value)
                if isinstance(value, supported_time_types):
                    # Datetimes are converted to integer nanoseconds since the epoch
                    value = int(value.timestamp() * 1_000_000_000)
                # Wrap each value in a length-1 array so that np.concatenate below
                # promotes the collection to a common dtype
                elem = np.array([value]) if isinstance(value, (int, np.integer)) else np.full(1, value, dtype=None)
                array_list.append(elem)
                contains_integer = contains_integer or isinstance(value, (int, np.integer))
value_list = np.concatenate(array_list)
if contains_integer and value_list.dtype == np.float64:
raise UserInputException("Invalid datatype conversion to double")
if value_list.dtype == np.float16:
value_list = value_list.astype(np.float32)
elif value_list.dtype.kind == "U":
value_list = value_list.tolist()
else:
# Return an empty list. This will call the string ctor for ValueSet, but also set a bool flag so that numeric
# types also behave as expected
value_list = []
return value_list
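# Illustrative behaviour (values are deduplicated and promoted to a common dtype):
#   value_list_from_args([1, 2, 2, 3])  # -> array([1, 2, 3])
#   value_list_from_args("a", "b")      # -> ["a", "b"] (string sets are returned as lists)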
class PyClauseBase(ABC):
    """Base class for Python-side clauses; each knows how to register itself on the C++ ClauseBuilder."""

    @abstractmethod
    def to_cpp(self, clause_builder) -> None:
        pass
class WhereClause(PyClauseBase):
def __init__(self, expr):
self.expr = expr
def __str__(self):
return "WhereClause: {}".format(str(self.expr))
def to_cpp(self, clause_builder):
clause_builder.add_FilterClause(visit_expression(self.expr))
class ProjectClause(PyClauseBase):
def __init__(self, name, expr):
self.name = name
self.expr = expr
def __str__(self):
return "ProjectClause:{} -> {}".format(str(self.expr), self.name)
def to_cpp(self, clause_builder):
clause_builder.add_ProjectClause(self.name, visit_expression(self.expr))
class Aggregation:
def __init__(self, source, operator):
self.source = source
self.operator = operator
def __str__(self):
return "{}({})".format(self.operator, self.source)
def to_cpp(self, clause_builder):
# TODO: Move to dictionary
if self.operator.lower() == "sum":
clause_builder.add_SumAggregationOperator(self.source, self.source)
elif self.operator.lower() == "mean":
clause_builder.add_MeanAggregationOperator(self.source, self.source)
elif self.operator.lower() == "max":
clause_builder.add_MaxAggregationOperator(self.source, self.source)
elif self.operator.lower() == "min":
clause_builder.add_MinAggregationOperator(self.source, self.source)
else:
raise ValueError("Aggregation operators are limited to 'sum', 'mean', 'max' and 'min'.")
class GroupByClause(PyClauseBase):
def __init__(self, key, query_builder):
self.key = key
self.query_builder = query_builder
self.aggregations = {}
def __str__(self):
return "GroupByClause: key={}, [{}]".format(
str(self.key), ", ".join(["{} <- {}".format(k, v) for k, v in self.aggregations.items()])
)
def agg(self, aggregations):
for key, value in aggregations.items():
self.aggregations[key] = Aggregation(key, value)
return self.query_builder
def to_cpp(self, clause_builder):
def _expression_root_only(col_name: str):
_ec = _ExecutionContext()
_ec.root_node_name = _ExpressionName(col_name)
return _ec
clause_builder.prepare_AggregationClause(_expression_root_only(self.key))
for agg in self.aggregations.values():
agg.to_cpp(clause_builder)
clause_builder.finalize_AggregationClause()
class QueryBuilder:
"""
Build a query to process read results with. Syntax is designed to be similar to Pandas:
>>> q = QueryBuilder()
>>> q = q[q["a"] < 5] (equivalent to q = q[q.a < 5] provided the column name is also a valid Python variable name)
>>> dataframe = lib.read(symbol, query_builder=q).data
QueryBuilder objects are stateful, and so should not be reused without reinitialising:
>>> q = QueryBuilder()
    For Group By and Aggregation functionality please see the documentation for the `groupby` method. For
    projection functionality, see the documentation for the `apply` method.
Supported numeric operations when filtering:
* Binary comparisons: <, <=, >, >=, ==, !=
* Unary NOT: ~
* Binary arithmetic: +, -, *, /
* Unary arithmetic: -, abs
* Binary combinators: &, |, ^
* List membership: isin, isnotin (also accessible with == and !=)
isin/isnotin accept lists, sets, frozensets, 1D ndarrays, or *args unpacking. For example:
>>> l = [1, 2, 3]
>>> q.isin(l)
is equivalent to...
>>> q.isin(1, 2, 3)
Boolean columns can be filtered on directly:
>>> q = QueryBuilder()
>>> q = q[q["boolean_column"]]
and combined with other operations intuitively:
>>> q = QueryBuilder()
>>> q = q[(q["boolean_column_1"] & ~q["boolean_column_2"]) & (q["numeric_column"] > 0)]
    Arbitrary combinations of these expressions are possible, for example:
>>> q = q[(((q["a"] * q["b"]) / 5) < (0.7 * q["c"])) & (q["b"] != 12)]
See tests/unit/arcticdb/version_store/test_filtering.py for more example uses.
Timestamp filtering:
pandas.Timestamp, datetime.datetime, pandas.Timedelta, and datetime.timedelta objects are supported.
Note that internally all of these types are converted to nanoseconds (since epoch in the Timestamp/datetime
cases). This means that nonsensical operations such as multiplying two times together are permitted (but not
encouraged).
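    For example, assuming a column named "timestamp" holding datetimes:
    >>> q = QueryBuilder()
    >>> q = q[q["timestamp"] > pd.Timestamp("2023-01-01")]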
Restrictions:
String equality/inequality (and isin/isnotin) is supported for printable ASCII characters only.
Although not prohibited, it is not recommended to use ==, !=, isin, or isnotin with floating point values.
    Exceptions will be raised if:
    * inf or -inf values are provided for comparison
    * A column involved in the query is a Categorical
    * The symbol is pickled
    * A column involved in the query is not present in the symbol
    * The query involves comparing strings using <, <=, >, or >= operators
    * The query involves comparing a string to one or more numeric values, or vice versa
    * The query involves arithmetic with a column containing strings
"""
def __init__(self):
self.stages = []
self._optimisation = _Optimisation.SPEED
self._clause_builder = _ClauseBuilder()
    def apply(self, name, expr):
"""
Apply enables new columns to be created using supported QueryBuilder numeric operations. See the documentation for the
QueryBuilder class for more information on supported expressions - any expression valid in a filter is valid when using
`apply`.
Parameters
----------
name: `str`
Name of the column to be created
expr:
Expression
Examples
--------
>>> df = pd.DataFrame(
{
"VWAP": np.arange(0, 10, dtype=np.float64),
"ASK": np.arange(10, 20, dtype=np.uint16),
"VOL_ACC": np.arange(20, 30, dtype=np.int32),
},
index=np.arange(10),
)
>>> lib.write("expression", df)
>>> q = QueryBuilder()
>>> q = q.apply("ADJUSTED", q["ASK"] * q["VOL_ACC"] + 7)
>>> lib.read("expression", query_builder=q).data
VOL_ACC ASK VWAP ADJUSTED
0 20 10 0.0 207
1 21 11 1.0 238
2 22 12 2.0 271
3 23 13 3.0 306
4 24 14 4.0 343
5 25 15 5.0 382
6 26 16 6.0 423
7 27 17 7.0 466
8 28 18 8.0 511
9 29 19 9.0 558
Returns
-------
QueryBuilder
Modified QueryBuilder object.
"""
self.stages.append(ProjectClause(name, expr))
return self
    def groupby(self, expr: str):
"""
Group symbol by column name. GroupBy operations must be followed by an aggregation operator. Currently the following four aggregation
operators are supported:
* "mean" - compute the mean of the group
* "sum" - compute the sum of the group
* "min" - compute the min of the group
* "max" - compute the max of the group
For usage examples, see below.
Parameters
----------
expr: `str`
            Name of the column to group on. Note that currently GroupBy only supports single-column groupings.
Examples
--------
Average (mean) over two groups:
>>> df = pd.DataFrame(
{
"grouping_column": ["group_1", "group_1", "group_1", "group_2", "group_2"],
"to_mean": [1.1, 1.4, 2.5, np.nan, 2.2],
},
index=np.arange(5),
)
>>> q = QueryBuilder()
>>> q = q.groupby("grouping_column").agg({"to_mean": "mean"})
>>> lib.write("symbol", df)
>>> lib.read("symbol", query_builder=q).data
to_mean
group_1 1.666667
group_2 NaN
Max over one group:
>>> df = pd.DataFrame(
{
"grouping_column": ["group_1", "group_1", "group_1"],
"to_max": [1, 5, 4],
},
index=np.arange(3),
)
>>> q = QueryBuilder()
>>> q = q.groupby("grouping_column").agg({"to_max": "max"})
>>> lib.write("symbol", df)
>>> lib.read("symbol", query_builder=q).data
to_max
group_1 5
Max and Mean:
>>> df = pd.DataFrame(
{
"grouping_column": ["group_1", "group_1", "group_1"],
"to_mean": [1.1, 1.4, 2.5],
"to_max": [1.1, 1.4, 2.5]
},
index=np.arange(3),
)
>>> q = QueryBuilder()
>>> q = q.groupby("grouping_column").agg({"to_max": "max", "to_mean": "mean"})
>>> lib.write("symbol", df)
>>> lib.read("symbol", query_builder=q).data
to_max to_mean
group_1 2.5 1.666667
Returns
-------
QueryBuilder
Modified QueryBuilder object.
"""
self.stages.append(GroupByClause(expr, self))
return self.stages[-1]
def __eq__(self, right):
return str(self) == str(right)
def __str__(self):
return " | ".join(str(e) for e in self.stages)
def __getitem__(self, item):
if isinstance(item, str):
return ExpressionNode.column_ref(item)
else:
# This handles the case where the filtering is on a single boolean column
# e.g. q = q[q["col"]]
if isinstance(item, ExpressionNode) and item.operator == COLUMN:
item = ExpressionNode.compose(item, _OperationType.IDENTITY, None)
self.stages.append(WhereClause(item))
return self
def __getattr__(self, key):
return self[key]
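    # The C++ _ClauseBuilder is assumed not to be picklable, so it is dropped on
    # serialisation and recreated empty when unpickling, copying, or deep-copying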
def __getstate__(self):
rv = vars(self).copy()
del rv["_clause_builder"]
return rv
def __setstate__(self, state):
vars(self).update(state)
self._clause_builder = _ClauseBuilder()
def __copy__(self):
cls = self.__class__
result = cls.__new__(cls)
result.__dict__.update(self.__dict__)
return result
def __deepcopy__(self, memo):
cls = self.__class__
result = cls.__new__(cls)
memo[id(self)] = result
for k, v in self.__dict__.items():
if k != "_clause_builder":
setattr(result, k, copy.deepcopy(v, memo))
result._clause_builder = _ClauseBuilder()
return result
# Might want to apply different optimisations to different clauses once projections/group-bys are implemented
    def optimise_for_speed(self):
"""Process query as fast as possible (the default behaviour)"""
self._optimisation = _Optimisation.SPEED
    def optimise_for_memory(self):
"""Reduce peak memory usage during the query, at the expense of some performance.
Optimisations applied:
* Memory used by strings that are present in segments read from storage, but are not required in the final dataframe that will be presented back to the user, is reclaimed earlier in the processing pipeline.
"""
self._optimisation = _Optimisation.MEMORY
def execution_contexts(self):
res = [visit_expression(stage.expr) for stage in self.stages]
for execution_context in res:
execution_context.optimisation = self._optimisation
return res
def finalize_clause_builder(self):
for py_clause in self.stages:
py_clause.to_cpp(self._clause_builder)
return self._clause_builder
CONSTRUCTOR_MAP = {
"u": {1: ValueUint8, 2: ValueUint16, 4: ValueUint32, 8: ValueUint64},
"i": {1: ValueInt8, 2: ValueInt16, 4: ValueInt32, 8: ValueInt64},
"f": {1: ValueFloat32, 2: ValueFloat32, 4: ValueFloat32, 8: ValueFloat64},
}
def create_value(value):
if value in [inf, -inf]:
raise ArcticNativeException("Infinite values not supported in queries")
if isinstance(value, np.floating):
f = CONSTRUCTOR_MAP.get(value.dtype.kind).get(value.dtype.itemsize)
elif isinstance(value, np.integer):
min_scalar_type = np.min_scalar_type(value)
f = CONSTRUCTOR_MAP.get(min_scalar_type.kind).get(min_scalar_type.itemsize)
elif isinstance(value, (pd.Timestamp, pd.Timedelta)):
# pd.Timestamp is in supported_time_types, but its timestamp() method can't provide ns precision
value = value.value
f = ValueInt64
elif isinstance(value, supported_time_types):
value = int(value.timestamp() * 1_000_000_000)
f = ValueInt64
elif isinstance(value, datetime.timedelta):
value = int(value.total_seconds() * 1_000_000_000)
f = ValueInt64
elif isinstance(value, bool):
f = ValueBool
else:
f = _Value
return f(value)
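# Illustrative mappings (see CONSTRUCTOR_MAP above; integers are narrowed to
# their minimal scalar type first):
#   create_value(np.float32(1.5))             # -> ValueFloat32
#   create_value(np.int64(-5))                # -> ValueInt8 (np.min_scalar_type(-5) is int8)
#   create_value(pd.Timestamp("2023-01-01"))  # -> ValueInt64 of nanoseconds since the epoch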
def to_string(leaf):
if isinstance(leaf, (np.ndarray, list)):
# Truncate value set keys to first 100 characters
key = str(leaf)[:100]
else:
if isinstance(leaf, str):
key = "Str({})".format(leaf)
elif isinstance(leaf, bool):
key = "Bool({})".format(leaf)
else:
key = "Num({})".format(leaf)
return key
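# e.g. to_string("GOOG") -> 'Str(GOOG)', to_string(True) -> 'Bool(True)', to_string(1.5) -> 'Num(1.5)'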
def visit_expression(expr):
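    # Depth-first walk of the expression tree: leaves (columns, values, value
    # sets) and interior expression nodes are registered on an ExecutionContext,
    # keyed by the names produced by get_name()/to_string()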
def _visit(node):
def _visit_child(node):
def _handle_leaf(node):
key = to_string(node)
if isinstance(node, (np.ndarray, list)):
                    # There is a possibility that two distinct value sets have the same repr, either if the first 100
# chars match, or if the repr is truncated like '[ 0 1 2 ... 9997 9998 9999]'
# Append -vX to handle this case, while keeping ValueSet keys short and readable in most cases
if key not in valueset_keys:
valueset_keys[key] = 0
else:
valueset_keys[key] += 1
key = key + "-v" + str(valueset_keys[key])
execution_context.add_value_set(key, _ValueSet(node))
return _ValueSetName(key)
else:
execution_context.add_value(key, create_value(node))
return _ValueName(key)
if isinstance(node, ExpressionNode):
if node.operator == COLUMN:
execution_context.add_column(node.left)
return _ColumnName(node.left)
else:
_visit(node)
return _ExpressionName(node.get_name())
else:
return _handle_leaf(node)
if isinstance(node, bool):
raise ArcticNativeException("Query is trivially {}".format(node))
left = _visit_child(node.left)
if node.right is not None:
right = _visit_child(node.right)
expression_node = _ExpressionNode(left, right, node.operator)
else:
expression_node = _ExpressionNode(left, node.operator)
execution_context.add_expression_node(node.get_name(), expression_node)
execution_context = _ExecutionContext()
valueset_keys = dict()
_visit(expr)
execution_context.root_node_name = _ExpressionName(expr.get_name())
return execution_context