diff --git a/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/.env.exemple b/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/.env.exemple
new file mode 100644
index 0000000000..e6138c9bf9
--- /dev/null
+++ b/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/.env.exemple
@@ -0,0 +1,11 @@
+# for LangFuse dataset provider
+LANGFUSE_SECRET_KEY=
+LANGFUSE_PUBLIC_KEY=
+LANGFUSE_HOST=
+
+# for LangSmith dataset provider
+LANGCHAIN_API_KEY=
+
+# for smarttribune_consumer.py script
+API_KEY=
+API_SECRET=
\ No newline at end of file
diff --git a/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/README.md b/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/README.md
index d109c2b438..2950a6aa46 100644
--- a/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/README.md
+++ b/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/README.md
@@ -181,12 +181,12 @@ To configure the OpenSearch vector store, you can use the following environment
 
 ### generate_dataset.py
 
-Generates a testing dataset based on an input file. The input file should have the correct format (see generate_datset_input.xlsx for sample). The generated dataset can be saved on filesystem, using the --csv-output option, on langsmith, using the --langsmith-dataset-name option, or both.
+Generates a testing dataset based on an input file. The input file should have the correct format (see generate_datset_input.xlsx for sample). The generated dataset can be saved on the filesystem, using the --csv-output option, on langsmith, using the --langsmith-dataset-name option, on langfuse, using the --langfuse-dataset-name option, or any combination of these.
 
 ```
 Usage:
-    generate_dataset.py [-v] <input_excel> --range=<range> [--csv-output=<path>] [--langsmith-dataset-name=<name>] [--locale=<locale>] [--no-answer=<no_answer>]
-    generate_dataset.py [-v] <input_excel> --sheet=<n>... [--csv-output=<path>] [--langsmith-dataset-name=<name>] [--locale=<locale>] [--no-answer=<no_answer>]
+    generate_dataset.py [-v] <input_excel> --range=<range> [--csv-output=<path>] [--langsmith-dataset-name=<name>] [--langfuse-dataset-name=<name>] [--locale=<locale>] [--no-answer=<no_answer>]
+    generate_dataset.py [-v] <input_excel> --sheet=<n>... [--csv-output=<path>] [--langsmith-dataset-name=<name>] [--langfuse-dataset-name=<name>] [--locale=<locale>] [--no-answer=<no_answer>]
 
 Arguments:
     input_excel                     path to the input excel file
@@ -196,22 +196,22 @@ Options:
     --sheet=<n>                     Sheet numbers to be parsed. Indices are 0-indexed.
     --csv-output=<path>             Output path of csv file to be generated.
     --langsmith-dataset-name=<name> Name of the dataset to be saved on langsmith.
+    --langfuse-dataset-name=<name>  Name of the dataset to be saved on langfuse.
     --locale=<locale>               Locale to be included in the dataset. [default: French]
     --no-answer=<no_answer>         Label of no_answer to be included in the dataset. [default: NO_RAG_SENTENCE]
     -h --help                       Show this screen
     --version                       Show version
     -v                              Verbose output for debugging (without this option, script will be silent but for errors)
-
-Generates a testing dataset based on an input file. The input file should have the correct format (see generate_datset_input.xlsx for sample). The generated dataset can be saved on filesystem, using the --csv-output option, on langsmith, using the --langsmith-dataset-name option, or both.
+Generates a testing dataset based on an input file. The input file should have the correct format (see generate_datset_input.xlsx for sample). The generated dataset can be saved on the filesystem, using the --csv-output option, on langsmith, using the --langsmith-dataset-name option, on langfuse, using the --langfuse-dataset-name option, or any combination of these.
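+
+Example invocation (the file and dataset names below are hypothetical):
+
+    generate_dataset.py questions.xlsx --range=0,2 --csv-output=dataset.csv --langfuse-dataset-name=rag-eval-dataset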
```

### rag_testing_tool.py

-Retrieval-Augmented Generation (RAG) endpoint settings testing tool based on LangSmith's SDK: runs a specific RAG Settings configuration against a reference dataset.
+Retrieval-Augmented Generation (RAG) endpoint settings testing tool based on LangSmith's or LangFuse's SDK: runs a specific RAG Settings configuration against a reference dataset.

```
Usage:
-    rag_testing_tool.py [-v] <rag_query> <dataset_name> <test_name> [<delay>]
+    rag_testing_tool.py [-v] <rag_query> <dataset_provider> <dataset_name> <test_name> [<delay>]
     rag_testing_tool.py -h | --help
     rag_testing_tool.py --version

@@ -221,6 +221,7 @@ Arguments:
                        provider, indexation session's unique id, and 'k', i.e. nb
                        of retrieved docs (question and chat history are ignored,
                        as they will come from the dataset)
+    dataset_provider   the dataset provider (langsmith or langfuse)
     dataset_name       the reference dataset name
     test_name          name of the test run

@@ -232,7 +233,7 @@ Options:
                        be silent but for errors)
 ```

-Build a RAG (Lang)chain from the RAG Query and runs it against the provided LangSmith dataset. The chain is created anew for each entry of the dataset, and if a delay is provided each chain creation will be delayed accordingly.
+Builds a RAG (Lang)chain from the RAG Query and runs it against the provided LangSmith or LangFuse dataset. The chain is created anew for each entry of the dataset, and if a delay is provided, each chain creation will be delayed accordingly.

 ### export_run_results.py
 Export a LangSmith dataset run results, in csv format.
@@ -256,3 +257,27 @@ The exported CSV file will have these columns :
 'Reference input'|'Reference output'|'Response 1'|'Sources 1'|...|'Response N'|'Sources N'
 NB: There will be as many responses as run sessions
 ```
+
+### export_run_results_langfuse.py
+
+Exports LangFuse dataset run results in CSV format.
+
+```
+Usage:
+    export_run_results_langfuse.py [-v] <dataset_name> <runs_names>...
+    export_run_results_langfuse.py -h | --help
+    export_run_results_langfuse.py --version
+
+Arguments:
+    dataset_name    the dataset name
+    runs_names      list of run names
+
+Options:
+    -h --help    Show this screen
+    --version    Show version
+    -v           Verbose output for debugging
+
+The exported CSV file will have these columns:
+'Reference input'|'Reference output'|'Response 1'|'Sources 1'|...|'Response N'|'Sources N'
+NB: There will be as many responses as run sessions
+```
diff --git a/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/docs/rag_testing_tools.png b/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/docs/rag_testing_tools.png
index fd9181feaf..ed6fe2df47 100644
Binary files a/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/docs/rag_testing_tools.png and b/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/docs/rag_testing_tools.png differ
diff --git a/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/export_run_results.py b/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/export_run_results.py
index 4f4b78e521..45df8faf1a 100644
--- a/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/export_run_results.py
+++ b/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/export_run_results.py
@@ -1,99 +1,140 @@
-"""Export a LangSmith dataset run results.
-
+"""
+Exports LangSmith or LangFuse dataset run results.
 Usage:
-    export_run_results.py [-v] <dataset_id> <session_ids>...
-    export_run_results.py -h | --help
-    export_run_results.py --version
-
+    export_run_results.py [-v] <dataset_provider> <dataset_id_or_name> <session_or_run_ids>...
+    export_run_results.py -h | --help
+    export_run_results.py --version
+
 Arguments:
-    dataset_id              dataset id
-    session_ids             list of session ids
-
+    dataset_provider        specify either 'langfuse' or 'langsmith'
+    dataset_id_or_name      dataset id (langsmith) or dataset name (langfuse)
+    session_or_run_ids      list of session ids (langsmith) or run names (langfuse)
+
 Options:
+    -v          Verbose output
     -h --help   Show this screen
     --version   Show version
-    -v          Verbose output for debugging
+
 
 The exported CSV file will have these columns :
 'Reference input'|'Reference output'|'Response 1'|'Sources 1'|...|'Response N'|'Sources N'
 NB: There will be as many responses as run sessions
 """
+
 import csv
 import json
 import logging
 import os
+import sys
 import time
 
 import requests
 from docopt import docopt
+from dotenv import load_dotenv
+from langfuse import Langfuse
 
 
-def first_true(iterable, predicate=None):
-    """
-    Returns the first element of the iterator that satisfies the given predicate
-    Args:
-        iterable: the iterator
-        predicate: the predicate
-
-    Returns: element of the iterator
-    """
-    return next(filter(predicate, iterable), None)
-
-
-def get_session_name(_id: str, sessions) -> str:
-    """
-    Get session name.
-    Args:
-        _id: the session id
-        sessions: the sessions
-
-    Returns: the session name
-    """
-    return first_true(sessions, predicate=lambda x: x['id'] == _id)['name']
-
-
-def create_csv_header(_session_ids: list[str]):
-    """
-    Create the CSV file header
-    Args:
-        _session_ids: the session ids
-
-    Returns: the CSV file header
-    """
+# Function to create the CSV header, works for both providers
+def create_csv_header(_runs_names, provider, dataset_id):
+    """
+    Create the CSV file header.
+    This function handles both LangFuse and LangSmith providers.
+
+    Args:
+        _runs_names: List of run/session IDs.
+        provider: The provider being used (either 'langfuse' or 'langsmith').
+        dataset_id: dataset id for langsmith or dataset name for langfuse
+
+    Returns:
+        The CSV file header as a list.
+    """
+    header_topic = 'Thématique de la question'
+    header_reference_input = 'Entrée de référence'
+    header_reference_output = 'Sortie de référence'
+    header_answer = 'Réponse'
+    header_sources = 'Sources'
+
+    csv_header = [header_topic, header_reference_input, header_reference_output]
+
+    if provider == 'langfuse':
+        counter = 1
+        for run_name in _runs_names:
+            csv_header.append(f"{header_answer} {counter} ({run_name})")
+            csv_header.append(f"{header_sources} {counter} ({run_name})")
+            counter += 1
+
+    elif provider == 'langsmith':
+        sessions_content = get_sessions(dataset_id)
+        counter = 1
+        for session_id in _runs_names:
+            session_name = get_session_name(session_id, sessions_content)
+            csv_header.append(f"{header_answer} {counter} ({session_name})")
+            csv_header.append(f"{header_sources} {counter} ({session_name})")
+            counter += 1
+
+    return csv_header
+
+
+# LangFuse-specific functions
+def fetch_trace_by_item_and_dataset_run(dataset_run, item):
+    """
+    Fetches the trace for a dataset item from a LangFuse dataset run.
+
+    Args:
+        dataset_run: The dataset run with items.
+        item: The dataset item.
+
+    Returns:
+        Trace data if found, otherwise None.
""" + for item_run in dataset_run: + if item.id == item_run.dataset_item_id: + trace = client.fetch_trace(item_run.trace_id) + return trace.data + return None - # CSV Headers - header_topic = 'Thématique de la question' - header_reference_input = 'Entrée de référence' - header_reference_output = 'Sortie de référence' - header_answer = 'Réponse' - header_sources = 'Sources' - # Init csv header - csv_header = [header_topic, header_reference_input, header_reference_output] +def append_runs_langfuse(dataset_item, _runs_names): + """ + Append LangFuse run data to the CSV. - # Get sessions info - sessions_content = get_sessions(dataset_id) + Args: + dataset_item: The dataset item. + _runs_names: List of run names. - # Complete csv header with sessions name - counter = 1 - for session_id in _session_ids: - session_name = get_session_name(session_id, sessions_content) - csv_header.append(f"{header_answer} {counter} ({session_name})") - csv_header.append(f"{header_sources} {counter} ({session_name})") - counter += 1 + Returns: + A list representing a line in the CSV. + """ + csv_line = [ + dataset_item.metadata["topic"] if dataset_item.metadata else "", + dataset_item.input["question"], + dataset_item.expected_output["answer"] + ] - return csv_header + for _run_name in _runs_names: + dataset_run = client.get_dataset_run(dataset_name=dataset_name, dataset_run_name=_run_name) + trace = fetch_trace_by_item_and_dataset_run(dataset_run.dataset_run_items, dataset_item) + if trace is None or trace.output is None or not isinstance(trace.output, dict): + csv_line.append('') # Empty if no trace is found + csv_line.append('') # Empty for sources as well + else: + csv_line.append(trace.output["answer"]) # Append the answer + csv_line.append(','.join([doc["metadata"]["url"] for doc in trace.output["source_documents"]])) + + return csv_line +# LangSmith-specific functions (restored) def get_sessions(_dataset_id): """ - Fetch the dataset run sessions + Fetches the dataset run sessions for LangSmith. + Args: - _dataset_id: the dataset id + _dataset_id: The dataset ID. - Returns: the dataset run sessions + Returns: + The sessions as a list. """ - logging.info(f'Call the LangSmith API to get run sessions for dataset_id={_dataset_id}.') sessions_response = requests.get( f'{base_url}/sessions?reference_dataset={_dataset_id}', @@ -104,25 +145,70 @@ def get_sessions(_dataset_id): logging.debug(f"Number of example runs obtained = {len(sessions_content)}") return sessions_content else: - logging.error(f"Failed to get example runs. \n" - f"Http code : {sessions_response.status_code} \n" - f"Content : {sessions_response.content}") + logging.error( + f"Failed to get example runs. 
\nHttp code: {sessions_response.status_code} \nContent: {sessions_response.content}") raise RuntimeError -def get_dataset_examples(nb_example): +def get_example_runs_content(example_id): """ - Fetch the dataset examples + Fetch runs of an example Args: - nb_example: number of examples to fetch + example_id: the example id + + Returns: the runs + """ + + logging.info(f'Call the LangSmith API to get dataset runs for the example_id={example_id}.') + example_runs_response = requests.post( + f'{base_url}/runs/query', + json={ + "reference_example": [ + example_id + ], + "is_root": "true", + "filter": "eq(is_root, true)", + "select": [ + "status", + # "inputs", + "outputs", + "end_time", + "total_cost", + # "extra", + "feedback_stats", + "error" + ], + "limit": 15 + }, + headers={"x-api-key": _LANGSMITH_API_KEY}, + ) + if example_runs_response.status_code == 200: + example_runs_content = json.loads(example_runs_response.content) + logging.debug(f"Number of example runs obtained = {len(example_runs_content)}") + return example_runs_content + else: + logging.error(f"Failed to get example runs. \n" + f"Http code : {example_runs_response.status_code} \n" + f"Content : {example_runs_response.content}") + raise RuntimeError + - Returns: the dataset examples +# Restoring get_dataset_examples function for LangSmith +def get_dataset_examples(nb_example, dataset_id): """ + Fetch the dataset examples from LangSmith. + Args: + nb_example: The number of examples to fetch. + dataset_id: The dataset ID to fetch examples for. + + Returns: + The dataset examples. + """ examples = [] counter = nb_example offset = 0 - limit = 100 # less_than_equal should be less than or equal to 100 + limit = 100 # Less than or equal to 100 while counter > 0: logging.info(f'Call the LangSmith API to get {limit} dataset examples, starting from {offset}.') @@ -137,18 +223,64 @@ def get_dataset_examples(nb_example): logging.debug(f"Number of examples obtained = {len(dataset_examples_content)}") logging.debug(f"Number of examples collected = {len(examples)}/{nb_example}") else: - logging.error(f"Failed to get dataset examples. \n" - f"Http code : {dataset_examples_response.status_code} \n" - f"Content : {dataset_examples_response.content}") + logging.error( + f"Failed to get dataset examples. \nHttp code: {dataset_examples_response.status_code} \nContent: {dataset_examples_response.content}") raise RuntimeError counter -= limit offset += limit - logging.info(f"Fetched dataset examples = {len(examples)}/{nb_example} -> {len(examples) / nb_example * 100:.2f}%") + logging.info(f"Fetched dataset examples = {len(examples)}/{nb_example}") return examples +def append_runs_langsmith(dataset_example, _session_ids): + """ + Append LangSmith run data to the CSV. + + Args: + dataset_example: The dataset example. + _session_ids: List of session IDs. + + Returns: + A list representing a line in the CSV. 
+ """ + csv_line = [ + dataset_example["inputs"]["metadata"]["topic"], + dataset_example["inputs"]["question"], + dataset_example["outputs"]["answer"] + ] + + example_runs_content = get_example_runs_content(dataset_example['id']) + + for _id in _session_ids: + run = first_true(example_runs_content['runs'], lambda x: x['session_id'] == _id) + if run is None or run["outputs"] is None: + csv_line.append('') + csv_line.append('') + elif run["error"]: + csv_line.append(run["error"]) + csv_line.append('') + else: + csv_line.append(run["outputs"]["answer"]) + csv_line.append(','.join([doc["metadata"]["url"] for doc in run["outputs"]["source_documents"]])) + + return csv_line + + +def get_session_name(_id: str, sessions) -> str: + """ + Get session name. + Args: + _id: the session id + sessions: the sessions + + Returns: the session name + """ + + return first_true(sessions, predicate=lambda x: x['id'] == _id)['name'] + + def get_example_runs_content(example_id): """ Fetch runs of an example @@ -192,111 +324,80 @@ def get_example_runs_content(example_id): raise RuntimeError -def append_example_runs(dataset_example, _session_ids): +def first_true(iterable, predicate=None): """ - Append in a CSV line, the fetched runs for the given dataset example + Returns the first element in the iterable that satisfies the predicate. + Args: - dataset_example: the dataset example - _session_ids: the session ids + iterable: The iterable to search. + predicate: The condition to check for. - Returns: The CSV line + Returns: + The first element satisfying the predicate. """ + return next(filter(predicate, iterable), None) - # Init csv line - csv_line = [ - dataset_example["inputs"]["metadata"]["topic"], - dataset_example["inputs"]["question"], - dataset_example["outputs"]["answer"] - ] - - # Get example runs - example_runs_content = get_example_runs_content(dataset_example['id']) - # Complete csv line with example run result - for _id in _session_ids: - run = first_true(example_runs_content['runs'], predicate=lambda x: x['session_id'] == _id) - if run is None: - csv_line.append('') - csv_line.append('') - elif run["error"]: - csv_line.append(run["error"]) - csv_line.append('') - else: - csv_line.append(run["outputs"]["answer"]) - csv_line.append( - ','.join([doc["metadata"]["url"] for doc in run["outputs"]["source_documents"]]) - ) +# Check for environment variables from LangFuse and LangSmith +def check_environment_variables(provider): + """ + Checks the required environment variables based on the provider. - return csv_line + Args: + provider: The provider being used ('langfuse' or 'langsmith'). 
+ """ + if provider == 'langfuse': + if not os.getenv('LANGFUSE_SECRET_KEY'): + logging.error('Cannot proceed: LANGFUSE_SECRET_KEY is not defined.') + sys.exit(1) + if not os.getenv('LANGFUSE_HOST'): + logging.error('Cannot proceed: LANGFUSE_HOST is not defined.') + sys.exit(1) + elif provider == 'langsmith': + if not os.getenv('LANGCHAIN_API_KEY'): + logging.error('Cannot proceed: LANGCHAIN_API_KEY is not defined.') + sys.exit(1) if __name__ == '__main__': start_time = time.time() + load_dotenv() # Load environment variables from .env file - cli_args = docopt(__doc__, version='Webscraper 0.1.0') - # Set logging level - log_format = '%(levelname)s:%(module)s:%(message)s' - logging.basicConfig( - level=logging.DEBUG if cli_args['-v'] else logging.INFO, format=log_format - ) - # Get LangSmith API key from environment - _LANGSMITH_API_KEY = os.environ["LANGCHAIN_API_KEY"] - # The LangSmith API base url - base_url = 'https://api.smith.langchain.com/api/v1' - # The script arguments - dataset_id = cli_args[''] - session_ids = cli_args[''] - - try: - logging.info(f'Call the LangSmith API to get the dataset information for dataset_id={dataset_id}.') - dataset_info_response = requests.get( - f'{base_url}/datasets?id={dataset_id}', - headers={"x-api-key": _LANGSMITH_API_KEY}, - ) + cli_args = docopt(__doc__, version='Export Run Results 0.1.0') - # Exit the programme if an error occurs - if dataset_info_response.status_code != 200: - logging.error(f"Failed to get dataset information. \n" - f"Http code : {dataset_info_response.status_code} \n" - f"Content : {dataset_info_response.content}") - exit(1) - - # No error occurred, continue loading content - dataset_info_content = json.loads(dataset_info_response.content) - example_counter = dataset_info_content[0]['example_count'] - logging.info(f"Number of examples in dataset = {example_counter}") - - # Get dataset examples - dataset_examples = get_dataset_examples(example_counter) - - # Exit the programme if no runs is found - if len(dataset_examples) == 0: - logging.error("No runs found !") - exit(1) - - # Get the runs of all examples, then create a csv file - # CSV filename - output_csv_file = f"export_run_result_{dataset_id}_{int(time.time())}.csv" - # CSV header line - csv_lines = [create_csv_header(session_ids)] - - # CSV data lines - index = 1 - for example in dataset_examples: - csv_lines.append(append_example_runs(example, session_ids)) - progress = index / example_counter * 100 - logging.info(f"Example processed : {index}/{example_counter} - Progression : {progress:.2f}%") - index += 1 - - # Creation of CSV file - with open(output_csv_file, 'w', newline='') as csv_file: - writer = csv.writer(csv_file, delimiter='|') - writer.writerows(csv_lines) - logging.info(f"Successful csv generation. Filename : {output_csv_file}") - except requests.exceptions.RequestException as e: - logging.error("A connection error has occurred : %s", e) - - logging.info( - 'End of execution. 
-        time.time() - start_time
-    )
+    provider = cli_args['<dataset_provider>']
+    log_format = '%(levelname)s:%(module)s:%(message)s'
+    logging.basicConfig(level=logging.DEBUG if cli_args['-v'] else logging.INFO, format=log_format)
+
+    check_environment_variables(provider)  # Check environment variables based on provider
+
+    csv_lines = []
+    if provider == 'langfuse':
+        dataset_name = cli_args['<dataset_id_or_name>']
+        runs_names = cli_args['<session_or_run_ids>']
+        client = Langfuse()
+        dataset = client.get_dataset(name=dataset_name)
+        csv_lines = [create_csv_header(runs_names, provider, dataset_name)]
+        for item in dataset.items:
+            csv_lines.append(append_runs_langfuse(item, runs_names))
+
+    elif provider == 'langsmith':
+        # The LangSmith API base url
+        base_url = 'https://api.smith.langchain.com/api/v1'
+        # Get LangSmith API key from environment
+        _LANGSMITH_API_KEY = os.environ["LANGCHAIN_API_KEY"]
+        dataset_id = cli_args['<dataset_id_or_name>']
+        session_ids = cli_args['<session_or_run_ids>']
+        # Fetch the dataset information to get the number of examples
+        # (the number of run sessions is unrelated to the number of examples)
+        dataset_info_response = requests.get(
+            f'{base_url}/datasets?id={dataset_id}',
+            headers={"x-api-key": _LANGSMITH_API_KEY},
+        )
+        if dataset_info_response.status_code != 200:
+            logging.error(
+                f"Failed to get dataset information. \nHttp code: {dataset_info_response.status_code} \nContent: {dataset_info_response.content}")
+            sys.exit(1)
+        example_counter = json.loads(dataset_info_response.content)[0]['example_count']
+        examples = get_dataset_examples(example_counter, dataset_id)
+        csv_lines = [create_csv_header(session_ids, provider, dataset_id)]
+        for example in examples:
+            csv_lines.append(append_runs_langsmith(example, session_ids))
+
+    output_csv_file = f"export_run_result_{provider}_{int(time.time())}.csv"
+    with open(output_csv_file, 'w', newline='') as csv_file:
+        writer = csv.writer(csv_file, delimiter='|')
+        writer.writerows(csv_lines)
+
+    logging.info(f"CSV file successfully generated: {output_csv_file}")
+    logging.info(f"Total execution time: {time.time() - start_time:.2f} seconds")
diff --git a/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/generate_dataset.py b/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/generate_dataset.py
index a5180e7fdf..3fc0efa55f 100644
--- a/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/generate_dataset.py
+++ b/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/generate_dataset.py
@@ -15,8 +15,8 @@
 """Dataset generator. This script takes an excel file as input and generates a csv dataset as output. The generated dataset can also be directly sent to langsmith.
 Usage:
-    generate_dataset.py [-v] <input_excel> --range=<range> [--csv-output=<path>] [--langsmith-dataset-name=<name>] [--locale=<locale>] [--no-answer=<no_answer>]
-    generate_dataset.py [-v] <input_excel> --sheet=<n>... [--csv-output=<path>] [--langsmith-dataset-name=<name>] [--locale=<locale>] [--no-answer=<no_answer>]
+    generate_dataset.py [-v] <input_excel> --range=<range> [--csv-output=<path>] [--langsmith-dataset-name=<name>] [--langfuse-dataset-name=<name>] [--locale=<locale>] [--no-answer=<no_answer>]
+    generate_dataset.py [-v] <input_excel> --sheet=<n>... [--csv-output=<path>] [--langsmith-dataset-name=<name>] [--langfuse-dataset-name=<name>] [--locale=<locale>] [--no-answer=<no_answer>]
 
 Arguments:
     input_excel                     path to the input excel file
@@ -26,13 +26,14 @@
     --sheet=<n>                     Sheet numbers to be parsed. Indices are 0-indexed.
     --csv-output=<path>             Output path of csv file to be generated.
     --langsmith-dataset-name=<name> Name of the dataset to be saved on langsmith.
+    --langfuse-dataset-name=<name>  Name of the dataset to be saved on langfuse.
     --locale=<locale>               Locale to be included in the dataset. [default: French]
     --no-answer=<no_answer>         Label of no_answer to be included in the dataset. [default: NO_RAG_SENTENCE]
     -h --help                       Show this screen
     --version                       Show version
     -v                              Verbose output for debugging (without this option, script will be silent but for errors)
-Generates a testing dataset based on an input file. The input file should have the correct format (see generate_datset_input.xlsx for sample). The generated dataset can be saved on filesystem, using the --csv-output option, on langsmith, using the --langsmith-dataset-name option, or both.
+Generates a testing dataset based on an input file. The input file should have the correct format (see generate_datset_input.xlsx for sample). The generated dataset can be saved on the filesystem, using the --csv-output option, on langsmith, using the --langsmith-dataset-name option, on langfuse, using the --langfuse-dataset-name option, or any combination of these.
 """
 
 import logging
@@ -43,6 +44,8 @@
 import pandas as pd
 from docopt import docopt
+from dotenv import load_dotenv
+from langfuse import Langfuse
 from langsmith import Client
 
 
@@ -57,64 +60,64 @@ def _generate_dataset(
 
 
 def _parse_sheet(filename: str, sheet_index: int) -> pd.DataFrame:
-    logging.debug("Parsing sheet %s", sheet_index)
+    logging.debug('Parsing sheet %s', sheet_index)
     df = pd.read_excel(filename, sheet_name=sheet_index, header=None)
     df = df.drop(0, axis=0)  # Remove header row
     df = df.drop([3, 4, 6, 7], axis=1)  # Remove unnecessary columns
-    df = df.rename(columns={0: "topic", 1: "question", 2: "answer", 5: "quality"})
-    df = df.loc[df["quality"].notnull()]  # Ignore not annotated questions
-    df = df.loc[df["answer"].notnull()]  # Ignore not answered questions
-    df["topic"] = df["topic"].ffill()  # Fill in blanks in topic
+    df = df.rename(columns={0: 'topic', 1: 'question', 2: 'answer', 5: 'quality'})
+    df = df.loc[df['quality'].notnull()]  # Ignore not annotated questions
+    df = df.loc[df['answer'].notnull()]  # Ignore not answered questions
+    df['topic'] = df['topic'].ffill()  # Fill in blanks in topic
     return df
 
 
 def _add_locale(dataset: pd.DataFrame, locale: str) -> pd.DataFrame:
-    logging.debug("Using locale %s", locale)
+    logging.debug('Using locale %s', locale)
     return dataset.assign(locale=locale)
 
 
 def _add_no_answer(dataset: pd.DataFrame, no_answer: str) -> pd.DataFrame:
-    logging.debug("Using no_answer label %s", no_answer)
+    logging.debug('Using no_answer label %s', no_answer)
     return dataset.assign(no_answer=no_answer)
 
 
 def _parse_range(input_range: str) -> List[int]:
-    [a, b] = input_range.split(",")
+    [a, b] = input_range.split(',')
     return [i for i in range(int(a), int(b) + 1)]
 
 
 def _save_on_fs(dataset: pd.DataFrame, path: str):
-    logging.info("Saving dataset on path %s", path)
+    logging.info('Saving dataset on path %s', path)
     dataset.to_csv(path, index=False)
 
 
 def _send_to_langsmith(dataset: pd.DataFrame, dataset_name: str):
     # Transforms dataset to langsmith format
-    records = dataset.to_json(orient="records")
+    records = dataset.to_json(orient='records')
     records = loads(str(records))
     # Creates dataset in langsmith
     client = Client()
-    logging.info("Creating dataset %s on langsmith...", dataset_name)
+    logging.info('Creating dataset %s on langsmith...', dataset_name)
     ls_dataset = client.create_dataset(dataset_name=dataset_name)
-    logging.info("Creating examples on langsmith dataset id %s...", ls_dataset.id)
+    logging.info('Creating examples on langsmith dataset id %s...', ls_dataset.id)
     client.create_examples(
         inputs=[
             {
-                "question": r["question"],
-                "locale": r["locale"],
-                "no_answer": r["no_answer"],
-                "metadata": {
-                    "topic": r["topic"],
+                'question': r['question'],
+                'locale': r['locale'],
+                'no_answer': r['no_answer'],
+                'metadata': {
+                    'topic': r['topic'],
                 },
             }
             for r in records
         ],
         outputs=[
             {
-                "answer": r["answer"],
-                "quality": r["quality"],
+                'answer': r['answer'],
+                'quality': r['quality'],
             }
             for r in records
         ],
@@ -122,49 +125,103 @@ def _send_to_langsmith(dataset: pd.DataFrame, dataset_name: str):
     )
 
 
-if __name__ == "__main__":
-    cli_args = docopt(__doc__, version="Dataset generator 0.1.0")
+def _send_to_langfuse(dataset: pd.DataFrame, dataset_name: str):
+    # Transforms dataset to JSON format
+    records = dataset.to_json(orient='records')
+    records = loads(str(records))
+
+    # Initializes the Langfuse client
+    client = Langfuse()
+
+    logging.info('Creating dataset %s on Langfuse...', dataset_name)
+
+    # Creates dataset in Langfuse
+    lf_dataset = client.create_dataset(name=dataset_name)
+
+    logging.info('Creating examples on Langfuse dataset id %s...', lf_dataset.id)
+
+    # Prepares inputs and outputs
+    inputs = [
+        {
+            'question': r['question'],
+            'locale': r['locale'],
+            'no_answer': r['no_answer'],
+        }
+        for r in records
+    ]
+    metadatas = [{'topic': r['topic']} for r in records]
+
+    outputs = [
+        {
+            'answer': r['answer'],
+            'quality': r['quality'],
+        }
+        for r in records
+    ]
+
+    # Creates examples in the dataset on Langfuse
+    for item_input, item_metadata, item_output in zip(inputs, metadatas, outputs):
+        logging.debug('Creating dataset item on Langfuse...')
+        client.create_dataset_item(
+            dataset_name=dataset_name,
+            input=item_input,
+            expected_output=item_output,
+            metadata=item_metadata,
+        )
+
+
+if __name__ == '__main__':
+    cli_args = docopt(__doc__, version='Dataset generator 0.1.0')
+    load_dotenv()
 
     # Set logging level
-    log_format = "%(levelname)s:%(module)s:%(message)s"
+    log_format = '%(levelname)s:%(module)s:%(message)s'
     logging.basicConfig(
-        level=logging.DEBUG if cli_args["-v"] else logging.INFO, format=log_format
+        level=logging.DEBUG if cli_args['-v'] else logging.INFO, format=log_format
     )
 
     # check if input file exists
-    filename = cli_args["<input_excel>"]
+    filename = cli_args['<input_excel>']
     if not os.path.isfile(filename):
-        logging.error("Specified input excel file was not found.")
+        logging.error('Specified input excel file was not found.')
         exit(1)
 
     # check if langsmith creds are set
-    langsmith_dataset_name = cli_args["--langsmith-dataset-name"]
-    if langsmith_dataset_name and not os.environ.get("LANGCHAIN_API_KEY"):
-        logging.error("Envvar LANGCHAIN_API_KEY not found.")
+    langsmith_dataset_name = cli_args['--langsmith-dataset-name']
+    if langsmith_dataset_name and not os.environ.get('LANGCHAIN_API_KEY'):
+        logging.error('Envvar LANGCHAIN_API_KEY not found.')
+        exit(1)
+
+    # check if langfuse creds are set
+    langfuse_dataset_name = cli_args['--langfuse-dataset-name']
+    if langfuse_dataset_name and not os.environ.get('LANGFUSE_PUBLIC_KEY'):
+        logging.error('Envvar LANGFUSE_PUBLIC_KEY not found.')
         exit(1)
 
     # check if output file can be written
-    output_path = cli_args["--csv-output"]
+    output_path = cli_args['--csv-output']
     if output_path and not Path(output_path).parent.exists():
         logging.error(
-            "Cannot proceed: directory %s does not exist", Path(output_path).parent
+            'Cannot proceed: directory %s does not exist', Path(output_path).parent
         )
         exit(1)
 
-    if cli_args.get("--range") is not None:
-        sheet_indices = _parse_range(str(cli_args["--range"]))
+    if cli_args.get('--range') is not None:
+        sheet_indices = _parse_range(str(cli_args['--range']))
     else:
-        sheet_indices = [int(i) for i in cli_args["--sheet"]]
+        sheet_indices = [int(i) for i in cli_args['--sheet']]
 
     dataset = _generate_dataset(
         filename=filename,
         sheet_indices=sheet_indices,
-        locale=cli_args["--locale"] or "French",
-        no_answer=cli_args["--no-answer"] or "NO_RAG_SENTENCE",
+        locale=cli_args['--locale'] or 'French',
+        no_answer=cli_args['--no-answer'] or 'NO_RAG_SENTENCE',
     )
 
     if output_path:
         _save_on_fs(dataset, output_path)
 
     if langsmith_dataset_name:
-        _send_to_langsmith(dataset, cli_args["--langsmith-dataset-name"])
+        _send_to_langsmith(dataset, cli_args['--langsmith-dataset-name'])
+
+    if langfuse_dataset_name:
+        _send_to_langfuse(dataset, cli_args['--langfuse-dataset-name'])
diff --git a/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/poetry.lock b/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/poetry.lock
index 00e25a609c..4f4c330af7 100644
--- a/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/poetry.lock
+++ b/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/poetry.lock
@@ -1,4 +1,4 @@
-# This file is automatically @generated by Poetry 1.7.1 and should not be changed by hand.
+# This file is automatically @generated by Poetry 1.8.3 and should not be changed by hand.
 
 [[package]]
 name = "aiohappyeyeballs"
@@ -1142,6 +1142,31 @@ langchain = ["langchain (>=0.0.309)"]
 llama-index = ["llama-index (>=0.10.12,<2.0.0)"]
 openai = ["openai (>=0.27.8)"]
 
+[[package]]
+name = "langfuse"
+version = "2.43.3"
+description = "A client library for accessing langfuse"
+optional = false
+python-versions = "<4.0,>=3.8.1"
+files = [
+    {file = "langfuse-2.43.3-py3-none-any.whl", hash = "sha256:62a368009dd26f698905321a52929ab4e75996a871f41db2892beb5257ab69d2"},
+    {file = "langfuse-2.43.3.tar.gz", hash = "sha256:046d872d0d0053d02816d5e5a610be0e4ae7ebb69e65d979111fc522be965691"},
+]
+
+[package.dependencies]
+anyio = ">=4.4.0,<5.0.0"
+backoff = ">=1.10.0"
+httpx = ">=0.15.4,<1.0"
+idna = ">=3.7,<4.0"
+packaging = ">=23.2,<24.0"
+pydantic = ">=1.10.7,<3.0"
+wrapt = ">=1.14,<2.0"
+
+[package.extras]
+langchain = ["langchain (>=0.0.309)"]
+llama-index = ["llama-index (>=0.10.12,<2.0.0)"]
+openai = ["openai (>=0.27.8)"]
+
 [[package]]
 name = "langsmith"
 version = "0.1.106"
@@ -2572,4 +2597,4 @@ multidict = ">=4.0"
 
 [metadata]
 lock-version = "2.0"
 python-versions = "^3.9"
-content-hash = "08beb72f353d7cf745df16de29f41e8958e4d97a32af5f6fbea48b6d4fc32bf3"
+content-hash = "90e2e97777d72e62dcccbc56564a6a34400ea660e4093ee304ba03f5a7137ba7"
diff --git a/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/pyproject.toml b/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/pyproject.toml
index 707d0cc992..79b3182389 100644
--- a/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/pyproject.toml
+++ b/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/pyproject.toml
@@ -22,6 +22,7 @@ pre-commit = "^3.7.1"
 aiometer = "^0.5.0"
 aiohttp = "^3.9.5"
 aiohttp-socks = "^0.8.4"
+langfuse = "^2.43.3"
 
 [build-system]
 requires = ["poetry-core"]
diff --git a/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/rag_testing_tool.py b/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/rag_testing_tool.py
index 9109e68561..0bc8da1607 100644
--- a/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/rag_testing_tool.py
+++ b/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/rag_testing_tool.py
@@ -13,11 +13,11 @@
 # limitations under the License.
 #
 """Retrieval-Augmented Generation (RAG) endpoint settings testing tool based
-on LangSmith's SDK: runs a specific RAG Settings configuration against a
+on LangSmith's or LangFuse's SDK: runs a specific RAG Settings configuration against a
 reference dataset.
 
 Usage:
-    rag_testing_tool.py [-v] <rag_query> <dataset_name> <test_name> [<delay>]
+    rag_testing_tool.py [-v] <rag_query> <dataset_provider> <dataset_name> <test_name> [<delay>]
     rag_testing_tool.py -h | --help
     rag_testing_tool.py --version
 
@@ -27,6 +27,7 @@
                        provider, indexation session's unique id, and 'k', i.e. nb
                        of retrieved docs (question and chat history are ignored,
                        as they will come from the dataset)
+    dataset_provider   the dataset provider (langsmith or langfuse)
     dataset_name       the reference dataset name
     test_name          name of the test run
 
@@ -38,7 +39,7 @@
                        be silent but for errors)
 
 Build a RAG (Lang)chain from the RAG Query and runs it against the provided
-LangSmith dataset. The chain is created anew for each entry of the dataset, and
+LangSmith or LangFuse dataset. The chain is created anew for each entry of the dataset, and
 if a delay is provided, each chain creation will be delayed accordingly.
 """
 import json
@@ -52,10 +53,14 @@
 from docopt import docopt
 from dotenv import load_dotenv
 
-from langsmith import Client
-
 from gen_ai_orchestrator.routers.requests.requests import RagQuery
 from gen_ai_orchestrator.services.langchain.rag_chain import create_rag_chain
+from langfuse import Langfuse
+from langsmith import Client
+from tenacity import (
+    retry,
+    stop_after_attempt,
+    wait_random_exponential,
+)
 
 
 def test_rag(args):
@@ -66,6 +71,7 @@ def test_rag(args):
         args (dict): A dictionary containing command-line arguments.
                     Expecting keys:
                         '<rag_query>'
+                        '<dataset_provider>'
                         '<dataset_name>'
                         '<test_name>'
                         '<delay>'
@@ -88,7 +94,41 @@ def _construct_chain():
             'chat_history': lambda x: x['chat_history'] if 'chat_history' in x else [],
         } | create_rag_chain(RagQuery(**rag_query))
 
-    client = Client()
+    @retry(wait=wait_random_exponential(min=10, max=60), stop=stop_after_attempt(5))
+    def run_dataset(run_name_dataset):
+        if args['<dataset_provider>'].lower() == 'langsmith':
+            client = Client()
+            client.run_on_dataset(
+                dataset_name=args['<dataset_name>'],
+                llm_or_chain_factory=_construct_chain,
+                project_name=run_name_dataset,
+                project_metadata={
+                    'index_session_id': index_session_id,
+                    'k': k,
+                },
+                concurrency_level=concurrency_level,
+            )
+        elif args['<dataset_provider>'].lower() == 'langfuse':
+            client = Langfuse()
+            dataset = client.get_dataset(args['<dataset_name>'])
+
+            for item in dataset.items:
+                callback_handlers = []
+                handler = item.get_langchain_handler(
+                    run_name=run_name_dataset,
+                    run_metadata={
+                        'index_session_id': index_session_id,
+                        'k': k,
+                    },
+                )
+                callback_handlers.append(handler)
+                _construct_chain().invoke(
+                    item.input, config={'callbacks': callback_handlers}
+                )
+            client.flush()
+
     search_params = rag_query['document_search_params']
     index_session_id = search_params['filter'][0]['term'][
         'metadata.index_session_id.keyword'
@@ -101,17 +141,8 @@ def _construct_chain():
     # one at a time
     if args['<delay>']:
         concurrency_level = 1
-
-    client.run_on_dataset(
-        dataset_name=args['<dataset_name>'],
-        llm_or_chain_factory=_construct_chain,
-        project_name=args['<test_name>'] + '-' + str(uuid4())[:8],
-        project_metadata={
-            'index_session_id': index_session_id,
-            'k': k,
-        },
-        concurrency_level=concurrency_level,
-    )
+    run_name_dataset = args['<test_name>'] + '-' + str(uuid4())[:8]
+    run_dataset(run_name_dataset)
 
     duration = datetime.now() - start_time
     hours, remainder = divmod(duration.seconds, 3600)
@@ -133,22 +164,38 @@
     )
     load_dotenv()
 
-    # Check env (LangSmith)
-    langchain_endpoint = os.getenv('LANGCHAIN_ENDPOINT')
-    if not langchain_endpoint:
-        logging.error(
-            'Cannot proceed: LANGCHAIN_ENDPOINT env variable is not defined (define it in a .env file)'
-        )
-        sys.exit(1)
-
-    langchain_apikey = os.getenv('LANGCHAIN_API_KEY')
-    if not langchain_apikey:
+    if cli_args['<dataset_provider>'].lower() == 'langsmith':
+        # Check env (LangSmith)
+        langchain_apikey = os.getenv('LANGCHAIN_API_KEY')
+        if not langchain_apikey:
+            logging.error(
+                'Cannot proceed: LANGCHAIN_API_KEY env variable is not defined (define it in a .env file)'
+            )
+            sys.exit(1)
+    elif cli_args['<dataset_provider>'].lower() == 'langfuse':
+        langfuse_secret_key = os.getenv('LANGFUSE_SECRET_KEY')
+        if not langfuse_secret_key:
+            logging.error(
+                'Cannot proceed: LANGFUSE_SECRET_KEY env variable is not defined (define it in a .env file)'
+            )
+            sys.exit(1)
+        langfuse_host = os.getenv('LANGFUSE_HOST')
+        if not langfuse_host:
+            logging.error(
+                'Cannot proceed: LANGFUSE_HOST env variable is not defined (define it in a .env file)'
+            )
+            sys.exit(1)
+        langfuse_public_key = os.getenv('LANGFUSE_PUBLIC_KEY')
+        if not langfuse_public_key:
+            logging.error(
+                'Cannot proceed: LANGFUSE_PUBLIC_KEY env variable is not defined (define it in a .env file)'
+            )
+            sys.exit(1)
+    else:
         logging.error(
-            'Cannot proceed: LANGCHAIN_API_KEY env variable is not defined (define it in a .env file)'
+            'Cannot proceed: dataset_provider must be either langsmith or langfuse'
         )
         sys.exit(1)
 
     # Check args:
     # - RAGQuery JSON file
     rag_query_file_path = Path(cli_args['<rag_query>'])
diff --git a/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/ragquery.json.example b/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/ragquery.json.example
new file mode 100644
index 0000000000..0780246006
--- /dev/null
+++ b/gen-ai/orchestrator-server/src/main/python/tock-llm-indexing-tools/ragquery.json.example
@@ -0,0 +1,55 @@
+{
+  "history": [],
+  "question_answering_llm_setting": {
+    "provider": "AzureOpenAIService",
+    "api_key": {
+      "type": "Raw",
+      "value": "*****************"
+    },
+    "model": "gpt-4o",
+    "deployment_name": "*******************",
+    "repetition_penalty": 1.0,
+    "max_new_tokens": 256,
+    "api_base": "******************************",
+    "api_version": "2024-03-01-preview",
+    "temperature": 0.4,
+    "prompt": "Use the following context to answer the question at the end.\nIf you don't know the answer, just say {no_answer}.\n Context: {context}\nQuestion: {question} \n Answer in {locale}:"
+  },
+  "question_answering_prompt_inputs": {
+    "question": "How to get started playing guitar ?",
+    "no_answer": "Sorry, I don't know.",
+    "locale": "French"
+  },
+  "embedding_question_em_setting": {
+    "provider": "AzureOpenAIService",
+    "api_key": {
+      "type": "Raw",
+      "value": "*****************************"
+    },
+    "deployment_name": "********************",
+    "api_base": "***********************************",
+    "api_version": "2024-03-01-preview",
+    "model": "text-embedding-ada-002"
+  },
+  "document_index_name": "ns-03-bot-cmso",
+  "document_search_params": {
+    "provider": "OpenSearch",
+    "filter": [
+      {
+        "term": {
+          "metadata.index_session_id.keyword": "****************************************"
+        }
+      }
+    ],
+    "k": 4
+  },
+  "observability_setting": {
+    "provider": "Langfuse",
+    "url": "http://localhost:3000",
+    "secret_key": {
+      "type": "Raw",
+      "value": "************************"
+    },
+    "public_key": "********************************"
+  }
+}
\ No newline at end of file
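
For reference, a minimal sketch of invoking the two provider-aware tools after this change (all dataset, run, and file names below are hypothetical; the run name suffix mirrors the uuid4 fragment that rag_testing_tool.py appends to the test name):

```
rag_testing_tool.py ragquery.json langfuse my-reference-dataset nightly-settings-test
export_run_results.py langfuse my-reference-dataset nightly-settings-test-1a2b3c4d
```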