diff --git a/README.md b/README.md index 18f7d2e9..3eb7a4bc 100644 --- a/README.md +++ b/README.md @@ -199,6 +199,29 @@ To activate: 6. Go to playground and start a new session, select the 'Recipes data Analysis' workflow 7. Ask 'What is the total population of Mali?' +# Evaluation with Prompt Flow + +First, you will need to build the environment to include Prompt Flow ... + +`docker compose -f docker-compose.yml -f docker-compose-dev.yml up -d --build` + +Then ... + +1. Install the DevContainers VSCode extension +2. Build data recipes using the `docker compose` command mentioned above +3. Open the command palette in VSCode (CMD + Shift + P on Mac; CTRL + Shift + P on Windows) and select + + `Dev Containers: Attach to remote container`. + + Select the promptflow container. This opens a new VSCode window - use it for the next steps. +4. Install Promptflow add-in +5. Open folder `/app` +6. Click on `flow.dag.yaml` +7. Top left of main pane, click on 'Visual editor' +8. On bottom left under connections, configure an Azure OpenAI connection called 'azure_openai' +9. On the Groundedness node, select your new connection +10. You can no run by clicking the play icon. See Promptflow documentation for more details + # Deployment We will add more details here soon, for now, here are some notes on Azure ... diff --git a/docker-compose-dev.yml b/docker-compose-dev.yml new file mode 100644 index 00000000..fabd2495 --- /dev/null +++ b/docker-compose-dev.yml @@ -0,0 +1,21 @@ +#version: "3.4" + +services: + promptflow: + #image: mcr.microsoft.com/azureml/promptflow/promptflow-runtime-stable:latest + build: + context: . + dockerfile: ./flows/chainlit-ui-evaluation//Dockerfile + container_name: recipes-ai-promptflow + env_file: + - .env + volumes: + - ./flows:/app + - ./utils:/app/chainlit-ui-evaluation/utils + - ./templates:/app/chainlit-ui-evaluation/templates + - shared-data:/app/chainlit-ui-evaluation/recipes/public + - ./management/skills.py:/app/chainlit-ui-evaluation/recipes/skills.py + - ./ui/chat-chainlit-assistant/app.py:/app/chainlit-ui-evaluation/app.py +volumes: + pgdata2: + shared-data: \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index 9adbef07..f6cb9e43 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -140,7 +140,6 @@ services: - ./utils:/app/utils - ./templates:/app/templates - ./db/recipedb:/app/db - volumes: pgdata2: shared-data: \ No newline at end of file diff --git a/flows/chainlit-ui-evaluation/Dockerfile b/flows/chainlit-ui-evaluation/Dockerfile new file mode 100644 index 00000000..42b6b36f --- /dev/null +++ b/flows/chainlit-ui-evaluation/Dockerfile @@ -0,0 +1,6 @@ +FROM mcr.microsoft.com/azureml/promptflow/promptflow-runtime-stable:latest + +# No need to copy the app code, we mount via docker-compose-dev.yml + +RUN pip3 install --upgrade pip +RUN pip3 install chainlit==1.1.305 \ No newline at end of file diff --git a/flows/chainlit-ui-evaluation/aggregate_variant_results.py b/flows/chainlit-ui-evaluation/aggregate_variant_results.py new file mode 100644 index 00000000..bffbefc3 --- /dev/null +++ b/flows/chainlit-ui-evaluation/aggregate_variant_results.py @@ -0,0 +1,38 @@ +from typing import List + +import numpy as np +from promptflow import log_metric, tool + + +@tool +def aggregate_variants_results(results: List[dict]): + """ + Aggregate the results of multiple variants. + + Args: + results (List[dict]): A list of dictionaries containing the results for each variant. + + Returns: + dict: A dictionary containing the aggregated results, with the metric names as keys and the aggregated values as values. + """ + aggregate_results = {} + for result in results: + for name, value in result.items(): + if name not in aggregate_results.keys(): + aggregate_results[name] = [] + try: + float_val = float(value) + except Exception: + float_val = np.nan + aggregate_results[name].append(float_val) + + for name, value in aggregate_results.items(): + metric_name = name + aggregate_results[name] = np.nanmean(value) + if "pass_rate" in metric_name: + metric_name = metric_name + "(%)" + aggregate_results[name] = aggregate_results[name] * 100.0 + aggregate_results[name] = round(aggregate_results[name], 2) + log_metric(metric_name, aggregate_results[name]) + + return aggregate_results diff --git a/flows/chainlit-ui-evaluation/azure_openai.yaml b/flows/chainlit-ui-evaluation/azure_openai.yaml new file mode 100644 index 00000000..5b916e77 --- /dev/null +++ b/flows/chainlit-ui-evaluation/azure_openai.yaml @@ -0,0 +1,6 @@ +$schema: https://azuremlschemas.azureedge.net/promptflow/latest/AzureOpenAIConnection.schema.json +name: open_ai_connection +type: azure_open_ai +api_key: "" +api_base: "" +api_type: "azure" \ No newline at end of file diff --git a/flows/chainlit-ui-evaluation/call_assistant.py b/flows/chainlit-ui-evaluation/call_assistant.py new file mode 100644 index 00000000..156ae81e --- /dev/null +++ b/flows/chainlit-ui-evaluation/call_assistant.py @@ -0,0 +1,419 @@ +import argparse +import asyncio +import inspect +import json +import os +import signal +import subprocess +import threading +import time + +from promptflow.core import tool + +FINISH_PHRASE = "all done" +OUTPUT_TAG = "ASSISTANT_OUTPUT" + + +@tool +# async def call_assistant(chat_history: list) -> dict: +def call_assistant(query: str, chat_history: str) -> dict: + """ + Calls the assistant API with the given input and retrieves the response. + + Args: + query: What the user asked + chat_history (list): A list containing the chat history, of the format ... + + [ + { + "author": "user", + "content": "Hi" + }, + { + "author": "assistant", + "content": "Hello! How can I help you today?", + }, + { + "author": "assistant", + "content": "What's the total population of Mali?", + } + ] + + Returns: + dict: A dictionary containing the response from the assistant, function name, function arguments, + function output, and the number of tokens in the function output. + """ + + print(chat_history) + + chat_history = json.loads(chat_history) + + # Add user query to chat history + chat_history.append({"author": "user", "content": query}) + + # chat_history = [ + # {"author": "user", "content": "Hi"}, + # { + # "author": "assistant", + # "content": "Hello! How can I help you today?", + # }, + # { + # "author": "assistant", + # "content": "Hi again!", + # }, + # ] + + chat_history = json.dumps(chat_history) + chat_history = chat_history.replace('"', '\\"') + chat_history = chat_history.replace("'", "\\'") + + print("History:", chat_history) + + result = run_chainlit_mock(chat_history) + + response = {"response": result} + + return response + + +def setup_mock_class(): + """ + Creates and returns a mock class for testing purposes. + + Returns: + cl_mock (MockChainlit): The mock class instance. + """ + + class MockMessage: + """ + A class representing a mock message. + + Attributes: + author (str): The author of the message. + content (str): The content of the message. + elements (list): The elements of the message. + disable_feedback (bool): Flag indicating whether feedback is disabled. + + Methods: + send(): Sends the message. + stream_token(content): Streams a token. + update(): Updates the message. + """ + + def __init__( + self, author=None, content=None, elements=None, disable_feedback=False + ): + if content is None: + content = "" + self.author = author + self.content = content + self.disable_feedback = disable_feedback + self.elements = elements if elements is not None else [] + + async def send(self): + """ + Sends the message. + + Returns: + MockMessage: The sent message. + """ + print( + f"Sending message: Author: {self.author}, Content: {self.content}, Elements: {self.elements}" + ) + return self + + async def stream_token(self, content): + """ + Streams a token. + + Args: + content (str): The content of the token. + + Returns: + MockMessage: The updated message. + """ + # print(f"Streaming token: Author: {self.author}, Content: {content}") + self.content += content + return self + + async def update(self): + """ + Updates the message. + + Returns: + MockMessage: The updated message. + """ + print( + f"Updating message: Author: {self.author}, Content: {self.content}, Elements: {self.elements}" + ) + return self + + class MockUserSession: + """ + A class representing a mock user session. + + Attributes: + session_data (dict): A dictionary to store session data. + + Methods: + get(key): Retrieves the value associated with the given key from the session data. + set(key, value): Sets the value associated with the given key in the session data. + """ + + def __init__(self): + self.session_data = {} + + def get(self, key): + """ + Retrieves the value associated with the given key from the session data. + + Args: + key (str): The key to retrieve the value for. + + Returns: + The value associated with the given key, or None if the key is not found. + """ + return self.session_data.get(key, None) + + def set(self, key, value): + """ + Sets the value associated with the given key in the session data. + + Args: + key (str): The key to set the value for. + value: The value to be associated with the key. + """ + self.session_data[key] = value + + class MockChainlit: + """ + A mock implementation of the Chainlit class. + """ + + def __init__(self): + self.Message = MockMessage + self.user_session = MockUserSession() + self.__name__ = "chainlit" + self.step = None + + def Text(self, name, content, display): + """ + Creates a text element. + + Args: + text (str): The text content. + + Returns: + dict: A dictionary containing the text element. + """ + return {"type": "Text", "text": content} + + cl_mock = MockChainlit() + + return cl_mock + + +# Method to run a supplied function to override chainlit's run_sync method +def run_async_coroutine(coroutine): + """ + Runs an asynchronous coroutine in a separate event loop and returns the result. + + Args: + coroutine: The coroutine to be executed asynchronously. + + Returns: + The result of the coroutine execution. + + Raises: + asyncio.TimeoutError: If the coroutine execution times out. + + """ + + def start_loop(loop): + asyncio.set_event_loop(loop) + loop.run_forever() + + new_loop = asyncio.new_event_loop() + t = threading.Thread(target=start_loop, args=(new_loop,)) + t.start() + future = asyncio.run_coroutine_threadsafe(coroutine, new_loop) + try: + return future.result(timeout=10) + except asyncio.TimeoutError: + print("Coroutine execution timed out.") + return None + + +def run_chainlit_mock(chat_history: str) -> str: + """ + This function is used to run the chainlit script and monitor its output. + TODO It is a temporary workaround because running the exact chainlit code + does not exit all asynchronous threads and hangs. This workaround is temporary + and should be replaced by breaking e2e testing into data recipes API and + the assistant. Testing both independently is way less complicated. + + Args: + chat_history (str): A string containing the chat history. + + Returns: + result (str): The result of the chainlit script running with input history + + """ + + all_output = "" + result = "" + print("Monitoring chainlit output") + process = subprocess.Popen( + ["python3", "call_assistant.py", "--chat_history", chat_history], + stdout=subprocess.PIPE, + stderr=subprocess.PIPE, + ) + print(process) + while True: + output = process.stdout.readline() + print(output) + if output == b"" and process.poll() is not None: + print( + "Process finished with No output, try running call_assistant by hand to debug." + ) + break + if output: + all_output += output.decode("utf-8") + print(output.strip()) + if FINISH_PHRASE in str(output).lower(): + print(FINISH_PHRASE) + print("Killing process") + os.kill(process.pid, signal.SIGKILL) + print(OUTPUT_TAG) + if OUTPUT_TAG in all_output: + result = all_output.split(OUTPUT_TAG)[1].strip() + print("Result:", result) + else: + result = "Unparsable output" + break + time.sleep(0.1) + return result + + +def run_sync(func, *args, **kwargs): + """ + Run a function synchronously or asynchronously depending on its type. + + Args: + func: The function to be executed. + *args: Positional arguments to be passed to the function. + **kwargs: Keyword arguments to be passed to the function. + + Returns: + The result of the function execution. + + Raises: + None. + + """ + if inspect.iscoroutinefunction(func): + # Use the alternative approach for coroutine functions + coroutine = func(*args, **kwargs) + return run_async_coroutine(coroutine) + elif asyncio.iscoroutine(func): + # Directly pass the coroutine object + return run_async_coroutine(func) + else: + # Handle synchronous function + return func(*args, **kwargs) + + +async def test_using_app_code_async(chat_history, timeout=5): + + cl_mock = setup_mock_class() + import app as app + + app.run_sync = run_sync + app.cl = cl_mock + + await app.start_chat() + + thread_id = app.cl.user_session.get("thread_id") + + # Here build history + chat_history = chat_history.replace("\\", "") + print(">>>>>>>> Chat history:", chat_history) + history = json.loads(chat_history) + last_message = history[-1] + app_chat_history = app.cl.user_session.get("chat_history") + for message in history: + role = message["author"] + msg = message["content"] + await app.add_message_to_thread(thread_id, role, msg) + app_chat_history.append({"role": role, "content": msg}) + app.cl.user_session.set("chat_history", history) + + print("<<<<<<<< Last message:", last_message) + + msg = cl_mock.Message(author="user", content=last_message["content"], elements=[]) + await app.process_message(msg) + + messages = app.sync_openai_client.beta.threads.messages.list(thread_id) + result = messages.data[0].content[0].text.value + + return result + + +def test_using_app_code(chat_history): + loop = asyncio.new_event_loop() + asyncio.set_event_loop(loop) + result = loop.run_until_complete(test_using_app_code_async(chat_history)) + loop.close() + return result + + +def main_direct_function(): + """ + TODO + For testing direct function call, which hangs even though finished because of + some issue with async. Left here for future reference for somebody to fix so + the script execution and kill hack can be retired. + + """ + # chat_history = '[{\"author\": \"user\",\"content\": \"Hi\"},{\"author\":\"assistant\content\": \"Hello! How can I help you today?\"},{\"author\": \"assistant\",\"content\": \"What is the total population of Mali?\"}]' + chat_history = '[{"author": "user","content": "Hi"}' + + result = test_using_app_code(chat_history) + print("OUTPUT") + print(result) + print("OUTPUT") + + +def main(): + + parser = argparse.ArgumentParser( + description="Process check in and check out operations (i.e. extracting recipes and recipes from the database for quality checks and edits)." + ) + + parser.add_argument( + "--chat_history", + type=str, + required=True, + help=""" + A list containing the chat history, of the format (but in one line) ... + + '[{\"author\": \"user\",\"content\": \"Hi\"},{\"author\":\"assistant\",\"content\": \"Hello! How can I help you today?\"},{\"author\": \"assistant\",\"content\": \"What is the total population of Mali?\"}]' + """, + ) + + args = parser.parse_args() + chat_history = args.chat_history + + if chat_history: + result = test_using_app_code(chat_history) + print(OUTPUT_TAG) + print(result) + print(OUTPUT_TAG) + + # Do not remove this line + print(FINISH_PHRASE) + + +if __name__ == "__main__": + # main_direct_function() + main() diff --git a/flows/chainlit-ui-evaluation/check_evaluation_results.py b/flows/chainlit-ui-evaluation/check_evaluation_results.py new file mode 100644 index 00000000..352ed21e --- /dev/null +++ b/flows/chainlit-ui-evaluation/check_evaluation_results.py @@ -0,0 +1,37 @@ +# This script runs after GitHub action to check promptflow evaluation +import json +import os +import subprocess +import sys + + +def check_result(run_name="base_run", cutoff=100.0): + """ + Check the evaluation result for a given run. + + Args: + run_name (str, optional): The name of the run to check. Defaults to "base_run". + cutoff (float, optional): The cutoff value for passing the evaluation. Defaults to 100.0. + + Returns: + dict: The evaluation result. + + Raises: + SystemExit: If the evaluation result is below the cutoff value. + """ + cmd = f"pf run show-metrics -n {run_name}" + print(cmd) + # Run cmd and capture output + result = subprocess.check_output(cmd, shell=True, text=True) + print(result) + result = json.loads(result) + print(result["gpt_groundedness_pass_rate(%)"]) + if result["gpt_groundedness_pass_rate(%)"] < cutoff: + sys.exit(f"FAILED!!!! Run {run_name} failed with score {result}.") + else: + print(f"Run {run_name} passed with score {result}.") + return result + + +if __name__ == "__main__": + check_result() diff --git a/flows/chainlit-ui-evaluation/concat_scores.py b/flows/chainlit-ui-evaluation/concat_scores.py new file mode 100644 index 00000000..112bb9a1 --- /dev/null +++ b/flows/chainlit-ui-evaluation/concat_scores.py @@ -0,0 +1,39 @@ +import re + +import numpy as np +from promptflow import tool + + +@tool +def concat_results(groundesness_score: str): + """ + Concatenates the results of groundedness scores. + + Args: + groundesness_score (str): The groundedness score. + + Returns: + dict: A dictionary containing the concatenated results of groundedness scores. + """ + + load_list = [{"name": "gpt_groundedness", "score": groundesness_score}] + score_list = [] + errors = [] + for item in load_list: + try: + score = item["score"] + match = re.search(r"\d", score) + if match: + score = match.group() + score = float(score) + except Exception as e: + score = np.nan + errors.append({"name": item["name"], "msg": str(e), "data": item["score"]}) + score_list.append({"name": item["name"], "score": score}) + + variant_level_result = {} + for item in score_list: + item_name = str(item["name"]) + variant_level_result[item_name] = item["score"] + variant_level_result[item_name + "_pass_rate"] = 1 if item["score"] > 3 else 0 + return variant_level_result diff --git a/flows/chainlit-ui-evaluation/data.jsonl b/flows/chainlit-ui-evaluation/data.jsonl new file mode 100644 index 00000000..1602a71b --- /dev/null +++ b/flows/chainlit-ui-evaluation/data.jsonl @@ -0,0 +1,2 @@ +{"query": "What is the total population of Mali", "context": "The total population of Mali is 17839995"} +{"query": "Can you provide the total population for Ethiopia and its breakdown by gender?", "context": "The total population for Ethiopia is 102,536,656, with a gender breakdown as follows:\n\nMale population: 51,389,381\nFemale population: 51,147,275"} diff --git a/flows/chainlit-ui-evaluation/flow.dag.yaml b/flows/chainlit-ui-evaluation/flow.dag.yaml new file mode 100644 index 00000000..92107c0b --- /dev/null +++ b/flows/chainlit-ui-evaluation/flow.dag.yaml @@ -0,0 +1,71 @@ +$schema: https://azuremlschemas.azureedge.net/promptflow/latest/Flow.schema.json +environment: + python_requirements_txt: requirements.txt +inputs: + query: + type: string + default: What's the total population of Mali? + context: + type: string + default: '"The answer is:\n\n \n The answer is: + **17,907,114.0**\n\n Metadata for the + answer:\n {\"params\": {\"country_code\": \"MLI\"}, + \"attribution\": + \"https://data.humdata.org/dataset/ce21c7db-d8f0-40f8-adc2-452d2d2d105c\", + \"data_url\": + \"https://data.humdata.org/dataset/ce21c7db-d8f0-40f8-adc2-452d2d2d105c/resource/6f243ba2-4d4a-4663-a7c4-e917dbbde73a/download/mli_pop_adm0_v2.csv\", + \"time_period\": {\"start\": \"2018-01-01\", \"end\": + \"2018-12-31T23:59:59\"}}"' + chat_history: + type: string + default: '[{"author": "user","content": "Hi!"}]' +outputs: + agent_output: + type: string + reference: ${call_assistant.output.response} + groundedness_score: + type: string + reference: ${groundedness_score.output} + context: + type: string + reference: ${inputs.context} + query: + type: string + reference: ${inputs.query} +nodes: +- name: call_assistant + type: python + source: + type: code + path: call_assistant.py + inputs: + query: ${inputs.query} + chat_history: ${inputs.chat_history} +- name: groundedness_score + type: llm + source: + type: code + path: groundedness_score.jinja2 + inputs: + deployment_name: gpt-4-turbo + answer: ${call_assistant.output.response} + context: ${inputs.context} + temperature: 1 + model: gpt-4-turbo-preview + connection: azure_openai + api: chat +- name: concat_scores + type: python + source: + type: code + path: concat_scores.py + inputs: + groundesness_score: ${groundedness_score.output} +- name: aggregate_variant_results + type: python + source: + type: code + path: aggregate_variant_results.py + inputs: + results: ${concat_scores.output} + aggregation: true diff --git a/flows/chainlit-ui-evaluation/groundedness_score.jinja2 b/flows/chainlit-ui-evaluation/groundedness_score.jinja2 new file mode 100644 index 00000000..182a6329 --- /dev/null +++ b/flows/chainlit-ui-evaluation/groundedness_score.jinja2 @@ -0,0 +1,35 @@ +System: +You are an AI assistant. You will be given the definition of an evaluation metric for assessing the quality of an answer in a question-answering task. Your job is to compute an accurate evaluation score using the provided evaluation metric. +User: +You will be presented with a CONTEXT and an ANSWER about that CONTEXT. You need to decide whether the ANSWER is entailed by the CONTEXT by choosing one of the following rating: +1. 5: The ANSWER follows logically from the information contained in the CONTEXT. +2. 1: The ANSWER is logically false from the information contained in the CONTEXT. +3. an integer score between 1 and 5 and if such integer score does not exists, use 1: It is not possible to determine whether the ANSWER is true or false without further information. + +Read the passage of information thoroughly and select the correct answer from the three answer labels. Read the CONTEXT thoroughly to ensure you know what the CONTEXT entails. + +Note the ANSWER is generated by a computer system, it can contain certain symbols, which should not be a negative factor in the evaluation. +Independent Examples: +## Example Task #1 Input: +{"CONTEXT": "The Academy Awards, also known as the Oscars are awards for artistic and technical merit for the film industry. They are presented annually by the Academy of Motion Picture Arts and Sciences, in recognition of excellence in cinematic achievements as assessed by the Academy's voting membership. The Academy Awards are regarded by many as the most prestigious, significant awards in the entertainment industry in the United States and worldwide.", "ANSWER": "Oscar is presented every other two years"} +## Example Task #1 Output: +1 +## Example Task #2 Input: +{"CONTEXT": "The Academy Awards, also known as the Oscars are awards for artistic and technical merit for the film industry. They are presented annually by the Academy of Motion Picture Arts and Sciences, in recognition of excellence in cinematic achievements as assessed by the Academy's voting membership. The Academy Awards are regarded by many as the most prestigious, significant awards in the entertainment industry in the United States and worldwide.", "ANSWER": "Oscar is very important awards in the entertainment industry in the United States. And it's also significant worldwide"} +## Example Task #2 Output: +5 +## Example Task #3 Input: +{"CONTEXT": "In Quebec, an allophone is a resident, usually an immigrant, whose mother tongue or home language is neither French nor English.", "ANSWER": "In Quebec, an allophone is a resident, usually an immigrant, whose mother tongue or home language is not French."} +## Example Task #3 Output: +5 +## Example Task #4 Input: +{"CONTEXT": "Some are reported as not having been wanted at all.", "ANSWER": "All are reported as being completely and fully wanted."} +## Example Task #4 Output: +1 + +Reminder: The return values for each task should be correctly formatted as an integer between 1 and 5. Do not repeat the context. + +## Actual Task Input: +{"CONTEXT": {{context}}, "ANSWER": {{answer}}} + +Actual Task Output: \ No newline at end of file diff --git a/flows/chainlit-ui-evaluation/openai.yaml b/flows/chainlit-ui-evaluation/openai.yaml new file mode 100644 index 00000000..68d25b7a --- /dev/null +++ b/flows/chainlit-ui-evaluation/openai.yaml @@ -0,0 +1,4 @@ +$schema: https://azuremlschemas.azureedge.net/promptflow/latest/OpenAIConnection.schema.json +name: open_ai_connection +type: open_ai +api_key: "" \ No newline at end of file diff --git a/templates/generate_intent_from_history_prompt.jinja2 b/templates/generate_intent_from_history_prompt.jinja2 index dd88e2f4..1d669f63 100644 --- a/templates/generate_intent_from_history_prompt.jinja2 +++ b/templates/generate_intent_from_history_prompt.jinja2 @@ -10,12 +10,6 @@ Here is the chat history: {{ chat_history }} -Important: - -- Be careful to note the chat history, they may have just asked a follow-up question -- Put more emphasis on their last input, it has a stronger influence on the intent than earlier inputs in chat_history -- include all entities such as places - Intent format: The intent should cature any of these fields that have been specified in the user's request and history: @@ -36,8 +30,80 @@ user_preferences: Any user-specific preferences for the analysis or output (e.g. Here are some examples of what your output should look like: -plot a bar chart of population by state in Nigeria for 2023 using HDX(HAPI) data, highlighting top 5 states as an image -generate a line chart of average commodity prices by year for 2022-2023 for Haiti +======== EXAMPLE 1 + +The user asked this question: + +What is the total population of Mali? + +Here is the chat history: + +[{"author": "user","content": "Hi"},{"author": "user","content": "What is the total population of Mali"}] + +Output: + +{ + "intent": "provide the total population of Mali", + "reason": "The user's last request clearly asked for the total population of Mali in their last input. There is no need for further clarification." +} + +======== EXAMPLE 2 + +The user asked this question: + +Can you give me average commodity prices by year for 2022-2023 for Haiti? + +Here is the chat history: + +[{"author": "user","content": "Can you give me average commodity prices by year for 2022-2023 for Haiti?"}] + +Output: + +{ + "intent": "generate a line chart of average commodity prices by year for 2022-2023 for Haiti", + "reason": "The user's last request specifies the action (generate), visualization type (line chart), data type (average commodity prices), time range (2022-2023), and location (Haiti). There is no need for further clarification." +} + +======== EXAMPLE 3 + +The user asked this question: + +Plot a bar chart for the population of Nigeria by admin 1, and highlight the top 5 states please + +Here is the chat history: + +[{"author": "user","content": "Do you have data on conflicts in Ethiopia"}, {"author": "user","content": "Plot a map for the population of Nigeria by admin 1"}] + +Output: + +{ + "intent": "plot a bar chart of population by state in Nigeria for 2023 using HDX(HAPI) data, highlighting top 5 states as an image", + "reason": "The user changed context, their last request specifies the action (plot), visualization type (bar chart), disaggregation (by state), data source (HDX(HAPI) data), location (Nigeria), and output format (image). The user also requested to highlight the top 5 states. There is no need for further clarification." +} + +======== EXAMPLE 4 + +The user asked this question: + +Plot that by state on a map + +Here is the chat history: + +[{"author": "user","content": "What's the total populkation of Nigeria?"}, {"author": "user","content": "Plot that by state on a map"}] + +Output: + +{ + "intent": "plot a map of the total population of Nigeria by state as an image", + "reason": "The user's last request was a follow-on from a previous request and specifies the action (plot), visualization type (map), disaggregation (by state), data type (total population), location (Nigeria), and output format (image). There is no need for further clarification." +} + +======================================================================= + +Important: + +- The user's last input is the most important, but pay attention to the chat history to see if they changed context are are asking something entirely new, or are asking a follow-up question +- include all entities such as places Task: diff --git a/ui/chat-chainlit-assistant/app.py b/ui/chat-chainlit-assistant/app.py index 73880a13..209b2c40 100644 --- a/ui/chat-chainlit-assistant/app.py +++ b/ui/chat-chainlit-assistant/app.py @@ -72,158 +72,181 @@ config.ui.name = bot_name -class EventHandler(AssistantEventHandler): - - def __init__(self, assistant_name: str) -> None: - """ - Initializes a new instance of the ChatChainlitAssistant class. - - Args: - assistant_name (str): The name of the assistant. +def get_event_handler(cl, assistant_name): # noqa: C901 + """ + Returns an instance of the EventHandler class, which is responsible for handling events in the ChatChainlitAssistant. - Returns: - None - """ - super().__init__() - self.current_message: cl.Message = None - self.current_step: cl.Step = None - self.current_tool_call = None - self.current_message_text = "" - self.assistant_name = assistant_name - - @override - def on_event(self, event): - """ - Handles the incoming event and performs the necessary actions based on the event type. + Args: + cl: The ChatClient instance used for communication with the chat service. + assistant_name (str): The name of the assistant. - Args: - event: The event object containing information about the event. + Returns: + EventHandler: An instance of the EventHandler class. + """ - Returns: - None - """ - print(event.event) - run_id = event.data.id - if event.event == "thread.message.created": - self.current_message = run_sync(cl.Message(content="").send()) - self.current_message_text = "" - print("Run started") - if event.event == "thread.message.completed": - self.handle_message_completed(event.data, run_id) - elif event.event == "thread.run.requires_action": - self.handle_requires_action(event.data, run_id) - elif event.event == "thread.message.delta": - self.handle_message_delta(event.data) - else: - print(json.dumps(str(event.data), indent=4)) - print(f"Unhandled event: {event.event}") + class EventHandler(AssistantEventHandler): - def handle_message_delta(self, data): - """ - Handles the message delta data. + def __init__(self, assistant_name: str) -> None: + """ + Initializes a new instance of the ChatChainlitAssistant class. - Args: - data: The message delta data. + Args: + assistant_name (str): The name of the assistant. - Returns: - None - """ - for content in data.delta.content: - if content.type == "text": - content = content.text.value - if content is not None: - self.current_message_text += content - run_sync(self.current_message.stream_token(content)) - elif content.type == "image_file": - file_id = content.image_file.file_id - image_data = sync_openai_client.files.content(file_id) - image_data_bytes = image_data.read() - png_file = f"{images_loc}{file_id}.png" - print(f"Writing image to {png_file}") - with open(png_file, "wb") as file: - file.write(image_data_bytes) - image = cl.Image(path=png_file, display="inline", size="large") - print(f"Image: {png_file}") - if not self.current_message.elements: - self.current_message.elements = [] - self.current_message.elements.append(image) - run_sync(self.current_message.update()) + Returns: + None + """ + super().__init__() + self.current_message: cl.Message = None + self.current_step: cl.Step = None + self.current_tool_call = None + self.current_message_text = "" + self.assistant_name = assistant_name + self.cl = cl + + @override + def on_event(self, event): + """ + Handles the incoming event and performs the necessary actions based on the event type. + + Args: + event: The event object containing information about the event. + + Returns: + None + """ + print(event.event) + run_id = event.data.id + if event.event == "thread.message.created": + self.current_message = self.cl.Message(content="") + self.current_message = run_sync(self.current_message.send()) + self.current_message_text = "" + print("Run started") + if event.event == "thread.message.completed": + self.handle_message_completed(event.data, run_id) + elif event.event == "thread.run.requires_action": + self.handle_requires_action(event.data, run_id) + elif event.event == "thread.message.delta": + self.handle_message_delta(event.data) + elif event.event == "thread.run.completed": + print("Run completed") else: - print(f"Unhandled delta type: {content.type}") + print(json.dumps(str(event.data), indent=4)) + print(f"Unhandled event: {event.event}") + + def handle_message_delta(self, data): + """ + Handles the message delta data. + + Args: + data: The message delta data. + + Returns: + None + """ + for content in data.delta.content: + if content.type == "text": + content = content.text.value + if content is not None: + self.current_message_text += content + run_sync(self.current_message.stream_token(content)) + elif content.type == "image_file": + file_id = content.image_file.file_id + image_data = sync_openai_client.files.content(file_id) + image_data_bytes = image_data.read() + png_file = f"{images_loc}{file_id}.png" + print(f"Writing image to {png_file}") + with open(png_file, "wb") as file: + file.write(image_data_bytes) + image = cl.Image(path=png_file, display="inline", size="large") + print(f"Image: {png_file}") + if not self.current_message.elements: + self.current_message.elements = [] + self.current_message.elements.append(image) + run_sync(self.current_message.update()) + else: + print(f"Unhandled delta type: {content.type}") + + def handle_message_completed(self, data, run_id): + """ + Handles the completion of a message. + + Args: + data: The data associated with the completed message. + run_id: The ID of the message run. + + Returns: + None + """ + # Add footer to self message. We have to start a new message so it's in right order + # TODO combine streaming with image and footer + run_sync(self.current_message.update()) + self.current_message = run_sync( + cl.Message(content="", disable_feedback=True).send() + ) - def handle_message_completed(self, data, run_id): - """ - Handles the completion of a message. + word_count = len(self.current_message_text.split()) + if word_count > 10: + run_sync(self.current_message.stream_token(llm_footer)) + run_sync(self.current_message.update()) - Args: - data: The data associated with the completed message. - run_id: The ID of the message run. + def handle_requires_action(self, data, run_id): + """ + Handles the required action by executing the specified tools and submitting the tool outputs. - Returns: - None - """ - # Add footer to self message. We have to start a new message so it's in right order - # TODO combine streaming with image and footer - run_sync(self.current_message.update()) - self.current_message = run_sync( - cl.Message(content="", disable_feedback=True).send() - ) + Args: + data: The data containing the required action information. + run_id: The ID of the current run. - word_count = len(self.current_message_text.split()) - if word_count > 10: - run_sync(self.current_message.stream_token(llm_footer)) - run_sync(self.current_message.update()) + Returns: + None + """ + tool_outputs = [] - def handle_requires_action(self, data, run_id): - """ - Handles the required action by executing the specified tools and submitting the tool outputs. + for tool in data.required_action.submit_tool_outputs.tool_calls: + print(tool) - Args: - data: The data containing the required action information. - run_id: The ID of the current run. + function_name = tool.function.name + function_args = tool.function.arguments - Returns: - None - """ - tool_outputs = [] + function_output = run_function(function_name, function_args) - for tool in data.required_action.submit_tool_outputs.tool_calls: - print(tool) + tool_outputs.append( + {"tool_call_id": tool.id, "output": function_output} + ) - function_name = tool.function.name - function_args = tool.function.arguments + print("TOOL OUTPUTS: ") - function_output = run_function(function_name, function_args) + print(tool_outputs) - tool_outputs.append({"tool_call_id": tool.id, "output": function_output}) + # Submit all tool_outputs at the same time + self.submit_tool_outputs(tool_outputs, run_id) - print("TOOL OUTPUTS: ") + def submit_tool_outputs(self, tool_outputs, run_id): + """ + Submits the tool outputs to the current run. - print(tool_outputs) + Args: + tool_outputs (list): A list of tool outputs to be submitted. + run_id (str): The ID of the current run. - # Submit all tool_outputs at the same time - self.submit_tool_outputs(tool_outputs, run_id) + Returns: + None + """ + event_handler = get_event_handler(cl, assistant.name) + with sync_openai_client.beta.threads.runs.submit_tool_outputs_stream( + thread_id=self.current_run.thread_id, + run_id=self.current_run.id, + tool_outputs=tool_outputs, + event_handler=event_handler, + ) as stream: + # Needs this line, or it doesn't work! :) + for text in stream.text_deltas: + print(text) - def submit_tool_outputs(self, tool_outputs, run_id): - """ - Submits the tool outputs to the current run. + event_handler = EventHandler(assistant_name) - Args: - tool_outputs (list): A list of tool outputs to be submitted. - run_id (str): The ID of the current run. - - Returns: - None - """ - with sync_openai_client.beta.threads.runs.submit_tool_outputs_stream( - thread_id=self.current_run.thread_id, - run_id=self.current_run.id, - tool_outputs=tool_outputs, - event_handler=EventHandler(assistant_name=self.assistant_name), - ) as stream: - # Needs this line, or it doesn't work! :) - for text in stream.text_deltas: - print(text) + return event_handler def run_function(function_name, function_args): @@ -365,18 +388,21 @@ async def start_chat(): It also sends an avatar and a welcome message to the chat. Returns: - None + dict: The thread object returned by the OpenAI API. """ # Create a Thread thread = await async_openai_client.beta.threads.create() # Store thread ID in user session for later use cl.user_session.set("thread_id", thread.id) + await cl.Message( content="Hi. I'm your humanitarian AI assistant.", disable_feedback=True ).send() cl.user_session.set("chat_history", []) + return thread + def get_metadata_footer(metadata): """ @@ -444,6 +470,9 @@ def check_memories_recipes(user_input: str, history=[]) -> str: memory_response = None meta_data_msg = "" + print("History:") + print(history) + memory = call_get_memory_recipe_api( user_input, history=str(history), generate_intent="true" ) @@ -535,87 +564,6 @@ def check_memories_recipes(user_input: str, history=[]) -> str: async_check_memories_recipes = make_async(check_memories_recipes) -@cl.on_message -async def main(message: cl.Message): - """ - Process the user's message and interact with the assistant. - - Args: - message (cl.Message): The user's message. - - Returns: - None - """ - thread_id = cl.user_session.get("thread_id") - - # Azure doesn't yet support attachments - if os.getenv("ASSISTANTS_API_TYPE") == "openai": - - attachments = await process_files(message.elements) - - # Add a Message to the Thread - await async_openai_client.beta.threads.messages.create( - thread_id=thread_id, - role="user", - content=message.content, - attachments=attachments, - ) - else: - - # Add a Message to the Thread - await async_openai_client.beta.threads.messages.create( - thread_id=thread_id, - role="user", - content=message.content, - ) - - # Append to chat history - chat_history = cl.user_session.get("chat_history") - chat_history.append({"role": "user", "content": message.content}) - cl.user_session.set("chat_history", chat_history) - - # Check recipes - msg = await cl.Message("").send() - memory_found, memory_content, memory_response, meta_data_msg = ( - await async_check_memories_recipes(message.content, chat_history) - ) - - # memory_foundy=False - - # Message to the thread. If a memory add it as the assistant - if memory_found is True: - print("Adding memory to thread") - await async_openai_client.beta.threads.messages.create( - thread_id=thread_id, - role="assistant", - content=memory_content, - # attachments=attachments, - ) - - msg.content = memory_response["content"] - msg.elements = memory_response["elements"] - await msg.update() - - # TODO really should be part of message above so feedback can apply - await cl.Message(meta_data_msg).send() - - # No need to send anything - return - - # msg.content = "Can't find anything in my memories, let me do some analysis ..." - msg.content = "" - await msg.update() - - # Create and Stream a Run - print(f"Creating and streaming a run {assistant.id}") - with sync_openai_client.beta.threads.runs.stream( - thread_id=thread_id, - assistant_id=assistant.id, - event_handler=EventHandler(assistant_name=assistant.name), - ) as stream: - stream.until_done() - - @cl.on_audio_chunk async def on_audio_chunk(chunk: cl.AudioChunk): """ @@ -670,10 +618,6 @@ async def on_audio_end(elements: list[Element]): # elements=[input_audio_el, *elements], ).send() - msg = cl.Message(author="You", content=transcription, elements=elements) - - await main(message=msg) - @cl.password_auth_callback def auth_callback(username: str, password: str): @@ -693,3 +637,91 @@ def auth_callback(username: str, password: str): ) else: return None + + +async def add_message_to_thread(thread_id, role, content, message=None): + """ + Add a message to a thread. + + Args: + thread_id (str): The ID of the thread. + role (str): The role of the message author. + content (str): The content of the message. + message (cl.Message): The message object. + + Returns: + None + """ + + print(f"Content: {content}") + + attachments = [] + + # Azure doesn't yet support attachments + if os.getenv("ASSISTANTS_API_TYPE") == "openai": + if message is not None: + attachments = await process_files(message.elements) + + await async_openai_client.beta.threads.messages.create( + thread_id=thread_id, + role=role, + content=content, + attachments=attachments, + ) + + +@cl.on_message +async def process_message(message: cl.Message): + """ + Process the user's message and interact with the assistant. + + Args: + message (cl.Message): The user's message. + + Returns: + None + """ + + thread_id = cl.user_session.get("thread_id") + chat_history = cl.user_session.get("chat_history") + chat_history.append({"role": "user", "content": message.content}) + + # Add user message to thread + print(f"Adding user message {message.content} to thread {thread_id}") + await add_message_to_thread(thread_id, "user", message.content, message) + + # Check memories/recipes + msg = await cl.Message("").send() + memory_found, memory_content, memory_response, meta_data_msg = ( + await async_check_memories_recipes(message.content, chat_history) + ) + + if memory_found is True: + print("Adding memory to thread as assistant") + await add_message_to_thread(thread_id, "assistant", memory_content) + + # output memory artifacts + msg.content = memory_response["content"] + msg.elements = memory_response["elements"] + await msg.update() + + # TODO really should be part of message above so feedback can apply + await cl.Message(meta_data_msg).send() + + else: + + # msg.content = "Can't find anything in my memories, let me do some analysis ..." + msg.content = "" + await msg.update() + + # Create and Stream a Run to assistant + print(f"Creating and streaming a run {assistant.id}") + event_handler = get_event_handler(cl, assistant.name) + with sync_openai_client.beta.threads.runs.stream( + thread_id=thread_id, + assistant_id=assistant.id, + event_handler=event_handler, + ) as stream: + stream.until_done() + + cl.user_session.set("chat_history", chat_history)