Chunking api + tests #1853

Open
wants to merge 6 commits into base: master
2 changes: 1 addition & 1 deletion environment_unix.yml
@@ -55,7 +55,7 @@ dependencies:
# Build dependencies for tests
- libarrow
# Python dependences
- - python =3.10 # Python 3.11 requires protobuf>=4
+ - python=3.10 # Python 3.11 requires protobuf>=4
- packaging
- numpy
- pandas
193 changes: 192 additions & 1 deletion python/arcticdb/version_store/library.py
@@ -2205,4 +2205,195 @@ def defragment_symbol_data(
@property
def name(self):
"""The name of this library."""
return self._nvs.name()

def _get_version_rows(self,
symbol: str,
as_of=None
) -> Tuple[int, int]:
""" Returns a version and a row_count for a symbol. Used in the chunking api """
version_query = self._nvs._get_version_query(as_of)
dit = self._nvs.version_store.read_descriptor(symbol, version_query)
return dit.version, dit.timeseries_descriptor.total_rows

@staticmethod
def _row_range_to_positive_bounds(row_range: Optional[Tuple[Optional[int], Optional[int]]],
row_count: int) -> Optional[Tuple[int, int]]:
""" Transforms a row_range by replacing None and negative bounds with positive bounds to make downstream processing simpler """
if row_range is None:
return 0, row_count
row_range_start = row_range[0] if row_range[0] is not None else 0
row_range_end = row_range[1] if row_range[1] is not None else row_count
if row_range_start < 0:
row_range_start += row_count
if row_range_end < 0:
row_range_end += row_count
return row_range_start, row_range_end
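As a quick illustration of the normalisation above (calling the private helper directly, purely for exposition and not part of the change), an 11-row symbol gives:

from arcticdb.version_store.library import Library

# Every bound becomes an explicit, non-negative row offset for an 11-row symbol.
assert Library._row_range_to_positive_bounds(None, 11) == (0, 11)
assert Library._row_range_to_positive_bounds((2, -1), 11) == (2, 10)
assert Library._row_range_to_positive_bounds((-5, None), 11) == (6, 11)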

def _read_requests_from_row_segments(self,
symbol: str,
as_of: Optional[AsOf] = None,
date_range: Optional[Tuple[Optional[Timestamp], Optional[Timestamp]]] = None,
row_range: Optional[Tuple[int, int]] = None,
columns: Optional[List[str]] = None,
query_builder: Optional[QueryBuilder] = None
) -> List[ReadRequest]:
""" returns a list of RowRequest one for each row segment of the symbol """
if date_range is not None:
raise ValueError("date_range is not yet supported for chunking")
version, row_count = self._get_version_rows(symbol, as_of)
row_range_start, row_range_end = self._row_range_to_positive_bounds(row_range, row_count)
idx = self._nvs.read_index(symbol, as_of=version)
start_row_col = idx.columns.get_loc('start_row')
end_row_col = idx.columns.get_loc('end_row')
start_col_col = idx.columns.get_loc('start_col')
read_requests = []
for i in range(len(idx)):
start_col = idx.iloc[i, start_col_col]
start_row = int(idx.iloc[i, start_row_col])
end_row = int(idx.iloc[i, end_row_col])
# skip the segment if not the first column or completely outside the row range
if start_col > 1 or end_row < row_range_start or start_row >= row_range_end:
continue
read_requests.append(ReadRequest(symbol, as_of=version, columns=columns,
row_range=(max(start_row, row_range_start), min(end_row, row_range_end)),
query_builder=query_builder
)
)
return read_requests

def _read_requests_from_chunks(self,
symbol: str,
as_of: Optional[AsOf] = None,
date_range: Optional[Tuple[Optional[Timestamp], Optional[Timestamp]]] = None,
row_range: Optional[Tuple[int, int]] = None,
columns: Optional[List[str]] = None,
query_builder: Optional[QueryBuilder] = None,
num_chunks: Optional[int] = None,
rows_per_chunk: Optional[int] = None
) -> List[ReadRequest]:
""" returns a list of RowRequest with chunk size specified by either `num_chunks` or `rows_per_chunk` """
if date_range is not None:
raise ValueError("date_range is not yet supported for chunking")
version, row_count = self._get_version_rows(symbol, as_of)
row_range_start, row_range_end = self._row_range_to_positive_bounds(row_range, row_count)
row_count_adj = row_range_end - row_range_start
if num_chunks is not None:
if rows_per_chunk is not None:
raise ValueError(f"Only one of the arguments num_chunks ({num_chunks})"
f" and rows_per_chunk ({rows_per_chunk}) can be set")
# make the chunks as balanced as possible (differing in rows by 1 max)
base_rows = row_count_adj // num_chunks
extra_rows = row_count_adj % num_chunks
elif rows_per_chunk is not None:
# make the chunks exactly as requested, with the last block picking up the remaining rows
exact_chunks = 0 if (row_count_adj % rows_per_chunk)==0 else 1
num_chunks = row_count_adj // rows_per_chunk + exact_chunks
base_rows = rows_per_chunk
extra_rows = 0
else:
raise ValueError(f"One of the arguments num_chunks or rows_per_chunk must be set")
start_row = row_range_start
read_requests = []
for i in range(num_chunks):
end_row = min(start_row + base_rows + (1 if i<extra_rows else 0), row_range_end)
read_requests.append(ReadRequest(symbol, as_of=version, columns=columns,
row_range=(start_row, end_row),
query_builder=query_builder
)
)
start_row = end_row
return read_requests
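A standalone sketch of the two splitting modes above (illustration only, not part of the change): with 17 rows, num_chunks=4 balances the chunks to sizes 5, 4, 4, 4, while rows_per_chunk=5 gives 5, 5, 5 and a final chunk of 2.

def chunk_sizes_balanced(row_count, num_chunks):
    # Balanced split: chunk sizes differ by at most one row.
    base, extra = divmod(row_count, num_chunks)
    return [base + (1 if i < extra else 0) for i in range(num_chunks)]

def chunk_sizes_fixed(row_count, rows_per_chunk):
    # Fixed-size split: the last chunk picks up any remaining rows.
    full, remainder = divmod(row_count, rows_per_chunk)
    return [rows_per_chunk] * full + ([remainder] if remainder else [])

assert chunk_sizes_balanced(17, 4) == [5, 4, 4, 4]
assert chunk_sizes_fixed(17, 5) == [5, 5, 5, 2]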

def iter_read_chunks(self,
symbol: str,
as_of: Optional[AsOf] = None,
date_range: Optional[Tuple[Optional[Timestamp], Optional[Timestamp]]] = None,
row_range: Optional[Tuple[int, int]] = None,
columns: Optional[List[str]] = None,
query_builder: Optional[QueryBuilder] = None,
lazy: bool = False,
num_chunks: Optional[int] = None,
rows_per_chunk: Optional[int] = None
):
"""
Iterates through the symbol in row chunks. When run in a loop, each chunk is returned as a separate dataframe.

If `num_chunks` or `rows_per_chunk` is specified, it is used to set the chunk size. If neither is
specified, the chunks correspond to the row segments in storage. Using the segments makes efficient use
of the storage by reducing the number of objects to read.

Reading in chunks can be used to control memory usage for large data sets or to process chunks using a compute
cluster.

Parameters
----------
symbol : str
Symbol name.

as_of : AsOf, default=None
Return the data as it was as of the point in time. ``None`` means that the latest version should be read.
See `read()` for more detail.

date_range: Tuple[Optional[Timestamp], Optional[Timestamp]], default=None
Not yet implemented for chunks.
DateRange to restrict chunks to. See `read()` for more detail.

row_range: `Optional[Tuple[int, int]]`, default=None
Row range to restrict chunks to. See `read()` for more detail.

columns: List[str], default=None
Determines which columns to return data for. See `read()` for more detail.

query_builder: Optional[QueryBuilder], default=None
A QueryBuilder object to apply to each chunk read. See `read()` for more detail on QueryBuilder.
Note that in some cases this will lead to different results from applying the same query_builder to
the whole data set for the symbol.

lazy: bool, default=False
If True, return lazy dataframes rather than data. See `read()` for more detail.

num_chunks: Optional[int], default=None
The number of chunks to divide the data into. Do not use with `rows_per_chunk`.

rows_per_chunk: Optional[int], default=None
The number of rows in each chunk. Do not use with `num_chunks`.

Returns
-------
An iterator that yields one dataframe per chunk.

Examples
--------

>>> df = pd.DataFrame({'row_num': np.arange(11)}, index=pd.date_range('20240101', periods=11))
>>> lib.write('chunk_test', df)
>>> for c in lib.iter_read_chunks('chunk_test', rows_per_chunk=5):
...     print(c)
row_num
2024-01-01 0
2024-01-02 1
2024-01-03 2
2024-01-04 3
2024-01-05 4
row_num
2024-01-06 5
2024-01-07 6
2024-01-08 7
2024-01-09 8
2024-01-10 9
row_num
2024-01-11 10

"""
if num_chunks is None and rows_per_chunk is None:
read_requests = self._read_requests_from_row_segments(symbol, as_of, date_range, row_range, columns,
query_builder)
else:
read_requests = self._read_requests_from_chunks(symbol, as_of, date_range, row_range, columns,
query_builder, num_chunks, rows_per_chunk)
for rr in read_requests:
rr_dict = rr._asdict()
yield self.read(**rr_dict).data if not lazy else self.read(**rr_dict, lazy=True)
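A minimal usage sketch of the new iterator (illustration only; the LMDB URI, library name and symbol are made up) showing how a large symbol can be aggregated while holding only one chunk's dataframe in memory at a time:

import numpy as np
import pandas as pd
import arcticdb as adb

ac = adb.Arctic("lmdb://./chunk_demo")
lib = ac.get_library("demo", create_if_missing=True)

# Write a symbol large enough that reading it in one go would be wasteful here.
df = pd.DataFrame({"price": np.random.rand(1_000_000)},
                  index=pd.date_range("20240101", periods=1_000_000, freq="s"))
lib.write("prices", df)

# Stream the symbol in ten chunks, accumulating a running total per chunk.
total = 0.0
for chunk in lib.iter_read_chunks("prices", num_chunks=10):
    total += chunk["price"].sum()
print(total)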

157 changes: 157 additions & 0 deletions python/tests/integration/arcticdb/test_arctic_chunk.py
@@ -0,0 +1,157 @@
"""
Copyright 2024 Man Group Operations Limited

Use of this software is governed by the Business Source License 1.1 included in the file licenses/BSL.txt.

As of the Change Date specified in that file, in accordance with the Business Source License, use of this software will be governed by the Apache License, version 2.0.
"""

import pandas as pd
import numpy as np
import pytest
from arcticdb.util.test import assert_frame_equal, get_sample_dataframe
import arcticdb as adb


def arctic_chunk(df, lib, symbol, read_args, chunk_args):
lib.write(symbol, df)
chunks = [c for c in lib.iter_read_chunks(symbol, **read_args, **chunk_args)]
if 'lazy' in read_args and read_args['lazy']:
chunks = [c.collect().data for c in chunks]
return chunks

def arange_include_stop(start, stop, step):
return np.concatenate([np.arange(start, stop, step), [stop]])

def dataframe_chunks(df, row_chunks):
df_chunks = []
for i in range(len(row_chunks) - 1):
df_chunks.append(df.iloc[row_chunks[i]: row_chunks[i+1]])
return df_chunks

def calc_row_chunks(lib, df, chunk_args):
step = None
if not chunk_args:
step = lib.options().rows_per_segment
elif 'rows_per_chunk' in chunk_args:
step = chunk_args['rows_per_chunk']

if step is not None:
return arange_include_stop(0, len(df), step)

row_count = len(df)
num_chunks = chunk_args['num_chunks']
base_rows = row_count // num_chunks
extra_rows = row_count % num_chunks
start_row = 0
row_chunks = [start_row]
for i in range(num_chunks):
end_row = min(start_row + base_rows + (1 if i<extra_rows else 0), row_count)
row_chunks.append(end_row)
start_row = end_row
return row_chunks

def pandas_chunk(df, lib, symbol, read_args, chunk_args, row_chunks_override=None, ignore_db=False):
if ignore_db:
df_db = df
else:
lib.write(symbol, df)
if 'lazy' in read_args and read_args['lazy']:
df_db = lib.read(symbol, **read_args).collect().data
else:
df_db = lib.read(symbol, **read_args).data
row_chunks = calc_row_chunks(lib, df_db, chunk_args) if row_chunks_override is None else row_chunks_override
return dataframe_chunks(df_db, row_chunks)

def assert_chunks_match(c_test, c_check):
assert len(c_test) == len(c_check), f"Number of chunks does not match: test={len(c_test)}, check={len(c_check)}"
for i in range(len(c_test)):
assert_frame_equal(c_test[i], c_check[i])

def get_sample_dataframe_timeseries(size=1000, start_date='20200101'):
df = get_sample_dataframe(size=size)
df['timestamp'] = pd.date_range(start=start_date, periods=size)
return df.set_index('timestamp')

DF_SMALL = get_sample_dataframe_timeseries(17)
DF_SMALL2 = get_sample_dataframe_timeseries(21)
CHUNK_ARGS_ALL = [dict(), dict(num_chunks=4), dict(rows_per_chunk=3)]

def create_lib(ac):
return ac.get_library('chunk_tests', create_if_missing=True)

def create_lib_tiny_segments(ac):
lib_opts = adb.LibraryOptions(rows_per_segment=2, columns_per_segment=2)
return ac.get_library('chunk_tests_tiny', create_if_missing=True, library_options=lib_opts)

def create_lib_tiny_segments_dynamic(ac):
lib_opts = adb.LibraryOptions(rows_per_segment=2, columns_per_segment=2, dynamic_schema=True)
return ac.get_library('chunk_tests_tiny_dynamic', create_if_missing=True, library_options=lib_opts)

def generic_chunk_test(lib, symbol, df, read_args, chunk_args, df_test=None, pd_ignore_db=False):
chunks_db = arctic_chunk(df, lib, symbol, read_args, chunk_args)
df_pd = df if df_test is None else df_test
chunks_pd = pandas_chunk(df_pd, lib, symbol, read_args, chunk_args, ignore_db=pd_ignore_db)
assert_chunks_match(chunks_db, chunks_pd)

def test_chunk_simple(arctic_client):
lib = create_lib(arctic_client)
sym = 'simple'
for chunk_args in CHUNK_ARGS_ALL:
generic_chunk_test(lib, sym, DF_SMALL, dict(), chunk_args)

def test_chunk_simple_tiny(arctic_client):
lib = create_lib_tiny_segments(arctic_client)
sym = 'simple'
for chunk_args in CHUNK_ARGS_ALL:
generic_chunk_test(lib, sym, DF_SMALL, dict(), chunk_args)

def test_chunk_simple_tiny_lazy(arctic_client):
lib = create_lib_tiny_segments(arctic_client)
sym = 'simple'
for chunk_args in CHUNK_ARGS_ALL:
generic_chunk_test(lib, sym, DF_SMALL, dict(lazy=True), chunk_args)

def test_chunk_simple_tiny_dynamic(arctic_client):
lib = create_lib_tiny_segments_dynamic(arctic_client)
sym = 'simple'
for chunk_args in CHUNK_ARGS_ALL:
generic_chunk_test(lib, sym, DF_SMALL, dict(), chunk_args)

def test_chunk_as_of_tiny(arctic_client):
lib = create_lib_tiny_segments(arctic_client)
sym = 'as_of'
write_v = lib.write(sym, DF_SMALL)
for chunk_args in CHUNK_ARGS_ALL:
generic_chunk_test(lib, sym, DF_SMALL2, dict(as_of=write_v.version), chunk_args, DF_SMALL)

def test_chunk_date_range_tiny(arctic_client):
lib = create_lib_tiny_segments(arctic_client)
sym = 'date_range'
dr = (pd.Timestamp('20200102'), pd.Timestamp('20200103'))
for chunk_args in CHUNK_ARGS_ALL:
with pytest.raises(ValueError):
generic_chunk_test(lib, sym, DF_SMALL, dict(date_range=dr), chunk_args)

def test_chunk_row_range_tiny(arctic_client):
lib = create_lib_tiny_segments(arctic_client)
sym = 'row_range'
for chunk_args in CHUNK_ARGS_ALL:
generic_chunk_test(lib, sym, DF_SMALL, dict(row_range=(0,-1)), chunk_args)

def test_chunk_columns_tiny(arctic_client):
lib = create_lib_tiny_segments(arctic_client)
sym = 'columns'
cols = DF_SMALL.columns[[1, 7, 11]]
for chunk_args in CHUNK_ARGS_ALL:
generic_chunk_test(lib, sym, DF_SMALL, dict(columns=cols), chunk_args)

def test_chunk_querybuilder_tiny(arctic_client):
# behaviour of query_builder with/without chunks can be complicated, because the query is run against each chunk
# so this test is limited to a simple case
lib = create_lib_tiny_segments(arctic_client)
sym = 'query_builder'
q = adb.QueryBuilder()[adb.col('timestamp') > DF_SMALL.index[0]]
df_test = DF_SMALL[DF_SMALL.index > DF_SMALL.index[0]]
generic_chunk_test(lib, sym, DF_SMALL, dict(query_builder=q), dict(num_chunks=2), df_test, pd_ignore_db=True)
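To make the query_builder caveat in the last test concrete, here is a small self-contained sketch (illustration only, not part of the test suite; it reuses the module's pandas/numpy/arcticdb imports and assumes `lib` is any of the libraries created above): a group-by aggregation runs once per chunk, so the per-chunk partials must be re-aggregated to reproduce the whole-symbol result.

def chunked_groupby_sum_example(lib):
    # A group-by run per chunk yields partial sums; combining them recovers the whole-symbol answer.
    df = pd.DataFrame({"group": ["a", "b"] * 8, "value": np.arange(16.0)},
                      index=pd.date_range("20200101", periods=16))
    lib.write("qb_demo", df)
    q = adb.QueryBuilder().groupby("group").agg({"value": "sum"})
    whole = lib.read("qb_demo", query_builder=q).data
    partials = list(lib.iter_read_chunks("qb_demo", query_builder=q, num_chunks=2))
    combined = pd.concat(partials).groupby(level=0).sum()
    # Up to row ordering, `combined` should match `whole`; the raw partials do not.
    return whole, partials, combined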