Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

bugfix(matrix): correct the loading and saving of empty matrices as TSV files #1746

Merged
merged 3 commits into from
Sep 28, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
29 changes: 17 additions & 12 deletions antarest/matrixstore/repository.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
import hashlib
import logging
import typing as t
from pathlib import Path
from typing import List, Optional, Union

import numpy as np
from filelock import FileLock
Expand Down Expand Up @@ -31,19 +31,19 @@ def save(self, matrix_user_metadata: MatrixDataSet) -> MatrixDataSet:
logger.debug(f"Matrix dataset {matrix_user_metadata.id} for user {matrix_user_metadata.owner_id} saved")
return matrix_user_metadata

def get(self, id: str) -> Optional[MatrixDataSet]:
def get(self, id: str) -> t.Optional[MatrixDataSet]:
matrix: MatrixDataSet = db.session.query(MatrixDataSet).get(id)
return matrix

def get_all_datasets(self) -> List[MatrixDataSet]:
matrix_datasets: List[MatrixDataSet] = db.session.query(MatrixDataSet).all()
def get_all_datasets(self) -> t.List[MatrixDataSet]:
matrix_datasets: t.List[MatrixDataSet] = db.session.query(MatrixDataSet).all()
return matrix_datasets

def query(
self,
name: Optional[str],
owner: Optional[int] = None,
) -> List[MatrixDataSet]:
name: t.Optional[str],
owner: t.Optional[int] = None,
) -> t.List[MatrixDataSet]:
"""
Query a list of MatrixUserMetadata by searching for each one separately if a set of filters matches

Expand All @@ -59,7 +59,7 @@ def query(
query = query.filter(MatrixDataSet.name.ilike(f"%{name}%")) # type: ignore
if owner is not None:
query = query.filter(MatrixDataSet.owner_id == owner)
datasets: List[MatrixDataSet] = query.distinct().all()
datasets: t.List[MatrixDataSet] = query.distinct().all()
return datasets

def delete(self, dataset_id: str) -> None:
Expand All @@ -83,7 +83,7 @@ def save(self, matrix: Matrix) -> Matrix:
logger.debug(f"Matrix {matrix.id} saved")
return matrix

def get(self, matrix_hash: str) -> Optional[Matrix]:
def get(self, matrix_hash: str) -> t.Optional[Matrix]:
matrix: Matrix = db.session.query(Matrix).get(matrix_hash)
return matrix

Expand Down Expand Up @@ -130,6 +130,7 @@ def get(self, matrix_hash: str) -> MatrixContent:

matrix_file = self.bucket_dir.joinpath(f"{matrix_hash}.tsv")
matrix = np.loadtxt(matrix_file, delimiter="\t", dtype=np.float64, ndmin=2)
matrix = matrix.reshape((1, 0)) if matrix.size == 0 else matrix
data = matrix.tolist()
index = list(range(matrix.shape[0]))
columns = list(range(matrix.shape[1]))
Expand All @@ -148,7 +149,7 @@ def exists(self, matrix_hash: str) -> bool:
matrix_file = self.bucket_dir.joinpath(f"{matrix_hash}.tsv")
return matrix_file.exists()

def save(self, content: Union[List[List[MatrixData]], npt.NDArray[np.float64]]) -> str:
def save(self, content: t.Union[t.List[t.List[MatrixData]], npt.NDArray[np.float64]]) -> str:
"""
Saves the content of a matrix as a TSV file in the bucket directory
and returns its SHA256 hash.
Expand Down Expand Up @@ -188,8 +189,12 @@ def save(self, content: Union[List[List[MatrixData]], npt.NDArray[np.float64]])
# Ensure exclusive access to the matrix file between multiple processes (or threads).
lock_file = matrix_file.with_suffix(".tsv.lock")
with FileLock(lock_file, timeout=15):
# noinspection PyTypeChecker
np.savetxt(matrix_file, matrix, delimiter="\t", fmt="%.18f")
if matrix.size == 0:
# If the array or dataframe is empty, create an empty file rather than
# calling `np.savetxt`, to avoid unwanted line breaks.
open(matrix_file, mode="wb").close()
else:
np.savetxt(matrix_file, matrix, delimiter="\t", fmt="%.18f")

# IMPORTANT: Deleting the lock file under Linux can make locking unreliable.
# See https://github.com/tox-dev/py-filelock/issues/31
Expand Down
87 changes: 63 additions & 24 deletions antarest/matrixstore/service.py
Original file line number Diff line number Diff line change
@@ -1,12 +1,13 @@
import contextlib
import io
import json
import logging
import tempfile
import zipfile
from abc import ABC, abstractmethod
from datetime import datetime, timezone
from io import BytesIO
from pathlib import Path
from typing import List, Optional, Sequence, Tuple, Union
from zipfile import ZipFile

import numpy as np
from fastapi import UploadFile
Expand Down Expand Up @@ -36,6 +37,18 @@
)
from antarest.matrixstore.repository import MatrixContentRepository, MatrixDataSetRepository, MatrixRepository

# List of files to exclude from ZIP archives
EXCLUDED_FILES = {
"__MACOSX",
".DS_Store",
"._.DS_Store",
"Thumbs.db",
"desktop.ini",
"$RECYCLE.BIN",
"System Volume Information",
"RECYCLER",
}

logger = logging.getLogger(__name__)


Expand Down Expand Up @@ -150,29 +163,42 @@ def create(self, data: Union[List[List[MatrixData]], npt.NDArray[np.float64]]) -
self.repo.save(matrix)
return matrix_id

def create_by_importation(self, file: UploadFile, json: bool = False) -> List[MatrixInfoDTO]:
def create_by_importation(self, file: UploadFile, is_json: bool = False) -> List[MatrixInfoDTO]:
"""
Imports a matrix from a TSV or JSON file or a collection of matrices from a ZIP file.

TSV-formatted files are expected to contain only matrix data without any header.

JSON-formatted files are expected to contain the following attributes:

- `index`: The list of row labels.
- `columns`: The list of column labels.
- `data`: The matrix data as a nested list of floats.

Args:
file: The file to import (TSV, JSON or ZIP).
is_json: Flag indicating if the file is JSON-encoded.

Returns:
A list of `MatrixInfoDTO` objects containing the SHA256 hash of the imported matrices.
"""
with file.file as f:
if file.content_type == "application/zip":
input_zip = ZipFile(BytesIO(f.read()))
files = {
info.filename: input_zip.read(info.filename) for info in input_zip.infolist() if not info.is_dir()
}
with contextlib.closing(f):
buffer = io.BytesIO(f.read())
matrix_info: List[MatrixInfoDTO] = []
for name in files:
if all(
[
not name.startswith("__MACOSX/"),
not name.startswith(".DS_Store"),
]
):
matrix_id = self._file_importation(files[name], json)
matrix_info.append(MatrixInfoDTO(id=matrix_id, name=name))
with zipfile.ZipFile(buffer) as zf:
for info in zf.infolist():
if info.is_dir() or info.filename in EXCLUDED_FILES:
continue
matrix_id = self._file_importation(zf.read(info.filename), is_json=is_json)
matrix_info.append(MatrixInfoDTO(id=matrix_id, name=info.filename))
return matrix_info
else:
matrix_id = self._file_importation(f.read(), json)
matrix_id = self._file_importation(f.read(), is_json=is_json)
return [MatrixInfoDTO(id=matrix_id, name=file.filename)]

def _file_importation(self, file: bytes, is_json: bool = False) -> str:
def _file_importation(self, file: bytes, *, is_json: bool = False) -> str:
"""
Imports a matrix from a TSV or JSON file in bytes format.

Expand All @@ -184,9 +210,12 @@ def _file_importation(self, file: bytes, is_json: bool = False) -> str:
A SHA256 hash that identifies the imported matrix.
"""
if is_json:
return self.create(MatrixContent.parse_raw(file).data)
obj = json.loads(file)
content = MatrixContent(**obj)
return self.create(content.data)
# noinspection PyTypeChecker
matrix = np.loadtxt(BytesIO(file), delimiter="\t", dtype=np.float64, ndmin=2)
matrix = np.loadtxt(io.BytesIO(file), delimiter="\t", dtype=np.float64, ndmin=2)
matrix = matrix.reshape((1, 0)) if matrix.size == 0 else matrix
return self.create(matrix)

def get_dataset(
Expand Down Expand Up @@ -380,8 +409,13 @@ def create_matrix_files(self, matrix_ids: Sequence[str], export_path: Path) -> s
name = f"matrix-{mtx.id}.txt"
filepath = f"{tmpdir}/{name}"
array = np.array(mtx.data, dtype=np.float64)
# noinspection PyTypeChecker
np.savetxt(filepath, array, delimiter="\t", fmt="%.18f")
if array.size == 0:
# If the array or dataframe is empty, create an empty file rather than
# calling `np.savetxt`, to avoid unwanted line breaks.
open(filepath, mode="wb").close()
else:
# noinspection PyTypeChecker
np.savetxt(filepath, array, delimiter="\t", fmt="%.18f")
zip_dir(Path(tmpdir), export_path)
stopwatch.log_elapsed(lambda x: logger.info(f"Matrix dataset exported (zipped mode) in {x}s"))
return str(export_path)
Expand Down Expand Up @@ -467,5 +501,10 @@ def download_matrix(
raise UserHasNotPermissionError()
if matrix := self.get(matrix_id):
array = np.array(matrix.data, dtype=np.float64)
# noinspection PyTypeChecker
np.savetxt(filepath, array, delimiter="\t", fmt="%.18f")
if array.size == 0:
# If the array or dataframe is empty, create an empty file rather than
# calling `np.savetxt`, to avoid unwanted line breaks.
open(filepath, mode="wb").close()
else:
# noinspection PyTypeChecker
np.savetxt(filepath, array, delimiter="\t", fmt="%.18f")
24 changes: 11 additions & 13 deletions antarest/matrixstore/uri_resolver_service.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ class UriResolverService:
def __init__(self, matrix_service: ISimpleMatrixService):
self.matrix_service = matrix_service

def resolve(self, uri: str, formatted: bool = True) -> Optional[SUB_JSON]:
def resolve(self, uri: str, formatted: bool = True) -> SUB_JSON:
res = UriResolverService._extract_uri_components(uri)
if res:
protocol, uuid = res
Expand Down Expand Up @@ -52,19 +52,17 @@ def _resolve_matrix(self, id: str, formatted: bool = True) -> SUB_JSON:
index=data.index,
columns=data.columns,
)
if not df.empty:
return (
df.to_csv(
None,
sep="\t",
header=False,
index=False,
float_format="%.6f",
)
or ""
)
else:
if df.empty:
return ""
else:
csv = df.to_csv(
None,
sep="\t",
header=False,
index=False,
float_format="%.6f",
)
return csv or ""
raise ValueError(f"id matrix {id} not found")

def build_matrix_uri(self, id: str) -> str:
Expand Down
2 changes: 1 addition & 1 deletion antarest/matrixstore/web.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ def create_by_importation(
) -> Any:
logger.info("Importing new matrix dataset", extra={"user": current_user.id})
if current_user.id is not None:
return service.create_by_importation(file, json)
return service.create_by_importation(file, is_json=json)
raise UserHasNotPermissionError()

@bp.get("/matrix/{id}", tags=[APITag.matrix], response_model=MatrixDTO)
Expand Down
1 change: 1 addition & 0 deletions antarest/study/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -1378,6 +1378,7 @@ def _create_edit_study_command(
if isinstance(data, bytes):
# noinspection PyTypeChecker
matrix = np.loadtxt(io.BytesIO(data), delimiter="\t", dtype=np.float64, ndmin=2)
matrix = matrix.reshape((1, 0)) if matrix.size == 0 else matrix
return ReplaceMatrix(
target=url,
matrix=matrix.tolist(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,16 @@ def _dump_json(self, data: JSON) -> None:
matrix = pd.concat([time, matrix], axis=1)

head = self.head_writer.build(var=df.columns.size, end=df.index.size)
self.config.path.write_text(head)

matrix.to_csv(
open(self.config.path, "a", newline="\n"),
sep="\t",
index=False,
header=False,
line_terminator="\n",
)
with self.config.path.open(mode="w", newline="\n") as fd:
fd.write(head)
if not matrix.empty:
matrix.to_csv(
fd,
sep="\t",
header=False,
index=False,
float_format="%.6f",
)

def check_errors(
self,
Expand Down
1 change: 1 addition & 0 deletions antarest/tools/lib.py
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,7 @@ def apply_commands(
matrix_dataset: List[str] = []
for matrix_file in matrices_dir.iterdir():
matrix = np.loadtxt(matrix_file, delimiter="\t", dtype=np.float64, ndmin=2)
matrix = matrix.reshape((1, 0)) if matrix.size == 0 else matrix
matrix_data = matrix.tolist()
res = self.session.post(self.build_url("/v1/matrix"), json=matrix_data)
res.raise_for_status()
Expand Down
Loading