From 5853f6dee04c465f9e2be0ed05d8c778dc997463 Mon Sep 17 00:00:00 2001 From: Heiko Klein Date: Mon, 6 May 2024 20:24:43 +0000 Subject: [PATCH 1/2] reduce memory consumption of cams283 obsreader --- pyaerocom/io/cams2_83/obs.py | 6 +++++- pyaerocom/io/cams2_83/read_obs.py | 36 +++++++++++++++++++++---------- pyaerocom/ungriddeddata.py | 2 +- 3 files changed, 31 insertions(+), 13 deletions(-) diff --git a/pyaerocom/io/cams2_83/obs.py b/pyaerocom/io/cams2_83/obs.py index 2cba3a059..e617fc1b8 100644 --- a/pyaerocom/io/cams2_83/obs.py +++ b/pyaerocom/io/cams2_83/obs.py @@ -52,7 +52,9 @@ def poll_names(df: pd.DataFrame) -> pd.DataFrame: return df.assign(poll=poll) -def read_csv(path: str | Path, *, domain: Domain = CAMS2_50_DOMAIN) -> pd.DataFrame: +def read_csv( + path: str | Path, *, domain: Domain = CAMS2_50_DOMAIN, polls: list[str] = None +) -> pd.DataFrame: df = pd.read_csv( path, sep=";", @@ -61,6 +63,8 @@ def read_csv(path: str | Path, *, domain: Domain = CAMS2_50_DOMAIN) -> pd.DataFr usecols=lambda x: x != "_", ) df = df.pipe(add_time).pipe(conc_units).pipe(poll_names) + if polls is not None: + df = df[df.poll.isin(polls)] if not in_domain(df, domain=domain).all(): logger.warning("found obs outside the model domain") df = df[in_domain(df, domain=domain)] diff --git a/pyaerocom/io/cams2_83/read_obs.py b/pyaerocom/io/cams2_83/read_obs.py index 7e5d66dd2..fd18a8393 100644 --- a/pyaerocom/io/cams2_83/read_obs.py +++ b/pyaerocom/io/cams2_83/read_obs.py @@ -68,6 +68,17 @@ def read( first_file: int | None = None, last_file: int | None = None, ) -> UngriddedData: + """Read observations as ungridded + + :param vars_to_retrieve: pyaerocom-variables to read, defaults to None + None meaning all default variables + :param files: files to read, defaults to None + None meaning all known files in the date-range + :param first_file: first file to process from files, defaults to None=0 + :param last_file: last file to process from files, defaults to None=-1 + :raises 
TypeError: wrong input type + :return: ungridded data object + """ if vars_to_retrieve is None: vars_to_retrieve = self.DEFAULT_VARS if isinstance(vars_to_retrieve, str): @@ -89,12 +100,11 @@ def read( files = files[:last_file] start = time.time() - data = list(self.__reader(vars_to_retrieve, files)) - end = time.time() - print(end - start) - - ungriddeddata = UngriddedData.from_station_data(data) - print(time.time() - end, (time.time() - end) / 60.0) + logger.info(f"Start read obs") + # lazy data_iterator returns immediately, unpacked in from_station_data + data_iterator = self.__reader(vars_to_retrieve, files) + ungriddeddata = UngriddedData.from_station_data(data_iterator) + logger.info(f"Time needed to convert obs to ungridded: {time.time() - end}s") return ungriddeddata def read_file(self, filename, vars_to_retrieve=None): @@ -102,8 +112,12 @@ def read_file(self, filename, vars_to_retrieve=None): @classmethod def __reader(cls, vars_to_retrieve: list[str], files: list[str | Path]) -> Iterator[dict]: - logger.debug(f"reading {cls.DATA_ID} {vars_to_retrieve=} from {files=}") - data = pd.concat(read_csv(path) for path in files).drop_duplicates( + logger.info(f"reading {cls.DATA_ID} {vars_to_retrieve=}") + logger.debug(f"reading from {files=}") + reverse_aerocom = {v: k for k, v in AEROCOM_NAMES.items()} + polls = [reverse_aerocom[v] for v in vars_to_retrieve] + + data = pd.concat(read_csv(path, polls=polls) for path in files).drop_duplicates( subset=["station", "poll", "time"] ) df: pd.DataFrame @@ -115,8 +129,8 @@ def __reader(cls, vars_to_retrieve: list[str], files: list[str | Path]) -> Itera latitude=df["lat"].iloc[0], longitude=df["lon"].iloc[0], altitude=df["alt"].iloc[0], - variables=cls.DEFAULT_VARS, - var_info=dict.fromkeys(cls.DEFAULT_VARS, dict(units="ug m-3")), + variables=vars_to_retrieve, + var_info=dict.fromkeys(vars_to_retrieve, dict(units="ug m-3")), data_id=cls.DATA_ID, ts_type=cls.TS_TYPE, ) @@ -128,6 +142,6 @@ def __reader(cls, vars_to_retrieve: 
list[str], files: list[str | Path]) -> Itera for poll in missing: df[poll] = np.nan df = df.rename(AEROCOM_NAMES, axis="columns") - for poll in cls.DEFAULT_VARS: + for poll in vars_to_retrieve: output[poll] = df[poll] yield output diff --git a/pyaerocom/ungriddeddata.py b/pyaerocom/ungriddeddata.py index 5f3195978..0f848fedc 100644 --- a/pyaerocom/ungriddeddata.py +++ b/pyaerocom/ungriddeddata.py @@ -247,7 +247,7 @@ def from_station_data(stats, add_meta_keys=None): Parameters ---------- - stats : list or StationData + stats : iterator or StationData input data object(s) add_meta_keys : list, optional list of metadata keys that are supposed to be imported from the From dcc6532872b42f79eb6013bbbdef2a99e57702a5 Mon Sep 17 00:00:00 2001 From: Heiko Klein Date: Mon, 6 May 2024 20:35:54 +0000 Subject: [PATCH 2/2] fix log-message --- pyaerocom/io/cams2_83/read_obs.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pyaerocom/io/cams2_83/read_obs.py b/pyaerocom/io/cams2_83/read_obs.py index fd18a8393..97bf92913 100644 --- a/pyaerocom/io/cams2_83/read_obs.py +++ b/pyaerocom/io/cams2_83/read_obs.py @@ -104,7 +104,7 @@ def read( # lazy data_iterator returns immediately, unpacked in from_station_data data_iterator = self.__reader(vars_to_retrieve, files) ungriddeddata = UngriddedData.from_station_data(data_iterator) - logger.info(f"Time needed to convert obs to ungridded: {time.time() - end}s") + logger.info(f"Time needed to convert obs to ungridded: {time.time() - start}s") return ungriddeddata def read_file(self, filename, vars_to_retrieve=None):