Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add Couchbase v2 destination connector #3478

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ include requirements/ingest/biomed.in
include requirements/ingest/box.in
include requirements/ingest/chroma.in
include requirements/ingest/confluence.in
include requirements/ingest/couchbase.in
include requirements/ingest/databricks-volumes.in
include requirements/ingest/delta-table.in
include requirements/ingest/discord.in
Expand Down
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -261,6 +261,10 @@ install-ingest-astra:
install-ingest-clarifai:
python3 -m pip install -r requirements/ingest/clarifai.txt

.PHONY: install-ingest-couchbase
install-ingest-couchbase:
python3 -m pip install -r requirements/ingest/couchbase.txt

.PHONY: install-embed-huggingface
install-embed-huggingface:
python3 -m pip install -r requirements/ingest/embed-huggingface.txt
Expand Down
6 changes: 6 additions & 0 deletions docs/source/ingest/destination_connectors/couchbase.rst
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
Unstructured Documentation
==========================

The Unstructured documentation page has moved! Check out our new and improved docs page at
`https://docs.unstructured.io <https://docs.unstructured.io>`_ to learn more about our
products and tools.
3 changes: 3 additions & 0 deletions requirements/ingest/couchbase.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-c ../deps/constraints.txt
-c ../base.txt
couchbase
8 changes: 8 additions & 0 deletions requirements/ingest/couchbase.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
#
# This file is autogenerated by pip-compile with Python 3.11
# by the following command:
#
# pip-compile ./ingest/couchbase.in
#
couchbase==4.3.0
# via -r ./ingest/couchbase.in
103 changes: 103 additions & 0 deletions scripts/couchbase-test-helpers/common/check_cluster_health.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
import argparse
import time
from dataclasses import dataclass

import requests
import urllib3


@dataclass
class ClusterConfig:
    """Connection settings consumed by the cluster health checks in this script."""

    # HTTP basic-auth user for the REST endpoints.
    username: str
    # HTTP basic-auth password for the REST endpoints.
    password: str
    # Couchbase connection string; only its host component is used to build URLs.
    connection_string: str
    # Bucket whose health is polled.
    bucket_name: str


def check_bucket_health(cluster_config: ClusterConfig, url: str):
    """Poll the bucket endpoint until its first node reports ``healthy``.

    Args:
        cluster_config: Supplies the basic-auth credentials; ``bucket_name`` is only
            used in the log messages.
        url: Full URL of the bucket REST endpoint to query.

    Raises:
        RuntimeError: If the bucket is still not healthy after all attempts.
    """
    max_attempts = 20
    retry_delay_seconds = 3

    for attempt in range(1, max_attempts + 1):
        try:
            response = requests.get(
                url,
                auth=(cluster_config.username, cluster_config.password),
                verify=False,
                timeout=10,  # never let a single hung request stall the whole retry loop
            )
            # Only parse the body of a 200: error responses are not guaranteed to be JSON.
            response_data = response.json() if response.status_code == 200 else None
        except (requests.RequestException, ValueError):
            # Connection refused / timeout / non-JSON body while the server is still
            # starting counts as a failed attempt, not as a fatal error.
            response_data = None

        nodes = response_data.get("nodes") if isinstance(response_data, dict) else None
        if nodes and nodes[0].get("status") == "healthy":
            print(f"Bucket '{cluster_config.bucket_name}' is healthy.")
            return

        print(
            f"Attempt {attempt}/{max_attempts}: "
            f"Bucket '{cluster_config.bucket_name}' health check failed"
        )
        time.sleep(retry_delay_seconds)  # back off before the next attempt

    message = (
        f"Bucket '{cluster_config.bucket_name}'"
        f" health check failed after {max_attempts} attempts."
    )
    print(message)
    raise RuntimeError(message)


def check_fts_service_health(cluster_config: ClusterConfig, url: str):
    """Poll the search (FTS) endpoint until it answers with HTTP 200.

    Args:
        cluster_config: Supplies the basic-auth credentials for the request.
        url: Full URL of the FTS REST endpoint to query.

    Raises:
        RuntimeError: If the endpoint never returns 200 within all attempts.
    """
    max_attempts = 20
    retry_delay_seconds = 3

    for attempt in range(1, max_attempts + 1):
        try:
            response = requests.get(
                url,
                auth=(cluster_config.username, cluster_config.password),
                verify=False,
                timeout=10,  # never let a single hung request stall the whole retry loop
            )
            is_healthy = response.status_code == 200
        except requests.RequestException:
            # The service may not be listening yet; treat it as a failed attempt and retry.
            is_healthy = False

        if is_healthy:
            print("FTS service is healthy.")
            return

        print(f"Attempt {attempt}: FTS service health check failed")
        time.sleep(retry_delay_seconds)

    message = f"FTS service health check failed after {max_attempts} attempts."
    print(message)
    raise RuntimeError(message)


def check_health(cluster_config: ClusterConfig):
    """Run the bucket check, then the FTS check, against the connection string's host.

    The bucket endpoint is queried on port 8091 and the FTS endpoint on port 8094.
    Either check raises ``RuntimeError`` if it exhausts its attempts.
    """
    parsed_connection = urllib3.util.parse_url(cluster_config.connection_string)
    hostname = parsed_connection.host

    bucket_url = (
        f"http://{hostname}:8091/pools/default/buckets/{cluster_config.bucket_name}"
    )
    check_bucket_health(cluster_config, url=bucket_url)

    fts_url = f"http://{hostname}:8094/api/index"
    check_fts_service_health(cluster_config, url=fts_url)

    print("Cluster is healthy")


if __name__ == "__main__":
    # CLI entry point: collect connection details and run the health checks.
    # The description previously said "Setup Couchbase cluster and create search index.",
    # which describes a different script; this one only checks health.
    parser = argparse.ArgumentParser(
        description="Check that a Couchbase bucket and the FTS service are healthy."
    )
    parser.add_argument("--username", required=True, help="Couchbase username")
    parser.add_argument("--password", required=True, help="Couchbase password")
    parser.add_argument("--connection_string", required=True, help="Couchbase connection string")
    parser.add_argument("--bucket_name", required=True, help="Couchbase bucket name")

    args = parser.parse_args()

    config = ClusterConfig(
        username=args.username,
        password=args.password,
        connection_string=args.connection_string,
        bucket_name=args.bucket_name,
    )

    # Raises RuntimeError (non-zero exit) if either check exhausts its attempts.
    check_health(config)
7 changes: 7 additions & 0 deletions scripts/couchbase-test-helpers/common/constants.env
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
CB_CONN_STR=couchbase://localhost
CB_USERNAME=Administrator
CB_PASSWORD=password
CB_BUCKET=unstructured
CB_SCOPE=_default
CB_COLLECTION=_default
CB_INDEX_NAME=unstructured_test_search
21 changes: 21 additions & 0 deletions scripts/couchbase-test-helpers/common/docker-compose.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Single-node Couchbase server used by the ingest destination tests.
services:
  couchbase:
    image: couchbase:enterprise-7.6.2
    # Ports published to the host. 8091 and 8094 are the ones polled by
    # check_cluster_health.py (bucket endpoint and FTS endpoint respectively).
    ports:
      - "8091-8095:8091-8095"
      - "11210:11210"
      - "9102:9102"
    expose:
      - "8091"
      - "8092"
      - "8093"
      - "8094"
      - "8095"
      - "9102"
      - "11210"
    healthcheck: # checks couchbase server is up
      # NOTE(review): curl is invoked without -f, so this presumably passes on any
      # HTTP response (including error statuses) — confirm that is intended.
      test: ["CMD", "curl", "-v", "http://localhost:8091/pools"]
      interval: 20s
      timeout: 20s
      retries: 5
    # Fixed name: setup_couchbase_cluster.sh runs `docker exec` against "couchbase-db".
    container_name: couchbase-db
32 changes: 32 additions & 0 deletions scripts/couchbase-test-helpers/common/setup_couchbase_cluster.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
#!/usr/bin/env bash

# Start the Couchbase container, initialize the cluster and create the test
# bucket, then block until the bucket and FTS service report healthy.

set -e

SCRIPT_DIR=$(dirname "$(realpath "$0")")
ENV_FILE="$SCRIPT_DIR"/constants.env

# Provides the CB_* variables (connection string, credentials, bucket name).
source "$ENV_FILE"

docker compose version
docker compose -f "$SCRIPT_DIR"/docker-compose.yaml up --wait
docker compose -f "$SCRIPT_DIR"/docker-compose.yaml ps

echo "Cluster is live."

echo "Initializing Couchbase cluster"
# The two steps are separate commands rather than an `A && B` list: under
# `set -e` a failure of A inside such a list does NOT abort the script.
# No `-it` on docker exec: allocating a TTY fails when the script runs
# non-interactively, and couchbase-cli does not need stdin here.
docker exec couchbase-db couchbase-cli cluster-init -c "$CB_CONN_STR" \
  --cluster-username "$CB_USERNAME" --cluster-password "$CB_PASSWORD" --cluster-ramsize 512 \
  --cluster-index-ramsize 512 --cluster-fts-ramsize 512 --services data,index,query,fts

docker exec couchbase-db couchbase-cli bucket-create -c "$CB_CONN_STR" \
  --username "$CB_USERNAME" --password "$CB_PASSWORD" \
  --bucket "$CB_BUCKET" --bucket-type couchbase --bucket-ramsize 200

echo "Couchbase cluster initialized"

# Runs in the foreground, so no trailing `wait` is needed.
python "$SCRIPT_DIR"/check_cluster_health.py \
  --username "$CB_USERNAME" \
  --password "$CB_PASSWORD" \
  --connection_string "$CB_CONN_STR" \
  --bucket_name "$CB_BUCKET"
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
# type: ignore[import]
"""Helper module for setting up and interacting with a Couchbase cluster."""

import argparse
import json
import time
from dataclasses import dataclass
from datetime import timedelta

from couchbase.auth import PasswordAuthenticator
from couchbase.cluster import Cluster
from couchbase.management.search import SearchIndex
from couchbase.options import ClusterOptions


@dataclass
class ClusterConfig:
    """Everything needed to connect to the cluster and create its search index."""

    # Credentials passed to the password authenticator.
    username: str
    password: str
    # Connection string handed to the Couchbase client.
    connection_string: str
    # Keyspace (bucket / scope / collection) the indexes are created on.
    bucket_name: str
    scope_name: str
    collection_name: str
    # Name given to the search index.
    search_index_name: str


def get_client(cluster_config: ClusterConfig) -> Cluster:
    """Open a connection to the cluster and wait (up to 5 seconds) until it is ready.

    The ``wan_development`` profile is applied to the cluster options before connecting.
    """
    cluster_options = ClusterOptions(
        PasswordAuthenticator(cluster_config.username, cluster_config.password)
    )
    cluster_options.apply_profile("wan_development")

    client = Cluster(cluster_config.connection_string, cluster_options)
    client.wait_until_ready(timedelta(seconds=5))
    return client


def _build_search_index_definition(cluster_config: ClusterConfig) -> dict:
    """Return the search index definition (text + 384-dim vector) for the configured keyspace."""
    return {
        "type": "fulltext-index",
        "name": cluster_config.search_index_name,
        "sourceType": "couchbase",
        "sourceName": cluster_config.bucket_name,
        "planParams": {"maxPartitionsPerPIndex": 1024, "indexPartitions": 1},
        "params": {
            "doc_config": {
                "docid_prefix_delim": "",
                "docid_regexp": "",
                "mode": "scope.collection.type_field",
                "type_field": "type",
            },
            "mapping": {
                "analysis": {},
                "default_analyzer": "standard",
                "default_datetime_parser": "dateTimeOptional",
                "default_field": "_all",
                "default_mapping": {"dynamic": True, "enabled": False},
                "default_type": "_default",
                "docvalues_dynamic": False,
                "index_dynamic": True,
                "store_dynamic": True,
                "type_field": "_type",
                "types": {
                    f"{cluster_config.scope_name}.{cluster_config.collection_name}": {
                        "dynamic": False,
                        "enabled": True,
                        "properties": {
                            "embedding": {
                                "dynamic": False,
                                "enabled": True,
                                "fields": [
                                    {
                                        "dims": 384,
                                        "index": True,
                                        "name": "embedding",
                                        "similarity": "dot_product",
                                        "type": "vector",
                                        "vector_index_optimized_for": "recall",
                                    }
                                ],
                            },
                            "metadata": {"dynamic": True, "enabled": True},
                            "text": {
                                "dynamic": False,
                                "enabled": True,
                                "fields": [
                                    {
                                        "include_in_all": True,
                                        "index": True,
                                        "name": "text",
                                        "store": True,
                                        "type": "text",
                                    }
                                ],
                            },
                        },
                    }
                },
            },
            "store": {"indexType": "scorch", "segmentVersion": 16},
        },
        "sourceParams": {},
    }


def setup_cluster(cluster_config: ClusterConfig):
    """Create the primary index and upsert the search index for the configured collection.

    All names are taken from ``cluster_config``. The original body read a module-level
    ``config`` variable inside the index definition, which only existed when the file
    was executed as a script.

    Args:
        cluster_config: Connection details plus bucket/scope/collection/index names.

    Raises:
        RuntimeError: If the search index cannot be upserted after all attempts.
    """
    cluster = get_client(cluster_config)
    bucket = cluster.bucket(cluster_config.bucket_name)
    scope = bucket.scope(cluster_config.scope_name)

    # Create Primary Index
    # NOTE(review): the Couchbase Python SDK reportedly runs queries lazily (on row
    # iteration / .execute()); confirm this statement is actually executed.
    cluster.query(
        "Create primary index on `{}`.`{}`.`{}`".format(
            cluster_config.bucket_name, cluster_config.scope_name, cluster_config.collection_name
        )
    )

    index_definition = _build_search_index_definition(cluster_config)

    scope_index_manager = scope.search_indexes()
    search_index_def = SearchIndex.from_json(json.dumps(index_definition))

    max_attempts = 20
    for attempt in range(1, max_attempts + 1):
        try:
            scope_index_manager.upsert_index(search_index_def)
            break
        except Exception as e:
            # Deliberately broad: any failure is retried, e.g. while the search
            # service is still starting.
            print(f"Attempt {attempt}/{max_attempts}: Error creating search index: {e}")
            time.sleep(3)
    else:
        # Loop finished without a successful upsert.
        message = f"Error creating search index after {max_attempts} attempts."
        print(message)
        raise RuntimeError(message)


if __name__ == "__main__":
    # CLI entry point: every option is required and maps 1:1 onto a ClusterConfig field.
    parser = argparse.ArgumentParser(description="Setup Couchbase cluster and create search index.")
    required_options = (
        ("--username", "Couchbase username"),
        ("--password", "Couchbase password"),
        ("--connection_string", "Couchbase connection string"),
        ("--bucket_name", "Couchbase bucket name"),
        ("--scope_name", "Couchbase scope name"),
        ("--collection_name", "Couchbase collection name"),
        ("--search_index_name", "Couchbase search index name"),
    )
    for option_flag, option_help in required_options:
        parser.add_argument(option_flag, required=True, help=option_help)

    args = parser.parse_args()

    # The parsed namespace holds exactly the ClusterConfig field names.
    config = ClusterConfig(**vars(args))

    setup_cluster(config)
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ def load_requirements(file_list: Optional[Union[str, List[str]]] = None) -> List
"chroma": load_requirements("requirements/ingest/chroma.in"),
"clarifai": load_requirements("requirements/ingest/clarifai.in"),
"confluence": load_requirements("requirements/ingest/confluence.in"),
"couchbase": load_requirements("requirements/ingest/couchbase.in"),
"delta-table": load_requirements("requirements/ingest/delta-table.in"),
"discord": load_requirements("requirements/ingest/discord.in"),
"dropbox": load_requirements("requirements/ingest/dropbox.in"),
Expand Down
Loading
Loading