Parallel Writes
As mentioned, ArcticDB fundamentally does not support concurrent writers to a single symbol - unless the data is concurrently written as staged data!
Staged data is not available to read, and requires a process to finalize all staged data prior to it being available for reading. Each unit of staged data must not overlap with any other unit of staged data -
as a result staged data must be timeseries indexed. The code below uses Spark to write to a single symbol concurrently, before finalizing the data:
import glob

import pandas as pd
from pyspark import SparkConf, SparkContext

import arcticdb as adb

# This example assumes the below variables (HOST, BUCKET, ACCESS, SECRET) are validly set.
# fix: the original f-string was missing its closing quote (SyntaxError).
ac = adb.Arctic(f"s3://{HOST}:{BUCKET}?access={ACCESS}&secret={SECRET}")
def _load(work):
    """Stage one CSV file into the symbol as staged (parallel-writable) data.

    This function runs on Spark executors, so it opens its own Arctic
    connection instead of reusing the driver's connection object.

    Args:
        work: tuple of (host, bucket, access, secret, symbol, library, file_path).
    """
    host, bucket, access, secret, symbol, library, file_path = work
    ac = adb.Arctic(f"s3://{host}:{bucket}?access={access}&secret={secret}")
    lib = ac[library]
    df = pd.read_csv(file_path)
    # Staged data must be timeseries indexed: promote the first column to the index.
    df = df.set_index(df.columns[0])
    # fix: pandas Index has no .to_datetime() method; convert via pd.to_datetime.
    df.index = pd.to_datetime(df.index)
    # When staged, the written data is not available to read until finalized.
    lib.write(symbol, df, staged=True)
symbol = "my_data"
library_name = "my_library"

conf = SparkConf().setAppName('appName').setMaster('local')
sc = SparkContext(conf=conf)

# Create the library up front so every executor can open it by name.
if library_name not in ac.list_libraries():
    ac.create_library(library_name)
library = ac[library_name]

# Assumes there are a set of CSV files in the current directory to load from.
# fix: the original referenced undefined lowercase host/bucket/access/secret;
# the driver's credentials are the module-level HOST/BUCKET/ACCESS/SECRET.
data = [(HOST, BUCKET, ACCESS, SECRET, symbol, library_name, f) for f in glob.glob("*.csv")]
dist_data = sc.parallelize(data)

# Each task stages one CSV in parallel; nothing is readable until finalized.
ret = dist_data.map(_load)
ret.collect()

# Finalizing merges all non-overlapping staged segments into the readable symbol.
library.finalize_staged_data(symbol)
data = library.read(symbol)