diff --git a/src/pieces/assets/assets_api.py b/src/pieces/assets/assets_api.py index a8004e58..93cfe74d 100644 --- a/src/pieces/assets/assets_api.py +++ b/src/pieces/assets/assets_api.py @@ -22,10 +22,13 @@ class AssetsCommandsApi: - assets_snapshot: Dict[str, Optional[Asset]] = {} # should be filled in the run in loop + _assets_snapshot: Dict[str, Optional[Asset]] = {} # should be filled in the run in loop asset_queue = queue.Queue() # Queue for asset_ids to be processed block = True # to wait for the queue to recevive the first asset id asset_set = set() # Set for asset_ids in the queue + worker_thread = None # Thread to run the worker function + _lock = threading.Lock() # Protects the thread from being started more than once + first_shot = True # First time to open the websocket @staticmethod def create_new_asset(raw_string, metadata=None): @@ -49,27 +52,33 @@ def create_new_asset(raw_string, metadata=None): created_asset = assets_api.assets_create_new_asset(transferables=False, seed=seed) return created_asset - @classmethod - def get_assets_snapshot(cls): - if cls.assets_snapshot: - return cls.assets_snapshot + + @property + def assets_snapshot(self) -> dict[str:Asset]: + if self._assets_snapshot: + return self._assets_snapshot + assets_api = AssetsApi(Settings.api_client) # Call the API to get assets identifiers api_response = assets_api.assets_identifiers_snapshot() # Extract the 'id' values from each item in the 'iterable' list - cls.assets_snapshot = {item.id:None for item in api_response.iterable} + self._assets_snapshot = {item.id:None for item in api_response.iterable} - return cls.assets_snapshot + return self._assets_snapshot @classmethod - def update_asset_snapshot(cls,asset_id): + def update_asset_snapshot(cls,asset_id) -> Optional[Asset]: asset_api = AssetApi(Settings.api_client) - asset = asset_api.asset_snapshot(asset_id) - cls.assets_snapshot[asset_id] = asset # Cache the asset - return asset + try: + asset = asset_api.asset_snapshot(asset_id) + cls._assets_snapshot[asset_id] = asset # Cache the assete + return asset + except Exception: + return None + @classmethod def worker(cls): @@ -87,26 +96,34 @@ def worker(cls): @classmethod def assets_snapshot_callback(cls,ids:StreamedIdentifiers): # Start the worker thread if it's not running - threading.Thread(target = cls.worker).start() cls.block = True + cls.create_thread() for item in ids.iterable: asset_id = item.asset.id - if asset_id not in cls.assets_snapshot: - cls.assets_snapshot[asset_id] = None - + if asset_id not in cls._assets_snapshot: + if not cls.first_shot: + cls._assets_snapshot = {asset_id:None,**cls.assets_snapshot} + else: + cls._assets_snapshot[asset_id] = None if asset_id not in cls.asset_set: if item.deleted: - cls.assets_snapshot.pop(asset_id) + cls._assets_snapshot.pop(asset_id) else: cls.asset_queue.put(asset_id) # Add asset_id to the queue cls.asset_set.add(asset_id) # Add asset_id to the set cls.block = False # Remove the block to end the thread - - + @classmethod + def create_thread(cls): + with cls._lock: + if cls.worker_thread: + if cls.worker_thread.is_alive(): + return + cls.worker_thread = threading.Thread(target = cls.worker) + cls.worker_thread.start() @classmethod def get_asset_snapshot(cls,asset_id:str): - asset = cls.assets_snapshot.get(asset_id) + asset = cls._assets_snapshot.get(asset_id) if asset: return asset else: @@ -140,7 +157,7 @@ def delete_asset_by_id(asset_id): try: response = delete_instance.assets_delete_asset(asset_id) return response - except Exception as e: + except: return f"Failed to delete {asset_id}" @staticmethod diff --git a/src/pieces/assets/assets_command.py b/src/pieces/assets/assets_command.py index 71f64982..d9693559 100644 --- a/src/pieces/assets/assets_command.py +++ b/src/pieces/assets/assets_command.py @@ -10,7 +10,7 @@ def check_assets_existence(func): """Decorator to ensure user has assets.""" def wrapper(*args, **kwargs): - assets = AssetsCommandsApi.get_assets_snapshot() # Check if there is an asset + assets = AssetsCommandsApi().assets_snapshot # Check if there is an asset if not assets: return show_error("No assets found", "Please create an asset first.") return func(*args, **kwargs) @@ -27,11 +27,10 @@ def wrapper(*args, **kwargs): return show_error("No asset selected.", "Please open an asset first using pieces open.") try: asset_data = AssetsCommandsApi.get_asset_snapshot(AssetsCommands.current_asset) - return func(asset_data=asset_data,*args, **kwargs) except: # The selected asset is deleted return show_error("Error occured in the command", "Please make sure the selected asset is valid.") - + return func(asset_data=asset_data,*args, **kwargs) return wrapper @@ -44,7 +43,7 @@ def open_asset(cls,**kwargs): item_index = kwargs.get('ITEM_INDEX',1) if not item_index: item_index = 1 - asset_ids = AssetsCommandsApi.get_assets_snapshot() + asset_ids = AssetsCommandsApi().assets_snapshot try: cls.current_asset = list(asset_ids.keys())[item_index-1] # because we begin from 1 except IndexError: @@ -128,7 +127,7 @@ def create_asset(cls,**kwargs): new_asset = AssetsCommandsApi.create_new_asset(raw_string=text, metadata=None) cls.current_asset = new_asset.id - print(f"Asset Created use 'open' to view") + print("Asset Created use 'open' to view") return new_asset # Add your saving logic here diff --git a/src/pieces/commands/cli_loop.py b/src/pieces/commands/cli_loop.py index 7fc25b52..f0d6d757 100644 --- a/src/pieces/commands/cli_loop.py +++ b/src/pieces/commands/cli_loop.py @@ -2,6 +2,7 @@ import platform import shlex from prompt_toolkit import PromptSession +from rich.console import Console from pieces import __version__ from pieces.gui import * @@ -94,4 +95,3 @@ def loop(**kwargs): except Exception as e: show_error(f"An error occurred:", {e}) #TODO: Handle by the argparser not a try/except - print() diff --git a/src/pieces/commands/list_command.py b/src/pieces/commands/list_command.py index 48d39a9f..e711fabb 100644 --- a/src/pieces/commands/list_command.py +++ b/src/pieces/commands/list_command.py @@ -53,7 +53,7 @@ def list_apps(): @staticmethod @check_assets_existence def list_assets(max_assets:int=10): - assets_snapshot = AssetsCommandsApi.assets_snapshot + assets_snapshot = AssetsCommandsApi().assets_snapshot for i, uuid in enumerate(list(assets_snapshot.keys())[:max_assets], start=1): asset = assets_snapshot[uuid] if not asset: diff --git a/src/pieces/copilot/ask_command.py b/src/pieces/copilot/ask_command.py index cc4ffa8d..35698ce2 100644 --- a/src/pieces/copilot/ask_command.py +++ b/src/pieces/copilot/ask_command.py @@ -29,7 +29,7 @@ def ask(query, **kwargs): files[idx] = os.path.abspath(file) # Return the abs path # snippets if snippets: - asset_ids = list(AssetsCommandsApi.get_assets_snapshot().keys()) + asset_ids = list(AssetsCommandsApi().assets_snapshot.keys()) for idx,snippet in enumerate(snippets): try: asset_id = asset_ids[snippet-1] # we began enumerating from 1 except KeyError: return show_error("Asset not found","Enter a vaild asset index") diff --git a/src/pieces/copilot/pieces_ask_websocket.py b/src/pieces/copilot/pieces_ask_websocket.py index 7d3a3e36..53e50926 100644 --- a/src/pieces/copilot/pieces_ask_websocket.py +++ b/src/pieces/copilot/pieces_ask_websocket.py @@ -26,7 +26,7 @@ def __init__(self): self.final_answer = "" self.conversation = None self.verbose = True - self.live = None + self.live = Live() self.message_compeleted = threading.Event() def open_websocket(self): @@ -42,24 +42,18 @@ def on_message(self,ws, message): response = QGPTStreamOutput.from_json(message) if response.question: answers = response.question.answers.iterable - if not self.live and self.verbose: # Create live instance if it doesn't exist - self.live = Live() - self.live.__enter__() # Enter the context manually + for answer in answers: text = answer.text self.final_answer += text if self.verbose and text: self.live.update(Markdown(self.final_answer)) - - if response.status == 'COMPLETED': if self.verbose: - self.live.update(Markdown(self.final_answer),refresh=True) + self.live.update(Markdown(self.final_answer), refresh=True) self.live.stop() - self.live = None - print("\n") - + self.conversation = response.conversation @@ -125,14 +119,15 @@ def ask_question(self, model_id,query,relevant={"iterable": []},verbose = True): """Ask a question using the websocket.""" self.final_answer = "" self.verbose = verbose + if verbose: + self.live = Live() + self.live.start(refresh=True) # Start the live context self.send_message(model_id,query,relevant) finishes = self.message_compeleted.wait(Settings.TIMEOUT) self.message_compeleted.clear() if not Settings.run_in_loop and self.is_connected: self.close_websocket_connection() self.verbose = True - if not finishes: - if self.live(): - self.live.stop() + if not finishes and not self.live: raise ConnectionError("Failed to get the reponse back") return self.final_answer diff --git a/src/pieces/settings.py b/src/pieces/settings.py index db0660fa..c441721a 100644 --- a/src/pieces/settings.py +++ b/src/pieces/settings.py @@ -32,7 +32,7 @@ class Settings: PIECES_OS_MIN_VERSION = "9.0.0" # Minium version (9.0.0) PIECES_OS_MAX_VERSION = "10.0.0" # Maxium version (10.0.0) - TIMEOUT = 10 # Websocket ask timeout + TIMEOUT = 20 # Websocket ask timeout # Build paths inside the project like this: os.path.join(BASE_DIR, ...) BASE_DIR = os.path.dirname(__file__) diff --git a/tests/open_and_save_command_test.py b/tests/open_and_save_command_test.py index e6969827..fde96b34 100644 --- a/tests/open_and_save_command_test.py +++ b/tests/open_and_save_command_test.py @@ -15,7 +15,7 @@ def test_open_command(self,ITEM_INDEX=None): sys.stdout = StringIO() - assets_length = len(AssetsCommandsApi.get_assets_snapshot()) + assets_length = len(AssetsCommandsApi().assets_snapshot) sys.stdout = StringIO() @@ -55,7 +55,7 @@ def test_save_command(self, mock_paste,mock_buildins): # Call create_asset to create a new asset to test on AssetsCommands.create_asset() # Create a hello world asset - AssetsCommandsApi.assets_snapshot = {AssetsCommands.current_asset:None} # Update the asset cache + AssetsCommandsApi().assets_snapshot = {AssetsCommands.current_asset:None} # Update the asset cache code_snippet_path = self.test_open_command(ITEM_INDEX=1) # Open the created asset