diff --git a/py/server/deephaven/table.py b/py/server/deephaven/table.py index d5d1b69140b..de1a1901371 100644 --- a/py/server/deephaven/table.py +++ b/py/server/deephaven/table.py @@ -12,7 +12,7 @@ from enum import Enum from enum import auto from functools import cached_property -from typing import Any, Optional, Callable, Dict, Generator, Tuple, Literal +from typing import Any, Optional, Callable, Dict, Generator, Tuple, Literal, TypeVar, Generic from typing import Sequence, List, Union, Protocol, Mapping, Iterable import jpy @@ -538,7 +538,220 @@ def __hash__(self): return JObjectWrapper.__hash__(self) -class Table(JObjectWrapper): +T = TypeVar("T", "Table", "PartitionedTableProxy") + +class TableOperations(Generic[T]): + """A protocol for classes that support table operations.""" + j_object: jpy.JType + + def head(self: T, num_rows: int) -> T: + """ When called on a :class:`Table`, the head method creates a new table with a specific number of rows from the + beginning of the table. + + When called on a :class:`PartitionedTableProxy`, the head method applies the head operation to all constituent tables + of the underlying partitioned table and produces a new :class:`PartitionedTableProxy` with the result tables as the + constituents of its underlying partitioned table. + + Args: + num_rows (int): the number of rows at the head of table or the constituent tables + + Returns: + a new :class:`Table` or :class:`PartitionedTableProxy` + + Raises: + DHError + """ + try: + with auto_locking_ctx(self): + return self.__class__(self.j_object.head(num_rows)) + except Exception as e: + raise DHError(e, f"head operation on the {self.__class__.__name__} failed.") from e + + + def tail(self: T, num_rows: int) -> T: + """When called on a :class:`Table`, the tail method creates a new table with a specific number of rows from the + end of the table. + + When called on a :class:`PartitionedTableProxy`, the tail method applies the tail operation to all constituent tables + of the underlying partitioned table, and produces a new :class:`PartitionedTableProxy` with the result tables as the + constituents of its underlying partitioned table. + + Args: + num_rows (int): the number of rows at the tail of table or the constituent tables + + Returns: + a new :class:`Table` or :class:`PartitionedTableProxy` + + Raises: + DHError + """ + try: + with auto_locking_ctx(self): + return self.__class__(self.j_object.tail(num_rows)) + except Exception as e: + raise DHError(e, f"tail operation on the {self.__class__.__name__} failed.") from e + + def reverse(self: T) -> T: + """When called on a :class:`Table`, the reverse method creates a new table with all of the rows from this table + in reverse order. + + When called on a :class:`PartitionedTableProxy`, the reverse method applies the reverse operation to all constituent + tables of the underlying partitioned table, and produces a new :class:`PartitionedTableProxy` with the result tables + as the constituents of its underlying partitioned table. + + Returns: + a new :class:`Table` or :class:`PartitionedTableProxy` + + Raises: + DHError + """ + try: + with auto_locking_ctx(self): + return self.__class__(self.j_object.reverse()) + except Exception as e: + raise DHError(e, f"reverse operation on the {self.__class__.__name__} failed.") from e + + + def snapshot(self: T) -> T: + """When called on a :class:`Table`, the snapshot method returns a static snapshot table. + + When called on a :class:`PartitionedTableProxy`, the snapshot method applies the snapshot operation to all constituent + tables of the underlying partitioned table, and produces a new :class:`PartitionedTableProxy` with the result tables + as the constituents of its underlying partitioned table. + + Returns: + a new :class:`Table` or :class:`PartitionedTableProxy` + + Raises: + DHError + """ + try: + with auto_locking_ctx(self): + return self.__class__(self.j_object.snapshot()) + except Exception as e: + raise DHError(e, "snapshot operation on the {self.__class__.__name__} failed.") from e + + def snapshot_when(self: T, trigger_table: Union[Table, PartitionedTableProxy], + stamp_cols: Union[str, List[str]] = None, initial: bool = False, incremental: bool = False, + history: bool = False) -> T: + """When called on a :class:`Table`, the snapshot_when method returns a table that captures a snapshot of this + table whenever trigger_table updates. When trigger_table updates, a snapshot of this table and the "stamp key" + from trigger_table form the resulting table. The "stamp key" is the last row of the trigger_table, limited by + the stamp_cols. If trigger_table is empty, the "stamp key" will be represented by NULL values. + Note: the trigger_table must be append-only when the history flag is set to True. If the trigger_table is not + append-only and has modified or removed rows in its updates, the result snapshot table will be put in a failure + state and become unusable. + + When called on a :class:`PartitionedTableProxy`, the snapshot_when method applies the snapshot_when operation to + all constituent tables of the underlying partitioned table with the provided trigger :class:`Table or + :class:`.PartitionedTableProxy`, and produces a new :class:`PartitionedTableProxy` with the result tables as the + constituents of its underlying partitioned table. In the case of the trigger table being another :class:`PartitionedTableProxy`, + the snapshot_when operation is applied to the matching pairs of the constituent tables from both underlying + partitioned tables. + + Args: + trigger_table (Union[Table, PartitionedTableProxy]): the trigger Table or PartitionedTableProxy which is only + allowed when called on a PartitionedTableProxy. + stamp_cols (Union[str, Sequence[str]): The columns from trigger_table that form the "stamp key", may be + renames. None, or empty, means that all columns from trigger_table form the "stamp key". + initial (bool): Whether to take an initial snapshot upon construction, default is False. When False, the + resulting table will remain empty until trigger_table first updates. + incremental (bool): Whether the resulting table should be incremental, default is False. When False, all + rows of this table will have the latest "stamp key". When True, only the rows of this table that have + been added or updated will have the latest "stamp key". + history (bool): Whether the resulting table should keep history, default is False. A history table appends a + full snapshot of this table and the "stamp key" as opposed to updating existing rows. The history flag + is currently incompatible with initial and incremental: when history is True, incremental and initial + must be False. + + Returns + a new :class:`Table` or :class:`PartitionedTableProxy` + + Raises: + DHError + """ + try: + if isinstance(self, Table) and isinstance(trigger_table, PartitionedTableProxy): + raise ValueError("snapshot_when operation on a Table with a PartitionedTableProxy trigger_table is not supported.") + + options = _JSnapshotWhenOptions.of(initial, incremental, history, to_sequence(stamp_cols)) + with auto_locking_ctx(self, trigger_table): + return self.__class__(self.j_object.snapshotWhen(trigger_table.j_object, options)) + except Exception as e: + raise DHError(e, f"snapshot_when operation on the {self.__class__.__name__} failed.") from e + + def sort(self: T, order_by: Union[str, Sequence[str]], + order: Union[SortDirection, Sequence[SortDirection]] = None) -> T: + """ When called on a :class:`Table`, the sort method creates a new table where the rows are ordered based on + values in a specified set of columns. + + When called on a :class:`PartitionedTableProxy`, the sort method applies the sort operation to all constituent + tables of the underlying partitioned table, and produces a new :class:`PartitionedTableProxy` with the result + tables as the constituents of its underlying partitioned table. + + Applies the :meth:`~Table.sort` table operation to all constituent tables of the underlying partitioned + table, and produces a new PartitionedTableProxy with the result tables as the constituents of its underlying + partitioned table. + + Args: + order_by (Union[str, Sequence[str]]): the column(s) to be sorted on + order (Union[SortDirection, Sequence[SortDirection], optional): the corresponding sort directions for + each sort column, default is None, meaning ascending order for all the sort columns. + + Returns: + a new :class:`Table` or :class:`PartitionedTableProxy` + + Raises: + DHError + """ + try: + if not order: + order = (SortDirection.ASCENDING,) * len(order_by) + else: + order = to_sequence(order) + if any([o not in (SortDirection.ASCENDING, SortDirection.DESCENDING) for o in order]): + raise DHError(message="The sort direction must be either 'ASCENDING' or 'DESCENDING'.") + if len(order_by) != len(order): + raise DHError(message="The number of sort columns must be the same as the number of sort directions.") + + sort_columns = [_sort_column(col, dir_) for col, dir_ in zip(order_by, order)] + j_sc_list = j_array_list(sort_columns) + with auto_locking_ctx(self): + return self.__class__(self.j_object.sort(j_sc_list)) + except Exception as e: + raise DHError(e, f"sort operation on the {self.__class__.__name__} failed.") from e + + def sort_descending(self: T, order_by: Union[str, Sequence[str]]) -> T: + """ When called on a :class:`Table`, the sort_descending method creates a new table where rows in a table are + sorted in descending order based on the order_by column(s). + + When called on a :class:`PartitionedTableProxy`, the sort_descending method applies the sort_descending operation + to all constituent tables of the underlying partitioned table, and produces a new :class:`PartitionedTableProxy` + with the result tables as the constituents of its underlying partitioned table. + + + Applies the :meth:`~Table.sort_descending` table operation to all constituent tables of the underlying + partitioned table, and produces a new PartitionedTableProxy with the result tables as the constituents of its + underlying partitioned table. + + Args: + order_by (Union[str, Sequence[str]]): the column(s) to be sorted on + + Returns: + a new :class:`Table` or :class:`PartitionedTableProxy` + + Raises: + DHError + """ + try: + order_by = to_sequence(order_by) + with auto_locking_ctx(self): + return self.__class__(self.j_object.sortDescending(*order_by)) + except Exception as e: + raise DHError(e, f"sort_descending operation on the {self.__class__.__name__} failed.") from e + + +class Table(JObjectWrapper, TableOperations["Table"]): """A Table represents a Deephaven table. It allows applications to perform powerful Deephaven table operations. Note: It should not be instantiated directly by user code. Tables are mostly created by factory methods, @@ -820,59 +1033,6 @@ def remove_blink(self) -> Table: """Returns a non-blink child table, or this table if it is not a blink table.""" return Table(j_table=self.j_table.removeBlink()) - def snapshot(self) -> Table: - """Returns a static snapshot table. - - Returns: - a new table - - Raises: - DHError - """ - try: - with auto_locking_ctx(self): - return Table(j_table=self.j_table.snapshot()) - except Exception as e: - raise DHError(message="failed to create a snapshot.") from e - - def snapshot_when(self, trigger_table: Table, stamp_cols: Union[str, List[str]] = None, initial: bool = False, - incremental: bool = False, history: bool = False) -> Table: - """Returns a table that captures a snapshot of this table whenever trigger_table updates. - - When trigger_table updates, a snapshot of this table and the "stamp key" from trigger_table form the resulting - table. The "stamp key" is the last row of the trigger_table, limited by the stamp_cols. If trigger_table is - empty, the "stamp key" will be represented by NULL values. - - Note: the trigger_table must be append-only when the history flag is set to True. If the trigger_table is not - append-only and has modified or removed rows in its updates, the result snapshot table will be put in a failure - state and become unusable. - - Args: - trigger_table (Table): the trigger table - stamp_cols (Union[str, Sequence[str]): The columns from trigger_table that form the "stamp key", may be - renames. None, or empty, means that all columns from trigger_table form the "stamp key". - initial (bool): Whether to take an initial snapshot upon construction, default is False. When False, the - resulting table will remain empty until trigger_table first updates. - incremental (bool): Whether the resulting table should be incremental, default is False. When False, all - rows of this table will have the latest "stamp key". When True, only the rows of this table that have - been added or updated will have the latest "stamp key". - history (bool): Whether the resulting table should keep history, default is False. A history table appends a - full snapshot of this table and the "stamp key" as opposed to updating existing rows. The history flag - is currently incompatible with initial and incremental: when history is True, incremental and initial - must be False. - - Returns: - a new table - - Raises: - DHError - """ - try: - options = _JSnapshotWhenOptions.of(initial, incremental, history, to_sequence(stamp_cols)) - with auto_locking_ctx(self, trigger_table): - return Table(j_table=self.j_table.snapshotWhen(trigger_table.j_table, options)) - except Exception as e: - raise DHError(message="failed to create a snapshot_when table.") from e # # Table operation category: Select @@ -1189,23 +1349,6 @@ def where_one_of(self, filters: Union[str, Filter, Sequence[str], Sequence[Filte except Exception as e: raise DHError(e, "table where_one_of operation failed.") from e - def head(self, num_rows: int) -> Table: - """The head method creates a new table with a specific number of rows from the beginning of the table. - - Args: - num_rows (int): the number of rows at the head of table - - Returns: - a new table - - Raises: - DHError - """ - try: - return Table(j_table=self.j_table.head(num_rows)) - except Exception as e: - raise DHError(e, "table head operation failed.") from e - def head_pct(self, pct: float) -> Table: """The head_pct method creates a new table with a specific percentage of rows from the beginning of the table. @@ -1223,23 +1366,6 @@ def head_pct(self, pct: float) -> Table: except Exception as e: raise DHError(e, "table head_pct operation failed.") from e - def tail(self, num_rows: int) -> Table: - """The tail method creates a new table with a specific number of rows from the end of the table. - - Args: - num_rows (int): the number of rows at the end of table - - Returns: - a new table - - Raises: - DHError - """ - try: - return Table(j_table=self.j_table.tail(num_rows)) - except Exception as e: - raise DHError(e, "table tail operation failed.") from e - def tail_pct(self, pct: float) -> Table: """The tail_pct method creates a new table with a specific percentage of rows from the end of the table. @@ -1283,71 +1409,6 @@ def restrict_sort_to(self, cols: Union[str, Sequence[str]]): except Exception as e: raise DHError(e, "table restrict_sort_to operation failed.") from e - def sort_descending(self, order_by: Union[str, Sequence[str]]) -> Table: - """The sort_descending method creates a new table where rows in a table are sorted in descending order based on - the order_by column(s). - - Args: - order_by (Union[str, Sequence[str]], optional): the column name(s) - - Returns: - a new table - - Raises: - DHError - """ - try: - order_by = to_sequence(order_by) - return Table(j_table=self.j_table.sortDescending(*order_by)) - except Exception as e: - raise DHError(e, "table sort_descending operation failed.") from e - - def reverse(self) -> Table: - """The reverse method creates a new table with all of the rows from this table in reverse order. - - Returns: - a new table - - Raises: - DHError - """ - try: - return Table(j_table=self.j_table.reverse()) - except Exception as e: - raise DHError(e, "table reverse operation failed.") from e - - def sort(self, order_by: Union[str, Sequence[str]], - order: Union[SortDirection, Sequence[SortDirection]] = None) -> Table: - """The sort method creates a new table where the rows are ordered based on values in a specified set of columns. - - Args: - order_by (Union[str, Sequence[str]]): the column(s) to be sorted on - order (Union[SortDirection, Sequence[SortDirection], optional): the corresponding sort directions for - each sort column, default is None, meaning ascending order for all the sort columns. - - Returns: - a new table - - Raises: - DHError - """ - - try: - order_by = to_sequence(order_by) - if not order: - order = (SortDirection.ASCENDING,) * len(order_by) - else: - order = to_sequence(order) - if any([o not in (SortDirection.ASCENDING, SortDirection.DESCENDING) for o in order]): - raise DHError(message="The sort direction must be either 'ASCENDING' or 'DESCENDING'.") - if len(order_by) != len(order): - raise DHError(message="The number of sort columns must be the same as the number of sort directions.") - - sort_columns = [_sort_column(col, dir_) for col, dir_ in zip(order_by, order)] - j_sc_list = j_array_list(sort_columns) - return Table(j_table=self.j_table.sort(j_sc_list)) - except Exception as e: - raise DHError(e, "table sort operation failed.") from e # endregion @@ -2809,7 +2870,7 @@ def proxy(self, require_matching_keys: bool = True, sanity_check_joins: bool = T j_pt_proxy=self.j_partitioned_table.proxy(require_matching_keys, sanity_check_joins)) -class PartitionedTableProxy(JObjectWrapper): +class PartitionedTableProxy(JObjectWrapper, TableOperations["PartitionedTableProxy"]): """A PartitionedTableProxy is a table operation proxy for the underlying partitioned table. It provides methods that apply table operations to the constituent tables of the underlying partitioned table, produce a new partitioned table from the resulting constituent tables, and return a proxy of it. @@ -2845,171 +2906,6 @@ def __init__(self, j_pt_proxy): self.sanity_check_joins = self.j_pt_proxy.sanityChecksJoins() self.target = PartitionedTable(j_partitioned_table=self.j_pt_proxy.target()) - def head(self, num_rows: int) -> PartitionedTableProxy: - """Applies the :meth:`~Table.head` table operation to all constituent tables of the underlying partitioned - table, and produces a new PartitionedTableProxy with the result tables as the constituents of its underlying - partitioned table. - - Args: - num_rows (int): the number of rows at the head of the constituent tables - - Returns: - a new PartitionedTableProxy - - Raises: - DHError - """ - try: - with auto_locking_ctx(self): - return PartitionedTableProxy(j_pt_proxy=self.j_pt_proxy.head(num_rows)) - except Exception as e: - raise DHError(e, "head operation on the PartitionedTableProxy failed.") from e - - def tail(self, num_rows: int) -> PartitionedTableProxy: - """Applies the :meth:`~Table.tail` table operation to all constituent tables of the underlying partitioned - table, and produces a new PartitionedTableProxy with the result tables as the constituents of its underlying - partitioned table. - - Args: - num_rows (int): the number of rows at the end of the constituent tables - - Returns: - a new PartitionedTableProxy - - Raises: - DHError - """ - try: - with auto_locking_ctx(self): - return PartitionedTableProxy(j_pt_proxy=self.j_pt_proxy.tail(num_rows)) - except Exception as e: - raise DHError(e, "tail operation on the PartitionedTableProxy failed.") from e - - def reverse(self) -> PartitionedTableProxy: - """Applies the :meth:`~Table.reverse` table operation to all constituent tables of the underlying partitioned - table, and produces a new PartitionedTableProxy with the result tables as the constituents of its underlying - partitioned table. - - Returns: - a new PartitionedTableProxy - - Raises: - DHError - """ - try: - with auto_locking_ctx(self): - return PartitionedTableProxy(j_pt_proxy=self.j_pt_proxy.reverse()) - except Exception as e: - raise DHError(e, "reverse operation on the PartitionedTableProxy failed.") from e - - def snapshot(self) -> PartitionedTableProxy: - """Applies the :meth:`~Table.snapshot` table operation to all constituent tables of the underlying partitioned - table, and produces a new PartitionedTableProxy with the result tables as the constituents of its underlying - partitioned table. - - Returns: - a new PartitionedTableProxy - - Raises: - DHError - """ - try: - with auto_locking_ctx(self): - return PartitionedTableProxy(j_pt_proxy=self.j_pt_proxy.snapshot()) - except Exception as e: - raise DHError(e, "snapshot operation on the PartitionedTableProxy failed.") from e - - def snapshot_when(self, trigger_table: Union[Table, PartitionedTableProxy], - stamp_cols: Union[str, List[str]] = None, initial: bool = False, incremental: bool = False, - history: bool = False) -> PartitionedTableProxy: - """Applies the :meth:`~Table.snapshot_when` table operation to all constituent tables of the underlying - partitioned table with the provided trigger table or PartitionedTableProxy, and produces a new - PartitionedTableProxy with the result tables as the constituents of its underlying partitioned table. - - In the case of the trigger table being another PartitionedTableProxy, the :meth:`~Table.snapshot_when` table - operation is applied to the matching pairs of the constituent tables from both underlying partitioned tables. - - Args: - trigger_table (Union[Table, PartitionedTableProxy]): the trigger Table or PartitionedTableProxy - stamp_cols (Union[str, Sequence[str]): The columns from trigger_table that form the "stamp key", may be - renames. None, or empty, means that all columns from trigger_table form the "stamp key". - initial (bool): Whether to take an initial snapshot upon construction, default is False. When False, the - resulting table will remain empty until trigger_table first updates. - incremental (bool): Whether the resulting table should be incremental, default is False. When False, all - rows of this table will have the latest "stamp key". When True, only the rows of this table that have - been added or updated will have the latest "stamp key". - history (bool): Whether the resulting table should keep history, default is False. A history table appends a - full snapshot of this table and the "stamp key" as opposed to updating existing rows. The history flag - is currently incompatible with initial and incremental: when history is True, incremental and initial - must be False. - Returns: - a new PartitionedTableProxy - - Raises: - DHError - """ - try: - options = _JSnapshotWhenOptions.of(initial, incremental, history, to_sequence(stamp_cols)) - with auto_locking_ctx(self, trigger_table): - return PartitionedTableProxy(j_pt_proxy=self.j_pt_proxy.snapshotWhen(trigger_table.j_object, options)) - except Exception as e: - raise DHError(e, "snapshot_when operation on the PartitionedTableProxy failed.") from e - - def sort(self, order_by: Union[str, Sequence[str]], - order: Union[SortDirection, Sequence[SortDirection]] = None) -> PartitionedTableProxy: - """Applies the :meth:`~Table.sort` table operation to all constituent tables of the underlying partitioned - table, and produces a new PartitionedTableProxy with the result tables as the constituents of its underlying - partitioned table. - - Args: - order_by (Union[str, Sequence[str]]): the column(s) to be sorted on - order (Union[SortDirection, Sequence[SortDirection], optional): the corresponding sort directions for - each sort column, default is None, meaning ascending order for all the sort columns. - - Returns: - a new PartitionedTableProxy - - Raises: - DHError - """ - try: - if not order: - order = (SortDirection.ASCENDING,) * len(order_by) - else: - order = to_sequence(order) - if any([o not in (SortDirection.ASCENDING, SortDirection.DESCENDING) for o in order]): - raise DHError(message="The sort direction must be either 'ASCENDING' or 'DESCENDING'.") - if len(order_by) != len(order): - raise DHError(message="The number of sort columns must be the same as the number of sort directions.") - - sort_columns = [_sort_column(col, dir_) for col, dir_ in zip(order_by, order)] - j_sc_list = j_array_list(sort_columns) - with auto_locking_ctx(self): - return PartitionedTableProxy(j_pt_proxy=self.j_pt_proxy.sort(j_sc_list)) - except Exception as e: - raise DHError(e, "sort operation on the PartitionedTableProxy failed.") from e - - def sort_descending(self, order_by: Union[str, Sequence[str]]) -> PartitionedTableProxy: - """Applies the :meth:`~Table.sort_descending` table operation to all constituent tables of the underlying - partitioned table, and produces a new PartitionedTableProxy with the result tables as the constituents of its - underlying partitioned table. - - Args: - order_by (Union[str, Sequence[str]]): the column(s) to be sorted on - - Returns: - a new PartitionedTableProxy - - Raises: - DHError - """ - try: - order_by = to_sequence(order_by) - with auto_locking_ctx(self): - return PartitionedTableProxy(j_pt_proxy=self.j_pt_proxy.sortDescending(*order_by)) - except Exception as e: - raise DHError(e, "sort_descending operation on the PartitionedTableProxy failed.") from e - def where(self, filters: Union[str, Filter, Sequence[str], Sequence[Filter]] = None) -> PartitionedTableProxy: """Applies the :meth:`~Table.where` table operation to all constituent tables of the underlying partitioned table, and produces a new PartitionedTableProxy with the result tables as the constituents of its underlying diff --git a/sphinx/source/conf.py b/sphinx/source/conf.py index b735b38838d..3c19e732da2 100644 --- a/sphinx/source/conf.py +++ b/sphinx/source/conf.py @@ -68,7 +68,7 @@ # if we allow sphinx to generate type hints for signatures (default), it would make the generated doc cluttered and hard to read autodoc_typehints = 'none' autoclass_content = 'both' - +autodoc_default_options = {'inherited-members': False} #########################################################################################################################################################################