Skip to content

Commit

Permalink
add retries to other connections (#329)
Browse files Browse the repository at this point in the history
Errors in the log showed that we also need retries in main.py and
vx_ingest.py
  • Loading branch information
randytpierce authored Feb 14, 2024
2 parents 5f14bfa + 892e366 commit 36e9abb
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 8 deletions.
3 changes: 2 additions & 1 deletion src/vxingest/builder_common/ingest_manager.py
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,8 @@ def connect_cb(self):

try:
timeout_options = ClusterTimeoutOptions(
kv_timeout=timedelta(seconds=25), query_timeout=timedelta(seconds=120)
kv_timeout=timedelta(seconds=25),
query_timeout=timedelta(seconds=120),
)
options = ClusterOptions(
PasswordAuthenticator(
Expand Down
18 changes: 15 additions & 3 deletions src/vxingest/builder_common/vx_ingest.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import yaml
from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.exceptions import CouchbaseException
from couchbase.options import ClusterOptions, ClusterTimeoutOptions

# Get a logger with this module's name to help with debugging
Expand Down Expand Up @@ -144,9 +145,20 @@ def connect_cb(self):
),
timeout_options=timeout_options,
)
self.cluster = Cluster(
"couchbase://" + self.cb_credentials["host"], options
)
_attempts = 0
while _attempts < 3:
try:
self.cluster = Cluster(
"couchbase://" + self.cb_credentials["host"], options
)
break
except CouchbaseException as _e:
time.sleep(5)
_attempts = _attempts + 1
if _attempts == 3:
raise CouchbaseException(
"Could not connect to couchbase after 3 attempts"
)
self.collection = self.cluster.bucket(
self.cb_credentials["bucket"]
).collection(self.cb_credentials["collection"])
Expand Down
18 changes: 14 additions & 4 deletions src/vxingest/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import shutil
import sys
import tarfile
import time
from datetime import datetime, timedelta
from multiprocessing import Queue, set_start_method
from pathlib import Path
Expand Down Expand Up @@ -285,10 +286,19 @@ def connect_cb(creds: dict[str, str]) -> Cluster:
# Get a reference to our cluster
# NOTE: For TLS/SSL connection use 'couchbases://<your-ip-address>' instead
logger.info(f"Connecting to Couchbase at: {creds['cb_host']}")
cluster = Cluster(
f"couchbase://{creds['cb_host']}",
ClusterOptions(auth, timeout_options=timeout_config), # type: ignore
)
_attempts = 0
while _attempts < 3:
try:
cluster = Cluster(
f"couchbase://{creds['cb_host']}",
ClusterOptions(auth, timeout_options=timeout_config), # type: ignore
)
break
except CouchbaseException as _e:
time.sleep(5)
_attempts = _attempts + 1
if _attempts == 3:
raise CouchbaseException("Could not connect to couchbase after 3 attempts")

# Wait until the cluster is ready for use.
cluster.wait_until_ready(timedelta(seconds=5))
Expand Down

0 comments on commit 36e9abb

Please sign in to comment.