Skip to content

Commit

Permalink
feat/singlestore dest connector (Unstructured-IO#3320)
Browse files Browse the repository at this point in the history
### Description
Adds [SingleStore](https://www.singlestore.com/) database destination
connector with associated ingest test.
  • Loading branch information
rbiseck3 authored Jul 3, 2024
1 parent 0046f58 commit f1a2860
Show file tree
Hide file tree
Showing 27 changed files with 607 additions and 27 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
## 0.14.10-dev6
## 0.14.10-dev7

### Enhancements
* **Update unstructured-client dependency** Change unstructured-client dependency pin back to
Expand Down
1 change: 1 addition & 0 deletions MANIFEST.in
Original file line number Diff line number Diff line change
Expand Up @@ -53,5 +53,6 @@ include requirements/ingest/salesforce.in
include requirements/ingest/sftp.in
include requirements/ingest/sharepoint.in
include requirements/ingest/slack.in
include requirements/ingest/singlestore.in
include requirements/ingest/weaviate.in
include requirements/ingest/wikipedia.in
4 changes: 4 additions & 0 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,10 @@ install-ingest-airtable:
install-ingest-sharepoint:
python3 -m pip install -r requirements/ingest/sharepoint.txt

.PHONY: install-ingest-singlestore
install-ingest-singlestore:
python3 -m pip install -r requirements/ingest/singlestore.txt

.PHONY: install-ingest-weaviate
install-ingest-weaviate:
python3 -m pip install -r requirements/ingest/weaviate.txt
Expand Down
3 changes: 3 additions & 0 deletions requirements/ingest/singlestore.in
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
-c ../deps/constraints.txt
-c ../base.txt
singlestoredb
66 changes: 66 additions & 0 deletions requirements/ingest/singlestore.txt
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
#
# This file is autogenerated by pip-compile with Python 3.9
# by the following command:
#
# pip-compile singlestore.in
#
build==1.2.1
# via singlestoredb
certifi==2024.6.2
# via
# -c ../base.txt
# -c ../deps/constraints.txt
# requests
charset-normalizer==3.3.2
# via
# -c ../base.txt
# requests
idna==3.7
# via
# -c ../base.txt
# requests
importlib-metadata==7.1.0
# via
# -c ../deps/constraints.txt
# build
packaging==23.2
# via
# -c ../base.txt
# -c ../deps/constraints.txt
# build
parsimonious==0.10.0
# via singlestoredb
pyjwt==2.8.0
# via singlestoredb
pyproject-hooks==1.1.0
# via build
regex==2024.5.15
# via
# -c ../base.txt
# parsimonious
requests==2.32.3
# via
# -c ../base.txt
# singlestoredb
singlestoredb==1.4.0
# via -r singlestore.in
sqlparams==6.0.1
# via singlestoredb
tomli==2.0.1
# via
# build
# singlestoredb
urllib3==1.26.19
# via
# -c ../base.txt
# -c ../deps/constraints.txt
# requests
wheel==0.43.0
# via
# -c ../deps/constraints.txt
# singlestoredb
zipp==3.19.2
# via importlib-metadata

# The following packages are considered to be unsafe in a requirements file:
# setuptools
21 changes: 21 additions & 0 deletions scripts/singlestore-test-helpers/docker-compose.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
services:
singlestore:
container_name: "singlestore"
image: ghcr.io/singlestore-labs/singlestoredb-dev:latest
platform: linux/amd64
ports:
- 3306:3306
- 8080:8080
- 9000:9000
environment:
- ROOT_PASSWORD=password
volumes:
- ./schema.sql:/init.sql

# Allow docker compose up --wait to exit only when singlestore is healthy
wait:
image: hello-world:latest
container_name: singlestore-waiter
depends_on:
singlestore:
condition: service_healthy
49 changes: 49 additions & 0 deletions scripts/singlestore-test-helpers/schema.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
CREATE DATABASE ingest_test;
USE ingest_test;

CREATE TABLE elements (
id INT PRIMARY KEY NOT NULL AUTO_INCREMENT,
element_id TEXT,
text TEXT,
embeddings Vector(384),
type TEXT,
url TEXT,
version TEXT,
data_source_date_created TIMESTAMP,
data_source_date_modified TIMESTAMP,
data_source_date_processed TIMESTAMP,
data_source_permissions_data TEXT,
data_source_url TEXT,
data_source_version TEXT,
data_source_record_locator JSON,
category_depth INTEGER,
parent_id TEXT,
attached_filename TEXT,
filetype TEXT,
last_modified TIMESTAMP,
file_directory TEXT,
filename TEXT,
languages TEXT,
page_number TEXT,
links TEXT,
page_name TEXT,
link_urls TEXT,
link_texts TEXT,
sent_from TEXT,
sent_to TEXT,
subject TEXT,
section TEXT,
header_footer_type TEXT,
emphasized_text_contents TEXT,
emphasized_text_tags TEXT,
text_as_html TEXT,
regex_metadata TEXT,
detection_class_prob DECIMAL,
is_continuation BOOLEAN,
orig_elements TEXT,
coordinates_points TEXT,
coordinates_system TEXT,
coordinates_layout_width DECIMAL,
coordinates_layout_height DECIMAL
);

56 changes: 56 additions & 0 deletions scripts/singlestore-test-helpers/test_outputs.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
#!/usr/bin/env python3

import click
import singlestoredb as s2
from singlestoredb.connection import Connection


def get_connection(
host: str = None, port: int = None, database: str = None, user: str = None, password: str = None
) -> Connection:
conn = s2.connect(
host=host,
port=port,
database=database,
user=user,
password=password,
)
return conn


def validate(table_name: str, conn: Connection, num_elements: int):
with conn.cursor() as cur:
stmt = f"select * from {table_name}"
count = cur.execute(stmt)
assert (
count == num_elements
), f"found count ({count}) doesn't match expected value: {num_elements}"
print("validation successful")


@click.command()
@click.option("--host", type=str, default="localhost", show_default=True)
@click.option("--port", type=int, default=3306, show_default=True)
@click.option("--user", type=str, default="root", show_default=True)
@click.option("--password", type=str, default="password")
@click.option("--database", type=str, required=True)
@click.option("--table-name", type=str, required=True)
@click.option(
"--num-elements", type=int, required=True, help="The expected number of elements to exist"
)
def run_validation(
host: str,
port: int,
user: str,
database: str,
password: str,
table_name: str,
num_elements: int,
):
print(f"Validating that table {table_name} in database {database} has {num_elements} entries")
conn = get_connection(host=host, port=port, database=database, user=user, password=password)
validate(table_name=table_name, conn=conn, num_elements=num_elements)


if __name__ == "__main__":
run_validation()
1 change: 1 addition & 0 deletions setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ def load_requirements(file_list: Optional[Union[str, List[str]]] = None) -> List
"openai": load_requirements("requirements/ingest/embed-openai.in"),
"bedrock": load_requirements("requirements/ingest/embed-aws-bedrock.in"),
"databricks-volumes": load_requirements("requirements/ingest/databricks-volumes.in"),
"singlestore": load_requirements("requirements/ingest/singlestore.in"),
},
package_dir={"unstructured": "unstructured"},
package_data={"unstructured": ["nlp/*.txt", "py.typed"]},
Expand Down
65 changes: 65 additions & 0 deletions test_unstructured_ingest/dest/singlestore.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
#!/usr/bin/env bash

set -e

DEST_PATH=$(dirname "$(realpath "$0")")
SCRIPT_DIR=$(dirname "$DEST_PATH")
cd "$SCRIPT_DIR"/.. || exit 1
OUTPUT_FOLDER_NAME=singlestore-dest
OUTPUT_ROOT=${OUTPUT_ROOT:-$SCRIPT_DIR}
OUTPUT_DIR=$OUTPUT_ROOT/structured-output/$OUTPUT_FOLDER_NAME
WORK_DIR=$OUTPUT_ROOT/workdir/$OUTPUT_FOLDER_NAME
CI=${CI:-"false"}
max_processes=${MAX_PROCESSES:=$(python3 -c "import os; print(os.cpu_count())")}

# shellcheck disable=SC1091
source "$SCRIPT_DIR"/cleanup.sh
function cleanup {
# Index cleanup
echo "Stopping Singlestore Docker container"
docker compose -f scripts/singlestore-test-helpers/docker-compose.yml down --remove-orphans -v

# Local file cleanup
cleanup_dir "$WORK_DIR"
cleanup_dir "$OUTPUT_DIR"

}

trap cleanup EXIT

# Create singlestore instance and create `elements` class
echo "Creating singlestore instance"
# shellcheck source=/dev/null
docker compose -f scripts/singlestore-test-helpers/docker-compose.yml up -d --wait-timeout 60

DATABASE=ingest_test
USER=root
HOST=localhost
PASSWORD=password
PORT=3306
TABLE=elements

PYTHONPATH=. ./unstructured/ingest/main.py \
local \
--num-processes "$max_processes" \
--output-dir "$OUTPUT_DIR" \
--strategy fast \
--verbose \
--reprocess \
--input-path example-docs/fake-memo.pdf \
--work-dir "$WORK_DIR" \
--embedding-provider "langchain-huggingface" \
singlestore \
--host $HOST \
--user $USER \
--password $PASSWORD \
--database $DATABASE \
--port $PORT \
--table-name $TABLE \
--drop-empty-cols

expected_num_elements=$(cat "$WORK_DIR"/embed/* | jq 'length')
./scripts/singlestore-test-helpers/test_outputs.py \
--table-name $TABLE \
--database $DATABASE \
--num-elements "$expected_num_elements"
1 change: 1 addition & 0 deletions test_unstructured_ingest/test-ingest-dest.sh
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ all_tests=(
'sharepoint-embed-cog-index.sh'
'sqlite.sh'
'vectara.sh'
'singlestore.sh'
'weaviate.sh'
)

Expand Down
2 changes: 1 addition & 1 deletion unstructured/__version__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__version__ = "0.14.10-dev6" # pragma: no cover
__version__ = "0.14.10-dev7" # pragma: no cover
4 changes: 2 additions & 2 deletions unstructured/ingest/connector/astra.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
WriteConfig,
)
from unstructured.ingest.logger import logger
from unstructured.ingest.utils.data_prep import chunk_generator
from unstructured.ingest.utils.data_prep import batch_generator
from unstructured.utils import requires_dependencies

if t.TYPE_CHECKING:
Expand Down Expand Up @@ -114,7 +114,7 @@ def write_dict(self, *args, elements_dict: t.List[t.Dict[str, t.Any]], **kwargs)

astra_batch_size = self.write_config.batch_size

for chunk in chunk_generator(elements_dict, astra_batch_size):
for chunk in batch_generator(elements_dict, astra_batch_size):
self._astra_db_collection.insert_many(chunk)

def normalize_dict(self, element_dict: dict) -> dict:
Expand Down
4 changes: 2 additions & 2 deletions unstructured/ingest/connector/chroma.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@
WriteConfig,
)
from unstructured.ingest.logger import logger
from unstructured.ingest.utils.data_prep import chunk_generator
from unstructured.ingest.utils.data_prep import batch_generator
from unstructured.staging.base import flatten_dict
from unstructured.utils import requires_dependencies

Expand Down Expand Up @@ -144,7 +144,7 @@ def write_dict(self, *args, elements_dict: t.List[t.Dict[str, t.Any]], **kwargs)

chroma_batch_size = self.write_config.batch_size

for chunk in chunk_generator(elements_dict, chroma_batch_size):
for chunk in batch_generator(elements_dict, chroma_batch_size):
self.upsert_batch(self.prepare_chroma_list(chunk))

def normalize_dict(self, element_dict: dict) -> dict:
Expand Down
4 changes: 2 additions & 2 deletions unstructured/ingest/connector/kafka.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@
WriteConfig,
)
from unstructured.ingest.logger import logger
from unstructured.ingest.utils.data_prep import chunk_generator
from unstructured.ingest.utils.data_prep import batch_generator
from unstructured.utils import requires_dependencies

if t.TYPE_CHECKING:
Expand Down Expand Up @@ -270,7 +270,7 @@ def write_dict(self, *args, dict_list: t.List[t.Dict[str, t.Any]], **kwargs) ->
logger.info(f"Writing {len(dict_list)} documents to Kafka")
num_uploaded = 0

for chunk in chunk_generator(dict_list, self.write_config.batch_size):
for chunk in batch_generator(dict_list, self.write_config.batch_size):
num_uploaded += self.upload_msg(chunk) # noqa: E203

producer = self.kafka_producer
Expand Down
6 changes: 3 additions & 3 deletions unstructured/ingest/connector/pinecone.py
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
WriteConfig,
)
from unstructured.ingest.logger import logger
from unstructured.ingest.utils.data_prep import chunk_generator
from unstructured.ingest.utils.data_prep import batch_generator
from unstructured.staging.base import flatten_dict
from unstructured.utils import requires_dependencies

Expand Down Expand Up @@ -111,15 +111,15 @@ def write_dict(self, *args, elements_dict: t.List[t.Dict[str, t.Any]], **kwargs)

logger.info(f"using {self.write_config.num_processes} processes to upload")
if self.write_config.num_processes == 1:
for chunk in chunk_generator(elements_dict, pinecone_batch_size):
for chunk in batch_generator(elements_dict, pinecone_batch_size):
self.upsert_batch(chunk) # noqa: E203

else:
with mp.Pool(
processes=self.write_config.num_processes,
) as pool:
pool.map(
self.upsert_batch, list(chunk_generator(elements_dict, pinecone_batch_size))
self.upsert_batch, list(batch_generator(elements_dict, pinecone_batch_size))
)

def normalize_dict(self, element_dict: dict) -> dict:
Expand Down
Loading

0 comments on commit f1a2860

Please sign in to comment.