diff --git a/crawlers/mooncrawl/mooncrawl/actions.py b/crawlers/mooncrawl/mooncrawl/actions.py index 3cf7e262..ba6cf868 100644 --- a/crawlers/mooncrawl/mooncrawl/actions.py +++ b/crawlers/mooncrawl/mooncrawl/actions.py @@ -18,7 +18,11 @@ ) from .middleware import MoonstreamHTTPException -from .settings import bugout_client as bc +from .settings import ( + bugout_client as bc, + MOONSTREAM_DB_V3_CONTROLLER_API, + MOONSTREAM_ADMIN_ACCESS_TOKEN, +) logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -204,3 +208,21 @@ def get_all_entries_from_search( results.extend(existing_methods.results) # type: ignore return results + + +def get_customer_db_uri( + customer_id: str, + instance_id: str, + user: str, +) -> str: + + try: + response = requests.get( + f"{MOONSTREAM_DB_V3_CONTROLLER_API}/customers/{customer_id}/instances/{instance_id}/creds/{user}/url", + headers={"Authorization": f"Bearer {MOONSTREAM_ADMIN_ACCESS_TOKEN}"}, + ) + response.raise_for_status() + return response.text.replace('"', "") + except Exception as e: + logger.error(f"Error get customer db uri: {str(e)}") + raise MoonstreamHTTPException(status_code=500, internal_error=e) diff --git a/crawlers/mooncrawl/mooncrawl/api.py b/crawlers/mooncrawl/mooncrawl/api.py index 54b8715d..615d2805 100644 --- a/crawlers/mooncrawl/mooncrawl/api.py +++ b/crawlers/mooncrawl/mooncrawl/api.py @@ -232,6 +232,7 @@ async def queries_data_update_handler( requested_query = request_data.query + blockchain_table = "polygon_labels" if request_data.blockchain: if request_data.blockchain not in [i.value for i in AvailableBlockchainType]: logger.error(f"Unknown blockchain {request_data.blockchain}") @@ -254,6 +255,8 @@ async def queries_data_update_handler( ) ) + blockchain_table = get_label_model(blockchain).__tablename__ + # Check if it can transform to TextClause try: query = text(requested_query) @@ -296,6 +299,9 @@ async def queries_data_update_handler( query=query, params=passed_params, params_hash=params_hash, + customer_id=request_data.customer_id, + instance_id=request_data.instance_id, + blockchain_table=blockchain_table, ) except Exception as e: diff --git a/crawlers/mooncrawl/mooncrawl/data.py b/crawlers/mooncrawl/mooncrawl/data.py index 3e76db46..54ae071f 100644 --- a/crawlers/mooncrawl/mooncrawl/data.py +++ b/crawlers/mooncrawl/mooncrawl/data.py @@ -50,6 +50,8 @@ class QueryDataUpdate(BaseModel): query: str params: Dict[str, Any] = Field(default_factory=dict) blockchain: Optional[str] = None + customer_id: Optional[str] = None + instance_id: Optional[str] = None class TokenURIs(BaseModel): diff --git a/crawlers/mooncrawl/mooncrawl/db.py b/crawlers/mooncrawl/mooncrawl/db.py index 8eba88f6..cb22baf7 100644 --- a/crawlers/mooncrawl/mooncrawl/db.py +++ b/crawlers/mooncrawl/mooncrawl/db.py @@ -7,6 +7,7 @@ MOONSTREAM_POOL_SIZE, create_moonstream_engine, ) +from moonstreamdbv3.db import MoonstreamCustomDBEngine from sqlalchemy.orm import Session, sessionmaker from .settings import ( diff --git a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py index 8cc2def2..07d2701b 100644 --- a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py +++ b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py @@ -91,6 +91,13 @@ def handle_leaderboards(args: argparse.Namespace) -> None: params = leaderboard_data["params"] blockchain = leaderboard_data.get("blockchain", None) + query_params = {} + + if leaderboard_data.get("customer_id", False): + query_params["customer_id"] = leaderboard_data["customer_id"] + + if leaderboard_data.get("instance_id", False): + query_params["instance_id"] = str(leaderboard_data["instance_id"]) ### execute query try: @@ -98,6 +105,7 @@ def handle_leaderboards(args: argparse.Namespace) -> None: args.query_api_access_token, query_name, params, + query_params, blockchain, MOONSTREAM_API_URL, args.max_retries, diff --git a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py index f1e9e87f..f0c5e833 100644 --- a/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py +++ b/crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py @@ -22,6 +22,7 @@ def get_results_for_moonstream_query( moonstream_access_token: str, query_name: str, params: Dict[str, Any], + query_params: Dict[str, Any], blockchain: Optional[str] = None, api_url: str = MOONSTREAM_API_URL, max_retries: int = 100, @@ -70,8 +71,13 @@ def get_results_for_moonstream_query( attempts = 0 while not success and attempts < query_api_retries: + response = requests.post( - request_url, json=request_body, headers=headers, timeout=10 + request_url, + json=request_body, + headers=headers, + timeout=10, + params=query_params, ) attempts += 1 response.raise_for_status() diff --git a/crawlers/mooncrawl/mooncrawl/settings.py b/crawlers/mooncrawl/mooncrawl/settings.py index f9218296..20cf9253 100644 --- a/crawlers/mooncrawl/mooncrawl/settings.py +++ b/crawlers/mooncrawl/mooncrawl/settings.py @@ -481,3 +481,12 @@ raise ValueError( "MOONSTREAM_STATE_CRAWLER_JOURNAL_ID environment variable must be set" ) + + +MOONSTREAM_DB_V3_CONTROLLER_API = os.environ.get( + "MOONSTREAM_DB_V3_CONTROLLER_API", "https://mdb-v3-api.moonstream.to" +) + +MOONSTREAM_DB_V3_SCHEMA_NAME = os.environ.get( + "MOONSTREAM_DB_V3_SCHEMA_NAME", "blockchain" +) diff --git a/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py b/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py index 55052605..f2178d06 100644 --- a/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py +++ b/crawlers/mooncrawl/mooncrawl/stats_worker/queries.py @@ -5,16 +5,24 @@ import re from collections import OrderedDict from io import StringIO -from typing import Any, Dict +from typing import Any, Dict, Optional from sqlalchemy.orm import sessionmaker from sqlalchemy.sql import text from sqlalchemy.sql.expression import TextClause -from ..actions import push_data_to_bucket -from ..db import RO_pre_ping_query_engine +from ..actions import push_data_to_bucket, get_customer_db_uri +from ..db import ( + RO_pre_ping_query_engine, + MOONSTREAM_DB_URI_READ_ONLY, + MoonstreamCustomDBEngine, +) from ..reporter import reporter -from ..settings import MOONSTREAM_S3_QUERIES_BUCKET_PREFIX +from ..settings import ( + CRAWLER_LABEL, + SEER_CRAWLER_LABEL, + MOONSTREAM_DB_V3_SCHEMA_NAME, +) logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) @@ -64,12 +72,26 @@ def data_generate( query: TextClause, params: Dict[str, Any], params_hash: str, + customer_id: Optional[str] = None, + instance_id: Optional[str] = None, + blockchain_table: Optional[str] = None, ): """ Generate query and push it to S3 """ + label = CRAWLER_LABEL + db_uri = MOONSTREAM_DB_URI_READ_ONLY + if customer_id is not None and instance_id is not None: + db_uri = get_customer_db_uri(customer_id, instance_id, "customer") + label = SEER_CRAWLER_LABEL + + engine = MoonstreamCustomDBEngine( + url=db_uri, schema=MOONSTREAM_DB_V3_SCHEMA_NAME + ) + else: + engine = RO_pre_ping_query_engine - process_session = sessionmaker(bind=RO_pre_ping_query_engine) + process_session = sessionmaker(bind=engine) db_session = process_session() metadata = { @@ -80,35 +102,32 @@ def data_generate( "params": json.dumps(params), } - try: - # TODO:(Andrey) Need optimization that information is usefull but incomplete - block_number, block_timestamp = db_session.execute( - text( - "SELECT block_number, block_timestamp FROM polygon_labels WHERE block_number=(SELECT max(block_number) FROM polygon_labels where label='moonworm-alpha') limit 1;" - ), - ).one() + block_number = None + block_timestamp = None + try: + ### If blockchain is provided, we need to get the latest block number and timestamp + if blockchain_table is not None: + block_number, block_timestamp = db_session.execute( + text( + f"SELECT block_number, block_timestamp FROM {blockchain_table} WHERE label='{label}' ORDER BY block_number DESC LIMIT 1" + ), + ).one() if file_type == "csv": csv_buffer = StringIO() csv_writer = csv.writer(csv_buffer, delimiter=";") - # engine.execution_options(stream_results=True) query_instance = db_session.execute(query, params) # type: ignore csv_writer.writerow(query_instance.keys()) csv_writer.writerows(query_instance.fetchall()) - metadata["block_number"] = block_number - metadata["block_timestamp"] = block_timestamp + metadata["block_number"] = block_number # type: ignore + metadata["block_timestamp"] = block_timestamp # type: ignore data = csv_buffer.getvalue().encode("utf-8") else: - block_number, block_timestamp = db_session.execute( - text( - "SELECT block_number, block_timestamp FROM polygon_labels WHERE block_number=(SELECT max(block_number) FROM polygon_labels where label='moonworm-alpha') limit 1;" - ), - ).one() data = json.dumps( { diff --git a/crawlers/mooncrawl/mooncrawl/version.py b/crawlers/mooncrawl/mooncrawl/version.py index d5b0894c..40646fd2 100644 --- a/crawlers/mooncrawl/mooncrawl/version.py +++ b/crawlers/mooncrawl/mooncrawl/version.py @@ -2,4 +2,4 @@ Moonstream crawlers version. """ -MOONCRAWL_VERSION = "0.4.13" +MOONCRAWL_VERSION = "0.5.0" diff --git a/crawlers/mooncrawl/sample.env b/crawlers/mooncrawl/sample.env index 5215576b..1887e9b2 100644 --- a/crawlers/mooncrawl/sample.env +++ b/crawlers/mooncrawl/sample.env @@ -71,4 +71,7 @@ export MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID=" data.QueryPresignUrl: """ Request update data on S3 bucket @@ -386,6 +389,26 @@ async def update_query_data_handler( detail=f"Provided blockchain is not supported.", ) + json_payload = {} + + is_customer_database = customer_id is not None and instance_id is not None + + if is_customer_database: + + results = check_user_resource_access( + customer_id=customer_id, + user_token=token, + ) + + if results is None: + raise MoonstreamHTTPException( + status_code=404, + detail="Not found customer", + ) + + json_payload["customer_id"] = customer_id + json_payload["instance_id"] = instance_id + # normalize query name try: @@ -436,8 +459,9 @@ async def update_query_data_handler( raise MoonstreamHTTPException(status_code=500, internal_error=e) ### check tags - - if "preapprove" in entry.tags or "approved" not in entry.tags: + if ( + "preapprove" in entry.tags or "approved" not in entry.tags + ) and not is_customer_database: raise MoonstreamHTTPException( status_code=403, detail="Query not approved yet." ) @@ -456,17 +480,16 @@ async def update_query_data_handler( if "ext:csv" in tags: file_type = "csv" + + json_payload["query"] = content + json_payload["params"] = request_update.params + json_payload["file_type"] = file_type + json_payload["blockchain"] = request_update.blockchain + try: responce = requests.post( f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/{query_id}/query_update", - json={ - "query": content, - "params": request_update.params, - "file_type": file_type, - "blockchain": ( - request_update.blockchain if request_update.blockchain else None - ), - }, + json=json_payload, timeout=MOONSTREAM_INTERNAL_REQUEST_TIMEOUT_SECONDS, ) except Exception as e: