Skip to content

Commit

Permalink
Make _has_response realization-specific for combined
Browse files (browse the repository at this point in the history)
  • Loading branch information
yngve-sk committed Oct 25, 2024
1 parent 55fa828 commit dee5e51
Showing 1 changed file with 39 additions and 20 deletions.
59 changes: 39 additions & 20 deletions src/ert/storage/local_ensemble.py
Original file line number | Diff line number | Diff line change
Expand Up @@ -299,21 +299,39 @@ def _responses_exist_for_realization(

if not self.experiment.response_configuration:
return True
path = self._realization_dir(realization)

responses_for_all_reals: Dict[str, polars.DataFrame] = {}

_response_types = (
[self.experiment.response_key_to_response_type.get(key, key)]
if key
else list(self.experiment.response_configuration)
)

for response_type in _response_types:
responses_for_all_reals[response_type] = self.load_responses(
response_type, tuple(range(self.ensemble_size))
)

# Note: potential performance bottleneck,
# should be improved greatly by having statemap.
# Should also be faster due to lru cache on load_responses
def _has_response(_response_type: str) -> bool:
if (path / f"{_response_type}.parquet").exists():
return True
# This currently also relies on LRU cache on load_responses
# If it is removed, we should try to hold these datasets
# especially when we loop over all reals and invoke this function
if key and key not in responses_for_all_reals:

if (self.mount_point / f"{_response_type}.parquet").exists():
def _has_response(_response_type: str) -> bool:
return (
realization
in self.load_responses(
_response_type, tuple(range(self.ensemble_size))
)["realization"]
in responses_for_all_reals[_response_type]["realization"]
and key in responses_for_all_reals[_response_type]["response_key"]
)
else:

def _has_response(_response_type: str) -> bool:
return (
realization
in responses_for_all_reals[_response_type]["realization"]
)

if key:
Expand Down Expand Up @@ -970,25 +988,26 @@ def combine_responses(self, response_type: Optional[str] = None) -> None:
for _response_type in self.experiment.response_configuration:
response_file = f"{response_type}.parquet"

reals_with_response = self.get_realization_list_with_responses(
response_type
)
reals_with_new_response = [
real
for real in range(self.ensemble_size)
if (self._realization_dir(real) / response_file).exists()
]

if not os.path.exists(self.mount_point / response_file):
combined = self.load_responses(
response_type, tuple(reals_with_response)
response_type, tuple(reals_with_new_response)
)
else:
# Some files may be newer
combined = polars.read_parquet(self.mount_point / response_file)

to_write = []
for real in range(self.ensemble_size):
if (self._realization_dir(real) / response_file).exists():
response_df = polars.read_parquet(
self._realization_dir(real) / response_file
)
to_write.append(response_df)
for real in reals_with_new_response:
response_df = polars.read_parquet(
self._realization_dir(real) / response_file
)
to_write.append(response_df)

if len(to_write) == 0:
continue
Expand All @@ -1008,6 +1027,6 @@ def combine_responses(self, response_type: Optional[str] = None) -> None:

combined.write_parquet(self.mount_point / response_file)

for real in reals_with_response:
for real in reals_with_new_response:
if os.path.exists(self._realization_dir(real) / response_file):
os.remove(self._realization_dir(real) / response_file)

0 comments on commit dee5e51

Please sign in to comment.