diff --git a/callautomation-openai-sample/main.py b/callautomation-openai-sample/main.py
index 2d6c37a..3f430f8 100644
--- a/callautomation-openai-sample/main.py
+++ b/callautomation-openai-sample/main.py
@@ -5,44 +5,45 @@
 from azure.eventgrid import EventGridEvent, SystemEventNames
 import requests
 from flask import Flask, Response, request, json
+import functools
 from logging import INFO
 import re
+from aiohttp import web
 from azure.communication.callautomation import (
-    CallAutomationClient,
     PhoneNumberIdentifier,
     RecognizeInputType,
     TextSource
     )
+from azure.communication.callautomation.aio import (
+    CallAutomationClient
+    )
 from azure.core.messaging import CloudEvent
-import openai
-
-from openai.api_resources import (
-    ChatCompletion
-)
+import asyncio
+from openai import AsyncAzureOpenAI
 
 # Your ACS resource connection string
-ACS_CONNECTION_STRING = ""
+ACS_CONNECTION_STRING = ""
 
 # Cognitive service endpoint
-COGNITIVE_SERVICE_ENDPOINT=""
+COGNITIVE_SERVICE_ENDPOINT=""
 
 # Cognitive service endpoint
-AZURE_OPENAI_SERVICE_KEY = ""
+AZURE_OPENAI_SERVICE_KEY = ""
 
 # Open AI service endpoint
-AZURE_OPENAI_SERVICE_ENDPOINT=""
+AZURE_OPENAI_SERVICE_ENDPOINT=""
 
 # Azure Open AI Deployment Model Name
-AZURE_OPENAI_DEPLOYMENT_MODEL_NAME=""
+AZURE_OPENAI_DEPLOYMENT_MODEL_NAME="call-automation-deployment"
 
 # Azure Open AI Deployment Model
 AZURE_OPENAI_DEPLOYMENT_MODEL="gpt-3.5-turbo"
 
 # Agent Phone Number
-AGENT_PHONE_NUMBER=""
+AGENT_PHONE_NUMBER=""
 
 # Callback events URI to handle callback events.
-CALLBACK_URI_HOST = ""
+CALLBACK_URI_HOST = ""
 CALLBACK_EVENTS_URI = CALLBACK_URI_HOST + "/api/callbacks"
 
 ANSWER_PROMPT_SYSTEM_TEMPLATE = """
@@ -76,18 +77,36 @@
 recording_chunks_location = []
 max_retry = 2
 
-openai.api_key = AZURE_OPENAI_SERVICE_KEY
-openai.api_base = AZURE_OPENAI_SERVICE_ENDPOINT # your endpoint should look like the following https://YOUR_RESOURCE_NAME.openai.azure.com/
-openai.api_type = 'azure'
-openai.api_version = '2023-05-15' # this may change in the future
-
 app = Flask(__name__)
 
-def get_chat_completions_async(system_prompt,user_prompt):
-    openai.api_key = AZURE_OPENAI_SERVICE_KEY
+""" def async_action():
+    def wrapper(func):
+        @functools.wraps(func)
+        async def wrapped(*args, **kwargs):
+            # Some fancy foo stuff
+            return await func(*args, **kwargs)
+        return wrapped
+    return wrapper """
+
+def async_action(f):
+    @functools.wraps(f)
+    async def wrapped(*args, **kwargs):
+        return await f(*args, **kwargs)
+    return wrapped
+
+openai_client = AsyncAzureOpenAI(
+    # This is the default and can be omitted
+    api_key=AZURE_OPENAI_SERVICE_KEY,
+    api_version="2024-02-01",
+    azure_endpoint=AZURE_OPENAI_SERVICE_ENDPOINT
+)
+
+
+async def get_chat_completions_async(system_prompt,user_prompt):
+    """ openai.api_key = AZURE_OPENAI_SERVICE_KEY
     openai.api_base = AZURE_OPENAI_SERVICE_ENDPOINT # your endpoint should look like the following https://YOUR_RESOURCE_NAME.openai.azure.com/
     openai.api_type = 'azure'
-    openai.api_version = '2023-05-15' # this may change in the future
+    openai.api_version = '2023-05-15' # this may change in the future """
 
     # Define your chat completions request
     chat_request = [
@@ -95,25 +114,41 @@ def get_chat_completions_async(system_prompt,user_prompt):
         {"role": "user", "content": f"In less than 200 characters: respond to this question: {user_prompt}?"}
     ]
 
+    app.logger.info("get_chat_completions_async")
+
+
+    """ models = openai_client.models
+    for model in models:
+        app.logger.info(model) """
     global response_content
+    global response
     try:
-        response = ChatCompletion.create(model=AZURE_OPENAI_DEPLOYMENT_MODEL,deployment_id=AZURE_OPENAI_DEPLOYMENT_MODEL_NAME, messages=chat_request,max_tokens = 1000)
-    except ex:
-        app.logger.info("error in openai api call : %s",ex)
-
+        response = await openai_client.chat.completions.create(
+            model=AZURE_OPENAI_DEPLOYMENT_MODEL_NAME,
+            messages=chat_request,
+            max_tokens = 1000)
+    except Exception as ex:
+        app.logger.info("error in openai api call : %s", ex)
+
     # Extract the response content
     if response is not None :
-        response_content = response['choices'][0]['message']['content']
+        response_content = response.choices[0].message.content
     else :
-        response_content=""
-    return response_content
+        response_content = ""
+
+    app.logger.info("chat gpt response content : %s", response_content)
+    return response_content
+
 
-def get_chat_gpt_response(speech_input):
-    return get_chat_completions_async(ANSWER_PROMPT_SYSTEM_TEMPLATE,speech_input)
+async def get_chat_gpt_response(speech_input):
+    app.logger.info("get_chat_gpt_response, speech_text =%s",
+                    speech_input)
+    return await get_chat_completions_async(ANSWER_PROMPT_SYSTEM_TEMPLATE,speech_input)
 
-def handle_recognize(replyText,callerId,call_connection_id,context=""):
+
+async def handle_recognize(replyText,callerId,call_connection_id,context=""):
     play_source = TextSource(text=replyText, voice_name="en-US-NancyNeural")
-    recognize_result=call_automation_client.get_call_connection(call_connection_id).start_recognizing_media(
+    recognize_result= await call_automation_client.get_call_connection(call_connection_id).start_recognizing_media(
         input_type=RecognizeInputType.SPEECH,
         target_participant=PhoneNumberIdentifier(callerId),
         end_silence_timeout=10,
@@ -121,23 +156,30 @@
         operation_context=context)
     app.logger.info("handle_recognize : data=%s",recognize_result)
 
-def handle_play(call_connection_id, text_to_play, context):
+
+async def handle_play(call_connection_id, text_to_play, context):
     play_source = TextSource(text=text_to_play, voice_name= "en-US-NancyNeural")
-    call_automation_client.get_call_connection(call_connection_id).play_media_to_all(play_source,
+    await call_automation_client.get_call_connection(call_connection_id).play_media_to_all(play_source,
         operation_context=context)
-
-def handle_hangup(call_connection_id):
-    call_automation_client.get_call_connection(call_connection_id).hang_up(is_for_everyone=True)
-def detect_escalate_to_agent_intent(speech_text, logger):
-    return has_intent_async(user_query=speech_text, intent_description="talk to agent", logger=logger)
-def has_intent_async(user_query, intent_description, logger):
+
+async def handle_hangup(call_connection_id):
+    await call_automation_client.get_call_connection(call_connection_id).hang_up(is_for_everyone=True)
+
+
+async def detect_escalate_to_agent_intent(speech_text, logger):
+    return await has_intent_async(user_query=speech_text, intent_description="talk to agent", logger=logger)
+
+
+async def has_intent_async(user_query, intent_description, logger):
     is_match=False
     system_prompt = "You are a helpful assistant"
     combined_prompt = f"In 1 word: does {user_query} have a similar meaning as {intent_description}?"
 
+
+    logger.info("has_intent_async method executing")
+
     #combined_prompt = base_user_prompt.format(user_query, intent_description)
-    response = get_chat_completions_async(system_prompt, combined_prompt)
+    response = await get_chat_completions_async(system_prompt, combined_prompt)
     if "yes" in response.lower():
         is_match =True
     logger.info(f"OpenAI results: is_match={is_match}, customer_query='{user_query}', intent_description='{intent_description}'")
 
@@ -150,7 +192,9 @@ def get_sentiment_score(sentiment_score):
     return int(match.group()) if match else -1
 
 @app.route("/api/incomingCall", methods=['POST'])
-def incoming_call_handler():
+#@async_action
+async def incoming_call_handler():
+    #data = await request.get_json()
     for event_dict in request.json:
         event = EventGridEvent.from_dict(event_dict)
         app.logger.info("incoming event data --> %s", event.data)
@@ -175,7 +219,7 @@
 
             app.logger.info("callback url: %s", callback_uri)
 
-            answer_call_result = call_automation_client.answer_call(incoming_call_context=incoming_call_context,
+            answer_call_result = await call_automation_client.answer_call(incoming_call_context=incoming_call_context,
                                                                     cognitive_services_endpoint=COGNITIVE_SERVICE_ENDPOINT,
                                                                     callback_url=callback_uri)
             app.logger.info("Answered call for connection id: %s",
@@ -183,9 +227,11 @@
             return Response(status=200)
 
 @app.route("/api/callbacks/<contextId>", methods=["POST"])
-def handle_callback(contextId):
+#@async_action
+async def handle_callback(contextId):
     try:
         global caller_id , call_connection_id
+        #request_json = await request.get_json()
         app.logger.info("Request Json: %s", request.json)
         for event_dict in request.json:
             event = CloudEvent.from_dict(event_dict)
@@ -198,7 +244,7 @@
 
             app.logger.info("call connected : data=%s", event.data)
             if event.type == "Microsoft.Communication.CallConnected":
-                handle_recognize(HELLO_PROMPT,
+                await handle_recognize(HELLO_PROMPT,
                                  caller_id,call_connection_id,
                                  context="GetFreeFormText")
 
@@ -208,30 +254,32 @@
                     app.logger.info("Recognition completed, speech_text =%s",
                                     speech_text);
                     if speech_text is not None and len(speech_text) > 0:
-                        if detect_escalate_to_agent_intent(speech_text=speech_text,logger=app.logger):
+                        """ if await detect_escalate_to_agent_intent(speech_text=speech_text,logger=app.logger):
                             handle_play(call_connection_id=call_connection_id,text_to_play=END_CALL_PHRASE_TO_CONNECT_AGENT,context=CONNECT_AGENT_CONTEXT)
+                        else: """
+                        app.logger.info("sending text to openai, speech_text =%s",
+                                        speech_text);
+                        chat_gpt_response= await get_chat_gpt_response(speech_text)
+                        app.logger.info(f"Chat GPT response:{chat_gpt_response}")
+                        regex = re.compile(CHAT_RESPONSE_EXTRACT_PATTERN)
+                        match = regex.search(chat_gpt_response)
+                        if match:
+                            answer = match.group(1)
+                            sentiment_score = match.group(2).strip()
+                            intent = match.group(3)
+                            category = match.group(4)
+                            app.logger.info(f"Chat GPT Answer={answer}, Sentiment Rating={sentiment_score}, Intent={intent}, Category={category}")
+                            score=get_sentiment_score(sentiment_score)
+                            app.logger.info(f"Score={score}")
+                            if -1 < score < 5:
+                                app.logger.info(f"Score is less than 5")
+                                await handle_play(call_connection_id=call_connection_id,text_to_play=CONNECT_AGENT_PROMPT,context=CONNECT_AGENT_CONTEXT)
+                            else:
+                                app.logger.info(f"Score is more than 5")
+                                await handle_recognize(answer,caller_id,call_connection_id,context="OpenAISample")
                         else:
-                            chat_gpt_response= get_chat_gpt_response(speech_text)
-                            app.logger.info(f"Chat GPT response:{chat_gpt_response}")
-                            regex = re.compile(CHAT_RESPONSE_EXTRACT_PATTERN)
-                            match = regex.search(chat_gpt_response)
-                            if match:
-                                answer = match.group(1)
-                                sentiment_score = match.group(2).strip()
-                                intent = match.group(3)
-                                category = match.group(4)
-                                app.logger.info(f"Chat GPT Answer={answer}, Sentiment Rating={sentiment_score}, Intent={intent}, Category={category}")
-                                score=get_sentiment_score(sentiment_score)
-                                app.logger.info(f"Score={score}")
-                                if -1 < score < 5:
-                                    app.logger.info(f"Score is less than 5")
-                                    handle_play(call_connection_id=call_connection_id,text_to_play=CONNECT_AGENT_PROMPT,context=CONNECT_AGENT_CONTEXT)
-                                else:
-                                    app.logger.info(f"Score is more than 5")
-                                    handle_recognize(answer,caller_id,call_connection_id,context="OpenAISample")
-                            else:
-                                app.logger.info("No match found")
-                                handle_recognize(chat_gpt_response,caller_id,call_connection_id,context="OpenAISample")
+                            app.logger.info("No match found")
+                            await handle_recognize(chat_gpt_response,caller_id,call_connection_id,context="OpenAISample")
 
             elif event.type == "Microsoft.Communication.RecognizeFailed":
                 resultInformation = event.data['resultInformation']
@@ -239,24 +287,24 @@
                 context=event.data['operationContext']
                 global max_retry
                 if reasonCode == 8510 and 0 < max_retry:
-                    handle_recognize(TIMEOUT_SILENCE_PROMPT,caller_id,call_connection_id)
+                    await handle_recognize(TIMEOUT_SILENCE_PROMPT,caller_id,call_connection_id)
                     max_retry -= 1
                 else:
-                    handle_play(call_connection_id,GOODBYE_PROMPT, GOODBYE_CONTEXT)
+                    await handle_play(call_connection_id,GOODBYE_PROMPT, GOODBYE_CONTEXT)
 
             elif event.type == "Microsoft.Communication.PlayCompleted":
-                context=event.data['operationContext']
+                context=event.data['operationContext']
+                app.logger.info(f"Context: " + context)
                 if context.lower() == TRANSFER_FAILED_CONTEXT.lower() or context.lower() == GOODBYE_CONTEXT.lower() :
-                    handle_hangup(call_connection_id)
+                    await handle_hangup(call_connection_id)
                 elif context.lower() == CONNECT_AGENT_CONTEXT.lower():
                     if not AGENT_PHONE_NUMBER or AGENT_PHONE_NUMBER.isspace():
                         app.logger.info(f"Agent phone number is empty")
-                        handle_play(call_connection_id=call_connection_id,text_to_play=AGENT_PHONE_NUMBER_EMPTY_PROMPT)
+                        await handle_play(call_connection_id=call_connection_id,text_to_play=AGENT_PHONE_NUMBER_EMPTY_PROMPT)
                     else:
                         app.logger.info(f"Initializing the Call transfer...")
                         transfer_destination=PhoneNumberIdentifier(AGENT_PHONE_NUMBER)
-                        call_connection_client =call_automation_client.get_call_connection(call_connection_id=call_connection_id)
-                        call_connection_client.transfer_call_to_participant(target_participant=transfer_destination)
+                        await call_automation_client.get_call_connection(call_connection_id=call_connection_id).transfer_call_to_participant(target_participant=transfer_destination)
                        app.logger.info(f"Transfer call initiated: {context}")
 
            elif event.type == "Microsoft.Communication.CallTransferAccepted":
@@ -268,15 +316,26 @@
                sub_code = resultInformation['subCode']
                # check for message extraction and code
                app.logger.info(f"Encountered error during call transfer, message=, code=, subCode={sub_code}")
-                await handle_play(call_connection_id=call_connection_id,text_to_play=CALLTRANSFER_FAILURE_PROMPT, context=TRANSFER_FAILED_CONTEXT)
+                await handle_play(call_connection_id=call_connection_id,text_to_play=CALLTRANSFER_FAILURE_PROMPT, context=TRANSFER_FAILED_CONTEXT)
        return Response(status=200)
-    except Exception as ex:
-        app.logger.info("error in event handling")
+    except Exception as ex:
+        app.logger.info(f"error in event handling exception = {ex}")
 
 @app.route("/")
 def hello():
     return "Hello ACS CallAutomation!..test"
 
+async def init_app():
+    """Initialize the aiohttp web application."""
+    app = web.Application()
+    app.router.add_post('/api/incomingCall', incoming_call_handler)
+    app.router.add_post('/api/callbacks/', handle_callback)
+    return app
+
 if __name__ == '__main__':
     app.logger.setLevel(INFO)
     app.run(port=8080)
+
+    #loop = asyncio.get_event_loop()
+    #web.run_app(init_app(), host='0.0.0.0', port=5000, loop=loop)
+    #asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
diff --git a/callautomation-openai-sample/requirements.txt b/callautomation-openai-sample/requirements.txt
index 94dc155..d7cfa1d 100644
--- a/callautomation-openai-sample/requirements.txt
+++ b/callautomation-openai-sample/requirements.txt
@@ -1,4 +1,4 @@
 Flask>=2.3.2
 azure-eventgrid==4.11.0
 azure-communication-callautomation==1.1.0
-openai==0.28.1
\ No newline at end of file
+openai==1.30.2
\ No newline at end of file
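Note (not part of the patch): the converted Flask views (`async def incoming_call_handler`, `async def handle_callback`) only execute if Flask's async extra is installed, and the updated requirements.txt above does not add it. A minimal sketch of that assumption follows; the `flask[async]` requirement line and the `/ping` route are illustrative, not taken from this sample.

```python
# Illustrative only: Flask runs "async def" views via asgiref, which ships with
# the async extra. One way to pin it would be a requirements.txt line such as:
#   Flask[async]>=2.3.2
from flask import Flask

app = Flask(__name__)

@app.route("/ping")
async def ping():
    # Flask wraps this coroutine per request; without the async extra installed,
    # invoking the route raises a RuntimeError asking for flask[async].
    return "pong"

if __name__ == "__main__":
    app.run(port=8080)
```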
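Also not part of the patch: the hunks await methods on `call_automation_client`, but its construction falls outside the changed lines. A minimal sketch of one way the async client from `azure.communication.callautomation.aio` could be created with the connection string defined at the top of main.py; the standalone `hang_up_example` helper and the placeholder connection string are hypothetical.

```python
# Sketch only: build the async Call Automation client assumed by the awaited
# calls in main.py (answer_call, get_call_connection, start_recognizing_media, ...).
import asyncio
from azure.communication.callautomation.aio import CallAutomationClient

ACS_CONNECTION_STRING = "endpoint=https://<your-resource>.communication.azure.com/;accesskey=<key>"

call_automation_client = CallAutomationClient.from_connection_string(ACS_CONNECTION_STRING)

async def hang_up_example(call_connection_id: str) -> None:
    # On the aio client every call-connection operation is a coroutine and must be
    # awaited, which is why the patch adds "await" in handle_play, handle_hangup, etc.
    await call_automation_client.get_call_connection(call_connection_id).hang_up(is_for_everyone=True)

if __name__ == "__main__":
    asyncio.run(hang_up_example("<call-connection-id>"))
```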