Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

V3 state crawler #1142

Open
wants to merge 13 commits into
base: main
Choose a base branch
from
90 changes: 89 additions & 1 deletion crawlers/mooncrawl/mooncrawl/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,15 @@
Moonstream,
MoonstreamQueryResultUrl,
)
from sqlalchemy import text, TextClause
from moonstreamtypes.blockchain import (
AvailableBlockchainType,
get_block_model,
get_label_model,
get_transaction_model,
)

from .data import QueryDataUpdate
from .middleware import MoonstreamHTTPException
from .settings import (
bugout_client as bc,
Expand All @@ -34,6 +42,12 @@ class EntityCollectionNotFoundException(Exception):
"""


class QueryTextClauseException(Exception):
"""
Raised when query can't be transformed to TextClause
"""


def push_data_to_bucket(
data: Any, key: str, bucket: str, metadata: Dict[str, Any] = {}
) -> None:
Expand Down Expand Up @@ -120,6 +134,7 @@ def recive_S3_data_from_query(
time_await: int = 2,
max_retries: int = 30,
custom_body: Optional[Dict[str, Any]] = None,
customer_params: Optional[Dict[str, Any]] = {},
) -> Any:
"""
Await the query to be update data on S3 with if_modified_since and return new the data.
Expand All @@ -133,7 +148,7 @@ def recive_S3_data_from_query(
if_modified_since = if_modified_since_datetime.strftime("%a, %d %b %Y %H:%M:%S GMT")

time.sleep(2)
if custom_body:
if custom_body or customer_params:
headers = {
"Authorization": f"Bearer {token}",
}
Expand All @@ -142,9 +157,11 @@ def recive_S3_data_from_query(
response = requests.post(
url=f"{client.api.endpoints[ENDPOINT_QUERIES]}/{query_name}/update_data",
headers=headers,
params=customer_params,
json=json,
timeout=5,
)
print(response.json())
data_url = MoonstreamQueryResultUrl(url=response.json()["url"])
else:
data_url = client.exec_query(
Expand Down Expand Up @@ -226,3 +243,74 @@ def get_customer_db_uri(
except Exception as e:
logger.error(f"Error get customer db uri: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)


def resolve_table_names(request_data: QueryDataUpdate) -> Dict[str, str]:
"""
Determines the table names based on the blockchain and labels version.
Returns an empty dictionary if blockchain is not provided.
"""
if not request_data.blockchain:
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe better to throw exception here?

return {"labels_table": "ethereum_labels"}

if request_data.blockchain not in [i.value for i in AvailableBlockchainType]:
logger.error(f"Unknown blockchain {request_data.blockchain}")
raise MoonstreamHTTPException(status_code=403, detail="Unknown blockchain")

blockchain = AvailableBlockchainType(request_data.blockchain)
labels_version = 2

if request_data.customer_id is not None and request_data.instance_id is not None:
labels_version = 3

print(labels_version, blockchain)

tables = {
"labels_table": get_label_model(blockchain, labels_version).__tablename__,
}

if labels_version != 3:
tables.update(
{
"transactions_table": get_transaction_model(blockchain).__tablename__,
"blocks_table": get_block_model(blockchain).__tablename__,
}
)

return tables


def prepare_query(
requested_query: str, tables: Dict[str, str], query_id: str
) -> TextClause:
"""
Prepares the SQL query by replacing placeholders with actual table names.
"""
# Check and replace placeholders only if they exist in the query
if "__labels_table__" in requested_query:
requested_query = requested_query.replace(
"__labels_table__", tables.get("labels_table", "ethereum_labels")
)

if "__transactions_table__" in requested_query and "transactions_table" in tables:
requested_query = requested_query.replace(
"__transactions_table__", tables["transactions_table"]
)

if "__blocks_table__" in requested_query and "blocks_table" in tables:
requested_query = requested_query.replace(
"__blocks_table__", tables["blocks_table"]
)

# Check if it can transform to TextClause
try:
query = text(requested_query)
except Exception as e:
logger.error(
f"Can't parse query {query_id} to TextClause in drones /query_update endpoint, error: {e}"
)
raise QueryTextClauseException(
f"Can't parse query {query_id} to TextClause in drones /query_update endpoint, error: {e}"
)

return query
57 changes: 15 additions & 42 deletions crawlers/mooncrawl/mooncrawl/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,20 +13,16 @@
from bugout.data import BugoutJournalEntity, BugoutResource
from fastapi import BackgroundTasks, FastAPI
from fastapi.middleware.cors import CORSMiddleware
from moonstreamdb.blockchain import (
AvailableBlockchainType,
get_block_model,
get_label_model,
get_transaction_model,
)
from sqlalchemy import text

from . import data
from .actions import (
EntityCollectionNotFoundException,
generate_s3_access_links,
get_entity_subscription_collection_id,
query_parameter_hash,
prepare_query,
resolve_table_names,
QueryTextClauseException,
)
from .middleware import MoonstreamHTTPException
from .settings import (
Expand Down Expand Up @@ -230,43 +226,20 @@ async def queries_data_update_handler(
logger.error(f"Unhandled query execute exception, error: {e}")
raise MoonstreamHTTPException(status_code=500)

requested_query = request_data.query

blockchain_table = "polygon_labels"
if request_data.blockchain:
if request_data.blockchain not in [i.value for i in AvailableBlockchainType]:
logger.error(f"Unknown blockchain {request_data.blockchain}")
raise MoonstreamHTTPException(status_code=403, detail="Unknown blockchain")

blockchain = AvailableBlockchainType(request_data.blockchain)

requested_query = (
requested_query.replace(
"__transactions_table__",
get_transaction_model(blockchain).__tablename__,
)
.replace(
"__blocks_table__",
get_block_model(blockchain).__tablename__,
)
.replace(
"__labels_table__",
get_label_model(blockchain).__tablename__,
)
)

blockchain_table = get_label_model(blockchain).__tablename__
# Resolve table names based on the request data default ethereum
tables = resolve_table_names(request_data)

# Check if it can transform to TextClause
# Prepare the query with the resolved table names
try:
query = text(requested_query)
query = prepare_query(request_data.query, tables, query_id)
except QueryTextClauseException as e:
logger.error(f"Error preparing query for query id: {query_id}, error: {e}")
raise MoonstreamHTTPException(status_code=500, detail="Error preparing query")
except Exception as e:
logger.error(
f"Can't parse query {query_id} to TextClause in drones /query_update endpoint, error: {e}"
)
raise MoonstreamHTTPException(status_code=500, detail="Can't parse query")
logger.error(f"Error preparing query for query id: {query_id}, error: {e}")
raise MoonstreamHTTPException(status_code=500, detail="Error preparing query")

# Get requried keys for query
# Get required keys for query
expected_query_parameters = query._bindparams.keys()

# request.params validations
Expand Down Expand Up @@ -301,9 +274,9 @@ async def queries_data_update_handler(
params_hash=params_hash,
customer_id=request_data.customer_id,
instance_id=request_data.instance_id,
blockchain_table=blockchain_table,
blockchain_table=tables["labels_table"],
# Add any additional parameters needed for the task
)

except Exception as e:
logger.error(f"Unhandled query execute exception, error: {e}")
raise MoonstreamHTTPException(status_code=500)
Expand Down
3 changes: 3 additions & 0 deletions crawlers/mooncrawl/mooncrawl/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -69,3 +69,6 @@ class ViewTasks(BaseModel):
name: str
outputs: List[Dict[str, Any]]
address: str
customer_id: Optional[str] = None
instance_id: Optional[str] = None
v3: Optional[bool] = False
Andrei-Dolgolev marked this conversation as resolved.
Show resolved Hide resolved
27 changes: 19 additions & 8 deletions crawlers/mooncrawl/mooncrawl/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,19 +5,14 @@
from bugout.app import Bugout
from moonstreamtypes.blockchain import AvailableBlockchainType

# Bugout

# APIs
## Bugout
BUGOUT_BROOD_URL = os.environ.get("BUGOUT_BROOD_URL", "https://auth.bugout.dev")
BUGOUT_SPIRE_URL = os.environ.get("BUGOUT_SPIRE_URL", "https://spire.bugout.dev")

bugout_client = Bugout(brood_api_url=BUGOUT_BROOD_URL, spire_api_url=BUGOUT_SPIRE_URL)


MOONSTREAM_API_URL = os.environ.get("MOONSTREAM_API_URL", "https://api.moonstream.to")
MOONSTREAM_ENGINE_URL = os.environ.get(
"MOONSTREAM_ENGINE_URL", "https://engineapi.moonstream.to"
)


BUGOUT_REQUEST_TIMEOUT_SECONDS_RAW = os.environ.get(
"MOONSTREAM_BUGOUT_TIMEOUT_SECONDS", 30
)
Expand All @@ -31,6 +26,21 @@

HUMBUG_REPORTER_CRAWLERS_TOKEN = os.environ.get("HUMBUG_REPORTER_CRAWLERS_TOKEN")


## Moonstream
MOONSTREAM_API_URL = os.environ.get("MOONSTREAM_API_URL", "https://api.moonstream.to")

## Moonstream Engine
MOONSTREAM_ENGINE_URL = os.environ.get(
"MOONSTREAM_ENGINE_URL", "https://engineapi.moonstream.to"
)

## Moonstream DB
MOONSTREAM_DB_V3_CONTROLLER_API = os.environ.get(
"MOONSTREAM_DB_V3_CONTROLLER_API", "https://mdb-v3-api.moonstream.to"
)


# Origin
RAW_ORIGINS = os.environ.get("MOONSTREAM_CORS_ALLOWED_ORIGINS")
if RAW_ORIGINS is None:
Expand Down Expand Up @@ -381,6 +391,7 @@
AvailableBlockchainType.BLAST: "0xcA11bde05977b3631167028862bE2a173976CA11",
AvailableBlockchainType.MANTLE: "0xcA11bde05977b3631167028862bE2a173976CA11",
AvailableBlockchainType.MANTLE_SEPOLIA: "0xcA11bde05977b3631167028862bE2a173976CA11",
AvailableBlockchainType.GAME7_TESTNET: "0xcA11bde05977b3631167028862bE2a173976CA11",
}


Expand Down
Loading
Loading