Skip to content

Commit

Permalink
Bugfixes: Synthetic Datapoints API (#1683)
Browse files Browse the repository at this point in the history
  • Loading branch information
haakonvt authored Apr 8, 2024
1 parent c50ab29 commit cdb7245
Show file tree
Hide file tree
Showing 6 changed files with 185 additions and 134 deletions.
21 changes: 15 additions & 6 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,18 @@ Changes are grouped as follows
- `Fixed` for any bug fixes.
- `Security` in case of vulnerabilities.

## [7.32.8] - 2024-03-21
### Fixed
- When using TimeSeries objects without `external_id` as part of the `variables` parameter in a synthetic datapoints
query, a `CogniteNotFoundError` would most likely be raised, due to `None` being silently cast to a string. It now
raises a friendly `ValueError`.
- An invalid expression could be created when using multiple variables in a synthetic datapoints query. This happened
while substituting the variables into the expression; this was done one at a time, leading to later replacements
possibly affecting earlier ones. Now all variables are substituted at the same time/in a single call.
### Improved
- Passing sympy symbols as part of the variables mapping (in synthetic datapoints queries) is now documented properly
and "officially supported".

## [7.32.7] - 2024-04-05
### Fixed
- Inserting sequence data using `insert_dataframe` would by default drop all rows that contained at least one missing value.
Expand Down Expand Up @@ -58,16 +70,14 @@ Changes are grouped as follows
### Added
- Retrieve method for session, `client.iam.session.retrieve`
- The parameter `limit` to the method `client.iam.session.list`.

### Fixed
- The method `client.iam.session.revoke` is now overloaded correctly and returns a `Session` for single id
and a `SessionList` for multiple ids.

## [7.30.1] - 2024-03-23
### Fixed
- When calling `client.sequences.data.retrieve` in a Jupyter Notebook the returning `SequenceRowsList` would raise
an `AttributeError: 'dict' object has no attribute '_repr_html_'`, i.e., the HTML representation of `SequenceRowsList`
was failing. This is now fixed.
- When calling `client.sequences.data.retrieve` in a Jupyter Notebook the returning `SequenceRowsList` no longer raises
`AttributeError: 'dict' object has no attribute '_repr_html_'` (the HTML representation of `SequenceRowsList` was failing).

## [7.30.0] - 2024-03-20
### Added
Expand All @@ -80,8 +90,7 @@ Changes are grouped as follows

## [7.28.2] - 2024-03-14
### Fixed
- Retrieving more than 100 containers, views, data models, or spaces would raise a
`CogniteAPIError`. This is now fixed.
- Retrieving more than 100 containers, views, data models, or spaces no longer raises a `CogniteAPIError`.

## [7.28.1] - 2024-03-13
### Fixed
Expand Down
194 changes: 112 additions & 82 deletions cognite/client/_api/synthetic_time_series.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@

import re
from datetime import datetime
from typing import TYPE_CHECKING, Any, cast
from functools import cached_property
from types import MappingProxyType
from typing import TYPE_CHECKING, Any, Sequence, Union, cast

from cognite.client._api_client import APIClient
from cognite.client.data_classes import Datapoints, DatapointsList, TimeSeries
from cognite.client.data_classes import Datapoints, DatapointsList, TimeSeries, TimeSeriesWrite
from cognite.client.data_classes.time_series import TimeSeriesCore
from cognite.client.utils._auxiliary import is_unlimited
from cognite.client.utils._concurrency import execute_tasks
from cognite.client.utils._importing import local_import
from cognite.client.utils._time import timestamp_to_ms
Expand All @@ -27,11 +31,11 @@ def __init__(self, config: ClientConfig, api_version: str | None, cognite_client

def query(
self,
expressions: str | sympy.Expr | SequenceNotStr[str | sympy.Expr],
expressions: str | sympy.Basic | Sequence[str | sympy.Basic],
start: int | str | datetime,
end: int | str | datetime,
limit: int | None = None,
variables: dict[str, str | TimeSeries] | None = None,
variables: dict[str | sympy.Symbol, str | TimeSeries | TimeSeriesWrite] | None = None,
aggregate: str | None = None,
granularity: str | None = None,
target_unit: str | None = None,
Expand All @@ -40,11 +44,11 @@ def query(
"""`Calculate the result of a function on time series. <https://developer.cognite.com/api#tag/Synthetic-Time-Series/operation/querySyntheticTimeseries>`_
Args:
expressions (str | sympy.Expr | SequenceNotStr[str | sympy.Expr]): Functions to be calculated. Supports both strings and sympy expressions. Strings can have either the API `ts{}` syntax, or contain variable names to be replaced using the `variables` parameter.
expressions (str | sympy.Basic | Sequence[str | sympy.Basic]): Functions to be calculated. Supports both strings and sympy expressions. Strings can have either the API `ts{}` syntax, or contain variable names to be replaced using the `variables` parameter.
start (int | str | datetime): Inclusive start.
end (int | str | datetime): Exclusive end
end (int | str | datetime): Exclusive end.
limit (int | None): Number of datapoints per expression to retrieve.
variables (dict[str, str | TimeSeries] | None): An optional map of symbol replacements.
variables (dict[str | sympy.Symbol, str | TimeSeries | TimeSeriesWrite] | None): An optional map of symbol replacements.
aggregate (str | None): use this aggregate when replacing entries from `variables`, does not affect time series given in the `ts{}` syntax.
granularity (str | None): use this granularity with the aggregate.
target_unit (str | None): use this target_unit when replacing entries from `variables`, does not affect time series given in the `ts{}` syntax.
Expand All @@ -55,46 +59,55 @@ def query(
Examples:
Request a synthetic time series query with direct syntax
Request a synthetic time series query with direct syntax:
>>> from cognite.client import CogniteClient
>>> client = CogniteClient()
>>> dps = client.time_series.data.synthetic.query(expressions="TS{id:123} + TS{externalId:'abc'}", start="2w-ago", end="now")
>>> dps = client.time_series.data.synthetic.query(
... expressions="ts{id:123} + ts{externalId:'abc'}",
... start="2w-ago",
... end="now")
Use variables to re-use an expression:
>>> vars = {"A": "my_ts_external_id", "B": client.time_series.retrieve(id=1)}
>>> dps = client.time_series.data.synthetic.query(expressions="A+B", start="2w-ago", end="now", variables=vars)
>>> ts = client.time_series.retrieve(id=123)
>>> variables = {"A": ts, "B": "my_ts_external_id"}
>>> dps = client.time_series.data.synthetic.query(
... expressions="A+B", start="2w-ago", end="now", variables=variables)
Use sympy to build complex expressions:
>>> from sympy import symbols, cos, sin
>>> a = symbols('a')
>>> dps = client.time_series.data.synthetic.query([sin(a), cos(a)], start="2w-ago", end="now", variables={"a": "my_ts_external_id"}, aggregate='interpolation', granularity='1m', target_unit='temperature:deg_c')
>>> x, y = symbols("x y")
>>> dps = client.time_series.data.synthetic.query(
... [sin(x), y*cos(x)],
... start="2w-ago",
... end="now",
... variables={x: "foo", y: "bar"},
... aggregate="interpolation",
... granularity="15m",
... target_unit="temperature:deg_c")
"""
if limit is None or limit == -1:
if is_unlimited(limit):
limit = cast(int, float("inf"))

tasks = []
expressions_to_iterate = expressions if isinstance(expressions, SequenceNotStr) else [expressions]
if single_expr := not isinstance(expressions, SequenceNotStr):
expressions = [cast(Union[str, "sympy.Basic"], expressions)]

for exp in expressions_to_iterate:
tasks = []
for user_expr in cast(Sequence[Union[str, "sympy.Basic"]], expressions):
expression, short_expression = self._build_expression(
exp, variables, aggregate, granularity, target_unit, target_unit_system
user_expr, variables, aggregate, granularity, target_unit, target_unit_system
)
query = {"expression": expression, "start": timestamp_to_ms(start), "end": timestamp_to_ms(end)}
values: list[float] = [] # mypy
query_datapoints = Datapoints(value=values, error=[])
query_datapoints.external_id = short_expression

query_datapoints = Datapoints(external_id=short_expression, value=[], error=[])
tasks.append((query, query_datapoints, limit))

datapoints_summary = execute_tasks(self._fetch_datapoints, tasks, max_workers=self._config.max_workers)
datapoints_summary.raise_compound_exception_if_failed_tasks()

return (
DatapointsList(datapoints_summary.results, cognite_client=self._cognite_client)
if isinstance(expressions, list)
if not single_expr
else datapoints_summary.results[0]
)

Expand All @@ -104,85 +117,102 @@ def _fetch_datapoints(self, query: dict[str, Any], datapoints: Datapoints, limit
resp = self._post(url_path=self._RESOURCE_PATH + "/query", json={"items": [query]})
data = resp.json()["items"][0]
datapoints._extend(Datapoints._load(data, expected_fields=["value", "error"]))
limit -= len(data["datapoints"])
if len(data["datapoints"]) < self._DPS_LIMIT_SYNTH or limit <= 0:
limit -= (n_fetched := len(data["datapoints"]))
if n_fetched < self._DPS_LIMIT_SYNTH or limit <= 0:
break
query["start"] = data["datapoints"][-1]["timestamp"] + 1
return datapoints

@staticmethod
def _build_expression(
expression: str | sympy.Expr,
variables: dict[str, Any] | None = None,
self,
expression: str | sympy.Basic,
variables: dict[str | sympy.Symbol, str | TimeSeries | TimeSeriesWrite] | None = None,
aggregate: str | None = None,
granularity: str | None = None,
target_unit: str | None = None,
target_unit_system: str | None = None,
) -> tuple[str, str]:
if expression.__class__.__module__.startswith("sympy."):
expression_str = SyntheticDatapointsAPI._sympy_to_sts(expression)
if not variables:
if getattr(expression, "__sympy__", False) is True:
if variables:
expression_str = self._process_sympy_expression(cast("sympy.Basic", expression))
else:
raise ValueError(
"sympy expressions are only supported in combination with the `variables` parameter to map symbols to time series."
)
elif isinstance(expression, str):
expression_str = expression
else:
expression_str = cast(str, expression)
raise TypeError(f"expression must be str or a sympy expression, not {type(expression)}")

if aggregate and granularity:
aggregate_str = f",aggregate:'{aggregate}',granularity:'{granularity}'"
else:
elif not aggregate and not granularity:
aggregate_str = ""
else:
raise ValueError("Pass either both of 'aggregate' and 'granularity', or neither")

target_unit_str = ""
if target_unit:
if target_unit_system:
raise ValueError("Only one of targetUnit and targetUnitSystem can be specified.")
raise ValueError("Only one of 'target_unit' and 'target_unit_system' can be specified.")
target_unit_str = f",targetUnit:'{target_unit}'"
elif target_unit_system:
target_unit_str = f",targetUnitSystem:'{target_unit_system}'"
else:
target_unit_str = ""
expression_with_ts: str = expression_str
if variables:
for k, v in variables.items():
if isinstance(v, TimeSeries):
v = v.external_id
expression_with_ts = re.sub(
re.compile(rf"\b{k}\b"),
f"ts{{externalId:'{v}'{aggregate_str}{target_unit_str}}}",
expression_with_ts,
)

if not variables:
return expression_str, expression_str

to_substitute = {}
for k, v in variables.items():
if isinstance(v, TimeSeriesCore):
if v.external_id is None:
raise ValueError(f"TimeSeries passed in 'variables' is missing required field 'external_id' ({v})")
v = v.external_id
# We convert to str to ensure any sympy.Symbol is replaced with its name:
to_substitute[re.escape(str(k))] = f"ts{{externalId:'{v}'{aggregate_str}{target_unit_str}}}"

# Substitute all variables in one go to avoid substitution of prior substitutions:
pattern = re.compile(r"\b" + r"\b|\b".join(to_substitute) + r"\b") # note: \b marks a word boundary
expression_with_ts = pattern.sub(lambda match: to_substitute[match[0]], expression_str)
return expression_with_ts, expression_str

@staticmethod
def _sympy_to_sts(expression: str | sympy.Expr) -> str:
sympy_module = local_import("sympy")

infix_ops = {sympy_module.Add: "+", sympy_module.Mul: "*"}
functions = {
sympy_module.cos: "cos",
sympy_module.sin: "sin",
sympy_module.sqrt: "sqrt",
sympy_module.log: "ln",
sympy_module.exp: "exp",
sympy_module.Abs: "abs",
}

def process_symbol(sym: Any) -> str:
if isinstance(sym, sympy_module.AtomicExpr):
if isinstance(sym, sympy_module.NumberSymbol):
return str(sym.evalf(15))
else:
return str(sym)

infixop = infix_ops.get(sym.__class__)
if infixop:
return "(" + infixop.join(process_symbol(s) for s in sym.args) + ")"

if isinstance(sym, sympy_module.Pow):
if sym.args[1] == -1:
return f"(1/{process_symbol(sym.args[0])})"
return f"pow({','.join(map(process_symbol, sym.args))})"

if funop := functions.get(sym.__class__):
return f"{funop}({','.join(map(process_symbol, sym.args))})"
raise ValueError(f"Unsupported sympy class {sym.__class__} encountered in expression")

return process_symbol(expression)
@cached_property
def _supported_sympy_infix_ops(self) -> MappingProxyType[type[sympy.Basic], str]:
sympy = local_import("sympy")
return MappingProxyType({sympy.Add: "+", sympy.Mul: "*"})

@cached_property
def _supported_sympy_functions(self) -> MappingProxyType[type[sympy.Basic], str]:
sympy = local_import("sympy")
return MappingProxyType(
{
sympy.cos: "cos",
sympy.sin: "sin",
sympy.sqrt: "sqrt",
sympy.log: "ln",
sympy.exp: "exp",
sympy.Abs: "abs",
}
)

def _process_sympy_expression(self, expression: sympy.Basic) -> str:
sympy = local_import("sympy")

if isinstance(expression, sympy.AtomicExpr):
if isinstance(expression, sympy.NumberSymbol):
return str(expression.evalf(15)).rstrip("0")
else:
return str(expression).rstrip("0")

expr_cls = type(expression)
if infix_op := self._supported_sympy_infix_ops.get(expr_cls):
return "(" + infix_op.join(self._process_sympy_expression(s) for s in expression.args) + ")"

if isinstance(expression, sympy.Pow):
if expression.args[1] == -1:
return f"(1/{self._process_sympy_expression(expression.args[0])})"
return f"pow({','.join(map(self._process_sympy_expression, expression.args))})"

if fn_op := self._supported_sympy_functions.get(expr_cls):
return f"{fn_op}({','.join(map(self._process_sympy_expression, expression.args))})"
raise TypeError(f"Unsupported sympy class {expr_cls} encountered in expression")
2 changes: 1 addition & 1 deletion cognite/client/_version.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
from __future__ import annotations

__version__ = "7.32.7"
__version__ = "7.32.8"
__api_subversion__ = "20230101"
2 changes: 1 addition & 1 deletion pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
[tool.poetry]
name = "cognite-sdk"

version = "7.32.7"
version = "7.32.8"
description = "Cognite Python SDK"
readme = "README.md"
documentation = "https://cognite-sdk-python.readthedocs-hosted.com"
Expand Down
Loading

0 comments on commit cdb7245

Please sign in to comment.