ArcticDB Merge¶
Motivation¶
The merge API offers a straightforward way for users to modify or insert specific rows in their data. Data providers often issue corrections for particular entries, and the merge functionality can efficiently apply these updates.
Example¶
Let's start with a quick example before exploring the details.
In this example, we have an ArcticDB library called prices that stores price data, and a symbol named daily_prices for daily prices. At some point, the data provider issued corrections for 2023-01-05, 2023-01-07, and 2023-01-19. We want to apply these corrections, modifying only the affected rows.
Begin by importing the necessary modules.
import pandas as pd
import arcticdb
import numpy as np
from IPython.display import display
from arcticdb.version_store.library import MergeStrategy, MergeAction
Next, create an ArcticDB instance and a library.
ac = arcticdb.Arctic("lmdb://merge_example")
lib = ac.get_library("prices", create_if_missing=True)
Create an example prices DataFrame and store it in ArcticDB.
# Create example data
daily_prices = pd.DataFrame(
data={
"High": [102.5, 103.6, 101.7, 103.2, 104.8, 106.0, 108.9, 107.6, 109.3, 108.7,
111.1, 113.5, 113.2, 115.0, 112.3, 115.9, 117.3, 118.6, 119.8, 119.1],
"Low": [100.5, 101.8, 99.7, 100.9, 101.7, 103.4, 106.2, 104.8, 107.5, 106.3,
109.1, 110.8, 110.2, 111.7, 109.6, 112.7, 114.0, 115.4, 116.5, 116.2],
"Volume": [1200, 1500, 900, 1400, 1600, 1900, 2500, 2300, 1800, 2100,
2000, 2700, 2600, 3000, 2200, 3100, 2800, 3200, 3500, 3400],
},
index=pd.date_range(start="2023-01-01", periods=20, freq="D")
)
daily_prices
| | High | Low | Volume |
|---|---|---|---|
| 2023-01-01 | 102.5 | 100.5 | 1200 |
| 2023-01-02 | 103.6 | 101.8 | 1500 |
| 2023-01-03 | 101.7 | 99.7 | 900 |
| 2023-01-04 | 103.2 | 100.9 | 1400 |
| 2023-01-05 | 104.8 | 101.7 | 1600 |
| 2023-01-06 | 106.0 | 103.4 | 1900 |
| 2023-01-07 | 108.9 | 106.2 | 2500 |
| 2023-01-08 | 107.6 | 104.8 | 2300 |
| 2023-01-09 | 109.3 | 107.5 | 1800 |
| 2023-01-10 | 108.7 | 106.3 | 2100 |
| 2023-01-11 | 111.1 | 109.1 | 2000 |
| 2023-01-12 | 113.5 | 110.8 | 2700 |
| 2023-01-13 | 113.2 | 110.2 | 2600 |
| 2023-01-14 | 115.0 | 111.7 | 3000 |
| 2023-01-15 | 112.3 | 109.6 | 2200 |
| 2023-01-16 | 115.9 | 112.7 | 3100 |
| 2023-01-17 | 117.3 | 114.0 | 2800 |
| 2023-01-18 | 118.6 | 115.4 | 3200 |
| 2023-01-19 | 119.8 | 116.5 | 3500 |
| 2023-01-20 | 119.1 | 116.2 | 3400 |
lib.write("daily_prices", daily_prices)
VersionedItem(symbol='daily_prices', library='prices', data=n/a, version=0, metadata=None, host='LMDB(path=/home/vasil/Documents/source/ArcticDB/docs/mkdocs/docs/notebooks/merge_example)', timestamp=1769761430529539749)
Create the correction DataFrame.
daily_prices_correction = pd.DataFrame(
data={
"High": [3000.0, 4000.0, 5000.0],
"Low": [1000.0, 2000.0, 3000.0],
"Volume":[10000, 20000, 30000]
},
index=pd.DatetimeIndex([pd.Timestamp("2023-01-05"), pd.Timestamp("2023-01-07"), pd.Timestamp("2023-01-19")])
)
daily_prices_correction
| | High | Low | Volume |
|---|---|---|---|
| 2023-01-05 | 3000.0 | 1000.0 | 10000 |
| 2023-01-07 | 4000.0 | 2000.0 | 20000 |
| 2023-01-19 | 5000.0 | 3000.0 | 30000 |
Apply the corrections with a merge. Both daily_prices_correction and the stored data must be sorted by index. By default, rows of two DataFrames with a datetime index are considered matching when their index values are equal.
lib.merge_experimental("daily_prices", daily_prices_correction, strategy=MergeStrategy(matched="update", not_matched_by_target="do_nothing"))
print("Merged result")
display(lib.read("daily_prices").data)
print("Diff between merged and original data")
daily_prices.compare(lib.read("daily_prices").data, keep_equal=True)
Merged result
| | High | Low | Volume |
|---|---|---|---|
| 2023-01-01 | 102.5 | 100.5 | 1200 |
| 2023-01-02 | 103.6 | 101.8 | 1500 |
| 2023-01-03 | 101.7 | 99.7 | 900 |
| 2023-01-04 | 103.2 | 100.9 | 1400 |
| 2023-01-05 | 3000.0 | 1000.0 | 10000 |
| 2023-01-06 | 106.0 | 103.4 | 1900 |
| 2023-01-07 | 4000.0 | 2000.0 | 20000 |
| 2023-01-08 | 107.6 | 104.8 | 2300 |
| 2023-01-09 | 109.3 | 107.5 | 1800 |
| 2023-01-10 | 108.7 | 106.3 | 2100 |
| 2023-01-11 | 111.1 | 109.1 | 2000 |
| 2023-01-12 | 113.5 | 110.8 | 2700 |
| 2023-01-13 | 113.2 | 110.2 | 2600 |
| 2023-01-14 | 115.0 | 111.7 | 3000 |
| 2023-01-15 | 112.3 | 109.6 | 2200 |
| 2023-01-16 | 115.9 | 112.7 | 3100 |
| 2023-01-17 | 117.3 | 114.0 | 2800 |
| 2023-01-18 | 118.6 | 115.4 | 3200 |
| 2023-01-19 | 5000.0 | 3000.0 | 30000 |
| 2023-01-20 | 119.1 | 116.2 | 3400 |
Diff between merged and original data
| | High (self) | High (other) | Low (self) | Low (other) | Volume (self) | Volume (other) |
|---|---|---|---|---|---|---|
| 2023-01-05 | 104.8 | 3000.0 | 101.7 | 1000.0 | 1600 | 10000 |
| 2023-01-07 | 108.9 | 4000.0 | 106.2 | 2000.0 | 2500 | 20000 |
| 2023-01-19 | 119.8 | 5000.0 | 116.5 | 3000.0 | 3500 | 30000 |
Merge vs. Update¶
ArcticDB also provides an update method, which requires sorted input. The key difference is that update overwrites all data between the start and end of the input, potentially removing any rows that fall within those bounds and are not in the input.
lib.write("prices_update_example", daily_prices)
lib.update("prices_update_example", daily_prices_correction)
lib.read("prices_update_example").data
| | High | Low | Volume |
|---|---|---|---|
| 2023-01-01 | 102.5 | 100.5 | 1200 |
| 2023-01-02 | 103.6 | 101.8 | 1500 |
| 2023-01-03 | 101.7 | 99.7 | 900 |
| 2023-01-04 | 103.2 | 100.9 | 1400 |
| 2023-01-05 | 3000.0 | 1000.0 | 10000 |
| 2023-01-07 | 4000.0 | 2000.0 | 20000 |
| 2023-01-19 | 5000.0 | 3000.0 | 30000 |
| 2023-01-20 | 119.1 | 116.2 | 3400 |
In the example above, the input to update included only rows for 2023-01-05, 2023-01-07, and 2023-01-19. As a result, any data between 2023-01-05 and 2023-01-19 that was not present in the update input is omitted from the final result.
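The difference between the two operations can be reproduced without ArcticDB. The following is a minimal pandas-only sketch that emulates both semantics on a small frame. emulate_update and emulate_merge_update are hypothetical helpers written for illustration, not ArcticDB functions, and they assume a sorted DatetimeIndex without duplicates.

```python
import pandas as pd


def emulate_update(target: pd.DataFrame, source: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: replace everything between source's first and last index value."""
    start, end = source.index[0], source.index[-1]
    outside = target[(target.index < start) | (target.index > end)]
    return pd.concat([outside, source]).sort_index()


def emulate_merge_update(target: pd.DataFrame, source: pd.DataFrame) -> pd.DataFrame:
    """Hypothetical helper: overwrite only rows whose index appears in source; insert nothing."""
    result = target.copy()
    matching = source.index.intersection(result.index)
    result.loc[matching] = source.loc[matching]
    return result


target = pd.DataFrame(
    {"High": [1.0, 2.0, 3.0, 4.0, 5.0]},
    index=pd.date_range("2023-01-01", periods=5, freq="D"),
)
# Corrections for 2023-01-02 and 2023-01-04 only
source = pd.DataFrame({"High": [20.0, 40.0]}, index=target.index[[1, 3]])

updated = emulate_update(target, source)
merged = emulate_merge_update(target, source)

# The update-style result loses 2023-01-03, which lies inside the source range
print(updated.index.strftime("%Y-%m-%d").tolist())
# ['2023-01-01', '2023-01-02', '2023-01-04', '2023-01-05']

# The merge-style result keeps every row and changes only the matched ones
print(merged["High"].tolist())
# [1.0, 20.0, 3.0, 40.0, 5.0]
```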
Merge: Semantics and Behavior¶
In this context, the data stored in ArcticDB is referred to as the target, while the input to merge is called the source.
Merge performs a join between target and source on a subset of columns and updates target according to the strategy parameter. If target is a time series, the index is always included among the join columns, so that an ordered datetime index stays ordered after a merge. Otherwise, the following would be possible:
lib.write("sym", pd.DataFrame({"a": [1, 2, 3]}, index=pd.DatetimeIndex([pd.Timestamp(1), pd.Timestamp(2), pd.Timestamp(3)])))
lib.merge_experimental("sym", pd.DataFrame({"a": [2]}, index=pd.DatetimeIndex([pd.Timestamp(10)])), on=["a"])
print(lib.read("sym").data)
a
1970-01-01 00:00:00.000000001 1
1970-01-01 00:00:00.000000010 2
1970-01-01 00:00:00.000000003 3
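As a quick check of why this result would be a problem, the index shown above can be rebuilt in pandas and tested for ordering. This sketch only reconstructs the printed index values; it does not call ArcticDB.

```python
import pandas as pd

# Index values copied from the output above
index_after_merge = pd.DatetimeIndex([pd.Timestamp(1), pd.Timestamp(10), pd.Timestamp(3)])

# The index is no longer sorted
print(index_after_merge.is_monotonic_increasing)  # False
```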
Strategies¶
The strategy is a named tuple that defines how target will be modified. Its members can be either case-insensitive strings (update, insert, or do_nothing) or members of the MergeAction enum.
- matched specifies what to do when a row in source matches a row in target. Acceptable values are update and do_nothing.
- not_matched_by_target specifies what to do when a row in source does not match any row in target. Acceptable values are insert and do_nothing.
Acceptable combinations¶
- matched="update", not_matched_by_target="do_nothing"
- matched="update", not_matched_by_target="insert"
- matched="do_nothing", not_matched_by_target="insert"
Unacceptable combinations¶
- matched="do_nothing", not_matched_by_target="do_nothing": the merge would have no effect.
- matched="do_nothing"|"insert"|"update", not_matched_by_target="update": a row that does not exist in target cannot be updated.
- matched="insert", not_matched_by_target="do_nothing"|"insert"|"update": while technically possible, inserting duplicates of rows that already exist does not make sense.
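The rules above can be summarised as a small lookup. The following is a sketch in plain Python. ALLOWED_COMBINATIONS and is_acceptable are hypothetical names used for illustration and are not part of the ArcticDB API. Strings are compared case-insensitively, as described for the strategy members.

```python
# Hypothetical lookup of the acceptable (matched, not_matched_by_target) pairs
ALLOWED_COMBINATIONS = {
    ("update", "do_nothing"),
    ("update", "insert"),
    ("do_nothing", "insert"),
}


def is_acceptable(matched: str, not_matched_by_target: str) -> bool:
    """Return True if the pair is one of the acceptable combinations listed above."""
    return (matched.lower(), not_matched_by_target.lower()) in ALLOWED_COMBINATIONS


print(is_acceptable("update", "do_nothing"))      # True
print(is_acceptable("UPDATE", "Insert"))          # True
print(is_acceptable("do_nothing", "do_nothing"))  # False
print(is_acceptable("insert", "insert"))          # False
```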
Examples¶
MergeStrategy(matched="update", not_matched_by_target="do_nothing") updates only the rows in target that match the selected columns. Rows that do not match remain unchanged, and new rows are not inserted. If a row in source matches multiple rows in target, all matching rows in target will be updated.
data_with_duplicates = pd.DataFrame(
{"Bid": [100, 101, 102], "Ask": [101.1, 101, 102.5]},
index=pd.DatetimeIndex([pd.Timestamp("2025-01-01 08:00:00"), pd.Timestamp("2025-01-01 08:00:00"), pd.Timestamp("2025-01-01 09:00:00")])
)
lib.write("merge_update_with_duplicates", data_with_duplicates)
print("Original data")
display(lib.read("merge_update_with_duplicates").data)
lib.merge_experimental(
"merge_update_with_duplicates",
pd.DataFrame(
{"Bid": [105, 102], "Ask": [105.3, 102]},
index=pd.DatetimeIndex([pd.Timestamp("2025-01-01 08:00:00"), pd.Timestamp("2025-02-01 08:00:00")])
),
MergeStrategy(matched="update", not_matched_by_target="do_nothing")
)
print("Data after merge")
display(lib.read("merge_update_with_duplicates").data)
Original data
| | Bid | Ask |
|---|---|---|
| 2025-01-01 08:00:00 | 100 | 101.1 |
| 2025-01-01 08:00:00 | 101 | 101.0 |
| 2025-01-01 09:00:00 | 102 | 102.5 |
Data after merge
| | Bid | Ask |
|---|---|---|
| 2025-01-01 08:00:00 | 105 | 105.3 |
| 2025-01-01 08:00:00 | 105 | 105.3 |
| 2025-01-01 09:00:00 | 102 | 102.5 |
In the example above, there is a row in source with the index value 2025-01-01 08:00:00 that matches two rows in target. Both matching rows in target are updated. A row in source that does not match any row in target is not inserted. Rows in target that do not match any row in source remain unchanged.
The elements of MergeStrategy can also be values of the MergeAction enum.
data_with_duplicates = pd.DataFrame(
{"Bid": [100, 101, 102], "Ask": [101.1, 101, 102.5]},
index=pd.DatetimeIndex([pd.Timestamp("2025-01-01 08:00:00"), pd.Timestamp("2025-01-01 08:00:00"), pd.Timestamp("2025-01-01 09:00:00")])
)
lib.write("merge_update_with_duplicates", data_with_duplicates)
print("Original data")
display(lib.read("merge_update_with_duplicates").data)
lib.merge_experimental(
"merge_update_with_duplicates",
pd.DataFrame(
{"Bid": [105, 102], "Ask": [105.3, 102]},
index=pd.DatetimeIndex([pd.Timestamp("2025-01-01 08:00:00"), pd.Timestamp("2025-02-01 08:00:00")])
),
MergeStrategy(MergeAction.UPDATE, not_matched_by_target=MergeAction.DO_NOTHING)
)
print("Data after merge")
display(lib.read("merge_update_with_duplicates").data)
Original data
| | Bid | Ask |
|---|---|---|
| 2025-01-01 08:00:00 | 100 | 101.1 |
| 2025-01-01 08:00:00 | 101 | 101.0 |
| 2025-01-01 09:00:00 | 102 | 102.5 |
Data after merge
| | Bid | Ask |
|---|---|---|
| 2025-01-01 08:00:00 | 105 | 105.3 |
| 2025-01-01 08:00:00 | 105 | 105.3 |
| 2025-01-01 09:00:00 | 102 | 102.5 |
- MergeStrategy(matched="do_nothing", not_matched_by_target="insert") inserts rows from source that do not match the selected columns.
- MergeStrategy(matched="update", not_matched_by_target="insert") updates target by modifying matching rows and inserting non-matching rows from source.
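To make the strategies that involve insert concrete, here is a pandas-only sketch that emulates the described behavior when matching on the index. emulate_merge is a hypothetical helper, not an ArcticDB function. It assumes plain lowercase strategy strings and a target index without duplicates.

```python
import pandas as pd


def emulate_merge(target, source, matched, not_matched_by_target):
    """Hypothetical emulation of the strategy semantics, matching on the index only."""
    result = target.copy()
    in_target = source.index.isin(target.index)
    if matched == "update" and in_target.any():
        keys = source.index[in_target]
        result.loc[keys] = source.loc[keys]  # overwrite matched rows
    if not_matched_by_target == "insert":
        # append unmatched source rows and restore index order
        result = pd.concat([result, source[~in_target]]).sort_index()
    return result


target = pd.DataFrame(
    {"Bid": [100.0, 102.0]},
    index=pd.to_datetime(["2025-01-01 08:00:00", "2025-01-01 09:00:00"]),
)
source = pd.DataFrame(
    {"Bid": [105.0, 110.0]},
    index=pd.to_datetime(["2025-01-01 08:00:00", "2025-01-01 10:00:00"]),
)

insert_only = emulate_merge(target, source, "do_nothing", "insert")
upsert = emulate_merge(target, source, "update", "insert")

print(insert_only["Bid"].tolist())  # [100.0, 102.0, 110.0]
print(upsert["Bid"].tolist())       # [105.0, 102.0, 110.0]
```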
When the strategy involves updating on match, a row in target must not be matched by more than one row in source, as this would create ambiguity about which values to use.
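When matching on the index, this ambiguity can be detected before calling merge by looking for repeated index values in source. The sketch below uses only pandas. ambiguous_source_keys is a hypothetical helper name, not part of ArcticDB.

```python
import pandas as pd


def ambiguous_source_keys(source: pd.DataFrame) -> pd.Index:
    """Return the index values that occur more than once in source."""
    return source.index[source.index.duplicated(keep="first")].unique()


source = pd.DataFrame(
    {"Bid": [105, 106, 107]},
    index=pd.to_datetime(
        ["2025-01-01 08:00:00", "2025-01-01 08:00:00", "2025-01-01 09:00:00"]
    ),
)

# Two source rows would compete to update the 08:00 row(s) in target
duplicates = ambiguous_source_keys(source)
print(list(duplicates))  # [Timestamp('2025-01-01 08:00:00')]
```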
Limitations¶
The current implementation has a few limitations:
- Both target and source must have a sorted pd.DatetimeIndex.
- The library must use a static schema, and both source and target must share the same schema.
- The only supported strategy is MergeStrategy(matched="update", not_matched_by_target="do_nothing").
- Matching on columns other than the index is not yet implemented.
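Some of these limitations can be checked on the client side before calling merge. The following is a rough pandas-only sketch. check_merge_preconditions is a hypothetical helper, and comparing column names and dtypes is only an approximation of what ArcticDB treats as the same schema. The target frame here stands for data that has already been read back from the library.

```python
import pandas as pd


def check_merge_preconditions(target: pd.DataFrame, source: pd.DataFrame) -> list:
    """Return a list of human-readable problems; an empty list means no problem was found."""
    problems = []
    for name, frame in (("target", target), ("source", source)):
        if not isinstance(frame.index, pd.DatetimeIndex):
            problems.append(f"{name} index is not a pd.DatetimeIndex")
        elif not frame.index.is_monotonic_increasing:
            problems.append(f"{name} index is not sorted")
    # Approximate schema check: same column names, same order, same dtypes
    if list(target.columns) != list(source.columns) or not target.dtypes.equals(source.dtypes):
        problems.append("target and source do not share the same schema")
    return problems


index = pd.to_datetime(["2025-01-01 08:00:00", "2025-01-01 09:00:00"])
target = pd.DataFrame({"Bid": [100, 102], "Ask": [101.1, 102.5]}, index=index)
source = pd.DataFrame({"Bid": [105, 106], "Ask": [105.3, 106.1]}, index=index)

print(check_merge_preconditions(target, source))             # []
print(check_merge_preconditions(target, source.iloc[::-1]))  # ['source index is not sorted']
```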