From cdb7245d761f170da7cd4f102a947fb07196792e Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?H=C3=A5kon=20V=2E=20Treider?= Date: Mon, 8 Apr 2024 11:23:20 +0200 Subject: [PATCH] Bugfixes: Synthetic Datapoints API (#1683) --- CHANGELOG.md | 21 +- cognite/client/_api/synthetic_time_series.py | 194 ++++++++++-------- cognite/client/_version.py | 2 +- pyproject.toml | 2 +- .../test_api/test_synthetic_time_series.py | 53 +++-- .../test_api/test_synthetic_time_series.py | 47 ++--- 6 files changed, 185 insertions(+), 134 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 07782fc179..75031796ba 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,18 @@ Changes are grouped as follows - `Fixed` for any bug fixes. - `Security` in case of vulnerabilities. +## [7.32.8] - 2024-03-21 +### Fixed +- When using TimeSeries objects without `external_id` as part of the `variables` parameter in a synthetic datapoints + query, a `CogniteNotFoundError` would most likely be raised, due to `None` being silently cast to a string. It now + raises a friendly `ValueError`. +- An invalid expression could be created when using multiple variables in a synthetic datapoints query. This happened + while substituting the variables into the expression; this was done one at a time, leading to later replacements + possibly affecting earlier ones. Now all variables are substituted at the same time/in a single call. +### Improved +- Passing sympy symbols as part of the variables mapping (in synthetic datapoints queries) is now documented properly + and "officially supported". + ## [7.32.7] - 2024-04-05 ### Fixed - Inserting sequence data using `insert_dataframe` would by default drop all rows that contained at least one missing value. @@ -58,16 +70,14 @@ Changes are grouped as follows ### Added - Retrieve method for session, `client.iam.session.retrieve` - The parameter `limit` to the method `client.iam.session.list`. - ### Fixed - The method `client.iam.session.revoke` is now overloaded correctly and returns a `Session` for single id and a `SessionList` for multiple ids. ## [7.30.1] - 2024-03-23 ### Fixed -- When calling `client.sequences.data.retrieve` in a Jupyter Notebook the returning `SequenceRowsList` would raise - an `AttributeError: 'dict' object has no attribute '_repr_html_'`, i.e., the HTML representation of `SequenceRowsList` - was failing. This is now fixed. +- When calling `client.sequences.data.retrieve` in a Jupyter Notebook the returning `SequenceRowsList` no longer raises + `AttributeError: 'dict' object has no attribute '_repr_html_'` (the HTML representation of `SequenceRowsList` was failing). ## [7.30.0] - 2024-03-20 ### Added @@ -80,8 +90,7 @@ Changes are grouped as follows ## [7.28.2] - 2024-03-14 ### Fixed -- Retrieving more than 100 containers, views, data models, or spaces would raise a - `CogniteAPIError`. This is now fixed. +- Retrieving more than 100 containers, views, data models, or spaces no longer raises a `CogniteAPIError`. ## [7.28.1] - 2024-03-13 ### Fixed diff --git a/cognite/client/_api/synthetic_time_series.py b/cognite/client/_api/synthetic_time_series.py index 8739b68fae..8baa3b0f39 100644 --- a/cognite/client/_api/synthetic_time_series.py +++ b/cognite/client/_api/synthetic_time_series.py @@ -2,10 +2,14 @@ import re from datetime import datetime -from typing import TYPE_CHECKING, Any, cast +from functools import cached_property +from types import MappingProxyType +from typing import TYPE_CHECKING, Any, Sequence, Union, cast from cognite.client._api_client import APIClient -from cognite.client.data_classes import Datapoints, DatapointsList, TimeSeries +from cognite.client.data_classes import Datapoints, DatapointsList, TimeSeries, TimeSeriesWrite +from cognite.client.data_classes.time_series import TimeSeriesCore +from cognite.client.utils._auxiliary import is_unlimited from cognite.client.utils._concurrency import execute_tasks from cognite.client.utils._importing import local_import from cognite.client.utils._time import timestamp_to_ms @@ -27,11 +31,11 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client def query( self, - expressions: str | sympy.Expr | SequenceNotStr[str | sympy.Expr], + expressions: str | sympy.Basic | Sequence[str | sympy.Basic], start: int | str | datetime, end: int | str | datetime, limit: int | None = None, - variables: dict[str, str | TimeSeries] | None = None, + variables: dict[str | sympy.Symbol, str | TimeSeries | TimeSeriesWrite] | None = None, aggregate: str | None = None, granularity: str | None = None, target_unit: str | None = None, @@ -40,11 +44,11 @@ def query( """`Calculate the result of a function on time series. `_ Args: - expressions (str | sympy.Expr | SequenceNotStr[str | sympy.Expr]): Functions to be calculated. Supports both strings and sympy expressions. Strings can have either the API `ts{}` syntax, or contain variable names to be replaced using the `variables` parameter. + expressions (str | sympy.Basic | Sequence[str | sympy.Basic]): Functions to be calculated. Supports both strings and sympy expressions. Strings can have either the API `ts{}` syntax, or contain variable names to be replaced using the `variables` parameter. start (int | str | datetime): Inclusive start. - end (int | str | datetime): Exclusive end + end (int | str | datetime): Exclusive end. limit (int | None): Number of datapoints per expression to retrieve. - variables (dict[str, str | TimeSeries] | None): An optional map of symbol replacements. + variables (dict[str | sympy.Symbol, str | TimeSeries | TimeSeriesWrite] | None): An optional map of symbol replacements. aggregate (str | None): use this aggregate when replacing entries from `variables`, does not affect time series given in the `ts{}` syntax. granularity (str | None): use this granularity with the aggregate. target_unit (str | None): use this target_unit when replacing entries from `variables`, does not affect time series given in the `ts{}` syntax. @@ -55,46 +59,55 @@ def query( Examples: - Request a synthetic time series query with direct syntax + Request a synthetic time series query with direct syntax: >>> from cognite.client import CogniteClient >>> client = CogniteClient() - >>> dps = client.time_series.data.synthetic.query(expressions="TS{id:123} + TS{externalId:'abc'}", start="2w-ago", end="now") + >>> dps = client.time_series.data.synthetic.query( + ... expressions="ts{id:123} + ts{externalId:'abc'}", + ... start="2w-ago", + ... end="now") Use variables to re-use an expression: - >>> vars = {"A": "my_ts_external_id", "B": client.time_series.retrieve(id=1)} - >>> dps = client.time_series.data.synthetic.query(expressions="A+B", start="2w-ago", end="now", variables=vars) + >>> ts = client.time_series.retrieve(id=123) + >>> variables = {"A": ts, "B": "my_ts_external_id"} + >>> dps = client.time_series.data.synthetic.query( + ... expressions="A+B", start="2w-ago", end="now", variables=variables) Use sympy to build complex expressions: >>> from sympy import symbols, cos, sin - >>> a = symbols('a') - >>> dps = client.time_series.data.synthetic.query([sin(a), cos(a)], start="2w-ago", end="now", variables={"a": "my_ts_external_id"}, aggregate='interpolation', granularity='1m', target_unit='temperature:deg_c') + >>> x, y = symbols("x y") + >>> dps = client.time_series.data.synthetic.query( + ... [sin(x), y*cos(x)], + ... start="2w-ago", + ... end="now", + ... variables={x: "foo", y: "bar"}, + ... aggregate="interpolation", + ... granularity="15m", + ... target_unit="temperature:deg_c") """ - if limit is None or limit == -1: + if is_unlimited(limit): limit = cast(int, float("inf")) - tasks = [] - expressions_to_iterate = expressions if isinstance(expressions, SequenceNotStr) else [expressions] + if single_expr := not isinstance(expressions, SequenceNotStr): + expressions = [cast(Union[str, "sympy.Basic"], expressions)] - for exp in expressions_to_iterate: + tasks = [] + for user_expr in cast(Sequence[Union[str, "sympy.Basic"]], expressions): expression, short_expression = self._build_expression( - exp, variables, aggregate, granularity, target_unit, target_unit_system + user_expr, variables, aggregate, granularity, target_unit, target_unit_system ) query = {"expression": expression, "start": timestamp_to_ms(start), "end": timestamp_to_ms(end)} - values: list[float] = [] # mypy - query_datapoints = Datapoints(value=values, error=[]) - query_datapoints.external_id = short_expression - + query_datapoints = Datapoints(external_id=short_expression, value=[], error=[]) tasks.append((query, query_datapoints, limit)) datapoints_summary = execute_tasks(self._fetch_datapoints, tasks, max_workers=self._config.max_workers) datapoints_summary.raise_compound_exception_if_failed_tasks() - return ( DatapointsList(datapoints_summary.results, cognite_client=self._cognite_client) - if isinstance(expressions, list) + if not single_expr else datapoints_summary.results[0] ) @@ -104,85 +117,102 @@ def _fetch_datapoints(self, query: dict[str, Any], datapoints: Datapoints, limit resp = self._post(url_path=self._RESOURCE_PATH + "/query", json={"items": [query]}) data = resp.json()["items"][0] datapoints._extend(Datapoints._load(data, expected_fields=["value", "error"])) - limit -= len(data["datapoints"]) - if len(data["datapoints"]) < self._DPS_LIMIT_SYNTH or limit <= 0: + limit -= (n_fetched := len(data["datapoints"])) + if n_fetched < self._DPS_LIMIT_SYNTH or limit <= 0: break query["start"] = data["datapoints"][-1]["timestamp"] + 1 return datapoints - @staticmethod def _build_expression( - expression: str | sympy.Expr, - variables: dict[str, Any] | None = None, + self, + expression: str | sympy.Basic, + variables: dict[str | sympy.Symbol, str | TimeSeries | TimeSeriesWrite] | None = None, aggregate: str | None = None, granularity: str | None = None, target_unit: str | None = None, target_unit_system: str | None = None, ) -> tuple[str, str]: - if expression.__class__.__module__.startswith("sympy."): - expression_str = SyntheticDatapointsAPI._sympy_to_sts(expression) - if not variables: + if getattr(expression, "__sympy__", False) is True: + if variables: + expression_str = self._process_sympy_expression(cast("sympy.Basic", expression)) + else: raise ValueError( "sympy expressions are only supported in combination with the `variables` parameter to map symbols to time series." ) + elif isinstance(expression, str): + expression_str = expression else: - expression_str = cast(str, expression) + raise TypeError(f"expression must be str or a sympy expression, not {type(expression)}") + if aggregate and granularity: aggregate_str = f",aggregate:'{aggregate}',granularity:'{granularity}'" - else: + elif not aggregate and not granularity: aggregate_str = "" + else: + raise ValueError("Pass either both of 'aggregate' and 'granularity', or neither") + + target_unit_str = "" if target_unit: if target_unit_system: - raise ValueError("Only one of targetUnit and targetUnitSystem can be specified.") + raise ValueError("Only one of 'target_unit' and 'target_unit_system' can be specified.") target_unit_str = f",targetUnit:'{target_unit}'" elif target_unit_system: target_unit_str = f",targetUnitSystem:'{target_unit_system}'" - else: - target_unit_str = "" - expression_with_ts: str = expression_str - if variables: - for k, v in variables.items(): - if isinstance(v, TimeSeries): - v = v.external_id - expression_with_ts = re.sub( - re.compile(rf"\b{k}\b"), - f"ts{{externalId:'{v}'{aggregate_str}{target_unit_str}}}", - expression_with_ts, - ) + + if not variables: + return expression_str, expression_str + + to_substitute = {} + for k, v in variables.items(): + if isinstance(v, TimeSeriesCore): + if v.external_id is None: + raise ValueError(f"TimeSeries passed in 'variables' is missing required field 'external_id' ({v})") + v = v.external_id + # We convert to str to ensure any sympy.Symbol is replaced with its name: + to_substitute[re.escape(str(k))] = f"ts{{externalId:'{v}'{aggregate_str}{target_unit_str}}}" + + # Substitute all variables in one go to avoid substitution of prior substitutions: + pattern = re.compile(r"\b" + r"\b|\b".join(to_substitute) + r"\b") # note: \b marks a word boundary + expression_with_ts = pattern.sub(lambda match: to_substitute[match[0]], expression_str) return expression_with_ts, expression_str - @staticmethod - def _sympy_to_sts(expression: str | sympy.Expr) -> str: - sympy_module = local_import("sympy") - - infix_ops = {sympy_module.Add: "+", sympy_module.Mul: "*"} - functions = { - sympy_module.cos: "cos", - sympy_module.sin: "sin", - sympy_module.sqrt: "sqrt", - sympy_module.log: "ln", - sympy_module.exp: "exp", - sympy_module.Abs: "abs", - } - - def process_symbol(sym: Any) -> str: - if isinstance(sym, sympy_module.AtomicExpr): - if isinstance(sym, sympy_module.NumberSymbol): - return str(sym.evalf(15)) - else: - return str(sym) - - infixop = infix_ops.get(sym.__class__) - if infixop: - return "(" + infixop.join(process_symbol(s) for s in sym.args) + ")" - - if isinstance(sym, sympy_module.Pow): - if sym.args[1] == -1: - return f"(1/{process_symbol(sym.args[0])})" - return f"pow({','.join(map(process_symbol, sym.args))})" - - if funop := functions.get(sym.__class__): - return f"{funop}({','.join(map(process_symbol, sym.args))})" - raise ValueError(f"Unsupported sympy class {sym.__class__} encountered in expression") - - return process_symbol(expression) + @cached_property + def _supported_sympy_infix_ops(self) -> MappingProxyType[type[sympy.Basic], str]: + sympy = local_import("sympy") + return MappingProxyType({sympy.Add: "+", sympy.Mul: "*"}) + + @cached_property + def _supported_sympy_functions(self) -> MappingProxyType[type[sympy.Basic], str]: + sympy = local_import("sympy") + return MappingProxyType( + { + sympy.cos: "cos", + sympy.sin: "sin", + sympy.sqrt: "sqrt", + sympy.log: "ln", + sympy.exp: "exp", + sympy.Abs: "abs", + } + ) + + def _process_sympy_expression(self, expression: sympy.Basic) -> str: + sympy = local_import("sympy") + + if isinstance(expression, sympy.AtomicExpr): + if isinstance(expression, sympy.NumberSymbol): + return str(expression.evalf(15)).rstrip("0") + else: + return str(expression).rstrip("0") + + expr_cls = type(expression) + if infix_op := self._supported_sympy_infix_ops.get(expr_cls): + return "(" + infix_op.join(self._process_sympy_expression(s) for s in expression.args) + ")" + + if isinstance(expression, sympy.Pow): + if expression.args[1] == -1: + return f"(1/{self._process_sympy_expression(expression.args[0])})" + return f"pow({','.join(map(self._process_sympy_expression, expression.args))})" + + if fn_op := self._supported_sympy_functions.get(expr_cls): + return f"{fn_op}({','.join(map(self._process_sympy_expression, expression.args))})" + raise TypeError(f"Unsupported sympy class {expr_cls} encountered in expression") diff --git a/cognite/client/_version.py b/cognite/client/_version.py index 04bf50cf7c..1f296cd459 100644 --- a/cognite/client/_version.py +++ b/cognite/client/_version.py @@ -1,4 +1,4 @@ from __future__ import annotations -__version__ = "7.32.7" +__version__ = "7.32.8" __api_subversion__ = "20230101" diff --git a/pyproject.toml b/pyproject.toml index 051b529bd5..3218df5c1e 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,7 +1,7 @@ [tool.poetry] name = "cognite-sdk" -version = "7.32.7" +version = "7.32.8" description = "Cognite Python SDK" readme = "README.md" documentation = "https://cognite-sdk-python.readthedocs-hosted.com" diff --git a/tests/tests_integration/test_api/test_synthetic_time_series.py b/tests/tests_integration/test_api/test_synthetic_time_series.py index 69ff072393..02bc18598e 100644 --- a/tests/tests_integration/test_api/test_synthetic_time_series.py +++ b/tests/tests_integration/test_api/test_synthetic_time_series.py @@ -51,7 +51,7 @@ def test_query_with_multiple_expressions(self, cognite_client, test_time_series, assert 23456 == len(dps[1]) assert 6 == cognite_client.time_series.data.synthetic._post.call_count - def test_query_with_errors(self, cognite_client, test_time_series, post_spy): + def test_query_using_time_series_objs__with_errors(self, cognite_client, test_time_series, post_spy): dps = cognite_client.time_series.data.synthetic.query( expressions=["A / (B - B)"], start=datetime(2017, 1, 1), @@ -66,6 +66,21 @@ def test_query_with_errors(self, cognite_client, test_time_series, post_spy): assert (100, 1) == dps.to_pandas().shape assert (100, 2) == dps.to_pandas(include_errors=True).shape + def test_query_using_time_series_objs__missing_external_id(self, cognite_client, test_time_series): + (whoopsie_ts := test_time_series[1].as_write()).external_id = None + # Before SDK version 7.32.8, when a passed TimeSeries missing external_id was passed, None + # was just cast to string and passed to the API, most likely leading to a "not found" error + with pytest.raises( + ValueError, match="^TimeSeries passed in 'variables' is missing required field 'external_id'" + ): + cognite_client.time_series.data.synthetic.query( + expressions="A / B", + start=datetime(2017, 1, 1), + end="now", + limit=100, + variables={"A": test_time_series[0], "B": whoopsie_ts}, + ) + @pytest.mark.dsl def test_expression_builder_time_series_vs_string(self, cognite_client, test_time_series): from sympy import symbols @@ -90,28 +105,30 @@ def test_expression_builder_time_series_vs_string(self, cognite_client, test_tim assert isinstance(dps1, Datapoints) assert isinstance(dps2, DatapointsList) - @pytest.mark.skip("flaky") @pytest.mark.dsl def test_expression_builder_complex(self, cognite_client, test_time_series): - from sympy import cos, log, pi, sin, sqrt, symbols + from sympy import Abs, cos, log, pi, sin, sqrt, symbols - abc = list("abcdefghij") - syms = symbols(abc) - expression = syms[0] - for s in syms: - expression = expression + s + string_symbols = list("abcdefghij") + syms = symbols(string_symbols) expression = ( - (expression * expression) + sum(syms) ** 2 + sqrt(sin(pi * 0.1 ** syms[1])) + log(23 + syms[5] ** 1.234) + cos(syms[3] ** (1 + 0.1 ** syms[4])) - + sqrt(log(abs(syms[8]) + 1)) + + sqrt(log(Abs(syms[8]) + 1)) ) - dps1 = cognite_client.time_series.data.synthetic.query( - expressions=[expression], - start=datetime(2017, 1, 1), - end="now", - limit=100, - variables={v: test_time_series[tsi] for v, tsi in zip(abc, range(10))}, - )[0] - assert 100 == len(dps1) + symbolic_vars = {sym: ts for sym, ts in zip(syms, test_time_series.values())} + string_variables = {ss: ts for ss, ts in zip(string_symbols, test_time_series.values())} + + for variables in symbolic_vars, string_variables: + dps1 = cognite_client.time_series.data.synthetic.query( + expressions=expression, + start=datetime(2017, 1, 1, tzinfo=timezone.utc), + end="now", + limit=100, + variables=variables, + aggregate="average", + granularity="3s", + ) + assert 100 == len(dps1) diff --git a/tests/tests_unit/test_api/test_synthetic_time_series.py b/tests/tests_unit/test_api/test_synthetic_time_series.py index 226940e6dd..969447f131 100644 --- a/tests/tests_unit/test_api/test_synthetic_time_series.py +++ b/tests/tests_unit/test_api/test_synthetic_time_series.py @@ -9,13 +9,7 @@ def generate_datapoints(start: int, end: int, granularity=1): - dps = [] - for i in range(start, end, granularity): - dp = {} - dp["value"] = random() - dp["timestamp"] = i - dps.append(dp) - return dps + return [{"value": random(), "timestamp": i} for i in range(start, end, granularity)] @pytest.fixture @@ -94,36 +88,37 @@ def test_query_empty(self, cognite_client, mock_get_datapoints_empty): def test_expression_builder(self, cognite_client): from sympy import symbols - assert ("ts{externalId:'x'}", "a") == cognite_client.time_series.data.synthetic._build_expression( - symbols("a"), {"a": "x"} - ) + build_fn = cognite_client.time_series.data.synthetic._build_expression + assert ("ts{externalId:'x'}", "a") == build_fn(symbols("a"), {"a": "x"}) assert ( "ts{externalId:'x',aggregate:'average',granularity:'1m'}", "a", - ) == cognite_client.time_series.data.synthetic._build_expression( - symbols("a"), {"a": "x"}, aggregate="average", granularity="1m" - ) + ) == build_fn(symbols("a"), {"a": "x"}, aggregate="average", granularity="1m") assert ( "(ts{externalId:'x'}+ts{externalId:'y'}+ts{externalId:'z'})", "(a+b+c)", - ) == cognite_client.time_series.data.synthetic._build_expression( - symbols("a") + symbols("b") + symbols("c"), {"a": "x", "b": "y", "c": "z"} - ) - assert ("(1/ts{externalId:'a'})", "(1/a)") == cognite_client.time_series.data.synthetic._build_expression( - 1 / symbols("a"), {"a": "a"} - ) + ) == build_fn(symbols("a") + symbols("b") + symbols("c"), {"a": "x", "b": "y", "c": "z"}) + assert ("(1/ts{externalId:'a'})", "(1/a)") == build_fn(1 / symbols("a"), {"a": "a"}) assert ( "ts{externalId:'x',targetUnit:'temperature:deg_c'}", "a", - ) == cognite_client.time_series.data.synthetic._build_expression( - symbols("a"), {"a": "x"}, target_unit="temperature:deg_c" - ) + ) == build_fn(symbols("a"), {"a": "x"}, target_unit="temperature:deg_c") assert ( "ts{externalId:'x',targetUnitSystem:'Imperial'}", "a", - ) == cognite_client.time_series.data.synthetic._build_expression( - symbols("a"), {"a": "x"}, target_unit_system="Imperial" - ) + ) == build_fn(symbols("a"), {"a": "x"}, target_unit_system="Imperial") + + @pytest.mark.dsl + def test_expression_builder__overlapping(self, cognite_client): + # Before SDK version 7.30.1, variable replacements were done one-by-one, which could mean + # that a later replacement would affect an earlier one. + from sympy import symbols + + build_fn = cognite_client.time_series.data.synthetic._build_expression + x, y = symbols("x y") + long_expr, short_expr = build_fn(x + y, {x: "test-x-y-z", y: "foo"}) + assert short_expr == "(x+y)" + assert long_expr == "(ts{externalId:'test-x-y-z'}+ts{externalId:'foo'})", "(x+y)" @pytest.mark.dsl def test_expression_builder_variables_missing(self, cognite_client): @@ -138,7 +133,7 @@ def test_expression_builder_variables_missing(self, cognite_client): def test_expression_builder_unsupported_missing(self, cognite_client): from sympy import cot, symbols - with pytest.raises(ValueError, match="Unsupported sympy class cot"): + with pytest.raises(TypeError, match="^Unsupported sympy class cot"): cognite_client.time_series.data.synthetic.query( [symbols("a") + cot(symbols("a"))], start=0, end="now", variables={"a": "a"} )