Skip to content

Commit

Permalink
add arrow support in put raw
Browse files Browse the repository at this point in the history
  • Loading branch information
MartinBelthle committed Jul 9, 2024
1 parent 255a61f commit a840085
Show file tree
Hide file tree
Showing 4 changed files with 34 additions and 78 deletions.
14 changes: 11 additions & 3 deletions antarest/study/service.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@

import numpy as np
import pandas as pd
import pyarrow as pa
import pyarrow.feather as feather
from fastapi import HTTPException, UploadFile
from markupsafe import escape
from starlette.responses import FileResponse, Response
Expand Down Expand Up @@ -1440,9 +1442,15 @@ def _create_edit_study_command(
)
elif isinstance(tree_node, InputSeriesMatrix):
if isinstance(data, bytes):
# noinspection PyTypeChecker
matrix = np.loadtxt(io.BytesIO(data), delimiter="\t", dtype=np.float64, ndmin=2)
matrix = matrix.reshape((1, 0)) if matrix.size == 0 else matrix
# checks if it corresponds to arrow format or if it's a classic file.
if data[:5].decode("utf-8") == "ARROW":
buffer = pa.BufferReader(data) # type: ignore
table = feather.read_table(buffer)
df = table.to_pandas()
matrix = df.to_numpy()
else:
matrix = np.loadtxt(io.BytesIO(data), delimiter="\t", dtype=np.float64, ndmin=2)
matrix = matrix.reshape((1, 0)) if matrix.size == 0 else matrix
return ReplaceMatrix(
target=url,
matrix=matrix.tolist(),
Expand Down
2 changes: 1 addition & 1 deletion antarest/study/web/raw_studies_blueprint.py
Original file line number Diff line number Diff line change
Expand Up @@ -385,7 +385,7 @@ def replace_study_file(
Parameters:
- `uuid`: The UUID of the study.
- `path`: The path to the data to update. Defaults to "/".
- `file`: The raw file to be posted (e.g. a CSV file opened in binary mode).
- `file`: The raw file to be posted (e.g. a CSV file opened in binary mode or a matrix in arrow format).
- `create_missing`: Flag to indicate whether to create file or parent directories if missing.
"""
logger.info(
Expand Down
1 change: 1 addition & 0 deletions requirements-dev.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ pyinstaller-hooks-contrib==2024.6
# of the corresponding implementation libraries used in production (in `requirements.txt`).

pandas-stubs~=1.4.0
pyarrow-stubs~=10.0.1.7
types-psycopg2~=2.9.4
types-redis~=4.1.2
types-requests~=2.27.1
Expand Down
95 changes: 21 additions & 74 deletions tests/integration/raw_studies_blueprint/test_fetch_raw_data.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ def test_get_study(
with db():
study: RawStudy = db.session.get(Study, internal_study_id)
study_dir = pathlib.Path(study.path)
headers = {"Authorization": f"Bearer {user_access_token}"}
client.headers = {"Authorization": f"Bearer {user_access_token}"}

shutil.copytree(
ASSETS_DIR.joinpath("user"),
Expand All @@ -58,11 +58,7 @@ def test_get_study(
user_folder_dir = study_dir.joinpath("user/folder")
for file_path in user_folder_dir.glob("*.*"):
rel_path = file_path.relative_to(study_dir).as_posix()
res = client.get(
f"/v1/studies/{internal_study_id}/raw",
params={"path": rel_path, "depth": 1},
headers=headers,
)
res = client.get(f"/v1/studies/{internal_study_id}/raw", params={"path": rel_path, "depth": 1})
assert res.status_code == 200, res.json()
if file_path.suffix == ".json":
# special case for JSON files
Expand All @@ -85,21 +81,15 @@ def test_get_study(
for file_path in user_folder_dir.glob("*.*"):
rel_path = file_path.relative_to(study_dir)
res = client.get(
f"/v1/studies/{internal_study_id}/raw",
params={"path": f"/{rel_path.as_posix()}", "depth": 1},
headers=headers,
f"/v1/studies/{internal_study_id}/raw", params={"path": f"/{rel_path.as_posix()}", "depth": 1}
)
assert res.status_code == 200, res.json()
actual = res.content
expected = file_path.read_bytes()
assert actual == expected

# If you try to retrieve a file that doesn't exist, we should have a 404 error
res = client.get(
f"/v1/studies/{internal_study_id}/raw",
params={"path": "user/somewhere/something.txt"},
headers=headers,
)
res = client.get(f"/v1/studies/{internal_study_id}/raw", params={"path": "user/somewhere/something.txt"})
assert res.status_code == 404, res.json()
assert res.json() == {
"description": "'somewhere' not a child of User",
Expand All @@ -111,7 +101,6 @@ def test_get_study(
res = client.put(
f"/v1/studies/{internal_study_id}/raw",
params={"path": "user/somewhere/something.txt"},
headers=headers,
files={"file": io.BytesIO(b"Goodbye World!")},
)
assert res.status_code == 404, res.json()
Expand All @@ -125,7 +114,6 @@ def test_get_study(
res = client.put(
f"/v1/studies/{internal_study_id}/raw",
params={"path": "user/somewhere/something.txt", "create_missing": True},
headers=headers,
files={"file": io.BytesIO(b"Goodbye Cruel World!")},
)
assert res.status_code == 204, res.json()
Expand All @@ -135,27 +123,18 @@ def test_get_study(
res = client.put(
f"/v1/studies/{internal_study_id}/raw",
params={"path": "user/somewhere/something.txt", "create_missing": True},
headers=headers,
files={"file": io.BytesIO(b"This is the end!")},
)
assert res.status_code == 204, res.json()

# You can check that the resource has been created or updated.
res = client.get(
f"/v1/studies/{internal_study_id}/raw",
params={"path": "user/somewhere/something.txt"},
headers=headers,
)
res = client.get(f"/v1/studies/{internal_study_id}/raw", params={"path": "user/somewhere/something.txt"})
assert res.status_code == 200, res.json()
assert res.content == b"This is the end!"

# If we ask for properties, we should have a JSON content
rel_path = "/input/links/de/properties/fr"
res = client.get(
f"/v1/studies/{internal_study_id}/raw",
params={"path": rel_path, "depth": 2},
headers=headers,
)
res = client.get(f"/v1/studies/{internal_study_id}/raw", params={"path": rel_path, "depth": 2})
assert res.status_code == 200, res.json()
actual = res.json()
assert actual == {
Expand All @@ -177,69 +156,46 @@ def test_get_study(
# If we ask for a matrix, we should have a JSON content if formatted is True
rel_path = "/input/links/de/fr"
expected_row = [100000, 100000, 0.01, 0.01, 0, 0, 0, 0]
res = client.get(
f"/v1/studies/{internal_study_id}/raw",
params={"path": rel_path, "formatted": True},
headers=headers,
)
res = client.get(f"/v1/studies/{internal_study_id}/raw", params={"path": rel_path, "formatted": True})
assert res.status_code == 200, res.json()
old_result = res.json()
assert old_result == {"index": ANY, "columns": ANY, "data": ANY}
assert old_result["data"][0] == expected_row

# We should have the same result with new flag 'format' set to 'JSON'
res = client.get(
f"/v1/studies/{internal_study_id}/raw",
params={"path": rel_path, "format": "json"},
headers=headers,
)
res = client.get(f"/v1/studies/{internal_study_id}/raw", params={"path": rel_path, "format": "json"})
assert res.status_code == 200, res.json()
new_result = res.json()
assert new_result == old_result

# If we ask for a matrix, we should have a CSV content if formatted is False
res = client.get(
f"/v1/studies/{internal_study_id}/raw",
params={"path": rel_path, "formatted": False},
headers=headers,
)
res = client.get(f"/v1/studies/{internal_study_id}/raw", params={"path": rel_path, "formatted": False})
assert res.status_code == 200, res.json()
old_result = res.text
actual_lines = old_result.splitlines()
first_row = [float(x) for x in actual_lines[0].split("\t")]
assert first_row == expected_row

# We should have the same result with new flag 'format' set to 'bytes'
res = client.get(
f"/v1/studies/{internal_study_id}/raw",
params={"path": rel_path, "format": "bytes"},
headers=headers,
)
res = client.get(f"/v1/studies/{internal_study_id}/raw", params={"path": rel_path, "format": "bytes"})
assert res.status_code == 200, res.json()
new_result = res.text
assert new_result == old_result

# If we ask for a matrix, we should have arrow binary if format = "arrow"
res = client.get(
f"/v1/studies/{internal_study_id}/raw",
params={"path": rel_path, "format": "arrow"},
headers=headers,
)
res = client.get(f"/v1/studies/{internal_study_id}/raw", params={"path": rel_path, "format": "arrow"})
assert res.status_code == 200
assert isinstance(res.content, bytes)
assert res.text.startswith("ARROW")
buffer = pa.BufferReader(res.content)
arrow_bytes = res.content
buffer = pa.BufferReader(arrow_bytes)
table = feather.read_table(buffer)
df = table.to_pandas()
assert list(df.loc[0]) == expected_row

# Asserts output matrix (containing index and columns) can be retrieved with arrow
output_path = "/output/20201014-1422eco-hello/economy/mc-all/areas/de/id-daily"
res = client.get(
f"/v1/studies/{internal_study_id}/raw",
params={"path": output_path, "format": "arrow"},
headers=headers,
)
res = client.get(f"/v1/studies/{internal_study_id}/raw", params={"path": output_path, "format": "arrow"})
assert res.status_code == 200
assert isinstance(res.content, bytes)
assert res.text.startswith("ARROW")
Expand All @@ -248,11 +204,14 @@ def test_get_study(
df = table.to_pandas()
assert df.columns[0] == "Index" # asserts the first columns corresponds to the index in such a case.

# Try to replace a matrix with a one in arrow format
res = client.put(f"/v1/studies/{internal_study_id}/raw", params={"path": rel_path}, files={"file": arrow_bytes})
assert res.status_code in {201, 204}

# If ask for an empty matrix, we should have an empty binary content
res = client.get(
f"/v1/studies/{internal_study_id}/raw",
params={"path": "input/thermal/prepro/de/01_solar/data", "formatted": False},
headers=headers,
)
assert res.status_code == 200, res.json()
assert res.content == b""
Expand All @@ -261,7 +220,6 @@ def test_get_study(
res = client.get(
f"/v1/studies/{internal_study_id}/raw",
params={"path": "input/thermal/prepro/de/01_solar/data", "formatted": True},
headers=headers,
)
assert res.status_code == 200, res.json()
assert res.json() == {"index": [0], "columns": [], "data": []}
Expand All @@ -271,36 +229,25 @@ def test_get_study(
for file_path in user_folder_dir.glob("*.*"):
rel_path = file_path.relative_to(study_dir)
res = client.get(
f"/v1/studies/{internal_study_id}/raw",
params={"path": f"/{rel_path.as_posix()}", "depth": 1},
headers=headers,
f"/v1/studies/{internal_study_id}/raw", params={"path": f"/{rel_path.as_posix()}", "depth": 1}
)
assert res.status_code == http.HTTPStatus.UNPROCESSABLE_ENTITY

# We can access to the configuration the classic way,
# for instance, we can get the list of areas:
res = client.get(
f"/v1/studies/{internal_study_id}/raw",
params={"path": "/input/areas/list", "depth": 1},
headers=headers,
)
res = client.get(f"/v1/studies/{internal_study_id}/raw", params={"path": "/input/areas/list", "depth": 1})
assert res.status_code == 200, res.json()
assert res.json() == ["DE", "ES", "FR", "IT"]

# asserts that the GET /raw endpoint is able to read matrix containing NaN values
res = client.get(
f"/v1/studies/{internal_study_id}/raw",
params={"path": "output/20201014-1427eco/economy/mc-all/areas/de/id-monthly"},
headers=headers,
)
assert res.status_code == 200
assert np.isnan(res.json()["data"][0]).any()

# Iterate over all possible combinations of path and depth
for path, depth in itertools.product([None, "", "/"], [0, 1, 2]):
res = client.get(
f"/v1/studies/{internal_study_id}/raw",
params={"path": path, "depth": depth},
headers=headers,
)
res = client.get(f"/v1/studies/{internal_study_id}/raw", params={"path": path, "depth": depth})
assert res.status_code == 200, f"Error for path={path} and depth={depth}"

0 comments on commit a840085

Please sign in to comment.