Skip to content

Commit

Permalink
Merge pull request #1143 from moonstream-to/query-api-v3
Browse files Browse the repository at this point in the history
Add queryAPI v3 and update leaderboard generator.
  • Loading branch information
Andrei-Dolgolev authored Nov 27, 2024
2 parents 87e73c9 + be90897 commit 79b464d
Show file tree
Hide file tree
Showing 12 changed files with 135 additions and 34 deletions.
24 changes: 23 additions & 1 deletion crawlers/mooncrawl/mooncrawl/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,11 @@
)

from .middleware import MoonstreamHTTPException
from .settings import bugout_client as bc
from .settings import (
bugout_client as bc,
MOONSTREAM_DB_V3_CONTROLLER_API,
MOONSTREAM_ADMIN_ACCESS_TOKEN,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -204,3 +208,21 @@ def get_all_entries_from_search(
results.extend(existing_methods.results) # type: ignore

return results


def get_customer_db_uri(
customer_id: str,
instance_id: str,
user: str,
) -> str:

try:
response = requests.get(
f"{MOONSTREAM_DB_V3_CONTROLLER_API}/customers/{customer_id}/instances/{instance_id}/creds/{user}/url",
headers={"Authorization": f"Bearer {MOONSTREAM_ADMIN_ACCESS_TOKEN}"},
)
response.raise_for_status()
return response.text.replace('"', "")
except Exception as e:
logger.error(f"Error get customer db uri: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)
6 changes: 6 additions & 0 deletions crawlers/mooncrawl/mooncrawl/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ async def queries_data_update_handler(

requested_query = request_data.query

blockchain_table = "polygon_labels"
if request_data.blockchain:
if request_data.blockchain not in [i.value for i in AvailableBlockchainType]:
logger.error(f"Unknown blockchain {request_data.blockchain}")
Expand All @@ -254,6 +255,8 @@ async def queries_data_update_handler(
)
)

blockchain_table = get_label_model(blockchain).__tablename__

# Check if it can transform to TextClause
try:
query = text(requested_query)
Expand Down Expand Up @@ -296,6 +299,9 @@ async def queries_data_update_handler(
query=query,
params=passed_params,
params_hash=params_hash,
customer_id=request_data.customer_id,
instance_id=request_data.instance_id,
blockchain_table=blockchain_table,
)

except Exception as e:
Expand Down
2 changes: 2 additions & 0 deletions crawlers/mooncrawl/mooncrawl/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ class QueryDataUpdate(BaseModel):
query: str
params: Dict[str, Any] = Field(default_factory=dict)
blockchain: Optional[str] = None
customer_id: Optional[str] = None
instance_id: Optional[str] = None


class TokenURIs(BaseModel):
Expand Down
1 change: 1 addition & 0 deletions crawlers/mooncrawl/mooncrawl/db.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
MOONSTREAM_POOL_SIZE,
create_moonstream_engine,
)
from moonstreamdbv3.db import MoonstreamCustomDBEngine
from sqlalchemy.orm import Session, sessionmaker

from .settings import (
Expand Down
8 changes: 8 additions & 0 deletions crawlers/mooncrawl/mooncrawl/leaderboards_generator/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -91,13 +91,21 @@ def handle_leaderboards(args: argparse.Namespace) -> None:
params = leaderboard_data["params"]

blockchain = leaderboard_data.get("blockchain", None)
query_params = {}

if leaderboard_data.get("customer_id", False):
query_params["customer_id"] = leaderboard_data["customer_id"]

if leaderboard_data.get("instance_id", False):
query_params["instance_id"] = str(leaderboard_data["instance_id"])

### execute query
try:
query_results = get_results_for_moonstream_query(
args.query_api_access_token,
query_name,
params,
query_params,
blockchain,
MOONSTREAM_API_URL,
args.max_retries,
Expand Down
8 changes: 7 additions & 1 deletion crawlers/mooncrawl/mooncrawl/leaderboards_generator/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ def get_results_for_moonstream_query(
moonstream_access_token: str,
query_name: str,
params: Dict[str, Any],
query_params: Dict[str, Any],
blockchain: Optional[str] = None,
api_url: str = MOONSTREAM_API_URL,
max_retries: int = 100,
Expand Down Expand Up @@ -70,8 +71,13 @@ def get_results_for_moonstream_query(
attempts = 0

while not success and attempts < query_api_retries:

response = requests.post(
request_url, json=request_body, headers=headers, timeout=10
request_url,
json=request_body,
headers=headers,
timeout=10,
params=query_params,
)
attempts += 1
response.raise_for_status()
Expand Down
9 changes: 9 additions & 0 deletions crawlers/mooncrawl/mooncrawl/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -481,3 +481,12 @@
raise ValueError(
"MOONSTREAM_STATE_CRAWLER_JOURNAL_ID environment variable must be set"
)


MOONSTREAM_DB_V3_CONTROLLER_API = os.environ.get(
"MOONSTREAM_DB_V3_CONTROLLER_API", "https://mdb-v3-api.moonstream.to"
)

MOONSTREAM_DB_V3_SCHEMA_NAME = os.environ.get(
"MOONSTREAM_DB_V3_SCHEMA_NAME", "blockchain"
)
59 changes: 39 additions & 20 deletions crawlers/mooncrawl/mooncrawl/stats_worker/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,24 @@
import re
from collections import OrderedDict
from io import StringIO
from typing import Any, Dict
from typing import Any, Dict, Optional

from sqlalchemy.orm import sessionmaker
from sqlalchemy.sql import text
from sqlalchemy.sql.expression import TextClause

from ..actions import push_data_to_bucket
from ..db import RO_pre_ping_query_engine
from ..actions import push_data_to_bucket, get_customer_db_uri
from ..db import (
RO_pre_ping_query_engine,
MOONSTREAM_DB_URI_READ_ONLY,
MoonstreamCustomDBEngine,
)
from ..reporter import reporter
from ..settings import MOONSTREAM_S3_QUERIES_BUCKET_PREFIX
from ..settings import (
CRAWLER_LABEL,
SEER_CRAWLER_LABEL,
MOONSTREAM_DB_V3_SCHEMA_NAME,
)

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -64,12 +72,26 @@ def data_generate(
query: TextClause,
params: Dict[str, Any],
params_hash: str,
customer_id: Optional[str] = None,
instance_id: Optional[str] = None,
blockchain_table: Optional[str] = None,
):
"""
Generate query and push it to S3
"""
label = CRAWLER_LABEL
db_uri = MOONSTREAM_DB_URI_READ_ONLY
if customer_id is not None and instance_id is not None:
db_uri = get_customer_db_uri(customer_id, instance_id, "customer")
label = SEER_CRAWLER_LABEL

engine = MoonstreamCustomDBEngine(
url=db_uri, schema=MOONSTREAM_DB_V3_SCHEMA_NAME
)
else:
engine = RO_pre_ping_query_engine

process_session = sessionmaker(bind=RO_pre_ping_query_engine)
process_session = sessionmaker(bind=engine)
db_session = process_session()

metadata = {
Expand All @@ -80,35 +102,32 @@ def data_generate(
"params": json.dumps(params),
}

try:
# TODO:(Andrey) Need optimization that information is usefull but incomplete
block_number, block_timestamp = db_session.execute(
text(
"SELECT block_number, block_timestamp FROM polygon_labels WHERE block_number=(SELECT max(block_number) FROM polygon_labels where label='moonworm-alpha') limit 1;"
),
).one()
block_number = None
block_timestamp = None

try:
### If blockchain is provided, we need to get the latest block number and timestamp
if blockchain_table is not None:
block_number, block_timestamp = db_session.execute(
text(
f"SELECT block_number, block_timestamp FROM {blockchain_table} WHERE label='{label}' ORDER BY block_number DESC LIMIT 1"
),
).one()
if file_type == "csv":
csv_buffer = StringIO()
csv_writer = csv.writer(csv_buffer, delimiter=";")

# engine.execution_options(stream_results=True)
query_instance = db_session.execute(query, params) # type: ignore

csv_writer.writerow(query_instance.keys())
csv_writer.writerows(query_instance.fetchall())

metadata["block_number"] = block_number
metadata["block_timestamp"] = block_timestamp
metadata["block_number"] = block_number # type: ignore
metadata["block_timestamp"] = block_timestamp # type: ignore

data = csv_buffer.getvalue().encode("utf-8")

else:
block_number, block_timestamp = db_session.execute(
text(
"SELECT block_number, block_timestamp FROM polygon_labels WHERE block_number=(SELECT max(block_number) FROM polygon_labels where label='moonworm-alpha') limit 1;"
),
).one()

data = json.dumps(
{
Expand Down
2 changes: 1 addition & 1 deletion crawlers/mooncrawl/mooncrawl/version.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
Moonstream crawlers version.
"""

MOONCRAWL_VERSION = "0.4.13"
MOONCRAWL_VERSION = "0.5.0"
5 changes: 4 additions & 1 deletion crawlers/mooncrawl/sample.env
Original file line number Diff line number Diff line change
Expand Up @@ -71,4 +71,7 @@ export MOONSTREAM_LEADERBOARD_GENERATOR_JOURNAL_ID="<Bugout_journal_id_for_leade

# DB v3 controller
export MOONSTREAM_DB_V3_CONTROLLER_API="https://mdb-v3-api.moonstream.to"
export MOONSTREAM_DB_V3_CONTROLLER_SEER_ACCESS_TOKEN="<token_to_access_mdb_v3_controller_api>"
export MOONSTREAM_DB_V3_CONTROLLER_SEER_ACCESS_TOKEN="<token_to_access_mdb_v3_controller_api>"

# Moonstream DB v3
export MOONSTREAM_DB_V3_SCHEMA_NAME="blockchain"
2 changes: 2 additions & 0 deletions moonstreamapi/moonstreamapi/data.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,8 @@ class DashboardUpdate(BaseModel):
class UpdateDataRequest(BaseModel):
params: Dict[str, Any] = Field(default_factory=dict)
blockchain: Optional[str] = None
customer_id: Optional[str] = None
instance_id: Optional[str] = None


class UpdateQueryRequest(BaseModel):
Expand Down
43 changes: 33 additions & 10 deletions moonstreamapi/moonstreamapi/routes/queries.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
get_query_by_name,
name_normalization,
query_parameter_hash,
check_user_resource_access,
)
from ..middleware import MoonstreamHTTPException
from ..settings import (
Expand Down Expand Up @@ -370,6 +371,8 @@ async def update_query_data_handler(
request: Request,
query_name: str = Path(..., description="Query name"),
request_update: data.UpdateDataRequest = Body(...),
customer_id: Optional[str] = Query(None),
instance_id: Optional[str] = Query(None),
) -> data.QueryPresignUrl:
"""
Request update data on S3 bucket
Expand All @@ -386,6 +389,26 @@ async def update_query_data_handler(
detail=f"Provided blockchain is not supported.",
)

json_payload = {}

is_customer_database = customer_id is not None and instance_id is not None

if is_customer_database:

results = check_user_resource_access(
customer_id=customer_id,
user_token=token,
)

if results is None:
raise MoonstreamHTTPException(
status_code=404,
detail="Not found customer",
)

json_payload["customer_id"] = customer_id
json_payload["instance_id"] = instance_id

# normalize query name

try:
Expand Down Expand Up @@ -436,8 +459,9 @@ async def update_query_data_handler(
raise MoonstreamHTTPException(status_code=500, internal_error=e)

### check tags

if "preapprove" in entry.tags or "approved" not in entry.tags:
if (
"preapprove" in entry.tags or "approved" not in entry.tags
) and not is_customer_database:
raise MoonstreamHTTPException(
status_code=403, detail="Query not approved yet."
)
Expand All @@ -456,17 +480,16 @@ async def update_query_data_handler(

if "ext:csv" in tags:
file_type = "csv"

json_payload["query"] = content
json_payload["params"] = request_update.params
json_payload["file_type"] = file_type
json_payload["blockchain"] = request_update.blockchain

try:
responce = requests.post(
f"{MOONSTREAM_CRAWLERS_SERVER_URL}:{MOONSTREAM_CRAWLERS_SERVER_PORT}/jobs/{query_id}/query_update",
json={
"query": content,
"params": request_update.params,
"file_type": file_type,
"blockchain": (
request_update.blockchain if request_update.blockchain else None
),
},
json=json_payload,
timeout=MOONSTREAM_INTERNAL_REQUEST_TIMEOUT_SECONDS,
)
except Exception as e:
Expand Down

0 comments on commit 79b464d

Please sign in to comment.