diff --git a/dags/hivemind_etl_helpers/ingestion_pipeline.py b/dags/hivemind_etl_helpers/ingestion_pipeline.py index 39aa48bf..ccdd6283 100644 --- a/dags/hivemind_etl_helpers/ingestion_pipeline.py +++ b/dags/hivemind_etl_helpers/ingestion_pipeline.py @@ -32,7 +32,7 @@ def __init__(self, community_id: str, collection_name: str, testing: bool = Fals _, self.embedding_dim = load_model_hyperparams() self.pg_creds = load_postgres_credentials() self.redis_cred = load_redis_credentials() - self.collection_name = community_id + self.collection_name = f"{community_id}_{collection_name}" self.platform_name = collection_name self.embed_model = ( @@ -63,8 +63,7 @@ def run_pipeline(self, docs: list[Document]) -> list[BaseNode]: logging.info( f"{len(docs)} docuemnts was extracted and now loading into QDrant DB!" ) - qdrant_collection_name = f"{self.collection_name}_{self.platform_name}" - vector_access = QDrantVectorAccess(collection_name=qdrant_collection_name) + vector_access = QDrantVectorAccess(collection_name=self.collection_name) vector_store = vector_access.setup_qdrant_vector_store() pipeline = IngestionPipeline( @@ -74,13 +73,13 @@ def run_pipeline(self, docs: list[Document]) -> list[BaseNode]: ], docstore=MongoDocumentStore.from_uri( uri=get_mongo_uri(), - db_name=f"docstore_{self.collection_name}", + db_name=f"docstore_{self.community_id}", namespace=self.platform_name, ), vector_store=vector_store, cache=IngestionCache( cache=RedisCache.from_redis_client(self.redis_client), - collection=f"{self.collection_name}_{self.platform_name}_ingestion_cache", + collection=f"{self.collection_name}_ingestion_cache", docstore_strategy=DocstoreStrategy.UPSERTS, ), docstore_strategy=DocstoreStrategy.UPSERTS,