Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feat: Add query param support and Made Neo4jOps singleton! #9

Merged
merged 12 commits into from
May 22, 2024
2 changes: 0 additions & 2 deletions docker-compose.test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,6 @@ services:
- NEO4J_USER=neo4j
- NEO4J_PASSWORD=password
- NEO4J_DB=neo4j
- SENTRY_DSN=sample_dsn
- SENTRY_ENV=local
volumes:
- ./coverage:/project/coverage
depends_on:
Expand Down
2 changes: 1 addition & 1 deletion setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

setup(
name="tc-neo4j-lib",
version="1.0.2",
version="2.0.0",
author="Mohammad Amin Dadgar, TogetherCrew",
maintainer="Mohammad Amin Dadgar",
maintainer_email="[email protected]",
Expand Down
1 change: 1 addition & 0 deletions tc_neo4j_lib/__init__.py
Original file line number Diff line number Diff line change
@@ -1,2 +1,3 @@
# flake8: noqa
from .neo4j_ops import Neo4jOps
from .schema import Query
38 changes: 38 additions & 0 deletions tc_neo4j_lib/credentials.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
import os

from dotenv import load_dotenv


def load_neo4j_credentials() -> dict[str, tuple[str, str] | str]:
"""
load neo4j credentials

Parameters
------------
creds : dict[str, str]
the neo4j credentials to use
the keys are representing what the values are
keys are `auth` ,`url` ,`db_name`
"""
load_dotenv()

protocol = os.getenv("NEO4J_PROTOCOL")
host = os.getenv("NEO4J_HOST")
port = os.getenv("NEO4J_PORT")
db_name = os.getenv("NEO4J_DB")

user = os.getenv("NEO4J_USER")
password = os.getenv("NEO4J_PASSWORD")

if any(var is None for var in [protocol, host, port, db_name, user, password]):
raise ValueError("At least one of the neo4j credentials is missing!")

url = f"{protocol}://{host}:{port}"

creds: dict[str, tuple[str, str] | str] = {
"auth": (user, password), # type: ignore
"url": url, # type: ignore
"db_name": db_name, # type: ignore
}

return creds
119 changes: 47 additions & 72 deletions tc_neo4j_lib/neo4j_ops.py
Original file line number Diff line number Diff line change
@@ -1,78 +1,51 @@
import logging
from typing import Optional
import threading

from graphdatascience import GraphDataScience
from neo4j import Driver, GraphDatabase, Transaction
from neo4j import GraphDatabase, Transaction
from neo4j.exceptions import ClientError, DatabaseError, TransientError

from .credentials import load_neo4j_credentials
amindadgar marked this conversation as resolved.
Show resolved Hide resolved
from .schema import Query


class Neo4jOps:
__instance = None

def __init__(self) -> None:
"""
neo4j utility functions
"""
# Neo4J credentials
self.neo4j_db_name: Optional[str] = None
self.neo4j_protocol: Optional[str] = None
self.neo4j_host: Optional[str] = None
self.neo4j_port: Optional[str] = None
self.neo4j_auth: tuple[Optional[str], Optional[str]] = (None, None)
self.neo4j_driver: Optional[Driver] = None
self.gds: Optional[GraphDataScience] = None

def set_neo4j_db_info(
self,
neo4j_db_name: str,
neo4j_protocol: str,
neo4j_host: str,
neo4j_port: str,
neo4j_user: str,
neo4j_password: str,
) -> None:
"""
Neo4j Database information setter

Parameters:
-------------
neo4j_db_ame : str
the database name to save the results in it
neo4j_protocol : str
the protocol we're using to connect to neo4j
neo4j_host : str
our neo4j host ip or domain
neo4j_port : str
the port of neo4j to connect
neo4j_user : str
neo4j username to connect
neo4j_password : str
neo4j database password
"""
neo4j_auth = (neo4j_user, neo4j_password)

url = f"{neo4j_protocol}://{neo4j_host}:{neo4j_port}"

self.neo4j_url = url
self.neo4j_auth = neo4j_auth
self.neo4j_db_name = neo4j_db_name

def neo4j_database_connect(self) -> None:
if Neo4jOps.__instance is not None:
raise Exception("Singletone class! use `get_instance` method always.")
else:
self._neo4j_database_connect()
Neo4jOps.__instance = self

@staticmethod
def get_instance():
if Neo4jOps.__instance is None:
with threading.Lock():
if Neo4jOps.__instance is None: # Double-checked locking
Neo4jOps()
return Neo4jOps.__instance
amindadgar marked this conversation as resolved.
Show resolved Hide resolved

def _neo4j_database_connect(self) -> None:
"""
connect to neo4j database and set the database driver it the class
"""
with GraphDatabase.driver(
self.neo4j_url, auth=self.neo4j_auth, database=self.neo4j_db_name
) as driver:
driver.verify_connectivity()
creds = load_neo4j_credentials()

self.neo4j_driver = driver
self.gds = self.setup_gds()
url = creds["url"]
auth = creds["auth"]
self.db_name = creds["db_name"]

def setup_gds(self):
gds = GraphDataScience(self.neo4j_url, self.neo4j_auth)
self.neo4j_driver = GraphDatabase.driver(url, auth=auth, database=self.db_name)
self.neo4j_driver.verify_connectivity()

return gds
self.gds = GraphDataScience(url, auth)

def _run_query(self, tx: Transaction, query: str) -> None:
def _run_query(self, tx: Transaction, query: str, **kwargs) -> None:
"""
handle neo4j queries in a transaction

Expand All @@ -82,9 +55,11 @@ def _run_query(self, tx: Transaction, query: str) -> None:
the transaction instance for neo4j python driver
query : str
the query to run for neo4j
**kwargs : dict[str, Any]
the parameters for the neo4j query
"""
try:
tx.run(query)
tx.run(query, kwargs)
except TransientError as err:
logging.error("Neo4j transient error!")
logging.error(f"Code: {err.code}, message: {err.message}")
Expand All @@ -95,16 +70,17 @@ def _run_query(self, tx: Transaction, query: str) -> None:
logging.error("Neo4j Client Error!")
logging.error(f"Code: {err.code}, message: {err.message}")

def store_data_neo4j(
self, query_list: list[str], message: str = "", session_batch: int = 30000
def run_queries_in_batch(
self, queries: list[Query], message: str = "", session_batch: int = 30000
) -> None:
"""
store data into neo4j using the given query list

Parameters:
------------
query_list : list[str]
list of strings to add data into neo4j
queries : list[neo4j_lib_py.schema.Query]
list of tuples with lenght of 2. the first index is the neo4j query
and the second index is the neo4j query parameters
min length is 1
message : str
the message to be printed out
Expand All @@ -113,29 +89,28 @@ def store_data_neo4j(
the number of queries to run in one session
default is 30K transactions
"""
if self.neo4j_driver is None:
raise ConnectionError(
"first connect to neo4j using the method neo4j_database_connect"
)

try:
# splitting the transactions
queries_idx = list(range(len(query_list)))[::session_batch]
queries_idx = list(range(len(queries)))[::session_batch]
if len(queries_idx) > 1:
logging.info(
f"{message} huge query count, doing operations in multi-session"
)
for session_number, index in enumerate(queries_idx):
queries = query_list[index : index + session_batch]
with self.neo4j_driver.session(database=self.neo4j_db_name) as session:
queries = queries[index : index + session_batch]
with self.neo4j_driver.session(database=self.db_name) as session:
with session.begin_transaction() as tx:
query_count = len(queries)
for idx, query in enumerate(queries):

for idx, query_item in enumerate(queries):
query = query_item.query
query_parameters = query_item.parameters

msg_title = "Neo4J Transaction session "
msg_title += f"{session_number + 1}/{len(queries_idx)}"
logging.info(
f"{message} {msg_title}: Batch {idx + 1}/{query_count}"
)
self._run_query(tx, query)
self._run_query(tx, query, **query_parameters)
amindadgar marked this conversation as resolved.
Show resolved Hide resolved
except Exception as e:
logging.error(f"Couldn't execute Neo4J DB transaction, exception: {e}")
17 changes: 17 additions & 0 deletions tc_neo4j_lib/schema.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
from typing import Any


class Query:
def __init__(self, query: str, parameters: dict[str, Any]) -> None:
"""
A schema for passing neo4j query

Paramters
------------
query : str
the actual query to pass to neo4j driver
parameters : dict[str, Any]
the neo4j parameters for the query itself
"""
self.query = query
self.parameters = parameters
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The Query class is well-defined with clear documentation. However, there's a small typo in the documentation: "Paramters" should be corrected to "Parameters".

- Paramters
+ Parameters

Committable suggestion

‼️ IMPORTANT
Carefully review the code before committing. Ensure that it accurately replaces the highlighted code, contains no missing lines, and has no issues with indentation.

Suggested change
class Query:
def __init__(self, query: str, parameters: dict[str, Any]) -> None:
"""
A schema for passing neo4j query
Paramters
------------
query : str
the actual query to pass to neo4j driver
parameters : dict[str, Any]
the neo4j parameters for the query itself
"""
self.query = query
self.parameters = parameters
class Query:
def __init__(self, query: str, parameters: dict[str, Any]) -> None:
"""
A schema for passing neo4j query
Parameters
------------
query : str
the actual query to pass to neo4j driver
parameters : dict[str, Any]
the neo4j parameters for the query itself
"""
self.query = query
self.parameters = parameters

99 changes: 0 additions & 99 deletions tests/integration/test_neo4j_connection.py

This file was deleted.

Loading
Loading