diff --git a/moonstreamapi/moonstreamapi/actions.py b/moonstreamapi/moonstreamapi/actions.py index 16b6279c6..13d4d0e6a 100644 --- a/moonstreamapi/moonstreamapi/actions.py +++ b/moonstreamapi/moonstreamapi/actions.py @@ -486,7 +486,7 @@ def get_all_entries_from_search( limit=limit, offset=offset, ) - results.extend(existing_methods.results) # type: ignore + results.extend(existing_methods.results) # type: ignore if len(results) != existing_methods.total_results: for offset in range(limit, existing_methods.total_results, limit): @@ -499,7 +499,7 @@ def get_all_entries_from_search( limit=limit, offset=offset, ) - results.extend(existing_methods.results) # type: ignore + results.extend(existing_methods.results) # type: ignore return results @@ -783,6 +783,62 @@ def query_parameter_hash(params: Dict[str, Any]) -> str: return hash +def parse_abi_to_name_tags(user_abi: List[Dict[str, Any]]): + return [ + f"abi_name:{method['name']}" + for method in user_abi + if method["type"] in ("event", "function") + ] + + +def filter_tasks(entries, tag_filters): + return [entry for entry in entries if any(tag in tag_filters for tag in entry.tags)] + + +def fetch_and_filter_tasks( + journal_id, address, subscription_type_id, token, user_abi, limit=100 +) -> List[BugoutSearchResult]: + """ + Fetch tasks from journal and filter them by user abi + """ + entries = get_all_entries_from_search( + journal_id=journal_id, + search_query=f"tag:address:{address} tag:subscription_type:{subscription_type_id}", + limit=limit, + token=token, + ) + + user_loaded_abi_tags = parse_abi_to_name_tags(json.loads(user_abi)) + + moonworm_tasks = filter_tasks(entries, user_loaded_abi_tags) + + return moonworm_tasks + + +def get_moonworm_tasks( + subscription_type_id: str, + address: str, + user_abi: List[Dict[str, Any]], +) -> List[BugoutSearchResult]: + """ + Get moonworm tasks from journal and filter them by user abi + """ + + try: + moonworm_tasks = fetch_and_filter_tasks( + journal_id=MOONSTREAM_MOONWORM_TASKS_JOURNAL, + address=address, + subscription_type_id=subscription_type_id, + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + user_abi=user_abi, + ) + except Exception as e: + logger.error(f"Error get moonworm tasks: {str(e)}") + MoonstreamHTTPException(status_code=500, internal_error=e) + + return moonworm_tasks + + def get_list_of_support_interfaces( blockchain_type: AvailableBlockchainType, address: str, @@ -793,97 +849,86 @@ def get_list_of_support_interfaces( Returns list of interfaces supported by given address """ - _, _, is_contract = check_if_smart_contract( - blockchain_type=blockchain_type, address=address, user_token=user_token - ) - - if not is_contract: - raise AddressNotSmartContractException(f"Address not are smart contract") - - web3_client = connect(blockchain_type, user_token=user_token) - - contract = web3_client.eth.contract( - address=Web3.toChecksumAddress(address), - abi=supportsInterface_abi, - ) - - calls = [] + try: + _, _, is_contract = check_if_smart_contract( + blockchain_type=blockchain_type, address=address, user_token=user_token + ) - list_of_interfaces = list(selectors.keys()) + if not is_contract: + raise AddressNotSmartContractException(f"Address not are smart contract") - list_of_interfaces.sort() + web3_client = connect(blockchain_type, user_token=user_token) - for interaface in list_of_interfaces: - calls.append( - ( - contract.address, - FunctionSignature(contract.get_function_by_name("supportsInterface")) - .encode_data([bytes.fromhex(interaface)]) - .hex(), - ) + contract = web3_client.eth.contract( + address=Web3.toChecksumAddress(address), + abi=supportsInterface_abi, ) - result = {} + result = {} - if blockchain_type in multicall_contracts: - calls = [] + if blockchain_type in multicall_contracts: + calls = [] - list_of_interfaces = list(selectors.keys()) + list_of_interfaces = list(selectors.keys()) - list_of_interfaces.sort() + list_of_interfaces.sort() - for interface in list_of_interfaces: - calls.append( - ( - contract.address, - FunctionSignature( - contract.get_function_by_name("supportsInterface") + for interface in list_of_interfaces: + calls.append( + ( + contract.address, + FunctionSignature( + contract.get_function_by_name("supportsInterface") + ) + .encode_data([bytes.fromhex(interface)]) + .hex(), ) - .encode_data([bytes.fromhex(interface)]) - .hex(), ) - ) - try: - multicall_result = multicall( - web3_client=web3_client, - blockchain_type=blockchain_type, - calls=calls, - method=multicall_method, - ) - except Exception as e: - logger.error(f"Error while getting list of support interfaces: {e}") - - for i, selector in enumerate(list_of_interfaces): - if multicall_result[i][0]: - supported = FunctionSignature( - contract.get_function_by_name("supportsInterface") - ).decode_data(multicall_result[i][1]) - - if supported[0]: - result[selectors[selector]["name"]] = { # type: ignore - "selector": selector, - "abi": selectors[selector]["abi"], # type: ignore - } + try: + multicall_result = multicall( + web3_client=web3_client, + blockchain_type=blockchain_type, + calls=calls, + method=multicall_method, + ) + except Exception as e: + logger.error(f"Error while getting list of support interfaces: {e}") - else: - general_interfaces = ["IERC165", "IERC721", "IERC1155", "IERC20"] + for i, selector in enumerate(list_of_interfaces): + if multicall_result[i][0]: + supported = FunctionSignature( + contract.get_function_by_name("supportsInterface") + ).decode_data(multicall_result[i][1]) - basic_selectors = { - interface["name"]: selector - for selector, interface in selectors.items() - if interface["name"] in general_interfaces - } + if supported[0]: + result[selectors[selector]["name"]] = { # type: ignore + "selector": selector, + "abi": selectors[selector]["abi"], # type: ignore + } - for selector_name in basic_selectors: - selector_result = contract.get_function_by_name("supportsInterface").call( - bytes.fromhex(selectors[selector_name]) - ) - if selector_result: - result[selector_name] = { - "selector": basic_selectors[selector_name], - "abi": selectors[selectors[selector_name]]["abi"], - } + else: + general_interfaces = ["IERC165", "IERC721", "IERC1155", "IERC20"] + + basic_selectors = { + interface["name"]: selector + for selector, interface in selectors.items() + if interface["name"] in general_interfaces + } + + for interface_name, selector in basic_selectors.items(): + selector_result = contract.functions.supportsInterface( + bytes.fromhex(selector) + ).call() # returns bool + + if selector_result: + result[interface_name] = { + "selector": basic_selectors[interface_name], + "abi": selectors[basic_selectors[interface_name]]["abi"], + } + except Exception as err: + logger.error(f"Error while getting list of support interfaces: {err}") + MoonstreamHTTPException(status_code=500, internal_error=err) return result diff --git a/moonstreamapi/moonstreamapi/routes/subscriptions.py b/moonstreamapi/moonstreamapi/routes/subscriptions.py index 9e8ae77c8..af4977af4 100644 --- a/moonstreamapi/moonstreamapi/routes/subscriptions.py +++ b/moonstreamapi/moonstreamapi/routes/subscriptions.py @@ -21,6 +21,7 @@ check_if_smart_contract, get_entity_subscription_journal_id, get_list_of_support_interfaces, + get_moonworm_tasks, validate_abi_json, ) from ..admin import subscription_types @@ -642,6 +643,57 @@ async def list_subscription_types() -> data.SubscriptionTypesListResponse: return data.SubscriptionTypesListResponse(subscription_types=results) +@router.get( + "/{subscription_id}/jobs", + tags=["subscriptions"], + response_model=List[BugoutSearchResult], +) +async def get_subscription_jobs_handler( + request: Request, + subscription_id: str = Path(...), +) -> Any: + token = request.state.token + user = request.state.user + + try: + journal_id = get_entity_subscription_journal_id( + resource_type=BUGOUT_RESOURCE_TYPE_ENTITY_SUBSCRIPTION, + token=MOONSTREAM_ADMIN_ACCESS_TOKEN, + user_id=user.id, + ) + + # get subscription entity + subscription_resource = bc.get_entity( + token=token, + journal_id=journal_id, + entity_id=subscription_id, + ) + + except EntityJournalNotFoundException as e: + raise MoonstreamHTTPException( + status_code=404, + detail="User subscriptions journal not found", + internal_error=e, + ) + except Exception as e: + logger.error(f"Error get subscriptions for user ({user}), error: {str(e)}") + raise MoonstreamHTTPException(status_code=500, internal_error=e) + + for field in subscription_resource.required_fields: + if "subscription_type_id" in field: + subscription_type_id = field["subscription_type_id"] + + subscription_address = subscription_resource.address + + get_moonworm_jobs_response = get_moonworm_tasks( + subscription_type_id=subscription_type_id, + address=subscription_address, + user_abi=subscription_resource.secondary_fields.get("abi") or [], + ) + + return get_moonworm_jobs_response + + @router.get( "/is_contract", tags=["subscriptions"],