Skip to content

Commit

Permalink
refactor(node): use url instead of paths
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinBelthle committed Aug 20, 2024
1 parent 555c133 commit c261c4d
Show file tree
Hide file tree
Showing 3 changed files with 129 additions and 32 deletions.
22 changes: 22 additions & 0 deletions antarest/study/storage/rawstudy/model/filesystem/lazy_node.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,12 @@
import shutil
import typing as t
from abc import ABC, abstractmethod
from dataclasses import dataclass
from datetime import datetime, timedelta
from pathlib import Path
from zipfile import ZipFile

from antarest.core.exceptions import ChildNotFoundError
from antarest.study.storage.rawstudy.model.filesystem.config.model import FileStudyTreeConfig
from antarest.study.storage.rawstudy.model.filesystem.context import ContextServer
from antarest.study.storage.rawstudy.model.filesystem.inode import G, INode, S, V
Expand Down Expand Up @@ -126,6 +128,26 @@ def save(self, data: t.Union[str, bytes, S], url: t.Optional[t.List[str]] = None
self.get_link_path().unlink()
return None

def _infer_path(self) -> Path:
if self.get_link_path().exists():
return self.get_link_path()
elif self.config.path.exists():
return self.config.path
raise ChildNotFoundError(f"Neither link file {self.get_link_path()} nor matrix file {self.config.path} exists")

def get_suffixes(self) -> t.List[str]:
return self._infer_path().suffixes

def rename_file(self, url: t.List[str]) -> None:
target_path = self.config.study_path.joinpath(*url)
target_path.unlink(missing_ok=True)
self._infer_path().rename(target_path)

def copy_file(self, url: t.List[str]) -> None:
target_path = self.config.study_path.joinpath(*url)
target_path.unlink(missing_ok=True)
shutil.copy(self._infer_path(), target_path)

def get_lazy_content(
self,
url: t.Optional[t.List[str]] = None,
Expand Down
Original file line number Diff line number Diff line change
@@ -1,16 +1,14 @@
import json
import shutil
import typing as t
from pathlib import Path

from antarest.core.model import JSON
from antarest.matrixstore.model import MatrixData
from antarest.study.storage.rawstudy.model.filesystem.config.binding_constraint import (
BindingConstraintFrequency,
BindingConstraintOperator,
)
from antarest.study.storage.rawstudy.model.filesystem.config.model import FileStudyTreeConfig
from antarest.study.storage.rawstudy.model.filesystem.factory import FileStudy
from antarest.study.storage.rawstudy.model.filesystem.matrix.input_series_matrix import InputSeriesMatrix
from antarest.study.storage.variantstudy.model.command.common import CommandName, CommandOutput
from antarest.study.storage.variantstudy.model.command.create_binding_constraint import (
DEFAULT_GROUP,
Expand All @@ -21,8 +19,6 @@
from antarest.study.storage.variantstudy.model.command.icommand import MATCH_SIGNATURE_SEPARATOR, ICommand
from antarest.study.storage.variantstudy.model.model import CommandDTO

MatrixType = t.List[t.List[MatrixData]]


def _update_matrices_names(
file_study: FileStudy,
Expand Down Expand Up @@ -65,41 +61,39 @@ def _update_matrices_names(
BindingConstraintOperator.LESS: "lt",
}

def get_matrix_path(constraint_folder: Path, matrix_id: str) -> Path:
raw_file = constraint_folder / f"{matrix_id}.txt"
return raw_file if raw_file.exists() else raw_file.with_suffix(".txt.link")

def get_target_path(existing_path: Path, constraint_folder: Path, matrix_id: str) -> Path:
new_path = constraint_folder / f"{matrix_id}{''.join(existing_path.suffixes)}"
new_path.unlink(missing_ok=True)
return new_path
def _get_matrix_url(node: InputSeriesMatrix, bc_term: str) -> t.List[str]:
matrix_path = node.config.path.parent / f"{bc_id}_{bc_term}{''.join(node.get_suffixes())}"
return list(matrix_path.relative_to(file_study.config.study_path).parts)

base_path = file_study.config.study_path / "input" / "bindingconstraints"
parent_folder_node = file_study.tree.get_node(["input", "bindingconstraints"])
error_msg = "Unhandled node type, expected InputSeriesMatrix, got "
if existing_operator != BindingConstraintOperator.BOTH and new_operator != BindingConstraintOperator.BOTH:
current_path = get_matrix_path(base_path, f"{bc_id}_{operator_matrices_map[existing_operator]}")
target_path = get_target_path(current_path, base_path, f"{bc_id}_{operator_matrices_map[new_operator]}")
current_path.rename(target_path)
current_node = parent_folder_node.get_node([f"{bc_id}_{operator_matrices_map[existing_operator]}"])
assert isinstance(current_node, InputSeriesMatrix), f"{error_msg}{type(current_node)}"
target_url = _get_matrix_url(current_node, operator_matrices_map[new_operator])
current_node.rename_file(target_url)
elif new_operator == BindingConstraintOperator.BOTH:
current_path = get_matrix_path(base_path, f"{bc_id}_{operator_matrices_map[existing_operator]}")
current_node = parent_folder_node.get_node([f"{bc_id}_{operator_matrices_map[existing_operator]}"])
assert isinstance(current_node, InputSeriesMatrix), f"{error_msg}{type(current_node)}"
if existing_operator == BindingConstraintOperator.EQUAL:
lt_path = get_target_path(current_path, base_path, f"{bc_id}_lt")
gt_path = get_target_path(current_path, base_path, f"{bc_id}_gt")
current_path.rename(lt_path)
shutil.copy(lt_path, gt_path)
lt_url = _get_matrix_url(current_node, "lt")
gt_url = _get_matrix_url(current_node, "gt")
current_node.copy_file(gt_url)
current_node.rename_file(lt_url)
else:
term = "lt" if existing_operator == BindingConstraintOperator.GREATER else "gt"
target_path = get_target_path(current_path, base_path, f"{bc_id}_{term}")
shutil.copy(current_path, target_path)
target_url = _get_matrix_url(current_node, term)
current_node.copy_file(target_url)
else:
if new_operator == BindingConstraintOperator.EQUAL:
get_matrix_path(base_path, f"{bc_id}_gt").unlink(missing_ok=True)
current_path = get_matrix_path(base_path, f"{bc_id}_lt")
target_path = get_target_path(current_path, base_path, f"{bc_id}_eq")
current_path.rename(target_path)
elif new_operator == BindingConstraintOperator.LESS:
get_matrix_path(base_path, f"{bc_id}_gt").unlink(missing_ok=True)
current_node = parent_folder_node.get_node([f"{bc_id}_lt"])
assert isinstance(current_node, InputSeriesMatrix), f"{error_msg}{type(current_node)}"
if new_operator == BindingConstraintOperator.GREATER:
current_node.delete()
else:
get_matrix_path(base_path, f"{bc_id}_lt").unlink(missing_ok=True)
parent_folder_node.get_node([f"{bc_id}_gt"]).delete()
if new_operator == BindingConstraintOperator.EQUAL:
eq_url = _get_matrix_url(current_node, "eq")
current_node.rename_file(eq_url)


class UpdateBindingConstraint(AbstractBindingConstraintCommand):
Expand Down
81 changes: 81 additions & 0 deletions tests/storage/repository/filesystem/test_lazy_node.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,12 @@
import itertools
import shutil
from pathlib import Path
from typing import List, Optional
from unittest.mock import Mock

import pytest

from antarest.core.exceptions import ChildNotFoundError
from antarest.study.storage.rawstudy.model.filesystem.config.model import FileStudyTreeConfig
from antarest.study.storage.rawstudy.model.filesystem.context import ContextServer
from antarest.study.storage.rawstudy.model.filesystem.lazy_node import LazyNode
Expand Down Expand Up @@ -138,3 +143,79 @@ def test_save_txt(tmp_path: Path):
assert file.read_text() == content
assert not link.exists()
resolver.resolve.assert_called_once_with(content)


class TestCopyAndRenameFile:
node: MockLazyNode
fake_node: MockLazyNode
target: List[str]
file: Path
link: Path
modified_file: Path
modified_link: Path
resolver: Mock

def _set_up(self, tmp_path: Path, target_is_link: bool) -> None:
self.file = tmp_path / "my-study" / "lazy.txt"
self.file.parent.mkdir()

self.link = self.file.parent / f"{self.file.name}.link"
self.link.write_text("Link: Mock File Content")

self.resolver = Mock()
self.resolver.resolve.return_value = None

config = FileStudyTreeConfig(study_path=self.file.parent, path=self.file, version=-1, study_id="")
context = ContextServer(matrix=Mock(), resolver=self.resolver)
self.node = MockLazyNode(context=context, config=config)

self.modified_file = self.file.parent / "lazy_modified.txt"
self.modified_link = self.file.parent / f"{self.modified_file.name}.link"
config2 = FileStudyTreeConfig(study_path=self.file.parent, path=self.modified_file, version=-1, study_id="")
self.fake_node = MockLazyNode(context=Mock(), config=config2)
target_path = self.modified_link if target_is_link else self.modified_file
self.target = list(target_path.relative_to(config.study_path).parts)

def _checks_behavior(self, rename: bool, target_is_link: bool):
# Asserts `_infer_path` fails if there's no file
with pytest.raises(ChildNotFoundError):
self.fake_node._infer_path()

# Checks `copy_file`, `rename_file` and `get_suffixes` methods
if target_is_link:
assert not self.modified_link.exists()
assert self.link.exists()
assert not self.file.exists()
assert not self.modified_file.exists()
assert self.node.get_suffixes() == [".txt", ".link"]

self.node.rename_file(self.target) if rename else self.node.copy_file(self.target)

assert not self.link.exists() if rename else self.link.exists()
assert self.modified_link.exists()
assert not self.file.exists()
assert not self.modified_file.exists()
assert self.modified_link.read_text() == "Link: Mock File Content"

else:
content = "No Link: Mock File Content"
self.node.save(content)
assert self.file.read_text() == content
assert not self.link.exists()
assert not self.modified_file.exists()
self.resolver.resolve.assert_called_once_with(content)
assert self.node.get_suffixes() == [".txt"]

self.node.rename_file(self.target) if rename else self.node.copy_file(self.target)

assert not self.link.exists()
assert not self.file.exists() if rename else self.file.exists()
assert self.modified_file.exists()
assert not self.modified_link.exists()
assert self.modified_file.read_text() == content

def test_copy_and_rename_file(self, tmp_path: Path):
for rename, target_is_link in itertools.product([True, False], repeat=2):
self._set_up(tmp_path, target_is_link)
self._checks_behavior(rename, target_is_link)
shutil.rmtree(tmp_path / "my-study")

0 comments on commit c261c4d

Please sign in to comment.