Skip to content

Commit

Permalink
Merge pull request #130 from pieces-app/fix-websocket
Browse files Browse the repository at this point in the history
fix ask websocket and assets issues
  • Loading branch information
bishoy-at-pieces authored Jun 3, 2024
2 parents 13b62e1 + 914d900 commit 7b0978a
Show file tree
Hide file tree
Showing 8 changed files with 55 additions and 44 deletions.
57 changes: 37 additions & 20 deletions src/pieces/assets/assets_api.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@


class AssetsCommandsApi:
assets_snapshot: Dict[str, Optional[Asset]] = {} # should be filled in the run in loop
_assets_snapshot: Dict[str, Optional[Asset]] = {} # should be filled in the run in loop
asset_queue = queue.Queue() # Queue for asset_ids to be processed
block = True # to wait for the queue to recevive the first asset id
asset_set = set() # Set for asset_ids in the queue
worker_thread = None # Thread to run the worker function
_lock = threading.Lock() # Protects the thread from being started more than once
first_shot = True # First time to open the websocket

@staticmethod
def create_new_asset(raw_string, metadata=None):
Expand All @@ -49,27 +52,33 @@ def create_new_asset(raw_string, metadata=None):
created_asset = assets_api.assets_create_new_asset(transferables=False, seed=seed)
return created_asset

@classmethod
def get_assets_snapshot(cls):
if cls.assets_snapshot:
return cls.assets_snapshot


@property
def assets_snapshot(self) -> dict[str:Asset]:
if self._assets_snapshot:
return self._assets_snapshot


assets_api = AssetsApi(Settings.api_client)
# Call the API to get assets identifiers
api_response = assets_api.assets_identifiers_snapshot()

# Extract the 'id' values from each item in the 'iterable' list
cls.assets_snapshot = {item.id:None for item in api_response.iterable}
self._assets_snapshot = {item.id:None for item in api_response.iterable}

return cls.assets_snapshot
return self._assets_snapshot

@classmethod
def update_asset_snapshot(cls,asset_id):
def update_asset_snapshot(cls,asset_id) -> Optional[Asset]:
asset_api = AssetApi(Settings.api_client)
asset = asset_api.asset_snapshot(asset_id)
cls.assets_snapshot[asset_id] = asset # Cache the asset
return asset
try:
asset = asset_api.asset_snapshot(asset_id)
cls._assets_snapshot[asset_id] = asset # Cache the assete
return asset
except Exception:
return None


@classmethod
def worker(cls):
Expand All @@ -87,26 +96,34 @@ def worker(cls):
@classmethod
def assets_snapshot_callback(cls,ids:StreamedIdentifiers):
# Start the worker thread if it's not running
threading.Thread(target = cls.worker).start()
cls.block = True
cls.create_thread()
for item in ids.iterable:
asset_id = item.asset.id
if asset_id not in cls.assets_snapshot:
cls.assets_snapshot[asset_id] = None

if asset_id not in cls._assets_snapshot:
if not cls.first_shot:
cls._assets_snapshot = {asset_id:None,**cls.assets_snapshot}
else:
cls._assets_snapshot[asset_id] = None
if asset_id not in cls.asset_set:
if item.deleted:
cls.assets_snapshot.pop(asset_id)
cls._assets_snapshot.pop(asset_id)
else:
cls.asset_queue.put(asset_id) # Add asset_id to the queue
cls.asset_set.add(asset_id) # Add asset_id to the set
cls.block = False # Remove the block to end the thread


@classmethod
def create_thread(cls):
with cls._lock:
if cls.worker_thread:
if cls.worker_thread.is_alive():
return
cls.worker_thread = threading.Thread(target = cls.worker)
cls.worker_thread.start()

@classmethod
def get_asset_snapshot(cls,asset_id:str):
asset = cls.assets_snapshot.get(asset_id)
asset = cls._assets_snapshot.get(asset_id)
if asset:
return asset
else:
Expand Down Expand Up @@ -140,7 +157,7 @@ def delete_asset_by_id(asset_id):
try:
response = delete_instance.assets_delete_asset(asset_id)
return response
except Exception as e:
except:
return f"Failed to delete {asset_id}"

@staticmethod
Expand Down
9 changes: 4 additions & 5 deletions src/pieces/assets/assets_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
def check_assets_existence(func):
"""Decorator to ensure user has assets."""
def wrapper(*args, **kwargs):
assets = AssetsCommandsApi.get_assets_snapshot() # Check if there is an asset
assets = AssetsCommandsApi().assets_snapshot # Check if there is an asset
if not assets:
return show_error("No assets found", "Please create an asset first.")
return func(*args, **kwargs)
Expand All @@ -27,11 +27,10 @@ def wrapper(*args, **kwargs):
return show_error("No asset selected.", "Please open an asset first using pieces open.")
try:
asset_data = AssetsCommandsApi.get_asset_snapshot(AssetsCommands.current_asset)
return func(asset_data=asset_data,*args, **kwargs)
except:
# The selected asset is deleted
return show_error("Error occured in the command", "Please make sure the selected asset is valid.")

return func(asset_data=asset_data,*args, **kwargs)

return wrapper

Expand All @@ -44,7 +43,7 @@ def open_asset(cls,**kwargs):
item_index = kwargs.get('ITEM_INDEX',1)
if not item_index:
item_index = 1
asset_ids = AssetsCommandsApi.get_assets_snapshot()
asset_ids = AssetsCommandsApi().assets_snapshot
try:
cls.current_asset = list(asset_ids.keys())[item_index-1] # because we begin from 1
except IndexError:
Expand Down Expand Up @@ -128,7 +127,7 @@ def create_asset(cls,**kwargs):
new_asset = AssetsCommandsApi.create_new_asset(raw_string=text, metadata=None)

cls.current_asset = new_asset.id
print(f"Asset Created use 'open' to view")
print("Asset Created use 'open' to view")

return new_asset
# Add your saving logic here
Expand Down
2 changes: 1 addition & 1 deletion src/pieces/commands/cli_loop.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
import platform
import shlex
from prompt_toolkit import PromptSession
from rich.console import Console

from pieces import __version__
from pieces.gui import *
Expand Down Expand Up @@ -94,4 +95,3 @@ def loop(**kwargs):
except Exception as e:
show_error(f"An error occurred:", {e}) #TODO: Handle by the argparser not a try/except

print()
2 changes: 1 addition & 1 deletion src/pieces/commands/list_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ def list_apps():
@staticmethod
@check_assets_existence
def list_assets(max_assets:int=10):
assets_snapshot = AssetsCommandsApi.assets_snapshot
assets_snapshot = AssetsCommandsApi().assets_snapshot
for i, uuid in enumerate(list(assets_snapshot.keys())[:max_assets], start=1):
asset = assets_snapshot[uuid]
if not asset:
Expand Down
2 changes: 1 addition & 1 deletion src/pieces/copilot/ask_command.py
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ def ask(query, **kwargs):
files[idx] = os.path.abspath(file) # Return the abs path
# snippets
if snippets:
asset_ids = list(AssetsCommandsApi.get_assets_snapshot().keys())
asset_ids = list(AssetsCommandsApi().assets_snapshot.keys())
for idx,snippet in enumerate(snippets):
try: asset_id = asset_ids[snippet-1] # we began enumerating from 1
except KeyError: return show_error("Asset not found","Enter a vaild asset index")
Expand Down
21 changes: 8 additions & 13 deletions src/pieces/copilot/pieces_ask_websocket.py
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ def __init__(self):
self.final_answer = ""
self.conversation = None
self.verbose = True
self.live = None
self.live = Live()
self.message_compeleted = threading.Event()

def open_websocket(self):
Expand All @@ -42,24 +42,18 @@ def on_message(self,ws, message):
response = QGPTStreamOutput.from_json(message)
if response.question:
answers = response.question.answers.iterable
if not self.live and self.verbose: # Create live instance if it doesn't exist
self.live = Live()
self.live.__enter__() # Enter the context manually

for answer in answers:
text = answer.text
self.final_answer += text
if self.verbose and text:
self.live.update(Markdown(self.final_answer))



if response.status == 'COMPLETED':
if self.verbose:
self.live.update(Markdown(self.final_answer),refresh=True)
self.live.update(Markdown(self.final_answer), refresh=True)
self.live.stop()
self.live = None
print("\n")



self.conversation = response.conversation

Expand Down Expand Up @@ -125,14 +119,15 @@ def ask_question(self, model_id,query,relevant={"iterable": []},verbose = True):
"""Ask a question using the websocket."""
self.final_answer = ""
self.verbose = verbose
if verbose:
self.live = Live()
self.live.start(refresh=True) # Start the live context
self.send_message(model_id,query,relevant)
finishes = self.message_compeleted.wait(Settings.TIMEOUT)
self.message_compeleted.clear()
if not Settings.run_in_loop and self.is_connected:
self.close_websocket_connection()
self.verbose = True
if not finishes:
if self.live():
self.live.stop()
if not finishes and not self.live:
raise ConnectionError("Failed to get the reponse back")
return self.final_answer
2 changes: 1 addition & 1 deletion src/pieces/settings.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ class Settings:
PIECES_OS_MIN_VERSION = "9.0.0" # Minium version (9.0.0)
PIECES_OS_MAX_VERSION = "10.0.0" # Maxium version (10.0.0)

TIMEOUT = 10 # Websocket ask timeout
TIMEOUT = 20 # Websocket ask timeout

# Build paths inside the project like this: os.path.join(BASE_DIR, ...)
BASE_DIR = os.path.dirname(__file__)
Expand Down
4 changes: 2 additions & 2 deletions tests/open_and_save_command_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ def test_open_command(self,ITEM_INDEX=None):
sys.stdout = StringIO()


assets_length = len(AssetsCommandsApi.get_assets_snapshot())
assets_length = len(AssetsCommandsApi().assets_snapshot)


sys.stdout = StringIO()
Expand Down Expand Up @@ -55,7 +55,7 @@ def test_save_command(self, mock_paste,mock_buildins):
# Call create_asset to create a new asset to test on
AssetsCommands.create_asset() # Create a hello world asset

AssetsCommandsApi.assets_snapshot = {AssetsCommands.current_asset:None} # Update the asset cache
AssetsCommandsApi().assets_snapshot = {AssetsCommands.current_asset:None} # Update the asset cache

code_snippet_path = self.test_open_command(ITEM_INDEX=1) # Open the created asset

Expand Down

0 comments on commit 7b0978a

Please sign in to comment.