diff --git a/TM1py/Objects/Cube.py b/TM1py/Objects/Cube.py index 52632464..f4119776 100644 --- a/TM1py/Objects/Cube.py +++ b/TM1py/Objects/Cube.py @@ -112,6 +112,6 @@ def _construct_body(self) -> str: body_as_dict['Dimensions@odata.bind'] = [format_url("Dimensions('{}')", dimension) for dimension in self.dimensions] - if self.has_rules: + if self.rules.text: body_as_dict['Rules'] = str(self.rules) return json.dumps(body_as_dict, ensure_ascii=False) diff --git a/TM1py/Objects/Rules.py b/TM1py/Objects/Rules.py index 19f61386..ec4b18e8 100644 --- a/TM1py/Objects/Rules.py +++ b/TM1py/Objects/Rules.py @@ -3,6 +3,7 @@ from TM1py.Objects.TM1Object import TM1Object +KEYWORDS = ['SKIPCHECK', 'FEEDSTRINGS', 'UNDEFVALS', 'FEEDERS'] class Rules(TM1Object): """ @@ -13,7 +14,6 @@ class Rules(TM1Object): comments are not included. """ - KEYWORDS = ['SKIPCHECK', 'FEEDSTRINGS', 'UNDEFVALS', 'FEEDERS'] def __init__(self, rules: str): self._text = rules diff --git a/TM1py/Services/CubeService.py b/TM1py/Services/CubeService.py index fcf31325..de751ea3 100644 --- a/TM1py/Services/CubeService.py +++ b/TM1py/Services/CubeService.py @@ -4,8 +4,10 @@ from typing import List, Iterable, Dict from requests import Response +import base64 from TM1py.Objects.Cube import Cube +from TM1py.Objects.Rules import KEYWORDS from TM1py.Services.CellService import CellService from TM1py.Services.ObjectService import ObjectService from TM1py.Services.RestService import RestService @@ -231,7 +233,7 @@ def search_for_dimension(self, dimension_name: str, skip_control_cubes: bool = F def search_for_dimension_substring(self, substring: str, skip_control_cubes: bool = False, **kwargs) -> Dict[str, List[str]]: - """ Ask TM1 Server for a dictinary of cube names with the dimension whose name contains the substring + """ Ask TM1 Server for a dictionary of cube names with the dimension whose name contains the substring :param substring: string to search for in dim name :param skip_control_cubes: bool, True will exclude control cubes from result @@ -249,6 +251,85 @@ def search_for_dimension_substring(self, substring: str, skip_control_cubes: boo cube_dict = {entry['Name']: [dim['Name'] for dim in entry['Dimensions']] for entry in response.json()['value']} return cube_dict + def enable_cube_rule(self, cube: Cube, sections: List[str] = None) -> None: + """ + Enable a cube rule from its base64-encoded hash if it exists. + + :param cube: An instance of a Cube. + :param sections: a list of valid Rule sections (KEYWORDS) + """ + current_rule = cube.rules.text + if not current_rule: + # If there is no rule, there is nothing to do. + return + + prefix = "# B64 ENCODED " + + if not sections: + # Decode the entire rule + rule_prefix = f"{prefix}RULE=" + encoded_rule = current_rule[len(rule_prefix):] if current_rule.startswith(rule_prefix) else current_rule + cube.rules = base64.b64decode(encoded_rule).decode('utf-8') + else: + for section in [section.upper() for section in sections]: + if section not in KEYWORDS: + raise ValueError(f"{section} is not a valid value, only {KEYWORDS} are accepted.") + else: + new_rule = cube.rules.text.splitlines() + for i, line in enumerate(new_rule): + section_prefix = f"{prefix}{section}=".upper() + if line.upper().startswith(section_prefix): + encoded_section = line[len(section_prefix):] + new_rule[i] = base64.b64decode(encoded_section).decode('utf-8') + + cube.rules = "\n".join(new_rule) + + self.update(cube) + + def disable_cube_rule(self, cube: Cube, sections: List[str] = None) -> None: + """ + Disable a cube rule by saving its base64-encoded hash and commenting each line. + :param cube: An instance of a Cube. + :param sections: a list of valid Rule sections (KEYWORDS) + """ + current_rule = cube.rules.text + if not current_rule: + # If there is no rule, there is nothing to do. + return + + prefix = "# B64 ENCODED " + if not sections: + # Encode the entire rule + cube.rules = f"{prefix}RULE={base64.b64encode(current_rule.encode('utf-8')).decode('utf-8')}" + + else: + for section in [section.upper() for section in sections]: + + if section not in KEYWORDS: + raise ValueError(f"{section} is not a valid value, only {KEYWORDS} are accepted.") + + else: + + if section in ['FEEDSTRINGS', 'UNDEFVALS']: + rule = cube.rules.text.splitlines() + for i, line in enumerate(rule): + section_prefix = f"{prefix}{section}=" + if line.upper().startswith(section): + rule[i] = f"{section_prefix}{base64.b64encode(line.encode('utf-8')).decode('utf-8')}" + cube.rules = "\n".join(rule) + + else: + section_str = 'SKIPCHECK;' if section == 'SKIPCHECK' else 'FEEDERS;' + rule = cube.rules.text.splitlines() + section_starts = rule.index(section_str) + section_ends = rule.index('FEEDERS;') if 'FEEDERS;' in rule and section == 'SKIPCHECK' else len(rule) + section_body = "\n".join(rule[section_starts:section_ends]) + encoded_section = f"{prefix}{section}={base64.b64encode(section_body.encode('utf-8')).decode('utf-8')}" + rule[section_starts:section_ends] = [encoded_section] + cube.rules = "\n".join(rule) + + self.update(cube) + def search_for_rule_substring(self, substring: str, skip_control_cubes: bool = False, case_insensitive=True, space_insensitive=True, **kwargs) -> List[Cube]: """ get all cubes from TM1 Server as TM1py.Cube instances where rules for given cube contain specified substring @@ -405,4 +486,4 @@ def get_vmt(self, cube_name: str): def set_vmt(self, cube_name: str, vmt: int): url = format_url("/Cubes('{}')", cube_name) payload = {"ViewStorageMinTime": vmt} - response = self._rest.PATCH(url=url, data=json.dumps(payload)) \ No newline at end of file + response = self._rest.PATCH(url=url, data=json.dumps(payload)) diff --git a/Tests/CubeService_test.py b/Tests/CubeService_test.py index 0175f1b7..7ad9437a 100644 --- a/Tests/CubeService_test.py +++ b/Tests/CubeService_test.py @@ -1,3 +1,4 @@ +import base64 import configparser import unittest import uuid @@ -27,7 +28,7 @@ def setUp(self): # Connection to TM1 self.config = configparser.ConfigParser() self.config.read(Path(__file__).parent.joinpath('config.ini')) - self.tm1 = TM1Service(**self.config['tm1srv01']) + self.tm1 = TM1Service(**self.config['tm1srv04']) for dimension_name in self.dimension_names: elements = [Element('Element {}'.format(str(j)), 'Numeric') for j in range(1, 1001)] @@ -301,6 +302,37 @@ def test_get_measure_dimension(self): self.assertEqual(self.dimension_names[-1], measure_dimension) + def test_toggle_cube_rule(self): + uncommented = Rules("#comment1\nFEEDSTRINGS;\nUNDEFVALS;\n#comment2\nSKIPCHECK;\n#comment3\n#comment4\n[" \ + "]=N:2;\n#find_me_comment\nFEEDERS;\n#comment5\n[]=>DB(some_cube);\n#comment6") + c = self.tm1.cubes.get(self.cube_name) + c.rules = uncommented + self.tm1.cubes.update(c) + + self.assertEqual(self.tm1.cubes.get(c.name).has_rules, True) + + # test disabling + self.tm1.cubes.disable_cube_rule(c, sections=['FEEDSTRINGS']) + self.tm1.cubes.enable_cube_rule(c, sections=['FEEDSTRINGS']) + self.assertEqual(c.rules.text, uncommented.text) + + self.tm1.cubes.disable_cube_rule(c, sections=['UNDEFVALS']) + self.tm1.cubes.enable_cube_rule(c, sections=['UNDEFVALS']) + self.assertEqual(c.rules.text, uncommented.text) + + self.tm1.cubes.disable_cube_rule(c, sections=['SKIPCHECK', 'FEEDERS']) + self.tm1.cubes.enable_cube_rule(c, sections=['SKIPCHECK', 'FEEDERS']) + self.assertEqual(c.rules.text, uncommented.text) + + self.tm1.cubes.disable_cube_rule(c) + self.assertEqual(c.rules.text.startswith('# B64 ENCODED RULE='), True) + + cells = {('Element 1', 'Element 1', 'Element 1'): 1} + self.tm1.cells.write_values(self.cube_name, cells) + + self.tm1.cubes.enable_cube_rule(c) + self.assertEqual(c.rules.text, uncommented.text) + def tearDown(self): self.tm1.cubes.delete(self.cube_name) if self.tm1.cubes.exists(self.cube_name_to_delete):