From 12135849e14c9e60a555c22d6e87cd55ee569849 Mon Sep 17 00:00:00 2001 From: "Dr.Blank" <64108942+Dr-Blank@users.noreply.github.com> Date: Thu, 12 Oct 2023 05:12:24 -0400 Subject: [PATCH] add a crypto solver for the module (#3) --- lrclib/api.py | 24 +++-- lrclib/cryptographic_challenge_solver.py | 119 +++++++++++++++-------- tests/test_api_with_mock.py | 21 ++-- tests/test_crypto_solver.py | 23 +++++ 4 files changed, 120 insertions(+), 67 deletions(-) create mode 100644 tests/test_crypto_solver.py diff --git a/lrclib/api.py b/lrclib/api.py index 9482a00..1848d93 100644 --- a/lrclib/api.py +++ b/lrclib/api.py @@ -1,5 +1,6 @@ """ API for lrclib""" +import os import warnings from typing import Any, Dict, Optional @@ -13,11 +14,7 @@ RateLimitError, ServerError, ) -from .models import ( - CryptographicChallenge, - Lyrics, - SearchResult, -) +from .models import CryptographicChallenge, Lyrics, SearchResult BASE_URL = "https://lrclib.net/api" ENDPOINTS: Dict[str, str] = { @@ -44,10 +41,8 @@ def __init__( if not user_agent: warnings.warn( - ( - "Missing user agent, please set it with the `user_agent`" - " argument" - ), + "Missing user agent" + + "please set it with the `user_agent` argument", UserWarning, ) else: @@ -187,16 +182,19 @@ def request_challenge(self) -> CryptographicChallenge: raise exc return CryptographicChallenge.from_dict(response.json()) - def obtain_publish_token(self) -> str: + def _obtain_publish_token(self) -> str: """ Obtain a Publish Token for submitting lyrics to LRCLIB. :return: A Publish Token :rtype: str """ + + num_threads = os.cpu_count() or 1 challenge = self.request_challenge() - solver = CryptoChallengeSolver() - nonce = solver.solve_challenge(challenge.prefix, challenge.target) + nonce = CryptoChallengeSolver.solve( + challenge.prefix, challenge.target, num_threads=num_threads + ) return f"{challenge.prefix}:{nonce}" def publish_lyrics( # pylint: disable=too-many-arguments @@ -235,7 +233,7 @@ def publish_lyrics( # pylint: disable=too-many-arguments endpoint = ENDPOINTS["publish"] if not publish_token: - publish_token = self.obtain_publish_token() + publish_token = self._obtain_publish_token() headers = {"X-Publish-Token": publish_token} data = { diff --git a/lrclib/cryptographic_challenge_solver.py b/lrclib/cryptographic_challenge_solver.py index 27b88d6..e77c1c5 100644 --- a/lrclib/cryptographic_challenge_solver.py +++ b/lrclib/cryptographic_challenge_solver.py @@ -2,56 +2,97 @@ # https://github.com/tranxuanthang/lrcget/blob/main/src-tauri/src/lrclib/challenge_solver.rs import hashlib -from typing import List +import threading +from dataclasses import dataclass +from typing import List, Optional -class CryptoChallengeSolver: +@dataclass +class Solution: + """Class for storing the solution of a cryptographic challenge.""" + + prefix: str + target_hex: str + nonce: Optional[int] = None + + @property + def is_solved(self) -> bool: + """Check if the challenge is solved.""" + return self.nonce is not None + + +def is_nonce_valid(prefix: str, nonce: int | str, target: bytes) -> bool: + """Check if the given nonce satisfies the target hash. + + :param prefix: The prefix string of the challenge. + :param nonce: The nonce to check. + :param target: The target hash in bytes format. + :return: True if the nonce satisfies the target, False otherwise. """ - A class that provides a method to solve a cryptographic challenge. + message = f"{prefix}{nonce}".encode() + hash_value = hashlib.sha256(message).digest() + return hash_value < target + + +def find_nonce( + prefix: str, + target: bytes, + solution: Optional[Solution] = None, + start: int = 0, + step: int = 1, +) -> Solution: + """Find the nonce that satisfies the target hash. + + :param prefix: The prefix string of the challenge. + :param target: The target hash in bytes format. + :param start: The starting nonce value. + :param step: The step size for incrementing the nonce. + :param solution: The solution object to store the valid nonce. + :return: The solution object. """ + if solution is None: + solution = Solution(prefix, target.hex()) - @staticmethod - def verify_nonce(result: List[int], target: List[int]) -> bool: - """ - Verify if the result nonce satisfies the target. + nonce = start + while not solution.is_solved: + if is_nonce_valid(prefix, nonce, target): + solution.nonce = nonce + print(f"Found nonce: {nonce}") + break + nonce += step - :param result: The nonce to verify. - :param target: The target to satisfy. - :return: True if the nonce satisfies the target, False otherwise. - """ - if len(result) != len(target): - return False + return solution - for res, tar in zip(result, target): - if res > tar: - return False - if res < tar: - break - return True +class CryptoChallengeSolver: + """Class for solving cryptographic challenges.""" - @classmethod - def solve_challenge(cls, prefix: str, target_hex: str) -> str: - """ - Solve a cryptographic challenge by finding a nonce that satisfies the \ - target. + @staticmethod + def solve(prefix: str, target_hex: str, num_threads: int = 1): + """Solve the cryptographic challenge. :param prefix: The prefix string of the challenge. :param target_hex: The target hash in hexadecimal format. - :return: The nonce that satisfies the target. + :param num_threads: The number of threads to use. + :return: The smallest nonce that satisfies the target. """ - raise NotImplementedError + target = bytes.fromhex(target_hex) + step = num_threads + threads: List[threading.Thread] = [] + solution = Solution(prefix, target.hex()) - @staticmethod - def _solve_challenge_for_nonce(args): - prefix, target, nonce = args - input_str = f"{prefix}{nonce}" - input_bytes = input_str.encode("utf-8") - context = hashlib.sha256(input_bytes) - hashed = context.digest() - - result = CryptoChallengeSolver.verify_nonce(list(hashed), list(target)) - if result: - return nonce - - return None + for i in range(num_threads): + start = i + thread = threading.Thread( + target=find_nonce, + args=(prefix, target, solution, start, step), + ) + threads.append(thread) + + for thread in threads: + thread.start() + + for thread in threads: + thread.join() + + return str(solution.nonce) diff --git a/tests/test_api_with_mock.py b/tests/test_api_with_mock.py index 463f2cf..fb5e0de 100644 --- a/tests/test_api_with_mock.py +++ b/tests/test_api_with_mock.py @@ -1,23 +1,14 @@ -from unittest.mock import Mock import warnings +from unittest.mock import Mock import pytest from requests import HTTPError, Response from lrclib.api import BASE_URL, ENDPOINTS, LrcLibAPI -from lrclib.exceptions import ( - APIError, - IncorrectPublishTokenError, - NotFoundError, - RateLimitError, - ServerError, -) -from lrclib.models import ( - Lyrics, - LyricsMinimal, - SearchResult, - CryptographicChallenge, -) +from lrclib.exceptions import (APIError, IncorrectPublishTokenError, + NotFoundError, RateLimitError, ServerError) +from lrclib.models import (CryptographicChallenge, Lyrics, LyricsMinimal, + SearchResult) @pytest.fixture(scope="module") @@ -190,7 +181,7 @@ def test_publish_lyrics(api: LrcLibAPI) -> None: # Set the session object and obtain_publish_token method of the LrcLibAPI instance to the mock objects api.session = session_mock - api.obtain_publish_token = api_mock.obtain_publish_token + api._obtain_publish_token = api_mock.obtain_publish_token # Call the publish_lyrics method result = api.publish_lyrics( diff --git a/tests/test_crypto_solver.py b/tests/test_crypto_solver.py new file mode 100644 index 0000000..eafd39d --- /dev/null +++ b/tests/test_crypto_solver.py @@ -0,0 +1,23 @@ +import random +import string + +from lrclib.cryptographic_challenge_solver import ( + CryptoChallengeSolver, + is_nonce_valid, +) + +easy_target_hex = ( + "0000FFF000000000000000000000000000000000000000000000000000000000" +) + + +# generate a random prefixes +def random_prefix() -> str: + return "".join(random.choice(string.ascii_letters) for _ in range(10)) + + +def test_solve_random() -> None: + prefix = random_prefix() + target_hex = easy_target_hex + nonce = CryptoChallengeSolver.solve(prefix, target_hex, 4) + assert is_nonce_valid(prefix, nonce, bytes.fromhex(target_hex))