diff --git a/environment_unix.yml b/environment_unix.yml
index ab8475f500..326081a5ca 100644
--- a/environment_unix.yml
+++ b/environment_unix.yml
@@ -55,7 +55,7 @@ dependencies:
   # Build dependencies for tests
   - libarrow
   # Python dependences
-  - python =3.10 # Python 3.11 requires protobuf>=4
+  - python=3.10 # Python 3.11 requires protobuf>=4
   - packaging
   - numpy
   - pandas
diff --git a/python/arcticdb/version_store/library.py b/python/arcticdb/version_store/library.py
index 26fab9967f..9c185d3d96 100644
--- a/python/arcticdb/version_store/library.py
+++ b/python/arcticdb/version_store/library.py
@@ -2205,4 +2205,195 @@ def defragment_symbol_data(
     @property
     def name(self):
         """The name of this library."""
-        return self._nvs.name()
\ No newline at end of file
+        return self._nvs.name()
+
+    def _get_version_rows(self,
+                          symbol: str,
+                          as_of: Optional[AsOf] = None
+                          ) -> Tuple[int, int]:
+        """Return the version and total row count for a symbol. Used by the chunking API."""
+        version_query = self._nvs._get_version_query(as_of)
+        dit = self._nvs.version_store.read_descriptor(symbol, version_query)
+        return dit.version, dit.timeseries_descriptor.total_rows
+
+    @staticmethod
+    def _row_range_to_positive_bounds(row_range: Optional[Tuple[Optional[int], Optional[int]]],
+                                      row_count: int) -> Tuple[int, int]:
+        """Replace None and negative bounds in a row_range with positive bounds, to simplify downstream processing."""
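+        # Worked example (illustrative values): with row_count=100,
+        # row_range=(-10, None) normalises to (90, 100) and (None, -1) to (0, 99).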
+        if row_range is None:
+            return 0, row_count
+        row_range_start = row_range[0] if row_range[0] is not None else 0
+        row_range_end = row_range[1] if row_range[1] is not None else row_count
+        if row_range_start < 0:
+            row_range_start += row_count
+        if row_range_end < 0:
+            row_range_end += row_count
+        return row_range_start, row_range_end
+
+    def _read_requests_from_row_segments(self,
+                                         symbol: str,
+                                         as_of: Optional[AsOf] = None,
+                                         date_range: Optional[Tuple[Optional[Timestamp], Optional[Timestamp]]] = None,
+                                         row_range: Optional[Tuple[int, int]] = None,
+                                         columns: Optional[List[str]] = None,
+                                         query_builder: Optional[QueryBuilder] = None
+                                         ) -> List[ReadRequest]:
+        """Return a list of ReadRequests, one for each row segment of the symbol."""
+        if date_range is not None:
+            raise ValueError("date_range is not yet supported for chunking")
+        version, row_count = self._get_version_rows(symbol, as_of)
+        row_range_start, row_range_end = self._row_range_to_positive_bounds(row_range, row_count)
+        idx = self._nvs.read_index(symbol, as_of=version)
+        start_row_col = idx.columns.get_loc('start_row')
+        end_row_col = idx.columns.get_loc('end_row')
+        start_col_col = idx.columns.get_loc('start_col')
+        read_requests = []
+        for i in range(len(idx)):
+            start_col = idx.iloc[i, start_col_col]
+            start_row = int(idx.iloc[i, start_row_col])
+            end_row = int(idx.iloc[i, end_row_col])
+            # skip the segment if it is not in the first column slice, or lies completely outside the row range
+            if start_col > 1 or end_row <= row_range_start or start_row >= row_range_end:
+                continue
+            read_requests.append(ReadRequest(symbol, as_of=version, columns=columns,
+                                             row_range=(max(start_row, row_range_start), min(end_row, row_range_end)),
+                                             query_builder=query_builder
+                                             )
+                                 )
+        return read_requests
+
+    def _read_requests_from_chunks(self,
+                                   symbol: str,
+                                   as_of: Optional[AsOf] = None,
+                                   date_range: Optional[Tuple[Optional[Timestamp], Optional[Timestamp]]] = None,
+                                   row_range: Optional[Tuple[int, int]] = None,
+                                   columns: Optional[List[str]] = None,
+                                   query_builder: Optional[QueryBuilder] = None,
+                                   num_chunks: Optional[int] = None,
+                                   rows_per_chunk: Optional[int] = None
+                                   ) -> List[ReadRequest]:
+        """Return a list of ReadRequests with the chunk size specified by either `num_chunks` or `rows_per_chunk`."""
+        if date_range is not None:
+            raise ValueError("date_range is not yet supported for chunking")
+        version, row_count = self._get_version_rows(symbol, as_of)
+        row_range_start, row_range_end = self._row_range_to_positive_bounds(row_range, row_count)
+        row_count_adj = row_range_end - row_range_start
+        if num_chunks is not None:
+            if rows_per_chunk is not None:
+                raise ValueError(f"Only one of the arguments num_chunks ({num_chunks})"
+                                 f" and rows_per_chunk ({rows_per_chunk}) can be set")
+            # make the chunks as balanced as possible (chunk sizes differ by at most one row)
+            base_rows = row_count_adj // num_chunks
+            extra_rows = row_count_adj % num_chunks
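+            # Worked example: row_count_adj=11, num_chunks=3 gives base_rows=3, extra_rows=2,
+            # so the chunk sizes are 4, 4 and 3 (the first extra_rows chunks get one extra row).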
+""" + +import pandas as pd +import numpy as np +import pytest +from arcticdb.util.test import assert_frame_equal, get_sample_dataframe +import arcticdb as adb + + +def arctic_chunk(df, lib, symbol, read_args, chunk_args): + lib.write(symbol, df) + chunks = [c for c in lib.iter_read_chunks(symbol, **read_args, **chunk_args)] + if 'lazy' in read_args and read_args['lazy']: + chunks = [c.collect().data for c in chunks] + return chunks + +def arange_include_stop(start, stop, step): + return np.concatenate([np.arange(start, stop, step), [stop]]) + +def dataframe_chunks(df, row_chunks): + df_chunks = [] + for i in range(len(row_chunks) - 1): + df_chunks.append(df.iloc[row_chunks[i]: row_chunks[i+1]]) + return df_chunks + +def calc_row_chunks(lib, df, chunk_args): + step = None + if not chunk_args: + step = lib.options().rows_per_segment + elif 'rows_per_chunk' in chunk_args: + step = chunk_args['rows_per_chunk'] + + if step is not None: + return arange_include_stop(0, len(df), step) + + row_count = len(df) + num_chunks = chunk_args['num_chunks'] + base_rows = row_count // num_chunks + extra_rows = row_count % num_chunks + start_row = 0 + row_chunks = [start_row] + for i in range(num_chunks): + end_row = min(start_row + base_rows + (1 if i DF_SMALL.index[0]] + df_test = DF_SMALL[DF_SMALL.index > DF_SMALL.index[0]] + generic_chunk_test(lib, sym, DF_SMALL, dict(query_builder=q), dict(num_chunks=2), df_test, pd_ignore_db=True)