Added Async Await for openAI #51

Draft · wants to merge 3 commits into main
213 changes: 136 additions & 77 deletions callautomation-openai-sample/main.py
@@ -5,44 +5,45 @@
from azure.eventgrid import EventGridEvent, SystemEventNames
import requests
from flask import Flask, Response, request, json
import functools
from logging import INFO
import re
from aiohttp import web
from azure.communication.callautomation import (
CallAutomationClient,
PhoneNumberIdentifier,
RecognizeInputType,
TextSource
)
from azure.communication.callautomation.aio import (
CallAutomationClient
)
from azure.core.messaging import CloudEvent
import openai

from openai.api_resources import (
ChatCompletion
)
import asyncio
from openai import AsyncAzureOpenAI

# Your ACS resource connection string
ACS_CONNECTION_STRING = "<ACS_CONNECTION_STRING>"
ACS_CONNECTION_STRING = ""

# Cognitive service endpoint
COGNITIVE_SERVICE_ENDPOINT="<COGNITIVE_SERVICE_ENDPOINT>"
COGNITIVE_SERVICE_ENDPOINT=""

# Cognitive service endpoint
AZURE_OPENAI_SERVICE_KEY = "<AZURE_OPENAI_SERVICE_KEY>"
AZURE_OPENAI_SERVICE_KEY = ""

# Open AI service endpoint
AZURE_OPENAI_SERVICE_ENDPOINT="<AZURE_OPENAI_SERVICE_ENDPOINT>"
AZURE_OPENAI_SERVICE_ENDPOINT=""

# Azure Open AI Deployment Model Name
AZURE_OPENAI_DEPLOYMENT_MODEL_NAME="<AZURE_OPENAI_DEPLOYMENT_MODEL_NAME>"
AZURE_OPENAI_DEPLOYMENT_MODEL_NAME="call-automation-deployment"

# Azure Open AI Deployment Model
AZURE_OPENAI_DEPLOYMENT_MODEL="gpt-3.5-turbo"

# Agent Phone Number
AGENT_PHONE_NUMBER="<AGENT_PHONE_NUMBER>"
AGENT_PHONE_NUMBER=""

# Callback events URI to handle callback events.
CALLBACK_URI_HOST = "<CALLBACK_URI_HOST_WITH_PROTOCOL>"
CALLBACK_URI_HOST = ""
CALLBACK_EVENTS_URI = CALLBACK_URI_HOST + "/api/callbacks"

ANSWER_PROMPT_SYSTEM_TEMPLATE = """
@@ -76,68 +77,109 @@
recording_chunks_location = []
max_retry = 2

openai.api_key = AZURE_OPENAI_SERVICE_KEY
openai.api_base = AZURE_OPENAI_SERVICE_ENDPOINT # your endpoint should look like the following https://YOUR_RESOURCE_NAME.openai.azure.com/
openai.api_type = 'azure'
openai.api_version = '2023-05-15' # this may change in the future

app = Flask(__name__)

def get_chat_completions_async(system_prompt,user_prompt):
openai.api_key = AZURE_OPENAI_SERVICE_KEY
""" def async_action():
def wrapper(func):
@functools.wraps(func)
async def wrapped(*args, **kwargs):
# Some fancy foo stuff
return await func(*args, **kwargs)
return wrapped
return wrapper """

def async_action(f):
@functools.wraps(f)
async def wrapped(*args, **kwargs):
return await f(*args, **kwargs)
return wrapped

openai_client = AsyncAzureOpenAI(
# This is the default and can be omitted
api_key=AZURE_OPENAI_SERVICE_KEY,
api_version="2024-02-01",
azure_endpoint=AZURE_OPENAI_SERVICE_ENDPOINT
)


async def get_chat_completions_async(system_prompt,user_prompt):
""" openai.api_key = AZURE_OPENAI_SERVICE_KEY
openai.api_base = AZURE_OPENAI_SERVICE_ENDPOINT # your endpoint should look like the following https://YOUR_RESOURCE_NAME.openai.azure.com/
openai.api_type = 'azure'
openai.api_version = '2023-05-15' # this may change in the future
openai.api_version = '2023-05-15' # this may change in the future """

# Define your chat completions request
chat_request = [
{"role": "system", "content": f"{system_prompt}"},
{"role": "user", "content": f"In less than 200 characters: respond to this question: {user_prompt}?"}
]

app.logger.info("get_chat_completions_async")


""" models = openai_client.models
for model in models:
app.logger.info(model) """
global response_content
global response
try:
response = ChatCompletion.create(model=AZURE_OPENAI_DEPLOYMENT_MODEL,deployment_id=AZURE_OPENAI_DEPLOYMENT_MODEL_NAME, messages=chat_request,max_tokens = 1000)
except ex:
app.logger.info("error in openai api call : %s",ex)

response = await openai_client.chat.completions.create(
model=AZURE_OPENAI_DEPLOYMENT_MODEL_NAME,
messages=chat_request,
max_tokens = 1000)
except Exception as ex:
app.logger.info("error in openai api call : %s", ex)

# Extract the response content
if response is not None :
response_content = response['choices'][0]['message']['content']
response_content = response.choices[0].message.content
else :
response_content=""
return response_content
response_content = ""

app.logger.info("chat gpt resonse content : %s", response_content)
return response_content


def get_chat_gpt_response(speech_input):
return get_chat_completions_async(ANSWER_PROMPT_SYSTEM_TEMPLATE,speech_input)
async def get_chat_gpt_response(speech_input):
app.logger.info("get_chat_gpt_response, speech_text =%s",
speech_input)
return await get_chat_completions_async(ANSWER_PROMPT_SYSTEM_TEMPLATE,speech_input)

def handle_recognize(replyText,callerId,call_connection_id,context=""):

async def handle_recognize(replyText,callerId,call_connection_id,context=""):
play_source = TextSource(text=replyText, voice_name="en-US-NancyNeural")
recognize_result=call_automation_client.get_call_connection(call_connection_id).start_recognizing_media(
recognize_result= await call_automation_client.get_call_connection(call_connection_id).start_recognizing_media(
input_type=RecognizeInputType.SPEECH,
target_participant=PhoneNumberIdentifier(callerId),
end_silence_timeout=10,
play_prompt=play_source,
operation_context=context)
app.logger.info("handle_recognize : data=%s",recognize_result)

def handle_play(call_connection_id, text_to_play, context):

async def handle_play(call_connection_id, text_to_play, context):
play_source = TextSource(text=text_to_play, voice_name= "en-US-NancyNeural")
call_automation_client.get_call_connection(call_connection_id).play_media_to_all(play_source,
await call_automation_client.get_call_connection(call_connection_id).play_media_to_all(play_source,
operation_context=context)

def handle_hangup(call_connection_id):
call_automation_client.get_call_connection(call_connection_id).hang_up(is_for_everyone=True)

def detect_escalate_to_agent_intent(speech_text, logger):
return has_intent_async(user_query=speech_text, intent_description="talk to agent", logger=logger)

def has_intent_async(user_query, intent_description, logger):
async def handle_hangup(call_connection_id):
await call_automation_client.get_call_connection(call_connection_id).hang_up(is_for_everyone=True)


async def detect_escalate_to_agent_intent(speech_text, logger):
return await has_intent_async(user_query=speech_text, intent_description="talk to agent", logger=logger)


async def has_intent_async(user_query, intent_description, logger):
is_match=False
system_prompt = "You are a helpful assistant"
combined_prompt = f"In 1 word: does {user_query} have a similar meaning as {intent_description}?"

logger.info("has_intent_async method executing")

#combined_prompt = base_user_prompt.format(user_query, intent_description)
response = get_chat_completions_async(system_prompt, combined_prompt)
response = await get_chat_completions_async(system_prompt, combined_prompt)
if "yes" in response.lower():
is_match =True
logger.info(f"OpenAI results: is_match={is_match}, customer_query='{user_query}', intent_description='{intent_description}'")
@@ -150,7 +192,9 @@ def get_sentiment_score(sentiment_score):
return int(match.group()) if match else -1

@app.route("/api/incomingCall", methods=['POST'])
def incoming_call_handler():
#@async_action
async def incoming_call_handler():
#data = await request.get_json()
for event_dict in request.json:
event = EventGridEvent.from_dict(event_dict)
app.logger.info("incoming event data --> %s", event.data)
@@ -175,17 +219,19 @@ def incoming_call_handler():

app.logger.info("callback url: %s", callback_uri)

answer_call_result = call_automation_client.answer_call(incoming_call_context=incoming_call_context,
answer_call_result = await call_automation_client.answer_call(incoming_call_context=incoming_call_context,
cognitive_services_endpoint=COGNITIVE_SERVICE_ENDPOINT,
callback_url=callback_uri)
app.logger.info("Answered call for connection id: %s",
answer_call_result.call_connection_id)
return Response(status=200)

@app.route("/api/callbacks/<contextId>", methods=["POST"])
def handle_callback(contextId):
#@async_action
async def handle_callback(contextId):
try:
global caller_id , call_connection_id
#request_json = await request.get_json()
app.logger.info("Request Json: %s", request.json)
for event_dict in request.json:
event = CloudEvent.from_dict(event_dict)
@@ -198,7 +244,7 @@ def handle_callback(contextId):

app.logger.info("call connected : data=%s", event.data)
if event.type == "Microsoft.Communication.CallConnected":
handle_recognize(HELLO_PROMPT,
await handle_recognize(HELLO_PROMPT,
caller_id,call_connection_id,
context="GetFreeFormText")

@@ -208,55 +254,57 @@
app.logger.info("Recognition completed, speech_text =%s",
speech_text);
if speech_text is not None and len(speech_text) > 0:
if detect_escalate_to_agent_intent(speech_text=speech_text,logger=app.logger):
""" if await detect_escalate_to_agent_intent(speech_text=speech_text,logger=app.logger):
handle_play(call_connection_id=call_connection_id,text_to_play=END_CALL_PHRASE_TO_CONNECT_AGENT,context=CONNECT_AGENT_CONTEXT)
else: """
app.logger.info("sending text to opanai, speech_text =%s",
speech_text);
chat_gpt_response= await get_chat_gpt_response(speech_text)
app.logger.info(f"Chat GPT response:{chat_gpt_response}")
regex = re.compile(CHAT_RESPONSE_EXTRACT_PATTERN)
match = regex.search(chat_gpt_response)
if match:
answer = match.group(1)
sentiment_score = match.group(2).strip()
intent = match.group(3)
category = match.group(4)
app.logger.info(f"Chat GPT Answer={answer}, Sentiment Rating={sentiment_score}, Intent={intent}, Category={category}")
score=get_sentiment_score(sentiment_score)
app.logger.info(f"Score={score}")
if -1 < score < 5:
app.logger.info(f"Score is less than 5")
await handle_play(call_connection_id=call_connection_id,text_to_play=CONNECT_AGENT_PROMPT,context=CONNECT_AGENT_CONTEXT)
else:
app.logger.info(f"Score is more than 5")
await handle_recognize(answer,caller_id,call_connection_id,context="OpenAISample")
else:
chat_gpt_response= get_chat_gpt_response(speech_text)
app.logger.info(f"Chat GPT response:{chat_gpt_response}")
regex = re.compile(CHAT_RESPONSE_EXTRACT_PATTERN)
match = regex.search(chat_gpt_response)
if match:
answer = match.group(1)
sentiment_score = match.group(2).strip()
intent = match.group(3)
category = match.group(4)
app.logger.info(f"Chat GPT Answer={answer}, Sentiment Rating={sentiment_score}, Intent={intent}, Category={category}")
score=get_sentiment_score(sentiment_score)
app.logger.info(f"Score={score}")
if -1 < score < 5:
app.logger.info(f"Score is less than 5")
handle_play(call_connection_id=call_connection_id,text_to_play=CONNECT_AGENT_PROMPT,context=CONNECT_AGENT_CONTEXT)
else:
app.logger.info(f"Score is more than 5")
handle_recognize(answer,caller_id,call_connection_id,context="OpenAISample")
else:
app.logger.info("No match found")
handle_recognize(chat_gpt_response,caller_id,call_connection_id,context="OpenAISample")
app.logger.info("No match found")
await handle_recognize(chat_gpt_response,caller_id,call_connection_id,context="OpenAISample")

elif event.type == "Microsoft.Communication.RecognizeFailed":
resultInformation = event.data['resultInformation']
reasonCode = resultInformation['subCode']
context=event.data['operationContext']
global max_retry
if reasonCode == 8510 and 0 < max_retry:
handle_recognize(TIMEOUT_SILENCE_PROMPT,caller_id,call_connection_id)
await handle_recognize(TIMEOUT_SILENCE_PROMPT,caller_id,call_connection_id)
max_retry -= 1
else:
handle_play(call_connection_id,GOODBYE_PROMPT, GOODBYE_CONTEXT)
await handle_play(call_connection_id,GOODBYE_PROMPT, GOODBYE_CONTEXT)

elif event.type == "Microsoft.Communication.PlayCompleted":
context=event.data['operationContext']
context=event.data['operationContext']
app.logger.info(f"Context: " + context)
if context.lower() == TRANSFER_FAILED_CONTEXT.lower() or context.lower() == GOODBYE_CONTEXT.lower() :
handle_hangup(call_connection_id)
await handle_hangup(call_connection_id)
elif context.lower() == CONNECT_AGENT_CONTEXT.lower():
if not AGENT_PHONE_NUMBER or AGENT_PHONE_NUMBER.isspace():
app.logger.info(f"Agent phone number is empty")
handle_play(call_connection_id=call_connection_id,text_to_play=AGENT_PHONE_NUMBER_EMPTY_PROMPT)
await handle_play(call_connection_id=call_connection_id,text_to_play=AGENT_PHONE_NUMBER_EMPTY_PROMPT)
else:
app.logger.info(f"Initializing the Call transfer...")
transfer_destination=PhoneNumberIdentifier(AGENT_PHONE_NUMBER)
call_connection_client =call_automation_client.get_call_connection(call_connection_id=call_connection_id)
call_connection_client.transfer_call_to_participant(target_participant=transfer_destination)
await call_automation_client.get_call_connection(call_connection_id=call_connection_id).transfer_call_to_participant(target_participant=transfer_destination)
app.logger.info(f"Transfer call initiated: {context}")

elif event.type == "Microsoft.Communication.CallTransferAccepted":
@@ -268,15 +316,26 @@ def handle_callback(contextId):
sub_code = resultInformation['subCode']
# check for message extraction and code
app.logger.info(f"Encountered error during call transfer, message=, code=, subCode={sub_code}")
handle_play(call_connection_id=call_connection_id,text_to_play=CALLTRANSFER_FAILURE_PROMPT, context=TRANSFER_FAILED_CONTEXT)
await handle_play(call_connection_id=call_connection_id,text_to_play=CALLTRANSFER_FAILURE_PROMPT, context=TRANSFER_FAILED_CONTEXT)
return Response(status=200)
except Exception as ex:
app.logger.info("error in event handling")
except Exception as ex:
app.logger.info(f"error in event handling exception = {ex}")

@app.route("/")
def hello():
return "Hello ACS CallAutomation!..test"

async def init_app():
"""Initialize the aiohttp web application."""
app = web.Application()
app.router.add_post('/api/incomingCall', incoming_call_handler)
app.router.add_post('/api/callbacks/<contextId>', handle_callback)
return app

if __name__ == '__main__':
app.logger.setLevel(INFO)
app.run(port=8080)

#loop = asyncio.get_event_loop()
#web.run_app(init_app(), host='0.0.0.0', port=5000, loop=loop)
#asyncio.set_event_loop_policy(asyncio.WindowsSelectorEventLoopPolicy())
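
Reviewer note: the handlers above are now declared async def but are still registered on the Flask app and started with app.run(); the aiohttp init_app/web.run_app path is left commented out. Below is a minimal sketch of how async Flask views can be served, assuming the optional async extra is installed (pip install "flask[async]"); the /ping route is illustrative only and not part of this PR.

# Sketch only (not part of this diff): serving async Flask views.
# Assumes the optional async extra is installed: pip install "flask[async]"
from flask import Flask

sketch_app = Flask(__name__)

@sketch_app.route("/ping")
async def ping():
    # Flask runs async views on an event loop via asgiref, which is what
    # lets the awaits inside incoming_call_handler and handle_callback
    # execute under a plain app.run().
    return "pong"

if __name__ == "__main__":
    sketch_app.run(port=8080)
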
2 changes: 1 addition & 1 deletion callautomation-openai-sample/requirements.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
Flask>=2.3.2
azure-eventgrid==4.11.0
azure-communication-callautomation==1.1.0
openai==0.28.1
openai==1.30.2
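
Reviewer note on the openai==1.30.2 pin: the 1.x SDK drops the module-level openai.api_* settings and openai.api_resources.ChatCompletion in favor of a client object, which is why main.py switches to AsyncAzureOpenAI. A standalone sketch of that call pattern, with placeholder key, endpoint, and deployment name rather than values from this sample:

# Sketch with placeholder credentials; mirrors the AsyncAzureOpenAI usage in main.py.
import asyncio
from openai import AsyncAzureOpenAI

client = AsyncAzureOpenAI(
    api_key="<AZURE_OPENAI_SERVICE_KEY>",                    # placeholder
    api_version="2024-02-01",
    azure_endpoint="https://<resource>.openai.azure.com/",   # placeholder
)

async def main():
    response = await client.chat.completions.create(
        model="<deployment-name>",  # Azure deployment name, not the base model id
        messages=[{"role": "user", "content": "Say hello in five words."}],
        max_tokens=50,
    )
    print(response.choices[0].message.content)

if __name__ == "__main__":
    asyncio.run(main())
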