diff --git a/moonstreamapi/moonstreamapi/admin/cli.py b/moonstreamapi/moonstreamapi/admin/cli.py index 422aa5a7..a0f82863 100644 --- a/moonstreamapi/moonstreamapi/admin/cli.py +++ b/moonstreamapi/moonstreamapi/admin/cli.py @@ -315,6 +315,33 @@ def create_v3_task_handler(args: argparse.Namespace) -> None: ) +def delete_v3_task_handler(args: argparse.Namespace) -> None: + + tasks = moonworm_tasks.get_v3_tasks( + user_id=args.user_id, + customer_id=args.customer_id, + blockchain=args.blockchain, + address=args.address, + ) + + tasks_dict_output: Dict[str, int] = {} + for task in tasks: + if task.chain not in tasks_dict_output: + tasks_dict_output[task.chain] = 0 + tasks_dict_output[task.chain] += 1 + + print("Found:") + for k, v in tasks_dict_output.items(): + print(f"- {k} - {v} tasks") + + response = input(f"Delete {len(tasks)} tasks? (yes/y): ").strip().lower() + if response != "yes" and response != "y": + logger.warning("Canceled") + return + + moonworm_tasks.delete_v3_tasks(tasks=tasks) + + def main() -> None: cli_description = f"""Moonstream Admin CLI @@ -598,8 +625,8 @@ def main() -> None: parser_moonworm_tasks_migrate.set_defaults(func=moonworm_tasks_v3_migrate) parser_moonworm_tasks_v3_create = subcommands_moonworm_tasks.add_parser( - "create_v3_tasks", - description="Create v3 tasks from v2 tasks", + "create-v3-tasks", + description="Create new v3 tasks", ) parser_moonworm_tasks_v3_create.add_argument( @@ -639,6 +666,37 @@ def main() -> None: parser_moonworm_tasks_v3_create.set_defaults(func=create_v3_task_handler) + parser_moonworm_tasks_v3_delete = subcommands_moonworm_tasks.add_parser( + "delete-v3-tasks", + description="Delete v3 tasks", + ) + + parser_moonworm_tasks_v3_delete.add_argument( + "--user-id", + type=uuid_type, + help="The user ID of which we wish to delete the task", + ) + + parser_moonworm_tasks_v3_delete.add_argument( + "--customer-id", + type=uuid_type, + help="The customer ID of which we wish to delete the task", + ) + + parser_moonworm_tasks_v3_delete.add_argument( + "--blockchain", + type=str, + help="Blockchain name", + ) + + parser_moonworm_tasks_v3_delete.add_argument( + "--address", + type=str, + help="Contract address", + ) + + parser_moonworm_tasks_v3_delete.set_defaults(func=delete_v3_task_handler) + queries_parser = subcommands.add_parser( "queries", description="Manage Moonstream queries" ) diff --git a/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py b/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py index f6dd1dd5..bbaec2d5 100644 --- a/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py +++ b/moonstreamapi/moonstreamapi/admin/moonworm_tasks.py @@ -174,6 +174,70 @@ def create_v3_task( return None +def get_v3_tasks( + customer_id: Optional[str] = None, + user_id: Optional[str] = None, + address: Optional[str] = None, + blockchain: Optional[str] = None, +) -> List[AbiJobs]: + """ + Get moonworm v3 tasks. + """ + if ( + customer_id is None + and user_id is None + and address is None + and blockchain is None + ): + raise Exception( + "At least one of customer_id, or user_id, or address, or blockchain should be set" + ) + + db_engine = MoonstreamDBIndexesEngine() + + with db_engine.yield_db_session_ctx() as db_session_v3: + query = db_session_v3.query(AbiJobs) + + if customer_id is not None: + query = query.filter(AbiJobs.customer_id == customer_id) + if user_id is not None: + query = query.filter(AbiJobs.user_id == user_id) + if address is not None: + query = query.filter(AbiJobs.address == bytes.fromhex(address[2:])) + if blockchain is not None: + query = query.filter(AbiJobs.chain == blockchain) + + try: + tasks = query.all() + except Exception as e: + logger.error(f"Error selecting tasks, err: {str(e)}") + raise e + + return tasks + + +def delete_v3_tasks(tasks: List[AbiJobs]) -> None: + tasks_len = len(tasks) + if tasks_len == 0: + raise Exception("No tasks to delete") + + db_engine = MoonstreamDBIndexesEngine() + + with db_engine.yield_db_session_ctx() as db_session_v3: + try: + for task in tasks: + db_session_v3.delete(task) + pass + + db_session_v3.commit() + except Exception as e: + logger.error(f"Error delete tasks: {str(e)}") + db_session_v3.rollback() + raise e + + logger.info(f"Deleted {tasks_len} tasks") + + def migrate_v3_tasks( user_id: UUID, customer_id: UUID, blockchain: Optional[str] = None ):