Skip to content

Commit

Permalink
fix: Redis version upgrade (#98)
Browse files Browse the repository at this point in the history
* fix: Redis version upgrade to 4.6.0 which helps materialization during cluster autoscaling 

---------

Co-authored-by: Bhargav Dodla <[email protected]>
  • Loading branch information
EXPEbdodla and Bhargav Dodla authored Mar 26, 2024
1 parent ce7df35 commit ac0e571
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -291,15 +291,16 @@ def write_to_online_store_in_batches(batch_df: pd.DataFrame):
)
end_time = time.time()
print(
f"INFO!!! Time taken to write batch {batch_id} is {int((end_time - start_time) * 1000)} milliseconds"
f"INFO!!! Processed batch {batch_id} in {int((end_time - start_time) * 1000)} milliseconds"
)

start_time = time.time()
# Spark 3.3.0 or above supports toPandas() method. We are running on spark 3.2.2
pandas_dataframe = pd.DataFrame([row.asDict() for row in rows])
# TODO: For Pyspark applications, we should use py4j bridge to initialize loggers
# Temporarily using print to display logs
print(
f"INFO!!! Processing a partition with {pandas_dataframe.shape[0]} records and batch size {batch_size}"
f"INFO!!! Processing partition {pandas_dataframe.shape[0]} records, batch size {batch_size}"
)

if "fs_batch" in pandas_dataframe.columns:
Expand All @@ -308,3 +309,6 @@ def write_to_online_store_in_batches(batch_df: pd.DataFrame):
)
pandas_dataframe["fs_batch"] = np.arange(len(pandas_dataframe)) // batch_size
pandas_dataframe.groupby("fs_batch").apply(write_to_online_store_in_batches)
print(
f"INFO!!! Processed partition {pandas_dataframe.shape[0]} records, batch size {batch_size}, time {int((time.time() - start_time))} Seconds"
)
2 changes: 1 addition & 1 deletion sdk/python/requirements/py3.10-ci-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -800,7 +800,7 @@ pyzmq==25.1.0
# ipykernel
# jupyter-client
# jupyter-server
redis==4.2.2
redis==4.6.0
# via eg-feast (setup.py)
referencing==0.30.0
# via
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/requirements/py3.8-ci-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -814,7 +814,7 @@ pyzmq==25.1.0
# jupyter-server
# nbclassic
# notebook
redis==4.2.2
redis==4.6.0
# via feast (setup.py)
regex==2023.5.5
# via feast (setup.py)
Expand Down
2 changes: 1 addition & 1 deletion sdk/python/requirements/py3.9-ci-requirements.txt
Original file line number Diff line number Diff line change
Expand Up @@ -806,7 +806,7 @@ pyzmq==25.1.0
# ipykernel
# jupyter-client
# jupyter-server
redis==4.2.2
redis==4.6.0
# via eg-feast (setup.py)
referencing==0.30.0
# via
Expand Down
20 changes: 12 additions & 8 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -97,7 +97,7 @@
]

REDIS_REQUIRED = [
"redis==4.2.2",
"redis==4.6.0",
"hiredis>=2.0.0,<3",
]

Expand Down Expand Up @@ -148,10 +148,7 @@
]


MILVUS_REQUIRED = [
"pymilvus==2.3.0",
"bidict==0.22.1"
]
MILVUS_REQUIRED = ["pymilvus==2.3.0", "bidict==0.22.1"]

ELASTICSEARCH_REQUIRED = [
"elasticsearch==8.8",
Expand Down Expand Up @@ -240,7 +237,9 @@
# Add Support for parsing tags that have a prefix containing '/' (ie 'sdk/go') to setuptools_scm.
# Regex modified from default tag regex in:
# https://github.com/pypa/setuptools_scm/blob/2a1b46d38fb2b8aeac09853e660bcd0d7c1bc7be/src/setuptools_scm/config.py#L9
TAG_REGEX = re.compile(r"^(?:[\/\w-]+)?(?P<version>[vV]?\d+(?:\.\d+){0,2}[^\+]*)(?:\+.*)?$")
TAG_REGEX = re.compile(
r"^(?:[\/\w-]+)?(?P<version>[vV]?\d+(?:\.\d+){0,2}[^\+]*)(?:\+.*)?$"
)

# Only set use_scm_version if git executable exists (setting this variable causes pip to use git under the hood)
if shutil.which("git"):
Expand Down Expand Up @@ -354,7 +353,9 @@ def _ensure_go_and_proto_toolchain():

try:
subprocess.check_call(["protoc-gen-go", "--version"], env={"PATH": path_val})
subprocess.check_call(["protoc-gen-go-grpc", "--version"], env={"PATH": path_val})
subprocess.check_call(
["protoc-gen-go-grpc", "--version"], env={"PATH": path_val}
)
except Exception as e:
raise RuntimeError("Unable to find go/grpc extensions for protoc") from e

Expand Down Expand Up @@ -440,7 +441,10 @@ def finalize_options(self) -> None:
self.extensions = [e for e in self.extensions if not self._is_go_ext(e)]

def _is_go_ext(self, ext: Extension):
return any(source.endswith(".go") or source.startswith("github") for source in ext.sources)
return any(
source.endswith(".go") or source.startswith("github")
for source in ext.sources
)

def build_extension(self, ext: Extension):
print(f"Building extension {ext}")
Expand Down

0 comments on commit ac0e571

Please sign in to comment.