ArcticDB LazyDataFrame demo
In this demo, we will explore the DataFrame processing options available in ArcticDB using the LazyDataFrame class. We will cover the main capabilities of this API, including:
- Filtering
- Projections
- Groupbys and Aggregations
- Combinations of the above features
Why perform the processing in ArcticDB?
- Performance boost via efficient C++ implementation that uses multi-threading
- Efficient data access - only reads the data needed
- Some queries on very large datasets become possible even when the data would not fit into memory
Note that all of the operations described here are also available using the legacy QueryBuilder
class, but we think this API is more intuitive!
Demo setup¶
Necessary packages installation
!pip install arcticdb
Necessary libraries imports
import os
import numpy as np
import pandas as pd
import random
import arcticdb as adb
from arcticdb.util.test import random_strings_of_length
For this demo we will configure the LMDB file-based backend. ArcticDB achieves its high performance and scale when configured with an object store backend (e.g. S3).
arctic = adb.Arctic("lmdb://arcticdb_demo")
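For reference, connecting to an object store backend uses the same Arctic constructor with a different URI. A minimal sketch, assuming a hypothetical S3 endpoint, bucket and credentials (all placeholders, not real values); the LMDB connection above is all this demo needs.
# Illustrative only: S3 URI format with placeholder endpoint, bucket and credentials
s3_uri = "s3://s3.REGION.amazonaws.com:BUCKET_NAME?region=REGION&access=ACCESS_KEY&secret=SECRET_KEY"
# arctic = adb.Arctic(s3_uri)  # would connect to the object store instead of LMDB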
You can have an unlimited number of libraries, but we will just create one to start with.
if 'sample' not in arctic.list_libraries():
    # library does not already exist
    arctic.create_library('sample')
lib = arctic.get_library('sample')
Run the cell to set up preliminary variables. 100,000 unique strings is a pathological case for us: with the default row-slicing policy there are 100,000 rows per data segment, so each unique string will appear around once per data segment in this column (a sketch of how this policy can be adjusted follows the next cell).
ten_grouping_values = random_strings_of_length(10, 10, True)
one_hundred_thousand_grouping_values = random_strings_of_length(100_000, 10, True)
rng = np.random.RandomState()
sym_10M = "demo_10M"
sym_100M = "demo_100M"
sym_1B = "demo_1B"
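The row-slicing policy mentioned above is configurable per library. A minimal sketch, assuming a hypothetical library name and using adb.LibraryOptions to halve the default segment size of 100,000 rows; the 'sample' library used in this demo keeps the defaults.
# Sketch only: a library with a smaller row-slicing policy ('sample_small_segments' is illustrative and not used below)
if 'sample_small_segments' not in arctic.list_libraries():
    arctic.create_library('sample_small_segments', library_options=adb.LibraryOptions(rows_per_segment=50_000))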
Choose which symbol you want to work with
- sym_10M: symbol with 10 million rows
- sym_100M: symbol with 100 million rows
- sym_1B: symbol with 1 billion rows
Assign the symbol you want to work with to the sym variable
- example: sym = sym_10M
sym = sym_10M
Run this cell to set up the DataFrame according to the symbol name
if sym == sym_10M:
    num_rows = 10_000_000
elif sym == sym_100M:
    num_rows = 100_000_000
elif sym == sym_1B:
    num_rows = 1_000_000_000

input_df = pd.DataFrame(
    {
        "grouping_column_10": list(random.choices(ten_grouping_values, k=num_rows)),
        "grouping_column_100_000": list(random.choices(one_hundred_thousand_grouping_values, k=num_rows)),
        "numeric_column": rng.rand(num_rows),
    }
)
Demo Start¶
lib.write(sym, input_df)
Show how the data has been sliced and written to disk.
lib._nvs.read_index(sym)
Show the first 100 rows of data as a sample.
lib.head(sym, n=100).data
Reading¶
Read the symbol without any filtering.
%%time
lib.read(sym)
Most of the time is spent allocating Python strings in the column with 100,000 unique strings, so omitting this column is much faster.
%%time
lib.read(sym, columns=["grouping_column_10", "numeric_column"])
Filtering¶
Note that all of the values in the numeric column are between 0 and 1, so this query does not filter out any data. This demonstrates that doing a full table scan does not significantly impact the performance. Also note that the read call itself is now practically instant, as no data is read until collect is called on the LazyDataFrame.
%%time
lazy_df = lib.read(sym, lazy=True)
lazy_df = lazy_df[lazy_df["numeric_column"] < 2.0]
%%time
lazy_df.collect()
Now we are filtering down to approximately 10% of the rows in the symbol. This is faster than reading, as there are now fewer Python strings to allocate.
%%time
lazy_df = lib.read(sym, lazy=True)
lazy_df = lazy_df[lazy_df["numeric_column"] < 0.1]
df = lazy_df.collect().data
df
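For comparison, the same filter expressed with the legacy QueryBuilder class mentioned in the introduction looks roughly like this; a sketch of the older style, with the lazy API above being the recommended approach.
# Legacy equivalent of the lazy filter above
q = adb.QueryBuilder()
q = q[q["numeric_column"] < 0.1]
lib.read(sym, query_builder=q).data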
Projections¶
Creating a new column as a function of existing columns and constants is approximately the same speed as a filter that doesn't reduce the amount of data displayed.
%%time
lazy_df = lib.read(sym, lazy=True)
lazy_df["new_column"] = lazy_df["numeric_column"] * 2.0
df = lazy_df.collect().data
df
Equivalently, use the apply method to achieve the same results.
lazy_df = lib.read(sym, lazy=True)
lazy_df.apply("new_column", lazy_df["numeric_column"] * 2.0)
lazy_df.collect().data
If using apply before the LazyDataFrame object has been created, the col function can be used as a placeholder for column names.
lazy_df = lib.read(sym, lazy=True).apply("new_column", adb.col("numeric_column") * 2.0)
lazy_df.collect().data
Groupbys and Aggregations¶
Grouping is again faster than just reading due to the reduced number of Python string allocations, even with the extra computation performed.
%%time
lazy_df = lib.read(sym, lazy=True)
lazy_df.groupby("grouping_column_10").agg({"numeric_column": "mean"})
df = lazy_df.collect().data
df
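For reference, a roughly equivalent in-memory computation with pandas on input_df would be the following; unlike the lazy version, it requires the whole DataFrame to be held in RAM.
# In-memory pandas equivalent of the grouping above (input_df must fit in memory)
input_df.groupby("grouping_column_10")["numeric_column"].mean()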
Even grouping on a pathologically large number of unique values does not significantly reduce the performance.
%%time
lazy_df = lib.read(sym, lazy=True)
lazy_df.groupby("grouping_column_100_000").agg({"numeric_column": "mean"})
df = lazy_df.collect().data
df
Combinations¶
These operations can be arbitrarily combined in a sequential pipeline.
%%time
lazy_df = lib.read(sym, lazy=True)
lazy_df = (
    lazy_df[lazy_df["numeric_column"] < 0.1]
    .apply("new_column", lazy_df["numeric_column"] * 2.0)
    .groupby("grouping_column_10")
    .agg({"numeric_column": "mean", "new_column": "max"})
)
df = lazy_df.collect().data
df
Batch Operations¶
# Setup two symbols
batch_sym_1 = f'{sym}_1'
batch_sym_2 = f'{sym}_2'
syms = [batch_sym_1, batch_sym_2]
lib.write(batch_sym_1, input_df)
lib.write(batch_sym_2, input_df)
read_batch also has a lazy argument, which returns a LazyDataFrameCollection.
lazy_dfs = lib.read_batch(syms, lazy=True)
lazy_dfs
The same processing operations can be applied to all of the symbols being read in the batch. Note in the cell output that the pipe | is outside the list of LazyDataFrames, so the WHERE clause is applied to all of the symbols.
lazy_dfs = lazy_dfs[lazy_dfs["numeric_column"] < 0.1]
lazy_dfs
Calling collect() on a LazyDataFrameCollection uses read_batch under the hood, and so is generally more performant than serialised read calls.
dfs = lazy_dfs.collect()
dfs
dfs[0].data.head()
dfs[1].data.head()
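For comparison, the serialised approach issues one read per symbol. A sketch that produces the same filtered results one at a time (the serial_dfs list is purely illustrative):
# Serialised alternative: one round trip per symbol instead of a single batched read
serial_dfs = []
for s in syms:
    lazy = lib.read(s, lazy=True)
    lazy = lazy[lazy["numeric_column"] < 0.1]
    serial_dfs.append(lazy.collect().data)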
Separate processing operations can be applied to the individual symbols in the batch if desired.
lazy_dfs = lib.read_batch(syms, lazy=True)
lazy_dfs = lazy_dfs.split()
lazy_dfs
Note in the cell output that the pipes | are now inside the list of LazyDataFrames, so the PROJECT clauses are applied to individual symbols.
lazy_dfs[0].apply("new_column_1", 2 * adb.col("numeric_column"))
lazy_dfs[1].apply("new_column_1", 4 * adb.col("numeric_column"))
lazy_dfs = adb.LazyDataFrameCollection(lazy_dfs)
lazy_dfs
dfs = lazy_dfs.collect()
dfs
dfs[0].data
dfs[1].data
If desired, these two modes of operation can be combined in an intuitive manner.
lazy_dfs = lib.read_batch(syms, lazy=True)
lazy_dfs = lazy_dfs[lazy_dfs["numeric_column"] < 0.1]
lazy_dfs = lazy_dfs.split()
lazy_dfs[0].apply("new_column_1", 2 * adb.col("numeric_column"))
lazy_dfs[1].apply("new_column_1", 4 * adb.col("numeric_column"))
lazy_dfs = adb.LazyDataFrameCollection(lazy_dfs)
lazy_dfs = lazy_dfs[lazy_dfs["new_column_1"] < 0.1]
lazy_dfs
dfs = lazy_dfs.collect()
dfs[0].data
dfs[1].data