Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return jobs endpoint #876

Merged
merged 8 commits into from
Aug 31, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
201 changes: 123 additions & 78 deletions moonstreamapi/moonstreamapi/actions.py
Original file line number Diff line number Diff line change
Expand Up @@ -486,7 +486,7 @@ def get_all_entries_from_search(
limit=limit,
offset=offset,
)
results.extend(existing_methods.results) # type: ignore
results.extend(existing_methods.results) # type: ignore

if len(results) != existing_methods.total_results:
for offset in range(limit, existing_methods.total_results, limit):
Expand All @@ -499,7 +499,7 @@ def get_all_entries_from_search(
limit=limit,
offset=offset,
)
results.extend(existing_methods.results) # type: ignore
results.extend(existing_methods.results) # type: ignore

return results

Expand Down Expand Up @@ -783,6 +783,62 @@ def query_parameter_hash(params: Dict[str, Any]) -> str:
return hash


def parse_abi_to_name_tags(user_abi: List[Dict[str, Any]]):
return [
f"abi_name:{method['name']}"
for method in user_abi
if method["type"] in ("event", "function")
]


def filter_tasks(entries, tag_filters):
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why we need this func if we use it only in 1 place?

return [entry for entry in entries if any(tag in tag_filters for tag in entry.tags)]


def fetch_and_filter_tasks(
journal_id, address, subscription_type_id, token, user_abi, limit=100
) -> List[BugoutSearchResult]:
"""
Fetch tasks from journal and filter them by user abi
"""
entries = get_all_entries_from_search(
journal_id=journal_id,
search_query=f"tag:address:{address} tag:subscription_type:{subscription_type_id}",
limit=limit,
token=token,
)

user_loaded_abi_tags = parse_abi_to_name_tags(json.loads(user_abi))

moonworm_tasks = filter_tasks(entries, user_loaded_abi_tags)

return moonworm_tasks


def get_moonworm_tasks(
subscription_type_id: str,
address: str,
user_abi: List[Dict[str, Any]],
) -> List[BugoutSearchResult]:
"""
Get moonworm tasks from journal and filter them by user abi
"""

try:
moonworm_tasks = fetch_and_filter_tasks(
journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL,
address=address,
subscription_type_id=subscription_type_id,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_abi=user_abi,
)
except Exception as e:
logger.error(f"Error get moonworm tasks: {str(e)}")
MoonstreamHTTPException(status_code=500, internal_error=e)

return moonworm_tasks


def get_list_of_support_interfaces(
blockchain_type: AvailableBlockchainType,
address: str,
Expand All @@ -793,97 +849,86 @@ def get_list_of_support_interfaces(
Returns list of interfaces supported by given address
"""

_, _, is_contract = check_if_smart_contract(
blockchain_type=blockchain_type, address=address, user_token=user_token
)

if not is_contract:
raise AddressNotSmartContractException(f"Address not are smart contract")

web3_client = connect(blockchain_type, user_token=user_token)

contract = web3_client.eth.contract(
address=Web3.toChecksumAddress(address),
abi=supportsInterface_abi,
)

calls = []
try:
_, _, is_contract = check_if_smart_contract(
blockchain_type=blockchain_type, address=address, user_token=user_token
)

list_of_interfaces = list(selectors.keys())
if not is_contract:
raise AddressNotSmartContractException(f"Address not are smart contract")

list_of_interfaces.sort()
web3_client = connect(blockchain_type, user_token=user_token)

for interaface in list_of_interfaces:
calls.append(
(
contract.address,
FunctionSignature(contract.get_function_by_name("supportsInterface"))
.encode_data([bytes.fromhex(interaface)])
.hex(),
)
contract = web3_client.eth.contract(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would put this after line 868 if not is_contract: so we will not run additional operation if error will be raised

address=Web3.toChecksumAddress(address),
abi=supportsInterface_abi,
)

result = {}
result = {}

if blockchain_type in multicall_contracts:
calls = []
if blockchain_type in multicall_contracts:
calls = []

list_of_interfaces = list(selectors.keys())
list_of_interfaces = list(selectors.keys())

list_of_interfaces.sort()
list_of_interfaces.sort()

for interface in list_of_interfaces:
calls.append(
(
contract.address,
FunctionSignature(
contract.get_function_by_name("supportsInterface")
for interface in list_of_interfaces:
calls.append(
(
contract.address,
FunctionSignature(
contract.get_function_by_name("supportsInterface")
)
.encode_data([bytes.fromhex(interface)])
.hex(),
)
.encode_data([bytes.fromhex(interface)])
.hex(),
)
)

try:
multicall_result = multicall(
web3_client=web3_client,
blockchain_type=blockchain_type,
calls=calls,
method=multicall_method,
)
except Exception as e:
logger.error(f"Error while getting list of support interfaces: {e}")

for i, selector in enumerate(list_of_interfaces):
if multicall_result[i][0]:
supported = FunctionSignature(
contract.get_function_by_name("supportsInterface")
).decode_data(multicall_result[i][1])

if supported[0]:
result[selectors[selector]["name"]] = { # type: ignore
"selector": selector,
"abi": selectors[selector]["abi"], # type: ignore
}
try:
multicall_result = multicall(
web3_client=web3_client,
blockchain_type=blockchain_type,
calls=calls,
method=multicall_method,
)
except Exception as e:
logger.error(f"Error while getting list of support interfaces: {e}")

else:
general_interfaces = ["IERC165", "IERC721", "IERC1155", "IERC20"]
for i, selector in enumerate(list_of_interfaces):
if multicall_result[i][0]:
supported = FunctionSignature(
contract.get_function_by_name("supportsInterface")
).decode_data(multicall_result[i][1])

basic_selectors = {
interface["name"]: selector
for selector, interface in selectors.items()
if interface["name"] in general_interfaces
}
if supported[0]:
result[selectors[selector]["name"]] = { # type: ignore
"selector": selector,
"abi": selectors[selector]["abi"], # type: ignore
}

for selector_name in basic_selectors:
selector_result = contract.get_function_by_name("supportsInterface").call(
bytes.fromhex(selectors[selector_name])
)
if selector_result:
result[selector_name] = {
"selector": basic_selectors[selector_name],
"abi": selectors[selectors[selector_name]]["abi"],
}
else:
general_interfaces = ["IERC165", "IERC721", "IERC1155", "IERC20"]

basic_selectors = {
interface["name"]: selector
for selector, interface in selectors.items()
if interface["name"] in general_interfaces
}

for interface_name, selector in basic_selectors.items():
selector_result = contract.functions.supportsInterface(
bytes.fromhex(selector)
).call() # returns bool

if selector_result:
result[interface_name] = {
"selector": basic_selectors[interface_name],
"abi": selectors[basic_selectors[interface_name]]["abi"],
}
except Exception as err:
logger.error(f"Error while getting list of support interfaces: {err}")
MoonstreamHTTPException(status_code=500, internal_error=err)

return result

Expand Down
52 changes: 52 additions & 0 deletions moonstreamapi/moonstreamapi/routes/subscriptions.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
check_if_smart_contract,
get_entity_subscription_journal_id,
get_list_of_support_interfaces,
get_moonworm_tasks,
validate_abi_json,
)
from ..admin import subscription_types
Expand Down Expand Up @@ -642,6 +643,57 @@ async def list_subscription_types() -> data.SubscriptionTypesListResponse:
return data.SubscriptionTypesListResponse(subscription_types=results)


@router.get(
"/{subscription_id}/jobs",
tags=["subscriptions"],
response_model=List[BugoutSearchResult],
)
async def get_subscription_jobs_handler(
request: Request,
subscription_id: str = Path(...),
) -> Any:
token = request.state.token
user = request.state.user

try:
journal_id = get_entity_subscription_journal_id(
resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION,
token=MOONSTREAM_ADMIN_ACCESS_TOKEN,
user_id=user.id,
)

# get subscription entity
subscription_resource = bc.get_entity(
token=token,
journal_id=journal_id,
entity_id=subscription_id,
)

except EntityJournalNotFoundException as e:
raise MoonstreamHTTPException(
status_code=404,
detail="User subscriptions journal not found",
internal_error=e,
)
except Exception as e:
logger.error(f"Error get subscriptions for user ({user}), error: {str(e)}")
raise MoonstreamHTTPException(status_code=500, internal_error=e)

for field in subscription_resource.required_fields:
if "subscription_type_id" in field:
subscription_type_id = field["subscription_type_id"]

subscription_address = subscription_resource.address

get_moonworm_jobs_response = get_moonworm_tasks(
subscription_type_id=subscription_type_id,
address=subscription_address,
user_abi=subscription_resource.secondary_fields.get("abi") or [],
)

return get_moonworm_jobs_response


@router.get(
"/is_contract",
tags=["subscriptions"],
Expand Down
Loading