diff --git a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py index eae55fbe7f..6a123016ad 100644 --- a/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py +++ b/sdk/python/feast/infra/materialization/contrib/spark/spark_materialization_engine.py @@ -291,15 +291,16 @@ def write_to_online_store_in_batches(batch_df: pd.DataFrame): ) end_time = time.time() print( - f"INFO!!! Time taken to write batch {batch_id} is {int((end_time - start_time) * 1000)} milliseconds" + f"INFO!!! Processed batch {batch_id} in {int((end_time - start_time) * 1000)} milliseconds" ) + start_time = time.time() # Spark 3.3.0 or above supports toPandas() method. We are running on spark 3.2.2 pandas_dataframe = pd.DataFrame([row.asDict() for row in rows]) # TODO: For Pyspark applications, we should use py4j bridge to initialize loggers # Temporarily using print to display logs print( - f"INFO!!! Processing a partition with {pandas_dataframe.shape[0]} records and batch size {batch_size}" + f"INFO!!! Processing partition {pandas_dataframe.shape[0]} records, batch size {batch_size}" ) if "fs_batch" in pandas_dataframe.columns: @@ -308,3 +309,6 @@ def write_to_online_store_in_batches(batch_df: pd.DataFrame): ) pandas_dataframe["fs_batch"] = np.arange(len(pandas_dataframe)) // batch_size pandas_dataframe.groupby("fs_batch").apply(write_to_online_store_in_batches) + print( + f"INFO!!! Processed partition {pandas_dataframe.shape[0]} records, batch size {batch_size}, time {int((time.time() - start_time))} Seconds" + ) diff --git a/sdk/python/requirements/py3.10-ci-requirements.txt b/sdk/python/requirements/py3.10-ci-requirements.txt index 223e76b2e6..0ef7d35489 100644 --- a/sdk/python/requirements/py3.10-ci-requirements.txt +++ b/sdk/python/requirements/py3.10-ci-requirements.txt @@ -800,7 +800,7 @@ pyzmq==25.1.0 # ipykernel # jupyter-client # jupyter-server -redis==4.2.2 +redis==4.6.0 # via eg-feast (setup.py) referencing==0.30.0 # via diff --git a/sdk/python/requirements/py3.8-ci-requirements.txt b/sdk/python/requirements/py3.8-ci-requirements.txt index 0a1e7d74de..5a8ca76192 100644 --- a/sdk/python/requirements/py3.8-ci-requirements.txt +++ b/sdk/python/requirements/py3.8-ci-requirements.txt @@ -814,7 +814,7 @@ pyzmq==25.1.0 # jupyter-server # nbclassic # notebook -redis==4.2.2 +redis==4.6.0 # via feast (setup.py) regex==2023.5.5 # via feast (setup.py) diff --git a/sdk/python/requirements/py3.9-ci-requirements.txt b/sdk/python/requirements/py3.9-ci-requirements.txt index 31eb4496c6..1cc84a03c8 100644 --- a/sdk/python/requirements/py3.9-ci-requirements.txt +++ b/sdk/python/requirements/py3.9-ci-requirements.txt @@ -806,7 +806,7 @@ pyzmq==25.1.0 # ipykernel # jupyter-client # jupyter-server -redis==4.2.2 +redis==4.6.0 # via eg-feast (setup.py) referencing==0.30.0 # via diff --git a/setup.py b/setup.py index 3a0a2c4314..002c8b749e 100644 --- a/setup.py +++ b/setup.py @@ -97,7 +97,7 @@ ] REDIS_REQUIRED = [ - "redis==4.2.2", + "redis==4.6.0", "hiredis>=2.0.0,<3", ] @@ -148,10 +148,7 @@ ] -MILVUS_REQUIRED = [ - "pymilvus==2.3.0", - "bidict==0.22.1" -] +MILVUS_REQUIRED = ["pymilvus==2.3.0", "bidict==0.22.1"] ELASTICSEARCH_REQUIRED = [ "elasticsearch==8.8", @@ -240,7 +237,9 @@ # Add Support for parsing tags that have a prefix containing '/' (ie 'sdk/go') to setuptools_scm. # Regex modified from default tag regex in: # https://github.com/pypa/setuptools_scm/blob/2a1b46d38fb2b8aeac09853e660bcd0d7c1bc7be/src/setuptools_scm/config.py#L9 -TAG_REGEX = re.compile(r"^(?:[\/\w-]+)?(?P[vV]?\d+(?:\.\d+){0,2}[^\+]*)(?:\+.*)?$") +TAG_REGEX = re.compile( + r"^(?:[\/\w-]+)?(?P[vV]?\d+(?:\.\d+){0,2}[^\+]*)(?:\+.*)?$" +) # Only set use_scm_version if git executable exists (setting this variable causes pip to use git under the hood) if shutil.which("git"): @@ -354,7 +353,9 @@ def _ensure_go_and_proto_toolchain(): try: subprocess.check_call(["protoc-gen-go", "--version"], env={"PATH": path_val}) - subprocess.check_call(["protoc-gen-go-grpc", "--version"], env={"PATH": path_val}) + subprocess.check_call( + ["protoc-gen-go-grpc", "--version"], env={"PATH": path_val} + ) except Exception as e: raise RuntimeError("Unable to find go/grpc extensions for protoc") from e @@ -440,7 +441,10 @@ def finalize_options(self) -> None: self.extensions = [e for e in self.extensions if not self._is_go_ext(e)] def _is_go_ext(self, ext: Extension): - return any(source.endswith(".go") or source.startswith("github") for source in ext.sources) + return any( + source.endswith(".go") or source.startswith("github") + for source in ext.sources + ) def build_extension(self, ext: Extension): print(f"Building extension {ext}")